In [None]:
!pip install torch torchvision torchaudio
!pip install sacrebleu jiwer
!pip install pandas numpy matplotlib


In [None]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

In [1]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
base_path = "/content/drive/MyDrive/dataset"
import os

for poet in os.listdir(base_path):
    print(poet)


waseem-barelvi
wali-mohammad-wali
sahir-ludhianvi
nazm-tabatabai
parveen-shakir
noon-meem-rashid
nida-fazli
naji-shakir
naseer-turabi
mohsin-naqvi
mirza-ghalib
jigar-moradabadi
meer-taqi-meer
kaifi-azmi
javed-akhtar
meer-anees
jaan-nisar-akhtar
jaun-eliya
habib-jalib
gulzar
firaq-gorakhpuri
faiz-ahmad-faiz
fahmida-riaz
dagh-dehlvi
ameer-khusrau
altaf-hussain-hali
bahadur-shah-zafar
akbar-allahabadi
ahmad-faraz
allama-iqbal


In [None]:
import os

base_path = "/content/drive/MyDrive/dataset"
urdu_texts = []

for poet in os.listdir(base_path):
    ur_folder = os.path.join(base_path, poet, "ur")
    if os.path.exists(ur_folder):
        for file in os.listdir(ur_folder):
            file_path = os.path.join(ur_folder, file)
            if os.path.isfile(file_path):
                with open(file_path, 'r', encoding='utf-8') as f:
                    text = f.read().strip()
                    if text:
                        urdu_texts.append(text)

print("Total Urdu files collected:", len(urdu_texts))
print("Example Urdu text:\n", urdu_texts[:1])


In [None]:
import re

def clean_urdu(text):
    # remove diacritics
    diacritics = re.compile(r'[\u064B-\u0652]')
    text = re.sub(diacritics, '', text)

    # normalize characters (e.g. different forms of ya, heh)
    text = text.replace('ي', 'ی').replace('ہ', 'ھ')

    # remove non-Urdu characters (digits, english, punctuation)
    text = re.sub(r'[^\u0600-\u06FF\s]', ' ', text)

    # remove extra spaces
    text = re.sub(r'\s+', ' ', text).strip()

    return text

# apply to all urdu texts
cleaned_urdu = [clean_urdu(t) for t in urdu_texts]

print("Before:", urdu_texts[0][:])
print("After:", cleaned_urdu[0][:])


In [None]:
urdu_to_roman_map = {
    'ا': 'a', 'آ': 'aa', 'ب': 'b', 'پ': 'p', 'ت': 't', 'ٹ': 'ṭ', 'ث': 's',
    'ج': 'j', 'چ': 'ch', 'ح': 'h', 'خ': 'kh', 'د': 'd', 'ڈ': 'ḍ', 'ر': 'r',
    'ڑ': 'ṛ', 'ز': 'z', 'س': 's', 'ش': 'sh', 'غ': 'gh', 'ف': 'f', 'ق': 'q',
    'ک': 'k', 'گ': 'g', 'ل': 'l', 'م': 'm', 'ن': 'n', 'و': 'w', 'ہ': 'h',
    'ھ': 'h', 'ء': '', 'ی': 'y', 'ے':'e', 'ئ':'i'
}

def urdu_to_roman(text):
    return ''.join(urdu_to_roman_map.get(ch, ch) for ch in text)

roman_texts = [urdu_to_roman(t) for t in cleaned_urdu]
print(roman_texts)
print(len(roman_texts))

In [None]:
!pip install sentencepiece
import sentencepiece as spm
import os


In [None]:
# Write all cleaned Urdu lines to one file
with open("all_urdu.txt", "w", encoding="utf-8") as f:
    for line in cleaned_urdu:
        f.write(line + "\n")

# Write all Roman lines to one file
with open("all_roman.txt", "w", encoding="utf-8") as f:
    for line in roman_texts:
        f.write(line + "\n")


In [None]:
# Train Urdu subword tokenizer
spm.SentencePieceTrainer.train(
    input='all_urdu.txt',
    model_prefix='urdu_bpe',
    vocab_size=8000,
    character_coverage=0.9995,
    model_type='bpe'
)

# Train Roman subword tokenizer
spm.SentencePieceTrainer.train(
    input='all_roman.txt',
    model_prefix='roman_bpe',
    vocab_size=8000,
    character_coverage=1.0,
    model_type='bpe'
)


In [None]:
# Load trained models
sp_urdu = spm.SentencePieceProcessor()
sp_urdu.load('urdu_bpe.model')

sp_roman = spm.SentencePieceProcessor()
sp_roman.load('roman_bpe.model')

# Convert texts to subword IDs
urdu_ids = [sp_urdu.encode(line, out_type=int) for line in cleaned_urdu]
roman_ids = [sp_roman.encode(line, out_type=int) for line in roman_texts]


PAD_IDX = sp_urdu.pad_id()   # same for Urdu, SentencePiece uses the same special ids
BOS_IDX = sp_urdu.bos_id()
EOS_IDX = sp_urdu.eos_id()


print("PAD:", PAD_IDX, "BOS:", BOS_IDX, "EOS:", EOS_IDX)

print("Example Urdu tokens:", urdu_ids[0][:20])
print("Example Roman tokens:", roman_ids[0][:20])


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import sentencepiece as spm

# ---------- Load SentencePiece Tokenizers ----------
sp_urdu = spm.SentencePieceProcessor()
sp_urdu.load("urdu_bpe.model")

sp_roman = spm.SentencePieceProcessor()
sp_roman.load("roman_bpe.model")

# Vocabulary sizes from SentencePiece
INPUT_DIM = sp_urdu.get_piece_size()    # Urdu vocab size
OUTPUT_DIM = sp_roman.get_piece_size()  # Roman vocab size

class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim, n_layers=2, dropout=0.3,pad_idx=0):
        super(Encoder, self).__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(input_dim, emb_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            emb_dim,
            hidden_dim,
            num_layers=n_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if n_layers > 1 else 0
        )

    def forward(self, src):
        embedded = self.embedding(src)
        outputs, (hidden, cell) = self.lstm(embedded)
        # hidden: [n_layers*2, batch, hidden_dim]

        hidden = self._cat_directions(hidden)
        cell = self._cat_directions(cell)

        # hidden/cell: [n_layers, batch, hidden_dim*2]
        return hidden, cell

    def _cat_directions(self, h):
        # [num_layers*2, batch, hidden_dim] -> [num_layers, batch, hidden_dim*2]
        return torch.cat((h[0::2], h[1::2]), dim=2)




# ---------- Decoder ----------
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim, n_layers=4, dropout=0.3,pad_idx=0):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            emb_dim,
            hidden_dim * 2,  # match encoder biLSTM output
            num_layers=n_layers,
            batch_first=True,
            dropout=dropout if n_layers > 1 else 0
        )
        self.fc_out = nn.Linear(hidden_dim * 2, output_dim)

    def forward(self, input, hidden, cell):
        input = input.unsqueeze(1)  # [batch, 1]
        embedded = self.embedding(input)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        prediction = self.fc_out(output.squeeze(1))
        return prediction, hidden, cell


# ---------- Seq2Seq ----------
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.size(0)
        trg_len = trg.size(1)
        trg_vocab_size = self.decoder.fc_out.out_features

        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)

        hidden, cell = self.encoder(src)

        # 🔥 Fix: expand encoder states (2 layers → 4 layers for decoder)
        if hidden.size(0) < self.decoder.lstm.num_layers:
            factor = self.decoder.lstm.num_layers // hidden.size(0)
            hidden = hidden.repeat(factor, 1, 1)
            cell = cell.repeat(factor, 1, 1)

        # First input to decoder is <sos>
        input = trg[:, 0]

        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t, :] = output
            top1 = output.argmax(1)
            input = trg[:, t] if torch.rand(1).item() < teacher_forcing_ratio else top1

        return outputs


EMB_DIM = 256
HIDDEN_DIM = 512   # bigger model
ENC_LAYERS = 2
DEC_LAYERS = 4

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

encoder = Encoder(INPUT_DIM, EMB_DIM, HIDDEN_DIM, ENC_LAYERS, pad_idx=PAD_IDX)
decoder = Decoder(OUTPUT_DIM, EMB_DIM, HIDDEN_DIM, DEC_LAYERS, pad_idx=PAD_IDX)
model = Seq2Seq(encoder, decoder, device).to(device)

criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [None]:
from torch.utils.data import Dataset, DataLoader
import torch

class TranslationDataset(Dataset):
    def __init__(self, src_ids, trg_ids, max_len=50):
        self.src_ids = src_ids
        self.trg_ids = trg_ids
        self.max_len = max_len

    def __len__(self):
        return len(self.src_ids)

    def __getitem__(self, idx):
        src = self.src_ids[idx]
        trg = self.trg_ids[idx]

        # pad to fixed length
        src = src[:self.max_len] + [PAD_IDX] * (self.max_len - len(src))
        trg = trg[:self.max_len] + [PAD_IDX] * (self.max_len - len(trg))


        return torch.tensor(src), torch.tensor(trg)


In [None]:
from sklearn.model_selection import train_test_split

# assume urdu_ids and roman_ids are lists of tokenized sequences
train_src, test_src, train_trg, test_trg = train_test_split(urdu_ids, roman_ids, test_size=0.25, random_state=42)
train_src, val_src, train_trg, val_trg = train_test_split(train_src, train_trg, test_size=0.33, random_state=42)

train_dataset = TranslationDataset(train_src, train_trg, max_len=50)
val_dataset   = TranslationDataset(val_src, val_trg, max_len=50)
test_dataset  = TranslationDataset(test_src, test_trg, max_len=50)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=32)
test_loader  = DataLoader(test_dataset, batch_size=32)


In [None]:
print("Example Roman encoding:", sp_roman.encode("kwiy aṭka hwa he pl shayd", out_type=int, add_bos=True, add_eos=True))
print("Decoded back:", sp_roman.decode(sp_roman.encode("kwiy aṭka hwa he pl shayd", out_type=int, add_bos=True, add_eos=True)))
print("Urdu sample:", cleaned_urdu[0])
print("Urdu encoded:", sp_urdu.encode(cleaned_urdu[0], out_type=int, add_bos=True, add_eos=True)[:40])


In [None]:
# import torch.nn.functional as F

# def train(model, dataloader, optimizer, criterion, clip=1):
#     model.train()
#     epoch_loss = 0

#     for src, trg in dataloader:
#         src, trg = src.to(device), trg.to(device)

#         optimizer.zero_grad()
#         output = model(src, trg)   # [batch, trg_len, output_dim]

#         # shift for loss (ignore sos token at t=0)
#         output_dim = output.shape[-1]
#         output = output[:,1:,:].reshape(-1, output_dim)
#         trg = trg[:,1:].reshape(-1)

#         loss = criterion(output, trg)
#         loss.backward()

#         torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
#         optimizer.step()

#         epoch_loss += loss.item()

#     return epoch_loss / len(dataloader)


# def evaluate(model, dataloader, criterion):
#     model.eval()
#     epoch_loss = 0

#     with torch.no_grad():
#         for src, trg in dataloader:
#             src, trg = src.to(device), trg.to(device)

#             output = model(src, trg, 0)  # no teacher forcing in eval
#             output_dim = output.shape[-1]
#             output = output[:,1:,:].reshape(-1, output_dim)
#             trg = trg[:,1:].reshape(-1)

#             loss = criterion(output, trg)
#             epoch_loss += loss.item()

#     return epoch_loss / len(dataloader)


In [None]:
def train(model, loader, optimizer, criterion, clip=1):
    model.train()
    epoch_loss = 0

    for src, trg in loader:
        src, trg = src.to(model.device), trg.to(model.device)

        optimizer.zero_grad()
        output = model(src, trg)   # output: [batch, trg_len, vocab_size]

        # reshape for loss
        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)  # ignore first token
        trg = trg[:, 1:].reshape(-1)

        loss = criterion(output, trg)
        loss.backward()

        # gradient clipping (avoid exploding gradients)
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()
        epoch_loss += loss.item()

    return epoch_loss / len(loader)


In [None]:
def evaluate(model, loader, criterion):
    model.eval()
    epoch_loss = 0

    with torch.no_grad():
        for src, trg in loader:
            src, trg = src.to(model.device), trg.to(model.device)

            output = model(src, trg, teacher_forcing_ratio=0)  # no teacher forcing

            output_dim = output.shape[-1]
            output = output[:, 1:].reshape(-1, output_dim)
            trg = trg[:, 1:].reshape(-1)

            loss = criterion(output, trg)
            epoch_loss += loss.item()

    return epoch_loss / len(loader)


In [None]:
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=32)


In [None]:
# N_EPOCHS = 15


# for epoch in range(N_EPOCHS):
#     train_loss = train(model, train_loader, optimizer, criterion)
#     val_loss = evaluate(model, val_loader, criterion)

#     print(f"Epoch {epoch+1}: Train Loss = {train_loss:.3f}, Val Loss = {val_loss:.3f}")
def train_model(model, train_loader, val_loader, optimizer, criterion, n_epochs=80, patience=5):
    best_val_loss = float("inf")
    best_model_state = None
    patience_counter = 0

    for epoch in range(1, n_epochs + 1):
        train_loss = train(model, train_loader, optimizer, criterion)
        val_loss = evaluate(model, val_loader, criterion)

        print(f"Epoch {epoch}: Train Loss = {train_loss:.3f}, Val Loss = {val_loss:.3f}")

        # save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.state_dict()
            patience_counter = 0
        else:
            patience_counter += 1

        # stop if no improvement
        if patience_counter >= patience:
            print(f"⏹️ Early stopping at epoch {epoch} (no improvement for {patience} epochs).")
            break

    # restore best model
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
        print("✅ Best model restored with val_loss =", best_val_loss)

    return model


In [None]:
model = train_model(model, train_loader, val_loader, optimizer, criterion,
                    n_epochs=80, patience=15)


Epoch 1: Train Loss = 8.461, Val Loss = 7.184
Epoch 2: Train Loss = 6.659, Val Loss = 6.443


In [None]:
!pip install sacrebleu jiwer


In [None]:
def translate_sentence(model, src_tensor, sp_urdu, sp_roman, max_len=50):
    model.eval()
    src_tensor = src_tensor.unsqueeze(0).to(model.device)  # add batch dim

    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)

        # Expand hidden/cell to match decoder layers
        if hidden.size(0) < model.decoder.lstm.num_layers:
            factor = model.decoder.lstm.num_layers // hidden.size(0)
            hidden = hidden.repeat(factor, 1, 1)
            cell = cell.repeat(factor, 1, 1)

        input = torch.tensor([sp_roman.bos_id()], device=model.device)  # <sos>
        outputs = []
        print("Urdu tokens:", sp_urdu.encode("کوئی اٹکا ھوا ھے پل شاید"))
        print("Roman tokens:", sp_roman.encode("kwiy aṭka hwa he pl shayd"))

        for _ in range(max_len):
            output, hidden, cell = model.decoder(input, hidden, cell)
            top1 = output.argmax(1)
            if top1.item() == sp_roman.eos_id():  # stop at <eos>
                break
            outputs.append(top1.item())
            input = top1

    return sp_roman.decode(outputs)


In [None]:
import sacrebleu
from jiwer import cer

def evaluate_metrics(model, src_texts, trg_texts, sp_urdu, sp_roman, n_samples=100):
    preds, refs = [], []
    for i in range(min(n_samples, len(src_texts))):
        src_ids = torch.tensor(src_texts[i])
        pred = translate_sentence(model, src_ids, sp_urdu, sp_roman)
        tgt = sp_roman.decode(trg_texts[i])
        preds.append(pred)
        refs.append(tgt)

    # BLEU
    bleu = sacrebleu.corpus_bleu(preds, [refs]).score
    # Perplexity (from validation loss)
    val_loss = evaluate(model, val_loader, criterion)
    perplexity = torch.exp(torch.tensor(val_loss)).item()
    # CER
    cer_score = cer(refs, preds)

    print(f"BLEU: {bleu:.2f}")
    print(f"Perplexity: {perplexity:.2f}")
    print(f"CER: {cer_score:.3f}")

    # Show some examples
    for i in range(3):
        print("\nUrdu Input:   ", sp_urdu.decode(src_texts[i]))
        print("Target Roman: ", refs[i])
        print("Model Output: ", preds[i])


In [None]:
evaluate_metrics(model, val_src, val_trg, sp_urdu, sp_roman, n_samples=50)
