# Phase III: Trigram Language Model for Urdu Story Generation

This notebook implements a **Trigram Language Model** using **Maximum Likelihood Estimation (MLE)** with **Interpolation** for generating Urdu stories.

## Components:
1. **Data Loading** - Load preprocessed Urdu stories
2. **BPE Tokenization** - Byte Pair Encoding tokenization from Phase II
3. **MLE Probability Estimation** - For unigrams, bigrams, and trigrams
4. **Interpolation** - Smooth probabilities by combining n-gram models
5. **Text Generation** - Generate stories until `<EOT>` token

### Special Tokens:
- `<EOS>` - End of Sentence
- `<EOP>` - End of Paragraph  
- `<EOT>` - End of Text (Story)

In [15]:
# ============================================
# IMPORTS AND CONFIGURATION
# ============================================
import os
import glob
import random
import pickle
import math
from collections import defaultdict, Counter
from typing import List, Dict, Tuple, Optional

# Set random seed for reproducibility
random.seed(42)

# Configuration
DATA_DIR = "../PreProcessing/Preprocessed_documents/"
MODEL_SAVE_PATH = "trigram_model.pkl"

# Special tokens (using unused Unicode characters as per Phase I)
EOS_TOKEN = "\ue000"  # End of Sentence
EOP_TOKEN = "\ue001"  # End of Paragraph
EOT_TOKEN = "\ue002"  # End of Text/Story
START_TOKEN = "\ue003"  # Start token for padding

print("Configuration loaded successfully!")
print(f"Special Tokens: EOS={repr(EOS_TOKEN)}, EOP={repr(EOP_TOKEN)}, EOT={repr(EOT_TOKEN)}")

Configuration loaded successfully!
Special Tokens: EOS='\ue000', EOP='\ue001', EOT='\ue002'


In [16]:
# ============================================
# DATA LOADING
# ============================================

def load_stories(data_dir: str) -> List[str]:
    """
    Load all story documents from the data directory.
    Returns a list of story texts.
    """
    stories = []
    file_pattern = os.path.join(data_dir, "doc*.txt")
    files = sorted(glob.glob(file_pattern))
    
    for file_path in files:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                story = f.read().strip()
                if story:  # Only add non-empty stories
                    stories.append(story)
        except Exception as e:
            print(f"Error reading {file_path}: {e}")
    
    print(f"Loaded {len(stories)} stories from {data_dir}")
    return stories

# Load the data
stories = load_stories(DATA_DIR)
print(f"\nSample story (first 500 chars):\n{stories[0][:500]}...")

Loaded 469 stories from ../PreProcessing/Preprocessed_documents/

Sample story (first 500 chars):
اکمل میٹرک کا طالب علم تھا، لیکن اپنی پڑھائی اور والدین اور اساتذہ کا احترام کرنے میں لاپروا سا تھا۔ <EOS>اکمل کو بگاڑنے میں زیادہ تر ہاتھ ان کے دادا تھا، جو ایک سرکاری ادارے سے ریٹائرڈ افسر تھے۔ <EOS>خود تو وہ تمام عمر پابندیوں میں رہتے ہوئے ملازمت کرتے رہے، لیکن اکمل کو انھوں نے بے جا لاڈ پیار کی وجہ سے خراب کر دیا تھا۔ <EOS>
دادا اپنی پینشن سے اس کی ہر فرمائش کو پورا کرتے۔ <EOS>اکمل کے ماں باپ منع بھی کرتے، مگر دادا کو اپنے پوتے سے بہت پیار تھا۔ <EOS>یہی وجہ تھی کہ اکمل سارا دن کمپیوٹر اور مو...


In [17]:
# ============================================
# BPE TOKENIZER (Phase II Integration)
# ============================================
import json

class BPETokenizer:
    """
    BPE (Byte Pair Encoding) Tokenizer.
    Loads pre-trained vocabulary and merges from Phase II.
    """
    
    def __init__(self, vocab_path: str = "../Tokenization/vocab.json", 
                 merges_path: str = "../Tokenization/merges.txt"):
        """
        Initialize the BPE Tokenizer with pre-trained vocab and merges.
        
        Args:
            vocab_path: Path to vocab.json file
            merges_path: Path to merges.txt file
        """
        self.vocab = self._load_vocab(vocab_path)
        self.merges = self._load_merges(merges_path)
        
        # Build token to ID mapping
        self.token_to_id = {token: idx for idx, token in enumerate(self.vocab)}
        self.id_to_token = {idx: token for idx, token in enumerate(self.vocab)}
        
        # Add special tokens if not in vocab
        special_tokens = [START_TOKEN, EOS_TOKEN, EOP_TOKEN, EOT_TOKEN]
        for token in special_tokens:
            if token not in self.token_to_id:
                idx = len(self.token_to_id)
                self.token_to_id[token] = idx
                self.id_to_token[idx] = token
                self.vocab.append(token)
        
        print(f"BPE Tokenizer loaded: {len(self.vocab)} tokens, {len(self.merges)} merges")
    
    def _load_vocab(self, vocab_path: str) -> List[str]:
        """Load vocabulary from JSON file."""
        try:
            with open(vocab_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            print(f"Warning: Vocab file not found at {vocab_path}. Starting with empty vocab.")
            return []
    
    def _load_merges(self, merges_path: str) -> List[Tuple[str, str]]:
        """Load merge operations from file."""
        merges = []
        try:
            with open(merges_path, 'r', encoding='utf-8') as f:
                for line in f:
                    parts = line.strip().split(' ')
                    if len(parts) == 2:
                        merges.append((parts[0], parts[1]))
        except FileNotFoundError:
            print(f"Warning: Merges file not found at {merges_path}. No merges loaded.")
        return merges
    
    def _apply_merges(self, word: str) -> List[str]:
        """
        Apply BPE merges to a word.
        
        Args:
            word: Input word to tokenize
            
        Returns:
            List of BPE tokens
        """
        # Start with character-level representation
        tokens = list(word)
        
        # Apply each merge in order
        for merge_pair in self.merges:
            i = 0
            while i < len(tokens) - 1:
                if tokens[i] == merge_pair[0] and tokens[i + 1] == merge_pair[1]:
                    # Merge the pair
                    tokens = tokens[:i] + [merge_pair[0] + merge_pair[1]] + tokens[i + 2:]
                else:
                    i += 1
        
        return tokens
    
    def tokenize(self, text: str) -> List[str]:
        """
        Tokenize text into BPE tokens.
        
        Args:
            text: Input text to tokenize
            
        Returns:
            List of BPE tokens
        """
        tokens = []
        words = text.split()
        
        for word in words:
            word_tokens = self._apply_merges(word)
            tokens.extend(word_tokens)
            tokens.append(' ')  # Add space between words
        
        # Remove trailing space if exists
        if tokens and tokens[-1] == ' ':
            tokens = tokens[:-1]
        
        return tokens
    
    def encode(self, text: str) -> List[int]:
        """
        Encode text into token IDs.
        
        Args:
            text: Input text to encode
            
        Returns:
            List of token IDs
        """
        tokens = self.tokenize(text)
        ids = []
        
        for token in tokens:
            if token in self.token_to_id:
                ids.append(self.token_to_id[token])
            else:
                # Unknown token - add to vocab dynamically
                idx = len(self.token_to_id)
                self.token_to_id[token] = idx
                self.id_to_token[idx] = token
                self.vocab.append(token)
                ids.append(idx)
        
        return ids
    
    def decode(self, token_ids: List[int]) -> str:
        """
        Decode token IDs back to text.
        
        Args:
            token_ids: List of token IDs
            
        Returns:
            Decoded text string
        """
        tokens = [self.id_to_token.get(tid, '') for tid in token_ids]
        return ''.join(tokens)
    
    def get_vocab(self) -> Dict[str, int]:
        """Return the vocabulary mapping."""
        return self.token_to_id.copy()
    
    def get_vocab_size(self) -> int:
        """Return the vocabulary size."""
        return len(self.vocab)

# Initialize the BPE tokenizer
tokenizer = BPETokenizer()
print(f"\nVocabulary size: {tokenizer.get_vocab_size()}")

# Test tokenization
test_text = "یہ ایک ٹیسٹ ہے"
test_tokens = tokenizer.tokenize(test_text)
print(f"Test text: {test_text}")
print(f"Tokens: {test_tokens}")

BPE Tokenizer loaded: 2004 tokens, 1937 merges

Vocabulary size: 2004
Test text: یہ ایک ٹیسٹ ہے
Tokens: ['یہ', ' ', 'ایک', ' ', 'ٹیسٹ', ' ', 'ہے']


In [18]:
# ============================================
# TEXT PREPROCESSING WITH SPECIAL TOKENS
# ============================================

def preprocess_story(story: str) -> str:
    """
    Preprocess a story by adding special tokens.
    - Add <EOS> at end of sentences (after ۔)
    - Add <EOP> at end of paragraphs (after double newlines)
    - Add <EOT> at end of story
    """
    # Replace sentence endings with <EOS>
    # Urdu full stop is '۔'
    story = story.replace('۔', f'۔{EOS_TOKEN}')
    
    # Replace paragraph breaks (double newlines) with <EOP>
    story = story.replace('\n\n', f'{EOP_TOKEN}')
    story = story.replace('\n', f'{EOP_TOKEN}')  # Single newlines as paragraph breaks too
    
    # Add <EOT> at the end
    story = story.strip() + EOT_TOKEN
    
    return story

def prepare_corpus(stories: List[str]) -> List[str]:
    """Prepare the entire corpus with special tokens."""
    preprocessed = []
    for story in stories:
        processed = preprocess_story(story)
        preprocessed.append(processed)
    return preprocessed

# Preprocess all stories
corpus = prepare_corpus(stories)
print(f"Preprocessed {len(corpus)} stories")
print(f"\nSample preprocessed story (first 500 chars):")
print(corpus[0][:500])

Preprocessed 469 stories

Sample preprocessed story (first 500 chars):
اکمل میٹرک کا طالب علم تھا، لیکن اپنی پڑھائی اور والدین اور اساتذہ کا احترام کرنے میں لاپروا سا تھا۔ <EOS>اکمل کو بگاڑنے میں زیادہ تر ہاتھ ان کے دادا تھا، جو ایک سرکاری ادارے سے ریٹائرڈ افسر تھے۔ <EOS>خود تو وہ تمام عمر پابندیوں میں رہتے ہوئے ملازمت کرتے رہے، لیکن اکمل کو انھوں نے بے جا لاڈ پیار کی وجہ سے خراب کر دیا تھا۔ <EOS>دادا اپنی پینشن سے اس کی ہر فرمائش کو پورا کرتے۔ <EOS>اکمل کے ماں باپ منع بھی کرتے، مگر دادا کو اپنے پوتے سے بہت پیار تھا۔ <EOS>یہی وجہ تھی کہ اکمل سارا دن کمپیوٹر ا


In [19]:
# ============================================
# TRIGRAM LANGUAGE MODEL (Built from Scratch)
# ============================================

class TrigramLanguageModel:
    """
    Trigram Language Model using Maximum Likelihood Estimation (MLE).
    Implements interpolation smoothing combining unigram, bigram, and trigram probabilities.
    
    Built entirely from scratch without using any pre-built language modeling libraries.
    Now uses BPE tokenization from Phase II.
    """
    
    def __init__(self, lambda1: float = 0.1, lambda2: float = 0.3, lambda3: float = 0.6):
        """
        Initialize the Trigram Language Model.
        
        Args:
            lambda1: Weight for unigram probability (default: 0.1)
            lambda2: Weight for bigram probability (default: 0.3)
            lambda3: Weight for trigram probability (default: 0.6)
            
        Note: lambda1 + lambda2 + lambda3 must equal 1.0
        """
        # Validate interpolation weights
        assert abs(lambda1 + lambda2 + lambda3 - 1.0) < 1e-6, \
            "Interpolation weights must sum to 1.0"
        
        self.lambda1 = lambda1  # Unigram weight
        self.lambda2 = lambda2  # Bigram weight
        self.lambda3 = lambda3  # Trigram weight
        
        # Count dictionaries for n-grams
        self.unigram_counts = Counter()  # Count of each token
        self.bigram_counts = defaultdict(Counter)  # Count of (token1, token2)
        self.trigram_counts = defaultdict(Counter)  # Count of (token1, token2, token3)
        
        # Total counts for MLE denominator
        self.total_unigrams = 0
        self.bigram_context_counts = Counter()  # Count of (token1) for bigram context
        self.trigram_context_counts = Counter()  # Count of (token1, token2) for trigram context
        
        # Vocabulary
        self.vocabulary = set()
        
        # Is the model trained?
        self.is_trained = False
    
    def _tokenize(self, text: str) -> List[str]:
        """
        Tokenize text into list of BPE tokens.
        Uses the BPE tokenizer from Phase II.
        """
        return tokenizer.tokenize(text)
    
    def _create_ngrams(self, tokens: List[str], n: int) -> List[Tuple]:
        """Create n-grams from a list of tokens."""
        ngrams = []
        for i in range(len(tokens) - n + 1):
            ngram = tuple(tokens[i:i + n])
            ngrams.append(ngram)
        return ngrams
    
    def train(self, corpus: List[str]):
        """
        Train the trigram model on the given corpus.
        
        Args:
            corpus: List of preprocessed text documents
        """
        print("Training Trigram Language Model with BPE tokenization...")
        
        for doc_idx, document in enumerate(corpus):
            # Tokenize the document using BPE
            tokens = self._tokenize(document)
            
            # Add padding at the beginning for trigram context
            padded_tokens = [START_TOKEN, START_TOKEN] + tokens
            
            # Build vocabulary
            self.vocabulary.update(tokens)
            
            # Count unigrams
            for token in tokens:
                self.unigram_counts[token] += 1
                self.total_unigrams += 1
            
            # Count bigrams
            for i in range(len(padded_tokens) - 1):
                context = padded_tokens[i]
                next_token = padded_tokens[i + 1]
                self.bigram_counts[context][next_token] += 1
                self.bigram_context_counts[context] += 1
            
            # Count trigrams
            for i in range(len(padded_tokens) - 2):
                context = (padded_tokens[i], padded_tokens[i + 1])
                next_token = padded_tokens[i + 2]
                self.trigram_counts[context][next_token] += 1
                self.trigram_context_counts[context] += 1
            
            # Progress indicator
            if (doc_idx + 1) % 50 == 0:
                print(f"  Processed {doc_idx + 1}/{len(corpus)} documents...")
        
        self.is_trained = True
        print(f"\nTraining complete!")
        print(f"  Vocabulary size: {len(self.vocabulary)}")
        print(f"  Total unigrams (BPE tokens): {self.total_unigrams}")
        print(f"  Unique bigram contexts: {len(self.bigram_counts)}")
        print(f"  Unique trigram contexts: {len(self.trigram_counts)}")
    
    def get_unigram_probability(self, token: str) -> float:
        """
        Calculate unigram probability using MLE.
        P(token) = count(token) / total_tokens
        """
        if self.total_unigrams == 0:
            return 0.0
        return self.unigram_counts[token] / self.total_unigrams
    
    def get_bigram_probability(self, context: str, token: str) -> float:
        """
        Calculate bigram probability using MLE.
        P(token | context) = count(context, token) / count(context)
        """
        context_count = self.bigram_context_counts[context]
        if context_count == 0:
            return 0.0
        return self.bigram_counts[context][token] / context_count
    
    def get_trigram_probability(self, context: Tuple[str, str], token: str) -> float:
        """
        Calculate trigram probability using MLE.
        P(token | context1, context2) = count(context1, context2, token) / count(context1, context2)
        """
        context_count = self.trigram_context_counts[context]
        if context_count == 0:
            return 0.0
        return self.trigram_counts[context][token] / context_count
    
    def get_interpolated_probability(self, context: Tuple[str, str], token: str) -> float:
        """
        Calculate interpolated probability combining unigram, bigram, and trigram.
        
        P_interp(token | ctx1, ctx2) = λ1 * P(token) + λ2 * P(token | ctx2) + λ3 * P(token | ctx1, ctx2)
        
        This smoothing technique helps handle unseen n-grams by falling back to
        lower-order n-grams.
        """
        # Unigram probability
        p_unigram = self.get_unigram_probability(token)
        
        # Bigram probability (using only the second context token)
        p_bigram = self.get_bigram_probability(context[1], token)
        
        # Trigram probability
        p_trigram = self.get_trigram_probability(context, token)
        
        # Interpolated probability
        p_interp = (self.lambda1 * p_unigram + 
                    self.lambda2 * p_bigram + 
                    self.lambda3 * p_trigram)
        
        return p_interp
    
    def get_next_token_probabilities(self, context: Tuple[str, str]) -> Dict[str, float]:
        """
        Get interpolated probabilities for all possible next tokens given a context.
        
        Args:
            context: Tuple of (token1, token2) representing the bigram context
            
        Returns:
            Dictionary mapping each token to its interpolated probability
        """
        probabilities = {}
        for token in self.vocabulary:
            prob = self.get_interpolated_probability(context, token)
            if prob > 0:
                probabilities[token] = prob
        return probabilities
    
    def sample_next_token(self, context: Tuple[str, str], temperature: float = 1.0) -> str:
        """
        Sample the next token given a context using the interpolated probability distribution.
        
        Args:
            context: Tuple of (token1, token2) representing the bigram context
            temperature: Sampling temperature (higher = more random, lower = more deterministic)
            
        Returns:
            The sampled next token
        """
        probabilities = self.get_next_token_probabilities(context)
        
        if not probabilities:
            # Fallback: return a random token from vocabulary
            return random.choice(list(self.vocabulary))
        
        # Apply temperature scaling
        if temperature != 1.0:
            scaled_probs = {}
            for token, prob in probabilities.items():
                if prob > 0:
                    scaled_probs[token] = prob ** (1.0 / temperature)
            probabilities = scaled_probs
        
        # Normalize probabilities
        total = sum(probabilities.values())
        if total == 0:
            return random.choice(list(self.vocabulary))
        
        normalized_probs = {k: v / total for k, v in probabilities.items()}
        
        # Sample from the distribution
        tokens = list(normalized_probs.keys())
        probs = list(normalized_probs.values())
        
        return random.choices(tokens, weights=probs, k=1)[0]
    
    def calculate_perplexity(self, text: str) -> float:
        """
        Calculate the perplexity of a text sequence.
        
        Perplexity = exp(-1/N * sum(log(P(token_i | context))))
        
        Lower perplexity indicates better model fit.
        """
        tokens = self._tokenize(text)
        padded_tokens = [START_TOKEN, START_TOKEN] + tokens
        
        log_prob_sum = 0.0
        n = len(tokens)
        
        for i in range(2, len(padded_tokens)):
            context = (padded_tokens[i - 2], padded_tokens[i - 1])
            token = padded_tokens[i]
            
            prob = self.get_interpolated_probability(context, token)
            if prob > 0:
                log_prob_sum += math.log(prob)
            else:
                # Handle zero probability with a small value
                log_prob_sum += math.log(1e-10)
        
        avg_log_prob = log_prob_sum / n if n > 0 else 0
        perplexity = math.exp(-avg_log_prob)
        
        return perplexity

print("TrigramLanguageModel class defined successfully (using BPE tokenization)!")

TrigramLanguageModel class defined successfully (using BPE tokenization)!


In [20]:
# ============================================
# TEXT GENERATION
# ============================================

class UrduStoryGenerator:
    """
    Urdu Story Generator using the Trigram Language Model.
    Generates text until the <EOT> (End of Text) token is reached.
    """
    
    def __init__(self, model: TrigramLanguageModel):
        """
        Initialize the generator with a trained trigram model.
        
        Args:
            model: A trained TrigramLanguageModel instance
        """
        self.model = model
    
    def generate(self, 
                 prefix: str = "", 
                 max_length: int = 1000, 
                 temperature: float = 1.0,
                 stop_on_eot: bool = True) -> str:
        """
        Generate text starting from an optional prefix.
        
        Args:
            prefix: Starting text (prompt) for generation
            max_length: Maximum number of tokens to generate
            temperature: Sampling temperature (higher = more diverse)
            stop_on_eot: Whether to stop generation at <EOT> token
            
        Returns:
            Generated text string
        """
        if not self.model.is_trained:
            raise RuntimeError("Model must be trained before generation!")
        
        # Initialize with prefix or start tokens
        if prefix:
            tokens = list(prefix)
        else:
            tokens = []
        
        # Add padding for context
        padded_tokens = [START_TOKEN, START_TOKEN] + tokens
        
        generated_count = 0
        
        while generated_count < max_length:
            # Get the context (last two tokens)
            context = (padded_tokens[-2], padded_tokens[-1])
            
            # Sample next token
            next_token = self.model.sample_next_token(context, temperature)
            
            # Add to sequence
            padded_tokens.append(next_token)
            generated_count += 1
            
            # Check for end of text
            if stop_on_eot and next_token == EOT_TOKEN:
                break
        
        # Extract generated tokens (without padding)
        generated_tokens = padded_tokens[2:]
        generated_text = ''.join(generated_tokens)
        
        return generated_text
    
    def generate_story(self, 
                       starting_phrase: str = "", 
                       max_length: int = 2000,
                       temperature: float = 0.8) -> str:
        """
        Generate a complete Urdu story.
        
        Args:
            starting_phrase: Starting phrase in Urdu
            max_length: Maximum length of the story
            temperature: Creativity parameter (0.5-1.5 recommended)
            
        Returns:
            Generated story text
        """
        raw_story = self.generate(
            prefix=starting_phrase,
            max_length=max_length,
            temperature=temperature,
            stop_on_eot=True
        )
        
        # Post-process: Clean up special tokens for display
        cleaned_story = self._clean_story(raw_story)
        
        return cleaned_story
    
    def _clean_story(self, text: str) -> str:
        """
        Clean up the generated story by formatting special tokens.
        """
        # Replace EOS with space (sentence endings are already marked by ۔)
        text = text.replace(EOS_TOKEN, '')
        
        # Replace EOP with newlines (paragraph breaks)
        text = text.replace(EOP_TOKEN, '\n\n')
        
        # Remove EOT token
        text = text.replace(EOT_TOKEN, '')
        
        # Clean up multiple newlines
        while '\n\n\n' in text:
            text = text.replace('\n\n\n', '\n\n')
        
        return text.strip()
    
    def generate_interactive(self):
        """
        Interactive story generation - generates token by token.
        Useful for step-wise display (like ChatGPT).
        """
        tokens = [START_TOKEN, START_TOKEN]
        
        while True:
            context = (tokens[-2], tokens[-1])
            next_token = self.model.sample_next_token(context, temperature=0.8)
            tokens.append(next_token)
            
            # Yield the cleaned token for streaming display
            if next_token == EOT_TOKEN:
                break
            elif next_token == EOS_TOKEN:
                yield ''
            elif next_token == EOP_TOKEN:
                yield '\n\n'
            else:
                yield next_token

print("UrduStoryGenerator class defined successfully!")

UrduStoryGenerator class defined successfully!


In [21]:
# ============================================
# TRAIN THE MODEL
# ============================================

# Initialize the trigram model with interpolation weights
# λ1 (unigram) = 0.1, λ2 (bigram) = 0.3, λ3 (trigram) = 0.6
trigram_model = TrigramLanguageModel(lambda1=0.1, lambda2=0.3, lambda3=0.6)

# Train on the preprocessed corpus
trigram_model.train(corpus)

# Print some statistics
print("\n" + "="*50)
print("MODEL STATISTICS")
print("="*50)
print(f"Vocabulary Size: {len(trigram_model.vocabulary)}")
print(f"Total Tokens: {trigram_model.total_unigrams:,}")
print(f"Unique Bigram Contexts: {len(trigram_model.bigram_counts):,}")
print(f"Unique Trigram Contexts: {len(trigram_model.trigram_counts):,}")
print(f"\nInterpolation Weights:")
print(f"  λ1 (Unigram): {trigram_model.lambda1}")
print(f"  λ2 (Bigram):  {trigram_model.lambda2}")
print(f"  λ3 (Trigram): {trigram_model.lambda3}")

Training Trigram Language Model with BPE tokenization...
  Processed 50/469 documents...
  Processed 100/469 documents...
  Processed 150/469 documents...
  Processed 200/469 documents...
  Processed 250/469 documents...
  Processed 300/469 documents...
  Processed 350/469 documents...
  Processed 400/469 documents...
  Processed 450/469 documents...

Training complete!
  Vocabulary size: 1522
  Total unigrams (BPE tokens): 830961
  Unique bigram contexts: 1522
  Unique trigram contexts: 16320

MODEL STATISTICS
Vocabulary Size: 1522
Total Tokens: 830,961
Unique Bigram Contexts: 1,522
Unique Trigram Contexts: 16,320

Interpolation Weights:
  λ1 (Unigram): 0.1
  λ2 (Bigram):  0.3
  λ3 (Trigram): 0.6


In [22]:
# ============================================
# GENERATE STORIES
# ============================================

# Initialize the story generator
generator = UrduStoryGenerator(trigram_model)

# Generate a story with no prompt
print("="*60)
print("GENERATED STORY (No Prompt)")
print("="*60)
generated_story = generator.generate_story(
    starting_phrase="",
    max_length=500,
    temperature=0.8
)
print(generated_story)
print("\n")

# Generate a story with a prompt
print("="*60)
print("GENERATED STORY (With Prompt: 'ایک دن')")
print("="*60)
generated_story_with_prompt = generator.generate_story(
    starting_phrase="ایک دن",
    max_length=500,
    temperature=0.8
)
print(generated_story_with_prompt)

GENERATED STORY (No Prompt)
رنگا اور وہاں پہنچ گئے تھے۔ <EOS>

اس اُلو پان کر دور یا تھا کر  پھر اُڑ کر

اچھا ایسا کرتا کہ وہ اس سر کے بار ی نتیمولا گ لگائے وہ چلا پڑھ ابا کر کے <EOS>انہوں کی <EOS>وہ سمجھ کر کہا، بھر پور کمرے کو بہت ساتھ نہیں کا ب مرغا ضرورت  کھولایا  رہ کی   کو محتاط ہو سکتے ہوئے اور کہا میں <EOP>

راجہ کے لئے آگے میں اب اسے ہوئے کہ آج میں پر بیٹھی اور اس نے  <EOS>وہی ماں کے ہوا بہت خوش تھا کہیں۔ شروازار کا دل دھاڑ دوں سے ایک لڑکا تھا کہ آپ سے میں اور تمھارامیشان <EOS> بی اور ف لوٹ گئے۔ <EOS>

ان کہا کہ اس اور کر دیا گیا۔ <EOS>

آنکھ کھری طرف دکانپ <EOS>  

ا کر کہ آپ کیسے کرو، بھی رہتا ہے۔ <EOS>

چڑیا تھا۔  خرابی پڑے، آپ آپ بھی و کی جان تو نہیں،جس سے تھا۔ کے پاس جن زادی میں و پکڑ کر دیراد نے جان اس کا انہوں چلا سوچا دو۔ <EOS>

تمھیں ان  آ گئے۔ <EOS>

علی کے م بندہ پڑھنے میں  چھٹی پر نے اس کی ل بہت ذہینوں سے دیا شروع بندر میں جائےیہ تھے۔ <EOS>

اس مرتبہ ان کے


GENERATED STORY (With Prompt: 'ایک دن')
ایک دن انھوں نے نہیں  <EOS>وہ بہت پر کی والے  پُراسر سے ملے ایک لیا 

In [23]:
# ============================================
# MODEL EVALUATION
# ============================================

# Calculate perplexity on a sample from the training data
sample_texts = [corpus[i][:500] for i in range(min(5, len(corpus)))]

print("="*60)
print("MODEL EVALUATION - Perplexity")
print("="*60)

perplexities = []
for i, text in enumerate(sample_texts):
    perplexity = trigram_model.calculate_perplexity(text)
    perplexities.append(perplexity)
    print(f"Sample {i+1}: Perplexity = {perplexity:.2f}")

avg_perplexity = sum(perplexities) / len(perplexities)
print(f"\nAverage Perplexity: {avg_perplexity:.2f}")
print("(Lower perplexity = better model fit)")

MODEL EVALUATION - Perplexity
Sample 1: Perplexity = 9.81
Sample 2: Perplexity = 8.48
Sample 3: Perplexity = 9.72
Sample 4: Perplexity = 9.19
Sample 5: Perplexity = 8.26

Average Perplexity: 9.09
(Lower perplexity = better model fit)


In [24]:
# ============================================
# SAVE AND LOAD MODEL
# ============================================

def save_model(model: TrigramLanguageModel, filepath: str):
    """
    Save the trained trigram model to a pickle file.
    """
    model_data = {
        'lambda1': model.lambda1,
        'lambda2': model.lambda2,
        'lambda3': model.lambda3,
        'unigram_counts': dict(model.unigram_counts),
        'bigram_counts': {k: dict(v) for k, v in model.bigram_counts.items()},
        'trigram_counts': {k: dict(v) for k, v in model.trigram_counts.items()},
        'total_unigrams': model.total_unigrams,
        'bigram_context_counts': dict(model.bigram_context_counts),
        'trigram_context_counts': dict(model.trigram_context_counts),
        'vocabulary': model.vocabulary,
        'is_trained': model.is_trained
    }
    
    with open(filepath, 'wb') as f:
        pickle.dump(model_data, f)
    
    print(f"Model saved to {filepath}")

def load_model(filepath: str) -> TrigramLanguageModel:
    """
    Load a trained trigram model from a pickle file.
    """
    with open(filepath, 'rb') as f:
        model_data = pickle.load(f)
    
    model = TrigramLanguageModel(
        lambda1=model_data['lambda1'],
        lambda2=model_data['lambda2'],
        lambda3=model_data['lambda3']
    )
    
    model.unigram_counts = Counter(model_data['unigram_counts'])
    model.bigram_counts = defaultdict(Counter)
    for k, v in model_data['bigram_counts'].items():
        model.bigram_counts[k] = Counter(v)
    model.trigram_counts = defaultdict(Counter)
    for k, v in model_data['trigram_counts'].items():
        model.trigram_counts[k] = Counter(v)
    model.total_unigrams = model_data['total_unigrams']
    model.bigram_context_counts = Counter(model_data['bigram_context_counts'])
    model.trigram_context_counts = Counter(model_data['trigram_context_counts'])
    model.vocabulary = model_data['vocabulary']
    model.is_trained = model_data['is_trained']
    
    print(f"Model loaded from {filepath}")
    return model

# Save the trained model
save_model(trigram_model, MODEL_SAVE_PATH)
print(f"\nModel saved successfully!")

Model saved to trigram_model.pkl

Model saved successfully!


In [25]:
# ============================================
# API INTERFACE (For Phase IV Integration)
# ============================================

class StoryGeneratorAPI:
    """
    API interface for the Urdu Story Generator.
    This class provides methods that can be easily integrated with FastAPI.
    See Phase IV for the actual FastAPI service implementation.
    """
    
    def __init__(self, model_path: str = None):
        """
        Initialize the API with a trained model.
        
        Args:
            model_path: Path to the saved model file. If None, uses in-memory model.
        """
        if model_path and os.path.exists(model_path):
            self.model = load_model(model_path)
        else:
            # Use the already trained model
            self.model = trigram_model
        
        self.generator = UrduStoryGenerator(self.model)
    
    def generate(self, prefix: str = "", max_length: int = 1000, temperature: float = 0.8) -> dict:
        """
        Generate a story (endpoint: POST /generate).
        
        Args:
            prefix: Starting phrase in Urdu
            max_length: Maximum number of tokens to generate
            temperature: Sampling temperature
            
        Returns:
            Dictionary with generated story and metadata
        """
        try:
            story = self.generator.generate_story(
                starting_phrase=prefix,
                max_length=max_length,
                temperature=temperature
            )
            
            return {
                "success": True,
                "story": story,
                "input_prefix": prefix,
                "max_length": max_length,
                "temperature": temperature
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "input_prefix": prefix
            }
    
    def get_model_info(self) -> dict:
        """
        Get model information and statistics.
        """
        return {
            "model_type": "Trigram Language Model",
            "vocabulary_size": len(self.model.vocabulary),
            "total_tokens": self.model.total_unigrams,
            "interpolation_weights": {
                "lambda1_unigram": self.model.lambda1,
                "lambda2_bigram": self.model.lambda2,
                "lambda3_trigram": self.model.lambda3
            },
            "is_trained": self.model.is_trained
        }
    
    def generate_stream(self, prefix: str = "", max_length: int = 1000, temperature: float = 0.8):
        """
        Generate story in streaming mode (token by token).
        Useful for ChatGPT-like step-wise display.
        
        Yields:
            Individual tokens/characters for streaming display
        """
        if prefix:
            tokens = list(prefix)
        else:
            tokens = []
        
        padded_tokens = [START_TOKEN, START_TOKEN] + tokens
        generated_count = 0
        
        while generated_count < max_length:
            context = (padded_tokens[-2], padded_tokens[-1])
            next_token = self.model.sample_next_token(context, temperature)
            padded_tokens.append(next_token)
            generated_count += 1
            
            if next_token == EOT_TOKEN:
                break
            elif next_token == EOS_TOKEN:
                yield ''
            elif next_token == EOP_TOKEN:
                yield '\n\n'
            else:
                yield next_token

# Initialize API
api = StoryGeneratorAPI()
print("API Interface initialized!")
print(f"\nModel Info: {api.get_model_info()}")

API Interface initialized!

Model Info: {'model_type': 'Trigram Language Model', 'vocabulary_size': 1522, 'total_tokens': 830961, 'interpolation_weights': {'lambda1_unigram': 0.1, 'lambda2_bigram': 0.3, 'lambda3_trigram': 0.6}, 'is_trained': True}


In [26]:
# ============================================
# TEST API ENDPOINT
# ============================================

# Test the generate endpoint
print("="*60)
print("TESTING API ENDPOINT: generate()")
print("="*60)

# Test with different prefixes
test_inputs = [
    {"prefix": "", "max_length": 300},
    {"prefix": "ایک بار", "max_length": 300},
    {"prefix": "بچے نے", "max_length": 300},
]

for i, test in enumerate(test_inputs):
    print(f"\nTest {i+1}: prefix='{test['prefix']}'")
    print("-" * 40)
    result = api.generate(prefix=test['prefix'], max_length=test['max_length'])
    if result['success']:
        print(f"Generated Story:\n{result['story'][:500]}...")
    else:
        print(f"Error: {result['error']}")

TESTING API ENDPOINT: generate()

Test 1: prefix=''
----------------------------------------
Generated Story:
ایک  ہاتھ انسان سے آئے دن بعد مجھے اس میں کرے انہیں اس کلو اچھا امی  کی پر ایک ڈورڈ جاندر ہی نہیں کر دیکن سو کو،سستی رہتی ہے۔ <EOS>

اچانداز میں بھی باہ کی تو ہیں، ان سے کھلی ہے کہ ہے اس نے کہا بدلا تو انھوں نے ان کو گیا۔ <EOS>الب قیمت کا سوالات  تھی، لیا اور بہت ہی  لالچ رہا ہے اور گیا چار روپے دیک روتی ہے۔ دنوں کی طرف دیکھ کر اس میں  کے جب کی طرف دائیں یاد  کی  کر <EOS>اس لئے بہت رہی تھے، اس کے ٹری سمجھے جھولا بولا اور انتقال میں  آرام کہ اس کو سبق سے دور ہو  <EOS> <EOP>

ان معظم چ لا ہوا...

Test 2: prefix='ایک بار'
----------------------------------------
Generated Story:
ایک بار سے معاملہ  نے ان کر کے ایک ایک پر چلتے کی لہاڑا یہ روٹی کا  نظر آ رہا تھا کہ آج اگلی مکر د تھا کہ جن بن گئی۔ <EOS>نہیں  شروع اور آتیں۔ کو پر ہلکی گے۔ <EOS> <EOP>

مس بچوں کی صورت میں  اور میں وہ پوری ہے۔ <EOS>

نہیں۔ <EOS>

اُسے کی صفائی کے بعد فہمی کی جائیں کر سکتے پانڈ کر دیا۔ <EOS>ری ہے۔ <EOS>



In [27]:
# ============================================
# STREAMING GENERATION DEMO (For Phase V)
# ============================================

print("="*60)
print("STREAMING GENERATION DEMO")
print("="*60)
print("Generating story character by character (first 200 chars):\n")

# Collect streamed output
streamed_text = ""
char_count = 0

for token in api.generate_stream(prefix="", max_length=500, temperature=0.8):
    streamed_text += token
    char_count += 1
    if char_count >= 200:
        break

print(streamed_text)
print("\n... (truncated for demo)")
print("\n" + "="*60)
print("Phase III Complete! Model is ready for Phase IV integration.")
print("="*60)

STREAMING GENERATION DEMO
Generating story character by character (first 200 chars):

ایک <EOS>

میں پہنچ گئے۔ واپس چانک ایک  <EOS>

یوں کی گئی کہ سب کچھوا دارے تیسرائے آج پھر ہمار نیک اس  کی گئیں، خیریت والی کے رہتے ہیں۔ ہوں  سے اُتیا تھا۔ <EOS>ہر طرف  چولہاڑی وہ کچھ کر کہ میں ماریہ میں بیٹھا ہے۔ <EOS> <EOP> <EOT>

... (truncated for demo)

Phase III Complete! Model is ready for Phase IV integration.


## Interpolation Technique Explanation

The interpolation smoothing technique combines probabilities from unigram, bigram, and trigram models:

$$P_{interp}(w_i | w_{i-2}, w_{i-1}) = \lambda_1 \cdot P(w_i) + \lambda_2 \cdot P(w_i | w_{i-1}) + \lambda_3 \cdot P(w_i | w_{i-2}, w_{i-1})$$

Where:
- $\lambda_1 = 0.1$ (unigram weight) - helps with completely unseen contexts
- $\lambda_2 = 0.3$ (bigram weight) - provides some context awareness
- $\lambda_3 = 0.6$ (trigram weight) - gives most weight to the full context

**Benefits:**
1. **Handles sparse data**: When trigram counts are zero, we fall back to bigram and unigram
2. **Smoother distribution**: Avoids zero probabilities for unseen n-grams
3. **Balances specificity and generalization**: Higher-order n-grams capture more context, while lower-order provide robustness

In [28]:
# ============================================
# EXPORT AS PYTHON MODULE (For Phase IV)
# ============================================

module_code = '''"""
Trigram Language Model for Urdu Story Generation
Phase III - Built from scratch without any pre-built models

Usage:
    from trigram_model import TrigramLanguageModel, UrduStoryGenerator, StoryGeneratorAPI
    
    # Load pre-trained model
    api = StoryGeneratorAPI(model_path="trigram_model.pkl")
    
    # Generate story
    result = api.generate(prefix="ایک دن", max_length=1000)
    print(result["story"])
"""

import os
import random
import pickle
import math
from collections import defaultdict, Counter
from typing import List, Dict, Tuple, Optional

# Set random seed
random.seed(42)

# Special tokens
EOS_TOKEN = "\\ue000"  # End of Sentence
EOP_TOKEN = "\\ue001"  # End of Paragraph
EOT_TOKEN = "\\ue002"  # End of Text/Story
START_TOKEN = "\\ue003"  # Start token


class TrigramLanguageModel:
    """Trigram Language Model using MLE with Interpolation."""
    
    def __init__(self, lambda1: float = 0.1, lambda2: float = 0.3, lambda3: float = 0.6):
        assert abs(lambda1 + lambda2 + lambda3 - 1.0) < 1e-6
        self.lambda1 = lambda1
        self.lambda2 = lambda2
        self.lambda3 = lambda3
        self.unigram_counts = Counter()
        self.bigram_counts = defaultdict(Counter)
        self.trigram_counts = defaultdict(Counter)
        self.total_unigrams = 0
        self.bigram_context_counts = Counter()
        self.trigram_context_counts = Counter()
        self.vocabulary = set()
        self.is_trained = False
    
    def train(self, corpus: List[str]):
        for document in corpus:
            tokens = list(document)
            padded = [START_TOKEN, START_TOKEN] + tokens
            self.vocabulary.update(tokens)
            for token in tokens:
                self.unigram_counts[token] += 1
                self.total_unigrams += 1
            for i in range(len(padded) - 1):
                ctx = padded[i]
                nxt = padded[i + 1]
                self.bigram_counts[ctx][nxt] += 1
                self.bigram_context_counts[ctx] += 1
            for i in range(len(padded) - 2):
                ctx = (padded[i], padded[i + 1])
                nxt = padded[i + 2]
                self.trigram_counts[ctx][nxt] += 1
                self.trigram_context_counts[ctx] += 1
        self.is_trained = True
    
    def get_interpolated_probability(self, context: Tuple[str, str], token: str) -> float:
        p1 = self.unigram_counts[token] / self.total_unigrams if self.total_unigrams > 0 else 0
        ctx_count = self.bigram_context_counts[context[1]]
        p2 = self.bigram_counts[context[1]][token] / ctx_count if ctx_count > 0 else 0
        tri_count = self.trigram_context_counts[context]
        p3 = self.trigram_counts[context][token] / tri_count if tri_count > 0 else 0
        return self.lambda1 * p1 + self.lambda2 * p2 + self.lambda3 * p3
    
    def sample_next_token(self, context: Tuple[str, str], temperature: float = 1.0) -> str:
        probs = {}
        for token in self.vocabulary:
            p = self.get_interpolated_probability(context, token)
            if p > 0:
                probs[token] = p ** (1.0 / temperature)
        if not probs:
            return random.choice(list(self.vocabulary))
        total = sum(probs.values())
        probs = {k: v / total for k, v in probs.items()}
        return random.choices(list(probs.keys()), weights=list(probs.values()))[0]


class UrduStoryGenerator:
    """Story generator using Trigram model."""
    
    def __init__(self, model: TrigramLanguageModel):
        self.model = model
    
    def generate(self, prefix: str = "", max_length: int = 1000, temperature: float = 0.8) -> str:
        tokens = list(prefix) if prefix else []
        padded = [START_TOKEN, START_TOKEN] + tokens
        for _ in range(max_length):
            ctx = (padded[-2], padded[-1])
            nxt = self.model.sample_next_token(ctx, temperature)
            padded.append(nxt)
            if nxt == EOT_TOKEN:
                break
        text = ''.join(padded[2:])
        return text.replace(EOS_TOKEN, '').replace(EOP_TOKEN, '\\n\\n').replace(EOT_TOKEN, '').strip()


class StoryGeneratorAPI:
    """API interface for FastAPI integration."""
    
    def __init__(self, model_path: str = None):
        if model_path and os.path.exists(model_path):
            self.model = self._load_model(model_path)
        else:
            self.model = TrigramLanguageModel()
        self.generator = UrduStoryGenerator(self.model)
    
    def _load_model(self, path: str) -> TrigramLanguageModel:
        with open(path, 'rb') as f:
            data = pickle.load(f)
        model = TrigramLanguageModel(data['lambda1'], data['lambda2'], data['lambda3'])
        model.unigram_counts = Counter(data['unigram_counts'])
        model.bigram_counts = defaultdict(Counter)
        for k, v in data['bigram_counts'].items():
            model.bigram_counts[k] = Counter(v)
        model.trigram_counts = defaultdict(Counter)
        for k, v in data['trigram_counts'].items():
            model.trigram_counts[k] = Counter(v)
        model.total_unigrams = data['total_unigrams']
        model.bigram_context_counts = Counter(data['bigram_context_counts'])
        model.trigram_context_counts = Counter(data['trigram_context_counts'])
        model.vocabulary = data['vocabulary']
        model.is_trained = True
        return model
    
    def generate(self, prefix: str = "", max_length: int = 1000, temperature: float = 0.8) -> dict:
        try:
            story = self.generator.generate(prefix, max_length, temperature)
            return {"success": True, "story": story, "prefix": prefix}
        except Exception as e:
            return {"success": False, "error": str(e)}


if __name__ == "__main__":
    api = StoryGeneratorAPI(model_path="trigram_model.pkl")
    result = api.generate(prefix="ایک دن", max_length=500)
    print(result["story"] if result["success"] else result["error"])
'''

# Save as Python module
module_path = "trigram_model.py"
with open(module_path, 'w', encoding='utf-8') as f:
    f.write(module_code)

print(f"Python module exported to: {module_path}")
print("This module can be imported in Phase IV for FastAPI integration.")

Python module exported to: trigram_model.py
This module can be imported in Phase IV for FastAPI integration.
