## Downloading Dependencies

In [None]:
# Install torchaudio, pinned to 0.7.0, from the PyTorch stable wheel index.
# (The import cell below notes the runtime's torch version as 1.7.0+cu101.)
!pip install torchaudio==0.7.0 -f https://download.pytorch.org/whl/torch_stable.html

In [None]:
import os
import numpy as np
import pandas as pd

In [None]:
import random
import librosa

import warnings
warnings.filterwarnings("ignore")

# current torch version is 1.7.0+cu101
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torchaudio.transforms import MelSpectrogram
from torchaudio.transforms import TimeMasking
from torchaudio.transforms import FrequencyMasking
from torchaudio.transforms import TimeStretch

import torchaudio

import matplotlib.pyplot as plt
import IPython.display as ipd

In [None]:
# Pick the GPU when CUDA is available, otherwise fall back to the CPU.
# On Google Colab, select a GPU runtime so that the line below prints "cuda".
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(device)

## Importing dataset from Google Drive



In [None]:
from google.colab import drive
# Mount Google Drive so the dataset archives stored there can be read.
drive.mount('/content/drive/')
base_folder = '/content/drive/MyDrive' # Change this path to your desired directory
training_path = os.path.join(base_folder, "training_dataset.zip") # zip file for training (training + validation) dataset

# Extract the training archive (no target directory is given to unzip).
!unzip $training_path

test_path = os.path.join(base_folder, "test_dataset.zip") # zip file for test dataset
!unzip $test_path

## Speech Classification Dataset

In [None]:
class CustomSpeechDataset(torch.utils.data.Dataset):
  """Dataset of audio clips stored as individual files on disk.

  With ``typ='train'`` the directory ``path`` must contain one sub-directory
  per class, each holding that class's audio files. With ``typ='test'`` the
  directory ``path`` holds the audio files directly.

  Args:
    path: root directory of the dataset.
    typ: either ``'train'`` or ``'test'``.
    transforms: optional iterable of callables applied, in order, to each
      loaded signal.

  Attributes:
    targets: one dict per clip with keys ``'filename'`` and ``'path'``, plus
      ``'class'`` (the class directory name) in train mode.
    class_names: sorted class directory names; an empty list in test mode.
  """

  def __init__(self, path, typ='train', transforms=None):

    assert typ == 'train' or typ == 'test', 'typ must be either "train" or "test"'

    self.typ = typ
    self.transforms = transforms
    self.targets = []
    # Always defined, so callers can read it whatever the mode.
    self.class_names = []

    if self.typ == 'train':
      # Only directories are classes; stray files next to them are skipped.
      self.class_names = sorted(
          entry for entry in os.listdir(path)
          if os.path.isdir(os.path.join(path, entry))
      )

      for class_name in self.class_names:
        class_dirx = os.path.join(path, class_name)

        # Sorted so the sample order does not depend on the filesystem.
        for wav_file in sorted(os.listdir(class_dirx)):
          self.targets.append({
              'filename': wav_file,
              'path': os.path.join(class_dirx, wav_file),
              'class': class_name
          })

    elif self.typ == 'test':
      for wav_file in sorted(os.listdir(path)):
        self.targets.append({
            'filename': wav_file,
            'path': os.path.join(path, wav_file)
        })

  def __len__(self):
    """Return the number of clips found on disk."""
    return len(self.targets)

  def __getitem__(self, idx):
    """Load one clip.

    Returns:
      ``(filename, signal, sr, class_name)`` in train mode and
      ``(filename, signal, sr)`` in test mode.
    """
    if torch.is_tensor(idx):
      # self.targets is a Python list, so turn a tensor index into a plain int.
      idx = idx.item()

    target = self.targets[idx]
    signal, sr = torchaudio.load(target['path'], normalization=True)
    filename = target['filename']

    if self.transforms:
      for transform in self.transforms:
        signal = transform(signal)

    if self.typ == 'train':
      return filename, signal, sr, target['class']

    elif self.typ == 'test':
      return filename, signal, sr

In [None]:
# Build the full labelled dataset and hold out 20% of it for validation.
full_dataset = CustomSpeechDataset(path='training_dataset', typ='train') # Change to the folder inside the zip file
train_size = int(len(full_dataset)*0.8)
valid_size = len(full_dataset) - train_size
# Seed the split so the same random partition is drawn on every run; otherwise
# validation scores are not comparable between runs.
SPLIT_SEED = 42
train_set, valid_set = torch.utils.data.random_split(
    full_dataset,
    [train_size, valid_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)
# Class names in index order; the model's output indices refer to this list.
labels = full_dataset.class_names

In [None]:
# Map each class name to its position in `labels`.
labels_to_indices = {name: position for position, name in enumerate(labels)}

Let's next look at one example from the training set.

In [None]:
# Each training item is (filename, waveform, sample rate, class name), so
# `label_id` holds the class-name string here, not an integer index.
filename, waveform, sample_rate, label_id = train_set[0]

In [None]:
# Report the tensor shape and the sampling rate of the example clip.
print("Shape of waveform: {}".format(waveform.shape))
print("Sample rate of waveform: {}".format(sample_rate))

# Plot the samples with matplotlib; the tensor is transposed before plotting.
samples = waveform.t().numpy()
plt.plot(samples);

In [None]:
# We can play the audio clip and hear it for ourselves!
# As the last expression in the cell, the Audio object is rendered as the cell output.
ipd.Audio(waveform.numpy(), rate=sample_rate)

## Audio Augmentation

To make our model more robust and less prone to overfitting the training examples, we apply random changes to every clip as it is loaded, so the model sees a slightly different version of it in each epoch.

In [None]:
# Ordered list of transformations applied to each training waveform.
# The cells below append to this list, so re-run this cell first whenever you
# re-run any of them; otherwise the same transformation is added twice.
transformations = []

### Add Noise to Audio


In [None]:
class AddNoise(torch.nn.Module):
  """Add zero-mean Gaussian noise scaled to the waveform's peak amplitude.

  Args:
    noise_amt: noise standard deviation as a fraction of the largest absolute
      sample value in the waveform.
  """

  def __init__(self, noise_amt = 0.1):
    super().__init__()
    self.noise_amt = noise_amt

  def forward(self, waveform):
    """Return a noisy copy of ``waveform`` (a floating-point tensor).

    The input tensor is left untouched and the result keeps its shape, dtype
    and device.
    """
    if waveform.numel() == 0:
      # Nothing to perturb, and max() of an empty tensor would raise.
      return waveform

    # Drawing from torch's RNG keeps the result a tensor of the input's dtype
    # and follows the seed set with torch.manual_seed.
    noise_std = self.noise_amt * waveform.abs().max()
    return waveform + torch.randn_like(waveform) * noise_std

# Register the noise augmentation with a small noise factor (0.01) instead of the class default (0.1).
transformations.append(AddNoise(noise_amt = 0.01))

### Convert Tensor to Numpy
We can simplify augmentation operations using Numpy operations

In [None]:
class toNumpy(torch.nn.Module):
    """Convert a waveform tensor to a NumPy array, keeping only index 0 of its first dimension."""

    def __init__(self, log_offset = 1e-6):
        # `log_offset` is accepted but not used by this transform.
        super().__init__()

    def forward(self, waveform):
        first_row = waveform[0]
        return first_row.numpy()
        
# From this step on, the waveform travels through the list as a NumPy array.
transformations.append(toNumpy())

### Time Warping
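We can perform a simple time warp by shifting the audio to the left or right by a random number of samples. The shift is circular: samples pushed off one end wrap around to the other.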

In [None]:
class AddShift(torch.nn.Module):
  """Circularly shift a waveform by a random number of samples.

  Args:
    max_shift: largest shift, in samples, in either direction.
  """

  def __init__(self, max_shift = 1600):
    super().__init__()
    self.max_shift = max_shift
    # The global `random` generator is deliberately not reseeded here, so a
    # seed set by the user for reproducibility stays in effect.

  def forward(self, waveform):
    """Return ``waveform`` rolled by a random offset in [-max_shift, max_shift]."""
    shift = random.randint(-self.max_shift, self.max_shift)
    return np.roll(waveform, shift)

# Register the shift augmentation; 1600 samples is 0.1 s at the 16 kHz rate used in this notebook.
transformations.append(AddShift(max_shift = 1600))

### Stretch
We squeeze the audio by a random factor.

If you wish to perform a stretch, decrease `stretch_min` and increase `req_length` under `PadAudio()` accordingly.

The padding length must increase to accommodate the amount of stretch; otherwise the input Tensors to the model will differ in shape or size.

In [None]:
class AddStretch(torch.nn.Module):
  """Time-stretch a waveform by a random rate using librosa.

  Args:
    stretch_min: lower bound of the random stretch rate.
    stretch_max: upper bound of the random stretch rate.
  """

  def __init__(self, stretch_min = 0.8, stretch_max = 1):
    super().__init__()
    self.stretch_min = stretch_min
    self.stretch_max = stretch_max
    # The global `random` generator is deliberately not reseeded here, so a
    # seed set by the user for reproducibility stays in effect.

  def forward(self, waveform):
    """Return ``waveform`` (a NumPy array) stretched by a rate drawn uniformly
    between ``stretch_min`` and ``stretch_max``."""
    rate = random.uniform(self.stretch_min, self.stretch_max)
    # Pass `rate` by keyword: newer librosa releases reject it positionally.
    return librosa.effects.time_stretch(waveform, rate=rate)

# Rates are drawn from [1, 1.3] here rather than the class defaults; per the note above,
# this range squeezes the clip, so the default padding length still fits.
transformations.append(AddStretch(stretch_min = 1, stretch_max = 1.3))

### Converting Numpy back to Tensor

In [None]:
class toTensor(torch.nn.Module):
    """Convert a NumPy array back into a float32 torch tensor."""

    def __init__(self, log_offset = 1e-6):
        # `log_offset` is accepted but not used by this transform.
        super().__init__()

    def forward(self, np_array):
        as_tensor = torch.from_numpy(np_array)
        return as_tensor.float()

# The remaining steps (padding, mel spectrogram, masking) operate on tensors again.
transformations.append(toTensor())

### Padding Audio
The model takes in a Tensor as an input, in order to insert our Tensors into the model, we have to ensure that the Tensors have the same size or shape. The sample length varies across audio clips.

To ensure constant sample lengths, we can pad the audio clips to a maximum sample length of 16000. (16000 sample length is equal to 1 second at 16,000 Hz sampling rate)

We will pad audio clips, which are less than 1 second, with parts of itself.

In [None]:
# Collect every training clip's length in samples (dimension 1 of its waveform tensor).
audio_lens = [train_set[position][1].size(1) for position in range(len(train_set))]

print('Max Sample Length:', max(audio_lens))
print('Min Sample Length:', min(audio_lens))

class PadAudio(torch.nn.Module):
  """Pad a waveform to ``req_length`` samples by repeating the clip itself.

  Padding is applied along the last (time) dimension, so both 1-D waveforms
  and ``(channels, samples)`` tensors are handled. Clips that already have at
  least ``req_length`` samples are returned unchanged (they are not cropped).

  Args:
    req_length: minimum number of samples in the output.
  """

  def __init__(self, req_length = 16000):
    super().__init__()
    self.req_length = req_length

  def forward(self, waveform):
    """Return ``waveform`` extended to ``req_length`` samples."""
    length = waveform.size(-1)
    if length >= self.req_length:
      return waveform

    if length == 0:
      # An empty clip has nothing to repeat; fall back to silence.
      return waveform.new_zeros(*waveform.shape[:-1], self.req_length)

    # Tile the clip often enough to cover req_length, then cut the excess.
    repeats = -(-self.req_length // length)  # ceiling division
    tiled = torch.cat([waveform] * repeats, dim=-1)
    return tiled[..., :self.req_length]

# Pad with the default required length of 16000 samples.
transformations.append(PadAudio())

## Features

In this classification example, instead of using the raw waveform of the audio clips, we will use hand-crafted audio features known as mel spectrograms.

For an in-depth explanation of what a melspectrogram is, I would highly recommend reading this article [here](https://medium.com/analytics-vidhya/understanding-the-mel-spectrogram-fca2afa2ce53).

In short, a melspectrogram is a way to represent an audio signal’s loudness as it varies over time at different frequencies, while scaled to how humans perceive sound. (We can easily tell the difference between 500 and 1000 Hz, but we can't between 10,000 and 10,500 Hz.)

![pic](https://i.ibb.co/WDsqsfb/melspectrogram.png)


TorchAudio has an in-built method that can help us with this transformation. We shall then apply log scaling.

In [None]:
# Custom log-scaling step applied after the mel spectrogram
class LogMelTransform(torch.nn.Module):
    """Take the natural log of a mel spectrogram after adding a small offset.

    Args:
        log_offset: constant added before the logarithm so zero-valued bins
            do not become ``-inf``.
    """

    def __init__(self, log_offset = 1e-6):
        super().__init__()
        self.log_offset = log_offset

    def forward(self, melspectrogram):
        shifted = melspectrogram + self.log_offset
        return torch.log(shifted)

# Turn the padded waveform into a 128-band mel spectrogram (16 kHz audio), then log-scale it.
transformations.append(MelSpectrogram(sample_rate = 16000, n_mels = 128))
transformations.append(LogMelTransform())

## Features for Evaluation

We also need a transformation pipeline without any form of augmentation, to use when validating the model and when predicting on new audio clips.

In [None]:
# Augmentation-free pipeline for validation and prediction: pad, mel spectrogram, log scaling.
# n_mels stays at 128 so the feature size matches the training pipeline.
eval_transformations = [
    PadAudio(),
    MelSpectrogram(sample_rate = 16000, n_mels = 128),
    LogMelTransform(),
]

## Spectrogram Data Augmentation

We will do a simple data augmentation process in order to increase the variations in our dataset.

In the audio domain, the augmentation technique known as [SpecAugment](https://arxiv.org/abs/1904.08779) is often used. It makes use of 3 steps:
- Time Warp (warps the spectrogram to the left or right)
- Frequency Masking (randomly masks a range of frequencies)
- Time Masking (randomly masks a range of time)

![specaugment pic](https://drive.google.com/uc?export=view&id=1C085-PlXVhjzh4kzCy869VHRGwC3aDHJ)

In [None]:
# Let's extend the list of transformations with the augmentations
# The masks act on the spectrogram, so they come after the mel/log steps in the list.
transformations.append(TimeMasking(time_mask_param = 10)) 
transformations.append(FrequencyMasking(freq_mask_param = 16)) 
# Last expression of the cell: displays the complete training pipeline.
transformations

In [None]:
# Let's visualise the changes we have so far by running one training clip
# through the whole training pipeline.
filename, waveform, sample_rate, label_id = train_set[0]

for transform in transformations:
  waveform = transform(waveform)

# After the pipeline, `waveform` holds a log-mel spectrogram (mel bins x time
# frames), so it is drawn as an image rather than as overlapping line plots.
fig, ax = plt.subplots()
ax.imshow(waveform.squeeze().numpy(), origin="lower", aspect="auto")
ax.set(title="{} ({})".format(filename, label_id), xlabel="Time frame", ylabel="Mel bin");

## Data Loaders

Let's now set up our data loaders so that we can streamline the batch loading of data for our model training later on. 

In [None]:
# Settings shared by every DataLoader below.
BATCH_SIZE = 8
NUM_WORKERS = 4
# A torch.device never compares equal to a plain string, so check its `.type`;
# comparing `device == 'cuda'` would leave pinned memory switched off on GPU.
PIN_MEMORY = device.type == 'cuda'

def train_collate_fn(batch):
    """Collate training samples into a batch, applying the augmenting pipeline.

    Args:
        batch: list of ``(filename, waveform, sample_rate, label)`` tuples.

    Returns:
        ``(tensors, targets, filenames)``: the stacked transformed features,
        a LongTensor of class indices, and the list of filenames.
    """
    features, class_ids, names = [], [], []

    for name, clip, _sample_rate, label in batch:
        # Run the clip through every step of the training pipeline.
        for step in transformations:
            clip = step(clip)
        # Drop singleton dimensions and transpose before stacking.
        features.append(clip.squeeze().T)
        class_ids.append(labels_to_indices[label])
        names.append(name)

    # Group the per-sample tensors into one batched tensor.
    return (torch.stack(features), torch.LongTensor(class_ids), names)

def eval_collate_fn(batch):
    """Collate labelled samples into a batch using the augmentation-free pipeline.

    Args:
        batch: list of ``(filename, waveform, sample_rate, label)`` tuples.

    Returns:
        ``(tensors, targets, filenames)``: the stacked transformed features,
        a LongTensor of class indices, and one filename per sample.
    """
    tensors, targets, filenames = [], [], []

    # Gather in lists, and encode labels as indices
    for filename, waveform, sample_rate, label in batch:
        # apply transformations
        for transform in eval_transformations:
            waveform = transform(waveform)
        # Drop singleton dimensions and transpose before stacking.
        tensors.append(waveform.squeeze().T)
        targets.append(labels_to_indices[label])
        filenames.append(filename)

    # Group the list of tensors into a batched tensor
    tensors = torch.stack(tensors)
    targets = torch.LongTensor(targets)

    # `filenames` holds exactly one entry per sample, aligned with `tensors`.
    return (tensors, targets, filenames)

# Training batches: reshuffled every epoch and augmented inside the collate function.
train_loader = torch.utils.data.DataLoader(
    dataset=train_set,
    batch_size=BATCH_SIZE,
    collate_fn=train_collate_fn,
    shuffle=True,
    drop_last=False,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
)

# Validation batches: fixed order, no augmentation.
valid_loader = torch.utils.data.DataLoader(
    dataset=valid_set,
    batch_size=BATCH_SIZE,
    collate_fn=eval_collate_fn,
    shuffle=False,
    drop_last=False,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
)

## Setting up the LSTM-RNN Model

In [None]:
class RNN(nn.Module):
    """LSTM classifier: a stacked LSTM followed by a linear layer on the last time step.

    Args:
        input_size: number of features per time step.
        hidden_size: size of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        num_classes: number of output classes.
        device: device the model is meant to run on; stored as ``self.device``.
        classes: optional list of class names; stored as ``self.classes``.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes, device, classes=None):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.device = device
        self.classes = classes

    def forward(self, x):
        """Return class scores of shape (batch_size, num_classes) for ``x`` of
        shape (batch_size, seq_length, input_size)."""
        # nn.LSTM starts from zero hidden and cell states when none are given,
        # created to match the input, so nothing is tied to the constructor's
        # device or to float32.
        out, _ = self.lstm(x)  # shape = (batch_size, seq_length, hidden_size)

        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
        return out

    def predict(self, x):
        '''Predict one label index from one sample's features.

        x: features of a single sample, shape LxN
           L is the length of the sequence
           N is the feature dimension
        '''
        # Follow the parameters' current device and dtype, and add the batch dimension.
        reference = next(self.parameters())
        x = torch.as_tensor(x).to(device=reference.device, dtype=reference.dtype)
        x = x.unsqueeze(0)
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            outputs = self.forward(x)
        _, predicted = torch.max(outputs, 1)
        predicted_index = predicted.item()
        return predicted_index


In [None]:
# initialise dataset object for test set
# typ='test' makes each item a (filename, waveform, sample_rate) triple without a label
test_set = CustomSpeechDataset(path='test_dataset', typ='test')

In [None]:
# define test collate function and set up test loader
def test_collate_fn(batch):

    # A data tuple has the form:
    # filename, waveform, sample_rate

    tensors, filenames = [], []

    # Gather in lists
    for filename, waveform, sample_rate in batch:
        # apply transformations
        for transform in eval_transformations:
            waveform = transform(waveform)
        waveform = waveform.squeeze().T
        tensors += [waveform]
        filenames += [filename]

    # Group the list of tensors into a batched tensor
    tensors = torch.stack(tensors)

    return (tensors, filenames)

# Test batches: fixed order, no augmentation, no labels.
test_loader = torch.utils.data.DataLoader(
    dataset=test_set,
    batch_size=BATCH_SIZE,
    collate_fn=test_collate_fn,
    shuffle=False,
    drop_last=False,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
)

In [None]:
# initialize the model class
# input_size=128 matches n_mels=128 in the MelSpectrogram transforms above
model = RNN(input_size=128, hidden_size=256, num_layers=2, num_classes=len(labels), device=device, classes=labels).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
optimizer.zero_grad()

# scheduler that multiplies the learning rate by gamma (0.9) on every scheduler.step() call
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma = 0.9) 

# define number of epochs
num_epochs = 50

In [None]:
# Train for `num_epochs` epochs, validating after each one.
# The printed losses are averages over all samples of the epoch.
for epoch in range(1,num_epochs+1):

  # training steps
  model.train()
  count_correct, count_total, loss_total = 0, 0, 0.0
  for idx, (features, targets, filenames) in enumerate(train_loader):

    features = features.to(device)
    targets = targets.to(device)

    # forward pass
    outputs = model(features)
    loss = criterion(outputs, targets)

    # backward pass
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

    # training results
    _, argmax = torch.max(outputs, 1)
    count_correct += (targets == argmax).sum().item()
    count_total += targets.size(0)
    # criterion returns the batch mean, so weight it by the batch size
    loss_total += loss.item() * targets.size(0)

  # max(..., 1) avoids a division by zero when the loader is empty
  train_acc = count_correct / max(count_total, 1)
  train_loss = loss_total / max(count_total, 1)
  
  # evaluation steps
  model.eval()
  count_correct, count_total, loss_total = 0, 0, 0.0
  with torch.no_grad():
    for idx, (features, targets, filenames) in enumerate(valid_loader):

      features = features.to(device)
      targets = targets.to(device)

      # forward pass
      val_outputs = model(features)
      val_loss = criterion(val_outputs, targets)

      # validation results
      _, argmax = torch.max(val_outputs, 1)
      count_correct += (targets == argmax).sum().item()
      count_total += targets.size(0)
      loss_total += val_loss.item() * targets.size(0)

  # print results
  valid_acc = count_correct / max(count_total, 1)
  valid_loss = loss_total / max(count_total, 1)
  print('Epoch [{}/{}], Train loss = {:.4f}, Train accuracy = {:.2f}, Valid loss = {:.4f}, Valid accuracy = {:.2f}' 
        .format(epoch, num_epochs, train_loss, 100*train_acc, valid_loss, 100*valid_acc))
  
  # decay the learning rate once per epoch
  scheduler.step()

  # # Early stoppage, if required
  # if 100*valid_acc >= 90.0:
  #     break

In [None]:
# Save only the model's state_dict (its parameters), not the whole model object.
save_path = os.path.join(base_folder, 'audio_classification_lstm.pt') # Change this path to your desired directory
torch.save(model.state_dict(), save_path)

## Test Set

In [None]:
# Restore the saved weights into the model
load_path = os.path.join(base_folder, 'audio_classification_lstm.pt') # Change this path to your desired directory
saved_state = torch.load(load_path, map_location=device)
model.load_state_dict(saved_state)

# pass test set through the RNN model
model.eval()
pred_list, filename_list = [], []
with torch.no_grad():
  for idx, (features, filenames) in enumerate(test_loader):

    # forward pass on the model's device
    outputs = model(features.to(device))

    # test predictions: index of the highest score per sample
    _, argmax = outputs.max(dim=1)
    pred_list.extend(argmax.cpu().tolist())
    filename_list.extend(filenames)

## Converting prediction results into a CSV file

In [None]:
pred_path = os.path.join(base_folder, 'prediction.csv') # Change this path to your desired directory

# Pair each filename with its predicted class index, ordered by filename.
result_tuple = list(zip(filename_list, pred_list))
submission = pd.DataFrame(result_tuple, columns=['filename', 'pred'])
submission = submission.sort_values('filename').reset_index(drop=True)
# Translate class indices back to class names.
submission['label'] = submission['pred'].map(lambda x: labels[x])

# Write "filename,label" rows without a header or index column.
submission[['filename', 'label']].to_csv(pred_path, header=False, index=False)

# Kept as the last expression so the preview is actually displayed by the cell.
submission[['filename', 'label']].head()

The predictions (`prediction.csv`) and the saved model (`audio_classification_lstm.pt`) can both be found in the `base_folder` defined above.