# Setting up the environment

In [104]:
# List the working directory to confirm the checkpoint file referenced below is present.
!ls

CNN_v1.ckpt  drive  file.wav  recording.webm  requirements.txt	sample_data


In [105]:
# Path to the trained model checkpoint, relative to the working directory.
# It is read with torch.load() in the "Load model" section.
model_path = "CNN_v1.ckpt"

## Import Packages

In [106]:
!pip install pytorch-lightning



In [107]:
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
import torchaudio.transforms as T
import torch.utils.data as data
import pytorch_lightning as pl

# Audio preprocessing & mel spectrogram

In [108]:
#Output: tensor
import matplotlib.pyplot as plt
import math
import random
import torch
import torchaudio
from torchaudio import transforms

class AudioUtil():
    """Static helpers for loading, normalising and augmenting audio clips.

    Helpers that take ``aud`` expect a ``(signal, sample_rate)`` tuple as
    returned by :meth:`open`. The code indexes ``signal`` as
    ``(channels, samples)``.
    """

    @staticmethod
    def open(audio_file):
        """Load ``audio_file`` with torchaudio and return ``(signal, sample_rate)``."""
        signal, sample_rate = torchaudio.load(audio_file)
        return signal, sample_rate

    @staticmethod
    def standardize_channel(aud):
        """Reduce a multi-channel signal to a single channel.

        Signals with more than one channel are averaged across channels
        (the channel dimension is kept, so the result is ``(1, samples)``).
        Single-channel signals are returned unchanged.
        """
        signal, sample_rate = aud

        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)

        return signal, sample_rate

    @staticmethod
    def resampling(aud, target_sr):
        """Resample ``aud`` to ``target_sr`` Hz.

        Returns ``aud`` itself when it is already at the target rate,
        otherwise a new ``(resampled_signal, target_sr)`` tuple.
        """
        signal, sample_rate = aud

        if sample_rate == target_sr:
            return aud

        # Resample every channel. Previously only channel 0 was kept, which
        # silently discarded the remaining channels of multi-channel input
        # before standardize_channel() had a chance to average them.
        resampled = torchaudio.transforms.Resample(sample_rate, target_sr)(signal)

        return resampled, target_sr

    @staticmethod
    def standardize_duration(aud, max_time):
        """Force the signal to a fixed length by truncating or zero-padding.

        ``max_time`` is in milliseconds. Longer signals are cut at the end;
        shorter signals are padded with zeros at the end.
        """
        signal, sample_rate = aud
        num_of_rows, signal_length = signal.shape
        # NOTE: the sample rate is integer-divided by 1000 *before* multiplying,
        # so rates that are not a multiple of 1000 Hz give slightly fewer samples
        # than max_time milliseconds. Left unchanged on purpose: changing it
        # would change the lengths the rest of the notebook was tuned for.
        max_length = sample_rate//1000 * max_time

        if signal_length > max_length:
            # truncate signal to given length
            signal = signal[:, :max_length]

        elif signal_length < max_length:
            # Zero padding created from the signal itself so it shares the
            # signal's dtype and device (torch.zeros() would always produce a
            # default-dtype CPU tensor).
            padding = signal.new_zeros((num_of_rows, max_length - signal_length))
            signal = torch.cat((signal, padding), 1)

        return signal, sample_rate

    @staticmethod
    def time_shift(aud, shift_limit):
        """Augment raw audio by circularly shifting it along the time axis.

        The shift is ``int(random.random() * shift_limit * signal_length)``
        samples, so ``shift_limit`` is the maximum shift as a fraction of the
        signal length. Samples pushed past the end wrap around to the start.
        """
        signal, sample_rate = aud
        _, signal_length = signal.shape
        amount_to_shift = int(random.random() * shift_limit * signal_length)

        # Roll along the time axis only. Without `dims` the tensor is flattened
        # first, which lets samples wrap from one channel into the next when the
        # signal has more than one channel.
        return signal.roll(amount_to_shift, dims=-1), sample_rate

    @staticmethod
    def spectro_gram(aud, n_mels=64, n_fft = 1024, hop_len=None):
        """Convert audio to a mel spectrogram expressed in decibels.

        ``n_mels``, ``n_fft`` and ``hop_len`` are forwarded to torchaudio's
        ``MelSpectrogram`` (``hop_len`` as ``hop_length``). The result is then
        passed through ``AmplitudeToDB`` with ``top_db=100``.
        """
        signal, sample_rate = aud
        top_db = 100

        spec = transforms.MelSpectrogram(sample_rate, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(signal)

        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return spec

    @staticmethod
    def mel_spectrogram_augment(spec, max_mask=0.1, n_freq_masks=1, n_time_masks=1):
        """Augment a mel spectrogram with frequency and time masking.

        ``spec`` is unpacked as ``(_, n_mels, n_steps)``. ``max_mask`` is the
        maximum mask size as a fraction of the respective axis. Masked regions
        are filled with the mean value of ``spec``. ``n_freq_masks`` frequency
        masks are applied first, followed by ``n_time_masks`` time masks.
        """
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec

        # freq_mask_param: maximum possible height of a frequency mask (in mel bins)
        freq_mask_param = max_mask * n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

        # time_mask_param: maximum possible width of a time mask (in time steps)
        time_mask_param = max_mask * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

        return aug_spec

def example_spec_from_aud(audio_path):
    """Run the sample preprocessing pipeline on one audio file.

    Steps: load -> single channel -> fixed 5000 ms length -> random time
    shift (limit 0.4) -> mel spectrogram -> frequency/time masking.
    Returns the augmented mel spectrogram.
    """
    clip = AudioUtil.standardize_channel(AudioUtil.open(audio_path))
    clip = AudioUtil.standardize_duration(clip, 5000)  # duration = 5s
    clip = AudioUtil.time_shift(clip, 0.4)  # shift limit of 40%
    mel = AudioUtil.spectro_gram(clip, n_mels=64, n_fft=1024, hop_len=None)
    return AudioUtil.mel_spectrogram_augment(
        mel, max_mask=0.1, n_freq_masks=2, n_time_masks=2
    )


#######################################
###     preprocessing functions     ###
#######################################

def raw_from_aud(aud):
    """Preprocess a raw ``(signal, sample_rate)`` tuple into a raw signal tensor.

    Edit this to tune preprocessing. The pipeline resamples to 8000 Hz,
    reduces to one channel and fixes the length to 5000 ms. Only the signal
    is returned; the sample rate is dropped.
    """
    resampled = AudioUtil.resampling(aud, 8000)  # target sampling rate, here 8 kHz
    mono = AudioUtil.standardize_channel(resampled)
    fixed_length = AudioUtil.standardize_duration(mono, 5000)  # duration = 5s
    return fixed_length[0]

def spec_from_aud(aud, augment=True):
    """Preprocess a raw ``(signal, sample_rate)`` tuple into a mel spectrogram.

    Edit this to tune preprocessing.

    Args:
        aud: ``(signal, sample_rate)`` tuple, e.g. from ``torchaudio.load``.
        augment: when True (the default, matching the previous behaviour) a
            random time shift and random frequency/time masks are applied, so
            repeated calls on the same input give different results. Pass
            False to get a deterministic spectrogram, e.g. for inference.

    Returns:
        The (optionally augmented) mel spectrogram in decibels.
    """
    new_aud = AudioUtil.standardize_channel(aud)
    # duration = 5.46s, chosen by the author to get a mel output length of 512
    new_aud = AudioUtil.standardize_duration(new_aud, 5460)
    if augment:
        new_aud = AudioUtil.time_shift(new_aud, 0.4)  # shift limit of 40%

    spectrogram = AudioUtil.spectro_gram(new_aud, n_mels=64, n_fft=1024, hop_len=None)
    if augment:
        spectrogram = AudioUtil.mel_spectrogram_augment(
            spectrogram, max_mask=0.1, n_freq_masks=2, n_time_masks=2
        )

    return spectrogram

def print_spectrogram(tensor_input):
    """Show the first entry of ``tensor_input`` as an image and return the plot handle.

    Presumably ``tensor_input`` is a spectrogram as produced by
    ``AudioUtil.spectro_gram`` - TODO confirm against callers.
    """
    first_entry = tensor_input[0]
    # NOTE: permute(0, 1) keeps the two axes in their existing order, so the
    # image is drawn with the tensor's own row/column layout.
    image = plt.imshow(first_entry.permute(0, 1))
    return image


# Viewing dataset and Obtaining Dataloader

In [109]:
##############################
###  Parameters to change  ###
##############################

# Preprocessing function applied to a (signal, sample_rate) tuple before it is
# fed to the model; the prediction cell calls PREPROCESSOR on the recorded clip.
PREPROCESSOR = spec_from_aud

# Batch size for data loading (the original default was 64).
BATCH_SIZE = 1  # Default 64
# If we were to have variable input length, keep batch_size at 1 or implement collate_fn()

# This cuts the dataset down to have faster iterations during prototyping
REDUCED_DATASET = True

# Number of data-loading worker processes.
# NOTE(review): this is set to 4 although the comment below says Colab gives
# 2 CPU cores - confirm the intended value.
num_workers = 4
# Need to experiment in this parameter.
# Colab gives 2 CPU cores.

# Model definition

In [110]:
# Define training parameters.
# These are passed to LightningModel when the model is constructed below.

# Negative log-likelihood loss; CustomModel.forward returns log_softmax outputs.
loss_function = F.nll_loss

# Optimizer *class* (not an instance); LightningModel instantiates it itself.
optimizer = torch.optim.Adam

learning_rate = 0.001
epochs = 50
# Fix the PyTorch RNG seed so weight initialisation is repeatable.
torch.manual_seed(28)

<torch._C.Generator at 0x7f3bf35032f0>

We define the model architecture here so that the weights stored in the checkpoint file can be loaded into a matching PyTorch model.

In [111]:
# CNN model implementation for Mels spectrogram
class CustomModel(nn.Module):
    """CNN classifier for single-channel mel spectrograms.

    The network expects inputs of shape ``(batch, 1, H, W)`` with
    ``H, W = 64, 512`` and returns log-probabilities of shape
    ``(batch, num_classes)``.

    Layer names and creation order are part of the checkpoint format
    (``state_dict`` keys) and must not be changed.
    """

    def __init__(self, num_classes=6, any_other_params_you_need=None):
        """Build the layers.

        Args:
            num_classes: number of output classes.
            any_other_params_you_need: unused placeholder, kept so existing
                callers that pass it keep working.
        """
        super().__init__()
        self.H, self.W = 64, 512  # Size of mel_spectrogram
        self.num_classes = num_classes

        # Define your layers here:
        self.conv1 = nn.Conv2d(1, 32, 3, padding="same")
        self.bn1 = nn.BatchNorm2d(32)
        self.maxpool1 = nn.MaxPool2d(2, stride=2)  # halves H and W

        self.conv2 = nn.Conv2d(32, 16, 5, padding="same")
        self.maxpool2 = nn.MaxPool2d(2, stride=1)  # shrinks H and W by 1

        self.conv3 = nn.Conv2d(16, 8, 2, padding="same")
        self.bn2 = nn.BatchNorm2d(8)

        # Spatial size after the two pooling layers ("same" convolutions keep
        # the size): H//2 - 1 = 31 and W//2 - 1 = 255 for the 64 x 512 input,
        # i.e. 8 * 31 * 255 = 63240 flattened features.
        flat_h = self.H // 2 - 1
        flat_w = self.W // 2 - 1
        self.fc1 = nn.Linear(8 * flat_h * flat_w, self.num_classes)

    def pooling2x2(self, x):
        """2x2 max pooling with stride 2.

        Defined as a method instead of a lambda stored on the instance, because
        a lambda attribute prevents the module object from being pickled.
        """
        return F.max_pool2d(x, 2, stride=2)

    def forward(self, inputs):
        """Return log-softmax class scores for a batch of spectrograms."""
        x = F.relu(self.conv1(inputs))
        x = self.bn1(x)
        x = self.maxpool1(x)

        x = self.conv2(x)
        x = self.maxpool2(x)

        x = self.conv3(x)
        x = self.bn2(x)

        # Flatten everything except the batch dimension for the linear layer.
        x = x.view(inputs.shape[0], -1)
        x = self.fc1(x)
        return F.log_softmax(x, dim=1)


In [112]:
import pytorch_lightning as pl
import torchmetrics

class LightningModel(pl.LightningModule):
    """PyTorch Lightning wrapper that adds train/val/test loops around ``model``.

    Args:
        model: the network to wrap; stored as ``self.model``.
        learning_rate: learning rate handed to the optimizer.
        loss_function: callable ``(output, target) -> loss``.
        optimizer: optimizer *class*; it is instantiated here on
            ``model.parameters()``.
        weight_decay: weight decay (L2 regularisation) handed to the optimizer.
    """

    def __init__(self, model, learning_rate=1e-3, loss_function=F.nll_loss, optimizer=torch.optim.Adam, weight_decay=1e-6):
        super().__init__()
        self.learning_rate = learning_rate
        self.loss_function = loss_function
        # Weight decay for L2 regularization
        self.optimizer = optimizer(model.parameters(), lr=self.learning_rate, weight_decay=weight_decay)
        self.model = model

        # One accuracy metric per stage so their running states stay separate.
        # NOTE(review): Accuracy() is called with no arguments; newer
        # torchmetrics releases may require a `task` argument - confirm the
        # installed version.
        self.train_acc = torchmetrics.Accuracy()
        self.val_acc = torchmetrics.Accuracy()
        self.test_acc = torchmetrics.Accuracy()

    def forward(self, x):
        """Run the wrapped model; in Lightning, forward defines inference."""
        output = self.model(x)
        return output

    def _shared_step(self, batch, stage, metric):
        """Compute the loss for one batch, update ``metric`` and log both.

        ``stage`` is the prefix of the logged keys, giving ``"<stage>_loss"``
        and ``"<stage>_acc"``.
        """
        x, y = batch
        output = self(x)  # Call self.forward function
        loss = self.loss_function(output, y)
        metric(output, y)
        # Logging to TensorBoard by default
        self.log(f"{stage}_loss", loss, on_epoch=True)
        self.log(f"{stage}_acc", metric, on_epoch=True)
        return loss

    def training_step(self, batch, batch_idx):
        """One step of the training loop; logs ``train_loss`` and ``train_acc``."""
        return self._shared_step(batch, "train", self.train_acc)

    def validation_step(self, batch, batch_idx):
        """One step of the validation loop; logs ``val_loss`` and ``val_acc``."""
        return self._shared_step(batch, "val", self.val_acc)

    def test_step(self, batch, batch_idx):
        """One step of the test loop; logs ``test_loss`` and ``test_acc``."""
        return self._shared_step(batch, "test", self.test_acc)

    def configure_optimizers(self):
        """Return the optimizer instance created in ``__init__``."""
        return self.optimizer

# Load model

Once the model has been loaded, when we run `print(model)`, we should see a summary of the model architecture.


In [113]:
# Initialise a new model and load the state
model = LightningModel(CustomModel(), learning_rate=learning_rate, loss_function=loss_function, optimizer=optimizer)

# Load the checkpoint onto the CPU so this also works on a runtime without a
# GPU, even if the checkpoint was saved from one.
# NOTE: torch.load unpickles the file, which can execute arbitrary code - only
# load checkpoints from a source you trust.
checkpoint = torch.load(model_path, map_location="cpu")
model.load_state_dict(checkpoint['state_dict'])

# The model is only used for prediction in this notebook: switch to eval mode
# so BatchNorm uses its stored running statistics instead of batch statistics.
model.eval()

print(model)

LightningModel(
  (model): CustomModel(
    (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (maxpool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (conv2): Conv2d(32, 16, kernel_size=(5, 5), stride=(1, 1), padding=same)
    (maxpool2): MaxPool2d(kernel_size=2, stride=1, padding=0, dilation=1, ceil_mode=False)
    (conv3): Conv2d(16, 8, kernel_size=(2, 2), stride=(1, 1), padding=same)
    (bn2): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (fc1): Linear(in_features=63240, out_features=6, bias=True)
  )
  (train_acc): Accuracy()
  (val_acc): Accuracy()
  (test_acc): Accuracy()
)


# Let the model guess your age!

In [114]:
# set up microphone audio recorder

!sudo apt install ffmpeg
!pip install torchaudio ipywebrtc notebook
!jupyter nbextension enable --py widgetsnbextension

from ipywebrtc import AudioRecorder, CameraStream
import torchaudio
from IPython.display import Audio

from google.colab import output
output.enable_custom_widget_manager()

Reading package lists... Done
Building dependency tree       
Reading state information... Done
ffmpeg is already the newest version (7:3.4.8-0ubuntu0.2).
0 upgraded, 0 newly installed, 0 to remove and 40 not upgraded.
Enabling notebook extension jupyter-js-widgets/extension...
Paths used for configuration of notebook: 
    	/root/.jupyter/nbconfig/notebook.json
      - Validating: [32mOK[0m
Paths used for configuration of notebook: 
    	/root/.jupyter/nbconfig/notebook.json


In [115]:
# initialise audio recorder

# Despite the variable name, the stream is audio-only: video is disabled in
# the constraints.
camera = CameraStream(constraints={'audio': True,'video':False})
recorder = AudioRecorder(stream=camera)

Click the dot button to start recording, then click it again to stop. Run the next cells only after you have stopped recording.

In [116]:
# Display the audio recorder widget (the bare expression renders it as the cell output).

recorder

AudioRecorder(audio=Audio(value=b'', format='webm'), stream=CameraStream(constraints={'audio': True, 'video': …

In [117]:
# convert audio recording into .wav

with open('recording.webm', 'wb') as f:
    f.write(recorder.audio.value)
!ffmpeg -i recording.webm -ac 1 -f wav file.wav -y -hide_banner -loglevel panic
sig, sr = torchaudio.load("file.wav") # converted audio is saved as file.wav

In [118]:
# pass in file.wav into model for prediction

# Inference must run in eval mode: otherwise the BatchNorm layers normalise
# with the statistics of this single clip and also update their stored
# running statistics on every prediction.
model.eval()

# NOTE: PREPROCESSOR (spec_from_aud) applies a random time shift and random
# masking, so the prediction for the same recording can vary between runs.
features = PREPROCESSOR(torchaudio.load("file.wav"))

# No gradients are needed for prediction.
with torch.no_grad():
    pred = model(torch.unsqueeze(features, 0))  # add the batch dimension
pred_label = torch.argmax(pred, dim=1)

print("predicted", pred_label)

# Map the predicted class index to a human-readable age group.
label_to_classification = {0: "TEENS", 1: "TWENTIES", 2: "THIRTIES", 
                           3: "FOURTIES", 4: "FIFTIES", 5: "SIXTIES"}
classification = label_to_classification[pred_label.item()]

predicted tensor([0])


# The model thinks you are...

In [119]:
# Report the age group computed in the prediction cell above (`classification`).
print(f"""From your voice... 

The model thinks you are in your {classification}!
  """)

From your voice... 

The model thinks you are in your TEENS!
  
