# FIT-EST Quickstart (5 minutes)

> **Python 3.10+ required** (uses `statistics.correlation`). For 3.8/3.9, replace with a manual Pearson on ranks.

This notebook demonstrates an **EST-style workflow** on a tiny system using only the Python standard library.

Goal: show how to go from **(1) selecting propositions** to **(2) pre-registering an estimator tuple** to **(3) running** and **(4) reporting**. The two target propositions are:

- **P10 (Estimator Coherence)**: two independent constraint estimators should agree (monotonically / rank-wise).
- **P2 (Late-time Constraint Monotonicity)**: the chosen constraint estimator should rarely decrease from one step to the next, i.e. its rate of monotonicity violations should be low.

System: **1D elementary cellular automaton** (binary string, rule number 0–255).

Constraint estimators (two examples):

- `C_hat_frozen`: fraction of cells that have not changed for at least `W` steps (windowed "frozen" proxy).
- `C_hat_compress`: compression-based constraint proxy via `zlib`.

We treat this as a miniature version of the Conway/Langton Tier-1 workflow: pre-register first, then run, then report.

**Note on P2**: The original P2 (constraint monotonicity) may show violations in oscillatory systems. FIT v2.4 introduces "task-typed monotonicity" (E1/E2) to handle this. Here we report raw violation rate as a diagnostic, not a strict pass/fail.

In [None]:
from __future__ import annotations

import dataclasses
import math
import random
import statistics
import time
import zlib
from typing import Dict, List, Literal, Tuple

Boundary = Literal["periodic", "fixed_zeros"]


## Step 1) Pre-register (EST-style)

We encode a minimal, machine-checkable pre-registration object.

This is intentionally lightweight: we only enforce a subset of A1/A6-like requirements here.


In [None]:
PREREG: Dict[str, object] = {
    "preregistration": {
        "id": "EST-QUICKSTART-CA1D-P2P10",
        "date_locked": "2025-12-30",
        "researcher": "(fill)",
        "notes": "Quickstart demo: CA1D, P2+P10",
    },
    "system": {
        "name": "CA1D",
        "state": "binary string length N",
        "boundary": "periodic",
        "rule": 30,
        "N": 128,
        "steps": 800,
        "seed": 42,
    },
    "task": {
        "targets": ["P10", "P2"],
        "P10_equivalence": "ordinal",
        "P2_equivalence": "ordinal",
    },
    "estimators": {
        "C_hat_primary": {
            "name": "frozen_fraction",
            "window_W": 5,
            "scope": {"state": "binary string", "boundary": "periodic"},
        },
        "C_hat_alternatives": [
            {
                "name": "compression_ratio_zlib",
                "scope": {"state": "binary string", "boundary": "periodic"},
            }
        ],
    },
    "coherence_gate": {
        "P10": {"metric": "spearman_rho", "threshold": 0.5},
    },
    "criteria": {
        "P2_violation_rate_max": 0.05,
    },
}

PREREG
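
The `date_locked` field records when the pre-registration was frozen, but nothing in the dict itself detects a later edit. One possible way to make the lock tamper-evident is to record a digest of a canonical serialization. The sketch below is an assumption, not part of EST: the helper `prereg_digest` and the toy dicts are hypothetical.

```python
import hashlib
import json
from typing import Any, Dict


def prereg_digest(prereg: Dict[str, Any]) -> str:
    """Return a SHA-256 hex digest of a canonical JSON form of the dict."""
    # sort_keys plus fixed separators make the serialization independent of
    # key insertion order and whitespace.
    canonical = json.dumps(prereg, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Hypothetical toy pre-registration (much smaller than PREREG).
demo_prereg = {"system": {"rule": 30, "N": 128}, "criteria": {"P2_violation_rate_max": 0.05}}
locked = prereg_digest(demo_prereg)

# Same content with a different key order yields the same digest.
reordered = {"criteria": {"P2_violation_rate_max": 0.05}, "system": {"N": 128, "rule": 30}}
same_after_reorder = prereg_digest(reordered) == locked

# Changing a threshold after the lock changes the digest.
edited = {"system": {"rule": 30, "N": 128}, "criteria": {"P2_violation_rate_max": 0.10}}
changed_after_edit = prereg_digest(edited) != locked

print(same_after_reorder, changed_after_edit)
```

Storing the digest next to `date_locked` would let a report show that the thresholds used at analysis time match those declared up front.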

### Minimal EST audit (tiny subset)

This is not the full A1–A8 automation; it's the minimal "first step" so a new user can run something and get a pass/fail report.


In [None]:
def est_audit_minimal(prereg: Dict[str, object]) -> List[str]:
    missing: List[str] = []

    # A1: scope declaration present
    try:
        scope = prereg["estimators"]["C_hat_primary"]["scope"]  # type: ignore[index]
        if not scope.get("state"):
            missing.append("A1: estimators.C_hat_primary.scope.state missing")
        if not scope.get("boundary"):
            missing.append("A1: estimators.C_hat_primary.scope.boundary missing")
    except Exception:
        missing.append("A1: estimators.C_hat_primary.scope missing")

    # A6: pre-registration lock present
    try:
        locked = prereg["preregistration"]["date_locked"]  # type: ignore[index]
        if not locked:
            missing.append("A6: preregistration.date_locked missing")
    except Exception:
        missing.append("A6: preregistration.date_locked missing")

    # P10 coherence gate threshold declared
    try:
        thr = prereg["coherence_gate"]["P10"]["threshold"]  # type: ignore[index]
        if thr is None:
            missing.append("P10: coherence_gate.P10.threshold missing")
    except Exception:
        missing.append("P10: coherence_gate.P10.threshold missing")

    return missing

issues = est_audit_minimal(PREREG)
print("EST minimal audit:")
if issues:
    for x in issues:
        print("- FAIL:", x)
else:
    print("- PASS (minimal checks)")


## Step 2) Implement the system (CA1D)

Elementary CA update rule (Wolfram code): each cell's next state depends on its 3-bit neighborhood.
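
To make the Wolfram numbering concrete, the sketch below decodes rule 30 into its eight neighborhood outputs. `decode_rule` is a hypothetical helper written for this illustration; it uses the same bit convention as `_rule_table` in the next cell (bit `k` of the rule number is the output for the neighborhood that spells `k` in binary).

```python
from typing import Dict


def decode_rule(rule: int) -> Dict[str, int]:
    """Map each 3-cell neighborhood string ('111' .. '000') to the next state."""
    if not 0 <= rule <= 255:
        raise ValueError("rule must be 0..255")
    # Bit k of the rule number is the output for the neighborhood whose
    # (left, mid, right) cells spell k in binary.
    return {format(k, "03b"): (rule >> k) & 1 for k in range(7, -1, -1)}


table30 = decode_rule(30)  # 30 == 0b00011110
for neighborhood, out in table30.items():
    print(neighborhood, "->", out)
```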


In [None]:
def _rule_table(rule: int) -> Dict[int, int]:
    if not (0 <= rule <= 255):
        raise ValueError("rule must be 0..255")
    table: Dict[int, int] = {}
    for neighborhood in range(8):
        table[neighborhood] = (rule >> neighborhood) & 1
    return table


def ca1d_step(state: List[int], rule: int, boundary: Boundary) -> List[int]:
    n = len(state)
    tbl = _rule_table(rule)
    out = [0] * n

    for i in range(n):
        if boundary == "periodic":
            left = state[(i - 1) % n]
            mid = state[i]
            right = state[(i + 1) % n]
        elif boundary == "fixed_zeros":
            left = state[i - 1] if i - 1 >= 0 else 0
            mid = state[i]
            right = state[i + 1] if i + 1 < n else 0
        else:
            raise ValueError(f"unknown boundary: {boundary}")

        key = (left << 2) | (mid << 1) | right
        out[i] = tbl[key]

    return out


def ca1d_init(n: int, seed: int, p: float = 0.5) -> List[int]:
    rng = random.Random(seed)
    return [1 if rng.random() < p else 0 for _ in range(n)]
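
A quick sanity check helps before trusting any estimator built on the update rule. The sketch below evolves rule 30 from a single live cell and compares the lookup against rule 30's closed form, `left XOR (mid OR right)`. `step_periodic` is a compact, hypothetical re-implementation of the periodic branch of `ca1d_step`, included only so the block runs on its own.

```python
from typing import List


def step_periodic(state: List[int], rule: int) -> List[int]:
    """One elementary-CA update with a periodic (wrap-around) boundary."""
    n = len(state)
    return [
        (rule >> ((state[(i - 1) % n] << 2) | (state[i] << 1) | state[(i + 1) % n])) & 1
        for i in range(n)
    ]


row0 = [0, 0, 0, 1, 0, 0, 0]
row1 = step_periodic(row0, rule=30)
row2 = step_periodic(row1, rule=30)
for row in (row0, row1, row2):
    print("".join("#" if c else "." for c in row))

# Rule 30 in closed form: next = left XOR (mid OR right).
closed_form_ok = all(
    ((30 >> ((left << 2) | (mid << 1) | right)) & 1) == (left ^ (mid | right))
    for left in (0, 1)
    for mid in (0, 1)
    for right in (0, 1)
)
print(closed_form_ok)
```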


## Step 3) Implement estimators

### C_hat_frozen (windowed)

For each cell, track the last time it changed. A cell is considered "frozen" at time `t` if it has not changed for at least `W` steps.
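
As a small worked case of the definition above, the sketch below evaluates the frozen fraction for four cells. The bookkeeping values are hypothetical, and `frozen_fraction` is a condensed stand-in for the `c_hat_frozen_fraction` function defined in the next cell.

```python
from typing import List


def frozen_fraction(last_change_step: List[int], current_step: int, window_W: int) -> float:
    """Fraction of cells whose last change is at least window_W steps old."""
    frozen = sum(1 for s in last_change_step if current_step - s >= window_W)
    return frozen / len(last_change_step)


# Hypothetical bookkeeping for 4 cells at t = 10: the step at which each last flipped.
last_change = [0, 3, 7, 9]
value = frozen_fraction(last_change, current_step=10, window_W=5)
print(value)  # ages are 10, 7, 3, 1, so two of the four cells count as frozen
```

A larger `W` makes the proxy stricter: with `window_W=8` only the first cell (age 10) would still count.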

### C_hat_compress (zlib)

We encode the state as an ASCII string of `0`/`1` characters (one byte per cell), compress it with `zlib`, and compute:

```
C_hat_compress = 1 - compressed_size / raw_size
```

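This is a crude algorithmic constraint proxy: each cell is stored as a full byte but carries at most one bit, and zlib adds fixed overhead, so only relative changes in the ratio are meaningful, not its absolute value.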
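
To see how the ratio behaves at its extremes, the sketch below compares a uniform state, a pseudo-random state, and a one-cell state. `compression_ratio` is a condensed stand-in for the function in the next cell; the three test states are hypothetical and chosen only for illustration.

```python
import random
import zlib
from typing import List


def compression_ratio(state: List[int]) -> float:
    """1 - compressed/raw, with the state encoded as ASCII '0'/'1' (one byte per cell)."""
    raw = "".join("1" if b else "0" for b in state).encode("ascii")
    if not raw:
        return 0.0
    return 1.0 - len(zlib.compress(raw, level=9)) / len(raw)


rng = random.Random(0)
uniform = [0] * 128                              # fully ordered state
noisy = [rng.randint(0, 1) for _ in range(128)]  # disordered state
tiny = [1]                                       # too short for zlib to help

r_uniform = compression_ratio(uniform)
r_noisy = compression_ratio(noisy)
r_tiny = compression_ratio(tiny)
print(f"uniform={r_uniform:.3f} noisy={r_noisy:.3f} tiny={r_tiny:.3f}")
```

The ordered state scores higher than the disordered one, which is the property the proxy relies on. The one-cell state yields a negative ratio because zlib's fixed overhead exceeds the input size, so the estimator is not meaningful for very small `N`.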


In [None]:
def c_hat_frozen_fraction(
    last_change_step: List[int],
    current_step: int,
    window_W: int,
) -> float:
    n = len(last_change_step)
    frozen = 0
    for i in range(n):
        if current_step - last_change_step[i] >= window_W:
            frozen += 1
    return frozen / n


def c_hat_compression_ratio_zlib(state: List[int]) -> float:
    # Encode one ASCII byte per cell ("0" or "1").
    raw = ("".join("1" if b else "0" for b in state)).encode("ascii")
    if not raw:
        # Empty state: nothing to compress, avoid division by zero.
        return 0.0
    comp = zlib.compress(raw, level=9)
    return 1.0 - (len(comp) / len(raw))


def _ranks(xs: List[float]) -> List[float]:
    # Average ranks for ties.
    idx = sorted(range(len(xs)), key=lambda i: xs[i])
    ranks = [0.0] * len(xs)
    i = 0
    while i < len(xs):
        j = i
        while j + 1 < len(xs) and xs[idx[j + 1]] == xs[idx[i]]:
            j += 1
        avg = (i + j) / 2.0 + 1.0  # 1-based rank
        for k in range(i, j + 1):
            ranks[idx[k]] = avg
        i = j + 1
    return ranks


def spearman_rho(xs: List[float], ys: List[float]) -> float:
    if len(xs) != len(ys):
        raise ValueError("length mismatch")
    if len(xs) < 3:
        return float("nan")
    rx = _ranks(xs)
    ry = _ranks(ys)
    if len(set(rx)) < 2 or len(set(ry)) < 2:
        return float("nan")
    try:
        return statistics.correlation(rx, ry)
    except statistics.StatisticsError:
        return float("nan")
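
On Python 3.8/3.9, where `statistics.correlation` is unavailable, the call inside `spearman_rho` can be swapped for a manual Pearson correlation applied to the ranks. The sketch below is one minimal way to write it; `pearson` is a hypothetical helper name, and it returns NaN in the same situations where `spearman_rho` does.

```python
import math
from typing import Sequence


def pearson(xs: Sequence[float], ys: Sequence[float]) -> float:
    """Pearson correlation; returns NaN when undefined (too short or constant input)."""
    n = len(xs)
    if n != len(ys):
        raise ValueError("length mismatch")
    if n < 2:
        return float("nan")
    mx = sum(xs) / n
    my = sum(ys) / n
    sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sxx = sum((x - mx) ** 2 for x in xs)
    syy = sum((y - my) ** 2 for y in ys)
    if sxx == 0.0 or syy == 0.0:
        return float("nan")
    return sxy / math.sqrt(sxx * syy)


# Applied to ranks: identical rank order gives +1, reversed order gives -1.
rank_x = [1.0, 2.0, 3.0, 4.0]
rank_y = [1.0, 2.0, 3.0, 4.0]
rho_same = pearson(rank_x, rank_y)
rho_reversed = pearson(rank_x, rank_y[::-1])
print(rho_same, rho_reversed)
```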


## Step 4) Run the experiment

We run CA1D and compute time series for both estimators.

Then we report:

- P10: Spearman rank correlation between `C_hat_frozen` and `C_hat_compress`
- P2: violation rate under the *primary* estimator, i.e. the fraction of consecutive steps at which `C_hat_frozen` decreases
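
The P2 diagnostic is simple enough to check by hand. The sketch below computes the violation rate for two hypothetical toy series (not output of the CA run): one that mostly settles upward and one that oscillates with period 2. `violation_rate` is a condensed stand-in for the function used in the next cell.

```python
from typing import List


def violation_rate(xs: List[float]) -> float:
    """Fraction of consecutive pairs where the series strictly decreases."""
    if len(xs) < 2:
        return float("nan")
    drops = sum(1 for a, b in zip(xs, xs[1:]) if b < a)
    return drops / (len(xs) - 1)


# Hypothetical toy series, chosen for illustration only.
settling = [0.1, 0.2, 0.2, 0.4, 0.35, 0.5]  # one dip in five transitions
oscillating = [0.3, 0.6, 0.3, 0.6, 0.3]     # period 2: every other step is a drop

rate_settling = violation_rate(settling)
rate_oscillating = violation_rate(oscillating)
print(rate_settling, rate_oscillating)
```

The oscillating series fails any small threshold even though it is perfectly regular, which is why the raw rate is reported here as a diagnostic rather than a strict pass/fail.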


In [None]:
@dataclasses.dataclass
class RunResult:
    c_frozen: List[float]
    c_compress: List[float]


def run_ca1d(prereg: Dict[str, object]) -> RunResult:
    # Named `system` rather than `sys` to avoid shadowing the standard-library module name.
    system = prereg["system"]  # type: ignore[index]
    rule = int(system["rule"])  # type: ignore[index]
    boundary: Boundary = system["boundary"]  # type: ignore[assignment]
    n = int(system["N"])  # type: ignore[index]
    steps = int(system["steps"])  # type: ignore[index]
    seed = int(system["seed"])  # type: ignore[index]

    W = int(prereg["estimators"]["C_hat_primary"]["window_W"])  # type: ignore[index]

    state = ca1d_init(n=n, seed=seed, p=0.5)
    last_change_step = [0] * n

    c_frozen: List[float] = []
    c_compress: List[float] = []

    prev = state
    for t in range(1, steps + 1):
        state = ca1d_step(prev, rule=rule, boundary=boundary)
        for i in range(n):
            if state[i] != prev[i]:
                last_change_step[i] = t

        c_frozen.append(c_hat_frozen_fraction(last_change_step, current_step=t, window_W=W))
        c_compress.append(c_hat_compression_ratio_zlib(state))

        prev = state

    return RunResult(c_frozen=c_frozen, c_compress=c_compress)


result = run_ca1d(PREREG)
len(result.c_frozen), len(result.c_compress)

In [None]:
rho = spearman_rho(result.c_frozen, result.c_compress)
rho_min = float(PREREG["coherence_gate"]["P10"]["threshold"])  # type: ignore[index]
p10_supported = (not math.isnan(rho)) and (rho >= rho_min)

def violation_rate_non_decrease(xs: List[float]) -> float:
    if len(xs) < 2:
        return float("nan")
    v = 0
    for i in range(len(xs) - 1):
        if xs[i + 1] < xs[i]:
            v += 1
    return v / (len(xs) - 1)


vr = violation_rate_non_decrease(result.c_frozen)
vr_max = float(PREREG["criteria"]["P2_violation_rate_max"])  # type: ignore[index]
p2_supported = vr <= vr_max

print("Results (single run):")
print(f"- P10 coherence: spearman rho = {rho:.3f} (threshold {rho_min}) -> {'SUPPORTED' if p10_supported else 'CHALLENGED'}")
print(f"- P2 monotonicity: violation rate = {vr:.3%} (max {vr_max:.3%}) -> {'SUPPORTED' if p2_supported else 'CHALLENGED'}")
print("  (Note: P2 violations in oscillatory CA are expected; see E1/E2 task-typed monotonicity)")

print("\nSanity snapshot:")
print("- C_hat_frozen tail:", [round(x, 3) for x in result.c_frozen[-10:]])
print("- C_hat_compress tail:", [round(x, 3) for x in result.c_compress[-10:]])

print("\n" + "="*60)
print("✓ You have completed the 5-minute EST quickstart.")
print("  See 'Extensions' below for optional next steps.")
print("="*60)

## Step 5) Minimal report object

In a real workflow, you would write this to a `results/*.yaml` file.

Key point: **report both P2 and P10**, because P10 is the estimator-quality gate.


In [None]:
REPORT = {
    "preregistration_id": PREREG["preregistration"]["id"],  # type: ignore[index]
    "system": PREREG["system"],
    "estimators": PREREG["estimators"],
    "results": {
        "P10": {
            "rho": rho,
            "threshold": rho_min,
            "status": "SUPPORTED" if p10_supported else "CHALLENGED",
        },
        "P2": {
            "violation_rate": vr,
            "threshold": vr_max,
            "status": "SUPPORTED" if p2_supported else "CHALLENGED",
        },
    },
}

REPORT
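
Writing the report to disk needs a serializer. YAML is not in the standard library, so the sketch below swaps in JSON as an assumption to keep the quickstart dependency-free. Because `spearman_rho` can return NaN, which strict JSON does not allow, the hypothetical helper `json_safe` maps non-finite floats to `null` first. The report dict and output path here are placeholders, not results.

```python
import json
import math
import tempfile
from pathlib import Path
from typing import Any


def json_safe(obj: Any) -> Any:
    """Recursively replace NaN/inf floats with None so the output is strict JSON."""
    if isinstance(obj, float) and not math.isfinite(obj):
        return None
    if isinstance(obj, dict):
        return {k: json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [json_safe(v) for v in obj]
    return obj


# Hypothetical report with placeholder values (not results from a run).
demo_report = {
    "preregistration_id": "DEMO",
    "results": {"P10": {"rho": float("nan"), "threshold": 0.5, "status": "CHALLENGED"}},
}

out_dir = Path(tempfile.mkdtemp())  # stand-in for a results/ directory
out_path = out_dir / "demo_report.json"
# allow_nan=False makes json raise instead of silently emitting a bare NaN token.
text = json.dumps(json_safe(demo_report), indent=2, allow_nan=False)
out_path.write_text(text, encoding="utf-8")

loaded = json.loads(out_path.read_text(encoding="utf-8"))
print(loaded["results"]["P10"]["rho"])
```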

## Extensions (optional)

If you want to go beyond the 5-minute demo:

1. Run multiple seeds and report a distribution for P10 and P2.
2. Compare boundaries (`periodic` vs `fixed_zeros`) and treat the boundary as a constraint ("boundary = constraint").
3. Add a third estimator and move from one-off P10 to a family-level coherence gate.
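
For the first extension, per-seed values would come from calling `run_ca1d` with different `system.seed` entries and collecting `rho` and the violation rate each time. The sketch below covers only the reporting half under that assumption. `summarize` is a hypothetical helper, and the input list holds placeholder numbers, not measured values.

```python
import math
import statistics
from typing import Dict, List


def summarize(values: List[float], threshold: float) -> Dict[str, float]:
    """Summarize per-seed values, ignoring NaN entries (undefined correlations)."""
    finite = [v for v in values if not math.isnan(v)]
    if not finite:
        return {"n": len(values), "n_valid": 0}
    return {
        "n": len(values),
        "n_valid": len(finite),
        "median": statistics.median(finite),
        "min": min(finite),
        "max": max(finite),
        # Seeds with NaN count as not passing the gate.
        "pass_fraction": sum(v >= threshold for v in finite) / len(values),
    }


# Placeholder numbers to exercise the function; NOT measured rho values.
placeholder_rhos = [0.7, 0.4, float("nan"), 0.6]
summary = summarize(placeholder_rhos, threshold=0.5)
print(summary)
```

Counting NaN seeds as failures is a conservative design choice; whichever convention is used should be declared in the pre-registration before the runs.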


## Extension A: Boundary = Constraint (FIT v2.4 core insight)

The Langton Ant experiments showed that **boundary conditions are not implementation details; they are part of the constraint structure $C$**. Changing the boundary changes the reachable endpoints.

Let's demonstrate this by running the same CA with `fixed_zeros` boundary instead of `periodic`:

In [None]:
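# Run with fixed_zeros boundary (same seed, same rule).
# {**PREREG} is a shallow copy; rebinding "system" below leaves PREREG itself untouched.
# Caveat: the estimator "scope" entries still declare "periodic", so this variant is
# exploratory and falls outside the locked pre-registration.
PREREG_FIXED = {**PREREG}
PREREG_FIXED["system"] = {**PREREG["system"], "boundary": "fixed_zeros"}  # type: ignore[index]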

result_fixed = run_ca1d(PREREG_FIXED)

rho_fixed = spearman_rho(result_fixed.c_frozen, result_fixed.c_compress)
vr_fixed = violation_rate_non_decrease(result_fixed.c_frozen)

print("Boundary comparison (same seed, same rule):")
print(f"\n  periodic:    P10 rho = {rho:.3f}, P2 violation = {vr:.3%}")
print(f"  fixed_zeros: P10 rho = {rho_fixed:.3f}, P2 violation = {vr_fixed:.3%}")

print(f"\n  C_hat_frozen final (periodic):    {result.c_frozen[-1]:.3f}")
print(f"  C_hat_frozen final (fixed_zeros): {result_fixed.c_frozen[-1]:.3f}")

if abs(result.c_frozen[-1] - result_fixed.c_frozen[-1]) > 0.05:
    print("\n  -> Different boundaries lead to different constraint endpoints.")
    print("     This is consistent with 'boundary = constraint' (FIT v2.4); a single seed is not conclusive.")
else:
    print("\n  -> Similar endpoints under different boundaries (may need longer runs).")
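
The cell above changes only `system.boundary`, while the estimator `scope` entries keep declaring `periodic`. If the boundary variant is meant to be a pre-registered run of its own, one option is to derive a fully independent copy that updates both places. The sketch below assumes the same nesting as `PREREG`; `with_boundary` and the trimmed-down `base` dict are hypothetical.

```python
import copy
from typing import Any, Dict


def with_boundary(prereg: Dict[str, Any], boundary: str) -> Dict[str, Any]:
    """Return an independent copy with system and estimator scopes set to `boundary`."""
    variant = copy.deepcopy(prereg)  # deep copy, so nested dicts are not shared
    variant["system"]["boundary"] = boundary
    est = variant["estimators"]
    est["C_hat_primary"]["scope"]["boundary"] = boundary
    for alt in est.get("C_hat_alternatives", []):
        alt["scope"]["boundary"] = boundary
    return variant


# Hypothetical trimmed-down pre-registration with the same nesting as PREREG.
base = {
    "system": {"boundary": "periodic", "rule": 30},
    "estimators": {
        "C_hat_primary": {"name": "frozen_fraction", "scope": {"boundary": "periodic"}},
        "C_hat_alternatives": [
            {"name": "compression_ratio_zlib", "scope": {"boundary": "periodic"}}
        ],
    },
}
fixed = with_boundary(base, "fixed_zeros")
print(base["system"]["boundary"], fixed["system"]["boundary"])
```

A variant built this way should also receive its own `id` and `date_locked` so the two runs remain distinguishable in reports.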