In [1]:
# ================================
# 1. Mount Drive & basic setup
# ================================
from google.colab import drive
drive.mount("/content/drive")

from pathlib import Path
import torch
import torch.nn as nn
import sentencepiece as spm

# Google Drive folders that hold the tokenizer data and the saved checkpoints.
DATA_DIR = Path("/content/drive/MyDrive/DL Final Project/data/europarl")
CKPT_DIR = Path("/content/drive/MyDrive/DL Final Project/checkpoints")

# Artifacts loaded by the later cells.
SPM_PATH = DATA_DIR.joinpath("spm_bpe.model")                 # SentencePiece BPE model
FT_CKPT = CKPT_DIR.joinpath("seq2seq_finetuned_final.pth")    # fine-tuned checkpoint you saved

# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

print("Using device:", device)
print("SPM model:", SPM_PATH)
print("Checkpoint:", FT_CKPT)

Mounted at /content/drive
Using device: cuda
SPM model: /content/drive/MyDrive/DL Final Project/data/europarl/spm_bpe.model
Checkpoint: /content/drive/MyDrive/DL Final Project/checkpoints/seq2seq_finetuned_final.pth


In [2]:
# ================================
# 2. Load SentencePiece tokenizer
# ================================
# Load the trained SentencePiece model from Drive.
sp = spm.SentencePieceProcessor()
sp.Load(str(SPM_PATH))

# Special-token ids as reported by the loaded tokenizer; the model classes and
# the translation helper below read these module-level names.
PAD = sp.pad_id()
BOS = sp.bos_id()
EOS = sp.eos_id()
UNK = sp.unk_id()

# Number of pieces in the tokenizer; used as the model's vocabulary size.
VOCAB = sp.get_piece_size()

print("Vocab size:", VOCAB)
print("PAD/BOS/EOS/UNK:", PAD, BOS, EOS, UNK)

Vocab size: 8000
PAD/BOS/EOS/UNK: 0 1 2 3


In [3]:
# ================================
# 3. Model definitions (same as training)
# ================================
class Enc(nn.Module):
    """Two-layer bidirectional LSTM encoder over embedded source tokens.

    Args:
        V: vocabulary size (number of embedding rows).
        E: embedding dimension.
        H: hidden size of each LSTM direction.

    The attribute names (``emb``, ``rnn``) are part of the checkpoint's
    state-dict keys and must not be renamed.
    """

    def __init__(self, V, E=256, H=512):
        super().__init__()
        self.emb = nn.Embedding(V, E, padding_idx=PAD)
        self.rnn = nn.LSTM(E, H, num_layers=2, batch_first=True, bidirectional=True)
        self.H = H

    def forward(self, x, lens):
        """Encode a padded batch of token ids.

        Args:
            x: token ids, ``[B, Ts]``.
            lens: per-row sequence lengths (assumed to be the unpadded length
                of each row -- confirm against the data pipeline). May live on
                any device; it is moved to the CPU for packing.

        Returns:
            ``(out, (h, c))`` where ``out`` is ``[B, Ts, 2H]`` and ``h`` / ``c``
            are ``[num_layers, B, 2H]`` (forward and backward states of each
            layer concatenated along the feature axis).
        """
        # x: [B, Ts]
        e = self.emb(x)
        pack = nn.utils.rnn.pack_padded_sequence(
            e, lens.cpu(), batch_first=True, enforce_sorted=False
        )
        out, (h, c) = self.rnn(pack)
        # Pad back to the full input width. Without total_length the output is
        # only as long as the longest sequence in the batch, which no longer
        # lines up with a padding mask computed from `x` when the batch was
        # padded wider than its longest sentence.
        out, _ = nn.utils.rnn.pad_packed_sequence(
            out, batch_first=True, total_length=x.size(1)
        )
        # concat bidirectional states: even rows are the forward direction of
        # each layer, odd rows the backward direction
        h = torch.cat([h[0::2], h[1::2]], dim=2)
        c = torch.cat([c[0::2], c[1::2]], dim=2)
        return out, (h, c)

class Luong(nn.Module):
    """Multiplicative attention: the query is passed through a bias-free linear
    map and scored against every key by dot product.

    The attribute name ``W`` is part of the checkpoint's state-dict keys.
    """

    def __init__(self, H):
        super().__init__()
        self.W = nn.Linear(H, H, bias=False)

    def forward(self, q, K, V, mask):
        """Attend over ``K``/``V`` with query ``q``.

        Args:
            q: query, ``[B, H]``.
            K: keys, ``[B, Ts, H]``.
            V: values, ``[B, Ts, H]``.
            mask: boolean ``[B, Ts]``; True marks positions to ignore.

        Returns:
            ``(ctx, attn)`` with ``ctx`` of shape ``[B, H]`` and the attention
            weights ``attn`` of shape ``[B, Ts]``.
        """
        projected = self.W(q)[:, None, :]                  # [B,1,H]
        keys_t = K.transpose(1, 2)                         # [B,H,Ts]
        score = torch.bmm(projected, keys_t)[:, 0, :]      # [B,Ts]
        # Push masked positions to a very negative score so softmax gives them ~0.
        score = score.masked_fill(mask, -1e9)
        weights = torch.softmax(score, dim=-1)             # [B,Ts]
        ctx = torch.bmm(weights[:, None, :], V)[:, 0, :]   # [B,H]
        return ctx, weights

class Dec(nn.Module):
    """One-step LSTM decoder with attention over the encoder outputs.

    Args:
        V: vocabulary size (embedding rows and output logits).
        E: embedding dimension.
        H: decoder hidden size; must equal the encoder output feature size.

    The attribute names (``emb``, ``attn``, ``rnn``, ``fc``) are part of the
    checkpoint's state-dict keys and must not be renamed.
    """

    def __init__(self, V, E=256, H=1024):  # H doubled for bidi encoder
        super().__init__()
        hidden_size = H
        self.emb = nn.Embedding(V, E, padding_idx=PAD)
        self.attn = Luong(hidden_size)
        # The LSTM consumes the token embedding concatenated with the context vector.
        self.rnn = nn.LSTM(E + hidden_size, hidden_size, num_layers=2, batch_first=True)
        # The output layer sees the LSTM output concatenated with the same context.
        self.fc = nn.Linear(2 * hidden_size, V)
        self.H = hidden_size

    def forward(self, y_prev, hidden, enc_out, src_mask):
        """Advance the decoder by one token.

        Args:
            y_prev: previously emitted token ids, ``[B, 1]``.
            hidden: ``(h, c)`` LSTM state from the previous step (or the encoder).
            enc_out: encoder outputs used as attention keys and values.
            src_mask: boolean mask, True at source positions to ignore.

        Returns:
            ``(logits, hidden)`` with ``logits`` of shape ``[B, V]`` and the
            updated LSTM state.
        """
        prev_emb = self.emb(y_prev)                 # [B,1,E]
        top_h = hidden[0][-1]                       # [B,H] (top-layer hidden)
        context, _ = self.attn(top_h, enc_out, enc_out, src_mask)
        step_in = torch.cat((prev_emb, context[:, None, :]), dim=-1)
        step_out, hidden = self.rnn(step_in, hidden)    # step_out: [B,1,H]
        features = torch.cat((step_out[:, 0, :], context), dim=-1)
        return self.fc(features), hidden            # logits: [B,V]

class Seq2Seq(nn.Module):
    """Encoder-decoder translation model: bidirectional LSTM encoder plus an
    attention decoder whose hidden size is twice the encoder's per-direction size.

    Args:
        V: shared vocabulary size.
        E: embedding dimension.
        H: encoder hidden size per direction (the decoder uses ``2 * H``).

    The attribute names (``enc``, ``dec``) are part of the checkpoint's
    state-dict keys and must not be renamed.
    """

    def __init__(self, V, E=256, H=512):
        super().__init__()
        self.enc = Enc(V, E, H)
        self.dec = Dec(V, E, H * 2)

    def mask(self, x):
        """Return a boolean ``[B, Ts]`` mask that is True at PAD positions of ``x``."""
        # x: [B,Ts]
        return (x == PAD)

    def forward(self, src, src_lens, tgt, teacher=0.5):
        """Training-time forward pass with per-step teacher forcing.

        Args:
            src: source token ids, ``[B, Ts]``.
            src_lens: source lengths passed to the encoder.
            tgt: target token ids, ``[B, T]``; column 0 is fed as the first input.
            teacher: probability, drawn once per time step for the whole batch,
                of feeding the gold token instead of the model's own argmax.

        Returns:
            Logits of shape ``[B, T - 1, V]`` (one step per target position after
            the first).
        """
        # Only needed for training; not used in UI notebook
        T = tgt.size(1)
        enc_out, hid = self.enc(src, src_lens)
        m = self.mask(src)
        y = tgt[:, 0].unsqueeze(1)     # BOS
        outs = []
        for t in range(1, T):
            logits, hid = self.dec(y, hid, enc_out, m)
            outs.append(logits.unsqueeze(1))
            use_teacher = torch.rand(1).item() < teacher
            next_tok = tgt[:, t] if use_teacher else logits.argmax(-1)
            y = next_tok.unsqueeze(1)
        return torch.cat(outs, 1)

    @torch.no_grad()  # inference only: never build an autograd graph here
    def decode(self, src, src_lens, max_len=120):
        """Greedy decoding.

        Switches the module to eval mode (it is left in eval mode afterwards).

        Args:
            src: source token ids, ``[B, Ts]``.
            src_lens: source lengths passed to the encoder.
            max_len: maximum number of tokens generated after the leading BOS.

        Returns:
            Long tensor ``[B, L]`` with ``L <= max_len + 1``. Column 0 is BOS.
            Once a row has produced EOS, its remaining positions are PAD.
        """
        self.eval()
        enc_out, hid = self.enc(src, src_lens)
        m = self.mask(src)
        batch = src.size(0)
        y = torch.full((batch, 1), BOS, dtype=torch.long, device=src.device)
        outs = [y]
        # Per-row completion flags. Checking only the current step's tokens would
        # stop early only if every row emitted EOS on the very same step.
        finished = torch.zeros(batch, dtype=torch.bool, device=src.device)
        for _ in range(max_len):
            logits, hid = self.dec(y, hid, enc_out, m)
            y = logits.argmax(-1, keepdim=True)
            # Rows that already ended emit PAD instead of post-EOS tokens.
            y = y.masked_fill(finished.unsqueeze(1), PAD)
            outs.append(y)
            finished = finished | (y.squeeze(1) == EOS)
            if finished.all():
                break
        return torch.cat(outs, 1)


In [4]:
# ================================
# 4. Instantiate & load checkpoint
# ================================
# Build the network with the tokenizer's vocabulary size and place it on the
# selected device.
model = Seq2Seq(VOCAB)
model = model.to(device)

# Restore the fine-tuned weights; map_location remaps the stored tensors onto
# the current device.
# NOTE(review): torch.load unpickles the file, so only load checkpoints you
# trust (consider weights_only=True if the installed torch version supports it).
state = torch.load(FT_CKPT, map_location=device)
model.load_state_dict(state)

# Inference only from here on.
model.eval()
print("Fine-tuned model loaded.")


Fine-tuned model loaded.


In [5]:
# ================================
# 5. Translation helper
# ================================
import re

# Length budget used by translate_de_to_en: the encoded source (including the
# BOS/EOS wrapper) is truncated to this many tokens, and greedy decoding runs
# for at most this many steps.
MAX_LEN = 120  # decode length (training used 120 inside SPMDataset)

def clean_text(s: str) -> str:
    """Normalize raw input text: trim the ends, lowercase, and collapse every
    run of whitespace into a single space."""
    lowered = s.strip().lower()
    return re.sub(r"\s+", " ", lowered)

def strip_ids(ids):
    """Return the content tokens of a decoded id sequence.

    PAD and BOS ids are dropped wherever they occur; the first EOS ends the
    sequence and everything from it onward is discarded. Each element is
    converted with ``int`` before it is compared.
    """
    skipped = (PAD, BOS)
    kept = []
    for tok in map(int, ids):
        if tok in skipped:
            continue
        if tok == EOS:
            break
        kept.append(tok)
    return kept

def translate_de_to_en(de_text: str) -> str:
    """Translate one German sentence to English with the loaded model.

    Args:
        de_text: raw German text. ``None``, empty, or whitespace-only input
            yields an empty string without touching the model (a UI textbox
            or API call can deliver ``None``).

    Returns:
        The detokenized English hypothesis produced by greedy decoding.
    """
    # Guard None as well as blank strings so the UI callback never crashes on
    # an empty submission.
    if de_text is None or not de_text.strip():
        return ""
    de_text = clean_text(de_text)

    # Encode German sentence: BOS + pieces + EOS, truncated so the whole source
    # is at most MAX_LEN tokens.
    src_ids = [BOS] + sp.encode(de_text, out_type=int)[:MAX_LEN - 2] + [EOS]
    src_tensor = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(0)  # [1, Ts]
    src_len = torch.tensor([len(src_ids)], dtype=torch.long, device=device)

    with torch.no_grad():
        out = model.decode(src_tensor, src_len, max_len=MAX_LEN)  # [1, T]
    # Drop BOS/PAD and everything from the first EOS before detokenizing.
    hyp_ids = strip_ids(out[0].tolist())
    en_text = sp.decode_ids(hyp_ids)
    return en_text

# Quick sanity check (optional): run one fixed German sentence through the
# whole pipeline (clean -> tokenize -> decode -> detokenize) and print the result.
print("Test:", translate_de_to_en("Das ist ein wichtiger Punkt."))


Test: that is an important point .


In [6]:
# ================================
# 6. Gradio web interface
# ================================
import gradio as gr

def translate_interface(text):
    """Gradio callback: pass the textbox contents to the German-to-English
    translator and return its output for display."""
    english = translate_de_to_en(text)
    return english

# User-facing page header. The flag emojis, arrow and en dashes are written as
# real Unicode characters (they were previously stored as mis-decoded UTF-8 and
# rendered as garbage in the UI).
title = "🇩🇪 ➜ 🇬🇧 GERMAN TO ENGLISH - Language Translation"
description = (
    "Bi-LSTM Encoder–Decoder with Luong Attention for German–English Translation "
)

# Single text-in / text-out interface wired to the translation callback.
demo = gr.Interface(
    fn=translate_interface,
    inputs=gr.Textbox(lines=4, label="German Input"),
    outputs=gr.Textbox(lines=4, label="English Translation"),
    title=title,
    description=description,
)

# Set share=True to get a public URL that looks like a standalone website.
# Anyone who has the link can reach the app while it is running.
demo.launch(share=True)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://ecb6494374330c5b71.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


