<a href="https://colab.research.google.com/github/vivek-viswam-rv/language-translator/blob/main/GRU_grid_search.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; later cells copy the tokenizer files and
# save/load checkpoints under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')


In [None]:
import urllib.request
import tarfile

# downloading the europarl fr-en dataset
import os

url = "http://www.statmt.org/europarl/v7/fr-en.tgz"
filename = "fr-en.tgz"
extract_dir = "europarl"

# Only download when the archive is not already on disk, so that re-running the
# notebook does not fetch the whole corpus again. The download goes to a temporary
# name and is renamed once complete, so an interrupted transfer never leaves a
# truncated "fr-en.tgz" that a later run would mistake for a valid archive.
if not os.path.exists(filename):
    partial_filename = filename + ".part"
    urllib.request.urlretrieve(url, partial_filename)
    os.replace(partial_filename, filename)

with tarfile.open(filename, "r:gz") as tar:
    # SECURITY: the archive comes over plain HTTP. When this Python version
    # supports extraction filters, use the "data" filter so members cannot
    # escape `extract_dir` (absolute paths, "..", links, device files).
    if hasattr(tarfile, "data_filter"):
        tar.extractall(extract_dir, filter="data")
    else:
        tar.extractall(extract_dir)

In [None]:
!pip install sentencepiece
import sentencepiece as spm

In [None]:
# Locations of the English and French sides of the corpus inside the "europarl"
# directory that the download cell extracts into. zip_files() pairs them line by line.
ENGLISH_PATH = "europarl/europarl-v7.fr-en.en"
FRENCH_PATH = "europarl/europarl-v7.fr-en.fr"

def zip_files(english_path, french_path):
    """Read two line-aligned UTF-8 text files and pair their lines.

    Args:
        english_path: path of the English file.
        french_path: path of the French file.

    Returns:
        A list of ``(french_line, english_line)`` tuples, one per line, with the
        trailing newline characters still attached. Note the order: French first.

    Raises:
        AssertionError: if the two files do not have the same number of lines.
    """
    with open(english_path, "r", encoding="utf-8") as en_handle, \
         open(french_path, "r", encoding="utf-8") as fr_handle:
        fr_rows = fr_handle.readlines()
        en_rows = en_handle.readlines()

    assert len(en_rows) == len(fr_rows), "different number of lines in files!!"
    return [(fr_row, en_row) for fr_row, en_row in zip(fr_rows, en_rows)]

# Load the whole corpus as (french_line, english_line) tuples and report its size.
sentence_pairs = zip_files(ENGLISH_PATH, FRENCH_PATH)

print("total sentence pairs:", len(sentence_pairs))


In [None]:
import unicodedata
import re

def clean_text(s: str) -> str:
    """Normalize one sentence for tokenization.

    Applies, in order: Unicode NFC normalization, replacement of non-breaking
    spaces with regular spaces, collapsing of every whitespace run into a single
    space, stripping of leading/trailing whitespace, and lowercasing.

    Args:
        s: raw sentence text.

    Returns:
        The normalized, lowercased sentence (may be the empty string).
    """
    # NFC first, then turn non-breaking spaces into ordinary ones
    normalized = unicodedata.normalize("NFC", s).replace("\xa0", " ")
    # every run of whitespace becomes exactly one space
    single_spaced = re.sub(r"\s+", " ", normalized)
    return single_spaced.strip().lower()

def clean_data(sentences):
    """Clean every sentence pair and discard pairs with an empty side.

    Args:
        sentences: iterable of ``(french, english)`` string pairs.

    Returns:
        A list of ``(french_clean, english_clean)`` tuples, each side passed
        through ``clean_text``; pairs where either cleaned side is ``""`` are dropped.
    """
    normalized_pairs = (
        (clean_text(fr_raw), clean_text(en_raw)) for fr_raw, en_raw in sentences
    )
    return [
        (fr_clean, en_clean)
        for fr_clean, en_clean in normalized_pairs
        if fr_clean != "" and en_clean != ""
    ]


# Normalize every pair and drop the ones that end up with an empty side, then
# report how many pairs survive.
cleaned_data = clean_data(sentence_pairs)
print("after cleaning total sentence pairs:", len(cleaned_data))


In [None]:
import random

# Shuffle a copy instead of `cleaned_data` itself. An in-place shuffle makes this
# cell non-idempotent: running it a second time would reshuffle the already
# shuffled list and silently produce a different train/validation/test split.
# Seeding right before shuffling a fresh copy gives the same permutation every run.
random.seed(42)
shuffled_pairs = list(cleaned_data)
random.shuffle(shuffled_pairs)

# 80% / 10% / remainder split sizes
n_total = len(shuffled_pairs)
n_train = int(0.80 * n_total)
n_validation = int(0.10 * n_total)
n_test = n_total - n_train - n_validation

train_pairs      = shuffled_pairs[:n_train]
validation_pairs = shuffled_pairs[n_train:n_train+n_validation]
test_pairs       = shuffled_pairs[n_train+n_validation:]

# Keep only a subsample of each split (20k / 10k / 10k pairs).
train_pairs = train_pairs[:20000]
validation_pairs = validation_pairs[:10000]
test_pairs = test_pairs[:10000]

# Only the training split is written to disk: these two files are the input of
# the SentencePiece trainers, one sentence per line.
with open("fr_train.txt", "w", encoding="utf-8") as f_fr, \
     open("en_train.txt", "w", encoding="utf-8") as f_en:
    for fr, en in train_pairs:
        f_fr.write(fr + "\n")
        f_en.write(en + "\n")


print(f"train: {len(train_pairs)}, val: {len(validation_pairs)}, test: {len(test_pairs)}")


In [None]:
# Settings shared by both tokenizers: 10k-piece BPE vocabularies with fixed ids
# for the special tokens (pad=0, unk=1, bos=2, eos=3).
tokenizer_options = dict(
    vocab_size=10000,
    model_type="bpe",
    character_coverage=0.9995,
    pad_id=0,
    unk_id=1,
    bos_id=2,
    eos_id=3,
    num_threads=1,
    shuffle_input_sentence=False,
    input_sentence_size=0,
    seed_sentencepiece_size=0,
)

#french
spm.SentencePieceTrainer.train(
    input="fr_train.txt", model_prefix="sp_fr", **tokenizer_options
)

#english
spm.SentencePieceTrainer.train(
    input="en_train.txt", model_prefix="sp_en", **tokenizer_options
)

# Load the two freshly trained models (the trainer writes <prefix>.model / <prefix>.vocab).
sp_fr = spm.SentencePieceProcessor()
sp_fr.load("sp_fr.model")

sp_en = spm.SentencePieceProcessor()
sp_en.load("sp_en.model")

# vocab sizes
source_vocab_size = sp_fr.get_piece_size()
target_vocab_size = sp_en.get_piece_size()

# Special-token ids as reported by each loaded tokenizer.
SOURCE_PAD_IDX, SOURCE_UNK_IDX, SOURCE_BOS_IDX, SOURCE_EOS_IDX = (
    sp_fr.pad_id(),
    sp_fr.unk_id(),
    sp_fr.bos_id(),
    sp_fr.eos_id(),
)

TARGET_PAD_IDX, TARGET_UNK_IDX, TARGET_BOS_IDX, TARGET_EOS_IDX = (
    sp_en.pad_id(),
    sp_en.unk_id(),
    sp_en.bos_id(),
    sp_en.eos_id(),
)


In [None]:
import shutil

# Back up the trained tokenizer files (model + vocab for each language) to Drive.
for artifact in ("sp_fr.model", "sp_fr.vocab", "sp_en.model", "sp_en.vocab"):
    shutil.copy(artifact, "/content/drive/MyDrive/" + artifact)


In [None]:
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence
import torch

#max subword length is 60
MAX_LEN = 60

class TranslationDataset(Dataset):
    """Map-style dataset that turns (french, english) string pairs into id tensors.

    Args:
        pairs: sequence of ``(french_str, english_str)`` tuples.
        sp_source: source-side tokenizer; must provide ``encode(text, out_type=int)``.
        sp_target: target-side tokenizer; must provide ``encode(text, out_type=int)``,
            ``bos_id()`` and ``eos_id()``.
        max_len: maximum number of ids kept per sequence. On the target side this
            budget includes the BOS and EOS markers.
    """

    def __init__(self, pairs, sp_source, sp_target, max_len=MAX_LEN):
        self.pairs = pairs
        self.sp_source = sp_source
        self.sp_target = sp_target
        self.max_len = max_len

    def __len__(self):
        """Number of sentence pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return ``(src_ids, tgt_ids)`` as 1-D ``torch.long`` tensors.

        ``src_ids`` is the truncated source encoding (no BOS/EOS added);
        ``tgt_ids`` is ``[BOS] + truncated target encoding + [EOS]``.
        """
        french_str, english_str = self.pairs[idx]

        # Must be `self.max_len`: the bare name `max_len` does not exist in this
        # scope and raised NameError on the first item access.
        src_ids = self.sp_source.encode(french_str, out_type=int)[:self.max_len]

        # Reserve two slots for BOS/EOS; clamp at 0 so a tiny max_len cannot turn
        # into a negative slice bound.
        toks = self.sp_target.encode(english_str, out_type=int)[:max(self.max_len - 2, 0)]

        # Take the markers from the tokenizer that was passed in, so the dataset
        # stays consistent with whatever target tokenizer it is built with.
        tgt_ids = [self.sp_target.bos_id()] + toks + [self.sp_target.eos_id()]
        tgt_ids = tgt_ids[:self.max_len]

        # Explicit dtype: torch.tensor([]) would otherwise default to float32.
        return (
            torch.tensor(src_ids, dtype=torch.long),
            torch.tensor(tgt_ids, dtype=torch.long),
        )


def collate_fn(batch):
    """Pad a list of ``(src_ids, tgt_ids)`` tensor pairs into batch tensors.

    Args:
        batch: list of ``(src_tensor, tgt_tensor)`` items as produced by the dataset.

    Returns:
        ``(src_padded, tgt_padded, src_lengths, tgt_lengths)`` where the padded
        tensors are batch-first and filled with the source/target pad ids, and the
        length tensors hold the unpadded length of every sequence (``torch.long``).
    """
    src_seqs, tgt_seqs = zip(*batch)

    # unpadded lengths, recorded before padding
    src_lengths = torch.tensor(list(map(len, src_seqs)), dtype=torch.long)
    tgt_lengths = torch.tensor(list(map(len, tgt_seqs)), dtype=torch.long)

    src_padded = pad_sequence(
        src_seqs, padding_value=SOURCE_PAD_IDX, batch_first=True
    )
    tgt_padded = pad_sequence(
        tgt_seqs, padding_value=TARGET_PAD_IDX, batch_first=True
    )

    return src_padded, tgt_padded, src_lengths, tgt_lengths



In [None]:
from torch.utils.data import DataLoader

BATCH_SIZE = 128

# One dataset per split, all using the French tokenizer as source and the
# English tokenizer as target.
train_dataset, val_dataset, test_dataset = (
    TranslationDataset(split_pairs, sp_fr, sp_en)
    for split_pairs in (train_pairs, validation_pairs, test_pairs)
)

# Only the training loader shuffles; validation/test keep a fixed order.
train_loader = DataLoader(
    train_dataset, shuffle=True, batch_size=BATCH_SIZE, collate_fn=collate_fn
)
validation_loader = DataLoader(
    val_dataset, shuffle=False, batch_size=BATCH_SIZE, collate_fn=collate_fn
)
test_loader = DataLoader(
    test_dataset, shuffle=False, batch_size=BATCH_SIZE, collate_fn=collate_fn
)

# Pull one training batch as a smoke test of the dataset/collate pipeline.
source_batch, target_batch, source_lengths, target_lengths = next(iter(train_loader))


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F



class Encoder(nn.Module):
    """Single-layer bidirectional GRU encoder.

    Args:
        input_dim: size of the source (French) vocabulary.
        embed_dim: embedding dimension.
        hidden_dim: GRU hidden size per direction; also the size of the returned
            decoder initial state.
        dropout: dropout probability applied to the embeddings.
    """

    def __init__(self, input_dim, embed_dim, hidden_dim, dropout):
        super().__init__()
        # input_dim is the size of french vocab
        self.embedding = nn.Embedding(
            num_embeddings=input_dim,
            embedding_dim=embed_dim,
            padding_idx=SOURCE_PAD_IDX,
        )
        self.gru = nn.GRU(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            batch_first=True,
            bidirectional=True,
        )
        # projects the concatenated forward/backward final states to hidden_dim
        self.fc = nn.Linear(in_features=2 * hidden_dim, out_features=hidden_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, source):
        """Encode a batch of source id sequences.

        Args:
            source: batch-first tensor of source token ids.

        Returns:
            ``(encoder_outputs, decoder_init_hidden)``: the per-position GRU
            outputs (both directions concatenated on the last axis) and a
            ``(1, batch, hidden_dim)`` tensor used as the decoder's first state.
        """
        # embed the tokens, with dropout on the embeddings
        token_vectors = self.dropout(self.embedding(source))

        # NOTE(review): the padded batch is fed to the GRU as-is (no packing), so
        # padded positions are processed like real tokens.
        encoder_outputs, final_states = self.gru(token_vectors)

        # last two entries of the final state: forward then backward direction
        both_directions = torch.cat((final_states[-2], final_states[-1]), dim=1)

        # squash to the decoder hidden size and add the leading layer axis
        decoder_init_hidden = torch.tanh(self.fc(both_directions)).unsqueeze(0)

        return encoder_outputs, decoder_init_hidden

class BahdanauAttention(nn.Module):
    """Additive attention over the encoder outputs.

    Args:
        encoder_hidden_dim: encoder hidden size per direction (outputs are
            expected to carry ``2 * encoder_hidden_dim`` features).
        decoder_hidden_dim: decoder hidden size.
    """

    def __init__(self, encoder_hidden_dim, decoder_hidden_dim):
        super().__init__()
        joint_dim = encoder_hidden_dim * 2 + decoder_hidden_dim
        self.attention = nn.Linear(joint_dim, decoder_hidden_dim)
        self.v = nn.Linear(decoder_hidden_dim, 1, bias=False)

    def forward(self, decoder_hidden, encoder_outputs, source_mask):
        """Compute attention weights for one decoding step.

        Args:
            decoder_hidden: current decoder state, layer axis first.
            encoder_outputs: batch-first encoder outputs.
            source_mask: boolean mask, True at real (non-pad) source positions.

        Returns:
            ``(batch, source_length)`` weights; each row is a softmax over the
            source positions with masked positions pushed to -1e9 beforehand.
        """
        num_positions = encoder_outputs.size(1)

        # move batch first and copy the state once per source position
        query = decoder_hidden.permute(1, 0, 2).repeat(1, num_positions, 1)

        energy = torch.tanh(
            self.attention(torch.cat((query, encoder_outputs), dim=2))
        )
        raw_scores = self.v(energy).squeeze(-1)

        # padded positions get a huge negative score so softmax gives them ~0
        masked_scores = raw_scores.masked_fill(~source_mask, -1e9)
        return F.softmax(masked_scores, dim=1)

class Decoder(nn.Module):
    """One-step GRU decoder with attention over the encoder outputs.

    Args:
        output_dim: size of the target vocabulary (number of logits).
        embed_dim: target embedding dimension.
        encoder_hidden_dim: encoder hidden size per direction.
        decoder_hidden_dim: decoder GRU hidden size.
        attention: module called as ``attention(hidden, encoder_outputs, source_mask)``
            that returns per-position weights.
        dropout: dropout probability applied to the embeddings.
    """

    def __init__(self, output_dim, embed_dim, encoder_hidden_dim, decoder_hidden_dim, attention, dropout):
        super().__init__()
        self.output_dim = output_dim
        self.attention = attention
        self.embedding = nn.Embedding(output_dim, embed_dim, padding_idx=TARGET_PAD_IDX)

        context_dim = encoder_hidden_dim * 2

        # the GRU sees the token embedding concatenated with the attention context
        self.gru = nn.GRU(
            input_size=embed_dim + context_dim,
            hidden_size=decoder_hidden_dim,
            batch_first=True,
        )

        # logits are predicted from GRU output + context + embedding
        self.fc_out = nn.Linear(
            in_features=decoder_hidden_dim + context_dim + embed_dim,
            out_features=output_dim,
        )

        self.dropout = nn.Dropout(dropout)

    def forward(self, input_token, hidden, encoder_outputs, source_mask):
        """Run a single decoding step.

        Args:
            input_token: ``(batch,)`` tensor with the previous target token ids.
            hidden: current decoder state.
            encoder_outputs: batch-first encoder outputs.
            source_mask: boolean mask of real source positions.

        Returns:
            ``(logits, hidden)``: ``(batch, output_dim)`` scores for the next
            token and the updated decoder state.
        """
        # (batch,) -> (batch, 1) so the step looks like a length-1 sequence
        step_embedding = self.dropout(self.embedding(input_token.unsqueeze(1)))

        # weighted sum of encoder outputs: (batch, 1, src) @ (batch, src, feat)
        weights = self.attention(hidden, encoder_outputs, source_mask).unsqueeze(1)
        context = torch.matmul(weights, encoder_outputs)

        gru_output, hidden = self.gru(torch.cat((step_embedding, context), dim=2), hidden)

        # drop the length-1 time axis before the output projection
        features = torch.cat(
            (gru_output.squeeze(1), context.squeeze(1), step_embedding.squeeze(1)),
            dim=1,
        )
        return self.fc_out(features), hidden

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes the target step by step.

    Args:
        encoder: module returning ``(encoder_outputs, initial_hidden)``.
        decoder: one-step decoder exposing ``output_dim``.
        device: device on which the output buffer is allocated.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        """Produce logits for every target position after the first.

        Args:
            source: batch-first source id tensor.
            target: batch-first target id tensor; column 0 is fed as the first input.
            teacher_forcing_ratio: probability, drawn once per step for the whole
                batch, of feeding the ground-truth token instead of the argmax
                prediction as the next input.

        Returns:
            ``(batch, target_length - 1, output_dim)`` tensor of logits, where
            slot ``t - 1`` holds the prediction for target position ``t``.
        """
        num_steps = target.size(1) - 1
        outputs = torch.zeros(
            source.size(0), num_steps, self.decoder.output_dim, device=self.device
        )

        encoder_outputs, hidden = self.encoder(source)
        # True where the source holds a real token, False on padding
        source_mask = (source != SOURCE_PAD_IDX).bool()

        input_token = target[:, 0]
        for step in range(num_steps):
            logits, hidden = self.decoder(
                input_token, hidden, encoder_outputs, source_mask
            )
            outputs[:, step, :] = logits

            # one coin flip per step decides the next input for the whole batch
            use_ground_truth = torch.rand(1).item() < teacher_forcing_ratio
            greedy_choice = logits.argmax(dim=1)
            if use_ground_truth:
                input_token = target[:, step + 1]
            else:
                input_token = greedy_choice

        return outputs




In [None]:

# Use the GPU when CUDA is available, otherwise fall back to the CPU. `device` is
# read as a global by the training, evaluation and decoding functions.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

def train_one_epoch(model, dataloader, optimizer, criterion, teacher_forcing_ratio=0.5):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: seq2seq model called as ``model(source, target, teacher_forcing_ratio=...)``.
        dataloader: yields ``(source, target, source_lengths, target_lengths)`` batches.
        optimizer: optimizer stepping the model parameters.
        criterion: loss taking flattened logits and flattened target ids.
        teacher_forcing_ratio: forwarded to the model.

    Returns:
        The mean of the per-batch losses (sum of batch losses / number of batches).
    """
    model.train()
    running_loss = 0.0

    for src, tgt, _src_lengths, _tgt_lengths in dataloader:
        src, tgt = src.to(device), tgt.to(device)

        optimizer.zero_grad()
        predictions = model(src, tgt, teacher_forcing_ratio=teacher_forcing_ratio)

        # predictions cover target positions 1..T-1, so drop the first target column
        vocab_size = predictions.size(-1)
        batch_loss = criterion(
            predictions.reshape(-1, vocab_size),
            tgt[:, 1:].reshape(-1),
        )

        batch_loss.backward()
        # clip the global gradient norm to 1.0 before the update
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        running_loss += batch_loss.item()

    return running_loss / len(dataloader)

@torch.no_grad()
def evaluate(model, dataloader, criterion):
    """Compute the mean per-batch loss of ``model`` on ``dataloader`` without gradients.

    The model is put in eval mode and run with ``teacher_forcing_ratio=0.0``, i.e.
    it always feeds back its own predictions.

    Args:
        model: seq2seq model.
        dataloader: yields ``(source, target, source_lengths, target_lengths)`` batches.
        criterion: loss taking flattened logits and flattened target ids.

    Returns:
        Sum of batch losses divided by the number of batches.
    """
    model.eval()
    running_loss = 0.0

    for src, tgt, _src_lengths, _tgt_lengths in dataloader:
        src, tgt = src.to(device), tgt.to(device)
        predictions = model(src, tgt, teacher_forcing_ratio=0.0)

        # align logits with target positions 1..T-1
        vocab_size = predictions.size(-1)
        batch_loss = criterion(
            predictions.reshape(-1, vocab_size),
            tgt[:, 1:].reshape(-1),
        )
        running_loss += batch_loss.item()

    return running_loss / len(dataloader)

N_EPOCHS = 50

# NOTE(review): `model`, `optimizer` and `criterion` are not created anywhere
# above this cell, so on a fresh kernel they must be defined first (e.g. via
# build_model_and_optimizer) — confirm the intended cell order.
training_losses = []
validation_losses = []
# Best validation loss seen so far. Tracked separately from `validation_losses`:
# comparing against min(validation_losses) after appending the current value can
# never be true, so the checkpoint was never written.
best_val_loss = float("inf")
for epoch in range(1, N_EPOCHS + 1):
    # teacher forcing decays by 0.03 per epoch and is floored at 0.3
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion, teacher_forcing_ratio=max(0.3, 1 - epoch * 0.03))
    val_loss = evaluate(model, validation_loader, criterion)
    training_losses.append(train_loss)
    validation_losses.append(val_loss)
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "best_model.pt")
    print(f"Epoch {epoch}: train_loss={train_loss:.4f}, val_loss={val_loss:.4f}")




In [None]:
# Bundle the model weights with the hyperparameters and tokenizer locations
# needed to rebuild it, and store the bundle on Drive.
# NOTE(review): "state_dict" holds the weights `model` has when this cell runs
# (the last trained epoch), not the weights of the best validation epoch —
# confirm that this is intended given the variable/file name.
# NOTE(review): the "config" values are hard-coded here; verify they match the
# hyperparameters `model` was actually built with.
best_checkpoint = {
    "state_dict": model.state_dict(),
    "config": {
        "HIDDEN_DIM":        512,
        "ENCODER_EMBED_DIM": 256,
        "DECODER_EMBED_DIM": 256,
        "DROPOUT":           0.2,
        "LR":                1e-3,
    },
    "tokenizers": {
        "sp_fr_model": "/content/drive/MyDrive/sp_fr.model",
        "sp_en_model": "/content/drive/MyDrive/sp_en.model",
    },
}

torch.save(best_checkpoint, "/content/drive/MyDrive/europarl_gru_best.pt")


In [None]:
import torch
import torch.nn.functional as F

@torch.no_grad()
def translate_sentence_greedy(
    model,
    fr_str,
    sp_source,
    sp_target,
    max_len=60,
):
    """Translate one French sentence with greedy (argmax) decoding.

    Args:
        model: seq2seq model exposing ``encoder`` and ``decoder``.
        fr_str: raw French sentence; it is normalized with ``clean_text`` first.
        sp_source: source tokenizer providing ``encode(text, out_type=int)``.
        sp_target: target tokenizer providing ``decode(ids)``.
        max_len: maximum number of source ids kept and maximum number of
            decoding steps.

    Returns:
        The decoded English string. Decoding stops at the first EOS token (which
        is not included) or after ``max_len`` steps. Returns ``""`` when the
        cleaned input encodes to no tokens.
    """
    model.eval()

    fr_str = clean_text(fr_str)

    src_ids = sp_source.encode(fr_str, out_type=int)[:max_len]
    if not src_ids:
        # Nothing to translate (empty / whitespace-only input): a zero-length
        # source sequence cannot be run through the encoder GRU.
        return ""

    # single-sentence batch: (1, src_len)
    src_tensor = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(0)
    encoder_outputs, hidden = model.encoder(src_tensor)
    source_mask = (src_tensor != SOURCE_PAD_IDX).bool()

    # decoding starts from the BOS token
    input_token = torch.tensor([TARGET_BOS_IDX], device=device)
    decoded_ids = []

    for _ in range(max_len):
        logits, hidden = model.decoder(input_token, hidden, encoder_outputs, source_mask)
        next_token = logits.argmax(dim=-1)
        next_id = next_token.item()

        if next_id == TARGET_EOS_IDX:
            break

        decoded_ids.append(next_id)
        # feed the prediction back in as the next input
        input_token = next_token
    return sp_target.decode(decoded_ids)


In [None]:
!pip install sacrebleu
import sacrebleu

@torch.no_grad()
def compute_bleu(
    model,
    test_pairs,
    sp_source,
    sp_target,
    max_len=60,
    num_samples=10000,
    decode_fn=translate_sentence_greedy,
):
    """Translate the first ``num_samples`` pairs and score them with corpus BLEU.

    Args:
        model: seq2seq model passed through to ``decode_fn``.
        test_pairs: sequence of ``(french, english_reference)`` string pairs.
        sp_source: source tokenizer passed to ``decode_fn``.
        sp_target: target tokenizer passed to ``decode_fn``.
        max_len: forwarded to ``decode_fn``.
        num_samples: number of leading pairs to evaluate.
        decode_fn: callable ``(model, fr, sp_source, sp_target, max_len=...) -> str``.

    Returns:
        The sacrebleu corpus BLEU score (also printed with two decimals).
    """
    model.eval()

    subset = test_pairs[:num_samples]

    # hypotheses and references are both stripped of surrounding whitespace
    hypotheses = [
        decode_fn(model, source_text, sp_source, sp_target, max_len=max_len).strip()
        for source_text, _ in subset
    ]
    references = [reference_text.strip() for _, reference_text in subset]

    # sacrebleu expects a list of reference streams, hence the extra list
    result = sacrebleu.corpus_bleu(hypotheses, [references])
    print(f"BLEU score: {result.score:.2f}")
    return result.score


In [None]:
# Load a previously saved model from Drive, move it to `device` and switch it to
# eval mode. `.to()` / `.eval()` are called directly on the loaded object, so the
# file is expected to contain a whole pickled nn.Module rather than a state dict.
# NOTE(review): the checkpoint cell in this notebook writes a dict to
# "europarl_gru_best.pt", which is a different file and format — confirm where
# "best_gru_model.pt" comes from.
# SECURITY: weights_only=False unpickles arbitrary Python objects; only load
# files from a trusted source.
PATH = "/content/drive/MyDrive/best_gru_model.pt"
model = torch.load(PATH, map_location=device, weights_only=False)
model.to(device)
model.eval()

In [None]:
# Sanity check of the loaded model: mean per-batch cross-entropy on the validation
# set, with padded target positions excluded from the loss via ignore_index.
criterion = nn.CrossEntropyLoss(ignore_index=TARGET_PAD_IDX)
val_loss = evaluate(model, validation_loader, criterion)
print("validation loss of loaded model:", val_loss)


In [None]:
from copy import deepcopy
import os
import matplotlib.pyplot as plt
# Create the "plots" output directory (used for the saved loss histories) if it
# does not exist yet. NOTE(review): the loss figures are written under
# "gru_plots/", which this line does not create.
os.makedirs("plots", exist_ok=True)

def build_model_and_optimizer(cfg):
    """Build a fresh model, optimizer and loss for one hyperparameter configuration.

    Args:
        cfg: dict with the keys ``HIDDEN_DIM``, ``ENCODER_EMBED_DIM``,
            ``DECODER_EMBED_DIM``, ``DROPOUT`` and ``LR``. The same hidden size
            and dropout are used for encoder, attention and decoder.

    Returns:
        ``(model, optimizer, criterion)``: the Seq2Seq model on ``device``, an
        Adam optimizer over its parameters, and a cross-entropy loss that
        ignores the target padding id.
    """
    hidden_dim = cfg["HIDDEN_DIM"]
    dropout = cfg["DROPOUT"]

    encoder = Encoder(
        input_dim=source_vocab_size,
        embed_dim=cfg["ENCODER_EMBED_DIM"],
        hidden_dim=hidden_dim,
        dropout=dropout,
    ).to(device)

    attention = BahdanauAttention(
        encoder_hidden_dim=hidden_dim,
        decoder_hidden_dim=hidden_dim,
    ).to(device)

    decoder = Decoder(
        output_dim=target_vocab_size,
        embed_dim=cfg["DECODER_EMBED_DIM"],
        encoder_hidden_dim=hidden_dim,
        decoder_hidden_dim=hidden_dim,
        attention=attention,
        dropout=dropout,
    ).to(device)

    model = Seq2Seq(encoder, decoder, device).to(device)

    return (
        model,
        torch.optim.Adam(model.parameters(), lr=cfg["LR"]),
        nn.CrossEntropyLoss(ignore_index=TARGET_PAD_IDX),
    )


def run_experiment(cfg, num_epochs=15, teacher_forcing_start=1.0):
    """Train one configuration and report its best validation loss and BLEU.

    A fresh model is trained for ``num_epochs`` epochs, the weights of the epoch
    with the lowest validation loss are restored, the loss curves are saved as a
    PNG under ``gru_plots/`` and as a ``.pt`` history under ``plots/``, and BLEU
    is computed with greedy decoding on the first 1000 validation pairs.

    Args:
        cfg: hyperparameter dict (``HIDDEN_DIM``, ``ENCODER_EMBED_DIM``,
            ``DECODER_EMBED_DIM``, ``DROPOUT``, ``LR``).
        num_epochs: number of training epochs.
        teacher_forcing_start: teacher-forcing ratio of the first epoch; it
            decreases by 0.05 per epoch and is floored at 0.3.

    Returns:
        ``(best_val_loss, val_bleu)``.
    """
    print(f"\nrunning config: {cfg}")

    model, optimizer, criterion = build_model_and_optimizer(cfg)
    best_val_loss = float("inf")
    best_state = None
    train_hist = []
    val_hist   = []

    for epoch in range(1, num_epochs + 1):
        tf_ratio = max(0.3, teacher_forcing_start - 0.05 * (epoch - 1))
        train_loss = train_one_epoch(
            model,
            train_loader,
            optimizer,
            criterion,
            teacher_forcing_ratio=tf_ratio,
        )
        val_loss = evaluate(model, validation_loader, criterion)
        train_hist.append(train_loss)
        val_hist.append(val_loss)
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # deepcopy: state_dict() returns references to the live tensors,
            # which later epochs would overwrite
            best_state = deepcopy(model.state_dict())

        print(
            f"Epoch {epoch:02d}: "
            f"train_loss={train_loss:.4f}, val_loss={val_loss:.4f}, TF={tf_ratio:.2f}"
        )

    # evaluate BLEU with the best-validation weights, not the last epoch's
    if best_state is not None:
        model.load_state_dict(best_state)

    # The tag must contain every searched hyperparameter. Without the decoder
    # embedding size, configs differing only in DECODER_EMBED_DIM got the same
    # tag and overwrote each other's plot and history files.
    cfg_tag = (
        f"hd{cfg['HIDDEN_DIM']}_"
        f"emb{cfg['ENCODER_EMBED_DIM']}_"
        f"demb{cfg['DECODER_EMBED_DIM']}_"
        f"lr{str(cfg['LR']).replace('.', 'p')}_"
        f"do{str(cfg['DROPOUT']).replace('.', 'p')}"
    )

    # Make sure both output directories exist: savefig / torch.save do not create
    # missing parent directories, and "gru_plots" was never created before.
    os.makedirs("gru_plots", exist_ok=True)
    os.makedirs("plots", exist_ok=True)

    plt.figure()
    plt.plot(train_hist, label="train")
    plt.plot(val_hist,   label="val")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title(f"GRU Loss curves ({cfg_tag})")
    plt.legend()
    plt.tight_layout()
    plt.savefig(f"gru_plots/gru_loss_{cfg_tag}.png", dpi=150)
    plt.close()

    torch.save(
        {
            "config": cfg,
            "train_loss": train_hist,
            "val_loss": val_hist,
            "best_val_loss": best_val_loss,
        },
        f"plots/gru_loss_{cfg_tag}.pt",
    )
    val_bleu = compute_bleu(
        model,
        validation_pairs,
        sp_fr,
        sp_en,
        max_len=MAX_LEN,
        num_samples=1000,
        decode_fn=translate_sentence_greedy,
    )

    print(
        f"config {cfg} -> best_val_loss={best_val_loss:.4f}, "
        f"val_bleu={val_bleu:.2f}"
    )

    return best_val_loss, val_bleu


In [None]:
# Mounts Google Drive at /content/drive again; this repeats the mount performed
# in the first cell of the notebook.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from itertools import product

# Candidate values per hyperparameter; the grid is their full Cartesian product.
search_space = {
    "HIDDEN_DIM": [256, 512],
    "ENCODER_EMBED_DIM": [256, 512],
    "DECODER_EMBED_DIM": [256, 512],
    "DROPOUT": [0.1, 0.3],
    "LR": [1e-3, 5e-4],
}

# one config dict per combination, keys in search_space order
keys = list(search_space.keys())
configs = [dict(zip(keys, combo)) for combo in product(*search_space.values())]

# train/evaluate every configuration and keep its metrics next to its settings
results = []
for cfg in configs:
    val_loss, val_bleu = run_experiment(cfg, num_epochs=20)
    results.append(dict(cfg, val_loss=val_loss, val_bleu=val_bleu))

# best BLEU first
results_sorted = sorted(results, key=lambda row: row["val_bleu"], reverse=True)
print("\ngrid search results")
for row in results_sorted:
    print(row)
