In [1]:
# If sacrebleu is not installed in your environment, uncomment:
# !pip -q install sacrebleu tokenizers scikit-learn pandas

In [2]:
import math
import os
import time
import re

import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace

import sacrebleu

In [3]:
CSV_PATH = "../Cleaned_data/cleaned_hieroglyphs_data.csv"
BATCH_SIZE = 16
EPOCHS = 5
LR = 1e-4

D_MODEL = 256
NHEAD = 8
NUM_LAYERS = 3
DROPOUT = 0.1
MAX_POSITIONS = 4096

VOCAB_SIZE_TARGET = 16000  # good default for English

CHECKPOINT_DIR = "../best_weights/gardiner_to_english_transformer_checkpoints"
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
torch.manual_seed(42)

print("Device:", DEVICE)
print("Checkpoint dir:", CHECKPOINT_DIR)

Device: cuda
Checkpoint dir: ../best_weights/gardiner_to_english_transformer_checkpoints


In [4]:
df = pd.read_csv(CSV_PATH)

df = df.dropna(subset=["gardiner_sequence", "english_translation"])
df["gardiner_sequence"] = df["gardiner_sequence"].astype(str)
df["english_translation"] = df["english_translation"].astype(str)

train_df, val_df = train_test_split(
    df,
    test_size=0.10,
    random_state=42,
    shuffle=True
)

print("Train size:", len(train_df))
print("Val size:", len(val_df))
train_df.head()

Train size: 27654
Val size: 3073


Unnamed: 0,gardiner_sequence,english_translation
16381,S19 O1 D21 T22 N35,the siegler peri-sen
26380,A26 S125 G17 D54 G7 W11 G1 V28 Z7 A7A G7 G17 D...,"oh 'image of the exhausted' , come to your sch..."
27041,V28 D36 D36 A28 L1 D21 Y1 G17 R15 D58 D46 N25 ...,celebrations reign throughout abydos !
1035,P6 D36 N35 N35 G81 N35 R8 Q3 N35 M4 X1 3 N11 N...,there this god spent three years and nine mont...
16319,F35 D21 X1 N35 X1 G7 X1 X1 Aa1 Q3 Q3 X1 D54 M1...,an offering that the king gives: may he walk p...


In [5]:
tokenizer = Tokenizer(BPE(unk_token="<unk>"))
tokenizer.pre_tokenizer = Whitespace()

trainer = BpeTrainer(
    vocab_size=VOCAB_SIZE_TARGET,
    special_tokens=["<pad>", "<sos>", "<eos>", "<unk>"]
)

tokenizer.train_from_iterator(
    train_df["gardiner_sequence"].tolist() + train_df["english_translation"].tolist(),
    trainer
)

PAD = tokenizer.token_to_id("<pad>")
SOS = tokenizer.token_to_id("<sos>")
EOS = tokenizer.token_to_id("<eos>")
UNK = tokenizer.token_to_id("<unk>")

VOCAB_SIZE = tokenizer.get_vocab_size()

print("Vocab size:", VOCAB_SIZE)
print("Special IDs:", {"PAD": PAD, "SOS": SOS, "EOS": EOS, "UNK": UNK})

tokenizer_path = os.path.join(CHECKPOINT_DIR, "tokenizer.json")
tokenizer.save(tokenizer_path)
print("Saved tokenizer to:", tokenizer_path)

Vocab size: 16000
Special IDs: {'PAD': 0, 'SOS': 1, 'EOS': 2, 'UNK': 3}
Saved tokenizer to: ../best_weights/gardiner_to_english_transformer_checkpoints\tokenizer.json


In [6]:
class Seq2SeqDataset(Dataset):
    def __init__(self, df, tokenizer, sos_id, eos_id):
        self.src = df["gardiner_sequence"].tolist()
        self.tgt = df["english_translation"].tolist()
        self.tokenizer = tokenizer
        self.sos = sos_id
        self.eos = eos_id

    def __len__(self):
        return len(self.src)

    def __getitem__(self, idx):
        src_ids = [self.sos] + self.tokenizer.encode(self.src[idx]).ids + [self.eos]
        tgt_ids = [self.sos] + self.tokenizer.encode(self.tgt[idx]).ids + [self.eos]
        return torch.tensor(src_ids, dtype=torch.long), torch.tensor(tgt_ids, dtype=torch.long)


def collate_fn(batch, pad_id):
    src, tgt = zip(*batch)
    src = nn.utils.rnn.pad_sequence(src, batch_first=True, padding_value=pad_id)
    tgt = nn.utils.rnn.pad_sequence(tgt, batch_first=True, padding_value=pad_id)
    return src, tgt

In [7]:
train_ds = Seq2SeqDataset(train_df, tokenizer, SOS, EOS)
val_ds   = Seq2SeqDataset(val_df, tokenizer, SOS, EOS)

train_loader = DataLoader(
    train_ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=lambda b: collate_fn(b, PAD)
)

val_loader = DataLoader(
    val_ds,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=lambda b: collate_fn(b, PAD)
)

In [8]:
def simple_detok(s: str) -> str:
    """
    Fix common tokenization artifacts produced by BPE/whitespace decoding
    so BLEU is computed on normal-looking English text.
    """
    s = s.strip()
    s = re.sub(r"\s+([.,!?;:])", r"\1", s)  # "word ." -> "word."
    s = s.replace(" n't", "n't").replace(" '", "'")  # contractions
    s = re.sub(r"\s+", " ", s)  # collapse multiple spaces
    return s

In [9]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len).unsqueeze(1)
        div = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(pos * div)
        pe[:, 1::2] = torch.cos(pos * div)

        self.register_buffer("pe", pe.unsqueeze(0))

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

In [None]:
class TransformerSeq2Seq(nn.Module):
    def __init__(self, vocab_size, pad_id):
        super().__init__()
        self.pad_id = pad_id

        self.src_emb = nn.Embedding(vocab_size, D_MODEL, padding_idx=pad_id)
        self.tgt_emb = nn.Embedding(vocab_size, D_MODEL, padding_idx=pad_id)

        self.pos_enc = PositionalEncoding(D_MODEL, DROPOUT, MAX_POSITIONS)

        self.transformer = nn.Transformer(
            d_model=D_MODEL,
            nhead=NHEAD,
            num_encoder_layers=NUM_LAYERS,
            num_decoder_layers=NUM_LAYERS,
            dropout=DROPOUT,
            batch_first=True
        )

        self.fc_out = nn.Linear(D_MODEL, vocab_size)

    def forward(self, src, tgt):
        src_pad_mask = (src == self.pad_id)
        tgt_pad_mask = (tgt == self.pad_id)

        src = self.pos_enc(self.src_emb(src) * math.sqrt(D_MODEL))
        tgt = self.pos_enc(self.tgt_emb(tgt) * math.sqrt(D_MODEL))

        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size(1)).to(src.device)

        out = self.transformer(
            src, tgt,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_pad_mask,
            tgt_key_padding_mask=tgt_pad_mask,
            memory_key_padding_mask=src_pad_mask
        )

        return self.fc_out(out)

In [11]:
def train_epoch(model, loader, optimizer, criterion):
    model.train()
    total_loss = 0.0

    for src, tgt in loader:
        src, tgt = src.to(DEVICE), tgt.to(DEVICE)

        # teacher forcing shift
        tgt_in = tgt[:, :-1]
        tgt_y  = tgt[:, 1:]

        optimizer.zero_grad()
        logits = model(src, tgt_in)

        loss = criterion(
            logits.reshape(-1, VOCAB_SIZE),
            tgt_y.reshape(-1)
        )

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        total_loss += loss.item()

    return total_loss / max(1, len(loader))


@torch.no_grad()
def eval_loss_acc_and_bleu(model, loader, criterion, pad_id):
    """
    Evaluation is on the loader you pass (we pass val_loader each epoch).
    Returns:
      - val_loss
      - avg_token_acc (micro over all non-pad tokens)
      - BLEU (SacreBLEU) on detokenized decoded text
    """
    model.eval()

    total_loss = 0.0
    total_correct = 0
    total_tokens = 0

    all_pred_texts = []
    all_gold_texts = []

    for src, tgt in loader:
        src, tgt = src.to(DEVICE), tgt.to(DEVICE)

        tgt_in = tgt[:, :-1]
        tgt_y  = tgt[:, 1:]

        logits = model(src, tgt_in)

        loss = criterion(
            logits.reshape(-1, VOCAB_SIZE),
            tgt_y.reshape(-1)
        )
        total_loss += loss.item()

        preds = logits.argmax(dim=-1)  # (B, T-1)

        # avg token accuracy (micro)
        mask = (tgt_y != pad_id)
        total_correct += (preds[mask] == tgt_y[mask]).sum().item()
        total_tokens += mask.sum().item()

        # decode for BLEU
        preds_np = preds.detach().cpu().numpy()
        labels_np = tgt_y.detach().cpu().numpy()

        pred_texts = [
            simple_detok(tokenizer.decode(list(map(int, ids)), skip_special_tokens=True))
            for ids in preds_np
        ]
        gold_texts = [
            simple_detok(tokenizer.decode(list(map(int, ids)), skip_special_tokens=True))
            for ids in labels_np
        ]

        all_pred_texts.extend(pred_texts)
        all_gold_texts.extend(gold_texts)

    val_loss = total_loss / max(1, len(loader))
    avg_token_acc = (total_correct / total_tokens) if total_tokens > 0 else 0.0

    bleu = sacrebleu.corpus_bleu(
        all_pred_texts,
        [all_gold_texts],
        force=True
    ).score

    return val_loss, avg_token_acc, bleu

In [12]:
model = TransformerSeq2Seq(VOCAB_SIZE, PAD).to(DEVICE)

criterion = nn.CrossEntropyLoss(ignore_index=PAD)
optimizer = optim.Adam(model.parameters(), lr=LR)

print("Model initialized.")

Model initialized.


In [13]:
def save_checkpoint(path, model, optimizer, epoch, train_loss, val_loss, avg_token_acc, bleu):
    ckpt = {
        "epoch": epoch,
        "model_state": model.state_dict(),
        "optimizer_state": optimizer.state_dict(),
        "train_loss": train_loss,
        "val_loss": val_loss,
        "avg_token_acc": avg_token_acc,
        "bleu": bleu,
        "special_ids": {"PAD": PAD, "SOS": SOS, "EOS": EOS, "UNK": UNK},
        "config": {
            "D_MODEL": D_MODEL,
            "NHEAD": NHEAD,
            "NUM_LAYERS": NUM_LAYERS,
            "DROPOUT": DROPOUT,
            "VOCAB_SIZE": VOCAB_SIZE,
        }
    }
    torch.save(ckpt, path)


def load_checkpoint(path, model, optimizer=None, map_location=DEVICE):
    ckpt = torch.load(path, map_location=map_location)
    model.load_state_dict(ckpt["model_state"])
    if optimizer is not None and "optimizer_state" in ckpt:
        optimizer.load_state_dict(ckpt["optimizer_state"])
    return ckpt

In [14]:
def fmt(sec):
    sec = int(sec)
    h = sec // 3600
    m = (sec % 3600) // 60
    s = sec % 60
    if h > 0:
        return f"{h}h {m}m {s}s"
    if m > 0:
        return f"{m}m {s}s"
    return f"{s}s"


# ---- Lists to store losses and metrics per epoch ----
train_losses = []
val_losses = []
val_token_accs = []
val_bleus = []

start_all = time.time()
epoch_times = []

best_val_loss = float("inf")
best_path = os.path.join(CHECKPOINT_DIR, "best.pt")

for epoch in range(1, EPOCHS + 1):
    start_epoch = time.time()

    # ---- TRAIN ----
    train_loss = train_epoch(model, train_loader, optimizer, criterion)
    train_losses.append(train_loss)

    # ---- VALIDATION / TEST ----
    val_loss, avg_token_acc, bleu = eval_loss_acc_and_bleu(
        model, val_loader, criterion, PAD
    )

    val_losses.append(val_loss)
    val_token_accs.append(avg_token_acc)
    val_bleus.append(bleu)

    # ---- Timing ----
    epoch_sec = time.time() - start_epoch
    epoch_times.append(epoch_sec)

    elapsed_sec = time.time() - start_all
    avg_epoch_sec = sum(epoch_times) / len(epoch_times)
    eta_sec = (EPOCHS - epoch) * avg_epoch_sec

    # ---- Save checkpoint ----
    ckpt_path = os.path.join(CHECKPOINT_DIR, f"epoch_{epoch:03d}.pt")
    save_checkpoint(
        ckpt_path,
        model,
        optimizer,
        epoch,
        train_loss,
        val_loss,
        avg_token_acc,
        bleu
    )

    # ---- Save best model (by validation loss) ----
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        save_checkpoint(
            best_path,
            model,
            optimizer,
            epoch,
            train_loss,
            val_loss,
            avg_token_acc,
            bleu
        )

    # ---- PRINT (train + test/val loss explicitly) ----
    print(
        f"Epoch {epoch}/{EPOCHS} | "
        f"train_loss={train_loss:.4f} | "
        f"test_loss={val_loss:.4f} | "
        f"avg_token_acc={avg_token_acc:.4f} | "
        f"BLEU={bleu:.2f} | "
        f"epoch_time={fmt(epoch_sec)} | "
        f"elapsed={fmt(elapsed_sec)} | "
        f"ETA={fmt(eta_sec)} | "
        f"saved={ckpt_path}"
    )

print("\nTraining finished.")
print("Best checkpoint:", best_path)
print("Best validation (test) loss:", best_val_loss)

  output = torch._nested_tensor_from_mask(


Epoch 1/5 | train_loss=5.1446 | test_loss=4.3733 | avg_token_acc=0.2990 | BLEU=4.40 | epoch_time=2m 2s | elapsed=2m 2s | ETA=8m 10s | saved=../best_weights/gardiner_to_english_transformer_checkpoints\epoch_001.pt
Epoch 2/5 | train_loss=4.1694 | test_loss=3.9491 | avg_token_acc=0.3380 | BLEU=6.87 | epoch_time=2m 5s | elapsed=4m 8s | ETA=6m 11s | saved=../best_weights/gardiner_to_english_transformer_checkpoints\epoch_002.pt
Epoch 3/5 | train_loss=3.7836 | test_loss=3.6826 | avg_token_acc=0.3679 | BLEU=5.40 | epoch_time=2m 59s | elapsed=7m 8s | ETA=4m 44s | saved=../best_weights/gardiner_to_english_transformer_checkpoints\epoch_003.pt
Epoch 4/5 | train_loss=3.5182 | test_loss=3.5107 | avg_token_acc=0.3894 | BLEU=7.23 | epoch_time=2m 58s | elapsed=10m 8s | ETA=2m 31s | saved=../best_weights/gardiner_to_english_transformer_checkpoints\epoch_004.pt
Epoch 5/5 | train_loss=3.3101 | test_loss=3.3788 | avg_token_acc=0.4034 | BLEU=8.78 | epoch_time=2m 25s | elapsed=12m 34s | ETA=0s | saved=../bes

In [None]:
# Example usage:
# ckpt = load_checkpoint(os.path.join(CHECKPOINT_DIR, "best.pt"), model, optimizer)
# print("Resumed from epoch:", ckpt["epoch"], "val_loss:", ckpt["val_loss"], "BLEU:", ckpt["bleu"])