In [3]:
# import libraries
import torch
from torchtext.datasets import BABI20
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
import torch.nn.init as I
import numpy as np
from torch.optim.lr_scheduler import StepLR
import re
from functools import reduce 
import random
from sklearn.model_selection import train_test_split
import preprocessing

In [3]:
def merge_question_story(data):
    """
    This function concatenates the stories and the query separting them
    by a special symbol Start of Query (SOQ)
    Input: List of tuples of story, query, answer
    Output: Merged list of story and answer
    """
    merged_data = [(s+['SOQ']+q, ["SOS"]+a) for s,q,a in data]
    return merged_data
    
# train_stories, test_stories = train_test_split(all_stories, test_size=0.2)

SOS_token = 0
EOS_token = 1
PAD_token = 2


class Lang:
    def __init__(self):
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0: "SOS", 1: "EOS", 2:"PAD"} # special symbols
        self.n_words = 3  # Count SOS and EOS
        self.word2index['SOS'] = 0
        self.word2index['EOS'] = 1
        self.word2index['PAD'] = 2
        self.word2count['SOS'] = 1
        self.word2count['EOS'] = 1
        self.word2count['PAD'] = 1

    def addSentence(self, sentence):
        # add sentence to the voacb
        for word in sentence:
            self.addWord(word)

    def addWord(self, word):
        # add words to vocab and create word to index and index to word
        # dictionaries
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1
            
def readLangs(all_stories):
    pairs = merge_question_story(all_stories)

    # Reverse pairs, make Lang instances
    lang = Lang()
    return lang, pairs

def prepareData(all_stories):
    lang, pairs = readLangs(all_stories)
    for pair in pairs:
        lang.addSentence(pair[0])
        lang.addSentence(pair[1])
    maxlen = 0
    for pair in pairs:
        sentence = pair[0]
        if len(sentence)>maxlen:
            maxlen = len(sentence)
    return lang, pairs, maxlen

def get_data_for_task(n):
    """
    This function reads the file for given task, tokeizes, pads
    and creates train and test dataloaders
    Input: task number
    Output: rain loder, test loader, lang class object
    """
    with open(f'./tasks/task_{n}.txt') as f:
        all_stories = get_stories(f, flatten=True)
    lang, pairs ,maxlen = prepareData(all_stories)
    train_pairs, test_pairs = train_test_split(pairs, test_size=0.2) 
    train_pairs = [tensorsFromPair(lang, pair, maxlen) for pair in train_pairs]
    test_pairs = [tensorsFromPair(lang, pair, maxlen) for pair in test_pairs]
    batch_size = 64

    train_loader = torch.utils.data.DataLoader(train_pairs, 
        batch_size=batch_size, shuffle=True)

    test_loader = torch.utils.data.DataLoader(test_pairs, 
        batch_size=batch_size, shuffle=True)

    return train_loader, test_loader, lang

In [4]:
def indexesFromSentence(lang, sentence):
    return [lang.word2index[word] for word in sentence]

def tensorFromSentence(lang, sentence, maxlen):
    # this fucntion creates tensors from given sentence by tokenizing and adding
    # special symbols
    indexes = indexesFromSentence(lang, sentence)
    indexes.append(EOS_token)
    indexes = indexes + [PAD_token]*(maxlen-len(sentence))
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

def tensorsFromPair(lang, pair, maxlen):
    input_tensor = tensorFromSentence(lang, pair[0], maxlen)
    target_tensor = tensorFromSentence(lang, pair[1], 1)
    return (input_tensor, target_tensor)

In [5]:
class EncoderLSTM(nn.Module):
    """
    This class defines the encoder class for the mdoel.
    """
    def __init__(self, input_size, embedding_size, hidden_size, num_layers, p):
        super(EncoderLSTM, self).__init__()

        # Size of the one hot vectors that will be the input to the encoder
        self.input_size = input_size

        # Output size of the word embedding NN
        self.embedding_size = embedding_size

        # Dimension of the NN's inside the lstm cell/ (hs,cs)'s dimension.
        self.hidden_size = hidden_size

        # Number of layers in the lstm
        self.num_layers = num_layers

        # Regularization parameter
        self.dropout = nn.Dropout(p)
        self.tag = True
        self.embedding = nn.Embedding(self.input_size, self.embedding_size)
        self.LSTM = nn.LSTM(self.embedding_size, hidden_size, num_layers, dropout = p)

 
    def forward(self, x):
        embedding = self.dropout(self.embedding(x))
        outputs, (hidden_state, cell_state) = self.LSTM(embedding)

        return hidden_state, cell_state


In [6]:
class DecoderLSTM(nn.Module):
    """
    This class defines the decoder class for the mdoel.
    """
    def __init__(self, input_size, embedding_size, hidden_size, num_layers, p, output_size):
        super(DecoderLSTM, self).__init__()

        # Size of the one hot vectors that will be the input to the encoder
        self.input_size = input_size

        # Output size of the word embedding NN
        self.embedding_size = embedding_size

        # Dimension of the NN's inside the lstm cell/ (hs,cs)'s dimension.
        self.hidden_size = hidden_size

        # Number of layers in the lstm
        self.num_layers = num_layers

        # Size of the one hot vectors that will be the output to the encoder (Vocab Size)
        self.output_size = output_size

        # Regularization parameter
        self.dropout = nn.Dropout(p)
        self.tag = True
        self.embedding = nn.Embedding(self.input_size, self.embedding_size)
        self.LSTM = nn.LSTM(self.embedding_size, hidden_size, num_layers, dropout = p)
        self.fc = nn.Linear(self.hidden_size, self.output_size)

  
    def forward(self, x, hidden_state, cell_state):
        x = x.unsqueeze(0)
        embedding = self.dropout(self.embedding(x))
        outputs, (hidden_state, cell_state) = self.LSTM(embedding, (hidden_state, cell_state))
        predictions = self.fc(outputs)
        predictions = predictions.squeeze(0)
        return predictions, hidden_state, cell_state

In [7]:
class Seq2Seq(nn.Module):
    """
    This class defines sequence to sequence model. It creates the encoder and decoder. It 
    passes the input to the encoder, collects the ouput from encoder and passes it to the 
    decoder. The model uses teacher force to decide which input to use in the decoder.
    """
    def __init__(self, Encoder_LSTM, Decoder_LSTM):
        super(Seq2Seq, self).__init__()
        self.Encoder_LSTM = Encoder_LSTM
        self.Decoder_LSTM = Decoder_LSTM

    def forward(self, source, target, tfr=0.5):
        batch_size = source.shape[1]
        target_len = target.shape[0]
        target_vocab_size = lang.n_words
        outputs = torch.zeros(target_len, batch_size, target_vocab_size).to(device)
        hidden_state_encoder, cell_state_encoder = self.Encoder_LSTM(source)

        x = target[0] 

    for i in range(1, target_len-1):  
        output, hidden_state_decoder, cell_state_decoder = self.Decoder_LSTM(x, hidden_state_encoder, cell_state_encoder)
        outputs[i] = output
        best_guess = output.argmax(1) # 0th dimension is batch size, 1st dimension is word embedding
        x = target[i] if random.random() < tfr else best_guess # Either pass the next word correctly from the dataset or use the earlier predicted word

    return outputs


In [8]:
batch_size = 64
learning_rate = 0.005
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


tasks = [9]
for task in tasks:
    print(f"Task {task}:")
    train_loader, test_loader, lang = get_data_for_task(task)
    input_size_encoder = lang.n_words
    encoder_embedding_size = 10
    hidden_size = 10
    num_layers = 1
    encoder_dropout = float(0.3)

    encoder_lstm = EncoderLSTM(input_size_encoder, encoder_embedding_size,
                               hidden_size, num_layers, encoder_dropout).to(device)

    input_size_decoder = lang.n_words
    decoder_embedding_size = 10
    hidden_size = 10
    num_layers = 1
    decoder_dropout = float(0.3)
    output_size = lang.n_words

    decoder_lstm = DecoderLSTM(input_size_decoder, decoder_embedding_size,
                               hidden_size, num_layers, decoder_dropout, output_size).to(device)
    
    step = 0

    model = Seq2Seq(encoder_lstm, decoder_lstm).to(device)
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)

    pad_idx = lang.word2index['PAD']
    criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)


    epoch_loss = 0.0
    num_epochs = 100

    for epoch in range(num_epochs):
        model.train(True)
        for batch_idx, batch in enumerate(train_loader):

            input = batch[0].to(device)
            target = batch[1].to(device)

            # Pass the input and target for model's forward method
            output = model(input.squeeze().permute(1,0), target.squeeze().permute(1,0))
            output = output[1:2].permute(1,0,2).reshape(-1, output.shape[2])
            target = target.permute(1,0,2)
            target = target[1:2].reshape(-1)

            # Clear the accumulating gradients
            optimizer.zero_grad()

            loss = criterion(output, target)

            # Calculate the gradients for weights & biases using back-propagation
            loss.backward()

            # Clip the gradient value is it exceeds > 1
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)

            # Update the weights values using the gradients we calculated using bp 
            optimizer.step()
            step += 1
            epoch_loss += loss.item()

    print("\tLoss - {}".format(np.round(loss.item(), 3)))
    test_accuracy = []
    for batch_idx, batch in enumerate(test_loader):
        input = batch[0].to(device)
        target = batch[1].to(device)

        output = model(input.squeeze().permute(1,0), target.squeeze().permute(1,0))
        output_len = output.shape[0]
        output = output[1:].reshape(output_len-1, -1, output.shape[2]).cpu().detach().numpy()
        predicted_ans = np.argmax(output, axis=2)
    #     print(output)
    #     print(predicted_ans.shape)

        target = target.permute(1,0,2)[1:].cpu().detach().numpy().squeeze()

        test_accuracy += list(target[0,:]==predicted_ans[0,:])
    print(f"\tTest aacuracy: {np.round(np.mean(np.hstack(test_accuracy))*100, 2)}%")



Task 9:


  "num_layers={}".format(dropout, num_layers))


	Loss - 0.725
	Test aacuracy: 63.1%
