In [1]:
# ROBERT HEETER
# ELEC 378 Machine Learning
# 14 April 2023

# PROJECT


In [2]:
import numpy as np
import os
import pandas as pd
import matplotlib.pyplot as plt

import math
import random
import string

import torch
from torch.utils.data import DataLoader, Dataset, TensorDataset, random_split
# from torch.utils.data import DataLoader, Dataset, random_split
import torch.nn.functional as F
import torch.nn as nn
import torchaudio
from torchaudio import transforms

import torchvision

import jovian

from IPython.display import Audio


<IPython.core.display.Javascript object>

## FUNCTIONS TO CONVERT AUDIO FILE TO SPECTROGRAM
* from: https://towardsdatascience.com/audio-deep-learning-made-simple-sound-classification-step-by-step-cebc936bbe5

In [3]:
# load an audio file; return the signal as a tensor and the sample rate

class AudioUtil():
    def open(audio_file):
        sig, sr = torchaudio.load(audio_file)
        return (sig, sr)

    def rechannel(aud, new_channel):
        sig, sr = aud
        
        if (sig.shape[0] == new_channel):
            # nothing to do
            return aud

        if (new_channel == 1):
            # convert from stereo to mono by selecting only the first channel
            resig = sig[:1, :]
        else:
            # convert from mono to stereo by duplicating the first channel
            resig = torch.cat([sig, sig])

        return ((resig, sr))

    def resample(aud, newsr):
        sig, sr = aud

        if (sr == newsr):
            # nothing to do
            return aud

        num_channels = sig.shape[0]
        # resample first channel
        resig = torchaudio.transforms.Resample(sr, newsr)(sig[:1, :])
        
        if (num_channels > 1):
            # resample the second channel and merge both channels
            retwo = torchaudio.transforms.Resample(sr, newsr)(sig[1:, :])
            resig = torch.cat([resig, retwo])

        return ((resig, newsr))

    def pad_trunc(aud, max_ms):
        sig, sr = aud
        num_rows, sig_len = sig.shape
        max_len = sr//1000 * max_ms

        if (sig_len > max_len):
            # truncate the signal to the given length
            sig = sig[:, :max_len]

        elif (sig_len < max_len):
            # length of padding to add at the beginning and end of the signal
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len

            # pad with 0s
            pad_begin = torch.zeros((num_rows, pad_begin_len))
            pad_end = torch.zeros((num_rows, pad_end_len))

            sig = torch.cat((pad_begin, sig, pad_end), 1)

        return (sig, sr)

    def time_shift(aud, shift_limit):
        sig, sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        
        return (sig.roll(shift_amt), sr)

    def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
        sig, sr = aud
        top_db = 80

        # spec has shape [channel, n_mels, time], where channel is mono, stereo, etc.
        spec = transforms.MelSpectrogram(
            sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

        # convert to decibels
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        
        return (spec)

    def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(
                freq_mask_param)(aug_spec, mask_value)

        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(
                time_mask_param)(aug_spec, mask_value)

        return aug_spec
    

In [4]:
class SoundDS(Dataset):
    def __init__(self, df, data_path):
        self.df = df
        self.data_path = str(data_path)
        self.duration = 4000
        self.sr = 44100 # sampling rate
        self.channel = 2
        self.shift_pct = 0.4

    # number of items in dataset
    def __len__(self):
        return len(self.df)

    # get i'th item in dataset
    def __getitem__(self, idx):
        # absolute file path of the audio file; concatenate the audio directory with the relative path
        audio_file = self.data_path + self.df.loc[idx, 'relative_path']
        # get the Class ID
        class_id = self.df.loc[idx, 'classID']

        aud = AudioUtil.open(audio_file)
        
        # make all sounds have the same number of channels and same sample rate
        reaud = AudioUtil.resample(aud, self.sr)
        rechan = AudioUtil.rechannel(reaud, self.channel)

        dur_aud = AudioUtil.pad_trunc(rechan, self.duration)
        shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)
        sgram = AudioUtil.spectro_gram(
            shift_aud, n_mels=64, n_fft=1024, hop_len=None)
        aug_sgram = AudioUtil.spectro_augment(
            sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

        return aug_sgram, class_id


## IMPORT & PROCESS TRAINING AND TRIAL DATA

In [5]:
# get train dataset

train_data = np.empty((1125, 2), dtype=object)
count = 0

train_dir = os.path.join(os.getcwd(),'elec-378-sp2023-speech-emotion-classification/data/data')

for filename in os.listdir(train_dir):
    f = os.path.join(train_dir, filename)

    # check if it is a file
    if os.path.isfile(f):
        name = filename[:len(filename)-7]
        train_data[count][0] = "/"+filename
        train_data[count][1] = name
        count += 1

train_df = pd.DataFrame(train_data)
train_df.rename(columns={0: "relative_path", 1: "classID"}, inplace=True)

train_ds = SoundDS(train_df, train_dir)


In [None]:
# # FOR TESTING; DELETE FOR FINAL KAGGLE SUBMISSION
# # splits 85/15 train_ds into train_ds and trial_ds

# train_ds, trial_ds = random_split(train_ds, [round(0.85*len(train_ds)),round(0.15*len(train_ds))])

In [6]:
# get audio tensor and label from train dataset; stack channels into one array

X_train = []
y_train = []

for i in range(len(train_ds)):
    audio_file = train_ds[i]
    sample = audio_file[0]
    label = audio_file[1]
    
    channel_1 = sample[0,:,:]
    channel_2 = sample[1,:,:]
    
    audio_sample_combined = np.vstack((channel_1,channel_2))
    
    X_train.append(audio_sample_combined.flatten())
    y_train.append(label)
    
X_train = np.array(X_train)

# map each label description to a label number

description_to_number = {}
for label in set(y_train):
    description_to_number[label] = len(description_to_number)

y_train = np.array([description_to_number[label] for label in y_train])


In [7]:
# get trial dataset

trial_data = np.empty((315, 2), dtype=object) 
count = 0

trial_dir = os.path.join(os.getcwd(),'elec-378-sp2023-speech-emotion-classification/test/test')

for filename in os.listdir(trial_dir):
    f = os.path.join(trial_dir, filename)

    # check if it is a file
    if os.path.isfile(f):
        name = filename[:len(filename)-7]
        trial_data[count][0] = "/"+filename
        trial_data[count][1] = name
        count += 1

trial_df = pd.DataFrame(trial_data)
trial_df.rename(columns={0: "relative_path", 1: "classID"}, inplace=True)

trial_ds = SoundDS(trial_df, trial_dir)


In [8]:
# get audio tensor from trial dataset; stack channels into one array

X_trial = []

for i in range(len(trial_ds)):
    audio_file = trial_ds[i]
    sample = audio_file[0]
    
    channel_1 = sample[0,:,:]
    channel_2 = sample[1,:,:]
    
    audio_sample_combined = np.vstack((channel_1,channel_2))

    X_trial.append(audio_sample_combined.flatten())
    
X_trial = np.array(X_trial)

# # get audio tensor and label from train dataset; stack channels into one array

# X_trial = []
# y_trial = []

# for i in range(len(trial_ds)):
#     audio_file = trial_ds[i]
#     sample = audio_file[0]
#     label = audio_file[1]
    
#     channel_1 = sample[0,:,:]
#     channel_2 = sample[1,:,:]
    
#     audio_sample_combined = np.vstack((channel_1,channel_2))

#     X_trial.append(audio_sample_combined)
#     y_trial.append(label)
    
# X_trial = np.array(X_trial)

# # map each label description to a label number

# description_to_number = {}
# for label in set(y_trial):
#     description_to_number[label] = len(description_to_number)

# y_trial = np.array([description_to_number[label] for label in y_trial])


In [None]:
from sklearn.model_selection import train_test_split

X_train,X_test,Y_train,Y_test = train_test_split(X_train,y_train,test_size=0.2,random_state=42)

In [None]:
spectrogram = X_train[10]
label = y_train[10]
plt.imshow(spectrogram.reshape((128,-1)), cmap = 'gray')
print("Emotion #: ", label)


In [None]:
print(np.shape(X_train))
print(np.shape(Y_train))
a = np.where(Y_train==1)[0]
print(a)
for i in a:
    spectrogram = X_train[i,:]
#     label = y_train[10]
    plt.imshow(spectrogram.reshape((128,-1)), cmap = 'gray')
    plt.show()
# print("Emotion #: ", label)



In [10]:
from sklearn.linear_model import LogisticRegression

In [11]:
logreg =  LogisticRegression(max_iter = 1000)
logreg.fit(X_train,y_train)

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


In [14]:
y_pred=logreg.predict(X_trial)
y_pred_final = y_pred

In [46]:
#getting validation (testing) data 
test = np.empty((315, 2), dtype=object) 
count = 0
directory = os.path.join(os.getcwd(),'elec-378-sp2023-speech-emotion-classification/test/test')
for filename in os.listdir(directory):
    f = os.path.join(directory, filename)
    # checking if it is a file

    if os.path.isfile(f):
        name = filename[:len(filename)-7]
        test[count][0] = "/"+filename
        test[count][1] = name
        count += 1

df = pd.DataFrame(test)
df.rename(columns={0: "relative_path", 1: "classID"}, inplace=True)
filename = df['relative_path'].to_list()
val_ds = SoundDS(df, directory)

In [48]:
print(description_to_number)
print(y_pred)

{0: 'calm', 1: 'angry', 2: 'neutral', 3: 'fearful', 4: 'surprised', 5: 'sad', 6: 'disgust', 7: 'happy'}
[6 1 5 4 6 7 6 7 5 1 3 1 4 5 7 6 6 7 5 4 5 5 0 4 1 7 0 4 7 7 2 7 7 3 1 7 6
 3 2 7 2 6 1 7 3 0 4 0 3 5 1 3 5 4 6 5 1 7 3 1 0 2 6 1 6 1 1 5 1 7 2 7 0 6
 1 1 0 5 0 2 3 7 6 1 2 5 4 2 5 6 2 2 1 5 6 5 1 6 5 6 3 5 7 4 3 2 5 3 3 5 0
 1 4 4 3 0 3 4 7 3 6 5 0 3 6 7 5 1 6 7 6 3 5 5 7 1 5 0 1 5 3 3 4 1 4 4 6 3
 4 4 7 5 5 1 7 4 2 6 3 0 0 3 7 7 4 4 7 1 7 5 0 7 2 7 7 1 4 1 0 4 1 5 6 1 5
 5 2 7 3 1 4 0 5 5 4 7 4 0 6 4 7 3 6 5 4 7 2 7 5 3 6 5 1 2 1 6 3 4 3 3 1 2
 4 5 5 7 0 1 0 0 4 5 0 3 2 5 0 3 6 7 3 5 6 6 2 7 4 5 6 7 2 4 6 5 5 1 7 6 5
 4 7 3 3 5 6 3 7 7 1 6 7 3 3 5 0 7 7 5 4 4 5 3 1 4 6 6 6 6 2 1 5 5 0 3 0 1
 3 7 6 1 5 0 0 4 0 7 6 0 3 7 1 4 5 3 2]
{'calm': 0, 'angry': 1, 'neutral': 2, 'fearful': 3, 'surprised': 4, 'sad': 5, 'disgust': 6, 'happy': 7}


In [53]:
#Convert to kaggle upload
number_to_label = {v: k for k, v in label_to_number.items()}
# Map each number back to its corresponding label
label = [label_to_number[number] for number in y_pred]
clean_filename = [file[1:-4] for file in filename]

In [54]:
import os
df = pd.DataFrame(list(zip(clean_filename, label)), columns=['filename', 'label'])
df.to_csv("y_kaggle_LOGREG.csv", index=False)

In [None]:
# print(np.sum(y_pred==Y_test)/len(Y_test))

In [None]:
# for data transformation
import numpy as np
# for visualizing the data
import matplotlib.pyplot as plt
# for opening the media file
import scipy.io.wavfile as wavfile

In [None]:
val_ds = trial_ds

In [None]:
# print(description_to_number)
# emotions = ['surprised','sad','happy','fearful','calm','neutral','disgust','angry']

In [9]:
# hyperparameters
batch_size = 256
learning_rate = 1e-5

# other constants
input_size = 128*344
num_classes = 8


In [None]:
inputs = torch.from_numpy(X_train).float()
targets = torch.from_numpy(y_train).long()
# testinputs = torch.from_numpy(X_trial).float()
# testtargets = torch.from_numpy(y_trial).long()


In [None]:
# training validation & test dataset
dataset = TensorDataset(inputs, targets)
# testdataset = TensorDataset(testinputs, testtargets)

# let's use 15% of our training dataset to validate our model
num_rows = 956
val_percent = 0.15
val_size = int(num_rows * val_percent)
train_size = num_rows - val_size
train_ds, val_ds = random_split(dataset, [train_size, val_size])



In [None]:
# dataloaders
train_loader = DataLoader(train_ds, batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size*2)
# test_loader = DataLoader(testdataset, batch_size*2)


In [None]:
img, label = train_ds[3]
plt.imshow(img.reshape((128,-1)), cmap = 'gray')
plt.colorbar()
print("Emotion: ", label)


In [None]:
class MnistModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(input_size, num_classes)
        
    def forward(self, xb):
        xb = xb.reshape(-1, 44032)
        out = self.linear(xb)
        return out
    
    def training_step(self, batch):
        images, labels = batch 
        out = self(images)                  # Generate predictions
        loss = F.cross_entropy(out, labels) # Calculate loss
        return loss
    
    def validation_step(self, batch):
        images, labels = batch 
        out = self(images)                    # Generate predictions
        loss = F.cross_entropy(out, labels)   # Calculate loss
        acc = accuracy(out, labels)           # Calculate accuracy
        return {'val_loss': loss.detach(), 'val_acc': acc.detach()}
        
    def validation_epoch_end(self, outputs):
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()   # Combine losses
        batch_accs = [x['val_acc'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()      # Combine accuracies
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}
    
    def epoch_end(self, epoch, result):
        print("Epoch [{}], val_loss: {:.4f}, val_acc: {:.4f}".format(epoch, result['val_loss'], result['val_acc']))
    
model = MnistModel()


In [None]:
def accuracy(outputs, labels):
    _, preds = torch.max(outputs, dim=1)
    return torch.tensor(torch.sum(preds == labels).item() / len(preds))


In [None]:
def evaluate(model, val_loader):
    outputs = [model.validation_step(batch) for batch in val_loader]
    return model.validation_epoch_end(outputs)

def fit(epochs, lr, model, train_loader, val_loader, opt_func=torch.optim.SGD):
    history = []
    optimizer = opt_func(model.parameters(), lr)
    for epoch in range(epochs):
        # Training Phase 
        for batch in train_loader:
            loss = model.training_step(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
        # Validation phase
        result = evaluate(model, val_loader)
        model.epoch_end(epoch, result)
        history.append(result)
    return history


In [None]:
evaluate(model, val_loader)

In [None]:
history = fit(50, 1e-5, model, train_loader, val_loader)

In [None]:
# Visualizing how our model performed after each epoch
accuracies = [r['val_acc'] for r in history]
plt.plot(accuracies, '-x')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.title('Accuracy vs. No. of epochs');


In [None]:

'''
train_ds [audio file #] [tensor 0 or label 1]

3D tensor [num_channels, Mel freq_bands, time_steps]

'''

In [None]:
import matplotlib.pyplot as plt
plt.imshow(X[1][0,:,:], cmap='hot', interpolation='nearest')
plt.show()

In [None]:
import numpy as np
import os
import librosa as lb


In [None]:
data = os.path.join(os.getcwd(),'elec-378-sp2023-speech-emotion-classification/data/data')

data_sr = 48000 # sampling rate of audio files is 48000

# y_maxsize = 253053 # max size audio sample in data
y_maxsize = 300000

X = []
for filename in os.listdir(data):
    audiofile = os.path.join(data,filename)
    
    if os.path.isfile(audiofile):
        y, sr = lb.load(audiofile,sr=data_sr)
        
        y = librosa.util.fix_length(y, size=y_maxsize)
        X.append(y)
        
X = np.array(X)
# print(np.shape(X))

for y in X:
    chroma_stft = lb.feature.chroma_stft(y=y, sr=data_sr)
    chroma_cqt = lb.feature.chroma_cqt(y=y, sr=data_sr)
    chroma_cens = lb.feature.chroma_cens(y=y, sr=data_sr)
#     chroma_stft = lb.feature.chroma_stft(y=y, sr=data_sr)
    mfcc = lb.feature.mfcc(y=y, sr=data_sr)
    rms = lb.feature.rms(y=y)
    
    break
# 
# print(chroma_stft)
print(np.shape(chroma_stft))
print(np.shape(chroma_cqt))
print(np.shape(chroma_cens))
print(np.shape(mfcc))
print(np.shape(rms))





In [None]:
np.set_printoptions(threshold=np.inf)
# print(chroma_stft.T)
print(rms[0][0])