# Speech Emotion Recognition Model Training
## Multi-Branch Model with MFCC and Prosodic Features

This notebook implements a deep learning model that combines MFCC and prosodic features for speech emotion recognition using PyTorch.

In [4]:
import pickle

# Read the pickled data bundle from disk.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only open bundles that you created yourself or otherwise trust.
with open('data/train_data.pkl', 'rb') as f:
    data = pickle.load(f)

# Unpack every component of the bundle in one pass. The key order in the
# generator below mirrors the order of the target names on the left.
(
    X_train_mfcc,
    X_train_prosodic,
    X_test_mfcc,
    X_test_prosodic,
    y_train,
    y_test,
    encoder,
    feature_names,
) = (
    data[key]
    for key in (
        'X_train_mfcc',
        'X_train_prosodic',
        'X_test_mfcc',
        'X_test_prosodic',
        'y_train',
        'y_test',
        'encoder',
        'feature_names',
    )
)

print("Data successfully loaded!")

Data successfully loaded!


In [5]:
# Convert string labels to numerical values
from sklearn.preprocessing import LabelEncoder

# Build an integer encoding for the labels. The encoder is fitted on the
# training labels only and then reused, unchanged, for the test labels.
# NOTE(review): this cell overwrites y_train / y_test with their encoded
# versions, so running it a second time re-fits the encoder on integers.
label_encoder = LabelEncoder()
label_encoder.fit(y_train)
y_train = label_encoder.transform(y_train)
y_test = label_encoder.transform(y_test)

print("Labels encoded to numerical values.")
print(
    "Label mapping:",
    {
        name: code
        for name, code in zip(
            label_encoder.classes_,
            label_encoder.transform(label_encoder.classes_),
        )
    },
)

Labels encoded to numerical values.
Label mapping: {'angry': np.int64(0), 'calm': np.int64(1), 'disgust': np.int64(2), 'fear': np.int64(3), 'happy': np.int64(4), 'neutral': np.int64(5), 'sad': np.int64(6), 'surprise': np.int64(7)}


In [None]:
# Add data preprocessing functions
import numpy as np  # Added missing import

def normalize_features(X, mean=None, std=None):
    """Standardize features with z-score normalization along axis 0.

    Args:
        X: Array whose first axis indexes samples.
        mean: Optional pre-computed per-feature mean. When omitted it is
            estimated from ``X``. Pass the training-set mean here to
            standardize a validation/test split consistently.
        std: Optional pre-computed per-feature standard deviation. When
            omitted it is estimated from ``X``.

    Returns:
        A tuple ``(X_normalized, mean, std)`` where ``mean`` and ``std`` are
        the statistics that were actually applied.
    """
    if mean is None:
        mean = np.mean(X, axis=0)
    if std is None:
        std = np.std(X, axis=0)
    # The small epsilon keeps constant features (std == 0) from dividing by zero.
    return (X - mean) / (std + 1e-8), mean, std

# Normalize MFCC and prosodic features.
# The statistics are estimated on the training split only and then applied to
# the test split. Standardizing the test split with its own mean/std would put
# the two splits on different scales and would not match the training
# statistics that are stored with the model checkpoint for inference.
X_train_mfcc_norm, mfcc_mean, mfcc_std = normalize_features(X_train_mfcc)
X_test_mfcc_norm = (X_test_mfcc - mfcc_mean) / (mfcc_std + 1e-8)  # same epsilon as normalize_features
X_train_prosodic_norm, prosodic_mean, prosodic_std = normalize_features(X_train_prosodic)
X_test_prosodic_norm = (X_test_prosodic - prosodic_mean) / (prosodic_std + 1e-8)

print("Features normalized. Shapes:")
print(f"MFCC train: {X_train_mfcc_norm.shape}, test: {X_test_mfcc_norm.shape}")
print(f"Prosodic train: {X_train_prosodic_norm.shape}, test: {X_test_prosodic_norm.shape}")

NameError: name 'np' is not defined

In [None]:
import os
import pickle
import torch
import math  # Added missing import
from torch.utils.data import DataLoader, TensorDataset, Dataset
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
from tqdm.auto import tqdm
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Report the array shapes of both feature sets and the number of labels,
# one line per item.
print(
    "Data shapes from feature engineering:",
    f"MFCC shape: {X_train_mfcc.shape}",
    f"Prosodic features shape: {X_train_prosodic.shape}",
    f"Test MFCC shape: {X_test_mfcc.shape}",
    f"Test prosodic shape: {X_test_prosodic.shape}",
    f"Labels: {len(y_train)} training, {len(y_test)} test samples",
    sep="\n",
)

Using device: cuda
Data shapes from feature engineering:
MFCC shape: (1152, 1, 128, 345)
Prosodic features shape: (1152, 9)
Test MFCC shape: (288, 1, 128, 345)
Test prosodic shape: (288, 9)
Labels: 1152 training, 288 test samples


In [None]:
class ProsodyAwareAttention(nn.Module):
    """Attention pooling over a feature sequence, conditioned on prosodic features.

    Each time step of the sequence is projected to ``hidden_dim``, gated by an
    embedding of the utterance-level prosodic vector (element-wise product),
    and scored. The softmax of the scores over time is used to take a
    weighted sum of the *original* sequence features.

    Args:
        mfcc_dim: Size of the last dimension of the input sequence.
        prosodic_dim: Number of prosodic features per sample.
        hidden_dim: Width of the shared projection space (default 256).
    """

    def __init__(self, mfcc_dim, prosodic_dim, hidden_dim=256):
        super().__init__()
        # Both inputs are projected to this width so they can be multiplied.
        self.hidden_dim = hidden_dim

        # Projection of the per-time-step sequence features.
        self.mfcc_transform = nn.Linear(mfcc_dim, self.hidden_dim)

        # Embedding of the prosodic vector into the same space.
        self.prosodic_branch = nn.Sequential(
            nn.Linear(prosodic_dim, 64),
            nn.LayerNorm(64),
            nn.ReLU(),
            nn.Linear(64, self.hidden_dim),
            nn.LayerNorm(self.hidden_dim),
            nn.ReLU(),
        )

        # Scores each time step with a single scalar.
        self.attention = nn.Sequential(
            nn.Linear(self.hidden_dim, self.hidden_dim // 2),
            nn.Tanh(),
            nn.Linear(self.hidden_dim // 2, 1)
        )

    def forward(self, mfcc_features, prosodic_features):
        """Pool ``mfcc_features`` over time using prosody-conditioned weights.

        Args:
            mfcc_features: Tensor of shape ``[B, T, mfcc_dim]``.
            prosodic_features: Tensor of shape ``[B, prosodic_dim]``.

        Returns:
            Tensor of shape ``[B, mfcc_dim]``.

        Raises:
            ValueError: If ``mfcc_features`` is not 3-dimensional.
        """
        if mfcc_features.dim() != 3:
            raise ValueError(
                f"mfcc_features must be 3-D [B, T, D], got shape {tuple(mfcc_features.shape)}"
            )

        mfcc_transformed = self.mfcc_transform(mfcc_features)  # [B, T, hidden_dim]

        # [B, hidden_dim] -> [B, 1, hidden_dim]; broadcasting over the time
        # axis replaces the explicit expand() and gives the same product.
        prosodic_transformed = self.prosodic_branch(prosodic_features).unsqueeze(1)

        # Gate every time step by the prosodic embedding.
        combined = mfcc_transformed * prosodic_transformed  # [B, T, hidden_dim]

        # Normalize the scores over the time axis.
        attention_weights = torch.softmax(self.attention(combined), dim=1)  # [B, T, 1]

        # Weighted sum of the untransformed input features.
        context_vector = (mfcc_features * attention_weights).sum(dim=1)  # [B, mfcc_dim]

        return context_vector

class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with a skip connection.

    The convolutions use ``padding=1`` with the default stride, so the spatial
    size of the input is preserved. When the channel count changes, the skip
    path is a 1x1 convolution followed by batch-norm; otherwise it is the
    identity (an empty ``nn.Sequential``).

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the block.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Identity by default; project only when the channel counts differ.
        self.shortcut = nn.Sequential()
        if in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        """Apply the block to ``x`` ([B, in_channels, H, W] -> [B, out_channels, H, W])."""
        residual = self.shortcut(x)
        # torch.relu is the functional form; it avoids building a fresh
        # nn.ReLU module object on every forward call.
        out = torch.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        return torch.relu(out + residual)

class EmotionRecognitionModel(nn.Module):
    """Two-branch emotion classifier over MFCC maps and prosodic features.

    MFCC branch: a small CNN (one conv stage plus two residual blocks, each
    followed by 2x2 max-pooling) -> 2-layer bidirectional LSTM ->
    prosody-aware attention pooling. Prosodic branch: a two-layer MLP. The two
    branch outputs are concatenated and fed to an MLP classifier.

    Args:
        mfcc_input_size: ``(height, width)`` of the MFCC input. Only used to
            populate ``cnn_output_height`` / ``cnn_output_width``; the forward
            pass derives the sequence length from the actual feature map.
        prosodic_input_size: Number of prosodic features per sample.
        num_classes: Number of output classes.
    """

    def __init__(self, mfcc_input_size=(128, 345), prosodic_input_size=9, num_classes=6):
        super().__init__()

        # MFCC branch with simplified CNN
        self.mfcc_branch = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            ResidualBlock(32, 64),
            nn.MaxPool2d(2, 2),
            ResidualBlock(64, 128),
            nn.MaxPool2d(2, 2)
        )

        # Nominal CNN output size for the configured input size: three 2x2
        # poolings reduce each spatial dimension by a factor of 8.
        self.cnn_output_height = mfcc_input_size[0] // 8
        self.cnn_output_width = mfcc_input_size[1] // 8

        # BiLSTM for temporal modeling
        lstm_input_size = 128  # From CNN output channels
        lstm_hidden_size = 128
        self.lstm = nn.LSTM(
            input_size=lstm_input_size,
            hidden_size=lstm_hidden_size,
            num_layers=2,
            batch_first=True,
            bidirectional=True,
            dropout=0.3
        )

        # Prosody-aware attention
        self.attention = ProsodyAwareAttention(lstm_hidden_size * 2, prosodic_input_size)  # *2 for bidirectional

        # Prosodic features branch
        self.prosodic_branch = nn.Sequential(
            nn.Linear(prosodic_input_size, 64),
            nn.LayerNorm(64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, 32),
            nn.LayerNorm(32),
            nn.ReLU(),
            nn.Dropout(0.3)
        )

        # Combined layers
        lstm_output_size = lstm_hidden_size * 2  # *2 for bidirectional
        prosodic_output_size = 32
        combined_size = lstm_output_size + prosodic_output_size

        self.classifier = nn.Sequential(
            nn.Linear(combined_size, 128),
            nn.LayerNorm(128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 64),
            nn.LayerNorm(64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, num_classes)
        )

    def forward(self, mfcc, prosodic):
        """Compute class logits.

        Args:
            mfcc: Tensor of shape ``[B, H, W]`` or ``[B, 1, H, W]``.
            prosodic: Tensor of shape ``[B, prosodic_input_size]``.

        Returns:
            Tensor of shape ``[B, num_classes]`` with unnormalized logits
            (no softmax is applied here).
        """
        # Add the channel dimension when it is missing: [B, H, W] -> [B, 1, H, W]
        if mfcc.dim() == 3:
            mfcc = mfcc.unsqueeze(1)

        # CNN feature extraction
        mfcc_features = self.mfcc_branch(mfcc)  # [B, C, H', W']

        # Reshape for the LSTM: every spatial location of the feature map
        # becomes one sequence step with C features. The sequence length is
        # taken from the tensor itself rather than from the constructor's
        # nominal input size, so inputs of other sizes do not fail here.
        mfcc_features = mfcc_features.permute(0, 2, 3, 1).flatten(1, 2)  # [B, H'*W', C]

        # BiLSTM processing
        lstm_out, _ = self.lstm(mfcc_features)  # [B, T, 2*hidden_size]

        # Process prosodic features
        prosodic_output = self.prosodic_branch(prosodic)  # [B, 32]

        # The attention module embeds the raw prosodic vector itself, so it
        # receives `prosodic`, not the 32-d `prosodic_output`.
        mfcc_output = self.attention(lstm_out, prosodic)  # [B, 256]

        # Combine features
        combined = torch.cat((mfcc_output, prosodic_output), dim=1)  # [B, 256+32]

        # Final classification
        return self.classifier(combined)

class EmotionDataset(Dataset):
    """In-memory dataset yielding ``(mfcc, prosodic, label)`` triples.

    Args:
        mfcc_features: Array-like of MFCC features, first axis = samples.
        prosodic_features: Array-like of prosodic features, first axis = samples.
        labels: Array-like of integer class labels, one per sample.

    Raises:
        ValueError: If the three inputs do not contain the same number of samples.
    """

    def __init__(self, mfcc_features, prosodic_features, labels):
        # torch.as_tensor replaces the legacy FloatTensor/LongTensor
        # constructors; it casts to the requested dtype and avoids a copy when
        # the input already has it (the tensor may then share memory with the
        # source array).
        self.mfcc_features = torch.as_tensor(mfcc_features, dtype=torch.float32)
        self.prosodic_features = torch.as_tensor(prosodic_features, dtype=torch.float32)
        self.labels = torch.as_tensor(labels, dtype=torch.long)

        # Fail early instead of hitting an IndexError mid-epoch.
        n_samples = len(self.labels)
        if len(self.mfcc_features) != n_samples or len(self.prosodic_features) != n_samples:
            raise ValueError(
                "mfcc_features, prosodic_features and labels must have the same "
                f"number of samples, got {len(self.mfcc_features)}, "
                f"{len(self.prosodic_features)} and {n_samples}"
            )

    def __len__(self):
        """Number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(mfcc, prosodic, label)`` tensors of sample ``idx``."""
        return (self.mfcc_features[idx], self.prosodic_features[idx], self.labels[idx])

def create_data_loaders(mfcc_features, prosodic_features, labels, batch_size=32, shuffle=True):
    """Wrap the given arrays in an EmotionDataset and return a DataLoader over it.

    Args:
        mfcc_features: MFCC features, first axis = samples.
        prosodic_features: Prosodic features, first axis = samples.
        labels: Integer class labels, one per sample.
        batch_size: Number of samples per batch.
        shuffle: Whether to reshuffle the samples every epoch. Defaults to
            True (the previous fixed behaviour); pass False for
            validation/test data so the evaluation order is stable.

    Returns:
        A ``DataLoader`` yielding ``(mfcc, prosodic, label)`` batches.
    """
    dataset = EmotionDataset(mfcc_features, prosodic_features, labels)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: Module called as ``model(mfcc, prosodic)``.
        train_loader: Iterable of ``(mfcc, prosodic, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating the model parameters.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: the mean of the per-batch losses and the
        accuracy in percent. ``(0.0, 0.0)`` when the loader yields no batches.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    batches_seen = 0

    # Create progress bar for training
    pbar = tqdm(train_loader, desc="Training", leave=False)

    # An explicit batch counter is used for the running averages: tqdm only
    # refreshes `pbar.n` when it redraws, so it can lag behind the true count.
    for batches_seen, (mfcc, prosodic, labels) in enumerate(pbar, start=1):
        mfcc, prosodic, labels = mfcc.to(device), prosodic.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(mfcc, prosodic)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # detach() instead of the deprecated `.data` attribute
        _, predicted = torch.max(outputs.detach(), 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Update progress bar with current loss and accuracy
        pbar.set_postfix({
            'loss': f'{running_loss / batches_seen:.4f}',
            'acc': f'{100 * correct / total:.2f}%'
        })

    # Nothing was processed: avoid dividing by zero.
    if batches_seen == 0 or total == 0:
        return 0.0, 0.0

    epoch_loss = running_loss / batches_seen
    epoch_acc = 100 * correct / total
    return epoch_loss, epoch_acc

def validate(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without computing gradients.

    Args:
        model: Module called as ``model(mfcc, prosodic)``.
        val_loader: Iterable of ``(mfcc, prosodic, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(val_loss, val_acc, all_preds, all_labels)``: the mean of the
        per-batch losses, the accuracy in percent, and the predicted / true
        labels collected in iteration order. ``(0.0, 0.0, [], [])`` when the
        loader yields no batches.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    batches_seen = 0
    all_preds = []
    all_labels = []

    # Create progress bar for validation
    pbar = tqdm(val_loader, desc="Validating", leave=False)

    with torch.no_grad():
        # An explicit batch counter is used for the running averages: tqdm
        # only refreshes `pbar.n` when it redraws, so it can lag behind.
        for batches_seen, (mfcc, prosodic, labels) in enumerate(pbar, start=1):
            mfcc, prosodic, labels = mfcc.to(device), prosodic.to(device), labels.to(device)
            outputs = model(mfcc, prosodic)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            # detach() instead of the deprecated `.data` attribute
            _, predicted = torch.max(outputs.detach(), 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            # Update progress bar with current loss and accuracy
            pbar.set_postfix({
                'loss': f'{running_loss / batches_seen:.4f}',
                'acc': f'{100 * correct / total:.2f}%'
            })

    # Nothing was processed: avoid dividing by zero.
    if batches_seen == 0 or total == 0:
        return 0.0, 0.0, all_preds, all_labels

    val_loss = running_loss / batches_seen
    val_acc = 100 * correct / total
    return val_loss, val_acc, all_preds, all_labels

def plot_training_history(train_losses, val_losses, train_accs, val_accs):
    """Draw loss and accuracy curves in two side-by-side panels and show them.

    Args:
        train_losses: Sequence of training losses.
        val_losses: Sequence of validation losses.
        train_accs: Sequence of training accuracies.
        val_accs: Sequence of validation accuracies.
    """
    # One entry per panel:
    # (train curve, train label, val curve, val label, title, y-axis label)
    panel_specs = (
        (train_losses, 'Train Loss', val_losses, 'Validation Loss',
         'Training and Validation Loss', 'Loss'),
        (train_accs, 'Train Accuracy', val_accs, 'Validation Accuracy',
         'Training and Validation Accuracy', 'Accuracy (%)'),
    )

    plt.figure(figsize=(12, 4))

    for slot, spec in enumerate(panel_specs, start=1):
        train_curve, train_label, val_curve, val_label, title, y_label = spec
        plt.subplot(1, 2, slot)
        plt.plot(train_curve, label=train_label)
        plt.plot(val_curve, label=val_label)
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()

    plt.tight_layout()
    plt.show()

def plot_confusion_matrix(y_true, y_pred, labels):
    """Render the confusion matrix of ``y_true`` vs ``y_pred`` as an annotated heatmap.

    Args:
        y_true: True class labels.
        y_pred: Predicted class labels.
        labels: Tick labels used on both axes of the heatmap.
    """
    # NOTE(review): the matrix is built without an explicit label list, so a
    # class absent from both y_true and y_pred would presumably shrink the
    # matrix and misalign the tick labels — confirm against the data.
    matrix = confusion_matrix(y_true, y_pred)

    heatmap_options = dict(
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=labels,
        yticklabels=labels,
    )

    plt.figure(figsize=(10, 8))
    sns.heatmap(matrix, **heatmap_options)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()

In [None]:
# Set memory management for CUDA
# NOTE(review): torch has already been imported when this runs; presumably the
# allocator reads this variable lazily on first CUDA use — confirm that the
# setting still takes effect at this point.
import os
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:128,expandable_segments:True'

# Release memory left over from earlier runs of this cell
import gc
gc.collect()
torch.cuda.empty_cache()

# Training hyper-parameters, defined once so the scheduler, the loop and the
# log lines below cannot drift apart.
batch_size = 16  # Reduced from 32
num_epochs = 200
max_lr = 0.003
grad_clip_value = 1.0

# Drop the singleton channel axis: (N, 1, H, W) -> (N, H, W). The spatial size
# is read from the data instead of being hard-coded.
mfcc_input_size = tuple(X_train_mfcc_norm.shape[-2:])
X_train_mfcc_reshaped = X_train_mfcc_norm.reshape(-1, *mfcc_input_size)
X_test_mfcc_reshaped = X_test_mfcc_norm.reshape(-1, *mfcc_input_size)

train_loader = create_data_loaders(X_train_mfcc_reshaped, X_train_prosodic_norm, y_train, batch_size)
# The validation loader is built directly so that it is NOT shuffled:
# evaluation needs no reshuffling and a stable order keeps runs comparable.
val_loader = DataLoader(
    EmotionDataset(X_test_mfcc_reshaped, X_test_prosodic_norm, y_test),
    batch_size=batch_size,
    shuffle=False,
)

# Initialize model
prosodic_input_size = X_train_prosodic_norm.shape[1]
num_classes = len(np.unique(y_train))
model = EmotionRecognitionModel(mfcc_input_size, prosodic_input_size, num_classes=num_classes).to(device)

# Automatic mixed precision is only meaningful on CUDA; on CPU the scaler is
# created disabled so that its calls become pass-throughs.
use_amp = device.type == "cuda"
scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

# Optimizer. The lr given here is overwritten by OneCycleLR below, which
# starts the schedule at max_lr / div_factor.
optimizer = optim.AdamW(model.parameters(), lr=0.0005, weight_decay=0.01, betas=(0.9, 0.999))

# Learning rate scheduler with gradual warmup. The schedule spans
# num_epochs * len(train_loader) steps, i.e. it is meant to be stepped once
# per training batch.
scheduler = optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=max_lr,
    epochs=num_epochs,
    steps_per_epoch=len(train_loader),
    pct_start=0.2,  # Spend 20% of time warming up
    anneal_strategy='cos',
    cycle_momentum=True,
    base_momentum=0.85,
    max_momentum=0.95,
    div_factor=10.0
)

# Loss function with label smoothing
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

# Training bookkeeping
best_val_acc = 0
train_losses = []
val_losses = []
train_accs = []
val_accs = []

# Create models directory (no error if it already exists)
os.makedirs('models', exist_ok=True)

print("Starting training...")
print(f"MFCC input shape: {X_train_mfcc_reshaped.shape}")
print(f"Prosodic input shape: {X_train_prosodic_norm.shape}")
print(f"Number of classes: {num_classes}")
print(f"Batch size: {batch_size}")
print(f"Initial learning rate: {optimizer.param_groups[0]['lr']}")
print(f"Maximum learning rate: {max_lr}")
print("Using OneCycleLR scheduler with cosine annealing")
if use_amp:
    print("Mixed precision training enabled")
else:
    print("Mixed precision training disabled (CUDA not available)")

epoch_pbar = tqdm(range(num_epochs), desc="Epochs")

for epoch in epoch_pbar:
    # Release cached memory once per epoch. Doing this after every batch (as
    # before) forces the allocator to hand blocks back and re-request them
    # continuously, which slows training without lowering peak usage.
    gc.collect()
    torch.cuda.empty_cache()

    # ---- Training phase ----
    model.train()
    train_loss = 0.0
    train_correct = 0
    train_total = 0

    train_pbar = tqdm(train_loader, desc=f"Training Epoch {epoch}", leave=False)

    # An explicit batch counter drives the running averages: tqdm only
    # refreshes `.n` when it redraws, so it can lag behind the true count.
    for batch_idx, (mfcc, prosodic, labels) in enumerate(train_pbar, start=1):
        mfcc, prosodic, labels = mfcc.to(device), prosodic.to(device), labels.to(device)

        # Clear gradients
        optimizer.zero_grad()

        # Forward pass with mixed precision
        with torch.cuda.amp.autocast():
            outputs = model(mfcc, prosodic)
            loss = criterion(outputs, labels)

        # Backward pass with gradient scaling
        scaler.scale(loss).backward()

        # Gradients must be unscaled before clipping so the threshold applies
        # to their true magnitude.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip_value)

        # Update weights with gradient scaling
        scale_before = scaler.get_scale()
        scaler.step(optimizer)
        scaler.update()

        # OneCycleLR was configured with epochs * steps_per_epoch total steps,
        # so it has to advance once per batch (stepping once per epoch would
        # leave the schedule stuck in early warm-up). The scale only shrinks
        # when the scaler skipped the optimizer step because of inf/NaN
        # gradients; in that case the scheduler is not advanced either.
        if scaler.get_scale() >= scale_before:
            scheduler.step()

        # Update metrics
        train_loss += loss.item()
        _, predicted = torch.max(outputs.detach(), 1)
        train_total += labels.size(0)
        train_correct += (predicted == labels).sum().item()

        # Update progress bar
        train_pbar.set_postfix({
            'loss': f'{train_loss / batch_idx:.4f}',
            'acc': f'{100 * train_correct / train_total:.2f}%'
        })

    train_epoch_loss = train_loss / len(train_loader)
    train_epoch_acc = 100 * train_correct / train_total
    train_losses.append(train_epoch_loss)
    train_accs.append(train_epoch_acc)

    # ---- Validation phase ----
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0
    val_preds = []
    val_labels_list = []

    with torch.no_grad():
        val_pbar = tqdm(val_loader, desc=f"Validating Epoch {epoch}", leave=False)
        for batch_idx, (mfcc, prosodic, labels) in enumerate(val_pbar, start=1):
            mfcc, prosodic, labels = mfcc.to(device), prosodic.to(device), labels.to(device)

            # Forward pass with mixed precision
            with torch.cuda.amp.autocast():
                outputs = model(mfcc, prosodic)
                loss = criterion(outputs, labels)

            val_loss += loss.item()
            _, predicted = torch.max(outputs.detach(), 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

            val_preds.extend(predicted.cpu().numpy())
            val_labels_list.extend(labels.cpu().numpy())

            # Update progress bar
            val_pbar.set_postfix({
                'loss': f'{val_loss / batch_idx:.4f}',
                'acc': f'{100 * val_correct / val_total:.2f}%'
            })

    val_epoch_loss = val_loss / len(val_loader)
    val_epoch_acc = 100 * val_correct / val_total
    val_losses.append(val_epoch_loss)
    val_accs.append(val_epoch_acc)

    # Learning rate reached at the end of this epoch (for display only)
    current_lr = optimizer.param_groups[0]['lr']

    # Save best model together with the preprocessing statistics.
    # NOTE(review): the checkpoint contains a pickled LabelEncoder object, so
    # loading it presumably requires torch.load(..., weights_only=False) on
    # recent PyTorch versions — confirm against the installed version.
    if val_epoch_acc > best_val_acc:
        best_val_acc = val_epoch_acc
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'best_val_acc': best_val_acc,
            'label_encoder': label_encoder,
            'mfcc_mean': mfcc_mean,
            'mfcc_std': mfcc_std,
            'prosodic_mean': prosodic_mean,
            'prosodic_std': prosodic_std
        }, 'models/best_emotion_model.pth')

    # Update progress bar
    epoch_pbar.set_postfix({
        'train_loss': f'{train_epoch_loss:.4f}',
        'train_acc': f'{train_epoch_acc:.2f}%',
        'val_loss': f'{val_epoch_loss:.4f}',
        'val_acc': f'{val_epoch_acc:.2f}%',
        'best_val_acc': f'{best_val_acc:.2f}%',
        'lr': f'{current_lr:.2e}'
    })

print("\nTraining completed!")
print(f"Best validation accuracy: {best_val_acc:.2f}%")

# Plot training history
plot_training_history(train_losses, val_losses, train_accs, val_accs)

# Get emotion labels from label encoder
emotion_labels = label_encoder.classes_
print("\nEmotion label mapping:")
for i, label in enumerate(emotion_labels):
    print(f"{i}: {label}")

# After the loop, val_preds / val_labels_list hold the predictions of the LAST
# epoch, which is not necessarily the best one. Restore the best checkpoint
# and re-evaluate so that the figures below describe the model that was saved.
# This leaves `model` holding the best weights.
best_checkpoint_path = 'models/best_emotion_model.pth'
if best_val_acc > 0 and os.path.exists(best_checkpoint_path):
    # weights_only=False: the checkpoint stores non-tensor objects (the label
    # encoder and numpy statistics) next to the weights. The file is produced
    # by this notebook; do not load checkpoints from untrusted sources this way.
    best_checkpoint = torch.load(best_checkpoint_path, map_location=device, weights_only=False)
    model.load_state_dict(best_checkpoint['model_state_dict'])
    _, _, val_preds, val_labels_list = validate(model, val_loader, criterion, device)

# Plot confusion matrix for the best predictions
plot_confusion_matrix(val_labels_list, val_preds, emotion_labels)

# Print classification report. The explicit label list keeps target_names
# aligned even if some class never occurs in the validation predictions.
print("\nClassification Report:")
print(classification_report(
    val_labels_list,
    val_preds,
    labels=list(range(len(emotion_labels))),
    target_names=emotion_labels,
))

Starting training...
MFCC input shape: (1152, 128, 345)
Prosodic input shape: (1152, 9)
Number of classes: 8
Batch size: 32
Initial learning rate: 0.0002999999999999999
Maximum learning rate: 0.003
Using OneCycleLR scheduler with cosine annealing


Epochs:   0%|          | 0/200 [00:00<?, ?it/s]

Training:   0%|          | 0/36 [00:00<?, ?it/s]

RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x32 and 9x256)