In [27]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import spacy
import datasets
import torchtext
import tqdm
import evaluate

# Loading Dataset

In [28]:
# Load SQuAD through the Hugging Face `datasets` library; the result is a
# DatasetDict whose splits are shown in the next cell's output.
dataset = datasets.load_dataset("squad")

In [29]:
dataset

DatasetDict({
    train: Dataset({
        features: ['id', 'title', 'context', 'question', 'answers'],
        num_rows: 87599
    })
    validation: Dataset({
        features: ['id', 'title', 'context', 'question', 'answers'],
        num_rows: 10570
    })
})

In [30]:
# Hold out 10% of the official training split; the fixed seed keeps the split repeatable.
split_dataset = dataset["train"].train_test_split(test_size=0.1, seed=42)

# Only the first 10,000 examples of the remaining 90% portion are used for training.
train_data = split_dataset["train"].select(range(10000))
# The held-out 10% of the original training split serves as the test set.
test_data = split_dataset["test"]
# The official validation split is used for validation.
valid_data = dataset["validation"]
test_data[50]


{'id': '572fc26a04bcaa1900d76c90',
 'title': 'Greeks',
 'context': 'For those that remained under the Ottoman Empire\'s millet system, religion was the defining characteristic of national groups (milletler), so the exonym "Greeks" (Rumlar from the name Rhomaioi) was applied by the Ottomans to all members of the Orthodox Church, regardless of their language or ethnic origin. The Greek speakers were the only ethnic group to actually call themselves Romioi, (as opposed to being so named by others) and, at least those educated, considered their ethnicity (genos) to be Hellenic. There were, however, many Greeks who escaped the second-class status of Christians inherent in the Ottoman millet system, according to which Muslims were explicitly awarded senior status and preferential treatment. These Greeks either emigrated, particularly to their fellow Greek Orthodox protector, the Russian Empire, or simply converted to Islam, often only very superficially and whilst remaining crypto-Christian.

# Preprocessing

## Tokenization

In [31]:
# spaCy English pipeline; this notebook only calls its `.tokenizer`
# (see tokenize_example and answer_question).
en_nlp = spacy.load("en_core_web_sm")

In [32]:
def tokenize_example(example, en_nlp, max_length, lower, sos_token, eos_token):
    """Tokenize the question and the first reference answer of one example.

    Args:
        example: mapping with a "question" string and an "answers" mapping
            whose "text" entry is a sequence of answer strings; only the
            first answer is used.
        en_nlp: object exposing a ``tokenizer(text)`` callable that yields
            items with a ``.text`` attribute.
        max_length: maximum number of tokens kept per text (applied before
            the start/end markers are added).
        lower: when truthy, every token is lowercased.
        sos_token: marker prepended to each token list.
        eos_token: marker appended to each token list.

    Returns:
        dict with "q_tokens" and "a_tokens" token lists.
    """

    def _to_tokens(text):
        # Truncate first, then normalise case, then wrap with the markers.
        words = [piece.text for piece in en_nlp.tokenizer(text)][:max_length]
        if lower:
            words = [word.lower() for word in words]
        return [sos_token, *words, eos_token]

    return {
        "q_tokens": _to_tokens(example["question"]),
        "a_tokens": _to_tokens(example["answers"]["text"][0]),
    }

In [33]:
# Tokenization settings shared by every split.
max_length = 1_000  # cap on tokens kept per question/answer (before <sos>/<eos> are added)
lower = True  # lowercase every token
sos_token = "<sos>"
eos_token = "<eos>"

# Keyword arguments forwarded to tokenize_example by Dataset.map.
fn_kwargs = {
    "en_nlp": en_nlp,
    "max_length": max_length,
    "lower": lower,
    "sos_token": sos_token,
    "eos_token": eos_token,
}

# Adds the "q_tokens" / "a_tokens" columns returned by tokenize_example to each split.
train_data = train_data.map(tokenize_example, fn_kwargs=fn_kwargs)
valid_data = valid_data.map(tokenize_example, fn_kwargs=fn_kwargs)
test_data = test_data.map(tokenize_example, fn_kwargs=fn_kwargs)

Map: 100%|██████████| 8760/8760 [00:01<00:00, 5470.72 examples/s]


In [34]:
train_data[50]

{'id': '56d1ebdfe7d4791d00902578',
 'title': 'Buddhism',
 'context': 'Besides emptiness, Mahayana schools often place emphasis on the notions of perfected spiritual insight (prajñāpāramitā) and Buddha-nature (tathāgatagarbha). There are conflicting interpretations of the tathāgatagarbha in Mahāyāna thought. The idea may be traced to Abhidharma, and ultimately to statements of the Buddha in the Nikāyas. In Tibetan Buddhism, according to the Sakya school, tathāgatagarbha is the inseparability of the clarity and emptiness of one\'s mind. In Nyingma, tathāgatagarbha also generally refers to inseparability of the clarity and emptiness of one\'s mind. According to the Gelug school, it is the potential for sentient beings to awaken since they are empty (i.e. dependently originated). According to the Jonang school, it refers to the innate qualities of the mind that expresses themselves as omniscience etc. when adventitious obscurations are removed. The "Tathāgatagarbha Sutras" are a collection

## Creating Vocabularies

In [35]:
# Minimum number of occurrences in the training tokens for a token to be
# included in a vocabulary.
min_freq = 2
unk_token = "<unk>"
pad_token = "<pad>"

# Passed as `specials` to both vocabularies; the q_vocab listing in the next
# cell's output shows them occupying indices 0-3 in this order.
special_tokens = [
    unk_token,
    pad_token,
    sos_token,
    eos_token,
]

# Separate vocabularies for questions and answers, both built from the
# training split only.
q_vocab = torchtext.vocab.build_vocab_from_iterator(
    train_data["q_tokens"],
    min_freq=min_freq,
    specials=special_tokens,
)

a_vocab = torchtext.vocab.build_vocab_from_iterator(
    train_data["a_tokens"],
    min_freq=min_freq,
    specials=special_tokens,
)

In [36]:
q_vocab.get_itos()[:10]

['<unk>', '<pad>', '<sos>', '<eos>', '?', 'the', 'what', 'of', 'in', 'to']

In [37]:
# A single unk_index / pad_index is used for both vocabularies below (default
# index, padding value, loss ignore_index), so the two must agree.
assert q_vocab[unk_token] == a_vocab[unk_token]
assert q_vocab[pad_token] == a_vocab[pad_token]

unk_index = q_vocab[unk_token]
pad_index = a_vocab[pad_token]

In [38]:
# Tokens that are not in a vocabulary are looked up as unk_index
# (e.g. the un-lowercased "The" in the next cell returns 0).
q_vocab.set_default_index(unk_index)
a_vocab.set_default_index(unk_index)

In [39]:
q_vocab["The"]

0

In [40]:
q_vocab.get_itos()[0]

'<unk>'

In [41]:
def numericalize_example(example, q_vocab, a_vocab):
    """Convert the token lists of one example into vocabulary indices.

    Args:
        example: mapping holding "q_tokens" and "a_tokens" token lists.
        q_vocab: vocabulary used for the question tokens; must provide
            ``lookup_indices``.
        a_vocab: vocabulary used for the answer tokens; must provide
            ``lookup_indices``.

    Returns:
        dict with "q_ids" and "a_ids", the looked-up indices.
    """
    # (output column, vocabulary, input column), questions first.
    plan = (
        ("q_ids", q_vocab, "q_tokens"),
        ("a_ids", a_vocab, "a_tokens"),
    )
    return {
        target: vocab.lookup_indices(example[source])
        for target, vocab, source in plan
    }

In [42]:
# Keyword arguments forwarded to numericalize_example by Dataset.map
# (this rebinds the fn_kwargs name used for tokenization earlier).
fn_kwargs = {"q_vocab": q_vocab, "a_vocab": a_vocab}

# Adds the "q_ids" / "a_ids" index columns to each split.
train_data = train_data.map(numericalize_example, fn_kwargs=fn_kwargs)
valid_data = valid_data.map(numericalize_example, fn_kwargs=fn_kwargs)
test_data = test_data.map(numericalize_example, fn_kwargs=fn_kwargs)

Map: 100%|██████████| 8760/8760 [00:00<00:00, 14210.24 examples/s]


In [43]:
train_data[0]

{'id': '57263127ec44d21400f3dbf9',
 'title': 'Korean_War',
 'context': "After the formation of the People's Republic of China in 1949, the Chinese government named the Western nations, led by the United States, as the biggest threat to its national security. Basing this judgment on China's century of humiliation beginning in the early 19th century, American support for the Nationalists during the Chinese Civil War, and the ideological struggles between revolutionaries and reactionaries, the Chinese leadership believed that China would become a critical battleground in the United States' crusade against Communism. As a countermeasure and to elevate China's standing among the worldwide Communist movements, the Chinese leadership adopted a foreign policy that actively promoted Communist revolutions throughout territories on China's periphery.",
 'question': 'To show their strength in the international Communist movement, what did China do?',
 'answers': {'text': ['promoted Communist revol

In [44]:
# Expose the index columns as torch tensors; output_all_columns=True keeps the
# remaining columns (id, title, context, ...) available on each example.
data_type = "torch"
format_columns = ["q_ids", "a_ids"]
format_kwargs = {
    "type": data_type,
    "columns": format_columns,
    "output_all_columns": True,
}

train_data = train_data.with_format(**format_kwargs)
valid_data = valid_data.with_format(**format_kwargs)
test_data = test_data.with_format(**format_kwargs)

In [45]:
train_data[0]

{'q_ids': tensor([   2,    9,  348,   59, 2506,    8,    5,  233, 4522,  377,   17,    6,
           12,  293,   35,    4,    3]),
 'a_ids': tensor([   2, 3408,  464,    0,    3]),
 'id': '57263127ec44d21400f3dbf9',
 'title': 'Korean_War',
 'context': "After the formation of the People's Republic of China in 1949, the Chinese government named the Western nations, led by the United States, as the biggest threat to its national security. Basing this judgment on China's century of humiliation beginning in the early 19th century, American support for the Nationalists during the Chinese Civil War, and the ideological struggles between revolutionaries and reactionaries, the Chinese leadership believed that China would become a critical battleground in the United States' crusade against Communism. As a countermeasure and to elevate China's standing among the worldwide Communist movements, the Chinese leadership adopted a foreign policy that actively promoted Communist revolutions throughout t

In [46]:
type(train_data[0]["a_ids"])

torch.Tensor

## Data Loaders

In [47]:
def get_collate_fn(pad_index):
    """Build a collate function that pads a batch of question/answer ids.

    Args:
        pad_index: value used to fill the shorter sequences of a batch.

    Returns:
        A callable taking a list of examples (each with "q_ids" and "a_ids"
        tensors) and returning a dict with the same two keys, where each
        value is the result of ``nn.utils.rnn.pad_sequence`` over the batch
        (sequence dimension first, since ``batch_first`` is not set).
    """

    def collate_fn(batch):
        padded = {}
        for key in ("q_ids", "a_ids"):
            sequences = [example[key] for example in batch]
            padded[key] = nn.utils.rnn.pad_sequence(
                sequences, padding_value=pad_index
            )
        return padded

    return collate_fn

In [48]:
def get_data_loader(dataset, batch_size, pad_index, shuffle=False):
    """Wrap a dataset in a DataLoader that pads each batch with ``pad_index``.

    Args:
        dataset: dataset whose examples carry "q_ids" and "a_ids" tensors.
        batch_size: number of examples per batch.
        pad_index: padding value handed to the collate function.
        shuffle: whether the loader reshuffles the data (default False).

    Returns:
        The configured ``torch.utils.data.DataLoader``.
    """
    return torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        collate_fn=get_collate_fn(pad_index),
    )

In [49]:
batch_size = 128

# Only the training loader shuffles; validation and test keep dataset order
# (shuffle defaults to False in get_data_loader).
train_data_loader = get_data_loader(train_data, batch_size, pad_index, shuffle=True)
valid_data_loader = get_data_loader(valid_data, batch_size, pad_index)
test_data_loader = get_data_loader(test_data, batch_size, pad_index)

In [50]:
def load_glove_embeddings(filepath):
    """Read whitespace-separated word vectors from a text file.

    Each non-blank line is expected to hold a word followed by its vector
    components, all separated by whitespace.

    Args:
        filepath: path of the UTF-8 encoded embeddings file.

    Returns:
        dict mapping each word to a 1-D ``torch.float`` tensor built from the
        remaining fields of its line.

    Raises:
        ValueError: if a component cannot be parsed as a float.
    """
    embeddings = {}
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.split()
            # Blank / whitespace-only lines (e.g. a trailing empty line) have
            # no word field; skip them instead of failing on parts[0].
            if not parts:
                continue
            word, *values = parts
            embeddings[word] = torch.tensor(
                [float(val) for val in values], dtype=torch.float
            )
    return embeddings


In [None]:
def load_glove_embeddings(filepath):
    """Read whitespace-separated word vectors from a text file.

    NOTE(review): this function is also defined in the previous cell; one of
    the two definitions should be removed so they cannot drift apart.

    Each non-blank line is expected to hold a word followed by its vector
    components, all separated by whitespace.

    Args:
        filepath: path of the UTF-8 encoded embeddings file.

    Returns:
        dict mapping each word to a 1-D ``torch.float`` tensor built from the
        remaining fields of its line.

    Raises:
        ValueError: if a component cannot be parsed as a float.
    """
    embeddings = {}
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.split()
            # Blank / whitespace-only lines (e.g. a trailing empty line) have
            # no word field; skip them instead of failing on parts[0].
            if not parts:
                continue
            word, *values = parts
            embeddings[word] = torch.tensor(
                [float(val) for val in values], dtype=torch.float
            )
    return embeddings


# Building The Model

In [None]:
class Encoder(nn.Module):
    """Embeds a source sequence and runs it through a multi-layer LSTM.

    Args:
        input_dim: size of the source vocabulary (number of embeddings).
        embedding_dim: size of each embedding vector.
        hidden_dim: LSTM hidden size.
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability, used both on the embeddings and
            between LSTM layers.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        # Read by Seq2Seq to check encoder/decoder compatibility.
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(
            num_embeddings=input_dim, embedding_dim=embedding_dim
        )
        self.rnn = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Encode ``src`` and return the LSTM's final ``(hidden, cell)`` pair.

        ``src`` is ``[src length, batch size]``; each returned tensor is
        ``[n layers, batch size, hidden dim]``. The per-step LSTM outputs are
        discarded.
        """
        embedded = self.embedding(src)
        # embedded = [src length, batch size, embedding dim]
        embedded = self.dropout(embedded)
        _step_outputs, final_state = self.rnn(embedded)
        hidden, cell = final_state
        return hidden, cell

In [None]:
class Decoder(nn.Module):
    """Single-step LSTM decoder that scores the next target token.

    Args:
        output_dim: size of the target vocabulary (embeddings in, scores out).
        embedding_dim: size of each embedding vector.
        hidden_dim: LSTM hidden size.
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability, used both on the embeddings and
            between LSTM layers.
    """

    def __init__(self, output_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        # Read by Seq2Seq (vocabulary size and compatibility checks).
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(
            num_embeddings=output_dim, embedding_dim=embedding_dim
        )
        self.rnn = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.fc_out = nn.Linear(in_features=hidden_dim, out_features=output_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, hidden, cell):
        """Advance the decoder by one time step.

        Args:
            input: token ids for the current step, ``[batch size]``.
            hidden: previous hidden state, ``[n layers, batch size, hidden dim]``.
            cell: previous cell state, ``[n layers, batch size, hidden dim]``.

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` is
            ``[batch size, output dim]`` and the states are the updated ones.
        """
        # Add a length-1 sequence dimension: [1, batch size].
        step_tokens = input.unsqueeze(0)
        embedded = self.dropout(self.embedding(step_tokens))
        # embedded = [1, batch size, embedding dim]
        output, state = self.rnn(embedded, (hidden, cell))
        hidden, cell = state
        # output = [1, batch size, hidden dim]; drop the sequence dimension
        # before projecting onto the vocabulary.
        prediction = self.fc_out(output.squeeze(0))
        return prediction, hidden, cell

In [55]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step.

    Args:
        encoder: module returning ``(hidden, cell)`` for a source batch and
            exposing ``hidden_dim`` / ``n_layers``.
        decoder: module mapping ``(input, hidden, cell)`` to
            ``(prediction, hidden, cell)`` and exposing ``output_dim``,
            ``hidden_dim`` and ``n_layers``.
        device: device on which the output tensor is allocated.

    Raises:
        AssertionError: if encoder and decoder disagree on hidden size or
            number of layers.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        # The encoder's final states seed the decoder, so their shapes must match.
        same_hidden = encoder.hidden_dim == decoder.hidden_dim
        assert same_hidden, "Hidden dimensions of encoder and decoder must be equal!"
        same_depth = encoder.n_layers == decoder.n_layers
        assert same_depth, "Encoder and decoder must have equal number of layers!"

    def forward(self, src, trg, teacher_forcing_ratio):
        """Produce per-step vocabulary scores for ``trg``.

        Args:
            src: source token ids, ``[src length, batch size]``.
            trg: target token ids, ``[trg length, batch size]``; row 0 is the
                first decoder input.
            teacher_forcing_ratio: probability, drawn once per step for the
                whole batch, of feeding the ground-truth token as the next
                input instead of the model's own prediction.

        Returns:
            Tensor ``[trg length, batch size, output dim]``; index 0 is left
            as zeros because decoding starts from ``trg[0]``.
        """
        trg_length, batch_size = trg.shape[:2]
        outputs = torch.zeros(
            trg_length, batch_size, self.decoder.output_dim, device=self.device
        )
        # The encoder's final states are the decoder's initial states.
        hidden, cell = self.encoder(src)
        decoder_input = trg[0, :]
        for t in range(1, trg_length):
            step_scores, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[t] = step_scores
            if random.random() < teacher_forcing_ratio:
                # Teacher forcing: continue from the ground-truth token.
                decoder_input = trg[t]
            else:
                # Otherwise continue from the highest-scoring prediction.
                decoder_input = step_scores.argmax(1)
        return outputs

In [None]:
# Seed the random number generators before any weights are created so that
# parameter initialisation, training-loader shuffling and the teacher-forcing
# draws made through `random.random()` are repeatable across
# "Restart Kernel -> Run All" executions.
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# Model hyperparameters: vocabulary sizes come from the two vocabularies built above.
input_dim = len(q_vocab)
output_dim = len(a_vocab)
encoder_embedding_dim = 256
decoder_embedding_dim = 256
# hidden_dim and n_layers are shared because Seq2Seq asserts that the encoder
# and decoder agree on both.
hidden_dim = 512
n_layers = 2
encoder_dropout = 0.5
decoder_dropout = 0.5
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

encoder = Encoder(
    input_dim,
    encoder_embedding_dim,
    hidden_dim,
    n_layers,
    encoder_dropout,
)

decoder = Decoder(
    output_dim,
    decoder_embedding_dim,
    hidden_dim,
    n_layers,
    decoder_dropout,
)

model = Seq2Seq(encoder, decoder, device).to(device)

AttributeError: 'Vocab' object has no attribute 'items'

In [104]:
def init_weights(m):
    """Fill every parameter reachable from ``m`` with values drawn from U(-0.08, 0.08).

    Intended to be passed to ``Module.apply``; the parameters are modified
    in place and nothing is returned.
    """
    low, high = -0.08, 0.08
    for _name, weight in m.named_parameters():
        nn.init.uniform_(weight.data, low, high)


model.apply(init_weights)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(6100, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(3831, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (fc_out): Linear(in_features=512, out_features=3831, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)

In [105]:
def count_parameters(model):
    """Return the total number of elements across the trainable parameters of ``model``.

    Parameters with ``requires_grad`` set to False are not counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total


print(f"The model has {count_parameters(model):,} trainable parameters")

The model has 11,864,055 trainable parameters


In [106]:
# Adam over all model parameters; no learning rate or other options are
# passed, so the library defaults apply.
optimizer = optim.Adam(model.parameters())

In [107]:
# Targets equal to pad_index are ignored, so padding added by the collate
# function does not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=pad_index)

In [108]:
def train_fn(
    model, data_loader, optimizer, criterion, clip, teacher_forcing_ratio, device
):
    """Train ``model`` for one pass over ``data_loader``.

    Args:
        model: callable as ``model(src, trg, teacher_forcing_ratio)`` and
            returning scores shaped ``[trg length, batch size, vocab size]``.
        data_loader: iterable of batches with "q_ids" and "a_ids" tensors;
            must support ``len``.
        optimizer: optimizer updating the model's parameters.
        criterion: loss taking flattened scores and flattened target ids.
        clip: maximum gradient norm passed to ``clip_grad_norm_``.
        teacher_forcing_ratio: forwarded unchanged to the model.
        device: device the batch tensors are moved to.

    Returns:
        Mean of the per-batch losses.
    """
    model.train()
    running_loss = 0
    for batch in data_loader:
        questions = batch["q_ids"].to(device)
        answers = batch["a_ids"].to(device)
        optimizer.zero_grad()
        scores = model(questions, answers, teacher_forcing_ratio)
        # Position 0 is skipped on both sides: it holds the initial decoder
        # input on the target side and is never predicted.
        vocab_size = scores.shape[-1]
        flat_scores = scores[1:].view(-1, vocab_size)
        flat_targets = answers[1:].view(-1)
        loss = criterion(flat_scores, flat_targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(data_loader)

In [109]:
def evaluate_fn(model, data_loader, criterion, device):
    """Compute the mean per-batch loss of ``model`` over ``data_loader``.

    The model is switched to eval mode, gradients are disabled, and the
    teacher-forcing ratio passed to the model is 0.

    Args:
        model: callable as ``model(src, trg, teacher_forcing_ratio)`` and
            returning scores shaped ``[trg length, batch size, vocab size]``.
        data_loader: iterable of batches with "q_ids" and "a_ids" tensors;
            must support ``len``.
        criterion: loss taking flattened scores and flattened target ids.
        device: device the batch tensors are moved to.

    Returns:
        Mean of the per-batch losses.
    """
    model.eval()
    running_loss = 0
    with torch.no_grad():
        for batch in data_loader:
            questions = batch["q_ids"].to(device)
            answers = batch["a_ids"].to(device)
            # Ratio 0: the decoder always continues from its own predictions.
            scores = model(questions, answers, 0)
            # Position 0 is skipped on both sides, as in training.
            vocab_size = scores.shape[-1]
            flat_scores = scores[1:].view(-1, vocab_size)
            flat_targets = answers[1:].view(-1)
            running_loss += criterion(flat_scores, flat_targets).item()
    return running_loss / len(data_loader)

# Training

In [110]:
n_epochs = 4
clip = 1.0  # maximum gradient norm used by train_fn
teacher_forcing_ratio = 0.5  # probability of feeding the ground-truth token while training

# Lowest validation loss seen so far; decides when a checkpoint is written.
best_valid_loss = float("inf")

for epoch in tqdm.tqdm(range(n_epochs)):
    train_loss = train_fn(
        model,
        train_data_loader,
        optimizer,
        criterion,
        clip,
        teacher_forcing_ratio,
        device,
    )
    valid_loss = evaluate_fn(
        model,
        valid_data_loader,
        criterion,
        device,
    )
    # Keep only the weights of the best epoch so far (by validation loss).
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), "best.pt")
    # Perplexity is reported as exp(loss).
    print(f"\tTrain Loss: {train_loss:7.3f} | Train PPL: {np.exp(train_loss):7.3f}")
    print(f"\tValid Loss: {valid_loss:7.3f} | Valid PPL: {np.exp(valid_loss):7.3f}")

 25%|██▌       | 1/4 [01:27<04:23, 87.78s/it]

	Train Loss:   5.355 | Train PPL: 211.729
	Valid Loss:   4.231 | Valid PPL:  68.766


 50%|█████     | 2/4 [02:47<02:46, 83.17s/it]

	Train Loss:   4.956 | Train PPL: 142.061
	Valid Loss:   4.225 | Valid PPL:  68.363


 75%|███████▌  | 3/4 [04:08<01:22, 82.03s/it]

	Train Loss:   4.864 | Train PPL: 129.514
	Valid Loss:   4.222 | Valid PPL:  68.166


100%|██████████| 4/4 [05:39<00:00, 84.97s/it]

	Train Loss:   4.821 | Train PPL: 124.123
	Valid Loss:   4.233 | Valid PPL:  68.909





# Evaluation

In [111]:
# Restore the checkpoint written by the training loop (lowest validation loss).
# map_location remaps the stored tensors onto the current `device`, so the
# load also works when the checkpoint was saved on a different device
# (e.g. written during a GPU run and reloaded on a CPU-only machine).
model.load_state_dict(torch.load("best.pt", map_location=device))

test_loss = evaluate_fn(model, test_data_loader, criterion, device)

# Perplexity is reported as exp(loss).
print(f"| Test Loss: {test_loss:.3f} | Test PPL: {np.exp(test_loss):7.3f} |")

| Test Loss: 4.391 | Test PPL:  80.691 |


In [114]:
def answer_question(
    question,
    model,
    en_nlp,
    a_vocab,
    q_vocab,
    lower,
    sos_token,
    eos_token,
    device,
    max_output_length=25,
):
    """Greedily decode an answer for a single question.

    Args:
        question: a string (tokenized with ``en_nlp.tokenizer``) or an
            iterable of already-split tokens.
        model: object with ``eval()``, ``encoder`` and ``decoder`` members,
            as provided by Seq2Seq.
        en_nlp: object exposing ``tokenizer(text)``; only used for strings.
        a_vocab: answer vocabulary (``lookup_indices``, ``lookup_tokens``,
            item access).
        q_vocab: question vocabulary (``lookup_indices``).
        lower: when truthy, the question tokens are lowercased.
        sos_token: start marker wrapped around the question and used as the
            first decoder input.
        eos_token: end marker; decoding stops once it is predicted.
        device: device the input tensors are moved to.
        max_output_length: maximum number of decoding steps.

    Returns:
        List of answer tokens, starting with ``sos_token`` and ending with
        ``eos_token`` when it was predicted within the step limit.
    """
    model.eval()
    with torch.no_grad():
        if isinstance(question, str):
            words = [piece.text for piece in en_nlp.tokenizer(question)]
        else:
            words = list(question)
        if lower:
            words = [word.lower() for word in words]
        question_ids = q_vocab.lookup_indices([sos_token, *words, eos_token])
        # Shape [src length, 1]: a batch holding this single question.
        encoder_input = torch.LongTensor(question_ids).unsqueeze(-1).to(device)
        hidden, cell = model.encoder(encoder_input)
        generated = a_vocab.lookup_indices([sos_token])
        for _ in range(max_output_length):
            # Feed back the most recently generated token id.
            previous = torch.LongTensor([generated[-1]]).to(device)
            scores, hidden, cell = model.decoder(previous, hidden, cell)
            next_id = scores.argmax(-1).item()
            generated.append(next_id)
            if next_id == a_vocab[eos_token]:
                break
        return a_vocab.lookup_tokens(generated)

In [121]:
# Pick one test example and its first reference answer for a manual check.
question = test_data[1000]["question"]
expected_answer = test_data[1000]["answers"]["text"][0]

question, expected_answer

('What alloy can no longer be used in HASL because of restrictions on the use of one of its metal components?',
 'tin-lead')

In [122]:
# Decode an answer for the sample question; max_output_length keeps its
# default of 25 steps.
answer = answer_question(
    question,
    model,
    en_nlp,
    a_vocab,
    q_vocab,
    lower,
    sos_token,
    eos_token,
    device,
)

In [123]:
answer

['<sos>', '<unk>', '<eos>']

In [124]:
# Decode an answer for every test question, one example at a time.
# NOTE(review): the name `translations` looks like a leftover from a
# translation example; the list holds the generated answer token lists.
translations = [
    answer_question(
        example["question"],
        model,
        en_nlp,
        a_vocab,
        q_vocab,
        lower,
        sos_token,
        eos_token,
        device,
    )
    for example in tqdm.tqdm(test_data)
]

100%|██████████| 8760/8760 [00:51<00:00, 170.99it/s]


In [125]:
translations

[['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>', '<unk>', '<eos>'],
 ['<sos>',