Input: Mel Spectrogram (1, 128, 256)
↓
Conv Block ×4: (Conv → BN + ReLU → MaxPool)
↓
Flatten + Positional Encoding
↓
Transformer Encoder Layer ×2
↓
Avg Pool → BN → FC → Softmax

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import os
from torch.utils.data import Dataset, DataLoader, random_split

import numpy as np
import random
import copy

from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
import time

In [2]:
def set_seed(seed=42):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)

In [3]:
class MelSpectrogramDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.samples = []
        self.transform = transform
        self.label_map = {}

        for label_idx, label in enumerate(sorted(os.listdir(root_dir))):
            label_path = os.path.join(root_dir, label, "mel_spec_tensor")
            if not os.path.isdir(label_path):
                continue

            self.label_map[label] = label_idx
            for file in os.listdir(label_path):
                if file.endswith(".pt"):
                    self.samples.append({
                        "path": os.path.join(label_path, file),
                        "label": label_idx
                    })

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample = self.samples[idx]
        mel_tensor = torch.load(sample["path"])  # shape: (1, 128, 256)
        label = sample["label"]

        if self.transform:
            mel_tensor = self.transform(mel_tensor)

        return mel_tensor, label


In [4]:
def get_dataloaders(root_dir, batch_size=32, val_frac=0.1, test_frac=0.1):
    dataset = MelSpectrogramDataset(root_dir)
    total_size = len(dataset)
    test_size = int(total_size * test_frac)
    val_size = int(total_size * val_frac)
    train_size = total_size - val_size - test_size

    train_set, val_set, test_set = random_split(dataset, [train_size, val_size, test_size])
    
    return {
        "train": DataLoader(train_set, batch_size=batch_size, shuffle=True),
        "val": DataLoader(val_set, batch_size=batch_size),
        "test": DataLoader(test_set, batch_size=batch_size),
    }


In [5]:
class ConvTransformerClassifier(nn.Module):
    def __init__(self, n_classes, d_model=128, nhead=4, num_layers=2, input_shape=(1, 128, 256)):
        super(ConvTransformerClassifier, self).__init__()

        # --- CNN Feature Extractor ---
        conv_layers = []
        in_channels = input_shape[0]
        for _ in range(4):
            conv_layers += [
                nn.Conv2d(in_channels, d_model, kernel_size=3, padding=1),
                nn.BatchNorm2d(d_model),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2)
            ]
            in_channels = d_model

        self.cnn = nn.Sequential(*conv_layers)

        # --- Flatten + Positional Encoding ---
        self.flatten = nn.Flatten(2)  # flatten spatial dims into a sequence
        self.positional_encoding = PositionalEncoding(d_model)

        # --- Transformer Encoder ---
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # --- Classification Head ---
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.bn = nn.BatchNorm1d(d_model)
        self.fc = nn.Linear(d_model, n_classes)

    def forward(self, x):
        # x: (B, 1, 128, 256)
        x = self.cnn(x)  # (B, C, H', W') => e.g. (B, 128, 8, 16)
        B, C, H, W = x.shape

        x = self.flatten(x)            # (B, C, H*W)  → sequence
        x = x.permute(0, 2, 1)         # (B, seq_len, C)
        x = self.positional_encoding(x)

        x = self.transformer(x)        # (B, seq_len, C)
        x = x.permute(0, 2, 1)         # (B, C, seq_len)
        x = self.avgpool(x).squeeze(-1)  # (B, C)

        x = self.bn(x)
        x = self.fc(x)                 # (B, n_classes)
        return x

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=1024):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)  # (max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-torch.log(torch.tensor(10000.0)) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (B, seq_len, d_model)
        seq_len = x.size(1)
        return x + self.pe[:, :seq_len]


In [8]:
def train_model(model, dataloaders, device="cuda", epochs=10, lr=1e-3, weight_decay=0.0, clip_grad=True, patience=5):
    set_seed()
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)

    best_model = None
    best_val_acc = 0.0
    best_epoch = 0
    patience_counter = 0

    for epoch in range(epochs):
        print(f"\nEpoch {epoch+1}/{epochs}")

        model.train()
        train_loss, train_correct = 0.0, 0

        for X, y in dataloaders["train"]:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            outputs = model(X)
            loss = criterion(outputs, y)
            loss.backward()
            if clip_grad:
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            train_loss += loss.item() * X.size(0)
            train_correct += (outputs.argmax(1) == y).sum().item()

        train_acc = train_correct / len(dataloaders["train"].dataset)
        train_loss /= len(dataloaders["train"].dataset)

        model.eval()
        val_loss, val_correct = 0.0, 0
        with torch.no_grad():
            for X, y in dataloaders["val"]:
                X, y = X.to(device), y.to(device)
                outputs = model(X)
                loss = criterion(outputs, y)
                val_loss += loss.item() * X.size(0)
                val_correct += (outputs.argmax(1) == y).sum().item()

        val_acc = val_correct / len(dataloaders["val"].dataset)
        val_loss /= len(dataloaders["val"].dataset)
        scheduler.step(val_loss)

        print(f"Train Loss: {train_loss:.4f} | Accuracy: {train_acc:.4f}")
        print(f"Val   Loss: {val_loss:.4f} | Accuracy: {val_acc:.4f}")

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_model = copy.deepcopy(model.state_dict())
            best_epoch = epoch
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print("Early stopping triggered.")
            break

    model.load_state_dict(best_model)
    print(f"Best validation accuracy: {best_val_acc:.4f} at epoch {best_epoch+1}")
    return model, best_val_acc


In [9]:
# Suppose `model` is your CNN+Transformer hybrid
model = ConvTransformerClassifier(n_classes=2, d_model=128, nhead=4, num_layers=2)
dataloaders = get_dataloaders("/vol/bitbucket/sg2121/fypdataset/dataset/tensors", batch_size=32)
train_model(model, dataloaders, device="cuda", epochs=10)


Epoch 1/10
Train Loss: 0.6938 | Accuracy: 0.6400
Val   Loss: 0.5907 | Accuracy: 0.6548

Epoch 2/10
Train Loss: 0.4976 | Accuracy: 0.7704
Val   Loss: 0.6103 | Accuracy: 0.6667

Epoch 3/10
Train Loss: 0.4498 | Accuracy: 0.7896
Val   Loss: 0.5207 | Accuracy: 0.7500

Epoch 4/10
Train Loss: 0.3906 | Accuracy: 0.8193
Val   Loss: 0.5483 | Accuracy: 0.6905

Epoch 5/10
Train Loss: 0.3295 | Accuracy: 0.8622
Val   Loss: 1.0487 | Accuracy: 0.6071

Epoch 6/10
Train Loss: 0.3648 | Accuracy: 0.8578
Val   Loss: 0.4821 | Accuracy: 0.8333

Epoch 7/10
Train Loss: 0.3079 | Accuracy: 0.8696
Val   Loss: 0.4509 | Accuracy: 0.8452

Epoch 8/10
Train Loss: 0.2397 | Accuracy: 0.9126
Val   Loss: 0.5730 | Accuracy: 0.8214

Epoch 9/10
Train Loss: 0.1749 | Accuracy: 0.9363
Val   Loss: 0.6183 | Accuracy: 0.8690

Epoch 10/10
Train Loss: 0.1382 | Accuracy: 0.9541
Val   Loss: 0.6237 | Accuracy: 0.8333
Best validation accuracy: 0.8690 at epoch 9


(ConvTransformerClassifier(
   (cnn): Sequential(
     (0): Conv2d(1, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
     (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     (2): ReLU()
     (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
     (4): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
     (5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     (6): ReLU()
     (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
     (8): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
     (9): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     (10): ReLU()
     (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
     (12): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
     (13): BatchNorm2d(128, eps=1e-05, momentum=0.1, affin

In [10]:
def evaluate_model(model, dataloader, device="cuda"):
    model.eval()
    criterion = nn.CrossEntropyLoss()
    test_loss, test_correct = 0.0, 0
    all_preds, all_labels = [], []
    total_inference_time = 0.0

    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)

            start_time = time.time()
            outputs = model(X)
            end_time = time.time()

            inference_time = end_time - start_time
            total_inference_time += inference_time

            loss = criterion(outputs, y)
            preds = outputs.argmax(1)

            test_loss += loss.item() * X.size(0)
            test_correct += (preds == y).sum().item()

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(y.cpu().numpy())

    test_acc = test_correct / len(dataloader.dataset)
    test_loss /= len(dataloader.dataset)
    avg_inference_time = total_inference_time / len(dataloader.dataset)

    # Metrics
    precision = precision_score(all_labels, all_preds, average='binary')
    recall = recall_score(all_labels, all_preds, average='binary')
    f1 = f1_score(all_labels, all_preds, average='binary')

    # False Positive Rate (FPR = FP / (FP + TN))
    tn, fp, fn, tp = confusion_matrix(all_labels, all_preds).ravel()
    fpr = fp / (fp + tn + 1e-10)  # avoid division by zero

    print(f"Test  Loss: {test_loss:.4f} | Accuracy: {test_acc:.4f}")
    print(f"Precision: {precision:.4f} | Recall: {recall:.4f} | F1 Score: {f1:.4f}")
    print(f"False Positive Rate: {fpr:.4f}")
    print(f"Avg Inference Time per Sample: {avg_inference_time * 1000:.2f} ms")


In [11]:
evaluate_model(model, dataloaders["test"], device="cuda")

Test  Loss: 0.6677 | Accuracy: 0.7857
Precision: 0.7231 | Recall: 1.0000 | F1 Score: 0.8393
False Positive Rate: 0.4865
Avg Inference Time per Sample: 0.14 ms
