In [None]:
# Mount Drive (optional - use if connecting to drive and in google colab environment)
from google.colab import drive
import os

drive.mount('/content/drive', force_remount=True)

# Root folder for all results. You are advised to create a folder named
# `deep learning folder` in your Drive for everything to work seamlessly,
# or change this to your desired folder path.
path_to_root_dir = "/content/drive/MyDrive/deep learning folder/ensembleLSTM-CNN hybrid model/seq2seq"

# makedirs (rather than mkdir) also creates any missing intermediate folders of
# this nested path, and exist_ok=True makes re-running the cell a no-op.
os.makedirs(path_to_root_dir, exist_ok=True)

# Change directory to where you want to save results
os.chdir(path_to_root_dir)

In [None]:
import torch
import spacy
from torchtext.data.metrics import bleu_score
import sys
# https://github.com/aladdinpersson/Machine-Learning-Collection/tree/master/ML/Pytorch/more_advanced

def translate_sentence(model, sentence, german, english, device, max_length=50):
    """Greedily translate one German sentence into a list of English tokens.

    Args:
        model: object exposing ``encoder(tensor) -> (hidden, cell)`` and
            ``decoder(prev_word, hidden, cell) -> (output, (hidden, cell))``.
        sentence: either a raw string (tokenized here with spaCy) or an
            already-tokenized iterable of strings.
        german: source field providing ``init_token``, ``eos_token`` and
            ``vocab.stoi``.
        english: target field providing ``vocab.stoi`` and ``vocab.itos``.
        device: device the index tensors are moved to.
        max_length: maximum number of decoding steps.

    Returns:
        The predicted tokens without the leading ``<sos>``. The list ends with
        ``"<eos>"`` only if the model emitted it within ``max_length`` steps.
    """
    # Create lower-cased tokens (which is what our vocab contains).
    if isinstance(sentence, str):
        # Loading the spaCy pipeline is expensive, so do it only when a raw
        # string actually needs tokenizing and reuse it across calls.
        spacy_ger = getattr(translate_sentence, "_spacy_ger", None)
        if spacy_ger is None:
            spacy_ger = spacy.load('de_core_news_sm')
            translate_sentence._spacy_ger = spacy_ger
        tokens = [token.text.lower() for token in spacy_ger(sentence)]
    else:
        tokens = [token.lower() for token in sentence]

    # Add <SOS> and <EOS> at the beginning and end respectively.
    tokens = [german.init_token, *tokens, german.eos_token]

    # Convert every German token to its vocabulary index.
    text_to_indices = [german.vocab.stoi[token] for token in tokens]

    # Shape (seq_len, 1): a batch containing a single sentence.
    sentence_tensor = torch.LongTensor(text_to_indices).unsqueeze(1).to(device)

    eos_index = english.vocab.stoi["<eos>"]
    outputs = [english.vocab.stoi["<sos>"]]

    with torch.no_grad():
        # Build encoder hidden and cell state.
        hidden, cell = model.encoder(sentence_tensor)

        for _ in range(max_length):
            # Feed the most recent prediction back into the decoder.
            previous_word = torch.LongTensor([outputs[-1]]).to(device)
            output, (hidden, cell) = model.decoder(previous_word, hidden, cell)
            best_guess = output.argmax(1).item()
            outputs.append(best_guess)

            # Model predicts it's the end of the sentence.
            if best_guess == eos_index:
                break

    # Skip the start token when mapping indices back to words.
    return [english.vocab.itos[idx] for idx in outputs[1:]]


def bleu(data, model, german, english, device):
    """Compute the corpus BLEU score of `model` over `data`.

    Each example in `data` must expose ``src`` and ``trg`` attributes (read via
    ``vars(example)``). Every source is translated with `translate_sentence`
    and compared against its single reference with `bleu_score`.

    Returns:
        Whatever `bleu_score` returns for the collected predictions and
        references.
    """
    targets = []
    outputs = []

    for example in data:
        src = vars(example)["src"]
        trg = vars(example)["trg"]

        prediction = translate_sentence(model, src, german, english, device)

        # translate_sentence only ends with "<eos>" when the model emitted it;
        # if decoding stopped at max_length the last token is a real word and
        # must not be dropped.
        if prediction and prediction[-1] == "<eos>":
            prediction = prediction[:-1]

        # bleu_score expects a list of references per candidate.
        targets.append([trg])
        outputs.append(prediction)

    return bleu_score(outputs, targets)


def save_checkpoint(state, filename="my_checkpoint.pth.tar"):
    """Announce the save on stdout, then serialize `state` to `filename` with torch.save."""
    status_message = "=> Saving checkpoint"
    print(status_message)
    torch.save(obj=state, f=filename)


def load_checkpoint(checkpoint, model, optimizer):
    """Restore `model` and `optimizer` from a checkpoint mapping.

    `checkpoint` must contain the keys "state_dict" (for the model) and
    "optimizer" (for the optimizer); the model is restored first.
    """
    print("=> Loading checkpoint")
    restore_plan = ((model, "state_dict"), (optimizer, "optimizer"))
    for receiver, key in restore_plan:
        receiver.load_state_dict(checkpoint[key])

In [None]:
# !pip install torchtext==0.6.0
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter # to print to tensorboard
from torchtext.datasets import Multi30k
from torchtext.data import Field, BucketIterator
import numpy as np
import spacy
import random

def _load_spacy_model(name):
  """Load the spaCy pipeline `name`, downloading it only if it is not installed yet."""
  try:
    return spacy.load(name)
  except OSError:
    # spacy.load raises OSError when the model package cannot be found;
    # fetch it once and retry instead of re-downloading on every run.
    spacy.cli.download(name)
    return spacy.load(name)

spacy_ger = _load_spacy_model('de_core_news_sm')
spacy_eng = _load_spacy_model('en_core_web_sm')

def tokenizer_ger(text):
  """Split German `text` into a list of token strings using the `spacy_ger` tokenizer."""
  pieces = []
  for tok in spacy_ger.tokenizer(text):
    pieces.append(tok.text)
  return pieces

def tokenizer_eng(text):
  """Split English `text` into a list of token strings using the `spacy_eng` tokenizer."""
  pieces = []
  for tok in spacy_eng.tokenizer(text):
    pieces.append(tok.text)
  return pieces

# Source (German) and target (English) fields share the same preprocessing:
# lower-casing plus explicit start/end-of-sentence markers. Only the tokenizer differs.
german, english = (
    Field(tokenize=tokenize_fn, lower=True, init_token='<sos>', eos_token='<eos>')
    for tokenize_fn in (tokenizer_ger, tokenizer_eng)
)

# Multi30k German -> English splits, parsed with the fields above.
train_data, validation_data, test_data = Multi30k.splits(
    fields=(german, english),
    exts=('.de', '.en'),
)

# Vocabularies are built from the training split only.
german.build_vocab(train_data, min_freq=2, max_size=10000)
english.build_vocab(train_data, min_freq=2, max_size=10000)

In [None]:
class Encoder(nn.Module):
    """LSTM encoder: embeds a batch of token indices and returns the final LSTM state."""

    def __init__(self, input_size, embedding_size, hidden_size, num_layers, dropout):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.dropout = nn.Dropout(dropout)
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout=dropout)

    def forward(self, x):
        """Encode `x` and return the LSTM's final `(hidden, cell)` pair.

        `x` holds token indices laid out as (seq_len, N); for example
        [[2, 3], [9, 4], [7, 1]] is 3 time steps for a batch of 2.
        """
        # Index lookup appends an embedding axis: (seq_len, N) -> (seq_len, N, embedding_size),
        # so the (3, 2) example above becomes (3, 2, 5) when embedding_size is 5.
        embedded = self.embedding(x)
        embedded = self.dropout(embedded)

        # The per-step outputs are not needed; only the final state is handed on.
        _, final_state = self.rnn(embedded)
        hidden, cell = final_state
        return hidden, cell

class Decoder(nn.Module):
    """LSTM decoder that consumes one token per call and scores the next token."""

    def __init__(self, input_size, embedding_size, hidden_size, output_size, num_layers, dropout):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = nn.Dropout(dropout)
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout=dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden, cell):
        """Run a single decoding step.

        `x` has shape (N): one token index per batch element, e.g. [7, 1].
        Returns the scores with shape (N, output_size) together with the
        updated `(hidden, cell)` state.
        """
        # The LSTM wants a leading time axis, so (N) becomes (1, N): the decoder
        # is fed one word at a time to get the next word.
        step_input = x.unsqueeze(0)

        # (1, N) -> (1, N, embedding_size)
        embedded = self.dropout(self.embedding(step_input))

        # (1, N, embedding_size) -> (1, N, hidden_size), continuing from the given state
        rnn_out, next_state = self.rnn(embedded, (hidden, cell))
        hidden, cell = next_state

        # (1, N, hidden_size) -> (1, N, output_size), then drop the time axis -> (N, output_size)
        scores = self.fc(rnn_out).squeeze(0)

        return scores, (hidden, cell)

class Seq2Seq(nn.Module):
  """Encoder-decoder wrapper that decodes step by step with teacher forcing.

  The decoder must expose its output projection as ``fc`` (an ``nn.Linear``),
  whose ``out_features`` is used as the target vocabulary size.
  """

  def __init__(self, encoder, decoder):
      super(Seq2Seq, self).__init__()
      self.encoder = encoder
      self.decoder = decoder

  def forward(self, source, target, teacher_force_ratio=0.5):
      """Return per-step decoder scores for `target` given `source`.

      Args:
          source: source token indices, batch on dimension 1.
          target: target token indices, time on dimension 0; ``target[0]`` is
              used as the first decoder input.
          teacher_force_ratio: probability of feeding the ground-truth token
              (rather than the model's own best guess) as the next input.

      Returns:
          Tensor of shape (target_len, batch_size, target_vocab_size). Row 0
          is never written and stays all zeros.
      """
      batch_size = source.shape[1]
      target_len = target.shape[0]
      # Take the vocabulary size from the decoder's output layer and the device
      # from the input, so the module does not depend on notebook globals.
      target_vocab_size = self.decoder.fc.out_features

      outputs = torch.zeros(target_len, batch_size, target_vocab_size, device=source.device)
      hidden, cell = self.encoder(source)

      # Grab start token
      x = target[0]
      for t in range(1, target_len):
          output, (hidden, cell) = self.decoder(x, hidden, cell)
          outputs[t] = output
          best_guess = output.argmax(1)
          # Teacher forcing: sometimes feed the true token, otherwise the prediction.
          x = target[t] if random.random() < teacher_force_ratio else best_guess
      return outputs
  

In [None]:
### Training configuration and model construction ###

# Optimisation settings
num_epochs = 20
learning_rate = 0.003
batch_size = 64

# Checkpoint loading flag and compute device
load_model = False
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Sizes dictated by the vocabularies
input_size_encoder = len(german.vocab)
input_size_decoder = len(english.vocab)
output_size = len(english.vocab)

# Network dimensions
encoder_embedding_size = 300
decoder_embedding_size = 300
hidden_size = 1024
num_layers = 2
enc_dropout = 0.2
dec_dropout = 0.2

# Tensorboard writer and the global step counter used when logging
writer = SummaryWriter("runs/loss_plot")
step = 0

# Batches are sorted by source length within each batch
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, validation_data, test_data),
    batch_size=batch_size,
    sort_within_batch=True,
    sort_key=lambda x: len(x.src),
    device=device,
)

encoder_net = Encoder(
    input_size=input_size_encoder,
    embedding_size=encoder_embedding_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    dropout=enc_dropout,
).to(device)
decoder_net = Decoder(
    input_size=input_size_decoder,
    embedding_size=decoder_embedding_size,
    hidden_size=hidden_size,
    output_size=output_size,
    num_layers=num_layers,
    dropout=dec_dropout,
).to(device)
model = Seq2Seq(encoder_net, decoder_net).to(device)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Padding positions in the target are excluded from the loss
pad_idx = english.vocab.stoi['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

In [None]:
num_epochs = 150

if load_model:
  # The filename must match the default used by save_checkpoint; map_location
  # lets a checkpoint saved on GPU be restored on a CPU-only runtime.
  load_checkpoint(torch.load('my_checkpoint.pth.tar', map_location=device), model, optimizer)

# Fixed example sentence translated at the start of every epoch as a progress check.
sentence = 'Ein Boot mit mehreren Männern wird von einem großen Pferdegespann ans Ufer gezogen..'

for epoch in range(num_epochs):
    print(f"Epoch [{epoch} / {num_epochs}]")

    checkpoint = {'state_dict' : model.state_dict(), 'optimizer':optimizer.state_dict()}
    save_checkpoint(checkpoint)

    # Translate in eval mode, then switch back to train mode for the updates.
    model.eval()
    translated_sentence = translate_sentence(model, sentence, german, english, device, max_length=50)
    print(f"Translated example sentence \n {translated_sentence}")
    model.train()

    for batch in train_iterator:
      inp_data = batch.src.to(device)
      target = batch.trg.to(device)

      output = model(inp_data, target)
      # output shape: (trg_len, batch_size, output_dim)

      # Drop the first time step (row 0 of the model output is never filled in)
      # and flatten time and batch so the loss sees (trg_len - 1) * batch_size rows.
      output = output[1:].reshape(-1, output.shape[2])
      target = target[1:].reshape(-1)

      optimizer.zero_grad()
      loss = criterion(output, target)

      loss.backward()

      # Clip the gradient norm before stepping.
      torch.nn.utils.clip_grad_norm_(model.parameters(),max_norm=1)
      optimizer.step()

      # Log a plain float rather than a tensor still attached to the graph.
      writer.add_scalar('Training loss', loss.item(), global_step=step)
      step += 1

# The in-loop checkpoint is written before each epoch's updates, so save once
# more here to keep the weights produced by the final epoch.
save_checkpoint({'state_dict' : model.state_dict(), 'optimizer':optimizer.state_dict()})


In [None]:
# The training loop leaves the model in train mode; switch to eval mode so
# dropout is disabled while the test set is translated.
model.eval()
score = bleu(test_data, model, german, english, device)
print(f"Bleu score {score*100:.2f}")