Credit: https://www.kaggle.com/code/jocelyndumlao/birdclef-2025-inference-w-simplecnn-spectrogram <br>
Modification relative to the credited notebook: the CNN architecture was changed (see `ImprovedCNN` below).

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import os
import librosa
import numpy as np
import pandas as pd

import gc
import dataclasses
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Callable, Tuple, List
import traceback  # Import traceback module

In [None]:
# --- Competition inputs ------------------------------------------------------
# Directory that is scanned for ".ogg" soundscapes to run inference on.
test_data = "/kaggle/input/birdclef-2025/test_soundscapes"
# Sample submission; its columns (minus "row_id") provide the class list.
submission = "/kaggle/input/birdclef-2025/sample_submission.csv"
# Metadata tables; not referenced by the cells visible in this notebook.
train_csv = "/kaggle/input/birdclef-2025/train.csv"
taxonomy_csv = "/kaggle/input/birdclef-2025/taxonomy.csv"

# --- Optional preprocessing hooks --------------------------------------------
# Both hooks are consulted by predict(); leaving them as None skips the step.
# audio_transform is called as audio_transform(sample=..., sample_rate=...)
# on each raw waveform segment.
audio_transform: Optional[Callable] = None
# transform is called as transform(image=...)["image"] on each spectrogram.
transform: Optional[Callable] = None

@dataclasses.dataclass
class AudioParam:
    """Audio loading and mel-spectrogram settings used throughout the notebook."""

    # Sampling rate (Hz) passed to the audio loader and the mel transform.
    SR: int = 32_000
    # FFT window size, in samples.
    NFFT: int = 2048
    # Number of mel bands, i.e. the height of the spectrogram.
    NMEL: int = 128
    # Upper edge of the mel filter bank (Hz).
    FMAX: int = 16_000
    # Lower edge of the mel filter bank (Hz).
    FMIN: int = 20
    # Hop between frames, in samples.  The default is evaluated once, from the
    # class-level NFFT default (2048 // 4 == 512); constructing an instance
    # with a different NFFT does NOT recompute it.
    HOP_LENGTH: int = NFFT // 4

# Single shared instance of the audio settings.
audio_param = AudioParam()

# Load submission CSV to get class names.
# Only the read itself is guarded: a missing file degrades to an empty class
# list (later steps will most likely fail, but with a clear message first).
try:
    sub_csv = pd.read_csv(submission)
except FileNotFoundError as e:
    print(f"Error: sample_submission.csv not found! {e}")
    idx2cls = []  # no class columns available
    cls2idx = {}
else:
    # Every column except "row_id" is a class name, in submission order.
    idx2cls = sub_csv.columns.drop("row_id").tolist()
    cls2idx = {c: i for i, c in enumerate(idx2cls)}  # class name -> column index


DEBUG = True  # Enable Debugging

# Collect the soundscapes to score.  A missing/invalid directory is treated
# like an empty one so that the single-file fallback below still applies
# instead of the notebook crashing here.  Sorting makes the processing order
# (and therefore the row order of the output) deterministic.
try:
    _test_entries = sorted(os.listdir(test_data))
except (FileNotFoundError, NotADirectoryError) as e:
    print(f"Warning: cannot list test soundscapes in {test_data}: {e}")
    _test_entries = []
file_names = [os.path.join(test_data, fp) for fp in _test_entries if fp.endswith(".ogg")]

# Use a single file for debugging.  This makes the matrix dimension calculations easier.
if not file_names:
    file_names = [
        "/kaggle/input/birdclef-2025/train_soundscapes/H02_20230420_074000.ogg",
    ]
    # NOTE(review): DEBUG is already True above, so this assignment changes
    # nothing; presumably the initial value was meant to be False - confirm.
    DEBUG = True


In [None]:
class ImprovedCNN(nn.Module):
    """Four-stage convolutional classifier for single-channel 2-D input.

    Every stage is ``Conv2d(3x3, padding=1) -> BatchNorm2d -> ReLU ->
    MaxPool2d(2, 2)``, with output widths 32, 64, 128 and 256.  The final
    feature map is averaged down to one value per channel, passed through
    dropout (p=0.5) and projected to ``num_classes`` raw scores; no output
    activation is applied inside the module.

    Args:
        num_classes: Size of the output layer.
    """

    def __init__(self, num_classes: int):
        super().__init__()

        # Assemble the stages in a loop.  The layers still end up in one flat
        # Sequential, so their indices (features.0 ... features.15) and their
        # construction order are the same as when written out by hand.
        stage_layers = []
        in_channels = 1
        for out_channels in (32, 64, 128, 256):
            stage_layers.extend([
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(2, 2),
            ])
            in_channels = out_channels
        self.features = nn.Sequential(*stage_layers)

        # Head: spatial average -> dropout -> linear projection.
        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(0.5)
        self.classifier = nn.Linear(256, num_classes)

        # Re-initialise the weights: Xavier-uniform for the linear layer,
        # Kaiming-normal (ReLU gain) for every convolution.  Biases and
        # batch-norm parameters keep their PyTorch defaults.
        for layer in self.modules():
            if isinstance(layer, nn.Linear):
                nn.init.xavier_uniform_(layer.weight)
            elif isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, nonlinearity='relu')

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map a batch shaped (B, 1, H, W) to scores shaped (B, num_classes)."""
        feature_map = self.features(x)           # (B, 256, H', W')
        pooled = self.global_pool(feature_map)   # (B, 256, 1, 1)
        embedding = pooled.flatten(1)            # (B, 256)
        return self.classifier(self.dropout(embedding))

In [None]:
# Build the network with one output per submission class and switch it to
# inference mode in the same expression (eval() returns the module itself).
# NOTE(review): no checkpoint is loaded in the cells visible here, so the
# weights are whatever ImprovedCNN.__init__ initialises - confirm that trained
# weights are loaded somewhere before relying on the predictions.
model = ImprovedCNN(num_classes=len(idx2cls)).eval()

def pipeline(x: np.ndarray) -> np.ndarray:
    """
    Converts audio data to a mel spectrogram and then to a normalised dB scale.

    Args:
        x (np.ndarray): Waveform to convert (assumed 1-D and sampled at
            ``audio_param.SR`` - the caller passes 5-second slices).

    Returns:
        np.ndarray: Array shaped (1, n_mels, n_frames).  If the conversion
        fails, the error is printed and an all-zero float32 array with the
        same layout is returned, so the function itself never raises.
    """
    try:
        mels = librosa.feature.melspectrogram(
            y=x,
            sr=audio_param.SR,
            n_fft=audio_param.NFFT,
            n_mels=audio_param.NMEL,
            fmax=audio_param.FMAX,
            fmin=audio_param.FMIN,
            hop_length=audio_param.HOP_LENGTH,
        )
        # dB relative to the loudest bin of this segment.
        db_map = librosa.power_to_db(mels, ref=np.max)
        # Shift/scale towards [0, 1]; the small constant only guards the
        # division.  NOTE(review): presumably relies on power_to_db limiting
        # the range to 80 dB below the peak - confirm against librosa defaults.
        db_map = (db_map + 80) / (80 + 1e-6)
        if np.isnan(db_map).any():
            print("Warning: NaN values detected in db_map!")
            db_map = np.nan_to_num(db_map)  # Replace NaN with 0

        return db_map[None, :, :]  # Add a channel dimension (1, height, width)
    except Exception as e:
        print(f"Error in pipeline: {e}")
        # Fall back to silence with the frame count the normal path would
        # produce (librosa's default centred framing gives
        # 1 + n_samples // hop_length frames).  A single-frame placeholder
        # cannot pass through the model's 2x2 pooling stages, which would
        # make the caller drop this segment's row entirely.
        n_frames = 1 + np.size(x) // audio_param.HOP_LENGTH
        return np.zeros((1, audio_param.NMEL, n_frames), dtype=np.float32)


In [None]:
@torch.no_grad()
def predict(fp: str) -> Tuple[np.ndarray, List[str]]:
    """
    Predicts bird calls in a given audio file, one prediction per 5-second window.

    The file is loaded at ``audio_param.SR`` and cut into consecutive,
    non-overlapping 5-second segments; a trailing remainder shorter than
    5 seconds is ignored.  Each segment goes through the optional
    ``audio_transform``, ``pipeline`` and the optional ``transform`` before
    being scored by the global ``model`` (sigmoid applied to its output).

    Args:
        fp (str): File path of the audio file.

    Returns:
        Tuple[np.ndarray, List[str]]: The per-segment model outputs stacked
        row-wise, and the matching row IDs in the form
        ``"<file stem>_<segment end time in seconds>"``.  Segments that fail
        are reported and skipped, so both results can be shorter than the
        number of segments.  An unreadable or empty file yields
        ``(np.array([]), [])``.
    """
    try:
        x, _ = librosa.load(fp, sr=audio_param.SR)  # Load the audio file.
    except Exception as e:
        print(f"Error loading file {fp}: {e}")
        return np.array([]), []

    if x.size == 0:
        print(f"Warning: Audio file {fp} is empty!")
        return np.array([]), []  # return empty arrays

    # Loop invariants.
    segment_len = audio_param.SR * 5           # samples per 5-second segment
    num_segments = len(x) // segment_len       # whole segments only
    # splitext removes only the extension, so a stem that itself contains
    # dots is kept intact in the row IDs.
    fp_name = os.path.splitext(os.path.basename(fp))[0]

    all_outs = []
    all_row_ids = []
    for i in range(num_segments):
        segment = x[i * segment_len:(i + 1) * segment_len]

        if audio_transform is not None:
            try:
                segment = audio_transform(sample=segment, sample_rate=audio_param.SR)  # Apply audio transform
            except Exception as e:
                # Best effort: carry on with the untransformed segment.
                print(f"Audio Transform Failed {e}")

        try:
            segment = pipeline(segment)  # Convert waveform to mel spectrogram.
        except Exception as e:
            print(f"Pipeline failed {e}")
            continue

        if transform is not None:
            try:
                segment = transform(image=segment)["image"]  # Apply image transform.
            except Exception as e:
                print(f"Transform failed {e}")
                continue

        try:
            # (1, H, W) -> (1, 1, H, W): add the batch dimension.
            batch = torch.from_numpy(segment).float().unsqueeze(0)
            # Gradients are already disabled by the decorator, so the output
            # can be converted to NumPy directly.
            out = model(batch).sigmoid().cpu().numpy()
        except Exception as e:
            print(f"Error during processing of segment {i} in {fp}: {e}  {traceback.format_exc()}")  # Print trace
            continue

        all_outs.append(out[0])
        # Row IDs are keyed by the END time of the segment: 5, 10, 15, ...
        all_row_ids.append(f"{fp_name}_{(i + 1) * 5}")

    return np.array(all_outs), all_row_ids


In [None]:
# Accumulators across all files: one row ID and one probability vector per
# successfully scored 5-second segment, kept in the same order.
row_id = []
matrix = []

# Files are scored one after another.  (A thread pool used to be opened
# around this loop, but no work was ever submitted to it, so it had no
# effect; it has been removed rather than silently changing how the
# notebook executes.)
for fp_idx, fp in enumerate(file_names):
    try:
        out, rid = predict(fp)
    except Exception as e:
        print(f"Failed to run predict for file {fp} {e}")
    else:
        if len(rid) > 0:
            row_id.extend(rid)
            matrix.extend(out)
        else:
            print(f"Warning: No predictions generated for file: {fp}")
    gc.collect()  # Collect after each file
    print(f"Finished {fp_idx+1}/{len(file_names)}")

try:
    # Keep the probabilities numeric: stacking them next to the string row
    # IDs in a single NumPy array would coerce every value to a string.
    matrix = np.asarray(matrix, dtype=np.float32).reshape(-1, len(idx2cls))

    # Create a Pandas DataFrame from the results, with row_id as first column.
    sub = pd.DataFrame(matrix, columns=idx2cls)
    sub.insert(0, "row_id", row_id)
    sub.to_csv('submission.csv', index=False)

    print(sub.head())
except Exception as e:
    # Best effort: report the problem instead of aborting the notebook.
    print(f"Error creating submission file {e}")

print("Finished!")
gc.collect()