In [50]:
"""
Micah Holness
10/13/2024
CSC 8850
"""
import numpy as np
import torch
import scipy.io.wavfile as sw
import scipy.signal as ss
from glob import glob
import matplotlib.pyplot as plt
import pandas as pd
import os

In [None]:
# Import .wav data & labels.
# Reads Train.csv, converts every listed recording that exists on disk into a
# fixed-length log-power spectrum, and caches features/labels as .npy files.
# NOTE(review): img_root is an absolute, machine-specific path - adjust it
# when running on another machine.
img_root = "/Users/mholness1/Desktop/CSC 8850 Project/techcabal-ewe-audio-translation-challenge20240903-4068-o1ckqz/TechCabal Ewe Audio Files-20241014T013956Z-002/TechCabal Ewe Audio Files/"
csv_data = pd.read_csv('./Train.csv', sep=',')
print(csv_data.keys())
images = csv_data['audio_filepath']
labels = csv_data['class']

# Each distinct class name is encoded as its position in the sorted
# unique-label array returned by np.unique.
unique_labels = np.unique(labels)
print(unique_labels)
labels_class = np.arange(0, len(unique_labels))
print(labels_class)

# Every spectrum is resampled to this many bins so the results can be stacked
# into one 2-D array.
n_spectrum_bins = 51744
# Floor applied to the power spectrum before taking the log. Without it a bin
# with exactly zero power becomes -inf, which later turns the min-max
# normalisation of the whole dataset into NaN. Bins at or above the floor are
# left untouched.
power_floor = 1e-10

# Preprocess each recording: time domain -> centred log-power spectrum.
freq_spectrum = []
freq_labels = []
n_missing = 0
for wv, class_tmp in zip(images, labels):
    fpath = os.path.join(img_root, wv)
    if not os.path.isfile(fpath):
        # Listed in the CSV but absent on disk: skip it, but keep count so the
        # mismatch is visible instead of silent.
        n_missing += 1
        continue

    fs, data = sw.read(fpath)

    # Integer class index, kept as a length-1 array so the saved label array
    # has shape (n_samples, 1).
    cidx = np.where(class_tmp == unique_labels)[0]
    freq_labels.append(cidx)

    # Multi-channel recording: keep the first channel only.
    if len(data.shape) > 1:
        data = data[:, 0]

    # Transform the 1-D time series into a zero-frequency-centred spectrum.
    fft = np.fft.fft(data)
    fft_centered = np.fft.fftshift(fft)
    fft_magn = np.log10(np.maximum(np.abs(fft_centered)**2, power_floor))

    # Bring every spectrum to the same length.
    fft_magn_dwn = ss.resample(fft_magn, n_spectrum_bins, axis=0).astype(np.float32)
    freq_spectrum.append(fft_magn_dwn)

if n_missing:
    print(f"Skipped {n_missing} of {len(images)} listed files (not found under img_root)")
if not freq_spectrum:
    raise FileNotFoundError(
        f"None of the {len(images)} files listed in Train.csv were found under {img_root!r}"
    )

freq_spectrum_arr = np.stack(freq_spectrum, axis=0)
freq_labels_arr = np.array(freq_labels)

imgs_length = [freq.shape[0] for freq in freq_spectrum_arr]
print(np.min(imgs_length))      # 51744

np.save('training_data.npy', freq_spectrum_arr)
np.save('training_labels.npy', freq_labels_arr)

In [None]:
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import scipy.stats as stat

# load the training data arrays
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects;
# only load .npy files that were produced by this notebook.
training_data = np.load('./training_data.npy', allow_pickle=True)
training_labels = np.load('./training_labels.npy', allow_pickle=True)

print(training_data.shape)      # 799, 51744

# visualize data (one mean value per sample)
sample_means = np.mean(training_data, axis=1)
fig = plt.figure()
plt.scatter(np.arange(0, training_data.shape[0]), sample_means)
plt.xlabel("sample index")
plt.ylabel("mean feature value")
plt.title("Per-sample mean of the training features")

# Perform the Shapiro-Wilk test ---> test for normality
statistic, p_value = stat.shapiro(sample_means)
print("Shapiro-Wilk Statistic:", statistic)
print("P-value:", p_value)

# note: according to Shapiro-Wilk test, data is significantly skewed
# thus, when data is non-normal/non-Gaussian, normalization is preferred [for unknown distribution]
# normalize data range = 0 -> 1
# The global min/max are computed once and validated first: a non-finite value
# or a zero range would otherwise silently fill the array with NaN/inf.
data_min = np.min(training_data)
data_max = np.max(training_data)
if not (np.isfinite(data_min) and np.isfinite(data_max)):
    raise ValueError(
        f"training_data contains non-finite values (min={data_min}, max={data_max}); "
        "cannot min-max normalise"
    )
if data_max == data_min:
    raise ValueError("training_data is constant; min-max normalisation is undefined")
training_data = (training_data - data_min) / (data_max - data_min)
print(np.min(training_data), np.max(training_data))

# one-hot encode labels
# NOTE(review): one_hot infers the number of classes from the largest label
# present, so the encoded width shrinks if the highest class index is missing
# from the loaded labels - confirm it matches the model's output size.
training_labels_tmp = torch.tensor(training_labels.flatten())
training_labels = nn.functional.one_hot(training_labels_tmp)

class MyDataset(Dataset):
    """Map-style dataset that pairs each feature vector with its label.

    Args:
        imgs: Indexable collection of per-sample feature vectors. When omitted,
            the notebook-level ``training_data`` array is used.
        labels: Indexable collection of per-sample targets with the same
            length as ``imgs``. When omitted, the notebook-level
            ``training_labels`` tensor is used.

    Raises:
        ValueError: If ``imgs`` and ``labels`` do not have the same length.
    """

    def __init__(self, imgs=None, labels=None):
        # Falling back to the notebook globals keeps the existing
        # ``MyDataset()`` call working while allowing other splits to be
        # wrapped explicitly.
        self.imgs = training_data if imgs is None else imgs
        self.labels = training_labels if labels is None else labels
        if len(self.imgs) != len(self.labels):
            raise ValueError(
                f"imgs and labels must have the same length, "
                f"got {len(self.imgs)} and {len(self.labels)}"
            )

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at position ``idx``."""
        image = self.imgs[idx]
        label = self.labels[idx]
        return image, label

# Wrap the prepared arrays in a Dataset and serve them as shuffled
# mini-batches of 64, loaded in the main process without pinned memory.
dataset = MyDataset()
dataloader_train = DataLoader(
    dataset=dataset, batch_size=64, shuffle=True, num_workers=0, pin_memory=False
)

print(dataloader_train)

In [None]:
import torch.nn as nn
import torch.optim as optim

# Define your CNN model
class MyCNN(nn.Module):
    """1-D convolutional classifier for fixed-length feature vectors.

    Layout:
        * ``conv1``: two unpadded kernel-3 convolutions (1 -> 32 -> 75
          channels) followed by a stride-2 max-pool.
        * ``conv2``: four unpadded kernel-3 convolutions (75 -> 100 -> 75 ->
          32 -> 1 channels), a stride-2 max-pool, then an adaptive average
          pool down to 100 values.
        * ``fc_layers``: ReLU MLP 100 -> 225 -> 475 -> 210 -> 100 -> 25 ->
          ``n_classes``.

    NOTE(review): there is no activation between the Conv1d layers, so
    consecutive convolutions compose linearly - confirm this is intended.

    Args:
        n_classes: Size of the final linear layer (number of output logits).
    """

    def __init__(self, n_classes):
        super().__init__()

        first_stage = [
            nn.Conv1d(1, 32, kernel_size=3),
            nn.Conv1d(32, 75, kernel_size=3),
            nn.MaxPool1d(kernel_size=2, stride=2),
        ]
        self.conv1 = nn.Sequential(*first_stage)

        second_stage = [
            nn.Conv1d(75, 100, kernel_size=3),
            nn.Conv1d(100, 75, kernel_size=3),
            nn.Conv1d(75, 32, kernel_size=3),
            nn.Conv1d(32, 1, kernel_size=3),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.AdaptiveAvgPool1d(100),
        ]
        self.conv2 = nn.Sequential(*second_stage)

        # Hidden widths of the classifier head; each hidden Linear is followed
        # by a ReLU, and the final Linear maps to the class logits.
        widths = (100, 225, 475, 210, 100, 25)
        head = []
        for n_in, n_out in zip(widths[:-1], widths[1:]):
            head.append(nn.Linear(n_in, n_out))
            head.append(nn.ReLU())
        head.append(nn.Linear(widths[-1], n_classes))
        self.fc_layers = nn.Sequential(*head)

    def forward(self, x):
        """Map a ``(batch, length)`` tensor to ``(batch, 1, n_classes)`` logits."""
        # Insert the single input-channel axis expected by Conv1d.
        signal = x[:, None, :]
        pooled = self.conv2(self.conv1(signal))
        return self.fc_layers(pooled)
    
# set weight initialization
def init_weights(model):
    """Re-initialise the Conv1d and Linear layers of ``model`` in place.

    * ``nn.Conv1d``: weights are drawn with He (Kaiming) uniform
      initialisation, i.e. from U(-bound, +bound) with
      ``bound = gain * sqrt(3 / fan)``. ``mode='fan_out'`` selects the
      fan-out of the layer (the mode PyTorch documents as preserving
      magnitudes in the backward pass) and ``nonlinearity='relu'`` selects
      the ReLU gain. He initialisation is the variant recommended for ReLU.
      Biases, when the layer has one, are set to 0.
    * ``nn.Linear``: weights are drawn from a normal distribution with
      mean 0 and std 1, truncated to the interval [-2, 2]. Biases are left
      untouched.

    Args:
        model: Any ``nn.Module``; every sub-module returned by
            ``model.modules()`` is visited.

    Returns:
        None. The parameters are modified in place.
    """
    for m in model.modules():
        if isinstance(m, nn.Conv1d):
            nn.init.kaiming_uniform_(m.weight, mode='fan_out', nonlinearity='relu')
            # Layers built with bias=False have m.bias set to None.
            if m.bias is not None:
                nn.init.constant_(m.bias, 0.0)
        elif isinstance(m, nn.Linear):
            # The explicit generator=None keyword was dropped: None is already
            # the default, and the keyword is not accepted by older PyTorch
            # releases.
            nn.init.trunc_normal_(m.weight, mean=0.0, std=1.0, a=-2.0, b=2.0)
    
# Build the 7-class network and display its layer structure.
# The module is printed directly: model.eval() returns the same module but
# also switches it to inference mode, which is not wanted right before
# training.
# NOTE(review): init_weights() is defined above but never applied to this
# model - confirm whether PyTorch's default initialisation is intended.
model = MyCNN(n_classes=7)
print(model)

In [None]:
# Sanity-check the loader: walk through every batch once and report a compact
# summary (container type, shapes, dtypes) instead of dumping whole tensors.
for didx, data in enumerate(dataloader_train):
    img, label = data
    print(
        f"batch {didx}: {type(data).__name__} | "
        f"img {tuple(img.shape)} {img.dtype} | "
        f"label {tuple(label.shape)} {label.dtype}"
    )

In [None]:
# Instantiate loss and optimizer.
# CrossEntropyLoss receives raw logits; the one-hot float targets built below
# are treated as per-class probabilities.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08)

# Training loop
epochs = 100
for epoch in range(epochs):
    # Select training mode explicitly instead of relying on whatever mode the
    # model was left in by earlier cells.
    model.train()
    for i, data in enumerate(dataloader_train):
        inputs, labels = data
        inputs = inputs.float()
        labels = labels.float()

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass. The model returns (batch, 1, n_classes); only the
        # singleton channel axis is removed, so a batch holding a single
        # sample keeps its batch axis.
        outputs = model(inputs)
        outputs = torch.squeeze(outputs, dim=1)
        outputs = outputs.float()
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Assess accuracy on this batch. Softmax is monotonic, so the argmax
        # of the logits is the predicted class; no gradient is needed here.
        with torch.no_grad():
            predicted = torch.argmax(outputs, dim=1)
            expected = torch.argmax(labels, dim=1)
            accuracy = (predicted == expected).float().mean()

        # Print progress (adjacent f-strings avoid the run of spaces that a
        # backslash continuation inside the string would insert).
        print(
            f"Epoch [{epoch+1}/{epochs}], Step [{i+1}/{len(dataloader_train)}], "
            f"Loss: {loss.item():.4f}, Accuracy: {accuracy.item():.4f}"
        )
