In [108]:
import torch
import torch.nn as nn
from torch import optim
from torch.functional import F
import numpy as np

from spacy.lang.en import English
from spacy.lang.fr import French

In [109]:
from collections import Counter

In [110]:
import random

In [126]:
class Encoder(nn.Module):
    
    def __init__(self, input_size, hidden_size, n_layers=1, prob=0):
        super(Encoder, self).__init__()
        
        self.hidden_size = hidden_size
        self.prob = prob
        self.n_layers = n_layers
        
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers=self.n_layers, dropout=prob, batch_first=True)
        
    def forward(self, inputs, hidden):
        embedded = self.embedding(inputs)
        output, hidden = self.lstm(embedded, hidden)
        
        return output, hidden
    
    def init_hidden(self, batch_size=1):
        return (torch.zeros(self.n_layers, batch_size, self.hidden_size),
            torch.zeros(self.n_layers, batch_size, self.hidden_size))

In [112]:
class DecoderBahdanau(nn.Module):
    def __init__(self, hidden_size, output_size, n_layers=1, prob=0):
        super(DecoderBahdanau, self).__init__()
        
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.dropout_prob = prob
        
        # embedd decoder outputs
        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        
        self.fc_hidden = nn.Linear(self.hidden_size, self.hidden_size, bias=False)
        self.fc_encoder = nn.Linear(self.hidden_size, self.hidden_size, bias=False)
        
        self.weight = nn.Parameter(torch.FloatTensor(1, hidden_size)) # alpha ij ?
        self.attn_combine = nn.Linear(2*self.hidden_size, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_prob)
        self.lstm = nn.LSTM(2*self.hidden_size, self.hidden_size, batch_first=True)
        self.classifier = nn.Linear(self.hidden_size, self.output_size)
        
    def forward(self, inputs, hidden, encoder_outputs):
        encoder_outputs = encoder_outputs.squeeze()
        embedded_decoder_inputs = self.embedding(inputs).view(1, -1)
        embedded_decoder_inputs = self.dropout(embedded_decoder_inputs)
        
        # alignment score
        x = torch.tanh(self.fc_hidden(hidden[0]) + self.fc_encoder(encoder_outputs))
        alignment_score = x.bmm(self.weight.squeeze(2))
        
        # softmaxing alignment scores
        attn_weights = F.softmax(alignment_score.view(1, -1), dim=1)
        
        # get context vector
        context_vector = torch.bmm(attn_weights.unsqueeze(0), encoder_outputs.unsqueeze(0))
        # concat context vector & decoder embedded output/input into a single hidden state tensor
        output = torch.cat((embedded_decoder_inputs, context_vector[0]), 1).unsqueeze(0)
        output, hidden = self.lstm(output, hidden)
        output = F.log_softmax(self.classifier(output[0]), dim=1)
        
        return output, hidden, attn_weights

In [113]:
with open("eng-fra.txt", "r+") as file:
    fra = [x[:-1] for x in file.readlines()] # remove \n at the end of each line
    

In [114]:
en, fr = [], []

for line in fra:
    en.append(line.split('\t')[0])
    fr.append(line.split('\t')[1])

In [115]:
# length of training set
len_train_examples = 100

spacy_en, spacy_fr = English(), French()

In [116]:
en_words, fr_words = Counter(), Counter()

en_inputs, fr_inputs = [], []

In [117]:
# tokenization
for i in range(len_train_examples):
    en_tokens = spacy_en(en[i])
    fr_tokens = spacy_fr(fr[i])
    
    if len(en_tokens) == 0 or len(fr_tokens) == 0: # space or tab
        continue
    for token in en_tokens:
        en_words.update([token.text.lower()])
    en_inputs.append([token.text.lower() for token in en_tokens] + ['_EOS'])
    
    for token in fr_tokens:
        fr_words.update([token.text.lower()])
    fr_inputs.append([token.text.lower() for token in fr_tokens] + ['_EOS'])


In [118]:
en_words = ['_SOS', '_EOS', '_UNK'] + sorted(en_words, key=en_words.get, reverse=True)
fr_words = ['_SOS', '_EOS', '_UNK'] + sorted(fr_words, key=fr_words.get, reverse=True)

In [119]:
en_i2w = {index:word for index, word in enumerate(en_words)}
fr_i2w = {index:word for index, word in enumerate(fr_words)}

en_w2i = {word:index for index, word in enumerate(en_words)}
fr_w2i = {word:index for index, word in enumerate(fr_words)}

In [120]:
# convert Enlish & French sentences to their token indices
for i in range(len(en_inputs)):
    en_sentence = en_inputs[i]
    fr_sentence = fr_inputs[i]
    
    en_inputs[i] = [en_w2i[word] for word in en_sentence]
    fr_inputs[i] = [fr_w2i[word] for word in fr_sentence]


In [132]:
# define hyperparameters
lr = 0.001
hidden_size = 256
epoches = 3
teacher_forcing_prob = 0.5

In [133]:
encoder = Encoder(len(en_words), hidden_size)

In [134]:
decoder = DecoderBahdanau(hidden_size, len(fr_words))

In [135]:
encoder_optimizer = optim.Adam(encoder.parameters(), lr=lr)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=lr)

In [136]:
encoder.train()
decoder.train()

for ep in range(epoches):
    avg_loss = 0
    
    for i, sentence in enumerate(en_inputs):
        loss = 0
        
        h = encoder.init_hidden()
        
        # clear gradients
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()
        
        # preprocess
        inp = torch.tensor(sentence).squeeze(0)
        encoder_outputs, h = encoder(inp, h)
        
        # first decodee input is '_SOS'
        decoder_input = torch.tensor([en_w2i['_SOS']])
        # first decoder hidden state is the last encoder hidden state
        decoder_hidden = h
        
        output = []
        teacher_forcing = True if random.random() < teacher_forcing_prob else False
        for ii in range(len(fr_inputs[i])):
            decoder_output, decoder_hidden, attn_weights = decoder(decoder_input, decoder_hidden, encoder_outputs)
            
            top_value, top_index = decoder_output.topk(1)
            
            if teacher_forcing:
                decoder_input = torch.tensor([fr_inputs[i][ii]])
                
            else:
                decoder_input = torch.tensor([top_index.item()])
                
            output.append(top_index.item())
            
            loss += F.nll_loss(decoder_output.view(1, -1), torch.tensor([fr_inputs[i][ii]]))
            
        # backprop
        loss.backward()
        encoder_optimizer.step()
        decoder_optimizer.step()
        
        avg_loss += loss.item() / len(en_inputs)
        
        
        
        

RuntimeError: input must have 3 dimensions, got 2

In [131]:
len(encoder.init_hidden())

2