In [None]:
!pip install pydub torch==1.7.0+cu101 torchvision==0.8.1+cu101 torchaudio==0.7.0 -f https://download.pytorch.org/whl/torch_stable.html

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
import sys

import matplotlib.pyplot as plt
import IPython.display as ipd

from tqdm import tqdm

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in links: https://download.pytorch.org/whl/torch_stable.html




In [None]:
class BernoulliDynamicConvolution(nn.Module):
    """1-D convolutional classifier whose first conv output is gated by thresholded attention weights.

    The input is projected with three fixed random tensors (``Q``, ``K``, ``V``),
    the projections are passed through ``nn.MultiheadAttention``, the returned
    attention weights are thresholded at 0.5 into a 0/1 mask, and that mask is
    multiplied element-wise with the output of the first convolution. Two more
    conv / batch-norm / max-pool stages, average pooling over the last axis, a
    linear layer and a log-softmax follow.

    The attention layer is built from the notebook-level names ``embed_dim`` and
    ``num_heads``, which must exist when the class is instantiated.

    NOTE(review): a class with the same name is defined again further down this
    notebook; whichever definition is executed last is the one in effect.

    Args:
        n_input: number of input channels of the first convolution.
        n_output: number of output features of the final linear layer.
        stride: stride of the first convolution.
        n_channel: output channels of the first convolution; the second and
            third convolutions produce ``2 * n_channel`` channels.
    """

    def __init__(self, n_input=1, n_output=35, stride=16, n_channel=32):
        super().__init__()

        # Fixed random query/key/value projections. They are registered as
        # buffers so that ``model.to(device)`` moves them together with the
        # layers instead of depending on a notebook-level ``device`` at
        # construction time. A single (1, 8000, 1) tensor is kept per projection
        # and broadcast over the batch in ``forward``, so the batch size is no
        # longer fixed to 256.
        self.register_buffer("Q", torch.randn(1, 8000, 1))
        self.register_buffer("K", torch.randn(1, 8000, 1))
        self.register_buffer("V", torch.randn(1, 8000, 1))

        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=80, stride=stride)
        self.multihead_attn = nn.MultiheadAttention(embed_dim, num_heads)# only attention layer
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(4)

        self.conv2 = nn.Conv1d(n_channel, 2 * n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(2 * n_channel)
        self.pool2 = nn.MaxPool1d(4)
        self.conv3 = nn.Conv1d(2 * n_channel, 2 * n_channel, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(2 * n_channel)
        self.pool3 = nn.MaxPool1d(4)
        self.fc1 = nn.Linear(2 * n_channel, n_output)

    def forward(self, x):
        """Return log-probabilities for a batch of waveforms.

        Args:
            x: input batch whose last axis has length 8000, matching the
                projections (assumes a (batch, n_input, 8000) layout - TODO confirm).

        Returns:
            The output of the final linear layer after ``log_softmax`` over axis 2.
        """
        # Project into local tensors. Writing the result back to self.Q/K/V
        # would replace the projections after the first call and make every
        # later forward pass operate on the wrong shapes.
        q = torch.matmul(x, self.Q)
        k = torch.matmul(x, self.K)
        v = torch.matmul(x, self.V)

        #1. applying conv 1D
        x = self.conv1(x)

        #2. calculating attention weights using Q, K, V
        # NOTE(review): q/k/v end in a size-1 axis while ``multihead_attn`` is
        # built with ``embed_dim``; this call and the element-wise product in
        # step 4 look shape-inconsistent as written - confirm the intended design.
        _, attn_output_weights = self.multihead_attn(q, k, v)

        #3. thresholding the attention weights at 0.5 into a 0/1 matrix
        # Done with tensor ops: Tensor.apply_ only works on CPU tensors and
        # calls a Python function once per element.
        hard_weights = (attn_output_weights >= 0.5).to(attn_output_weights.dtype)
        # The forward value is the 0/1 mask, while the tensor stays attached to
        # the autograd graph (the correction term is detached).
        attn_output_weights = attn_output_weights + (hard_weights - attn_output_weights).detach()

        #4. element-wise product of conv1d output and the thresholded attention matrix
        x = torch.mul(attn_output_weights, x)

        x = F.relu(self.bn1(x))
        x = self.pool1(x)

        #2nd conv 1d layer
        x = self.conv2(x)
        x = F.relu(self.bn2(x))
        x = self.pool2(x)

        #3rd conv layer
        x = self.conv3(x)
        x = F.relu(self.bn3(x))
        x = self.pool3(x)

        # Average over the whole remaining last axis, then move channels last
        # so the linear layer acts on them.
        x = F.avg_pool1d(x, x.shape[-1])
        x = x.permute(0, 2, 1)
        x = self.fc1(x)
        return F.log_softmax(x, dim=2)



# NOTE(review): `transformed`, `labels` and `device` are only assigned in cells
# further down, so this cell cannot run first on a fresh kernel; the same
# statements appear again after the later class definition.
model = BernoulliDynamicConvolution(
    n_input=transformed.shape[0],
    n_output=len(labels),
).to(device)
print(model)

In [None]:
# Use the GPU when PyTorch reports CUDA as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [None]:
from torchaudio.datasets import SPEECHCOMMANDS
import os

#Subsetting the speech commands dataset into train, test, val
class SubsetSC(SPEECHCOMMANDS):
    """SPEECHCOMMANDS restricted to one split.

    The dataset is created under ``"./"`` with ``download=True``. Depending on
    ``subset`` the list of files to walk is replaced:

    * ``"validation"`` / ``"testing"``: the entries of the matching list file.
    * ``"training"``: the default entries minus those in either list file.
    * anything else (including ``None``): the default entries are kept.
    """

    def __init__(self, subset: str = None):
        super().__init__("./", download=True)

        def load_list(filename):
            """Read a list file under the dataset path and return its lines as normalised paths."""
            listing = os.path.join(self._path, filename)
            entries = []
            with open(listing) as fileobj:
                for line in fileobj:
                    entries.append(os.path.normpath(os.path.join(self._path, line.strip())))
            return entries

        split_files = {
            "validation": "validation_list.txt",
            "testing": "testing_list.txt",
        }

        if subset == "training":
            # Everything not reserved for validation or testing is training data.
            held_out = set(load_list("validation_list.txt") + load_list("testing_list.txt"))
            self._walker = [path for path in self._walker if path not in held_out]
        elif subset in split_files:
            self._walker = load_list(split_files[subset])

# Each SubsetSC is constructed with root "./" and download=True (see the class above).
train_set = SubsetSC("training")
test_set = SubsetSC("testing")

#taking a sample from the train set to transform it
# An item unpacks into five fields: waveform, sample rate, label, speaker id, utterance number.
waveform, sample_rate, label, speaker_id, utterance_number = train_set[0]

In [None]:
#list of labels present in the dataset
# Collect the distinct label strings (field 2 of every item) in sorted order.
# This walks the entire training set once.
labels = sorted({datapoint[2] for datapoint in train_set})

In [None]:
#applying resample transform on a train set sample
# Target sampling rate for every waveform fed to the model.
new_sample_rate = 8000
# Resampler from the rate of the sample loaded above to new_sample_rate.
transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=new_sample_rate)
transformed = transform(waveform)

# Last expression of the cell, so the audio player is rendered as the cell output.
ipd.Audio(transformed.numpy(), rate=new_sample_rate)

In [None]:
#Mapping functions between the index of a label and the actual label
def label_to_index(word):
    """Return the position of ``word`` in the notebook-level ``labels`` list as a 0-d tensor.

    Raises ``ValueError`` (from ``list.index``) when ``word`` is not in ``labels``.
    """
    position = labels.index(word)
    return torch.tensor(position)

def index_to_label(index):
    """Return the entry of the notebook-level ``labels`` list at position ``index``."""
    word = labels[index]
    return word

In [None]:
#padding shorter sequences in the batch 
def pad_sequence(batch):
    """Zero-pad a list of 2-D tensors to a common length and stack them.

    Each item is transposed, the items are padded with zeros along their (new)
    first axis and stacked batch-first, and the last two axes are swapped back,
    so items of shape (c, t_i) yield a tensor of shape (len(batch), c, max t_i).
    """
    transposed = [torch.t(item) for item in batch]
    padded = torch.nn.utils.rnn.pad_sequence(
        transposed,
        batch_first=True,
        padding_value=0.,
    )
    return padded.permute(0, 2, 1)

#collating a batch: pad the waveforms to a common length and stack the label indices into one tensor
def collate_fn(batch):
    """Turn a list of dataset items into a ``(waveforms, targets)`` pair of tensors.

    Field 0 of every item (the waveform) is padded and stacked with
    ``pad_sequence``; field 2 (the label) is converted with ``label_to_index``
    and the resulting indices are stacked. All other fields are ignored.
    """
    waveforms = [wave for wave, _, _label, *_ in batch]
    indices = [label_to_index(label) for _wave, _, label, *_ in batch]

    tensors = pad_sequence(waveforms)
    targets = torch.stack(indices)
    return tensors, targets


batch_size = 256

# `device` is a torch.device, which never compares equal to the plain string
# "cuda"; the previous `device == "cuda"` check therefore always took the CPU
# branch. Inspect the device type instead (torch.device() also accepts a
# string, so this works for either form).
if torch.device(device).type == "cuda":
    num_workers = 1
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False


In [None]:
#initialising dataloaders
# Both loaders batch with collate_fn, which pads the waveforms and stacks the label indices.
# shuffle=True: the training data is reshuffled at every epoch.
train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)
# The test data keeps its order, and the final (possibly smaller) batch is kept.
test_loader = torch.utils.data.DataLoader(
    test_set,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

In [None]:
#bernoulli function, takes the probability of success
#used for turning the attention matrix into a bernoulli (0/1) matrix
def bernoulli_matrix(p_success):
    """Threshold a single value: 1.0 when ``p_success`` is at least 0.5, otherwise 0.0."""
    if p_success >= 0.5:
        return 1.0
    return 0.0

"""def BernoulliDynamicConvolution(X, Q, K, V):
    multihead_attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
    attn_output, attn_output_weights = multihead_attn(Q, K, V)

    attn_output_weights.detach().apply_(bernoulli_matrix)

    conv1d = torch.nn.Conv1d(in_channels=3, out_channels=3, kernel_size=3, stride=1)
    conv1d_output = conv1d(X)

    return torch.mul(attn_output_weights, conv1d_output)
"""
# Arguments passed to nn.MultiheadAttention when BernoulliDynamicConvolution is instantiated.
# NOTE(review): 8000 equals new_sample_rate and the length of the Q/K/V projections - confirm they are meant to be tied.
embed_dim=8000
num_heads=1

In [None]:
class BernoulliDynamicConvolution(nn.Module):
    """1-D convolutional classifier whose first conv output is gated by thresholded attention weights.

    The input is projected with three fixed random tensors (``Q``, ``K``, ``V``),
    the projections are passed through ``nn.MultiheadAttention``, the returned
    attention weights are thresholded at 0.5 into a 0/1 mask, and that mask is
    multiplied element-wise with the output of the first convolution. Two more
    conv / batch-norm / max-pool stages, average pooling over the last axis, a
    linear layer and a log-softmax follow.

    The attention layer is built from the notebook-level names ``embed_dim`` and
    ``num_heads``, which must exist when the class is instantiated.

    Args:
        n_input: number of input channels of the first convolution.
        n_output: number of output features of the final linear layer.
        stride: stride of the first convolution.
        n_channel: output channels of the first convolution; the second and
            third convolutions produce ``2 * n_channel`` channels.
    """

    def __init__(self, n_input=1, n_output=35, stride=16, n_channel=32):
        super().__init__()

        # Fixed random query/key/value projections. They are registered as
        # buffers so that ``model.to(device)`` moves them together with the
        # layers instead of depending on a notebook-level ``device`` at
        # construction time. A single (1, 8000, 1) tensor is kept per projection
        # and broadcast over the batch in ``forward``, so the batch size is no
        # longer fixed to 256.
        self.register_buffer("Q", torch.randn(1, 8000, 1))
        self.register_buffer("K", torch.randn(1, 8000, 1))
        self.register_buffer("V", torch.randn(1, 8000, 1))

        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=80, stride=stride)
        self.multihead_attn = nn.MultiheadAttention(embed_dim, num_heads)# only attention layer
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(4)

        self.conv2 = nn.Conv1d(n_channel, 2 * n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(2 * n_channel)
        self.pool2 = nn.MaxPool1d(4)
        self.conv3 = nn.Conv1d(2 * n_channel, 2 * n_channel, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(2 * n_channel)
        self.pool3 = nn.MaxPool1d(4)
        self.fc1 = nn.Linear(2 * n_channel, n_output)

    def forward(self, x):
        """Return log-probabilities for a batch of waveforms.

        Args:
            x: input batch whose last axis has length 8000, matching the
                projections (assumes a (batch, n_input, 8000) layout - TODO confirm).

        Returns:
            The output of the final linear layer after ``log_softmax`` over axis 2.
        """
        # Project into local tensors. Writing the result back to self.Q/K/V
        # would replace the projections after the first call and make every
        # later forward pass operate on the wrong shapes.
        q = torch.matmul(x, self.Q)
        k = torch.matmul(x, self.K)
        v = torch.matmul(x, self.V)

        #1. applying conv 1D
        x = self.conv1(x)

        #2. calculating attention weights using Q, K, V
        # NOTE(review): q/k/v end in a size-1 axis while ``multihead_attn`` is
        # built with ``embed_dim``; this call and the element-wise product in
        # step 4 look shape-inconsistent as written - confirm the intended design.
        _, attn_output_weights = self.multihead_attn(q, k, v)

        #3. thresholding the attention weights at 0.5 into a 0/1 matrix
        # Done with tensor ops: Tensor.apply_ only works on CPU tensors and
        # calls a Python function once per element.
        hard_weights = (attn_output_weights >= 0.5).to(attn_output_weights.dtype)
        # The forward value is the 0/1 mask, while the tensor stays attached to
        # the autograd graph (the correction term is detached).
        attn_output_weights = attn_output_weights + (hard_weights - attn_output_weights).detach()

        #4. element-wise product of conv1d output and the thresholded attention matrix
        x = torch.mul(attn_output_weights, x)

        x = F.relu(self.bn1(x))
        x = self.pool1(x)

        #2nd conv 1d layer
        x = self.conv2(x)
        x = F.relu(self.bn2(x))
        x = self.pool2(x)

        #3rd conv layer
        x = self.conv3(x)
        x = F.relu(self.bn3(x))
        x = self.pool3(x)

        # Average over the whole remaining last axis, then move channels last
        # so the linear layer acts on them.
        x = F.avg_pool1d(x, x.shape[-1])
        x = x.permute(0, 2, 1)
        x = self.fc1(x)
        return F.log_softmax(x, dim=2)



# One input channel per row of the resampled sample, one output per label;
# nn.Module.to returns the module itself, so the result can be bound directly.
model = BernoulliDynamicConvolution(
    n_input=transformed.shape[0],
    n_output=len(labels),
).to(device)
print(model)

BernoulliDynamicConvolution(
  (conv1): Conv1d(1, 32, kernel_size=(80,), stride=(16,))
  (multihead_attn): MultiheadAttention(
    (out_proj): _LinearWithBias(in_features=8000, out_features=8000, bias=True)
  )
  (bn1): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool1): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(32, 64, kernel_size=(3,), stride=(1,))
  (bn2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool2): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv1d(64, 64, kernel_size=(3,), stride=(1,))
  (bn3): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool3): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=64, out_features=35, bias=True)
)


In [None]:
# Adam over all model parameters, with L2 weight decay.
optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=0.0001)
# Multiply the learning rate by 0.1 after every 20 scheduler steps; the training
# cell calls scheduler.step() once per epoch.
# NOTE(review): with n_epoch = 2 the decay is never reached - confirm intended.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

In [None]:
def train(model, epoch, log_interval):
    """Run one optimisation pass over ``train_loader``.

    Uses the notebook-level ``train_loader``, ``device``, ``transform``,
    ``optimizer``, ``pbar``, ``pbar_update`` and ``losses``.

    Args:
        model: module to train; it is switched to training mode.
        epoch: epoch number, used only in the progress message.
        log_interval: a progress line is printed every ``log_interval`` batches.
    """
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):

        data = data.to(device)
        target = target.to(device)

        # Resample the whole batch before feeding it to the model.
        data = transform(data)
        output = model(data)

        # Remove only axis 1 (size 1 in the model output). A bare squeeze()
        # would also drop the batch axis when a batch holds a single sample,
        # leaving nll_loss with mismatched shapes.
        loss = F.nll_loss(output.squeeze(1), target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

        # Per-batch progress goes to the shared progress bar rather than stdout.
        pbar.update(pbar_update)
        losses.append(loss.item())

In [None]:
def number_of_correct(pred, target):
    """Count the positions where ``pred`` (with size-1 axes removed) equals ``target``."""
    matches = torch.eq(pred.squeeze(), target)
    return matches.sum().item()


def get_likely_index(tensor):
    """Return, for every slice along the last axis, the index of its largest value."""
    return torch.argmax(tensor, dim=-1)


def test(model, epoch):
    """Evaluate ``model`` on ``test_loader`` and print the accuracy.

    Uses the notebook-level ``test_loader``, ``device``, ``transform``,
    ``pbar`` and ``pbar_update``.

    Args:
        model: module to evaluate; it is switched to evaluation mode.
        epoch: epoch number, used only in the printed summary.
    """
    model.eval()
    correct = 0
    # No gradients are needed for evaluation; disabling autograd avoids building
    # a graph for every batch.
    with torch.no_grad():
        for data, target in test_loader:

            data = data.to(device)
            target = target.to(device)

            # Resample the whole batch before feeding it to the model.
            data = transform(data)
            output = model(data)

            pred = get_likely_index(output)
            correct += number_of_correct(pred, target)

            pbar.update(pbar_update)

    print(f"\nTest Epoch: {epoch}\tAccuracy: {correct}/{len(test_loader.dataset)} ({100. * correct / len(test_loader.dataset):.0f}%)\n")

In [None]:
# train() prints a progress line every log_interval batches.
log_interval = 20
n_epoch = 2

# train() and test() advance the bar by this amount once per batch, so a full
# train + test pass adds up to 1, i.e. one epoch on a bar with total=n_epoch.
pbar_update = 1 / (len(train_loader) + len(test_loader))
# train() appends the loss of every batch to this list.
losses = []

# Move the resampling transform to `device`, where train()/test() place each batch before calling it.
transform = transform.to(device)
# `pbar` is read as a global inside train() and test().
with tqdm(total=n_epoch) as pbar:
    for epoch in range(1, n_epoch + 1):
        train(model, epoch, log_interval)
        test(model, epoch)
        scheduler.step()

In [None]:
def predict(tensor):
    """Return the label predicted by the notebook-level ``model`` for one waveform.

    The waveform is moved to ``device``, resampled with ``transform``, given a
    leading batch axis and passed through ``model``; the most likely index is
    mapped back to its label with ``index_to_label``. The model's train/eval
    mode is left as the caller set it.
    """
    tensor = tensor.to(device)
    # Pure inference: run without recording an autograd graph.
    with torch.no_grad():
        tensor = transform(tensor)
        tensor = model(tensor.unsqueeze(0))
    tensor = get_likely_index(tensor)
    tensor = index_to_label(tensor.squeeze())
    return tensor


# Take the last training item and compare its label with the model's prediction.
waveform, sample_rate, utterance, *_ = train_set[-1]
# Only the last expression of a cell is rendered automatically; this one is
# followed by a print, so the player has to be displayed explicitly.
ipd.display(ipd.Audio(waveform.numpy(), rate=sample_rate))

print(f"Expected: {utterance}. Predicted: {predict(waveform)}.")