<a href="https://colab.research.google.com/github/mahb97/fw-vs-ulysses-zipf/blob/main/prepare_texts.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Setup and drive
from pathlib import Path
import sys, os, re, json, hashlib, unicodedata, datetime as dt

def echo(msg):
    print(msg, flush=True)

IN_COLAB = False
try:
    import google.colab  # type: ignore
    IN_COLAB = True
except Exception:
    pass

if IN_COLAB:
    from google.colab import drive, files
    echo("Detected Colab ✓  Mounting Drive…")
    drive.mount('/content/drive', force_remount=True)
    ROOT = Path("/content/drive/MyDrive/zipf_joyce").resolve()
else:
    echo("Not in Colab (running locally). Writing to ./zipf_joyce_local/")
    ROOT = (Path.cwd() / "zipf_joyce_local").resolve()

DATA_RAW  = ROOT / "data" / "raw"
DATA_PROC = ROOT / "data" / "processed"
META      = ROOT / "data" / "metadata"
RESULTS   = ROOT / "results"
for p in (DATA_RAW, DATA_PROC, META, RESULTS):
    p.mkdir(parents=True, exist_ok=True)

echo(f"ROOT        : {ROOT}")
echo(f"DATA_RAW    : {DATA_RAW}")
echo(f"DATA_PROC   : {DATA_PROC}")
echo(f"META        : {META}")
echo(f"RESULTS     : {RESULTS}")
echo(f"Existing RAW files: {[p.name for p in DATA_RAW.glob('*')]}")

TEST_FILE = RESULTS / "_write_test.txt"
TEST_FILE.write_text("ok", encoding="utf-8")
echo(f"Write test ✓ -> {TEST_FILE}")


Detected Colab ✓  Mounting Drive…
Mounted at /content/drive
ROOT        : /content/drive/MyDrive/zipf_joyce
DATA_RAW    : /content/drive/MyDrive/zipf_joyce/data/raw
DATA_PROC   : /content/drive/MyDrive/zipf_joyce/data/processed
META        : /content/drive/MyDrive/zipf_joyce/data/metadata
RESULTS     : /content/drive/MyDrive/zipf_joyce/results
Existing RAW files: ['fw.txt', '.ipynb_checkpoints', 'ulysses.txt']
Write test ✓ -> /content/drive/MyDrive/zipf_joyce/results/_write_test.txt


In [2]:
def ensure_sources(verbose=True):
    needed = []
    # accepts fw.txt/fw.xml/fw.tei and same for ulysses
    candidates = {
        "fw":      [DATA_RAW/"fw.txt", DATA_RAW/"fw.xml", DATA_RAW/"fw.tei"],
        "ulysses": [DATA_RAW/"ulysses.txt", DATA_RAW/"ulysses.xml", DATA_RAW/"ulysses.tei"],
    }
    have = {}
    for key, opts in candidates.items():
        found = next((p for p in opts if p.exists()), None)
        have[key] = found
        if not found:
            needed.append(key)
    if verbose:
        echo(f"Source detection → fw={have['fw']}, ulysses={have['ulysses']}")

    if needed and IN_COLAB:
        echo("Missing sources. Prompting for upload…")
        uploaded = files.upload()  # UI prompt
        for fname, data in uploaded.items():
            out = DATA_RAW / fname
            with open(out, "wb") as f:
                f.write(data)
            echo(f"Saved upload → {out}")
        # re-check
        return ensure_sources(verbose=verbose)
    elif needed and not IN_COLAB:
        raise FileNotFoundError(
            f"Missing: {needed}. Place files in {DATA_RAW}. "
            "Accepted names: fw.txt/.xml/.tei and ulysses.txt/.xml/.tei"
        )
    return have

have = ensure_sources(verbose=True)
echo(f"RAW after ensure_sources: {[p.name for p in DATA_RAW.glob('*')]}")

# print sizes to confirm non-empty
for k, p in have.items():
    if p:
        echo(f"{k} → {p.name}  size={p.stat().st_size:,} bytes")
        if p.stat().st_size == 0:
            raise ValueError(f"{p} is empty.")

Source detection → fw=/content/drive/MyDrive/zipf_joyce/data/raw/fw.txt, ulysses=/content/drive/MyDrive/zipf_joyce/data/raw/ulysses.txt
RAW after ensure_sources: ['fw.txt', '.ipynb_checkpoints', 'ulysses.txt']
fw → fw.txt  size=1,391,007 bytes
ulysses → ulysses.txt  size=1,585,349 bytes


In [5]:
import datetime
import unicodedata as _ud
import re as _re
from pathlib import Path

def clean_text_base(s: str, nf="NFKC", lowercase=False) -> str:
    s = s.replace("\r\n","\n").replace("\r","\n")
    s = _ud.normalize(nf, s)
    s = _re.sub(r"[ \t]+"," ", s)
    s = _re.sub(r"\n{3,}","\n\n", s)
    if lowercase:
        s = s.lower()
    return s.strip()

In [7]:
import re, unicodedata as _ud, datetime, json
from pathlib import Path

START_MARKERS = [
    "* A Distributed Proofreaders Canada eBook *",
]
END_MARKERS = [
    "*** END OF THIS PROJECT GUTENBERG EBOOK",
    "***END OF THIS PROJECT GUTENBERG EBOOK",
    "end of the project gutenberg ebook",
    "end of this project gutenberg ebook",
    "end of project gutenberg",
]

def robust_guten_strip(s: str) -> str:
    if s and s[0] == "\ufeff":
        s = s[1:]
    s = s.replace("\r\n","\n").replace("\r","\n")
    low = s.lower()

    starts = []
    for m in START_MARKERS:
        i = low.find(m)
        if i != -1:
            j = low.find("\n", i)
            starts.append(j+1 if j != -1 else i)
    start_cut = max(starts) if starts else 0
    ends = []
    for m in END_MARKERS:
        k = low.rfind(m)
        if k != -1:
            ends.append(k)
    end_cut = min(ends) if ends else len(s)

    core = s[start_cut:end_cut]

    # light tidy
    core = _ud.normalize("NFKC", core)
    core = re.sub(r"[ \t]+"," ", core)
    core = re.sub(r"\n{3,}", "\n\n", core)
    return core.strip()

def re_clean_file(proc_path: Path):
    raw = (Path("/content/drive/MyDrive/zipf_joyce/data/raw") / proc_path.name) if str(proc_path).startswith("/content/drive") else proc_path
    if not raw.exists():
        raw = DATA_RAW / proc_path.name
    text = raw.read_text(encoding="utf-8", errors="ignore")
    cleaned = robust_guten_strip(text)
    # keep your existing extra tidies
    cleaned = re.sub(r"-\n(?=[a-z])","", cleaned)
    cleaned = re.sub(r"[ \t]+"," ", cleaned)
    cleaned = re.sub(r"\n{3,}","\n\n", cleaned).strip()

    proc_path.write_text(cleaned, encoding="utf-8")
    meta = {
        "name": proc_path.stem,
        "output_file": str(proc_path),
        "length_chars": len(cleaned),
        "length_lines": cleaned.count("\n")+1,
        "timestamp_utc": datetime.datetime.utcnow().isoformat(timespec="seconds")+"Z",
        "note": "Re-cleaned with robust Gutenberg strip",
    }
    (META / f"{proc_path.stem}.json").write_text(json.dumps(meta, indent=2), encoding="utf-8")

    lines = cleaned.splitlines()
    print(f"[{proc_path.stem}] chars={len(cleaned):,}  lines={len(lines):,}")
    print(f"[{proc_path.stem}] preview (first 8 lines):")
    for ln in lines[:8]:
        print(ln)

fw_out = DATA_PROC / "fw.txt"
ul_out = DATA_PROC / "ulysses.txt"
print("Re-cleaning processed texts with robust Gutenberg stripper…")
re_clean_file(fw_out)
print()
re_clean_file(ul_out)
print("\n Re-clean complete (headers/footers should be gone).")


Re-cleaning processed texts with robust Gutenberg stripper…
[fw] chars=1,304,732  lines=24,276
[fw] preview (first 8 lines):
Finnegans Wake by James Joyce 

riverrun, past Eve and Adam's, from swerve of shore to bend
of bay, brings us by a commodius vicus of recirculation back to
Howth Castle and Environs.

Sir Tristram, violer d'amores, fr'over the short sea, had passencore
rearrived from North Armorica on this side the scraggy



  "timestamp_utc": datetime.datetime.utcnow().isoformat(timespec="seconds")+"Z",


[ulysses] chars=1,516,830  lines=32,624
[ulysses] preview (first 8 lines):
Ulysses

by James Joyce

Stately, plump Buck Mulligan came from the stairhead, bearing a bowl of
lather on which a mirror and a razor lay crossed. A yellow
dressinggown, ungirdled, was sustained gently behind him on the mild
morning air. He held the bowl aloft and intoned:

 Re-clean complete (headers/footers should be gone).


In [8]:
import regex as re
import pandas as pd
from collections import Counter

RESULTS.mkdir(parents=True, exist_ok=True)
TABLES = ROOT / "data" / "tables"
TABLES.mkdir(parents=True, exist_ok=True)

WORD_RE = re.compile(r"\p{L}[\p{L}\p{M}\p{Pd}\p{Pc}\']*")

def tokenize_basic(text: str, lower=True, split_hyphens=True):
    s = text
    if lower:
        s = s.lower()
    toks = WORD_RE.findall(s)
    if split_hyphens:
        tt = []
        for t in toks:
            tt.extend(t.split("-"))
        toks = [t for t in tt if t]
    return toks

def stats_and_export(name: str, path: Path, config_label="canonical"):
    txt = path.read_text(encoding="utf-8", errors="ignore")
    toks = tokenize_basic(txt, lower=True, split_hyphens=True)
    counts = Counter(toks)
    types = len(counts)
    tokens = len(toks)
    hapax = sum(1 for c in counts.values() if c == 1)
    hapax_share = hapax / types if types else 0.0

    print(f"\n— {name} · {config_label} —")
    print(f"tokens={tokens:,}  types={types:,}  hapax={hapax:,} ({hapax_share:.2%})")
    print("top 15:")
    for w,c in counts.most_common(15):
        print(f"{w:>15}  {c:,}")

    # export full type-count table
    df = pd.DataFrame({"type": list(counts.keys()), "count": list(counts.values())})
    df = df.sort_values("count", ascending=False).reset_index(drop=True)
    out_csv = TABLES / f"{name}_{config_label}_type_counts.csv"
    df.to_csv(out_csv, index=False)
    print("wrote:", out_csv)

    # export compact rank–freq table (rank, freq)
    freqs = sorted(counts.values(), reverse=True)
    rf = pd.DataFrame({"rank": range(1, len(freqs)+1), "freq": freqs})
    rf_out = TABLES / f"{name}_{config_label}_rankfreq.csv"
    rf.to_csv(rf_out, index=False)
    print("wrote:", rf_out)

# Run for FW + Ulysses
stats_and_export("fw", DATA_PROC / "fw.txt", config_label="canonical")
stats_and_export("ulysses", DATA_PROC / "ulysses.txt", config_label="canonical")

print("\n Token stats + exports ready (type_counts & rankfreq CSVs).")


— fw · canonical —
tokens=219,320  types=58,083  hapax=46,272 (79.67%)
top 15:
            the  12,056
            and  8,507
             of  7,115
              a  4,584
             to  4,485
             in  3,605
            his  2,958
            for  2,425
           with  2,057
             he  1,832
           that  1,813
            you  1,737
             as  1,693
             on  1,622
              i  1,585
wrote: /content/drive/MyDrive/zipf_joyce/data/tables/fw_canonical_type_counts.csv
wrote: /content/drive/MyDrive/zipf_joyce/data/tables/fw_canonical_rankfreq.csv

— ulysses · canonical —
tokens=268,690  types=29,375  hapax=16,053 (54.65%)
top 15:
            the  14,950
             of  8,141
            and  7,212
              a  6,515
             to  4,960
             in  4,945
             he  4,223
            his  3,330
              i  2,995
              s  2,821
           that  2,780
           with  2,514
             it  2,511
            was  2,133
     

In [9]:
# Canonical Zipf fit (discrete MLE + KS xmin + bootstrap)
from pathlib import Path
import numpy as np, pandas as pd, math, json
import mpmath as mp
from scipy.optimize import minimize_scalar
from numpy.random import default_rng

# Paths
IN_COLAB = False
try:
    import google.colab
    IN_COLAB = True
except Exception:
    pass
ROOT = Path("/content/drive/MyDrive/zipf_joyce") if IN_COLAB else (Path.cwd()/"zipf_joyce_local")
TABLES = (ROOT/"data"/"tables"); TABLES.mkdir(parents=True, exist_ok=True)
RESULTS = (ROOT/"results"/"tables"); RESULTS.mkdir(parents=True, exist_ok=True)

# Load canonical rankfreq or type_counts -> counts vector
def load_counts(name: str) -> np.ndarray:
    p = TABLES/f"{name}_canonical_type_counts.csv"
    df = pd.read_csv(p)
    return df["count"].to_numpy(dtype=int)

# Discrete power law pieces
mp.mp.dps = 50
def hurwitz_zeta(alpha: float, xmin: int) -> float:
    return float(mp.zeta(alpha, xmin))

def loglik_discrete_powerlaw(xs: np.ndarray, alpha: float, xmin: int) -> float:
    if alpha <= 1: return -np.inf
    Z = hurwitz_zeta(alpha, xmin)
    if not np.isfinite(Z) or Z <= 0: return -np.inf
    return - xs.size * math.log(Z) - alpha * np.log(xs).sum()

def mle_alpha(xs: np.ndarray, xmin: int) -> float:
    def nll(a): return -loglik_discrete_powerlaw(xs, a, xmin)
    res = minimize_scalar(nll, bounds=(1.0001, 5.0), method="bounded")
    return float(res.x)

def cdf_discrete_powerlaw(alpha: float, xmin: int, xmax: int) -> np.ndarray:
    ks = np.arange(xmin, xmax+1, dtype=int)
    w = ks.astype(float)**(-alpha)
    W = w.cumsum()
    return W / W[-1]

def ks_statistic(xs: np.ndarray, alpha: float, xmin: int) -> float:
    xs = np.sort(xs)
    uniq, counts = np.unique(xs, return_counts=True)
    ecdf = np.cumsum(counts) / xs.size
    F = cdf_discrete_powerlaw(alpha, xmin, uniq.max())
    model = F[uniq - xmin]
    return float(np.max(np.abs(ecdf - model)))

def choose_xmin_by_ks(counts: np.ndarray, xmin_min: int = 2, max_unique: int = 600):
    u = np.unique(counts)
    u = u[u >= xmin_min]
    u = u[:max_unique]
    best = (None, None, np.inf)
    for xm in u:
        tail = counts[counts >= xm]
        if tail.size < 100:  # stability guard
            continue
        a = mle_alpha(tail, xm)
        D = ks_statistic(tail, a, xm)
        if D < best[2]:
            best = (xm, a, D)
    if best[0] is None:
        raise ValueError("No valid x_min; tail too small. Try lowering threshold or check tokenisation.")
    return best  # (xmin, alpha, KS D)

def ks_pvalue_parametric(counts: np.ndarray, alpha: float, xmin: int, B: int = 300, seed: int = 42) -> float:
    rng = default_rng(seed)
    tail = counts[counts >= xmin]
    n = tail.size
    D_obs = ks_statistic(tail, alpha, xmin)
    # build pmf over a truncated support
    K = max(int(tail.max()*3), xmin+1500)
    ks = np.arange(xmin, K+1, dtype=int)
    w = ks.astype(float)**(-alpha); pmf = w / w.sum()
    ge = 0
    for _ in range(B):
        sim = rng.choice(ks, size=n, replace=True, p=pmf)
        a_sim = mle_alpha(sim, xmin)
        D_sim = ks_statistic(sim, a_sim, xmin)
        if D_sim >= D_obs:
            ge += 1
    return (ge + 1) / (B + 1)

def bootstrap_alpha_ci(counts: np.ndarray, xmin: int, B: int = 300, seed: int = 7):
    rng = default_rng(seed)
    tail = counts[counts >= xmin]
    n = tail.size
    alphas = np.empty(B, float)
    for b in range(B):
        samp = rng.choice(tail, size=n, replace=True)
        alphas[b] = mle_alpha(samp, xmin)
    return float(np.percentile(alphas, 2.5)), float(np.percentile(alphas, 97.5))

def run_fit(name: str):
    counts = load_counts(name)
    print(f"\n=== {name.upper()} (canonical) ===")
    print(f"types={len(counts):,}")
    xmin, alpha, D = choose_xmin_by_ks(counts, xmin_min=2, max_unique=600)
    n_tail = int((counts >= xmin).sum())
    print(f"x_min={xmin}  alpha={alpha:.4f}  KS D={D:.4f}  tail_n={n_tail:,}")
    p = ks_pvalue_parametric(counts, alpha, xmin, B=300, seed=123)
    ci_lo, ci_hi = bootstrap_alpha_ci(counts, xmin, B=300, seed=321)
    print(f"KS p={p:.3f}  alpha 95% CI=({ci_lo:.3f}, {ci_hi:.3f})")
    return dict(text=name, pipeline="canonical", x_min=xmin, alpha=alpha, ks_D=D, ks_p=p,
                alpha_ci_low=ci_lo, alpha_ci_high=ci_hi, n_tail=n_tail, types=len(counts))

rows = []
for name in ["fw", "ulysses"]:
    rows.append(run_fit(name))

headline = pd.DataFrame(rows)
out = RESULTS/"headline_stats_canonical.csv"
headline.to_csv(out, index=False)
print("\nSaved:", out, "size:", out.stat().st_size, "bytes")
headline


=== FW (canonical) ===
types=58,083
x_min=7  alpha=2.0111  KS D=0.0167  tail_n=2,271
KS p=0.140  alpha 95% CI=(1.971, 2.057)

=== ULYSSES (canonical) ===
types=29,375
x_min=5  alpha=1.9454  KS D=0.0063  tail_n=5,212
KS p=0.761  alpha 95% CI=(1.924, 1.972)

Saved: /content/drive/MyDrive/zipf_joyce/results/tables/headline_stats_canonical.csv size: 326 bytes


Unnamed: 0,text,pipeline,x_min,alpha,ks_D,ks_p,alpha_ci_low,alpha_ci_high,n_tail,types
0,fw,canonical,7,2.011055,0.016734,0.139535,1.971402,2.056897,2271,58083
1,ulysses,canonical,5,1.945432,0.006285,0.760797,1.923838,1.972184,5212,29375
