In [None]:
import os
import random
import shutil
import time
import warnings
from datetime import datetime

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import torchaudio.functional as AF
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
from torch.utils.data import TensorDataset, DataLoader, random_split

# Report the framework version and select the GPU when one is visible.
print(f"PyTorch Version: {torch.__version__}")
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"Using device: {device}")

In [None]:
# --- Audio front-end ---------------------------------------------------------
N_FFT = 400            # STFT size in samples (25 ms at SR)
HOP_LENGTH = 160       # STFT hop in samples (10 ms at SR)
N_MEL = 40             # mel filterbank channels
N_MFCC = 13            # cepstral coefficients kept after the DCT
SR = 16000             # sample rate in Hz used to build the mel filterbank
TARGET_LENGTH = 40000  # fixed clip length in samples (2.5 s at SR)
BATCH_SIZE = 16

# --- Training / evaluation ---------------------------------------------------
NUM_EPOCH = 50         # upper bound on training epochs
PATIENCE = 7           # epochs without validation-loss improvement before stopping
THRESHOLD = 0.5        # default probability cut-off used when testing

# --- Reproducibility ---------------------------------------------------------
# The pipeline draws from Python's `random` (augmentation, cropping), from
# torch (weight init, dropout, DataLoader shuffling) and possibly NumPy, so all
# three generators are seeded up front to make "Restart & Run All" repeatable.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
def load_audio_paths(base_path, label_map=None, default_label=None, start_count=0, show_done=True):
    """Recursively collect ``(file_path, label)`` pairs for ``.wav``/``.mp3`` files.

    Args:
        base_path: Root directory to search.
        label_map: Optional ``{subfolder_name: label}`` mapping. When given
            (and non-empty), only those subfolders of ``base_path`` are walked
            and every audio file below one of them receives that subfolder's
            label; subfolders that do not exist are skipped.
        default_label: Label assigned to every file when ``label_map`` is not
            supplied, in which case the whole of ``base_path`` is walked.
        start_count: Running file count carried over from a previous call.
        show_done: Accepted for backward compatibility; it has no effect.

    Returns:
        ``(data_list, file_count)`` where ``data_list`` is the list of pairs
        and ``file_count`` is ``start_count + len(data_list)``.
    """
    audio_exts = (".wav", ".mp3")

    def _collect(root, label):
        """Walk ``root`` in sorted order and pair every audio file with ``label``."""
        found = []
        for dirpath, dirnames, filenames in os.walk(root):
            # Sorting in place fixes the traversal order, which os.walk would
            # otherwise inherit from the filesystem.
            dirnames.sort()
            for file_name in sorted(filenames):
                # Compare lower-cased so "clip.WAV" / "clip.Mp3" are not dropped.
                if file_name.lower().endswith(audio_exts):
                    found.append((os.path.join(dirpath, file_name), label))
        return found

    data_list = []
    if label_map:
        for root_folder, label in label_map.items():
            root_path = os.path.join(base_path, root_folder)
            if os.path.exists(root_path):
                data_list.extend(_collect(root_path, label))
    else:
        data_list.extend(_collect(base_path, default_label))

    return data_list, start_count + len(data_list)

In [None]:
# Dataset roots on the Kaggle input mount.
fpt_path = "/kaggle/input/voice-fpt-aip491/Data_voices/Data_voices/FPT.AI/"
cut_sound_path = "/kaggle/input/voice-fpt-aip491/Data_voices/Data_voices/cut_sound"
edge_base_path = "/kaggle/input/voice-fpt-aip491/Data_voices/Data_voices/edge_voices_16k"

# Folder name -> binary label; only "true" folders are positives.
fpt_label_map = {"true": 1, "false_yes": 0, "false_no": 0}
edge_label_map = {"true": 1, "false_yes": 0, "false_no": 0, "false_true": 0}

# 1) FPT.AI clips, labelled by their sub-folder.
all_data_list, current_count = load_audio_paths(fpt_path, label_map=fpt_label_map)

# 2) Everything under cut_sound is a negative.
cut_list, current_count = load_audio_paths(
    cut_sound_path, default_label=0, start_count=current_count
)
all_data_list += cut_list

# 3) Edge voices: one directory per speaker, each with the labelled sub-folders.
for speaker_folder in os.listdir(edge_base_path):
    speaker_path = os.path.join(edge_base_path, speaker_folder)
    if os.path.isdir(speaker_path):
        temp_list, current_count = load_audio_paths(
            speaker_path,
            label_map=edge_label_map,
            start_count=current_count,
            show_done=False,
        )
        all_data_list += temp_list

print(f"Total files found: {len(all_data_list)}")

In [None]:
# Shared power-spectrogram front-end (power=2.0) applied by every dataset;
# the mel/MFCC stages are computed later inside the model.
spectrogram_transform = torchaudio.transforms.Spectrogram(n_fft=N_FFT, hop_length=HOP_LENGTH, power=2.0)

class AudioSpectrogramDataset(torch.utils.data.Dataset):
    """Map ``(file_path, label)`` pairs to fixed-length spectrograms.

    Each item is loaded with ``torchaudio.load``, mixed down to one channel,
    resampled to ``sample_rate`` when the file uses a different rate, padded
    or cropped to ``target_length`` samples and passed through ``transform``.
    With ``is_train=True`` the waveform and the spectrogram are additionally
    augmented at random.

    Args:
        data_list: Sequence of ``(file_path, label)`` tuples.
        transform: Callable applied to the ``(1, target_length)`` waveform.
        target_length: Number of samples every clip is padded/cropped to.
        is_train: Enables random cropping and augmentation.
        sample_rate: Rate in Hz that clips are resampled to; pass ``None`` to
            keep each file's native rate (the previous behaviour).
    """

    def __init__(self, data_list, transform, target_length=40000, is_train=False, sample_rate=16000):

        self.data_list = data_list
        self.transform = transform
        self.target_length = target_length
        self.is_train = is_train
        self.sample_rate = sample_rate

        self.freq_masking = torchaudio.transforms.FrequencyMasking(freq_mask_param=20)
        self.time_masking = torchaudio.transforms.TimeMasking(time_mask_param=40)

    def __len__(self):
        """Return the number of ``(file_path, label)`` pairs."""
        return len(self.data_list)

    def _augment_waveform(self, waveform):
        """Randomly apply gain, additive Gaussian noise and a zero-filled time shift.

        Each of the three augmentations is applied independently with
        probability 0.5. The returned tensor has the same shape as the input.
        """
        if random.random() < 0.5:
            gain = random.uniform(0.8, 1.2)
            waveform = waveform * gain

        if random.random() < 0.5:
            noise_intensity = random.uniform(0.001, 0.005)
            noise = torch.randn_like(waveform) * noise_intensity
            waveform = waveform + noise

        if random.random() < 0.5:
            # Shift by at most 10% of the clip length.
            max_shift = int(0.1 * waveform.shape[1])

            if max_shift > 0:
                shift_amt = random.randint(1, max_shift)

                new_waveform = torch.zeros_like(waveform)
                length = waveform.shape[1]

                if random.random() < 0.5:
                    # Delay: content moves right, leading samples become zero.
                    new_waveform[:, shift_amt:] = waveform[:, :length - shift_amt]
                else:
                    # Advance: content moves left, trailing samples become zero.
                    new_waveform[:, :length - shift_amt] = waveform[:, shift_amt:]

                waveform = new_waveform

        return waveform

    def __getitem__(self, idx):
        """Return ``(spectrogram, label)`` for item ``idx``.

        ``label`` is a ``torch.long`` scalar tensor. If the file cannot be
        loaded a ``RuntimeWarning`` is emitted and an all-zero waveform is
        used instead, so one corrupt file does not abort an epoch.
        """
        file_path, label = self.data_list[idx]

        try:
            waveform, sample_rate = torchaudio.load(file_path)
            if waveform.shape[0] > 1:
                # Down-mix multi-channel audio to mono.
                waveform = torch.mean(waveform, dim=0, keepdim=True)
            if self.sample_rate is not None and sample_rate != self.sample_rate:
                # target_length and the downstream features assume one fixed
                # rate; without this, e.g. a 44.1 kHz file would be cropped to
                # a much shorter time span and analysed on the wrong scale.
                waveform = torchaudio.functional.resample(waveform, sample_rate, self.sample_rate)
        except Exception as e:
            # Best-effort fallback, but no longer silent: the substituted
            # silence still carries the file's label.
            warnings.warn(
                f"Failed to load '{file_path}' ({type(e).__name__}: {e}); substituting silence.",
                RuntimeWarning,
            )
            waveform = torch.zeros(1, self.target_length)

        length = waveform.shape[1]

        if length < self.target_length:
            # Right-pad short clips with zeros.
            waveform = torch.nn.functional.pad(waveform, (0, self.target_length - length))
        else:
            if self.is_train and length > self.target_length:
                # Random crop during training, deterministic head crop otherwise.
                start = random.randint(0, length - self.target_length)
                waveform = waveform[:, start:start+self.target_length]
            else:
                waveform = waveform[:, :self.target_length]

        if self.is_train:
            waveform = self._augment_waveform(waveform)

        spec = self.transform(waveform)

        if self.is_train:
            spec = self.freq_masking(spec)
            spec = self.time_masking(spec)

            # Occasionally apply a second time mask.
            if random.random() < 0.3:
                 spec = self.time_masking(spec)

        # Drop the leading channel axis.
        spec = spec.squeeze(0)

        return spec, torch.tensor(label, dtype=torch.long)

In [None]:
# Reproducible 90/10 train/validation split.
# Sorting by path first removes the dependence on directory-listing order, and
# a dedicated seeded RNG makes the shuffle identical on every run. Working on a
# copy leaves `all_data_list` untouched, so re-running this cell is idempotent.
SPLIT_SEED = 42

total = len(all_data_list)
train_size = int(total * 0.9)

shuffled_data_list = sorted(all_data_list, key=lambda item: item[0])
random.Random(SPLIT_SEED).shuffle(shuffled_data_list)
train_list = shuffled_data_list[:train_size]
valid_list = shuffled_data_list[train_size:]

print(f"Train samples: {len(train_list)}")
print(f"Valid samples: {len(valid_list)}")


# Augmentation and random cropping are enabled for the training set only.
train_dataset = AudioSpectrogramDataset(
    train_list,
    transform=spectrogram_transform,
    target_length=TARGET_LENGTH,
    is_train=True
)

valid_dataset = AudioSpectrogramDataset(
    valid_list,
    transform=spectrogram_transform,
    target_length=TARGET_LENGTH,
    is_train=False
)

# Pinned host memory only helps host-to-GPU copies; requesting it without CUDA
# just produces a warning.
use_pinned_memory = torch.cuda.is_available()

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
    pin_memory=use_pinned_memory
)

valid_loader = DataLoader(
    valid_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
    pin_memory=use_pinned_memory
)

In [None]:
class CNN_BiLSTM(nn.Module):
    """Binary classifier: power spectrogram -> MFCC -> 2-layer CNN -> BiLSTM -> one logit.

    The MFCC front-end (mel filterbank, dB conversion, DCT and per-coefficient
    standardisation over time) lives inside the module, so the model consumes
    the raw power spectrogram.

    Args:
        n_fft: FFT size used to produce the input; fixes the number of
            input frequency bins at ``n_fft // 2 + 1``.
        hop_length: Kept for interface symmetry with the front-end settings;
            not used by the module itself.
        sr: Sample rate in Hz for the mel filterbank.
        n_mfcc: Number of cepstral coefficients.
        n_mels: Number of mel channels.
        hidden_dim: LSTM hidden size per direction.
        num_layers: Number of stacked LSTM layers.

    NOTE: ``forward`` differs from earlier revisions (per-clip dB cutoff and
    the BiLSTM summary), so checkpoints trained before this change should be
    retrained; the parameter shapes themselves are unchanged.
    """

    def __init__(self,
                 n_fft=N_FFT, hop_length=HOP_LENGTH,
                 sr=SR, n_mfcc=N_MFCC, n_mels=N_MEL,
                 hidden_dim=128, num_layers=2):

        super(CNN_BiLSTM, self).__init__()

        n_stft = n_fft // 2 + 1

        self.mel_scale = torchaudio.transforms.MelScale(
            n_mels=n_mels,
            sample_rate=sr,
            n_stft=n_stft
        )

        self.amplitude_to_db = torchaudio.transforms.AmplitudeToDB(
            stype="power", top_db=80.0
        )

        # Buffers follow the module across devices and are stored in the state dict.
        self.register_buffer("dct_mat", AF.create_dct(n_mfcc, n_mels, norm="ortho"))
        self.register_buffer("std_epsilon", torch.tensor(1e-6))

        self.conv1 = nn.Conv2d(1, 16, kernel_size=(3,3), padding=1)
        self.bn1   = nn.BatchNorm2d(16)
        self.pool1 = nn.MaxPool2d((2,2))
        self.drop1 = nn.Dropout2d(0.3)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=(3,3), padding=1)
        self.bn2   = nn.BatchNorm2d(32)
        self.pool2 = nn.MaxPool2d((2,2))
        self.drop2 = nn.Dropout2d(0.3)

        # Two 2x2 poolings shrink the coefficient axis by 4 (floor division).
        cnn_out_dim = 32 * (n_mfcc // 4)

        self.lstm = nn.LSTM(
            input_size=cnn_out_dim, hidden_size=hidden_dim, num_layers=num_layers,
            batch_first=True, bidirectional=True, dropout=0.3 if num_layers > 1 else 0
        )
        self.ln = nn.LayerNorm(hidden_dim * 2)
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(hidden_dim * 2, 1)

    def forward(self, x):
        """Return logits of shape ``(batch, 1)`` for ``(batch, n_stft, time)`` input."""
        x = self.mel_scale(x)
        # With a 2D/3D input AmplitudeToDB derives its top_db floor from one
        # maximum over the whole tensor, which would make a clip's features
        # depend on the other clips in its batch. A temporary channel axis
        # makes the input 4D so each clip gets its own cutoff.
        x = self.amplitude_to_db(x.unsqueeze(1)).squeeze(1)
        # DCT over the mel axis: (B, T, n_mels) @ (n_mels, n_mfcc).
        x = x.transpose(-1, -2)
        x = torch.matmul(x, self.dct_mat)
        mfcc = x.transpose(-1, -2)
        # Standardise every coefficient over time, per clip.
        mean = mfcc.mean(dim=-1, keepdim=True)
        std = mfcc.std(dim=-1, keepdim=True) + self.std_epsilon
        mfcc = (mfcc - mean) / std
        x_cnn = mfcc.unsqueeze(1)

        x = self.drop1(self.pool1(F.relu(self.bn1(self.conv1(x_cnn)))))
        x = self.drop2(self.pool2(F.relu(self.bn2(self.conv2(x)))))

        # (B, C, freq, T') -> (B, T', C * freq): one feature vector per frame.
        B, C, freq, T_reduced = x.size()
        x = x.permute(0, 3, 1, 2)
        x = x.reshape(B, T_reduced, C * freq)

        lstm_out, _ = self.lstm(x)
        # The forward direction has seen the whole sequence at the last frame,
        # the backward direction at the first frame. Taking the last frame for
        # both would use a backward state that has seen only one frame.
        hidden = self.lstm.hidden_size
        summary = torch.cat(
            (lstm_out[:, -1, :hidden], lstm_out[:, 0, hidden:]), dim=-1
        )
        out = self.ln(summary)
        out = self.dropout(out)
        return self.fc(out)
        
# Instantiate the network with its default settings to report its size.
model = CNN_BiLSTM()
total_params = sum(
    weight.numel()
    for weight in model.parameters()
    if weight.requires_grad
)

print(f"Total parameters: {total_params:,}")

In [None]:
def safe_auc(y_true, y_score):
    """Return the ROC AUC, or 0.5 when it cannot be computed.

    Args:
        y_true: Iterable of hashable ground-truth labels.
        y_score: Scores/probabilities aligned with ``y_true``.

    Returns:
        ``roc_auc_score(y_true, y_score)``; 0.5 when fewer than two distinct
        labels are present, or when the computation raises (a
        ``RuntimeWarning`` describing the failure is emitted in that case).
    """
    try:
        if len(set(y_true)) < 2:
            # AUC is undefined for a single class; report chance level.
            return 0.5
        return roc_auc_score(y_true, y_score)
    except Exception as exc:
        # Stay best-effort so one bad metric cannot abort training, but do not
        # swallow KeyboardInterrupt/SystemExit (as a bare `except:` would) and
        # do not hide the reason.
        warnings.warn(
            f"ROC AUC could not be computed ({type(exc).__name__}: {exc}); reporting 0.5.",
            RuntimeWarning,
        )
        return 0.5

def train_model(model_class, train_loader, valid_loader, num_epochs, patience):
    """Train ``model_class()`` with early stopping on the validation loss.

    Uses Adam (lr 5e-4, weight decay 1e-4), ``ReduceLROnPlateau`` on the
    validation loss, ``BCEWithLogitsLoss`` and gradient-norm clipping at 1.0.
    The model is placed on the module-level ``device``.

    Args:
        model_class: Zero-argument callable returning a model whose output
            can be flattened to one logit per sample.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        valid_loader: Loader yielding ``(inputs, labels)`` validation batches.
        num_epochs: Maximum number of epochs.
        patience: Number of consecutive epochs without validation-loss
            improvement after which training stops.

    Returns:
        The model with the weights of the best validation epoch loaded. Those
        weights are also written to ``model_CNN_BiLSTM.pt`` as
        ``{"model": state_dict}``. If no epoch ever improved on the initial
        loss, nothing is saved and the model is returned as trained.
    """
    global device

    model = model_class().to(device)

    print(f"Training started with {len(train_loader.dataset)} train samples.")
    print(f"Validation loader ready with {len(valid_loader.dataset)} valid samples.")

    optimizer = torch.optim.Adam(model.parameters(), lr=5e-4, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)
    criterion = nn.BCEWithLogitsLoss()

    best_val_loss = float('inf')
    epochs_no_improve = 0
    best_state = None

    for epoch in range(num_epochs):
        model.train()
        train_running_loss = 0.0
        all_train_preds, all_train_labels, all_train_probs = [], [], []

        print(f"\nEpoch {epoch+1}/{num_epochs}")
        for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()

            outputs = model(X_batch).view(-1)
            loss = criterion(outputs, y_batch.float())
            loss.backward()

            # Weight by batch size so the epoch loss is a per-sample mean even
            # when the last batch is smaller.
            train_running_loss += loss.item() * X_batch.size(0)

            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            probs = torch.sigmoid(outputs)
            preds = (probs > 0.5).long()

            all_train_preds.extend(preds.detach().cpu().numpy())
            all_train_labels.extend(y_batch.detach().cpu().numpy())
            all_train_probs.extend(probs.detach().cpu().numpy())

            if batch_idx % 100 == 0:
                print(f"\r   Batch {batch_idx}/{len(train_loader)}", end="")

        train_loss = train_running_loss / len(train_loader.dataset)
        train_acc = (np.array(all_train_preds) == np.array(all_train_labels)).mean()
        train_f1 = f1_score(all_train_labels, all_train_preds, average="macro")
        train_auc = safe_auc(all_train_labels, all_train_probs)

        print("\n   Running validation...")
        model.eval()
        all_val_preds, all_val_labels, all_val_probs = [], [], []
        valid_running_loss = 0.0

        with torch.no_grad():
            for X_val, y_val in valid_loader:
                X_val, y_val = X_val.to(device), y_val.to(device)
                outputs = model(X_val).view(-1)
                loss = criterion(outputs, y_val.float())
                valid_running_loss += loss.item() * X_val.size(0)

                probs = torch.sigmoid(outputs)
                preds = (probs > 0.5).long()

                all_val_preds.extend(preds.cpu().numpy())
                all_val_labels.extend(y_val.cpu().numpy())
                all_val_probs.extend(probs.cpu().numpy())

        valid_loss = valid_running_loss / len(valid_loader.dataset)
        val_acc = (np.array(all_val_preds) == np.array(all_val_labels)).mean()
        val_f1 = f1_score(all_val_labels, all_val_preds, average="macro")
        val_auc = safe_auc(all_val_labels, all_val_probs)

        scheduler.step(valid_loss)
        current_lr = optimizer.param_groups[0]['lr']

        print("-" * 80)
        print(f" RESULT | Epoch {epoch+1} | LR: {current_lr:.6f}")
        print(f"   >> TRAIN | Loss: {train_loss:.4f} | Acc: {train_acc*100:.2f}% | F1: {train_f1:.3f} | AUC: {train_auc:.3f}")
        print(f"   >> VALID | Loss: {valid_loss:.4f} | Acc: {val_acc*100:.2f}% | F1: {val_f1:.3f} | AUC: {val_auc:.3f}")
        print("-" * 80)

        if valid_loss < best_val_loss:
            best_val_loss = valid_loss
            epochs_no_improve = 0
            # state_dict() returns references to the live parameters, which the
            # optimizer keeps updating in place. Clone them (onto the CPU) so
            # the snapshot really holds this epoch's weights, not the last one's.
            best_state = {
                "model": {
                    name: tensor.detach().cpu().clone()
                    for name, tensor in model.state_dict().items()
                }
            }
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Early stopping triggered. Best Val Loss: {best_val_loss:.4f}")
                break

    if best_state:
        save_path = "model_CNN_BiLSTM.pt"
        torch.save(best_state, save_path)
        print(f"Saved best model to {save_path}")
        # Roll the returned model back to the best epoch.
        model.load_state_dict(best_state["model"])

    return model

In [None]:
# Train a fresh CNN_BiLSTM on the loaders built above; the arguments are, in
# order: model class, train loader, validation loader, epoch cap, patience.
print("\n Starting training...")
trained_model = train_model(CNN_BiLSTM, train_loader, valid_loader, NUM_EPOCH, PATIENCE)

In [None]:
def load_test_data_paths(test_root_dir):
    """Collect ``(file_path, label)`` pairs from a per-user test directory.

    The expected layout is ``<test_root_dir>/<user>/<folder>/<audio file>``
    where ``<folder>`` is one of ``false_no``/``false_yes`` (label 0) or
    ``true`` (label 1). Only ``.wav`` and ``.mp3`` files directly inside those
    folders are picked up.

    Returns:
        The list of pairs, or an empty list (after printing a message) when
        the root directory is missing or no audio file is found.
    """
    if not os.path.exists(test_root_dir):
        print(f"Directory not found: {test_root_dir}")
        return []

    folder_labels = {"false_no": 0, "false_yes": 0, "true": 1}
    samples = []

    for user_name in os.listdir(test_root_dir):
        user_root = os.path.join(test_root_dir, user_name)
        if not os.path.isdir(user_root):
            continue

        for folder, label in folder_labels.items():
            folder_path = os.path.join(user_root, folder)
            if not os.path.isdir(folder_path):
                continue
            samples.extend(
                (os.path.join(folder_path, entry), label)
                for entry in os.listdir(folder_path)
                if entry.endswith((".wav", ".mp3"))
            )

    if not samples:
        print("\nNo test files found.")
        return []

    print(f"\nTotal test samples found: {len(samples)}")
    return samples

def test_model(model, test_loader, threshold=THRESHOLD, verbose=True):
    """Evaluate ``model`` on ``test_loader`` on the CPU.

    Note that the model is moved to the CPU in place and switched to eval mode.

    Args:
        model: Module whose output can be flattened to one logit per sample.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        threshold: Probability above which a sample is predicted positive.
        verbose: Print the metrics when true.

    Returns:
        ``(accuracy, f1_macro, auc_score)``; ``(0.0, 0.0, 0.0)`` when the
        loader yields no samples. The AUC falls back to 0.5 when it cannot be
        computed (see ``safe_auc``).
    """
    device = torch.device("cpu")
    model.to(device)
    model.eval()

    all_preds, all_labels, all_probs = [], [], []

    with torch.no_grad():
        for X_test, y_test in test_loader:
            # No loss is computed here: only the predictions feed the metrics.
            outputs = model(X_test.to(device)).view(-1)

            probs = torch.sigmoid(outputs)
            preds = (probs > threshold).long()

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(y_test.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    if len(all_labels) == 0:
        return 0.0, 0.0, 0.0

    accuracy = (np.array(all_preds) == np.array(all_labels)).mean()
    f1_macro = f1_score(all_labels, all_preds, average="macro")
    # Same single-class / failure fallback as the training loop.
    auc_score = safe_auc(all_labels, all_probs)

    if verbose:
        print("\n===== TEST RESULTS =====")
        print(f"Accuracy     : {accuracy:.4f}")
        print(f"F1 Score     : {f1_macro:.4f}")
        print(f"AUC Score    : {auc_score:.4f}")

    return accuracy, f1_macro, auc_score


In [None]:
# Evaluate the trained model on the held-out per-user test set.
test_root_dir = "/kaggle/input/test-aip419/Datatest/"
test_data_list = load_test_data_paths(test_root_dir)

if not test_data_list:
    print("No test data loaded. Skipping evaluation.")
else:
    # Same front-end as training, with augmentation disabled.
    test_dataset = AudioSpectrogramDataset(
        test_data_list,
        transform=spectrogram_transform,
        target_length=TARGET_LENGTH,
        is_train=False,
    )
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=2)

    print(f"Test loader is ready with {len(test_dataset)} samples.")

    try:
        if 'trained_model' not in globals():
            print("Error: 'trained_model' is not defined. Make sure you trained the model first.")
        else:
            # NOTE(review): despite its name, `best_thresh` receives the
            # (accuracy, f1_macro, auc) tuple returned by test_model.
            best_thresh = test_model(trained_model, test_loader)
    except Exception as e:
        print(f"\nERROR during testing: {e}")
        import traceback
        traceback.print_exc()

In [None]:
def export_to_onnx():
    """Export the saved CNN_BiLSTM checkpoint to ONNX on the CPU.

    Reads ``model_CNN_BiLSTM.pt`` (a dict whose ``'model'`` entry is the state
    dict), rebuilds the network from the notebook's front-end constants and
    writes ``model_CNN_BiLSTM.onnx`` with a dynamic batch axis. The traced
    input is a spectrogram of shape ``[1, N_FFT // 2 + 1,
    TARGET_LENGTH // HOP_LENGTH + 1]``.

    Returns ``None``. A missing weight file, or any error while loading or
    exporting, is reported on stdout instead of being raised.
    """
    weights_file = "model_CNN_BiLSTM.pt"
    onnx_file = "model_CNN_BiLSTM.onnx"
    rule = "-" * 50

    print("Starting model export to ONNX...")

    if not os.path.exists(weights_file):
        print(f"Error: Weight file not found at '{weights_file}'.")
        print("Please ensure you have run the training script successfully.")
        return

    cpu = torch.device("cpu")
    print(f"Using device: {cpu}")

    print("Initializing CNN_BiLSTM model...")
    net = CNN_BiLSTM(
        n_fft=N_FFT,
        hop_length=HOP_LENGTH,
        sr=SR,
        n_mfcc=N_MFCC,
        n_mels=N_MEL,
        hidden_dim=128,
        num_layers=2,
    )
    net = net.to(cpu)

    try:
        print(f"Loading weights from: {weights_file}...")
        saved = torch.load(weights_file, map_location=cpu)

        net.load_state_dict(saved['model'])
        print("Successfully loaded trained weights.")

        # Inference mode: disables dropout and uses BatchNorm running stats.
        net.eval()

        n_freq = (N_FFT // 2) + 1
        n_time = (TARGET_LENGTH // HOP_LENGTH) + 1

        example = torch.randn(1, n_freq, n_time).to(cpu)
        print(f"Creating dummy input (Spectrogram) with shape: {example.shape}")

        print(f"Exporting model to ONNX at: {onnx_file}...")
        # Only the batch axis is dynamic; frequency and time stay fixed.
        batch_axis = {0: 'batch_size'}
        torch.onnx.export(
            net,
            example,
            onnx_file,
            export_params=True,
            opset_version=17,
            do_constant_folding=True,
            input_names=['spectrogram'],
            output_names=['logits'],
            dynamic_axes={'spectrogram': batch_axis, 'logits': batch_axis},
        )
        print(rule)
        print(" ONNX EXPORT SUCCESSFUL!")
        print(f"Model saved at: {onnx_file}")
        print(f"Model expects input 'spectrogram' (shape [B, {n_freq}, {n_time}]) and returns 'logits'.")
        print(rule)

    except Exception as e:
        print("\n ERROR DURING WEIGHT LOADING OR ONNX EXPORT:")
        print(f"{type(e).__name__}: {e}")
        print("\nPlease check if the .pt file is correct or if the model architecture matches.")


# Run the export when this file is executed as the main module.
if __name__ == "__main__":
    export_to_onnx()