In [None]:
from typing import Tuple
import random
from pathlib import Path

import numpy as np
import pandas as pd
import librosa

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

import torch
from collections import OrderedDict


# --- Dataset locations -------------------------------------------------------
# NOTE(review): BASE_DIR is an absolute, machine-specific path; anyone else
# running this notebook has to edit it first.
BASE_DIR = Path("/Users/sp7078/Downloads/Kaggle_Data")
TRAIN_CSV = BASE_DIR.joinpath("metadata", "kaggle_train.csv")
TEST_CSV = BASE_DIR.joinpath("metadata", "kaggle_test.csv")
AUDIO_DIR = BASE_DIR.joinpath("audio")
TEST_AUDIO_DIR = AUDIO_DIR.joinpath("test")
# Checkpoint file written by main() and read back by predict().
FINAL_PATH = "audio_final_cnn.pth"

# --- Waveform settings -------------------------------------------------------
SAMPLE_RATE = 16000    # Hz; every clip is resampled to this rate on load
CLIP_DURATION = 4.0    # seconds per clip after padding / cropping
SIGNAL_LENGTH = int(CLIP_DURATION * SAMPLE_RATE)  # samples per fixed-length clip

# --- Mel-spectrogram settings ------------------------------------------------
N_MELS = 64
N_FFT = 2048       # samples per FFT window (128 ms at 16 kHz)
HOP_LENGTH = 512   # samples between successive frames (32 ms at 16 kHz)

# --- Training hyper-parameters -----------------------------------------------
# NOTE(review): none of the four values below is referenced by the code in this
# notebook; main() passes its own literals (batch_size=16, 50 epochs, lr=1e-3).
BATCH_SIZE = 32
NUM_EPOCHS = 20
LEARNING_RATE = 0.001
VAL_RATIO = 0.1

In [None]:
def set_seed(seed=42):
    """Seed every random number generator this notebook draws from.

    Seeds Python's ``random``, NumPy's legacy global generator and PyTorch
    (plus all CUDA devices when CUDA is available).

    Args:
        seed: Integer seed applied to each generator.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


def load_audio(path):
    """Read an audio file as a mono waveform resampled to ``SAMPLE_RATE``.

    Args:
        path: Location of the audio file, passed straight to ``librosa.load``.

    Returns:
        The waveform returned by ``librosa.load``; the sample rate it also
        returns is discarded because it is fixed by ``SAMPLE_RATE``.
    """
    waveform, _ = librosa.load(path, sr=SAMPLE_RATE, mono=True)
    return waveform


def fix_length(y):
    """Force a waveform to exactly ``SIGNAL_LENGTH`` samples.

    Shorter inputs are zero-padded at the end; longer inputs are cropped to
    their centre. Inputs that already have the right length are returned
    unchanged.

    Args:
        y: 1-D array of audio samples.

    Returns:
        An array with ``SIGNAL_LENGTH`` samples.
    """
    n_samples = len(y)

    if n_samples == SIGNAL_LENGTH:
        return y

    if n_samples < SIGNAL_LENGTH:
        # Right-pad with zeros up to the target length.
        return np.pad(y, (0, SIGNAL_LENGTH - n_samples))

    # Too long: keep the middle SIGNAL_LENGTH samples.
    offset = (n_samples - SIGNAL_LENGTH) // 2
    return y[offset:offset + SIGNAL_LENGTH]


def mk_logmel(y):
    """Turn a waveform into a standardised log-mel spectrogram.

    The power mel spectrogram is converted to decibels relative to its own
    maximum, then shifted and scaled to zero mean / unit variance over the
    whole clip.

    Args:
        y: 1-D waveform sampled at ``SAMPLE_RATE``.

    Returns:
        ``float32`` array with ``N_MELS`` rows (one per mel band) and one
        column per frame.
    """
    power_spec = librosa.feature.melspectrogram(
        y=y,
        sr=SAMPLE_RATE,
        n_fft=N_FFT,
        hop_length=HOP_LENGTH,
        n_mels=N_MELS,
        power=2.0,
    )
    log_spec = librosa.power_to_db(power_spec, ref=np.max)

    # Per-clip standardisation; the epsilon keeps a constant clip from dividing by zero.
    centre = log_spec.mean()
    spread = log_spec.std() + 1e-9
    log_spec = (log_spec - centre) / spread
    return log_spec.astype(np.float32)

import random

def spec_augment(mel, max_mask_pct=0.15, num_freq_masks=2, num_time_masks=2):
    """Apply random frequency and time masking to a spectrogram, in place.

    Each mask overwrites a randomly placed band of rows (frequency) or columns
    (time) with the spectrogram's mean value, measured once before any masking.
    Band widths are drawn uniformly from ``0`` to ``int(max_mask_pct * size)``
    inclusive, so a mask may be empty.

    Args:
        mel: 2-D array ``(n_mels, n_steps)``. It is modified in place.
        max_mask_pct: Upper bound on a mask's width as a fraction of the axis.
        num_freq_masks: Number of frequency (row) masks to draw.
        num_time_masks: Number of time (column) masks to draw.

    Returns:
        The same ``mel`` array object that was passed in.
    """
    n_mels, n_steps = mel.shape
    fill = mel.mean()

    # Frequency masks (axis 0) are drawn first, then time masks (axis 1).
    plan = ((0, n_mels, num_freq_masks), (1, n_steps, num_time_masks))
    for axis, size, count in plan:
        widest = int(max_mask_pct * size)
        for _ in range(count):
            width = random.randint(0, widest)
            start = random.randint(0, size - width)
            band = [slice(None), slice(None)]
            band[axis] = slice(start, start + width)
            mel[tuple(band)] = fill

    return mel


def mixup(x, y, alpha=0.4):
    """Blend each example in a batch with a randomly chosen partner.

    A single mixing weight is drawn from ``Beta(alpha, alpha)`` for the whole
    batch, and partners are picked with a random permutation of the batch.

    Args:
        x: Batch tensor whose first dimension indexes examples.
        y: Labels aligned with ``x`` along the first dimension.
        alpha: Both shape parameters of the Beta distribution.

    Returns:
        ``(x_mix, y, y_partner, lam)`` where
        ``x_mix = lam * x + (1 - lam) * x_partner``, ``y`` is the input labels
        unchanged, ``y_partner`` are the partners' labels and ``lam`` is the
        mixing weight.
    """
    lam = np.random.beta(alpha, alpha)
    batch_size = x.size(0)
    partner = torch.randperm(batch_size)

    blended = lam * x + (1 - lam) * x[partner]
    return blended, y, y[partner], lam


In [None]:
class SoundDataset(Dataset):
    """Dataset yielding ``(log-mel tensor, label id)`` pairs from a metadata frame.

    Each row of ``df`` must provide ``slice_file_name`` (the audio file name)
    and ``class`` (the label). Audio files are looked up inside the ``fold*``
    sub-directories of ``audio_root``.

    Args:
        df: Metadata frame with ``slice_file_name`` and ``class`` columns.
        audio_root: Directory containing the ``fold*`` sub-directories.
        train: When true, SpecAugment masking is applied to every item.
        label2id: Optional label -> integer id mapping. Pass the training
            dataset's mapping when building validation/test datasets so all
            splits agree on the ids. When omitted, the mapping is derived from
            the sorted unique labels found in ``df``.
    """

    def __init__(self, df, audio_root, train=True, label2id=None):
        self.df = df.reset_index(drop=True)
        self.audio_root = Path(audio_root)
        self.train = train

        if label2id is None:
            # Deriving the mapping from this split alone is only safe when the
            # split contains every class; otherwise ids differ between splits.
            uniq_labels = sorted(df["class"].unique())
            label2id = {lab: i for i, lab in enumerate(uniq_labels)}
        self.label2id = dict(label2id)
        self.id2label = {v: k for k, v in self.label2id.items()}

        # filename -> resolved path, so the fold directories are scanned at most
        # once per file instead of on every __getitem__ call.
        self._path_cache = {}

    def __len__(self):
        """Return the number of rows (clips) in the dataset."""
        return len(self.df)

    def find_audio(self, filename):
        """Locate ``filename`` inside one of the ``fold*`` directories.

        Folds are searched in sorted order so the result is deterministic.

        Raises:
            FileNotFoundError: If no fold directory contains the file.
        """
        cached = self._path_cache.get(filename)
        if cached is not None:
            return cached

        for d in sorted(self.audio_root.glob("fold*")):
            candidate = d / filename
            if candidate.exists():
                self._path_cache[filename] = candidate
                return candidate
        raise FileNotFoundError(f"Not found in any fold: {filename}")

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(mel_tensor, label_id)``.

        ``mel_tensor`` is the clip's log-mel spectrogram with a leading
        channel dimension added; ``label_id`` is the integer id of the row's
        ``class`` value.
        """
        row = self.df.iloc[idx]

        filename = row["slice_file_name"]
        label = row["class"]

        path = self.find_audio(filename)
        y = load_audio(path)
        y = fix_length(y)
        mel = mk_logmel(y)

        if self.train:
            mel = spec_augment(mel)

        mel_tensor = torch.tensor(mel).unsqueeze(0)
        return mel_tensor, self.label2id[label]

In [None]:

class ConvBlock(nn.Module):
    """Two Conv-BatchNorm-ReLU stages followed by 2x2 max pooling.

    Both convolutions use a 3x3 kernel with ``padding=2``, so each one enlarges
    the spatial size by 2 in both directions before the pooling halves it.

    Args:
        in_ch: Number of input channels.
        out_ch: Number of output channels of both convolutions.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()

        stages = []
        channels = in_ch
        for _ in range(2):
            stages.extend([
                nn.Conv2d(channels, out_ch, kernel_size=3, padding=2),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
            ])
            channels = out_ch
        stages.append(nn.MaxPool2d(kernel_size=2))

        # Layer order (and therefore the ``net.N`` state_dict keys) must not change,
        # otherwise previously saved checkpoints stop loading.
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through the block and return the pooled feature map."""
        return self.net(x)


class AudioCNN(nn.Module):
    """Five stacked ``ConvBlock``s, global average pooling and an MLP head.

    Channel widths grow 1 -> 32 -> 64 -> 128 -> 256 -> 512. The final feature
    map is averaged over its two trailing (spatial) dimensions and fed to a
    dropout + two-layer classifier.

    Args:
        class_num: Number of output classes (size of the logits vector).
    """

    def __init__(self, class_num):
        super().__init__()

        widths = (1, 32, 64, 128, 256, 512)
        # Registers b1..b5 in order; the attribute names are part of the
        # checkpoint's state_dict keys and must stay as they are.
        for position, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:]), start=1):
            setattr(self, f"b{position}", ConvBlock(c_in, c_out))

        feature_dim = widths[-1]
        self.fc = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(feature_dim, 512),
            nn.ReLU(),
            nn.Linear(512, class_num),
        )

    def forward(self, x):
        """Return class logits for a batch of single-channel spectrograms."""
        for block in (self.b1, self.b2, self.b3, self.b4, self.b5):
            x = block(x)

        # Global average pooling over the two trailing (spatial) dimensions.
        pooled = torch.mean(x, dim=(-1, -2))
        return self.fc(pooled)

In [None]:
from tqdm import tqdm

def train_epoch(model, loader, optimizer, loss_fn, device):
    """Train ``model`` for one pass over ``loader`` using mixup.

    Every batch is mixed with a shuffled copy of itself and the loss is the
    ``lam``-weighted sum of the losses against both label sets.

    Args:
        model: Network to optimise; it is switched to training mode.
        loader: Iterable yielding ``(mel, label)`` batches.
        optimizer: Optimizer stepped once per batch.
        loss_fn: Callable ``loss_fn(logits, labels)`` returning a scalar loss.
        device: Device the batches are moved to.

    Returns:
        ``(avg_loss, avg_acc)``: the mean of the per-batch losses and the
        fraction of predictions matching the *un-mixed* labels. Because the
        inputs are mixed, this accuracy is only a rough progress indicator.
        Both values are ``0.0`` when the loader yields no batches.
    """
    model.train()
    total = 0
    correct = 0
    running_loss = 0.0
    batches_seen = 0

    pbar = tqdm(loader, desc="TRAIN", leave=False)
    for mel, label in pbar:
        mel, label = mel.to(device), label.to(device)
        mel, label_orig, label_shuf, lam = mixup(mel, label)

        optimizer.zero_grad()
        logits = model(mel)
        loss = lam * loss_fn(logits, label_orig) + (1 - lam) * loss_fn(logits, label_shuf)
        loss.backward()
        optimizer.step()

        batches_seen += 1
        running_loss += loss.item()
        pred = logits.argmax(1)
        correct += (pred == label).sum().item()
        total += label.size(0)

        # Average over the batches processed so far, so the displayed loss is
        # meaningful during the epoch and not only at its very end.
        pbar.set_postfix(loss=f"{running_loss/batches_seen:.4f}",
                         acc=f"{correct/total:.4f}")

    avg_loss = running_loss / batches_seen if batches_seen else 0.0
    avg_acc = correct / total if total else 0.0
    return avg_loss, avg_acc



def validate(model, loader, loss_fn, device):
    """Evaluate ``model`` on ``loader`` without updating any weights.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        loader: Iterable yielding ``(mel, label)`` batches.
        loss_fn: Callable ``loss_fn(logits, labels)`` returning a scalar loss.
        device: Device the batches are moved to.

    Returns:
        ``(avg_loss, avg_acc)``: the mean of the per-batch losses and the
        fraction of correctly classified samples. Both values are ``0.0`` when
        the loader yields no batches.
    """
    model.eval()
    total = 0
    correct = 0
    running_loss = 0.0
    batches_seen = 0

    with torch.no_grad():
        pbar = tqdm(loader, desc="VAL", leave=False)
        for mel, label in pbar:
            mel, label = mel.to(device), label.to(device)

            logits = model(mel)
            loss = loss_fn(logits, label)

            batches_seen += 1
            running_loss += loss.item()
            pred = logits.argmax(1)
            correct += (pred == label).sum().item()
            total += label.size(0)

            # Average over the batches processed so far rather than the whole
            # loader, so the progress bar shows the true running loss.
            pbar.set_postfix(loss=f"{running_loss/batches_seen:.4f}",
                             acc=f"{correct/total:.4f}")

    avg_loss = running_loss / batches_seen if batches_seen else 0.0
    avg_acc = correct / total if total else 0.0
    return avg_loss, avg_acc

In [None]:
def main():
    """Train ``AudioCNN`` and save the weights of the best validation epoch.

    Rows of ``TRAIN_CSV`` with ``fold <= 7`` are used for training and rows
    with ``fold > 7`` for validation. After every epoch the validation accuracy
    is compared with the best seen so far; a snapshot of the best weights is
    written to ``FINAL_PATH`` once training finishes.
    """
    set_seed(42)

    df = pd.read_csv(TRAIN_CSV)

    df_train = df[df["fold"] <= 7]
    df_val   = df[df["fold"] > 7]

    train_data = SoundDataset(df_train, AUDIO_DIR, train=True)
    val_data   = SoundDataset(df_val, AUDIO_DIR, train=False)

    # Each dataset derives its label ids from its own rows; reuse the training
    # mapping for validation so both splits are guaranteed to agree on the ids.
    val_data.label2id = train_data.label2id
    val_data.id2label = train_data.id2label

    train_load = DataLoader(train_data, batch_size=16, shuffle=True, num_workers=0)
    val_load   = DataLoader(val_data, batch_size=16, shuffle=False, num_workers=0)

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")

    model = AudioCNN(class_num=10).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    # NOTE(review): T_max=8 is much shorter than the 50 epochs below, so the
    # learning rate falls and rises again several times - confirm this cyclic
    # schedule is intended rather than T_max=epochs.
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=8)
    loss_fn = nn.CrossEntropyLoss()

    epochs = 50
    best_val_acc = 0.0
    best_state = None

    for epoch in range(epochs):
        print(f"\nEPOCH {epoch + 1} / {epochs}")

        train_loss, train_acc = train_epoch(model, train_load, optimizer, loss_fn, device)
        val_loss, val_acc     = validate(model, val_load, loss_fn, device)

        scheduler.step()

        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"Val   Loss: {val_loss:.4f}, Val   Acc: {val_acc:.4f}")

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # state_dict() returns references to the live parameter tensors, which
            # keep changing as training continues. Clone them so the snapshot
            # really holds this epoch's weights and not the final epoch's.
            best_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
            print(f"New best val_acc={best_val_acc:.4f}")

    if best_state is not None:
        torch.save(best_state, FINAL_PATH)
        print(f"\nSaved best model with val_acc={best_val_acc:.4f} to {FINAL_PATH}")

In [None]:
def predict():
    """Classify every test clip and write the predictions to a CSV file.

    Loads the weights stored at ``FINAL_PATH`` into a fresh ``AudioCNN``, runs
    each file listed in ``TEST_CSV`` (looked up under ``TEST_AUDIO_DIR``)
    through the same preprocessing as training minus augmentation, and saves
    the ``ID`` / predicted class id pairs to ``submission_new.csv``.
    """
    df_test = pd.read_csv(TEST_CSV)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    model = AudioCNN(class_num=10).to(device)
    weights = torch.load(FINAL_PATH, map_location=device)
    model.load_state_dict(weights)
    model.eval()

    ids, preds = [], []

    with torch.no_grad():
        for _, record in tqdm(df_test.iterrows(), total=len(df_test)):
            clip_name = record["slice_file_name"]
            clip_id = record["ID"]

            waveform = fix_length(load_audio(TEST_AUDIO_DIR / clip_name))
            features = mk_logmel(waveform)
            # Add batch and channel dimensions: (n_mels, T) -> (1, 1, n_mels, T).
            batch = torch.tensor(features)[None, None].to(device)

            predicted = model(batch).argmax(1).item()

            ids.append(clip_id)
            preds.append(predicted)

    submission = pd.DataFrame({"ID": ids, "TARGET": preds})
    submission.to_csv("submission_new.csv", index=False)
    print("submission_new.csv saved.")
    

In [None]:
# Entry point: only inference runs as written, so predict() needs the
# checkpoint at FINAL_PATH to exist already (it is produced by main()).
if __name__ == "__main__":
    # Training is disabled; uncomment the next line to (re)train first.
    #main()
    predict()
    