# Audio Event Classification

The aim of this project is to build a working neural network model that classifies audio events, with particular regard to crime incidents.

The datasets used to train the network were provided by the teacher. The [raw dataset](https://www.kaggle.com/datasets/afisarsy/raw-audio-of-accident-and-crime-detection) contains plain samples of audio events, while the [enhanced dataset](https://www.kaggle.com/datasets/afisarsy/enhanced-audio-of-accident-and-crime-detection) contains the same samples mixed with background noise (wind, thunderstorm, rain and road traffic).

In total, the final model needs to distinguish between 13 different audio classes, which are the following:
* car_crash
* conversation
* engine_idling
* gun_shot
* jambret
* maling
* rain
* rampok
* road_traffic
* scream
* thunderstorm
* tolong
* wind

## Structure, Network and Framework
The project is meant to be run as individual Python scripts, each launched from the command line with its own set of parameters. For the presentation of the project, a Jupyter Notebook was assembled listing the content of each script.
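
As a rough illustration of how a script invocation maps onto parameters, the sketch below rebuilds a subset of the `train.py` argument parser and parses a hand-written argument list instead of the real command line. The `raw_audio` directory name is only a placeholder, and `build_parser` is a helper introduced here for the example.

```python
import argparse

def build_parser():
    # Subset of the flags defined in train.py, with the same defaults
    parser = argparse.ArgumentParser(description="Audio Event Classification training")
    parser.add_argument('--src-dir', type=str, required=True)
    parser.add_argument('--batch-size', type=int, default=16)
    parser.add_argument('--frame-size', type=float, default=1.0)
    parser.add_argument('--hop-size', type=float, default=0.5)
    parser.add_argument('--epochs', type=int, default=16)
    return parser

# Equivalent to running: python train.py --src-dir raw_audio --epochs 4
args = build_parser().parse_args(['--src-dir', 'raw_audio', '--epochs', '4'])

# argparse turns '--src-dir' into the attribute 'src_dir'
print(args.src_dir, args.batch_size, args.frame_size, args.hop_size, args.epochs)
```

Flags that are not given on the command line fall back to their defaults, so only `--src-dir` is mandatory.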

The network adopted is an LSTM RNN implemented using PyTorch.

## Bibliography and external sources
Most of the project had to be tailored to the specific dataset and to the available system resources, so most of the code is original. Some parts, however, build on existing code, which served as a working basis for the other modules.

In particular, the following sources were used for inspiration:
* [seth814/Audio-Classification](https://github.com/seth814/Audio-Classification), for the structure of the LSTM network
* [Audio Classification Starter](https://www.kaggle.com/code/aayush9753/audio-classification-starter-in-pytorch/notebook), for the use of Datasets and DataLoaders
* [Audio Classification with LSTM](https://www.kaggle.com/code/kvpratama/audio-classification-with-lstm-and-torchaudio), for the training and testing functions

# AudioEventDataset.py

A PyTorch Dataset is a class that handles the retrieval of data samples. It allows the processing of the data to be customized to one's specific needs.

In our case, the AudioEventDataset class (which extends the generic Dataset parent class provided by PyTorch) takes as input a source directory, a frame size and a hop size (both in seconds), and a sampling rate.
The method in charge of retrieving and pre-processing the audio samples is `__getitem__()`, which loads the audio file at a given index, chunks it into several frames and returns a framed audio tensor together with its labels. Each label is a *one-hot encoded* vector, repeated once per frame.
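
The framing rule can be sketched without any audio library. The snippet below is a simplified, pure-Python illustration, not the class itself: `frame_bounds` and `one_hot` are hypothetical helpers, and the 10 Hz sampling rate is a toy value chosen to keep the numbers small. It follows the same rule as the dataset code: a trailing frame is zero-padded if it holds at least half a frame of audio, and dropped otherwise.

```python
def frame_bounds(n_samples, frame_size, hop_size, sample_rate):
    """Return (start, end, pad) triples, in samples, one per frame."""
    frame_len = int(frame_size * sample_rate)
    hop_len = int(hop_size * sample_rate) if hop_size else frame_len
    bounds = []
    for start in range(0, n_samples, hop_len):
        end = min(start + frame_len, n_samples)
        available = end - start
        if available < frame_len and available < frame_len / 2:
            break  # tail shorter than half a frame: drop it and stop
        bounds.append((start, end, frame_len - available))  # pad = missing samples
    return bounds

def one_hot(index, n_classes):
    """Build a one-hot vector as a plain list."""
    return [1.0 if i == index else 0.0 for i in range(n_classes)]

# Toy signal: 2.2 s at 10 Hz (22 samples), 1 s frames with a 0.5 s hop
bounds = frame_bounds(22, 1.0, 0.5, 10)

# Every frame of a file carries the same label; index 3 is used as an example
labels = [one_hot(3, 13) for _ in bounds]

print(bounds)
print(len(labels), labels[0])
```

With these toy values the last kept frame covers samples 15 to 22 and needs 3 samples of padding, while the 2-sample remainder starting at sample 20 is discarded.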

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset
import torchaudio
import os
from sklearn.preprocessing import LabelEncoder
from glob import glob
import numpy as np

class AudioEventDataset(Dataset):

    def __init__(self, src_dir, frame_size, hop_size, sample_rate):
        super().__init__()
        self.src_dir = src_dir
        self.frame_size = frame_size
        self.hop_size = hop_size
        self.sample_rate = sample_rate

        audio_paths_absolute = [x for x in glob('{}/**'.format(src_dir), recursive = True) if '.wav' in x] # look for all files within src_dir
        audio_paths = []
        for path in audio_paths_absolute:
            audio_paths.append(os.path.relpath(path, src_dir)) # append just the relative path

        self.paths = audio_paths

        self.classes = sorted(list(set( [x.split('/')[0] for x in self.paths] )))
        self.n_classes = len(self.classes)

        self.label_encoder = LabelEncoder()
        self.label_encoder.fit(self.classes) # use an encoder to convert labels to integers

        print(f"Total samples found: {len(self)}\n")
        print(f"Total classes found: {self.n_classes}")
        for class_item in self.classes:
            print(f"    - {class_item}")
        print('\n')

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, index):

        path = self.paths[index]

        audio_data, _ = torchaudio.load(os.path.join(self.src_dir, path), normalize = True) # load and normalize the audio file
        framed_audio_data = AudioEventDataset.frame_audio_overlap(audio_data, self.frame_size, self.sample_rate, self.hop_size) # chunk it into several frames
        label = path.split('/')[0]

        label = self.label_encoder.transform([label])[0]
        labels = np.array([np.eye(self.n_classes, dtype = 'float')[label]]) # create a one-hot encoded label array

        labels = np.repeat(labels, framed_audio_data.size()[0], axis = 0) # repeat it as many times as the frames created

        return framed_audio_data, torch.tensor(labels)

    @staticmethod
    def frame_audio_overlap(audio_tensor, frame_size, sample_rate, hop_size):
        audio = audio_tensor[0]
        frame_size_samples = int(frame_size * sample_rate)

        if hop_size:
            hop_size_samples = int(hop_size * sample_rate)
        else: hop_size_samples = frame_size_samples

        frames = torch.empty((0, frame_size_samples), dtype = torch.float32)

        for start in range(0, audio.size()[0], hop_size_samples):
            end = start + frame_size_samples
            frame = audio[start:end]
            if len(frame) < frame_size_samples:
                if len(frame) >= (frame_size_samples / 2): # if the size of the remaining sample is not shorter than half the frame size...
                    frame = nn.functional.pad(frame, (0, frame_size_samples - frame.size()[0]), 'constant', value = 0) # pad with zeros
                else: break
            frame = torch.reshape(frame, (1, -1)) # reshape as a batched sample
            frames = torch.cat([frames, frame], dim = 0) # concatenate

        return frames

if __name__ == '__main__':
    # quick manual check: build the dataset and inspect its first item
    src_dir = '/home/ldomeneghetti/Documents/Forensics/audio_classification_pytorch/raw_audio/car_crash'

    dataset = AudioEventDataset(src_dir, 1, 0.5, 44100) # the constructor collects the .wav paths itself

    item = dataset[0]

    print(item)

# models.py

The LSTMNetwork class provides the functionality for the LSTM Neural Network model to be used in the project. First, each layer is defined and initialized by the `__init__()` method, then a `forward()` chain is constructed.
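
To make the layer sizes easier to follow, here is a small shape walk-through in plain Python. It is only a sketch: it assumes the torchaudio transforms keep their default `center=True` framing (so a signal of `n` samples gives `1 + n // hop_length` time steps), and `spectrogram_frames` is a helper introduced for this example.

```python
def spectrogram_frames(n_samples, hop_length):
    # With centered framing, the STFT yields 1 + n_samples // hop_length frames
    return 1 + n_samples // hop_length

sample_rate, duration = 44100, 1.0
n_samples = int(sample_rate * duration)
steps = spectrogram_frames(n_samples, hop_length=256)

hidden = 256
per_branch = 2 * hidden    # final forward + backward hidden states of one LSTM
fused = 2 * per_branch     # Mel branch and MFCC branch concatenated

shapes = [
    ("input waveform", (1, n_samples)),
    ("Mel spectrogram (batch, n_mels, time)", (1, 128, steps)),
    ("Mel after permute (batch, time, n_mels)", (1, steps, 128)),
    ("MFCC after permute (batch, time, n_mfcc)", (1, steps, 40)),
    ("fused LSTM features", (1, fused)),
    ("dense stack", (1, 512, 128, 64)),
]
for name, shape in shapes:
    print(f"{name}: {shape}")
```

The fused feature size of 1024 is what the first dense layer expects as input.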

In [None]:
import torch
import torch.nn as nn
import torchaudio
import torchaudio.transforms as transforms
import math
from torchinfo import summary

class LSTMNetwork(nn.Module):

    def __init__(self, n_classes, sample_rate = 44100, sample_duration = 1.0, print_summary = True):

        super(LSTMNetwork, self).__init__()

        self.melspectrogram = transforms.MelSpectrogram(
            sample_rate=sample_rate,
            n_fft=2048,
            n_mels=128,
            hop_length=256,
            f_max= int(sample_rate / 2) # Shannon's sampling theorem, set max frequency equal to half sampling frequency
        )
        self.amplitude_to_db = transforms.AmplitudeToDB()

        self.mfcc = transforms.MFCC(
            sample_rate=sample_rate,
            n_mfcc=40,
            melkwargs={"n_fft": 2048, "hop_length": 256, "n_mels": 128, "f_max": int(sample_rate / 2)}
        )

        self.layer_norm_mel = nn.LayerNorm(128)
        self.layer_norm_mfcc = nn.LayerNorm(40)

        self.bidirectional_lstm_mel = nn.LSTM(
            input_size = 128, hidden_size = 256, num_layers = 1, batch_first = True, bidirectional = True
        )

        self.bidirectional_lstm_mfcc = nn.LSTM(
            input_size = 40, hidden_size = 256, num_layers = 1, batch_first = True, bidirectional = True
        )

        self.dense_1_relu = nn.Linear(1024, 512)  # LSTM outputs (512 + 512)
        self.batch_norm_1 = nn.BatchNorm1d(512) # batch normalization to stabilize training (also acts as a mild regularizer)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5) # dropout layer to avoid overfitting

        self.dense_2_relu = nn.Linear(512, 128)
        self.batch_norm_2 = nn.BatchNorm1d(128)
        self.dense_3_relu = nn.Linear(128, 64)
        self.batch_norm_3 = nn.BatchNorm1d(64)

        self.output_layer = nn.Linear(64, n_classes)

        if print_summary: summary(self, torch.Size([1, int(sample_rate * sample_duration)])) # one frame of audio as dummy input

    def forward(self, x):
        x_mel = self.melspectrogram(x)
        x_mel = self.amplitude_to_db(x_mel)

        x_mfcc = self.mfcc(x)

        x_mel = x_mel.permute(0, 2, 1)
        x_mel = self.layer_norm_mel(x_mel)

        x_mfcc = x_mfcc.permute(0, 2, 1)
        x_mfcc = self.layer_norm_mfcc(x_mfcc)

        lstm_out_mel, (h_n_mel, _) = self.bidirectional_lstm_mel(x_mel)
        lstm_out_mfcc, (h_n_mfcc, _) = self.bidirectional_lstm_mfcc(x_mfcc)

        x_mel = torch.cat([h_n_mel[-2,:,:], h_n_mel[-1,:,:]], dim=1)
        x_mfcc = torch.cat([h_n_mfcc[-2,:,:], h_n_mfcc[-1,:,:]], dim=1) # concatenate the final hidden states of the forward and backward directions

        x = torch.cat([x_mel, x_mfcc], dim=1) # concatenate Mel and MFCC representation

        x = self.dense_1_relu(x)
        x = self.batch_norm_1(x)
        x = self.relu(x)

        x = self.dropout(x)

        x = self.dense_2_relu(x)
        x = self.batch_norm_2(x)
        x = self.relu(x)
        x = self.dense_3_relu(x)
        x = self.batch_norm_3(x)
        x = self.relu(x)

        logits = self.output_layer(x)

        return logits # return logits without softmax (CrossEntropyLoss will be used later)

if __name__ == '__main__':

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    lstm = LSTMNetwork(10, 44100, 1.0).to(device)
    summary(lstm, torch.Size([1, 44100]))

# train.py

The train module consists of two main steps:
*   initialize the data and prepare it to be fed to the LSTM network
*   the actual training + testing of the model

The `train()` function wraps `train_one_epoch()`, which performs the training for a single epoch; calling the latter once per epoch yields a full training cycle. The training is performed on the GPU when one is available.

After the training, a single cycle of unbatched testing is performed on the CPU (due to insufficient memory on the host device). The test predictions are used to build a confusion matrix, which is then displayed and can be saved.
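
For readers unfamiliar with confusion matrices, the following pure-Python sketch shows what is being counted. It is a stand-in for the scikit-learn routine used in the script, and the label lists are made up for illustration.

```python
def confusion_matrix(true_labels, predicted_labels, n_classes):
    # Rows are true classes, columns are predicted classes
    matrix = [[0] * n_classes for _ in range(n_classes)]
    for t, p in zip(true_labels, predicted_labels):
        matrix[t][p] += 1
    return matrix

# Made-up integer labels for three classes
true = [0, 0, 1, 1, 2, 2]
pred = [0, 1, 1, 1, 2, 0]

cm = confusion_matrix(true, pred, 3)

# Accuracy is the share of samples on the main diagonal
accuracy = sum(cm[i][i] for i in range(3)) / len(true)

for row in cm:
    print(row)
print(f"accuracy: {accuracy:.3f}")
```

Off-diagonal cells show which classes get confused with each other, which a single accuracy figure hides.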

After the whole process of training + testing, the model is saved both in a complete format and in a reduced format (weights only).

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, random_split

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

from AudioEventDataset import AudioEventDataset
from models import LSTMNetwork

from tqdm import tqdm

import argparse

import os

import json

def collate_audio(batch): # function needed to properly concatenate audio frames into a single batch
    data_list = []
    label_list = []
    for element in batch:
        data_list.append(element[0])
        label_list.append(element[1])

    data_tensor = torch.cat(data_list, dim = 0)
    label_tensor = torch.cat(label_list, dim = 0)

    return data_tensor, label_tensor


def initialize_train(args):

    # let the CUDA caching allocator grow its segments, which reduces memory fragmentation
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
    os.environ["QT_QPA_PLATFORM"] = "xcb"

    src_dir = args.src_dir
    batch_size = args.batch_size
    frame_size = args.frame_size
    hop_size = args.hop_size
    sample_rate = args.sample_rate
    test_size = args.test_size
    random_state = args.random_state
    epochs = args.epochs
    out_file = args.out_file

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    audio_dataset = AudioEventDataset(src_dir, frame_size, hop_size, sample_rate)

    assert (test_size > 0 and test_size < 1), "test_size must be strictly between 0 and 1"
    train_dataset, test_dataset = random_split(audio_dataset, [1.0 - test_size, test_size], generator = torch.Generator().manual_seed(random_state))

    train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True, collate_fn = collate_audio)
    test_dataloader = DataLoader(test_dataset, batch_size = 1, shuffle = True, collate_fn = collate_audio)

    with open('classes.json', 'w', encoding = 'utf-8') as json_file:
        json.dump({'classes': audio_dataset.classes}, json_file, ensure_ascii = False, indent = 4) # save classes' labels in a JSON file

    lstm_model = LSTMNetwork(audio_dataset.n_classes, sample_rate, frame_size).to(device)

    train(lstm_model, train_dataloader, test_dataloader, epochs, device, audio_dataset.classes)

    torch.save(lstm_model.state_dict(), out_file + "_dict.pth") # save the model's dictionary (just the weights)
    torch.save(lstm_model, out_file + ".pth") # save the whole model



def train(model, train_dl, test_dl, epochs, device, classes):

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr = 0.001, weight_decay = 1e-4)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer,
                                                    max_lr = 0.001,
                                                    steps_per_epoch = int(len(train_dl)),
                                                    epochs = epochs,
                                                    anneal_strategy = 'linear')

    for epoch in range(epochs):

        train_loss, train_acc = train_one_epoch(model, train_dl, criterion, optimizer, device, scheduler) # train the model for one epoch, return train loss and accuracy


        print(f"Epoch [{epoch+1}/{epochs}]")
        print(f"    Train Loss: {train_loss:.4f}        Train Acc:  {train_acc:.4f}")
        print("----------------------------------------------------------------")

    test_loss, test_acc = test_model(model, test_dl, criterion, classes) # after training, test the model and return test loss and test accuracy
    print("\nFinal model:")
    print(f"    Train Loss: {train_loss:.4f}        Train Acc:  {train_acc:.4f}")
    print(f"    Test Loss: {test_loss:.4f}        Test Acc:  {test_acc:.4f}")


def train_one_epoch(model, dl, criterion, optimizer, device, scheduler = None):

    model.train()

    total_loss = 0.0
    total_correct = 0
    total_data = 0

    progress_bar = tqdm(dl, desc = "Training", unit = "batch")

    for data, target in progress_bar:
        data = data.to(device)
        target = target.to(device)

        model.zero_grad()

        output = model(data)

        loss = criterion(output, target) # compute the loss
        loss.backward() # backpropagate the loss
        optimizer.step()
        if scheduler is not None:
            scheduler.step() # OneCycleLR is meant to advance once per batch

        _, predicted = torch.max(output, dim = 1)
        _, correct = torch.max(target, dim = 1)

        total_data += data.size(0)

        partial_correct = torch.sum(predicted == correct).item()
        partial_accuracy = partial_correct / data.size(0)
        partial_loss = loss.item()

        total_correct += partial_correct
        total_loss += partial_loss * data.size(0)

        progress_bar.set_postfix({"Accuracy": partial_accuracy, "Loss": partial_loss})

    epoch_loss = total_loss / total_data
    epoch_acc = total_correct / total_data

    return epoch_loss, epoch_acc



def test_model(model, dl, criterion, classes):

    device = torch.device('cpu')

    model.to(device)
    model.eval()

    total_loss = 0.0
    total_correct = 0
    total_data = 0

    progress_bar = tqdm(dl, desc = "Testing", unit = "batch")

    correct_labels = torch.empty((0), dtype = int)
    predicted_labels = torch.empty((0), dtype = int)

    with torch.no_grad():
        for batch_index, (data, target) in enumerate(progress_bar): # iterate through the progress bar so that it actually advances
            data = data.to(device)
            target = target.to(device)

            output = model(data)

            loss = criterion(output, target)

            _, predicted = torch.max(output, dim = 1)
            _, correct = torch.max(target, dim = 1)

            correct_labels = torch.cat([correct_labels, correct])
            predicted_labels = torch.cat([predicted_labels, predicted])

            total_data += data.size(0)

            partial_correct = torch.sum(predicted == correct).item()
            partial_accuracy = partial_correct / data.size(0)
            partial_loss = loss.item()

            total_correct += partial_correct
            total_loss += partial_loss * data.size(0)

            progress_bar.set_postfix({"Accuracy": partial_accuracy, "Loss": partial_loss})

    # compute and plot the confusion matrix (from_predictions already draws the figure)
    ConfusionMatrixDisplay.from_predictions(correct_labels.tolist(), predicted_labels.tolist(), labels = list(range(len(classes))), display_labels = classes, xticks_rotation = 'vertical')
    plt.show()

    test_loss = total_loss / total_data
    test_acc = total_correct / total_data

    return test_loss, test_acc


if __name__ == '__main__':

    parser = argparse.ArgumentParser(description = "Audio Event Classification training")

    parser.add_argument('--src-dir', type = str, required = True, help = "directory of source audio files")

    parser.add_argument('--batch-size', type = int, default = 16, help = "batch size")

    parser.add_argument('--frame-size', type = float, default = 1.0, help = "audio frame size in seconds")

    parser.add_argument('--hop-size', type = float, default = 0.5, help = "audio hop size in seconds")

    parser.add_argument('--sample-rate', type = int, default = 44100, help = "sample rate")

    parser.add_argument('--test-size', type = float, default = 0.1, help = "fraction of samples (strictly between 0.0 and 1.0) to be used for testing")

    parser.add_argument('--random-state', type = int, default = 2159017, help = "random state for samples shuffling")

    parser.add_argument('--epochs', type = int, default = 16, help = "epochs to train the network")

    parser.add_argument('--out-file', type = str, default = "best_model", help = "name for the output model file")

    args, _ = parser.parse_known_args()

    initialize_train(args)

# predict.py

After having trained the model, predictions can be performed starting from new, unseen audio files.

Prediction follows the same pre-processing used during training: the audio data is divided into smaller windows (each partially overlapping the preceding and following ones), and prediction is performed on each window.

Using some tolerance parameters (a confidence threshold and a minimum number of contiguous windows), an *event segments array* is produced: a list of all the events occurring in the audio, each with its start and end instants.
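
The conversion from per-window predictions to timed segments can be sketched as follows. This is a simplified variant built on `itertools.groupby`, not the script's own function: it skips the step that merges nearby segments of the same class, and `windows_to_segments` as well as the sample predictions are made up for illustration. A value of `-1` marks a window whose score fell below the threshold.

```python
from itertools import groupby

def windows_to_segments(predictions, win_size, win_hop, min_windows):
    """Turn a list of per-window class indices into (label, start_s, end_s) tuples."""
    segments = []
    index = 0
    for label, run in groupby(predictions):
        length = len(list(run))
        # keep only confident runs that last long enough
        if label != -1 and length >= min_windows:
            start = index * win_hop
            end = (index + length - 1) * win_hop + win_size
            segments.append((label, start, end))
        index += length
    return segments

# Made-up predictions for 8 windows of 0.4 s taken every 0.2 s
preds = [-1, 3, 3, 3, -1, 9, 9, 3]
segments = windows_to_segments(preds, win_size=0.4, win_hop=0.2, min_windows=2)

for label, start, end in segments:
    print(f"class {label}: start {start:.2f} sec / end {end:.2f} sec")
```

The final isolated window of class 3 is discarded because it does not reach the minimum run length of two windows.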

In [None]:
from AudioEventDataset import AudioEventDataset

import torchaudio
import torch

from models import LSTMNetwork

import argparse

import json

import math

import pygame

def initialize_predict(args):

    audio_file = args.audio_file
    sample_rate = args.sample_rate
    win_size = args.win_size
    win_hop = args.win_hop
    win_min = args.win_min
    threshold = args.threshold
    model_file = args.model_file
    classes_file = args.classes_file

    with open(classes_file, 'r') as file:
        data = json.load(file)

    classes = data['classes'] # import the classes' labels from the JSON file previously saved

    audio_data, _ = torchaudio.load(audio_file, normalize = True) # load the given audio data
    play_audio(audio_file) # play the audio
    processed_audio = AudioEventDataset.frame_audio_overlap(audio_data, win_size, sample_rate, win_hop) # chunk the audio into equally sized frames

    model = torch.load(model_file, weights_only = False) # load the model

    predict(model, processed_audio, classes, threshold, sample_rate, win_size, win_hop, win_min) # perform prediction


def predict(model, input, classes, threshold, sample_rate, win_size, win_hop, min_frames):
    model.to('cpu')
    model.eval()

    input = input.to('cpu')

    with torch.no_grad(): # inference only, no gradients are needed
        output = model(input)

    output_sigmoid = torch.sigmoid(output) # squash each logit into the (0, 1) range
    output_normalized = torch.nn.functional.normalize(output_sigmoid) # scale each row to unit L2 norm

    predicted_classes = []
    for tensor in output_normalized:
        value, index = torch.max(tensor, dim = 0)
        if value.item() >= threshold:
            predicted_classes.append(index.item())
        else: predicted_classes.append(-1)

    detected_events = group_contiguous(predicted_classes, min_frames, win_size, win_hop) # group contiguous events

    # print event segments
    print("\n##################### DETECTED EVENTS #####################")
    for event in detected_events:
        start_sec = event[1]*win_hop
        end_sec = (event[2])*win_hop + win_size
        print(f"    - {classes[event[0]]}: start {start_sec:.2f} sec  /  end {end_sec:.2f} sec")
    print("###########################################################\n")


def group_contiguous(input_list, min_contiguous, win_size, win_hop):
    grouped_list = []
    if not input_list:
        return grouped_list

    previous_item = input_list[0]

    count = 0
    start_index = 0
    for index, item in enumerate(input_list):
        if item != previous_item:
            if previous_item != -1 and count >= min_contiguous:
                grouped_list.append([previous_item, start_index, start_index + count - 1])
            start_index = index
            previous_item = item
            count = 1
        else: count += 1

    # close the last run, which the loop above never reaches
    if previous_item != -1 and count >= min_contiguous:
        grouped_list.append([previous_item, start_index, start_index + count - 1])

    # merge neighbouring segments of the same class that are separated by a short gap
    i = 1
    while i < len(grouped_list):
        same_class = grouped_list[i][0] == grouped_list[i-1][0]
        gap = grouped_list[i][1] - grouped_list[i-1][2]
        if same_class and gap <= math.floor(win_size/win_hop):
            grouped_list[i][1] = grouped_list[i-1][1]
            del grouped_list[i-1] # the merged segment now sits at i-1, so i is not advanced
        else:
            i += 1

    return grouped_list

def play_audio(filename):

    pygame.mixer.init()
    pygame.mixer.music.load(filename)
    pygame.mixer.music.play()
    while pygame.mixer.music.get_busy():
        pygame.time.wait(100) # sleep between checks instead of spinning the CPU

if __name__ == '__main__':

    parser = argparse.ArgumentParser(description = "Audio Event Classification prediction")

    parser.add_argument('--audio-file', type = str, required = True, help = 'source audio file')

    parser.add_argument('--sample-rate', type = int, default = 44100, help = 'sample rate')

    parser.add_argument('--win-size', type = float, default = 0.4, help = 'detection window length in seconds')

    parser.add_argument('--win-hop', type = float, default = 0.2, help = 'detection window hop length in seconds')

    parser.add_argument('--win-min', type = int, default = 2, help = 'minimum contiguous windows required to detect an event')

    parser.add_argument('--threshold', type = float, default = 0.8, help = 'value above which a prediction is considered valid')

    parser.add_argument('--model-file', type = str, default = 'lstm_network.pth', help = 'filename of the model to be loaded')

    parser.add_argument('--classes-file', type = str, default = 'classes.json', help = 'filename of the classes file')

    args, _ = parser.parse_known_args()

    initialize_predict(args)