In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
from scipy.io import wavfile

import matplotlib.pyplot as plt
import IPython.display as ipd

from tqdm import tqdm

from torchaudio.datasets import SPEECHCOMMANDS
import os
import glob

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [2]:
labels = ['forward', 'backward', 'up', 'down',
          'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'zero',
          'left', 'right', 'go', 'stop', 'yes', 'no', 'on', 'off', 'unknown']
# The following dataset labels are considered unkonwn
# unknown = ['bed', 'bird', 'cat', 'dog', 'follow', 'happy', 'house', 'learn', 'marvin',
#            'sheila', 'visual', 'wow', 'tree']

In [3]:
SPEECH_DATA_ROOT = "/Users/invincibleo/Leo/Projects/Datasets/SpeechCommands"
# Load the speech command dataset from pytorch dataset
class SubsetSC(SPEECHCOMMANDS):
    def __init__(self, subset: str = None):
        super().__init__(os.path.dirname(SPEECH_DATA_ROOT), download=True)

        def load_list(filename):
            filepath = os.path.join(self._path, filename)
            with open(filepath) as fileobj:
                return [os.path.normpath(os.path.join(self._path, line.strip())) for line in fileobj]

        if subset == "validation":
            self._walker = load_list("validation_list.txt")
        elif subset == "testing":
            self._walker = load_list("testing_list.txt")
        elif subset == "training":
            excludes = load_list("validation_list.txt") + load_list("testing_list.txt")
            excludes = set(excludes)
            self._walker = [w for w in self._walker if w not in excludes]

In [4]:
# Create training and testing split of the data. We do not use validation in this tutorial.
train_set = SubsetSC("training")
valid_set = SubsetSC("validation")
test_set = SubsetSC("testing")

In [None]:
waveform, sample_rate, label, speaker_id, utterance_number = train_set[0]
print("Shape of waveform: {}".format(waveform.size()))
print("Sample rate of waveform: {}".format(sample_rate))

plt.plot(waveform.t().numpy())

In [None]:
print("Number of labels:", len(labels))
print("Number of training examples:", len(train_set))
print("Number of validation examples:", len(valid_set))
print("Number of testing examples:", len(test_set))

In [5]:
def label_to_index(word):
    if word in labels:
        return torch.tensor(labels.index(word))
    else:
        return torch.tensor(labels.index("unknown"))

def index_to_label(index):
    # Return the word corresponding to the index in labels
    # This is the inverse of label_to_index
    return labels[index]

# Test label "on"
index = label_to_index("on")
word = index_to_label(index)
print(index, word)

# Test label "unknown"
index = label_to_index("bird")
word = index_to_label(index)
print(index, word)

tensor(20) on
tensor(22) unknown


In [6]:
def pad_sequence(batch):
    # Make all tensor in a batch the same length by padding with zeros
    batch = [item.t() for item in batch]
    batch = torch.nn.utils.rnn.pad_sequence(batch, batch_first=True, padding_value=0.)
    return batch.permute(0, 2, 1)

# MFCC feature extraction and save to disk
def extract_mfcc(waveform):
    mfcc = torchaudio.transforms.MFCC(
        sample_rate=sample_rate,
        n_mfcc=16,
        melkwargs={"n_fft": int(0.03*sample_rate), "hop_length": int(0.03*0.5*sample_rate), "n_mels": 64,
                   "window_fn": torch.hamming_window, "center": False, "pad_mode": "reflect"},
    )
    return mfcc(waveform)

def collate_fn_extract_feature(batch):

    # A data tuple has the form:
    # waveform, sample_rate, label, speaker_id, utterance_number

    tensors, targets, file_name = [], [], []

    # Gather in lists, and encode labels as indices
    for waveform, sample_rate, label, speaker_id, utterance_number in batch:
        tensors += [waveform]
        targets += [label_to_index(label)]
        name = "mfcc_" + str(speaker_id) + "_" + str(utterance_number) + ".pt"
        file_name += [name]

    # Group the list of tensors into a batched tensor
    tensors = pad_sequence(tensors)
    # Extract MFCC features
    tensors = extract_mfcc(tensors)
    targets = torch.stack(targets)

    return tensors, targets, file_name

batch_size = 1

if device == "cuda":
    num_workers = 1
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False

train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn_extract_feature,
    num_workers=num_workers,
    pin_memory=pin_memory,
)
valid_loader = torch.utils.data.DataLoader(
    valid_set,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
    collate_fn=collate_fn_extract_feature,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

In [None]:
mfcc_feature_dir = os.path.join(SPEECH_DATA_ROOT, "mfcc")
os.makedirs(mfcc_feature_dir, exist_ok=True)
for feature, label, file_name in tqdm(train_loader):
    label = index_to_label(label[0])
    os.makedirs(os.path.join(mfcc_feature_dir, label), exist_ok=True)
    feature_path = os.path.join(mfcc_feature_dir, label, file_name[0])
    if not os.path.exists(feature_path):
        torch.save(feature, feature_path)

for feature, label, file_name in tqdm(valid_loader):
    label = index_to_label(label[0])
    os.makedirs(os.path.join(mfcc_feature_dir, label), exist_ok=True)
    feature_path = os.path.join(mfcc_feature_dir, label, file_name[0])
    if not os.path.exists(feature_path):
        torch.save(feature, feature_path)

In [7]:
SPEECH_DATA_ROOT = "/Users/invincibleo/Leo/Projects/Datasets/SpeechCommands"
class MFCC(SPEECHCOMMANDS):
    def __init__(self, subset: str = None):
        super().__init__(os.path.dirname(SPEECH_DATA_ROOT), download=False)

        def load_list(filename):
            filepath = os.path.join(self._path, filename)
            feature_path_list = []
            with open(filepath) as fileobj:
                for line in fileobj:
                    line = line.strip().replace("_nohash_", "_")
                    mfcc_file_name = "mfcc_" + line.split("/")[-1].split(".")[0] + ".pt"
                    mfcc_file_name = os.path.join(line.split("/")[0], mfcc_file_name)
                    feature_path = os.path.join(SPEECH_DATA_ROOT, "mfcc", mfcc_file_name)
                    feature_path_list.append(feature_path)
            return feature_path_list

        if subset == "validation":
            self._walker = load_list("validation_list.txt")
        elif subset == "testing":
            self._walker = load_list("testing_list.txt")
        elif subset == "training":
            excludes = set(load_list("validation_list.txt") + load_list("testing_list.txt"))
            walker = sorted(str(p) for p in glob.glob(os.path.join(SPEECH_DATA_ROOT, "mfcc", "*", "*.pt")))
            self._walker = [
                w for w in walker
                if os.path.normpath(w) not in excludes
            ]

    def __getitem__(self, n: int):
        fileid = self._walker[n]
        feature = torch.load(fileid)
        label = fileid.split("/")[-2]
        return feature, label
    

train_set = MFCC("training")
valid_set = MFCC("validation")

In [8]:
def collate_fn(batch):
    tensors, targets = [], []
    for mfcc, label in batch:
        mfcc = torch.squeeze(mfcc, (0, 1))
        tensors += [mfcc]
        targets += [label_to_index(label)]

    tensors = pad_sequence(tensors)
    tensors = tensors.permute(0, 2, 1)
    targets = torch.stack(targets)
    return tensors, targets

# Construct the dataloaders
batch_size = 512
train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
    drop_last=True,
)
valid_loader = torch.utils.data.DataLoader(
    valid_set,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=collate_fn,
    drop_last=False,
)
# test_loader = torch.utils.data.DataLoader(
#     test_set,
#     batch_size=batch_size,
#     shuffle=False,
#     collate_fn=collate_fn,
#     drop_last=False,
# )

In [None]:
# # Count the number of training examples per label
# labels_train = []
# for _, label in train_loader:
#     labels_train.append(label)

# labels_train = torch.stack(labels_train)
# labels_train = labels_train.view(-1)

# print("Shape of labels_train:", labels_train.size())

# # Count the number of training examples per label
# train_count = torch.bincount(labels_train).float()
# print("Count of labels:", train_count)

# # Plot the count of train examples per label
# plt.figure(figsize=(10, 5))
# plt.bar(torch.arange(len(train_count)), train_count.numpy())
# plt.xticks(torch.arange(len(train_count)), labels, rotation=45)
# plt.ylabel("Count")
# plt.xlabel("Label")
# plt.title("Number of training examples per label")
# plt.show()

In [None]:
class LSTM(nn.Module):
    def __init__(self, n_input=16, n_output=23, n_channel=64):
        super().__init__()
        self.LSTM1 = nn.LSTM(n_input, n_channel, num_layers=1, batch_first=True, bidirectional=True)
        self.fc1 = nn.Linear(n_channel*2, n_output)
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)

    def forward(self, x):
        x, (h_n, c_n) = self.LSTM1(x)
        x = self.fc1(x)
        x = self.global_avg_pool(torch.transpose(x, 1, 2)).squeeze(2)
        return F.log_softmax(x, dim=1)

mfcc, label = train_set[0]
model = LSTM(n_input=mfcc.shape[-2], n_output=len(labels))
model.to(device)
print(model)


def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


n = count_parameters(model)
print("Number of parameters: %s" % n)

In [9]:
class CNN(nn.Module):
    def __init__(self, n_input=16, n_output=23, n_channel=32):
        super().__init__()
        self.cnn1 = nn.Conv1d(n_input, n_channel, kernel_size=3)
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(3)
        self.cnn2 = nn.Conv1d(n_channel, 2*n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(2*n_channel)
        self.pool2 = nn.MaxPool1d(3)
        self.conv3 = nn.Conv1d(2*n_channel, 4 * n_channel, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(4 * n_channel)
        self.pool3 = nn.MaxPool1d(3)
        self.fc1 = nn.Linear(4 * n_channel, n_output)

    def forward(self, x):
        x = torch.permute(x, [0, 2, 1])
        x = self.pool1(F.relu(self.bn1(self.cnn1(x))))
        x = self.pool2(F.relu(self.bn2(self.cnn2(x))))
        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        x = x.squeeze(-1)
        x = self.fc1(x)
        return F.log_softmax(x, dim=1)

model = CNN(n_input=16, n_output=len(labels))
model.to(device)
print(model)

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

n = count_parameters(model)
print("Number of parameters: %s" % n)

CNN(
  (cnn1): Conv1d(16, 32, kernel_size=(3,), stride=(1,))
  (bn1): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool1): MaxPool1d(kernel_size=3, stride=3, padding=0, dilation=1, ceil_mode=False)
  (cnn2): Conv1d(32, 64, kernel_size=(3,), stride=(1,))
  (bn2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool2): MaxPool1d(kernel_size=3, stride=3, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv1d(64, 128, kernel_size=(3,), stride=(1,))
  (bn3): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool3): MaxPool1d(kernel_size=3, stride=3, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=128, out_features=23, bias=True)
)
Number of parameters: 35895


In [10]:
# optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=0.0001)
# scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)  # reduce the learning after 20 epochs by a factor of 10
optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=0.0001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)  # reduce the learning after 20 epochs by a factor of 10

In [11]:
def number_of_correct(pred, target):
    # count number of correct predictions
    return pred.squeeze().eq(target).sum().item()


def get_likely_index(tensor):
    # find most likely label index for each element in the batch
    return tensor.argmax(dim=-1)

In [12]:
def train(model, epoch, log_interval):
    model.train()
    losses = []
    correct = 0
    for batch_idx, (data, target) in enumerate(train_loader):

        data = data.to(device)
        target = target.to(device)

        # apply transform and model on whole batch directly on device
        # data = transform(data)
        output = model(data)
        pred = get_likely_index(output)
        correct += number_of_correct(pred, target)
        # negative log-likelihood for a tensor of size (batch x 1 x n_output)
        loss = F.nll_loss(output.squeeze(), target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # print training stats
        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

        # update progress bar
        # pbar.update(pbar_update)
        # record loss
        losses.append(loss.item())
    
    # Calculate training set accuracy
    acc = 100. * correct / len(train_loader.dataset)
    print(f"\nTraining set: Average loss: {sum(losses) / len(losses):.4f}, Accuracy: {correct}/{len(train_loader.dataset)} ({acc:.0f}%)\n")
    return acc
        

In [13]:
def validation(model, epoch):
    model.eval()
    correct = 0
    for data, target in valid_loader:

        data = data.to(device)
        target = target.to(device)

        # apply transform and model on whole batch directly on device
        # data = transform(data)
        output = model(data)

        pred = get_likely_index(output)
        correct += number_of_correct(pred, target)

        # update progress bar
        # pbar.update(pbar_update)

    acc = 100. * correct / len(valid_loader.dataset)
    print(f"\nValidation Epoch: {epoch}\tAccuracy: {correct}/{len(valid_loader.dataset)} ({acc:.0f}%)\n")
    return acc

In [14]:
log_interval = 20
n_epoch = 40

# pbar_update = 1 / (len(train_loader) + len(test_loader))

train_accuracy_epoch = []
valid_accuracy_epoch = []

# The transform needs to live on the same device as the model and the data.
# transform = transform.to(device)
with tqdm(total=n_epoch) as pbar:
    for epoch in range(1, n_epoch + 1):
        train_accuracy_epoch.append(train(model, epoch, log_interval))
        valid_accuracy_epoch.append(validation(model, epoch))
        scheduler.step()

        # Save model
        os.makedirs("./v2_models", exist_ok=True)
        torch.save(model.state_dict(), f"./v2_models/model_e{epoch}.pt")

  0%|          | 0/40 [00:00<?, ?it/s]


Training set: Average loss: 0.8764, Accuracy: 61860/84843 (73%)


Validation Epoch: 1	Accuracy: 8167/9981 (82%)


Training set: Average loss: 0.4989, Accuracy: 71450/84843 (84%)


Validation Epoch: 2	Accuracy: 8543/9981 (86%)



  0%|          | 0/40 [01:20<?, ?it/s]


KeyboardInterrupt: 

In [None]:
# Plot the training and validation accuracies in the same plot
plt.figure(figsize=(10, 6))
plt.plot(train_accuracy_epoch, label="Train")
plt.plot(valid_accuracy_epoch, label="Validation")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

In [None]:
def predict(tensor):
    model.eval()
    # Use the model to predict the label of the waveform
    tensor = tensor.to(device)
    # tensor = transform(tensor)
    # if tensor shape is not 16000, then the waveform is too short and we need to pad it
    if tensor.numel() != 16000:
        tensor = F.pad(tensor, (0, 16000 - tensor.numel()), "constant", 0.0)

    # # Calculate MFCC
    tensor = extract_mfcc(tensor)
    tensor = torch.squeeze(tensor, (0, 1)).t()

    tensor = model(tensor.unsqueeze(0))
    tensor = get_likely_index(tensor)
    tensor = index_to_label(tensor.squeeze())
    return tensor

correct = 0
test_pred = []
test_target = []
for i, (waveform, sample_rate, label, *_) in enumerate(test_set):
    output = predict(waveform)
    test_pred.append(output)
    test_target.append(label)
    if output == label:
        correct += 1
    # if output != label:
    #     ipd.Audio(waveform.numpy(), rate=sample_rate)
    #     print(f"Data point #{i}. Expected: {label}. Predicted: {output}.")
    #     # break
# else:
#     print("All examples in this dataset were correctly classified!")
#     print("In this case, let's just look at the last data point")
#     ipd.Audio(waveform.numpy(), rate=sample_rate)
#     print(f"Data point #{i}. Expected: {utterance}. Predicted: {output}.")
        
print(f"Accuracy: {correct}/{len(test_set)} ({100. * correct / len(test_set):.0f}%)")

In [None]:
# Confusion matrix using pytorch
from ignite.metrics.confusion_matrix import ConfusionMatrix


In [None]:
import sounddevice as sd
print(sd.query_devices())
sd.default.device = "Leo's iPhone 13 Microphone"
def record(seconds=5, sample_rate=16000):
    # Make a 1s recording
    print("Start recording.")
    recording = sd.rec(int(seconds * sample_rate), samplerate=sample_rate, channels=1)
    sd.wait()
    
    # Define the file format
    fileformat = "wav"
    filename = f"_audio.{fileformat}"
    # Write the recording to a file using scipy wavfile
    wavfile.write(filename, sample_rate, recording)
    return torchaudio.load(filename)

# Detect whether notebook runs in google colab
record_wav, sample_rate = record()
# sample_rate, record_wav = wavfile.read("_audio.wav")
# Check if record_wav is a torch tensor
if not isinstance(record_wav, torch.Tensor):
    record_wav = torch.tensor(record_wav, dtype=torch.float32)
record_wav = torch.reshape(record_wav, (1, -1))
print(f"Predicted: {predict(record_wav)}.")
ipd.Audio(record_wav, rate=sample_rate)

In [None]:
if not isinstance(record_wav, torch.Tensor):
    record_wav = torch.tensor(record_wav, dtype=torch.float32)
record_wav = torch.reshape(record_wav, (1, -1))
print(f"Predicted: {predict(record_wav)}.")