In [2]:
# ==============================================================================
# V2 CELL 1 (PATCHED) — PYMATCHING V2: SPARSE MATRIX COMPILATION
# Run this as the FIRST cell in your clean "V2" Notebook
# ==============================================================================

import subprocess
import sys

# 1. Install required packages silently if missing
try:
    import pymatching
    import scipy
except ImportError:
    print("Installing pymatching and scipy...")
    subprocess.run([sys.executable, "-m", "pip", "install", "-q", "pymatching", "scipy"], check=True)
    import pymatching
    import scipy

import math
import numpy as np
import scipy.sparse as sp
import concurrent.futures
import multiprocessing
import time
import os
import json
from datetime import datetime

print("="*70)
print("V2 PHYSICS ENGINE: SPARSE MATRIX PYMATCHING COMPILATION")
print("="*70)

def build_matrices(d, p):
    """
    Builds the Parity Check Matrix (H) and Logical Observable Matrix (L)
    using exact Mod-2 Sparse Matrices. This perfectly mirrors the friend's V1 topology.
    """
    H_rows, H_cols = [], []
    L_cols = []
    weights, probs = [], []
    edge_idx = 0

    # Logical cut: vertical cut down the middle
    j_cut = d // 2
    if j_cut + 1 >= d: j_cut = max(0, d // 2 - 1)

    # MWPM log-odds weights: log((1-p)/p)
    w = math.log((1-p)/p) if 0.0 < p < 0.5 else 1e-5

    for r in range(d):
        for c in range(d):
            u = r * d + c

            # 1. Horizontal edges (Data qubits)
            if c + 1 < d:
                v = r * d + (c + 1)
                H_rows.extend([u, v])
                H_cols.extend([edge_idx, edge_idx])
                weights.append(w); probs.append(p)

                # If this edge crosses the logical cut, mark it in L
                if c == j_cut:
                    L_cols.append(edge_idx)
                edge_idx += 1

            # 2. Vertical edges (Data qubits)
            if r + 1 < d:
                v = (r + 1) * d + c
                H_rows.extend([u, v])
                H_cols.extend([edge_idx, edge_idx])
                weights.append(w); probs.append(p)
                edge_idx += 1

            # 3. Left boundary edges (Perfect Virtual Shadows)
            # (Only one 1 in the column -> PyMatching natively connects it to the boundary!)
            if c == 0:
                H_rows.append(u)
                H_cols.append(edge_idx)
                weights.append(0.0); probs.append(0.0) # Cost to exit boundary is 0
                edge_idx += 1

            # 4. Right boundary edges
            if c == d - 1:
                H_rows.append(u)
                H_cols.append(edge_idx)
                weights.append(0.0); probs.append(0.0) # Cost to exit boundary is 0
                edge_idx += 1

    # Compile SciPy Sparse Matrices
    H = sp.csc_matrix(([1]*len(H_rows), (H_rows, H_cols)), shape=(d*d, edge_idx), dtype=np.uint8)
    L = sp.csc_matrix(([1]*len(L_cols), ([0]*len(L_cols), L_cols)), shape=(1, edge_idx), dtype=np.uint8)

    return H, L, np.array(weights), np.array(probs)

def wilson_ci(k, n, z=1.96):
    if n == 0: return (0.0, 0.0)
    phat = k / n
    denom = 1.0 + (z*z)/n
    center = (phat + (z*z)/(2*n)) / denom
    half = (z * math.sqrt((phat*(1-phat) + (z*z)/(4*n)) / n)) / denom
    return (max(0.0, center - half), min(1.0, center + half))

def run_pymatching_chunk(args):
    d, p, trials, seed = args
    rng = np.random.default_rng(seed)

    # Build Sparse Matrices
    H, L, weights, probs = build_matrices(d, p)
    num_edges = len(probs)

    # Initialize the C++ Matcher directly from the Sparse Check Matrix
    matcher = pymatching.Matching.from_check_matrix(H, weights=weights, faults_matrix=L)

    # 1. Vectorized Noise Generation (Generates thousands of trials instantly)
    noise_matrix = (rng.random((trials, num_edges)) < probs).astype(np.uint8)

    # 2. Fast Mod-2 Syndrome & Observable Generation (Sparse Matrix Math)
    H_csr = H.tocsr()
    L_csr = L.tocsr()

    syndromes = (H_csr @ noise_matrix.T).T % 2
    actual_observables = (L_csr @ noise_matrix.T).T % 2

    syndromes = np.asarray(syndromes, dtype=np.uint8)
    actual_observables = np.asarray(actual_observables, dtype=np.uint8)

    # 3. C++ Batch Decode (Takes the syndromes and natively spits out the logical flips)
    predicted_observables = matcher.decode_batch(syndromes)

    # 4. Compare Predictions to Ground Truth (Logical Failures)
    failures = np.sum(predicted_observables[:, 0] != actual_observables[:, 0])

    return failures

if __name__ == "__main__":
    NUM_CORES = min(44, multiprocessing.cpu_count())
    print(f"Detected {NUM_CORES} CPU Cores. Initializing Multiprocessing Pool...")

    # Pushing distance to 15 because we finally have the horsepower.
    distances = [7, 9, 11, 15]
    p_grid = [0.08, 0.09, 0.10, 0.103, 0.11, 0.12]

    # Running 20,000 trials per point.
    total_trials = 20000

    results = {}
    t0 = time.time()

    with concurrent.futures.ProcessPoolExecutor(max_workers=NUM_CORES) as executor:
        for d in distances:
            results[d] = {}
            for p in p_grid:
                chunk_size = max(1, total_trials // NUM_CORES)
                actual_trials = chunk_size * NUM_CORES
                tasks = [(d, p, chunk_size, hash((d, p, i, 42)) % (2**32)) for i in range(NUM_CORES)]

                # Execute mapped jobs across all cores
                failures = sum(executor.map(run_pymatching_chunk, tasks))

                rate = failures / actual_trials
                lo, hi = wilson_ci(failures, actual_trials)
                results[d][p] = {"rate": rate, "ci95": (lo, hi), "k": failures, "n": actual_trials}

            print(f"Completed d={d:>2} sweep.")

    elapsed = time.time() - t0

    print("\n" + "="*70)
    header = "d \\ p | " + " | ".join([f"{p:>10.3f}" for p in p_grid])
    print(header)
    print("-" * len(header))
    for d in distances:
        row = []
        for p in p_grid:
            r = results[d][p]["rate"]
            lo, hi = results[d][p]["ci95"]
            row.append(f"{r:.4f}[{lo:.3f},{hi:.3f}]")
        print(f"{d:>5} | " + " | ".join(row))

    total_decodes = len(distances) * len(p_grid) * actual_trials
    print("="*70)
    print(f"Total Decodes Performed: {total_decodes:,}")
    print(f"Total Elapsed Time:      {elapsed:.2f} seconds")
    print(f"Engine Speed:            {total_decodes/elapsed:,.0f} decodes / sec")
    print("="*70)

    BASE_DIR = "/content/v2_shadowmap"
    os.makedirs(BASE_DIR, exist_ok=True)
    run_id = datetime.now().strftime("%Y%m%dT%H%M%S")
    with open(os.path.join(BASE_DIR, f"v2_pymatching_sweep_{run_id}.json"), "w") as f:
        json.dump(results, f, indent=2)

V2 PHYSICS ENGINE: SPARSE MATRIX PYMATCHING COMPILATION
Detected 8 CPU Cores. Initializing Multiprocessing Pool...
Completed d= 7 sweep.
Completed d= 9 sweep.
Completed d=11 sweep.
Completed d=15 sweep.

d \ p |      0.080 |      0.090 |      0.100 |      0.103 |      0.110 |      0.120
-----------------------------------------------------------------------------------
    7 | 0.0870[0.083,0.091] | 0.1168[0.112,0.121] | 0.1557[0.151,0.161] | 0.1644[0.159,0.170] | 0.1942[0.189,0.200] | 0.2382[0.232,0.244]
    9 | 0.0733[0.070,0.077] | 0.1054[0.101,0.110] | 0.1519[0.147,0.157] | 0.1659[0.161,0.171] | 0.1985[0.193,0.204] | 0.2489[0.243,0.255]
   11 | 0.0649[0.062,0.068] | 0.1023[0.098,0.107] | 0.1493[0.144,0.154] | 0.1609[0.156,0.166] | 0.2011[0.196,0.207] | 0.2510[0.245,0.257]
   15 | 0.0464[0.044,0.049] | 0.0859[0.082,0.090] | 0.1435[0.139,0.148] | 0.1626[0.158,0.168] | 0.2054[0.200,0.211] | 0.2782[0.272,0.284]
Total Decodes Performed: 480,000
Total Elapsed Time:      2.24 seconds
Engin

TypeError: Object of type int64 is not JSON serializable

In [3]:
# ==============================================================================
# V2 CELL 1B — JSON FIX
# Run BELOW V2 CELL 1
# ==============================================================================
import json, os
from datetime import datetime

# Convert NumPy int64s/float64s to native Python types for JSON serialization
clean_results = {}
for d in results:
    clean_results[int(d)] = {}
    for p in results[d]:
        clean_results[int(d)][float(p)] = {
            "rate": float(results[d][p]["rate"]),
            "ci95": (float(results[d][p]["ci95"][0]), float(results[d][p]["ci95"][1])),
            "k": int(results[d][p]["k"]),
            "n": int(results[d][p]["n"])
        }

BASE_DIR = "/content/v2_shadowmap"
os.makedirs(BASE_DIR, exist_ok=True)
run_id = datetime.now().strftime("%Y%m%dT%H%M%S")
with open(os.path.join(BASE_DIR, f"v2_pymatching_sweep_{run_id}.json"), "w") as f:
    json.dump(clean_results, f, indent=2)
print("JSON successfully saved! The V2 Engine is secure.")

JSON successfully saved! The V2 Engine is secure.


In [4]:
# ==============================================================================
# V2 CELL 2 — STATISTICAL ANNIHILATION: 100,000-TRIAL CPS OPTIMIZER
# Run BELOW V2 Cell 1B
# ==============================================================================

import pymatching
import scipy.sparse as sp
import scipy.stats
import numpy as np
import random, os, json, time, math
from datetime import datetime

print("="*70)
print("V2 CELL 2: 100,000-TRIAL HARDWARE/SOFTWARE CO-DESIGN")
print("="*70)

# ------------------------------------------------------------------------------
# 1. SPARSE TOPOLOGY GENERATOR
# ------------------------------------------------------------------------------
d = 9          # Upping distance to 9 for harder physics
p_V = 0.12     # High Vertical Error
p_H = 0.04     # Low Horizontal Error
trials = 100000

print(f"Building topological matrices for d={d}...")
H_rows, H_cols, L_cols, probs = [], [], [], []
edge_types = []
edge_idx = 0
j_cut = d // 2
if j_cut + 1 >= d: j_cut = max(0, d // 2 - 1)

for r in range(d):
    for c in range(d):
        u = r * d + c

        # Horizontal Edges (p_H)
        if c + 1 < d:
            H_rows.extend([u, r * d + (c + 1)]); H_cols.extend([edge_idx, edge_idx])
            probs.append(p_H); edge_types.append('H')
            if c == j_cut: L_cols.append(edge_idx)
            edge_idx += 1

        # Vertical Edges (p_V)
        if r + 1 < d:
            H_rows.extend([u, (r + 1) * d + c]); H_cols.extend([edge_idx, edge_idx])
            probs.append(p_V); edge_types.append('V')
            edge_idx += 1

        # Left Boundary (Perfect Virtual Shadow)
        if c == 0:
            H_rows.append(u); H_cols.append(edge_idx)
            probs.append(0.0); edge_types.append('B')
            edge_idx += 1

        # Right Boundary (Perfect Virtual Shadow)
        if c == d - 1:
            H_rows.append(u); H_cols.append(edge_idx)
            probs.append(0.0); edge_types.append('B')
            edge_idx += 1

H = sp.csc_matrix(([1]*len(H_rows), (H_rows, H_cols)), shape=(d*d, edge_idx), dtype=np.uint8)
L = sp.csc_matrix(([1]*len(L_cols), ([0]*len(L_cols), L_cols)), shape=(1, edge_idx), dtype=np.uint8)
probs = np.array(probs)

# ------------------------------------------------------------------------------
# 2. GENERATE MASSIVE CRN DATASETS (INSTANTLY IN RAM)
# ------------------------------------------------------------------------------
print(f"Generating 100,000 CRN Training Trials and 100,000 Test Trials...")
H_csr, L_csr = H.tocsr(), L.tocsr()
num_edges = len(probs)

# TRAIN SET
rng_train = np.random.default_rng(42)
noise_train = (rng_train.random((trials, num_edges)) < probs).astype(np.uint8)
syn_train = np.asarray((H_csr @ noise_train.T).T % 2, dtype=np.uint8)
obs_train = np.asarray((L_csr @ noise_train.T).T % 2, dtype=np.uint8)[:, 0]

# TEST SET (Fresh Seed)
rng_test = np.random.default_rng(999)
noise_test = (rng_test.random((trials, num_edges)) < probs).astype(np.uint8)
syn_test = np.asarray((H_csr @ noise_test.T).T % 2, dtype=np.uint8)
obs_test = np.asarray((L_csr @ noise_test.T).T % 2, dtype=np.uint8)[:, 0]

print("CRN Generation Complete.\n")

# ------------------------------------------------------------------------------
# 3. FAST POLICY EVALUATOR
# ------------------------------------------------------------------------------
def evaluate_weights(w_V, w_H, dataset="train"):
    w_V, w_H = max(0.001, float(w_V)), max(0.001, float(w_H))

    weights = np.zeros(len(probs), dtype=float)
    for i, t in enumerate(edge_types):
        if t == 'V': weights[i] = w_V
        elif t == 'H': weights[i] = w_H
        else: weights[i] = 0.0 # Boundaries must be 0

    # C++ Compiler injects sparse matrices instantly
    matcher = pymatching.Matching.from_check_matrix(H, weights=weights, faults_matrix=L)

    if dataset == "train":
        pred = matcher.decode_batch(syn_train)[:, 0]
        actual = obs_train
    else:
        pred = matcher.decode_batch(syn_test)[:, 0]
        actual = obs_test

    # Fast boolean array of failures
    fails_arr = (pred != actual)
    return int(np.sum(fails_arr)), fails_arr

# ------------------------------------------------------------------------------
# 4. CPS OPTIMIZATION LOOP
# ------------------------------------------------------------------------------
print("Evaluating Naive Baseline (w_V=1.0, w_H=1.0)...")
base_k_train, _ = evaluate_weights(1.0, 1.0, "train")
print(f" -> Baseline Train Rate: {base_k_train/trials:.5f}\n")

pop_size = 12
generations = 8
retain = 4
population = [{'w_V': random.uniform(0.5, 1.5), 'w_H': random.uniform(0.5, 1.5)} for _ in range(pop_size)]

print("Starting CPS Evolutionary Engine...")
t0 = time.time()
best_history = []

for gen in range(generations):
    scored = []
    for theta in population:
        fails, _ = evaluate_weights(theta['w_V'], theta['w_H'], "train")
        scored.append((fails / trials, theta))

    scored.sort(key=lambda x: x[0])
    survivors = scored[:retain]
    best_rate, best_theta = survivors[0]
    best_history.append((best_rate, best_theta))

    print(f"GEN {gen+1:02d}/{generations} | Train Fail Rate: {best_rate:.5f} | Weights: w_V={best_theta['w_V']:.3f}, w_H={best_theta['w_H']:.3f}")

    new_pop = [s[1] for s in survivors]
    while len(new_pop) < pop_size:
        parent = random.choice(survivors)[1]
        child = {
            'w_V': max(0.01, parent['w_V'] + random.uniform(-0.15, 0.15)),
            'w_H': max(0.01, parent['w_H'] + random.uniform(-0.15, 0.15))
        }
        new_pop.append(child)
    population = new_pop

opt_theta = best_history[-1][1]
elapsed = time.time() - t0

# ------------------------------------------------------------------------------
# 5. HOLDOUT GENERALIZATION & STATISTICAL SIGNIFICANCE (McNemar)
# ------------------------------------------------------------------------------
base_k_test, base_fails_test = evaluate_weights(1.0, 1.0, "test")
opt_k_test, opt_fails_test = evaluate_weights(opt_theta['w_V'], opt_theta['w_H'], "test")

# Theoretical Max-Likelihood Match
w_V_th = math.log((1 - p_V) / p_V)
w_H_th = math.log((1 - p_H) / p_H)
th_k_test, th_fails_test = evaluate_weights(w_V_th, w_H_th, "test")

# Asymptotic McNemar Test Formulation (Continuity Corrected for large N)
n01 = int(np.sum((base_fails_test == True) & (opt_fails_test == False)))
n10 = int(np.sum((base_fails_test == False) & (opt_fails_test == True)))

if n01 + n10 == 0:
    p_val = 1.0
else:
    chi2_stat = (abs(n01 - n10) - 1)**2 / (n01 + n10)
    p_val = scipy.stats.chi2.sf(chi2_stat, 1)

print("\n" + "="*70)
print(f"CPS OPTIMIZATION COMPLETE IN {elapsed:.2f}s")
print("="*70)
print(f"TEST Baseline Failure Rate: {base_k_test/trials:.5f} ({base_k_test}/{trials})")
print(f"TEST AI Optimized Rate:     {opt_k_test/trials:.5f} ({opt_k_test}/{trials})")
print(f"TEST Theoretical Limit:     {th_k_test/trials:.5f} ({th_k_test}/{trials})")
print("-" * 70)
print("STATISTICAL SIGNIFICANCE (McNemar's Test on Test Set):")
print(f"  Failures avoided by AI (n01): {n01:,}")
print(f"  New failures caused (n10):    {n10:,}")
print(f"  p-value:                      {p_val:.2e} (< 0.05 is significant)")
if p_val < 0.0001:
    print("  -> RESULT IS STATISTICALLY IRONCLAD (p < 0.0001)")
print("="*70)

# Safe JSON Save
BASE_DIR = "/content/v2_shadowmap"
os.makedirs(BASE_DIR, exist_ok=True)
run_id = datetime.now().strftime("%Y%m%dT%H%M%S")
artifact = {
    "baseline_rate_test": float(base_k_test / trials),
    "opt_rate_test": float(opt_k_test / trials),
    "theory_rate_test": float(th_k_test / trials),
    "mcnemar_p_value": float(p_val),
    "n01": int(n01),
    "n10": int(n10),
    "opt_w_V": float(opt_theta['w_V']),
    "opt_w_H": float(opt_theta['w_H'])
}

with open(os.path.join(BASE_DIR, f"v2_100k_optimizer_{run_id}.json"), "w") as f:
    json.dump(artifact, f, indent=2)

V2 CELL 2: 100,000-TRIAL HARDWARE/SOFTWARE CO-DESIGN
Building topological matrices for d=9...
Generating 100,000 CRN Training Trials and 100,000 Test Trials...
CRN Generation Complete.

Evaluating Naive Baseline (w_V=1.0, w_H=1.0)...
 -> Baseline Train Rate: 0.03128

Starting CPS Evolutionary Engine...
GEN 01/8 | Train Fail Rate: 0.01954 | Weights: w_V=0.855, w_H=1.151
GEN 02/8 | Train Fail Rate: 0.01711 | Weights: w_V=0.576, w_H=0.941
GEN 03/8 | Train Fail Rate: 0.01711 | Weights: w_V=0.576, w_H=0.941
GEN 04/8 | Train Fail Rate: 0.01711 | Weights: w_V=0.576, w_H=0.941
GEN 05/8 | Train Fail Rate: 0.01711 | Weights: w_V=0.576, w_H=0.941
GEN 06/8 | Train Fail Rate: 0.01711 | Weights: w_V=0.576, w_H=0.941
GEN 07/8 | Train Fail Rate: 0.01711 | Weights: w_V=0.576, w_H=0.941
GEN 08/8 | Train Fail Rate: 0.01711 | Weights: w_V=0.576, w_H=0.941

CPS OPTIMIZATION COMPLETE IN 53.33s
TEST Baseline Failure Rate: 0.03172 (3172/100000)
TEST AI Optimized Rate:     0.01775 (1775/100000)
TEST Theoretica

In [5]:
# ==============================================================================
# V2 CELL 3 — THE SPATIAL MAP CHALLENGE: DISCOVERING A HARDWARE DEFECT
# Run BELOW V2 Cell 2
# ==============================================================================

import pymatching
import scipy.sparse as sp
import scipy.stats
import numpy as np
import random, os, time, math

print("="*70)
print("V2 CELL 3: SPATIALLY VARYING DEFECTS (THE 'DEAD ZONE')")
print("="*70)

d = 9
trials = 80000

# 1. The Environment: A chip with a "Dead Zone" at Column 4
bad_col = 4
p_good = 0.04
p_bad  = 0.16

print(f"Fabricating d={d} chip with a massive defect at Column {bad_col}...")
print(f" -> Healthy regions: {p_good*100}% error rate")
print(f" -> Dead Zone (Col {bad_col}): {p_bad*100}% error rate")

H_rows, H_cols, L_cols, probs = [], [], [], []
edge_types, edge_cols = [], []
edge_idx = 0
j_cut = d // 2

for r in range(d):
    for c in range(d):
        u = r * d + c

        # Horizontal Edges
        if c + 1 < d:
            H_rows.extend([u, r * d + (c + 1)]); H_cols.extend([edge_idx, edge_idx])
            probs.append(p_good); edge_types.append('H'); edge_cols.append(c)
            if c == j_cut: L_cols.append(edge_idx)
            edge_idx += 1

        # Vertical Edges (THIS IS WHERE THE DEFECT LIVES)
        if r + 1 < d:
            H_rows.extend([u, (r + 1) * d + c]); H_cols.extend([edge_idx, edge_idx])
            p_edge = p_bad if c == bad_col else p_good
            probs.append(p_edge); edge_types.append('V'); edge_cols.append(c)
            edge_idx += 1

        # Left/Right Boundaries
        if c == 0 or c == d - 1:
            H_rows.append(u); H_cols.append(edge_idx)
            probs.append(0.0); edge_types.append('B'); edge_cols.append(c)
            edge_idx += 1

H = sp.csc_matrix(([1]*len(H_rows), (H_rows, H_cols)), shape=(d*d, edge_idx), dtype=np.uint8)
L = sp.csc_matrix(([1]*len(L_cols), ([0]*len(L_cols), L_cols)), shape=(1, edge_idx), dtype=np.uint8)
probs = np.array(probs)

# 2. GENERATE CRN DATASETS
print(f"Generating {trials:,} CRN Trials for Train and Test sets...")
H_csr, L_csr = H.tocsr(), L.tocsr()
num_edges = len(probs)

def make_crn(seed):
    rng = np.random.default_rng(seed)
    noise = (rng.random((trials, num_edges)) < probs).astype(np.uint8)
    syn = np.asarray((H_csr @ noise.T).T % 2, dtype=np.uint8)
    obs = np.asarray((L_csr @ noise.T).T % 2, dtype=np.uint8)[:, 0]
    return syn, obs

syn_train, obs_train = make_crn(42)
syn_test,  obs_test  = make_crn(999)

# 3. FAST SPATIAL POLICY EVALUATOR
is_H = np.array([t == 'H' for t in edge_types])
is_V = np.array([t == 'V' for t in edge_types])
is_B = np.array([t == 'B' for t in edge_types])
col_idx_arr = np.array(edge_cols)

def evaluate_spatial_map(w_H, w_V_arr, dataset="train"):
    weights = np.zeros(num_edges, dtype=np.float32)
    weights[is_H] = max(0.01, w_H)
    weights[is_V] = np.maximum(0.01, w_V_arr[col_idx_arr[is_V]])
    weights[is_B] = 0.0

    matcher = pymatching.Matching.from_check_matrix(H, weights=weights, faults_matrix=L)

    if dataset == "train":
        pred = matcher.decode_batch(syn_train)[:, 0]
        fails = (pred != obs_train)
    else:
        pred = matcher.decode_batch(syn_test)[:, 0]
        fails = (pred != obs_test)

    return int(np.sum(fails)), fails

# 4. MULTI-DIMENSIONAL CPS OPTIMIZATION LOOP
print("\nEvaluating Naive Baseline (Uniform weights = 1.0)...")
base_k_train, _ = evaluate_spatial_map(1.0, np.ones(d), "train")
print(f" -> Baseline Train Rate: {base_k_train/trials:.5f}\n")

# AI learns 10 independent parameters!
pop_size = 20
generations = 15
retain = 6

population = [
    {
        'w_H': random.uniform(0.8, 1.2),
        'w_V': np.random.uniform(0.8, 1.2, d)
    } for _ in range(pop_size)
]

print("Starting Spatial CPS Evolutionary Engine (Optimizing 10 parameters)...")
t0 = time.time()
best_history = []

for gen in range(generations):
    scored = []
    for theta in population:
        fails, _ = evaluate_spatial_map(theta['w_H'], theta['w_V'], "train")
        scored.append((fails / trials, theta))

    scored.sort(key=lambda x: x[0])
    survivors = scored[:retain]
    best_rate, best_theta = survivors[0]
    best_history.append((best_rate, best_theta))

    w_V_norm = best_theta['w_V'][0]
    w_V_def  = best_theta['w_V'][bad_col]
    print(f"GEN {gen+1:02d}/{generations} | Fail: {best_rate:.5f} | w_V_normal: {w_V_norm:.2f} | w_V_DEFECT: {w_V_def:.2f}")

    new_pop = [s[1] for s in survivors]
    while len(new_pop) < pop_size:
        parent = random.choice(survivors)[1]
        # Mutate the spatial array with Gaussian noise (explore heavily)
        child = {
            'w_H': max(0.01, parent['w_H'] + random.uniform(-0.15, 0.15)),
            'w_V': np.maximum(0.01, parent['w_V'] + np.random.normal(0, 0.15, d))
        }
        new_pop.append(child)
    population = new_pop

opt_theta = best_history[-1][1]
elapsed = time.time() - t0

# 5. HOLDOUT GENERALIZATION & THEORETICAL COMPARISON
base_k_test, base_fails = evaluate_spatial_map(1.0, np.ones(d), "test")
opt_k_test, opt_fails   = evaluate_spatial_map(opt_theta['w_H'], opt_theta['w_V'], "test")

# Theoretical Max-Likelihood Map
th_w_H = math.log((1 - p_good) / p_good)
th_w_V = np.zeros(d)
for c in range(d):
    p_c = p_bad if c == bad_col else p_good
    th_w_V[c] = math.log((1 - p_c) / p_c)

th_k_test, th_fails = evaluate_spatial_map(th_w_H, th_w_V, "test")

n01 = int(np.sum((base_fails == True) & (opt_fails == False)))
n10 = int(np.sum((base_fails == False) & (opt_fails == True)))
chi2_stat = (abs(n01 - n10) - 1)**2 / (n01 + n10) if (n01 + n10) > 0 else 0
p_val = scipy.stats.chi2.sf(chi2_stat, 1) if (n01 + n10) > 0 else 1.0

print("\n" + "="*70)
print(f"SPATIAL OPTIMIZATION COMPLETE IN {elapsed:.2f}s")
print("="*70)
print(f"TEST Baseline Failure Rate: {base_k_test/trials:.5f} ({base_k_test}/{trials})")
print(f"TEST SHADOWMAP() Optimized Rate:     {opt_k_test/trials:.5f} ({opt_k_test}/{trials})")
print(f"TEST Theoretical Limit:     {th_k_test/trials:.5f} ({th_k_test}/{trials})")
print("-" * 70)
print("STATISTICAL SIGNIFICANCE (McNemar Test):")
print(f"  Failures avoided by AI: {n01:,}")
print(f"  New failures caused:    {n10:,}")
print(f"  p-value:                {p_val:.2e}")
print("="*70)

print("\nSHADOWMAP() DISCOVERED HARDWARE MAP (Vertical Weights by Column):")
print("Col | True Error Rate | SHADOWMAP() Weight (Scaled) | Theoretical Opt")
print("-" * 70)

scale_factor = th_w_H / opt_theta['w_H']
scaled_ai_w_V = opt_theta['w_V'] * scale_factor

for c in range(d):
    true_p = p_bad if c == bad_col else p_good
    tag = "<-- DEAD ZONE DISCOVERED!" if c == bad_col else ""
    print(f"{c:>3} | {true_p:>15.2f} | {scaled_ai_w_V[c]:^18.3f} | {th_w_V[c]:^15.3f} {tag}")
print("="*70)

V2 CELL 3: SPATIALLY VARYING DEFECTS (THE 'DEAD ZONE')
Fabricating d=9 chip with a massive defect at Column 4...
 -> Healthy regions: 4.0% error rate
 -> Dead Zone (Col 4): 16.0% error rate
Generating 80,000 CRN Trials for Train and Test sets...

Evaluating Naive Baseline (Uniform weights = 1.0)...
 -> Baseline Train Rate: 0.00768

Starting Spatial CPS Evolutionary Engine (Optimizing 10 parameters)...
GEN 01/15 | Fail: 0.00609 | w_V_normal: 1.10 | w_V_DEFECT: 0.81
GEN 02/15 | Fail: 0.00556 | w_V_normal: 1.03 | w_V_DEFECT: 0.64
GEN 03/15 | Fail: 0.00556 | w_V_normal: 1.03 | w_V_DEFECT: 0.64
GEN 04/15 | Fail: 0.00556 | w_V_normal: 1.03 | w_V_DEFECT: 0.64
GEN 05/15 | Fail: 0.00556 | w_V_normal: 1.03 | w_V_DEFECT: 0.64
GEN 06/15 | Fail: 0.00556 | w_V_normal: 1.03 | w_V_DEFECT: 0.64
GEN 07/15 | Fail: 0.00556 | w_V_normal: 1.03 | w_V_DEFECT: 0.64
GEN 08/15 | Fail: 0.00556 | w_V_normal: 1.03 | w_V_DEFECT: 0.64
GEN 09/15 | Fail: 0.00556 | w_V_normal: 1.03 | w_V_DEFECT: 0.64
GEN 10/15 | Fail: 0

In [6]:
# ==============================================================================
# V2 CELL 4 — THE MASTER ARTIFACT: SHADOWMAP CALIBRATOR CLASS
# Run this BELOW V2 Cell 3
# ==============================================================================

import pymatching
import scipy.sparse as sp
import scipy.stats
import numpy as np
import random, time, math

class ShadowMapCalibrator:
    """
    SHADOWMAP: Utility-Scale Hardware-Software Co-Design Engine.
    Discovers spatial defect maps on quantum chips using evolutionary MWPM weight calibration.
    """
    def __init__(self, d=9, p_healthy=0.04, dead_zones=None, trials=80000, seed=42):
        self.d = d
        self.p_good = p_healthy
        self.dead_zones = dead_zones or {} # e.g., {4: 0.16, 7: 0.10}
        self.trials = trials
        self.seed = seed

        self.H = None
        self.L = None
        self.probs = None
        self.edge_types = []
        self.edge_cols = []

        self.syn_train, self.obs_train = None, None
        self.syn_test,  self.obs_test  = None, None

        self.opt_theta = None

        print(f"[*] Fabricating d={self.d} Virtual Chip...")
        self._fabricate_chip()
        self._generate_crn_datasets()

    def _fabricate_chip(self):
        H_rows, H_cols, L_cols, probs = [], [], [], []
        edge_idx = 0
        j_cut = self.d // 2

        for r in range(self.d):
            for c in range(self.d):
                u = r * self.d + c

                # Horizontal
                if c + 1 < self.d:
                    H_rows.extend([u, r * self.d + (c + 1)]); H_cols.extend([edge_idx, edge_idx])
                    probs.append(self.p_good); self.edge_types.append('H'); self.edge_cols.append(c)
                    if c == j_cut: L_cols.append(edge_idx)
                    edge_idx += 1

                # Vertical (WHERE DEFECTS LIVE)
                if r + 1 < self.d:
                    H_rows.extend([u, (r + 1) * self.d + c]); H_cols.extend([edge_idx, edge_idx])
                    p_edge = self.dead_zones.get(c, self.p_good)
                    probs.append(p_edge); self.edge_types.append('V'); self.edge_cols.append(c)
                    edge_idx += 1

                # Boundaries
                if c == 0 or c == self.d - 1:
                    H_rows.append(u); H_cols.append(edge_idx)
                    probs.append(0.0); self.edge_types.append('B'); self.edge_cols.append(c)
                    edge_idx += 1

        self.H = sp.csc_matrix(([1]*len(H_rows), (H_rows, H_cols)), shape=(self.d*self.d, edge_idx), dtype=np.uint8)
        self.L = sp.csc_matrix(([1]*len(L_cols), ([0]*len(L_cols), L_cols)), shape=(1, edge_idx), dtype=np.uint8)
        self.probs = np.array(probs)

        self.is_H = np.array([t == 'H' for t in self.edge_types])
        self.is_V = np.array([t == 'V' for t in self.edge_types])
        self.is_B = np.array([t == 'B' for t in self.edge_types])
        self.col_idx = np.array(self.edge_cols)

    def _generate_crn_datasets(self):
        print(f"[*] Generating {self.trials:,} CRN Trials for Train/Test...")
        H_csr, L_csr = self.H.tocsr(), self.L.tocsr()
        num_edges = len(self.probs)

        def make_crn(s):
            rng = np.random.default_rng(s)
            noise = (rng.random((self.trials, num_edges)) < self.probs).astype(np.uint8)
            syn = np.asarray((H_csr @ noise.T).T % 2, dtype=np.uint8)
            obs = np.asarray((L_csr @ noise.T).T % 2, dtype=np.uint8)[:, 0]
            return syn, obs

        self.syn_train, self.obs_train = make_crn(self.seed)
        self.syn_test,  self.obs_test  = make_crn(self.seed + 999)
        print("[*] Hardware Environment Frozen.")

    def evaluate_policy(self, w_H, w_V_arr, dataset="train"):
        weights = np.zeros(len(self.probs), dtype=np.float32)
        weights[self.is_H] = max(0.01, w_H)
        weights[self.is_V] = np.maximum(0.01, w_V_arr[self.col_idx[self.is_V]])
        weights[self.is_B] = 0.0

        matcher = pymatching.Matching.from_check_matrix(self.H, weights=weights, faults_matrix=self.L)

        if dataset == "train":
            fails = (matcher.decode_batch(self.syn_train)[:, 0] != self.obs_train)
        else:
            fails = (matcher.decode_batch(self.syn_test)[:, 0] != self.obs_test)

        return int(np.sum(fails)), fails

    def fit(self, pop_size=20, generations=15, retain=6):
        print(f"\n[*] Starting SHADOWMAP Evolutionary Optimization...")
        t0 = time.time()

        population = [{'w_H': random.uniform(0.8, 1.2), 'w_V': np.random.uniform(0.8, 1.2, self.d)} for _ in range(pop_size)]

        for gen in range(generations):
            scored = []
            for theta in population:
                fails, _ = self.evaluate_policy(theta['w_H'], theta['w_V'], "train")
                scored.append((fails / self.trials, theta))

            scored.sort(key=lambda x: x[0])
            survivors = scored[:retain]
            best_rate, self.opt_theta = survivors[0]

            print(f" -> GEN {gen+1:02d}/{generations} | Lowest Train Fail Rate: {best_rate:.5f}")

            new_pop = [s[1] for s in survivors]
            while len(new_pop) < pop_size:
                parent = random.choice(survivors)[1]
                new_pop.append({
                    'w_H': max(0.01, parent['w_H'] + random.uniform(-0.15, 0.15)),
                    'w_V': np.maximum(0.01, parent['w_V'] + np.random.normal(0, 0.15, self.d))
                })
            population = new_pop

        self.fit_time = time.time() - t0
        print(f"[*] Optimization Complete in {self.fit_time:.2f}s")

    def report(self):
        print("\n" + "="*70)
        print("SHADOWMAP() FINAL VALIDATION REPORT")
        print("="*70)

        base_k, base_fails = self.evaluate_policy(1.0, np.ones(self.d), "test")
        opt_k, opt_fails   = self.evaluate_policy(self.opt_theta['w_H'], self.opt_theta['w_V'], "test")

        th_w_H = math.log((1 - self.p_good) / self.p_good)
        th_w_V = np.array([math.log((1 - self.dead_zones.get(c, self.p_good)) / self.dead_zones.get(c, self.p_good)) for c in range(self.d)])
        th_k, th_fails = self.evaluate_policy(th_w_H, th_w_V, "test")

        n01 = int(np.sum((base_fails == True) & (opt_fails == False)))
        n10 = int(np.sum((base_fails == False) & (opt_fails == True)))
        chi2_stat = (abs(n01 - n10) - 1)**2 / (n01 + n10) if (n01 + n10) > 0 else 0
        p_val = scipy.stats.chi2.sf(chi2_stat, 1) if (n01 + n10) > 0 else 1.0

        print(f"TEST Baseline Failure Rate: {base_k/self.trials:.5f}")
        print(f"TEST SHADOWMAP Optimized:   {opt_k/self.trials:.5f}  <---")
        print(f"TEST Theoretical Analytic:  {th_k/self.trials:.5f}")
        print(f"McNemar p-value:            {p_val:.2e}")

        print("\nDISCOVERED HARDWARE MAP (Vertical Weights):")
        print("Col | True Error | SHADOWMAP (Scaled) | Theoretical Opt")
        print("-" * 65)

        scale_factor = th_w_H / self.opt_theta['w_H']
        scaled_w_V = self.opt_theta['w_V'] * scale_factor

        for c in range(self.d):
            p_c = self.dead_zones.get(c, self.p_good)
            tag = "<-- DEFECT FOUND" if c in self.dead_zones else ""
            print(f"{c:>3} | {p_c:>10.2f} | {scaled_w_V[c]:^18.3f} | {th_w_V[c]:^15.3f} {tag}")
        print("="*70)

# --- EXECUTE THE API ---
if __name__ == "__main__":
    # Let's test it with TWO dead zones! Base error 3%, Col 2 is 12%, Col 8 is 20%.
    calibrator = ShadowMapCalibrator(d=11, p_healthy=0.03, dead_zones={2: 0.12, 8: 0.20}, trials=80000)
    calibrator.fit()
    calibrator.report()

[*] Fabricating d=11 Virtual Chip...
[*] Generating 80,000 CRN Trials for Train/Test...
[*] Hardware Environment Frozen.

[*] Starting SHADOWMAP Evolutionary Optimization...
 -> GEN 01/15 | Lowest Train Fail Rate: 0.00081
 -> GEN 02/15 | Lowest Train Fail Rate: 0.00075
 -> GEN 03/15 | Lowest Train Fail Rate: 0.00072
 -> GEN 04/15 | Lowest Train Fail Rate: 0.00065
 -> GEN 05/15 | Lowest Train Fail Rate: 0.00065
 -> GEN 06/15 | Lowest Train Fail Rate: 0.00065
 -> GEN 07/15 | Lowest Train Fail Rate: 0.00063
 -> GEN 08/15 | Lowest Train Fail Rate: 0.00060
 -> GEN 09/15 | Lowest Train Fail Rate: 0.00060
 -> GEN 10/15 | Lowest Train Fail Rate: 0.00060
 -> GEN 11/15 | Lowest Train Fail Rate: 0.00060
 -> GEN 12/15 | Lowest Train Fail Rate: 0.00059
 -> GEN 13/15 | Lowest Train Fail Rate: 0.00059
 -> GEN 14/15 | Lowest Train Fail Rate: 0.00059
 -> GEN 15/15 | Lowest Train Fail Rate: 0.00059
[*] Optimization Complete in 105.62s

SHADOWMAP() FINAL VALIDATION REPORT
TEST Baseline Failure Rate: 0.00