In [None]:
# import sys
# sys.path.append('/kaggle/input/project-541/project')

import os
import random
from typing import List

import numpy as np
import pandas as pd
import librosa
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from augment import WaveformAugment

In [None]:
# Configuration: filesystem paths, audio preprocessing settings and training
# hyper-parameters shared by every cell below.

DATA_DIR = "./data/UrbanSound8K" # dataset root; the dataset class reads metadata/UrbanSound8K.csv and audio/fold<k>/ under it
MODEL_DIR = "./model/model-d" # output directory for trained weights (.pth) and learning-curve figures (.png)
os.makedirs(MODEL_DIR, exist_ok=True) # create the output directory up front; no error if it already exists

SAMPLE_RATE = 22050 # target sample rate in Hz; audio is resampled to this rate on load
CLIP_DURATION = 4.0 # clip length in seconds; shorter clips are zero-padded, longer ones truncated
N_CLASSES = 10 # number of classifier outputs

BATCH_SIZE = 16 # samples per mini-batch for the train/val/test loaders
NUM_EPOCHS = 20 # training epochs per cross-validation fold
LEARNING_RATE = 1e-4 # learning rate passed to the Adam optimizer

# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", DEVICE)

Using device: cuda


In [3]:
# Dataset construction

class UrbanSound8KWaveformDataset(Dataset):
    """Fixed-length, per-clip standardised waveforms from selected UrbanSound8K folds.

    Rows are read from ``<data_dir>/metadata/UrbanSound8K.csv`` and filtered on
    the ``fold`` column; audio is read from
    ``<data_dir>/audio/fold<k>/<slice_file_name>``.

    Each item is a ``(waveform, label)`` pair:

    * ``waveform`` -- float32 tensor of shape ``[1, n_samples]`` before
      augmentation, where ``n_samples = int(sample_rate * duration)``.
      If ``waveform_augment`` is given, whatever it returns is passed through
      unchanged (presumably the same shape -- confirm against ``WaveformAugment``).
    * ``label`` -- scalar ``torch.long`` tensor holding the row's ``classID``.
    """

    def __init__(
        self,
        data_dir: str,
        folds: List[int],
        sample_rate: int = SAMPLE_RATE,
        duration: float = CLIP_DURATION,
        waveform_augment: WaveformAugment = None,
    ):
        """
        Args:
            data_dir: Dataset root containing ``metadata/`` and ``audio/``.
            folds: Fold id(s) to include. A single id, or a list / tuple / set /
                range / NumPy array of ids.
            sample_rate: Target sample rate passed to ``librosa.load``.
            duration: Clip length in seconds; fixes the number of samples per item.
            waveform_augment: Optional callable applied to the ``[1, T]`` tensor
                in ``__getitem__``; ``None`` disables augmentation.
        """
        super().__init__()
        self.data_dir = data_dir

        # Accept any common container of fold ids, not only ``list``.
        # Previously a tuple/range/set/array was wrapped whole into a
        # one-element list instead of being treated as several fold ids.
        if isinstance(folds, np.ndarray):
            self.folds = np.atleast_1d(folds).tolist()
        elif isinstance(folds, (list, tuple, set, frozenset, range)):
            self.folds = list(folds)
        else:
            self.folds = [folds]

        self.sample_rate = sample_rate
        self.duration = duration
        self.n_samples = int(sample_rate * duration)
        self.waveform_augment = waveform_augment

        # Read the metadata table and keep only rows of the requested folds.
        meta_path = os.path.join(data_dir, "metadata", "UrbanSound8K.csv")
        self.df = pd.read_csv(meta_path)
        self.df = self.df[self.df["fold"].isin(self.folds)].reset_index(drop=True)
        self.labels = self.df["classID"].to_numpy(dtype=np.int64)

    def __len__(self):
        """Number of clips in the selected folds."""
        return len(self.df)

    def _load_waveform(self, index: int) -> np.ndarray:
        """Load clip ``index`` as a float32 array of exactly ``n_samples`` samples.

        The audio is loaded as mono at ``self.sample_rate``, zero-padded at the
        end or truncated to ``n_samples``, then standardised to zero mean and
        unit variance (skipped for near-silent clips).
        """
        row = self.df.iloc[index]
        # int() keeps the folder name as "fold5" even if the value is not a plain int.
        fold_dir = f"fold{int(row['fold'])}"
        file_path = os.path.join(self.data_dir, "audio", fold_dir, row["slice_file_name"])

        # Load, resampling to the target rate and down-mixing to mono.
        wav, _ = librosa.load(file_path, sr=self.sample_rate, mono=True)

        # Pad with trailing zeros or truncate so every clip has n_samples samples.
        n_loaded = len(wav)
        if n_loaded < self.n_samples:
            wav = np.pad(wav, (0, self.n_samples - n_loaded), mode="constant")
        elif n_loaded > self.n_samples:
            wav = wav[: self.n_samples]

        # Per-clip standardisation. The statistics are taken after padding, so
        # appended zeros are included. The threshold avoids dividing by ~0 on
        # (near-)silent clips, which are returned unscaled.
        std = np.std(wav)
        if std > 1e-6:
            wav = (wav - np.mean(wav)) / std

        return wav.astype(np.float32)

    def __getitem__(self, index: int):
        """Return ``(waveform, label)`` for clip ``index``."""
        label = int(self.labels[index])
        wav_np = self._load_waveform(index)
        wav = torch.from_numpy(wav_np).unsqueeze(0)  # [1, T] (channels, samples)

        # Augmentation is only configured for the training split.
        if self.waveform_augment is not None:
            wav = self.waveform_augment(wav)

        return wav, torch.tensor(label, dtype=torch.long)

In [4]:
# VGG-13 1D model

class VGG13_1D(nn.Module):
    """
    VGG-13 style 1-D convolutional classifier.

    Feature extractor plan (numbers are conv output channels, 'M' is a max-pool):
    [64,64,'M', 128,128,'M', 256,256,'M', 512,512,'M', 512,512,'M']
    followed by global average pooling and a single linear layer.
    """
    def __init__(self, num_classes: int = N_CLASSES, in_channels: int = 1):
        super().__init__()

        # Five stages; each is two convs of equal width followed by one pool.
        cfg = []
        for width in (64, 128, 256, 512, 512):
            cfg.extend([width, width, 'M'])

        self.features = self.make_layers(cfg, in_channels)
        # Average over the remaining time axis so the linear head always
        # receives exactly 512 features per example.
        self.gap = nn.AdaptiveAvgPool1d(1)
        self.classifier = nn.Linear(512, num_classes)

    def make_layers(self, cfg, in_channels):
        """Translate a VGG-style plan into an ``nn.Sequential``.

        Every integer entry becomes Conv1d(k=3, pad=1) -> BatchNorm1d -> ReLU
        with that many output channels; every 'M' becomes MaxPool1d(2, 2).
        """
        modules = []
        channels = in_channels
        for spec in cfg:
            if spec != 'M':
                modules.extend((
                    nn.Conv1d(channels, spec, kernel_size=3, padding=1),
                    nn.BatchNorm1d(spec),
                    nn.ReLU(inplace=True),
                ))
                # The next conv consumes what this one produced.
                channels = spec
                continue
            modules.append(nn.MaxPool1d(kernel_size=2, stride=2))
        return nn.Sequential(*modules)

    def forward(self, x):
        """Map a batch ``[B, in_channels, T]`` to logits ``[B, num_classes]``."""
        pooled = self.gap(self.features(x))      # [B, 512, 1]
        return self.classifier(pooled.squeeze(-1))  # [B, 512] -> [B, num_classes]

In [5]:
# Helper functions for training, evaluation and plotting

def set_seed(seed: int = 38):
    """Seed Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices)."""
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)


def train_one_epoch(model, loader, criterion, optimizer, device: torch.device):
    """Run one optimisation pass over ``loader`` and report its statistics.

    Puts ``model`` in training mode, performs one optimizer step per batch and
    returns ``(mean_loss, accuracy)`` where both are averaged over all samples
    seen. Accuracy is measured on the same forward pass used for the update.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for inputs, targets in loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        batch_loss = criterion(outputs, targets)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so the epoch figure is a
        # per-sample mean even when the last batch is smaller.
        loss_sum += batch_loss.item() * inputs.size(0)
        predicted = torch.max(outputs, dim=1).indices
        n_correct += (predicted == targets).sum().item()
        n_seen += targets.size(0)

    return loss_sum / n_seen, n_correct / n_seen


@torch.no_grad()
def evaluate(model, loader, criterion, device: torch.device):
    """Score ``model`` on ``loader`` without updating it.

    Switches ``model`` to eval mode, runs with gradient tracking disabled and
    returns ``(mean_loss, accuracy)`` averaged over all samples in ``loader``.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    for inputs, targets in loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        outputs = model(inputs)
        batch_loss = criterion(outputs, targets)

        # Size-weighted sum gives a true per-sample mean over the whole set.
        loss_sum += batch_loss.item() * inputs.size(0)
        predicted = torch.max(outputs, dim=1).indices
        n_correct += (predicted == targets).sum().item()
        n_seen += targets.size(0)

    return loss_sum / n_seen, n_correct / n_seen


def plot_learning_curve(history, fold_id: int, save_dir: str = MODEL_DIR): # plot
    """Save a two-panel loss/accuracy learning curve for one fold as a PNG.

    Args:
        history: dict with keys 'train_loss', 'val_loss', 'train_acc', 'val_acc',
            each a per-epoch sequence of equal length.
        fold_id: Fold number used in the panel titles and the file name.
        save_dir: Directory for ``vgg13_d_fold<fold_id>_curve.png``; it is
            created if it does not exist yet.
    """
    epochs = range(1, len(history["train_loss"]) + 1)

    # savefig does not create missing directories, so make sure a custom
    # save_dir exists. An empty save_dir means "current directory".
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, f"vgg13_d_fold{fold_id}_curve.png")

    # Explicit Figure/Axes objects instead of pyplot's implicit "current
    # figure", so this function cannot draw onto a figure left open elsewhere.
    fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(10, 4))
    try:
        # loss
        ax_loss.plot(epochs, history["train_loss"], label="Train Loss")
        ax_loss.plot(epochs, history["val_loss"], label="Val Loss")
        ax_loss.set_xlabel("Epoch")
        ax_loss.set_ylabel("Loss")
        ax_loss.set_title(f"d - Fold {fold_id} - Loss")
        ax_loss.legend()

        # accuracy
        ax_acc.plot(epochs, history["train_acc"], label="Train Acc")
        ax_acc.plot(epochs, history["val_acc"], label="Val Acc")
        ax_acc.set_xlabel("Epoch")
        ax_acc.set_ylabel("Accuracy")
        ax_acc.set_title(f"d - Fold {fold_id} - Accuracy")
        ax_acc.legend()

        fig.tight_layout()
        fig.savefig(save_path, dpi=150)
    finally:
        # Always release the figure, even if plotting or saving failed;
        # this function is called once per fold.
        plt.close(fig)
    print(f"Saved learning curve to: {save_path}")

In [6]:
# 10-fold cross-validation training.
#
# For each fold k: fold k is the held-out test set, the last of the remaining
# nine folds is the validation set, and the other eight are the training set.
# The weights with the best validation accuracy are restored, tested and saved.

set_seed(38)

all_folds = list(range(1, 11))

for test_fold in all_folds:
    print("=" * 80)
    print(f"Fold {test_fold} as TEST fold")
    print("=" * 80)

    # current fold as test set, remaining 9 folds as train set and val set
    trainval_folds = [f for f in all_folds if f != test_fold]

    # last of the remaining 9 folds is the val set, the other 8 are the train set
    val_fold = trainval_folds[-1]
    train_folds = trainval_folds[:-1]

    print(f"Train folds: {train_folds}")
    print(f"Val fold: {val_fold}")
    print(f"Test fold: {test_fold}")

    # augmentation
    wave_aug = WaveformAugment(sample_rate=SAMPLE_RATE)

    train_dataset = UrbanSound8KWaveformDataset( # train set
        data_dir=DATA_DIR,
        folds=train_folds,
        sample_rate=SAMPLE_RATE,
        duration=CLIP_DURATION,
        waveform_augment=wave_aug # do augmentation
    )

    val_dataset = UrbanSound8KWaveformDataset( # val set
        data_dir=DATA_DIR,
        folds=[val_fold],
        sample_rate=SAMPLE_RATE,
        duration=CLIP_DURATION,
        waveform_augment=None # don't do augmentation
    )

    test_dataset = UrbanSound8KWaveformDataset( # test set
        data_dir=DATA_DIR,
        folds=[test_fold],
        sample_rate=SAMPLE_RATE,
        duration=CLIP_DURATION,
        waveform_augment=None # don't do augmentation
    )

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
    val_loader   = DataLoader(val_dataset,   batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
    test_loader  = DataLoader(test_dataset,  batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

    # create a fresh model, loss and optimizer for this fold
    model = VGG13_1D(num_classes=N_CLASSES, in_channels=1).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

    history = {
        "train_loss": [],
        "val_loss": [],
        "train_acc": [],
        "val_acc": []
    }

    best_val_acc = 0.0
    best_model_state = None

    for epoch in range(1, NUM_EPOCHS + 1):
        print(f"\n[Fold {test_fold}] Epoch {epoch}/{NUM_EPOCHS}")

        train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer, DEVICE)
        val_loss, val_acc = evaluate(model, val_loader, criterion, DEVICE)

        history["train_loss"].append(train_loss)
        history["val_loss"].append(val_loss)
        history["train_acc"].append(train_acc)
        history["val_acc"].append(val_acc)

        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"Val   Loss: {val_loss:.4f}, Val   Acc: {val_acc:.4f}")

        # Keep the weights of the best epoch so far (selected on the val set).
        # Training always runs all NUM_EPOCHS; this is checkpoint selection,
        # not early stopping.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # state_dict() returns references to the live parameter/buffer
            # tensors, which later optimizer steps modify in place. Clone them,
            # otherwise the "best" snapshot silently tracks the latest weights
            # and restoring it below would be a no-op.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }

    # plot learning curve
    plot_learning_curve(history, fold_id=test_fold, save_dir=MODEL_DIR)

    # restore the best-on-validation weights (None if val accuracy never exceeded 0)
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    # test on test set after finishing training
    test_loss, test_acc = evaluate(model, test_loader, criterion, DEVICE)
    print(f"[Fold {test_fold}] Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}")

    # store the model (the restored best-on-validation weights)
    model_path = os.path.join(MODEL_DIR, f"vgg13_d_fold{test_fold}.pth")
    torch.save(model.state_dict(), model_path)
    print(f"Saved model for fold {test_fold} to: {model_path}")

Fold 1 as TEST fold
Train folds: [2, 3, 4, 5, 6, 7, 8, 9]
Val fold: 10
Test fold: 1

[Fold 1] Epoch 1/20
Train Loss: 1.6581, Train Acc: 0.3989
Val   Loss: 1.7919, Val   Acc: 0.4134

[Fold 1] Epoch 2/20
Train Loss: 1.4953, Train Acc: 0.4583
Val   Loss: 1.6322, Val   Acc: 0.4779

[Fold 1] Epoch 3/20
Train Loss: 1.4267, Train Acc: 0.4910
Val   Loss: 1.6746, Val   Acc: 0.4134

[Fold 1] Epoch 4/20
Train Loss: 1.3783, Train Acc: 0.5078
Val   Loss: 1.6329, Val   Acc: 0.4385

[Fold 1] Epoch 5/20
Train Loss: 1.3261, Train Acc: 0.5275
Val   Loss: 1.6567, Val   Acc: 0.4074

[Fold 1] Epoch 6/20
Train Loss: 1.2952, Train Acc: 0.5422
Val   Loss: 1.6039, Val   Acc: 0.4659

[Fold 1] Epoch 7/20
Train Loss: 1.2655, Train Acc: 0.5538
Val   Loss: 1.7320, Val   Acc: 0.4504

[Fold 1] Epoch 8/20
Train Loss: 1.2342, Train Acc: 0.5741
Val   Loss: 1.5681, Val   Acc: 0.5090

[Fold 1] Epoch 9/20
Train Loss: 1.2153, Train Acc: 0.5807
Val   Loss: 1.5297, Val   Acc: 0.5818

[Fold 1] Epoch 10/20
Train Loss: 1.1901, T

KeyboardInterrupt: 