Data: https://www.kaggle.com/datasets/sripaadsrinivasan/audio-mnist

# IMPORT PACKAGES
All packages that are imported to solve the problem.

In [None]:
import os
import math
import random
import pandas as pd
import time
from sklearn.metrics import confusion_matrix
import torch
from torch import optim
from torch.autograd import Variable
import torch.nn as nn
from torch.nn import init
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio
from torchaudio import transforms
from google.colab import drive

# MOUNT GDRIVE
Mount Google Drive to access the folders and files stored there.

In [None]:
# Mount Google Drive under /content/gdrive; later cells read and write files below this mount point.
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
#@title Prepare utilities for sound functions. {display-mode: "form"}
#@markdown
#@markdown You do not need to look into this cell.
#@markdown Just execute once and you are good to go.

#-------------------------------------------------------------------------------
# Preparation of data and helper functions.
#-------------------------------------------------------------------------------

class SoundUtils():
  """Stateless helpers that turn an audio clip into an augmented Mel spectrogram.

  Audio is passed between the helpers as an ``(sig, sr)`` tuple: ``sig`` is
  the 2-D tensor returned by ``torchaudio.load`` (first dimension indexed as
  channels, second as samples) and ``sr`` is its sample rate.
  """

  @staticmethod
  def open(audio_file):
    """Load ``audio_file`` with torchaudio and return it as ``(sig, sr)``."""
    sig, sr = torchaudio.load(audio_file)
    return (sig, sr)

  @staticmethod
  def rechannel(aud, new_channel):
    """Return ``aud`` converted to ``new_channel`` channels.

    The input is returned untouched when it already has the requested
    number of channels. Converting to one channel keeps only the first
    channel; any other request concatenates the signal with itself.
    """
    sig, sr = aud
    if (sig.shape[0] == new_channel):
      return aud
    if (new_channel == 1):
      resig = sig[:1, :]
    else:
      resig = torch.cat([sig, sig])
    return ((resig, sr))

  @staticmethod
  def resample(aud, newsr):
    """Return ``aud`` resampled to ``newsr``; a no-op if the rate already matches."""
    sig, sr = aud
    if (sr == newsr):
      return aud
    # Resample treats every leading dimension as a batch, so all channels
    # can be converted in one call instead of one transform per channel.
    resig = torchaudio.transforms.Resample(sr, newsr)(sig)
    return ((resig, newsr))

  @staticmethod
  def pad_trunc(aud, max_ms):
    """Force the signal to a fixed length of ``max_ms`` milliseconds.

    Longer signals are truncated at the end. Shorter signals are padded with
    zeros, the padding being split at a random point between the beginning
    and the end of the signal.
    """
    sig, sr = aud
    num_rows, sig_len = sig.shape
    max_len = sr//1000 * max_ms

    if (sig_len > max_len):
      sig = sig[:,:max_len]

    elif (sig_len < max_len):
      pad_begin_len = random.randint(0, max_len - sig_len)
      pad_end_len = max_len - sig_len - pad_begin_len
      pad_begin = torch.zeros((num_rows, pad_begin_len))
      pad_end = torch.zeros((num_rows, pad_end_len))
      sig = torch.cat((pad_begin, sig, pad_end), 1)
    return (sig, sr)

  @staticmethod
  def time_shift(aud, shift_limit):
    """Circularly shift the signal in time by a random amount.

    The shift is a random fraction, between 0 and ``shift_limit``, of the
    signal length; samples pushed past the end wrap around to the start.
    """
    sig,sr = aud
    _, sig_len = sig.shape
    shift_amt = int(random.random() * shift_limit * sig_len)
    # Roll along the sample axis only. Without `dims` the tensor is flattened
    # first, which moves samples from one channel into the other.
    return (sig.roll(shift_amt, dims=1), sr)

  @staticmethod
  def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
    """Return the Mel spectrogram of ``aud`` converted to decibels (top_db=80)."""
    sig,sr = aud
    top_db = 80
    spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)
    spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
    return (spec)

  @staticmethod
  def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
    """Apply frequency and time masking to a spectrogram.

    ``n_freq_masks`` frequency masks and ``n_time_masks`` time masks are
    applied in sequence. The mask-size parameter of each is ``max_mask_pct``
    times the size of the masked axis, and masked cells are filled with the
    mean value of the input spectrogram.
    """
    _, n_mels, n_steps = spec.shape
    mask_value = spec.mean()
    aug_spec = spec

    freq_mask_param = max_mask_pct * n_mels
    for _ in range(n_freq_masks):
      aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

    time_mask_param = max_mask_pct * n_steps
    for _ in range(n_time_masks):
      aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

    return aug_spec

  @staticmethod
  def getSpectrogramFromSound(sound_path, sr, channel, duration, shift_pct):
    """Load ``sound_path`` and run it through the full preprocessing pipeline.

    See ``getSpectrogramFromSoundAndSampleRate`` for the individual steps.
    """
    sound = SoundUtils.open(sound_path)
    # Delegate so the pipeline is defined in exactly one place.
    return SoundUtils.getSpectrogramFromSoundAndSampleRate(sound, sr, channel, duration, shift_pct)

  @staticmethod
  def getSpectrogramFromSoundAndSampleRate(sound, sr, channel, duration, shift_pct):
    """Turn an already loaded ``(sig, sr)`` tuple into an augmented spectrogram.

    Steps, in order: resample to ``sr``, convert to ``channel`` channels,
    pad/truncate to ``duration`` milliseconds, random time shift limited by
    ``shift_pct``, Mel spectrogram in dB (64 Mel bands, n_fft=1024), then two
    frequency masks and two time masks.
    """
    sound = SoundUtils.resample(sound, sr)
    sound = SoundUtils.rechannel(sound, channel)
    sound = SoundUtils.pad_trunc(sound, duration)
    sound = SoundUtils.time_shift(sound, shift_pct)
    sound = SoundUtils.spectro_gram(sound, n_mels=64, n_fft=1024, hop_len=None)
    sound = SoundUtils.spectro_augment(sound, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)
    return sound

# LOAD DEVICE
Select the torch device so that the GPU is used instead of the CPU when one is available (important: the notebook must be connected to a GPU runtime).

In [None]:
# Prefer the GPU when one is visible to torch; otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = torch.device('cuda')
  print(device)
  # Also report which GPU model the runtime provided.
  print(torch.cuda.get_device_name(0))
else:
  device = torch.device('cpu')
  print(device)

cuda
Tesla K80


# DATASET SOUND CLASS
Definition of a custom PyTorch Dataset object, used to load the data from the folders and to fetch single items. It uses the audio transforms to preprocess each audio file and prepare every item.

In [None]:
class DigitsSoundDataset(Dataset):
  """Map-style dataset of spoken-digit recordings stored in numbered folders.

  Files are looked up as ``<data_path>/<FF>/<label>_<FF>_<elem>.wav`` where
  ``FF`` is the two-digit, 1-based folder number, ``label`` is the digit and
  ``elem`` is the index of the recording of that digit inside the folder.
  A flat index is decoded as folder first, then label, then elem.

  Args:
    data_path: root directory that contains the numbered folders.
    num_folders: number of folders to use.
    num_sounds: total number of recordings; must be a multiple of
      ``num_folders * 10`` so every folder holds the same number of
      recordings for each of the 10 digits.
    sr: sample rate every recording is resampled to.
    channel: number of channels every recording is converted to.
    duration: length in milliseconds every recording is padded/truncated to.
    shift_pct: upper limit of the random time shift, as a fraction of the
      signal length.

  Raises:
    ValueError: if ``num_folders`` is not positive or ``num_sounds`` cannot
      be split evenly across folders and digits.
  """

  NUM_DIGITS = 10

  def __init__(self, data_path, num_folders, num_sounds,
               sr=48000, channel=2, duration=1000, shift_pct=0.1):
    # Uneven sizes would make the index arithmetic yield label 10, point at
    # files that do not exist, or divide by zero, so reject them up front.
    if num_folders <= 0 or num_sounds < 0 or num_sounds % (num_folders * self.NUM_DIGITS) != 0:
      raise ValueError(
        f"num_sounds ({num_sounds}) must be a non-negative multiple of "
        f"num_folders * {self.NUM_DIGITS} ({num_folders} * {self.NUM_DIGITS})")

    self.data_path = data_path
    self.num_folders = num_folders
    self.num_sounds = num_sounds

    self.sr = sr
    self.channel = channel
    self.duration = duration
    self.shift_pct = shift_pct

  def __len__(self):
    return self.num_sounds

  def _locate(self, idx):
    """Translate a flat index into ``(sound_file_path, label)``."""
    if not -self.num_sounds <= idx < self.num_sounds:
      raise IndexError(f"index {idx} is out of range for {self.num_sounds} sounds")
    if idx < 0:
      idx += self.num_sounds

    per_folder = self.num_sounds // self.num_folders
    per_label = per_folder // self.NUM_DIGITS
    folder, within_folder = divmod(idx, per_folder)
    label, elem = divmod(within_folder, per_label)

    sound_file = os.path.join(self.data_path, f"{folder+1:02d}", f"{label}_{folder+1:02d}_{elem}.wav")
    return sound_file, label

  def __getitem__(self, idx):
    """Return ``(spectrogram, label)`` for the recording at ``idx``.

    Raises:
      IndexError: if ``idx`` is outside ``[-len(self), len(self))``.
    """
    sound_file, label = self._locate(idx)

    sound = SoundUtils.getSpectrogramFromSound(sound_file, self.sr, self.channel, self.duration, self.shift_pct)

    return sound, label

# LOAD DATASETS
Given the folders of the speakers who recorded the digits, I build the training and test sets of the sounds, which are tensor arrays. The training set is 80% of the sounds; the test set is the remaining 20%.

In [None]:
# Build the dataset from 60 folders holding 30000 recordings in total.
dataset = DigitsSoundDataset("gdrive/My Drive/Colab Notebooks/RECORDINGS/", 60, 30000) # arguments: num_folders=60, num_sounds=30000
num_ds = len(dataset)
# 80% of the items go to training; whatever remains after rounding is the test set.
num_train = round(num_ds * 0.8)
num_test = num_ds - num_train
# NOTE(review): no generator is passed, so the split presumably differs between runs — confirm if reproducibility matters.
training_set, test_set = random_split(dataset, [num_train, num_test])

# SET DATALOADERS
Creation of the loaders used to train and test the network. They use the custom Dataset object to fetch individual data items and package them into batches of data.

In [None]:
# Batches of 40 items; only the training loader shuffles its data.
train_dataloader = DataLoader(training_set, batch_size=40, shuffle=True)
test_dataloader = DataLoader(test_set, batch_size=40, shuffle=False)

# BUILD THE CNN MODEL
Definition of the convolutional neural network for the sounds: a CNN classification architecture that processes the spectrograms. It has 4 convolutional layers that generate the feature maps. These are then pooled and reshaped into a format that can be used as the input of a linear classifier layer, which outputs the predictions for the 10 classes.

In [None]:
class SoundCNN(nn.Module):
    """Four strided convolution stages followed by a linear classifier.

    Each stage is Conv2d -> ReLU -> BatchNorm2d with stride 2. Every layer is
    registered both under its own name (``conv1``, ``relu1``, ``bn1``, ...)
    and inside the ``conv`` Sequential that ``forward`` actually runs.
    The input must have 2 channels; the output has 10 values per sample.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels, kernel size, padding) of every stage.
        stage_specs = (
            (2, 8, 5, 2),
            (8, 16, 3, 1),
            (16, 32, 3, 1),
            (32, 64, 3, 1),
        )

        stages = []
        for number, (c_in, c_out, kernel, pad) in enumerate(stage_specs, start=1):
            conv = nn.Conv2d(c_in, c_out, kernel_size=(kernel, kernel), stride=(2, 2), padding=(pad, pad))
            activation = nn.ReLU()
            norm = nn.BatchNorm2d(c_out)
            # Kaiming-normal weights (a=0.1) and zero bias for the convolution.
            init.kaiming_normal_(conv.weight, a=0.1)
            init.zeros_(conv.bias)

            # Expose the layers as conv<N>/relu<N>/bn<N>, in this order.
            for prefix, layer in (("conv", conv), ("relu", activation), ("bn", norm)):
                setattr(self, f"{prefix}{number}", layer)
                stages.append(layer)

        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=10)

        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Return the class scores for a batch of 2-channel inputs."""
        features = self.ap(self.conv(x))
        # Flatten everything except the batch dimension before the classifier.
        flat = features.view(features.shape[0], -1)
        return self.lin(flat)

In [None]:
# Instantiate the network and move it to the selected device.
sound_model = SoundCNN().to(device)
# NOTE(review): this expression is neither printed nor assigned, so its value is discarded.
next(sound_model.parameters()).device
print(sound_model)

SoundCNN(
  (conv1): Conv2d(2, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
  (relu1): ReLU()
  (bn1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (relu2): ReLU()
  (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (relu3): ReLU()
  (bn3): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv4): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (relu4): ReLU()
  (bn4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (ap): AdaptiveAvgPool2d(output_size=1)
  (lin): Linear(in_features=64, out_features=10, bias=True)
  (conv): Sequential(
    (0): Conv2d(2, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
    (1): ReLU()
    (2): BatchNorm2d(8, eps=1e-05, momentum=0.

# LOSS FUNCTION
Definition of the loss function, which is the cross-entropy loss.

In [None]:
# Cross-entropy loss; read as a global by train().
loss_function = nn.CrossEntropyLoss()
print(loss_function)

CrossEntropyLoss()


# OPTIMIZATION FUNCTION
Definition of the optimization function, which is the Adam optimizer.

In [None]:
# Adam over all parameters of the model with a learning rate of 0.001; read as a global by train().
optimizer = optim.Adam(sound_model.parameters(), lr=0.001)
print(optimizer)

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    eps: 1e-08
    lr: 0.001
    weight_decay: 0
)


# TRAIN THE NETWORK
With the loss function and the optimizer (Adam, which adapts the step size as training progresses) defined, I define the train function. The network is trained for a number of epochs, processing one batch of data in every iteration. The accuracy, the loss and the elapsed time are reported every 20 batches.

In [None]:
def train(num_epoch, cnn, train_loader, opt=None, criterion=None, dev=None, log_every=20):
  """Train ``cnn`` for ``num_epoch`` epochs and print progress periodically.

  Args:
    num_epoch: number of passes over ``train_loader``.
    cnn: the model to train; it is switched to training mode.
    train_loader: iterable of ``(inputs, labels)`` batches that supports ``len()``.
    opt: optimizer to step; defaults to the module-level ``optimizer``.
    criterion: loss callable; defaults to the module-level ``loss_function``.
    dev: device the batches are moved to; defaults to the module-level ``device``.
    log_every: print a progress line every this many batches (0 disables logging).

  The printed loss and accuracy are running averages over the batches of the
  current epoch seen so far; the printed time is the time elapsed since the
  previous progress line.
  """
  # Fall back to the notebook globals so existing three-argument calls keep working.
  opt = optimizer if opt is None else opt
  criterion = loss_function if criterion is None else criterion
  dev = device if dev is None else dev

  cnn.train()
  num_batches = len(train_loader)
  t = time.time()

  for epoch in range(num_epoch):
    running_loss = 0.0
    correct_prediction = 0
    total_prediction = 0

    for i, (inputs, labels) in enumerate(train_loader):
      # Tensors are used directly; the Variable wrapper is no longer needed.
      inputs = inputs.to(dev)
      labels = labels.to(dev)

      outputs = cnn(inputs)
      loss = criterion(outputs, labels)
      opt.zero_grad()
      loss.backward()
      opt.step()

      running_loss += loss.item()

      prediction = outputs.argmax(dim=1)
      correct_prediction += (prediction == labels).sum().item()
      total_prediction += prediction.shape[0]

      if log_every and (i+1) % log_every == 0:
        # Average over the batches processed so far. Dividing by the total
        # number of batches made the reported loss climb within an epoch.
        avg_loss = running_loss / (i+1)
        acc = correct_prediction/total_prediction
        elapsed = time.time() - t
        print(f'Epoch: [{epoch+1}/{num_epoch}], Batch: [{i+1}/{num_batches}], Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}, Time: {elapsed:.1f}')
        t = time.time()

In [None]:
# Train the model for 10 epochs on the training loader.
num_epoch = 10
train(num_epoch, sound_model, train_dataloader)

Epoch: [1/10], Batch: [20/600], Loss: 0.08, Accuracy: 0.16, Time: 322.6
Epoch: [1/10], Batch: [40/600], Loss: 0.15, Accuracy: 0.21, Time: 293.7
Epoch: [1/10], Batch: [60/600], Loss: 0.21, Accuracy: 0.26, Time: 296.5
Epoch: [1/10], Batch: [80/600], Loss: 0.28, Accuracy: 0.29, Time: 293.1
Epoch: [1/10], Batch: [100/600], Loss: 0.34, Accuracy: 0.32, Time: 294.3
Epoch: [1/10], Batch: [120/600], Loss: 0.39, Accuracy: 0.35, Time: 291.9
Epoch: [1/10], Batch: [140/600], Loss: 0.44, Accuracy: 0.37, Time: 291.5
Epoch: [1/10], Batch: [160/600], Loss: 0.49, Accuracy: 0.40, Time: 291.0
Epoch: [1/10], Batch: [180/600], Loss: 0.54, Accuracy: 0.42, Time: 290.3
Epoch: [1/10], Batch: [200/600], Loss: 0.58, Accuracy: 0.45, Time: 288.6
Epoch: [1/10], Batch: [220/600], Loss: 0.62, Accuracy: 0.47, Time: 288.2
Epoch: [1/10], Batch: [240/600], Loss: 0.66, Accuracy: 0.49, Time: 288.3
Epoch: [1/10], Batch: [260/600], Loss: 0.69, Accuracy: 0.51, Time: 289.5
Epoch: [1/10], Batch: [280/600], Loss: 0.72, Accuracy: 

# CHECK HOW THE NETWORK BEHAVES ON THE TEST SET
Finally, to evaluate the metric, I run an inference loop with gradient updates disabled, and use the CNN to check whether each predicted value matches the test label. I also build the confusion matrix to understand whether the predictions are good or not: the values on the diagonal should be very high and the others close to 0.


In [None]:
# Collect the true and the predicted label of every test item.
actual_labels_test = []
predictes_labels_test = []
sound_model.to('cpu')
# train() left the model in training mode; switch to evaluation mode so the
# BatchNorm layers use their running statistics instead of per-batch ones.
sound_model.eval()

with torch.no_grad():
  for data in test_dataloader:
    inputs = data[0]
    labels = data[1]
    test_output = sound_model(inputs)
    # argmax over the class dimension always yields a 1-D tensor, so a final
    # batch holding a single item still produces a list.
    pred_y = test_output.argmax(dim=1)
    actual_labels_test.extend(labels.tolist())
    predictes_labels_test.extend(pred_y.tolist())

# Last expression of the cell, so the notebook displays the matrix.
confusion_matrix(actual_labels_test, predictes_labels_test)

array([[620,   0,   2,   2,   2,   1,   0,   1,   0,   0],
       [  0, 564,   2,   1,   2,   1,   0,   0,   0,   7],
       [  1,   0, 553,   2,   0,   0,   1,   1,   0,   0],
       [  0,   0,   2, 583,   0,   1,   1,   0,   1,   0],
       [  0,   0,   2,   1, 592,   2,   0,   0,   0,   0],
       [  0,   0,   0,   0,   2, 558,   0,   3,   0,   1],
       [  0,   0,   0,   0,   0,   1, 619,   1,   4,   0],
       [  1,   0,   1,   0,   2,   2,   0, 616,   0,   1],
       [  0,   0,   1,   1,   0,   2,   0,   0, 610,   0],
       [  0,   1,   1,   1,   0,   5,   0,   0,   0, 618]])

# SAVE MODEL
I save the model so that it can be reused in other applications.

In [None]:
# Save a checkpoint holding the epoch count, the model weights and the optimizer state.
torch.save({
  'epoch': num_epoch,
  'model_state_dict': sound_model.state_dict(),
  'optimizer_state_dict': optimizer.state_dict()
  }, 'gdrive/My Drive/Colab Notebooks/Progetto/digits_speach_model.pth')