# Neural Machine Translation: Urdu to Roman-Urdu

This notebook implements a BiLSTM Encoder + LSTM Decoder with character-level tokenization for translating Urdu text to Roman-Urdu.

## Features:
- Character-level SentencePiece tokenizers (smaller vocabulary)
- BiLSTM encoder with bidirectional processing
- LSTM decoder initialized from the encoder's final hidden and cell states (no attention mechanism is implemented)
- Optimized learning rate (5e-4)
- Local dataset support

In [None]:
# Install and import required libraries
import sys, subprocess, importlib, os, json, time, shutil
from pathlib import Path

def try_install():
    """Install or upgrade ``sentencepiece`` with pip for the running interpreter.

    pip is invoked through ``sys.executable -m pip`` so the package lands in
    the same environment the notebook kernel is using.

    Returns:
        bool: True when pip exits successfully, False when it exits with a
        non-zero return code.
    """
    interpreter = sys.executable
    print("Installing/Upgrading sentencepiece into:", interpreter)
    pip_args = [interpreter, "-m", "pip", "install", "--upgrade", "sentencepiece"]
    try:
        subprocess.check_call(pip_args)
    except subprocess.CalledProcessError as err:
        print("pip install failed with returncode", err.returncode)
        return False
    else:
        print("pip install finished.")
        return True

# Attempt the install once; the boolean result selects the branch below.
installed = try_install()

if installed:
    try:
        import sentencepiece as spm
        print("Imported sentencepiece OK — version:", getattr(spm, '__version__', 'n/a'))
    except Exception as e:
        # Diagnostics only: the unconditional `import sentencepiece as spm`
        # further down in this cell still raises if the package really cannot
        # be imported, so execution does not silently continue without it.
        print("Import still failed after install:", type(e).__name__, e)
        print("If you see an import error about compiled libs, restart the kernel and re-run this cell.")
else:
    # pip exited with a non-zero status; print a hint about the likely cause.
    print("If pip failed due to network, enable Internet in Notebook Settings (right sidebar) and re-run.")

import pandas as pd
import sentencepiece as spm

In [None]:
# Train Character-level SentencePiece Tokenizers

# ---------- CONFIG ----------
# Use local dataset path (resolved relative to the notebook's working directory)
csv_path = "../data/urdu_roman_dataset.csv"  # Local dataset
urdu_col = "Urdu"      # column name for Urdu text
roman_col = "English"  # column name for Roman Urdu text

# Character-level vocab sizes (much smaller than word-level)
vocab_size_urdu = 50    # Small character vocab for Urdu
vocab_size_roman = 30   # Small character vocab for Roman

# Corpus files and trained tokenizer models are all written into this folder.
output_dir = Path("tokenizers")
output_dir.mkdir(parents=True, exist_ok=True)

# special tokens to include verbatim in the SentencePiece vocab
# NOTE(review): "<s>" and "</s>" appear to coincide with SentencePiece's
# default BOS/EOS piece names — confirm how the trainer treats them when they
# are also passed as user-defined symbols.
special_tokens = ["<s>", "</s>", "<pad>"]
# The trainer receives the symbols as one comma-separated string.
user_defined_symbols = ",".join(special_tokens)

# add special tokens into requested vocab size
effective_vocab_size_urdu = vocab_size_urdu + len(special_tokens)
effective_vocab_size_roman = vocab_size_roman + len(special_tokens)

# ---------- 1. LOAD CSV ----------
# Fail early with a clear message instead of a pandas error further down.
if not Path(csv_path).exists():
    raise FileNotFoundError(f"CSV not found at {csv_path}. Make sure dataset is available.")

df = pd.read_csv(csv_path)
# Keep only the two text columns, then drop rows where either one is missing.
df = df[[urdu_col, roman_col]].dropna().reset_index(drop=True)
print(f"Loaded {len(df)} rows from {csv_path}")

# ---------- 2. WRITE CORPUS FILES (one-sample-per-line) ----------
urdu_file = output_dir / "urdu_corpus.txt"
roman_file = output_dir / "roman_corpus.txt"

# Every sample must occupy exactly one line of the corpus, so embedded CR/LF
# characters are flattened to spaces before the line terminator is appended.
for corpus_path, column in ((urdu_file, urdu_col), (roman_file, roman_col)):
    with open(corpus_path, 'w', encoding='utf-8') as corpus:
        corpus.writelines(
            text.replace('\r', ' ').replace('\n', ' ') + '\n'
            for text in df[column].astype(str)
        )

print("Wrote corpus files:")
print(" -", urdu_file)
print(" -", roman_file)

# ---------- 3. TRAIN CHARACTER-LEVEL SENTENCEPIECE MODELS ----------
# Options common to both trainings; only the corpus, the output prefix and the
# vocabulary size differ between the Urdu and the Roman-Urdu model.
shared_train_options = dict(
    model_type="char",  # Character-level tokenization
    character_coverage=1.0,
    input_sentence_size=1000000,
    shuffle_input_sentence=True,
    user_defined_symbols=user_defined_symbols,
)

# (start message, corpus file, model prefix, vocab size, completion message)
training_jobs = (
    (
        "Training Urdu SentencePiece (Character-level)... this may take a while for large corpora.",
        urdu_file,
        "urdu_char",
        effective_vocab_size_urdu,
        "Urdu character model trained.",
    ),
    (
        "Training Roman-Urdu SentencePiece (Character-level)...",
        roman_file,
        "roman_char",
        effective_vocab_size_roman,
        "Roman character model trained.",
    ),
)

for start_msg, corpus_file, prefix, vocab, done_msg in training_jobs:
    print(start_msg)
    spm.SentencePieceTrainer.Train(
        input=str(corpus_file),
        model_prefix=str(output_dir / prefix),
        vocab_size=vocab,
        **shared_train_options,
    )
    print(done_msg)

print("Models saved at:")
print(" -", output_dir / "urdu_char.model")
print(" -", output_dir / "roman_char.model")

# ---------- 4. LOAD AND TEST TOKENIZERS ----------
# Reload the models from disk so the check exercises the saved artifacts.
sp_urdu = spm.SentencePieceProcessor(model_file=str(output_dir / "urdu_char.model"))
sp_roman = spm.SentencePieceProcessor(model_file=str(output_dir / "roman_char.model"))

print(f"\nUrdu tokenizer vocab size: {sp_urdu.get_piece_size()}")
print(f"Roman tokenizer vocab size: {sp_roman.get_piece_size()}")

# print special token ids to confirm
for tok in special_tokens:
    print(f"Urdu: token {tok} -> id {sp_urdu.piece_to_id(tok)}")
    print(f"Roman: token {tok} -> id {sp_roman.piece_to_id(tok)}")

# sample tokenization using the first row of the NaN-filtered frame
if len(df) > 0:
    sample_row = df.iloc[0]
    sample_urdu = str(sample_row[urdu_col])
    sample_roman = str(sample_row[roman_col])

    # Show both the integer ids and the string pieces for the same text.
    print("\nSample Urdu text:", sample_urdu)
    print("Urdu Token IDs:", sp_urdu.encode(sample_urdu, out_type=int))
    print("Urdu Tokens:", sp_urdu.encode(sample_urdu, out_type=str))

    print("\nSample Roman Urdu text:", sample_roman)
    print("Roman Token IDs:", sp_roman.encode(sample_roman, out_type=int))
    print("Roman Tokens:", sp_roman.encode(sample_roman, out_type=str))

In [None]:
# Neural Machine Translation Model Training

import os
import random
import math
import time
import json
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence
import sentencepiece as spm

# ---------------- CONFIG ----------------
# Prefer the GPU when one is visible to PyTorch; otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

csv_path = "../data/urdu_roman_dataset.csv"  # Local dataset
urdu_col, roman_col = "Urdu", "English"

# Load the character-level tokenizers we just trained
# (paths are relative to the working directory, matching the folder the
# tokenizer cell writes to)
urdu_tokenizer = spm.SentencePieceProcessor(model_file="tokenizers/urdu_char.model")
roman_tokenizer = spm.SentencePieceProcessor(model_file="tokenizers/roman_char.model")

# Ids of the special pieces, looked up by name in each tokenizer.
# NOTE(review): confirm these ids are distinct from the unknown-token id; a
# piece missing from the vocabulary would presumably resolve to <unk>.
pad_id_src = urdu_tokenizer.piece_to_id("<pad>")
bos_id_tgt = roman_tokenizer.piece_to_id("<s>")
eos_id_tgt = roman_tokenizer.piece_to_id("</s>")
pad_id_tgt = roman_tokenizer.piece_to_id("<pad>")

print(f"Source vocab size: {urdu_tokenizer.get_piece_size()}")
print(f"Target vocab size: {roman_tokenizer.get_piece_size()}")

# Model hyperparameters
embed_dim = 256    # embedding size used by both encoder and decoder
hidden_dim = 512   # encoder LSTM hidden size per direction; the decoder uses 2x this
enc_layers = 2
dec_layers = 4
dropout = 0.3      # passed to nn.LSTM (applied between stacked layers)
batch_size = 64
num_epochs = 15
learning_rate = 5e-4  # Optimized learning rate

print(f"Learning rate: {learning_rate}")

# ---------------- DATASET ----------------
class NMTDataset(Dataset):
    """Parallel Urdu / Roman-Urdu corpus that tokenizes each sample on access.

    Args:
        df: DataFrame holding the source column (``urdu_col``) and the target
            column (``roman_col``).
        src_tok: Tokenizer used to encode the source text into ids.
        tgt_tok: Tokenizer used to encode the target text into ids.
    """

    def __init__(self, df, src_tok, tgt_tok):
        # Cast to str so non-string cells cannot reach the tokenizers.
        self.src_texts = df[urdu_col].astype(str).tolist()
        self.tgt_texts = df[roman_col].astype(str).tolist()
        self.src_tok = src_tok
        self.tgt_tok = tgt_tok

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.src_texts)

    def __getitem__(self, idx):
        """Return ``(source ids, decoder input ids, label ids)`` as long tensors."""
        source_ids = self.src_tok.encode(self.src_texts[idx], out_type=int)
        target_ids = self.tgt_tok.encode(self.tgt_texts[idx], out_type=int)
        # Teacher forcing: the decoder input is the target shifted right behind
        # BOS, and the labels are the target followed by EOS.
        sequences = (
            source_ids,
            [bos_id_tgt] + target_ids,
            target_ids + [eos_id_tgt],
        )
        return tuple(torch.tensor(seq, dtype=torch.long) for seq in sequences)

def collate_fn(batch):
    """Pad a list of dataset samples into batch-first tensors on ``device``.

    Args:
        batch: Sequence of ``(src_ids, decoder_input_ids, label_ids)`` tensors.

    Returns:
        tuple: ``(srcs, dec_ins, labels)``. Sources are padded with
        ``pad_id_src``, decoder inputs with ``pad_id_tgt`` and labels with
        -100.
    """
    src_seqs, dec_seqs, label_seqs = zip(*batch)
    fill_values = (pad_id_src, pad_id_tgt, -100)  # -100 ignored in loss
    return tuple(
        pad_sequence(seqs, batch_first=True, padding_value=fill).to(device)
        for seqs, fill in zip((src_seqs, dec_seqs, label_seqs), fill_values)
    )

# Select the two text columns BEFORE dropping NaNs, exactly as the tokenizer
# cell does. Dropping first would also discard rows whose only missing value
# sits in an unrelated column, so the model would train on a different set of
# rows than the tokenizers were fitted on.
df = pd.read_csv(csv_path)[[urdu_col, roman_col]].dropna().reset_index(drop=True)
print(f"Dataset size: {len(df)} samples")
dataset = NMTDataset(df, urdu_tokenizer, roman_tokenizer)

# 80/20 train/validation split; the validation part takes the remainder so the
# two sizes always add up to the dataset length.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_ds, val_ds = random_split(dataset, [train_size, val_size])

# Only the training loader shuffles; validation order is kept fixed.
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

print(f"Train samples: {train_size}, Validation samples: {val_size}")

In [None]:
# Model Architecture: BiLSTM Encoder + LSTM Decoder

class Encoder(nn.Module):
    """Bidirectional LSTM encoder that ignores padded positions.

    Sequences are packed before they enter the LSTM, so the final hidden and
    cell states of a short sentence are the same whether it is encoded alone
    or padded inside a batch. Without packing, the recurrent states keep being
    updated on padded positions.

    Args:
        vocab_size: Number of source token ids.
        embed_dim: Embedding dimension.
        hidden_dim: LSTM hidden size per direction.
        num_layers: Number of stacked BiLSTM layers.
        dropout: Dropout passed to ``nn.LSTM``.
        pad_id: Source padding id. Defaults to the module-level ``pad_id_src``.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers, dropout, pad_id=None):
        super().__init__()
        if pad_id is None:
            pad_id = pad_id_src
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_id)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=num_layers,
                            dropout=dropout, bidirectional=True, batch_first=True)

    def forward(self, src):
        """Encode a batch of batch-first source ids.

        Args:
            src: Long tensor of token ids, batch-first, padded with the
                embedding's ``padding_idx``.

        Returns:
            tuple: ``(outputs, (h, c))``. ``outputs`` has the same batch and
            time dimensions as ``src`` (zeros at padded positions). ``h`` and
            ``c`` are the last layer's forward and backward final states
            concatenated on the feature axis, with a leading dimension of 1.
        """
        embedded = self.embedding(src)

        # True length of each row = position of its last non-pad token.
        # Clamped to 1 because packing rejects zero-length sequences.
        positions = torch.arange(1, src.size(1) + 1, device=src.device)
        lengths = (src.ne(self.embedding.padding_idx) * positions).max(dim=1).values
        lengths = lengths.clamp(min=1).cpu()  # packing expects CPU lengths

        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False)
        packed_outputs, (h, c) = self.lstm(packed)
        outputs, _ = nn.utils.rnn.pad_packed_sequence(
            packed_outputs, batch_first=True, total_length=src.size(1))

        # concat forward + backward hidden states of the last layer
        h = torch.cat((h[-2], h[-1]), dim=1).unsqueeze(0)  # [1,B,2H]
        c = torch.cat((c[-2], c[-1]), dim=1).unsqueeze(0)
        return outputs, (h, c)

class Decoder(nn.Module):
    """Unidirectional LSTM decoder projecting each step onto the target vocab.

    The recurrent state is ``2 * hidden_dim`` wide so it can be initialized
    directly from the concatenated forward/backward states of the encoder.

    Args:
        vocab_size: Number of target token ids (also the output size).
        embed_dim: Embedding dimension.
        hidden_dim: Encoder hidden size per direction; the decoder uses twice
            this value.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout passed to ``nn.LSTM``.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers, dropout):
        super().__init__()
        state_dim = hidden_dim * 2
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_id_tgt)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=state_dim,
            num_layers=num_layers,
            dropout=dropout,
            batch_first=True,
        )
        self.fc_out = nn.Linear(state_dim, vocab_size)

    def forward(self, dec_in, hidden, cell):
        """Run the decoder over ``dec_in`` starting from ``(hidden, cell)``.

        Returns:
            tuple: ``(logits, (h, c))`` where ``logits`` holds one vocabulary
            distribution per decoder step and ``(h, c)`` is the updated state.
        """
        step_outputs, (next_h, next_c) = self.lstm(self.embedding(dec_in), (hidden, cell))
        return self.fc_out(step_outputs), (next_h, next_c)

class Seq2Seq(nn.Module):
    """Encoder/decoder wrapper used for teacher-forced training.

    Args:
        encoder: Callable returning ``(outputs, (h, c))`` for a source batch.
        decoder: Callable taking ``(tgt_in, h, c)`` and returning
            ``(logits, state)``.
        dec_layers: Number of decoder layers the encoder state is copied to.
    """

    def __init__(self, encoder, decoder, dec_layers):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder
        self.dec_layers = dec_layers

    def forward(self, src, tgt_in):
        """Return decoder logits for ``tgt_in`` conditioned on ``src``."""
        # Only the encoder's final state is used; its per-step outputs are
        # discarded.
        _, final_state = self.encoder(src)
        # The encoder yields a single-layer state; copy it to every decoder
        # layer along the leading (layer) dimension.
        init_h, init_c = (state.repeat(self.dec_layers, 1, 1) for state in final_state)
        logits, _ = self.decoder(tgt_in, init_h, init_c)
        return logits

# Initialize model
# Vocabulary sizes come straight from the trained tokenizers so the embedding
# and output layers match the ids the tokenizers can emit.
encoder = Encoder(urdu_tokenizer.get_piece_size(), embed_dim, hidden_dim, enc_layers, dropout).to(device)
decoder = Decoder(roman_tokenizer.get_piece_size(), embed_dim, hidden_dim, dec_layers, dropout).to(device)
model = Seq2Seq(encoder, decoder, dec_layers).to(device)

# Count parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")

# ignore_index=-100 matches the padding value collate_fn uses for labels, so
# padded label positions do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=-100)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Training and Evaluation Functions

def calc_token_accuracy(preds, labels, ignore_index=-100):
    """Return the fraction of non-ignored label positions predicted correctly.

    Args:
        preds: Tensor of predicted token ids.
        labels: Tensor of reference token ids with the same shape as ``preds``.
        ignore_index: Label value marking positions excluded from the score.

    Returns:
        float: Accuracy in [0, 1], or 0.0 when every position is ignored.
    """
    valid = labels != ignore_index
    n_valid = valid.sum().item()
    if n_valid == 0:
        return 0.0
    n_hits = (preds.eq(labels) & valid).sum().item()
    return n_hits / n_valid

def train_one_epoch(model, loader):
    """Run one optimization pass over ``loader``.

    Uses the module-level ``optimizer`` and ``criterion``.

    Args:
        model: Model called as ``model(src, dec_in)`` and returning logits.
        loader: Iterable of ``(src, dec_in, labels)`` batches.

    Returns:
        tuple: ``(mean of the per-batch losses, token accuracy over all
        positions whose label is not -100)``.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_tokens = 0

    for step, (src, dec_in, labels) in enumerate(loader, start=1):
        optimizer.zero_grad()
        logits = model(src, dec_in)  # [B,T,V]
        vocab = logits.size(-1)
        loss = criterion(logits.view(-1, vocab), labels.view(-1))
        loss.backward()
        optimizer.step()

        # Accuracy bookkeeping must not be tracked by autograd.
        with torch.no_grad():
            valid = labels != -100
            hits = logits.argmax(dim=-1).eq(labels) & valid
            n_correct += hits.sum().item()
            n_tokens += valid.sum().item()

        batch_loss = loss.item()
        loss_sum += batch_loss

        # Progress line every 100 batches.
        if step % 100 == 0:
            print(f"  Batch {step}/{len(loader)}, Loss: {batch_loss:.4f}")

    n_batches = len(loader)
    mean_loss = loss_sum / n_batches if n_batches > 0 else 0.0
    accuracy = (n_correct / n_tokens) if n_tokens > 0 else 0.0
    return mean_loss, accuracy

def evaluate(model, loader):
    """Compute loss and token accuracy over ``loader`` without updating weights.

    Uses the module-level ``criterion``.

    Args:
        model: Model called as ``model(src, dec_in)`` and returning logits.
        loader: Iterable of ``(src, dec_in, labels)`` batches.

    Returns:
        tuple: ``(mean of the per-batch losses, token accuracy over all
        positions whose label is not -100)``.
    """
    model.eval()
    loss_sum, n_correct, n_tokens = 0.0, 0, 0

    with torch.no_grad():
        for src, dec_in, labels in loader:
            logits = model(src, dec_in)
            flat_logits = logits.view(-1, logits.size(-1))
            batch_loss = criterion(flat_logits, labels.view(-1))

            valid = labels != -100
            hits = logits.argmax(dim=-1).eq(labels) & valid
            n_correct += hits.sum().item()
            n_tokens += valid.sum().item()
            loss_sum += batch_loss.item()

    n_batches = len(loader)
    mean_loss = loss_sum / n_batches if n_batches > 0 else 0.0
    accuracy = (n_correct / n_tokens) if n_tokens > 0 else 0.0
    return mean_loss, accuracy

print("Training and evaluation functions defined.")

In [None]:
# Main Training Loop

# Track the lowest validation loss seen so far; the checkpoint at `best_path`
# is overwritten every time this improves.
best_val_loss = float('inf')
best_path = "best_bilstm_seq2seq3.pth"
training_history = []  # one dict of metrics per epoch, dumped to JSON at the end

print("Starting training...")
print(f"Total epochs: {num_epochs}")
print(f"Learning rate: {learning_rate}")
print(f"Batch size: {batch_size}")
print("-" * 80)

for epoch in range(1, num_epochs + 1):
    start_time = time.time()
    
    print(f"\nEpoch {epoch}/{num_epochs}")
    train_loss, train_acc = train_one_epoch(model, train_loader)
    val_loss, val_acc = evaluate(model, val_loader)
    
    # Wall-clock time for training plus validation of this epoch.
    epoch_time = time.time() - start_time
    
    print(f"Epoch {epoch}: Train Loss = {train_loss:.4f}, Train Acc = {train_acc*100:.2f}% | Val Loss = {val_loss:.4f}, Val Acc = {val_acc*100:.2f}% | Time: {epoch_time:.1f}s")
    
    # Save training history
    training_history.append({
        'epoch': epoch,
        'train_loss': train_loss,
        'train_acc': train_acc,
        'val_loss': val_loss,
        'val_acc': val_acc,
        'time': epoch_time
    })

    # Checkpoint only on a strict improvement of the validation loss. The
    # checkpoint bundles weights, optimizer state, metrics and the
    # hyperparameters used to build the model.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_loss': val_loss,
            'train_loss': train_loss,
            'val_acc': val_acc,
            'train_acc': train_acc,
            'config': {
                'embed_dim': embed_dim,
                'hidden_dim': hidden_dim,
                'enc_layers': enc_layers,
                'dec_layers': dec_layers,
                'dropout': dropout,
                'learning_rate': learning_rate,
                'vocab_size_src': urdu_tokenizer.get_piece_size(),
                'vocab_size_tgt': roman_tokenizer.get_piece_size()
            }
        }, best_path)
        print(f"--> New best model saved to '{best_path}' (val_loss {val_loss:.4f})")

# Save final model as well (last epoch)
# Unlike the best checkpoint, this file holds only the bare state_dict.
final_path = "bilstm_seq2seq_final.pth"
torch.save(model.state_dict(), final_path)
print(f"\nFinal model saved to '{final_path}'")
print(f"Best model (lowest val loss {best_val_loss:.4f}) saved to '{best_path}'")

# Save training history
with open('training_history.json', 'w') as f:
    json.dump(training_history, f, indent=2)
print("Training history saved to 'training_history.json'")

In [None]:
# Inference Setup and Testing

def greedy_decode(model, src_tensor, max_len=50):
    """Greedily decode one source sentence into target token ids.

    Args:
        model: Seq2Seq-style model exposing ``encoder``, ``decoder`` and
            ``dec_layers``. It is switched to eval mode.
        src_tensor: Long tensor of source ids with shape ``(1, src_len)``.
        max_len: Maximum number of tokens to generate.

    Returns:
        list[int]: Generated ids in order. Generation stops after ``max_len``
        tokens or right after the EOS id, which is included when produced.

    Raises:
        ValueError: If ``src_tensor`` is not a single-sentence batch.
    """
    # The loop reads the prediction with .item(), so only one sentence can be
    # decoded at a time; fail with a clear message instead of a shape error
    # deep inside the LSTM.
    if src_tensor.dim() != 2 or src_tensor.size(0) != 1:
        raise ValueError(
            f"greedy_decode expects src_tensor of shape (1, src_len), got {tuple(src_tensor.shape)}"
        )

    model.eval()
    outputs = []
    with torch.no_grad():
        # Encode source and copy the single-layer state to every decoder layer
        _, (h, c) = model.encoder(src_tensor)
        h = h.repeat(model.dec_layers, 1, 1)
        c = c.repeat(model.dec_layers, 1, 1)

        # Start decoding from BOS, on the same device as the source tensor so
        # decoding does not depend on the notebook-level `device`.
        dec_input = torch.tensor([[bos_id_tgt]], dtype=torch.long, device=src_tensor.device)

        for _ in range(max_len):
            logits, (h, c) = model.decoder(dec_input, h, c)
            pred = logits.argmax(dim=-1)
            token_id = pred.item()  # read once per step
            outputs.append(token_id)

            if token_id == eos_id_tgt:
                break

            # Feed the prediction back in as the next decoder input.
            dec_input = pred

    return outputs

def translate_text(model, text, src_tokenizer, tgt_tokenizer, max_len=50):
    """Translate a single text using the trained model.

    Args:
        model: Trained model passed through to ``greedy_decode``.
        text: Source string to translate.
        src_tokenizer: Tokenizer whose ``encode(text, out_type=int)`` yields
            source ids.
        tgt_tokenizer: Tokenizer whose ``decode(ids)`` turns target ids into
            text.
        max_len: Maximum number of target tokens to generate.

    Returns:
        str: The decoded translation, or an empty string when ``text``
        tokenizes to nothing.
    """
    # Tokenize input
    src_ids = src_tokenizer.encode(text, out_type=int)
    if not src_ids:
        # An empty id list would become a float tensor of shape (1, 0), which
        # the embedding/LSTM cannot process; nothing to translate anyway.
        return ""
    src_tensor = torch.tensor([src_ids], dtype=torch.long, device=device)

    # Decode
    output_ids = greedy_decode(model, src_tensor, max_len)

    # Convert back to text, dropping EOS and anything after it
    if eos_id_tgt in output_ids:
        output_ids = output_ids[:output_ids.index(eos_id_tgt)]

    return tgt_tokenizer.decode(output_ids)

# Load best model for testing
# map_location lets a checkpoint saved on GPU load on a CPU-only machine.
checkpoint = torch.load(best_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
print(f"Loaded best model from epoch {checkpoint['epoch']} with validation loss {checkpoint['val_loss']:.4f}")

# Test with sample texts
test_texts = [
    "آدمی کے ریشے ریشے میں سما جاتا ہے عشق",
    "فیصلہ تیرا ترے ہاتھوں میں ہے دل",
    "عشق سے پیدا نوائے زندگی میں زیر و بم"
]

print("\n" + "="*80)
print("TRANSLATION TESTS")
print("="*80)

# Translate each sample with the default max_len of translate_text and print
# the source next to the model output.
for i, urdu_text in enumerate(test_texts, 1):
    roman_translation = translate_text(model, urdu_text, urdu_tokenizer, roman_tokenizer)
    print(f"\nTest {i}:")
    print(f"Urdu:  {urdu_text}")
    print(f"Roman: {roman_translation}")
    print("-" * 40)

In [None]:
# Copy trained files to organized project structure

import shutil
from pathlib import Path

# Source and destination paths using organized structure
# NOTE(review): the dataset is read from "../data", i.e. one level up, while
# these destinations are two levels up — confirm which directory is the
# project root relative to the notebook's working directory.
models_folder = Path("../../models")  # Project root models folder
tokenizers_folder = Path("../../tokenizers")  # Project root tokenizers folder

# Create directories if they don't exist
models_folder.mkdir(parents=True, exist_ok=True)
tokenizers_folder.mkdir(parents=True, exist_ok=True)

# Files to copy, as (source path, destination path) pairs
files_to_copy = [
    ("tokenizers/urdu_char.model", tokenizers_folder / "urdu_char.model"),
    ("tokenizers/roman_char.model", tokenizers_folder / "roman_char.model"),
    (best_path, models_folder / "best_bilstm_seq2seq3.pth"),  # destination name is hard-coded; keep in sync with best_path
    ("training_history.json", models_folder / "training_history.json")
]

print("Copying files to organized project structure...")

# Missing sources are reported and skipped rather than raising, so one absent
# artifact does not prevent the others from being copied.
for src, dst in files_to_copy:
    src_path = Path(src)
    
    if src_path.exists():
        shutil.copy2(src_path, dst)
        print(f"✓ Copied: {src} -> {dst}")
    else:
        print(f"✗ Missing: {src}")

# The tree below is a static description; it is not derived from the files
# that were actually copied.
print("\nFile organization complete!")
print("\nProject structure:")
print("├── models/")
print("│   ├── best_bilstm_seq2seq3.pth (trained model)")
print("│   └── training_history.json (training metrics)")
print("├── tokenizers/")
print("│   ├── urdu_char.model (character-level Urdu tokenizer)")
print("│   └── roman_char.model (character-level Roman tokenizer)")
print("└── notebooks/")
print("    └── project1_updated.ipynb (this training notebook)")

print("\nBackend will now load models from:")
print("- Model: project_root/models/best_bilstm_seq2seq3.pth")
print("- Tokenizers: project_root/tokenizers/*.model")

## Training Summary

This notebook implements a character-level sequence-to-sequence neural machine translation system (BiLSTM encoder, LSTM decoder) with the following features:

### Key Features:
1. **Character-level Tokenization**: Much smaller vocabulary sizes (51 for Urdu, 32 for Roman)
2. **Optimized Learning Rate**: 5e-4 for better convergence
3. **BiLSTM Encoder**: Bidirectional processing for better context understanding
4. **Local Dataset Support**: Works with local files instead of requiring Kaggle
5. **Comprehensive Evaluation**: Token-level accuracy tracking

### Model Architecture:
- **Encoder**: 2-layer BiLSTM (512 hidden units)
- **Decoder**: 4-layer LSTM (1024 hidden units)
- **Embedding**: 256 dimensions
- **Total Parameters**: ~39.9M

### Training Configuration:
- **Learning Rate**: 5e-4 (optimized)
- **Batch Size**: 64
- **Epochs**: 15
- **Dropout**: 0.3

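The best model checkpoint, the training history, and the tokenizers are copied by the final cell to the `models/` and `tokenizers/` folders two levels above the notebook's working directory (`../../models`, `../../tokenizers`) for use with the inference system.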