In [1]:
import re

class Lang():
    """Vocabulary for one language: word <-> index maps plus word counts.

    Indices 0, 1 and 2 are reserved for the '<START>', '<END>' and '<UNK>'
    markers; every other word receives the next free index the first time
    it is seen.
    """

    def __init__(self, name):
        """Create an empty vocabulary.

        Args:
            name: Label for the language (e.g. 'eng'); kept on the instance
                as ``self.name``.
        """
        self.name = name
        self.word2index = {'<START>': 0, '<END>': 1, '<UNK>': 2}
        self.index2word = {0: '<START>', 1: '<END>', 2: '<UNK>'}
        self.word_freq = {}
        self.n_words = 3

    def add_sentence(self, sentence):
        """Register every space-separated token of ``sentence``."""
        for word in sentence.split(' '):
            self.add_word(word)

    def add_word(self, word):
        """Assign an index to ``word`` if it is new and bump its count."""
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.index2word[self.n_words] = word
            self.n_words += 1
        # The reserved markers are in word2index from the start but not in
        # word_freq, so a plain `+= 1` would raise KeyError if one of them
        # appears literally in the corpus.
        self.word_freq[word] = self.word_freq.get(word, 0) + 1

def clean_sentence(sentence):
    """Separate punctuation from words and normalise whitespace.

    A space is inserted before each of ``! . ? ,`` and the Devanagari danda
    so that punctuation becomes its own token. Runs of whitespace (including
    the trailing newline of a line read from a file) are then collapsed to a
    single space and the ends are trimmed, so splitting the result on ' '
    never yields empty-string tokens.

    Args:
        sentence: Raw text line.

    Returns:
        The cleaned sentence as a single-space-separated string.
    """
    sentence = re.sub(r'([!.?,।])', r' \1', sentence)
    # str.split() with no argument drops leading/trailing whitespace and
    # treats any run of whitespace as one separator.
    return ' '.join(sentence.split())

In [2]:
# Dataset location and the split names used to build the file names.
path = '../datasets/english-hindi-translation/'
train_set = 'train'
test_set = 'test'

source_lang = Lang('eng')
target_lang = Lang('hin')

# Training split: clean each line, grow both vocabularies, keep the cleaned pair.
# `with` closes both files once the loop is done.
pairs_train = []
with open(path+'source_' + train_set + '.txt', encoding='utf8') as source, \
        open(path+'target_' + train_set + '.txt', encoding='utf8') as target:
    for src, tar in zip(source, target):
        src = clean_sentence(src)
        tar = clean_sentence(tar)
        source_lang.add_sentence(src)
        target_lang.add_sentence(tar)
        pairs_train.append((src, tar))

# Test split: apply the same cleaning as for training so the tokens line up
# with the vocabulary entries; the vocabularies themselves are not extended.
pairs_test = []
with open(path+'source_' + test_set + '.txt', encoding='utf8') as source, \
        open(path+'target_' + test_set + '.txt', encoding='utf8') as target:
    for src, tar in zip(source, target):
        pairs_test.append((clean_sentence(src), clean_sentence(tar)))

print('Total Training Data: ', len(pairs_train))
print('Total Testing Data: ', len(pairs_test))

Total Training Data:  1659083
Total Testing Data:  2507


In [None]:
import torch.nn as nn
import torch

class EncoderRNN(nn.Module):
    """Encoder: an embedding lookup feeding a single GRU."""

    def __init__(self, vocab_dim, embedding_dim, hidden_dim):
        """Build the layers.

        Args:
            vocab_dim: Number of rows in the embedding table.
            embedding_dim: Size of each embedding vector (GRU input size).
            hidden_dim: Size of the GRU hidden state.
        """
        super().__init__()
        self.hidden_size = hidden_dim
        self.embedding = nn.Embedding(vocab_dim, embedding_dim)
        self.rnn = nn.GRU(embedding_dim, hidden_dim)

    def forward(self, line, hidden):
        """Embed the index tensor ``line`` and run it through the GRU.

        Returns:
            The ``(output, hidden)`` pair produced by the GRU.
        """
        return self.rnn(self.embedding(line), hidden)

    def init_hidden(self):
        """Return an all-zero hidden state of shape (1, 1, hidden_size)."""
        shape = (1, 1, self.hidden_size)
        return torch.zeros(shape)
    
class DecoderRNN(nn.Module):
    """Decoder: embedding -> ReLU -> GRU -> linear -> log-softmax.

    It is called one word at a time so that the caller can choose, at every
    step, whether the next input is the ground-truth word (teacher forcing)
    or the decoder's own prediction.
    """

    def __init__(self, vocab_dim, embedding_dim, hidden_dim):
        """Build the layers.

        Args:
            vocab_dim: Embedding table size and width of the output scores.
            embedding_dim: Size of each embedding vector (GRU input size).
            hidden_dim: Size of the GRU hidden state.
        """
        super().__init__()
        self.hidden_size = hidden_dim
        self.embedding = nn.Embedding(vocab_dim, embedding_dim)
        self.relu = nn.ReLU()
        self.rnn = nn.GRU(embedding_dim, hidden_dim)
        self.linear = nn.Linear(hidden_dim, vocab_dim)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, word, hidden):
        """Run one decoding step.

        Returns:
            ``(log_probs, hidden)`` where ``log_probs`` is the log-softmax of
            the linear projection of the first GRU output step and ``hidden``
            is the updated GRU state.
        """
        rnn_in = self.relu(self.embedding(word))
        rnn_out, hidden = self.rnn(rnn_in, hidden)
        scores = self.linear(rnn_out[0])
        return self.softmax(scores), hidden

In [None]:
def sentence2tensor_pair(sentence_pair):
    """Convert a (source sentence, target sentence) pair into index tensors.

    Each sentence is split on single spaces, every token is looked up in the
    matching vocabulary (``source_lang`` / ``target_lang``), unknown tokens
    fall back to the '<UNK>' index, and the '<END>' index is appended.

    Returns:
        ``(source_tensor, target_tensor)``, both 1-D tensors of indices.
    """
    def encode(sentence, lang):
        # Shared lookup logic for both sides of the pair.
        unk = lang.word2index['<UNK>']
        indices = [lang.word2index.get(token, unk) for token in sentence.split(' ')]
        indices.append(lang.word2index['<END>'])
        return torch.tensor(indices)

    source_tensor = encode(sentence_pair[0], source_lang)
    target_tensor = encode(sentence_pair[1], target_lang)
    return source_tensor, target_tensor

# Sanity check: print the encoded tensors for the first two training pairs.
for pair in pairs_train[:2]:
    encoded = sentence2tensor_pair(pair)
    print(encoded)

In [None]:
import torch.optim as optim

# Encoder and decoder use the same layer sizes; only the vocabulary differs.
encoder = EncoderRNN(vocab_dim=source_lang.n_words, embedding_dim=100, hidden_dim=50)
decoder = DecoderRNN(vocab_dim=target_lang.n_words, embedding_dim=100, hidden_dim=50)

# One plain SGD optimizer per network, both with the same learning rate.
encoder_optimizer = optim.SGD(encoder.parameters(), lr=0.005)
decoder_optimizer = optim.SGD(decoder.parameters(), lr=0.005)

# The decoder ends in LogSoftmax, so negative log-likelihood is the matching loss.
lossfn = nn.NLLLoss()

In [None]:
# Smoke test: push the first training sentence through the encoder and
# print the shapes of the input, the final hidden state and the outputs.
tensor_pair = sentence2tensor_pair(pairs_train[0])

# unsqueeze(1) turns the 1-D index tensor into a column (length, 1).
input_ = tensor_pair[0].unsqueeze(1)
hidden = encoder.init_hidden()
print(input_.shape)

output, hidden = encoder(input_, hidden)
print(hidden.shape)
print(output.shape)

In [None]:
# Smoke test: run a single decoder step starting from the '<START>' index.
# `hidden` is whatever the previous cell left behind (the encoder's final state).
start_index = target_lang.word2index['<START>']
input_ = torch.tensor([start_index]).unsqueeze(1)
print(input_)

output, hidden = decoder(input_, hidden)
print(hidden.shape)
print(output.shape)

In [None]:
import random
# Probability that a training example is decoded with teacher forcing.
teacher_forcing_ratio = 0.5

def train(tensor_pair):
    """Run one optimisation step on a single (source, target) tensor pair.

    The source tensor is encoded, the encoder's final hidden state seeds the
    decoder, and the decoder is unrolled over the target one word at a time,
    accumulating the NLL loss. With probability ``teacher_forcing_ratio`` the
    ground-truth word is fed as the next decoder input; otherwise the
    decoder's own top-1 prediction is fed back.

    Args:
        tensor_pair: ``(source_tensor, target_tensor)`` as produced by
            ``sentence2tensor_pair``.

    Returns:
        The loss summed over the decoded target positions, as a Python float.
    """
    use_teacher_forcing = random.random() < teacher_forcing_ratio

    encoder.train()
    decoder.train()

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    encoder_hidden = encoder.init_hidden()
    encoder_input = tensor_pair[0].unsqueeze(1)
    _, encoder_hidden = encoder(encoder_input, encoder_hidden)

    decoder_hidden = encoder_hidden
    decoder_input = torch.tensor([target_lang.word2index['<START>']]).unsqueeze(1)
    end_index = target_lang.word2index['<END>']

    loss = 0
    for target_word in tensor_pair[1].unsqueeze(1):
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
        loss += lossfn(decoder_output, target_word)

        if use_teacher_forcing:
            # The next input is the ground truth, so a wrong '<END>' prediction
            # must not cut the sequence short: the remaining target words
            # still have to contribute to the loss.
            decoder_input = target_word.unsqueeze(1)
        else:
            _, pred_word = decoder_output.topk(1)
            # Free-running mode: once the model emits '<END>' there is
            # nothing meaningful left to feed back.
            if pred_word[0][0].item() == end_index:
                break
            decoder_input = pred_word.detach()

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item()

In [None]:
from tqdm import tqdm
# Per-epoch history of the mean training loss (one entry per epoch).
loss_track = []
epochs = 5
for epoch in range(epochs):
    # train() returns the loss summed over one sentence; accumulate it over
    # the whole training set and divide by the number of pairs.
    epoch_loss = 0
    for pair in tqdm(pairs_train):
        epoch_loss += train(sentence2tensor_pair(pair))
    avg_loss = epoch_loss / len(pairs_train)
    print('Epoch: ', epoch, '| Loss: ', avg_loss)
    loss_track.append(avg_loss)

In [None]:
import matplotlib.pyplot as plt

# Plot the per-epoch average training loss with labelled axes so the
# figure can be read on its own.
fig, ax = plt.subplots()
ax.plot(loss_track)
ax.set(xlabel='Epoch', ylabel='Average loss per sentence', title='Training loss')
plt.show()

In [None]:
def test(tensor_pair, max_len=50):
    """Greedy-decode a translation for the source tensor of ``tensor_pair``.

    Both networks are switched to eval mode and the whole pass runs under
    ``torch.no_grad()`` since no gradients are needed for inference. Only
    ``tensor_pair[0]`` is read.

    Args:
        tensor_pair: ``(source_tensor, target_tensor)``; the target is unused.
        max_len: Upper bound on the number of decoding steps.

    Returns:
        List of predicted target-vocabulary indices. It ends with the
        '<END>' index when the decoder emitted it within ``max_len`` steps.
    """
    encoder.eval()
    decoder.eval()

    end_index = target_lang.word2index['<END>']
    translation = []

    with torch.no_grad():
        encoder_hidden = encoder.init_hidden()
        encoder_input = tensor_pair[0].unsqueeze(1)
        _, encoder_hidden = encoder(encoder_input, encoder_hidden)

        decoder_hidden = encoder_hidden
        decoder_input = torch.tensor([target_lang.word2index['<START>']]).unsqueeze(1)

        for step in range(max_len):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            _, pred_word = decoder_output.topk(1)

            predicted_index = pred_word[0][0].item()
            translation.append(predicted_index)
            if predicted_index == end_index:
                break
            # Feed the prediction back as the next decoder input.
            decoder_input = pred_word

    return translation

def get_translation(text):
    """Translate raw source-language ``text`` and return the result as a string.

    The text goes through ``clean_sentence`` first, the same preprocessing
    applied to the training sentences when the vocabulary was built, so that
    punctuation is tokenised the same way instead of turning words into
    '<UNK>'.

    Args:
        text: Source sentence as plain text.

    Returns:
        The predicted target words joined by single spaces (including the
        '<END>' marker when the decoder produced it).
    """
    # The target side is unused by test(); an empty string is a placeholder.
    tensor_pair = sentence2tensor_pair((clean_sentence(text), ''))
    translation = test(tensor_pair)
    return ' '.join(target_lang.index2word[idx] for idx in translation)

In [None]:
# Show source, reference and model output for the first five test pairs.
for pair in pairs_test[:5]:
    source_text, target_text = pair[0], pair[1]
    print('Source: ', source_text)
    print('Target: ', target_text)

    predicted_indices = test(sentence2tensor_pair(pair))
    # Map the predicted indices back to words.
    res = ' '.join(target_lang.index2word[idx] for idx in predicted_indices)
    print('Result: ', res)
    print('\n----------------------------------------------------------------------------------\n')