# 01 — Master Equation Demo (Pillar-1)

**Aim.** Validate the Gaussian-bath master-equation backend (`unified_noise.solver_me.MasterEquationSolver`) for a single-mode harmonic oscillator.

**How to run (CI-friendly).** Execute all cells from the repository root with:

```bash
python - <<'PY'
import nbformat
from nbclient import NotebookClient
nb = nbformat.read(open('notebooks/01_ME_demo.ipynb'), as_version=4)
client = NotebookClient(nb, timeout=600, kernel_name='python3')
client.execute()
nbformat.write(nb, open('notebooks/01_ME_demo.ipynb', 'w'))
PY
```

The notebook saves diagnostic figures to `figures/` and a JSON artifact to `artifacts/`.

In [1]:
import json
from datetime import datetime, timezone
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np

from unified_noise.params import ModelParams
from unified_noise.solver_me import MasterEquationSolver

plt.rcParams["figure.dpi"] = 150
RNG = np.random.default_rng(314159)


## Base parameters

In [2]:
# Fundamental constants and solver configuration
H_BAR = 1.054_571_817e-34  # Planck constant over 2π [J·s]

M_ION = 40.0 * 1.660_539_066_60e-27  # 40 amu in kg
OMEGA = 2.0 * np.pi * 2.0e6           # 2 MHz trap frequency [rad/s]
GAMMA = 2.0 * np.pi * 5.0e3           # 5 kHz damping [1/s]
NBAR_BATH = 0.05                      # thermal occupancy

N_LEVELS = 30                         # truncation of Fock space
TAU = 2.0e-4                          # stroboscopic measurement time [s]
N_SHOTS = 10_000                      # simulated measurement shots
READOUT_SIGMA = 0.8                   # Gaussian readout noise (ion-quanta units)

RHO0 = np.zeros((N_LEVELS, N_LEVELS), dtype=complex)
RHO0[0, 0] = 1.0  # |0><0|


## Ground-truth test cases

In [3]:
GROUND_TRUTH_CASES = {
    "gaussian_dense": {
        "lambda_rate": 0.0,
        "gamma": GAMMA,
        "nbar_bath": NBAR_BATH,
        "tau": TAU,
        "expected_regime": "Gaussian",
        "expected_kappa3": 0.0,
        "expected_kappa3_tol": 0.10,
        "notes": "Dense Gaussian bath reference (λτ = 0).",
    },
    "poisson_intermediate": {
        "lambda_rate": None,
        "gamma": GAMMA,
        "nbar_bath": NBAR_BATH,
        "tau": TAU,
        "expected_regime": "Poisson",
        "notes": "TODO(#16): enable once jump superoperator is implemented.",
    },
    "poisson_sparse": {
        "lambda_rate": None,
        "gamma": GAMMA,
        "nbar_bath": NBAR_BATH,
        "tau": TAU,
        "expected_regime": "Poisson",
        "notes": "TODO(#16): sparse jump validation pending jump backend.",
    },
}


## Parameter mapping (backend ↔ physics)

We map the physical bath parameters to the solver as follows:

- `gamma → κ` (Lindblad damping rate for the Gaussian bath).
- `nbar_bath → n̄` (thermal occupancy used by the Lindblad dissipator).
- `omega → ω` (trap frequency appearing in the harmonic Hamiltonian).
- `lambda_rate` is passed through as the jump-rate placeholder (set to zero for this demo).


In [4]:
def build_params(case_spec: dict) -> ModelParams:
    lam = case_spec.get("lambda_rate", 0.0) or 0.0
    return ModelParams(
        m=M_ION,
        omega=OMEGA,
        gamma=case_spec.get("gamma", GAMMA),
        kBT_eff=H_BAR * OMEGA * (case_spec.get("nbar_bath", NBAR_BATH) + 0.5),
        lam=lam,
        jump_law="gauss",
        jump_pars={"sigma_p": 0.0},
        nbar_bath=case_spec.get("nbar_bath", NBAR_BATH),
    )


## Helper functions

In [5]:
def vacuum_density_matrix(N: int) -> np.ndarray:
    rho = np.zeros((N, N), dtype=complex)
    rho[0, 0] = 1.0
    return rho


def ensure_probability_vector(diagonal: np.ndarray) -> np.ndarray:
    probs = np.real_if_close(diagonal).astype(float)
    probs = np.clip(probs, 0.0, None)
    total = probs.sum()
    if not np.isclose(total, 1.0, atol=1e-12):
        probs = probs / total
    return probs


def stroboscopic_measurements(
    probs: np.ndarray,
    shots: int,
    rng: np.random.Generator,
    readout_sigma: float,
) -> dict:
    n_levels = probs.size
    outcomes = rng.choice(np.arange(n_levels), size=shots, replace=True, p=probs)
    readouts = outcomes.astype(float)
    if readout_sigma > 0.0:
        readouts = readouts + rng.normal(0.0, readout_sigma, size=shots)
    return {
        "n_values": np.arange(n_levels),
        "outcomes": outcomes,
        "readouts": readouts,
        "readout_sigma": readout_sigma,
    }


def compute_discriminants(values: np.ndarray) -> dict:
    values = np.asarray(values, dtype=float)
    mean = values.mean()
    centered = values - mean
    variance = centered.var()
    std = np.sqrt(variance)
    if std == 0.0:
        kappa3 = 0.0
        kappa4 = 0.0
    else:
        m3 = np.mean(centered ** 3)
        m4 = np.mean(centered ** 4)
        kappa3 = m3 / (std ** 3)
        kappa4 = m4 / (std ** 4) - 3.0
    return {
        "mean": float(mean),
        "variance": float(variance),
        "kappa3": float(kappa3),
        "kappa4": float(kappa4),
    }


def bootstrap_discriminants(values: np.ndarray, n_resamples: int = 1500, rng: np.random.Generator | None = None) -> dict:
    rng = RNG if rng is None else rng
    values = np.asarray(values, dtype=float)
    kappa3_values = np.empty(n_resamples)
    kappa4_values = np.empty(n_resamples)
    for i in range(n_resamples):
        resample = rng.choice(values, size=values.size, replace=True)
        stats = compute_discriminants(resample)
        kappa3_values[i] = stats["kappa3"]
        kappa4_values[i] = stats["kappa4"]
    k3_ci = (float(np.percentile(kappa3_values, 5)), float(np.percentile(kappa3_values, 95)))
    k4_ci = (float(np.percentile(kappa4_values, 5)), float(np.percentile(kappa4_values, 95)))
    return {
        "kappa3_values": kappa3_values,
        "kappa4_values": kappa4_values,
        "kappa3_ci90": k3_ci,
        "kappa4_ci90": k4_ci,
        "kappa3_stderr": float(kappa3_values.std(ddof=1)),
        "kappa4_stderr": float(kappa4_values.std(ddof=1)),
    }


def classify_regime(kappa3: float, thresholds: tuple[float, float] = (0.15, 1.0)) -> tuple[str, float]:
    abs_k3 = abs(kappa3)
    low, high = thresholds
    if abs_k3 < low:
        label = "Gaussian"
        confidence = float(np.clip(1.0 - (abs_k3 / low) ** 2 / 20.0, 0.0, 1.0))
    elif abs_k3 < high:
        label = "Poisson"
        center = 0.5 * (low + high)
        half_span = 0.5 * (high - low)
        normalized = abs(abs_k3 - center) / half_span
        confidence = float(np.clip(1.0 - normalized, 0.0, 1.0))
    else:
        label = "Lévy"
        confidence = float(np.clip((abs_k3 - high) / (5.0 - high), 0.0, 1.0))
    return label, confidence


def convergence_checks(rho: np.ndarray, probs: np.ndarray) -> dict:
    trace_error = float(abs(np.trace(rho) - 1.0))
    tail_index = int(0.9 * probs.size)
    tail_mass = float(probs[tail_index:].sum())
    rho_herm = 0.5 * (rho + rho.conj().T)
    eigvals = np.linalg.eigvalsh(rho_herm)
    min_eig = float(np.min(eigvals).real)
    return {
        "trace_error": trace_error,
        "tail_mass": tail_mass,
        "min_eig": min_eig,
    }


def save_probability_plot(n_values: np.ndarray, probs: np.ndarray, readouts: np.ndarray, path: Path) -> None:
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.bar(n_values, probs, width=0.8, label="P(n)", color="C0", alpha=0.7)
    bin_edges = np.arange(-0.5, n_values.size + 0.5)
    hist, edges = np.histogram(readouts, bins=bin_edges, density=True)
    centers = 0.5 * (edges[:-1] + edges[1:])
    ax.step(centers, hist, where="mid", color="C1", linewidth=2.0, label="Readout histogram")
    ax.set_xlabel("Fock index n")
    ax.set_ylabel("Probability density")
    ax.set_title("Stroboscopic measurement distribution")
    ax.set_xlim(-0.5, min(n_values.size - 0.5, 10))
    ax.legend(frameon=False)
    fig.tight_layout()
    path.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(path, bbox_inches="tight")
    plt.close(fig)


def save_discriminant_plot(kappa3: float, ci90: tuple[float, float], thresholds: tuple[float, float], path: Path) -> None:
    low, high = thresholds
    fig, ax = plt.subplots(figsize=(6, 3))
    ax.axvspan(-high, -low, color="#ffd966", alpha=0.3, label="Poisson band")
    ax.axvspan(low, high, color="#ffd966", alpha=0.3)
    ax.axvspan(-low, low, color="#b6d7a8", alpha=0.5, label="Gaussian band")
    ax.axvspan(-5, -high, color="#f4cccc", alpha=0.3, label="Lévy band")
    ax.axvspan(high, 5, color="#f4cccc", alpha=0.3)
    err_left = max(kappa3 - ci90[0], 0.0)
    err_right = max(ci90[1] - kappa3, 0.0)
    ax.errorbar(kappa3, 0.0, xerr=np.array([[err_left], [err_right]]), fmt="o", color="k", capsize=5, label=r"$\kappa_3$ (90% CI)")
    ax.set_xlabel(r"Skewness $\kappa_3$")
    ax.set_yticks([])
    ax.set_xlim(-0.6, 0.6)
    ax.set_title("Regime discriminant")
    ax.legend(loc="upper right", frameon=False)
    fig.tight_layout()
    path.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(path, bbox_inches="tight")
    plt.close(fig)


## Run Gaussian dense benchmark

In [6]:
case_name = "gaussian_dense"
case_spec = GROUND_TRUTH_CASES[case_name]

params = build_params(case_spec)
solver = MasterEquationSolver(N=N_LEVELS, params=params)
rho0 = vacuum_density_matrix(N_LEVELS)
rho_t = solver.evolve(rho0, [0.0, case_spec["tau"]])[-1]

# Post-process density matrix to mitigate numerical Hermiticity drift
rho_t = 0.5 * (rho_t + rho_t.conj().T)
probabilities = ensure_probability_vector(np.diag(rho_t))

measurements = stroboscopic_measurements(probabilities, N_SHOTS, RNG, READOUT_SIGMA)
shot_stats = compute_discriminants(measurements["readouts"])
bootstrap_stats = bootstrap_discriminants(measurements["readouts"], n_resamples=1500, rng=RNG)

classification, confidence = classify_regime(shot_stats["kappa3"])

expected_regime = case_spec["expected_regime"]
print(f"Inferred regime: {classification} (confidence={confidence:.3f})")
print(f"Shot-based κ₃={shot_stats['kappa3']:.4f}, κ₄={shot_stats['kappa4']:.4f}")

checks = convergence_checks(rho_t, probabilities)
print("Trace error:", checks["trace_error"])
print("Tail mass (>0.9N):", checks["tail_mass"])
print("Minimum eigenvalue:", checks["min_eig"])

assert checks["trace_error"] <= 1e-10, "Trace not preserved within 1e-10"
assert checks["min_eig"] >= -1e-10, "Density matrix gained significant negativity"

if checks["tail_mass"] > 1e-5:
    print("WARNING: Tail population exceeds tolerance.")

assert classification == expected_regime, f"Expected {expected_regime}, got {classification}"
assert confidence >= 0.95, "Classifier confidence below 0.95 requirement"
assert abs(shot_stats["kappa3"] - case_spec["expected_kappa3"]) <= case_spec["expected_kappa3_tol"], "κ₃ deviates from expectation"

figures_dir = Path("figures")
artifacts_dir = Path("artifacts")
figures_dir.mkdir(exist_ok=True)
artifacts_dir.mkdir(exist_ok=True)

save_probability_plot(measurements["n_values"], probabilities, measurements["readouts"], figures_dir / "01_pn.png")
save_discriminant_plot(
    shot_stats["kappa3"],
    bootstrap_stats["kappa3_ci90"],
    (0.15, 1.0),
    figures_dir / "01_disc.png",
)

lambda_rate = case_spec.get("lambda_rate", 0.0) or 0.0
artifact = {
    "test_name": case_name,
    "lambda_tau": float(lambda_rate * case_spec["tau"]),
    "expected_regime": expected_regime,
    "inferred_regime": classification,
    "success": True,
    "confidence": float(confidence),
    "kappa3": float(shot_stats["kappa3"]),
    "kappa3_ci90": bootstrap_stats["kappa3_ci90"],
    "kappa3_stderr": float(bootstrap_stats["kappa3_stderr"]),
    "kappa4": float(shot_stats["kappa4"]),
    "kappa4_ci90": bootstrap_stats["kappa4_ci90"],
    "kappa4_stderr": float(bootstrap_stats["kappa4_stderr"]),
    "n_shots": int(N_SHOTS),
    "tau": float(case_spec["tau"]),
    "N_levels": int(N_LEVELS),
    "trace_error": checks["trace_error"],
    "tail_mass": checks["tail_mass"],
    "min_eig": checks["min_eig"],
    "readout_sigma": float(READOUT_SIGMA),
    "timestamp_utc": datetime.now(timezone.utc).isoformat(),
}

with (artifacts_dir / "pillar1_result.json").open("w", encoding="utf-8") as fh:
    json.dump(artifact, fh, indent=2)

print("PASS: Gaussian dense benchmark validated.")


  y = expm_multiply(self._L, y, start=0.0, stop=dt, num=2)[-1]


Inferred regime: Gaussian (confidence=0.985)
Shot-based κ₃=0.0811, κ₄=0.0848
Trace error: 2.7355895326763857e-13
Tail mass (>0.9N): 1.9017676474858972e-36
Minimum eigenvalue: 4.0936404085736454e-39


PASS: Gaussian dense benchmark validated.


## TODO placeholders for jump-enabled tests

The `poisson_intermediate` and `poisson_sparse` scenarios are kept as explicit TODO markers pending the jump-superoperator implementation tracked in Issue #16. Once the Lévy/Poisson channel is available, the loop below should be extended to execute and validate those cases analogously to the Gaussian bath test above.

In [7]:
for name, spec in GROUND_TRUTH_CASES.items():
    if name == "gaussian_dense":
        continue
    print(f"Skipping {name}: {spec['notes']}")


Skipping poisson_intermediate: TODO(#16): enable once jump superoperator is implemented.
Skipping poisson_sparse: TODO(#16): sparse jump validation pending jump backend.
