<a href="https://colab.research.google.com/github/nisha-s10/Deep-Learning-Lab-AFI524/blob/main/Experiment%206/Experiement%206.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### STEP 1 – IMPORT LIBRARIES

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import re
from collections import Counter
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from nltk.translate.bleu_score import sentence_bleu
import matplotlib.pyplot as plt
import seaborn as sns

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### STEP 2 – LOAD DATASET

In [1]:
file_path = "spa.txt"

pairs = []
with open(file_path, encoding="utf-8") as f:
    for line in f:
        eng, spa = line.strip().split("\t")
        pairs.append((eng.lower(), spa.lower()))

pairs = pairs[:5000]
print("Total sentence pairs:", len(pairs))

Total sentence pairs: 5000


### STEP 3 — CLEAN TEXT

In [None]:
def clean_text(text):
    text = re.sub(r'[^a-zA-Z¿¡?.!,\s]', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

pairs = [(clean_text(e), clean_text(s)) for e, s in pairs]

### STEP 4 — TRAIN/VAL/TEST SPLIT (80/10/10)

In [None]:
train_data, temp_data = train_test_split(pairs, test_size=0.2, random_state=42)
val_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)

print("Train:", len(train_data))
print("Validation:", len(val_data))
print("Test:", len(test_data))

Train: 4000
Validation: 500
Test: 500


### STEP 5 — BUILD VOCABULARY

In [None]:
def build_vocab(sentences):
    counter = Counter()
    for s in sentences:
        counter.update(s.split())

    vocab = {"<pad>":0, "<sos>":1, "<eos>":2, "<unk>":3}

    for word in counter:
        vocab[word] = len(vocab)

    return vocab

eng_vocab = build_vocab([e for e, s in pairs])
spa_vocab = build_vocab([s for e, s in pairs])

eng_vocab_size = len(eng_vocab)
spa_vocab_size = len(spa_vocab)

### STEP 6 — DATASET + PADDING

In [None]:
MAX_LEN = 15

class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def encode(self, sentence, vocab):
        tokens = [vocab.get(w, vocab["<unk>"]) for w in sentence.split()]
        tokens = tokens[:MAX_LEN-1]
        tokens.append(vocab["<eos>"])
        tokens += [vocab["<pad>"]] * (MAX_LEN - len(tokens))
        return torch.tensor(tokens)

    def __getitem__(self, idx):
        eng, spa = self.data[idx]

        src = self.encode(eng, eng_vocab)

        trg = [spa_vocab["<sos>"]]
        trg += self.encode(spa, spa_vocab).tolist()
        trg = trg[:MAX_LEN]
        trg += [spa_vocab["<pad>"]] * (MAX_LEN - len(trg))
        trg = torch.tensor(trg)

        return src, trg

    def __len__(self):
        return len(self.data)

train_loader = DataLoader(TranslationDataset(train_data),
                          batch_size=32, shuffle=True)

### VANILLA ENCODER–DECODER (NO ATTENTION)

#### Encoder

In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hidden_dim, batch_first=True)

    def forward(self, x):
        embedded = self.embedding(x)
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell

#### Decoder

In [None]:
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden, cell):
        x = x.unsqueeze(1)
        embedded = self.embedding(x)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        prediction = self.fc(output.squeeze(1))
        return prediction, hidden, cell

### BAHADANAU ATTENTION

In [None]:
class BahdanauAttention(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.W1 = nn.Linear(hidden_dim, hidden_dim)
        self.W2 = nn.Linear(hidden_dim, hidden_dim)
        self.V = nn.Linear(hidden_dim, 1)

    def forward(self, hidden, encoder_outputs):
        hidden = hidden[-1].unsqueeze(1)
        score = torch.tanh(self.W1(encoder_outputs) + self.W2(hidden))
        attention = torch.softmax(self.V(score), dim=1)
        context = torch.sum(attention * encoder_outputs, dim=1)
        return context, attention

### LUONG ATTENTION

In [None]:
class LuongAttention(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.W = nn.Linear(hidden_dim, hidden_dim)

    def forward(self, hidden, encoder_outputs):
        hidden = hidden[-1].unsqueeze(2)
        energy = torch.bmm(self.W(encoder_outputs), hidden).squeeze(2)
        attention = torch.softmax(energy, dim=1)
        context = torch.bmm(attention.unsqueeze(1),
                            encoder_outputs).squeeze(1)
        return context, attention

### DECODERS WITH ATTENTION

#### Bahdanau Decoder

In [None]:
class DecoderBahdanau(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.attention = BahdanauAttention(hidden_dim)
        self.lstm = nn.LSTM(emb_dim + hidden_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden, cell, encoder_outputs):
        x = x.unsqueeze(1)
        embedded = self.embedding(x)
        context, attention = self.attention(hidden, encoder_outputs)
        context = context.unsqueeze(1)
        lstm_input = torch.cat((embedded, context), dim=2)
        output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))
        prediction = self.fc(output.squeeze(1))
        return prediction, hidden, cell, attention

#### Luong Decoder

In [None]:
class DecoderLuong(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.attention = LuongAttention(hidden_dim)
        self.lstm = nn.LSTM(emb_dim + hidden_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden, cell, encoder_outputs):
        x = x.unsqueeze(1)
        embedded = self.embedding(x)
        context, attention = self.attention(hidden, encoder_outputs)
        context = context.unsqueeze(1)
        lstm_input = torch.cat((embedded, context), dim=2)
        output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))
        prediction = self.fc(output.squeeze(1))
        return prediction, hidden, cell, attention

### SEQ2SEQ MODELS

#### Vanilla

In [None]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        encoder_outputs, hidden, cell = self.encoder(src)
        input_token = trg[:,0]
        outputs = []

        for t in range(1, MAX_LEN):
            output, hidden, cell = self.decoder(input_token, hidden, cell)
            outputs.append(output)
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input_token = trg[:,t] if teacher_force else top1

        return torch.stack(outputs, dim=1)

#### Attention Version

In [None]:
class Seq2SeqAttention(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        encoder_outputs, hidden, cell = self.encoder(src)
        input_token = trg[:,0]
        outputs = []
        attentions = []

        for t in range(1, MAX_LEN):
            output, hidden, cell, attention = self.decoder(
                input_token, hidden, cell, encoder_outputs)

            outputs.append(output)
            attentions.append(attention)

            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input_token = trg[:,t] if teacher_force else top1

        return torch.stack(outputs, dim=1), torch.stack(attentions, dim=1)

### TRAINING

In [None]:
def train_model(model, epochs=25):
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss(ignore_index=0)

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for src, trg in train_loader:
            src, trg = src.to(device), trg.to(device)
            optimizer.zero_grad()

            if isinstance(model, Seq2SeqAttention):
                output, _ = model(src, trg)
            else:
                output = model(src, trg)

            loss = criterion(
                output.reshape(-1, spa_vocab_size),
                trg[:,1:].reshape(-1)
            )

            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch+1} Loss: {total_loss/len(train_loader)}")

### TRAIN ALL THREE MODELS

#### Vanilla

In [None]:
model_vanilla = Seq2Seq(
    Encoder(eng_vocab_size,128,128),
    Decoder(spa_vocab_size,128,128)
).to(device)

train_model(model_vanilla)

Epoch 1 Loss: 5.809868377685547
Epoch 2 Loss: 5.025956672668457
Epoch 3 Loss: 4.835268871307373
Epoch 4 Loss: 4.685989490509034
Epoch 5 Loss: 4.575812587738037
Epoch 6 Loss: 4.466721395492554
Epoch 7 Loss: 4.361347686767578
Epoch 8 Loss: 4.281347845077515
Epoch 9 Loss: 4.1909801998138425
Epoch 10 Loss: 4.1252293891906735
Epoch 11 Loss: 4.087166036605835
Epoch 12 Loss: 3.968954137802124
Epoch 13 Loss: 3.8370310153961182
Epoch 14 Loss: 3.7540576190948487
Epoch 15 Loss: 3.6454236755371094
Epoch 16 Loss: 3.490250152587891
Epoch 17 Loss: 3.3833036441802977
Epoch 18 Loss: 3.325146011352539
Epoch 19 Loss: 3.188232530593872
Epoch 20 Loss: 3.0186510066986085
Epoch 21 Loss: 2.9889814071655274
Epoch 22 Loss: 2.8547394733428955
Epoch 23 Loss: 2.767715591430664
Epoch 24 Loss: 2.6331648788452147
Epoch 25 Loss: 2.549083214759827


#### Bahdanau

In [None]:
model_bahdanau = Seq2SeqAttention(
    Encoder(eng_vocab_size,128,128),
    DecoderBahdanau(spa_vocab_size,128,128)
).to(device)

train_model(model_bahdanau)

Epoch 1 Loss: 5.722226245880127
Epoch 2 Loss: 5.030517917633056
Epoch 3 Loss: 4.7999146499633785
Epoch 4 Loss: 4.614158290863037
Epoch 5 Loss: 4.418981121063233
Epoch 6 Loss: 4.237409559249878
Epoch 7 Loss: 4.05439228630066
Epoch 8 Loss: 3.841314193725586
Epoch 9 Loss: 3.648164779663086
Epoch 10 Loss: 3.4620120487213133
Epoch 11 Loss: 3.3227709159851075
Epoch 12 Loss: 3.1379245414733887
Epoch 13 Loss: 2.971459743499756
Epoch 14 Loss: 2.814272668838501
Epoch 15 Loss: 2.6733160572052004
Epoch 16 Loss: 2.5262107372283937
Epoch 17 Loss: 2.3849626293182373
Epoch 18 Loss: 2.2476379737854004
Epoch 19 Loss: 2.1254301290512085
Epoch 20 Loss: 1.990165153503418
Epoch 21 Loss: 1.8634354763031007
Epoch 22 Loss: 1.752424919128418
Epoch 23 Loss: 1.655677978515625
Epoch 24 Loss: 1.5205258760452272
Epoch 25 Loss: 1.418192385673523


#### Luong

In [None]:
model_luong = Seq2SeqAttention(
    Encoder(eng_vocab_size,128,512),
    DecoderLuong(spa_vocab_size,256,512)
).to(device)

train_model(model_luong)

Epoch 1 Loss: 5.509538452148438
Epoch 2 Loss: 4.837158123016358


### BLEU FOR ALL THREE

In [None]:
def evaluate_bleu(model):
    model.eval()
    scores = []

    with torch.no_grad():
        dataset = TranslationDataset(test_data)

        for src, trg in dataset:
            src = src.unsqueeze(0).to(device)
            trg = trg.unsqueeze(0).to(device)

            if isinstance(model, Seq2SeqAttention):
                output, _ = model(src, trg, teacher_forcing_ratio=0)
            else:
                output = model(src, trg, teacher_forcing_ratio=0)

            pred = output.argmax(2).squeeze().tolist()
            reference = trg[:,1:].squeeze().tolist()

            scores.append(sentence_bleu([reference], pred))

    return np.mean(scores)

print("BLEU Vanilla:", evaluate_bleu(model_vanilla))
print("BLEU Bahdanau:", evaluate_bleu(model_bahdanau))
print("BLEU Luong:", evaluate_bleu(model_luong))

### VISUALIZE ATTENTION

In [None]:
src, trg = TranslationDataset(test_data)[0]
src = src.unsqueeze(0).to(device)
trg = trg.unsqueeze(0).to(device)

output, attention = model_bahdanau(src, trg, teacher_forcing_ratio=0)

attention_matrix = attention.squeeze().cpu().detach().numpy()

plt.figure(figsize=(8,6))
sns.heatmap(attention_matrix, cmap="viridis")
plt.title("Bahdanau Attention")
plt.show()