# Seq2Seq with Attention for Machine Translation
Based on [Aladdin Persson](https://www.youtube.com/@AladdinPersson)'s [Tutorial](https://www.youtube.com/watch?v=sQUqQddQtB4)

### Downloads and imports

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

from torchtext.datasets import Multi30k # German to English dataset
"""
The official Multi30k download servers were down when this notebook was written, so the dataset URLs used by torchtext had to be replaced with the mirror URLs below.

urls = ['https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/training.tar.gz',
        'https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/validation.tar.gz',
        'https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/mmt16_task1_test.tar.gz',
        'https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/mmt_task1_test2016.tar.gz']
"""


from torchtext.data import Field, BucketIterator
import numpy as np
import spacy # Tokenizer
import random
from torch.utils.tensorboard.writer import SummaryWriter  # to print to tensorboard

In [2]:
# Pick the compute backend in order of preference: CUDA, then Apple MPS, then CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
elif torch.backends.mps.is_available():
    DEVICE = "mps"
else:
    DEVICE = "cpu"
print(f"Using {DEVICE} device")

Using cuda device


### Utils
Imported from [Aladdin Persson](https://www.youtube.com/@AladdinPersson)'s [Utils File](https://github.com/aladdinpersson/Machine-Learning-Collection/blob/558557c7989f0b10fee6e8d8f953d7269ae43d4f/ML/Pytorch/more_advanced/Seq2Seq_attention/seq2seq_attention.py)

In [3]:
import torch
import spacy
from torchtext.data.metrics import bleu_score
import sys


def translate_sentence(model, sentence, german, english, device, max_length=50):
    """Greedily translate one German sentence into a list of English tokens.

    Args:
        model: Object exposing ``encoder(src)`` returning
            ``(encoder_states, hidden, cell)`` and
            ``decoder(prev_token, encoder_states, hidden, cell)`` returning
            ``(logits, hidden, cell)``.
        sentence: Either a raw German string (tokenized here with spaCy) or an
            already tokenized sequence of strings.
        german: Source field; ``init_token``, ``eos_token`` and ``vocab.stoi``
            are read from it.
        english: Target field; ``vocab.stoi`` and ``vocab.itos`` are read from it.
        device: Device the index tensors are created on.
        max_length: Maximum number of decoding steps.

    Returns:
        The predicted English tokens without the leading ``<sos>``. The list
        ends with ``<eos>`` only if the model produced it within ``max_length``
        steps.
    """
    # Lower-case everything, which is what the vocabulary was built with.
    if isinstance(sentence, str):
        # Only raw strings need the spaCy pipeline, so load it lazily instead of
        # on every call. The full package name is used because the "de" shortcut
        # is not available in spaCy v3.
        spacy_de = spacy.load("de_core_news_sm")
        tokens = [token.text.lower() for token in spacy_de(sentence)]
    else:
        tokens = [token.lower() for token in sentence]

    # Wrap the sentence in <sos> ... <eos> and map every token to its index.
    tokens = [german.init_token, *tokens, german.eos_token]
    text_to_indices = [german.vocab.stoi[token] for token in tokens]

    # Shape (seq_length, 1): a batch containing this single sentence.
    sentence_tensor = torch.tensor(
        text_to_indices, dtype=torch.long, device=device
    ).unsqueeze(1)

    eos_index = english.vocab.stoi["<eos>"]
    outputs = [english.vocab.stoi["<sos>"]]

    with torch.no_grad():
        # Encode once; the decoder then attends over these states at every step.
        outputs_encoder, hiddens, cells = model.encoder(sentence_tensor)

        for _ in range(max_length):
            previous_word = torch.tensor(
                [outputs[-1]], dtype=torch.long, device=device
            )
            output, hiddens, cells = model.decoder(
                previous_word, outputs_encoder, hiddens, cells
            )
            best_guess = output.argmax(1).item()
            outputs.append(best_guess)

            # Model predicts it's the end of the sentence
            if best_guess == eos_index:
                break

    # Drop the start token before converting indices back to words.
    return [english.vocab.itos[idx] for idx in outputs[1:]]


def bleu(data, model, german, english, device):
    """Compute the corpus BLEU score of ``model`` on ``data``.

    Args:
        data: Iterable of examples whose ``src`` and ``trg`` attributes hold the
            tokenized source and target sentences.
        model: Model passed through to ``translate_sentence``.
        german: Source field passed through to ``translate_sentence``.
        english: Target field passed through to ``translate_sentence``.
        device: Device passed through to ``translate_sentence``.

    Returns:
        The value returned by ``bleu_score`` for all predictions against their
        single reference translations.
    """
    targets = []
    outputs = []

    for example in data:
        fields = vars(example)
        src = fields["src"]
        trg = fields["trg"]

        prediction = translate_sentence(model, src, german, english, device)
        # Strip the trailing <eos> only when it is really there: a translation
        # cut off at max_length ends in a real word that must be kept.
        if prediction and prediction[-1] == "<eos>":
            prediction = prediction[:-1]

        # bleu_score expects a list of references per candidate.
        targets.append([trg])
        outputs.append(prediction)

    return bleu_score(outputs, targets)


def save_checkpoint(state, filename="my_checkpoint.pth.tar"):
    """Announce the save on stdout, then write ``state`` to ``filename`` via ``torch.save``."""
    print("=> Saving checkpoint")
    torch.save(obj=state, f=filename)


def load_checkpoint(checkpoint, model, optimizer):
    """Restore ``model`` and ``optimizer`` from a checkpoint mapping.

    ``checkpoint["state_dict"]`` is loaded into the model first, then
    ``checkpoint["optimizer"]`` into the optimizer.
    """
    print("=> Loading checkpoint")
    restore_plan = ((model, "state_dict"), (optimizer, "optimizer"))
    for target, key in restore_plan:
        target.load_state_dict(checkpoint[key])

### Data

In [4]:
# Load the small spaCy pipelines used as tokenizers: German first, then English.
spacy_de, spacy_en = (
    spacy.load(pipeline_name)
    for pipeline_name in ('de_core_news_sm', 'en_core_web_sm')
)

In [5]:
def tokenize_de(text):
    """
    Split a German string into a list of token strings using the spaCy tokenizer
    """
    doc = spacy_de.tokenizer(text)
    return [token.text for token in doc]

def tokenize_en(text):
    """
    Split an English string into a list of token strings using the spaCy tokenizer
    """
    doc = spacy_en.tokenizer(text)
    return [token.text for token in doc]

In [6]:
# Source-side (German) field: tokenized with tokenize_de, lower=True, and
# '<sos>' / '<eos>' configured as the init and end-of-sequence tokens.
deutsch = Field(tokenize=tokenize_de, lower=True,
                init_token='<sos>', eos_token='<eos>')

# Target-side (English) field, configured the same way but using tokenize_en.
english = Field(tokenize=tokenize_en, lower=True,
                init_token='<sos>', eos_token='<eos>')

In [7]:
# Build the three Multi30k splits; the '.de' side is processed with the
# `deutsch` field and the '.en' side with the `english` field (exts and fields
# are given in the same order).
train_data, valid_data, test_data = Multi30k.splits(exts=('.de', '.en'),
                                                    fields=(deutsch, english))

In [8]:
# Build both vocabularies from the training split only.
# NOTE(review): max_size / min_freq presumably cap the vocabulary size and drop
# tokens seen fewer than twice — confirm against the torchtext Field docs.
deutsch.build_vocab(train_data, max_size=10000, min_freq=2)
english.build_vocab(train_data, max_size=10000, min_freq=2)

### Model

In [9]:
class Encoder(nn.Module):
    """Bidirectional LSTM encoder for the attention-based seq2seq model.

    The source tokens are embedded and run through a bidirectional LSTM. The
    first two entries of the final hidden and cell states (forward and backward
    direction of the first layer) are concatenated and projected back to
    ``hidden_size`` so they can initialise the decoder.
    """

    def __init__(self, input_size, embedding_size, hidden_size, num_layers=1, dropout_p=0.):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.dropout = nn.Dropout(dropout_p)
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bidirectional=True,
            dropout=dropout_p,
        )

        # Project the concatenated (forward, backward) states down to hidden_size.
        self.fc_hidden = nn.Linear(2 * hidden_size, hidden_size)
        self.fc_cell = nn.Linear(2 * hidden_size, hidden_size)

    def forward(self, x):
        """Encode ``x`` of shape (seq_length, N).

        Returns ``(encoder_states, hidden, cell)``: the per-step LSTM outputs
        and the projected hidden and cell states, each of shape
        (1, N, hidden_size).
        """
        embedded = self.dropout(self.embedding(x))
        # embedded shape: (seq_length, N, embedding_size)

        encoder_states, (h_n, c_n) = self.rnn(embedded)

        # Entries 0 and 1 are concatenated along the feature dimension.
        merged_hidden = torch.cat((h_n[0:1], h_n[1:2]), dim=2)
        merged_cell = torch.cat((c_n[0:1], c_n[1:2]), dim=2)

        return encoder_states, self.fc_hidden(merged_hidden), self.fc_cell(merged_cell)

In [10]:
class Decoder(nn.Module):
    """Single-step LSTM decoder with additive attention over the encoder states.

    At every step the previous decoder hidden state is scored against each
    encoder state, the scores are softmax-normalised over the source positions,
    and the resulting context vector is concatenated with the embedded input
    token before being fed to the LSTM.

    Note: the attention tiles ``hidden`` along the sequence dimension, so it
    expects ``hidden`` of shape (1, N, hidden_size), i.e. ``num_layers == 1``.
    """

    def __init__(self, input_size, embedding_size, hidden_size,
                 output_size, num_layers=1, dropout_p=0.):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.dropout = nn.Dropout(dropout_p)
        self.embedding = nn.Embedding(input_size, embedding_size)
        # The LSTM input is the attention context (hidden_size*2, from the
        # bidirectional encoder) concatenated with the token embedding.
        self.rnn = nn.LSTM(hidden_size * 2 + embedding_size, hidden_size,
                           num_layers, dropout=dropout_p)

        # Scores one (decoder hidden, encoder state) pair:
        # hidden_size + hidden_size*2 input features.
        self.energy = nn.Linear(hidden_size * 3, 1)
        # Normalise over dim 0, the source sequence positions.
        self.softmax = nn.Softmax(dim=0)
        self.relu = nn.ReLU()
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, encoder_states, hidden, cell):
        """Run one decoding step.

        Args:
            x: Previous target token indices, shape (N).
            encoder_states: Encoder outputs, shape (seq_length, N, hidden_size*2).
            hidden: Previous decoder hidden state, shape (1, N, hidden_size).
            cell: Previous decoder cell state, shape (1, N, hidden_size).

        Returns:
            ``(predictions, hidden, cell)`` where ``predictions`` has shape
            (N, output_size) and ``hidden`` / ``cell`` are the updated states.
        """
        # The LSTM expects a leading sequence dimension of length 1.
        x = x.unsqueeze(0)
        # x shape: (1, N)

        embedding = self.dropout(self.embedding(x))
        # embedding shape: (1, N, embedding_size)

        sequence_length = encoder_states.shape[0]
        h_reshaped = hidden.repeat(sequence_length, 1, 1)
        # h_reshaped shape: (seq_length, N, hidden_size)

        energy = self.relu(self.energy(torch.cat((h_reshaped, encoder_states), dim=2)))
        # energy shape: (seq_length, N, 1)
        attention = self.softmax(energy)
        # attention shape: (seq_length, N, 1)

        # bmm needs (N, 1, seq_length) x (N, seq_length, hidden_size*2).
        attention = attention.permute(1, 2, 0)
        # attention shape: (N, 1, seq_length)
        encoder_states = encoder_states.permute(1, 0, 2)
        # encoder_states shape: (N, seq_length, hidden_size*2)

        context_vector = torch.bmm(attention, encoder_states).permute(1, 0, 2)
        # context_vector shape: (N, 1, hidden_size*2) -> (1, N, hidden_size*2)

        rnn_input = torch.cat((context_vector, embedding), dim=2)
        # rnn_input shape: (1, N, hidden_size*2 + embedding_size)

        outputs, (hidden, cell) = self.rnn(rnn_input, (hidden, cell))
        # outputs shape: (1, N, hidden_size)

        predictions = self.fc(outputs).squeeze(0)
        # predictions shape: (N, output_size)

        return predictions, hidden, cell

In [11]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step."""

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_force_ratio=0.5):
        """Decode ``target`` from ``source`` with optional teacher forcing.

        Args:
            source: Source token indices, shape (seq_length, N).
            target: Target token indices, shape (target_len, N); ``target[0]``
                is used as the first decoder input (the <SOS> tokens).
            teacher_force_ratio: Probability, per step, of feeding the
                ground-truth target token as the next decoder input instead of
                the model's own best guess.

        Returns:
            Tensor of shape (target_len, N, target_vocab_size) holding the
            decoder output for every step. Row 0 stays all zeros because
            decoding starts at step 1.
        """
        batch_size = source.shape[1]
        target_len = target.shape[0]
        # Take the vocabulary size and device from the model and its inputs
        # rather than from notebook globals, so the module works on its own.
        target_vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(
            target_len, batch_size, target_vocab_size, device=source.device
        )
        # outputs shape: (target_len, batch_size, target_vocab_size)

        encoder_states, hidden, cell = self.encoder(source)
        # hidden shape: (num_layers, N, hidden_size)
        # cell shape: (num_layers, N, hidden_size)

        x = target[0]  # Grab start token <SOS>
        # x shape: (N)

        for t in range(1, target_len):
            output, hidden, cell = self.decoder(x, encoder_states, hidden, cell)
            # output shape: (batch_size, target_vocab_size)
            outputs[t] = output
            best_guess = output.argmax(1)  # most likely token per batch element

            # Teacher forcing: use the true token with probability
            # teacher_force_ratio, otherwise feed back the model's prediction.
            x = target[t] if random.random() < teacher_force_ratio else best_guess

        return outputs