In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


Using device: cuda


In [2]:
from pathlib import Path

DATA_ROOT = Path(r"C:\Users\Abdul\OneDrive\Desktop\Varsity\Deep Learning\Dataset")
asv2019_root = DATA_ROOT / "ASVSpoof2019" / "LA" / "LA"

train_dir = asv2019_root / "ASVspoof2019_LA_train"
dev_dir   = asv2019_root / "ASVspoof2019_LA_dev"
eval_dir  = asv2019_root / "ASVspoof2019_LA_eval"

train_proto = asv2019_root / "ASVspoof2019_LA_cm_protocols" / "ASVspoof2019.LA.cm.train.trn.txt"
dev_proto   = asv2019_root / "ASVspoof2019_LA_cm_protocols" / "ASVspoof2019.LA.cm.dev.trl.txt"
eval_proto  = asv2019_root / "ASVspoof2019_LA_cm_protocols" / "ASVspoof2019.LA.cm.eval.trl.txt"


In [6]:
LABEL_MAP = {"bonafide": 0, "spoof": 1}

def parse_protocol(proto_path, base_dir):
    """
    Parse ASVspoof2019 LA CM protocol file.

    Typical line examples:
      LA_0065 LA_T_1000135 - bonafide
      LA_0065 LA_T_1000137 A01 spoof

    - file_id is the 2nd token (parts[1])
    - 'bonafide' or 'spoof' appears somewhere in the line
    """
    entries = []
    with open(proto_path, "r") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue

            parts = line.split()

            # 1) file_id in 2nd column
            file_id = parts[1]   # e.g., LA_T_1000135

            # 2) find label token: bonafide/spoof
            label_token = None
            for p in parts:
                if p in LABEL_MAP:
                    label_token = p
                    break
            if label_token is None:
                # no valid label in this line, skip
                continue

            label = LABEL_MAP[label_token]

            # 3) build audio path from file_id
            rel_path = file_id + ".flac"
            audio_path = base_dir / "flac" / rel_path

            entries.append((audio_path, label))

    print(f"Parsed {len(entries)} entries from {proto_path.name}")
    return entries


In [7]:
from torch.utils.data import Dataset, DataLoader
import librosa
import numpy as np
import torch

def pad_or_truncate_audio(y, target_len=64000):
    if len(y) < target_len:
        pad_len = target_len - len(y)
        y = np.pad(y, (0, pad_len), mode="constant")
    else:
        y = y[:target_len]
    return y

class ASVspoof2019LADataset(Dataset):
    def __init__(self, split="train"):
        if split == "train":
            self.base_dir = train_dir
            self.proto_path = train_proto
        elif split == "dev":
            self.base_dir = dev_dir
            self.proto_path = dev_proto
        else:
            self.base_dir = eval_dir
            self.proto_path = eval_proto

        self.entries = parse_protocol(self.proto_path, self.base_dir)
        print(f"{split} split: {len(self.entries)} files")

    def __len__(self):
        return len(self.entries)

    def __getitem__(self, idx):
        audio_path, label = self.entries[idx]
        y, sr = librosa.load(audio_path, sr=16000)
        y = pad_or_truncate_audio(y, target_len=64000)
        y = torch.tensor(y, dtype=torch.float32).unsqueeze(0)  # [1, T]
        return y, torch.tensor(label)


In [8]:
batch_size = 4

eval_dataset = ASVspoof2019LADataset(split="eval")
eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False, num_workers=0)


Parsed 71237 entries from ASVspoof2019.LA.cm.eval.trl.txt
eval split: 71237 files


In [9]:
train_dataset = ASVspoof2019LADataset(split="train")
dev_dataset   = ASVspoof2019LADataset(split="dev")

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
dev_loader   = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False, num_workers=0)


Parsed 25380 entries from ASVspoof2019.LA.cm.train.trn.txt
train split: 25380 files
Parsed 24844 entries from ASVspoof2019.LA.cm.dev.trl.txt
dev split: 24844 files


In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ResBlockRawNet2(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1   = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2   = nn.BatchNorm1d(out_channels)

        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv1d(in_channels, out_channels, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm1d(out_channels)
            )
        else:
            self.shortcut = nn.Identity()

        self.act = nn.LeakyReLU(0.3, inplace=True)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.act(out)

        out = self.conv2(out)
        out = self.bn2(out)

        shortcut = self.shortcut(x)

        out = out + shortcut
        out = self.act(out)
        return out


class RawNet2(nn.Module):
    """
    RawNet2-style architecture (simplified) for ASVspoof2019 LA.
    Input: [B, 1, T] raw waveform (e.g. T=64000)
    """
    def __init__(self, n_classes=2):
        super().__init__()
        self.act = nn.LeakyReLU(0.3, inplace=True)

        self.conv_in = nn.Conv1d(1, 64, kernel_size=3, stride=3, padding=1, bias=False)
        self.bn_in   = nn.BatchNorm1d(64)

        self.block1 = ResBlockRawNet2(64, 64, stride=1)   # T/3
        self.block2 = ResBlockRawNet2(64, 128, stride=3)  # ~T/9
        self.block3 = ResBlockRawNet2(128, 128, stride=3) # ~T/27
        self.block4 = ResBlockRawNet2(128, 256, stride=3) # ~T/81
        self.block5 = ResBlockRawNet2(256, 256, stride=1)

        self.gru = nn.GRU(
            input_size=256,
            hidden_size=256,
            num_layers=2,
            batch_first=True,
            bidirectional=False,
        )

        self.fc = nn.Linear(256, n_classes)

    def forward(self, x, return_embedding: bool = False):
        """
        x: [B, 1, T]
        If return_embedding=True, returns (logits, h_last)
        Else returns logits only.
        """
        x = self.conv_in(x)
        x = self.bn_in(x)
        x = self.act(x)

        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)

        x = x.transpose(1, 2)   # [B, T_seq, 256]

        out, h_n = self.gru(x)  # h_n: [num_layers, B, 256]
        h_last = h_n[-1]        # [B, 256] â†’ this is our embedding

        logits = self.fc(h_last)  # [B, n_classes]

        if return_embedding:
            return logits, h_last
        else:
            return logits


In [12]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

model = RawNet2(n_classes=2).to(device)
state = torch.load("rawnet2_best.pth", map_location=device)
model.load_state_dict(state)
model.eval()


Using device: cuda


  state = torch.load("rawnet2_best.pth", map_location=device)


RawNet2(
  (act): LeakyReLU(negative_slope=0.3, inplace=True)
  (conv_in): Conv1d(1, 64, kernel_size=(3,), stride=(3,), padding=(1,), bias=False)
  (bn_in): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (block1): ResBlockRawNet2(
    (conv1): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,), bias=False)
    (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,), bias=False)
    (bn2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (shortcut): Identity()
    (act): LeakyReLU(negative_slope=0.3, inplace=True)
  )
  (block2): ResBlockRawNet2(
    (conv1): Conv1d(64, 128, kernel_size=(3,), stride=(3,), padding=(1,), bias=False)
    (bn1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv1d(128, 128, kernel_size=(3,), stride=(1,), padding=(1,), bias=False)
  

In [13]:
import numpy as np
import torch

def extract_embeddings(model, loader, device):
    model.eval()
    all_embs = []
    all_labels = []
    all_scores = []

    with torch.inference_mode():
        for audio_batch, labels in loader:
            audio_batch = audio_batch.to(device)
            labels = labels.to(device)

            logits, emb = model(audio_batch, return_embedding=True)  # [B,2], [B,256]
            probs = torch.softmax(logits, dim=1)[:, 1]               # spoof prob

            all_embs.append(emb.cpu().numpy())       # [B, 256]
            all_labels.append(labels.cpu().numpy())  # [B]
            all_scores.append(probs.cpu().numpy())   # [B]

    embeddings = np.concatenate(all_embs, axis=0)
    labels = np.concatenate(all_labels, axis=0)
    scores = np.concatenate(all_scores, axis=0)

    print("Embeddings shape:", embeddings.shape)
    print("Labels shape:", labels.shape)
    print("Scores shape:", scores.shape)

    return embeddings, labels, scores


In [14]:
emb_eval, labels_eval, scores_eval = extract_embeddings(model, eval_loader, device)

np.savez(
    "rawnet2_eval_embeddings.npz",
    embeddings=emb_eval,
    labels=labels_eval,
    scores=scores_eval,
)
print("Saved eval embeddings to rawnet2_eval_embeddings.npz")


Embeddings shape: (71237, 256)
Labels shape: (71237,)
Scores shape: (71237,)
Saved eval embeddings to rawnet2_eval_embeddings.npz


In [15]:
import numpy as np
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Threshold at 0.5
pred = (eval_scores >= 0.5).astype(int)

# Confusion Matrix
cm = confusion_matrix(eval_labels, pred)

print("Confusion Matrix (Raw Counts):")
print(cm)

# Pretty Plot
disp = ConfusionMatrixDisplay(cm, display_labels=["Bonafide (0)", "Spoof (1)"])
fig, ax = plt.subplots(figsize=(4,4))
disp.plot(ax=ax, values_format='d', colorbar=False)
plt.title("RawNet2 Confusion Matrix (Eval)")
plt.tight_layout()
plt.show()


NameError: name 'eval_scores' is not defined

In [16]:
cm_norm = cm.astype("float") / cm.sum(axis=1)[:, np.newaxis]

print("\nNormalized Confusion Matrix:")
print(cm_norm)

# Plot
disp_norm = ConfusionMatrixDisplay(confusion_matrix=cm_norm,
                                   display_labels=["Bonafide (0)", "Spoof (1)"])
fig, ax = plt.subplots(figsize=(4,4))
disp_norm.plot(ax=ax, values_format='.2f', colorbar=True)
plt.title("RawNet2 Normalized Confusion Matrix (Eval)")
plt.tight_layout()
plt.show()


NameError: name 'cm' is not defined

In [17]:
from sklearn.metrics import roc_curve

fpr, tpr, thresholds = roc_curve(eval_labels, eval_scores)
fnr = 1 - tpr

# Find EER
eer_idx = np.nanargmin(np.abs(fnr - fpr))
eer = fpr[eer_idx]
eer_fpr = fpr[eer_idx]
eer_tpr = tpr[eer_idx]

print("EER:", eer)
print("EER threshold:", thresholds[eer_idx])

# Plot ROC
plt.figure(figsize=(6,6))
plt.plot(fpr, tpr, label="ROC Curve")
plt.plot([0,1], [0,1], 'k--', label="Random")
plt.scatter(eer_fpr, eer_tpr, color="red", label=f"EER = {eer:.3f}")
plt.title("RawNet2 ROC Curve (Eval)")
plt.xlabel("False Positive Rate (FPR)")
plt.ylabel("True Positive Rate (TPR)")
plt.legend(loc="lower right")
plt.grid()
plt.tight_layout()
plt.show()


NameError: name 'eval_labels' is not defined

In [18]:
TN, FP, FN, TP = cm.ravel()

summary = {
    "TN (Correct Bonafide)": TN,
    "FP (Bonafide classified as Spoof)": FP,
    "FN (Spoof classified as Bonafide)": FN,
    "TP (Correct Spoof)": TP,
    "Accuracy": (TP + TN) / (TP + TN + FP + FN),
    "Bonafide Detection Rate (TNR)": TN / (TN + FP),
    "Spoof Detection Rate (TPR)": TP / (TP + FN),
    "EER": eer
}

print("\n=== PERFORMANCE SUMMARY MATRIX ===")
for k, v in summary.items():
    print(f"{k}: {v}")


NameError: name 'cm' is not defined