In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import os
from pathlib import Path
from dataclasses import dataclass

from shared import accent_mapping, dataset_path, SharedConfig

from torch.utils.data import Dataset, DataLoader
from transformers import (
    HubertModel, 
    HubertForSequenceClassification, 
    Trainer, 
    TrainingArguments, 
    AutoFeatureExtractor
)

from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from typing import Dict, List, Optional, Tuple, Union

2025-05-11 19:57:15.836573: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-05-11 19:57:15.884388: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

2025-05-11 19:57:21.869282: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


In [2]:
shared_cfg = SharedConfig()

In [3]:
N_MFCC = 40
WIN_LENGTH = int(0.025 * shared_cfg.sample_rate)  # 25 ms window
HOP_LENGTH = int(0.010 * shared_cfg.sample_rate)  # 10 ms stride

NUM_FRAMES = 1 + (shared_cfg.max_length - WIN_LENGTH) // HOP_LENGTH

In [4]:
@dataclass
class DataCollatorForAccentClassification:
    # def __call__(self, features: List[Dict[str, torch.Tensor]]) -> Dict[str, torch.Tensor]:
    def __call__(self, features: List[Tuple[torch.Tensor, str]]) -> Dict[str, torch.Tensor]:
        # Extract MFCCs and labels
        mfccs = [f[0] for f in features]  # Each item is (mfcc, label)
        labels = torch.tensor([f[1] for f in features], dtype=torch.long)

        # Find max time length in batch
        max_len = max(mfcc.shape[-1] for mfcc in mfccs)

        # Pad all MFCCs to max_len along time dimension
        padded_mfccs = []
        for mfcc in mfccs:
            if mfcc.shape[-1] < max_len:
                pad_width = max_len - mfcc.shape[-1]
                mfcc = F.pad(mfcc, (0, pad_width))  # pad last dim only
            padded_mfccs.append(mfcc)

        batch_mfcc = torch.stack(padded_mfccs)  # Shape: [batch, 1, 40, max_len]
        return {"input_values": batch_mfcc, "labels": labels}

In [5]:
class AccentDataset(Dataset):
    def __init__(self, csv_file, split='train', max_length=160000, transform=None):
        """
        Args:
            csv_file (str): Path to the full CSV file with file_path and accent columns.
            split (str): One of 'train', 'val', or 'test'.
            transform (callable, optional): MFCC transform to apply to audio.
        """
        self.transform = transform
        self.data = pd.read_csv(csv_file)

        # Filter rows by split
        self.data = self.data[self.data['split'] == split].reset_index(drop=True)

        # Store file paths and labels
        self.file_paths = [Path(fp) for fp in self.data['file_path']]
        self.labels = [accent_mapping[label] for label in self.data['accent']]

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        waveform, sample_rate = torchaudio.load(self.file_paths[idx])

        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)
        
        # Resample if necessary
        if sample_rate != shared_cfg.sample_rate:
            resampler = torchaudio.transforms.Resample(sample_rate, cfg.sample_rate)
            waveform = resampler(waveform)
        
        # Ensure waveform is 1D
        waveform = waveform.squeeze()
        
        mfcc = self.transform(waveform)  # shape: [1, 40, time]
        if mfcc.dim() == 2:  # shape: [40, T]
            mfcc = mfcc.unsqueeze(0)  # make it [1, 40, T]


        label = self.labels[idx]
        return mfcc, label

In [6]:
mfcc_transform = torchaudio.transforms.MFCC(
    sample_rate=16000,
    n_mfcc=40,
    melkwargs={
        'n_fft': 400,
        'hop_length': 160,
        'n_mels': 40
    }
)

In [7]:
type(mfcc_transform)

torchaudio.transforms._transforms.MFCC

In [8]:
# class AccentCNN(nn.Module):
#     def __init__(self, num_classes=21):
#         super(AccentCNN, self).__init__()
#         self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
#         self.pool = nn.MaxPool2d(2, 2)
#         self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
#         self.fc1 = nn.Linear(32 * 10 * 12, 128)
#         self.fc2 = nn.Linear(128, num_classes)

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))  # [B, 16, H/2, W/2]
#         x = self.pool(F.relu(self.conv2(x)))  # [B, 32, H/4, W/4]
#         x = x.view(x.size(0), -1)             # flatten
#         x = F.relu(self.fc1(x))
#         x = self.fc2(x)
#         return x

class AccentCNN(nn.Module):
    def __init__(self, num_mfcc, num_classes):
        super().__init__()
        # super(AudioCNN, self).__init__()

        self.conv1 = nn.Conv2d(1, 16, kernel_size=(3, 3), padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.pool1 = nn.MaxPool2d((2, 2))

        self.conv2 = nn.Conv2d(16, 32, kernel_size=(3, 3), padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.pool2 = nn.MaxPool2d((2, 2))

        self.conv3 = nn.Conv2d(32, 64, kernel_size=(3, 3), padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.pool3 = nn.AdaptiveMaxPool2d((1, 1))  # Global pooling

        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.pool3(F.relu(self.bn3(self.conv3(x))))

        x = x.view(x.size(0), -1)  # Flatten
        x = self.fc(x)
        return x

In [9]:
def train_model(model, train_loader, val_loader=None, num_epochs=10, learning_rate=0.001, device=None):
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for batch in train_loader:
            inputs = batch["input_values"].to(device)  # shape: [B, 1, 40, T]
            labels = batch["labels"].to(device)

            optimizer.zero_grad()

            assert inputs.ndim == 4, f"Expected input shape [B, 1, 40, T], got {inputs.shape}"
            assert labels.ndim == 1, f"Expected label shape [B], got {labels.shape}"
            
            outputs = model(inputs)  # shape: [B, num_classes]
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        avg_loss = running_loss / total
        accuracy = 100. * correct / total
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")

        if val_loader:
            evaluate_model(model, val_loader, device)


In [10]:
def evaluate_model(model, data_loader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in data_loader:
            inputs = batch["input_values"].to(device)
            labels = batch["labels"].to(device)
            outputs = model(inputs)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    
    accuracy = 100. * correct / total
    print(f"Validation Accuracy: {accuracy:.2f}%")


In [11]:
mfcc_transform = torchaudio.transforms.MFCC(
    sample_rate=16000,
    n_mfcc=40,
    melkwargs={
        'n_fft': 400,
        'hop_length': 160,
        'n_mels': 40
    }
)

In [13]:
collator = DataCollatorForAccentClassification()


train_dataset = AccentDataset(csv_file="./out-min-n-20-max-t-60-augmented/accent_data.csv", split="train", transform=mfcc_transform, max_length=shared_cfg.max_length)
val_dataset = AccentDataset(csv_file="./out-min-n-20-max-t-60-augmented/accent_data.csv", split="val", transform=mfcc_transform, max_length=shared_cfg.max_length)
test_dataset = AccentDataset(csv_file="./out-min-n-20-max-t-60-augmented/accent_data.csv", split="test", transform=mfcc_transform, max_length=shared_cfg.max_length)

from torch.utils.data import DataLoader
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collator)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, collate_fn=collator)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, collate_fn=collator)

In [15]:
model = AccentCNN(num_mfcc=N_MFCC, num_classes=len(accent_mapping))
train_model(model, train_loader)

Epoch 1/10 - Loss: 2.3867, Accuracy: 40.61%
Epoch 2/10 - Loss: 1.6498, Accuracy: 58.98%
Epoch 3/10 - Loss: 1.5281, Accuracy: 61.16%
Epoch 4/10 - Loss: 1.3740, Accuracy: 62.11%
Epoch 5/10 - Loss: 1.2971, Accuracy: 63.88%
Epoch 6/10 - Loss: 1.2240, Accuracy: 64.49%
Epoch 7/10 - Loss: 1.1789, Accuracy: 65.37%
Epoch 8/10 - Loss: 1.1137, Accuracy: 68.23%
Epoch 9/10 - Loss: 1.0668, Accuracy: 68.78%
Epoch 10/10 - Loss: 0.9860, Accuracy: 70.14%


In [16]:
def evaluate_model(model, data_loader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in data_loader:
            inputs = batch["input_values"].to(device)
            labels = batch["labels"].to(device)
            outputs = model(inputs)
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            total += labels.size(0)
    acc = 100. * correct / total
    print(f"Validation Accuracy: {acc:.2f}%")


In [17]:
evaluate_model(model, test_loader, "cpu")

Validation Accuracy: 59.68%
