<div style="font-size: 40px; color: red; text-align: center;">RECURRENT NEURAL NETWORKS (RNNs)</div>

In [11]:
# Import Libraries
import os
import librosa
import numpy as np
import torch
from torch.utils.data import Dataset, random_split, DataLoader
from sklearn.utils.class_weight import compute_class_weight
import torch.nn as nn
import torch.optim as optim
import tqdm as tqdm

In [12]:
# Modify Dataset Class
class CremaDataset(Dataset):
    """In-memory dataset of fixed-length, normalized MFCC sequences with emotion labels.

    Every ``.wav`` file directly inside ``data_path`` whose name carries a known
    emotion code as its third underscore-separated field is loaded at 16 kHz,
    converted to 40 MFCCs per frame, padded/trimmed to ``max_len`` frames and
    standardized. All features are computed eagerly in ``__init__``.

    Args:
        data_path: Directory containing the audio files.
        emotion: Emotion codes to keep. Codes missing from ``emotion_map`` are
            ignored. ``None`` keeps every known emotion. Labels always come from
            ``emotion_map``, so restricting this set does not renumber classes.
        max_len: Number of time frames every sample is padded or trimmed to.

    Each item is a tuple ``(x, y)`` where ``x`` is a ``float32`` tensor of shape
    ``(max_len, 40)`` and ``y`` is a scalar ``long`` tensor holding the class id.
    """

    def __init__(self, data_path, emotion=("ANG", "SAD", "DIS", "NEU", "HAP", "FEA"), max_len=200):
        self.data = []  # processed MFCC matrices, one per kept file
        self.labels = []  # integer emotion class per kept file
        self.emotion_map = {"ANG": 0, "SAD": 1, "DIS": 2, "NEU": 3, "HAP": 4, "FEA": 5}
        self.max_len = max_len  # target number of frames for trim/pad

        # Resolve which emotion codes the caller wants to keep.
        if emotion is None:
            wanted = set(self.emotion_map)
        elif isinstance(emotion, str):
            wanted = {emotion}
        else:
            wanted = set(emotion)

        # Sorted so that sample indices do not depend on filesystem listing order.
        for file in sorted(os.listdir(data_path)):
            # Check for the right file type
            if not file.endswith('.wav'):
                continue
            # The emotion code is the third '_'-separated field; skip names without one
            fields = file.split('_')
            if len(fields) < 3:
                continue
            code = fields[2]
            if code not in self.emotion_map or code not in wanted:
                continue
            # Extract MFCCs
            path = os.path.join(data_path, file)
            y, sr = librosa.load(path, sr=16000)
            mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40).T  # (time, 40)
            # Fix the length first, then standardize (padding is part of the statistics)
            mfcc = self._normalize(self._pad_or_trim(mfcc, max_len))
            self.data.append(mfcc)
            self.labels.append(self.emotion_map[code])

    @staticmethod
    def _pad_or_trim(mfcc, max_len):
        """Return ``mfcc`` with exactly ``max_len`` rows: zero-padded at the end or cut."""
        n_frames = len(mfcc)
        if n_frames < max_len:
            return np.pad(mfcc, ((0, max_len - n_frames), (0, 0)))
        return mfcc[:max_len]

    @staticmethod
    def _normalize(mfcc):
        """Standardize with the matrix-wide mean and standard deviation."""
        std = np.std(mfcc)
        if std == 0:
            # A constant matrix (e.g. an empty clip that is all padding) would
            # otherwise produce 0/0 = NaN features.
            std = 1.0
        return (mfcc - np.mean(mfcc)) / std

    def __len__(self):
        """Number of samples that were loaded."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return sample ``idx`` as ``(float32 feature tensor, long label tensor)``."""
        x = torch.tensor(self.data[idx], dtype=torch.float32)
        y = torch.tensor(self.labels[idx], dtype=torch.long)
        return x, y

In [15]:
# Create model architecture
class EmotionRNN(nn.Module):
    """Stacked unidirectional LSTM followed by a linear classification head.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each LSTM layer.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
    """

    def __init__(self, input_size=40, hidden_size=128, num_layers=2, num_classes=6):
        # Call the constructor for the base class nn.Module
        super(EmotionRNN, self).__init__()
        # Instantiate the architecture
        self.rnn = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                           num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map ``x`` of shape (batch_size, sequence_length, input_size) to logits.

        Returns:
            Tensor of shape (batch_size, num_classes) with unnormalized scores.
        """
        # hidden shape: (num_layers, batch, hidden_size)
        _, (hidden, _) = self.rnn(x)
        # hidden[-1] is the top layer's final state; for this unidirectional LSTM
        # it equals the sequence output at the last time step.
        return self.fc(hidden[-1])

In [29]:
# Create CNN x RNN architecture
class EmotionCRNN(nn.Module):
    """CNN front-end feeding a bidirectional LSTM, then a linear classifier.

    Two Conv-ReLU-BatchNorm-MaxPool-Dropout stages halve both the time and the
    feature axis twice. The resulting channels and features are flattened per time
    step and passed to the LSTM.

    Args:
        input_size: Number of features (MFCC coefficients) per frame. Must be >= 4
            so that at least one feature column survives the two poolings.
        hidden_size: Width of each LSTM direction.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
        dropout_rate: Dropout probability used in the CNN and between LSTM layers.

    Raises:
        ValueError: If ``input_size`` is smaller than 4.
    """

    def __init__(self, input_size=40, hidden_size=128, num_layers=2, num_classes=6, dropout_rate=0.3):
        # Call the constructor for the base class nn.Module
        super(EmotionCRNN, self).__init__()
        if input_size < 4:
            raise ValueError(f"input_size must be >= 4 to survive two 2x2 poolings, got {input_size}")
        # CNN layer: Extract local features from MFCCs
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=(3, 3), padding=1),   # (B, 16, T, F)
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.MaxPool2d(kernel_size=(2, 2)),                  # (B, 16, T//2, F//2)
            nn.Dropout2d(dropout_rate),

            nn.Conv2d(16, 32, kernel_size=(3, 3), padding=1),  # (B, 32, T//2, F//2)
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(kernel_size=(2, 2)),                  # (B, 32, T//4, F//4)
            nn.Dropout2d(dropout_rate)
        )
        # 32 channels times the feature columns left after two floor-halvings
        # (320 for the default input_size of 40).
        self.rnn_input_size = 32 * (input_size // 4)
        self.rnn = nn.LSTM(
            input_size=self.rnn_input_size,
            hidden_size=hidden_size,
            num_layers=num_layers, batch_first=True,
            bidirectional=True,
            # Inter-layer dropout is only meaningful with more than one layer
            dropout=dropout_rate if num_layers > 1 else 0.0
        )
        # Linear layer takes both directions, hence twice hidden_size
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        """Map ``x`` of shape (batch, time, input_size) to (batch, num_classes) logits."""
        x = x.unsqueeze(1)  # add channel dim for the CNN: (B, 1, T, F)
        # CNN feature extraction
        x = self.cnn(x)  # (B, channels, new_time, new_feat)
        # Make time the sequence axis and merge channels with features
        x = x.permute(0, 2, 1, 3).flatten(start_dim=2)  # (B, new_time, channels * new_feat)
        # Pass through RNN; only the final hidden states are needed
        _, (hidden, _) = self.rnn(x)

        # hidden shape: (num_layers * 2, B, hidden_size); the last two entries are
        # the top layer's forward and backward final states.
        last_layer_forward = hidden[-2]   # (B, hidden_size)
        last_layer_backward = hidden[-1]  # (B, hidden_size)
        combined_hidden = torch.cat((last_layer_forward, last_layer_backward), dim=1)  # (B, hidden_size*2)

        return self.fc(combined_hidden)  # (B, num_classes)

In [30]:
# Reproducibility: one seed drives the split, weight init, batch shuffling and dropout
SEED = 42
torch.manual_seed(SEED)

# Process Dataset
dataset = CremaDataset('../data/')
# Split into train and validation sets (80/20).
# The dedicated generator keeps the split independent of how much global RNG state
# has been consumed; it is only repeatable if the dataset enumerates files in a
# stable order.
train_size = int(0.8 * len(dataset))
train_data, val_data = random_split(
    dataset,
    [train_size, (len(dataset) - train_size)],
    generator=torch.Generator().manual_seed(SEED),
)
# Create data loaders
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32)
# The model, loss function & Optimizer
model = EmotionCRNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
# Training loop begins
for epoch in range(30):
    # Switch model to train mode
    model.train()
    # Running sum of the mini-batch losses for this epoch
    total_loss = 0
    # Mini-batch training
    for x_batch, y_batch in train_loader:
        # Logits pre-softmax
        y_pred = model(x_batch)
        # Calculate loss
        loss = criterion(y_pred, y_batch)
        # Clear gradients left over from the previous step to prevent accumulation
        optimizer.zero_grad()
        # Backpropagation
        loss.backward()
        # Update model parameters (weights and biases)
        optimizer.step()
        # Add this batch's loss to the epoch total
        total_loss += loss.item()
    # Log the mean batch loss. Note ':4f' is a minimum width of 4 with the default
    # six decimals, not four decimals.
    print(f'Epoch {epoch+1} Loss: {total_loss / len(train_loader):4f}')

Epoch 1 Loss: 1.605916
Epoch 2 Loss: 1.468640
Epoch 3 Loss: 1.401549
Epoch 4 Loss: 1.370378
Epoch 5 Loss: 1.321991
Epoch 6 Loss: 1.271911
Epoch 7 Loss: 1.240535
Epoch 8 Loss: 1.222128
Epoch 9 Loss: 1.196238
Epoch 10 Loss: 1.152558
Epoch 11 Loss: 1.127106
Epoch 12 Loss: 1.099697
Epoch 13 Loss: 1.098837
Epoch 14 Loss: 1.073986
Epoch 15 Loss: 1.038205
Epoch 16 Loss: 1.010409
Epoch 17 Loss: 0.990869
Epoch 18 Loss: 0.953155
Epoch 19 Loss: 0.939073
Epoch 20 Loss: 0.916111
Epoch 21 Loss: 0.897056
Epoch 22 Loss: 0.878602
Epoch 23 Loss: 0.891754
Epoch 24 Loss: 0.855107
Epoch 25 Loss: 0.838787
Epoch 26 Loss: 0.811167
Epoch 27 Loss: 0.800190
Epoch 28 Loss: 0.768018
Epoch 29 Loss: 0.745192
Epoch 30 Loss: 0.736223


In [31]:
# Evaluation: top-1 accuracy on the validation loader
# Inference mode for the layers (dropout off, batch-norm uses running statistics)
model.eval()
# Running tallies of correct predictions and of samples seen
correct, total = 0, 0
# No gradients are needed for scoring
with torch.no_grad():
    for x_batch, y_batch in val_loader:
        # Class scores for the batch
        outputs = model(x_batch)
        # Highest-scoring class per sample
        preds = outputs.argmax(dim=1)
        # Count the hits in this batch
        correct += int((preds == y_batch).sum())
        # Count the samples in this batch
        total += len(y_batch)
# Display final accuracy
print(f"Accuracy: {100 * correct / total:.2f}%")

Accuracy: 60.85%
