<a href="https://colab.research.google.com/github/wasef-c/emotion_rec/blob/main/BiLSTM_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Dependencies
import numpy as np
import pandas as pd
import os
import librosa
import matplotlib.pyplot as plt
import gc
import time
from tqdm import tqdm, tqdm_notebook; tqdm.pandas() # Progress bar
from sklearn.metrics import label_ranking_average_precision_score
from sklearn.model_selection import train_test_split

# Machine Learning
import tensorflow as tf
from keras import backend as K
from tensorflow.keras.layers import Layer, InputSpec

# from keras.engine.topology import Layer
from keras import initializers, regularizers, constraints, optimizers, layers

from tensorflow.keras.layers import (Dense, Bidirectional, ELU,
                          Dropout, LeakyReLU, Conv1D, BatchNormalization)
from keras.models import Sequential
# from keras.optimizers import Adam
from keras.callbacks import EarlyStopping


# Set seed for reproducibility
# NOTE(review): only NumPy and TensorFlow are seeded here; the models below are
# built with PyTorch, whose RNG is not seeded in this cell — confirm whether
# torch.manual_seed(seed) is wanted.
seed = 1234
np.random.seed(seed)  # NumPy global RNG (np.random.permutation is used for shuffling later)
tf.random.set_seed(seed)  # TensorFlow global RNG

# Wall-clock timestamp taken when this cell runs.
t_start = time.time()

In [None]:
# Integer class label -> emotion name, listed in label order (0..5).
EMOTIONS = dict(enumerate((
    'neutral',
    'happy',
    'sad',
    'angry',
    'fearful',
    'disgust',
)))

In [None]:
# Mount Google Drive at /content/drive (Colab-only); the dataset paths used
# below live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Directory on the mounted drive that holds the saved .npy arrays.
save_directory = r'/content/drive/MyDrive/MaSc/emo_rec/Saved_Sets/008'
# Arrays later split into training and validation sets.
X = np.load(os.path.join(save_directory, 'X_Tr008.npy'))
Y = np.load(os.path.join(save_directory, 'Y_Tr008.npy'))


# Separate arrays kept aside as the test set.
X_test = np.load(os.path.join(save_directory, 'X_Te008.npy'))
Y_test = np.load(os.path.join(save_directory, 'Y_Te008.npy'))

In [None]:

# Hold out 10% of (X, Y) for validation; random_state is fixed so the split is repeatable.
x_train, x_val, y_train, y_val = train_test_split(X, Y, test_size=0.1, random_state=42)
from keras.preprocessing.sequence import pad_sequences
# NOTE(review): x_train_padded / x_val_padded are not referenced by any later
# cell in this file (the chunking cell reads x_train / x_val directly).
# Presumably leftover from an earlier Keras experiment — confirm before relying
# on them; pad_sequences' default output dtype should also be checked for
# float features.
x_train_padded = pad_sequences(x_train, maxlen=87, padding='post', truncating='post')
x_val_padded = pad_sequences(x_val, maxlen=87, padding='post', truncating='post')


In [None]:
import torch
import torch.nn as nn

def splitIntoChunks(mel_spec, win_size, stride):
    """Cut a spectrogram into fixed-width windows along axis 1.

    Windows start at columns 0, stride, 2*stride, ... and every window that
    fits entirely inside the input is returned; a trailing partial window is
    dropped.

    Args:
        mel_spec: array with at least two dimensions; axis 1 is the axis
            that gets windowed.
        win_size: width of each window in columns (positive int).
        stride: distance between consecutive window starts (positive int).

    Returns:
        np.ndarray of shape (num_chunks, mel_spec.shape[0], win_size, ...).

    Raises:
        ValueError: if ``win_size`` or ``stride`` is not positive, or if the
            input is narrower than a single window (no chunk can be built).
    """
    if win_size <= 0 or stride <= 0:
        raise ValueError(
            "win_size and stride must be positive, got win_size={} stride={}".format(
                win_size, stride))

    t = mel_spec.shape[1]
    if t < win_size:
        raise ValueError(
            "input has {} columns, fewer than win_size={}; no full chunk fits".format(
                t, win_size))

    # Enumerating start offsets directly (instead of int(t / stride) candidate
    # windows) also keeps the last full window when win_size < stride.
    starts = range(0, t - win_size + 1, stride)
    return np.stack([mel_spec[:, s:s + win_size] for s in starts], axis=0)


class TimeDistributed(nn.Module):
    """Apply a wrapped module independently to every timestep of a sequence.

    Inputs of rank 3 or more are treated as (samples, timesteps, *features):
    the first two axes are merged, the wrapped module is applied once to the
    merged batch, and the result is split back into (samples, timesteps, ...).
    Inputs of rank 2 or less are passed to the module unchanged.

    The output is reshaped from the module's actual output shape, so wrapped
    modules that change the number of feature dimensions (e.g. ``nn.Flatten``)
    and inputs of any rank >= 3 are supported.
    """

    def __init__(self, module):
        super().__init__()
        self.module = module

    def forward(self, x):
        # No separate time axis to fold: apply the module directly.
        if x.dim() <= 2:
            return self.module(x)

        # Squash samples and timesteps into one axis:
        # (samples, timesteps, *features) -> (samples * timesteps, *features)
        x_reshape = x.contiguous().view(-1, *x.shape[2:])

        y = self.module(x_reshape)

        # Restore the sample axis; the timestep axis is inferred and the
        # trailing dims come from whatever the module produced.
        return y.contiguous().view(x.size(0), -1, *y.shape[1:])

In [None]:
''' https://github.com/Data-Science-kosta/Speech-Emotion-Classification-with-PyTorch/blob/master/notebooks/stacked_cnn_attention_lstm.ipynb'''
class HybridModel(nn.Module):
    """Time-distributed CNN + bidirectional LSTM + attention classifier.

    Input is a batch of chunked single-channel spectrograms shaped
    (batch, timesteps, 1, H, W). Every timestep passes through three
    Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d -> Dropout stages (pooling by
    2, 4 and 4), the conv features are flattened per timestep and fed to a
    bidirectional LSTM, and the LSTM outputs are averaged with learned
    attention weights before the final linear layer.

    Args:
        num_emotions: number of output classes.
        dropout_rate: dropout probability applied after each conv stage.
        hidden_size: LSTM hidden size per direction.
        lstm_layers: number of stacked LSTM layers.
        lstm_input_size: flattened conv feature count per timestep, i.e.
            ``64 * (H // 32) * (W // 32)`` for the pooling used here. The
            default 1024 corresponds to 128x128 chunks; 87x128 chunks give
            64 * 2 * 4 = 512.

    Attribute names and the layer order inside ``conv2Dblock`` are kept
    stable so existing state dicts continue to load.
    """

    def __init__(self, num_emotions, dropout_rate, hidden_size, lstm_layers,
                 lstm_input_size=1024):
        super().__init__()
        # conv block: three stages, channels 1 -> 16 -> 32 -> 64
        self.conv2Dblock = nn.Sequential(
            *self._conv_stage(1, 16, 2, dropout_rate),
            *self._conv_stage(16, 32, 4, dropout_rate),
            *self._conv_stage(32, 64, 4, dropout_rate),
        )
        # LSTM block
        self.lstm = nn.LSTM(input_size=lstm_input_size,
                            hidden_size=hidden_size,
                            bidirectional=True,
                            batch_first=True,
                            num_layers=lstm_layers)
        self.dropout_lstm = nn.Dropout(p=0.4)
        # 2*hidden_size: forward and backward LSTM outputs are concatenated
        self.attention_linear = nn.Linear(2 * hidden_size, 1)
        # Linear softmax layer
        self.out_linear = nn.Linear(2 * hidden_size, num_emotions)

    @staticmethod
    def _conv_stage(in_channels, out_channels, pool, dropout_rate):
        """Return the five time-distributed layers that make up one conv stage."""
        return [
            TimeDistributed(nn.Conv2d(in_channels=in_channels,
                                      out_channels=out_channels,
                                      kernel_size=3,
                                      stride=1,
                                      padding=1)),
            TimeDistributed(nn.BatchNorm2d(out_channels)),
            TimeDistributed(nn.ReLU()),
            TimeDistributed(nn.MaxPool2d(kernel_size=pool, stride=pool)),
            TimeDistributed(nn.Dropout(p=dropout_rate)),
        ]

    def forward(self, x):
        """Classify a batch of chunk sequences.

        Returns:
            (output_logits, output_softmax, attention_weights_norm) with shapes
            (B, num_emotions), (B, num_emotions) and (B, 1, T).
        """
        conv_embedding = self.conv2Dblock(x)
        # keep batch and time axes, flatten (channels, H, W) into one feature axis
        conv_embedding = torch.flatten(conv_embedding, start_dim=2)
        lstm_embedding, _ = self.lstm(conv_embedding)
        lstm_embedding = self.dropout_lstm(lstm_embedding)  # (B, T, 2*hidden_size)

        # One score per timestep, computed for all timesteps at once:
        # (B, T, 1) -> (B, 1, T), then normalised over time.
        attention_scores = self.attention_linear(lstm_embedding).transpose(1, 2)
        attention_weights_norm = nn.functional.softmax(attention_scores, dim=-1)

        # (B, 1, T) x (B, T, 2*hidden_size) -> (B, 1, 2*hidden_size)
        attention = torch.bmm(attention_weights_norm, lstm_embedding)
        attention = torch.squeeze(attention, 1)

        output_logits = self.out_linear(attention)
        output_softmax = nn.functional.softmax(output_logits, dim=1)
        return output_logits, output_softmax, attention_weights_norm


In [None]:
def loss_fnc(predictions, targets):
    """Return the mean cross-entropy of raw logits against integer class targets."""
    return nn.functional.cross_entropy(predictions, targets)


def make_train_step(model, loss_fnc, optimizer):
    """Build a closure that performs one optimisation step on a single batch.

    The returned ``train_step(X, Y)`` puts the model in training mode, runs
    the forward pass, back-propagates ``loss_fnc(logits, Y)``, applies the
    optimizer update and clears the gradients. It returns
    ``(loss, accuracy_percent)`` where ``loss`` is a Python float and the
    accuracy is a 0-dim tensor.
    """
    def train_step(X, Y):
        model.train()
        # the model returns (logits, softmax, attention weights)
        logits, probabilities, _ = model(X)
        loss = loss_fnc(logits, Y)
        # fraction of samples whose most probable class matches the target
        hits = torch.sum(probabilities.argmax(dim=1) == Y)
        accuracy = hits / float(len(Y))
        # gradients -> parameter update -> reset gradients for the next batch
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        return loss.item(), accuracy * 100
    return train_step

def make_validate_fnc(model, loss_fnc):
    """Build a closure that evaluates the model on one batch without gradients.

    The returned ``validate(X, Y)`` switches the model to eval mode and
    returns ``(loss, accuracy_percent, predictions)``: the loss as a Python
    float, the accuracy as a 0-dim tensor and the predicted class indices.
    """
    def validate(X, Y):
        model.eval()
        with torch.no_grad():
            # the model returns (logits, softmax, attention weights)
            logits, probabilities, _ = model(X)
            predictions = probabilities.argmax(dim=1)
            loss = loss_fnc(logits, Y)
            accuracy = torch.sum(predictions == Y) / float(len(Y))
        return loss.item(), accuracy * 100, predictions
    return validate


In [None]:
# Split every spectrogram into fixed-size windows (128 columns wide, hop 64).
# --- train set
mel_train_chunked = []
for mel_spec in x_train:
    chunks = splitIntoChunks(mel_spec, win_size=128, stride=64)
    mel_train_chunked.append(chunks)
print(f"Number of chunks is {chunks.shape[0]}")
# --- val set
mel_val_chunked = []
for mel_spec in x_val:
    chunks = splitIntoChunks(mel_spec, win_size=128, stride=64)
    mel_val_chunked.append(chunks)
print(f"Number of chunks is {chunks.shape[0]}")
# --- test set
mel_test_chunked = []
for mel_spec in X_test:
    chunks = splitIntoChunks(mel_spec, win_size=128, stride=64)
    mel_test_chunked.append(chunks)
print(f"Number of chunks is {chunks.shape[0]}")

# Stack the per-sample chunk arrays and insert a singleton axis at position 2,
# giving 5-D arrays (the shapes are printed below).
X_train = np.stack(mel_train_chunked, axis=0)[:, :, np.newaxis]
print('Shape of X_train: ', X_train.shape)
X_val = np.stack(mel_val_chunked, axis=0)[:, :, np.newaxis]
print('Shape of X_val: ', X_val.shape)
X_test = np.stack(mel_test_chunked, axis=0)[:, :, np.newaxis]
print('Shape of X_test: ', X_test.shape)

Number of chunks is 1
Number of chunks is 1
Number of chunks is 1
Shape of X_train:  (13010, 1, 1, 87, 128)
Shape of X_val:  (1446, 1, 1, 87, 128)
Shape of X_test:  (3614, 1, 1, 87, 128)


In [None]:
from sklearn.preprocessing import StandardScaler

# Standardise the data with statistics fitted on the training set only: each
# sample is flattened to one row, scaled, and reshaped back to 5-D.
# ndarray.reshape is used instead of np.reshape(..., newshape=...) because the
# `newshape` keyword is deprecated in recent NumPy releases.
scaler = StandardScaler()

b, t, c, h, w = X_train.shape
X_train = scaler.fit_transform(X_train.reshape(b, -1)).reshape(b, t, c, h, w)

# Test and validation sets reuse the training statistics (transform only).
b, t, c, h, w = X_test.shape
X_test = scaler.transform(X_test.reshape(b, -1)).reshape(b, t, c, h, w)

b, t, c, h, w = X_val.shape
X_val = scaler.transform(X_val.reshape(b, -1)).reshape(b, t, c, h, w)

In [None]:
import itertools
import math

import matplotlib.pyplot as plt
from IPython.display import clear_output

import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import torch.nn as nn
import numpy as np

# Exhaustive grid search over SGD and model hyperparameters.
# Every combination trains a fresh HybridModel for EPOCHS epochs on
# (X_train, y_train) and is scored on (X_val, y_val). The weights with the
# highest validation accuracy seen so far are written to 'best_model.pth'.

EPOCHS = 2
DATASET_SIZE = X_train.shape[0]
BATCH_SIZE = 64
# device = 'cuda' if torch.cuda.is_available() else 'cpu'
device = 'cpu'
print('Selected device is {}'.format(device))

# Per-epoch metrics, appended across ALL hyperparameter combinations.
accuracies = []
val_accuracies = []
val_losses = []
losses = []

# Hyperparameter grid
learning_rates = [0.001, 0.01, 0.05, 0.1]
momentums = [0.8, 0.85, 0.9, 0.95]
dropout_rates = [0.2, 0.3, 0.4, 0.5]
lstm_hidden_sizes = [64, 128, 256]
lstm_layers = [1, 2]

best_accuracy = 0.0
best_hyperparameters = {}
current_hparam = {}

# The validation tensors do not change between runs, so build them once.
X_val_tensor = torch.tensor(X_val, device=device).float()
Y_val_tensor = torch.tensor(y_val, dtype=torch.long, device=device)

# ceil() so the final, smaller batch is trained on as well; the epoch metrics
# are weighted by batch size over DATASET_SIZE and would otherwise be biased low.
iters = math.ceil(DATASET_SIZE / BATCH_SIZE)

# itertools.product iterates in the same order as the equivalent nested loops.
for lr, momentum, dr, hs, ly in itertools.product(
        learning_rates, momentums, dropout_rates, lstm_hidden_sizes, lstm_layers):
    current_hparam = {'lr': lr, 'momentum': momentum,
                      'dropout': dr, 'hidden_LSTM': hs, 'lstm_layers': ly}
    print(current_hparam)

    # Fresh model and optimizer for each hyperparameter combination.
    # NOTE(review): the chunked data printed above is (N, 1, 1, 87, 128), which
    # flattens to 64 * 2 * 4 = 512 conv features per timestep, while
    # HybridModel's LSTM expects 1024 — confirm the two match before running.
    model = HybridModel(
        num_emotions=6, dropout_rate=dr, hidden_size=hs, lstm_layers=ly).to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=momentum)
    train_step = make_train_step(model, loss_fnc, optimizer=optimizer)
    validate = make_validate_fnc(model, loss_fnc)

    for epoch in range(EPOCHS):
        epoch_acc = 0
        epoch_loss = 0

        # Shuffle by drawing a fresh index permutation; batches are gathered
        # through it so the full training array is not copied every epoch.
        ind = np.random.permutation(DATASET_SIZE)

        for i in range(iters):
            batch_idx = ind[i * BATCH_SIZE:(i + 1) * BATCH_SIZE]
            # Dedicated batch names: the original `X` / `Y` would overwrite the
            # datasets loaded earlier in the notebook.
            X_batch = torch.tensor(X_train[batch_idx], device=device).float()
            Y_batch = torch.tensor(
                y_train[batch_idx], dtype=torch.long, device=device)

            loss, acc = train_step(X_batch, Y_batch)
            batch_weight = len(batch_idx) / DATASET_SIZE
            epoch_acc += float(acc) * batch_weight
            epoch_loss += loss * batch_weight
            print(f"\r Epoch {epoch}: iteration {i}/{iters}", end='')

        val_loss, val_acc, _ = validate(X_val_tensor, Y_val_tensor)
        val_acc = float(val_acc)  # plain float so the history lists hold no tensors
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)

        if val_acc > best_accuracy:
            best_accuracy = val_acc
            # Record the complete configuration, including the LSTM settings.
            best_hyperparameters = dict(current_hparam)
            print(best_hyperparameters)
            # Save the best model so far
            torch.save(model.state_dict(), 'best_model.pth')

        print(f"Epoch {epoch} --> loss:{epoch_loss:.4f}, acc:{epoch_acc:.2f}%, val_loss:{val_loss:.4f}, val_acc:{val_acc:.2f}%")

        losses.append(epoch_loss)
        accuracies.append(epoch_acc)


In [None]:
import math

import matplotlib.pyplot as plt
from IPython.display import clear_output, display
import numpy as np
import torch
from torch import nn
from torch.optim import Adam

# Final training run with fixed hyperparameters, early stopping on the
# validation loss and a live loss/accuracy plot refreshed after every epoch.

EPOCHS=100
DATASET_SIZE = X_train.shape[0]
BATCH_SIZE = 128
# device = 'cuda' if torch.cuda.is_available() else 'cpu'
device = 'cpu'
print('Selected device is {}'.format(device))


# Set constants for early stopping
PATIENCE = 10  # Number of epochs to wait before stopping if no improvement
best_val_loss = float('inf')
no_improvement_count = 0


# NOTE(review): the chunked data printed above is (N, 1, 1, 87, 128), which
# flattens to 64 * 2 * 4 = 512 conv features per timestep, while HybridModel's
# LSTM expects 1024 — confirm the two match before running.
model = HybridModel(num_emotions=6, dropout_rate=0.2, hidden_size= 256,lstm_layers=1 ).to(device)
OPTIMIZER = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
train_step = make_train_step(model, loss_fnc, optimizer=OPTIMIZER)
validate = make_validate_fnc(model,loss_fnc)

accuracies = []
val_accuracies = []
val_losses = []
losses = []

# The validation tensors are identical every epoch; build them once.
X_val_tensor = torch.tensor(X_val, device=device).float()
Y_val_tensor = torch.tensor(y_val, dtype=torch.long, device=device)

# ceil() so the final, smaller batch is trained on as well; the epoch metrics
# are weighted by batch size over DATASET_SIZE and would otherwise be biased low.
iters = math.ceil(DATASET_SIZE / BATCH_SIZE)

for epoch in range(EPOCHS):
    epoch_acc = 0
    epoch_loss = 0

    # Shuffle by drawing a fresh index permutation; batches are gathered
    # through it so the full training array is not copied every epoch.
    ind = np.random.permutation(DATASET_SIZE)

    for i in range(iters):
        batch_idx = ind[i * BATCH_SIZE:(i + 1) * BATCH_SIZE]
        # Dedicated batch names: the original `X` / `Y` would overwrite the
        # datasets loaded earlier in the notebook.
        X_batch = torch.tensor(X_train[batch_idx], device=device).float()
        Y_batch = torch.tensor(y_train[batch_idx], dtype=torch.long, device=device)

        loss, acc = train_step(X_batch, Y_batch)
        batch_weight = len(batch_idx) / DATASET_SIZE
        epoch_acc += float(acc) * batch_weight
        epoch_loss += loss * batch_weight
        print(f"\r Epoch {epoch}: iteration {i}/{iters}", end='')

    val_loss, val_acc, _ = validate(X_val_tensor, Y_val_tensor)
    val_acc = float(val_acc)  # plain float so the history lists hold no tensors
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)

    # Append metrics to lists
    losses.append(epoch_loss)
    accuracies.append(epoch_acc)

    # Check for early stopping
    # NOTE(review): the best weights are not kept; after stopping, `model`
    # holds the weights of the last epoch, not of the best one.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        no_improvement_count = 0
    else:
        no_improvement_count += 1

    if no_improvement_count >= PATIENCE:
        print(f"Early stopping at epoch {epoch} as there's no improvement in validation loss.")
        break

    # Clear the previous epoch's output first so the metrics line printed next
    # stays visible together with the refreshed plot.
    clear_output(wait=True)
    display(f"Epoch {epoch} --> loss:{epoch_loss:.4f}, acc:{epoch_acc:.2f}%, val_loss:{val_loss:.4f}, val_acc:{val_acc:.2f}%")

    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.plot(losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(accuracies, label='Training Accuracy')
    plt.plot(val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    # The newest point sits at x == epoch (the lists are 0-indexed), so the
    # annotation is anchored there rather than at epoch - 1.
    plt.annotate(f'Last Epoch Acc: {epoch_acc:.2f}%', xy=(epoch, accuracies[-1]), xytext=(epoch, accuracies[-1] + 5),
             arrowprops=dict(facecolor='black', arrowstyle='->'), fontsize=8)

    plt.legend()

    plt.tight_layout()
    plt.show()


In [None]:
# Write the trained weights to ./models/cnn_attention_lstm_model_8_e.pt,
# creating the directory if it does not exist yet.
SAVE_PATH = os.path.join(os.getcwd(), 'models')
os.makedirs(SAVE_PATH, exist_ok=True)
model_file = os.path.join(SAVE_PATH, 'cnn_attention_lstm_model_8_e.pt')
torch.save(model.state_dict(), model_file)
print(f'Model is saved to {model_file}')

In [None]:
# Rebuild the network and restore weights from a saved checkpoint.
LOAD_PATH = os.path.join(os.getcwd(),'models')
checkpoint_file = os.path.join(LOAD_PATH,'cnn_attention_lstm_model_8_d.pt')
# NOTE(review): this loads the '_8_d' checkpoint into a 2-layer LSTM model,
# whereas the training cell in this notebook builds a 1-layer model and saves
# it as '_8_e'. Presumably '_8_d' comes from an earlier 2-layer run — confirm
# that the architecture here matches the checkpoint being loaded.
model = HybridModel(len(EMOTIONS),dropout_rate=0.2, hidden_size= 256,lstm_layers=2 ).to(device)
# map_location remaps the stored tensors onto the selected device, so a
# checkpoint written on a GPU machine still loads when running on CPU.
model.load_state_dict(torch.load(checkpoint_file, map_location=device))
print('Model is loaded from {}'.format(checkpoint_file))