In [8]:
# === Supernova cosmology test suite — one-shot Colab cell ===
# Installs, downloads data, fits models, computes fair metrics, saves plots+CSV.
# No seaborn, one figure per chart, default matplotlib colors only.

# 0) Install deps quietly quietly (safe in Colab/notebook too)
import sys, subprocess, importlib
def _pip(pkg): subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])
for pkg in ["numpy", "pandas", "matplotlib", "scipy", "requests"]:
    try:
        importlib.import_module(pkg)
    except Exception:
        _pip(pkg)

import os, io, math, json, warnings, numpy as np, pandas as pd, matplotlib.pyplot as plt
from dataclasses import dataclass, field
from typing import Dict, Tuple, Optional, List
warnings.filterwarnings("ignore")

# SciPy
from scipy.optimize import minimize
from numpy.random import default_rng

# --------------------------- Utilities ---------------------------

def http_get(url: str, timeout: int = 30) -> Optional[bytes]:
    import requests
    try:
        r = requests.get(url, timeout=timeout)
        if r.status_code == 200:
            return r.content
        return None
    except Exception:
        return None

def safe_logdet(C: np.ndarray) -> float:
    sign, logdet = np.linalg.slogdet(C)
    if not np.isfinite(logdet) or sign <= 0:
        eps = 1e-12 * float(np.mean(np.diag(C)))
        C = C + eps * np.eye(C.shape[0])
        sign, logdet = np.linalg.slogdet(C)
    return float(logdet)

def analytic_delta_chi2(mu_obs: np.ndarray, mu_model: np.ndarray, Cinv: np.ndarray) -> Tuple[float, float]:
    """Best-fit intercept Δ and minimal chi^2 for residuals r = mu_obs - (mu_model + Δ)."""
    r = mu_obs - mu_model
    u = np.ones_like(r)
    # Ensure matrix dimensions are compatible
    if u.ndim != 1 or Cinv.ndim != 2 or u.shape[0] != Cinv.shape[0] or Cinv.shape[0] != Cinv.shape[1]:
         raise ValueError("Incompatible dimensions for matrix multiplication in analytic_delta_chi2")
    A = float(u @ (Cinv @ u))
    B = float(u @ (Cinv @ r))
    # Handle case where A is close to zero
    if abs(A) < 1e-9:
        delta_star = 0.0
        chi2_min = float(r @ (Cinv @ r))
    else:
        delta_star = B / A
        chi2_min = float(r @ (Cinv @ r) - (B**2)/A)
    return chi2_min, delta_star


def r_squared(y: np.ndarray, yhat: np.ndarray) -> float:
    ss_res = float(np.sum((y - yhat)**2))
    ss_tot = float(np.sum((y - np.mean(y))**2))
    if ss_tot == 0:
        return 0.0
    return 1.0 - ss_res/ss_tot

def aic_bic(chi2_min: float, C: np.ndarray, k_params: int) -> Tuple[float, float, float]:
    N = C.shape[0]
    logdet = safe_logdet(C)
    logL = -0.5 * (chi2_min + logdet + N * math.log(2*math.pi))
    AIC = 2*k_params - 2*logL
    BIC = k_params*math.log(N) - 2*logL
    return float(logL), float(AIC), float(BIC)

# --------------------------- Cosmology models ---------------------------

@dataclass
class CosmoModel:
    name: str
    fit_omegam: bool
    params: Dict[str, float] = field(default_factory=dict)
    def predict_mu(self, z: np.ndarray, H0: float = 70.0) -> np.ndarray:
        raise NotImplementedError

def E2_LCDM(z: np.ndarray, Om: float) -> np.ndarray:
    return Om*(1+z)**3 + (1-Om)

def E2_constw(z: np.ndarray, Om: float, w0: float) -> np.ndarray:
    return Om*(1+z)**3 + (1-Om)*(1+z)**(3*(1+w0))

def E2_w0wa(z: np.ndarray, Om: float, w0: float, wa: float) -> np.ndarray:
    g = (1+z)**(3*(1+w0+wa)) * np.exp(-3*wa*z/(1+z))
    return Om*(1+z)**3 + (1-Om)*g

def E2_binary(z: np.ndarray, Om: float) -> np.ndarray:
    # w(a) = -log2 a  ⇒  ρ_DE(z)/ρ_DE0 = (1+z)^3 * exp( (3/(2 ln 2)) [ln(1+z)]^2 )
    ln1pz = np.log(1+z)
    g = (1+z)**3 * np.exp((3.0/(2.0*math.log(2.0)))*(ln1pz**2))
    return Om*(1+z)**3 + (1-Om)*g

def distance_modulus(z: np.ndarray, E2_func, H0: float, Om: float, **kwargs) -> np.ndarray:
    # Handle empty input array
    if z.size == 0:
        return np.array([], dtype=z.dtype)

    # Ensure z is 1-dimensional
    z = z.flatten()

    # Integrate Dc(z) = c/H0 ∫ dz'/E(z'), on a shared fine grid (fast + stable)
    zmax = float(np.max(z))
    z_grid = np.linspace(0.0, max(zmax, 1e-6), 4000)

    if E2_func is E2_constw:
        w0 = kwargs.get("w0", -1.0)
        E2_grid = E2_constw(z_grid, Om, w0)
    elif E2_func is E2_w0wa:
        w0 = kwargs.get("w0", -1.0); wa = kwargs.get("wa", 0.0)
        E2_grid = E2_w0wa(z_grid, Om, w0, wa)
    elif E2_func is E2_binary:
        E2_grid = E2_binary(z_grid, Om)
    else:
        E2_grid = E2_LCDM(z_grid, Om)

    E_grid = np.sqrt(np.maximum(E2_grid, 1e-30))
    invE = 1.0/E_grid
    Dc_grid = np.zeros_like(z_grid)
    Dc_grid[1:] = np.cumsum(0.5*(invE[1:]+invE[:-1])*(z_grid[1:]-z_grid[:-1]))
    c_km_s = 299792.458
    Dc_grid *= c_km_s/H0

    Dc = np.interp(z, z_grid, Dc_grid)
    Dl = (1+z)*Dc
    mu = 5.0*np.log10(np.maximum(Dl, 1e-30)) + 25.0
    return mu

class LCDM(CosmoModel):
    def predict_mu(self, z, H0=70.0):
        Om = self.params.get("Omega_m", 0.3)
        return distance_modulus(z, E2_LCDM, H0, Om)

class ConstW(CosmoModel):
    def predict_mu(self, z, H0=70.0):
        Om = self.params.get("Omega_m", 0.3)
        w0 = self.params.get("w0", -1.0)
        return distance_modulus(z, E2_constw, H0, Om, w0=w0)

class W0Wa(CosmoModel):
    def predict_mu(self, z, H0=70.0):
        Om = self.params.get("Omega_m", 0.3)
        w0 = self.params.get("w0", -1.0); wa = self.params.get("wa", 0.0)
        return distance_modulus(z, E2_w0wa, H0, Om, w0=w0, wa=wa)

class BinaryW(CosmoModel):
    def predict_mu(self, z, H0=70.0):
        Om = self.params.get("Omega_m", 0.3)
        return distance_modulus(z, E2_binary, H0, Om)

# --------------------------- Data loaders (with covariance attempts) ---------------------------

def _try_load_cov_from_text(content: bytes, N: int) -> Optional[np.ndarray]:
    """Best-effort parse of a whitespace covariance file; supports full matrix or triangular flatten."""
    try:
        lines = [ln for ln in content.decode("utf-8", errors="ignore").splitlines() if ln.strip() and not ln.strip().startswith("#")]
        arr = []
        for ln in lines:
            try:
                arr.append([float(x) for x in ln.split()])
            except Exception:
                pass
        M = np.array(arr, dtype=float)
        if M.shape == (N, N):
            return M
        flat = M.reshape(-1)
        t = flat.size
        n = int((np.sqrt(1+8*t)-1)//2)
        if n == N and n*(n+1)//2 == t:
            C = np.zeros((N, N), float)
            k = 0
            for i in range(N):
                for j in range(i, N):
                    C[i, j] = C[j, i] = flat[k]; k += 1
            return C
        return None
    except Exception:
        return None

def load_union21(use_cov: bool=True) -> Tuple[pd.DataFrame, np.ndarray]:
    # Primary table
    tbl = http_get("https://supernova.lbl.gov/Union/figures/SCPUnion2.1_mu_vs_z.txt")
    if tbl is None:
        raise RuntimeError("Union2.1 table download failed.")
    rows = []
    for ln in tbl.decode("utf-8").splitlines():
        if ln.startswith("#") or not ln.strip(): continue
        p = ln.split()
        if len(p) >= 4:
            rows.append((float(p[1]), float(p[2]), float(p[3])))
    df = pd.DataFrame(rows, columns=["z","mu","mu_err"]).sort_values("z").reset_index(drop=True)
    N = len(df)
    C = None
    if use_cov:
        stat = http_get("https://supernova.lbl.gov/Union/figures/SCPUnion2.1_covmat_stat.txt")
        sysm = http_get("https://supernova.lbl.gov/Union/figures/SCPUnion2.1_covmat_sys.txt")
        if stat is not None and sysm is not None:
            C_stat = _try_load_cov_from_text(stat, N)
            C_sys  = _try_load_cov_from_text(sysm, N)
            if C_stat is not None and C_sys is not None and C_stat.shape==(N,N) and C_sys.shape==(N,N):
                C = C_stat + C_sys
    if C is None:
        C = np.diag(np.maximum(df["mu_err"].values, 1e-9)**2)
    return df, C

def load_jla(use_cov: bool=True) -> Tuple[pd.DataFrame, np.ndarray]:
    # Minimal JLA μ_B table; covmat attempt (many mirrors; best-effort)
    tbl = http_get("http://supernovae.in2p3.fr/sdss_snls_jla/jla_mub.txt")
    if tbl is None:
        raise RuntimeError("JLA mub download failed.")
    rows = []
    for ln in tbl.decode("utf-8").splitlines():
        if ln.startswith("#") or not ln.strip(): continue
        p = ln.split()
        if len(p) >= 3:
            rows.append((float(p[0]), float(p[1]), float(p[2])))
    df = pd.DataFrame(rows, columns=["z","mu","mu_err"]).sort_values("z").reset_index(drop=True)
    N = len(df)
    C = None
    if use_cov:
        for url in [
            "http://supernovae.in2p3.fr/sdss_snls_jla/uncertainties/covmat_v6.dat",
            "http://supernovae.in2p3.fr/sdss_snls_jla/covmat_v6.dat",
            "http://supernovae.in2p3.fr/sdss_snls_jla/jla_cov_full.txt",
        ]:
            blob = http_get(url)
            if blob is not None:
                M = _try_load_cov_from_text(blob, N)
                if M is not None and M.shape==(N,N):
                    C = M; break
    if C is None:
        C = np.diag(np.maximum(df["mu_err"].values, 1e-9)**2)
    return df, C

# --------------------------- Fitting & CV ---------------------------

@dataclass
class FitResult:
    model: str
    dataset: str
    params: Dict[str, float]
    delta: float
    chi2: float
    dof: int
    red_chi2: float
    logL: float
    AIC: float
    BIC: float
    R2: float
    N: int
    cv_mean: float
    cv_std: float

def _perform_fit(model: CosmoModel, df: pd.DataFrame, C: np.ndarray, H0: float) -> Tuple[Dict[str, float], float, float]:
    """Helper function to perform the model fitting and return parameters, chi2, and delta."""
    z, mu = df["z"].values, df["mu"].values
    N = len(z)
    jitter = 1e-10 * float(np.mean(np.diag(C)))
    C_use = C + jitter*np.eye(N)
    Cinv = np.linalg.inv(C_use)

    # parameter vector (Ωm, optional w0, wa)
    p0, bounds = [], []
    if model.fit_omegam:
        p0.append(model.params.get("Omega_m", 0.3)); bounds.append((0.01, 0.99))
    if isinstance(model, ConstW):
        p0.append(model.params.get("w0", -1.0)); bounds.append((-3.0, 0.0))
    if isinstance(model, W0Wa):
        p0.append(model.params.get("w0", -1.0)); bounds.append((-3.0, 0.0))
        p0.append(model.params.get("wa",  0.0)); bounds.append((-3.0, 3.0))

    def make_pred(theta):
        i = 0
        pars = dict(model.params)
        if model.fit_omegam:
            pars["Omega_m"] = float(theta[i]); i += 1
        if isinstance(model, ConstW):
            pars["w0"] = float(theta[i]); i += 1
        if isinstance(model, W0Wa):
            pars["w0"] = float(theta[i]); i += 1
            pars["wa"] = float(theta[i]); i += 1
        mdl = type(model)(name=model.name, fit_omegam=model.fit_omegam, params=pars)
        return mdl.predict_mu(z, H0=H0), pars

    def obj(theta):
        mu_pred, _ = make_pred(theta)
        chi2_min, _ = analytic_delta_chi2(mu_obs=mu, mu_model=mu_pred, Cinv=Cinv)
        return chi2_min

    if len(p0):
        res = minimize(obj, x0=np.array(p0, dtype=float), bounds=bounds, method="L-BFGS-B")
        theta = res.x
    else:
        theta = np.array(p0, dtype=float)

    mu_pred, pars = make_pred(theta)
    chi2_min, delta = analytic_delta_chi2(mu_obs=mu, mu_model=mu_pred, Cinv=Cinv)

    return pars, chi2_min, delta


def fit_model(model: CosmoModel, df: pd.DataFrame, C: np.ndarray, H0: float=70.0) -> FitResult:
    z, mu = df["z"].values, df["mu"].values
    N = len(z)
    jitter = 1e-10 * float(np.mean(np.diag(C)))
    C_use = C + jitter*np.eye(N)


    # Perform the initial fit on the full dataset
    pars, chi2_min, delta = _perform_fit(model, df, C_use, H0)

    k = len(pars) + 1  # +Δ
    dof = max(1, N - k)
    logL, AIC, BIC = aic_bic(chi2_min, C_use, k)
    R2 = r_squared(mu, model.predict_mu(z, H0=H0) + delta)

    # 5-fold CV on reduced χ²
    rng = default_rng(12345)
    idx = np.arange(N); rng.shuffle(idx)
    folds = np.array_split(idx, 5)
    cv_vals = []
    for i in range(5):
        val = folds[i]
        train = np.concatenate([folds[j] for j in range(5) if j != i])
        df_tr, C_tr = df.iloc[train].reset_index(drop=True), C_use[np.ix_(train, train)]
        df_va, C_va = df.iloc[val].reset_index(drop=True),  C_use[np.ix_(val, val)]

        # Only perform CV fit if training set has more than one element
        if len(train) > 1:
            try:
                res_tr_pars, _, _ = _perform_fit(type(model)(name=model.name, fit_omegam=model.fit_omegam, params=model.params), df_tr, C_tr, H0)
                # evaluate on val using train params (no leakage)
                mdl_val = type(model)(name=model.name, fit_omegam=model.fit_omegam, params=res_tr_pars)
                mu_pred_va = mdl_val.predict_mu(df_va["z"].values, H0=H0)
                # Only calculate chi2 if validation set has more than one element
                if len(val) > 1:
                    Cinv_va = np.linalg.inv(C_va)
                    chi2_va, _ = analytic_delta_chi2(df_va["mu"].values, mu_pred_va, Cinv_va)
                    dof_va = max(1, len(val) - (len(res_tr_pars) + 1))
                    cv_vals.append(chi2_va/dof_va)
                else:
                     cv_vals.append(np.nan) # Append NaN for folds with 0 or 1 element
            except Exception:
                 cv_vals.append(np.nan) # Append NaN if fitting on training set fails
        else:
            cv_vals.append(np.nan) # Append NaN if training set has 0 or 1 element


    cv_vals = np.array(cv_vals, float)
    # Filter out NaN values before calculating mean and std
    cv_vals_filtered = cv_vals[~np.isnan(cv_vals)]
    cv_mean = float(np.mean(cv_vals_filtered)) if cv_vals_filtered.size > 0 else np.nan
    cv_std = float(np.std(cv_vals_filtered)) if cv_vals_filtered.size > 0 else np.nan


    return FitResult(
        model=model.name, dataset="", params=pars, delta=float(delta), chi2=float(chi2_min),
        dof=int(dof), red_chi2=float(chi2_min/dof), logL=float(logL), AIC=float(AIC), BIC=float(BIC),
        R2=float(R2), N=N, cv_mean=float(cv_mean), cv_std=float(cv_std)
    )

# --------------------------- Plotting ---------------------------

def panel_plot(df: pd.DataFrame, preds: Dict[str, Dict[str, np.ndarray]], out_png: str):
    z, mu, mu_err = df["z"].values, df["mu"].values, df["mu_err"].values
    fig = plt.figure(figsize=(8, 10))

    ax1 = fig.add_subplot(2, 1, 1)
    ax1.errorbar(z, mu, yerr=mu_err, fmt=".", alpha=0.5, markersize=3)
    for name, d in preds.items():
        ax1.plot(z, d["mu_pred"] + d["delta"])
    ax1.set_xlabel("Redshift z"); ax1.set_ylabel("Distance modulus μ")
    ax1.set_title("Supernova Hubble diagram"); ax1.grid(True, alpha=0.3)
    ax1.legend(list(preds.keys()))

    ax2 = fig.add_subplot(2, 1, 2)
    for name, d in preds.items():
        res = mu - (d["mu_pred"] + d["delta"])
        ax2.scatter(z, res, s=6, alpha=0.6, label=name)
    ax2.axhline(0.0, linestyle="--")
    ax2.set_xlabel("Redshift z"); ax2.set_ylabel("Residual μ_obs − μ_model")
    ax2.set_title("Residuals (per model, after intercept fit)"); ax2.grid(True, alpha=0.3)
    ax2.legend(list(preds.keys()))

    fig.tight_layout(); fig.savefig(out_png, dpi=200, bbox_inches="tight"); plt.close(fig)

# --------------------------- Main runner ---------------------------

def run_everything(
    datasets=("union","jla"),
    use_cov=True,
    models=("lcdm","constw","binary","w0wa"),
    fit_omegam=True,
    H0=70.0,
    outdir="/content/out"
):
    os.makedirs(outdir, exist_ok=True)
    # Load datasets
    ds_list = []
    for token in datasets:
        try:
            if token.lower()=="union":
                df, C = load_union21(use_cov=use_cov); ds_list.append(("Union2.1", df, C))
            elif token.lower()=="jla":
                df, C = load_jla(use_cov=use_cov); ds_list.append(("JLA", df, C))
            else:
                print(f"[WARN] Unknown dataset token: {token}")
        except Exception as e:
            print(f"[WARN] Failed to load {token}: {e}")

    if not ds_list:
        raise RuntimeError("No datasets loaded.")

    # Build model objects
    mdl_objs = []
    if "lcdm" in models:
        mdl_objs.append(LCDM(name="ΛCDM", fit_omegam=fit_omegam, params={"Omega_m":0.3}))
    if "constw" in models:
        mdl_objs.append(ConstW(name="Const‑w", fit_omegam=fit_omegam, params={"Omega_m":0.3, "w0":-1.0}))
    if "w0wa" in models:
        mdl_objs.append(W0Wa(name="w0–wₐ", fit_omegam=fit_omegam, params={"Omega_m":0.3, "w0":-1.0, "wa":0.0}))
    if "binary" in models:
        mdl_objs.append(BinaryW(name="Binary w(a)=−log₂a", fit_omegam=fit_omegam, params={"Omega_m":0.3}))
    if not mdl_objs:
        raise RuntimeError("No models selected.")

    rows = []
    for (ds_name, df, C) in ds_list:
        print(f"\n=== DATASET: {ds_name} (N={len(df)}) ===")
        preds = {}
        for mdl in mdl_objs:
            print(f"Fitting {mdl.name} ...")
            res = fit_model(mdl, df, C, H0=H0)
            res.dataset = ds_name
            # store
            mdl_fit = type(mdl)(name=mdl.name, fit_omegam=mdl.fit_omegam, params=res.params)
            mu_pred = mdl_fit.predict_mu(df["z"].values, H0=H0)
            preds[mdl.name] = {"mu_pred": mu_pred, "delta": res.delta}
            rows.append({
                "dataset": ds_name,
                "model": mdl.name,
                **{f"param_{k}": v for k,v in res.params.items()},
                "delta": res.delta,
                "chi2": res.chi2,
                "dof": res.dof,
                "red_chi2": res.red_chi2,
                "logL": res.logL,
                "AIC": res.AIC,
                "BIC": res.BIC,
                "R2": res.R2,
                "cv_red_chi2_mean": res.cv_mean,
                "cv_red_chi2_std": res.cv_std,
                "N": res.N
            })
        # per‑dataset plot
        fig_path = os.path.join(outdir, f"hubble_{ds_name.replace(' ','_')}.png")
        panel_plot(df, preds, fig_path)
        print(f"Saved figure: {fig_path}")

    # Results table with ΔAIC/BIC ranks
    df_res = pd.DataFrame(rows)
    df_res["rank_by_AIC"] = df_res.groupby("dataset")["AIC"].rank(method="min")
    df_res["delta_AIC"]  = df_res["AIC"] - df_res.groupby("dataset")["AIC"].transform("min")
    df_res["rank_by_BIC"] = df_res.groupby("dataset")["BIC"].rank(method="min")
    df_res["delta_BIC"]  = df_res["BIC"] - df_res.groupby("dataset")["BIC"].transform("min")
    csv_path = os.path.join(outdir, "results_summary.csv")
    df_res.to_csv(csv_path, index=False)
    print(f"\nWrote results: {csv_path}")
    print("\nPrimary verdict rule: rank by reduced χ² and ΔAIC (≤ 2 ≈ indistinguishable). R² is descriptive only.")
    display_cols = ["dataset","model","chi2","red_chi2","AIC","delta_AIC","BIC","delta_BIC","R2","cv_red_chi2_mean","cv_red_chi2_std"]
    try:
        from IPython.display import display
        display(df_res[display_cols].round(5))
    except Exception:
        print(df_res[display_cols].round(5))

# --------------------------- Execute with robust defaults ---------------------------
run_everything(
    datasets=("union","jla"),
    use_cov=True,              # try full covariance; diagonal fallback happens automatically
    models=("lcdm","constw","binary","w0wa"),
    fit_omegam=True,
    H0=70.0,
    outdir="/content/out"
)

[WARN] Failed to load jla: JLA mub download failed.

=== DATASET: Union2.1 (N=580) ===
Fitting ΛCDM ...
Fitting Const‑w ...
Fitting w0–wₐ ...
Fitting Binary w(a)=−log₂a ...
Saved figure: /content/out/hubble_Union2.1.png

Wrote results: /content/out/results_summary.csv

Primary verdict rule: rank by reduced χ² and ΔAIC (≤ 2 ≈ indistinguishable). R² is descriptive only.


Unnamed: 0,dataset,model,chi2,red_chi2,AIC,delta_AIC,BIC,delta_BIC,R2,cv_red_chi2_mean,cv_red_chi2_std
0,Union2.1,ΛCDM,562.22663,0.97271,-233.48056,0.0,-224.7545,0.0,0.99295,0.98093,0.09651
1,Union2.1,Const‑w,562.22422,0.97439,-231.48297,1.99758,-218.39389,6.36061,0.99295,0.99145,0.09573
2,Union2.1,w0–wₐ,562.22455,0.97608,-229.48264,3.99792,-212.03053,12.72398,0.99295,1.00265,0.09461
3,Union2.1,Binary w(a)=−log₂a,1179.0872,2.03994,383.38001,616.86057,392.10607,616.86057,0.98275,2.05583,0.22179
