This notebook contains the first version of the CNN.
It implements a simple convolutional network that serves as a baseline for more sophisticated models.

In [None]:
# Set path variables
import os
import sys

# The notebook is expected to run from a sub-directory of the project, so the
# project root is the parent of the current working directory.
cwd = os.getcwd()
project_dir = os.path.abspath(os.path.join(cwd, os.pardir))

# Make the project's packages importable. The guard keeps the cell idempotent:
# re-running it must not pile up duplicate entries in sys.path.
if project_dir not in sys.path:
    sys.path.append(project_dir)

# The trailing separator is kept on purpose: later cells build file paths by
# plain string concatenation (data_path + '<file name>').
data_path = os.path.join(project_dir, 'data/')
print(project_dir)
print(data_path)

In [None]:
# for data loading process
from src.data_loader import *
import pandas as pd

# load your libraries here
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from tqdm import tqdm

# Modeling (Adjust to whatever model you want to do)

## Data loading

In [None]:
# Read the label annotation CSV of each split
def _read_labels(csv_name):
    """Load one annotation CSV from data_path and renumber its rows from 0."""
    frame = pd.read_csv(data_path + csv_name, index_col=0)
    return frame.reset_index(drop=True)


train_annotations = _read_labels('mtat_train_label.csv')
val_annotations = _read_labels('mtat_val_label.csv')
test_annotations = _read_labels('mtat_test_label.csv')

### FOR TRANSFORMED AUDIO DATA (mel spectrograms with db)

Set the `transformation` parameter to `MEL_SPEC_DB_TRANSFORMATION`.

In [None]:
# Global parameters shared by all dataset splits
DATA_DIR = data_path
SAMPLE_RATE = 16000
DURATION_IN_SEC = 30
MEL_SPEC_DB_TRANSFORMATION = AudioUtil.get_audio_transforms(SAMPLE_RATE)

# One dataset per split; only the annotation table differs between them.
train_data = AudioDS(annotations_file=train_annotations,
                     data_dir=DATA_DIR,
                     target_sample_rate=SAMPLE_RATE,
                     target_length=DURATION_IN_SEC,
                     transformation=MEL_SPEC_DB_TRANSFORMATION)

val_data = AudioDS(annotations_file=val_annotations,
                   data_dir=DATA_DIR,
                   target_sample_rate=SAMPLE_RATE,
                   target_length=DURATION_IN_SEC,
                   transformation=MEL_SPEC_DB_TRANSFORMATION)

# The test split must use the test annotations. It was previously built from
# val_annotations, which made every "test" result a second validation run.
test_data = AudioDS(annotations_file=test_annotations,
                    data_dir=DATA_DIR,
                    target_sample_rate=SAMPLE_RATE,
                    target_length=DURATION_IN_SEC,
                    transformation=MEL_SPEC_DB_TRANSFORMATION)

In [None]:
# Wrap each dataset split in a DataLoader that yields mini-batches
BATCH_SIZE = 64

# Only the training split is shuffled; validation and test keep dataset order.
train_dataloader_melspec = DataLoader(
    dataset=train_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
val_dataloader_melspec = DataLoader(
    dataset=val_data,
    batch_size=BATCH_SIZE,
    shuffle=False,
)
test_dataloader_melspec = DataLoader(
    dataset=test_data,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

In [None]:
# Pull a single batch from the training loader and report its tensor sizes
first_batch = next(iter(train_dataloader_melspec))
train_features, train_labels = first_batch

print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")

In [None]:
# Retrieve a sample.
# The item is read straight from the dataset so that the spectrogram, the label
# and the file path all belong to the same recording. The training DataLoader
# is created with shuffle=True, so row `idx` of a batch is in general NOT
# dataset item `idx`; mixing the two printed a path that did not match the label.
idx = 9
melspec, label = train_data[idx]

# as_tensor is a no-op for tensors and converts array-like items, so both values
# have the same type they would have as a row of a collated batch.
melspec = torch.as_tensor(melspec)
label = torch.as_tensor(label)

decoded_labels = train_data.decode_labels(label)
# NOTE(review): assumes get_filepath expects a dataset index — confirm in AudioDS.
file_path = train_data.get_filepath(idx)

print(f"Audio file path: {file_path}")
print(f"Label: {label}")
print(f"Decoded labels: {decoded_labels}")

In [None]:
# Shape of the selected mel spectrogram
# (NOTE(review): axis meaning below is taken from the original notes — confirm against AudioDS)
# first dimension: number of channels (1 - mono, 2 - stereo)
# second dimension: number of mel frequency bands
# third dimension: number of time frames in the spectrogram
melspec.shape

## Baseline CNN

### CNN Class

In [None]:
# Choose the compute device: CUDA first, then Apple MPS, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

In [None]:
class SimpleCNN(nn.Module):
    """Baseline CNN: three conv + ReLU + max-pool stages followed by two linear layers.

    Args:
        input_shape: ``(height, width)`` of the single-channel input the network
            is built for. Each of the three 2x2 pooling stages halves (rounding
            down) both dimensions, so the flattened feature size is
            ``64 * (height // 8) * (width // 8)``. The default ``(64, 3000)``
            yields ``64 * 8 * 375``, the size that used to be hard-coded.
        num_classes: Number of output logits. Defaults to 50.

    Raises:
        ValueError: If ``input_shape`` is too small to survive three poolings.
    """

    def __init__(self, input_shape=(64, 3000), num_classes=50):
        super().__init__()
        # Convolutional layers (3x3 kernels with padding 1 keep the spatial size)
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)

        # Max pooling (halves height and width, rounding down)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        # Size of the feature map after the three conv/pool stages
        height, width = input_shape
        flat_features = 64 * (height // 8) * (width // 8)
        if flat_features <= 0:
            raise ValueError(
                f"input_shape {tuple(input_shape)} is too small: both dimensions "
                "must be at least 8 to survive three 2x2 poolings"
            )

        # Fully connected layers
        self.fc1 = nn.Linear(flat_features, 500)
        self.fc2 = nn.Linear(500, num_classes)

    def forward(self, x):
        """Return class logits for a batch of single-channel inputs.

        Args:
            x: Tensor of shape ``(batch, 1, height, width)``. An unbatched
                ``(1, height, width)`` tensor is treated as a batch of one.

        Returns:
            Tensor of shape ``(batch, num_classes)`` with unnormalised logits.

        Raises:
            ValueError: If the input size does not match the ``input_shape``
                the network was built for.
        """
        # Treat a single unbatched sample as a batch of one
        if x.dim() == 3:
            x = x.unsqueeze(0)

        # Sequence of convolutional and max pooling layers
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))

        # Flatten everything except the batch dimension. Unlike view(-1, N),
        # this can never move surplus elements into the batch dimension when
        # the input size is not the expected one.
        x = x.flatten(start_dim=1)
        if x.shape[1] != self.fc1.in_features:
            raise ValueError(
                f"Expected {self.fc1.in_features} features per sample after the "
                f"convolutional stages but got {x.shape[1]}; the input size does "
                "not match the input_shape this model was built with"
            )

        # Fully connected layers
        x = F.relu(self.fc1(x))
        return self.fc2(x)


### Instantiation

In [None]:
# Build the baseline network and place its parameters on the selected device
model = SimpleCNN().to(device)

# Loss and optimizer used by the training / evaluation loops.
# NOTE(review): the loops below reduce boolean label vectors to one class index
# with argmax before applying this loss; if a clip can carry several tags at
# once, a multi-label loss may be the better fit — confirm.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

## Training and Validation

### Training Loop

In [None]:
def train(dataloader, model, loss_fn, optimizer, device):
    """Run one training epoch over ``dataloader`` and update ``model`` in place.

    Args:
        dataloader: Iterable of batches whose first two elements are the input
            features and the labels.
        model: Network to optimise; it is switched to training mode.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar tensor.
        optimizer: Optimizer that holds the parameters of ``model``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        float: Mean of the per-batch losses over the epoch (``0.0`` when the
        dataloader yields no batch).
    """
    model.train()  # Set the model to training mode
    running_loss = 0.0
    num_batches = 0
    progress_bar = tqdm(enumerate(dataloader), total=len(dataloader), desc="Training")

    for i, data in progress_bar:
        # Get the input features and labels from the data loader
        inputs, labels = data[0].to(device), data[1].to(device)

        # Boolean label vectors are reduced to the index of their first True
        # entry, giving the class-index targets the loss is called with.
        if labels.dtype == torch.bool:
            labels = torch.argmax(labels.type(torch.long), dim=1)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Update running loss
        running_loss += loss.item()
        num_batches = i + 1

        # Show the running mean loss in the progress bar
        progress_bar.set_postfix({"loss": running_loss / num_batches})

    # Hand the epoch loss back so callers can log or plot it
    return running_loss / num_batches if num_batches else 0.0

### Validation Loop

In [None]:
def validate(dataloader, model, loss_fn, device):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and average loss.

    Args:
        dataloader: DataLoader yielding ``(features, labels)`` batches.
        model: Network to evaluate; it is switched to evaluation mode.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
            Assumed to return the mean loss of the batch (the default
            reduction of ``nn.CrossEntropyLoss``).
        device: Device the batches are moved to.

    Returns:
        tuple[float, float]: ``(average loss per batch, accuracy in [0, 1])``.
    """
    model.eval()  # Set the model to evaluation mode
    total_loss, correct = 0.0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)

            # Boolean label vectors are reduced to the index of their first
            # True entry, giving class-index targets.
            if y.dtype == torch.bool:
                y = torch.argmax(y.type(torch.long), dim=1)

            pred = model(X)
            total_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    # Each accumulated loss is already a batch mean, so the sum is averaged over
    # the number of batches. Dividing it by the number of samples (as before)
    # under-reports the loss by roughly a factor of the batch size.
    num_batches = len(dataloader)
    num_samples = len(dataloader.dataset)
    val_loss = total_loss / num_batches
    accuracy = correct / num_samples
    print(f"Validation Error: \n Accuracy: {(100*accuracy):>0.1f}%, Avg loss: {val_loss:>8f} \n")
    return val_loss, accuracy

### Run Training and Validation

In [None]:
# Alternate one training pass and one validation pass per epoch
num_epochs = 10

for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")

    # Optimise on the training split
    train(
        dataloader=train_dataloader_melspec,
        model=model,
        loss_fn=criterion,
        optimizer=optimizer,
        device=device,
    )

    # Measure progress on the validation split
    validate(
        dataloader=val_dataloader_melspec,
        model=model,
        loss_fn=criterion,
        device=device,
    )

    # Checkpoint the weights reached at the end of this epoch
    checkpoint_name = f"model_epoch_{epoch}.pth"
    torch.save(model.state_dict(), checkpoint_name)

print("Training and validation done!")

### Test Loop

In [None]:
def test(dataloader, model, loss_fn, device):
    model.eval()  # Set the model to evaluation mode
    test_loss, correct = 0, 0
    with torch.no_grad():  # No need to track gradients
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)

            # Convert boolean labels to class indices if necessary
            if y.dtype == torch.bool:
                y = y.type(torch.long)
                y = torch.argmax(y, dim=1)

            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= len(dataloader.dataset)
    correct /= len(dataloader.dataset)
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

### Run Testing

In [None]:
# Evaluate the final trained model once on the held-out test split.
# The model is in eval mode and the test loader is not shuffled, so the result
# is deterministic: repeating the call once per training epoch only printed the
# same numbers under misleading "Epoch" headers.
test(test_dataloader_melspec, model, criterion, device)

print("Testing done!")