
# LLMs: N-gram Model (Python)

**Dataset:** `/anvil/projects/tdm/data/amazon/music.txt` (~2GB).  


**What is built build**
- Robust `read_lines` (streamed) and enhanced `clean_text` (lowercasing, escape decoding, alpha-only)
- A reusable `NGram` class with:
  - `set_data`
  - `generate_ngrams`
  - `generate_ngram_probabilities`
  - `get_next_word` (random/common/uncommon)
  - `get_perplexity`
- Test calls mirroring the project prompts


In [2]:
from pathlib import Path
import codecs
import numpy as np
from typing import List, Tuple, Dict, Union

DATA_PATH = Path('/anvil/projects/tdm/data/amazon/music.txt')


---
## Question 1 — set_data with helpers

**Deliverables**
- Modified `clean_text` (lowercase + alpha-only)
- Implemented `set_data`
- Passes provided tests


In [5]:

def read_lines(file_path: Union[str, Path], n: int, start: int = 0) -> List[str]:
    """Stream n lines starting at 0-based index 'start'."""
    file_path = Path(file_path)
    out = []
    with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
        for i, line in enumerate(f):
            if i < start:
                continue
            if i >= start + n:
                break
            out.append(line)
    return out

def clean_text(text: str) -> str:
    """Decode escapes, strip surrounding quotes, normalize whitespace,
    lowercase, and keep only alphabet + spaces."""
    if text is None:
        return ""
    try:
        decoded = codecs.decode(text, 'unicode_escape')
    except Exception:
        decoded = text
    s = decoded.strip()
    # Remove starting/ending quotes if both present
    if len(s) >= 2 and s[0] == '"' and s[-1] == '"':
        s = s[1:-1]
    # Replace actual control chars with space
    s = s.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ')
    # Lowercase
    s = s.lower()
    # Keep only letters and spaces
    s = ''.join(c for c in s if c.isalpha() or c.isspace())
    # Normalize multiple spaces
    s = ' '.join(s.split())
    return s

class NGram:
    def __init__(self, n: int, is_character_based: bool = False):
        self.n = int(n)
        self.is_character_based = bool(is_character_based)
        self.data = None  # list[str] for character mode; list[tuple[str,...]] for word mode
        self.ngrams = set()
        self.ngram_frequencies: Dict[Union[str, Tuple[str,...]], int] = {}
        # Probabilities: context (len n-1) -> { next_token: prob }
        self.ngram_probabilities: Dict[Union[str, Tuple[str,...]], Dict[str, float]] = {}

    def set_data(self, data: List[str]):
        """Ingest raw lines; clean; then:
        - character-based: self.data = list[str] (one string per line)
        - word-based: self.data = list[tuple[str,...]] (hashable sequences)"""
        if not isinstance(data, list):
            return
        cleaned = [clean_text(s) for s in data if isinstance(s, str)]
        if self.is_character_based:
            # Keep as list of strings (characters will be slid over these strings)
            self.data = [s for s in cleaned if s]  # drop empties
        else:
            # Split into tuples-of-words for hashability
            sequences = []
            for s in cleaned:
                if not s:
                    continue
                toks = s.split()
                if toks:
                    sequences.append(tuple(toks))
            self.data = sequences

# Tests from prompt for set_data
print("Character-based set_data test:")
my_ngram = NGram(2, is_character_based=True)
my_ngram.set_data(['"hello!"'])
print(my_ngram.data)  # expect ['hello']

print("\nWord-based set_data test (first 2 lines):")
my_ngram = NGram(2, is_character_based=False)
my_ngram.set_data(read_lines(DATA_PATH, 2))
print(my_ngram.data[:2])


Character-based set_data test:
['hello']

Word-based set_data test (first 2 lines):
[('i', 'love', 'this', 'cd', 'so', 'inspiring'), ('love', 'it', 'great', 'seller')]



---
## Question 2 — generate_ngrams

Build frequency dictionary for each n-gram:
- **Character-based**: slide window of size `n` over each string
- **Word-based**: slide window of size `n` over each tuple of tokens

Return and store in `self.ngram_frequencies`.


In [9]:
def _windows(seq, n):
    for i in range(len(seq) - n + 1):
        yield seq[i:i+n]

def _add_freq(freq: dict, key):
    freq[key] = freq.get(key, 0) + 1

def generate_ngrams(self) -> Dict[Union[str, Tuple[str,...]], int]:
    if not isinstance(self.data, list) or self.n <= 0:
        self.ngram_frequencies = {}
        return {}
    freq = {}
    if self.is_character_based:
        for s in self.data:
            if len(s) < self.n:
                continue
            for win in _windows(s, self.n):
                _add_freq(freq, win)  # win is a string slice
    else:
        for tup in self.data:
            if len(tup) < self.n:
                continue
            for win in _windows(tup, self.n):
                _add_freq(freq, tuple(win))  # ensure tuple key
    self.ngram_frequencies = freq
    self.ngrams = set(freq.keys())
    return freq

# Bind to class
setattr(NGram, "generate_ngrams", generate_ngrams)

# Tests (character & word for a specific line index)
print("Generate ngrams (char-based) — 1 line at index 14:")
my_ngram = NGram(2, is_character_based=True)
my_ngram.set_data(read_lines(DATA_PATH, 1, 14))
print(my_ngram.generate_ngrams())

print("\nGenerate ngrams (word-based) — 1 line at index 14:")
my_ngram = NGram(2, is_character_based=False)
my_ngram.set_data(read_lines(DATA_PATH, 1, 14))
print(my_ngram.generate_ngrams())


Generate ngrams (char-based) — 1 line at index 14:
{'ha': 3, 'ad': 1, 'd ': 3, ' t': 3, 'th': 4, 'hi': 2, 'is': 2, 's ': 5, ' a': 4, 'as': 1, 'an': 1, 'n ': 2, 'al': 2, 'lb': 1, 'bu': 1, 'um': 1, 'm ': 1, ' b': 1, 'ba': 1, 'ac': 1, 'ck': 1, 'k ': 1, ' i': 1, 'in': 1, 'he': 1, 'e ': 4, ' d': 1, 'da': 1, 'ay': 2, 'y ': 2, ' h': 2, 'av': 2, 've': 3, 'lw': 1, 'wa': 1, 'ys': 1, ' e': 2, 'en': 2, 'nj': 1, 'jo': 1, 'oy': 1, 'ye': 1, 'ed': 1, ' k': 1, 'ki': 1, 'ie': 1, 'et': 1, 'h ': 1, ' g': 1, 'gr': 1, 're': 1, 'ee': 1, 'ns': 1, ' m': 1, 'mu': 1, 'us': 1, 'si': 1, 'ic': 1, 'c ': 1, 'ev': 1, 'er': 1, 'ry': 1, ' o': 1, 'on': 1, 'ne': 1, ' s': 1, 'sh': 1, 'ho': 1, 'ou': 1, 'ul': 1, 'ld': 1, ' c': 1, 'cd': 1}

Generate ngrams (word-based) — 1 line at index 14:
{('had', 'this'): 1, ('this', 'as'): 1, ('as', 'an'): 1, ('an', 'album'): 1, ('album', 'back'): 1, ('back', 'in'): 1, ('in', 'the'): 1, ('the', 'day'): 1, ('day', 'have'): 1, ('have', 'always'): 1, ('always', 'enjoyed'): 1, ('enjoyed', 'ki


---
## Question 3 — generate_ngram_probabilities

Create nested mapping: **context (n−1)** → { **next**: probability } using frequency counts.


In [10]:
def generate_ngram_probabilities(self):
    if not isinstance(self.ngram_frequencies, dict) or not self.ngram_frequencies:
        # If not yet built, try to build
        self.generate_ngrams()
        if not self.ngram_frequencies:
            self.ngram_probabilities = {}
            return {}
    probs = {}
    for ngram, cnt in self.ngram_frequencies.items():
        if self.is_character_based:
            context = ngram[:-1]  # string slice
            nxt = ngram[-1]
        else:
            # ngram is a tuple of tokens
            context = ngram[:-1]
            nxt = ngram[-1]
        # Normalize key types: context stays same type; next must be str
        nxt_str = nxt if isinstance(nxt, str) else str(nxt)
        # Accumulate counts by context
        if context not in probs:
            probs[context] = {}
        probs[context][nxt_str] = probs[context].get(nxt_str, 0) + cnt

    # Convert counts to probabilities
    for ctx, nxt_dict in probs.items():
        total = float(sum(nxt_dict.values()))
        for k in list(nxt_dict.keys()):
            nxt_dict[k] = nxt_dict[k] / total
    self.ngram_probabilities = probs
    return probs

# Bind to class
setattr(NGram, "generate_ngram_probabilities", generate_ngram_probabilities)

# Test from prompt (3-gram, word-based)
print("generate_ngram_probabilities — 3-gram word-based on 5 lines starting at 3:")
my_ngram = NGram(3, is_character_based=False)
my_ngram.set_data(read_lines(DATA_PATH, 5, 3))
probs = my_ngram.generate_ngram_probabilities()
# Print a small slice
items = list(probs.items())[:10]
for k,v in items:
    print(k, "->", v)


generate_ngram_probabilities — 3-gram word-based on 5 lines starting at 3:
('as', 'good') -> {'as': 1.0}
('good', 'as') -> {'i': 1.0}
('as', 'i') -> {'remember': 1.0}
('i', 'remember') -> {'back': 1.0}
('remember', 'back') -> {'when': 1.0}
('back', 'when') -> {'i': 1.0}
('when', 'i') -> {'bought': 1.0}
('i', 'bought') -> {'the': 1.0}
('bought', 'the') -> {'one': 1.0}
('the', 'one') -> {'in': 1.0}



---
## Question 4 — get_next_word

Given previous (n−1) tokens/chars, return:
- **random**: sample via probabilities
- **common**: most-probable next
- **uncommon**: least-probable next


In [11]:
def _normalize_input_for_context(s: str, is_char: bool, n_minus_1: int):
    """Return context key (matching how we stored contexts)."""
    s = clean_text(s)  # same cleaning pipeline as training
    if is_char:
        # Need exactly n-1 characters
        if len(s) != n_minus_1:
            return None
        return s  # string slice used as key
    else:
        toks = tuple(s.split())
        if len(toks) != n_minus_1:
            return None
        return toks

def get_next_word(self, previous_words: str, method: str = 'random') -> str:
    if not isinstance(self.ngram_probabilities, dict) or not self.ngram_probabilities:
        return ""
    n_minus_1 = self.n - 1
    ctx = _normalize_input_for_context(previous_words, self.is_character_based, n_minus_1)
    if ctx is None or ctx not in self.ngram_probabilities:
        return ""
    candidates = list(self.ngram_probabilities[ctx].keys())
    probs = np.array([self.ngram_probabilities[ctx][k] for k in candidates], dtype=float)
    if method == 'random':
        # Safe: normalize in case of tiny rounding issues
        probs = probs / probs.sum()
        return np.random.choice(candidates, p=probs)
    elif method == 'common':
        max_p = probs.max()
        # Tie-break deterministically by lexicographic order
        winners = [c for c, p in zip(candidates, probs) if p == max_p]
        return sorted(winners)[0]
    elif method == 'uncommon':
        min_p = probs.min()
        losers = [c for c, p in zip(candidates, probs) if p == min_p]
        return sorted(losers)[0]
    else:
        return ""

# Bind
setattr(NGram, "get_next_word", get_next_word)

# Tests from prompt (seeded random for reproducibility)
print("get_next_word tests on 3-gram word-based, 300 lines starting at 50:")
np.random.seed(18)
ng = NGram(3, is_character_based=False)
ng.set_data(read_lines(DATA_PATH, 300, 50))
ng.generate_ngram_probabilities()
print("random #1:", ng.get_next_word('is a', method='random'))
print("random #2:", ng.get_next_word('is a', method='random'))
print("random #3:", ng.get_next_word('is a', method='random'))
print("common:", ng.get_next_word('is a', method='common'))
print("uncommon:", ng.get_next_word('is a', method='uncommon'))


get_next_word tests on 3-gram word-based, 300 lines starting at 50:
random #1: truly
random #2: very
random #3: show
common: great
uncommon: beautiful



---
## Question 5 — get_perplexity

Perplexity \(= \exp\left(-\tfrac{1}{N}\sum_i \log p_i\right)\)  
If an n-gram is unseen, use probability **1e-5** (heavy penalty).


In [13]:
def get_perplexity(self, text: str) -> float:
    if not isinstance(self.ngram_probabilities, dict) or not self.ngram_probabilities:
        return float('inf')
    text_clean = clean_text(text)
    if self.is_character_based:
        seq = text_clean  # string
        if len(seq) < self.n:
            return float('inf')
        # iterate over consecutive n-grams
        probs = []
        for i in range(len(seq) - self.n + 1):
            context = seq[i:i+self.n-1]
            nxt = seq[i+self.n-1]
            ctx = context
            if ctx in self.ngram_probabilities and nxt in self.ngram_probabilities[ctx]:
                p = self.ngram_probabilities[ctx][nxt]
            else:
                p = 1e-5
            probs.append(p)
    else:
        toks = tuple(text_clean.split())
        if len(toks) < self.n:
            return float('inf')
        probs = []
        for i in range(len(toks) - self.n + 1):
            context = toks[i:i+self.n-1]
            nxt = toks[i+self.n-1]
            if context in self.ngram_probabilities and nxt in self.ngram_probabilities[context]:
                p = self.ngram_probabilities[context][nxt]
            else:
                p = 1e-5
            probs.append(p)
    if not probs:
        return float('inf')
    probs = np.array(probs, dtype=float)
    # Natural log base (e)
    return float(np.exp(-np.mean(np.log(probs))))

# Bind
setattr(NGram, "get_perplexity", get_perplexity)

# Example tests (values depend on the actual dataset used)
ng2 = NGram(3, is_character_based=False)
ng2.set_data(read_lines(DATA_PATH, 2000, 0))  # fewer lines for quick demo
ng2.generate_ngram_probabilities()
print("Perplexity examples:")
for s in ["is a great cd", "is a good cd", "is a bad cd",
          "this music is a wonderful experience and i love it"]:
    print(s, "->", ng2.get_perplexity(s))


Perplexity examples:
is a great cd -> 8.658734561731656
is a good cd -> 38.683904203744014
is a bad cd -> 100000.00000000001
this music is a wonderful experience and i love it -> 143.91101930501804


Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
KeyboardInterrupt
