# Aksharantar — Fully Documented Baseline Notebook

Character-level seq2seq transliteration (Roman → Devanagari).

This notebook is organized for reviewers: clear explanations, runnable code cells, and baseline-only implementation (1-layer LSTM encoder + 1-layer LSTM decoder).


## Problem statement & Assumptions

**Task:** Map romanized strings (Latin characters) to Devanagari characters using a character-level seq2seq model.

**Baseline constraints (must satisfy):**
- 1-layer encoder + 1-layer decoder
- LSTM cell (one of the allowed cell types: RNN, GRU, LSTM)
- Teacher forcing during training
- Save best baseline checkpoint at `results/checkpoints/best_baseline.pt`

**Assumptions for reproducibility:**
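- `data/ready/hin_data.npz` exists and contains the integer-encoded, padded numpy arrays `X_train`, `Y_train`, `X_val`, `Y_val`, `X_test`, `Y_test`, each of shape `(N, T)`; every target row is expected to start with `<sos>`.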
- `data/processed/hin_clean.csv` exists for building vocabularies.


In [None]:

# ================= CONFIG (baseline default) =================
import os
# Create the checkpoint directory up front so saving the best model cannot fail on a missing path.
os.makedirs("results/checkpoints", exist_ok=True)

# Baseline (required)
BASELINE = True        # set False only when running extension (do NOT do this for baseline submission)
RNN_CELL = "LSTM"      # allowed: "RNN","GRU","LSTM"
NUM_LAYERS = 1         # baseline = 1 (encoder+decoder)
# Embedding and hidden sizes are passed to both the encoder and the decoder.
EMB_DIM = 256
HID_DIM = 512

# Training
BATCH_SIZE = 128
# Upper bound on epochs; the training loop can stop earlier via early stopping.
EPOCHS = 20
# Initial learning rate handed to Adam.
LR = 1e-3
# Teacher-forcing ratio is interpolated linearly from START (first epoch) to END (last scheduled epoch).
TEACHER_FORCING_START = 0.5
TEACHER_FORCING_END = 0.1
# Maximum gradient norm used for gradient clipping.
CLIP = 1.0

# Path where the best (lowest validation loss) baseline checkpoint is written.
CHECKPOINT_BASELINE = "results/checkpoints/best_baseline.pt"

# Device
import torch
# Use the GPU when one is visible to PyTorch, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# Load prepared data and vocabs if not in memory
import numpy as np
import pandas as pd
if 'X_train' not in globals():
    # np.load on an .npz archive returns an NpzFile that keeps the file open;
    # the context manager closes it once the arrays have been read into memory.
    # NOTE(review): allow_pickle=True permits unpickling arbitrary objects from
    # the archive. It is kept so existing data files still load, but it should
    # only be used with trusted, locally produced files.
    with np.load("data/ready/hin_data.npz", allow_pickle=True) as npz:
        X_train, Y_train = npz["X_train"], npz["Y_train"]
        X_val, Y_val = npz["X_val"], npz["Y_val"]
        X_test, Y_test = npz["X_test"], npz["Y_test"]

# Build the character vocabularies from the cleaned CSV unless an earlier cell
# already left them in the notebook namespace.
if 'roman2idx' not in globals():
    df = pd.read_csv("data/processed/hin_clean.csv", names=["roman","devanagari"])

    # Every distinct character seen in each column, in sorted order.
    roman_chars = sorted(set("".join(df["roman"].tolist())))
    dev_chars = sorted(set("".join(df["devanagari"].tolist())))

    # Special tokens come first, so <pad>=0, <sos>=1, <eos>=2 in both vocabularies.
    special_tokens = ["<pad>","<sos>","<eos>"]
    roman_vocab = special_tokens + roman_chars
    dev_vocab = special_tokens + dev_chars

    # Forward (char -> index) and reverse (index -> char) lookup tables.
    roman2idx = {}
    for position, symbol in enumerate(roman_vocab):
        roman2idx[symbol] = position
    idx2roman = {position: symbol for symbol, position in roman2idx.items()}

    dev2idx = {}
    for position, symbol in enumerate(dev_vocab):
        dev2idx[symbol] = position
    idx2dev = {position: symbol for symbol, position in dev2idx.items()}

# Vocabulary sizes drive the embedding tables and the output projection.
INPUT_DIM = len(roman2idx)
OUTPUT_DIM = len(dev2idx)
print("INPUT_DIM:", INPUT_DIM, "OUTPUT_DIM:", OUTPUT_DIM)


In [None]:

from torch.utils.data import Dataset, DataLoader
import torch

class CharDataset(Dataset):
    """Pair up integer-encoded source and target sequences for a DataLoader.

    X and Y are expected to be numpy arrays with shapes (N, T); both are
    converted once, at construction time, to ``torch.long`` tensors.
    Indexing returns the ``(source, target)`` pair for one example.
    """

    def __init__(self, X, Y):
        # Convert both arrays the same way so source and target share a dtype.
        self.X, self.Y = (torch.tensor(arr, dtype=torch.long) for arr in (X, Y))

    def __len__(self):
        # Number of examples is the size of the leading dimension of X.
        return self.X.size(0)

    def __getitem__(self, idx):
        source = self.X[idx]
        target = self.Y[idx]
        return source, target

# Wrap each split in a CharDataset and batch it; only the training split is shuffled.
train_loader = DataLoader(CharDataset(X_train, Y_train), batch_size=BATCH_SIZE, shuffle=True)
# Validation and test loaders keep DataLoader's default (unshuffled) order.
val_loader = DataLoader(CharDataset(X_val, Y_val), batch_size=BATCH_SIZE)
test_loader = DataLoader(CharDataset(X_test, Y_test), batch_size=BATCH_SIZE)

# Report the number of training examples (not the number of batches).
print("Data loaders ready. Train size:", len(train_loader.dataset))


In [None]:

import torch.nn as nn
import torch

class Encoder(nn.Module):
    """Encoder: embedding -> recurrent layer(s) (single-layer LSTM for the baseline).

    Args:
        input_dim: size of the source vocabulary (number of embedding rows).
        emb_dim: embedding dimension.
        hid_dim: recurrent hidden-state dimension.
        n_layers: number of stacked recurrent layers.
        cell: "LSTM", "GRU" or "RNN" (case-insensitive).
        dropout: dropout between stacked recurrent layers. PyTorch applies it
            only between layers, so it is forced to 0.0 when ``n_layers == 1``
            to avoid the "dropout expects num_layers greater than 1" warning.

    Raises:
        ValueError: if ``cell`` is not one of the supported cell types.
    """
    def __init__(self, input_dim, emb_dim, hid_dim, n_layers=1, cell="LSTM", dropout=0.0):
        super().__init__()
        # Index 0 is the padding token; its embedding stays at zero.
        self.embedding = nn.Embedding(input_dim, emb_dim, padding_idx=0)
        self.cell = cell.upper()
        self.hid_dim = hid_dim
        self.n_layers = n_layers

        # Explicit dispatch so a misspelled cell name fails loudly instead of
        # silently building a vanilla RNN.
        rnn_classes = {"LSTM": nn.LSTM, "GRU": nn.GRU, "RNN": nn.RNN}
        if self.cell not in rnn_classes:
            raise ValueError(
                f"Unsupported cell type {cell!r}; expected one of {sorted(rnn_classes)}"
            )
        # Inter-layer dropout has no effect with a single layer.
        rnn_dropout = dropout if n_layers > 1 else 0.0
        # nn.RNN defaults to the tanh nonlinearity, matching the previous behaviour.
        self.rnn = rnn_classes[self.cell](
            emb_dim, hid_dim, n_layers, batch_first=True, dropout=rnn_dropout
        )

    def forward(self, src):
        """Encode a batch of source sequences.

        Args:
            src: long tensor of token indices, shape (batch, src_len).

        Returns:
            ``(outputs, hidden)`` exactly as returned by the recurrent module:
            ``outputs`` has shape (batch, src_len, hid_dim); ``hidden`` is the
            final state (an ``(h, c)`` tuple for LSTM, a single tensor otherwise).
        """
        emb = self.embedding(src)                  # (batch, src_len, emb_dim)
        outputs, hidden = self.rnn(emb)            # outputs: (batch, src_len, hid_dim)
        return outputs, hidden

class Decoder(nn.Module):
    """Decoder: single-step decoder. Returns raw logits for softmax.
    For baseline we DO NOT use attention.

    Args:
        output_dim: size of the target vocabulary.
        emb_dim: embedding dimension.
        hid_dim: recurrent hidden-state dimension (must match the encoder's,
            because the encoder state is used as the initial decoder state).
        n_layers: number of stacked recurrent layers.
        cell: "LSTM", "GRU" or "RNN" (case-insensitive).
        dropout: dropout between stacked recurrent layers. PyTorch applies it
            only between layers, so it is forced to 0.0 when ``n_layers == 1``
            to avoid the "dropout expects num_layers greater than 1" warning.
        attention: stored on the module but not used by this baseline decoder.
        enc_hid_dim: accepted for interface compatibility; unused here.

    Raises:
        ValueError: if ``cell`` is not one of the supported cell types.
    """
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers=1, cell="LSTM", dropout=0.0, attention=None, enc_hid_dim=None):
        super().__init__()
        # Index 0 is the padding token; its embedding stays at zero.
        self.embedding = nn.Embedding(output_dim, emb_dim, padding_idx=0)
        self.cell = cell.upper()
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.attention = attention
        rnn_input_dim = emb_dim

        # Explicit dispatch so a misspelled cell name fails loudly instead of
        # silently building a vanilla RNN.
        rnn_classes = {"LSTM": nn.LSTM, "GRU": nn.GRU, "RNN": nn.RNN}
        if self.cell not in rnn_classes:
            raise ValueError(
                f"Unsupported cell type {cell!r}; expected one of {sorted(rnn_classes)}"
            )
        # Inter-layer dropout has no effect with a single layer.
        rnn_dropout = dropout if n_layers > 1 else 0.0
        # nn.RNN defaults to the tanh nonlinearity, matching the previous behaviour.
        self.rnn = rnn_classes[self.cell](
            rnn_input_dim, hid_dim, n_layers, batch_first=True, dropout=rnn_dropout
        )
        # Projects the recurrent output to unnormalised vocabulary scores.
        self.fc_out = nn.Linear(hid_dim, output_dim)

    def forward(self, input_token, hidden, enc_outputs=None):
        """Run one decoding step.

        Args:
            input_token: long tensor of shape (batch,) with the current input tokens.
            hidden: recurrent state to continue from (``None`` starts from zeros).
            enc_outputs: accepted for interface compatibility; unused here.

        Returns:
            ``(pred, hidden)`` where ``pred`` has shape (batch, output_dim) and
            holds raw logits, and ``hidden`` is the updated recurrent state.
        """
        emb = self.embedding(input_token).unsqueeze(1)    # (batch,1,emb)
        out, hidden = self.rnn(emb, hidden)
        out = out.squeeze(1)                              # (batch, hid_dim)
        pred = self.fc_out(out)                           # (batch, output_dim)
        return pred, hidden

class Seq2Seq(nn.Module):
    """Wrapper: runs encoder then auto-regressive decoder with teacher forcing option.

    Args:
        encoder: module whose ``forward(src)`` returns ``(outputs, hidden)``.
        decoder: single-step module whose ``forward(token, hidden)`` returns
            ``(logits, hidden)`` and which exposes ``fc_out.out_features``.
        device: device on which the output buffer and start tokens are created.
        teacher_forcing_ratio: probability, drawn once per time step for the
            whole batch, of feeding the ground-truth token instead of the
            model's own prediction.
        sos_idx: index of the ``<sos>`` token used to start greedy decoding.
            When ``None`` (default) the notebook-level ``dev2idx['<sos>']`` is
            looked up at call time, which preserves the previous behaviour.
    """
    def __init__(self, encoder, decoder, device, teacher_forcing_ratio=0.5, sos_idx=None):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        self.teacher_forcing_ratio = teacher_forcing_ratio
        self.sos_idx = sos_idx

    def forward(self, src, trg=None, max_len=None):
        """Produce per-step logits for a batch.

        Args:
            src: long tensor (batch, src_len) of source token indices.
            trg: optional long tensor (batch, trg_len) whose first column is
                ``<sos>``. When given, decoding runs for ``trg_len - 1`` steps
                with teacher forcing and position 0 of the output stays zero.
            max_len: number of greedy decoding steps; required when ``trg`` is None.

        Returns:
            Float tensor of logits, shape (batch, trg_len, out_dim) when ``trg``
            is given, otherwise (batch, max_len, out_dim).

        Raises:
            ValueError: if both ``trg`` and ``max_len`` are None.
        """
        batch_size = src.size(0)
        # The final encoder state seeds the decoder; encoder outputs are unused (no attention).
        _, dec_hidden = self.encoder(src)
        out_dim = self.decoder.fc_out.out_features

        if trg is not None:
            # Teacher-forced mode (use target for teacher forcing)
            trg_len = trg.size(1)
            outputs = torch.zeros(batch_size, trg_len, out_dim, device=self.device)
            input_tok = trg[:, 0]   # <sos> expected as first token
            for t in range(1, trg_len):
                pred, dec_hidden = self.decoder(input_tok, dec_hidden)
                outputs[:, t, :] = pred
                # One coin flip per step decides the next input for the whole batch.
                use_teacher = torch.rand(1).item() < self.teacher_forcing_ratio
                input_tok = trg[:, t] if use_teacher else pred.argmax(1)
            return outputs

        # Inference (greedy) mode: requires max_len
        if max_len is None:
            # Explicit exception instead of `assert`, which is stripped under -O.
            raise ValueError("max_len must be provided when trg is None")
        # Prefer the configured start token; fall back to the notebook global.
        sos = self.sos_idx if self.sos_idx is not None else dev2idx['<sos>']
        outputs = torch.zeros(batch_size, max_len, out_dim, device=self.device)
        input_tok = torch.full((batch_size,), sos, dtype=torch.long, device=self.device)
        for t in range(max_len):
            pred, dec_hidden = self.decoder(input_tok, dec_hidden)
            outputs[:, t, :] = pred
            input_tok = pred.argmax(1)
        return outputs


In [None]:

# initialize model (baseline)
# PyTorch recurrent layers apply dropout only between stacked layers, so with a
# single layer a non-zero value does nothing except emit a warning.
rnn_dropout = 0.2 if NUM_LAYERS > 1 else 0.0
enc = Encoder(INPUT_DIM, EMB_DIM, HID_DIM, n_layers=NUM_LAYERS, cell=RNN_CELL, dropout=rnn_dropout).to(device)
dec = Decoder(OUTPUT_DIM, EMB_DIM, HID_DIM, n_layers=NUM_LAYERS, cell=RNN_CELL, dropout=rnn_dropout).to(device)
model = Seq2Seq(enc, dec, device, teacher_forcing_ratio=TEACHER_FORCING_START).to(device)

import torch.optim as optim
optimizer = optim.Adam(model.parameters(), lr=LR)
# Halve the learning rate after 2 epochs without validation-loss improvement.
# NOTE(review): `verbose` is deprecated in recent PyTorch releases — confirm
# against the installed version and drop it if it raises or warns.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.5, patience=2, verbose=True)
import torch.nn as nn
# The loss is computed over Devanagari target indices, so the padding index
# must come from the target vocabulary, not the Roman source vocabulary.
criterion = nn.CrossEntropyLoss(ignore_index=dev2idx['<pad>'])

print('Model parameter count (trainable):', sum(p.numel() for p in model.parameters() if p.requires_grad))


In [None]:

import time
best_val = float('inf')       # lowest validation loss seen so far
no_improve = 0                # consecutive epochs without improvement
early_stop_patience = 5       # stop after this many non-improving epochs

train_losses, val_losses = [], []

for epoch in range(1, EPOCHS+1):
    start = time.time()
    # linear TF decay across epochs
    frac = (epoch-1)/max(1, EPOCHS-1)
    model.teacher_forcing_ratio = TEACHER_FORCING_START*(1-frac) + TEACHER_FORCING_END*frac

    # ---- training pass ----
    model.train()
    total_train = 0.0
    for src, trg in train_loader:
        src = src.to(device)
        trg = trg.to(device)
        optimizer.zero_grad()
        outputs = model(src, trg)  # (batch, trg_len, out_dim)
        out_dim = outputs.size(-1)
        # Position 0 is <sos> and never predicted, so it is excluded from the loss.
        loss = criterion(outputs[:,1:,:].reshape(-1,out_dim), trg[:,1:].reshape(-1))
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), CLIP)
        optimizer.step()
        total_train += loss.item()

    train_loss = total_train / len(train_loader)

    # ---- validation pass ----
    # Validate free-running (teacher forcing off). With the decaying, randomly
    # sampled training ratio the validation loss would be noisy and would drift
    # from epoch to epoch for reasons unrelated to model quality, which would
    # mislead the LR scheduler, best-checkpoint selection and early stopping.
    model.eval()
    total_val = 0.0
    train_tf = model.teacher_forcing_ratio
    model.teacher_forcing_ratio = 0.0
    try:
        with torch.no_grad():
            for src, trg in val_loader:
                src = src.to(device)
                trg = trg.to(device)
                outputs = model(src, trg)
                out_dim = outputs.size(-1)
                loss = criterion(outputs[:,1:,:].reshape(-1,out_dim), trg[:,1:].reshape(-1))
                total_val += loss.item()
    finally:
        # Restore the training ratio so the log line below reports it correctly.
        model.teacher_forcing_ratio = train_tf
    val_loss = total_val / len(val_loader)
    scheduler.step(val_loss)

    train_losses.append(train_loss)
    val_losses.append(val_loss)

    print(f"Epoch {epoch}/{EPOCHS} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | TF: {model.teacher_forcing_ratio:.3f} | Time: {time.time()-start:.1f}s")

    # checkpointing (save best); the small margin ignores negligible improvements
    if val_loss + 1e-6 < best_val:
        best_val = val_loss
        no_improve = 0
        torch.save({'epoch': epoch, 'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict(), 'val_loss': val_loss}, CHECKPOINT_BASELINE)
        print("Saved new best checkpoint ->", CHECKPOINT_BASELINE)
    else:
        no_improve += 1
        if no_improve >= early_stop_patience:
            print("Early stopping triggered.")
            break

print("Training finished. Best val:", best_val)


In [None]:

# decode indices to readable string
def decode_indices(indices, idx2map):
    """Convert a sequence of token indices into a string.

    Decoding stops at the first ``<eos>`` token: greedy decoding keeps emitting
    tokens for a fixed number of steps, and anything produced after ``<eos>``
    is not part of the prediction. ``<pad>`` and ``<sos>`` are skipped, and
    indices missing from ``idx2map`` contribute nothing.

    Args:
        indices: iterable of values convertible with ``int()``.
        idx2map: dict mapping integer index -> character / special token.

    Returns:
        The decoded string (empty if no ordinary characters precede ``<eos>``).
    """
    chars = []
    for i in indices:
        ch = idx2map.get(int(i), "")
        if ch == "<eos>":
            break
        if ch in ("<pad>", "<sos>"):
            continue
        chars.append(ch)
    return "".join(chars)

# greedy transliterate a single word
def greedy_transliterate(model, word, max_len=40):
    """Greedily transliterate one romanized word.

    The word is encoded character by character with ``roman2idx`` (characters
    missing from the vocabulary map to ``<pad>``), truncated or right-padded to
    the training input width, and run through ``model`` in inference mode for
    ``max_len`` decoding steps. The per-step argmax indices are turned back
    into text with ``decode_indices`` and ``idx2dev``.
    """
    pad_id = roman2idx['<pad>']
    width = X_train.shape[1]

    # Encode, clip to the fixed input width, then pad out to exactly that width.
    encoded = [roman2idx.get(ch, pad_id) for ch in word][:width]
    encoded.extend([pad_id] * (width - len(encoded)))

    batch = torch.tensor([encoded], dtype=torch.long).to(device)
    with torch.no_grad():
        logits = model(batch, trg=None, max_len=max_len)

    # Most likely token at every step, for the single item in the batch.
    best_ids = logits.argmax(-1).squeeze(0)
    return decode_indices(best_ids.cpu().tolist(), idx2dev)

# Quick qualitative test
import random
# Sample up to 10 test rows; capping the count avoids a ValueError from
# random.sample when the test split has fewer than 10 examples.
sample_count = min(10, len(X_test))
for i in random.sample(range(len(X_test)), sample_count):
    # Decode the source row with the shared helper so every special token
    # (not only <pad>) is kept out of the reconstructed input word.
    inp = decode_indices(X_test[i], idx2roman)
    pred = greedy_transliterate(model, inp)
    true = decode_indices([int(x) for x in Y_test[i]], idx2dev)
    print(f"Input: {inp} | Pred: {pred} | True: {true}")


In [None]:

# Numeric analysis cell (params & MACs)
# Short aliases for the quantities used in the closed-form estimates below.
d = EMB_DIM                      # embedding dimension
h = HID_DIM                      # LSTM hidden dimension
T = X_train.shape[1]             # padded source width, used as the number of time steps
V = max(INPUT_DIM, OUTPUT_DIM)   # one vocabulary size for the estimate: the larger of source/target

def lstm_params(d,h,V):
    """Closed-form parameter estimate for the 1-layer LSTM encoder-decoder.

    Returns ``(p_layer, total)``: the parameter count of one LSTM layer and
    the count for the whole model (two LSTM layers, two V x d embedding
    tables, and an h -> V output projection with bias).

    The per-layer formula uses one bias vector per gate. PyTorch's ``nn.LSTM``
    stores two (input-hidden and hidden-hidden), so a count taken from the
    instantiated model is larger by ``4*h`` per layer.
    """
    per_gate = h*d + h*h + h          # input weights + recurrent weights + bias
    p_layer = 4 * per_gate            # four gates per LSTM layer
    recurrent = 2 * p_layer           # encoder layer + decoder layer
    embeddings = 2 * (V*d)            # source and target embedding tables
    projection = V*h + V              # output linear layer (weights + bias)
    total = recurrent + embeddings + projection
    return p_layer, total

def lstm_macs(d,h,T):
    """Estimate forward-pass multiply-accumulates for encoder plus decoder.

    Each LSTM step costs ``4*h*(d+h)`` MACs (four gates, each an ``h x (d+h)``
    product); the leading factor 8 covers two LSTMs, both counted over ``T`` steps.
    """
    step_factor = 8 * T
    return step_factor * h * (d + h)

# Evaluate the closed-form estimates for the configured sizes.
p_lstm_layer, P_lstm_total = lstm_params(d,h,V)
macs_lstm = lstm_macs(d,h,T)

# Report the inputs first, then the derived counts, with thousands separators.
print("Baseline numeric (LSTM 1-layer enc+dec):")
print(f"d={d}, h={h}, T={T}, V={V}")
print(f"LSTM per-layer params = {p_lstm_layer:,}")
print(f"LSTM total params (enc+dec+emb+out) = {P_lstm_total:,}")
print(f"LSTM forward-pass MACs (enc+dec) ≈ {macs_lstm:,}")
# Rule-of-thumb reminder only; no backward-pass cost is computed here.
print("Note: backward ≈ 2-3x forward for training.")
