In [1]:
import glob
# !pip install librosa # in colab, you’ll need to install this
import librosa
import numpy as np
import torchvision
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import torch.nn.functional as F
import pandas as pd
from random import sample
import matplotlib.pyplot as plt
import soundfile
import os
import pickle

In [2]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [96]:
def convert_row(row):
    '''
    Converts row into 2d tensor after stft on audio data
    :parameter row: data to be converted
    '''
    s = row.numpy()
    S = librosa.stft(s, n_fft=1024, hop_length=512)
    S_abs = np.abs(S)
    
    return torch.from_numpy(S.T), torch.from_numpy(S_abs.T)


def pickle_open(filename):
    '''
    Opens the pickle file
    :parameter filename: name of the file
    '''
    with open(filename, 'rb') as file:
        open_file = pickle.load(file)
        
        return torch.tensor(open_file)

    
def create_stack(data, file_name):
    '''
    Stacks each audio row in data after stft and save file
    :parameter data: data to be applied stft
    :parameter filename: name of the file
    '''
    temp_data = []
    temp_data_abs = []
    S_shape = (32, 513)
    
    for row in data:
        S_T, S_abs_T = convert_row(row)
        
        S_T = F.pad(S_T, 
                  pad=(0, 0, 0, S_shape[0] - S_T.shape[0]))
        S_abs = F.pad(S_abs_T, 
                      pad=(0, 0, 0, S_shape[0] - S_abs_T.shape[0]))
        
        temp_data.append(S_T)
        temp_data_abs.append(S_abs_T)
            
    temp_data = torch.stack(temp_data)
    temp_data_abs = torch.stack(temp_data_abs)
       
    torch.save(temp_data, f'{file_name}_tensor.pt') 
    torch.save(temp_data_abs, f'{file_name}_abs_tensor.pt') 
    
    return temp_data, temp_data_abs
        

    
    
def create_data(data, no_of_pairs, file_name):
    all_pairs_x1 = []
    label = []
    all_indices = set([i for i in range(len(data))])
    
    for i in range(0, len(data), 10):
        sample_indices_pos = [j for j in range(i, i + 10)]
        sample_indices_neg = list(all_indices - set(sample_indices_pos))
        
        for j in range(no_of_pairs):
            pair = np.random.choice(sample_indices_pos, 
                                    replace=True, 
                                    size=2)
            pair_data_1 = data[pair[0]]
            pair_data_2 = data[pair[1]]
            
            pair_neg = np.random.choice(sample_indices_neg, 
                                        replace=True,
                                        size=1)
            pair_neg = data[pair_neg].squeeze()
            
            all_pairs_x1.append(torch.Tensor(
                np.stack([pair_data_1, pair_data_2])))
            label.append(torch.Tensor([1]))
            
            all_pairs_x1.append(torch.Tensor(
                np.stack([pair_data_1, pair_neg])))
            label.append(torch.Tensor([0]))
            
                
    all_pairs_x1 = torch.stack(all_pairs_x1)
    label = torch.stack(label)
    
    torch.save(all_pairs_x1, f'{file_name}_pair.pt') 
    torch.save(label, f'{file_name}_label.pt') 
    
    return all_pairs_x1, label
            
# tensor_tes = pickle_open(file_tes)
# tensor_trs = pickle_open(file_trs)

# print('Test shape ', tensor_tes.shape)
# print('Train shape ', tensor_trs.shape)

class PrepareData(Dataset):
    def __init__(self, X, y):
        super(PrepareData, self).__init__()
        self.X = X
        self.y = y
        
    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
    
    
def create_dataloader(x, y, batchsize):
    data = PrepareData(x, y)
   
    return DataLoader(dataset=data, shuffle=False, 
                      batch_size=batchsize)
    
class SiameseNetwork(nn.Module):
    def __init__(self, parameters):
            super(SiameseNetwork, self).__init__()

            self.type_init = parameters['type_init']
            self.activation = parameters['activation']
            self.dropout = parameters['dropout']

            self.lstm = nn.LSTM(input_size=parameters['in_dim'],
                                hidden_size=parameters['hidden_dim'],
                                num_layers=parameters['num_layers'],
                                batch_first=True,
                                # dropout=parameters['dropout'],
                               )
            
            self.sigmoid = nn.Sigmoid()
            self.flatten = nn.Flatten()
            self.l = nn.Linear(parameters['mid_dim'], 
                                parameters['out_dim'])
            if self.type_init == 'xavier':
                for name, param in self.lstm.named_parameters():
                    if 'bias' in name:
                         nn.init.constant(param, 0.0)
                    elif 'weight' in name:
                         nn.init.xavier_normal(param)

    def forward_one(self, x):
        x, (h,c) = self.lstm(x)
        x = F.relu(x)
        
        return x
    
    def forward(self, x_1, x_2):
        x_1 = self.forward_one(x_1)
        x_2 = self.forward_one(x_2)
        
        
        x_1 = self.flatten(x_1)
        x_1 = self.l(x_1)
        
        x_2 = self.flatten(x_2)
        x_2 = self.l(x_2)
        
        x = self.sigmoid((x_1 * x_2))
        
        return x
                         
def train(model, dataloader, 
          loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.to(device)
    model.train()
    
    for batch, X  in enumerate(dataloader):
        y = X[1]
        X_1 = X[0][:,0,:,:]
        X_2 = X[0][:,1,:,:]
       
        # Compute prediction error
        pred = model(X_1, X_2)
        loss = loss_fn(pred, y)
        
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if batch % 20 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def test(dataloader, model, loss_fn):
    pred_y_all = torch.Tensor()
    org_y = torch.Tensor()
    input_x = torch.Tensor()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)

    model.eval()

    test_loss, correct = 0, 0

    with torch.no_grad():
        for batch, X  in enumerate(dataloader):
            y = X[1]
            X_1 = X[0][:,0,:,:]
            X_2 = X[0][:,1,:,:]
       
            # Compute prediction error
            pred = model(X_1, X_2)
            test_loss += loss_fn(pred, y).item()
            pred_y = torch.where(pred > 0.5, 1, 0)
            
            correct += (pred_y == y).type(torch.int).sum().item()

    test_loss /= num_batches
    correct /= size
    accuracy = round(100 * correct, 2)
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

        
def get_audio_data(parameters):
    '''
    Reads pickled audio data, create stack
    :parameter parametes: parametes required for the functions
    '''
    tensor_tes = pickle_open(parameters['file_tes'])
    tensor_trs = pickle_open(parameters['file_trs'])
    
    test_data, test_abs = create_stack(tensor_tes, 'hw4_test')
    train_data, train_abs = create_stack(tensor_trs, 'hw4_train')
    
    train_x, train_y = create_data(train_data, 
                                       parameters['no_of_pairs'],
                                       'hw4_train')
    test_x, test_y = create_data(test_data, 
                         parameters['no_of_pairs'],
                         'hw4_test',
                        )
    
    train_dataloader = create_dataloader(train_x, 
                                   train_y,
                                   parameters['batch']
                                  )
    test_dataloader = create_dataloader(test_x, 
                                  test_y,
                                  parameters['batch']
                                 )
    model = SiameseNetwork(parameters)
       
    model.train()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), 
                                      lr=parameters['lr_rate'])
    
    for t in range(parameters['epoch']):
        print(f"Epoch {t+1}\n-------------------------------")
        train(model, train_dataloader, loss_fn, optimizer)
        test(test_dataloader, model, loss_fn)

        
parameters = {
    'file_tes': 'hw4_tes.pkl',
    'file_trs': 'hw4_trs.pkl',
    'no_of_pairs': 20,
    'batch': 40,
    'in_dim': 513,
    'out_dim': 1,
    'hidden_dim': 20,
    'num_layers': 1,
    'activation': 'ReLU',
    'mid_dim': 640,
    'type_init': 'xavier',
    'lr_rate': 0.01,
    'epoch': 100,
    'dropout':None,
}


get_audio_data(parameters)

  nn.init.xavier_normal(param)
  nn.init.constant(param, 0.0)


Epoch 1
-------------------------------
loss: 0.249946  [    0/ 2000]
loss: 0.251287  [   40/ 2000]
loss: 0.247133  [   80/ 2000]
Test Error: 
 Accuracy: 52.8%, Avg loss: 0.249036 

Epoch 2
-------------------------------
loss: 0.241551  [    0/ 2000]
loss: 0.211624  [   40/ 2000]
loss: 0.211793  [   80/ 2000]
Test Error: 
 Accuracy: 51.2%, Avg loss: 0.250979 

Epoch 3
-------------------------------
loss: 0.257688  [    0/ 2000]
loss: 0.226461  [   40/ 2000]
loss: 0.243812  [   80/ 2000]
Test Error: 
 Accuracy: 52.9%, Avg loss: 0.249198 

Epoch 4
-------------------------------
loss: 0.234376  [    0/ 2000]
loss: 0.183903  [   40/ 2000]
loss: 0.224086  [   80/ 2000]
Test Error: 
 Accuracy: 55.8%, Avg loss: 0.252473 

Epoch 5
-------------------------------
loss: 0.227878  [    0/ 2000]
loss: 0.131539  [   40/ 2000]
loss: 0.213641  [   80/ 2000]
Test Error: 
 Accuracy: 51.5%, Avg loss: 0.256560 

Epoch 6
-------------------------------
loss: 0.235046  [    0/ 2000]
loss: 0.104957  [   