In [None]:
import re
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [None]:

from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
device="cuda" if torch.cuda.is_available() else "cpu"

#**Preprocessing the data**

####1.The script cleans Roman Urdu text by replacing special characters with standard alphabets and prints before-and-after samples for inspection.

In [None]:
'''
import pandas as pd

# Replacement mapping dictionary (extended from earlier)
replacement_mapping = {
    'ñ': 'n', 'ā': 'a', 'ḳ': 'k', 'ġ': 'g', 'ī': 'i', 'ḍ': 'd',
    'ḥ': 'h', 'ṣ': 's', 'ṭ': 't', 'ẓ': 'z', 'ū': 'u', 'ż': 'z',
    'ṁ': 'm', 'ṙ': 'r', 'ʼ': '', 'ḷ': 'l', 'ê': 'e', 'é': 'e',
    'ó': 'o', 'ô': 'o', 'í': 'i', 'â': 'a', 'ç': 'c', 'ã': 'a',
    'è': 'e', 'à': 'a', 'ù': 'u', 'ü': 'u', 'ö': 'o', 'ì': 'i',
    'ẖ': 'h', 'ẏ': 'y', 'ʿ': '', '.': '', ',': '', '’': '',
    '!': '', '?': '', '“': '', '”': '', '–': '-', '—': '-',
    '‘': '', 'µ': 'u', '@': 'at'
}

# Function to clean text by replacing special characters
def replace_special_characters(text):
    for special_char, standard_char in replacement_mapping.items():
        text = text.replace(special_char, standard_char)
    return text

# Load the dataset (example CSV with a column named 'text')
df = pd.read_csv('/content/drive/MyDrive/datasets/Roman-Urdu-Poetry.csv')

# Show a sample before cleaning
print("Before cleaning sample:")
print(df[['Poetry']].head(5))

# Apply the cleaning function
df['cleaned_text'] = df['Poetry'].apply(replace_special_characters)

# Show a sample after cleaning
print("\nAfter cleaning sample:")
print(df[['cleaned_text']].head(5))

# Save the cleaned dataset
df.to_csv('/content/drive/MyDrive/datasets/Roman-Urdu-Poetry/cleaned_roman_urdu_dataset.csv', index=False)

print("\nDataset cleaned and saved successfully.")
'''

'\nimport pandas as pd\n\n# Replacement mapping dictionary (extended from earlier)\nreplacement_mapping = {\n    \'ñ\': \'n\', \'ā\': \'a\', \'ḳ\': \'k\', \'ġ\': \'g\', \'ī\': \'i\', \'ḍ\': \'d\',\n    \'ḥ\': \'h\', \'ṣ\': \'s\', \'ṭ\': \'t\', \'ẓ\': \'z\', \'ū\': \'u\', \'ż\': \'z\',\n    \'ṁ\': \'m\', \'ṙ\': \'r\', \'ʼ\': \'\', \'ḷ\': \'l\', \'ê\': \'e\', \'é\': \'e\',\n    \'ó\': \'o\', \'ô\': \'o\', \'í\': \'i\', \'â\': \'a\', \'ç\': \'c\', \'ã\': \'a\',\n    \'è\': \'e\', \'à\': \'a\', \'ù\': \'u\', \'ü\': \'u\', \'ö\': \'o\', \'ì\': \'i\',\n    \'ẖ\': \'h\', \'ẏ\': \'y\', \'ʿ\': \'\', \'.\': \'\', \',\': \'\', \'’\': \'\',\n    \'!\': \'\', \'?\': \'\', \'“\': \'\', \'”\': \'\', \'–\': \'-\', \'—\': \'-\',\n    \'‘\': \'\', \'µ\': \'u\', \'@\': \'at\'\n}\n\n# Function to clean text by replacing special characters\ndef replace_special_characters(text):\n    for special_char, standard_char in replacement_mapping.items():\n        text = text.replace(special_char, standard_char)\n  

In [None]:
# Load the dataset (example CSV with a column named 'text')
data = pd.read_csv('/content/drive/MyDrive/datasets/Roman-Urdu-Poetry/cleaned_roman_urdu_dataset.csv')

In [None]:
# Function to preprocess sentences
def preprocess_english_data(data):
    # Remove all non-alphabetic characters, keeping only letters a-z and A-Z
    data = re.sub(r'[^a-zA-Z\s]', '', data)  # Keeps only letters and spaces
    # Remove extra spaces
    data = re.sub(r'\s+', ' ', data).strip()
    #to lower case
    data=data.lower()
    return data

In [None]:
data.dropna(inplace=True)
data['clean_data'] = data['cleaned_text'].apply(preprocess_english_data)
data[['clean_data','cleaned_text']].head()

Unnamed: 0,clean_data,cleaned_text
0,aankh se duur na ho dil se utar jaega vaqt ka ...,aankh se duur na ho dil se utar jaega \nvaqt k...
1,ashiqi men mir jaise khvab mat dekha karo bavl...,ashiqi men 'mir' jaise khvab mat dekha karo \n...
2,ab aur kya kisi se marasim bahaen ham ye bhi b...,ab aur kya kisi se marasim baḌhaen ham \nye bh...
3,ab ke ham bichhe to shayad kabhi khvabon men m...,ab ke ham bichhḌe to shayad kabhi khvabon men ...
4,ab ke tajdidevafa ka nahin imkan janan yaad ky...,ab ke tajdid-e-vafa ka nahin imkan janan \nyaa...


In [None]:
corpus = data['clean_data'].astype(str).tolist()

Step 1: Tokenization and Encoding

#Tokenization

In [None]:
def tokenize(data):
  if isinstance(data,str):
    return data.split()
  else:
    tokenized_data=[]
    for sentence in data:
      tokenized_data.append(sentence.split())
    return tokenized_data

In [None]:
tokenized_data=tokenize(corpus)
print(tokenized_data[0])

['aankh', 'se', 'duur', 'na', 'ho', 'dil', 'se', 'utar', 'jaega', 'vaqt', 'ka', 'kya', 'hai', 'guzarta', 'hai', 'guzar', 'jaega', 'itna', 'manus', 'na', 'ho', 'khalvategham', 'se', 'apni', 'tu', 'kabhi', 'khud', 'ko', 'bhi', 'dekhega', 'to', 'dar', 'jaega', 'dubte', 'dubte', 'kashti', 'ko', 'uchhala', 'de', 'duun', 'main', 'nahin', 'koi', 'to', 'sahil', 'pe', 'utar', 'jaega', 'zindagi', 'teri', 'ata', 'hai', 'to', 'ye', 'jaane', 'vaala', 'teri', 'bakhshish', 'tiri', 'dahliz', 'pe', 'dhar', 'jaega', 'zabt', 'lazim', 'hai', 'magar', 'dukh', 'hai', 'qayamat', 'ka', 'faraz', 'zalim', 'ab', 'ke', 'bhi', 'na', 'roega', 'to', 'mar', 'jaega']


#Building Vocab and word2idx *mapping*

In [None]:
def vocab_word2idx(data):
  vocab=set()
  for sentence in data:
    for word in sentence:
      vocab.add(word)
  vocab.add("<PAD>")

  word2idx={} #empty dictionary
  idx=1
  for word in vocab:
    word2idx[word]=idx
    idx+=1
  word2idx["<PAD>"] = 0  # Padding token
  return word2idx,vocab

In [None]:

word2idx_mapping,vocab=vocab_word2idx(tokenized_data)


#using the word2idx mapping now replacing the words with indices

In [None]:
def convert_tokens_to_indices(tokenized_data, word2idx):
    indexed_data = []

    for sentence in tokenized_data:
        indexed_sentence = []

        for word in sentence:
            if word in word2idx:
                indexed_sentence.append(word2idx[word])  # Add word index if found
            else:
                indexed_sentence.append(0)  # Assign 0 for unknown words

        indexed_data.append(indexed_sentence)  # Append indexed sentence to the result

    return indexed_data


In [None]:
input_data=convert_tokens_to_indices(tokenized_data,word2idx_mapping)
print(input_data[0])
print(input_data[1])

[103, 4671, 5668, 5243, 1866, 11272, 4671, 11636, 11365, 8425, 3854, 13240, 7436, 1939, 7436, 7369, 11365, 15895, 969, 5243, 1866, 14110, 4671, 7901, 11333, 12058, 10671, 1155, 15824, 8603, 7065, 13895, 11365, 4878, 4878, 1275, 1155, 15163, 3727, 6321, 12731, 9894, 9239, 7065, 16653, 14891, 11636, 11365, 10733, 9575, 8486, 7436, 7065, 454, 3400, 3925, 9575, 14671, 9091, 7557, 14891, 3749, 11365, 10750, 4705, 7436, 1570, 679, 7436, 1466, 3854, 10690, 10366, 3231, 2013, 15824, 5243, 482, 7065, 3376, 11365]
[2106, 9708, 3374, 12934, 13073, 998, 4752, 8369, 12180, 1866, 9961, 8912, 998, 4752, 8369, 11960, 11960, 12081, 12371, 10929, 13404, 3869, 7487, 3854, 10187, 2854, 998, 4752, 8369, 14251, 7045, 9708, 4313, 1044, 13143, 4567, 15623, 9866, 6971, 1155, 11486, 998, 4752, 8369, 284, 9708, 13240, 5121, 7341, 9708, 13240, 7114, 14092, 9708, 16381, 16381, 998, 4752, 8369, 15694, 4671, 5565, 2013, 5948, 15433, 7065, 15517, 12501, 1030, 10187, 8539, 8727, 10954, 4431, 998, 4752, 8369, 5262, 125

##Splitting data into context and target

In [None]:
context_data = []
target_data = []

for sentence in input_data:  # Iterate through each sentence in the input_data
    for i in range(1, len(sentence)):  # Starting from index 1 to the end of the sentence
        context_data.append(sentence[:i])  # Context: first i words (token indices)
        target_data.append(sentence[i])   # Target: next word (token index)


In [None]:
print(context_data[:2])

[[103], [103, 4671]]


In [None]:
max_context_length = max(len(seq) for seq in context_data)


##Adding Padding

In [None]:
'''def pad_sequences(data, max_len, padding_value=0):
    padded_data = []
    for seq in data:
        padded_seq = F.pad(torch.tensor(seq), (0, max_len - len(seq)), value=padding_value)
        padded_data.append(padded_seq)
    return torch.stack(padded_data)'''


'def pad_sequences(data, max_len, padding_value=0):\n    padded_data = []\n    for seq in data:\n        padded_seq = F.pad(torch.tensor(seq), (0, max_len - len(seq)), value=padding_value)\n        padded_data.append(padded_seq)\n    return torch.stack(padded_data)'

In [None]:
max_seq_len = 100  # Limit sequence length to 100

def pad_sequences(data, max_len, padding_value=0):
    padded_data = []
    for seq in data:
        truncated_seq = seq[:max_len]  # Truncate sequences longer than max_len
        padded_seq = F.pad(torch.tensor(truncated_seq), (0, max_len - len(truncated_seq)), value=padding_value)
        padded_data.append(padded_seq)
    return torch.stack(padded_data)


#Test

In [None]:
# Adjusting context data with new max sequence length
padded_context_data = pad_sequences(context_data, max_len=max_seq_len, padding_value=0)
print("Sample padded context:", padded_context_data[:2])
print("Sample target data:", target_data[:2])

Sample padded context: tensor([[ 103,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0],
        [ 103, 4671,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,  

In [None]:
# After creating word2idx_mapping and vocab
idx2word = {idx: word for word, idx in word2idx_mapping.items()}

Step 2: Preparing Dataset

In [None]:
class PoetryDataset(Dataset):
    def __init__(self, context_data, target_data):
        self.context_data = context_data
        self.target_data = target_data

    def __len__(self):
        return len(self.context_data)

    def __getitem__(self, idx):
        return self.context_data[idx], self.target_data[idx]


In [None]:
# Create the dataset
dataset = PoetryDataset(padded_context_data, target_data)

In [None]:
# Create DataLoader
batch_size = 128
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

#Model Architecture

In [None]:
class PoetryGenerator(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(PoetryGenerator, self).__init__()

        # Embedding Layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # GRU Layer (2 layers)
        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True,dropout=0.3)

        # Fully connected layer to predict next word
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # Get word embeddings for input
        embedded = self.embedding(x)

        # Pass through GRU
        gru_out, hidden = self.gru(embedded)

        # Predict next word using the last hidden state
        out = self.fc(gru_out[:, -1, :])  # Use the last time step
        return out

In [None]:
# Initialize model
embedding_dim = 128
hidden_dim = 256
output_dim = (len(vocab)+1)  # Vocabulary size
model = PoetryGenerator(vocab_size=output_dim, embedding_dim=embedding_dim, hidden_dim=hidden_dim, output_dim=output_dim)



In [None]:
model.to(device)

PoetryGenerator(
  (embedding): Embedding(16764, 128)
  (gru): GRU(128, 256, batch_first=True, dropout=0.3)
  (fc): Linear(in_features=256, out_features=16764, bias=True)
)

In [None]:
# Loss and Optimizer
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001,weight_decay=1e-5)

In [None]:
torch.cuda.empty_cache()

#Training the Model

In [None]:
'''
# Loop through batches to print input shapes
for epoch in range(1):  # Only running for 1 epoch to check the inputs
    for context_batch, target_batch in dataloader:
        print(f"Context batch shape: {context_batch.shape}")
        print(f"Target batch shape: {target_batch.shape}")
        # You can also print the actual values of the batches if needed
        print(f"Context batch: {context_batch}")
        print(f"Target batch: {target_batch}")
        print("-" * 50)  # Divider for readability

'''

'\n# Loop through batches to print input shapes\nfor epoch in range(1):  # Only running for 1 epoch to check the inputs\n    for context_batch, target_batch in dataloader:\n        print(f"Context batch shape: {context_batch.shape}")\n        print(f"Target batch shape: {target_batch.shape}")\n        # You can also print the actual values of the batches if needed\n        print(f"Context batch: {context_batch}")\n        print(f"Target batch: {target_batch}")\n        print("-" * 50)  # Divider for readability\n\n'

In [None]:
# Parameters
epochs = 10
patience = 3  # Stop if no improvement for 3 epochs
best_train_loss = float('inf')
early_stop_counter = 0

for epoch in range(epochs):
    model.train()
    running_loss = 0.0

    for batch_idx, (context_batch, target_batch) in enumerate(dataloader):
        context_batch, target_batch = context_batch.cuda(), target_batch.cuda()

        optimizer.zero_grad()
        output = model(context_batch)
        loss = criterion(output, target_batch)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    avg_train_loss = running_loss / len(dataloader)
    print(f"Epoch {epoch+1}/{epochs}, Training Loss: {avg_train_loss:.4f}")

    # **Early Stopping Check**
    if avg_train_loss < best_train_loss:
        best_train_loss = avg_train_loss
        early_stop_counter = 0  # Reset counter
        torch.save(model.state_dict(), "best_model.pth")  # Save best model
    else:
        early_stop_counter += 1
        print(f"No improvement. Early stop counter: {early_stop_counter}/{patience}")

    if early_stop_counter >= patience:
        print("Early stopping triggered. Training stopped.")
        break


Epoch 1/10, Training Loss: 3.2598
Epoch 2/10, Training Loss: 3.2253
Epoch 3/10, Training Loss: 3.2033
Epoch 4/10, Training Loss: 3.2084
No improvement. Early stop counter: 1/3
Epoch 5/10, Training Loss: 3.1843
Epoch 6/10, Training Loss: 3.1665
Epoch 7/10, Training Loss: 3.1501
Epoch 8/10, Training Loss: 3.1408
Epoch 9/10, Training Loss: 3.1370
Epoch 10/10, Training Loss: 3.1230


#Testing the model

In [None]:
def sample_with_temperature(output_logits, temperature=1.0):
    """Apply temperature to the output logits and sample a token."""
    probabilities = torch.nn.functional.softmax(output_logits / temperature, dim=-1)
    return torch.multinomial(probabilities, num_samples=1).item()

def generate_poetry_with_temperature(start_word, model, word2idx, idx2word, max_length=50, temperature=1.0):
    model.eval()

    # Convert start word to index
    input_seq = torch.tensor([word2idx[start_word]]).unsqueeze(0).cuda()  # Add batch dimension
    generated = [start_word]

    # Generate words one at a time
    for _ in range(max_length):
        output = model(input_seq)  # Get model output
        predicted_idx = sample_with_temperature(output, temperature)  # Apply temperature sampling

        # Get the predicted word using the idx2word dictionary
        predicted_word = idx2word[predicted_idx]

        generated.append(predicted_word)

        # Update input sequence for next prediction
        input_seq = torch.tensor([predicted_idx]).unsqueeze(0).cuda()

    return ' '.join(generated)


In [None]:
start_word = "dil"  # Starting word
temperature = 0.8    # Adjust temperature for randomness
generated_poetry = generate_poetry_with_temperature(start_word, model, word2idx_mapping, idx2word, max_length=20, temperature=temperature)
print(generated_poetry)

dil ye duniya dushman dulai jununalamat qasam yadgar aankh bhala baag dardepinhan bhige chunanche baghaireyakdilebemuddaa uthiye hudikhvanon khul bemuhaba muskuraungi bharam
