In [None]:
# Original
# https://colab.research.google.com/drive/1qFt8qxKtM05hRuRxsA1Lq4JtP7tstcgc

In [None]:
%matplotlib inline

In [None]:
%%capture
# !pip install torch==1.7.0+cu101 torchaudio==0.7.0 -f https://download.pytorch.org/whl/torch_stable.html

from IPython.display import Audio

## PyTorch things
import torch
import torchaudio
import torch.nn.functional as F

## Other libs
from urllib.request import urlopen
import matplotlib.pyplot as plt
import glob
import os
import random
from tqdm.notebook import tqdm
import torchsummary
import numpy as np
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import normalize
import pandas as pd
import seaborn as sn

In [None]:
# basic random seed
import os
import random
import numpy as np

# Seed used by every seeding helper when the caller does not supply one.
DEFAULT_RANDOM_SEED = 2021


def seedBasic(seed=DEFAULT_RANDOM_SEED):
    """Seed the non-framework sources of randomness.

    Sets the PYTHONHASHSEED environment variable and seeds both Python's
    `random` module and NumPy's global generator with `seed`.

    Args:
        seed: Value handed to each generator; defaults to DEFAULT_RANDOM_SEED.
    """
    # NOTE(review): hash randomisation is configured at interpreter start-up,
    # so this assignment presumably only influences processes started later.
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seed_fn in (random.seed, np.random.seed):
        seed_fn(seed)

# tensorflow random seed
import tensorflow as tf

def seedTF(seed=DEFAULT_RANDOM_SEED):
    """Seed TensorFlow's global random generator.

    Args:
        seed: Value passed to `tf.random.set_seed`; defaults to
            DEFAULT_RANDOM_SEED.
    """
    tf.random.set_seed(seed=seed)

# torch random seed

def seedTorch(seed=DEFAULT_RANDOM_SEED):
    """Seed PyTorch and switch cuDNN to its deterministic configuration.

    Args:
        seed: Value used for the CPU and CUDA generators; defaults to
            DEFAULT_RANDOM_SEED.
    """
    torch.manual_seed(seed)
    # manual_seed_all seeds every CUDA device rather than only the current
    # one, so multi-GPU runs are covered too.
    torch.cuda.manual_seed_all(seed)
    # Trade cuDNN autotuning for repeatable kernel selection.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# basic + tensorflow + torch
def seedEverything(seed=DEFAULT_RANDOM_SEED):
    """Apply one seed to every random source used in this notebook.

    Runs the basic (Python / NumPy / hash seed), TensorFlow and PyTorch
    seeders, in that order.

    Args:
        seed: Seed forwarded to each helper; defaults to DEFAULT_RANDOM_SEED.
    """
    for seeder in (seedBasic, seedTF, seedTorch):
        seeder(seed)

# Seed all random sources for this run with 1004 (an explicit value, not DEFAULT_RANDOM_SEED).
seedEverything(1004)

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
from torchaudio.datasets import SPEECHCOMMANDS
import os

import pickle
# with open('/content/blind_test.pkl', 'rb') as f:
#   blind = pickle.load(f)
# blind_set = [test_set[i] for i in blind]
# test_set = blind_set

class SubsetSC(SPEECHCOMMANDS):
    """SPEECHCOMMANDS restricted to an externally supplied list of clips.

    The dataset is downloaded into "./" if needed. Depending on `subset`:

    * "testing"  -- keeps the entries of ``testing_list.txt`` selected by the
      integer indices stored in the pickle at ``BLIND_TEST_PATH``.
    * "training" -- drops every clip listed in ``validation_list.txt`` or
      ``testing_list.txt`` and keeps the remaining entries selected by the
      integer indices downloaded from ``TRAIN_LIST_URL``.
    * anything else (including None) -- leaves the parent's walker untouched.

    Args:
        subset: "training", "testing", or None.
    """

    # One integer index per line; selects the training clips.
    TRAIN_LIST_URL = 'https://www.cse.iitb.ac.in/~pjyothi/cs753/train_list.txt'
    # Pickled sequence of integer indices into the sorted testing list.
    BLIND_TEST_PATH = '/content/blind_test.pkl'

    def __init__(self, subset: str = None):
        super().__init__("./", download=True)

        def load_list(filename):
            # Read a split file and return its entries as sorted, normalised
            # paths rooted at the dataset directory.
            filepath = os.path.join(self._path, filename)
            with open(filepath) as fileobj:
                return sorted([os.path.normpath(os.path.join(self._path, line.strip())) for line in fileobj])

        if subset == "testing":
            candidates = load_list("testing_list.txt")
            # SECURITY: pickle.load can execute arbitrary code; only point
            # BLIND_TEST_PATH at a file from a trusted source.
            with open(self.BLIND_TEST_PATH, 'rb') as f:
                blind = pickle.load(f)
            # normpath removed the leading "./"; put it back on each entry.
            # NOTE(review): presumably this matches the walker format of the
            # installed torchaudio version -- confirm.
            self._walker = ['./' + candidates[i] for i in blind]
        elif subset == "training":
            excludes = set(load_list("validation_list.txt") + load_list("testing_list.txt"))
            # w[2:] drops the first two characters of each walker entry
            # (presumably the "./" prefix) before comparing with the
            # normalised exclude paths.
            remaining = [w for w in self._walker if w[2:] not in excludes]
            # Close the HTTP response deterministically instead of leaking it;
            # blank lines are skipped so a trailing newline cannot break int().
            with urlopen(self.TRAIN_LIST_URL) as response:
                train_list = [int(line) for line in response if line.strip()]
            self._walker = [remaining[i] for i in train_list]

# Build the training and testing subsets; this notebook does not use a validation split.
train_set = SubsetSC(subset="training")
test_set = SubsetSC(subset="testing")

In [None]:
# Class names are the sub-directories of the extracted dataset, sorted
# alphabetically. Filtering on "is a directory" skips LICENSE, README.md, the
# split lists and .DS_Store without raising ValueError when one of them is
# absent; the background-noise directory is excluded explicitly.
classes = sorted(
    entry.name
    for entry in os.scandir('./SpeechCommands/speech_commands_v0.02')
    if entry.is_dir() and entry.name != "_background_noise_"
)

In [None]:
# Sanity check: take the first training example and inspect the shape of its MFCC features.
waveform, sample_rate, label, speaker_id, utterance_number = train_set[0]
# labels = sorted(list(set(datapoint[2] for datapoint in train_set)))
# Same transform settings as SpeechDataset: 12 coefficients, log-mel input (log_mels=True).
mfcc = torchaudio.transforms.MFCC(n_mfcc=12, log_mels=True)(waveform)
print(mfcc.shape)

torch.Size([1, 12, 81])




In [None]:
class SpeechDataset(torch.utils.data.Dataset):
    """Dataset yielding ``(mfcc, class_index)`` pairs for speech clips.

    Args:
        classes: Sequence of label strings; a label's position is its id.
        file_list: Indexable collection whose items unpack as
            ``(waveform, sample_rate, label, speaker_id, utterance_number)``.
    """

    def __init__(self, classes, file_list):
        self.classes = classes
        # label string -> integer id, numbered in the order of `classes`
        self.class_to_int = {name: idx for idx, name in enumerate(classes)}
        # underlying examples, indexed lazily in __getitem__
        self.samples = file_list
        # MFCC extractor shared by all items (12 coefficients, log-mel input)
        self.mfcc_transform = torchaudio.transforms.MFCC(n_mfcc=12, log_mels=True)

    def __len__(self):
        """Return the number of underlying examples."""
        return len(self.samples)

    def __getitem__(self, i):
        """Return the MFCC features and integer label of example `i`."""
        with torch.no_grad():
            waveform, _sample_rate, label, _speaker_id, _utterance_number = self.samples[i]

            # Right-pad with zeros up to 16000 samples (one second according
            # to the original author's note); longer clips are left as is.
            shortfall = 16000 - waveform.shape[1]
            if shortfall > 0:
                waveform = F.pad(input=waveform, pad=(0, shortfall), mode='constant', value=0)

            # Drop the leading dimension when it has size 1, then swap the
            # first two axes of the result.
            features = self.mfcc_transform(waveform)
            mfcc = torch.transpose(features.squeeze(0), 0, 1)

        return mfcc, self.class_to_int[label]

In [None]:
# A torch.device never compares equal to a plain string, so the original
# `device == "cuda"` test was always False; compare the device *type* instead.
if device.type == "cuda":
    num_workers = 1
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False

batch_size = 256

# Training batches are reshuffled every epoch.
train_dataset = SpeechDataset(classes, train_set)
train_dl = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
    pin_memory=pin_memory,  # previously computed but never passed on
)

# Evaluation does not depend on example order, so the test loader is not shuffled.
test_dataset = SpeechDataset(classes, test_set)
test_dl = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

In [None]:
class SpeechLSTM(torch.nn.Module):
    """Stacked LSTM classifier over a sequence of feature frames.

    The LSTM output at the last time step is projected to `n_output` scores
    and returned as log-probabilities.

    Args:
        n_input: Number of features per frame (default 12).
        hidden_size: LSTM hidden state size (default 350).
        num_layers: Number of stacked LSTM layers (default 2).
        n_output: Number of target classes (default 35).
        dropout: Dropout between LSTM layers (default 0.2); forced to 0 for a
            single layer, where inter-layer dropout has nothing to act on.
    """

    def __init__(self, n_input=12, hidden_size=350, num_layers=2, n_output=35, dropout=0.2):
        super().__init__()

        self.lstm = torch.nn.LSTM(
            input_size=n_input,
            num_layers=num_layers,
            hidden_size=hidden_size,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0,
        )

        self.out_layer = torch.nn.Linear(hidden_size, n_output)

    def forward(self, x):
        """Return per-class log-probabilities for a batch-first input.

        Args:
            x: Tensor of shape (batch, time, n_input).

        Returns:
            Tensor of shape (batch, n_output) holding log-probabilities.
        """
        out, _ = self.lstm(x)

        # classify from the output of the final time step only
        logits = self.out_layer(out[:, -1, :])

        return F.log_softmax(logits, dim=1)

In [None]:
def train(model, epoch, log_interval):
    """Train `model` for one pass over the module-level `train_dl` loader.

    Relies on notebook globals: `train_dl`, `device`, `optimizer`, `pbar`,
    `pbar_update` and `losses` (each batch loss is appended to `losses`).

    Args:
        model: Network returning log-probabilities, as required by
            F.nll_loss.
        epoch: Epoch number, used only in the log message.
        log_interval: Print a status line every `log_interval` batches.
    """
    model.train()
    for batch_idx, (data, target) in enumerate(train_dl):

        data = data.to(device)
        target = target.to(device)

        output = model(data)

        # negative log-likelihood of the target class given log-probabilities
        loss = F.nll_loss(output, target)

        # a single gradient reset per step is enough; the earlier extra
        # model.zero_grad() call was redundant
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # print training stats
        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_dl.dataset)} ({100. * batch_idx / len(train_dl):.2f}%)]\tLoss: {loss.item():.6f}")

        # update progress bar
        pbar.update(pbar_update)
        # record loss
        losses.append(loss.item())

In [None]:
def number_of_correct(pred, target):
    """Count the positions where `pred` (after squeezing) equals `target`.

    Args:
        pred: Tensor of predicted class indices; size-1 dimensions are removed
            before the comparison.
        target: Tensor of ground-truth class indices.

    Returns:
        The number of matching entries as a Python number.
    """
    matches = torch.eq(pred.squeeze(), target)
    return matches.sum().item()

def get_likely_index(tensor):
    """Return the index of the largest value along the last dimension.

    Args:
        tensor: Tensor of scores whose last dimension ranges over classes.

    Returns:
        Tensor of indices with the last dimension removed.
    """
    return torch.argmax(tensor, dim=-1)

def test(model, epoch):
    """Evaluate `model` on the module-level `test_dl` loader and print accuracy.

    Relies on notebook globals: `test_dl`, `device`, `pbar` and `pbar_update`.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        epoch: Epoch number, used only in the printed summary.
    """
    model.eval()
    correct = 0
    # No gradients are needed for evaluation; skipping autograd bookkeeping
    # saves memory and time.
    with torch.no_grad():
        for data, target in test_dl:

            data = data.to(device)
            target = target.to(device)

            output = model(data)

            pred = get_likely_index(output)
            correct += number_of_correct(pred, target)

            # update progress bar
            pbar.update(pbar_update)

    print(f"\nTest Epoch: {epoch}\tAccuracy: {correct}/{len(test_dl.dataset)} ({100. * correct / len(test_dl.dataset):.2f}%)\n")

In [None]:
# Instantiate the network and move it to the selected device.
model = SpeechLSTM().to(device)

# Adam with weight decay; StepLR multiplies the learning rate by 0.5 after every 5 scheduler steps.
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001, weight_decay=0.0001)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=5, gamma=0.5)

In [None]:
# Run configuration: number of epochs and how often (in batches) train() logs.
n_epoch = 20
log_interval = 20

# Per-batch training losses, appended to by train().
losses = []
# Progress-bar increment per batch, sized so one train + test pass adds up to 1.
pbar_update = 1 / (len(train_dl) + len(test_dl))

with tqdm(total=n_epoch) as pbar:
    for epoch in range(1, n_epoch + 1):
        train(model, epoch, log_interval)
        test(model, epoch)
        # advance the learning-rate schedule once per epoch
        scheduler.step()

  0%|          | 0/20 [00:00<?, ?it/s]


Test Epoch: 1	Accuracy: 598/3000 (19.93%)


Test Epoch: 2	Accuracy: 1148/3000 (38.27%)


Test Epoch: 3	Accuracy: 1598/3000 (53.27%)


Test Epoch: 4	Accuracy: 1730/3000 (57.67%)


Test Epoch: 5	Accuracy: 1817/3000 (60.57%)


Test Epoch: 6	Accuracy: 1939/3000 (64.63%)


Test Epoch: 7	Accuracy: 1986/3000 (66.20%)


Test Epoch: 8	Accuracy: 2008/3000 (66.93%)


Test Epoch: 9	Accuracy: 2000/3000 (66.67%)


Test Epoch: 10	Accuracy: 2016/3000 (67.20%)


Test Epoch: 11	Accuracy: 2069/3000 (68.97%)


Test Epoch: 12	Accuracy: 2068/3000 (68.93%)


Test Epoch: 13	Accuracy: 2088/3000 (69.60%)


Test Epoch: 14	Accuracy: 2085/3000 (69.50%)


Test Epoch: 15	Accuracy: 2086/3000 (69.53%)


Test Epoch: 16	Accuracy: 2097/3000 (69.90%)


Test Epoch: 17	Accuracy: 2095/3000 (69.83%)


Test Epoch: 18	Accuracy: 2101/3000 (70.03%)


Test Epoch: 19	Accuracy: 2102/3000 (70.07%)


Test Epoch: 20	Accuracy: 2106/3000 (70.20%)



In [None]:
# Final accuracy (in percent) of the trained model on the test loader.
model.eval()
correct = 0
# Evaluation needs no gradients. The progress bar from the training cell has
# already been closed, so it is no longer updated here.
with torch.no_grad():
    for data, target in test_dl:

        data = data.to(device)
        target = target.to(device)

        output = model(data)

        pred = get_likely_index(output)
        correct += number_of_correct(pred, target)

print(f"{100. * correct / len(test_dl.dataset):.2f}")

70.20
