In [1]:
import os, glob, time, sys
import pandas as pd
import csv
import random
from math import ceil
from sklearn.utils import shuffle
from sklearn.preprocessing import LabelEncoder
import librosa
import librosa.display
import seaborn as sn
import matplotlib.pyplot as plt
import numpy as np
import joblib
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data_utils
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
from sklearn.metrics import confusion_matrix
# Seed every RNG this notebook draws from. Seeding `random` alone does not
# cover pandas' DataFrame.sample (which falls back to NumPy's global state) or
# PyTorch (DataLoader shuffling, weight initialisation), so the data splits and
# training runs would otherwise change on every Restart & Run All.
SEED = 45
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

audio_dir = "myAudio"  # not referenced by the cells visible in this notebook section
data_dir = "TESS"      # root folder whose sub-folders are scanned by get_data


In [2]:
# Given the requested sizes of the training, testing and validation splits,
# this function generates datasets that are balanced as equally as possible across the classes.
#

def get_data( dir, num_train, num_test, num_valid, random_state=None ):
    """Split the files under ``dir`` into train / test / validation filename lists.

    Every sub-folder of ``dir`` becomes one column of a table whose rows hold
    one file from each folder. Whole rows are sampled for the test and
    validation splits, so every split contains the same number of files from
    each folder; the rows that are left form the training split.

    Parameters
    ----------
    dir : str
        Root directory. Each sub-directory must contain the same number of files.
    num_train : int
        Unused: the training split is whatever remains after the other two
        splits are drawn. Kept so existing callers keep working.
    num_test, num_valid : int
        Requested number of *files* in the test / validation split. Each is
        rounded down to a whole number of rows (a multiple of the folder count).
    random_state : int, numpy Generator/RandomState or None, optional
        Forwarded to ``DataFrame.sample``. ``None`` (the default) keeps the
        previous behaviour of drawing from NumPy's global random state.

    Returns
    -------
    tuple of pandas.Series
        ``(train, test, valid)``, each a Series named ``'filenames'``.

    Raises
    ------
    ValueError
        If ``dir`` has no sub-folders, or (raised by pandas) if the sub-folders
        do not all hold the same number of files.
    """
    # Sorted so that row i pairs the same files on every run and file system;
    # glob order is otherwise arbitrary. Non-directories (e.g. a README next to
    # the class folders) are skipped instead of becoming an empty column.
    files_by_folder = {
        folder: sorted(glob.glob(folder + "/*"))
        for folder in sorted(glob.glob(dir + "/*"))
        if os.path.isdir(folder)
    }
    if not files_by_folder:
        raise ValueError(f"no class folders found under {dir!r}")

    table = pd.DataFrame.from_dict(files_by_folder)

    # One row holds one file per folder, so convert file counts to row counts
    # using the actual number of folders rather than a hard-coded 14.
    n_folders = table.shape[1]
    rows_test = int(num_test / n_folders)
    rows_valid = int(num_valid / n_folders)

    test_rows = table.sample(rows_test, replace=False, random_state=random_state)
    remaining = table.drop(test_rows.index)
    valid_rows = remaining.sample(rows_valid, replace=False, random_state=random_state)
    train_rows = remaining.drop(valid_rows.index)

    def _as_filenames(rows):
        # Flatten the (row, folder) table into a single 'filenames' Series.
        return rows.stack().to_frame('filenames')['filenames']

    return _as_filenames(train_rows), _as_filenames(test_rows), _as_filenames(valid_rows)


In [3]:
# Requested number of files in each split
tot_train, tot_test, tot_valid = 2240, 280, 280
train_fnames, test_fnames, valid_fnames = get_data(data_dir, tot_train, tot_test, tot_valid)

# Persist every split's file list as a one-column CSV (header only, no index)
csv_train, csv_test, csv_valid = 'train_filenames.csv', 'test_filenames.csv', 'valid_filenames.csv'
for split_fnames, csv_path in ((train_fnames, csv_train),
                               (test_fnames, csv_test),
                               (valid_fnames, csv_valid)):
    split_fnames.to_csv(csv_path, index=False)


## Audio Features

In [4]:

# Clip-duration constants; presumably min / mean / max clip length of the
# dataset -- confirm. The value of maxDur (2.984) is the duration that
# extractFeatures resizes every clip to, where it is multiplied by the sample
# rate, i.e. it is expressed in seconds.
minDur, avgDur, maxDur = 1.254, 2.055, 2.984

# Emotion label -> class index. Each index is wrapped in a one-element list so
# that torch.tensor(categories[label]) yields a 1-D tensor that can be
# concatenated per batch.
categories = {
    emotion: [code]
    for code, emotion in enumerate(
        ('angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'ps'))
}

In [5]:
# Standardize length
def replicateGrowWav(y, sr, target_duration):
    """Force a waveform to exactly ``ceil(target_duration * sr)`` samples.

    Longer signals are cut off at the target length; shorter (or equal-length)
    signals are extended at the end by wrapping around to their own start.

    Parameters
    ----------
    y : numpy.ndarray
        Audio samples; the length is read from ``y.shape[0]``.
    sr : number
        Sample rate (samples per second).
    target_duration : number
        Desired duration in seconds.

    Returns
    -------
    numpy.ndarray
        The truncated view or the wrap-padded copy of ``y``.
    """
    target_len = ceil(target_duration * sr)
    missing = target_len - y.shape[0]
    if missing >= 0:
        # Too short (or exact): repeat the signal from its beginning.
        return np.pad(y, (0, missing), mode='wrap')
    # Too long: keep only the leading target_len samples.
    return y[:target_len]

In [6]:

def convertWavToMFCC(file_name, resize_duration, n_mfcc=13):
    """Load an audio file and return its MFCCs as one flat vector.

    Parameters
    ----------
    file_name : str
        Path of the audio file, passed to ``librosa.load``.
    resize_duration : float
        Duration in seconds that the waveform is truncated / wrap-padded to
        (see ``replicateGrowWav``) before the MFCCs are computed.
    n_mfcc : int, optional
        Number of MFCC coefficients per frame (default 13, the previous
        hard-coded value).

    Returns
    -------
    numpy.ndarray
        The ``librosa.feature.mfcc`` result flattened to 1-D with
        ``ndarray.flatten`` (row-major, so all frames of coefficient 0 come
        first, then coefficient 1, ...).
    """
    y, sr = librosa.load(file_name)
    y_fixed = replicateGrowWav(y, sr, resize_duration)
    # Pass the sample rate explicitly: without it mfcc() assumes its own
    # default rate, which only matches by coincidence while load() is called
    # with its default resampling rate.
    mfcc = librosa.feature.mfcc(y=y_fixed, sr=sr, n_mfcc=n_mfcc)
    return mfcc.flatten()

In [7]:
# Extract features
def extractFeatures( csv_file, desc, resize_duration=2.984 ):
    """Build a feature table (filename, class label, MFCC vector) from a split CSV.

    Parameters
    ----------
    csv_file : str
        CSV with a ``filenames`` column holding one audio path per row.
    desc : str
        Text appended to the progress message printed when extraction is done.
    resize_duration : float, optional
        Duration in seconds every clip is resized to before MFCC extraction.
        Defaults to 2.984, the value that used to be hard-coded here (it equals
        the notebook's ``maxDur`` constant).

    Returns
    -------
    pandas.DataFrame
        The CSV contents plus two columns: ``class`` (the text between the last
        underscore of the path and the following dot) and ``mfcc`` (the flat
        vector returned by ``convertWavToMFCC``).
    """
    df = pd.read_csv(csv_file)
    # e.g. ".../OAF_back_angry.wav" -> "angry"
    df['class'] = df['filenames'].apply(lambda x: x.split("_")[-1].split(".")[0])
    df['mfcc'] = df['filenames'].apply(lambda x: convertWavToMFCC(x, resize_duration))
    print(f'Done extracting for {desc}')
    return df
    

In [8]:
# Build the feature table of each split from its filename CSV
csv_train, csv_test, csv_valid = 'train_filenames.csv', 'test_filenames.csv', 'valid_filenames.csv'

X_train = extractFeatures(csv_train, 'Training -> ')
X_test = extractFeatures(csv_test, 'Testing -> ')
X_valid = extractFeatures(csv_valid, 'Validating -> ')




Done extracting for Training -> 
Done extracting for Testing -> 
Done extracting for Validating -> 


In [9]:
## LSTM hyper-parameters (passed to LSTMSpeechEmo when the model is built)
num_layers = 1    # number of stacked LSTM layers
h1 = 50           # hidden state size
input_size = 13   # features per time step; matches the 13 MFCCs extracted per frame
output_dim = 7    # number of output classes (one per entry of `categories`)


In [10]:
# Batching
def lstm_style_batching(batch):
    """Collate ``(features, label)`` samples into one batch for the LSTM.

    Feature tensors are concatenated along dimension 1 and label tensors along
    dimension 0, so the batch axis of the returned data is axis 1.

    Parameters
    ----------
    batch : sequence
        Samples where element 0 is the feature tensor and element 1 the label
        tensor.

    Returns
    -------
    tuple of torch.Tensor
        ``(data, label)``.
    """
    stacked_features = torch.cat([sample[0] for sample in batch], dim=1)
    stacked_labels = torch.cat([sample[1] for sample in batch], dim=0)
    return stacked_features, stacked_labels

In [11]:
#Loader for MFCC speech data (reshapes the data such that input size is 13)
class SpeechLoader(Dataset):
    """In-memory dataset of MFCC sequences and their emotion labels.

    Parameters
    ----------
    dataset_file : pandas.DataFrame
        Feature table with a ``class`` column (label name) and an ``mfcc``
        column (flat MFCC vector), one row per clip. Rows are read in
        positional order, so any index is accepted.
    label_map : dict, optional
        Maps a label name to a list holding its class index. Defaults to the
        notebook-level ``categories``.
    n_features : int, optional
        Number of MFCC coefficients per frame. Defaults to the notebook-level
        ``input_size``.

    Raises
    ------
    KeyError
        If a ``class`` value is missing from ``label_map``.

    Each sample is ``(features, label)`` where ``features`` has shape
    ``(frames, 1, n_features)`` and ``label`` is a 1-element tensor.
    """

    def __init__(self, dataset_file, label_map=None, n_features=None):
        # Fall back to the notebook-level settings when not given explicitly.
        if label_map is None:
            label_map = categories
        if n_features is None:
            n_features = input_size

        self.label = []
        self.dataset = []
        # Iterate by position instead of df.loc[0..n-1], which only works for a
        # default RangeIndex. The previous FileNotFoundError handler is gone:
        # nothing here opens a file, and the handler itself would have failed
        # (str + DataFrame) before shutting the kernel down with exit(1).
        for emotion, mfcc in zip(dataset_file['class'], dataset_file['mfcc']):
            self.label.append(torch.tensor(label_map[emotion]))
            # The flat vector is coefficient-major: (n_features, frames).
            coeff_by_frame = np.array(mfcc, dtype=np.float32).reshape(n_features, -1)
            # -> (frames, n_features) -> (frames, 1, n_features)
            self.dataset.append(
                torch.from_numpy(coeff_by_frame).permute(1, 0).reshape(-1, 1, n_features))

    def __len__(self):
        """Number of samples held."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair at position ``idx``."""
        return self.dataset[idx], self.label[idx]

    def to(self, device):
        """Move every stored tensor to ``device`` and return ``self``."""
        for i in range(len(self.dataset)):
            self.label[i] = self.label[i].to(device=device)
            self.dataset[i] = self.dataset[i].to(device=device)
        return self


In [12]:
# DataLoaders for the three splits; all share batch size, collate function and shuffling
loader_kwargs = dict(batch_size=32, collate_fn=lstm_style_batching, shuffle=True)
train_loader = DataLoader(SpeechLoader(X_train), **loader_kwargs)
test_loader = DataLoader(SpeechLoader(X_test), **loader_kwargs)
valid_loader = DataLoader(SpeechLoader(X_valid), **loader_kwargs)



In [13]:
#Here is the LSTM class for speech emotion
class LSTMSpeechEmo(nn.Module):
    """LSTM followed by a linear layer that classifies a whole sequence.

    Parameters
    ----------
    input_dim : int
        Number of features per time step.
    hidden_dim : int
        Size of the LSTM hidden state.
    target_size : int
        Number of output classes.
    num_lstm_layers : int
        Number of stacked LSTM layers.
    """

    def __init__(self, input_dim, hidden_dim, target_size, num_lstm_layers):
        super(LSTMSpeechEmo,self).__init__()
        self.input_dim = input_dim
        self.target_size = target_size
        self.hidden_dim = hidden_dim
        self.num_lstm_layers = num_lstm_layers
        self.lstm = nn.LSTM(self.input_dim,self.hidden_dim,self.num_lstm_layers)
        self.fc= nn.Linear(self.hidden_dim,self.target_size)

    def forward(self, x):
        """Return class scores of shape ``(batch, target_size)``.

        ``x`` is laid out ``(seq_len, batch, input_dim)`` (the LSTM is built
        without ``batch_first``).
        """
        # No explicit (h_0, c_0): nn.LSTM starts from zeros by default, created
        # on the input's device and dtype, so the model also works after
        # .to(device) / .double().
        _, (h_out, _) = self.lstm(x)
        # h_out is (num_layers, batch, hidden). Classify from the final hidden
        # state of the LAST layer; reshaping the whole tensor to (batch, -1) is
        # only correct for a single layer and breaks the fc input size otherwise.
        return self.fc(h_out[-1])
    
   

In [14]:
# confusion matrix
def validation_metrics (model, dataset,nb_classes=7):
    """Print and return the confusion matrix and per-class accuracy of ``model``.

    Parameters
    ----------
    model : torch.nn.Module
        Classifier returning one score per class for every sample of a batch.
    dataset : iterable
        Yields ``(inputs, classes)`` batches, e.g. a DataLoader.
    nb_classes : int, optional
        Number of classes. The confusion matrix always has this many rows and
        columns (class indices ``0 .. nb_classes-1``), even when a class never
        occurs in the labels or predictions. The printed legend is fixed to
        the seven emotion names regardless of this value.

    Returns
    -------
    class_accuracy : numpy.ndarray
        Percentage of correctly classified samples per true class; ``nan`` for
        a class without samples.
    conf_mat : numpy.ndarray
        Confusion matrix, rows = true class, columns = predicted class.
    """
    predlist = torch.zeros(0, dtype=torch.long, device='cpu')
    labellist = torch.zeros(0, dtype=torch.long, device='cpu')

    labels = np.asarray(['angry','disgust','fear','happy','neutral','sad','ps'])
    # Legend printed below the matrix: the column order, repeated once per row.
    demo_conf_mat = np.vstack([labels] * len(labels))

    # Evaluate in eval mode, then put the model back in whatever mode it was in
    # so calling this between training epochs has no lasting side effect.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, classes in dataset:
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                predlist = torch.cat([predlist, preds.view(-1).cpu()])
                labellist = torch.cat([labellist, classes.view(-1).cpu()])
    finally:
        model.train(was_training)

    # Confusion matrix; `labels` pins its shape to nb_classes x nb_classes so
    # the rows stay aligned with the class indices when a class is absent.
    conf_mat = confusion_matrix(labellist.numpy(), predlist.numpy(),
                                labels=np.arange(nb_classes))
    print(conf_mat)
    print('READ CONFUSION MATRIX AS DEMO CONFUSION MATRIX')
    print('DEMO CONFUSION MATRIX:')
    print(demo_conf_mat)

    # Per-class and overall accuracy; 0/0 (no samples) yields nan without a warning.
    correct = conf_mat.diagonal()
    with np.errstate(divide='ignore', invalid='ignore'):
        class_accuracy = 100 * correct / conf_mat.sum(1)
        total_acc = correct.sum() / conf_mat.sum()
    print('PER CLASS ACCURACY: ',class_accuracy)
    print('Total ACCURACY',total_acc)

    return class_accuracy, conf_mat


In [16]:
loss_fn = nn.CrossEntropyLoss()
model = LSTMSpeechEmo(input_size, h1, output_dim, num_layers) 
optimizer = optim.Adam(params=model.parameters())

#####################
# Train model
#####################
train_samples,test_samples,valid_samples = 2240,280,280
epochs = 10
# One entry per epoch: [train_loss, valid_loss, train_acc, valid_acc] (sample-weighted means)
history=[]
for epoch in range(epochs):
    print("Epoch: {}/{}".format(epoch + 1, epochs))
    train_loss = 0.0
    train_acc = 0.0
    valid_loss = 0.0
    valid_acc = 0.0
    train_seen = 0
    valid_seen = 0

    # Re-enter training mode every epoch: the validation pass below switches
    # the model to eval mode, and a single model.train() before the loop would
    # leave every epoch after the first running in eval mode.
    model.train()
    for ind,(x,y) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(x)
        loss = loss_fn(output,y)
        loss.backward()
        optimizer.step()

        # Batches are concatenated along dim 1, so x.size(1) is the batch size;
        # weighting by it keeps the epoch mean exact for a smaller last batch.
        n_batch = x.size(1)
        train_seen += n_batch
        train_loss += loss.item() * n_batch
        _, predictions = torch.max(output.detach(), 1)
        acc = predictions.eq(y.view_as(predictions)).float().mean()
        train_acc += acc.item() * n_batch
        # comment out the next line to silence per-batch logging
        print("Batch number: {:03d}, Training: Loss: {:.4f}, Accuracy: {:.4f}".format(ind, loss.item(), acc.item()))

    model.eval()
    with torch.no_grad():
        for ind, (x, y) in enumerate(valid_loader):
            output = model(x)
            loss = loss_fn(output, y)
            n_batch = x.size(1)
            valid_seen += n_batch
            valid_loss += loss.item() * n_batch
            _, predictions = torch.max(output, 1)
            acc = predictions.eq(y.view_as(predictions)).float().mean()
            valid_acc += acc.item() * n_batch
            # comment out the next line to silence per-batch logging
            print("Validation Batch number: {:03d}, Validation: Loss: {:.4f}, Accuracy: {:.4f}".format(ind,loss.item(),acc.item()))

    # Turn the accumulated sums into per-sample means and record them; the
    # max(..., 1) guard only matters for an empty loader.
    avg_train_loss = train_loss / max(train_seen, 1)
    avg_train_acc = train_acc / max(train_seen, 1)
    avg_valid_loss = valid_loss / max(valid_seen, 1)
    avg_valid_acc = valid_acc / max(valid_seen, 1)
    history.append([avg_train_loss, avg_valid_loss, avg_train_acc, avg_valid_acc])
    print("Epoch: {:03d}, Training: Loss: {:.4f}, Accuracy: {:.4f}, Validation: Loss: {:.4f}, Accuracy: {:.4f}".format(
        epoch + 1, avg_train_loss, avg_train_acc, avg_valid_loss, avg_valid_acc))

        
        
        

Epoch: 1/10
Batch number: 000, Training: Loss: 1.9549, Accuracy: 0.1250
Batch number: 001, Training: Loss: 1.9470, Accuracy: 0.1562
Batch number: 002, Training: Loss: 1.9493, Accuracy: 0.1875
Batch number: 003, Training: Loss: 1.9204, Accuracy: 0.2500
Batch number: 004, Training: Loss: 1.9433, Accuracy: 0.1875
Batch number: 005, Training: Loss: 1.9671, Accuracy: 0.1250
Batch number: 006, Training: Loss: 1.9577, Accuracy: 0.1250
Batch number: 007, Training: Loss: 1.9394, Accuracy: 0.1875
Batch number: 008, Training: Loss: 1.8944, Accuracy: 0.1875
Batch number: 009, Training: Loss: 1.9694, Accuracy: 0.1875
Batch number: 010, Training: Loss: 1.9786, Accuracy: 0.1250
Batch number: 011, Training: Loss: 1.8196, Accuracy: 0.3750
Batch number: 012, Training: Loss: 1.8915, Accuracy: 0.4062
Batch number: 013, Training: Loss: 1.9189, Accuracy: 0.2500
Batch number: 014, Training: Loss: 1.8860, Accuracy: 0.2812
Batch number: 015, Training: Loss: 1.8681, Accuracy: 0.3438
Batch number: 016, Training:

In [17]:
# Evaluate the trained model on the test split: prints the confusion matrix
# together with the per-class and overall accuracy.
test_class_accuracy, test_conf_mat = validation_metrics(model, test_loader)

[[40  0  0  0  0  0  0]
 [ 0 39  0  0  1  0  0]
 [ 0  1 38  1  0  0  0]
 [ 0  0  0 39  0  0  1]
 [ 0  2  0  0 37  1  0]
 [ 0  0  0  0  4 36  0]
 [ 0  1  0  1  0  0 38]]
READ CONFUSION MATRIX AS DEMO CONFUSION MATRIX
DEMO CONFUSION MATRIX:
[['angry' 'disgust' 'fear' 'happy' 'neutral' 'sad' 'ps']
 ['angry' 'disgust' 'fear' 'happy' 'neutral' 'sad' 'ps']
 ['angry' 'disgust' 'fear' 'happy' 'neutral' 'sad' 'ps']
 ['angry' 'disgust' 'fear' 'happy' 'neutral' 'sad' 'ps']
 ['angry' 'disgust' 'fear' 'happy' 'neutral' 'sad' 'ps']
 ['angry' 'disgust' 'fear' 'happy' 'neutral' 'sad' 'ps']
 ['angry' 'disgust' 'fear' 'happy' 'neutral' 'sad' 'ps']]
PER CLASS ACCURACY:  [100.   97.5  95.   97.5  92.5  90.   95. ]
Total ACCURACY 0.9535714285714286
