# Lab 6 – Advanced Language Models and Generation

This notebook implements:

- Katz Backoff for a 4-gram model
- Interpolated Kneser–Ney smoothing for a 4-gram model
- Sentence generation (100 sentences) for n-grams n∈{1,2,3,4} using:
  - Greedy (MLE)
  - Beam search (beam=20)

It builds on the tokenized Gujarati dataset prepared in Lab 1.

In [3]:
import json
import math
import random
from collections import Counter, defaultdict
from typing import List, Tuple, Dict

# Fix the RNG seed so the shuffle below (and therefore the split) is repeatable
SEED = 42
random.seed(SEED)

# Sentence-boundary marker tokens
BOS = "<s>"
EOS = "</s>"

# Read the tokenized corpus produced in lab1
with open("../lab1/gujarati_corpus_tokenized.json", "r", encoding="utf-8") as f:
    data = json.load(f)

# Flatten every document into one list of non-empty token lists
# (punctuation tokens, if present, are kept as-is)
all_sentences_tokens: List[List[str]] = []
for document in data:
    all_sentences_tokens.extend(
        words
        for words in (entry.get("words", []) for entry in document.get("sentences", []))
        if words
    )

num_sentences = len(all_sentences_tokens)
print(f"Total sentences available: {num_sentences}")

# Shuffle sentence indices, then carve out up to 100 for validation, up to 100
# for test, and leave the rest for training
indices = list(range(num_sentences))
random.shuffle(indices)
val_size = min(100, num_sentences)
test_size = min(100, max(0, num_sentences - val_size))
_held_out = val_size + test_size
val_idx = set(indices[:val_size])
test_idx = set(indices[val_size:_held_out])

val_set = [all_sentences_tokens[i] for i in val_idx]
test_set = [all_sentences_tokens[i] for i in test_idx]
# Everything after the held-out prefix of the shuffled order is training data
train_set = [all_sentences_tokens[i] for i in indices[_held_out:]]

print(f"Split sizes -> train: {len(train_set)}, val: {len(val_set)}, test: {len(test_set)}")


def _wrap(sentences: List[List[str]]) -> List[List[str]]:
    """Pad each sentence with three BOS markers in front and one EOS at the end."""
    return [[BOS, BOS, BOS] + sentence + [EOS] for sentence in sentences]


# Three BOS tokens give a full history for the first word of a 4-gram model
train_wrapped = _wrap(train_set)
val_wrapped = _wrap(val_set)
test_wrapped = _wrap(test_set)

Total sentences available: 1631
Split sizes -> train: 1431, val: 100, test: 100


In [4]:
def build_vocab(sentences: List[List[str]]) -> Counter:
    """Count how often each token occurs across all sentences.

    Args:
        sentences: Token lists, one per sentence.

    Returns:
        A Counter mapping token -> number of occurrences.
    """
    return Counter(token for sentence in sentences for token in sentence)


def ngrams(seq: List[str], n: int) -> List[Tuple[str, ...]]:
    """Return every contiguous window of ``n`` tokens in ``seq`` as a tuple.

    Windows are listed left to right; a sequence shorter than ``n`` yields
    an empty list.
    """
    windows: List[Tuple[str, ...]] = []
    window_count = len(seq) - n + 1
    for start in range(window_count):
        windows.append(tuple(seq[start:start + n]))
    return windows


def build_ngram_counts(sentences: List[List[str]], max_n: int = 4):
    """Count n-grams of every order from 1 to ``max_n`` over the sentences.

    Args:
        sentences: Token lists (already padded with boundary markers if desired).
        max_n: Highest n-gram order to count.

    Returns:
        Dict mapping order ``n`` -> Counter of n-gram tuple -> frequency.
    """
    tables = {order: Counter() for order in range(1, max_n + 1)}
    for sentence in sentences:
        for order, table in tables.items():
            table.update(ngrams(sentence, order))
    return tables


# Token frequencies and n-gram tables (orders 1..4) over the padded training set
vocab = build_vocab(train_wrapped)
counts = build_ngram_counts(train_wrapped, 4)
V = len(vocab)

# For each order n >= 2, total frequency of every (n-1)-token history; this is
# the denominator of the conditional probability P(word | history)
context_counts = {}
for order in range(2, 5):
    history_totals = Counter()
    for gram, freq in counts[order].items():
        history_totals[gram[:-1]] += freq
    context_counts[order] = history_totals

print("Vocab size:", V)
print("Some 4-gram examples:", list(counts[4].items())[:3])

Vocab size: 7701
Some 4-gram examples: [(('<s>', '<s>', '<s>', 'રજાઓ'), 1), (('<s>', '<s>', 'રજાઓ', 'અને'), 1), (('<s>', 'રજાઓ', 'અને', 'સપ્તાહના'), 1)]


In [5]:
# Katz Backoff (Good-Turing based) for up to 4-grams

def good_turing_discount(c: int, Nc: Dict[int, int]) -> float:
    """Return the Good-Turing adjusted count c* for an observed count ``c``.

    Uses the simple estimate ``c* = (c + 1) * N_{c+1} / N_c`` when both
    count-of-count entries are available, and falls back to the raw count
    otherwise.

    The result is clamped to at most ``c``. On sparse count-of-count tables
    ``N_{c+1}`` can be large relative to ``N_c``, which makes the raw estimate
    exceed ``c``; a "discount" must never add mass, otherwise the resulting
    conditional probabilities can exceed the MLE (or even 1) and leave no
    mass for backoff.

    Args:
        c: Observed n-gram count.
        Nc: Mapping count -> number of distinct n-grams with that count.

    Returns:
        The discounted count as a float, in ``[0, c]``; 0.0 for ``c <= 0``.
    """
    if c <= 0:
        return 0.0
    n_c = Nc.get(c, 0)
    n_next = Nc.get(c + 1, 0)
    if n_c > 0 and n_next > 0:
        estimate = (c + 1) * (n_next / n_c)
        # Never let the adjusted count grow beyond what was observed
        return min(float(c), estimate)
    return float(c)


def count_of_counts(counter: Counter) -> Dict[int, int]:
    """Tally how many distinct keys share each frequency value.

    Returns:
        Plain dict mapping frequency ``c`` -> number of keys whose count is ``c``.
    """
    tally: Dict[int, int] = {}
    for freq in counter.values():
        tally[freq] = tally.get(freq, 0) + 1
    return tally


# Count-of-counts table for each n-gram order 1..4, consumed by the
# Good-Turing discount inside the Katz model
Nc_tables = {}
for _order in range(1, 5):
    Nc_tables[_order] = count_of_counts(counts[_order])


# Per-context memo of the backoff terms: context -> (alpha, denom). Both values
# depend only on the context, never on the queried word, so they are computed
# once per context instead of re-scanning the n-gram table and the whole
# vocabulary on every call. Re-run this cell after rebuilding `counts`/`vocab`
# so that stale entries are dropped.
_katz_backoff_memo: Dict[Tuple[str, ...], Tuple[float, float]] = {}
# One-element holder for the lazily computed total unigram token count
_katz_unigram_total: List[int] = []


def _katz_total_unigrams() -> int:
    """Return the total unigram token count, summing ``counts[1]`` only once."""
    if not _katz_unigram_total:
        _katz_unigram_total.append(sum(counts[1].values()))
    return _katz_unigram_total[0]


def _katz_backoff_terms(context: Tuple[str, ...]) -> Tuple[float, float]:
    """Return ``(alpha, denom)`` for a context with a positive training count.

    ``alpha`` is the probability mass left over after discounting every
    continuation seen after ``context``; ``denom`` is the sum of lower-order
    probabilities over the vocabulary words *not* seen after ``context``.
    """
    memo = _katz_backoff_memo.get(context)
    if memo is not None:
        return memo

    n = len(context) + 1
    c_ctx = context_counts[n][context]
    shorter = context[1:]

    # Continuations observed after this context, with their raw counts
    seen = {ng[-1]: c for ng, c in counts[n].items() if ng[:-1] == context}

    mass_seen = 0.0
    for c in seen.values():
        mass_seen += good_turing_discount(c, Nc_tables[n]) / c_ctx
    alpha = max(0.0, 1.0 - mass_seen)

    # Normaliser of the lower-order distribution restricted to unseen words
    denom = 0.0
    for w in vocab.keys():
        if seen.get(w, 0) == 0:
            denom += katz_prob(w, shorter)

    _katz_backoff_memo[context] = (alpha, denom)
    return alpha, denom


def katz_prob(word: str, context: Tuple[str, ...]) -> float:
    """Katz backoff probability ``P(word | context)`` with Good-Turing discounting.

    The model order is ``len(context) + 1`` (context length 0..3). Seen
    n-grams get their discounted relative frequency; unseen ones receive a
    share of the leftover mass ``alpha(context)`` proportional to the
    lower-order probability. A context never seen in training backs off to
    the shorter context directly.

    Args:
        word: Candidate next token.
        context: Preceding tokens, most recent last.

    Returns:
        The probability as a float. At the unigram level this is the plain
        MLE, so a word absent from training gets 0.0 (callers clamp it).
    """
    n = len(context) + 1
    if n == 1:
        # Unigram: plain MLE (no smoothing at this level)
        total = _katz_total_unigrams()
        return counts[1][(word,)] / total if total else 1.0 / max(1, V)

    c_ctx = context_counts[n][context]
    shorter = context[1:]
    if c_ctx == 0:
        # No evidence for this context -> back off immediately
        return katz_prob(word, shorter)

    c_full = counts[n][context + (word,)]
    if c_full > 0:
        # Seen n-gram: discounted relative frequency
        return good_turing_discount(c_full, Nc_tables[n]) / c_ctx

    # Unseen continuation: redistribute the leftover mass via the lower order
    alpha, denom = _katz_backoff_terms(context)
    if denom == 0:
        # Lower-order model gives no mass to unseen words: spread alpha uniformly
        return alpha / V
    return alpha * (katz_prob(word, shorter) / denom)


def sentence_logprob_katz(tokens: List[str]) -> float:
    """Natural-log probability of a sentence under the 4-gram Katz model.

    The sentence is padded with three BOS markers and one EOS marker; every
    token after the padding (EOS included) is scored given its three
    predecessors. Probabilities are floored at 1e-12 before taking the log.
    """
    padded = [BOS] * 3 + tokens + [EOS]
    total = 0.0
    for position in range(3, len(padded)):
        history = tuple(padded[position - 3:position])
        prob = katz_prob(padded[position], history)
        total += math.log(max(1e-12, prob))
    return total

In [6]:
# Interpolated Kneser-Ney for up to 4-grams

# Absolute discount subtracted from every observed n-gram count
D = 0.75

# For every order 2..4 record
#   unique_continuations[n][context] -> set of words observed after that context
#   unique_histories[n][word]        -> set of contexts observed before that word
unique_continuations = {}
unique_histories = {}
for _order in range(2, 5):
    followers = defaultdict(set)
    predecessors = defaultdict(set)
    for gram in counts[_order]:
        history, last = gram[:-1], gram[-1]
        followers[history].add(last)
        predecessors[last].add(history)
    unique_continuations[_order] = followers
    unique_histories[_order] = predecessors

# Denominator of the unigram continuation probability: the number of distinct
# bigram types (1 when there are none, to avoid dividing by zero)
total_unique_bigrams = len(counts[2]) or 1


def P_continuation(w: str) -> float:
    """Continuation probability of ``w``.

    Computed as the number of distinct one-word histories observed before
    ``w`` divided by the total number of distinct bigram types.
    """
    histories = unique_histories[2].get(w)
    distinct_histories = 0 if histories is None else len(histories)
    return distinct_histories / total_unique_bigrams


def kn_prob(word: str, context: Tuple[str, ...]) -> float:
    """Interpolated absolute-discount (Kneser-Ney style) ``P(word | context)``.

    Built bottom-up: start from the unigram continuation probability (or a
    uniform ``1 / V`` floor when that is zero), then fold in each longer
    suffix of ``context`` as

        max(c(h, w) - D, 0) / c(h)  +  (D * |{w': c(h, w') > 0}| / c(h)) * P_lower

    Suffixes with no training count are skipped, i.e. they back off to the
    lower order unchanged. Orders above the unigram use raw n-gram counts.

    Args:
        word: Candidate next token.
        context: Preceding tokens, most recent last (length 0..3).

    Returns:
        The interpolated probability.
    """
    # Unigram base case
    prob = P_continuation(word)
    if prob == 0:
        prob = 1.0 / V

    # Walk from the shortest non-empty suffix up to the full context
    for start in range(len(context) - 1, -1, -1):
        history = context[start:]
        order = len(history) + 1
        history_count = context_counts[order][history]
        if not history_count > 0:
            # No evidence for this history; keep the lower-order estimate
            continue
        gram_count = counts[order][history + (word,)]
        discounted = max(gram_count - D, 0) / history_count
        # Mass freed by discounting, handed to the lower-order estimate
        backoff_weight = (D * len(unique_continuations[order].get(history, set()))) / history_count
        prob = discounted + backoff_weight * prob
    return prob


def sentence_logprob_kn(tokens: List[str]) -> float:
    """Natural-log probability of a sentence under the 4-gram Kneser-Ney model.

    Pads the sentence with three BOS markers and a trailing EOS, then scores
    each token after the padding given the three tokens before it. Each
    probability is floored at 1e-12 before the log is taken.
    """
    padded = [BOS, BOS, BOS]
    padded.extend(tokens)
    padded.append(EOS)

    log_total = 0.0
    for offset, target in enumerate(padded[3:]):
        # `offset` is the index of the first of the three history tokens
        history = tuple(padded[offset:offset + 3])
        log_total += math.log(max(1e-12, kn_prob(target, history)))
    return log_total

In [7]:
import heapq

# Maximum number of decoding steps per generated sentence; each step adds at
# most one token after the BOS padding
MAX_LEN: int = 30


def mle_prob(word: str, context: Tuple[str, ...]) -> float:
    """Unsmoothed relative-frequency estimate of ``P(word | context)``.

    Tries the full context first; while the current context has no training
    count, the oldest token is dropped and the next lower order is tried.
    With an empty context the unigram relative frequency is returned
    (``1 / V`` if the unigram table is empty).
    """
    history = context
    while len(history) > 0:
        order = len(history) + 1
        history_count = context_counts[order][history]
        if history_count != 0:
            return counts[order][history + (word,)] / history_count
        # Unseen history: shorten it from the left and retry
        history = history[1:]

    token_total = sum(counts[1].values())
    if token_total:
        return counts[1][(word,)] / token_total
    return 1.0 / max(1, V)


def greedy_generate(n: int, model: str = "mle") -> List[str]:
    """Generate one sentence by always taking the most probable next token.

    Args:
        n: n-gram order; the last ``n - 1`` tokens form the context
           (empty context for ``n <= 1``).
        model: ``"mle"``, ``"katz"`` or ``"kn"``; any other value uses MLE.

    Returns:
        The generated tokens without BOS/EOS markers. Generation stops when
        EOS is the best token or after ``MAX_LEN`` tokens.
    """
    # Resolve the scoring function once instead of on every candidate
    if model == "katz":
        score = katz_prob
    elif model == "kn":
        score = kn_prob
    else:
        score = mle_prob

    history_len = n - 1
    tokens = [BOS] * 3
    for _ in range(MAX_LEN):
        context = tuple(tokens[-history_len:]) if n > 1 else tuple()

        # Arg-max over the vocabulary; the first token reaching the top score wins
        choice, choice_p = None, -1.0
        for candidate in vocab.keys():
            if candidate == BOS:
                continue
            p = score(candidate, context)
            if p > choice_p:
                choice, choice_p = candidate, p

        if choice is None or choice == EOS:
            break
        tokens.append(choice)

    # EOS is never appended, so dropping the BOS padding is all that is left
    return tokens[3:]


def beam_search_generate(n: int, beam_size: int = 20, model: str = "mle") -> List[str]:
    """Generate one sentence with beam search over an n-gram model.

    Hypotheses are scored by negative log-probability (lower is better), with
    each step probability floored at 1e-12. A hypothesis that emits EOS is
    moved to the completed pool; the others compete for the ``beam_size``
    slots of the next step. After at most ``MAX_LEN`` steps the best completed
    hypothesis is returned, or the best unfinished one if none completed.

    Args:
        n: n-gram order; the last ``n - 1`` tokens form the context
           (empty context for ``n <= 1``).
        beam_size: Number of unfinished hypotheses kept after each step.
        model: ``"mle"``, ``"katz"`` or ``"kn"``; any other value uses MLE.

    Returns:
        Tokens of the selected hypothesis without BOS/EOS markers; an empty
        list when no hypothesis could be extended at all.
    """
    # Resolve the scoring function once instead of per candidate
    if model == "katz":
        score = katz_prob
    elif model == "kn":
        score = kn_prob
    else:
        score = mle_prob

    start = [BOS, BOS, BOS]
    beam = [(0.0, start)]
    completed = []
    for _ in range(MAX_LEN):
        # Candidates are kept as (score, prefix, word): the extended list is
        # only built for survivors. All prefixes in one step have the same
        # length, so ties order exactly as they would on the extended lists.
        candidates = []
        for neg_logp, seq in beam:
            context = tuple(seq[-(n - 1):]) if n > 1 else tuple()
            for w in vocab.keys():
                if w == BOS:
                    continue
                p = max(score(w, context), 1e-12)
                new_score = neg_logp - math.log(p)
                if w == EOS:
                    completed.append((new_score, seq + [w]))
                else:
                    candidates.append((new_score, seq, w))
        # Prune: keep only the beam_size best, without sorting every candidate
        survivors = heapq.nsmallest(beam_size, candidates)
        beam = [(s, prefix + [w]) for s, prefix, w in survivors]
        if not beam:
            break

    if completed:
        best = min(completed, key=lambda x: x[0])[1]
    elif beam:
        best = min(beam, key=lambda x: x[0])[1]
    else:
        # Nothing completed and nothing left in the beam (e.g. no usable
        # vocabulary): return an empty sentence instead of raising in min()
        best = start
    return [t for t in best[3:] if t != EOS]

In [None]:
# Generate 100 sentences for n=1..4 with every decoder/model combination
# (greedy and beam search, each over MLE, Katz and Kneser-Ney).
# NOTE(review): greedy_generate and beam_search_generate involve no sampling,
# so the 100 calls for a given (decoder, model, n) all return the same sentence.

# Output key -> generator for that combination; the order below is also the
# order in which the combinations are run and printed
_generators = [
    ("greedy_mle", lambda n: greedy_generate(n, model="mle")),
    ("greedy_katz", lambda n: greedy_generate(n, model="katz")),
    ("greedy_kn", lambda n: greedy_generate(n, model="kn")),
    ("beam_mle", lambda n: beam_search_generate(n, 20, model="mle")),
    ("beam_katz", lambda n: beam_search_generate(n, 20, model="katz")),
    ("beam_kn", lambda n: beam_search_generate(n, 20, model="kn")),
]

generated = {key: {} for key, _ in _generators}

for n in [1, 2, 3, 4]:
    # Run all six combinations for this order, then store the results
    batch = {
        key: [" ".join(generate(n)) for _ in range(100)]
        for key, generate in _generators
    }
    for key, sentences in batch.items():
        generated[key][n] = sentences

# Show the first sample (truncated to 200 characters) per combination and order
for key in generated:
    print(f"\n=== {key} ===")
    for n in [1, 2, 3, 4]:
        print(f"n={n} -> {generated[key][n][0][:200]}")