In [3]:
from tqdm import tqdm
import os
import librosa
import soundfile as sf
import pandas as pd
import numpy as np

root_path = "."

In [4]:
import pandas as pd
import os

# Define the path to the consolidated dataset
ravdess_path = os.path.join(root_path, "audio_speech_actors_01-24")

# Function to parse metadata from filenames
def parse_ravdess_metadata(file_path):
    filename = os.path.basename(file_path)
    parts = filename.split(".")[0].split("-")
    return {
        "file_path": file_path,
        "modality": int(parts[0]),  # 01 = full-AV, 02 = video-only, 03 = audio-only
        "vocal_channel": int(parts[1]),  # 01 = speech, 02 = song
        "emotion": int(parts[2]),  # Emotion (01 to 08)
        "intensity": int(parts[3]),  # Intensity (01 = normal, 02 = strong)
        "statement": int(parts[4]),  # Statement (01 = "Kids...", 02 = "Dogs...")
        "repetition": int(parts[5]),  # Repetition (01 = 1st, 02 = 2nd)
        "actor": int(parts[6]),  # Actor ID (01 to 24)
        "gender": "female" if int(parts[6]) % 2 == 0 else "male",  # Gender based on Actor ID
    }

# Iterate over all actor folders and parse metadata
def create_ravdess_annotations(base_path):
    rows = []
    for root, _, files in os.walk(base_path):
        for file in files:
            if file.endswith(".wav"):
                file_path = os.path.join(root, file)
                metadata = parse_ravdess_metadata(file_path)
                rows.append(metadata)
    return pd.DataFrame(rows)

# Generate annotations
ravdess_annotations = create_ravdess_annotations(ravdess_path)

# Save annotations to CSV
output_csv = "ravdess_annotations.csv"
ravdess_annotations.to_csv(output_csv, index=False)
print(f"Annotations saved to {output_csv}")

# Display the first few rows
print(ravdess_annotations.head())


Annotations saved to ravdess_annotations.csv
                                           file_path  modality  vocal_channel  \
0  ./audio_speech_actors_01-24/Actor_16/03-01-05-...         3              1   
1  ./audio_speech_actors_01-24/Actor_16/03-01-06-...         3              1   
2  ./audio_speech_actors_01-24/Actor_16/03-01-06-...         3              1   
3  ./audio_speech_actors_01-24/Actor_16/03-01-05-...         3              1   
4  ./audio_speech_actors_01-24/Actor_16/03-01-07-...         3              1   

   emotion  intensity  statement  repetition  actor  gender  
0        5          1          2           1     16  female  
1        6          1          2           2     16  female  
2        6          2          1           2     16  female  
3        5          2          1           1     16  female  
4        7          1          1           1     16  female  


In [5]:
annotations_csv = "ravdess_annotations.csv"
ravdess_annotations = pd.read_csv(annotations_csv)

# Count the number of files per emotion
print("Emotion distribution:")
print(ravdess_annotations["emotion"].value_counts())

# Count the number of files per intensity level
print("Intensity distribution:")
print(ravdess_annotations["intensity"].value_counts())

# Count the number of files per gender
print("Gender distribution:")
print(ravdess_annotations["gender"].value_counts())


Emotion distribution:
emotion
5    192
6    192
7    192
4    192
8    192
3    192
2    192
1     96
Name: count, dtype: int64
Intensity distribution:
intensity
1    768
2    672
Name: count, dtype: int64
Gender distribution:
gender
female    720
male      720
Name: count, dtype: int64


In [6]:
# Define paths
normalized_path = "normalized_audio"
os.makedirs(normalized_path, exist_ok=True)
# Define function to normalize audio
def normalize_audio(file_path, output_path):
    try:
        y, sr = librosa.load(file_path, sr=None)  # Load audio
        y_normalized = librosa.util.normalize(y)  # Normalize amplitude
        sf.write(output_path, y_normalized, sr)  # Save normalized audio
    except Exception as e:
        print(f"Error processing {file_path}: {e}")

# Normalize all audio files with a progress bar
print("Starting audio normalization...")
for _, row in tqdm(ravdess_annotations.iterrows(), total=len(ravdess_annotations), desc="Normalizing audio"):
    src = row["file_path"]
    dest = os.path.join(normalized_path, os.path.basename(src))
    normalize_audio(src, dest)

print("Audio normalization completed.")

Starting audio normalization...


Normalizing audio: 100%|██████████| 1440/1440 [00:08<00:00, 161.07it/s]

Audio normalization completed.





In [7]:
def resample_audio(file_path, output_path, target_sr=16000):
    y, sr = librosa.load(file_path, sr=None)
    y_resampled = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
    sf.write(output_path, y_resampled, target_sr)

# Create a directory for resampled audio
resampled_path = os.path.join(root_path, "resampled_audio")
os.makedirs(resampled_path, exist_ok=True)

# Resample all audio files
for _, row in ravdess_annotations.iterrows():
    src = row["file_path"]
    dest = os.path.join(resampled_path, os.path.basename(src))
    resample_audio(src, dest)


In [8]:
def extract_mel_spectrogram(file_path, n_mels=128, max_length=150):
    """
    Extract Mel Spectrogram and ensure consistent time dimension.
    """
    try:
        y, sr = librosa.load(file_path, sr=16000)  # Load audio
        mel_spec = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_mels)  # Extract Mel Spectrogram
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)  # Convert to decibel scale

        # Pad or truncate to ensure consistent time dimension
        if mel_spec_db.shape[1] < max_length:
            # Pad with zeros
            mel_spec_db = np.pad(mel_spec_db, ((0, 0), (0, max_length - mel_spec_db.shape[1])), mode='constant')
        else:
            # Truncate
            mel_spec_db = mel_spec_db[:, :max_length]

        return mel_spec_db
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None


# Example: Extract Mel Spectrogram for the first file
example_file = ravdess_annotations["file_path"].iloc[0]
mel_spec = extract_mel_spectrogram(example_file)
print("Mel Spectrogram shape:", mel_spec.shape)

Mel Spectrogram shape: (128, 150)


In [9]:
from torch.utils.data import Dataset, DataLoader

from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    features, labels = zip(*batch)
    features = [torch.tensor(f, dtype=torch.float32) for f in features]
    labels = torch.tensor(labels, dtype=torch.long)

    # Pad features to the same size
    features_padded = pad_sequence(features, batch_first=True, padding_value=0)
    return features_padded, labels


# Dataset class
class RAVDESSDataset(Dataset):
    def __init__(self, annotations, feature_extraction_fn):
        self.annotations = annotations
        self.feature_extraction_fn = feature_extraction_fn

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        row = self.annotations.iloc[idx]
        file_path = row["file_path"]
        label = row["emotion"] - 1  # Adjust labels to start from 0
        features = self.feature_extraction_fn(file_path)
        return features, label


# Instantiate the dataset
dataset = RAVDESSDataset(annotations=ravdess_annotations, feature_extraction_fn=extract_mel_spectrogram)

# Create the DataLoader
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Iterate through the DataLoader
for features, labels in dataloader:
    print(f"Features shape: {features.shape}, Labels shape: {labels.shape}")
    break

print(ravdess_annotations["emotion"].value_counts())

Features shape: torch.Size([32, 128, 150]), Labels shape: torch.Size([32])
emotion
5    192
6    192
7    192
4    192
8    192
3    192
2    192
1     96
Name: count, dtype: int64


In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import random_split, DataLoader
from tqdm import tqdm

# Define the RNN Model
class EmotionRNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(EmotionRNN, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=3, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.dropout = nn.Dropout(p=0.3)
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        # x: (batch_size, input_size, time_steps)
        x = x.permute(0, 2, 1)  # Rearrange to (batch_size, time_steps, input_size)
        _, (hidden, _) = self.lstm(x)  # hidden: (num_layers * 2, batch_size, hidden_size)
        hidden_concat = torch.cat((hidden[-2], hidden[-1]), dim=1)  # Concatenate forward and backward
        out = self.fc(self.dropout(hidden_concat))  # Apply dropout before the fully connected layer
        return out


# Hyperparameters
input_size = 128  # Mel bands
hidden_size = 128  # LSTM hidden layer size
num_classes = len(ravdess_annotations["emotion"].unique())  # Number of emotion classes
num_epochs = 10
batch_size = 32
learning_rate = 0.001

# Instantiate the model
model = EmotionRNN(input_size, hidden_size, num_classes)

# Loss and optimizer
class_counts = ravdess_annotations["emotion"].value_counts()
class_weights = 1.0 / class_counts
class_weights = class_weights / class_weights.sum()

criterion = nn.CrossEntropyLoss(weight=torch.tensor(class_weights.values, dtype=torch.float32))
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Split the dataset into train and test sets
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Training loop
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for features, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        # Move data to device (if using GPU)
        features, labels = features, labels

        # Forward pass
        outputs = model(features)
        loss = criterion(outputs, labels)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}")

# Evaluation
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for features, labels in test_loader:
        outputs = model(features)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")

from sklearn.metrics import confusion_matrix, classification_report

y_true = []
y_pred = []

with torch.no_grad():
    for features, labels in test_loader:
        outputs = model(features)
        _, predicted = torch.max(outputs, 1)
        y_true.extend(labels.tolist())
        y_pred.extend(predicted.tolist())

print(confusion_matrix(y_true, y_pred))
print(classification_report(y_true, y_pred))

Epoch 1/10: 100%|██████████| 36/36 [00:20<00:00,  1.78it/s]


Epoch [1/10], Loss: 2.0053


Epoch 2/10: 100%|██████████| 36/36 [00:22<00:00,  1.60it/s]


Epoch [2/10], Loss: 1.9179


Epoch 3/10: 100%|██████████| 36/36 [00:14<00:00,  2.49it/s]


Epoch [3/10], Loss: 1.9078


Epoch 4/10: 100%|██████████| 36/36 [00:14<00:00,  2.57it/s]


Epoch [4/10], Loss: 1.8996


Epoch 5/10: 100%|██████████| 36/36 [00:14<00:00,  2.55it/s]


Epoch [5/10], Loss: 1.8852


Epoch 6/10: 100%|██████████| 36/36 [00:15<00:00,  2.38it/s]


Epoch [6/10], Loss: 1.9086


Epoch 7/10: 100%|██████████| 36/36 [00:15<00:00,  2.32it/s]


Epoch [7/10], Loss: 1.8616


Epoch 8/10: 100%|██████████| 36/36 [00:23<00:00,  1.51it/s]


Epoch [8/10], Loss: 1.8506


Epoch 9/10: 100%|██████████| 36/36 [00:17<00:00,  2.07it/s]


Epoch [9/10], Loss: 1.8348


Epoch 10/10: 100%|██████████| 36/36 [00:19<00:00,  1.84it/s]


Epoch [10/10], Loss: 1.8338
Test Accuracy: 21.88%
[[ 0  2  0  0  0  0  0 22]
 [ 0  5  0  0  0  0  9 14]
 [ 0  4  0  0  2  0  2 29]
 [ 0  6  0  0  4  0  3 33]
 [ 0  2  0  0 12  0  5 16]
 [ 0  2  0  0  5  0  3 34]
 [ 0  4  0  0  6  0 12 16]
 [ 0  1  0  0  0  0  1 34]]
              precision    recall  f1-score   support

           0       0.00      0.00      0.00        24
           1       0.19      0.18      0.19        28
           2       0.00      0.00      0.00        37
           3       0.00      0.00      0.00        46
           4       0.41      0.34      0.38        35
           5       0.00      0.00      0.00        44
           6       0.34      0.32      0.33        38
           7       0.17      0.94      0.29        36

    accuracy                           0.22       288
   macro avg       0.14      0.22      0.15       288
weighted avg       0.14      0.22      0.14       288



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
