In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import librosa as lb
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Load the annotated data
annotated_data = pd.read_csv('/Users/rachelwang/Downloads/notes/models/csv/quality_labeled.csv')

# Split the data into train, validation, and test sets. 60%, 20%, 20%
Xtemp, Xtest, ytemp, ytest = train_test_split(
    annotated_data, annotated_data.quality, stratify=annotated_data.quality, random_state=42, test_size=0.20)
Xtrain, Xval, ytrain, yval = train_test_split(
    Xtemp, ytemp, stratify=ytemp, random_state=42, test_size=0.25)

In [12]:
# Show the numbers for each set
print(f"Training set size: {len(Xtrain)}")
print(f"Validation set size: {len(Xval)}")
print(f"Test set size: {len(Xtest)}")

# To also show the distribution of quality labels in each set, convert arrays to Series
print("\nTraining set label distribution:")
print(pd.Series(ytrain).value_counts())

print("\nValidation set label distribution:")
print(pd.Series(yval).value_counts())

print("\nTest set label distribution:")
print(pd.Series(ytest).value_counts())

Training set size: 479
Validation set size: 160
Test set size: 160

Training set label distribution:
4    367
3     75
2     16
1     11
0     10
Name: count, dtype: int64

Validation set label distribution:
4    122
3     25
2      6
0      4
1      3
Name: count, dtype: int64

Test set label distribution:
4    123
3     25
2      5
1      4
0      3
Name: count, dtype: int64


In [2]:
# Encode labels
le = LabelEncoder()
ytrain = le.fit_transform(ytrain)
yval = le.transform(yval)
ytest = le.transform(ytest)

In [3]:
# Feature extraction function
def pad_or_truncate(feature, max_len):
    if feature.shape[1] < max_len:
        pad_width = max_len - feature.shape[1]
        feature = np.pad(feature, ((0, 0), (0, pad_width)), mode='constant')
    else:
        feature = feature[:, :max_len]
    return feature

def getFeatures(path, max_len=2368):
    soundArr, sample_rate = lb.load(path)
    mfcc = lb.feature.mfcc(y=soundArr, sr=sample_rate)
    mfcc = pad_or_truncate(mfcc, max_len)
    return mfcc

In [4]:
# Custom dataset class
class AudioDataset(Dataset):
    def __init__(self, df):
        self.df = df
        self.features = []
        self.labels = []

        for idx, row in df.iterrows():
            path = row['file']
            mfcc = getFeatures(path)
            self.features.append(mfcc)
            self.labels.append(row['quality'])

        self.labels = le.transform(self.labels)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

train_dataset = AudioDataset(Xtrain)
val_dataset = AudioDataset(Xval)
test_dataset = AudioDataset(Xtest)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [15]:
from torchsummary import summary
# Define the model
class MFCCNet(nn.Module):
    def __init__(self):
        super(MFCCNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(5, 5), stride=(1, 3), padding=2)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(2)
        
        self.conv2 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 2), padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(2)
        
        self.conv3 = nn.Conv2d(64, 96, kernel_size=(2, 2), padding=1)
        self.bn3 = nn.BatchNorm2d(96)
        self.pool3 = nn.MaxPool2d(2)
        
        self.conv4 = nn.Conv2d(96, 128, kernel_size=(2, 2), padding=1)
        self.bn4 = nn.BatchNorm2d(128)
        self.gmp = nn.AdaptiveMaxPool2d((1, 1))
        
        self.fc1 = nn.Linear(128, 50)
        self.fc2 = nn.Linear(50, 25)
        self.fc3 = nn.Linear(25, 5)
        self.dropout = nn.Dropout(0.3)
        
    def forward(self, x):
        x = self.pool1(nn.ReLU()(self.bn1(self.conv1(x))))
        x = self.pool2(nn.ReLU()(self.bn2(self.conv2(x))))
        x = self.pool3(nn.ReLU()(self.bn3(self.conv3(x))))
        x = self.gmp(nn.ReLU()(self.bn4(self.conv4(x))))
        x = x.view(x.size(0), -1)
        x = self.dropout(nn.ReLU()(self.fc1(x)))
        x = self.dropout(nn.ReLU()(self.fc2(x)))
        x = self.fc3(x)
        return x

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MFCCNet().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

summary(model, input_size=(1, 128, 862))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 32, 128, 288]             832
       BatchNorm2d-2         [-1, 32, 128, 288]              64
         MaxPool2d-3          [-1, 32, 64, 144]               0
            Conv2d-4           [-1, 64, 64, 72]          18,496
       BatchNorm2d-5           [-1, 64, 64, 72]             128
         MaxPool2d-6           [-1, 64, 32, 36]               0
            Conv2d-7           [-1, 96, 33, 37]          24,672
       BatchNorm2d-8           [-1, 96, 33, 37]             192
         MaxPool2d-9           [-1, 96, 16, 18]               0
           Conv2d-10          [-1, 128, 17, 19]          49,280
      BatchNorm2d-11          [-1, 128, 17, 19]             256
AdaptiveMaxPool2d-12            [-1, 128, 1, 1]               0
           Linear-13                   [-1, 50]           6,450
          Dropout-14                   

In [6]:
# Early stopping parameters
early_stop_patience = 10
early_stop_counter = 0
best_val_loss = float('inf')

# File path for saving the best model
best_model_path = 'best_model_updated.pth'

In [8]:
# Training loop
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for features, labels in train_loader:
        mfcc = torch.tensor(features).unsqueeze(1).float().to(device)
        labels = torch.tensor(labels).to(device)
        # print(f'labels in the training: {labels}')
        optimizer.zero_grad()
        outputs = model(mfcc)
        # print(f'output in the training: {outputs}')
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = 100 * correct / total
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%")

    # Validation
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for features, labels in val_loader:
            mfcc = torch.tensor(features).unsqueeze(1).float().to(device)
            labels = torch.tensor(labels).to(device)

            outputs = model(mfcc)
            loss = criterion(outputs, labels)

            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    val_loss /= len(val_loader)
    val_accuracy = 100 * val_correct / val_total

    print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        early_stop_counter = 0
        torch.save(model.state_dict(), best_model_path)
    else:
        early_stop_counter += 1
        if early_stop_counter >= early_stop_patience:
            print("Early stopping triggered.")
            break

print("Training complete.")
print(f"Best model saved at: {best_model_path}")

  mfcc = torch.tensor(features).unsqueeze(1).float().to(device)
  labels = torch.tensor(labels).to(device)


Epoch [1/100], Loss: 0.5373, Accuracy: 79.12%


  mfcc = torch.tensor(features).unsqueeze(1).float().to(device)
  labels = torch.tensor(labels).to(device)


Validation Loss: 0.8800, Validation Accuracy: 76.25%
Epoch [2/100], Loss: 0.5439, Accuracy: 78.08%
Validation Loss: 0.6172, Validation Accuracy: 83.12%
Epoch [3/100], Loss: 0.5113, Accuracy: 78.71%
Validation Loss: 0.9106, Validation Accuracy: 76.88%
Epoch [4/100], Loss: 0.5271, Accuracy: 79.12%
Validation Loss: 0.7136, Validation Accuracy: 75.62%
Epoch [5/100], Loss: 0.5141, Accuracy: 78.91%
Validation Loss: 0.6004, Validation Accuracy: 78.75%
Epoch [6/100], Loss: 0.5197, Accuracy: 79.12%
Validation Loss: 0.7842, Validation Accuracy: 77.50%
Epoch [7/100], Loss: 0.4442, Accuracy: 80.79%
Validation Loss: 0.6276, Validation Accuracy: 78.75%
Epoch [8/100], Loss: 0.4673, Accuracy: 78.91%
Validation Loss: 1.0288, Validation Accuracy: 76.88%
Epoch [9/100], Loss: 0.4933, Accuracy: 80.79%
Validation Loss: 0.5857, Validation Accuracy: 83.12%
Epoch [10/100], Loss: 0.4263, Accuracy: 83.09%
Validation Loss: 0.6098, Validation Accuracy: 81.25%
Epoch [11/100], Loss: 0.4256, Accuracy: 82.46%
Validati

In [10]:
# Load the best model for testing
model.load_state_dict(torch.load(best_model_path))
model.eval()

# Test evaluation
test_loss = 0.0
test_correct = 0
test_total = 0

with torch.no_grad():
    for features, labels in test_loader:
        mfcc = torch.tensor(features).unsqueeze(1).float().to(device)
        labels = torch.tensor(labels).to(device)

        outputs = model(mfcc)
        loss = criterion(outputs, labels)

        test_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        test_total += labels.size(0)
        test_correct += (predicted == labels).sum().item()

test_loss /= len(test_loader)
test_accuracy = 100 * test_correct / test_total

print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")

  mfcc = torch.tensor(features).unsqueeze(1).float().to(device)
  labels = torch.tensor(labels).to(device)


Test Loss: 0.5566, Test Accuracy: 80.62%
