# Seq2Seq Model and Attention Mechanisms
---

In [None]:
!pip install spacy

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import datasets
from tqdm import tqdm
import spacy
from pprint import pprint
from transformers import AutoTokenizer
from collections import Counter
import json
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

## Prepare Data and Preprocessing

Load dataset using the `datasets` library.

In [None]:
# Load the `bentrevett/multi30k` dataset through the `datasets` library;
# later cells read its "train"/"validation"/"test" splits and "en"/"de" fields.
dataset = datasets.load_dataset('bentrevett/multi30k')

In [None]:
# Pick out the three splits the dataset already provides (nothing is re-split here)
train_data, valid_data, test_data = (
    dataset[split_name] for split_name in ("train", "validation", "test")
)

In [None]:
# Show a summary of each split (printed in the order train, test, validation)
print(train_data, test_data, valid_data)

In [None]:
# Define special tokens and parameters
max_length = 1000  # token lists are truncated to this many tokens before <sos>/<eos> are added
lower = True  # when true, process_sentence lowercases every token
sos_token = "<sos>"  # start-of-sentence marker
eos_token = "<eos>"  # end-of-sentence marker
unk_token = "<unk>"  # stands in for tokens missing from a vocabulary
pad_token = "<pad>"  # filler used when batching sequences of different lengths
min_freq = 2  # minimum count for a token to enter a vocabulary (see build_vocab)
# build_vocab assigns ids in list order, so <unk>=0, <pad>=1, <sos>=2, <eos>=3
special_tokens = [unk_token, pad_token, sos_token, eos_token]

To install tokenizer for English and German:

`python -m spacy download en_core_web_sm`

`python -m spacy download de_core_news_sm`

In [None]:
!/usr/bin/python3 -m spacy download en_core_web_sm
!/usr/bin/python3 -m spacy download de_core_news_sm

In [None]:
# Define tokenizer
# Load the small English and German spaCy pipelines; the preprocessing cells
# below use their `.tokenizer` attribute to split raw text into tokens.
en_nlp = spacy.load('en_core_web_sm')
de_nlp = spacy.load('de_core_news_sm')

In [None]:
def process_sentence(sentence, en_vocab, de_vocab):
    """Tokenize and numericalize one English/German sentence pair.

    :param sentence: mapping with the raw text under the keys "en" and "de".
    :param en_vocab: token -> id mapping for English; must contain ``unk_token``.
    :param de_vocab: token -> id mapping for German; must contain ``unk_token``.

    Return: dict holding the raw text ("en", "de"), the token lists wrapped in
    ``sos_token``/``eos_token`` ("en_tokens", "de_tokens") and the matching id
    lists ("en_ids", "de_ids").
    """

    def _to_tokens(nlp, text):
        # Truncate first, then optionally lowercase, then add the sentence markers.
        words = [tok.text for tok in nlp.tokenizer(text)][:max_length]
        if lower:
            words = [word.lower() for word in words]
        return [sos_token] + words + [eos_token]

    def _to_ids(words, vocab):
        # Tokens missing from the vocabulary fall back to the <unk> id.
        unk_id = vocab[unk_token]
        return [vocab.get(word, unk_id) for word in words]

    english_tokens = _to_tokens(en_nlp, sentence["en"])
    german_tokens = _to_tokens(de_nlp, sentence["de"])

    return {
        "en": sentence["en"],
        "de": sentence["de"],
        "en_tokens": english_tokens,
        "de_tokens": german_tokens,
        "en_ids": _to_ids(english_tokens, en_vocab),
        "de_ids": _to_ids(german_tokens, de_vocab),
    }

In [None]:
# Step 2: Build Vocabulary
def build_vocab(data, min_freq, specials):
    """Build a token -> id mapping from tokenized sentences.

    :param data: iterable of token sequences.
    :param min_freq: minimum number of occurrences a token needs to be kept.
    :param specials: tokens that always get the first ids, in the given order.

    Return: dict mapping each special token and each sufficiently frequent
    token to a consecutive integer id (frequent tokens follow the specials in
    first-seen order).
    """
    frequencies = Counter()
    for sequence in data:
        frequencies.update(sequence)

    # Special tokens take ids 0..len(specials)-1.
    vocab = dict((special, position) for position, special in enumerate(specials))

    for token, count in frequencies.items():
        # Skip rare tokens and anything already registered as a special token.
        if count < min_freq or token in vocab:
            continue
        vocab[token] = len(vocab)
    return vocab

In [None]:
# Step 3: Generate Tokenized Data for Vocabulary Building
def _tokenize_for_vocab(example):
    """Tokenize one training pair exactly the way process_sentence does.

    The vocabulary must be counted over the same token forms that are later
    looked up. process_sentence lowercases tokens when ``lower`` is set, so the
    same normalisation is applied here; otherwise words that only occur
    capitalised would be stored cased and every lowercased lookup would fall
    back to <unk>.

    :param example: mapping with the raw text under the keys "en" and "de".

    Return: dict with the token lists under "en_tokens" and "de_tokens"
    (no <sos>/<eos>; those are supplied to build_vocab as special tokens).
    """
    en_tokens = [token.text for token in en_nlp.tokenizer(example["en"])][:max_length]
    de_tokens = [token.text for token in de_nlp.tokenizer(example["de"])][:max_length]
    if lower:
        en_tokens = [token.lower() for token in en_tokens]
        de_tokens = [token.lower() for token in de_tokens]
    return {"en_tokens": en_tokens, "de_tokens": de_tokens}


tokenized_train_data = train_data.map(_tokenize_for_vocab)

In [None]:
# Build one vocabulary per language from the tokenized training split only
en_vocab, de_vocab = (
    build_vocab(tokenized_train_data[column], min_freq, special_tokens)
    for column in ("en_tokens", "de_tokens")
)

In [None]:
# Step 4: Process Full Dataset
# Every split gets token and id columns computed with the training vocabularies.
train_data, valid_data, test_data = (
    split.map(lambda x: process_sentence(x, en_vocab, de_vocab))
    for split in (train_data, valid_data, test_data)
)

In [None]:
# Save vocabularies to JSON (one file per language)
for _vocab_path, _vocab in (("en_vocab.json", en_vocab), ("de_vocab.json", de_vocab)):
    with open(_vocab_path, "w") as f:
        json.dump(_vocab, f)

In [None]:
# Sanity check: <unk> and <pad> must have the same id in both vocabularies,
# because a single unk_index / pad_index is used for both languages below.
for _shared_token in (unk_token, pad_token):
    assert en_vocab[_shared_token] == de_vocab[_shared_token]

unk_index, pad_index = en_vocab[unk_token], en_vocab[pad_token]

In [None]:
data_type = "torch"
format_columns = ["en_ids", "de_ids"]

# Only the id columns are converted to the requested type; output_all_columns
# keeps the remaining (text / token) columns available on each example.
_format_options = dict(
    type=data_type,
    columns=format_columns,
    output_all_columns=True,
)

train_data = train_data.with_format(**_format_options)
valid_data = valid_data.with_format(**_format_options)
test_data = test_data.with_format(**_format_options)

In [None]:
# Inspect the processed training split and its first example
print(train_data)
pprint(train_data[0])

## Data Loaders

In [None]:
def get_collate_fn(pad_index):
    """Create a collate function that pads id sequences to a common length.

    :param pad_index: id written into the padded positions.

    Return: a function mapping a list of examples (each with "en_ids" and
    "de_ids" tensors) to a dict with the same two keys, where each value is a
    padded tensor laid out as [seq_length, batch_size].
    """

    def collate_fn(batch):
        padded = {}
        for column in ("en_ids", "de_ids"):
            sequences = [example[column] for example in batch]
            # batch_first=False keeps time on the first axis: [seq_length, batch_size]
            padded[column] = nn.utils.rnn.pad_sequence(
                sequences, batch_first=False, padding_value=pad_index
            )
        return padded

    return collate_fn

In [None]:
def get_data_loader(dataset, batch_size, pad_index, shuffle=False):
    """Wrap a dataset in a DataLoader that pads each batch.

    :param dataset: dataset whose examples provide "en_ids" and "de_ids".
    :param batch_size: number of examples per batch.
    :param pad_index: padding id handed to the collate function.
    :param shuffle: whether the loader reshuffles the data.

    Return: the configured ``torch.utils.data.DataLoader``.
    """
    return DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        collate_fn=get_collate_fn(pad_index),
    )

In [None]:
batch_size = 64

# Only the training loader shuffles; validation and test keep dataset order
# (get_data_loader defaults to shuffle=False).
train_loader = get_data_loader(train_data, batch_size, pad_index, shuffle=True)
valid_loader = get_data_loader(valid_data, batch_size, pad_index)
test_loader = get_data_loader(test_data, batch_size, pad_index)

## Define the Encoder and Decoder for Seq2Seq 

### Encoder

- The encoder reads the input sequence and summarizes its information in internal state vectors, also called context vectors. These context vectors aim to encapsulate the information from all input elements so that the decoder can make accurate predictions.
- This implementation involves creating an RNN-based encoder.

In [None]:
# Here, define class Encoder with __init__ and forward function
# collate_fn method edited due to inconsistent batch sizes
class Encoder(nn.Module):
    """LSTM encoder that turns a padded batch of token ids into its final state.

    :param input_dim: size of the source vocabulary (number of embedding rows).
    :param embed_dim: dimensionality of the token embeddings.
    :param hidden_dim: dimensionality of the LSTM hidden and cell states.
    :param n_layers: number of stacked LSTM layers.
    :param dropout: dropout probability applied to the embeddings and, when
        more than one layer is used, between the LSTM layers.
    """

    def __init__(self, input_dim, embed_dim, hidden_dim, n_layers, dropout):
        super().__init__()

        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(input_dim, embed_dim)

        # The LSTM's own dropout is only requested when layers are stacked.
        lstm_options = dict(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True,
        )
        self.lstm = nn.LSTM(**lstm_options)

        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_lengths):
        """Encode a batch of source sequences.

        :param src: token ids; the LSTM and the packing call are configured
            with batch_first=True, i.e. a [batch, seq_len] layout is expected.
        :param src_lengths: tensor with the unpadded length of every sequence
            (moved to the CPU before packing).

        Return: tuple ``(outputs, hidden, cell)`` with the re-padded per-step
        outputs and the final hidden and cell states of the LSTM.
        """
        token_vectors = self.dropout(self.embedding(src))

        # Packing lets the LSTM skip padded positions; sequences need not be
        # sorted by length because enforce_sorted=False.
        packed_input = pack_padded_sequence(
            token_vectors,
            src_lengths.cpu(),
            batch_first=True,
            enforce_sorted=False,
        )

        packed_output, final_state = self.lstm(packed_input)
        hidden, cell = final_state

        outputs, _ = pad_packed_sequence(packed_output, batch_first=True)
        return outputs, hidden, cell



### Decoder

In [None]:
# Similarly, introduce class Decoder here
class Decoder(nn.Module):
    """LSTM decoder that predicts the next target token one step at a time.

    :param output_dim: size of the target vocabulary (embedding rows and
        number of output scores).
    :param embed_dim: dimensionality of the token embeddings.
    :param hidden_dim: dimensionality of the LSTM hidden and cell states.
    :param n_layers: number of stacked LSTM layers.
    :param dropout: dropout probability applied to the embeddings and, when
        more than one layer is used, between the LSTM layers.
    """

    def __init__(self, output_dim, embed_dim, hidden_dim, n_layers, dropout):
        super().__init__()

        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(output_dim, embed_dim)

        # The LSTM's own dropout is only requested when layers are stacked.
        lstm_options = dict(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True,
        )
        self.lstm = nn.LSTM(**lstm_options)

        # Projects the LSTM output onto one score per target-vocabulary entry.
        self.fc = nn.Linear(hidden_dim, output_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, hidden, cell):
        """Run a single decoding step.

        :param tgt: current input token ids, one per batch element.
        :param hidden: LSTM hidden state from the previous step (or encoder).
        :param cell: LSTM cell state from the previous step (or encoder).

        Return: tuple ``(prediction, hidden, cell)`` where ``prediction`` holds
        the vocabulary scores for the next token and the states are updated.
        """
        # Insert a length-1 time axis so the batch-first LSTM sees one step.
        step_input = tgt.unsqueeze(dim=1)

        step_embedding = self.dropout(self.embedding(step_input))

        step_output, state = self.lstm(step_embedding, (hidden, cell))
        hidden, cell = state

        # Drop the time axis again before projecting to vocabulary scores.
        prediction = self.fc(step_output.squeeze(dim=1))
        return prediction, hidden, cell


### Seq2Seq Model 

For the final part of the implementation, we'll implement the seq2seq model. This will handle:

- receiving the input/source sentence
- using the encoder to produce the context vectors
- using the decoder to produce the predicted output/target sentence

In [None]:
#Here, introduce a Seq2Seq class using the encoder and the decoder
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that translates a padded batch of sequences.

    Batches are expected in the time-major layout produced by the collate
    function and consumed by train_fn / evaluate_fn: ``[seq_len, batch_size]``
    for token ids and ``[trg_len, batch_size, vocab]`` for the returned scores.
    The encoder and decoder themselves are batch-first, so the source batch is
    transposed before encoding.

    :param encoder: module called as ``encoder(src, src_lengths)`` with a
        batch-first ``src``; must return ``(outputs, hidden, cell)``.
    :param decoder: module called as ``decoder(tokens, hidden, cell)``; must
        return ``(prediction, hidden, cell)`` and expose ``fc.out_features``.
    :param device: device on which the output tensor is allocated.
    :param pad_index: id of the padding token, used to measure source lengths.
        Defaults to 1, the position of ``<pad>`` in ``special_tokens``.
    """

    def __init__(self, encoder, decoder, device, pad_index=1):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        self.pad_index = pad_index

    def forward(self, src, tgt, teacher_forcing_ratio=0.5):
        """Compute vocabulary scores for every target position.

        :param src: source token ids, ``[src_len, batch_size]``.
        :param tgt: target token ids, ``[trg_len, batch_size]``; row 0 holds
            the ``<sos>`` tokens that start decoding.
        :param teacher_forcing_ratio: probability, drawn once per time step,
            of feeding the ground-truth token instead of the model's own
            prediction as the next decoder input.

        Return: tensor ``[trg_len, batch_size, trg_vocab_size]``; position 0
        is left at zero because nothing is predicted for ``<sos>``.
        """
        tgt_len, batch_size = tgt.shape
        tgt_vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(
            tgt_len, batch_size, tgt_vocab_size, device=self.device
        )

        # Count the non-padding tokens of each sequence (one column per
        # sequence). Padding must be identified by pad_index, not by id 0,
        # which belongs to <unk>.
        src_lengths = (src != self.pad_index).sum(dim=0)

        # The encoder is batch-first, so hand it [batch_size, src_len].
        _, hidden, cell = self.encoder(src.t(), src_lengths)

        decoder_input = tgt[0]
        for t in range(1, tgt_len):
            output, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[t] = output

            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            decoder_input = tgt[t] if teacher_force else output.argmax(1)

        return outputs


In [None]:
# The model translates German (source) into English (target).
input_dim = len(de_vocab)
output_dim = len(en_vocab)
encoder_embedding_dim = 256
decoder_embedding_dim = 256
hidden_dim = 512
n_layers = 2
encoder_dropout = 0.5
decoder_dropout = 0.5

# Pick the best available accelerator. `torch.backends.mps.is_available()` is
# the documented MPS check; `torch.mps.is_available` is missing in many
# PyTorch releases and would raise AttributeError there.
_mps_backend = getattr(torch.backends, "mps", None)
if torch.cuda.is_available():
    device = torch.device("cuda")
elif _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

encoder = Encoder(
    input_dim,
    encoder_embedding_dim,
    hidden_dim,
    n_layers,
    encoder_dropout,
)

decoder = Decoder(
    output_dim,
    decoder_embedding_dim,
    hidden_dim,
    n_layers,
    decoder_dropout,
)

model = Seq2Seq(encoder, decoder, device).to(device)

In [None]:
# Report the vocabulary sizes that fix the embedding and output layer sizes
print("Input Dim (German Vocab):", input_dim)
print("Output Dim (English Vocab):", output_dim)

In [None]:
def init_weights(m):
    """Re-draw every parameter of ``m`` from a uniform distribution on [-0.01, 0.01).

    The parameters are visited recursively, so calling this on a container
    also re-initialises the parameters of all modules nested inside it.

    :param m: the ``nn.Module`` whose parameters are overwritten in place.
    """
    for param in m.parameters():
        nn.init.uniform_(param.data, a=-0.01, b=0.01)

# `apply` calls init_weights on every submodule and on the model itself;
# since init_weights walks parameters recursively, nested parameters are
# simply re-drawn more than once from the same distribution.
model.apply(init_weights)

In [None]:
def count_parameters(model):
    """Return the total number of elements in the model's trainable parameters.

    :param model: the ``nn.Module`` to inspect; parameters with
        ``requires_grad`` set to False are not counted.
    """
    total = 0
    for parameter in model.parameters():
        if parameter.requires_grad:
            total += parameter.numel()
    return total

# Report the model size with thousands separators
print(f"The model has {count_parameters(model):,} trainable parameters")

In [None]:
# The encoder's final hidden/cell states are passed straight to the decoder as
# its initial state, so both must agree on state size and number of layers.
assert encoder.hidden_dim == decoder.hidden_dim
assert encoder.n_layers == decoder.n_layers

In [None]:
# Adam with its default hyperparameters over all model parameters
optimizer = optim.Adam(model.parameters())
# Target positions equal to pad_index are excluded from the loss
criterion = nn.CrossEntropyLoss(ignore_index=pad_index)

def train_fn(model, data_loader, optimizer, criterion, clip, teacher_forcing_ratio):
    """Train the model for one pass over ``data_loader``.

    :param model: called as ``model(src, trg, teacher_forcing_ratio)``.
    :param data_loader: yields batches with "de_ids" (source) and "en_ids"
        (target) tensors.
    :param optimizer: optimizer stepped once per batch.
    :param criterion: loss applied to flattened scores and target ids.
    :param clip: maximum gradient norm used for clipping.
    :param teacher_forcing_ratio: forwarded unchanged to the model.

    Return: the mean of the per-batch losses.
    """
    model.train()
    running_loss = 0
    for batch in data_loader:
        # The collate function yields time-major ids: [seq length, batch size]
        source = batch["de_ids"].to(device)
        target = batch["en_ids"].to(device)

        optimizer.zero_grad()
        scores = model(source, target, teacher_forcing_ratio)

        # Index 0 along the first axis is dropped from both scores and targets
        # (the <sos> position); assumes `scores` uses the same time-major
        # layout as `target` — TODO confirm against the model.
        vocab_size = scores.shape[-1]
        flat_scores = scores[1:].view(-1, vocab_size)
        flat_target = target[1:].view(-1)

        loss = criterion(flat_scores, flat_target)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += loss.item()
    return running_loss / len(data_loader)

In [None]:
def evaluate_fn(model, data_loader, criterion):
    """Compute the mean per-batch loss of the model without updating it.

    :param model: called as ``model(src, trg, 0)``, i.e. with teacher forcing
        switched off.
    :param data_loader: yields batches with "de_ids" (source) and "en_ids"
        (target) tensors.
    :param criterion: loss applied to flattened scores and target ids.

    Return: the mean of the per-batch losses.
    """
    model.eval()
    running_loss = 0
    with torch.no_grad():
        for batch in data_loader:
            # The collate function yields time-major ids: [seq length, batch size]
            source = batch["de_ids"].to(device)
            target = batch["en_ids"].to(device)

            scores = model(source, target, 0)  # turn off teacher forcing

            # Index 0 along the first axis is dropped from both scores and
            # targets (the <sos> position); assumes `scores` uses the same
            # time-major layout as `target` — TODO confirm against the model.
            vocab_size = scores.shape[-1]
            flat_scores = scores[1:].view(-1, vocab_size)
            flat_target = target[1:].view(-1)

            running_loss += criterion(flat_scores, flat_target).item()
    return running_loss / len(data_loader)

In [None]:
n_epochs = 10
clip = 1.0
teacher_forcing_ratio = 0.5

# Lowest validation loss seen so far; any first result beats infinity.
best_valid_loss = float("inf")

for epoch in tqdm(range(n_epochs)):
    train_loss = train_fn(
        model, train_loader, optimizer, criterion, clip, teacher_forcing_ratio
    )
    valid_loss = evaluate_fn(model, valid_loader, criterion)

    # Checkpoint only when validation improves, so the file always holds the
    # best weights found during this run.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), "tranlate-model.pt")

    # Perplexity is reported as exp(loss).
    print(f"\tTrain Loss: {train_loss:7.3f} | Train PPL: {np.exp(train_loss):7.3f}")
    print(f"\tValid Loss: {valid_loss:7.3f} | Valid PPL: {np.exp(valid_loss):7.3f}")

In [None]:
# Restore the checkpoint written by the training loop (same file name) before
# measuring performance on the test split.
model.load_state_dict(torch.load("tranlate-model.pt"))

test_loss = evaluate_fn(model, test_loader, criterion)

# Perplexity is reported as exp(loss)
print(f"| Test Loss: {test_loss:.3f} | Test PPL: {np.exp(test_loss):7.3f} |")

In [None]:
def translate_sentence(sentence, src_vocab, trg_vocab, model, device, max_length=50):
    """Greedily translate one German sentence with a trained model.

    :param sentence: raw source text.
    :param src_vocab: source token -> id mapping; must contain ``unk_token``.
    :param trg_vocab: target token -> id mapping; must contain ``sos_token``
        and ``eos_token``.
    :param model: object exposing ``encoder(src, src_lengths)`` returning
        ``(outputs, hidden, cell)`` and ``decoder(tokens, hidden, cell)``
        returning ``(prediction, hidden, cell)``; both are batch-first.
    :param device: device the input tensors are moved to.
    :param max_length: maximum number of tokens to generate.

    Return: list of predicted target tokens without ``<sos>``/``<eos>``.
    """
    model.eval()

    # Tokenize, lowercase and wrap in <sos>/<eos>
    tokens = [token.text.lower() for token in de_nlp.tokenizer(sentence)]
    tokens = [sos_token] + tokens + [eos_token]

    # Convert tokens to ids; unknown tokens map to <unk>
    unk_id = src_vocab[unk_token]
    src_indexes = [src_vocab.get(token, unk_id) for token in tokens]

    # The encoder is batch-first and needs explicit lengths: a batch of one
    # sentence is shaped [1, src_len] with a single length entry.
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(0).to(device)
    src_lengths = torch.LongTensor([len(src_indexes)])

    eos_id = trg_vocab[eos_token]
    trg_indexes = [trg_vocab[sos_token]]

    with torch.no_grad():
        # The encoder returns (outputs, hidden, cell); only the state is used.
        _, hidden, cell = model.encoder(src_tensor, src_lengths)

        for _ in range(max_length):
            trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)
            pred_token = output.argmax(1).item()

            # Stop at <eos> without storing it, so no real token has to be
            # sliced off when the length limit is reached instead.
            if pred_token == eos_id:
                break
            trg_indexes.append(pred_token)

    # Invert the vocabulary once instead of scanning it for every token.
    id_to_token = {index: token for token, index in trg_vocab.items()}
    return [id_to_token[index] for index in trg_indexes[1:]]

In [None]:
# Translate the first test example and show it next to its reference
first_example = test_data[0]
sentence = first_example["de"]
expected_result = first_example["en"]
translation = translate_sentence(sentence, de_vocab, en_vocab, model, device)

print("Translated sentence:", " ".join(translation))
print("expected_result:", expected_result)

## BLEU Score Calculation

In [None]:
from nltk.translate.bleu_score import sentence_bleu

In [None]:
def calculate_bleu_score(reference, candidate):
    """
    Score one predicted sentence against one ground-truth sentence with BLEU.

        :param reference: List of words in the target sentence (ground truth).
        :param candidate: List of words in the predicted sentence.

    Return: BLEU score (float)
    """
    # sentence_bleu takes a list of references; there is exactly one here.
    references = [reference]
    score = sentence_bleu(references, candidate)
    return score

In [None]:
# BLEU compares token lists. `expected_result` is a raw string, and passing it
# directly would be scored character by character, so tokenize it the same way
# the training targets were prepared (spaCy tokenizer, optional lowercasing).
reference_tokens = [token.text for token in en_nlp.tokenizer(expected_result)]
if lower:
    reference_tokens = [token.lower() for token in reference_tokens]
x = calculate_bleu_score(reference_tokens, translation)