# Keyword Spotting System with Speech Command Dataset

> Install all the libraries we need if you don't already have them on your PC

In [None]:
# Libraries used: numpy, torch, torchaudio, matplotlib, sounddevice, scipy, IPython, tqdm
!pip3 install numpy torch torchaudio matplotlib sounddevice scipy IPython tqdm

> Run all the cells below, one cell at a time

In [None]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio

import numpy as np
import matplotlib.pyplot as plt
import IPython.display as ipd
from tqdm import tqdm as tqdm
import time

In [None]:
# Pick the compute device: the GPU when CUDA is usable, the CPU otherwise
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

> **When you run the cell below, the Speech Commands dataset (1.3 GB) will be downloaded**

In [None]:
from torch.utils.data import Dataset, DataLoader 
from torchaudio.datasets import SPEECHCOMMANDS 
import os

# Create dataset class
class SubsetSC(SPEECHCOMMANDS):
    """Speech Commands dataset restricted to one of its official splits.

    The dataset is stored under the current directory and downloaded on first
    use (``download=True``).

    Args:
        subset: ``"training"``, ``"validation"`` or ``"testing"``. Any other
            value, including the default ``None``, leaves the file list built
            by ``SPEECHCOMMANDS`` untouched (i.e. the whole dataset).
    """

    def __init__(self, subset: str = None):
        super().__init__("./", download=True)

        def load_list(filename):
            # Every line of a list file is a clip path relative to the dataset
            # directory. normpath() removes the leading "./" inherited from the
            # root and unifies separators, so the entries can be compared with
            # the walker's paths whatever spelling those use.
            filepath = os.path.join(self._path, filename)
            with open(filepath) as fileobj:
                return [
                    os.path.normpath(os.path.join(self._path, line.strip()))
                    for line in fileobj
                ]

        if subset == "validation":
            self._walker = load_list("validation_list.txt")
        elif subset == "testing":
            self._walker = load_list("testing_list.txt")
        elif subset == "training":
            # Training = every clip that is not held out for validation/testing.
            # Both sides are normalised before comparing: a spelling difference
            # such as "./a/b.wav" vs "a/b.wav" would otherwise turn the
            # exclusion into a no-op and leak held-out clips into training.
            excludes = set(
                load_list("validation_list.txt") + load_list("testing_list.txt")
            )
            self._walker = [
                w for w in self._walker if os.path.normpath(w) not in excludes
            ]

# Build the training and test splits of the Speech Commands dataset
train_set = SubsetSC(subset="training")
test_set = SubsetSC(subset="testing")

# Unpack the first training example; later cells reuse `waveform` and `sample_rate`
(waveform, sample_rate, label, speaker_id, utterance_number) = train_set[0]

In [None]:
# Show the number of examples in the training and test splits, one per line
print(len(train_set), len(test_set), sep="\n")

In [None]:
# Report the shape and sample rate of the example clip, then plot its samples
print("Shape of waveform: {}".format(waveform.shape))
print("Sample rate of waveform: {}".format(sample_rate))
plt.plot(waveform.numpy().T);

In [None]:
# The 35 keywords this KWS system recognises; a keyword's position in this
# list is its class index.
labels = [
    "backward", "bed", "bird", "cat", "dog",
    "down", "eight", "five", "follow", "forward",
    "four", "go", "happy", "house", "learn",
    "left", "marvin", "nine", "no", "off",
    "on", "one", "right", "seven", "sheila",
    "six", "stop", "three", "tree", "two",
    "up", "visual", "wow", "yes", "zero",
]
labels

In [None]:
# Target sample rate: every clip is resampled to 8000 Hz before reaching the model
new_sample_rate = 8000

# `transform` converts audio from the dataset's rate to `new_sample_rate`
transform = torchaudio.transforms.Resample(sample_rate, new_sample_rate)

In [None]:
# Map a keyword to its position in `labels`, returned as a 0-dim tensor
def label_to_index(word):
    position = labels.index(word)
    return torch.tensor(position)

# Map a position in `labels` back to its keyword
def index_to_label(index):
    # Return the label stored at the given index
    keyword = labels[index]
    return keyword

In [None]:
# Zero-pad every tensor in a batch to the length of the longest one
def pad_sequence(batch):
    # The padding helper pads along the first dimension, so transpose each 2-D
    # item first (assumes items are (channel, time) -- TODO confirm) ...
    transposed = [torch.t(clip) for clip in batch]
    padded = torch.nn.utils.rnn.pad_sequence(
        transposed, batch_first=True, padding_value=0.
    )
    # ... then swap the last two dimensions back after padding
    return padded.transpose(1, 2)

# Turn a list of dataset items into one (waveforms, targets) batch for the data loader
def collate_fn(batch):
    # Each item has the form:
    # waveform, sample_rate, label, speaker_id, utterance_number
    # Only the waveform (position 0) and the label (position 2) are used.
    tensors = [item[0] for item in batch]
    targets = [label_to_index(item[2]) for item in batch]

    # Pad the waveforms to a common length and stack the label indices
    return pad_sequence(tensors), torch.stack(targets)

# Number of examples per batch
batch_size = 256

# `device` is a torch.device, which never compares equal to a plain string, so
# test its `.type` rather than `device == "cuda"` (that check is always False
# and silently selects the CPU settings even on a GPU).
# NOTE(review): num_workers=1 starts a worker process; confirm that works for
# notebook-defined `collate_fn` on platforms that spawn rather than fork.
if device.type == "cuda":
    num_workers = 1
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False

# Create training data loader (reshuffled every epoch)
train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

# Create testing data loader (fixed order, keeps the final partial batch)
test_loader = torch.utils.data.DataLoader(
    test_set,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

In [None]:
# Define the 'M5' 1-D convolutional classifier
class M5(nn.Module):
    """Four conv / batch-norm / ReLU / max-pool stages and a linear classifier.

    Args:
        n_input: number of input channels.
        n_output: number of classes (size of the last output dimension).
        stride: stride of the first convolution.
        n_channel: channels of the first two stages; the last two use twice as many.
    """

    def __init__(self, n_input=1, n_output=35, stride=16, n_channel=32):
        super().__init__()
        wide = 2 * n_channel

        # Stage 1: wide kernel with a large stride
        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=80, stride=stride)
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(4)

        # Stage 2
        self.conv2 = nn.Conv1d(n_channel, n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(n_channel)
        self.pool2 = nn.MaxPool1d(4)

        # Stage 3: channel count doubles
        self.conv3 = nn.Conv1d(n_channel, wide, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(wide)
        self.pool3 = nn.MaxPool1d(4)

        # Stage 4
        self.conv4 = nn.Conv1d(wide, wide, kernel_size=3)
        self.bn4 = nn.BatchNorm1d(wide)
        self.pool4 = nn.MaxPool1d(4)

        # Classifier over the pooled channel vector
        self.fc1 = nn.Linear(wide, n_output)

    def forward(self, x):
        """Return log-probabilities with the class scores in the last dimension."""
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
            (self.conv4, self.bn4, self.pool4),
        )
        for conv, norm, pool in stages:
            x = pool(F.relu(norm(conv(x))))

        # Average over all remaining positions of the last dimension
        x = F.avg_pool1d(x, x.shape[-1])
        # Move channels to the last dimension so the linear layer acts on them
        x = self.fc1(x.permute(0, 2, 1))
        return F.log_softmax(x, dim=2)

# Instantiate M5 with one output per keyword and place it on the chosen device
model = M5(n_input=1, n_output=len(labels)).to(device)
print(model)

# Count the trainable parameters (those with requires_grad set) of a model
def count_parameters(model):
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Report how many trainable parameters the model has
n = count_parameters(model)
print("Number of parameters: %s" % n)

In [None]:
# Adam optimizer with an initial learning rate of 0.01 and a weight decay of 0.0001
optimizer = optim.Adam(model.parameters(), lr=1e-2, weight_decay=1e-4)

# Multiply the learning rate by 0.1 after every 20 scheduler steps
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

In [None]:
def train(model, epoch, log_interval):
    """Run one training epoch over ``train_loader``.

    Relies on the notebook globals ``train_loader``, ``device``, ``transform``,
    ``optimizer``, ``pbar``, ``pbar_update`` and ``losses``.

    Args:
        model: network to train; switched to training mode here.
        epoch: epoch number, used only in the log line.
        log_interval: print a progress line every this many batches.
    """
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):

        data = data.to(device)
        target = target.to(device)

        # Apply transform and model on whole batch directly on device
        data = transform(data)
        output = model(data)

        # Negative log-likelihood for a tensor of size (batch x 1 x n_output).
        # Drop only dim 1: a bare squeeze() would also remove the batch
        # dimension when a batch holds a single item, and nll_loss would then
        # reject the input/target shapes.
        loss = F.nll_loss(output.squeeze(1), target)

        optimizer.zero_grad()

        loss.backward()
        optimizer.step()

        # Print training stats
        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

        # Update progress bar
        pbar.update(pbar_update)
        # Record loss
        losses.append(loss.item())

In [None]:
def number_of_correct(pred, target):
    # Drop size-1 dimensions from the predictions, then count matches with the targets
    hits = torch.eq(pred.squeeze(), target)
    return hits.sum().item()


def get_likely_index(tensor):
    # Index of the largest value along the last dimension, for every element in the batch
    return torch.argmax(tensor, dim=-1)


def test(model, epoch):
    """Evaluate ``model`` on ``test_loader`` and print its accuracy.

    Relies on the notebook globals ``test_loader``, ``device``, ``transform``,
    ``pbar`` and ``pbar_update``.

    Args:
        model: network to evaluate; switched to evaluation mode here.
        epoch: epoch number, used only in the printed summary.
    """
    model.eval()
    correct = 0
    # Evaluation never calls backward(), so skip building the autograd graph
    with torch.no_grad():
        for data, target in test_loader:

            data = data.to(device)
            target = target.to(device)

            # Apply transform and model on whole batch directly on device
            data = transform(data)
            output = model(data)

            pred = get_likely_index(output)
            correct += number_of_correct(pred, target)

            # Update progress bar
            pbar.update(pbar_update)

    print(f"\nTest Epoch: {epoch}\tAccuracy: {correct}/{len(test_loader.dataset)} ({100. * correct / len(test_loader.dataset):.0f}%)\n")

# Training
When you run the cell below, training of this KWS system will start.  
It takes about 25 minutes for 10 epochs.

In [None]:
# Print a training log line every `log_interval` batches; train for `n_epoch` epochs
log_interval = 20
n_epoch = 10

# Progress-bar increment per batch, chosen so that one full epoch (all training
# batches plus all test batches) advances the bar by 1
pbar_update = 1 / (len(train_loader) + len(test_loader))
# Per-batch training losses, appended to by train()
losses = []

# The transform needs to live on the same device as the model and the data.
transform = transform.to(device)
with tqdm(total=n_epoch) as pbar:
    for epoch in range(1, n_epoch + 1):
        # train() and test() read `pbar`, `pbar_update` and `losses` as globals
        train(model, epoch, log_interval)
        test(model, epoch)
        # Advance the learning-rate schedule once per epoch
        scheduler.step()


In [None]:
# Plot the recorded training loss against the batch (iteration) number
plt.plot(losses)
plt.title("Training loss")
plt.ylabel("Loss")
plt.xlabel("Batch")

In [None]:
import datetime
import os

# Timestamp the checkpoint name. Hours and minutes are joined with "-" because
# ":" is not allowed in file names on Windows.
datetimeObj = datetime.datetime.now()
datetimeStr = datetimeObj.strftime("%d-%b-%Y-%H-%M")
model_dir = f"./Model_State_Dict/model_state_{n_epoch}epoch_{datetimeStr}.pt"

# The target folder is not created anywhere else in this notebook; create it
# here so the save cannot fail on a missing directory after a full training run.
os.makedirs(os.path.dirname(model_dir), exist_ok=True)

# Save the model state after training finishes
torch.save(model.state_dict(), model_dir)

In [None]:
def predict(tensor):
    """Return the keyword the model predicts for a single waveform.

    Relies on the notebook globals ``model``, ``device`` and ``transform``.
    Leaves ``model`` in evaluation mode.

    Args:
        tensor: one waveform without a batch dimension (the batch dimension is
            added here); assumed to be (channel, time) at the sample rate
            ``transform`` was built for -- TODO confirm against callers.

    Returns:
        The predicted label string.
    """
    # Inference only: make batch-norm use its running statistics (a one-item
    # batch in training mode would use, and update them from, that single
    # clip) and skip building the autograd graph.
    model.eval()
    with torch.no_grad():
        tensor = tensor.to(device)
        tensor = transform(tensor)
        output = model(tensor.unsqueeze(0))
    index = get_likely_index(output)
    return index_to_label(index.squeeze())
# Pick a random training example. The upper bound comes from the dataset itself:
# a hard-coded size raises IndexError as soon as the split is smaller than
# expected and can never select the last item.
random_idx = np.random.randint(0, len(train_set))
print(random_idx)
waveform, sample_rate, utterance, *_ = train_set[random_idx]

print(f"Expected: {utterance}. Predicted: {predict(waveform)}.")

# Play the clip back in the notebook
ipd.Audio(waveform.numpy(), rate=sample_rate)

# Test this system with your real voice
1. Run the cell below
2. After "start recording..." is displayed, say one of the 35 keywords within 1 second
3. Check the result

In [None]:
import sounddevice as sd
from scipy.io.wavfile import write

fs = 16000  # recording sample rate in Hz
seconds = 1  # duration of each recording in seconds
listen_seconds = 20  # total time to keep listening

# Check the clock in the loop condition itself. Comparing against a timestamp
# taken at the start of the previous iteration would allow one extra
# record/predict cycle after the time budget is used up. A monotonic clock is
# unaffected by system clock adjustments.
deadline = time.monotonic() + listen_seconds

while time.monotonic() < deadline:
    ipd.clear_output()
    print("start recording...")
    # Record one mono clip and block until the recording is complete
    myrecording = sd.rec(int(seconds * fs), samplerate=fs, channels=1)
    sd.wait()
    ipd.clear_output()
    # Transpose the recording so it has the layout predict() is given elsewhere
    tensor_record = torch.tensor(myrecording.transpose())
    x = predict(tensor_record)
    print(f"Keyword: {x}")
    # Leave the result on screen for a moment before the next recording
    time.sleep(2)