In [38]:
from torch.utils.tensorboard import SummaryWriter

from torch.utils.data import Dataset
import torch.optim as optim
from torch.nn import init
import torch.nn as nn
import torchaudio
import torch

from optuna.trial import *
import optuna

from pathlib import Path
import pandas as pd
import soundfile  # read audio
import random

In [39]:
# pick the compute device for training: the first CUDA GPU when one is
# available, otherwise fall back to the CPU
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cpu


In [40]:
# Read the ESC-50 metadata, keep ten categories, and write train / validation /
# test metadata files plus a target-number -> category-name map.

# name of the original metadata file
metadata_file = "esc50.csv"

# seed for the validation/test shuffle so that re-running this cell reproduces
# the same split instead of silently rewriting the CSV files with a new one
SPLIT_SEED = 0

# chosen categories; the position in this list becomes the new target number
categories = [
    "airplane",
    "thunderstorm",
    "vacuum_cleaner",
    "cat",
    "chainsaw",
    "dog",
    "chirping_birds",
    "keyboard_typing",
    "fireworks",
    "church_bells",
]

# read metadata, add backslash to filename and filter categories
# NOTE: the backslash prefix is kept so the written CSV files stay compatible
# with code that appends the filename directly to the audio directory path
metadata = pd.read_csv(metadata_file)
metadata["filename"] = "\\" + metadata["filename"]
metadata = metadata[metadata["category"].isin(categories)].reset_index(drop=True)

# assign new target number to each category (single vectorised lookup)
target_by_category = {name: number for number, name in enumerate(categories)}
metadata["target"] = metadata["category"].map(target_by_category)

# take the 1+ folds as training data and the first fold as validation and test data
metadata_train = metadata[metadata["fold"] > 1].reset_index(drop=True)
metadata_val_test = metadata[metadata["fold"] == 1].reset_index(drop=True)

# per-category pieces of the validation and test metadata
validation_parts = []
test_parts = []

# shuffle the fold-1 items of each category once and give the first half to
# validation and the second half to test; with an odd count the single
# leftover item stays in metadata_val_test and is assigned to neither set
for category in categories:
    pool = metadata_val_test[metadata_val_test["category"] == category]
    shuffled = pool.sample(frac=1, random_state=SPLIT_SEED)
    n_pairs = len(shuffled) // 2
    validation_parts.append(shuffled.iloc[:n_pairs])
    test_parts.append(shuffled.iloc[n_pairs : 2 * n_pairs])
    metadata_val_test = metadata_val_test.drop(shuffled.index[: 2 * n_pairs])

# combine the per-category pieces into one dataframe each
metadata_validation = pd.concat(validation_parts)
metadata_test = pd.concat(test_parts)

# create category map where each target number has its corresponding category name
category_map = (
    metadata_test[["target", "category"]].drop_duplicates().reset_index(drop=True)
)

# save metadata and category map to csv (create the output folder if needed)
metadata_folder = Path("metadata")
metadata_folder.mkdir(parents=True, exist_ok=True)
metadata_train.to_csv(metadata_folder / "metadata_train.csv", index=False)
metadata_validation.to_csv(metadata_folder / "metadata_validation.csv", index=False)
metadata_test.to_csv(metadata_folder / "metadata_test.csv", index=False)
category_map.to_csv(metadata_folder / "category_map.csv", index=False)

In [41]:
# class with audio utility functions for loading, augmenting and transforming audio files
class AudioUtil:
    """Stateless helpers for loading audio, augmenting it and building spectrograms."""

    @staticmethod
    def open(audio_file):
        """
        Load an audio file
        Args:
            audio_file: path of the audio file, passed to torchaudio.load
        Returns:
            tuple (signal_tensor, sample_rate) as returned by torchaudio.load
        """
        signal_tensor, sample_rate = torchaudio.load(audio_file)
        return (signal_tensor, sample_rate)

    @staticmethod
    def time_shift(aud, shift_limit):
        """
        Circularly shift the signal along the time axis by a random amount
        Args:
            aud: tuple (signal, sample_rate); signal is a 2-D tensor whose last
                dimension is time
            shift_limit: upper bound of the shift as a fraction of the signal length
        Returns:
            tuple (shifted_signal, sample_rate); samples pushed past the end are
            wrapped around to the start
        """
        sig, sr = aud
        _, sig_len = sig.shape
        # random.random() is in [0, 1), so the shift is in [0, shift_limit * sig_len)
        shift_amt = int(random.random() * shift_limit * sig_len)
        # roll along the time axis only: without `dims` the tensor is flattened
        # first, which would move samples from one channel into the next
        return (sig.roll(shift_amt, dims=-1), sr)

    @staticmethod
    def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
        """
        Generate a Mel spectrogram in decibels
        Args:
            aud: tuple (signal, sample_rate)
            n_mels: number of Mel bands
            n_fft: FFT size
            hop_len: hop length forwarded to MelSpectrogram (None = library default)
        Returns:
            spec: decibel-scaled Mel spectrogram
        """
        sig, sr = aud
        top_db = 80

        # spec has shape [channel, n_mels, time], where channel is mono, stereo etc
        spec = torchaudio.transforms.MelSpectrogram(
            sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels
        )(sig)

        # Convert to decibels
        spec = torchaudio.transforms.AmplitudeToDB(top_db=top_db)(spec)
        return spec

    @staticmethod
    def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        """
        Augment a spectrogram by masking frequency bands and time steps
        Args:
            spec: spectrogram tensor with three dimensions (unpacked below as
                channel, n_mels, n_steps)
            max_mask_pct: maximum mask width as a fraction of the masked dimension
            n_freq_masks: number of frequency masks (horizontal bars) to apply
            n_time_masks: number of time masks (vertical bars) to apply
        Returns:
            aug_spec: masked spectrogram; masked sections hold the mean of `spec`
        """
        _, n_mels, n_steps = spec.shape
        # masked regions are filled with the mean of the unmasked spectrogram
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_masks):
            aug_spec = torchaudio.transforms.FrequencyMasking(freq_mask_param)(
                aug_spec, mask_value
            )

        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = torchaudio.transforms.TimeMasking(time_mask_param)(
                aug_spec, mask_value
            )

        return aug_spec

In [42]:
# class for creating a dataset from the metadata files
# ----------------------------
# Sound Dataset
# ----------------------------
class SoundDS(Dataset):
    """Dataset yielding (spectrogram, class id) pairs described by a metadata frame."""

    def __init__(self, df, data_path, augmentation, shift_pct=0.4):
        """
        Args:
            df: dataframe with a "filename" and a "target" column
            data_path: directory that contains the audio files
            augmentation: if truthy, apply time shift and spectrogram masking
            shift_pct: maximum time shift as a fraction of the signal length
        """
        self.df = df
        self.augmentation = augmentation
        self.data_path = str(data_path)
        self.shift_pct = shift_pct

    # ----------------------------
    # Number of items in dataset
    # ----------------------------
    def __len__(self):
        return len(self.df)

    # ----------------------------
    # Path of the i'th audio file
    # ----------------------------
    def _audio_path(self, idx):
        # Metadata may store filenames with a leading path separator (e.g. "\\name.wav");
        # strip it so joining works on every OS and never yields a rooted path
        relative_name = str(self.df["filename"].iloc[idx]).lstrip("\\/")
        return str(Path(self.data_path) / relative_name)

    # ----------------------------
    # Get i'th item in dataset
    # ----------------------------
    def __getitem__(self, idx):
        # idx is a position (0 .. len - 1), so use positional lookups; this also
        # works when the dataframe does not carry a default 0..n-1 index
        audio_file = self._audio_path(idx)
        # Get the Class ID
        class_id = self.df["target"].iloc[idx]

        aud = AudioUtil.open(audio_file)

        # waveform-level augmentation happens before the spectrogram is computed
        if self.augmentation:
            aud = AudioUtil.time_shift(aud, self.shift_pct)

        sgram = AudioUtil.spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None)

        # spectrogram-level augmentation: two frequency masks and two time masks
        if self.augmentation:
            sgram = AudioUtil.spectro_augment(
                sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2
            )

        return sgram, class_id

In [43]:
# class with neural network model architecture
class AudioClassifier(nn.Module):
    """Four strided convolution blocks followed by global average pooling and a linear layer."""

    # ----------------------------
    # Build the model architecture
    # ----------------------------
    def __init__(self, num_classes=10):
        """
        Args:
            num_classes: number of output logits of the linear classifier
        """
        super().__init__()

        # (in_channels, out_channels, kernel_size, padding) of each convolution
        # block; every block uses stride 2
        block_specs = [
            (1, 8, 5, 2),
            (8, 16, 3, 1),
            (16, 32, 3, 1),
            (32, 64, 3, 1),
        ]

        conv_layers = []
        for number, (in_ch, out_ch, kernel, pad) in enumerate(block_specs, start=1):
            # Convolution Block with Relu and Batch Norm. Use Kaiming Initialization
            conv = nn.Conv2d(
                in_ch,
                out_ch,
                kernel_size=(kernel, kernel),
                stride=(2, 2),
                padding=(pad, pad),
            )
            relu = nn.ReLU()
            bn = nn.BatchNorm2d(out_ch)
            init.kaiming_normal_(conv.weight, a=0.1)
            conv.bias.data.zero_()

            # register the layers as conv1/relu1/bn1 ... conv4/relu4/bn4 so the
            # attribute names and state_dict keys stay stable
            setattr(self, f"conv{number}", conv)
            setattr(self, f"relu{number}", relu)
            setattr(self, f"bn{number}", bn)
            conv_layers += [conv, relu, bn]

        # Linear Classifier
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=block_specs[-1][1], out_features=num_classes)

        # Wrap the Convolutional Blocks
        self.conv = nn.Sequential(*conv_layers)

    # ----------------------------
    # Forward pass computations
    # ----------------------------
    def forward(self, x):
        """
        Args:
            x: input batch with one channel (the first convolution has
                in_channels=1), i.e. shape [batch, 1, height, width]
        Returns:
            logits of shape [batch, num_classes]
        """
        # Run the convolutional blocks
        x = self.conv(x)

        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        # Linear layer
        x = self.lin(x)

        # Final output
        return x

In [44]:
def create_data_loader_train_val(batchsize):
    """
    Build the data loaders used during training
    Args:
        batchsize: number of items per batch for both loaders
    Returns:
        train_dl: shuffled loader over the augmented training set
        validation_dl: unshuffled loader over the unaugmented validation set
    """
    # locations of the audio files and of the metadata csv files
    audio_dir = Path("audio")
    csv_dir = Path("metadata")

    # only the filename and the target column are needed by the dataset
    wanted_columns = ["filename", "target"]
    train_frame = pd.read_csv(csv_dir / "metadata_train.csv")[wanted_columns]
    validation_frame = pd.read_csv(csv_dir / "metadata_validation.csv")[
        wanted_columns
    ]

    # training items are augmented, validation items are not
    train_set = SoundDS(train_frame, audio_dir, augmentation=True)
    validation_set = SoundDS(validation_frame, audio_dir, augmentation=False)

    # training batches are drawn in random order, validation batches in file order
    train_dl = torch.utils.data.DataLoader(
        train_set, batch_size=batchsize, shuffle=True
    )
    validation_dl = torch.utils.data.DataLoader(
        validation_set, batch_size=batchsize, shuffle=False
    )

    return train_dl, validation_dl

In [45]:
def checkpoint(epoch, model_state_dict, optimizer_state_dict, loss, filename):
    """
    Save model checkpoint
    Args:
        epoch: current epoch
        model_state_dict: state of the model
        optimizer_state_dict: state of the optimizer
        loss: loss of the model
        filename: name of the file to save the checkpoint
    The file is written to the "checkpoints" folder (relative to the current
    working directory); the folder is created when it does not exist yet.
    """
    checkpoints_folder = Path("checkpoints")
    # torch.save does not create missing parent directories
    checkpoints_folder.mkdir(parents=True, exist_ok=True)
    torch.save(
        {
            "epoch": epoch,
            "model_state_dict": model_state_dict,
            "optimizer_state_dict": optimizer_state_dict,
            "loss": loss,
        },
        checkpoints_folder / filename,
    )

In [46]:
def test_accuracy(net, dataloader):
    """
    Calculate accuracy on test or validation data
    Args:
        net: model to test
        dataloader: data loader for test or validation data
    Returns:
        acc: accuracy on test or validation data
        correct_per_class: dictionary with number of correct predictions per class
    """
    # initialize variables
    correct_prediction = 0
    total_prediction = 0
    correct_per_class = {}

    # set model to evaluation mode (inference mode)
    net.eval()

    # do not calculate gradients during inference
    with torch.no_grad():
        # iterate over test or validation data loader and calculate accuracy
        for data in dataloader:
            # get inputs and labels from data loader
            inputs, labels = data[0].to(device), data[1].to(device)

            # normalize inputs
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s

            # forward pass
            outputs = net(inputs)

            # get predictions and update correct predictions and total predictions
            _, prediction = torch.max(outputs, 1)
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]

            # update correct predictions per class
            for item, pred in zip(labels, prediction):
                item = item.item()
                pred = pred.item()

                if item in correct_per_class:
                    correct_per_class[item] += int(pred == item)
                else:
                    correct_per_class[item] = int(pred == item)

    # calculate accuracy
    acc = correct_prediction / total_prediction

    return acc, correct_per_class

In [47]:
def training_optimizing(net, trial, config, epochs):
    """
    Train and optimize model
    Args:
        net: model to train and optimize
        trial: optuna trial
        config: dictionary with hyperparameters; "lr" (maximum learning rate of
            the one-cycle schedule), "optimizer" (optimizer instance already
            bound to the parameters of net) and "batch_size"
        epochs: number of epochs to train the model
    Returns:
        max_trial_accuracy: highest validation accuracy reached in any epoch
    Raises:
        optuna.exceptions.TrialPruned: when the trial reports that it should be pruned
    Side effects:
        writes "conv_net_checkpoint.pth" after every epoch and
        "conv_net_checkpoint_best.pth" whenever the module-level
        max_val_accuracy is exceeded (that variable is updated as well);
        logs losses and accuracies to tensorboard
    """
    # best validation accuracy over all trials is kept in a module-level variable;
    # start it at 0 when it has not been defined yet (e.g. fresh kernel)
    global max_val_accuracy
    globals().setdefault("max_val_accuracy", 0)

    # criterion for calculating loss function
    # CrossEntropyLoss is used for classification problems with multiple classes
    criterion = nn.CrossEntropyLoss()

    # learning rate, optimizer and batch size from config dictionary with hyperparameters
    lr = config["lr"]
    optimizer = config["optimizer"]
    batch_size = config["batch_size"]

    # create data loaders for training and validation data
    train_dl, validation_dl = create_data_loader_train_val(batch_size)

    # create scheduler for learning rate decay
    # the schedule spans epochs * len(train_dl) steps, one per training batch
    scheduler = torch.optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=lr,
        steps_per_epoch=int(len(train_dl)),
        epochs=epochs,
        anneal_strategy="linear",
    )

    # create tensorboard writer for logging and create variable for maximum validation accuracy
    writer = SummaryWriter()
    max_trial_accuracy = 0

    # the finally clause closes the writer on normal completion, on pruning
    # and on any unexpected error
    try:
        # Repeat for each epoch
        for epoch in range(epochs):
            # create variable for running loss
            running_loss = 0.0

            # set the model to training mode
            net.train()

            # Repeat for each batch in the training set
            for data in train_dl:
                # Get the input features and target labels, and put them on the GPU
                inputs, labels = data[0].to(device), data[1].to(device)

                # Normalize the inputs
                inputs_m, inputs_s = inputs.mean(), inputs.std()
                inputs = (inputs - inputs_m) / inputs_s

                # Zero the parameter gradients
                optimizer.zero_grad()

                # forward + backward + optimize
                outputs = net(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                scheduler.step()

                # Keep stats for Loss and Accuracy
                running_loss += loss.item()

            # Calculate average loss over an epoch
            num_batches = len(train_dl)
            avg_loss_train = running_loss / num_batches

            # set model to evaluation mode (inference mode)
            net.eval()

            # create variables for validation loss and number of steps
            val_loss_validation = 0.0
            val_steps_validation = 0

            # calculate validation loss and number of steps for validation data without calculating gradients
            with torch.no_grad():
                for data in validation_dl:
                    inputs, labels = data
                    inputs, labels = inputs.to(device), labels.to(device)

                    inputs_m, inputs_s = inputs.mean(), inputs.std()
                    inputs = (inputs - inputs_m) / inputs_s

                    outputs = net(inputs)

                    val_loss_validation += criterion(outputs, labels).item()
                    val_steps_validation += 1

            # mean validation loss of this epoch as a plain float; this is the
            # value stored in the checkpoints (not a device-bound tensor)
            avg_loss_validation = val_loss_validation / val_steps_validation

            # save checkpoint for each epoch
            checkpoint(
                epoch,
                net.state_dict(),
                optimizer.state_dict(),
                avg_loss_validation,
                "conv_net_checkpoint.pth",
            )

            # Calculate accuracy on validation data
            val_accuracy = test_accuracy(net, validation_dl)[0]
            trial.report(val_accuracy, epoch)

            # update maximum trial accuracy
            if val_accuracy > max_trial_accuracy:
                max_trial_accuracy = val_accuracy

            # check if validation accuracy is greater than maximum validation accuracy
            # if yes, update maximum validation accuracy and save checkpoint
            if val_accuracy > max_val_accuracy:
                max_val_accuracy = val_accuracy
                checkpoint(
                    epoch,
                    net.state_dict(),
                    optimizer.state_dict(),
                    avg_loss_validation,
                    "conv_net_checkpoint_best.pth",
                )

            # write to tensorboard
            writer.add_scalar("Loss/train", avg_loss_train, epoch + 1)
            writer.add_scalar("Loss/validation", avg_loss_validation, epoch + 1)
            # NOTE: train_dl applies augmentation, so this accuracy is measured
            # on augmented training items
            writer.add_scalar(
                "Accuracy/train", test_accuracy(net, train_dl)[0], epoch + 1
            )
            writer.add_scalar("Accuracy/validation", val_accuracy, epoch + 1)

            # Handle pruning based on the intermediate value.
            if trial.should_prune():
                raise optuna.exceptions.TrialPruned()
    finally:
        # flush and close tensorboard writer
        writer.flush()
        writer.close()

    return max_trial_accuracy

In [48]:
def objective(trial):
    """
    Objective function for optuna: train a fresh model with sampled hyperparameters
    Args:
        trial: optuna trial used to sample the hyperparameters
    Returns:
        value returned by training_optimizing for this trial
    """
    # number of epochs to train the model
    n_epochs = 100

    # fresh model for every trial, moved to the detected device
    model = AudioClassifier()
    model.to(device)

    # hyperparameters to optimize: optimizer type, learning rate (log scale), batch size
    chosen_optimizer = trial.suggest_categorical(
        "optimizer", ["Adam", "SGD", "RMSprop"]
    )
    learning_rate = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    chosen_batch_size = trial.suggest_categorical("batch_size", [8, 16, 32, 64])

    # look up the optimizer class by name in torch.optim and bind it to the model
    optimizer_cls = getattr(optim, chosen_optimizer)
    hyperparameters = {
        "optimizer": optimizer_cls(model.parameters(), lr=learning_rate),
        "lr": learning_rate,
        "batch_size": chosen_batch_size,
    }

    # train and hand the resulting validation accuracy back to optuna
    return training_optimizing(model, trial, hyperparameters, n_epochs)

In [49]:
# best validation accuracy seen so far across all trials; training_optimizing
# reads and updates this module-level variable to decide when to write
# "conv_net_checkpoint_best.pth"
max_val_accuracy = 0

# start optuna study and optimize hyperparameters
# the objective returns a validation accuracy, hence direction="maximize";
# no sampler or pruner is passed, so optuna's defaults are used
study = optuna.create_study(direction="maximize")
# run 30 trials of the objective and show a progress bar while doing so
study.optimize(objective, n_trials=30, show_progress_bar=True)

[I 2023-12-11 12:31:01,474] A new study created in memory with name: no-name-857ca2ff-2077-4410-9a5b-7184a6589db2


  0%|          | 0/30 [00:00<?, ?it/s]

[I 2023-12-11 12:40:20,381] Trial 0 finished with value: 0.9 and parameters: {'optimizer': 'RMSprop', 'lr': 0.00016878998103766953, 'batch_size': 16}. Best is trial 0 with value: 0.9.
[I 2023-12-11 12:53:13,860] Trial 1 finished with value: 0.75 and parameters: {'optimizer': 'SGD', 'lr': 0.002769275394997339, 'batch_size': 8}. Best is trial 0 with value: 0.9.
[I 2023-12-11 13:14:24,362] Trial 2 finished with value: 0.925 and parameters: {'optimizer': 'RMSprop', 'lr': 0.001683849132799046, 'batch_size': 32}. Best is trial 2 with value: 0.925.
[I 2023-12-11 13:37:54,748] Trial 3 finished with value: 0.85 and parameters: {'optimizer': 'Adam', 'lr': 0.0015251613028135665, 'batch_size': 8}. Best is trial 2 with value: 0.925.
[I 2023-12-11 14:03:23,235] Trial 4 finished with value: 0.7 and parameters: {'optimizer': 'Adam', 'lr': 0.00022989970681060707, 'batch_size': 8}. Best is trial 2 with value: 0.925.
[I 2023-12-11 14:24:17,419] Trial 5 finished with value: 0.825 and parameters: {'optimiz

In [50]:
# get number of pruned and complete trials from study and print them
# TrialState is referenced through the optuna package so this cell does not
# depend on names brought in by a wildcard import
pruned_trials = study.get_trials(
    deepcopy=False, states=[optuna.trial.TrialState.PRUNED]
)
complete_trials = study.get_trials(
    deepcopy=False, states=[optuna.trial.TrialState.COMPLETE]
)

print("Study statistics: ")
print("  Number of finished trials: ", len(study.trials))
print("  Number of pruned trials: ", len(pruned_trials))
print("  Number of complete trials: ", len(complete_trials))

Study statistics: 
  Number of finished trials:  30
  Number of pruned trials:  18
  Number of complete trials:  12


In [51]:
# print best trial and its parameters
print("Best trial:")
# trial with the best objective value in the study (the study was created
# with direction="maximize", so this is the highest returned accuracy)
trial = study.best_trial

# objective value of the best trial
print("  Value: ", trial.value)

# hyperparameters sampled for the best trial, one "name: value" line each
print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

Best trial:
  Value:  0.925
  Params: 
    optimizer: RMSprop
    lr: 0.001683849132799046
    batch_size: 32
