# Audio classification

This notebook performs audio classification in PyTorch, using the [SpeechCommands dataset](https://arxiv.org/abs/1804.03209).

This code implements a 1-Dimensional Convolutional Neural Network (CNN) that classifies raw waveforms of people speaking different voice instructions. The CNN model implemented here is based on [this paper](https://arxiv.org/pdf/1610.00087.pdf).

The code in this notebook is heavily modified (for readability and adaptability) from [this source](https://colab.research.google.com/github/pytorch/tutorials/blob/gh-pages/_downloads/c64f4bad00653411821adcb75aea9015/speech_command_classification_with_torchaudio_tutorial.ipynb), adapted by [Terence Broad](https://researchers.arts.ac.uk/2351-terence-broad) and [Irini Kalaitzidi](https://www.gold.ac.uk/computing/people/kalaitzidi-irini/).

## Build and Train Model

In [None]:
%matplotlib inline

import os
import sys
import math
import time
import pathlib
import numpy as np
import IPython.display as ipd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import torchaudio
# 'Audio Transforms'
import torchaudio.transforms as AT

# Get cpu, gpu or mps device for training
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

if "google.colab" in sys.modules:
    !pip install torchcodec

In [None]:
# fixed directory structure --------------
DATASETS_DIR = pathlib.Path("datasets")
DATASETS_DIR.mkdir(exist_ok=True)

MODELS_DIR = pathlib.Path("models")
MODELS_DIR.mkdir(exist_ok=True)
# ----------------------------------------

# change accordingly
MODEL_NAME = "speechcommands_conv"

MODEL_DIR = MODELS_DIR / "speechcommands"
MODEL_DIR.mkdir(exist_ok=True)

### Hyperparameters

In [None]:
SAMPLE_RATE_ORIG = 16000 # Sample rate for the speech commands dataset
SAMPLE_RATE_NEW = 8000   # Sample rate for our model
BATCH_SIZE = 200         # Batch size for training
LEARNING_RATE = 0.01     # Learning rate for training

### Define data transform

With torchaudio we only define one transform here. It downsamples each audio waveform from a sample rate of 16000 Hz to 8000 Hz. This still captures frequencies up to 4000 Hz (the Nyquist limit), which is enough for recognising spoken commands, and it halves the number of samples the network has to process, so training is more efficient.
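
To make the effect of the downsampling concrete, here is a small arithmetic sketch in plain Python. The one-second clip length is an assumption for illustration (clips in the dataset can be shorter), and `nyquist_hz` is a helper name introduced here, not part of the notebook.

```python
SAMPLE_RATE_ORIG = 16000  # Hz, as in the hyperparameters cell
SAMPLE_RATE_NEW = 8000    # Hz

def nyquist_hz(sample_rate):
    # Highest frequency that a given sample rate can represent
    return sample_rate / 2

clip_seconds = 1.0  # assumed clip length, for illustration only
samples_orig = int(clip_seconds * SAMPLE_RATE_ORIG)
samples_new = int(clip_seconds * SAMPLE_RATE_NEW)

print(samples_orig, samples_new)    # 16000 8000
print(nyquist_hz(SAMPLE_RATE_NEW))  # 4000.0
```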

Unlike when working with images, the padding and normalising of the data to the same length happens in the function `collate_audio_folder_batch`. As all of our audio files are different lengths, we need to harmonise them when we load in a random mini-batch. 

Note: [Resampling tutorial](https://docs.pytorch.org/audio/stable/tutorials/audio_resampling_tutorial.html).
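
The padding step described above can be sketched on two toy tensors. This is a minimal illustration that assumes mono waveforms of shape `(channels, timesteps)`; the tensor names and lengths are made up for the example.

```python
import torch
from torch.nn.utils.rnn import pad_sequence

# Two hypothetical mono waveforms of shape (channels, timesteps)
short = torch.ones(1, 3)
long = torch.ones(1, 5)

# pad_sequence pads along the first dimension, so put time first: (T, C)
padded = pad_sequence([short.t(), long.t()], batch_first=True, padding_value=0.0)

# (B, T, C) -> (B, C, T), the layout that Conv1d expects
padded = padded.permute(0, 2, 1)

print(padded.shape)  # torch.Size([2, 1, 5])
print(padded[0])     # tensor([[1., 1., 1., 0., 0.]])
```

The shorter waveform is filled with zeros on the right until it matches the longest waveform in the batch.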

In [None]:
transform = AT.Resample(
    orig_freq=SAMPLE_RATE_ORIG,
    new_freq=SAMPLE_RATE_NEW
)

### Create datasets

In [None]:
def find_classes(dirname):
    classes = [d for d in os.listdir(dirname) if os.path.isdir(os.path.join(dirname, d))]
    classes.sort()
    class_to_idx = {classes[i]: i for i in range(len(classes))}
    return classes, class_to_idx


class SpeechCommandsDataset(torchaudio.datasets.SPEECHCOMMANDS):
    def __init__(
        self,
        root = DATASETS_DIR,
        url = 'speech_commands_v0.02',
        folder_in_archive = "SpeechCommands",
        download = True,
        subset = None,
        transform = None,
    ):
        super().__init__(
            root = root,
            url = url,
            folder_in_archive = folder_in_archive,
            download = download,
            subset = subset
        )

        classes, class_to_idx = find_classes(os.path.join(root, folder_in_archive, url))
        self.classes = classes
        self.class_to_idx = class_to_idx 
        self.idx_to_class = {v: k for k, v in class_to_idx.items()}
        self.transform = transform

    def __getitem__(self, index):
        # https://github.com/pytorch/audio/blob/e1232690308a6b5297fcd06e925899a9b64f7280/src/torchaudio/datasets/speechcommands.py#L158
        # metadata is (path, sample rate, label (str), speaker ID, utterance number)
        metadata = self.get_metadata(index)

        waveform = torchaudio.datasets.utils._load_waveform(self._archive, metadata[0], metadata[1])
        if self.transform is not None:
            waveform = self.transform(waveform)
        
        # return only (waveform, class index)
        return (waveform, torch.tensor(self.class_to_idx[metadata[2]]))

train_dataset = SpeechCommandsDataset(
    subset = "training",
    transform = transform
)
val_dataset = SpeechCommandsDataset(
    subset = "validation",
    transform = transform
)
test_dataset = SpeechCommandsDataset(
    subset = "testing",
    transform = transform
)

In [None]:
train_dataset[0][0].shape

In [None]:
train_dataset.get_metadata(0)

In [None]:
NUM_CLASSES = len(train_dataset.classes)

In [None]:
def pad_sequence(batch):
    # Make all tensors in a batch the same length by padding with zeros
    # print(batch[0].shape)
    # batch has samples of shape (channels, timesteps) (usually: 1, ts), we need to pass them as (ts, C)
    batch = [item.t() for item in batch]
    batch = torch.nn.utils.rnn.pad_sequence(batch, batch_first=True, padding_value=0.)
    # batch is now (B, TS, C), permuting back to (B, C, TS)
    # print(batch.shape)
    # Einstein summation notation: https://docs.pytorch.org/docs/stable/generated/torch.einsum.html
    return torch.einsum("btc -> bct", batch)

def collate_audio_folder_batch(batch):
    # Empty lists for adding data to
    tensors, targets = [], []

    # Gather waveforms and labels (already class indices) in lists
    for waveform, label in batch:
        tensors += [waveform]
        targets += [label]

    # Group the list of tensors into a batched tensor
    tensors = pad_sequence(tensors)
    targets = torch.stack(targets)
    return tensors, targets

if device == "cuda":
    num_workers = 1
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False

# Create dataloaders
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    collate_fn=collate_audio_folder_batch,
    shuffle=True,
    num_workers=num_workers,
    pin_memory=pin_memory,    
)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    collate_fn=collate_audio_folder_batch,
    shuffle=False,
    num_workers=num_workers,
    pin_memory=pin_memory,    
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    collate_fn=collate_audio_folder_batch,
    shuffle=False
)

for i, (waveform, label) in enumerate(train_loader):
    print(waveform.shape)
    break

### Plot a sample of the data

In [None]:
data_batch, label_batch = next(iter(train_loader))
sample_waveform = data_batch[0].squeeze()
sample_class = label_batch[0].item()

print(f"Data batch shape: {data_batch.shape} (B, C, TS)")
print(f"Shape of waveform: {sample_waveform.size()}")
print(f"Class of waveform: '{train_dataset.idx_to_class[sample_class]}'")

plt.plot(sample_waveform.t().numpy())
ipd.Audio(sample_waveform.numpy(), rate=SAMPLE_RATE_NEW)

### Define the Network

Here we define a 1-Dimensional convolutional neural network to process raw audio data. The specific architecture is modeled after the M5 network architecture described in [this paper](https://arxiv.org/pdf/1610.00087.pdf). 

In [None]:
class M5(nn.Module):
    def __init__(self, n_input=1, n_output=NUM_CLASSES, stride=16, n_channel=32, kernel_size=80):
        super().__init__()
        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=kernel_size, stride=stride)
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(4)
        self.conv2 = nn.Conv1d(n_channel, n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(n_channel)
        self.pool2 = nn.MaxPool1d(4)
        self.conv3 = nn.Conv1d(n_channel, 2 * n_channel, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(2 * n_channel)
        self.pool3 = nn.MaxPool1d(4)
        self.conv4 = nn.Conv1d(2 * n_channel, 2 * n_channel, kernel_size=3)
        self.bn4 = nn.BatchNorm1d(2 * n_channel)
        self.pool4 = nn.MaxPool1d(4)
        self.fc1 = nn.Linear(2 * n_channel, n_output)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(self.bn1(x))
        x = self.pool1(x)
        x = self.conv2(x)
        x = F.relu(self.bn2(x))
        x = self.pool2(x)
        x = self.conv3(x)
        x = F.relu(self.bn3(x))
        x = self.pool3(x)
        x = self.conv4(x)
        x = F.relu(self.bn4(x))
        x = self.pool4(x)
        x = F.avg_pool1d(x, x.shape[-1])
        x = x.permute(0, 2, 1)
        x = self.fc1(x)
        return x
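
To see how much the network shrinks the time axis, it can help to trace the sequence length through the layers. The sketch below assumes a one-second input of 8000 samples and uses the standard output-length formula for unpadded, undilated `Conv1d` and `MaxPool1d` layers; `out_len` is a helper name introduced here, not part of the notebook.

```python
def out_len(length, kernel_size, stride):
    # Output length of an unpadded, undilated 1D convolution or pooling layer
    return (length - kernel_size) // stride + 1

length = 8000  # assumed: one second of audio at 8000 Hz

# (name, kernel_size, stride), following the default arguments of M5;
# MaxPool1d(4) uses a stride equal to its kernel size
layers = [
    ("conv1", 80, 16),
    ("pool1", 4, 4),
    ("conv2", 3, 1),
    ("pool2", 4, 4),
    ("conv3", 3, 1),
    ("pool3", 4, 4),
    ("conv4", 3, 1),
    ("pool4", 4, 4),
]

trace = []
for name, kernel_size, stride in layers:
    length = out_len(length, kernel_size, stride)
    trace.append((name, length))
    print(f"{name}: {length}")
```

Under these assumptions only one timestep is left after `pool4`, so the final average pooling and `permute` hand a tensor of shape `(B, 1, 2 * n_channel)` to the linear layer.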
        

### Setup core objects

Here we set up our core objects: the model, the loss function (criterion) and the optimiser.

In [None]:
# model = M5(n_input=1, n_output=NUM_CLASSES)
model = M5()
print(model)

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
    
n = count_parameters(model)
print(f"Number of parameters: {n}.")

optimizer = optim.Adam(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=0.0001
)

# reduce the learning rate by a factor of 10 every 20 epochs
scheduler = optim.lr_scheduler.StepLR(
    optimizer,
    step_size=20,
    gamma=0.1
)

criterion = nn.CrossEntropyLoss()
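
As a rough illustration of what the scheduler does, the sketch below records the learning rate over 40 epochs, using the same `step_size` and `gamma` as above. The single dummy parameter and the 40-epoch range are assumptions made only so that the example runs on its own.

```python
import torch

# A dummy parameter, standing in for the model's parameters
param = torch.nn.Parameter(torch.zeros(1))
optimizer = torch.optim.Adam([param], lr=0.01)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

lrs = []
for epoch in range(40):
    # Learning rate used during this epoch
    lrs.append(optimizer.param_groups[0]["lr"])
    optimizer.step()   # stands in for one epoch of training
    scheduler.step()   # called once per epoch, as in the training loop

print(lrs[0], lrs[19])   # the initial rate is kept for the first 20 epochs
print(lrs[20], lrs[39])  # then it is multiplied by gamma
```

With `NUM_EPOCHS = 5`, as set further down in this notebook, the step at epoch 20 is never reached, so the learning rate stays at `LEARNING_RATE` for the whole run.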

### Utils

In [None]:
def number_of_correct(preds, labels):
    # count number of correct predictions
    return preds.squeeze().eq(labels).sum().item()

def get_likely_index(tensor):
    # find most likely label index for each element in the batch
    return tensor.argmax(dim=-1)
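
Here is a small sketch of what these two helpers compute, applied to made-up logits with the `(B, 1, C)` shape that the model returns. The values and labels are invented for illustration.

```python
import torch

# Hypothetical logits for a batch of 3 samples and 3 classes: shape (B, 1, C)
logits = torch.tensor([
    [[0.1, 2.0, -1.0]],
    [[1.5, 0.2, 0.3]],
    [[0.0, 0.1, 3.0]],
])
labels = torch.tensor([1, 0, 1])

# Most likely class index for each element of the batch: shape (B, 1)
preds = logits.argmax(dim=-1)

# Drop the singleton dimension, compare with the labels and count the matches
correct = preds.squeeze().eq(labels).sum().item()

print(preds.squeeze().tolist())  # [1, 0, 2]
print(correct)                   # 2
```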

### Training loop

Here is our training loop for our data. Look at how the training set and validation set are used differently. 

What differences are there in the code when we cycle through each of these sets of data?

In [None]:
# Number of epochs for training (this is a large dataset so not many epochs needed)
NUM_EPOCHS = 5 

# Log progress every n iterations
PRINT_EVERY = 100

# initialising loss lists here, so that we can do multiple runs
train_losses = []
val_losses = []

best_loss = 100000

In [None]:
# Put model on device
model.to(device)

# For each cycle of the dataset
for epoch in range(NUM_EPOCHS):
    
    # Variables to keep track of running loss
    train_loss = 0.0
    val_loss = 0.0

    # --------------------------------------------------------------------------------
    # Train loop (this could be wrapped into a function)

    # Put model in training mode
    model.train()

    t = time.time()
    
    # For each batch in one cycle of the training set
    for batch_idx, (batch, labels) in enumerate(train_loader):

        # Move batch to whatever device we are running training on
        batch = batch.to(device)
        labels = labels.to(device)

        # Forward pass with the model
        output = model(batch)

        # Compute the loss between predictions and labels
        loss = criterion(output.squeeze(), labels)
        
        # Backpropagate the loss and update the weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Keep track of loss over time
        train_loss += loss.item()
        train_losses.append(loss.item())

        # print training stats
        if batch_idx % PRINT_EVERY == 0 or batch_idx == len(train_loader) - 1:
            len_fmt = len(str(len(train_loader)))
            msg = f"Train Epoch: {epoch + 1} [{batch_idx:>{len_fmt}}/{len(train_loader)} "
            msg += f"({100. * batch_idx / len(train_loader):>3.0f}%)] | Loss: {loss.item():.6f}"
            msg += f" | {time.time() - t:.2f}s"
            print(msg)
            t = time.time()

    # --------------------------------------------------------------------------------
    # Validation loop (could also be wrapped into a function)

    # Put model in evaluation mode (batch norm uses its running statistics instead of batch statistics)
    model.eval()

    # Without gradient tracking 
    with torch.no_grad():
        
        # Variable to track total correct classifications
        correct = 0

        # Validation loop
        # For each batch in one cycle of the validation set
        for batch, targets in val_loader:
            
            # Move batch to whatever device we are running training on
            batch = batch.to(device)
            targets = targets.to(device)

            # Forward pass with the model
            output = model(batch)

            # Compute the loss between predictions and targets
            loss = criterion(output.squeeze(), targets)
            
            # Track loss
            val_loss += loss.item()
            val_losses.append(loss.item())

            # Get top prediction
            pred = get_likely_index(output)
            
            # Check if prediction is correct
            correct += number_of_correct(pred, targets)

    # --------------------------------------------------------------------------------
    # Printing & stats
    
    # Average the cumulative validation loss over the number of batches
    # train_loss = train_loss / len(train_loader)
    val_loss = val_loss / len(val_loader)
    
    # # Added cumulative losses to lists for later display
    # train_losses.append(train_loss)
    # val_losses.append(val_loss)

    msg = f"   Val Acc: {100. * correct / len(val_loader.dataset):.0f}% ({correct}/{len(val_loader.dataset)})"
    msg += f" | Val Loss: \033[1m{val_loss:.6f}\033[0m"
    print(msg)

    # update learning rate
    scheduler.step()

    # if validation loss is the lowest so far, save the model
    if val_loss < best_loss:
        print("   Val loss improved, saving model.")
        best_loss = val_loss
        torch.save(model.state_dict(), MODEL_DIR / f"{MODEL_NAME}.pt")
        torch.jit.save(torch.jit.script(model), MODEL_DIR / f"{MODEL_NAME}_scripted.pt")
    print()
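
One of the differences between the two loops above is the mode the model is in, and whether gradients are tracked. The sketch below shows this on a single `BatchNorm1d` layer. The random input (with a mean of about 5) is an assumption chosen to make the difference visible; it is not data from the notebook.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
bn = nn.BatchNorm1d(2)

# Hypothetical batch of shape (B, C, TS) with a mean far from zero
x = torch.randn(8, 2, 16) * 3 + 5

# Training mode: normalise with the statistics of this batch
# (and update the running statistics), with gradient tracking on
bn.train()
y_train = bn(x)

# Evaluation mode: normalise with the running statistics instead,
# and skip gradient tracking, as in the validation loop
bn.eval()
with torch.no_grad():
    y_eval = bn(x)

print(y_train.requires_grad, y_eval.requires_grad)  # True False
print(y_train.mean().item(), y_eval.mean().item())
```

In training mode the output is centred around zero, because the batch's own mean is subtracted. In evaluation mode it is not, because the running mean has only seen one batch so far.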

In [None]:
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6), sharex=False)

ax1.plot(train_losses)
ax1.set_title("Training loss")
ax1.set_ylabel("loss")

ax2.plot(val_losses, color="tab:orange")
ax2.set_title("Validation loss")
ax2.set_xlabel("steps")
ax2.set_ylabel("loss")

plt.tight_layout()
plt.show()

## Use our model

In [None]:
model = torch.jit.load(MODEL_DIR / f"{MODEL_NAME}_scripted.pt")

In [None]:
def predict(tensor):
    model.to(device)
    model.eval()
    tensor = tensor.to(device)
    with torch.no_grad():    
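        # Use the model to predict the label of the waveform
        # Right-pad with zeros to one second (SAMPLE_RATE_NEW samples);
        # a negative pad amount crops waveforms longer than one second
        tensor = F.pad(tensor, pad=(0, SAMPLE_RATE_NEW - tensor.shape[-1]), value=0.)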
        tensor = model(tensor[None, ...])
        label = get_likely_index(tensor).cpu().item()
        class_name = train_dataset.idx_to_class[label]
    return class_name
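
Bringing a waveform to a fixed length before feeding it to the model can be sketched with `F.pad` on toy tensors. The target length of 8 and the tensor names are assumptions for illustration; the notebook itself works with `SAMPLE_RATE_NEW` samples.

```python
import torch
import torch.nn.functional as F

TARGET_LEN = 8  # hypothetical target length

# A short mono waveform of shape (channels, timesteps)
wave = torch.ones(1, 5)

# pad=(left, right) applies to the last dimension: add zeros on the right
padded = F.pad(wave, pad=(0, TARGET_LEN - wave.shape[-1]), value=0.0)
print(padded)  # tensor([[1., 1., 1., 1., 1., 0., 0., 0.]])

# A negative pad amount crops instead of padding
cropped = F.pad(torch.ones(1, 10), pad=(0, TARGET_LEN - 10), value=0.0)
print(cropped.shape)  # torch.Size([1, 8])

# Add a batch dimension: (C, TS) -> (B, C, TS), as the model expects
batched = padded[None, ...]
print(batched.shape)  # torch.Size([1, 1, 8])
```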

### Test a sample from the test set (using `test_dataset`)

In [None]:
waveform, label = test_dataset[torch.randint(0, len(test_dataset), (1,)).item()]
ipd.display(ipd.Audio(waveform.numpy(), rate=SAMPLE_RATE_NEW))
print(f"Expected: {train_dataset.idx_to_class[label.item()]}. Predicted: {predict(waveform)}.")

### Test a sample from the test set (using `test_loader`)

In [None]:
test_samples, test_targets = next(iter(test_loader))
test_sample = test_samples[0]
test_target = test_targets[0]

print(f"Expected: {test_dataset.idx_to_class[test_target.item()]}. Predicted: {predict(test_sample)}.")
ipd.Audio(test_sample.numpy(), rate=SAMPLE_RATE_NEW)

### Test on test set

In [None]:
model.eval()

with torch.no_grad():
    
    test_loss = 0.0
    test_correct = 0.0
    
    for i, (batch, targets) in enumerate(test_loader):
        
        # Move batch to whatever device we are running training on
        batch = batch.to(device)
        targets = targets.to(device)

        # Forward pass with the model
        output = model(batch)

        # # Evaluate classification accuracy
        # loss = criterion(output.squeeze(), targets)
        
        # # Track loss
        # val_loss += loss.item()

        # Get top prediction
        pred = get_likely_index(output)
        
        # Check if prediction is correct
        test_correct += number_of_correct(pred, targets)

print(f"{test_correct:.0f}/{len(test_loader.dataset)} correct samples.")
print(f"Accuracy: {test_correct / len(test_loader.dataset) * 100:.2f}%")

### Check wrong classifications

In [None]:
def browse_misclassified():
    model.eval()
    found = False
    for i, (waveform, label) in enumerate(test_dataset):
        utterance = test_dataset.idx_to_class[label.item()]
        # predict() already runs the model without gradient tracking
        output = predict(waveform)
        if output != utterance:
            found = True
            print(f"Data point #{i}. Expected: {utterance}. Predicted: {output}.")
            yield ipd.Audio(waveform.numpy(), rate=SAMPLE_RATE_NEW)
    # Reached once the whole dataset has been browsed
    if not found:
        print("All examples in this dataset were correctly classified!")
        print("In this case, let's just look at the last data point")
        print(f"Data point #{i}. Expected: {utterance}. Predicted: {output}.")
        yield ipd.Audio(waveform.numpy(), rate=SAMPLE_RATE_NEW)

browse_iter = iter(browse_misclassified())

In [None]:
next(browse_iter)

### Test your own sample

In [None]:
test_wav, test_sample_rate = torchaudio.load("sounds/tree.wav")
print(test_wav.shape, test_sample_rate)

ipd.Audio(test_wav[:1].numpy(), rate=test_sample_rate)

In [None]:
resampler = AT.Resample(
    test_sample_rate,
    SAMPLE_RATE_NEW,
    dtype=test_wav.dtype
)
test_sample = resampler(test_wav)[:1, 1000:9000]
print(test_sample.shape)

ipd.Audio(test_sample.numpy(), rate=SAMPLE_RATE_NEW)

In [None]:
print(f"Expected: tree. Predicted: {predict(test_sample[:1])}.")

### Record your own sample

In [None]:
def record(seconds=1):
    # Colab
    if "google.colab" in sys.modules:
        from google.colab import output as colab_output
        from base64 import b64decode
        from io import BytesIO
        from pydub import AudioSegment
    
        RECORD = (
            b"const sleep  = time => new Promise(resolve => setTimeout(resolve, time))\n"
            b"const b2text = blob => new Promise(resolve => {\n"
            b"  const reader = new FileReader()\n"
            b"  reader.onloadend = e => resolve(e.srcElement.result)\n"
            b"  reader.readAsDataURL(blob)\n"
            b"})\n"
            b"var record = time => new Promise(async resolve => {\n"
            b"  stream = await navigator.mediaDevices.getUserMedia({ audio: true })\n"
            b"  recorder = new MediaRecorder(stream)\n"
            b"  chunks = []\n"
            b"  recorder.ondataavailable = e => chunks.push(e.data)\n"
            b"  recorder.start()\n"
            b"  await sleep(time)\n"
            b"  recorder.onstop = async ()=>{\n"
            b"    blob = new Blob(chunks)\n"
            b"    text = await b2text(blob)\n"
            b"    resolve(text)\n"
            b"  }\n"
            b"  recorder.stop()\n"
            b"})"
        )
        RECORD = RECORD.decode("ascii")
    
        print(f"Recording started for {seconds} seconds.")
        display(ipd.Javascript(RECORD))
        s = colab_output.eval_js("record(%d)" % (seconds * 1000))
        print("Recording ended.")
        b = b64decode(s.split(",")[1])
    
        fileformat = "wav"
        filename = f"_audio.{fileformat}"
        AudioSegment.from_file(BytesIO(b)).export(filename, format=fileformat)
        return torchaudio.load(filename)

    # Locally
    else:    
        import sounddevice as sd
    
        print(f"Recording started for {seconds} seconds.")
        audio = sd.rec(
            int(seconds * SAMPLE_RATE_NEW),
            samplerate=SAMPLE_RATE_NEW,
            channels=1,
            dtype="float32"
        )
        sd.wait()
        print("Recording ended.")
    
        waveform = torch.from_numpy(audio.T)
        return waveform, SAMPLE_RATE_NEW    

In [None]:
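waveform, sample_rate = record()
# Colab recordings may use another sample rate or several channels: match the model's input
waveform = AT.Resample(sample_rate, SAMPLE_RATE_NEW)(waveform[:1])
print(f"Predicted: {predict(waveform)}.")
ipd.display(ipd.Audio(waveform.numpy(), rate=SAMPLE_RATE_NEW))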

# Notes

## Manually/randomly split a dataset

```python
full_dataset = SpeechCommandsDataset(
    subset = None,
    transform = transform
)

VAL_SIZE = 0.3

# Get train / val split for data points
generator = torch.Generator().manual_seed(42)

# split the indices
first_indices, second_indices = torch.utils.data.random_split(
    # provide all indices as a range
    range(len(full_dataset)),
    # provide fractions (.7, .3)
    [1 - VAL_SIZE, VAL_SIZE],
    generator=generator
)

# use the indices to select data from the full dataset
first_dataset_subset = torch.utils.data.Subset(full_dataset, first_indices)
second_dataset_subset = torch.utils.data.Subset(full_dataset, second_indices)

# sanity check
l_full = len(full_dataset)
l_first = len(first_dataset_subset)
l_second = len(second_dataset_subset)
print(l_full, l_first, l_second, l_full == l_first + l_second)
```

## Advanced: MFCC

It is possible to change [the transform to be an MFCC](https://pytorch.org/audio/main/generated/torchaudio.transforms.MFCC.html) (Mel-Frequency Cepstral Coefficients) instead of training the network on a raw waveform. There are a couple of ways of doing this:
- [Change the downsampling transform](https://pytorch.org/audio/main/generated/torchaudio.transforms.MFCC.html) to calculate an MFCC instead. Then in the collate function (`collate_audio_folder_batch`) you can flatten the 2D MFCC representation into a 1D vector before all the vectors get padded to the same length. This way you can continue to use the 1D CNN in this notebook.
- Keep the downsampling transform the same. Instead, calculate the MFCC in the collate function (`collate_audio_folder_batch`) after padding has been applied, so that all the MFCCs have the same dimensionality. Instead of flattening the MFCC matrices into vectors, you can then replace the [1D CNN code](#define-the-network) with a 2D CNN and train a classifier on the MFCC matrices (you can borrow code from Week 3 for this).
- Keep the architecture a 1D CNN, and use the MFCC frequencies as channels (see [this ChatGPT thread](https://chatgpt.com/share/6970bc42-69b8-8005-bcf2-80a995e3408d)).
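
The first and third options above differ only in how the MFCC matrix is laid out for the 1D CNN. The sketch below shows both layouts. It uses a random tensor as a stand-in for real MFCC features; `N_MFCC`, the number of frames and the layer sizes are assumptions chosen for illustration.

```python
import torch
import torch.nn as nn

N_MFCC = 40  # hypothetical number of MFCC coefficients
FRAMES = 41  # hypothetical number of time frames

# Stand-in for the MFCC of one waveform: shape (n_mfcc, frames)
mfcc = torch.randn(N_MFCC, FRAMES)

# Option 1: flatten into a single-channel 1D vector of shape (1, n_mfcc * frames)
flat = mfcc.reshape(1, -1)
print(flat.shape)  # torch.Size([1, 1640])

# Option 3: keep the 2D layout and treat the coefficients as input channels
batch = torch.randn(4, N_MFCC, FRAMES)  # (B, n_mfcc, frames)
conv = nn.Conv1d(in_channels=N_MFCC, out_channels=32, kernel_size=3)
out = conv(batch)
print(out.shape)  # torch.Size([4, 32, 39])
```

With the channel layout, the first convolution needs `n_input` equal to the number of coefficients. The kernel size and stride of 80 and 16 used for raw audio would probably need to be much smaller, since there are far fewer frames than samples.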