<a href="https://colab.research.google.com/github/savitha91/SpeechToImage/blob/master/Seq2Seq_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [65]:
import json
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import torch.nn as nn

In [None]:
# https://github.com/AladdinPerzon/Machine-Learning-Collection/blob/master/ML/Pytorch/more_advanced/Seq2Seq_attention/seq2seq_attention.py

In [66]:
!pip install transformers



In [67]:
JSON_PATH = '/content/data_old.json'


In [68]:
def addPadding(X):
    lenLi = [len(i) for i in X]
    maxLen = np.max(lenLi)
    zer = np.zeros(10).tolist()
    for i in X:
        if(len(i) < maxLen):
            diff = maxLen - len(i)
            for j in range(diff):
                i.append(zer)

In [69]:
# add <pad> tag at end
def addPaddingY(sentence,maxLen):
  #print(sentence)
  #print("BEFOR.  " , len(sentence))
  if(len(sentence) < maxLen):
    diff = maxLen - len(sentence)
    for j in range(diff):
      sentence = sentence + ' '
  #print("AFTER.  " , len(sentence))
  return sentence

In [70]:
import torch
from transformers import DistilBertTokenizer, DistilBertModel
#Not adding any padding and squeeze
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-cased')
bertModel = DistilBertModel.from_pretrained('distilbert-base-cased')


In [71]:
#Uses BERT Model for Embedding
def load_data(data_path):
    with open(data_path, "r") as fp:
        data = json.load(fp)
    X = data['MFCCs']
    addPadding(X) #101,10 ; 200,10
    X = np.array(X)
    y = []
    y_text = np.array(data["utterances"])
    for sentence in y_text:
      inputs = tokenizer.encode(sentence, add_special_tokens=True,max_length=20, pad_to_max_length=True,truncation=True)
      #ten = torch.tensor(inputs).unsqueeze(0)
      #outputs = bertModel(ten) # returns tuple (embeddings , requires_grad = True)
      #sq_output =  outputs[0].squeeze(0) #removing the batc size
      #arr = sq_output.detach().numpy() # outputs[0] is a tuple , converts this to numpy array
      y.append(inputs)

    y = np.array(y)
    return X,y

In [72]:
def prepare_dataset(data_path, test_size=0.2, validation_size=0.2):
    # load dataset
    X, y = load_data(data_path)
    # create train, validation, test split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size,shuffle=True)
    X_train, X_validation, y_train, y_validation = train_test_split(X_train, y_train, test_size=validation_size,shuffle=True)
    return X_train, y_train, X_validation, y_validation, X_test, y_test

In [73]:
from tensorflow.keras import Model
from keras.layers import Input
from keras.layers import LSTM
from keras.layers import Dense
from keras.utils.vis_utils import plot_model
# configure
X_train, y_train, X_validation, y_validation, X_test, y_test = prepare_dataset(JSON_PATH)


In [74]:
len(X_train) #708
len(X_validation) #178
len(X_test) #222


5

In [75]:
o = tokenizer.decode(y_test[1])
o 
#[CLS] No - no, thank you. [SEP] 

'[CLS] No, I know! [SEP] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD]'

In [76]:
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, p):
        super(Encoder, self).__init__()
        self.dropout = nn.Dropout(p)
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        #self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers, dropout=p)

    def forward(self, x):
        # x shape: (seq_length, N) where N is batch size

        #embedding = self.dropout(self.embedding(x))
        # embedding shape: (seq_length, N, embedding_size)

        outputs, (hidden, cell) = self.rnn(x)
        #output (seq_len, batch, hidden_size * num_directions)
        #h_n (num_layers * num_directions, batch, hidden_size)
        #c_n (num_layers * num_directions, batch, hidden_size)
        # outputs shape torch.Size([101, 2, 1024])
        # hidden shape torch.Size([2, 2, 1024])
        # cell shape torch.Size([2, 2, 1024])
        return hidden, cell


In [77]:
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_size_encoder = 10
hidden_size = 1024  # Needs to be the same for both RNN's
num_layers = 2
enc_dropout = 0.5

encoder_net = Encoder(
    input_size_encoder, hidden_size, num_layers, enc_dropout
).to(device)

In [78]:
encoder_net

Encoder(
  (dropout): Dropout(p=0.5, inplace=False)
  (rnn): LSTM(10, 1024, num_layers=2, dropout=0.5)
)

In [79]:
#As Embedding is done, we need to use Embedding size. If Embedding not done we use vocab size, for which Embeddings are created by the Embedding layer
input_size_decoder = 768
output_size = tokenizer.vocab_size #28996
#decoder_embedding_size = 400
hidden_size = 1024  # Needs to be the same for both RNN's
dec_dropout = 0.5
class Decoder(nn.Module):
    def __init__(
        self, input_size, hidden_size, output_size, num_layers, p
    ):
        super(Decoder, self).__init__()
        self.dropout = nn.Dropout(p)
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        #self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers, dropout=p)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden, cell):
        x = x.unsqueeze(0)
        embedX = bertModel(x) # returns tuple (embeddings , requires_grad = True)
        x =  embedX[0]
        # x shape: (N) where N is for batch size, we want it to be (1, N), seq_length
        # is 1 here because we are sending in a single word and not a sentence
        #print("Before squeeze",x.shape)
        #x = x.unsqueeze(0)
        #embedding = self.dropout(self.embedding(x))
        #print("Decoder embedding done")
        #print("decoder embedding shape", embedding.shape)
        # embedding shape: (1, N, embedding_size)

        outputs, (hidden, cell) = self.rnn(x, (hidden, cell))
        # outputs shape: (1, N, hidden_size)
        predictions = self.fc(outputs)
        # predictions shape: (1, N, length_target_vocabulary) to send it to
        # loss function we want it to be (N, length_target_vocabulary) so we're
        # just gonna remove the first dim
        predictions = predictions.squeeze(0)

        return predictions, hidden, cell

In [80]:
decoder_net = Decoder(
    input_size_decoder,
    hidden_size,
    output_size,
    num_layers,
    dec_dropout,
).to(device)

In [81]:
decoder_net

Decoder(
  (dropout): Dropout(p=0.5, inplace=False)
  (rnn): LSTM(768, 1024, num_layers=2, dropout=0.5)
  (fc): Linear(in_features=1024, out_features=28996, bias=True)
)

In [82]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_force_ratio=0.5):
        batch_size = source.shape[1] 
        target_len = target.shape[0]
        target_vocab_size = tokenizer.vocab_size

        outputs = torch.zeros(target_len, batch_size, target_vocab_size).to(device)
        hidden, cell = self.encoder(source)
        # Grab the first input to the Decoder which will be <SOS> token
        x = target[0]
        # Default LSTM without the flags, return_state = true or return_sequences =True, returns hidden_state of last time-step as output
        # Seq-to-Seq model, usually has Encoder with LSTM , return_state = True , returns output (=hidden state(h)) , hidden state(h), cell state(c) from the last time step
        # Decoder with LSTM , return_state =True, return_sequences = True, returns output=hidden state(h) from each time step, hidden state(h), cell state(c) from the last time step. output of the last time step = hidden state results
        # In pytorch, we dont have these flags, thus, to get the hidden/cell state from each time-step, we take each word, pass it throight decoder LSTM and get the hidden/cell state
            # Use previous hidden, cell as context from encoder at start
            output, hidden, cell = self.decoder(x, hidden, cell)
            # Store next output prediction
            outputs[t] = output
            # Get the best word the Decoder predicted (index in the vocabulary)
            best_guess = output.argmax(1)
            # With probability of teacher_force_ratio we take the actual next word
            # otherwise we take the word that the Decoder predicted it to be.
            # Teacher Forcing is used so that the model gets used to seeing
            # similar inputs at training and testing time, if teacher forcing is 1
            # then inputs at test time might be completely different than what the
            # network is used to. This was a long comment.
            #Actual next word already has embedding. Next word predicted by the decoder, doesnt have embeddings
            if random.random() < teacher_force_ratio:
              x = target[t] #Actual next word
            else:
              x = best_guess
            
        return outputs

In [83]:
load_model = False
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [84]:
import torch.optim as optim
learning_rate = 0.001
model = Seq2Seq(encoder_net, decoder_net).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

pad_idx = 0
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

In [85]:
y_train.shape

(16, 20)

In [86]:
tensor_x = torch.tensor(X_train)
tensor_y = torch.tensor(y_train)

tensor_xval = torch.tensor(X_validation)
tensor_yval = torch.tensor(y_validation)

tensor_xtest = torch.tensor(X_test)
tensor_ytest = torch.tensor(y_test)

In [87]:
from torch.utils.data import TensorDataset, DataLoader
import random

train_set = TensorDataset(tensor_x,tensor_y) # create your datset
train_loader = DataLoader(train_set,batch_size=2)

val_set = TensorDataset(tensor_xval,tensor_yval) # create your datset
val_loader = DataLoader(val_set,batch_size=2)

test_set = TensorDataset(tensor_xtest,tensor_ytest) # create your datset
test_loader = DataLoader(test_set,batch_size=2)

In [88]:
def save_checkpoint(state, filename="my_checkpoint.pth.tar"):
    print("=> Saving checkpoint")
    torch.save(state, filename)


def load_checkpoint(checkpoint, model, optimizer):
    print("=> Loading checkpoint")
    model.load_state_dict(checkpoint["state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer"])

In [89]:
#X_sample = X_test[3]
# Convert to tensor,

def recognizeSpeech(model, X,  device, max_length):
  tensor_sample = torch.tensor(X).unsqueeze(1).to(device)
  with torch.no_grad():
        hidden, cell = model.encoder(tensor_sample.float())
  outputs = [101]
  for _ in range(max_length):
    previous_word = torch.LongTensor([outputs[-1]]).to(device)
    with torch.no_grad():
      output, hidden, cell = model.decoder(previous_word, hidden, cell)
      best_guess = output.argmax(1).item()

    outputs.append(best_guess)

    # Model predicts it's the end of the sentence
    if output.argmax(1).item() == 102:
      break


  translated_sentence = [tokenizer.decode(idx) for idx in outputs]
  # remove start token
  return translated_sentence[1:]

In [90]:
#Shape of y, requires grad for decoder ???
num_epochs = 10
for epoch in range(num_epochs):
    print(f"[Epoch {epoch} / {num_epochs}]")

    checkpoint = {"state_dict": model.state_dict(), "optimizer": optimizer.state_dict()}
    #save_checkpoint(checkpoint)
    
    model.eval()
    recognizedText = recognizeSpeech(
        model, X_test[0],  device, max_length=20
    )

    print(f"Recognized Text: \n {recognizedText}")   

    model.train()

    for batch_idx, (X_train,y_train) in enumerate(train_loader):
        # Get input and targets and get to cuda
        inp_data = X_train.to(device)
        target = y_train.to(device)
        # Forward prop
        inp_data = torch.transpose(inp_data, 0, 1)
        target = torch.transpose(target, 0, 1) #[20,2] : 2 examples, each example having 20 words. Col0 has 20 words indexes
        output = model(inp_data.float(), target) #[20, 2, 28996] : 2 examples, each example having 20 words
        # Output is of shape (trg_len, batch_size, output_dim) but Cross Entropy Loss
        # doesn't take input in that form. For example if we have MNIST we want to have
        # output to be: (N, 10) and targets just (N). Here we can view it in a similar
        # way that we have output_words * batch_size that we want to send in into
        # our cost function, so we need to do some reshapin. While we're at it
        # Let's also remove the start token while we're at it
        output = output[1:].reshape(-1, output.shape[2])
        target = target[1:].reshape(-1)
        loss = criterion(output, target)
        # Back prop
        loss.backward()
        # Clip to avoid exploding gradient issues, makes sure grads are
        # within a healthy range
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)

        # Gradient descent step
        optimizer.step()
        optimizer.zero_grad()
        # Plot to tensorboard
       

[Epoch 0 / 10]
outputs shape torch.Size([101, 1, 1024])
hidden shape torch.Size([2, 1, 1024])
cell shape torch.Size([2, 1, 1024])
Recognized Text: 
 ['P r i n c i p l e s', 'C o n t e s t', 'C o n t e s t', 'a u c t i o n', 'a u c t i o n', 'a u c t i o n', 'L a z a r u s', 's t a m p', 's n e a k i n g', 's n e a k i n g', 'P a r a c h u t e', 'c o c k t a i l', 'R e c e n t l y', 's n e a k i n g', 'P a r a c h u t e', 'R e c e n t l y', 'R e c e n t l y', 'R e c e n t l y', 's n e a k i n g', 'r i v a l']
outputs shape torch.Size([101, 2, 1024])
hidden shape torch.Size([2, 2, 1024])
cell shape torch.Size([2, 2, 1024])
outputs shape torch.Size([101, 2, 1024])
hidden shape torch.Size([2, 2, 1024])
cell shape torch.Size([2, 2, 1024])
outputs shape torch.Size([101, 2, 1024])
hidden shape torch.Size([2, 2, 1024])
cell shape torch.Size([2, 2, 1024])
outputs shape torch.Size([101, 2, 1024])
hidden shape torch.Size([2, 2, 1024])
cell shape torch.Size([2, 2, 1024])
outputs shape torch.Size([

KeyboardInterrupt: ignored