# Imports

In [1]:
import os
import gc

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import h5py
from pathlib import Path


import torch
import torch.nn as nn
import torch.nn.functional as F

from sklearn.model_selection import KFold
from torch.utils.data import DataLoader, TensorDataset

# Data Settings

In [11]:
# All bird species available in the HDF5 spectrogram dataset
ALL_BIRD_SPECIES = [
    'amecro',  # American Crow
    'amerob',  # American Robin
    'bewwre',  # Bewick's Wren
    'bkcchi',  # Black-capped Chickadee
    'daejun',  # Dark-eyed Junco
    'houfin',  # House Finch
    'houspa',  # House Sparrow
    'norfli',  # Northern Flicker
    'rewbla',  # Red-winged Blackbird
    'sonspa',  # Song Sparrow
    'spotow',  # Spotted Towhee
    'whcspa',  # White-crowned Sparrow
]

# Selected bird species for binary classification
# Class 0: 'houspa' → House Sparrow
# Class 1: 'sonspa' → Song Sparrow
SELECTED_BIRD_CLASSES = [
    'houspa',  # Class 0
    'sonspa',  # Class 1
]

# Path to the HDF5 file containing bird spectrograms
SPEC_FILE_PATH = Path('../data/bird_spectrograms.hdf5')

# Training hyperparameters
EPOCHS = 50                 # Total number of training epochs
NUM_CV_FOLDS = 3            # K-Fold cross-validation (choose 3, 4, or 5)
BATCH_SIZE = 16             # Batch size for training
LEARNING_RATE = 0.001      # Learning rate for the optimizer

# List of evaluation metrics returned by `evaluate()` function
EVALUATION_METRICS = [
    "Weighted Accuracy",          # Balanced accuracy between houspa recall and sonspa specificity
    "Sensitivity/Recall",         # Recall for houspa (class 0)
    "Specificity",                # Specificity for sonspa (class 1)
    "Precision_houspa",           # Precision for class 0
    "Precision_sonspa",           # Precision for class 1
    "Precision_avg",              # Average of both precisions
    "F1_houspa",                  # F1 score for class 0
    "F1_sonspa",                  # F1 score for class 1
    "F1_avg",                     # Mean of both F1 scores
    "AUC_ROC_Score",              # Approximate ROC AUC
    "False_Discovery_Rate",       # FP / (FP + TP) for sonspa
    "False_Negative_Rate",        # FN / (FN + TP) for houspa
    "False_Omission_Rate",        # FN / (FN + TN)
    "False_Positive_Rate",        # FP / (FP + TN) for sonspa
    "Jaccard_Index"               # Intersection-over-union for houspa
]

# Load Data
We need to transpose the data from (128, 517, sample_size) into (sample_size, 128, 517) because CNNs expect an input of shape (N, C, H, W). 

Here:
- N = number of samples
- C = number of channels (1 for grayscale spectrograms)
- H = height (128)
- W = width (517)

In [3]:
X = []
y = []

with h5py.File(SPEC_FILE_PATH, 'r') as f:
    for label, key in enumerate(SELECTED_BIRD_CLASSES):
        data = f[key][:]  # shape = (128, 517, N)
        data = np.transpose(data, (2, 0, 1))  # shape = (N, 128, 517)
        X.append(data)
        y.append(np.full(data.shape[0], label))  # 0 for houspa, 1 for sonspa

X = np.concatenate(X, axis=0)  # (N_total, 128, 517)
y = np.concatenate(y)

len(X)

893

# Evaluation Metrics

In [4]:
def evaluate(y_hat_class, Y):
    """
    Evaluate binary classification metrics for Houspa (class 0) vs. Sonspa (class 1).

    Parameters:
    y_hat_class (np.ndarray): Predicted class labels (0=houspa, 1=sonspa).
    Y (np.ndarray): True class labels.

    Returns:
    tuple: (metrics array, confusion matrix)
    """
    cm = np.zeros((2, 2), dtype=int)
    y_hat_class = y_hat_class.reshape(-1)

    # Build confusion matrix
    for y_hat, y in zip(y_hat_class, Y):
        if y == 0:  # Actual houspa
            if y_hat == 0:
                cm[0, 0] += 1  # True Positive (TP) for houspa
            else:
                cm[0, 1] += 1  # False Negative (FN): missed houspa
        elif y == 1:  # Actual sonspa
            if y_hat == 1:
                cm[1, 1] += 1  # True Positive (TP) for sonspa
            else:
                cm[1, 0] += 1  # False Positive (FP): incorrectly predicted houspa

    # Confusion matrix entries
    TP_houspa = cm[0, 0]
    FN_houspa = cm[0, 1]
    FP_sonspa = cm[1, 0]
    TP_sonspa = cm[1, 1]

    # Sensitivity/Recall for houspa
    recall_houspa = TP_houspa / (TP_houspa + FN_houspa) if (TP_houspa + FN_houspa) != 0 else 0.0
    # Specificity for sonspa
    specificity_sonspa = TP_sonspa / (TP_sonspa + FP_sonspa) if (TP_sonspa + FP_sonspa) != 0 else 0.0

    # Precision for each class
    precision_houspa = TP_houspa / (TP_houspa + FP_sonspa) if (TP_houspa + FP_sonspa) != 0 else 0.0
    precision_sonspa = TP_sonspa / (TP_sonspa + FN_houspa) if (TP_sonspa + FN_houspa) != 0 else 0.0

    # F1 Scores
    f1_houspa = 2 * precision_houspa * recall_houspa / (precision_houspa + recall_houspa) if (precision_houspa + recall_houspa) != 0 else 0.0
    f1_sonspa = 2 * precision_sonspa * specificity_sonspa / (precision_sonspa + specificity_sonspa) if (precision_sonspa + specificity_sonspa) != 0 else 0.0

    # Averages
    precision_avg = (precision_houspa + precision_sonspa) / 2
    f1_avg = (f1_houspa + f1_sonspa) / 2
    weighted_accuracy = 0.5 * (recall_houspa + specificity_sonspa)

    # ROC AUC (approx.)
    auc_roc = 0.5 * (1 + recall_houspa - (FP_sonspa / (FP_sonspa + TP_sonspa) if (FP_sonspa + TP_sonspa) != 0 else 0.0))

    # Jaccard index for class 0 (houspa)
    jaccard = TP_houspa / (TP_houspa + FN_houspa + FP_sonspa) if (TP_houspa + FN_houspa + FP_sonspa) != 0 else 0.0

    # Error Rates
    fdr = FP_sonspa / (FP_sonspa + TP_houspa) if (FP_sonspa + TP_houspa) != 0 else 0.0  # False Discovery Rate
    fnr = FN_houspa / (FN_houspa + TP_houspa) if (FN_houspa + TP_houspa) != 0 else 0.0  # False Negative Rate
    forate = FN_houspa / (FN_houspa + TP_sonspa) if (FN_houspa + TP_sonspa) != 0 else 0.0  # False Omission Rate
    fpr = FP_sonspa / (FP_sonspa + TP_sonspa) if (FP_sonspa + TP_sonspa) != 0 else 0.0  # False Positive Rate

    return (
        np.array([
            weighted_accuracy,   # 0
            recall_houspa,       # 1
            specificity_sonspa,  # 2
            precision_houspa,    # 3
            precision_sonspa,    # 4
            precision_avg,       # 5
            f1_houspa,           # 6
            f1_sonspa,           # 7
            f1_avg,              # 8
            auc_roc,             # 9
            fdr,                 # 10
            fnr,                 # 11
            forate,              # 12
            fpr,                 # 13
            jaccard              # 14
        ]),
        cm
    )

# CNN Model Model Architecture

This convolutional neural network (CNN) performs **binary classification** on spectrogram inputs of shape `(1, 128, 517)`, where:
- `1` is the channel dimension (grayscale),
- `128` is the number of frequency bins (height),
- `517` is the number of time steps (width).

---

### Model Layers:

```python
self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
```

- Applies 16 convolutional filters of size 3×3.
- Padding keeps the spatial size unchanged (128×517).

```python
self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
```

- Applies 32 filters to the output of the first conv layer.
- Extracts more abstract audio patterns.

```python
self.pool = nn.MaxPool2d(2)
```

- Reduces both height and width by a factor of 2.
-  After two pooling layers: (128, 517) → (64, 258) → (32, 129)

```python
self.fc1 = nn.Linear(32 * 32 * 129, 64)
```

- Fully connected layer with 131,072 input features.
- Reduces to a 64-dimensional feature vector.

```python
self.fc2 = nn.Linear(64, 2)
```

- Outputs 2 logits (one per class).

In [None]:
class BirdBinaryCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(16)

        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)

        self.pool = nn.MaxPool2d(2)
        self.dropout = nn.Dropout(0.3)

        # Global Average Pooling reduces spatial dimension to 1x1
        self.gap = nn.AdaptiveAvgPool2d(1)  # Output shape: (batch, 32, 1, 1)

        # Final classifier
        self.fc1 = nn.Linear(32, 64)
        self.fc2 = nn.Linear(64, 2)

    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))  # (16, 64, 258)
        x = self.pool(F.relu(self.bn2(self.conv2(x))))  # (32, 32, 129)
        x = self.gap(x)                                 # (32, 1, 1)
        x = x.view(x.size(0), -1)                       # Flatten to (32,)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)

# Model Training and Evaluation Loop

For CNNs, these four essential steps are important. <br>
I have explained more about backpropagation in this [Medium article](https://medium.com/data-science-collective/why-backpropagation-is-so-important-for-models-in-machine-learning-4736591b24b3)

- Forward pass: Feed input through the network to make a prediction.
- Loss calculation: It compares those predictions to the actual labels and computes the loss.
- Backpropagation: Use the chain rule to find how much each weight influenced the loss.
- Weight update: Apply the gradients using an optimizer like Adam.
---

## Reasoning behind of KFold Cross Validation
Since the dataset is relatively small, it may be best to go with just a KFold Cross Validation rather than doing a train-test split + cross-validation. <br>
That is the reason why I went with K-fold cross validation.

In [16]:
#  Convert NumPy to Torch Tensors
X_tensor = torch.tensor(X, dtype=torch.float32).unsqueeze(1)  # (N_total, 1, 128, 517)
y_tensor = torch.tensor(y, dtype=torch.long) # Converts labels (e.g., [0, 0, 0, 1, 1, 1, ...]) to a PyTorch tensor

# Train Test Split
kfold = KFold(n_splits=NUM_CV_FOLDS, shuffle=True, random_state=42)
metrics_all_folds = pd.DataFrame(columns=EVALUATION_METRICS)

for fold, (train_idx, test_idx) in enumerate(kfold.split(X_tensor)):
    print(f"\n Fold {fold + 1} | Train size: {len(train_idx)}, Test size: {len(test_idx)}")

    # Step 1: Split the data according to folds
    X_train, X_test = X_tensor[train_idx], X_tensor[test_idx]
    y_train, y_test = y_tensor[train_idx], y_tensor[test_idx]

    # Step 2: Wrap in TensorDataset and DataLoader
    train_ds = TensorDataset(X_train, y_train)
    test_ds = TensorDataset(X_test, y_test)

    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True) # Check Data Settings for BS
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE) # Check Data Settings for BS

    # Step 3: Initialize model and optimizer
    model = BirdBinaryCNN()
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4) # Check Data Settings for LR
    criterion = nn.CrossEntropyLoss()

    # Step 4: Train the model
    for epoch in range(EPOCHS):
        model.train()
        total_loss = 0
        for xb, yb in train_loader:
            pred = model(xb)
            loss = criterion(pred, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Validation loss (only loss, no metrics)
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for xb, yb in test_loader:
                pred = model(xb)
                loss = criterion(pred, yb)
                val_loss += loss.item()

        print(f"Epoch {epoch + 1}/{EPOCHS} | Train Loss: {total_loss:.4f} | Val Loss: {val_loss:.4f}")


    # Step 5: Evaluate
    model.eval()
    y_true = []
    y_pred = []

    with torch.no_grad():
        for xb, yb in test_loader:
            pred = model(xb)
            preds = pred.argmax(dim=1)
            y_true.extend(yb.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())

    metrics, cm = evaluate(np.array(y_pred), np.array(y_true))
    VERSION_TAG = f"../output/binary_model_{EPOCHS}_fold_{fold}"


    # Save results per fold
    metrics_all_folds.loc[VERSION_TAG] = list(metrics)

    single_result_df = pd.DataFrame(
        metrics_all_folds.loc[[VERSION_TAG]]
    )

    output_path = os.path.join(f"{VERSION_TAG}_student_evaluation_results.csv")

    # Save in append mode, include header only if file does not exist
    if os.path.isfile(output_path):
        single_result_df.to_csv(output_path, mode="a", header=False)
    else:
        single_result_df.to_csv(output_path, mode="a", header=True)

    # Clean up
    del model
    gc.collect()

# === Final Summary ===
print("\n📊 Cross-Validation Summary:")
print(f"Avg Sensitivity (Recall):  {metrics_all_folds['Sensitivity/Recall'].astype(float).mean():.4f}")
print(f"Avg Specificity:           {metrics_all_folds['Specificity'].astype(float).mean():.4f}")
print(f"Avg Weighted Accuracy:     {metrics_all_folds['Weighted Accuracy'].astype(float).mean():.4f}")
print(f"Avg F1 Score (Avg):         {metrics_all_folds['F1_avg'].astype(float).mean():.4f}")
print(f"Avg AUC-ROC:               {metrics_all_folds['auc_roc_score'].astype(float).mean():.4f}")


 Fold 1 | Train size: 595, Test size: 298
Epoch 1/50 | Train Loss: 24.2120 | Val Loss: 11.2807
Epoch 2/50 | Train Loss: 23.7231 | Val Loss: 11.2279
Epoch 3/50 | Train Loss: 23.5185 | Val Loss: 11.1479
Epoch 4/50 | Train Loss: 23.3036 | Val Loss: 11.1335
Epoch 5/50 | Train Loss: 23.8738 | Val Loss: 11.2174
Epoch 6/50 | Train Loss: 23.5011 | Val Loss: 11.0833
Epoch 7/50 | Train Loss: 23.0533 | Val Loss: 11.1134
Epoch 8/50 | Train Loss: 23.8013 | Val Loss: 11.3719
Epoch 9/50 | Train Loss: 23.6421 | Val Loss: 11.1173
Epoch 10/50 | Train Loss: 23.0470 | Val Loss: 11.0010
Epoch 11/50 | Train Loss: 23.6704 | Val Loss: 11.0561
Epoch 12/50 | Train Loss: 23.2830 | Val Loss: 10.9745
Epoch 13/50 | Train Loss: 23.7138 | Val Loss: 11.1007
Epoch 14/50 | Train Loss: 23.0620 | Val Loss: 11.1119
Epoch 15/50 | Train Loss: 23.4806 | Val Loss: 11.1363
Epoch 16/50 | Train Loss: 23.5131 | Val Loss: 10.9405
Epoch 17/50 | Train Loss: 23.6349 | Val Loss: 10.9990
Epoch 18/50 | Train Loss: 22.8798 | Val Loss: 10

KeyboardInterrupt: 