In [None]:
# sample execution (requires torchvision)
import torch
from torchvision import transforms
import torch.optim as optim
import torch.nn as nn
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
import numpy as np
import glob, os
import soundfile as sf
import librosa
from sklearn.metrics import accuracy_score, roc_curve
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
import sys
from google.colab import drive

# Mount Google Drive so files under /content/drive become reachable.
drive.mount('/content/drive')

# Make the project folder importable. The membership guard keeps sys.path free
# of duplicate entries when this cell is executed more than once.
project_dir = "/content/drive/My Drive/Master/ASR"
if project_dir not in sys.path:
    sys.path.append(project_dir)

In [None]:
# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

## Preprocessing

In [None]:
# Read the test and train split lists (one entry per line) and turn every
# entry into a ".flac" file name.
# - `with` closes the file handles, which the bare open() calls never did.
# - strip() removes the line terminator without cutting a real character when
#   the last line has no trailing newline (the old `file[:-1]` slice did).
# - Blank lines are skipped instead of producing a bogus ".flac" entry.
with open("/content/drive/My Drive/Master/ASR/test.txt", "r") as testfile:
    testlist = [line.strip() + ".flac" for line in testfile if line.strip()]

with open("/content/drive/My Drive/Master/ASR/train.txt", "r") as trainfile:
    trainlist = [line.strip() + ".flac" for line in trainfile if line.strip()]

In [None]:
# Walk the LibriSpeech dev-clean tree and assign every .flac file to the train
# or the test split according to the file lists, recording one label per file.
testnames = []
trainnames = []
trainlabels = []
testlabels = []

# Sets give constant-time membership tests; scanning the lists for every file
# on disk was quadratic in the number of files.
train_files = set(trainlist)
test_files = set(testlist)

# sorted() pins the traversal order, which glob otherwise leaves up to the
# filesystem, so the collected lists come out in the same order on every run.
for filename in sorted(glob.iglob('/content/drive/My Drive/Master/ASR/LibriSpeech/dev-clean/*/**', recursive=True)):
    if not (os.path.isfile(filename) and '.flac' in filename):  # filter dirs
        continue
    parts = filename.split('/')
    basename = parts[-1]
    # The label is the third-from-last path component, i.e. the first directory
    # level below dev-clean for files two levels deep (presumably the speaker
    # ID -- confirm against the dataset layout).
    if basename in train_files:
        trainnames.append(filename)
        trainlabels.append(parts[-3])
    elif basename in test_files:
        testnames.append(filename)
        testlabels.append(parts[-3])

In [None]:
# Encode the labels as one-hot vectors.
# Both transformers are fitted on the TRAINING labels only and merely applied
# to the test labels. Refitting on the test labels (the previous fit_transform
# calls) builds a second, independent mapping, so the same label can receive a
# different class index -- and the one-hot matrix a different width -- whenever
# the two splits do not contain exactly the same label set.
# LabelEncoder.transform raises ValueError for a test label never seen in training.
label_encoder = LabelEncoder()
train_labels = label_encoder.fit_transform(trainlabels)
test_labels = label_encoder.transform(testlabels)
n_classes = len(np.unique(train_labels))
print("nclasses:", n_classes)
binarize = LabelBinarizer(neg_label=0, pos_label=1, sparse_output=False)
train_labels = binarize.fit_transform(train_labels)
test_labels = binarize.transform(test_labels)

In [None]:
def getfiles(names, labels, n_mfcc=40, n_frames=100):
    """Load audio files and turn each one into a fixed-size, 3-channel MFCC input.

    Args:
        names: Paths of the audio files to read.
        labels: Sequence aligned with ``names``; ``labels[i]`` belongs to ``names[i]``.
        n_mfcc: Number of MFCC coefficients to compute per frame.
        n_frames: Number of leading MFCC frames to keep per file.

    Returns:
        A tuple ``(output_data, output_labels)``. Every data item is a list
        containing the same MFCC matrix three times (one copy per input
        channel), truncated to ``n_frames`` columns. Files that produce fewer
        than ``n_frames`` frames are skipped together with their label, so both
        lists stay aligned.
    """
    output_data = []
    output_labels = []
    for i, path in enumerate(names):
        data, Fs = sf.read(path)
        # Audio and sample rate are passed by keyword: librosa >= 0.10 no longer
        # accepts them positionally, and keywords also work on older releases.
        mfcc = librosa.feature.mfcc(y=data, sr=Fs, n_mfcc=n_mfcc)[:, :n_frames]
        if mfcc.shape[1] == n_frames:
            output_data.append([mfcc, mfcc, mfcc])
            output_labels.append(labels[i])
    return output_data, output_labels

# Build the MFCC inputs for both splits, then hold out 10% of the training
# items for validation.
x_trainval, y_trainval = getfiles(trainnames, train_labels)
x_test, y_test = getfiles(testnames, test_labels)
# A fixed random_state makes the train/validation partition identical on every
# run; without it each execution of the cell drew a different split.
x_train, x_val, y_train, y_val = train_test_split(
    x_trainval, y_trainval, train_size=0.9, random_state=42
)

In [None]:
def _as_double_tensor(values):
    """Stack a (nested) list of NumPy arrays into one float64 tensor.

    Going through np.asarray first avoids torch.tensor's very slow
    element-by-element path for lists of ndarrays, and the explicit dtype
    guarantees double precision regardless of what the feature extraction
    returned.
    """
    return torch.tensor(np.asarray(values), dtype=torch.float64)


# Swap the last two axes of the inputs (assuming each item is laid out as
# (3, n_mfcc, n_frames), this yields (N, 3, n_frames, n_mfcc)) and move
# everything to the selected device.
x_train = _as_double_tensor(x_train).permute(0, 1, 3, 2).to(device)
x_test = _as_double_tensor(x_test).permute(0, 1, 3, 2).to(device)
x_val = _as_double_tensor(x_val).permute(0, 1, 3, 2).to(device)
y_train = _as_double_tensor(y_train).to(device)
y_test = _as_double_tensor(y_test).to(device)
y_val = _as_double_tensor(y_val).to(device)

In [None]:
# Pair every training input with its target, then serve the pairs in
# reshuffled mini-batches of five samples.
trainingdat = torch.utils.data.TensorDataset(x_train, y_train)
trainloader = torch.utils.data.DataLoader(
    dataset=trainingdat,
    batch_size=5,
    shuffle=True,
)

## Model

In [None]:
# Hyper-parameters read by the network and loss definitions in this notebook.
n_k = 4        # output width of the second attention layer (number of attention heads)
n_c = 16       # hidden width of the attention scorer
beta = 1e-4    # weight of the attention penalty added to the cross-entropy loss

# Untrained ResNet-18 converted to double precision, with its last two child
# modules (the averaging and fully-connected layers) removed so that it
# outputs feature maps instead of class scores.
resnet = torch.hub.load('pytorch/vision:v0.6.0', 'resnet18', pretrained=False)
resnet.double()
backbone_layers = list(resnet.children())
model = nn.Sequential(*backbone_layers[:-2])

class Net(nn.Module):
    """ResNet feature extractor + multi-head self-attention pooling + classifier.

    Relies on the module-level names ``model`` (the truncated ResNet),
    ``n_c``, ``n_k`` and ``n_classes``.

    ``forward`` returns ``(probabilities, A)``: the softmax class
    probabilities and the attention weights, which the training loss uses for
    its penalty term.

    NOTE(review): the layer sizes assume that flattening the backbone output's
    channel and width axes gives 1024 features per step (e.g. 512 channels x
    width 2) -- confirm if the input size changes.
    """

    def __init__(self):
        super(Net, self).__init__()
        self.ResNet = model
        self.linear1 = nn.Linear(1024, n_c)
        self.linear2 = nn.Linear(n_c, n_k)
        # One normalisation channel per attention head. This was hard-coded to
        # 4, which only worked while n_k happened to be 4.
        self.bn1 = nn.BatchNorm1d(n_k)
        self.pool_time = nn.AdaptiveAvgPool2d((1, 1024))
        self.fc1 = nn.Linear(1024, 256)
        self.fc2 = nn.Linear(256, n_classes)

    def forward(self, x):
        # RESNET LAYERS
        x1 = self.ResNet(x)

        # CONVERT X FOR SELF-ATTENTION
        # Move axis 2 of the feature map in front of the channel axis and merge
        # the last two axes, giving one feature vector per step along axis 1.
        x = x1.permute(0, 2, 1, 3)
        x = torch.flatten(x, 2, 3)

        # SELF-ATTENTION
        # Functional tanh/softmax/flatten replace the nn.Module objects that
        # used to be constructed on every forward call; the maths is identical.
        a = self.linear1(x)
        # Softmax over dim 1 normalises each head's weights across the steps.
        A = torch.softmax(self.linear2(torch.tanh(a)), dim=1)
        A = A.permute(0, 2, 1)
        # Weighted sum of the step features: one pooled vector per head.
        x = torch.matmul(A, x)
        x = self.bn1(x)

        # FINAL LAYERS
        # NOTE(review): pool_time receives a 3-D tensor here; it appears to
        # average the per-head vectors into a single 1024-d vector -- confirm.
        x = self.pool_time(x)
        x = torch.flatten(x, 1, 2)
        x = self.fc1(x)
        x = self.fc2(x)
        x = torch.softmax(x, dim=1)
        return x, A

# Build the network on the selected device in double precision.
net = Net().to(device).double()

# Smoke test: push one random batch of two 3 x 100 x 40 inputs through the network.
# NOTE(review): this forward pass runs with the module in its default training
# mode and without torch.no_grad().
tensor1 = torch.randn(2, 3, 100, 40).double().to(device)
output, _ = net(tensor1)

In [None]:
# Show the module hierarchy of the assembled network.
print(net)

In [None]:
# Plain SGD with momentum over all parameters of `net`.
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

In [None]:
def entrop_loss_function(prds, actual, eps=1e-12):
    """Summed categorical cross-entropy between probabilities and targets.

    Args:
        prds: Tensor of predicted probabilities, one row per sample.
        actual: Tensor of the same shape holding the (one-hot) target weights.
        eps: Lower bound applied to ``prds`` before taking the logarithm.

    Returns:
        Scalar tensor ``-sum(actual * log(prds))`` over all samples and classes
        (a sum, not a mean).
    """
    # Clamp first: a probability that underflows to exactly 0 would give
    # log(0) = -inf, and 0 * -inf = NaN would then poison the loss and gradients.
    log_probs = torch.log(torch.clamp(prds, min=eps))
    crossentropy = -torch.sum(torch.sum(actual * log_probs, dim=1))
    return crossentropy

In [None]:
def p_loss_function(A):
    """Penalty on the attention weights: ``|| S S^T - I ||_F^2``.

    ``S`` is ``A`` summed over its first (batch) axis.

    Args:
        A: 3-D tensor of attention weights; the first axis is summed away and
            the second axis determines the size of the identity matrix.

    Returns:
        Scalar tensor with the squared Frobenius norm of ``S S^T - I``.
    """
    Asum = torch.sum(A, dim=0)
    gram = torch.mm(Asum, Asum.T)
    # Size, dtype and device of the identity come from the data itself rather
    # than from the globals n_k / device and a hard-coded double dtype.
    identity = torch.eye(gram.shape[0], dtype=gram.dtype, device=gram.device)
    diff = gram - identity
    # The squared Frobenius norm is the sum of squared entries; computing it
    # directly skips the sqrt-then-square round trip.
    return torch.sum(diff * diff)

## Training

In [None]:
# To resume from a checkpoint: net.load_state_dict(torch.load(<path>))
import time


def _one_hot_of_max(probs):
    """Return an int tensor that is 1 where a row of `probs` attains its maximum, else 0."""
    return (probs == torch.max(probs, dim=1, keepdim=True)[0]).type(torch.int)


start = time.time()
acc_train = []
acc_val = []
eps = []

# Create the checkpoint folder up front so torch.save cannot fail on a missing directory.
weights_dir = "/content/drive/My Drive/Master/ASR/weights"
os.makedirs(weights_dir, exist_ok=True)

# Summed loss of the most recent training pass. It must exist before the first
# report below (it used to be printed without ever being assigned -> NameError);
# it is 0.0 until one pass has run.
running_loss = 0.0

for epoch in range(26):  # loop over the dataset multiple times

    # Checkpoint and report BEFORE training this epoch, so entry `epoch`
    # describes the network after `epoch` completed training passes.
    torch.save(net.state_dict(), os.path.join(weights_dir, "2net_weights_%d.mdl" % (epoch)))

    print("Epoch:", epoch)
    eps.append(epoch)

    # Evaluate in eval mode: BatchNorm then uses its running statistics and
    # stops updating them from the evaluation data. no_grad avoids building
    # an autograd graph for these full-dataset forward passes.
    net.eval()
    with torch.no_grad():
        preds, _ = net(x_train)
        preds = _one_hot_of_max(preds)
        trainingacc = accuracy_score(y_train.cpu(), preds.cpu())
        print("Training accuracy:", trainingacc)
        acc_train.append(trainingacc)

        preds1org, _ = net(x_val)
        preds1 = _one_hot_of_max(preds1org)
        validacc = accuracy_score(y_val.cpu(), preds1.cpu())
        print("Validation accuracy:", validacc)
        acc_val.append(validacc)

    print("loss", running_loss)

    print("")

    # One training pass over the shuffled mini-batches.
    net.train()
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):

        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data[0].to(device), data[1].to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward step
        outputs, A = net(inputs)

        # total loss = cross-entropy + beta * attention penalty
        entrop_loss = entrop_loss_function(outputs, labels)
        p_loss = p_loss_function(A)
        loss = entrop_loss + beta * p_loss
        loss.backward()

        optimizer.step()

        running_loss += loss.item()

# Leave the network in inference mode for the evaluation cells that follow.
# NOTE(review): the weights produced by the final training pass are neither
# checkpointed nor scored by this loop.
net.eval()

print('Finished Training')
end = time.time()
print(end - start)

## Testing

In [None]:
# Score the network on the test split.
# eval() makes BatchNorm use its running statistics instead of the statistics
# of the test batch; no_grad() avoids building an autograd graph for the pass.
net.eval()
with torch.no_grad():
    preds, _ = net(x_test)
# One-hot encode the most probable class of every row.
preds = (preds == torch.max(preds, dim=1, keepdim=True)[0]).type(torch.int)
# The value is the accuracy on the test split; the old label read "Training loss:".
print("Test accuracy:", accuracy_score(y_test.cpu(), preds.cpu()))

In [None]:
# Accuracy per epoch for the training and validation sets, as recorded in
# eps / acc_train / acc_val.
plt.rc('figure', figsize=(13, 5))
plt.rc('font', size=14)
for _curve, _legend in ((acc_train, "Train Accuracy"), (acc_val, "Validation Accuracy")):
    sns.lineplot(x=eps, y=_curve, label=_legend)
plt.legend(loc="lower right")
plt.xlabel("Epoch")
plt.ylabel("Accuracy Value")

In [None]:
# Collapse every one-hot row into the index of its largest entry, for the
# predictions and for the test targets alike.
preds_values = [row.argmax().item() for row in preds]
test_values = [row.argmax().item() for row in y_test]

In [None]:
# Confusion matrix of actual vs. predicted class indices on the test split.
data = {'y_Actual': test_values, 'y_Predicted': preds_values}
df = pd.DataFrame(data, columns=['y_Actual', 'y_Predicted'])
# The former `df.rename(columns={"0.0": ..., "1.0": ...})` call was dropped:
# its result was discarded and the frame has no such columns, so it did nothing.
confusion = pd.crosstab(df['y_Actual'],
                        df['y_Predicted'],
                        rownames=['Actual'],
                        colnames=['Predicted'])
plt.figure(figsize=(15, 10))
plt.rc('font', size=14)
sns.heatmap(confusion, fmt='g', square=True)