# Slide‑Rail Acoustic Anomaly Detection – Advanced Models
This notebook extends the baseline implementation by experimenting with advanced models like CAE, VAE, Normalizing Flow, MAE, and Deep SVDD.

### Environment & Library Setup
This section installs or imports the libraries required for audio processing, model training, and evaluation. It ensures reproducibility by pinning key package versions where feasible. Running these commands at the outset prevents dependency issues downstream.

In [13]:

# !pip install --quiet librosa==0.10.1 torch torchvision torchaudio scikit-learn tqdm kaggle 
import sys, os, json, zipfile, shutil, math, random, warnings, itertools
from pathlib import Path
import numpy as np
import librosa, librosa.display
import torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn import mixture, metrics
from tqdm import tqdm
import pandas as pd
warnings.filterwarnings('ignore')
torch.manual_seed(42)
np.random.seed(42)


### Data Acquisition
Here we load the labeled data from CSV files for supervised learning. The CSV files provide the class labels (0 for normal, 1 for anomaly) for both training and testing datasets.

In [14]:

# Load labeled data from CSV files
train_csv_path = "/Users/mymac/Study Abroad/Master Computer Science EURECOM/AML/Lab/AML-EURECOM-Group14/Challenge 2-Anomaly Detection/file_list_dev_data_train.csv"
test_csv_path = "/Users/mymac/Study Abroad/Master Computer Science EURECOM/AML/Lab/AML-EURECOM-Group14/Challenge 2-Anomaly Detection/file_list_dev_data_test.csv"

train_df = pd.read_csv(train_csv_path)
test_df = pd.read_csv(test_csv_path)

print("Training data loaded:", train_df.shape)
print("Testing data loaded:", test_df.shape)

DATA_DIR = "/Users/mymac/Study Abroad/Master Computer Science EURECOM/AML/Lab/AML-EURECOM-Group14/Challenge 2-Anomaly Detection/dataset"


Training data loaded: (2370, 4)
Testing data loaded: (1101, 4)


### Feature Extraction: Log‑Mel Spectrograms
We extract log‑Mel spectrograms for each audio file listed in the CSV files. These features serve as input to the supervised learning model.

In [15]:
SAMPLE_RATE = 16000
N_FFT, HOP, N_MELS = 1024, 512, 64

def extract_logmelspec(path, cache_dir=Path(DATA_DIR) / 'features'):
    cache_dir.mkdir(parents=True, exist_ok=True)
    cache_path = cache_dir / (path.stem + '.npy')
    if cache_path.exists():
        return np.load(cache_path)
    y, sr = librosa.load(path, sr=SAMPLE_RATE, mono=True)
    mel = librosa.feature.melspectrogram(
        y=y, sr=sr, n_fft=N_FFT, hop_length=HOP, n_mels=N_MELS, power=2
    )
    logmel = librosa.power_to_db(mel).astype(np.float32)
    np.save(cache_path, logmel)
    return logmel

def extract_features_from_csv(df, folder_path, cache_dir=Path(DATA_DIR) / 'features'):
    features = []
    labels = []
    for _, row in df.iterrows():
        file_path = Path(folder_path) / row["original_filename"]
        logmel = extract_logmelspec(file_path, cache_dir=cache_dir)
        features.append(logmel)
        labels.append(row["class"])
    return np.array(features), np.array(labels)

# Extract features and labels for training and testing
train_features, train_labels = extract_features_from_csv(train_df, 
    "/Users/mymac/Study Abroad/Master Computer Science EURECOM/AML/Lab/AML-EURECOM-Group14/Challenge 2-Anomaly Detection/dataset/dev_data/dev_data/slider/train")
test_features, test_labels = extract_features_from_csv(test_df, 
    "/Users/mymac/Study Abroad/Master Computer Science EURECOM/AML/Lab/AML-EURECOM-Group14/Challenge 2-Anomaly Detection/dataset/dev_data/dev_data/slider/test")

print("Feature extraction completed.")


Feature extraction completed.


### Torch Dataset Wrapper
We create a custom `Dataset` class to handle the supervised learning data. This class provides both features and labels for training and evaluation.

In [16]:
class SupervisedSlideRailDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        x = torch.from_numpy(self.features[idx]).unsqueeze(0)  # Add channel dimension
        y = torch.tensor(self.labels[idx], dtype=torch.long)
        return x, y

# Create datasets and data loaders
train_dataset = SupervisedSlideRailDataset(train_features, train_labels)
test_dataset = SupervisedSlideRailDataset(test_features, test_labels)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16)


### Convolutional Autoencoder (CAE) Baseline
We train a CAE for 20-30 epochs and measure the internal AUC for anomaly detection.

In [24]:
try:
    device
except NameError:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class CAE(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2)
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2), nn.ReLU(),
            nn.ConvTranspose2d(16, 1, kernel_size=2, stride=2)
        )

    def forward(self, x):
        z = self.encoder(x)
        out = self.decoder(z)
        # Crop output to match input dimensions if necessary
        if out.size(2) != x.size(2) or out.size(3) != x.size(3):
            out = out[:, :, :x.size(2), :x.size(3)]
        return out

cae_model = CAE().to(device)
opt = torch.optim.Adam(cae_model.parameters(), lr=1e-3)
crit = nn.MSELoss()

# Train CAE
for epoch in range(20):
    cae_model.train()
    train_loss = 0.0
    for x, _ in train_loader:
        x = x.to(device)
        opt.zero_grad()
        recon = cae_model(x)
        loss = crit(recon, x)
        loss.backward()
        opt.step()
        train_loss += loss.item() * x.size(0)
    train_loss /= len(train_loader.dataset)
    print(f"Epoch {epoch+1:2d} | Train Loss: {train_loss:.4f}")

# Measure AUC
cae_model.eval()
scores, labels = [], []
with torch.no_grad():
    for x, y in test_loader:
        x = x.to(device)
        recon = cae_model(x)
        loss = ((recon - x) ** 2).mean(dim=(1, 2, 3))
        scores.extend(loss.cpu().numpy())
        labels.extend(y.numpy())
auc = metrics.roc_auc_score(labels, scores)
print(f"CAE AUC: {auc:.4f}")

RuntimeError: The size of tensor a (312) must match the size of tensor b (313) at non-singleton dimension 3

### Variational Autoencoder (VAE)
We replace the reconstruction loss with a VAE loss to potentially improve AUC by 2-3%.

In [25]:
class VAE(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2)
        )
        self.fc_mu = nn.Linear(32 * 16 * 16, 128)
        self.fc_logvar = nn.Linear(32 * 16 * 16, 128)
        self.decoder_fc = nn.Linear(128, 32 * 16 * 16)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(32, 16, 2, stride=2), nn.ReLU(),
            nn.ConvTranspose2d(16, 1, 2, stride=2)
        )

    def forward(self, x):
        z = self.encoder(x).view(x.size(0), -1)
        mu, logvar = self.fc_mu(z), self.fc_logvar(z)
        z_sample = mu + torch.exp(0.5 * logvar) * torch.randn_like(logvar)
        recon = self.decoder(self.decoder_fc(z_sample).view(x.size(0), 32, 16, 16))
        return recon, mu, logvar

vae_model = VAE().to(device)
opt = torch.optim.Adam(vae_model.parameters(), lr=1e-3)

# VAE Loss
def vae_loss(recon, x, mu, logvar):
    recon_loss = ((recon - x) ** 2).mean()
    kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + kl_div / x.size(0)

# Train VAE
for epoch in range(20):
    vae_model.train()
    train_loss = 0.0
    for x, _ in train_loader:
        x = x.to(device)
        opt.zero_grad()
        recon, mu, logvar = vae_model(x)
        loss = vae_loss(recon, x, mu, logvar)
        loss.backward()
        opt.step()
        train_loss += loss.item() * x.size(0)
    train_loss /= len(train_loader.dataset)
    print(f"Epoch {epoch+1:2d} | Train Loss: {train_loss:.4f}")

RuntimeError: mat1 and mat2 shapes cannot be multiplied (16x39936 and 8192x128)

### Normalizing Flow or Masked Autoencoder (MAE)
For advanced experiments, consider Normalizing Flow or MAE for potential performance gains.

In [None]:
# Placeholder for Normalizing Flow or MAE implementation
# These models require significant computational resources and are optional.

Validation scores computed.


### Deep SVDD
A one-class anomaly detection method using a lightweight model.

In [None]:
class DeepSVDD(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(1, 16, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3, padding=1), nn.ReLU(), nn.AdaptiveAvgPool2d((1, 1))
        )

    def forward(self, x):
        return self.net(x).view(x.size(0), -1)

svdd_model = DeepSVDD().to(device)
center = torch.zeros(32).to(device)

# Train Deep SVDD
opt = torch.optim.Adam(svdd_model.parameters(), lr=1e-3)
for epoch in range(20):
    svdd_model.train()
    train_loss = 0.0
    for x, _ in train_loader:
        x = x.to(device)
        opt.zero_grad()
        z = svdd_model(x)
        loss = ((z - center) ** 2).mean()
        loss.backward()
        opt.step()
        train_loss += loss.item() * x.size(0)
    train_loss /= len(train_loader.dataset)
    print(f"Epoch {epoch+1:2d} | Train Loss: {train_loss:.4f}")