# import libraries

In [None]:
import os
import math, random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import init
from torchaudio import transforms
from torch.utils.data import random_split
from torch.utils.data import DataLoader, Dataset, random_split
import torch.nn.functional as F
import torchaudio

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from tqdm import tqdm

# Loading the dataset from Kaggle

In [None]:
!pip install kaggle
!pip install torchsummary

In [None]:
from google.colab import files

# Upload the kaggle.json file
files.upload()

# Move kaggle.json to the .kaggle directory
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/

In [None]:
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
!kaggle datasets download -d chrisfilo/urbansound8k

# Extract the UrbanSound8K audio files

In [None]:
import zipfile

# Archive to unpack and the folder its contents are written to.
zip_file_path = 'urbansound8k.zip'
extract_to_path = 'urbansound8k'

# Create the destination up front; this is a no-op when it already exists.
os.makedirs(extract_to_path, exist_ok=True)

# Unpack every member of the archive (ZipFile opens in read mode by default).
with zipfile.ZipFile(zip_file_path) as zip_ref:
    zip_ref.extractall(path=extract_to_path)

print(f"Contents of {zip_file_path} extracted to {extract_to_path}")

# Torch Seed initialization

In [1]:
def set_seed(seed=42):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and all CUDA
    devices), and switches cuDNN to deterministic mode.

    Args:
        seed: Integer seed applied to all generators.
    """
    # The audio helpers draw their random padding offset and time shift from
    # Python's built-in ``random`` module, so it has to be seeded as well.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Cover every CUDA device, not only the current one. This call is
    # silently ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)

    # Trade cuDNN autotuning speed for run-to-run reproducibility.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    # NOTE: hash randomisation of the running interpreter is fixed at startup;
    # this only takes effect for interpreters launched after this point.
    os.environ['PYTHONHASHSEED'] = str(seed)

# Fix the seed once, before the dataset split and model initialisation below.
set_seed(seed=2023)

NameError: name 'np' is not defined

# Read initial Data

In [None]:
# Load the dataset's metadata table from the extracted archive.
meta_df = pd.read_csv("urbansound8k/UrbanSound8K.csv")
# Build each clip's path relative to the dataset root: '/fold<fold>/<slice_file_name>'.
# The leading '/' lets the path be appended directly to the root directory string.
meta_df['relative_path'] = '/fold' + meta_df['fold'].astype(str) + '/' + meta_df['slice_file_name'].astype(str)
# Notebook display: preview the first rows.
meta_df.head()

# Take relevant columns

In [None]:
# Keep only what the dataset class reads: the clip path and its class label.
df = meta_df[['relative_path', 'classID']]
# Notebook display: preview the first rows.
df.head()

# Audio preprocessing functions

In [2]:
class AudioUtil:
    """Stateless helpers for loading, normalising and augmenting audio clips.

    Throughout this class an "audio" value ``aud`` is a ``(sig, sr)`` tuple:
    ``sig`` is a 2-D tensor indexed as ``[channel, sample]`` and ``sr`` is the
    sample rate.
    """

    @staticmethod
    def open(audio_file):
        """Load an audio file and return ``(signal, sample_rate)``."""
        sig, sr = torchaudio.load(audio_file)
        return sig, sr

    @staticmethod
    def rechannel(aud, new_channel):
        """Convert the audio to exactly ``new_channel`` channels.

        Extra channels are dropped (keeping the leading ones); missing
        channels are filled by repeating the existing ones in order, so a
        mono signal becomes stereo by duplicating its single channel.

        Returns:
            ``aud`` itself when it already has ``new_channel`` channels,
            otherwise a new ``(signal, sample_rate)`` tuple.
        """
        sig, sr = aud  # sig -> signal, sr -> sampling rate
        num_channels = sig.shape[0]

        if num_channels == new_channel:
            return aud

        if new_channel < num_channels:
            # Down-mix by keeping only the first ``new_channel`` channels.
            resig = sig[:new_channel, :]
        else:
            # Up-mix by tiling the existing channels, then trimming any excess.
            repeats = -(-new_channel // num_channels)  # ceiling division
            resig = sig.repeat(repeats, 1)[:new_channel, :]

        return resig, sr

    @staticmethod
    def resample(aud, newsr):
        """Resample the audio to ``newsr``.

        Returns:
            ``aud`` itself when the rate already matches, otherwise a new
            ``(signal, newsr)`` tuple.
        """
        sig, sr = aud

        if sr == newsr:
            return aud

        # Resample works along the last (time) dimension, so all channels can
        # be converted in a single call.
        resig = torchaudio.transforms.Resample(sr, newsr)(sig)
        return resig, newsr

    @staticmethod
    def pad_trunc(aud, max_ms):
        """Pad or truncate the signal to a fixed length of ``max_ms`` milliseconds.

        Longer signals are cut at the end. Shorter signals are zero-padded on
        both sides, with the split between leading and trailing padding chosen
        at random (using Python's ``random`` module).
        """
        sig, sr = aud
        num_rows, sig_len = sig.shape
        # Multiply before dividing so sample rates that are not a multiple of
        # 1000 (e.g. 44100) are not truncated to whole samples per millisecond.
        max_len = int(sr * max_ms // 1000)

        if sig_len > max_len:
            # Truncate the signal to the given length
            sig = sig[:, :max_len]

        elif sig_len < max_len:
            # Length of padding to add at the beginning and end of the signal
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len

            # Pad with 0s that share the signal's dtype and device
            pad_begin = sig.new_zeros((num_rows, pad_begin_len))
            pad_end = sig.new_zeros((num_rows, pad_end_len))

            sig = torch.cat((pad_begin, sig, pad_end), 1)

        return sig, sr

    @staticmethod
    def time_shift(aud, shift_limit):
        """Circularly shift the signal in time by a random amount.

        The shift is a random fraction, in ``[0, shift_limit)``, of the signal
        length. Samples pushed past the end wrap around to the start of the
        same channel.
        """
        sig, sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        # Roll along the time axis only. Without ``dims`` the tensor is
        # flattened first, which moves samples from one channel into another.
        return sig.roll(shift_amt, dims=-1), sr

    @staticmethod
    def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None, top_db=80):
        """Generate a Mel spectrogram in decibels.

        Args:
            aud: ``(signal, sample_rate)`` tuple.
            n_mels: Number of Mel filter banks.
            n_fft: FFT size.
            hop_len: Hop length passed to ``MelSpectrogram`` (``None`` keeps
                its default).
            top_db: Cut-off passed to ``AmplitudeToDB``.

        Returns:
            The spectrogram tensor.
        """
        sig, sr = aud

        # spec has shape [channel, n_mels, time], where channel is mono, stereo etc
        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

        # Convert to decibels
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return spec

    # ----------------------------
    # Augment the Spectrogram by masking out some sections of it in both the frequency
    # dimension (ie horizontal bars) and the time dimension (vertical bars) to prevent
    # overfitting and to help the model generalise better. The masked sections are
    # replaced with the mean value.
    # ----------------------------
    @staticmethod
    def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        """Apply frequency and time masking to a spectrogram.

        Args:
            spec: Spectrogram indexed as ``[channel, n_mels, time]``.
            max_mask_pct: Upper bound on each mask's width, as a fraction of
                the corresponding dimension.
            n_freq_masks: Number of frequency masks to apply.
            n_time_masks: Number of time masks to apply.

        Returns:
            The masked spectrogram.
        """
        _, n_mels, n_steps = spec.shape
        # Masked regions are filled with the spectrogram's overall mean.
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

        return aug_spec

# Dataset and DataLoader implementation

In [None]:
class SoundDS(Dataset):
    """Dataset that turns rows of a metadata table into Mel spectrograms.

    Each item is loaded from disk, resampled, converted to a fixed channel
    count, padded or truncated to a fixed duration, optionally augmented, and
    returned as ``(spectrogram, class_id)``.

    Args:
        df: DataFrame with a ``relative_path`` and a ``classID`` column.
        data_path: Root directory; ``relative_path`` is appended to it as-is.
        duration: Target clip length in milliseconds.
        sr: Target sampling rate.
        channel: Target number of channels.
        shift_pct: Maximum time shift as a fraction of the clip length.
        augment: When ``False``, the random time shift and the spectrogram
            masking are skipped. The padding position chosen by
            ``AudioUtil.pad_trunc`` is still random.
    """

    def __init__(self, df, data_path, duration=4000, sr=44100, channel=2,
                 shift_pct=0.4, augment=True):
        self.df = df
        self.data_path = str(data_path)
        self.duration = duration
        self.sr = sr  # sampling rate
        self.channel = channel
        self.shift_pct = shift_pct
        self.augment = augment

    def __len__(self):
        """Return the number of rows in the metadata table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(spectrogram, class_id)`` for the row at position ``idx``."""
        # Positional lookup: a Dataset is indexed 0..len-1, which only matches
        # the DataFrame's labels when it still has its default index.
        audio_file = self.data_path + self.df['relative_path'].iloc[idx]
        class_id = self.df['classID'].iloc[idx]

        # Read the clip and bring it to a common rate, channel count and length
        aud = AudioUtil.open(audio_file)
        reaud = AudioUtil.resample(aud, self.sr)
        rechan = AudioUtil.rechannel(reaud, self.channel)
        dur_aud = AudioUtil.pad_trunc(rechan, self.duration)
        if self.augment:
            dur_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)

        # Spectrogram
        sgram = AudioUtil.spectro_gram(dur_aud, n_mels=64, n_fft=1024, hop_len=None)
        if self.augment:
            sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

        return sgram, class_id

In [None]:
# Wrap the metadata table in the spectrogram dataset.
data_path = "urbansound8k"
myds = SoundDS(df, data_path)

# Random 80/20 split between training and validation.
# NOTE(review): both subsets wrap the same SoundDS instance, so whatever its
# __getitem__ does is applied to validation samples as well.
num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
train_ds, val_ds = random_split(myds, lengths=[num_train, num_val])

# Shuffle only the training batches; validation order stays fixed.
train_dl = DataLoader(train_ds, batch_size=32, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=128, shuffle=False)

In [None]:
# Sanity check: pull one training batch and display the shape of its inputs
# (element 0 of the batch; element 1 holds the class ids).
next(iter(train_dl))[0].shape

# Model

In [None]:
class AudioClassifier(nn.Module):
    """Four-block convolutional classifier for spectrogram inputs.

    Every block is Conv2d (stride 2) -> ReLU -> BatchNorm2d. The blocks are
    followed by adaptive average pooling to 1x1 and a linear layer. Each block
    stays reachable as ``conv<i>`` / ``relu<i>`` / ``bn<i>`` and the full
    stack as ``conv``.

    Args:
        in_channels: Number of channels of the input spectrogram.
        num_classes: Number of output logits.
    """

    # (out_channels, kernel_size, padding) for each convolution block.
    _BLOCK_SPECS = ((8, 5, 2), (16, 3, 1), (32, 3, 1), (64, 3, 1))

    def __init__(self, in_channels=2, num_classes=10):
        super().__init__()
        conv_layers = []
        prev_channels = in_channels

        for idx, (out_channels, kernel, pad) in enumerate(self._BLOCK_SPECS, start=1):
            conv = nn.Conv2d(prev_channels, out_channels, kernel_size=(kernel, kernel),
                             stride=(2, 2), padding=(pad, pad))
            relu = nn.ReLU()
            bn = nn.BatchNorm2d(out_channels)

            # Kaiming initialisation for the weights, zero bias.
            init.kaiming_normal_(conv.weight, a=0.1)
            init.zeros_(conv.bias)

            # Register under the per-block names, in conv/relu/bn order, so
            # attribute access and state-dict keys stay stable.
            setattr(self, f'conv{idx}', conv)
            setattr(self, f'relu{idx}', relu)
            setattr(self, f'bn{idx}', bn)

            conv_layers += [conv, relu, bn]
            prev_channels = out_channels

        # Linear Classifier
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=prev_channels, out_features=num_classes)

        # Wrap the Convolutional Blocks
        self.conv = nn.Sequential(*conv_layers)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for ``x``."""
        x = self.conv(x)

        # Pool each feature map down to a single value, then flatten per sample.
        x = self.ap(x)
        x = x.flatten(1)

        return self.lin(x)


# Prefer the first CUDA device when one is available; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Build the classifier and move its parameters to the chosen device.
model = AudioClassifier().to(device)

# Notebook display: confirm where the parameters ended up.
next(model.parameters()).device

# Train Model

In [None]:
def training(model, train_dl, num_epochs):
    """Train ``model`` on ``train_dl`` for ``num_epochs`` epochs.

    Uses cross-entropy loss, Adam (lr 0.001) and a linear one-cycle learning
    rate schedule stepped once per batch. Each batch is standardised with its
    own mean and standard deviation before the forward pass. Average loss and
    accuracy are printed after every epoch.

    Args:
        model: Network to optimise in place. Batches are moved to the device
            its parameters live on.
        train_dl: Sized iterable of batches; element 0 of a batch holds the
            inputs and element 1 the integer class labels.
        num_epochs: Number of full passes over ``train_dl``.
    """
    # Follow the model's device instead of depending on a notebook-level global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Make sure layers such as BatchNorm are in training mode, even if the
    # model was switched to eval mode earlier.
    model.train()

    num_batches = len(train_dl)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                    steps_per_epoch=num_batches,
                                                    epochs=num_epochs,
                                                    anneal_strategy='linear')

    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0

        for data in train_dl:
            inputs, labels = data[0].to(device), data[1].to(device)

            # Normalize the inputs; the clamp avoids a division by zero on a
            # constant batch.
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s.clamp_min(1e-8)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # The one-cycle schedule advances per batch, not per epoch.
            scheduler.step()

            # Keep stats for Loss and Accuracy
            running_loss += loss.item()

            prediction = outputs.argmax(dim=1)
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]

        avg_loss = running_loss / num_batches
        acc = correct_prediction / total_prediction
        print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

    print('Finished Training')

# Run 20 full passes over the training loader.
num_epochs=20
training(model, train_dl, num_epochs)