# Train a tiny ML to spot "Hey" and "Pepper"

### Software Requirements
- Python 3
- packages
    - pytorch
    - [nnAudio](https://github.com/KinWaiCheuk/nnAudio)
    - [AudioLoader](https://github.com/KinWaiCheuk/AudioLoader)

In [1]:
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import WeightedRandomSampler,DataLoader, Dataset
from AudioLoader.speech.speechcommands import SPEECHCOMMANDS_12C

import torch
import torch.nn as nn
import torch.optim as optim
from torchaudio._backend import load as load_audio
from nnAudio.features.mel import MelSpectrogram
from pytorch_lightning.core import LightningModule
from pytorch_lightning import Trainer

from typing import Literal

import random
import os
from typing import NamedTuple
from pathlib import Path


In [2]:
# setting up configuration
#device = 'cuda:0'
device = 'mps'
batch_size= 100
max_epochs = 200
check_val_every_n_epoch = 2
num_sanity_val_steps = 5
data_root= './speech_commands/' # Download the data here
download_option= False
n_mels= 40 
input_dim= (n_mels*101)
output_dim= 4

## Import speech commands dataset
We have generated the "Hey", "Pepper" data ourselves, but to train the model we need wrong/unknown data as well to teach the model to distinguish core words from unrecognized ones. For this we use the "speech commands 12 classes dataset" from google. It has 35 words, where ten of them are labeled as commands by convention and the rest is labeled as unknown, but we will relabel all as unknown.   

If we use the new pytorch 2.0+ we need to change the line 238 in speechcommands.py from AudioLoader:
```py
        # download_url(url, root, hash_value=checksum, hash_type="md5")
        download_url(url, archive)
```
And also line 14:
```py
    # from torchaudio.datasets.utils import _extract_zip as extract_archive
    from torchaudio.datasets.utils import _extract_tar as extract_archive
```

In [2]:
#get the 12 classes speechcommands dataset
_ = SPEECHCOMMANDS_12C(root=data_root,
                              url='speech_commands_v0.02',
                              folder_in_archive='SpeechCommands',
                              download= True,
                              subset= 'training')

basename='speech_commands_v0.02.tar.gz'
archive='./speech_commands/speech_commands_v0.02.tar.gz'


Loading training set: 100%|██████████| 84843/84843 [00:34<00:00, 2479.80it/s]


In [3]:
dataset_folder = 'dataset_gen'
keywords = ['hey', 'pepper']
directories = ['out-noisy', 'out-wav']

sc_folder = 'speech_commands/SpeechCommands/speech_commands_v0.02'
HASH_DIVIDER = '_nohash_'
SAMPLE_RATE = 16000

label2idx = {
    'hey': 0,
    'pepper': 1,
    '_silence_': 2,
    '_unknown_': 3
}

class Datapoint(NamedTuple):
    audio: torch.Tensor
    sample_rate: int
    label: int

def _load_list(root, *filenames):
    output = []
    for filename in filenames:
        filepath = os.path.join(root, filename)
        with open(filepath) as fileobj:
            output += [os.path.normpath(os.path.join(root, line.strip())) for line in fileobj]
    return output


class KWS_Dataset(Dataset):
    def __init__(self, subset: Literal['training', 'validation'], seed: int | None = None):
        assert subset is None or subset in ["training", "validation"], (
            "When `subset` not None, it must take a value from "
            "{'training', 'validation'}."
        )
        
        files: list[tuple[str, str]] = []
        for keyword in keywords:
            for directory in directories:
                files += [(os.path.join(dataset_folder, directory, keyword, filename), keyword) for filename in os.listdir(os.path.join(dataset_folder, directory, keyword))]

        random.seed(seed)
        random.shuffle(files)
        cutoff = int(len(files) * 0.75)

        if subset == "training":
            files = files[:cutoff]
        elif subset == "validation":
            files = files[cutoff:]

        sc_files = [f for f in sorted(str(p) for p in Path(sc_folder).glob('*/*.wav')) if HASH_DIVIDER in f]

        include = None
        exclude = None
        if subset == "validation" or subset is None:
            include = _load_list(sc_folder, 'validation_list.txt')
        if subset == "training" or subset is None:
            exclude = _load_list(sc_folder, 'validation_list.txt')
        
        if(exclude is not None):
            sc_files = [f for f in sc_files if os.path.normpath(f) not in exclude]
        if(include is not None):
            sc_files = [f for f in sc_files if os.path.normpath(f) in include]

        files += [(f, '_unknown_') for f in sc_files]
        
        self.dataset: list[Datapoint] = []
        for path, label in files:
            audio_samples, rate = load_audio(path) # loading audio
            # audio_sample (1, len)
            
            if audio_samples.shape[1] != SAMPLE_RATE:
                pad_length = SAMPLE_RATE-audio_samples.shape[1]
                audio_samples = nn.functional.pad(audio_samples, (0,pad_length)) # pad the end of the audio until 1 second
                # (1, 16000)
            self.dataset.append(Datapoint(audio_samples, rate, label2idx[label])) 


        if subset=='training':
            silence_clips_sc = [
                'dude_miaowing.wav',
                'white_noise.wav',
                'exercise_bike.wav',
                'doing_the_dishes.wav',
                'pink_noise.wav',
            ]
            silence_clips_gen = [
                'bathroom_1.wav',
                'crowd_1_quieter.wav',
                'fan_1.wav',
                'homeoffice_2.wav',
                'office_1_quieter.wav',
                'static_1.wav',
            ]
        elif subset=='validation':
            silence_clips_sc = [
                'running_tap.wav',
            ]
            silence_clips_gen = [
                'cafeteria_1_quieter.wav',
                'homeoffice_1_quieter.wav',
                'fan_2.wav',
            ]
        else:
            silence_clips_sc = []
            silence_clips_gen = []

        for i in silence_clips_sc: 
            audio_samples, rate = load_audio(os.path.join(sc_folder, '_background_noise_', i))
            for start in range(0,
                            audio_samples.shape[1] - SAMPLE_RATE,
                            SAMPLE_RATE//2):
                audio_segment = audio_samples[0, start:start + SAMPLE_RATE]
                self.dataset.append(Datapoint(audio_segment.unsqueeze(0), rate, label2idx['_silence_']))   

        for i in silence_clips_gen: 
            audio_samples, rate = load_audio(os.path.join(dataset_folder, 'noise', i))
            for start in range(0,
                            audio_samples.shape[1] - SAMPLE_RATE,
                            SAMPLE_RATE//2):
                audio_segment = audio_samples[0, start:start + SAMPLE_RATE]
                self.dataset.append(Datapoint(audio_segment.unsqueeze(0), rate, label2idx['_silence_']))   
        
    def __getitem__(self, n: int):
        audio, rate, label = self.dataset[n]
        return audio, rate, label

    def __len__(self):
        return len(self.dataset)

In [4]:
seed = 42

train_dataset = KWS_Dataset(subset='training', seed=seed)
valid_dataset = KWS_Dataset(subset='validation', seed=seed)

In [5]:
class_weights = [1,1,4.6,1/17]
sample_weights = [0] * len(train_dataset)
for n, (data,rate,label_name) in enumerate(train_dataset):
    class_weight = class_weights[label_name]
    sample_weights[n] = class_weight
sampler = WeightedRandomSampler(sample_weights, num_samples=len(sample_weights),replacement=True)

#Data processing
def data_processing(data: list[torch.Tensor]):
    waveforms = []
    labels = []    
    for batch in data:
        waveforms.append(batch[0].squeeze(0)) #after squeeze => (audio_len) tensor # remove batch dim
        labels.append(batch[2])             
    waveform_padded = nn.utils.rnn.pad_sequence(waveforms, batch_first=True)     
    output_batch = {'waveforms': waveform_padded, 
             'labels': torch.tensor(labels),
             }
    return output_batch

# data loading
trainloader = DataLoader(train_dataset, collate_fn=lambda x: data_processing(x), batch_size=batch_size, sampler=sampler)

validloader = DataLoader(valid_dataset, collate_fn=lambda x: data_processing(x), batch_size=batch_size)

In [6]:
class SpeechCommand(LightningModule):
    def __init__(self): 
        super().__init__()
        self.mel_layer: MelSpectrogram       
        self.criterion: nn.CrossEntropyLoss
        self.linearlayer: nn.Linear
        self.validation_step_outputs = []

    def training_step(self, batch, batch_idx):
        outputs, spec = self(batch['waveforms']) 
        #return outputs [2D] for calculate loss, return spec [3D] for visual
        loss = self.criterion(outputs, batch['labels'].long())

        acc = sum(outputs.argmax(-1) == batch['labels'])/outputs.shape[0] #batch wise
        
        self.log('Train/acc', acc, on_step=False, on_epoch=True)
        self.log('Train/Loss', loss, on_step=False, on_epoch=True)
        #log(graph title, take acc as data, on_step: plot every step, on_epch: plot every epoch)
        return loss

     
    def optimizer_step(self, epoch, batch_idx, optimizer, optimizer_idx,
                       optimizer_closure, on_tpu, using_native_amp, using_lbfgs):
        
        optimizer.step(closure=optimizer_closure)
        with torch.no_grad():
            torch.clamp_(self.mel_layer.mel_basis, 0, 1)
        #after optimizer step, do clamp function on mel_basis         

        
    def validation_step(self, batch, batch_idx):               
        outputs, spec = self(batch['waveforms'])
        loss = self.criterion(outputs, batch['labels'].long())        
       
        self.log('Validation/Loss', loss, on_step=False, on_epoch=True)                     
        output_dict = {'outputs': outputs,
                       'labels': batch['labels']}      
        self.validation_step_outputs.append(loss)  
        return output_dict

    
    def on_validation_epoch_end(self):
        pred = []
        label = []
        for output in self.validation_step_outputs:
            pred.append(output['outputs'])
            label.append(output['labels'])
        label = torch.cat(label, 0)
        pred = torch.cat(pred, 0)
        acc = sum(pred.argmax(-1) == label)/label.shape[0]
        
        self.log('Validation/acc', acc, on_step=False, on_epoch=True)    

    
    def configure_optimizers(self):
        model_param = []
        for name, params in self.named_parameters():
            if 'mel_layer.' in name:
                pass
            else:
                model_param.append(params)          

        optimizer = optim.SGD(model_param, lr=1e-3, momentum= 0.9, weight_decay= 0.001)
        return [optimizer]

In [7]:
mel_layer = MelSpectrogram(sr=16000, 
                           n_fft=480,
                           win_length=None,
                           n_mels=n_mels, 
                           hop_length=160,
                           window='hann',
                           center=True,
                           pad_mode='reflect',
                           power=2.0,
                           htk=False,
                           fmin=0.0,
                           fmax=None,
                           norm=1,
                           trainable_mel=False,
                           trainable_STFT=False,
                           verbose=True)

STFT kernels created, time used = 0.0043 seconds
STFT filter created, time used = 0.0004 seconds
Mel filter created, time used = 0.0004 seconds


In [8]:
class Linearmodel_nnAudio(SpeechCommand):
    def __init__(self): 
        super().__init__()
        self.mel_layer = mel_layer       
        self.criterion = nn.CrossEntropyLoss()
        self.linearlayer = nn.Linear(input_dim, output_dim)
      
    def forward(self, x): 
        #x: 2D [B, 16000]
        spec = self.mel_layer(x)  
        #spec: 3D [B, F40, T101]
        
        spec = torch.log(spec+1e-10)
        flatten_spec = torch.flatten(spec, start_dim=1) 
        #flatten_spec: 2D [B, F*T(40*101)] 
        #start_dim: flattening start from 1st dimention
        
        out = self.linearlayer(flatten_spec) 
        #out: 2D [B,number of class(12)] 
                               
        return out, spec 

model_nnAudo = Linearmodel_nnAudio()
model_nnAudo = model_nnAudo.to(device)

In [9]:
trainer = Trainer(max_epochs=max_epochs,
    check_val_every_n_epoch= check_val_every_n_epoch,
    num_sanity_val_steps=num_sanity_val_steps)

trainer.fit(model_nnAudo, trainloader, validloader)

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name        | Type             | Params
-------------------------------------------------
0 | mel_layer   | MelSpectrogram   | 0     
1 | criterion   | CrossEntropyLoss | 0     
2 | linearlayer | Linear           | 48.5 K
-------------------------------------------------
48.5 K    Trainable params
0         Non-trainable params
48.5 K    Total params
0.194     Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

/opt/homebrew/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:441: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.


RuntimeError: Invalid buffer size: 95.37 GB