# NLP Exercise 3: Seq2Seq Model and Attention Mechanisms
---

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import datasets
from tqdm import tqdm
import spacy
from pprint import pprint
from transformers import AutoTokenizer
from collections import Counter
import json
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

  from .autonotebook import tqdm as notebook_tqdm


## Prepare Data and Preprocessing

Load dataset using the `datasets` library.

In [2]:
dataset = datasets.load_dataset('bentrevett/multi30k')

In [3]:
# Split the train dataset into train, validation, and test sets
train_data, valid_data, test_data = (
    dataset["train"],
    dataset["validation"],
    dataset["test"],
)

In [4]:
print(train_data, test_data, valid_data)

Dataset({
    features: ['en', 'de'],
    num_rows: 29000
}) Dataset({
    features: ['en', 'de'],
    num_rows: 1000
}) Dataset({
    features: ['en', 'de'],
    num_rows: 1014
})


In [5]:
# Define special tokens and parameters
max_length = 1000
lower = True
sos_token = "<sos>"
eos_token = "<eos>"
unk_token = "<unk>"
pad_token = "<pad>"
min_freq = 2
special_tokens = [unk_token, pad_token, sos_token, eos_token]

To install tokenizer for English and German:

`python -m spacy download en_core_web_sm`

`python -m spacy download de_core_news_sm`

In [6]:
# Define tokenizer
en_nlp = spacy.load('en_core_web_sm')
de_nlp = spacy.load('de_core_news_sm')

In [7]:
def process_sentence(sentence, en_vocab, de_vocab):
    # Tokenize English and German
    en_tokens = [token.text for token in en_nlp.tokenizer(sentence["en"])][:max_length]
    de_tokens = [token.text for token in de_nlp.tokenizer(sentence["de"])][:max_length]  

    if lower:
        en_tokens = [token.lower() for token in en_tokens]
        de_tokens = [token.lower() for token in de_tokens]

    # Add special tokens
    en_tokens = [sos_token] + en_tokens + [eos_token]
    de_tokens = [sos_token] + de_tokens + [eos_token]

    # Numericalize tokens
    en_ids = [en_vocab.get(token, en_vocab[unk_token]) for token in en_tokens]
    de_ids = [de_vocab.get(token, de_vocab[unk_token]) for token in de_tokens]

    return {
        "en": sentence["en"],
        "de": sentence["de"],
        "en_tokens": en_tokens,
        "de_tokens": de_tokens,
        "en_ids": en_ids,
        "de_ids": de_ids,
    }

In [8]:
# Step 2: Build Vocabulary
def build_vocab(data, min_freq, specials):
    counter = Counter()
    for tokens in data:
        counter.update(tokens)
    vocab = {token: idx for idx, token in enumerate(specials)}
    idx = len(vocab)
    for token, freq in counter.items():
        if freq >= min_freq and token not in vocab:
            vocab[token] = idx
            idx += 1
    return vocab

In [9]:
# Step 3: Generate Tokenized Data for Vocabulary Building
tokenized_train_data = train_data.map(
    lambda example: {"en_tokens": [token.text for token in en_nlp.tokenizer(example["en"])][:max_length],
                     "de_tokens": [token.text for token in de_nlp.tokenizer(example["de"])][:max_length]}
)

In [10]:
# Build vocabularies
en_vocab = build_vocab(tokenized_train_data["en_tokens"], min_freq, special_tokens)
de_vocab = build_vocab(tokenized_train_data["de_tokens"], min_freq, special_tokens)

In [11]:
# Step 4: Process Full Dataset
train_data = train_data.map(lambda x: process_sentence(x, en_vocab, de_vocab))
valid_data = valid_data.map(lambda x: process_sentence(x, en_vocab, de_vocab))
test_data = test_data.map(lambda x: process_sentence(x, en_vocab, de_vocab))

Map: 100%|██████████| 29000/29000 [00:03<00:00, 8557.72 examples/s] 
Map: 100%|██████████| 1014/1014 [00:00<00:00, 7646.72 examples/s]
Map: 100%|██████████| 1000/1000 [00:00<00:00, 8376.93 examples/s]


In [12]:
# Save vocabularies to JSON
with open("en_vocab.json", "w") as f:
    json.dump(en_vocab, f)
with open("de_vocab.json", "w") as f:
    json.dump(de_vocab, f)

In [13]:
# Check for special tokens in both vocabularies
assert en_vocab[unk_token] == de_vocab[unk_token]
assert en_vocab[pad_token] == de_vocab[pad_token]

unk_index = en_vocab[unk_token]
pad_index = en_vocab[pad_token]

In [14]:
data_type = "torch"
format_columns = ["en_ids", "de_ids"]

train_data = train_data.with_format(
    type=data_type, columns=format_columns, output_all_columns=True
)

valid_data = valid_data.with_format(
    type=data_type,
    columns=format_columns,
    output_all_columns=True,
)

test_data = test_data.with_format(
    type=data_type,
    columns=format_columns,
    output_all_columns=True,
)

In [15]:
print(train_data)
pprint(train_data[0])

Dataset({
    features: ['en', 'de', 'en_tokens', 'de_tokens', 'en_ids', 'de_ids'],
    num_rows: 29000
})
{'de': 'Zwei junge weiße Männer sind im Freien in der Nähe vieler Büsche.',
 'de_ids': tensor([   2,  273,    5,    6,    0,    8,    9, 3720,   11,   12,    0,   14,
           0,   16,    3]),
 'de_tokens': ['<sos>',
               'zwei',
               'junge',
               'weiße',
               'männer',
               'sind',
               'im',
               'freien',
               'in',
               'der',
               'nähe',
               'vieler',
               'büsche',
               '.',
               '<eos>'],
 'en': 'Two young, White males are outside near many bushes.',
 'en_ids': tensor([  2, 286,   5,   6, 117,   8,   9,  10,  11,  12,  13,  14,   3]),
 'en_tokens': ['<sos>',
               'two',
               'young',
               ',',
               'white',
               'males',
               'are',
               'outside',
             

## Data Loaders

In [16]:
def get_collate_fn(pad_index):
    def collate_fn(batch):
        batch_en_ids = [example["en_ids"] for example in batch]
        batch_de_ids = [example["de_ids"] for example in batch]
        batch_en_ids = nn.utils.rnn.pad_sequence(batch_en_ids, padding_value=pad_index)
        batch_de_ids = nn.utils.rnn.pad_sequence(batch_de_ids, padding_value=pad_index)
        batch = {
            "en_ids": batch_en_ids,
            "de_ids": batch_de_ids,
        }
        return batch

    return collate_fn

In [17]:
def get_data_loader(dataset, batch_size, pad_index, shuffle=False):
    collate_fn = get_collate_fn(pad_index)
    data_loader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        collate_fn=collate_fn,
        shuffle=shuffle,
    )
    return data_loader

In [18]:
batch_size = 64

train_loader = get_data_loader(train_data, batch_size, pad_index, shuffle=True)
valid_loader = get_data_loader(valid_data, batch_size, pad_index)
test_loader = get_data_loader(test_data, batch_size, pad_index)

## Define the Encoder and Decoder for Seq2Seq 

### Encoder

- Encoder reads the input sequence and summerizes the information in something called internal state vectors or context vectors. This context vector aims to encapsulate the information for all input elements to help the decoder make accurate predictions.
- This implementation involves creating an RNN-based encoder.

In [19]:
class Encoder(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        # src = [src length, batch size]
        embedded = self.dropout(self.embedding(src))
        # embedded = [src length, batch size, embedding dim]
        outputs, (hidden, cell) = self.rnn(embedded)
        # outputs = [src length, batch size, hidden dim * n directions]
        # hidden = [n layers * n directions, batch size, hidden dim]
        # cell = [n layers * n directions, batch size, hidden dim]
        # outputs are always from the top hidden layer
        return hidden, cell

### Decoder

In [22]:
class Decoder(nn.Module):
    def __init__(self, output_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(output_dim, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        # input = [batch size]
        # hidden = [n layers * n directions, batch size, hidden dim]
        # cell = [n layers * n directions, batch size, hidden dim]
        # n directions in the decoder will both always be 1, therefore:
        # hidden = [n layers, batch size, hidden dim]
        # context = [n layers, batch size, hidden dim]
        input = input.unsqueeze(0)
        # input = [1, batch size]
        embedded = self.dropout(self.embedding(input))
        # embedded = [1, batch size, embedding dim]
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        # output = [seq length, batch size, hidden dim * n directions]
        # hidden = [n layers * n directions, batch size, hidden dim]
        # cell = [n layers * n directions, batch size, hidden dim]
        # seq length and n directions will always be 1 in this decoder, therefore:
        # output = [1, batch size, hidden dim]
        # hidden = [n layers, batch size, hidden dim]
        # cell = [n layers, batch size, hidden dim]
        prediction = self.fc_out(output.squeeze(0))
        # prediction = [batch size, output dim]
        return prediction, hidden, cell

### Seq2Seq Model 

For the final part of the implemenetation, we'll implement the seq2seq model. This will handle:

- receiving the input/source sentence
- using the encoder to produce the context vectors
- using the decoder to produce the predicted output/target sentence

In [23]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        assert (
            encoder.hidden_dim == decoder.hidden_dim
        ), "Hidden dimensions of encoder and decoder must be equal!"
        assert (
            encoder.n_layers == decoder.n_layers
        ), "Encoder and decoder must have equal number of layers!"

    def forward(self, src, trg, teacher_forcing_ratio):
        # src = [src length, batch size]
        # trg = [trg length, batch size]
        # teacher_forcing_ratio is probability to use teacher forcing
        # e.g. if teacher_forcing_ratio is 0.75 we use ground-truth inputs 75% of the time
        batch_size = trg.shape[1]
        trg_length = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim
        # tensor to store decoder outputs
        outputs = torch.zeros(trg_length, batch_size, trg_vocab_size).to(self.device)
        # last hidden state of the encoder is used as the initial hidden state of the decoder
        hidden, cell = self.encoder(src)
        # hidden = [n layers * n directions, batch size, hidden dim]
        # cell = [n layers * n directions, batch size, hidden dim]
        # first input to the decoder is the <sos> tokens
        input = trg[0, :]
        # input = [batch size]
        for t in range(1, trg_length):
            # insert input token embedding, previous hidden and previous cell states
            # receive output tensor (predictions) and new hidden and cell states
            output, hidden, cell = self.decoder(input, hidden, cell)
            # output = [batch size, output dim]
            # hidden = [n layers, batch size, hidden dim]
            # cell = [n layers, batch size, hidden dim]
            # place predictions in a tensor holding predictions for each token
            outputs[t] = output
            # decide if we are going to use teacher forcing or not
            teacher_force = random.random() < teacher_forcing_ratio
            # get the highest predicted token from our predictions
            top1 = output.argmax(1)
            # if teacher forcing, use actual next token as next input
            # if not, use predicted token
            input = trg[t] if teacher_force else top1
            # input = [batch size]
        return outputs

In [24]:
input_dim = len(de_vocab)
output_dim = len(en_vocab)
encoder_embedding_dim = 256
decoder_embedding_dim = 256
hidden_dim = 512
n_layers = 2
encoder_dropout = 0.5
decoder_dropout = 0.5
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

encoder = Encoder(
    input_dim,
    encoder_embedding_dim,
    hidden_dim,
    n_layers,
    encoder_dropout,
)

decoder = Decoder(
    output_dim,
    decoder_embedding_dim,
    hidden_dim,
    n_layers,
    decoder_dropout,
)

model = Seq2Seq(encoder, decoder, device).to(device)

In [25]:
print("Input Dim (German Vocab):", input_dim)
print("Output Dim (English Vocab):", output_dim)

Input Dim (German Vocab): 8014
Output Dim (English Vocab): 6191


In [26]:
def init_weights(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.01, 0.01)

model.apply(init_weights)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(8014, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(6191, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (fc_out): Linear(in_features=512, out_features=6191, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)

In [27]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"The model has {count_parameters(model):,} trainable parameters")

The model has 14,168,879 trainable parameters


In [28]:
assert encoder.hidden_dim == decoder.hidden_dim
assert encoder.n_layers == decoder.n_layers

In [29]:
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=pad_index)

def train_fn(
    model, data_loader, optimizer, criterion, clip, teacher_forcing_ratio
):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(data_loader):
        src = batch["de_ids"].to(device)
        trg = batch["en_ids"].to(device)
        # src = [src length, batch size]
        # trg = [trg length, batch size]
        optimizer.zero_grad()
        output = model(src, trg, teacher_forcing_ratio)
        # output = [trg length, batch size, trg vocab size]
        output_dim = output.shape[-1]
        output = output[1:].view(-1, output_dim)
        # output = [(trg length - 1) * batch size, trg vocab size]
        trg = trg[1:].view(-1)
        # trg = [(trg length - 1) * batch size]
        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(data_loader)

In [30]:
def evaluate_fn(model, data_loader, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for i, batch in enumerate(data_loader):
            src = batch["de_ids"].to(device)
            trg = batch["en_ids"].to(device)
            # src = [src length, batch size]
            # trg = [trg length, batch size]
            output = model(src, trg, 0)  # turn off teacher forcing
            # output = [trg length, batch size, trg vocab size]
            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)
            # output = [(trg length - 1) * batch size, trg vocab size]
            trg = trg[1:].view(-1)
            # trg = [(trg length - 1) * batch size]
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(data_loader)

In [31]:
n_epochs = 10
clip = 1.0
teacher_forcing_ratio = 0.5

best_valid_loss = float("inf")

for epoch in tqdm(range(n_epochs)):
    train_loss = train_fn(
        model,
        train_loader,
        optimizer,
        criterion,
        clip,
        teacher_forcing_ratio,

    )
    valid_loss = evaluate_fn(
        model,
        valid_loader,
        criterion,

    )
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), "tranlate-model.pt")
    print(f"\tTrain Loss: {train_loss:7.3f} | Train PPL: {np.exp(train_loss):7.3f}")
    print(f"\tValid Loss: {valid_loss:7.3f} | Valid PPL: {np.exp(valid_loss):7.3f}")

 10%|█         | 1/10 [08:16<1:14:30, 496.72s/it]

	Train Loss:   5.093 | Train PPL: 162.841
	Valid Loss:   4.954 | Valid PPL: 141.732


 20%|██        | 2/10 [16:07<1:04:12, 481.54s/it]

	Train Loss:   4.599 | Train PPL:  99.397
	Valid Loss:   4.910 | Valid PPL: 135.604


 30%|███       | 3/10 [25:01<58:59, 505.58s/it]  

	Train Loss:   4.337 | Train PPL:  76.451
	Valid Loss:   4.689 | Valid PPL: 108.778


 40%|████      | 4/10 [37:28<1:00:05, 600.92s/it]

	Train Loss:   4.074 | Train PPL:  58.805
	Valid Loss:   4.435 | Valid PPL:  84.375


 50%|█████     | 5/10 [46:18<47:55, 575.05s/it]  

	Train Loss:   3.889 | Train PPL:  48.851
	Valid Loss:   4.336 | Valid PPL:  76.376


 60%|██████    | 6/10 [54:11<36:02, 540.65s/it]

	Train Loss:   3.756 | Train PPL:  42.774
	Valid Loss:   4.286 | Valid PPL:  72.661


 70%|███████   | 7/10 [1:02:18<26:09, 523.10s/it]

	Train Loss:   3.616 | Train PPL:  37.189
	Valid Loss:   4.215 | Valid PPL:  67.673


 80%|████████  | 8/10 [1:09:29<16:27, 493.67s/it]

	Train Loss:   3.503 | Train PPL:  33.213
	Valid Loss:   4.140 | Valid PPL:  62.777


 90%|█████████ | 9/10 [1:19:35<08:48, 528.90s/it]

	Train Loss:   3.394 | Train PPL:  29.776
	Valid Loss:   4.050 | Valid PPL:  57.420


100%|██████████| 10/10 [1:31:36<00:00, 549.66s/it]

	Train Loss:   3.275 | Train PPL:  26.452
	Valid Loss:   4.049 | Valid PPL:  57.329





In [None]:
model.load_state_dict(torch.load("translate-model.pt"))

test_loss = evaluate_fn(model, test_loader, criterion)

print(f"| Test Loss: {test_loss:.3f} | Test PPL: {np.exp(test_loss):7.3f} |")

In [35]:
def translate_sentence(sentence, src_vocab, trg_vocab, model, device, max_length=50):
    model.eval()
    
    # Tokenize the sentence
    tokens = [token.text.lower() for token in de_nlp(sentence)]
    # Add <sos> and <eos> tokens
    tokens = [sos_token] + tokens + [eos_token]
    # Convert tokens to indices
    src_indexes = [src_vocab.get(token, src_vocab[unk_token]) for token in tokens]
    # Convert to tensor and add batch dimension
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)
    
    # Encode the source sentence
    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)
    
    # Initialize the target sentence with <sos> token
    trg_indexes = [trg_vocab[sos_token]]
    
    for _ in range(max_length):
        trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
        with torch.no_grad():
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)
        pred_token = output.argmax(1).item()
        trg_indexes.append(pred_token)
        
        if pred_token == trg_vocab[eos_token]:
            break
    
    trg_tokens = [list(trg_vocab.keys())[list(trg_vocab.values()).index(i)] for i in trg_indexes]
    
    return trg_tokens[1:-1]

In [40]:
# Test the model on test data
sentence = test_data[0]["de"]
expected_result = test_data[0]["en"]
translation = translate_sentence(sentence, de_vocab, en_vocab, model, device)

print("Translated sentence:", " ".join(translation))
print("expected_result:", expected_result)

Translated sentence: a man in a brown hat is .
expected_result: A man in an orange hat starring at something.


## BLEU Score Calculation

In [9]:
from nltk.translate.bleu_score import sentence_bleu

In [None]:
def calculate_bleu_score(reference, candidate):
    """
    Calculate BLEU score for a single reference and candidate sentence pair.
    
        :param reference: List of words in the target sentence (ground truth).
        :param candidate: List of words in the predicted sentence.

    Return: BLEU score (float)
    """
    return sentence_bleu([reference], candidate)