## This is starter code for single point prediction with CNNs

In [1]:
import os

# common math imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# common torch imports
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# common sklearn imports 
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

## Load and process data

In [2]:
# Tensor of model inputs (covariates) produced by the audio processing step.
# NOTE(review): torch.load unpickles the file, so only load tensors that come
# from a trusted source.
X = torch.load('data/bc22-32/X_tensor.pt')

# Metadata tables. `filtdata` supplies the 'primary_label' and 'filename'
# columns that later cells use to select a subset of species.
metadata = pd.read_csv('data/bc22-32/orig_metadata.csv')
filtdata = pd.read_csv('data/bc22-32/train_metadata_cleaned.csv')
# '/'-separated file paths, read as strings. Later cells build a boolean mask
# from `files` and index `X` with it, so the two are assumed to be row-aligned
# (TODO confirm).
files = np.loadtxt('data/bc22-32/files.csv', dtype=str)

In [3]:
# Tensor of targets (classification labels) inferred from the zero-crossing
# rate (ZCR). This is "strong" labeling, and it is only as reliable as the
# ZCR algorithm that produced it.
# NOTE(review): torch.load unpickles the file, so only load trusted files.
y = torch.load('data/bc22-32/y_tensor.pt')

In [4]:
# Alternatively, build the targets from the file paths ("weak" labeling):
# every entry inherits the label of the directory that contains it, which
# assumes that nearly all time slices actually contain a bird.
# NOTE: running this cell overwrites the `y` loaded from y_tensor.pt above,
# replacing it with the integer codes returned by LabelEncoder.
file_encoder = LabelEncoder()
# The second-to-last path component (the parent directory) is the label.
files_ = [file.split('/')[-2] for file in files]
y = file_encoder.fit_transform(files_)

## Some simple CNN models

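Note that these models can be used for either multi-label or single-label (categorical) classification; which one you get depends on your choice of target encoding and loss function (e.g., `nn.BCEWithLogitsLoss` with multi-hot float targets versus `nn.CrossEntropyLoss` with class-index targets).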

In [None]:
class SimpleCNN(nn.Module):
    """Simple Convolutional Neural Network for audio classification.

    The network applies two ``conv -> ReLU -> max-pool`` stages and then a
    two-layer fully connected head. Dropout is applied to the hidden fully
    connected activations, so the ``dropout`` argument actually regularizes
    training (it is a no-op in ``eval()`` mode).
    """

    def __init__(self, num_classes, in_channels=1,
                 out_channels1=16, out_channels2=32,
                 fc_hidden_units=64,
                 kernel_size=3, stride=1, padding=1,
                 pooling_size=2, dropout=0.5):
        """Initialize Simple Convolutional Neural Network for audio classification.

        Parameters
        ----------
        num_classes : int
            Number of output classes for classification.
        in_channels : int, optional
            Number of input channels (default is 1).
        out_channels1 : int, optional
            Number of output channels for the first convolutional layer.
        out_channels2 : int, optional
            Number of output channels for the second convolutional layer.
        fc_hidden_units : int, optional
            Number of hidden units in the fully connected layer.
        kernel_size : int or tuple, optional
            Size of the convolutional kernel (shared by both conv layers).
        stride : int or tuple, optional
            Stride of the convolutional operation (shared by both conv layers).
        padding : int or tuple, optional
            Padding added to the input of each convolution.
        pooling_size : int or tuple, optional
            Kernel size and stride of the max-pooling operation.
        dropout : float, optional
            Dropout probability applied to the hidden fully connected
            activations during training (default is 0.5).

        Returns
        -------
        None
        """
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels1,
                               kernel_size=kernel_size, stride=stride, padding=padding)
        self.conv2 = nn.Conv2d(out_channels1, out_channels2,
                               kernel_size=kernel_size, stride=stride, padding=padding)
        self.pool = nn.MaxPool2d(pooling_size, pooling_size)
        self.dropout = nn.Dropout(dropout)
        # LazyLinear infers its input width on the first forward pass, so the
        # model does not need to know the spectrogram size up front.
        self.fc1 = nn.LazyLinear(fc_hidden_units)
        self.fc2 = nn.Linear(fc_hidden_units, num_classes)

    def forward(self, x):
        """Forward pass of the SimpleCNN model.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor of shape (batch_size, in_channels, height, width).

        Returns
        -------
        torch.Tensor
            Raw (un-normalized) scores of shape (batch_size, num_classes).
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # Collapse channel and spatial dimensions into one feature vector.
        x = x.flatten(start_dim=1)
        x = F.relu(self.fc1(x))
        # Previously the dropout layer was created but never used.
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:
class SchMichiganCNN(nn.Module):
    """SchMichiganCNN: A Convolutional Neural Network for audio classification.

    This model is designed to process audio data represented as spectrograms
    or sonograms. It stacks a configurable number of
    ``conv -> [batch norm] -> activation -> max-pool -> [dropout]`` stages
    and finishes with two fully connected layers.
    """

    def __init__(self,
                 num_classes,
                 height=32,
                 width=50,
                 kernel_sizes=(3, 3),
                 padding_sizes=(1, 1),
                 stride_sizes=(1, 1),
                 pooling_kernels=((2, 2), (2, 2)),
                 pooling_strides=((2, 2), (2, 2)),
                 channel_sizes=(64, 64),
                 fc_sizes=(64, 64),
                 fc_activation=F.relu,
                 dropout=0.5,
                 batchNormalization=False,
                 leakyRelu=False,
                 ):
        """Initialize the SchMichiganCNN model.

        Parameters
        ----------
        num_classes : int
            Number of output classes for classification.
        height : int, optional
            Height of the input spectrogram (default is 32).
        width : int, optional
            Width of the input spectrogram (default is 50).
        kernel_sizes : sequence of int or (int, int), optional
            Kernel size for each convolutional layer.
        padding_sizes : sequence of int or (int, int), optional
            Padding for each convolutional layer.
        stride_sizes : sequence of int or (int, int), optional
            Stride for each convolutional layer.
        pooling_kernels : sequence of int or (int, int), optional
            Max-pooling kernel size for each stage.
        pooling_strides : sequence of int or (int, int), optional
            Max-pooling stride for each stage.
        channel_sizes : sequence of int, optional
            Number of output channels for each convolutional layer.
        fc_sizes : sequence of two int, optional
            ``fc_sizes[0]`` is the output width of ``fc1`` and ``fc_sizes[1]``
            is the input width of ``fc2``. Because ``fc2`` consumes the output
            of ``fc1`` directly, the two values must be equal.
        fc_activation : callable, optional
            Activation applied to the output of ``fc1`` (default is ReLU).
        dropout : float, optional
            Dropout probability applied after every pooling layer; no dropout
            layers are added when it is 0 (default is 0.5).
        batchNormalization : bool, optional
            Whether to apply batch normalization after each convolutional layer (default is False).
        leakyRelu : bool, optional
            Whether to use Leaky ReLU activation instead of ReLU (default is False).

        Raises
        ------
        AssertionError
            If the per-layer configuration sequences have inconsistent
            lengths, or ``fc_sizes`` does not have exactly two entries.
        ValueError
            If ``fc_sizes[0] != fc_sizes[1]``, or if the configured layers
            shrink the feature map to an empty size.

        Returns
        -------
        None
        """
        super().__init__()

        # Validate input parameters. The pre-existing checks stay as asserts
        # so callers keep seeing the same exception type.
        assert len(kernel_sizes) > 0, "At least one kernel size must be provided"
        ks = kernel_sizes
        ps = padding_sizes
        ss = stride_sizes
        nm = channel_sizes
        pks = pooling_kernels
        pss = pooling_strides
        assert len(ks) == len(ps) == len(ss) == len(nm), \
            "Kernel sizes, padding sizes, stride sizes, and channel sizes must have the same length"
        assert len(pks) == len(pss) == len(nm), \
            "Pooling kernel sizes and pooling stride sizes must match the number of channel sizes"
        assert len(fc_sizes) == 2, "Can only have 2 fully connected layers"
        if fc_sizes[0] != fc_sizes[1]:
            # fc2 is fed by fc1 directly; a mismatch would only surface as a
            # shape error on the first forward pass.
            raise ValueError(
                "fc_sizes[0] and fc_sizes[1] must be equal because fc2 consumes "
                f"the output of fc1 directly, got {list(fc_sizes)}"
            )

        # Define the CNN layers
        cnn = nn.Sequential()
        nIn = 1  # The sonogram input is assumed to have a single channel
        out_height, out_width = height, width
        for i, nOut in enumerate(nm):
            # Add a convolutional layer
            cnn.add_module(f'conv{i}', nn.Conv2d(nIn, nOut, ks[i], ss[i], ps[i]))
            # Track the feature-map size after the convolution
            k_h, k_w = self._pair(ks[i])
            p_h, p_w = self._pair(ps[i])
            s_h, s_w = self._pair(ss[i])
            out_height = (out_height + 2 * p_h - k_h) // s_h + 1
            out_width = (out_width + 2 * p_w - k_w) // s_w + 1

            # Add batch normalization if specified
            if batchNormalization:
                cnn.add_module(f'batchnorm{i}', nn.BatchNorm2d(nOut))

            # Add activation function
            if leakyRelu:
                cnn.add_module(f'relu{i}', nn.LeakyReLU(0.2, inplace=True))
            else:
                cnn.add_module(f'relu{i}', nn.ReLU(True))

            # Add the pooling layer
            cnn.add_module(f'pooling{i}', nn.MaxPool2d(pks[i], pss[i]))
            # Track the feature-map size after pooling
            pk_h, pk_w = self._pair(pks[i])
            pst_h, pst_w = self._pair(pss[i])
            out_height = (out_height - pk_h) // pst_h + 1
            out_width = (out_width - pk_w) // pst_w + 1

            if out_height < 1 or out_width < 1:
                raise ValueError(
                    f"Feature map collapses to {out_height}x{out_width} after "
                    f"stage {i}; use a larger input or smaller kernels/strides"
                )

            # Add dropout if specified
            if dropout > 0:
                cnn.add_module(f'dropout{i}', nn.Dropout(dropout))

            # Update input channels for the next layer
            nIn = nOut

        # Save the CNN layers and output dimensions
        self.cnn = cnn
        self.height = out_height
        self.width = out_width

        # Define the fully connected layers
        self.fc1 = nn.Linear(nm[-1] * out_height * out_width, fc_sizes[0])
        self.fc2 = nn.Linear(fc_sizes[1], num_classes)
        self.fc_activation = fc_activation

    @staticmethod
    def _pair(value):
        """Return ``value`` as a ``(height, width)`` pair, duplicating a scalar."""
        try:
            first, second = value
        except TypeError:
            return value, value
        return first, second

    def forward(self, x):
        """Forward pass of the SchMichiganCNN model.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor of shape (batch_size, 1, height, width), where
            ``height`` and ``width`` match the values given at construction.

        Returns
        -------
        torch.Tensor
            Raw (un-normalized) scores of shape (batch_size, num_classes).
        """
        x = self.cnn(x)
        # Keep the batch dimension and flatten everything else for fc1.
        x = x.flatten(start_dim=1)
        x = self.fc_activation(self.fc1(x))
        x = self.fc2(x)
        return x

## Reset categorical labels if analyzing a bird subset

In [66]:
# Restrict the data set to the five most frequent primary labels.
categories = filtdata['primary_label'].value_counts().head(5).index.tolist()

# Rows of the cleaned metadata that belong to those species, and the
# distinct file names they cover.
filtdata_v2 = filtdata.loc[filtdata['primary_label'].isin(categories)]
common_labels = filtdata_v2['filename'].unique()

# Drop the first two path components so the entries can be compared with the
# metadata's 'filename' column.
files_v2 = ['/'.join(path.split('/')[2:]) for path in files]

# Boolean mask over the samples, applied to both inputs and targets.
bools = np.isin(files_v2, common_labels)
X2, y2 = X[bools], y[bools]

In [67]:
# The labels that survive the species filter are not consecutive integers,
# so re-index them to 0..K-1 (K = number of distinct remaining labels).
# np.unique returns the sorted distinct values and, with return_inverse=True,
# the position of every element of y2 in that sorted list -- which is exactly
# the new label. This replaces a per-element Python loop with one vectorized
# call.
unique_vals, inverse = np.unique(y2, return_inverse=True)

# Mapping from original label to new label, kept for later look-ups.
val_to_new = {old.item(): new for new, old in enumerate(unique_vals)}

# Remapped targets as a long tensor
y2_mapped = torch.as_tensor(inverse, dtype=torch.long)

## Make train-test split and one hot encoding

In [68]:
# Mini-batch size shared by the training and testing loaders.
batch_size = 200

# Cast the inputs to float and expand the integer class ids into one-hot rows.
X2 = X2.float()
num_classes = torch.unique(y2_mapped).numel()
y_onehot = F.one_hot(y2_mapped, num_classes)

# 80/20 split with a fixed seed, stratified on the integer labels so both
# parts keep the same class proportions.
X_train, X_test, y_train, y_test = train_test_split(
    X2,
    y_onehot,
    test_size=0.2,
    random_state=42,
    stratify=y2_mapped,
)

# Wrap each split in a dataset/loader pair; only the training loader shuffles.
train_data = TensorDataset(X_train, y_train)
test_data = TensorDataset(X_test, y_test)
train_loader = DataLoader(dataset=train_data, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(dataset=test_data, shuffle=False, batch_size=batch_size)

# Report how many samples landed in each split.
print(f"Training data size: {X_train.shape[0]}")
print(f"Testing data size: {X_test.shape[0]}")

Training data size: 16741
Testing data size: 4186


## Initialize the model

In [None]:
# Build the classifier with one output per distinct label in y2_mapped.
num_classes = y2_mapped.unique().numel()
cnn_model = SchMichiganCNN(
    num_classes,
    batchNormalization=False,
    fc_sizes=[32, 32],
    channel_sizes=[32, 32],
)
# To try the smaller baseline instead, use:
# cnn_model = SimpleCNN(num_classes,)

Total Conv2d parameters: 9568
Total Fully Connected (Linear) parameters: 98534

SchMichiganCNN(
  (cnn): Sequential(
    (conv0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (relu0): ReLU(inplace=True)
    (pooling0): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (dropout0): Dropout(p=0.5, inplace=False)
    (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (relu1): ReLU(inplace=True)
    (pooling1): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (dropout1): Dropout(p=0.5, inplace=False)
  )
  (fc1): Linear(in_features=3072, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=6, bias=True)
)


#### Report the size of the model

In [None]:
# Break the parameter count down by layer family. This relies on the
# parameter names used by SchMichiganCNN ('cnn.conv<i>.*', 'fc1.*', 'fc2.*'),
# so it is skipped for other models. isinstance (rather than an exact type
# comparison) also covers subclasses.
if isinstance(cnn_model, SchMichiganCNN):
    conv_params = 0
    fc_params = 0

    for name, param in cnn_model.named_parameters():
        if 'conv' in name:
            conv_params += param.numel()
        elif 'fc' in name:
            fc_params += param.numel()

    print(f"Total Conv2d parameters: {conv_params}")
    print(f"Total Fully Connected (Linear) parameters: {fc_params}")
    print()

# Always show the layer-by-layer summary of the model.
print(cnn_model)

## Set up training parameters

In [133]:
# Training hyper-parameters, loss function, and optimizer.

num_epochs = 10  # Number of full passes over the training loader

# Loss function. BCEWithLogitsLoss treats every output unit as an independent
# yes/no decision and expects float targets with the same shape as the
# logits, which is why the training loop casts the one-hot labels to float.
# CrossEntropyLoss is the usual alternative when each sample has exactly one
# class.
# criterion = nn.CrossEntropyLoss()  # Suitable for multi-class classification
criterion = nn.BCEWithLogitsLoss()  # Suitable for multi-label classification

# Optimizer that updates all model parameters
optimizer = optim.Adam(cnn_model.parameters(), lr=0.01)  # You can adjust the learning rate

# Alternatively, use SGD with momentum
# optimizer = optim.SGD(cnn_model.parameters(), lr=0.01, momentum=0.9)


## Perform training

In [134]:
# Optimize the network for `num_epochs` passes over the training loader.
for epoch in range(num_epochs):
    # Training mode keeps dropout (and batch norm, if present) active.
    cnn_model.train()
    batch_losses = []

    for inputs, labels in train_loader:
        # Clear the gradients left over from the previous step.
        optimizer.zero_grad()

        # Forward pass; the targets are cast to float for the criterion.
        outputs = cnn_model(inputs)
        loss = criterion(outputs, labels.float())

        # Back-propagate and update the weights.
        loss.backward()
        optimizer.step()

        # Record this batch's loss as a plain Python float.
        batch_losses.append(loss.item())

    # Report the loss averaged over the batches of this epoch.
    running_loss = sum(batch_losses, 0.0)
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(train_loader):.4f}")

Epoch 1/10, Loss: 0.4417
Epoch 2/10, Loss: 0.3832
Epoch 3/10, Loss: 0.3618
Epoch 4/10, Loss: 0.3410
Epoch 5/10, Loss: 0.3309
Epoch 6/10, Loss: 0.3200
Epoch 7/10, Loss: 0.3139
Epoch 8/10, Loss: 0.3099
Epoch 9/10, Loss: 0.3058
Epoch 10/10, Loss: 0.2969


## Evaluate the model

In [135]:
# Evaluate the model's accuracy on the training data
cnn_model.eval()  # Evaluation mode: dropout is disabled
correct1 = 0  # samples whose whole thresholded output equals the one-hot target
correct2 = 0  # samples whose true class is predicted positive
total1 = 0
total2 = 0

with torch.no_grad():  # Disable gradient computation for evaluation
    for inputs, labels in train_loader:

        # Forward pass
        outputs = cnn_model(inputs)

        # Each output unit is an independent yes/no decision: apply a sigmoid
        # and threshold at 0.5 (this is not an argmax over classes).
        probs = torch.sigmoid(outputs)
        predicted = (probs > 0.5).int()
        labels = labels.int()
        matches = predicted == labels

        # "All labels": every entry of the row must match the target.
        correct1 += matches.all(dim=1).sum().item()
        total1 += labels.size(0)
        # "Single label": only the position of the true class has to match.
        # .item() keeps the counter a plain Python number, like correct1,
        # instead of silently accumulating a 0-dim tensor.
        correct2 += (matches * labels).sum().item()
        total2 += labels.size(0)

# Calculate and print the accuracy
accuracy1 = correct1 / total1 * 100
accuracy2 = correct2 / total2 * 100
print("CNN model")
print(f"Training Accuracy (all labels): {accuracy1:.2f}%")
print(f"Training Accuracy (single label): {accuracy2:.2f}%")

CNN model
Training Accuracy (all labels): 41.65%
Training Accuracy (single label): 42.61%


In [136]:
# Evaluate the model's accuracy on the held-out testing data
cnn_model.eval()  # Evaluation mode: dropout is disabled
correct1 = 0  # samples whose whole thresholded output equals the one-hot target
correct2 = 0  # samples whose true class is predicted positive
total1 = 0
total2 = 0

with torch.no_grad():  # Disable gradient computation for evaluation
    for inputs, labels in test_loader:

        # Forward pass
        outputs = cnn_model(inputs)

        # Each output unit is an independent yes/no decision: apply a sigmoid
        # and threshold at 0.5 (this is not an argmax over classes).
        probs = torch.sigmoid(outputs)
        predicted = (probs > 0.5).int()
        labels = labels.int()
        matches = predicted == labels

        # "All labels": every entry of the row must match the target.
        correct1 += matches.all(dim=1).sum().item()
        total1 += labels.size(0)
        # "Single label": only the position of the true class has to match.
        # .item() keeps the counter a plain Python number, like correct1,
        # instead of silently accumulating a 0-dim tensor.
        correct2 += (matches * labels).sum().item()
        total2 += labels.size(0)

# Calculate and print the accuracy
accuracy1 = correct1 / total1 * 100
accuracy2 = correct2 / total2 * 100
print("CNN model")
print(f"Testing Accuracy (all labels): {accuracy1:.2f}%")
print(f"Testing Accuracy (single label): {accuracy2:.2f}%")

CNN model
Testing Accuracy (all labels): 42.47%
Testing Accuracy (single label): 43.43%
