# BirdCLEF+ 2025: CNN on Mel-Spectrograms

This notebook implements the CNN on Mel-Spectrograms approach for the BirdCLEF+ 2025 competition. It uses pretrained EfficientNet or ResNet models fine-tuned on mel-spectrograms of animal sounds.

## Setup and Dependencies

First, let's install and import all necessary libraries.

In [None]:
# Install any missing packages
!pip install -q librosa torchlibrosa timm torchaudio

# Core libraries
import os
import random
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import roc_auc_score

# Audio processing
import librosa
import librosa.display
import soundfile as sf

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import timm  # For efficient model implementations
from torch.cuda.amp import autocast, GradScaler  # For mixed precision training

# Set seeds for reproducibility
def set_seed(seed=42):
    """Seed every random number generator used in this notebook.

    Args:
        seed: Integer seed applied to Python's ``random``, NumPy and PyTorch.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed all CUDA devices, not only the current one.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Pin autotuning off so kernel selection cannot vary between runs.
    torch.backends.cudnn.benchmark = False


set_seed()

# Pick the compute device once: CUDA when PyTorch reports it as available, otherwise the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

## Connect to Google Drive

Connect to Google Drive to access the dataset and save trained models.

In [None]:
# Mount Google Drive (only works inside a Google Colab runtime)
from google.colab import drive
drive.mount('/content/drive')

# Drive locations of the competition data and of the directory that receives trained models
DATA_PATH = '/content/drive/MyDrive/birdclef-2025-data'
MODEL_SAVE_DIR = '/content/drive/MyDrive/birdclef-2025-models'

# Create the model save directory if it doesn't exist (no error when it already does)
os.makedirs(MODEL_SAVE_DIR, exist_ok=True)

# Report whether the data directory is reachable and where models will be written
print(f"Data path exists: {os.path.exists(DATA_PATH)}")
print(f"Model save directory: {MODEL_SAVE_DIR}")

## Data Exploration

Let's explore the dataset structure and understand the metadata.

In [None]:
# Load the metadata tables that sit next to the audio under DATA_PATH
train_df = pd.read_csv(os.path.join(DATA_PATH, 'train.csv'))
taxonomy_df = pd.read_csv(os.path.join(DATA_PATH, 'taxonomy.csv'))

# Print the table size and the number of distinct primary labels, then preview the first rows
print(f"Training data shape: {train_df.shape}")
print(f"Unique species: {train_df['primary_label'].nunique()}")
train_df.head()

In [None]:
# Class distribution: number of recordings per primary label (value_counts sorts descending)
species_counts = train_df['primary_label'].value_counts()
print(f"Max samples per class: {species_counts.max()}, Min samples per class: {species_counts.min()}")

# Plot the per-class counts in that sorted order to show how imbalanced the classes are
plt.figure(figsize=(12, 6))
plt.plot(range(len(species_counts)), species_counts.values)
plt.title('Class Distribution')
plt.xlabel('Class Index (sorted)')
plt.ylabel('Number of Samples')
plt.tight_layout()
plt.show()

## Audio File Exploration

Let's load and visualize some audio files to better understand the data.

In [None]:
def load_audio_file(file_path, sr=32000):
    """Load an audio file as a mono waveform at the requested sample rate.

    Args:
        file_path: Path of the audio file to read.
        sr: Target sample rate in Hz.

    Returns:
        ``(audio, sr)`` where ``audio`` is a 1-D array, or ``(None, None)``
        when the file cannot be read or resampled (the error is printed).
    """
    try:
        audio, orig_sr = sf.read(file_path)
        # soundfile returns multi-channel audio as (frames, channels). Mix it
        # down to mono so resampling and the length-based cropping done by
        # callers operate on the time axis.
        if audio.ndim > 1:
            audio = audio.mean(axis=1)
        if orig_sr != sr:
            audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=sr)
        return audio, sr
    except Exception as e:
        # Deliberately best-effort: callers treat (None, None) as "unreadable file".
        print(f"Error loading audio file {file_path}: {e}")
        return None, None

def plot_audio_and_spectrogram(audio, sr, title=None):
    """Draw the waveform of a clip above its dB-scaled mel-spectrogram.

    Args:
        audio: Waveform samples.
        sr: Sample rate of ``audio`` in Hz.
        title: Optional text appended to the waveform panel title.
    """
    nyquist = sr/2
    fig, (wave_ax, spec_ax) = plt.subplots(2, 1, figsize=(12, 8))

    # Top panel: raw waveform
    librosa.display.waveshow(audio, sr=sr, ax=wave_ax)
    if title:
        wave_ax.set_title(f'Waveform ({title})')
    else:
        wave_ax.set_title('Waveform')

    # Bottom panel: 128-band mel power spectrogram in dB relative to its peak
    power_spec = librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=128, fmax=nyquist)
    db_spec = librosa.power_to_db(power_spec, ref=np.max)
    mesh = librosa.display.specshow(
        db_spec, sr=sr, x_axis='time', y_axis='mel', fmax=nyquist, ax=spec_ax
    )
    spec_ax.set_title('Mel-Spectrogram')
    fig.colorbar(mesh, ax=spec_ax, format='%+2.0f dB')

    plt.tight_layout()
    plt.show()

# Draw one random recording per species, then keep the first three of those rows
sample_rows = train_df.groupby('primary_label').sample(1).iloc[:3]

# Load each sampled recording and plot it; files that fail to load (audio is None) are skipped
for idx, row in sample_rows.iterrows():
    filename = row['filename']
    file_path = os.path.join(DATA_PATH, 'train_audio', filename)
    audio, sr = load_audio_file(file_path)
    if audio is not None:
        print(f"File: {filename}, Primary Label: {row['primary_label']}, Common Name: {row.get('common_name', 'N/A')}")
        plot_audio_and_spectrogram(audio, sr, title=row.get('common_name', row['primary_label']))

## Feature Extraction: Mel-Spectrograms

Define functions to extract mel-spectrograms from audio files.

In [None]:
def compute_melspec(audio, sr=32000, n_mels=128, fmin=20, fmax=16000, window_size=1024, hop_length=512):
    """Turn a waveform into a min-max normalised, dB-scaled mel-spectrogram.

    Args:
        audio: Waveform samples.
        sr: Sample rate of ``audio`` in Hz.
        n_mels: Number of mel bands.
        fmin: Lowest frequency (Hz) covered by the mel filter bank.
        fmax: Highest frequency (Hz) covered by the mel filter bank.
        window_size: FFT size passed to librosa as ``n_fft``.
        hop_length: Hop between successive frames, in samples.

    Returns:
        The spectrogram rescaled per clip so its values lie in [0, 1].
    """
    power_spec = librosa.feature.melspectrogram(
        y=audio,
        sr=sr,
        n_fft=window_size,
        hop_length=hop_length,
        n_mels=n_mels,
        fmin=fmin,
        fmax=fmax,
        power=2.0,
    )

    # dB relative to the clip's own peak
    db_spec = librosa.power_to_db(power_spec, ref=np.max)

    # Per-clip min-max scaling; the epsilon keeps a constant spectrogram from dividing by zero
    floor = db_spec.min()
    span = db_spec.max() - floor + 1e-8
    return (db_spec - floor) / span

def audio_to_image(audio, sr=32000, fixed_length=5, *, random_crop=True, **kwargs):
    """Convert audio to a fixed-length mel-spectrogram 'image'.

    Args:
        audio: 1-D waveform samples.
        sr: Sample rate of ``audio`` in Hz.
        fixed_length: Length in seconds that the clip is cropped or padded to.
        random_crop: When the clip is longer than ``fixed_length``, take a
            random window (True, the default) or the centred window (False,
            deterministic - suitable for evaluation and caching).
        **kwargs: Forwarded unchanged to ``compute_melspec``.

    Returns:
        The mel-spectrogram with a leading channel axis of size 1.
    """
    target_len = int(fixed_length * sr)
    n_samples = len(audio)

    if n_samples > target_len:
        max_start = n_samples - target_len
        if random_crop:
            # randint's upper bound is exclusive; +1 makes the final window reachable
            start = np.random.randint(0, max_start + 1)
        else:
            start = max_start // 2
        audio = audio[start:start + target_len]
    else:
        # Centre the clip and pad both sides with silence
        padding = target_len - n_samples
        offset = padding // 2
        audio = np.pad(audio, (offset, padding - offset), 'constant')

    melspec = compute_melspec(audio, sr=sr, **kwargs)

    # Add channel dimension for neural network input (1 channel)
    return melspec[np.newaxis, :, :]

## Data Augmentation Functions

Define audio augmentation techniques that will be applied during training.

In [None]:
def time_shift(audio, shift_factor=0.2):
    """Shift audio in time by a fixed fraction of its length, in a random direction.

    The samples that would wrap around are replaced with silence.

    Args:
        audio: 1-D waveform samples (not modified).
        shift_factor: Fraction of the clip length to shift by.

    Returns:
        A new array with the shifted audio.
    """
    shift = int(len(audio) * shift_factor)
    direction = np.random.randint(0, 2)
    if direction == 1:
        shift = -shift
    # np.roll returns a copy, so the caller's array is left untouched
    aug_audio = np.roll(audio, shift)
    # Silence the wrapped-around part. A zero shift must leave the clip alone:
    # aug_audio[0:] = 0 would wipe the entire signal.
    if shift > 0:
        aug_audio[:shift] = 0
    elif shift < 0:
        aug_audio[shift:] = 0
    return aug_audio

def add_gaussian_noise(audio, noise_factor=0.01):
    """Return the audio plus zero-mean Gaussian noise scaled to the signal.

    The noise standard deviation is ``noise_factor`` times the standard
    deviation of ``audio``.
    """
    sigma = audio.std() * noise_factor
    return audio + np.random.normal(0, sigma, audio.shape)

def change_pitch(audio, sr, pitch_factor=4):
    """Shift the pitch by a random whole number of semitones, keeping the tempo.

    Args:
        audio: 1-D waveform samples (not modified).
        sr: Sample rate of ``audio`` in Hz.
        pitch_factor: Largest shift in either direction; the shift is drawn
            uniformly from ``-pitch_factor .. +pitch_factor`` inclusive.

    Returns:
        A new array with the pitch-shifted audio.
    """
    # randint's upper bound is exclusive; +1 keeps the range symmetric
    n_steps = np.random.randint(-pitch_factor, pitch_factor + 1)
    if n_steps == 0:
        # A zero-semitone shift is the identity; skip the costly resynthesis
        return audio.copy()
    return librosa.effects.pitch_shift(audio, sr=sr, n_steps=n_steps)

def change_speed(audio, speed_factor=0.2):
    """Resample the clip by index picking at a random playback rate.

    The rate is drawn uniformly from ``[1 - speed_factor, 1 + speed_factor]``;
    samples are taken at the rounded multiples of that rate.
    """
    n_samples = len(audio)
    rate = np.random.uniform(1 - speed_factor, 1 + speed_factor)
    positions = np.round(np.arange(0, n_samples, rate)).astype(int)
    # Rounding can push the last position one past the end; drop such indices
    in_range = positions < n_samples
    return audio[positions[in_range]]

def apply_augmentation(audio, sr):
    """Apply zero, one or two randomly chosen augmentations to a clip.

    The candidates are a time shift, additive Gaussian noise, a pitch shift
    and a no-op. Distinct candidates are drawn without replacement and applied
    in the drawn order.
    """
    def _shift(clip):
        return time_shift(clip, shift_factor=0.2)

    def _noise(clip):
        return add_gaussian_noise(clip, noise_factor=0.01)

    def _pitch(clip):
        return change_pitch(clip, sr, pitch_factor=2)

    def _identity(clip):
        return clip

    pool = (_shift, _noise, _pitch, _identity)

    # How many augmentations to chain: 0, 1 or 2
    how_many = np.random.randint(0, 3)
    chosen = np.random.choice(len(pool), size=how_many, replace=False)

    result = audio
    for k in chosen:
        result = pool[k](result)
    return result

## Create Dataset and DataLoader

Define a custom PyTorch dataset and dataloaders for training.

In [None]:
class AudioMelDataset(Dataset):
    """Dataset yielding ``(mel-spectrogram tensor, label index)`` pairs.

    Args:
        data_df: DataFrame with ``filename`` and ``primary_label`` columns.
        data_path: Root directory containing the ``train_audio`` folder.
        target_sr: Sample rate (Hz) every recording is loaded at.
        duration: Clip length in seconds fed to the model.
        augment: Apply random waveform augmentation and random cropping.
        cache_waveform: Keep loaded waveforms in memory, keyed by file path.
        cache_melspec: Keep computed spectrograms in memory; only used when
            ``augment`` is False because augmented items differ on every access.
        label_to_idx: Optional label -> index mapping to use instead of one
            derived from ``data_df``. Pass the same mapping to every split so
            that label indices agree between datasets.

    NOTE(review): the caches are plain dicts on the instance; with a
    multi-process DataLoader each worker presumably fills its own copy.
    """

    def __init__(self, data_df, data_path, target_sr=32000, duration=5, augment=False,
                 cache_waveform=False, cache_melspec=False, label_to_idx=None):
        self.data_df = data_df
        self.data_path = data_path
        self.target_sr = target_sr
        self.duration = duration
        self.augment = augment
        self.cache_waveform = cache_waveform
        self.cache_melspec = cache_melspec
        self.cached_waveforms = {}
        self.cached_melspecs = {}

        # Label encoding: derived from this split unless a shared mapping is supplied
        if label_to_idx is None:
            self.unique_labels = sorted(data_df['primary_label'].unique())
            self.label_to_idx = {label: idx for idx, label in enumerate(self.unique_labels)}
        else:
            self.label_to_idx = dict(label_to_idx)
            self.unique_labels = sorted(self.label_to_idx, key=self.label_to_idx.get)
        self.idx_to_label = {idx: label for label, idx in self.label_to_idx.items()}

        # Precompute file paths and integer labels once for fast item access
        self.file_paths = [os.path.join(data_path, 'train_audio', filename) for filename in data_df['filename']]
        self.labels = [self.label_to_idx[label] for label in data_df['primary_label']]

    def __len__(self):
        return len(self.data_df)

    def _load_waveform(self, file_path):
        """Return the waveform for *file_path*, using the waveform cache when enabled."""
        if self.cache_waveform and file_path in self.cached_waveforms:
            return self.cached_waveforms[file_path]

        audio, _ = load_audio_file(file_path, self.target_sr)
        if audio is None:
            # Loading failed: fall back to silence of the target length.
            # int() keeps this valid for fractional durations.
            audio = np.zeros(int(self.target_sr * self.duration))

        if self.cache_waveform:
            self.cached_waveforms[file_path] = audio
        return audio

    def _center_crop(self, audio):
        """Return the centred ``duration``-second window of *audio* (shorter clips unchanged)."""
        target_len = int(self.duration * self.target_sr)
        if len(audio) <= target_len:
            return audio
        start = (len(audio) - target_len) // 2
        return audio[start:start + target_len]

    def __getitem__(self, idx):
        file_path = self.file_paths[idx]
        label_tensor = torch.tensor(self.labels[idx], dtype=torch.long)

        # Consult the spectrogram cache before touching the audio file, so a
        # cache hit avoids the disk read and resampling entirely.
        use_melspec_cache = self.cache_melspec and not self.augment
        if use_melspec_cache and file_path in self.cached_melspecs:
            melspec = self.cached_melspecs[file_path]
            return torch.tensor(melspec, dtype=torch.float32), label_tensor

        audio = self._load_waveform(file_path)

        if self.augment:
            audio = apply_augmentation(audio, self.target_sr)
        else:
            # Evaluation data must be deterministic (and cacheable): crop the
            # centre here so audio_to_image never takes a random window.
            audio = self._center_crop(audio)

        melspec = audio_to_image(audio, sr=self.target_sr, fixed_length=self.duration)

        if use_melspec_cache:
            self.cached_melspecs[file_path] = melspec

        return torch.tensor(melspec, dtype=torch.float32), label_tensor


def _apply_label_mapping(dataset, label_to_idx):
    """Re-encode *dataset*'s labels with the shared *label_to_idx* mapping."""
    dataset.label_to_idx = dict(label_to_idx)
    dataset.idx_to_label = {idx: label for label, idx in label_to_idx.items()}
    dataset.unique_labels = sorted(label_to_idx, key=label_to_idx.get)
    dataset.labels = [label_to_idx[label] for label in dataset.data_df['primary_label']]


def prepare_dataloaders(train_df, data_path, batch_size=32, val_size=0.2, seed=42,
                        num_workers=4, cache_waveform=False, cache_val_melspec=True):
    """Split the metadata and build the train and validation dataloaders.

    Args:
        train_df: Metadata with ``filename`` and ``primary_label`` columns.
        data_path: Root directory containing the ``train_audio`` folder.
        batch_size: Training batch size; validation uses twice this value.
        val_size: Fraction of rows held out for validation.
        seed: Random state of the stratified split.
        num_workers: Number of DataLoader worker processes.
        cache_waveform: Cache loaded waveforms in both datasets.
        cache_val_melspec: Cache validation spectrograms (no augmentation there).

    Returns:
        ``(train_loader, val_loader, label_to_idx)`` where ``label_to_idx``
        covers every label in ``train_df`` and is shared by both datasets.
    """
    # Stratified split so both parts follow the overall class proportions
    train_data, val_data = train_test_split(
        train_df,
        test_size=val_size,
        random_state=seed,
        stratify=train_df['primary_label'],
    )
    train_data = train_data.reset_index(drop=True)
    val_data = val_data.reset_index(drop=True)

    train_dataset = AudioMelDataset(train_data, data_path, augment=True,
                                    cache_waveform=cache_waveform, cache_melspec=False)
    val_dataset = AudioMelDataset(val_data, data_path, augment=False,
                                  cache_waveform=cache_waveform, cache_melspec=cache_val_melspec)

    # Each dataset derives its label indices from its own rows. A rare class
    # can be absent from one split, which would shift that split's indices
    # relative to the model outputs. Encode both with one mapping built from
    # the full metadata.
    shared_label_to_idx = {
        label: idx for idx, label in enumerate(sorted(train_df['primary_label'].unique()))
    }
    _apply_label_mapping(train_dataset, shared_label_to_idx)
    _apply_label_mapping(val_dataset, shared_label_to_idx)

    # Worker-only options are passed only when workers exist: some PyTorch
    # versions reject an explicit prefetch_factor when num_workers == 0.
    loader_kwargs = {"num_workers": num_workers, "pin_memory": True}
    if num_workers > 0:
        loader_kwargs["persistent_workers"] = True  # keep workers alive between epochs
        loader_kwargs["prefetch_factor"] = 2

    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
        **loader_kwargs,
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size * 2,  # no gradients are kept, so larger batches fit
        shuffle=False,
        **loader_kwargs,
    )

    return train_loader, val_loader, train_dataset.label_to_idx

## Model Definition

Define the CNN model using a pre-trained backbone.

In [None]:
class CNNMelSpectrogramModel(nn.Module):
    """timm backbone on single-channel mel-spectrograms with a linear classification head.

    Args:
        num_classes: Number of output classes.
        backbone: timm model name; must contain ``"efficientnet"`` or ``"resnet"``.
        pretrained: Load pretrained backbone weights.

    Raises:
        NotImplementedError: If ``backbone`` is not an EfficientNet or ResNet name.
    """

    def __init__(self, num_classes, backbone="efficientnet_b0", pretrained=True):
        super().__init__()
        if "efficientnet" not in backbone and "resnet" not in backbone:
            raise NotImplementedError(f"Backbone {backbone} not supported")

        # num_classes=0 makes timm return pooled features with the original
        # head removed, so there is no Linear layer left to query for
        # in_features. The feature width is read from num_features instead,
        # and the backbone is built only once.
        self.backbone = timm.create_model(
            backbone,
            pretrained=pretrained,
            in_chans=1,  # Grayscale input for mel-spectrogram
            num_classes=0,  # No classification head
        )
        feature_dim = self.backbone.num_features

        # New classifier head
        self.feature_dim = feature_dim  # Store this for reference
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(feature_dim, num_classes)
        )

        print(f"Model initialized with {backbone} backbone, feature dimension: {feature_dim}")

    def forward(self, x):
        """Return class logits for a batch of spectrograms."""
        features = self.backbone(x)
        logits = self.classifier(features)
        return logits

## Training Functions

Define training and validation loops.

In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device, scaler=None, accumulation_steps=1):
    """Train the model for one epoch.

    Args:
        model: Model to train (switched to train mode).
        train_loader: Iterable of ``(melspec, targets)`` batches.
        criterion: Loss function applied to ``(outputs, targets)``.
        optimizer: Optimizer updating the model parameters.
        device: Device the batches are moved to.
        scaler: Optional ``GradScaler``; when given, the forward pass runs
            under autocast and gradients are scaled (mixed precision).
        accumulation_steps: Number of batches whose gradients are accumulated
            before each optimizer step.

    Returns:
        ``(epoch_loss, epoch_acc)``: mean per-sample loss and accuracy in
        percent; both 0.0 when the loader yields no batches.

    Raises:
        ValueError: If ``accumulation_steps`` is smaller than 1.
    """
    from contextlib import nullcontext

    if accumulation_steps < 1:
        raise ValueError("accumulation_steps must be >= 1")

    use_amp = scaler is not None
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = len(train_loader)

    # Start from clean gradients
    optimizer.zero_grad()

    pbar = tqdm(train_loader, desc="Training")
    for i, (melspec, targets) in enumerate(pbar):
        melspec = melspec.to(device)
        targets = targets.to(device)

        # Forward pass, under autocast only when mixed precision is requested
        with (autocast() if use_amp else nullcontext()):
            outputs = model(melspec)
            # Divide so the accumulated gradient matches one large batch
            loss = criterion(outputs, targets) / accumulation_steps

        if use_amp:
            scaler.scale(loss).backward()
        else:
            loss.backward()

        # Step after every accumulation_steps batches and on the final batch
        if (i + 1) % accumulation_steps == 0 or (i + 1) == num_batches:
            if use_amp:
                scaler.step(optimizer)
                scaler.update()
            else:
                optimizer.step()
            optimizer.zero_grad()

        # Statistics use the un-normalised loss
        batch_loss = loss.item() * accumulation_steps
        running_loss += batch_loss * melspec.size(0)
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        pbar.set_postfix({"loss": batch_loss, "acc": 100. * correct / total})

    if total == 0:
        # Empty loader: nothing was trained, avoid dividing by zero
        return 0.0, 0.0

    epoch_loss = running_loss / total
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

def validate(model, val_loader, criterion, device):
    """Evaluate the model on the validation set.

    Args:
        model: Model to evaluate (switched to eval mode).
        val_loader: Iterable of ``(melspec, targets)`` batches.
        criterion: Loss function applied to ``(outputs, targets)``.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc, roc_auc)``: mean per-sample loss, accuracy
        in percent, and macro ROC-AUC over the classes that can be scored.
        All three are 0.0 when the loader yields no batches.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_targets = []
    all_outputs = []

    with torch.no_grad():
        pbar = tqdm(val_loader, desc="Validation")
        for melspec, targets in pbar:
            melspec = melspec.to(device)
            targets = targets.to(device)

            outputs = model(melspec)
            loss = criterion(outputs, targets)

            # Keep probabilities and one-hot targets for the AUC computation
            all_outputs.append(outputs.softmax(dim=1).cpu().numpy())
            all_targets.append(F.one_hot(targets, num_classes=outputs.size(1)).cpu().numpy())

            running_loss += loss.item() * melspec.size(0)
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

            pbar.set_postfix({"loss": loss.item(), "acc": 100. * correct / total})

    if total == 0:
        # Empty loader: nothing to concatenate or average
        return 0.0, 0.0, 0.0

    all_outputs = np.concatenate(all_outputs)
    all_targets = np.concatenate(all_targets)

    epoch_loss = running_loss / total
    epoch_acc = 100. * correct / total

    # ROC-AUC (similar to the competition metric). A class can only be scored
    # when it has at least one positive AND at least one negative sample.
    positives_per_class = all_targets.sum(axis=0)
    scorable = (positives_per_class > 0) & (positives_per_class < len(all_targets))
    if scorable.any():
        # float() turns the NumPy scalar into a plain Python number
        roc_auc = float(roc_auc_score(all_targets[:, scorable], all_outputs[:, scorable], average='macro'))
    else:
        roc_auc = 0.0

    return epoch_loss, epoch_acc, roc_auc

## Training Loop

Define the main training function with early stopping.

In [None]:
def train(model, train_loader, val_loader, criterion, optimizer, scheduler, device,
          num_epochs=20, patience=5, model_path="best_model.pt",
          scaler=None, accumulation_steps=1):
    """Train the model with early stopping on validation AUC.

    Args:
        model: Model to train.
        train_loader: Training dataloader.
        val_loader: Validation dataloader.
        criterion: Loss function.
        optimizer: Optimizer updating the model parameters.
        scheduler: Optional LR scheduler; ``ReduceLROnPlateau`` is stepped with
            the validation loss, any other scheduler without arguments.
        device: Device used for training and for loading the best checkpoint.
        num_epochs: Maximum number of epochs.
        patience: Epochs without AUC improvement before stopping early.
        model_path: File the best model's state dict is written to.
        scaler: Optional ``GradScaler`` forwarded to ``train_epoch``.
        accumulation_steps: Gradient accumulation steps forwarded to ``train_epoch``.

    Returns:
        ``(model, history)``: the model with the best weights restored, and a
        dict of per-epoch metric lists.
    """
    # -inf guarantees the first finite AUC is saved, even if it is 0.0
    best_val_auc = float("-inf")
    checkpoint_saved = False
    epochs_without_improvement = 0
    history = {"train_loss": [], "train_acc": [], "val_loss": [], "val_acc": [], "val_auc": []}

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")

        train_loss, train_acc = train_epoch(
            model, train_loader, criterion, optimizer, device, scaler, accumulation_steps
        )
        val_loss, val_acc, val_auc = validate(model, val_loader, criterion, device)

        # Update learning rate
        if scheduler is not None:
            if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                scheduler.step(val_loss)
            else:
                scheduler.step()

        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%, Val AUC: {val_auc:.4f}")

        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc)
        history["val_auc"].append(val_auc)

        if val_auc > best_val_auc:
            best_val_auc = val_auc
            epochs_without_improvement = 0
            print(f"New best validation AUC: {val_auc:.4f}, saving model...")
            torch.save(model.state_dict(), model_path)
            checkpoint_saved = True
        else:
            epochs_without_improvement += 1
            print(f"No improvement for {epochs_without_improvement} epochs. Best Val AUC: {best_val_auc:.4f}")

            if epochs_without_improvement >= patience:
                print(f"Early stopping triggered after {epoch+1} epochs")
                break

    # Restore the best weights, but only if this run actually wrote a checkpoint
    if checkpoint_saved:
        model.load_state_dict(torch.load(model_path, map_location=device))

    return model, history

## Plotting Functions

Functions to visualize training history.

In [None]:
def plot_training_history(history):
    """Plot loss, accuracy and validation AUC per epoch in three side-by-side panels.

    Args:
        history: Dict with the lists ``train_loss``, ``val_loss``,
            ``train_acc``, ``val_acc`` and ``val_auc``.
    """
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))

    # (axis, y-label, title, ((history key, legend label), ...)) for each panel
    panels = (
        (axes[0], "Loss", "Loss Curves",
         (("train_loss", "Training Loss"), ("val_loss", "Validation Loss"))),
        (axes[1], "Accuracy (%)", "Accuracy Curves",
         (("train_acc", "Training Accuracy"), ("val_acc", "Validation Accuracy"))),
        (axes[2], "AUC", "Validation AUC",
         (("val_auc", "Validation AUC"),)),
    )

    for ax, y_label, panel_title, curves in panels:
        for key, legend_label in curves:
            ax.plot(history[key], label=legend_label)
        ax.set_xlabel("Epoch")
        ax.set_ylabel(y_label)
        ax.set_title(panel_title)
        ax.legend()

    plt.tight_layout()
    plt.show()

## Main Training

Run the training pipeline and save the model for later use in Kaggle.

In [None]:
def main(save_for_kaggle=True):
    """Run the full training pipeline and save the resulting artifacts.

    Builds the dataloaders and model, trains with optional mixed precision and
    gradient accumulation, plots the history, and saves the label mapping.

    Args:
        save_for_kaggle: Also write one combined file holding the model
            weights, label mappings and training history.
    """
    # Configuration
    BATCH_SIZE = 32  # Smaller base batch size for gradient accumulation
    ACCUMULATION_STEPS = 8  # Effective batch size = BATCH_SIZE * ACCUMULATION_STEPS = 256
    NUM_EPOCHS = 20
    LEARNING_RATE = 1e-4
    BACKBONE = "efficientnet_b0"  # Or "resnet34", "resnet50", etc.
    NUM_WORKERS = 8
    USE_AMP = True  # Enable mixed precision training
    CACHE_VAL_MELSPEC = True  # Cache validation mel spectrograms

    # File paths for saving models and data
    MODEL_PATH = os.path.join(MODEL_SAVE_DIR, "cnn_melspectrogram_model.pt")
    LABEL_MAPPING_PATH = os.path.join(MODEL_SAVE_DIR, "label_mapping.pt")
    KAGGLE_EXPORT_PATH = os.path.join(MODEL_SAVE_DIR, "cnn_melspec_kaggle.pth")

    # Load data
    train_df = pd.read_csv(os.path.join(DATA_PATH, 'train.csv'))

    train_loader, val_loader, label_to_idx = prepare_dataloaders(
        train_df, DATA_PATH, batch_size=BATCH_SIZE,
        num_workers=NUM_WORKERS, cache_val_melspec=CACHE_VAL_MELSPEC
    )
    num_classes = len(label_to_idx)
    print(f"Number of classes: {num_classes}")

    # Create model
    model = CNNMelSpectrogramModel(num_classes, backbone=BACKBONE)
    model = model.to(DEVICE)

    # Loss function, optimizer and LR schedule (halve LR when val loss plateaus)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

    # Gradient scaler for mixed precision; only meaningful on CUDA
    scaler = GradScaler() if USE_AMP and torch.cuda.is_available() else None
    if scaler is not None:
        print("Using mixed precision training")

    def optimized_train(model, train_loader, val_loader, criterion, optimizer, scheduler, device,
                        num_epochs=20, patience=5, model_path="best_model.pt",
                        scaler=None, accumulation_steps=1):
        """Training loop with early stopping that forwards scaler and accumulation steps."""
        # -inf guarantees the first finite AUC is saved, even if it is 0.0
        best_val_auc = float("-inf")
        checkpoint_saved = False
        epochs_without_improvement = 0
        history = {"train_loss": [], "train_acc": [], "val_loss": [], "val_acc": [], "val_auc": []}

        for epoch in range(num_epochs):
            print(f"\nEpoch {epoch+1}/{num_epochs}")

            train_loss, train_acc = train_epoch(
                model, train_loader, criterion, optimizer, device, scaler, accumulation_steps
            )
            val_loss, val_acc, val_auc = validate(model, val_loader, criterion, device)

            # Update learning rate
            if scheduler is not None:
                if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                    scheduler.step(val_loss)
                else:
                    scheduler.step()

            print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
            print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%, Val AUC: {val_auc:.4f}")

            # Store plain Python floats so the history can be serialised
            # without NumPy scalar objects
            history["train_loss"].append(float(train_loss))
            history["train_acc"].append(float(train_acc))
            history["val_loss"].append(float(val_loss))
            history["val_acc"].append(float(val_acc))
            history["val_auc"].append(float(val_auc))

            if val_auc > best_val_auc:
                best_val_auc = val_auc
                epochs_without_improvement = 0
                print(f"New best validation AUC: {val_auc:.4f}, saving model...")
                torch.save(model.state_dict(), model_path)
                checkpoint_saved = True
            else:
                epochs_without_improvement += 1
                print(f"No improvement for {epochs_without_improvement} epochs. Best Val AUC: {best_val_auc:.4f}")

                if epochs_without_improvement >= patience:
                    print(f"Early stopping triggered after {epoch+1} epochs")
                    break

        # Restore the best weights, but only if this run actually wrote a checkpoint
        if checkpoint_saved:
            model.load_state_dict(torch.load(model_path, map_location=device))

        return model, history

    # Train model with optimizations
    model, history = optimized_train(
        model, train_loader, val_loader, criterion, optimizer, scheduler,
        DEVICE, num_epochs=NUM_EPOCHS, model_path=MODEL_PATH,
        scaler=scaler, accumulation_steps=ACCUMULATION_STEPS
    )

    # Plot training history
    plot_training_history(history)

    # Save mapping
    idx_to_label = {v: k for k, v in label_to_idx.items()}
    label_mapping = {"idx_to_label": idx_to_label}
    torch.save(label_mapping, LABEL_MAPPING_PATH)

    # Save a combined model + mapping file for Kaggle
    if save_for_kaggle:
        kaggle_export = {
            "model_state_dict": model.state_dict(),
            "backbone": BACKBONE,
            "label_to_idx": label_to_idx,
            "idx_to_label": idx_to_label,
            "num_classes": num_classes,
            "history": history
        }
        torch.save(kaggle_export, KAGGLE_EXPORT_PATH)
        print(f"Model and metadata saved for Kaggle at: {KAGGLE_EXPORT_PATH}")

    print("Training complete!")

In [None]:
# Run the training pipeline and also write the combined export file for Kaggle
main(save_for_kaggle=True)

## Inference Functions

Define functions for making predictions on new audio.

In [None]:
def predict_audio(model, audio_path, label_to_idx, device=None, sr=32000, duration=5):
    """Predict the most likely labels for a single audio file.

    The recording is scored in ``duration``-second windows with 50% overlap,
    plus one window aligned to the end so trailing audio is not ignored. The
    per-window softmax probabilities are averaged.

    Args:
        model: Trained model; expected to already live on ``device``.
        audio_path: Path of the audio file.
        label_to_idx: Mapping from label to class index used in training.
        device: Device for inference; defaults to CUDA when available.
        sr: Sample rate (Hz) the audio is loaded at.
        duration: Window length in seconds.

    Returns:
        A list of up to five ``(label, probability)`` tuples sorted by
        descending probability, or ``None`` if the file cannot be loaded.
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.eval()

    audio, sr = load_audio_file(audio_path, sr=sr)
    if audio is None:
        print(f"Failed to load audio: {audio_path}")
        return None

    window_duration = duration  # Seconds
    window_length = int(window_duration * sr)
    # 50% overlap; never let the stride reach 0 for very short windows
    step_size = max(1, window_length // 2)

    # Window start offsets. Short recordings get a single window at 0 (padded
    # later); for long ones add a final end-aligned window when the stride
    # does not land on it.
    last_start = max(0, len(audio) - window_length)
    starts = list(range(0, last_start + 1, step_size))
    if starts[-1] != last_start:
        starts.append(last_start)

    all_preds = []
    with torch.no_grad():
        for start in starts:
            window = audio[start:start + window_length]

            # Windows are at most window_length long, so audio_to_image only pads here
            melspec = audio_to_image(window, sr=sr, fixed_length=window_duration)
            melspec_tensor = torch.tensor(melspec, dtype=torch.float32).unsqueeze(0).to(device)

            outputs = model(melspec_tensor)
            probs = torch.softmax(outputs, dim=1)
            all_preds.append(probs.cpu().numpy())

    # Average predictions from all windows
    avg_preds = np.mean(np.concatenate(all_preds, axis=0), axis=0)

    # Top 5 predictions, highest probability first
    top_indices = np.argsort(-avg_preds)[:5]
    idx_to_label = {v: k for k, v in label_to_idx.items()}
    return [(idx_to_label[idx], avg_preds[idx]) for idx in top_indices]

## Test the Model

Make predictions on a few test samples.

In [None]:
def test_model(model_path="cnn_melspectrogram_model.pt", 
               label_mapping_path="label_mapping.pt", 
               backbone="efficientnet_b0"):
    """Print top predictions of a saved model for five random training recordings.

    Args:
        model_path: File containing the model state dict.
        label_mapping_path: File containing ``{"idx_to_label": ...}``.
        backbone: Backbone name the model was trained with.

    Returns:
        None. Prints a message and returns early when either file is missing.
    """
    # Load label mapping
    try:
        label_mapping = torch.load(label_mapping_path)
        idx_to_label = label_mapping["idx_to_label"]
        label_to_idx = {v: k for k, v in idx_to_label.items()}
    except FileNotFoundError:
        print("Label mapping not found. Please train the model first.")
        return

    # The saved weights overwrite every parameter, so skip the pretrained download
    model = CNNMelSpectrogramModel(len(label_to_idx), backbone=backbone, pretrained=False)

    # map_location lets a checkpoint saved on GPU load on a CPU-only runtime
    try:
        model.load_state_dict(torch.load(model_path, map_location=DEVICE))
    except FileNotFoundError:
        print(f"Model file not found: {model_path}")
        return

    model.to(DEVICE)
    model.eval()

    # Sample a few files (drawn from the training metadata, not a held-out set)
    train_df = pd.read_csv(os.path.join(DATA_PATH, 'train.csv'))
    test_samples = train_df.sample(5)

    for idx, row in test_samples.iterrows():
        file_path = os.path.join(DATA_PATH, 'train_audio', row['filename'])
        print(f"\nTesting: {row['filename']}")
        print(f"True label: {row['primary_label']}")

        predictions = predict_audio(model, file_path, label_to_idx, device=DEVICE)

        if predictions:
            print("Top predictions:")
            for label, prob in predictions:
                print(f"{label}: {prob:.4f}")
        else:
            print("Failed to make prediction")

In [None]:
# Test the model
# test_model()  # Uncomment to test the model

## Verify Model for Kaggle

Let's verify that the saved model can be loaded and used correctly.

In [None]:
def verify_kaggle_model(model_path=None):
    """Check that the combined Kaggle export can be loaded into a fresh model.

    Args:
        model_path: Path to the exported file. When ``None``, defaults to
            ``cnn_melspec_kaggle.pth`` inside ``MODEL_SAVE_DIR``.

    Returns:
        True when the file was read, a model was built from its metadata and
        the stored weights were loaded; False when any of those steps raised
        (the error is printed, not re-raised).
    """
    if model_path is None:
        model_path = os.path.join(MODEL_SAVE_DIR, "cnn_melspec_kaggle.pth")

    print(f"Loading model from {model_path}")

    # This is a best-effort check: any failure is reported and turned into False.
    try:
        # The model below is never moved off the CPU, so map the checkpoint to
        # the CPU; this also lets a GPU-saved export load on a CPU-only machine.
        kaggle_data = torch.load(model_path, map_location="cpu")
        print("Successfully loaded model data!")

        # Extract metadata
        backbone = kaggle_data.get("backbone", "efficientnet_b0")
        num_classes = kaggle_data.get("num_classes")
        if num_classes is None and "idx_to_label" in kaggle_data:
            # Fall back to the size of the label mapping when the class count
            # was not stored explicitly.
            num_classes = len(kaggle_data["idx_to_label"])

        print(f"Model information:")
        print(f"- Backbone: {backbone}")
        print(f"- Number of classes: {num_classes}")
        print(f"- Label mapping available: {'Yes' if 'idx_to_label' in kaggle_data else 'No'}")

        # Create a new model instance and restore the stored weights
        model = CNNMelSpectrogramModel(num_classes, backbone=backbone)
        model.load_state_dict(kaggle_data["model_state_dict"])
        print("Model weights loaded successfully!")

        # Put model in evaluation mode
        model.eval()
        print("Model ready for inference!")

        return True
    except Exception as e:
        print(f"Error loading model: {e}")
        return False

In [None]:
# Verify that the model can be loaded for Kaggle.
# With no argument this checks MODEL_SAVE_DIR/cnn_melspec_kaggle.pth and
# returns True on success, False otherwise.
verify_kaggle_model()

## Conclusion

This notebook implemented a CNN model on mel-spectrograms for the BirdCLEF+ 2025 competition. The model uses a pre-trained EfficientNet or ResNet backbone fine-tuned on mel-spectrograms of bird and animal sounds. Key features include:

1. Comprehensive audio preprocessing for mel-spectrogram generation
2. Data augmentation techniques for audio (time shift, noise, pitch shift)
3. Custom dataset and dataloader implementation
4. Pre-trained CNN backbone with transfer learning
5. Training pipeline with early stopping and learning rate scheduling
6. ROC-AUC evaluation matching competition metric
7. Inference with window-based processing for longer audio files

After training, the model is saved in two formats:
1. Separate model weights and label mapping files
2. A combined file for Kaggle with all necessary data for inference

This model represents one component of the ensemble approach described in the project proposal.

## Loading Precomputed Features

Load precomputed mel-spectrograms from the shared NPZ file (downloaded from Google Drive on first use) instead of generating them from scratch.

In [None]:
# Where the precomputed features live: a shared Google Drive link and the
# local copy under DATA_PATH that the file is downloaded to.
# NOTE(review): gdown is not part of the pip install cell at the top of the
# notebook -- presumably this relies on the runtime's preinstalled copy; confirm.
import gdown
import os

# Share ("view") link of the Google Drive file holding the precomputed features
PRECOMPUTED_FEATURES_URL = "https://drive.google.com/file/d/1bkkglM6lV1aV-9bSsVpD475YewPwblmL/view?usp=sharing"
# Local path of the NPZ archive; the download is skipped when this file already exists
PRECOMPUTED_FEATURES_PATH = os.path.join(DATA_PATH, 'bird_features_scratch.npz')

def load_precomputed_features(url=PRECOMPUTED_FEATURES_URL, local_path=PRECOMPUTED_FEATURES_PATH):
    """Download (if needed) and load the precomputed features archive.

    Args:
        url: Google Drive link the archive is fetched from when ``local_path``
            does not exist yet.
        local_path: Location of the NPZ archive on disk.

    Returns:
        Tuple ``(X, y)`` holding the archive's ``'data'`` and ``'labels'``
        arrays, or ``(None, None)`` when the archive cannot be read (the
        error is printed, not re-raised).
    """
    # Only hit the network when there is no local copy yet.
    if not os.path.exists(local_path):
        print(f"Downloading precomputed features file to {local_path}...")
        # fuzzy=True lets gdown extract the file id from a share link
        gdown.download(url, local_path, quiet=False, fuzzy=True)
    else:
        print(f"Precomputed features file already exists at {local_path}")

    # Load the features
    print("Loading precomputed features...")
    try:
        # np.load keeps the archive's file handle open; the context manager
        # closes it once both arrays have been read into memory.
        with np.load(local_path) as data:
            X = data['data']
            y = data['labels']
        print(f"Successfully loaded precomputed features!")
        print(f"Features shape: {X.shape}")
        print(f"Labels shape: {y.shape}")
        print(f"Number of unique classes: {len(np.unique(y))}")
        return X, y
    except Exception as e:
        # Best-effort loader: report the problem and let the caller decide.
        print(f"Error loading precomputed features: {e}")
        return None, None

# Load the precomputed features (the archive is downloaded first when it is
# not on disk). Both values are None when loading fails, so check them before use.
X_precomputed, y_precomputed = load_precomputed_features()

## Training with Precomputed Features

Let's create a more efficient training pipeline using the precomputed features instead of generating them on the fly.

In [None]:
def _encode_labels(y_labels):
    """Re-index class ids as contiguous integers ``0..K-1``.

    Labels that already are ``0..K-1`` come back unchanged in value, so the
    common case behaves exactly as if no re-indexing had happened.

    Returns:
        Tuple ``(encoded, idx_to_label)``: the re-indexed label array and a
        dict mapping each index to the name ``"class_<original id>"``.
    """
    classes, encoded = np.unique(np.asarray(y_labels), return_inverse=True)
    idx_to_label = {idx: f"class_{cls}" for idx, cls in enumerate(classes.tolist())}
    return encoded, idx_to_label


def precomputed_features_training(X_features=None, y_labels=None, test_size=0.2, seed=42,
                                  num_epochs=20, learning_rate=1e-4,
                                  backbone="efficientnet_b0", batch_size=64, patience=5):
    """Train the CNN on precomputed features and export it for Kaggle.

    Args:
        X_features: Feature array; loaded via ``load_precomputed_features``
            when this or ``y_labels`` is ``None``.
        y_labels: Class id per sample (assumed to be one id per row of
            ``X_features`` -- confirm against the NPZ producer).
        test_size: Fraction of the samples held out for validation.
        seed: Random state for the stratified train/validation split.
        num_epochs: Maximum number of epochs passed to ``train``.
        learning_rate: Learning rate of the AdamW optimizer.
        backbone: Backbone name passed to ``CNNMelSpectrogramModel``.
        batch_size: Training batch size; validation uses twice this value.
        patience: Patience value passed to ``train``.

    Returns:
        Tuple ``(model, history)`` as returned by ``train``, or
        ``(None, None)`` when the features could not be loaded.
    """
    # If features aren't provided, load them first
    if X_features is None or y_labels is None:
        X_features, y_labels = load_precomputed_features()
        if X_features is None:
            print("Failed to load precomputed features. Exiting.")
            return None, None

    print("Setting up training with precomputed features...")

    # CrossEntropyLoss needs targets in [0, num_classes). Counting unique ids
    # only guarantees that when the ids are exactly 0..K-1, so re-index them.
    y_encoded, idx_to_label = _encode_labels(y_labels)
    label_to_idx = {name: idx for idx, name in idx_to_label.items()}
    num_classes = len(idx_to_label)

    # Split data into training and validation sets (stratified by class)
    X_train, X_val, y_train, y_val = train_test_split(
        X_features, y_encoded, test_size=test_size, random_state=seed, stratify=y_encoded
    )
    print(f"Training set: {X_train.shape[0]} samples")
    print(f"Validation set: {X_val.shape[0]} samples")

    # Wrap the arrays as tensor datasets
    train_dataset = torch.utils.data.TensorDataset(
        torch.FloatTensor(X_train), torch.LongTensor(y_train)
    )
    val_dataset = torch.utils.data.TensorDataset(
        torch.FloatTensor(X_val), torch.LongTensor(y_val)
    )

    # Larger batches are affordable because no features are generated on the
    # fly; validation needs no gradients, so it uses twice the batch size.
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True
    )
    val_loader = torch.utils.data.DataLoader(
        val_dataset, batch_size=batch_size * 2, shuffle=False, num_workers=4, pin_memory=True
    )

    # Output locations
    model_path = os.path.join(MODEL_SAVE_DIR, "cnn_melspectrogram_precomputed.pt")
    kaggle_export_path = os.path.join(MODEL_SAVE_DIR, "cnn_melspec_precomputed_kaggle.pth")

    print(f"Number of classes: {num_classes}")

    # Create the model
    model = CNNMelSpectrogramModel(num_classes, backbone=backbone)
    model = model.to(DEVICE)

    # Loss function, optimizer and scheduler
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

    # NOTE(review): the scaler is created but never handed to train(), so the
    # message below only holds if train() enables mixed precision on its own
    # -- confirm against train()'s implementation.
    scaler = GradScaler() if torch.cuda.is_available() else None
    if scaler is not None:
        print("Using mixed precision training")

    # Start training
    model, history = train(model, train_loader, val_loader, criterion, optimizer, scheduler, DEVICE,
                           num_epochs=num_epochs, patience=patience, model_path=model_path)

    # Plot training history
    plot_training_history(history)

    # Bundle weights and metadata into a single file for Kaggle
    kaggle_export = {
        "model_state_dict": model.state_dict(),
        "backbone": backbone,
        "label_to_idx": label_to_idx,
        "idx_to_label": idx_to_label,
        "num_classes": num_classes,
        "history": history
    }
    torch.save(kaggle_export, kaggle_export_path)
    print(f"Model and metadata saved for Kaggle at: {kaggle_export_path}")

    print("Training complete!")
    return model, history

In [None]:
# Run training with precomputed features.
# If either argument is None the function loads the features itself, and it
# returns (None, None) when that load fails as well.
model_precomputed, history_precomputed = precomputed_features_training(X_precomputed, y_precomputed)