In [18]:
import pandas as pd
from collections import Counter
import random
import re
import numpy as np
PARQUET_FILE_PATH = '../ass1/gujarati_sentence_tokenized.parquet'
COLUMN_NAME = 'sentence'

In [19]:
def load_tokenized_data_from_parquet(file_path, column_name):
    """
    Loads a list of tokenized sentences from a specified column in a Parquet file.
    """
    try:
        print(f"Attempting to read data from '{file_path}'...")
        df = pd.read_parquet(file_path)
        
        if column_name not in df.columns:
            print(f"Error: Column '{column_name}' not found in the Parquet file.")
            print(f"Available columns are: {df.columns.tolist()}")
            return None
            
        tokenized_sentences = df[column_name].tolist()[:1000000]
        print(f"Successfully loaded {len(tokenized_sentences)} sentences.")
        return tokenized_sentences
        
    except FileNotFoundError:
        print(f"Error: The file '{file_path}' was not found.")
        print("Please make sure the PARQUET_FILE_PATH is correct.")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None

# Load the data
tokenized_data = load_tokenized_data_from_parquet(PARQUET_FILE_PATH, COLUMN_NAME)

Attempting to read data from '../ass1/gujarati_sentence_tokenized.parquet'...
Successfully loaded 1000000 sentences.


In [None]:
def gujarati_word_tokenizer(sentence):
    """
    Tokenizes a Gujarati sentence using the provided regex logic.
    """
    # Ensure the input is a string, as data from files can sometimes be other types
    if not isinstance(sentence, str):
        return []
        
    sentence = re.sub(r'\s+', ' ', sentence.strip())
    
    # Using the provided regex patterns
    url_pattern = r'https?://\S+|www\.\S+'
    email_pattern = r'\b[\w\.-]+@[\w\.-]+\.\w+\b'
    date_pattern = r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|\b\d{1,2}(?:st|nd|rd|th)?\s+\w+\s+\d{4}\b'
    number_pattern = r'\b\d+(?:[\.,]\d+)?\b'
    # Pattern to capture Gujarati script, Latin script, numbers, URLs, emails, dates, and punctuation
    full_pattern = re.compile(
        f'{url_pattern}|{email_pattern}|{date_pattern}|{number_pattern}|[a-zA-Z]+|[\u0A80-\u0AFF]+|[^\w\s]',
        re.UNICODE
    )
    words = re.findall(full_pattern, sentence)
    return words


In [27]:
def add_start_end_tokens(sentences):
    """
    Adds start (<s>) and end (</s>) tokens to each sentence.
    Handles cases where sentences are strings by splitting them into tokens.
    """
    processed_sentences = []
    for sentence in sentences:
        # Check if the item is a string (and not already a list of tokens)
        if isinstance(sentence, str):
            tokens = gujarati_word_tokenizer(sentence)
            processed_sentences.append(['<s>'] + tokens + ['</s>'])
        elif isinstance(sentence, list):
            # If it's already a list, just add the markers
            processed_sentences.append(['<s>'] + sentence + ['</s>'])
    return processed_sentences

def pad_sentences(sentences, n):
    """Pads sentences with n-1 start tokens for an n-gram model."""
    padded_sentences = []
    for sentence in sentences:
        padding = ['<s>'] * (n - 1)
        padded_sentence = padding + sentence[1:]
        padded_sentences.append(padded_sentence)
    return padded_sentences


In [28]:
def generate_ngrams_manual(words, n):
    """A manual generator for n-grams using a sliding window."""
    for i in range(len(words) - n + 1):
        yield tuple(words[i : i + n])

def build_ngram_models_manual(sentences):
    """Builds Unigram, Bigram, Trigram, and Quadrigram models."""
    print("\nBuilding language models...")
    unigram_counts = Counter()
    bigram_counts = Counter()
    trigram_counts = Counter()
    quadrigram_counts = Counter()

    all_words = [word for sentence in sentences for word in sentence]
    unigram_counts.update(all_words)
    
    for i, sentence in enumerate(sentences):
        bigram_counts.update(generate_ngrams_manual(sentence, 2))
        trigram_counts.update(generate_ngrams_manual(sentence, 3))
        quadrigram_counts.update(generate_ngrams_manual(sentence, 4))
    
    print("Models built successfully.")
    return unigram_counts, bigram_counts, trigram_counts, quadrigram_counts

In [29]:
def calculate_bigram_prob(word, prev_word, unigram_counts, bigram_counts):
    """Calculates P(word | prev_word)"""
    bigram_count = bigram_counts.get((prev_word, word), 0)
    prev_word_count = unigram_counts.get(prev_word, 0)
    if prev_word_count == 0: return 0.0
    return bigram_count / prev_word_count

def calculate_trigram_prob(word, prev_word1, prev_word2, bigram_counts, trigram_counts):
    """Calculates P(word | prev_word1, prev_word2)"""
    trigram_count = trigram_counts.get((prev_word1, prev_word2, word), 0)
    bigram_context_count = bigram_counts.get((prev_word1, prev_word2), 0)
    if bigram_context_count == 0: return 0.0
    return trigram_count / bigram_context_count

def calculate_quadrigram_prob(word, p1, p2, p3, trigram_counts, quadrigram_counts):
    """Calculates P(word | p1, p2, p3)"""
    quadrigram_count = quadrigram_counts.get((p1, p2, p3, word), 0)
    trigram_context_count = trigram_counts.get((p1, p2, p3), 0)
    if trigram_context_count == 0: return 0.0
    return quadrigram_count / trigram_context_count

print("Unsmoothed probability functions are defined.")

Unsmoothed probability functions are defined.


In [30]:
def calculate_add_k_bigram_prob(word, prev_word, k, vocab_size, unigram_counts, bigram_counts):
    """Calculates P(word | prev_word) with Add-K smoothing."""
    numerator = bigram_counts.get((prev_word, word), 0) + k
    denominator = unigram_counts.get(prev_word, 0) + (k * vocab_size)
    if denominator == 0: return 0.0
    return numerator / denominator

def calculate_add_k_trigram_prob(word, p1, p2, k, vocab_size, bigram_counts, trigram_counts):
    """Calculates P(word | p1, p2) with Add-K smoothing."""
    numerator = trigram_counts.get((p1, p2, word), 0) + k
    denominator = bigram_counts.get((p1, p2), 0) + (k * vocab_size)
    if denominator == 0: return 0.0
    return numerator / denominator

def calculate_add_k_quadrigram_prob(word, p1, p2, p3, k, vocab_size, trigram_counts, quadrigram_counts):
    """Calculates P(word | p1, p2, p3) with Add-K smoothing."""
    numerator = quadrigram_counts.get((p1, p2, p3, word), 0) + k
    denominator = trigram_counts.get((p1, p2, p3), 0) + (k * vocab_size)
    if denominator == 0: return 0.0
    return numerator / denominator

print("Smoothed probability functions are defined.")

Smoothed probability functions are defined.


In [None]:
import pandas as pd
from collections import Counter, defaultdict
import numpy as np
import time
import re
# --- 0. Global Settings ---
PARQUET_FILE_PATH = '../ass1/gujarati_sentence_tokenized.parquet'
COLUMN_NAME = 'sentence'

# --- 1. Data Loading and Splitting ---

def gujarati_word_tokenizer(sentence):
    """Tokenizes a Gujarati sentence using comprehensive regex logic."""
    if not isinstance(sentence, str): return []
    sentence = re.sub(r'\s+', ' ', sentence.strip())
    # (Regex patterns are omitted for brevity but should be included)
    url_pattern = r'https?://\S+|www\.\S+'
    email_pattern = r'\b[\w\.-]+@[\w\.-]+\.\w+\b'
    # ... other patterns
    number_pattern = r'\b\d+(?:[\.,]\d+)?\b'
    full_pattern = re.compile(
        f'{url_pattern}|{email_pattern}|[a-zA-Z]+|[\u0A80-\u0AFF]+|[^\w\s]', re.UNICODE
    )
    return re.findall(full_pattern, sentence)

def load_and_split_data(file_path, column_name):
    """Loads data, shuffles it, and splits into train, validation, and test sets."""
    print("Loading and splitting data...")
    try:
        df = pd.read_parquet(file_path)
        # Shuffle the entire dataframe. frac=1 means 100%. random_state for reproducibility.
        df_shuffled = df.sample(frac=1, random_state=42).reset_index(drop=True)[:100000]

        # Define split sizes
        val_size = 1000
        test_size = 1000

        # Create the splits
        val_set = df_shuffled.iloc[0:val_size][column_name].tolist()
        test_set = df_shuffled.iloc[val_size : val_size + test_size][column_name].tolist()
        train_set = df_shuffled.iloc[val_size + test_size :][column_name].tolist()

        print(f"  Training set size: {len(train_set)}")
        print(f"  Validation set size: {len(val_set)}")
        print(f"  Test set size: {len(test_set)}")

        return train_set, val_set, test_set
    except Exception as e:
        print(f"Error during data loading/splitting: {e}")
        return None, None, None

def prepare_sentences(sentence_list):
    """Tokenizes and adds boundary markers to a list of sentences."""
    return [['<s>'] + gujarati_word_tokenizer(s) + ['</s>'] for s in sentence_list]


# --- 2. N-gram Model Building (Unchanged) ---
def generate_ngrams(words, n):
    for i in range(len(words) - n + 1):
        yield tuple(words[i:i + n])

def build_ngram_models(sentences):
    """Builds all n-gram models from a list of sentences."""
    print("Building language models on the training set...")
    counts = {
        'unigram': Counter(), 'bigram': Counter(),
        'trigram': Counter(), 'quadrigram': Counter()
    }
    for sentence in sentences:
        counts['unigram'].update(sentence)
        counts['bigram'].update(generate_ngrams(sentence, 2))
        counts['trigram'].update(generate_ngrams(sentence, 3))
        counts['quadrigram'].update(generate_ngrams(sentence, 4))
    print("Models built successfully.")
    return counts

# --- 3. Good-Turing Smoothing Implementation ---

class GoodTuringSmoother:
    """A class to handle Good-Turing smoothing for a given n-gram model."""
    def __init__(self, ngram_counts, vocab_size, n):
        self.ngram_counts = ngram_counts
        self.vocab_size = vocab_size
        self.n = n
        
        print(f"Initializing Good-Turing for {n}-grams...")
        
        # Total number of observed n-gram tokens
        self.N = sum(self.ngram_counts.values())
        
        # Calculate frequency of frequencies (Nc)
        freq_of_freqs = Counter(self.ngram_counts.values())
        self.N_c = defaultdict(int, freq_of_freqs)
        
        # N1 is the number of n-grams that appeared exactly once
        self.N1 = self.N_c[1]
        
        # Total probability mass for all unseen n-grams
        self.P_unseen_total = self.N1 / self.N if self.N > 0 else 0
        
        # Estimate the number of unseen n-gram types. THIS IS THE CHALLENGE.
        num_seen_types = len(self.ngram_counts)
        # For unigrams, this is feasible
        if n == 1:
            num_unseen_types = self.vocab_size - num_seen_types
        else:
            # For n > 1, V^n is too large. We can't calculate this.
            # We'll just use the total probability mass when an unseen n-gram is queried.
            num_unseen_types = -1 # Sentinel for "infeasible to calculate"

        if num_unseen_types > 0:
            self.P_unseen_individual = self.P_unseen_total / num_unseen_types
        else:
            # A fallback: distribute the mass over a very large number as a heuristic
            # This is not theoretically pure but avoids division by zero.
            self.P_unseen_individual = self.P_unseen_total / (self.vocab_size**n * 0.001) if self.vocab_size > 0 else 0


    def get_adjusted_count(self, c):
        """Calculates c* = (c+1) * N(c+1) / N(c)"""
        if self.N_c[c] == 0:
            return c # Fallback if Nc is 0
        return (c + 1) * self.N_c[c + 1] / self.N_c[c]

    def get_probability(self, ngram):
        """Returns the smoothed probability of a given n-gram."""
        c = self.ngram_counts.get(ngram, 0)
        
        if c == 0:
            # The n-gram is unseen
            return self.P_unseen_individual
        else:
            # The n-gram is seen, use the adjusted count c*
            # If N(c+1) is 0, c* will be 0. Fall back to MLE in that case to avoid 0 prob.
            adjusted_c = self.get_adjusted_count(c)
            if adjusted_c == 0:
                # Fallback for the highest frequency items
                return c / self.N
            return adjusted_c / self.N

    def calculate_sentence_prob(self, sentence):
        """Calculates the log probability of a sentence using this smoother."""
        log_prob = 0.0
        epsilon = 1e-12
        
        # Pad with start tokens for context
        padded_sentence = ['<s>'] * (self.n) + sentence[1:]
        
        for i in range(1, len(sentence)):
            # Create the n-gram ending at the current word
            context_start = i + self.n - self.n
            ngram = tuple(padded_sentence[context_start : context_start + self.n])
            prob = self.get_probability(ngram)
            log_prob += np.log(prob + epsilon)
            
        return log_prob

def display_good_turing_table(smoother, model_name, max_c=50, infeasible_threshold=1e7):
    """
    Enhanced display for Good-Turing: prints Nc, adjusted counts, cumulative seen,
    and an estimate (or infeasible note) of unseen types for n-grams.
    
    Args:
        smoother: GoodTuringSmoother instance
        model_name: string label for printing
        max_c: max count C to show rows for (1..max_c)
        infeasible_threshold: if V^n > this, we mark unseen as infeasible
    """
    print(f"\n--- Good-Turing Frequency Table for {model_name} (n={smoother.n}) ---")
    
    # Basic stats
    num_seen_types = len(smoother.ngram_counts)          # number of observed n-gram types (C>0)
    total_observed_tokens = smoother.N                  # sum of counts
    N1 = smoother.N1                                    # number of types with count == 1
    P_unseen_total = smoother.P_unseen_total            # total prob mass assigned to unseen types
    
    # Try to compute Nc0 (number of unseen types)
    V = smoother.vocab_size
    n = smoother.n
    est_unseen_types = None
    nc0_str = None
    if n == 1:
        # exact
        nc0 = max(0, V - num_seen_types)
        est_unseen_types = nc0
        nc0_str = str(nc0)
    else:
        # estimate if V^n is small enough
        try:
            possible_types = V ** n
            if possible_types <= infeasible_threshold:
                nc0 = max(0, int(possible_types) - num_seen_types)
                est_unseen_types = nc0
                nc0_str = str(nc0)
            else:
                nc0_str = f"infeasible (V^{n} = {possible_types:,} > {int(infeasible_threshold):,})"
                est_unseen_types = None
        except OverflowError:
            nc0_str = "infeasible (overflow)"
            est_unseen_types = None

    # Build rows for C = 1..max_c
    rows = []
    cum_seen = 0
    for c in range(1, max_c + 1):
        Nc = smoother.N_c.get(c, 0)
        # adjusted c* if Nc>0 (safe fallback inside smoother.get_adjusted_count)
        c_star = smoother.get_adjusted_count(c) if Nc > 0 else 0.0
        cum_seen += Nc
        rows.append({
            'C': c,
            'Nc': Nc,
            'C*': f"{c_star:.6f}",
            'CumSeen': cum_seen
        })
    
    # Summaries
    total_Nc_seen = sum(smoother.N_c[c] for c in smoother.N_c if c >= 1)
    # sanity check: total_Nc_seen should equal num_seen_types
    sanity = "(OK)" if total_Nc_seen == num_seen_types else f"(mismatch: sum Nc={total_Nc_seen} != seen={num_seen_types})"

    # For C=0 per-type adjusted estimate (if computable)
    if est_unseen_types is not None and est_unseen_types > 0:
        per_unseen_prob = P_unseen_total / est_unseen_types if est_unseen_types > 0 else 0.0
        c0_display = f"Nc0 = {est_unseen_types}, per-type prob â‰ˆ {per_unseen_prob:.4e}"
    else:
        c0_display = f"Nc0 = {nc0_str}; total unseen probability mass = {P_unseen_total:.6e}"

    # Print summary block
    print(f"Vocab size (V): {V}")
    print(f"n-gram order (n): {n}")
    print(f"Observed (seen) types: {num_seen_types} {sanity}")
    print(f"Total observed tokens (N): {total_observed_tokens}")
    print(f"N1 (types with count==1): {N1}")
    print(f"Total prob mass for unseen types (P_unseen_total): {P_unseen_total:.6e}")
    print(f"For C=0: {c0_display}")
    print("")

    # Print the detailed table as a pandas DataFrame for readability
    df = pd.DataFrame(rows)
    pd.set_option('display.max_rows', None)
    pd.set_option('display.width', 200)
    print(df.to_string(index=False))

    # Also print the top few Nc entries (if many zeros)
    nonzero_Nc = {c: smoother.N_c[c] for c in sorted(smoother.N_c) if smoother.N_c[c] > 0}
    top_items = list(nonzero_Nc.items())[:20]
    if top_items:
        print("\nTop non-zero Nc (count -> Nc):")
        for c, nc in top_items:
            print(f"  Count {c:3d} -> Nc = {nc}")


# --- 4. Deleted Interpolation Implementation ---

def find_best_lambdas(val_sentences, counts, vocab_size):
    """Finds the best lambda values for a quadrigram model using the validation set."""
    print("\n--- Finding best lambdas for Deleted Interpolation ---")
    
    best_lambdas = {}
    best_log_prob = -np.inf
    
    # Simple grid search over possible lambda values
    for l4 in np.arange(0.1, 1.0, 0.2):
        for l3 in np.arange(0.1, 1.0 - l4, 0.2):
            for l2 in np.arange(0.1, 1.0 - l4 - l3, 0.2):
                l1 = 1.0 - l4 - l3 - l2
                if l1 < 0: continue
                
                lambdas = {'l4': l4, 'l3': l3, 'l2': l2, 'l1': l1}
                current_log_prob = 0
                
                for sentence in val_sentences:
                    current_log_prob += calculate_interpolated_sentence_prob(sentence, lambdas, counts)
                
                print(f"Trying lambdas {l4:.1f}, {l3:.1f}, {l2:.1f}, {l1:.1f} -> LogProb: {current_log_prob:.2f}")
                
                if current_log_prob > best_log_prob:
                    best_log_prob = current_log_prob
                    best_lambdas = lambdas
                    
    print(f"\nBest lambdas found: {best_lambdas} with log probability {best_log_prob:.2f}")
    return best_lambdas

def calculate_interpolated_sentence_prob(sentence, lambdas, counts):
    """Calculates sentence probability using an interpolated quadrigram model."""
    log_prob = 0.0
    epsilon = 1e-12
    
    unigrams, bigrams, trigrams, quadrigrams = counts['unigram'], counts['bigram'], counts['trigram'], counts['quadrigram']
    
    padded_sentence = ['<s>'] * 3 + sentence
    
    for i in range(3, len(padded_sentence)):
        w_i = padded_sentence[i]
        c4 = tuple(padded_sentence[i-3:i])
        c3 = tuple(padded_sentence[i-2:i])
        c2 = tuple(padded_sentence[i-1:i])
        c1 = tuple(padded_sentence[i-1:i-0]) # should be just w_{i-1} for bigram context

        p4 = quadrigrams.get((*c4, w_i), 0) / trigrams.get(c4, 1)
        p3 = trigrams.get((*c3, w_i), 0) / bigrams.get(c3, 1)
        p2 = bigrams.get((*c2, w_i), 0) / unigrams.get(c2[0], 1)
        p1 = unigrams.get(w_i, 0) / sum(unigrams.values())
        
        prob = (lambdas['l4'] * p4) + (lambdas['l3'] * p3) + \
               (lambdas['l2'] * p2) + (lambdas['l1'] * p1)
               
        log_prob += np.log(prob + epsilon)
        
    return log_prob

# --- 5. Main Execution Block ---

def main():
    start_time = time.time()
    
    # 1. Load and split the data
    train_data, val_data, test_data = load_and_split_data(PARQUET_FILE_PATH, COLUMN_NAME)
    if train_data is None: return
    
    # 2. Prepare (tokenize) the datasets
    train_sents = prepare_sentences(train_data)
    val_sents = prepare_sentences(val_data)
    test_sents = prepare_sentences(test_data)
    
    # 3. Build n-gram models on the TRAINING set
    counts = build_ngram_models(train_sents)
    vocab_size = len(counts['unigram'])
    print(f"Vocabulary Size (V): {vocab_size}")
    
    # 4. Implement Good-Turing Smoothing
    gt_unigram = GoodTuringSmoother(counts['unigram'], vocab_size, 1)
    gt_bigram = GoodTuringSmoother(counts['bigram'], vocab_size, 2)
    
    # 5. Display Good-Turing Tables
    display_good_turing_table(gt_unigram, "Unigram")
    display_good_turing_table(gt_bigram, "Bigram")

    # 6. Evaluate Good-Turing on validation and test sets
    val_prob_gt_bigram = sum(gt_bigram.calculate_sentence_prob(s) for s in val_sents)
    test_prob_gt_bigram = sum(gt_bigram.calculate_sentence_prob(s) for s in test_sents)
    print(f"\nTotal Log Probability of Bigram GT on Validation Set: {val_prob_gt_bigram:.2f}")
    print(f"Total Log Probability of Bigram GT on Test Set: {test_prob_gt_bigram:.2f}")
    
    # 7. Implement and evaluate Deleted Interpolation
    best_lambdas = find_best_lambdas(val_sents, counts, vocab_size)
    
    # 8. Evaluate the final interpolated model on the TEST set
    test_prob_interp = sum(calculate_interpolated_sentence_prob(s, best_lambdas, counts) for s in test_sents)
    print(f"\nTotal Log Probability of Interpolated Quadrigram on Test Set: {test_prob_interp:.2f}")

    print(f"\nTotal execution time: {time.time() - start_time:.2f} seconds.")

if __name__ == "__main__":
    main()

Loading and splitting data...
  Training set size: 98000
  Validation set size: 1000
  Test set size: 1000
Building language models on the training set...
Models built successfully.
Vocabulary Size (V): 142817
Initializing Good-Turing for 1-grams...
Initializing Good-Turing for 2-grams...

--- Good-Turing Frequency Table for Unigram (n=1) ---
Vocab size (V): 142817
n-gram order (n): 1
Observed (seen) types: 142817 (OK)
Total observed tokens (N): 1951974
N1 (types with count==1): 84806
Total prob mass for unseen types (P_unseen_total): 4.344628e-02
For C=0: Nc0 = 0; total unseen probability mass = 4.344628e-02

 C    Nc        C*  CumSeen
 1 84806  0.441643    84806
 2 18727  1.391146   103533
 3  8684  2.405804   112217
 4  5223  3.298870   117440
 5  3446  4.572258   120886
 6  2626  5.421935   123512
 7  2034  6.383481   125546
 8  1623  7.264325   127169
 9  1310  8.496183   128479
10  1113  9.013477   129592
11   912 10.210526   130504
12   776 12.346649   131280
13   737 12.233379