In [3]:
#Make a config file in which you choose the dataset for training

import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.datasets import Multi30k#IWSLT
from torchtext.data import Field, BucketIterator
import numpy as np
import spacy
import random

import warnings
warnings.filterwarnings("ignore")

In [4]:
 # Sanity check: report whether PyTorch can see a CUDA device in this session.
 torch.cuda.is_available()

True

In [5]:
import importlib

import spacy.cli


def _import_spacy_model(package_name):
    """Import a spaCy model package, downloading it only when it is missing.

    Importing the model packages unconditionally (before any download) fails
    on a fresh environment, and downloading unconditionally re-installs the
    models on every run. This helper tries the import first and falls back to
    a download followed by a second import attempt.

    Args:
        package_name: name of the spaCy model package, e.g. "en_core_web_sm".

    Returns:
        The imported model package (it exposes ``load()``).
    """
    try:
        return importlib.import_module(package_name)
    except ImportError:
        spacy.cli.download(package_name)
        # The package was installed after the interpreter started; refresh the
        # import system's finder caches so the new package becomes importable.
        importlib.invalidate_caches()
        return importlib.import_module(package_name)


# Keep the original module-level names so later cells can still use them.
en_core_web_sm = _import_spacy_model("en_core_web_sm")
de_core_news_sm = _import_spacy_model("de_core_news_sm")


spacy_ger = de_core_news_sm.load()
spacy_eng = en_core_web_sm.load()

✔ Download and installation successful
You can now load the model via spacy.load('en_core_web_sm')
✔ Download and installation successful
You can now load the model via spacy.load('de_core_news_sm')


In [6]:
def tokenizer_de(text):
    """Split German `text` into a list of token strings with the spaCy tokenizer."""
    pieces = []
    for token in spacy_ger.tokenizer(text):
        pieces.append(token.text)
    return pieces

def tokenizer_eng(text):
    """Split English `text` into a list of token strings with the spaCy tokenizer."""
    pieces = []
    for token in spacy_eng.tokenizer(text):
        pieces.append(token.text)
    return pieces

In [7]:
# Quick look at how the English tokenizer handles possessives and apostrophes.
tokenizer_eng("I ate my friends's O'neal apple yesterday")

['I', 'ate', 'my', 'friends', "'s", "O'neal", 'apple', 'yesterday']

In [8]:
# Both languages share the same preprocessing: lowercase every token and wrap
# each sentence in <sos> ... <eos>. Only the tokenizer differs.
_shared_field_options = dict(lower=True, init_token="<sos>", eos_token="<eos>")

german = Field(tokenize=tokenizer_de, **_shared_field_options)
english = Field(tokenize=tokenizer_eng, **_shared_field_options)

In [9]:
def f(x, max_len=50):
    """Return True when both sides of a translation pair are short enough.

    Used as the `filter_pred` of the dataset split, so it is called with a
    single example argument and relies on the default for `max_len`.

    Args:
        x: an example exposing tokenised `src` and `trg` sequences as
            attributes.
        max_len: largest number of tokens allowed on either side (inclusive).
            Defaults to 50, the limit that was previously hard-coded.

    Returns:
        bool: True if `len(x.src) <= max_len` and `len(x.trg) <= max_len`.
    """
    return len(x.src) <= max_len and len(x.trg) <= max_len

# Load the Multi30k train/validation/test splits with German (.de) as the
# source field and English (.en) as the target field; `f` is passed as the
# example filter predicate.
_multi30k_splits = Multi30k.splits(
    exts=('.de', '.en'),
    fields=(german, english),
    filter_pred=f,
)
train_data, valid_data, test_data = _multi30k_splits

downloading training.tar.gz


.data\multi30k\training.tar.gz: 100%|██████████████████████████████████████████████| 1.21M/1.21M [00:01<00:00, 669kB/s]


downloading validation.tar.gz


.data\multi30k\validation.tar.gz: 100%|████████████████████████████████████████████| 46.3k/46.3k [00:00<00:00, 216kB/s]


downloading mmt_task1_test2016.tar.gz


.data\multi30k\mmt_task1_test2016.tar.gz: 100%|████████████████████████████████████| 66.2k/66.2k [00:00<00:00, 150kB/s]


In [10]:
# Build both vocabularies from the training split only, with identical limits
# (at most 10000 entries, tokens must occur at least twice).
for _field in (german, english):
    _field.build_vocab(train_data, max_size=10000, min_freq=2)

In [11]:
# vocab.freqs is a Python Counter of token frequencies, so most_common(n)
# returns the n most frequent (token, count) pairs.
print("Five most common words in the dataset: " + str(english.vocab.freqs.most_common(5)))

Five most common words in the dataset: [('a', 49165), ('.', 27623), ('in', 14886), ('the', 10955), ('on', 8035)]


In [12]:

class Encoder(nn.Module):
    """Bidirectional LSTM encoder that also emits one attention context vector.

    The forward and backward final states of every layer are merged through a
    linear projection so they can initialise a unidirectional decoder with the
    same number of layers. A single context vector is computed once per source
    sentence by attending over all encoder states with the top layer's merged
    hidden state as the query.

    Args:
        input_size: size of the source vocabulary.
        embedding_size: dimensionality of the token embeddings.
        hidden_size: LSTM hidden size per direction.
        num_layers: number of stacked LSTM layers.
        p: dropout probability applied to the embeddings.
    """

    def __init__(self, input_size, embedding_size, hidden_size, num_layers, p):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, bidirectional=True)

        # Project concatenated (forward, backward) states back to hidden_size.
        self.fc_hidden = nn.Linear(hidden_size * 2, hidden_size)
        self.fc_cell = nn.Linear(hidden_size * 2, hidden_size)
        self.dropout = nn.Dropout(p)

        # Attention scorer over [query (hidden_size) ; encoder state (hidden_size*2)].
        self.energy = nn.Linear(hidden_size * 3, 1)
        self.relu = nn.ReLU()
        # Normalise over the sequence dimension (dim 0).
        self.softmax = nn.Softmax(dim=0)

    def _merge_directions(self, state, projection):
        """Combine forward/backward final states of each layer into one state.

        Args:
            state: (num_layers * 2, N, hidden_size), ordered layer by layer
                with the forward direction first, as returned by nn.LSTM.
            projection: linear layer mapping hidden_size*2 -> hidden_size.

        Returns:
            Tensor of shape (num_layers, N, hidden_size).
        """
        batch_size = state.shape[1]
        per_layer = state.reshape(self.num_layers, 2, batch_size, self.hidden_size)
        both_directions = torch.cat((per_layer[:, 0], per_layer[:, 1]), dim=2)
        return projection(both_directions)

    def forward(self, x):
        """Encode a batch of source sentences.

        Args:
            x: (seq_length, N) tensor of token indices, N being the batch size.

        Returns:
            Tuple (encoder_states, hidden, cell, context_vector) with shapes
            (seq_length, N, hidden_size*2), (num_layers, N, hidden_size),
            (num_layers, N, hidden_size) and (1, N, hidden_size*2).
        """
        embedding = self.dropout(self.embedding(x))
        # embedding: (seq_length, N, embedding_size)

        encoder_states, (hidden, cell) = self.rnn(embedding)
        # encoder_states: (seq_length, N, hidden_size*2)
        # hidden, cell: (num_layers*2, N, hidden_size)

        # The decoder is not bidirectional, so merge the two directions of
        # every layer. Indexing only entries 0 and 1 would be correct for a
        # single layer but would drop layers (and hand the decoder a state of
        # the wrong shape) when num_layers > 1.
        hidden = self._merge_directions(hidden, self.fc_hidden)
        cell = self._merge_directions(cell, self.fc_cell)
        # hidden, cell: (num_layers, N, hidden_size)

        sequence_length = encoder_states.shape[0]
        # Use the top layer's state as the attention query for every position.
        h_reshaped = hidden[-1:].repeat(sequence_length, 1, 1)
        # h_reshaped: (seq_length, N, hidden_size)

        energy = self.relu(self.energy(torch.cat((h_reshaped, encoder_states), dim=2)))
        # energy: (seq_length, N, 1)

        attention = self.softmax(energy)
        # attention: (seq_length, N, 1)

        # attention: (seq_length, N, 1) -> "snk"
        # encoder_states: (seq_length, N, hidden_size*2) -> "snl"
        # context_vector: (1, N, hidden_size*2) -> "knl" (weighted sum over s)
        context_vector = torch.einsum("snk,snl->knl", attention, encoder_states)

        return encoder_states, hidden, cell, context_vector


In [13]:
class Decoder(nn.Module):
    """Single-step LSTM decoder conditioned on a fixed context vector.

    Each call consumes one target token per batch element, concatenates its
    embedding with the supplied context vector, advances the LSTM by one step
    and projects the LSTM output to vocabulary scores.

    Args:
        input_size: size of the target vocabulary (embedding table rows).
        embedding_size: dimensionality of the token embeddings.
        hidden_size: LSTM hidden size; the context vector is hidden_size*2 wide.
        output_size: number of scores produced per token.
        num_layers: number of stacked LSTM layers.
        p: dropout probability applied to the embeddings.
    """

    def __init__(
        self, input_size, embedding_size, hidden_size, output_size, num_layers, p
    ):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.embedding = nn.Embedding(input_size, embedding_size)
        lstm_input_size = hidden_size * 2 + embedding_size
        self.rnn = nn.LSTM(lstm_input_size, hidden_size, num_layers)

        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(p)

    def forward(self, x, encoder_states, hidden, cell, context_vector):
        """Advance the decoder by one time step.

        Args:
            x: (N,) tensor with the current input token index per sentence.
            encoder_states: accepted for interface compatibility; not used here.
            hidden, cell: LSTM state from the previous step.
            context_vector: (1, N, hidden_size*2) context from the encoder.

        Returns:
            Tuple (predictions, hidden, cell) where predictions has shape
            (N, output_size).
        """
        # (N,) -> (1, N): the LSTM expects a leading sequence dimension.
        step_tokens = x.unsqueeze(0)
        token_embedding = self.dropout(self.embedding(step_tokens))
        # token_embedding: (1, N, embedding_size)

        lstm_input = torch.cat((context_vector, token_embedding), dim=2)
        # lstm_input: (1, N, hidden_size*2 + embedding_size)

        lstm_output, (hidden, cell) = self.rnn(lstm_input, (hidden, cell))
        # lstm_output: (1, N, hidden_size)

        scores = self.fc(lstm_output)
        # scores: (1, N, output_size) -> (N, output_size)
        return scores.squeeze(0), hidden, cell

In [14]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a whole target sequence.

    Args:
        encoder: module returning (encoder_states, hidden, cell, context_vector).
        decoder: module with an `fc` output layer, called once per time step as
            decoder(x, encoder_states, hidden, cell, context_vector) and
            returning (predictions, hidden, cell).
    """

    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_force_ratio=0.5):
        """Run the encoder once and the decoder for every target position.

        Args:
            source: (source_len, N) tensor of source token indices.
            target: (target_len, N) tensor of target token indices; row 0 is
                used as the first decoder input.
            teacher_force_ratio: probability, per time step, of feeding the
                ground-truth token instead of the model's own prediction.

        Returns:
            Tensor of shape (target_len, N, vocab_size). Row 0 is never written
            and stays zero.
        """
        batch_size = source.shape[1]
        target_len = target.shape[0]
        # Read the output width from the decoder itself and allocate on the
        # input's device, so the module does not depend on notebook globals.
        target_vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(
            target_len, batch_size, target_vocab_size, device=source.device
        )
        encoder_states, hidden, cell, context_vector = self.encoder(source)

        # First input will be the <sos> token
        x = target[0]

        for t in range(1, target_len):
            # At every time step use encoder_states and update hidden, cell
            output, hidden, cell = self.decoder(x, encoder_states, hidden, cell, context_vector)

            # Store prediction for current time step
            outputs[t] = output

            # Get the best word the Decoder predicted (index in the vocabulary)
            best_guess = output.argmax(1)

            # With probability teacher_force_ratio feed the actual next word,
            # otherwise feed the decoder's own prediction. Mixing the two lets
            # the model see inputs during training that resemble what it gets
            # at test time, when no ground truth is available.
            x = target[t] if random.random() < teacher_force_ratio else best_guess

        return outputs

In [None]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Training hyperparameters
num_epochs = 100
lr = 3e-4
batch_size = 256
d_model = 256

# Vocabulary sizes: German is the source side, English the target side.
input_size_encoder = len(german.vocab)
input_size_decoder = len(english.vocab)
output_size = len(english.vocab)

# Embedding and hidden widths are both derived from d_model.
encoder_embedding_size = d_model
decoder_embedding_size = d_model
hidden_size = d_model*6

num_layers = 1
dropout = 0.1

In [None]:
# Batch the three splits. sort_key is the source sentence length and
# sort_within_batch=True sorts each batch by it; tensors are created on `device`.
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=batch_size,
    sort_within_batch=True,
    sort_key=lambda x: len(x.src),
    device=device,
)

In [None]:
# Instantiate the encoder and the decoder and move them to the target device.
# Keyword arguments make the mapping onto each constructor explicit.
encoder_net = Encoder(
    input_size=input_size_encoder,
    embedding_size=encoder_embedding_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    p=dropout,
).to(device)

decoder_net = Decoder(
    input_size=input_size_decoder,
    embedding_size=decoder_embedding_size,
    hidden_size=hidden_size,
    output_size=output_size,
    num_layers=num_layers,
    p=dropout,
).to(device)

In [None]:
# Tie encoder and decoder together and optimise all parameters with Adam.
model = Seq2Seq(encoder_net, decoder_net).to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)

In [None]:
# Vocabulary indices of the padding token on each side.
# NOTE(review): de_pad_idx is not referenced by any other code visible here.
en_pad_idx = english.vocab.stoi['<pad>']
de_pad_idx = german.vocab.stoi['<pad>']

# Target positions equal to the English pad index are ignored by the loss.
criterion = nn.CrossEntropyLoss(ignore_index=en_pad_idx)

In [None]:
import sys 

def run_epoch():
    """Train the global `model` for one full pass over `train_iterator`.

    Uses the notebook-level `model`, `optimizer`, `criterion`, `device` and
    `train_iterator`, and writes the current batch index to stdout as a
    one-line progress indicator.

    Returns:
        The mean training loss per batch.
    """
    model.train()
    running_loss = 0

    for step, batch in enumerate(train_iterator):
        src = batch.src.to(device)
        trg = batch.trg.to(device)

        optimizer.zero_grad()

        logits = model(src, trg)
        vocab_size = logits.shape[2]

        # Position 0 of the model output is never filled, so drop it (and the
        # matching first target row) and flatten time and batch together.
        flat_logits = logits[1:].reshape(-1, vocab_size)
        flat_targets = trg[1:].reshape(-1)

        loss = criterion(flat_logits, flat_targets)
        loss.backward()

        # Clip the gradient norm to 1 before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
        optimizer.step()

        running_loss += loss.item()

        sys.stdout.write("\r %d" % (step))
        sys.stdout.flush()

    return running_loss / len(train_iterator)

In [None]:
def run_validation():
    """Compute the mean loss of the global `model` over `valid_iterator`.

    The model is put in evaluation mode (dropout disabled) and no gradients
    are tracked. Teacher forcing is switched off so the result is
    deterministic and reflects how the decoder runs at inference time; this
    matters because the value is used to pick the best checkpoint.

    Returns:
        The mean validation loss per batch.
    """
    model.eval()
    total_loss = 0

    with torch.no_grad():
        for batch in valid_iterator:

            inp_data = batch.src.to(device)
            target = batch.trg.to(device)

            # teacher_force_ratio=0: always feed the model's own predictions.
            output = model(inp_data, target, teacher_force_ratio=0)

            # Position 0 of the model output is never filled; skip it and the
            # matching first target row, then flatten time and batch.
            output = output[1:].reshape(-1, output.shape[2])
            target = target[1:].reshape(-1)

            loss = criterion(output, target)
            total_loss += loss.item()

    return total_loss / len(valid_iterator)

In [None]:
def translate_sentence(model, sentence, german, english, device, max_length=50):
    """Greedily translate a German sentence into a list of English tokens.

    Args:
        model: trained model exposing `encoder` and `decoder` submodules.
        sentence: raw German sentence (string).
        german: source field; provides `vocab`, `init_token` and `eos_token`.
        english: target field; provides `vocab`, `init_token` and `eos_token`.
        device: device on which the tensors are created.
        max_length: maximum number of decoding steps.

    Returns:
        List of English token strings, starting with the init token and ending
        with the end-of-sentence token if it was produced within `max_length`.
    """
    model.eval()

    # Only tokenisation is needed, so call the tokenizer directly instead of
    # running the full spaCy pipeline.
    tokens = [token.text.lower() for token in spacy_ger.tokenizer(sentence)]

    # The source field wraps every training sentence in init/eos tokens, so
    # the sentence to translate must be wrapped the same way.
    tokens = [german.init_token] + tokens + [german.eos_token]

    text_to_indices = [german.vocab.stoi[token] for token in tokens]
    # (source_len,) -> (source_len, 1): a batch holding a single sentence.
    sentence_tensor = torch.LongTensor(text_to_indices).unsqueeze(1).to(device)

    eos_idx = english.vocab.stoi[english.eos_token]
    preds = [english.vocab.stoi[english.init_token]]

    with torch.no_grad():

        encoder_states, hidden, cell, context_vector = model.encoder(sentence_tensor)

        for _ in range(max_length):

            # Feed back the most recently generated token.
            trg = torch.LongTensor([preds[-1]]).to(device)

            output, hidden, cell = model.decoder(trg, encoder_states, hidden, cell, context_vector)
            new = output.argmax(1).item()

            preds.append(new)

            if new == eos_idx:
                break

    return [english.vocab.itos[i] for i in preds]

In [None]:
def beam(phrase, k, max_length=50):
    """Translate a German phrase with beam search using the global `model`.

    Args:
        phrase: raw German sentence (string).
        k: beam width, i.e. how many hypotheses are kept after every step.
        max_length: number of expansion rounds performed after the first
            generated token (default 50, the previously hard-coded value).

    Returns:
        A list of at most `k` tuples (token_ids, score, hidden, cell), best
        first. `token_ids` starts with the <sos> index and `score` is the mean
        log-probability per generated token.
    """
    model.eval()

    sos = english.vocab.stoi["<sos>"]
    eos = english.vocab.stoi["<eos>"]

    # Prepare the sentence the same way the source field prepares training
    # data: lowercase tokens wrapped in init/eos tokens.
    tokens = [token.text.lower() for token in spacy_ger.tokenizer(phrase)]
    tokens = [german.init_token] + tokens + [german.eos_token]

    text_to_indices = [german.vocab.stoi[token] for token in tokens]
    sentence_tensor = torch.LongTensor(text_to_indices).unsqueeze(1).to(device)

    with torch.no_grad():

        # Get encoder output
        encoder_states, hidden, cell, context_vector = model.encoder(sentence_tensor)

        # Start from a single empty hypothesis; the first round expands it
        # into the k best first tokens.
        possible = [([sos], 0.0, hidden, cell)]

        for _ in range(max_length + 1):

            # Stop as soon as every surviving hypothesis has ended.
            if all(hyp[0][-1] == eos for hyp in possible):
                break

            test = []
            for tmp_tgt, tmp_score, tmp_hidden, tmp_cell in possible:

                if tmp_tgt[-1] == eos:  # sentence already ended: keep as is
                    test.append((tmp_tgt, tmp_score, tmp_hidden, tmp_cell))
                    continue

                # Feed the LAST token of this hypothesis with its own state.
                trg = torch.LongTensor([tmp_tgt[-1]]).to(device)
                output, new_hidden, new_cell = model.decoder(
                    trg, encoder_states, tmp_hidden, tmp_cell, context_vector
                )

                # log_softmax is the numerically stable form of log(softmax).
                log_probs = torch.log_softmax(output, dim=1).squeeze(0)
                top_log_probs, top_args = log_probs.topk(min(k, log_probs.numel()))

                # tmp_score is a per-token mean, so recover the running sum
                # before adding the new token and re-normalising.
                n_generated = len(tmp_tgt) - 1
                for log_p, idx in zip(top_log_probs.tolist(), top_args.tolist()):
                    score = (tmp_score * n_generated + log_p) / (n_generated + 1)
                    test.append((tmp_tgt + [idx], score, new_hidden, new_cell))

            possible = sorted(test, key=lambda x: x[1], reverse=True)[:k]

    return possible



def convert(x):
    """Turn a beam hypothesis into a readable (sentence, score) pair.

    Args:
        x: sequence whose first item is a list of English vocabulary indices
            and whose second item is the hypothesis score.

    Returns:
        Tuple of the space-joined token strings and the unchanged score.
    """
    words = []
    for token_id in x[0]:
        words.append(english.vocab.itos[token_id])

    return (" ".join(words), x[1])

In [None]:
sentence =  "Da ich hungrig bin, möchte ich essen."

# Any real validation loss is lower than infinity, so the first epoch always
# produces a checkpoint.
best_loss = float("inf")

# Number of epochs actually run by this cell. The progress message used to be
# printed against num_epochs although the loop was capped at 20.
epochs_this_run = 20

for epoch in range(epochs_this_run):

    print(f'Epoch [{epoch + 1} / {epochs_this_run}]')

    loss =  run_epoch()
    validation_loss = run_validation()

    # Translate a fixed example after every epoch to follow progress by eye.
    translated_sentence = translate_sentence(model, sentence, german, english, device, max_length=50)
    out = beam(sentence, 3)

    print(f"Translated example sentence: \n {list(map(convert, out[:2]))}")
    print(f"Greedy: {translated_sentence}")

    print(f"\n Train loss {loss} | Validation loss {validation_loss} \n \n")

    # Keep the checkpoint with the lowest validation loss seen so far.
    # NOTE: this pickles the whole module object, not just its state_dict.
    if validation_loss < best_loss:
        torch.save(model, "best_model")
        best_loss = validation_loss

In [None]:
def beam(phrase, k, max_length=50):
    """Translate a German phrase with beam search using the global `model`.

    NOTE: this notebook defines `beam` more than once; once this cell has run,
    this definition is the one in effect. It is written against the current
    model interface, in which the encoder returns four values (including the
    context vector) and the decoder takes the context vector as an argument.

    Args:
        phrase: raw German sentence (string).
        k: beam width, i.e. how many hypotheses are kept after every step.
        max_length: number of expansion rounds performed after the first
            generated token (default 50, the previously hard-coded value).

    Returns:
        A list of at most `k` tuples (token_ids, score, hidden, cell), best
        first. `token_ids` starts with the <sos> index and `score` is the mean
        log-probability per generated token.
    """
    model.eval()

    sos = english.vocab.stoi["<sos>"]
    eos = english.vocab.stoi["<eos>"]

    # Prepare sentence: lowercase tokens wrapped in init/eos tokens.
    tokens = [token.text.lower() for token in spacy_ger.tokenizer(phrase)]
    tokens.append(german.eos_token)
    tokens.insert(0, german.init_token)

    text_to_indices = [german.vocab.stoi[token] for token in tokens]
    sentence_tensor = torch.LongTensor(text_to_indices).unsqueeze(1).to(device)

    with torch.no_grad():

        # Get encoder output
        encoder_states, hidden, cell, context_vector = model.encoder(sentence_tensor)

        # Start from a single empty hypothesis; the first round expands it
        # into the k best first tokens.
        possible = [([sos], 0.0, hidden, cell)]

        for _ in range(max_length + 1):

            # Stop as soon as every surviving hypothesis has ended.
            if all(hyp[0][-1] == eos for hyp in possible):
                break

            test = []
            for tmp_tgt, tmp_score, tmp_hidden, tmp_cell in possible:

                if tmp_tgt[-1] == eos:  # sentence already ended: keep as is
                    test.append((tmp_tgt, tmp_score, tmp_hidden, tmp_cell))
                    continue

                # Compute output for the last token of this hypothesis.
                trg = torch.LongTensor([tmp_tgt[-1]]).to(device)
                output, new_hidden, new_cell = model.decoder(
                    trg, encoder_states, tmp_hidden, tmp_cell, context_vector
                )

                # log_softmax is the numerically stable form of log(softmax).
                log_probs = torch.log_softmax(output, dim=1).squeeze(0)
                top_log_probs, top_args = log_probs.topk(min(k, log_probs.numel()))

                # tmp_score is a per-token mean, so recover the running sum
                # before adding the new token and re-normalising.
                n_generated = len(tmp_tgt) - 1
                for log_p, idx in zip(top_log_probs.tolist(), top_args.tolist()):
                    score = (tmp_score * n_generated + log_p) / (n_generated + 1)
                    test.append((tmp_tgt + [idx], score, new_hidden, new_cell))

            possible = sorted(test, key=lambda x: x[1], reverse=True)[:k]

    return possible



def convert(x):
    """Render a beam hypothesis as a (sentence, score) pair.

    Args:
        x: sequence whose first item is a list of English vocabulary indices
            and whose second item is the hypothesis score.

    Returns:
        Tuple of the space-joined token strings and the unchanged score.
    """
    index_to_token = english.vocab.itos
    text = " ".join([index_to_token[token_id] for token_id in x[0]])

    return (text, x[1])