# 04 — Redundancy Metrics (Surface Predictability)

This notebook computes surface-form redundancy metrics for reading materials.

Why:
- Semantic novelty measures "new meaning over time"
- Redundancy measures "how repetitive / predictable the text is"

High redundancy (high compressibility, low entropy) can indicate an over-constrained text environment.

Inputs:
- `data/texts_clean/chunks.jsonl`

Outputs:
- `data/redundancy/redundancy_per_chunk.jsonl`
- `data/redundancy/redundancy_summary_per_doc.jsonl`

Note:
These are **materials-side** metrics. They do not diagnose student boredom.


## Imports + paths

In [None]:
from __future__ import annotations

import json
import re
import gzip
import math
from pathlib import Path
from typing import Dict, Any, List, Tuple, Optional
from collections import Counter, defaultdict

import numpy as np

from _paths import set_repo_root
ROOT = set_repo_root()

# Everything this notebook reads or writes lives under <repo root>/data.
DATA_DIR = ROOT / "data"

# Input: one JSON object per line, one line per text chunk.
CHUNKS_IN = DATA_DIR / "texts_clean" / "chunks.jsonl"

# Output directory is created up front so the later save cells can write
# without checking for it.
OUT_DIR = DATA_DIR / "redundancy"
OUT_DIR.mkdir(parents=True, exist_ok=True)

PER_CHUNK_OUT = OUT_DIR / "redundancy_per_chunk.jsonl"
PER_DOC_OUT = OUT_DIR / "redundancy_summary_per_doc.jsonl"

# Print absolute paths so a wrong repo root is obvious immediately.
print("Input:", CHUNKS_IN.resolve())
print("Output dir:", OUT_DIR.resolve())


## Load chunks

In [None]:
def read_jsonl(path: Path) -> List[Dict[str, Any]]:
    """Parse a UTF-8 JSON Lines file into a list of decoded objects.

    Each line is stripped of surrounding whitespace first; lines that are
    empty after stripping are skipped. Any malformed line propagates the
    ``json`` decoding error unchanged.
    """
    with path.open("r", encoding="utf-8") as fh:
        stripped_lines = (raw.strip() for raw in fh)
        return [json.loads(entry) for entry in stripped_lines if entry]

chunks = read_jsonl(CHUNKS_IN)
print("Loaded chunks:", len(chunks))

# Fail fast with a clear message on an empty input file: the preview at the
# end of this cell indexes chunks[0] and would otherwise die with a bare
# IndexError that hides the real cause.
if not chunks:
    raise ValueError(f"No chunks loaded from {CHUNKS_IN}")

# Every row must carry these keys; the metric and summary cells index them
# directly.
required = {"chunk_id", "doc_id", "title", "chunk_index", "chunk_type", "text"}
# Row indices refer to file order (validation runs before the sort below).
bad = [i for i, r in enumerate(chunks) if not required.issubset(r.keys())]
if bad:
    raise ValueError(f"Missing required keys at row index: {bad[0]}")

# Sort deterministically so output order does not depend on input file order.
chunks = sorted(chunks, key=lambda r: (r["title"], r["doc_id"], r["chunk_index"], r["chunk_id"]))
print("Example chunk:", chunks[0]["title"], "idx", chunks[0]["chunk_index"])
print(chunks[0]["text"][:250])


## Tokenization / normalization utilities

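We keep punctuation-free word tokens (letter runs with an optional internal apostrophe, or digit runs) for lexical stats, and a separate lowercased, whitespace-normalized character stream for entropy.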

In [None]:
# Any run of whitespace (collapsed to a single space by normalize_spaces).
RE_WS = re.compile(r"\s+")
# Sentence boundary: whitespace that directly follows '.', '!' or '?'.
RE_SENT_SPLIT = re.compile(r"(?<=[.!?])\s+")
# Word token: a run of letters with an optional internal apostrophe part
# ("don't"), or a run of digits. `[^\W\d_]` is "word character that is neither
# a digit nor an underscore", i.e. a letter in any script, so accented words
# such as "café" stay whole instead of being cut at the first non-ASCII letter.
RE_TOKEN = re.compile(r"[^\W\d_]+(?:'[^\W\d_]+)?|\d+")

def normalize_spaces(s: str) -> str:
    """Collapse every whitespace run in *s* to one space and trim the ends.

    Non-breaking spaces and CR/CRLF line endings are converted first, so the
    result contains only plain single spaces as separators.
    """
    s = s.replace("\u00a0", " ")
    s = s.replace("\r\n", "\n").replace("\r", "\n")
    s = RE_WS.sub(" ", s)
    return s.strip()

def tokenize_words(s: str) -> List[str]:
    """Return the lowercased word and number tokens of *s*, in order.

    Punctuation and underscores are dropped. The typographic apostrophe
    (U+2019) is folded to "'" so that "don’t" and "don't" yield the same
    single token rather than being split or counted as different words.
    """
    s = s.lower().replace("\u2019", "'")
    return RE_TOKEN.findall(s)

def split_sentences(s: str) -> List[str]:
    """Split *s* into sentences at whitespace following '.', '!' or '?'.

    The text is whitespace-normalized first. Returns an empty list for
    empty or whitespace-only input; empty fragments are discarded.
    """
    s = normalize_spaces(s)
    if not s:
        return []
    return [x.strip() for x in RE_SENT_SPLIT.split(s) if x.strip()]


## Metric 1: gzip compression ratio

More compressible text -> more redundancy.

We define:

- `gzip_ratio = compressed_bytes / raw_bytes`
   - Lower is "more compressible"; to align with "redundancy score high = more redundant", we define:

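- `redundancy_gzip = 1 - gzip_ratio` (roughly in 0..1, but not strictly bounded: very short texts can compress to *more* bytes than they started with because of gzip's fixed header/trailer overhead, which makes the score negative)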

In [None]:
def gzip_ratio(text: str) -> Tuple[float, int, int]:
    """Gzip-compress the UTF-8 bytes of *text* and report how much it shrank.

    Returns ``(ratio, compressed_bytes, raw_bytes)`` where
    ``ratio = compressed_bytes / raw_bytes``. Characters that cannot be
    encoded are dropped. When nothing is left to compress the result is
    ``(nan, 0, 0)``.
    """
    payload = text.encode("utf-8", errors="ignore")
    n_raw = len(payload)
    if n_raw == 0:
        return float("nan"), 0, 0
    n_comp = len(gzip.compress(payload))
    return n_comp / n_raw, n_comp, n_raw

def redundancy_from_gzip_ratio(r: float) -> float:
    """Turn a gzip ratio into a score where higher means more redundant.

    Returns ``1 - r`` for a finite ratio and NaN for a NaN or infinite one.
    """
    return float(1.0 - r) if np.isfinite(r) else float("nan")


## Metric 2: character n-gram entropy (predictability)

Compute Shannon entropy over character n-grams (default n=3). Lower entropy -> more predictable.

In [None]:
def char_ngrams(s: str, n: int = 3) -> List[str]:
    """List every overlapping character n-gram of the normalized text.

    The text is lowercased and whitespace-normalized first. Single spaces
    are kept so that word-boundary patterns contribute n-grams too. A text
    shorter than *n* characters yields an empty list.
    """
    cleaned = normalize_spaces(s.lower())
    n_windows = len(cleaned) - n + 1
    if n_windows <= 0:
        return []
    return [cleaned[start:start + n] for start in range(n_windows)]

def shannon_entropy(counter: Counter) -> float:
    """Return the Shannon entropy, in bits, of the distribution in *counter*.

    Counts are normalized to probabilities ``p`` and the result is
    ``-sum(p * log2(p))``. Entries with a zero or negative count are ignored
    (the usual ``0 * log 0 = 0`` convention); a Counter can hold such entries
    after subtraction, and they would otherwise make ``log2`` raise. Returns
    NaN when there are no positive counts.
    """
    positive_counts = [c for c in counter.values() if c > 0]
    total = sum(positive_counts)
    if total == 0:
        return float("nan")
    ent = 0.0
    for c in positive_counts:
        p = c / total
        ent -= p * math.log2(p)
    return ent

def char_ngram_entropy(text: str, n: int = 3) -> float:
    """Shannon entropy of the character n-gram distribution of *text*.

    Returns NaN when the text is too short to produce any n-gram.
    """
    grams = char_ngrams(text, n=n)
    return shannon_entropy(Counter(grams)) if grams else float("nan")

def normalize_entropy(ent: float, vocab_size: int) -> float:
    """
    Divide an entropy by log2(vocab_size), the largest entropy a
    distribution over that many symbols can have.

    Returns NaN when ent is not finite or when vocab_size is 1 or less
    (the divisor would be zero or undefined).
    """
    usable = np.isfinite(ent) and vocab_size > 1
    if usable:
        return float(ent / math.log2(vocab_size))
    return float("nan")


## Metric 3: lexical repetition / diversity

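- Type-token ratio (TTR): unique_words / total_words (lower -> more repetitive)
- Top-word fraction: count of the most frequent word / total_words (higher -> more repetitive)
- Repeated bigram fraction: share of adjacent word pairs whose bigram occurs at least twice in the text (higher -> more repetitive)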

In [None]:
def lexical_stats(tokens: List[str]) -> Dict[str, float]:
    """Compute word-level repetition statistics for a token sequence.

    Returned keys:
      - n_words: number of tokens
      - ttr: distinct tokens / tokens
      - top_word_frac: count of the most frequent token / tokens
      - repeated_bigram_frac: adjacent pairs whose bigram occurs at least
        twice / all adjacent pairs (NaN when there is no pair)

    With no tokens, n_words is 0 and every ratio is NaN.
    """
    nan = float("nan")
    total = len(tokens)
    if not total:
        return {
            "n_words": 0,
            "ttr": nan,
            "top_word_frac": nan,
            "repeated_bigram_frac": nan,
        }

    word_counts = Counter(tokens)

    # Adjacent pairs; a sequence of k tokens has k - 1 of them.
    pair_counts = Counter(zip(tokens, tokens[1:]))
    n_pairs = total - 1
    if n_pairs:
        n_repeated = sum(ct for ct in pair_counts.values() if ct >= 2)
        repeated_frac = n_repeated / n_pairs
    else:
        repeated_frac = nan

    return {
        "n_words": total,
        "ttr": float(len(word_counts) / total),
        "top_word_frac": float(max(word_counts.values()) / total),
        "repeated_bigram_frac": float(repeated_frac),
    }


## Metric 4: "template-y-ness" (repeated sentence openers)

This is a *delightful* redundancy signal for decodables and scripted curricula.

We extract the first few tokens of each sentence (4 by default) as an "opener"; sentences with fewer than two tokens are skipped.

In [None]:
def sentence_openers(text: str, opener_len: int = 4) -> List[str]:
    """Return the opening words of each sentence in *text*.

    Each opener is the first *opener_len* word tokens of a sentence joined
    by single spaces. Sentences with fewer than two tokens are skipped.
    """
    tokenized = (tokenize_words(sentence) for sentence in split_sentences(text))
    return [" ".join(words[:opener_len]) for words in tokenized if len(words) >= 2]

def opener_stats(openers: List[str]) -> Dict[str, float]:
    """Summarize how often sentence openers repeat.

    Returned keys:
      - n_sentences: number of openers given
      - opener_unique_frac: distinct openers / openers
      - top_opener_frac: count of the most frequent opener / openers

    With no openers, n_sentences is 0 and both fractions are NaN.
    """
    if not openers:
        nan = float("nan")
        return {
            "n_sentences": 0,
            "opener_unique_frac": nan,
            "top_opener_frac": nan,
        }

    total = len(openers)
    tally = Counter(openers)
    return {
        "n_sentences": total,
        "opener_unique_frac": float(len(tally) / total),
        "top_opener_frac": float(max(tally.values()) / total),
    }


## Compute per-chunk metrics

In [None]:
CHAR_N = 3       # character n-gram order used for the entropy metric
OPENER_LEN = 4   # number of leading tokens that make up a sentence opener

rows: List[Dict[str, Any]] = []

for r in chunks:
    # Every metric is computed on the whitespace-normalized text.
    text_ns = normalize_spaces(r["text"])

    # gzip compressibility
    gz_r, gz_comp, gz_raw = gzip_ratio(text_ns)

    # Character n-gram entropy. One Counter serves both the entropy and the
    # observed vocabulary size; an empty Counter yields NaN for both.
    gram_counts = Counter(char_ngrams(text_ns, n=CHAR_N))
    ent = shannon_entropy(gram_counts)
    ent_norm = normalize_entropy(ent, vocab_size=len(gram_counts))

    # Word-level repetition and sentence-opener repetition
    lex = lexical_stats(tokenize_words(text_ns))
    op = opener_stats(sentence_openers(text_ns, opener_len=OPENER_LEN))

    rows.append({
        "chunk_id": r["chunk_id"],
        "doc_id": r["doc_id"],
        "title": r["title"],
        "chunk_index": int(r["chunk_index"]),
        "chunk_type": r["chunk_type"],
        "n_chars": len(text_ns),
        "gzip_ratio": float(gz_r),
        "gzip_bytes_raw": int(gz_raw),
        "gzip_bytes_comp": int(gz_comp),
        # higher = more redundant
        "redundancy_gzip": float(redundancy_from_gzip_ratio(gz_r)),
        "char_ngram_n": int(CHAR_N),
        "char_ngram_entropy": float(ent),
        # lower = more predictable
        "char_ngram_entropy_norm": float(ent_norm),
        **lex,
        **op,
    })

print("Computed redundancy rows:", len(rows))
print("Example:", {k: rows[0][k] for k in ["title","chunk_index","redundancy_gzip","char_ngram_entropy_norm","ttr","top_opener_frac"]})


## Save per-chunk JSONL

In [None]:
def write_jsonl(path: Path, rows: List[Dict[str, Any]]) -> None:
    """Write *rows* to *path* as UTF-8 JSON Lines, one object per line.

    An existing file is overwritten. Non-ASCII characters are written as-is
    rather than escaped. NOTE: non-finite floats are serialized with
    ``json``'s default behavior, i.e. as the bare tokens NaN / Infinity.
    """
    with path.open("w", encoding="utf-8") as sink:
        sink.writelines(json.dumps(record, ensure_ascii=False) + "\n" for record in rows)

# Persist the per-chunk metrics, then report the size of the file on disk.
write_jsonl(PER_CHUNK_OUT, rows)
per_chunk_bytes = PER_CHUNK_OUT.stat().st_size
print("Wrote:", PER_CHUNK_OUT, f"({per_chunk_bytes} bytes)")


## Aggregate summaries per document

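We summarize each metric per document with the mean plus robust statistics (median, 10th and 90th percentiles), ignoring non-finite (NaN) chunk values.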

In [None]:
def finite(arr: List[float]) -> np.ndarray:
    """Return the finite entries of *arr* as a float64 array.

    NaN and +/-infinity are dropped; the order of the rest is kept.
    """
    values = np.asarray(arr, dtype=np.float64)
    keep = np.isfinite(values)
    return values[keep]

def summarize(a: np.ndarray) -> Dict[str, float]:
    """Return mean, median and the 10th/90th percentiles of *a*.

    The result has keys "mean", "median", "p10" and "p90" with plain Python
    floats. All four are NaN when *a* is empty.
    """
    if a.size == 0:
        return dict.fromkeys(("mean", "median", "p10", "p90"), float("nan"))

    low, high = (float(np.percentile(a, q)) for q in (10, 90))
    return {
        "mean": float(np.mean(a)),
        "median": float(np.median(a)),
        "p10": low,
        "p90": high,
    }

# Group the per-chunk rows by document.
by_doc = defaultdict(list)
for r in rows:
    by_doc[r["doc_id"]].append(r)

# Per-chunk metrics that get a {mean, median, p10, p90} summary, in the order
# they appear in each output record.
SUMMARY_METRICS = (
    "gzip_ratio",
    "redundancy_gzip",
    "char_ngram_entropy_norm",
    "ttr",
    "repeated_bigram_frac",
    "top_opener_frac",
    "opener_unique_frac",
)

doc_summaries: List[Dict[str, Any]] = []
for doc_id, rs in by_doc.items():
    ordered = sorted(rs, key=lambda x: x["chunk_index"])
    # Title and chunk type are taken from the document's first chunk.
    first = ordered[0]
    summary: Dict[str, Any] = {
        "doc_id": doc_id,
        "title": first["title"],
        "chunk_type": first["chunk_type"],
        "n_chunks": len(ordered),
    }
    for metric in SUMMARY_METRICS:
        # Non-finite chunk values are excluded before summarizing.
        summary[metric] = summarize(finite([x[metric] for x in ordered]))
    doc_summaries.append(summary)

print("Doc summaries:", len(doc_summaries))
print("Example doc summary keys:", list(doc_summaries[0].keys()))


## Save per-doc summaries

In [None]:
# Persist the per-document summaries, then report the size of the file on disk.
write_jsonl(PER_DOC_OUT, doc_summaries)
per_doc_bytes = PER_DOC_OUT.stat().st_size
print("Wrote:", PER_DOC_OUT, f"({per_doc_bytes} bytes)")


## Fun diagnostics: most redundant chunks (by gzip) + most templated chunks

In [None]:
# chunk_id -> original (un-normalized) chunk text, used only for previews
text_lookup = {chunk["chunk_id"]: chunk["text"] for chunk in chunks}

def preview(chunk_id: str, n: int = 220) -> str:
    """Return a one-line preview of a chunk's text.

    Newlines become spaces and the text is cut to its first *n* characters,
    with "..." appended when something was cut off. An unknown chunk_id
    gives an empty string.
    """
    flat = text_lookup.get(chunk_id, "").replace("\n", " ")
    if len(flat) > n:
        return flat[:n] + "..."
    return flat[:n]

# Rank value for a metric: the metric itself when finite, otherwise -inf so
# that NaN rows always end up last. (A fixed sentinel such as -1 is not safe:
# redundancy_gzip = 1 - gzip_ratio has no lower bound, so a tiny chunk whose
# gzip output is more than twice its input scores below -1.)
def _rank_value(row: Dict[str, Any], key: str) -> float:
    value = row[key]
    return value if np.isfinite(value) else float("-inf")

# Most redundant (highest redundancy_gzip). sorted() stays stable with
# reverse=True, so ties keep their order from `rows`.
top_red = sorted(rows, key=lambda r: _rank_value(r, "redundancy_gzip"), reverse=True)[:8]
print("\nTop redundant chunks (gzip-based):")
for r in top_red:
    print(f"  red_gz={r['redundancy_gzip']:.3f}  gz_ratio={r['gzip_ratio']:.3f}  "
          f"ttr={r['ttr']:.3f}  opener_top={r['top_opener_frac']:.3f}  "
          f"{r['title']} idx={r['chunk_index']}  |  {preview(r['chunk_id'])}")

# Most templated (highest top_opener_frac)
top_temp = sorted(rows, key=lambda r: _rank_value(r, "top_opener_frac"), reverse=True)[:8]
print("\nMost templated chunks (repeated sentence openers):")
for r in top_temp:
    print(f"  opener_top={r['top_opener_frac']:.3f}  opener_unique={r['opener_unique_frac']:.3f}  "
          f"{r['title']} idx={r['chunk_index']}  |  {preview(r['chunk_id'])}")


## Next notebook: 05_contextual_diversity.ipynb

We'll measure whether words/ideas appear in varied contexts across a document/corpus:
- contextual diversity per word
- dispersion of contexts (using LSA embeddings or co-occurrence windows)
- "semantic neighborhoods per 1,000 words"

Then we can unify:
- novelty (03)
- redundancy (04)
- contextual diversity (05)

...into a single "Boredom Report".
