In [None]:
import sys
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.utils.rnn as rnn_utils
import torch.optim as optim
import torch.nn.utils as utils
import seaborn as sns
import matplotlib.pyplot as plt
import time
import random
from torch.utils import data
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from torch.utils.data import Dataset, DataLoader
from typing import List, Tuple, Dict
import os
import glob
import tqdm

In [43]:
# These are the various characters in the transcripts of the datasetW
VOCAB = ['<sos>',   
         'A',   'B',    'C',    'D',    
         'E',   'F',    'G',    'H',    
         'I',   'J',    'K',    'L',       
         'M',   'N',    'O',    'P',    
         'Q',   'R',    'S',    'T', 
         'U',   'V',    'W',    'X', 
         'Y',   'Z',    "'",    ' ', 
         '<eos>']

VOCAB_MAP = {VOCAB[i]:i for i in range(0, len(VOCAB))}

SOS_TOKEN = VOCAB_MAP["<sos>"]
EOS_TOKEN = VOCAB_MAP["<eos>"]

BATCH_SIZE = 96

In [62]:
class MFCCDataset:
    def __init__(self, data_path, vocab_map, val = False, cep_norm = True):
        """
        Let's access the datapaths for the input and the labels in this sections 
        x: MFCC path
        y: Transcripts 

        1) Load all the data a-priori in the init for faster training. 
        2) Cepstral normalization :  
        """
        self.val = val
        self.cep_norm = cep_norm 
        if self.val:
            self.x =  str(data_path)+"\\dev-clean\\mfcc\\*.npy" 
            self.y =  str(data_path)+"\\dev-clean\\transcript\\*.npy"
        else: 
            self.x = str(data_path)+"\\train-clean-100\\mfcc\\*.npy"
            self.y = str(data_path)+"\\train-clean-100\\transcript\\raw\\*.npy"

        self.mfcc_list = sorted(glob.glob(self.x))
        self.transcript_list = sorted(glob.glob(self.y))
        self.alphabets = vocab_map
       
    def __len__(self):
        
        return len(self.mfcc_list)

    def __getitem__(self, index):
        """
        cepstral normalization performed here for higher SNR 
        """

        if self.val:
            mf_temp = np.load(self.mfcc_list[index], allow_pickle= True)
            tr_temp = np.load(self.transcript_list[index], allow_pickle= True)
            tr_temp = [self.alphabets[ele] for ele in tr_temp]
            if self.cep_norm:
                mf_temp = (mf_temp - np.mean(mf_temp, axis = 0))/ np.std(mf_temp)
            
            return torch.tensor(mf_temp)
        
        else: 
            mf_temp = np.load(self.mfcc_list[index], allow_pickle= True)
            tr_temp = np.load(self.transcript_list[index], allow_pickle= True)
            
            # Converting the alphabets in the labels to integers using the pre-defined map provided 
            tr_temp = [self.alphabets[ele] for ele in tr_temp]

            if self.cep_norm:
                mf_temp = (mf_temp - np.mean(mf_temp, axis = 0))/ np.std(mf_temp)
            return torch.tensor(mf_temp), torch.tensor(tr_temp)

#Collate function for uniform padding of the input sequences 

def collate_train_val(data): 
    
    (xx, yy) = zip(*data)
    x_lens = [len(x) for x in xx]
    y_lens = [len(y) for y in yy]

    xx_pad = pad_sequence(xx,batch_first=True)
    yy_pad = pad_sequence(yy,batch_first=True)

    x_lens = np.asarray(x_lens)
    y_lens = np.asarray(y_lens)
    # Some augmentation and masking here may help the network converge better. 

        
    return xx_pad, yy_pad, torch.tensor(x_lens), torch.tensor(y_lens)


In [63]:
# Dataset and dataloader sections 
data_path = "C:\\Users\\thopa\Desktop\\Assignments\\11685\\HW4\\2022Implementation\\11-785-f22-hw4p2\\hw4p2"
train_data = MFCCDataset(data_path, vocab_map= VOCAB_MAP)
val_data = MFCCDataset(data_path,vocab_map= VOCAB_MAP, val = True)

train_loader = DataLoader(train_data , collate_fn= collate_train_val , shuffle = True, pin_memory= True)
val_loader = DataLoader(val_data, collate_fn = collate_train_val, shuffle= False, pin_memory = True)


## Neural Network and Training 

### Silly notes for reference 
When training RNN (LSTM or GRU or vanilla-RNN), it is difficult to batch the variable length sequences. For example: if the length of sequences in a size 8 batch is [4,6,8,5,4,3,7,8], you will pad all the sequences and that will result in 8 sequences of length 8. You would end up doing 64 computations (8x8), but you needed to do only 45 computations. Moreover, if you wanted to do something fancy like using a bidirectional-RNN, it would be harder to do batch computations just by padding and you might end up doing more computations than required.

Instead, PyTorch allows us to pack the sequence, internally packed sequence is a tuple of two lists. One contains the elements of sequences. Elements are interleaved by time steps (see example below) and other contains the size of each sequence the batch size at each step. This is helpful in recovering the actual sequences as well as telling RNN what is the batch size at each time step. This has been pointed by @Aerin. This can be passed to RNN and it will internally optimize the computations.

In [121]:
class PBLSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(PBLSTM,self).__init__()

        self.blstm = nn.LSTM(input_size = input_size, hidden_size = hidden_size, num_layers = 2, batch_first = True, bidirectional = True, dropout = 0.3)
    
    def reshape(self, x, x_lens):
        # Reshaping for concatenation / reducing dimensions
        batch, rows, cols = x.shape[0], x.shape[1], x.shape[2]

        if (rows % 2 != 0):
            x = x[:,:-1,:]
        x = x.reshape(batch, int(rows/2), cols*2)
        x_lens = x_lens//2
    
        return x, x_lens
    
    def forward(self,x):
        """
        Computational savings and original sequence recovery using the pack padded and pad packed routine 
        """
        x_pad, x_pad_lens = pad_packed_sequence(x, batch_first=True)
        x, x_lens = self.reshape(x_pad, x_pad_lens.to("cuda"))
        input = rnn_utils.pack_padded_sequence(x, lengths = x_lens.cpu(), batch_first= True, enforce_sorted= False)
        rnn_out, _ = self.blstm(input)
        #output, lens = rnn_utils.pad_packed_sequence(rnn_out, batch_first= True)
        
        return rnn_out

In [122]:
class Encoder(nn.Module):
    def __init__(self, input_size, encoder_hidden_size):
        super(Encoder, self).__init__()
        """
        The encoder used is a pyramidal-BiLSTM for matching the input rate and the speech transcription rate which is about 8:1. 
        This model is sigificantly influenced by the LAS paper 
        LAS: Chan, William, et al. "Listen, attend and spell." arXiv preprint arXiv:1508.01211 (2015).

        [REF: B. Raj, Deep Learning Carnegie Mellon University]
        The pBLSTM is a variant of Bi-LSTMs that downsamples sequences by a factor of 2 by concatenating
        adjacent pairs of inputs before running a conventional Bi-LSTM on the reduced-length sequence. So, given
        an input vector sequence X0, X1, X2, X3, . . . XN−1, the pBLSTM first concatenates adjacent pairs of vectors
        as [X0, X1], [X2, X3], . . . [XN−2, XN−1], and then computes a regular BiLSTM on the reshaped input.

        -) Initial Bi-LSTM 
        -) 3x Pyramidal Bi-LSTM 
        """    
        self.base_lstm = nn.LSTM(input_size = input_size, hidden_size = encoder_hidden_size, num_layers = 1, batch_first = True, bidirectional = True, dropout = 0.1)
        self.pblstm = nn.Sequential(PBLSTM(4*encoder_hidden_size,encoder_hidden_size),PBLSTM(4*encoder_hidden_size,encoder_hidden_size),PBLSTM(4*encoder_hidden_size,encoder_hidden_size))
        
    def forward(self, x, x_lens):
        pack_padd_out = pack_padded_sequence(x, x_lens.to('cpu'),batch_first=True, enforce_sorted=False)
        #print(type(pack_padd_out))
        out_lstm, _  = self.base_lstm(pack_padd_out)
        encoder_outputs = self.pblstm(out_lstm)
        encoder_outputs, encoder_lens = pad_packed_sequence(encoder_outputs, batch_first=True)

        return encoder_outputs, encoder_lens




In [123]:
from torchsummaryX import summary

for data in train_loader:
    x, y, lx, ly = data
    print(x.shape, y.shape, lx.shape, ly.shape)
    break 

encoder = Encoder(15,256)# TODO: Initialize Listener
out, lens = encoder.forward(x, lx)
del encoder

torch.Size([1, 1282, 15]) torch.Size([1, 188]) torch.Size([1]) torch.Size([1])


  x_lens = x_lens//2


In [56]:
# Attention block 
"""
Possible Efficiencies with the attention mechanism (d2l book)
1) In general, it requires that both the query and the key have the 
same vector length, say d, even though this can be addressed easily by replacing 
q⊤k with q⊤Mk where M is a suitably chosen matrix to translate
between both spaces. For now assume that the dimensions match.
2) Adding dropout weights also helps 
"""
class Attention(nn.Module):
    def __init__(self):
        super(Attention, self).__init__()
        
        
    def forward(self, query, key, value, mask):
        energy = torch.bmm(key, query.unsqueeze(2))
        energy = torch.squeeze(energy, dim = 2)

        #What should the mask least value be? 
        energy.masked_fill_(mask, -1e9)

        attention = torch.nn.functional.softmax(energy, dim = 1)
        context = torch.bmm(attention.unsqueeze(1), value)
        
        return context, attention

IndentationError: expected an indented block (904003134.py, line 1)

In [None]:
encoder = Listener(15,256)# TODO: Initialize Listener
encoder = encoder.to(DEVICE)
print(encoder)
summary(encoder, x.to(DEVICE), lx)
del encoder

In [None]:
class Decoder(nn.Module):