In [None]:
# ============================= PATH SETUP =============================
# Choose ONE of the following blocks depending on where you run the notebook.
# ------------------------------------------------------------------------------

from pathlib import Path
import sys

# ------------------------------------------------------------------------------
# OPTION A — Google Colab
# ------------------------------------------------------------------------------
# If you are running on Google Colab, enable this block by removing the surrounding triple quotes (and disable OPTION B below).

"""
from google.colab import drive
drive.mount("/content/drive")

ROOT = Path("/content/drive/MyDrive/mbrl-testing-frameworks-empirical-study")
"""

# ------------------------------------------------------------------------------
# OPTION B — Local machine (GitHub clone)
# ------------------------------------------------------------------------------
# If you are running locally (e.g., after `git clone`), keep this block active; comment it out when using OPTION A, because it would otherwise overwrite ROOT.

ROOT = Path("mbrl-testing-frameworks-empirical-study")

# ------------------------------------------------------------------------------
# Shared paths (DO NOT EDIT BELOW)
# ------------------------------------------------------------------------------

AGENT_ROOT  = ROOT / "agents" / "muzero" / "lunalander"
RESULTS_DIR = ROOT / "results" / "lunalander"
TABLES_DIR  = RESULTS_DIR / "tables"
FIGS_DIR    = RESULTS_DIR / "figs"
RAW_DIR     = RESULTS_DIR / "raw_single"

RESULTS_DIR.mkdir(parents=True, exist_ok=True)
TABLES_DIR.mkdir(parents=True, exist_ok=True)
FIGS_DIR.mkdir(parents=True, exist_ok=True)
RAW_DIR.mkdir(parents=True, exist_ok=True)

if str(AGENT_ROOT) not in sys.path:
    sys.path.insert(0, str(AGENT_ROOT))

print("AGENT_ROOT :", AGENT_ROOT)
print("RESULTS_DIR:", RESULTS_DIR)

Mounted at /content/drive
AGENT_ROOT : /content/drive/MyDrive/mbrl-testing-frameworks-empirical-study/agents/muzero/lunalander
RESULTS_DIR: /content/drive/MyDrive/mbrl-testing-frameworks-empirical-study/results/lunalander


In [None]:
from pathlib import Path
import os
import random
import numpy as np

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
AGENT_ROOT : /content/drive/MyDrive/mbrl-testing-frameworks-empirical-study/agents/muzero/lunalander
RESULTS_DIR: /content/drive/MyDrive/mbrl-testing-frameworks-empirical-study/results/lunalander


In [None]:
# =========================================================
# Load MuZero-General (shared dependency, reproducible)
# =========================================================
import importlib

# -------------------------
# Local MuZero-General repo
# -------------------------
LOCAL_MUZERO = Path("external/muzero-general")

if not LOCAL_MUZERO.exists():
    print("Cloning MuZero-General...")
    !git clone --depth=1 https://github.com/werner-duvaud/muzero-general.git external/muzero-general

assert LOCAL_MUZERO.exists(), "MuZero-General repository not found"

# -------------------------
# Add to Python path
# -------------------------
if str(LOCAL_MUZERO) not in sys.path:
    sys.path.insert(0, str(LOCAL_MUZERO))

# -------------------------
# Sanity import
# -------------------------
models = importlib.import_module("models")
print("✓ MuZero-General loaded from", LOCAL_MUZERO)

✓ MuZero-General loaded from external/muzero-general


In [None]:
# NumPy compatibility hotfix (MuZero-General)
# =========================================================

import numpy as np

if not hasattr(np, "bool8"):
    np.bool8 = np.bool_

In [None]:
# Global imports & LunarLander agent root (reviewer version)
# =========================================================

import os
import sys
import time
import json
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Trained MuZero LunarLander agent
# -------------------------
assert AGENT_ROOT.exists(), f"AGENT_ROOT not found: {AGENT_ROOT}"
print("AGENT_ROOT:", AGENT_ROOT)

print("Agent files:", sorted([p.name for p in AGENT_ROOT.iterdir()]))

RAW_DIR = RESULTS_DIR / "raw_single"
RAW_DIR.mkdir(exist_ok=True)

print("outputs:")
print("  RAW_DIR   :", RAW_DIR)
print("  TABLES_DIR:", TABLES_DIR)
print("  FIGS_DIR  :", FIGS_DIR)

AGENT_ROOT: /content/drive/MyDrive/mbrl-testing-frameworks-empirical-study/agents/muzero/lunalander
Agent files: ['model.checkpoint']
outputs:
  RAW_DIR   : /content/drive/MyDrive/mbrl-testing-frameworks-empirical-study/results/lunalander/raw_single
  TABLES_DIR: /content/drive/MyDrive/mbrl-testing-frameworks-empirical-study/results/lunalander/tables
  FIGS_DIR  : /content/drive/MyDrive/mbrl-testing-frameworks-empirical-study/results/lunalander/figs


In [None]:
# Theme, colors, and global constants (LunarLander)
# =========================================================

import matplotlib.pyplot as plt

# Plotting theme (shared across domains)
# -------------------------
plt.rcParams.update({
    "figure.figsize": (8.8, 3.7),
    "font.size": 11,
    "axes.titlesize": 12,
    "axes.labelsize": 11,
    "legend.fontsize": 9,
    "axes.spines.top": False,
    "axes.spines.right": False,
    "axes.grid": True,
    "grid.alpha": 0.25,
    "legend.frameon": False,
})

# Unified framework color palette
# -------------------------
FRAMEWORK_COLORS = {
    # C1
    "AdaStop":      "#808000",
    "F-Oracle":     "#000080",
    "CUSUM-RL":     "#DDA0DD",
    "DLBT":         "#008000",
    "MDPMorph":     "#B56576",

    # C2
    "QD-Testing":   "#580F41",
    "STARLA":       "#069AF3",
    "Indago":       "#40E0D0",
    "AlphaTest":    "#9A0EEA",

    # C3
    "NR-RL":        "#9ACD32",
    "SBF":          "#FE420F",
    "MDPFuzz":      "#EE82EE",
    "pi-Fuzz":      "#808080",
    "GMBT":         "#C79FEF",
    "CureFuzz":     "#E50000",

    # C4
    "RLMutation":   "#15B01A",
    "FS":           "#DAA520",

    # Extra
    "μPRL":         "#3357A6",
}

def fw_color(name: str) -> str:
    """Return the hex colour registered for a framework name.

    The name is converted with ``str`` before the lookup; names that are not in
    ``FRAMEWORK_COLORS`` map to the neutral grey ``"#777777"``.
    """
    key = str(name)
    if key in FRAMEWORK_COLORS:
        return FRAMEWORK_COLORS[key]
    return "#777777"

# Global experiment constants
# -------------------------
EPISODE_BUDGET = 300    # episodes per framework
MAX_STEPS      = 1000    # LunarLander-v2 horizon
SEED_BASE      = 2025    # base seed for reproducibility

print(
    "Globals set:",
    "BUDGET =", EPISODE_BUDGET,
    "| MAX_STEPS =", MAX_STEPS,
    "| SEED_BASE =", SEED_BASE,
)

Globals set: BUDGET = 300 | MAX_STEPS = 1000 | SEED_BASE = 2025


In [None]:
# Force 'games' to be a real package (MuZero-General)
# =========================================================

import sys
import types
import importlib
from pathlib import Path

# MuZero-General games path
# -------------------------
MUZERO_ROOT = Path("external/muzero-general")
GAMES_DIR   = MUZERO_ROOT / "games"

assert MUZERO_ROOT.is_dir(), f"MUZERO_ROOT not found: {MUZERO_ROOT}"
assert GAMES_DIR.is_dir(),   f"GAMES_DIR not found: {GAMES_DIR}"

# Ensure MUZERO_ROOT is on sys.path
# -------------------------
if str(MUZERO_ROOT) not in sys.path:
    sys.path.insert(0, str(MUZERO_ROOT))

# Remove stale 'games' modules
# -------------------------
for name in list(sys.modules.keys()):
    if name == "games" or name.startswith("games."):
        del sys.modules[name]

# Register 'games' as a proper package
# -------------------------
games_pkg = types.ModuleType("games")
games_pkg.__path__ = [str(GAMES_DIR)]
sys.modules["games"] = games_pkg

In [None]:
# Install system dependency (swig) and Python Box2D bindings
!apt-get update -qq
!apt-get install -y -qq swig
!pip install -q box2d-py

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [None]:
# Sanity import (LunarLander)
# -------------------------
import games
lunarlander_mod = importlib.import_module("games.lunarlander")

print("✓ 'games' package registered at:", games_pkg.__path__)
print("✓ Imported games.lunarlander from:", lunarlander_mod.__file__)

✓ 'games' package registered at: ['external/muzero-general/games']
✓ Imported games.lunarlander from: /content/external/muzero-general/games/lunarlander.py


Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
  return datetime.utcnow().replace(tzinfo=utc)


In [None]:


import torch
import numpy as np
from torch.serialization import add_safe_globals

add_safe_globals([np.core.multiarray.scalar])

_orig_torch_load = torch.load

def _patched_torch_load(f, *args, **kwargs):
    """Call the original ``torch.load`` with ``weights_only`` defaulting to False.

    An explicit ``weights_only`` passed by the caller is left untouched.

    SECURITY NOTE(review): ``weights_only=False`` allows full pickle
    deserialisation, which can execute arbitrary code. Only load checkpoints
    that come from a trusted source.
    """
    if "weights_only" not in kwargs:
        kwargs["weights_only"] = False
    return _orig_torch_load(f, *args, **kwargs)

torch.load = _patched_torch_load

In [None]:
!pip install -q nevergrad ray

In [None]:
# MuZero LunarLander agent
# =========================================================

import os
import re
import importlib
import numpy as np
import torch

import models as muzero_models
from self_play import MCTS, SelfPlay

# Import LunarLander Game & Config
# -------------------------
lunar_mod = importlib.import_module("games.lunarlander")
Game = lunar_mod.Game
MuZeroConfig = lunar_mod.MuZeroConfig

print("✓ Imported games.lunarlander.Game and MuZeroConfig")

# Environment helper
# -------------------------
def make_lunar_env(seed: int = None):
    """Create a LunarLander ``Game``; a missing seed is replaced by 0."""
    return Game(0 if seed is None else seed)

def safe_reset(env):
    """Reset ``env`` and return its first observation as a float32 array."""
    return np.asarray(env.reset(), dtype=np.float32)

def safe_step(env, action: int):
    """Step ``env`` and normalise the result to ``(obs, reward, done, info)``.

    ``env.step`` must return a tuple in one of these layouts:

    * ``(obs, reward, done)``                      -> ``info`` becomes ``{}``
    * ``(obs, reward, done, info, ...)``           -> used as is
    * ``(obs, reward, terminated, truncated, info)`` -> ``done`` is
      ``terminated or truncated``

    The 5-element layout is recognised when the fourth element is not a dict;
    previously its ``truncated`` flag was returned in place of ``info``.

    Returns:
        ``(float32 ndarray, float, bool, info)`` where a falsy ``info`` is
        replaced by ``{}``.

    Raises:
        RuntimeError: if the step result is not a tuple of at least 3 items.
    """
    out = env.step(int(action))
    if not isinstance(out, tuple) or len(out) < 3:
        raise RuntimeError(f"Unexpected Game.step output: {out}")

    obs, reward = out[0], out[1]
    if len(out) >= 5 and not isinstance(out[3], dict):
        # (obs, reward, terminated, truncated, info): either flag ends the episode.
        done = bool(out[2]) or bool(out[3])
        info = out[4]
    else:
        done = out[2]
        info = out[3] if len(out) > 3 else {}

    return (
        np.asarray(obs, dtype=np.float32),
        float(reward),
        bool(done),
        info or {},
    )

# Checkpoint
# -------------------------
def _find_best_checkpoint(agent_root):
    """
    Recursively find the MuZero checkpoint with the largest step number.
    """
    agent_root = os.fspath(agent_root)
    candidates = []

    for dirpath, _, filenames in os.walk(agent_root):
        for fn in filenames:
            if fn.endswith(".checkpoint"):
                full = os.path.join(dirpath, fn)
                nums = [int(x) for x in re.findall(r"\d+", fn)]
                step = max(nums) if nums else 0
                candidates.append((step, full))

    if not candidates:
        raise FileNotFoundError(f"No .checkpoint files found under {agent_root}")

    candidates.sort(key=lambda x: x[0], reverse=True)
    step, path = candidates[0]
    print(f"✓ Using checkpoint: step={step} @ {path}")
    return path

# Load MuZero network directly
# -------------------------
def load_muzero_lunarlander(agent_root=AGENT_ROOT):
    """Build the MuZero LunarLander network and load checkpoint weights into it.

    The checkpoint is located with ``_find_best_checkpoint``, read with
    ``map_location="cpu"`` and must contain a ``"weights"`` entry. The network
    is then moved to CUDA when available (CPU otherwise) and ``.eval()`` is
    called on it.

    Returns:
        ``(config, net, device)``.
    """
    cfg = MuZeroConfig()

    weights_file = _find_best_checkpoint(agent_root)
    ckpt = torch.load(weights_file, map_location="cpu")
    assert "weights" in ckpt, "Checkpoint missing 'weights'"

    network = muzero_models.MuZeroNetwork(cfg)
    network.set_weights(ckpt["weights"])

    if torch.cuda.is_available():
        target = torch.device("cuda")
    else:
        target = torch.device("cpu")
    network.to(target).eval()

    print("✓ MuZero LunarLander network loaded on", target)
    return cfg, network, target

# MuZero MCTS policy
# -------------------------
def muzero_policy_factory(seed: int = 0):
    """Create an MCTS-based action selector around the loaded MuZero network.

    ``seed`` is accepted but not used by this function.

    Returns:
        A callable ``act(obs) -> int`` that reshapes ``obs`` to
        ``config.observation_shape``, runs MCTS without exploration noise and
        selects an action with temperature 0.
    """
    cfg, network, _device = load_muzero_lunarlander(AGENT_ROOT)
    planner = MCTS(cfg)
    legal = list(cfg.action_space)

    # Player to move: first configured player, or 0 when none are configured.
    if hasattr(cfg, "players") and cfg.players:
        player = cfg.players[0]
    else:
        player = 0

    def act(obs):
        shaped = np.asarray(obs, dtype=np.float32).reshape(cfg.observation_shape)
        root, _ = planner.run(
            network,
            shaped,
            legal,
            player,
            add_exploration_noise=False,
        )
        return int(SelfPlay.select_action(root, temperature=0.0))

    return act

# Global policy hook (used by all frameworks)
# -------------------------
MUZERO_POLICY = muzero_policy_factory(seed=SEED_BASE)

def agent_act(obs):
    """Return the action chosen by the global ``MUZERO_POLICY`` for ``obs``."""
    action = MUZERO_POLICY(obs)
    return action

print("✓ MuZero LunarLander policy ready (MCTS, single target agent).")

✓ Imported games.lunarlander.Game and MuZeroConfig
✓ Using checkpoint: step=0 @ /content/drive/MyDrive/mbrl-testing-frameworks-empirical-study/agents/muzero/lunalander/model.checkpoint
✓ MuZero LunarLander network loaded on cpu
✓ MuZero LunarLander policy ready (MCTS, single target agent).


  return datetime.utcnow().replace(tzinfo=utc)


In [None]:
# Metrics
# =========================================================

import numpy as np

# Failure Rate
# -------------------------
def compute_FR(fails):
    """Failure rate: mean of the 0/1 failure flags, or NaN for an empty input."""
    flags = np.asarray(fails, dtype=np.int32)
    if len(flags) == 0:
        return np.nan
    return float(flags.mean())

# Time-to-Failure (tests)
# -------------------------
def compute_TTF_tests(fails):
    """1-based index of the first failing test.

    When no test fails, the number of tests is returned; an empty input
    yields NaN.
    """
    first_hit = next(
        (rank for rank, flag in enumerate(fails, start=1) if int(flag) == 1),
        None,
    )
    if first_hit is not None:
        return float(first_hit)
    if len(fails):
        return float(len(fails))
    return np.nan

# Time-to-Failure (seconds)
# -------------------------
def compute_TTF_seconds(fails, times):
    """Cumulative time up to and including the first failing test.

    When no test fails, the total cumulative time is returned; an empty
    ``fails`` yields NaN.
    """
    if len(fails) == 0:
        return np.nan

    elapsed = np.cumsum(np.asarray(times, dtype=np.float64))
    first_hit = next(
        (idx for idx, flag in enumerate(fails) if int(flag) == 1),
        None,
    )
    if first_hit is None:
        return float(elapsed[-1])
    return float(elapsed[first_hit])

# Total execution time
# -------------------------
def compute_total_seconds(times):
    """Sum of all per-test durations, or NaN when ``times`` is empty."""
    durations = np.asarray(times, dtype=np.float64)
    if durations.size == 0:
        return np.nan
    return float(durations.sum())

# APFD metrics
# -------------------------
def compute_APFD(fails):
    """APFD over a sequence of 0/1 failure flags.

    With ``n`` tests and ``m`` failing positions (1-based), the value is
    ``1 - sum(positions) / (n * m) + 1 / (2n)``. Without any failure the
    function returns ``1 / (2n)``; an empty input yields NaN.
    """
    n = len(fails)
    if n == 0:
        return np.nan

    rank_sum = 0
    hits = 0
    for rank, flag in enumerate(fails, start=1):
        if int(flag) == 1:
            rank_sum += rank
            hits += 1

    if hits == 0:
        return 1.0 / (2.0 * n)
    return 1.0 - (rank_sum / (n * hits)) + (1.0 / (2.0 * n))

def compute_APFD_time(fails, times):
    """Time-weighted APFD over 0/1 failure flags and per-test durations.

    With total time ``T`` and cumulative time ``c_i`` at each of the ``m``
    failing tests, the value is ``1 - sum(c_i) / (m * T) + 1 / (2T)``; without
    any failure it is ``1 / (2T)``. An empty ``fails`` yields NaN.

    When the total recorded time is not positive (e.g. every ``wall_dt`` is
    0.0), each test is given unit cost, so both ``T`` and the cumulative times
    use the same scale and the result equals ``compute_APFD(fails)``.
    Previously only ``T`` fell back to ``n`` while the cumulative times stayed
    0, which produced values above 1.
    """
    n = len(fails)
    if n == 0:
        return np.nan

    t = np.asarray(times, dtype=np.float64)
    total = float(t.sum())
    if not total > 0:
        # No usable timing information: treat every test as costing one unit.
        t = np.ones(n, dtype=np.float64)
        total = float(n)

    c = np.cumsum(t)
    pos_time = [float(c[i]) for i, f in enumerate(fails) if int(f) == 1]
    m = len(pos_time)
    if m == 0:
        return 1.0 / (2.0 * total)
    return 1.0 - (sum(pos_time) / (m * total)) + (1.0 / (2.0 * total))

print("✓ Metric functions ready (single-run LunarLander).")

✓ Metric functions ready (single-run LunarLander).


In [None]:
# Failure oracle
# =========================================================

# Thresholds calibrated to the trained MuZero agent
R_SUCCESS = 50.0     # success if final episode return >= R_SUCCESS
R_FAIL_FLOOR = -150.0  # reserved for future use (not active in this study)

def failure_oracle_lander(
    cum_rew: float,
    t: int,
    done: bool,
    info: dict,
    *,
    MAX_STEPS: int = MAX_STEPS,
    R_SUCCESS: float = R_SUCCESS,
):
    """Decide whether a LunarLander episode counts as a failure.

    An episode is judged only once it has ended (``done``) or reached
    ``MAX_STEPS``; it fails when its cumulative reward is below ``R_SUCCESS``.
    Unfinished episodes are never failures. ``info`` is not inspected.
    """
    total_reward = float(cum_rew)

    if not (done or (t >= MAX_STEPS)):
        return False
    return total_reward < R_SUCCESS


ORACLE_KW = dict(
    MAX_STEPS=MAX_STEPS,
    R_SUCCESS=R_SUCCESS,
)

print(
    "✓ LunarLander failure oracle ready:",
    f"R_SUCCESS={R_SUCCESS}, MAX_STEPS={MAX_STEPS}"
)

✓ LunarLander failure oracle ready: R_SUCCESS=50.0, MAX_STEPS=1000


In [None]:
# Episode harness
# =========================================================

import time

def run_one_episode(
    env,
    choose_action_fn,
    *,
    tmax=MAX_STEPS,
    oracle_kwargs=None,
):
    """Run one episode and score it with ``failure_oracle_lander``.

    Args:
        env: environment accepted by ``safe_reset`` / ``safe_step``.
        choose_action_fn: callable mapping an observation to an action.
        tmax: step cap; the episode stops when the env reports ``done`` or
            after ``tmax`` steps (at least one step is always taken).
        oracle_kwargs: keyword arguments for the oracle. When omitted,
            ``ORACLE_KW`` is used with ``MAX_STEPS`` set to ``tmax`` so that an
            episode cut off at ``tmax`` is judged on its return. Previously a
            ``tmax`` below the oracle's ``MAX_STEPS`` made every truncated
            episode count as a non-failure. Explicit kwargs are used as given.

    Returns:
        dict with ``"return"``, ``"steps"``, ``"failure"`` (0/1) and
        ``"wall_dt"`` (seconds measured with ``time.time``).
    """
    if oracle_kwargs is None:
        oracle_kwargs = {**ORACLE_KW, "MAX_STEPS": tmax}

    obs = safe_reset(env)
    cum, t = 0.0, 0
    t0 = time.time()

    while True:
        a = int(choose_action_fn(obs))
        obs, r, done, info = safe_step(env, a)
        cum += r
        t += 1
        if done or (t >= tmax):
            break

    fail = failure_oracle_lander(cum, t, done, info, **oracle_kwargs)

    return {
        "return":  float(cum),
        "steps":   int(t),
        "failure": int(fail),
        "wall_dt": float(time.time() - t0),
    }

print("✓ Episode harness ready.")

✓ Episode harness ready.


In [None]:
# Helpers for raw logs and summary tables
# =========================================================

def _save_raw(framework, rows):
    """Write per-episode rows to ``RAW_DIR/<framework>_episodes.csv``.

    Returns the path of the written file as a string.
    """
    out_path = RAW_DIR / f"{framework}_episodes.csv"
    pd.DataFrame(rows).to_csv(out_path, index=False)
    return str(out_path)


def summarize_framework(framework, rows):
    """Aggregate per-episode rows into one summary record for a framework.

    Rows lacking ``"failure"`` or ``"wall_dt"`` contribute 0 and 0.0.
    The result holds FR, TTF (tests and seconds), total seconds, the number of
    rows as ``budget``, APFD and time-weighted APFD.
    """
    fails, times = [], []
    for row in rows:
        fails.append(int(row.get("failure", 0)))
        times.append(float(row.get("wall_dt", 0.0)))

    summary = {"framework": framework}
    summary["FR"] = compute_FR(fails)
    summary["TTF_tests"] = compute_TTF_tests(fails)
    summary["TTF_seconds"] = compute_TTF_seconds(fails, times)
    summary["total_seconds"] = compute_total_seconds(times)
    summary["budget"] = len(fails)
    summary["APFD"] = compute_APFD(fails)
    summary["APFD_time"] = compute_APFD_time(fails, times)
    return summary

In [None]:
#CUSUM-RL dependency (DecisionTreeRegressor)
# =========================================================

import numpy as np

try:
    from sklearn.tree import DecisionTreeRegressor
except Exception as _sklearn_err:
    import warnings

    # The stand-in below is a ridge regressor, not a tree. Make the substitution
    # visible, because it changes what CUSUM-RL computes.
    warnings.warn(
        f"scikit-learn could not be imported ({_sklearn_err!r}); falling back to a "
        "ridge-regression stand-in for DecisionTreeRegressor. CUSUM-RL results "
        "will differ from runs that use the real decision tree.",
        RuntimeWarning,
    )

    class DecisionTreeRegressor:  # type: ignore
        """Minimal stand-in with the ``fit`` / ``predict`` calls used in this notebook.

        It fits a linear model with an intercept by ridge regression
        (``lam = 1e-4``). The tree hyper-parameters are stored but have no
        effect on the fit.
        """

        def __init__(self, max_depth=None, min_samples_leaf=1, random_state=0):
            self.max_depth = max_depth
            self.min_samples_leaf = min_samples_leaf
            self.random_state = random_state
            self.w = None

        def fit(self, X, y):
            """Solve the regularised normal equations; returns ``self``."""
            X = np.asarray(X, dtype=np.float64)
            y = np.asarray(y, dtype=np.float64).reshape(-1)
            X1 = np.c_[X, np.ones((X.shape[0], 1))]  # append intercept column
            lam = 1e-4
            XtX = X1.T @ X1 + lam * np.eye(X1.shape[1])
            self.w = np.linalg.solve(XtX, X1.T @ y)
            return self

        def predict(self, X):
            """Predict with the fitted weights; zeros when ``fit`` was never called."""
            X = np.asarray(X, dtype=np.float64)
            X1 = np.c_[X, np.ones((X.shape[0], 1))]
            if self.w is None:
                return np.zeros((X1.shape[0],), dtype=np.float64)
            return X1 @ self.w

  return datetime.utcnow().replace(tzinfo=utc)


In [None]:
# AdaStop
# =========================================================
from scipy.stats import permutation_test

def _perm_pvalue(x, y, reps=2000):
    res = permutation_test(
        (np.asarray(x), np.asarray(y)),
        statistic=lambda a, b: np.mean(a) - np.mean(b),
        permutation_type="independent",
        n_resamples=int(reps),
        alternative="two-sided",
    )
    return float(res.pvalue)


def run_AdaStop_single(
    variant="strong",
    budget=EPISODE_BUDGET,
    interims=6,
    block=5,
    alpha=0.05,
):
    """Sequential comparison of the agent against a 10%-random variant.

    At each of ``interims`` stages, ``block`` more episode returns are added to
    each sample and a permutation test compares the accumulated samples against
    an evenly split level ``alpha / interims``. Testing stops at the first
    rejection.

    The saved rows are synthetic: ``budget`` rows with ``wall_dt`` 0.0 and at
    most one failure flag, placed at a position derived from the rejecting
    stage. ``variant`` is not used in this function.

    Returns:
        The ``summarize_framework`` record plus ``"raw_path"``.
    """

    def pol_agent():
        # One unperturbed episode; every call builds the env with the same seed.
        env = make_lunar_env(seed=SEED_BASE + 801)
        r = run_one_episode(env, agent_act, tmax=MAX_STEPS)
        return [r["return"]]

    # Shared by all noisy episodes, so draws continue across episodes.
    rng = np.random.RandomState(2042)

    def pol_noisy():
        env = make_lunar_env(seed=SEED_BASE + 802)

        def act(obs):
            # With probability 0.1 take a uniformly random action from 0..3.
            if rng.rand() < 0.1:
                return int(rng.randint(0, 4))
            return agent_act(obs)

        r = run_one_episode(env, act, tmax=MAX_STEPS)
        return [r["return"]]

    x, y = [], []
    # Same significance level at every interim look.
    q = [alpha / interims] * interims
    rejects = []

    for k in range(interims):
        # Samples accumulate: look k uses (k + 1) * block returns per arm.
        for _ in range(block):
            x.extend(pol_agent())
            y.extend(pol_noisy())

        p = _perm_pvalue(x, y, reps=1000)
        rej = int(p < q[k])
        rejects.append(rej)

        if rej:
            # Pad the remaining looks with 0 and stop early.
            rejects.extend([0] * (interims - k - 1))
            break

    fails = [0] * budget
    if any(rejects):
        first = rejects.index(1)
        # Map the rejecting look onto a 1-based slot within the budget.
        pos = max(1, int((first + 1) * budget / interims) - block + 1)
        fails[pos - 1] = 1

    rows = [
        {"failure": f, "steps": 1, "wall_dt": 0.0}
        for f in fails
    ]

    raw_path = _save_raw("AdaStop", rows)
    return summarize_framework("AdaStop", rows) | {"raw_path": raw_path}

In [None]:
# F-Oracle
# =========================================================

def run_FOracle_single(
    variant="strong",
    budget=EPISODE_BUDGET,
    epochs=30,
):
    """Flag a failure from the trend of per-epoch normalised returns.

    Each epoch runs one episode (env seed ``SEED_BASE + 900 + epoch``) and
    records ``clip(return / 50.0, 0, 1)``. A failure is reported at epoch 1
    when the least-squares slope over epochs is negative; otherwise at the
    first epoch whose score drops more than 0.10 below a flat (spread
    <= 0.02) window of the 5 preceding scores. The epoch is mapped onto one
    slot of a synthetic ``budget``-length failure vector with ``wall_dt`` 0.0.
    ``variant`` is not used in this function.

    NOTE(review): the divisor 50.0 has the same value as ``R_SUCCESS`` —
    confirm whether it is meant to track that constant.

    Returns:
        The ``summarize_framework`` record plus ``"raw_path"``.
    """

    def _epoch_score(epoch):
        env = make_lunar_env(seed=SEED_BASE + 900 + epoch)
        outcome = run_one_episode(env, agent_act, tmax=MAX_STEPS)
        return np.clip(outcome["return"] / 50.0, 0.0, 1.0)

    scores = [_epoch_score(epoch) for epoch in range(epochs)]

    x = np.arange(len(scores))
    y = np.asarray(scores, dtype=float)

    # Fit y ~ slope * x + intercept.
    design = np.vstack([x, np.ones_like(x)]).T
    slope, _ = np.linalg.lstsq(design, y, rcond=None)[0]

    if slope < 0:
        fail_epoch = 1
    else:
        drop_at = next(
            (
                i
                for i in range(5, len(y))
                if (y[i - 5 : i].max() - y[i - 5 : i].min()) <= 0.02
                and (y[i] < y[i - 5 : i].min() - 0.10)
            ),
            None,
        )
        fail_epoch = None if drop_at is None else drop_at + 1

    fails = [0] * budget
    if fail_epoch is not None:
        slot = max(1, int(budget * fail_epoch / epochs))
        fails[slot - 1] = 1

    rows = []
    for flag in fails:
        rows.append({"failure": flag, "steps": 1, "wall_dt": 0.0})

    raw_path = _save_raw("F-Oracle", rows)
    summary = summarize_framework("F-Oracle", rows)
    return summary | {"raw_path": raw_path}

In [None]:
#  CUSUM-RL
# =========================================================

def fitted_q_iteration(samples, gamma=0.99, iters=20, max_leaf=60, depth=6):
    """Fitted Q-iteration with a ``DecisionTreeRegressor`` over (state, action).

    Features are the flattened float32 state concatenated with a one-hot
    encoding of the action (4 actions). The regressor is first fitted to the
    immediate rewards and then refitted ``iters`` times to
    ``r + gamma * max_a' Q(s', a')``.

    The next-state features for all actions are built once and predicted in a
    single batch per iteration, instead of being rebuilt and predicted sample
    by sample; the targets are the same up to floating-point rounding.

    Args:
        samples: sequence of ``(s, a, r, s2)`` transitions. No terminal flag
            is stored, so every transition is bootstrapped.
        gamma: discount factor.
        iters: number of refits after the initial fit.
        max_leaf: passed to the regressor as ``min_samples_leaf``.
        depth: passed to the regressor as ``max_depth``.

    Returns:
        ``Qsa(s, a) -> float`` evaluating the final regressor.

    Raises:
        ValueError: if ``samples`` is empty.
    """
    n_actions = 4
    one_hot = np.eye(n_actions, dtype=np.float32)

    def phi(s, a):
        s = np.asarray(s, np.float32).reshape(-1)
        return np.concatenate([s, one_hot[int(a)]], axis=0)

    n_samples = len(samples)
    if n_samples == 0:
        raise ValueError("fitted_q_iteration requires at least one transition")

    X = np.array([phi(s, a) for (s, a, _, _) in samples])
    R = np.array([r for (_, _, r, _) in samples], float)

    # Loop-invariant: rows are ordered sample-major, action-minor -> (N * 4, d).
    X_next = np.array(
        [phi(s2, a) for (*_, s2) in samples for a in range(n_actions)]
    )

    Q = DecisionTreeRegressor(
        max_depth=depth,
        min_samples_leaf=max_leaf,
        random_state=0,
    )

    Q.fit(X, R.copy())

    for _ in range(iters):
        q_next = np.asarray(Q.predict(X_next)).reshape(n_samples, n_actions)
        Y = R + gamma * q_next.max(axis=1)
        Q.fit(X, Y)

    def Qsa(s, a):
        return float(Q.predict(np.array([phi(s, a)]))[0])

    return Qsa


def _cusum_stats(Q1, Q2, states, actions):
    diffs = []
    for s in states:
        for a in actions:
            diffs.append(abs(Q1(s, a) - Q2(s, a)))
    diffs = np.asarray(diffs, float)
    TS1 = float(diffs.mean())
    TSinf = float(diffs.max()) if diffs.size else np.nan
    return TS1, TSinf


def _collect_offline(tuples=800, tmax=200):
    """Collect ``tuples`` transitions ``(obs, action, reward, next_obs)``.

    Actions come from ``agent_act`` except that, with probability 0.10, a
    uniformly random action from 0..3 is taken (RandomState seeded with 6123).
    A fresh environment, seeded with ``SEED_BASE + 333 + i``, replaces the
    current one when the episode ends or when ``i % tmax == tmax - 1``.

    NOTE(review): the ``tmax`` cut is keyed on the global tuple index ``i``,
    not on steps since the last reset, so an episode that starts after an
    early ``done`` is cut short of ``tmax`` steps — confirm this is intended.
    """
    rng = np.random.RandomState(6123)

    env = make_lunar_env(seed=SEED_BASE + 333)
    obs = safe_reset(env)

    transitions = []
    for idx in range(tuples):
        explore = rng.rand() < 0.10
        action = int(rng.randint(0, 4)) if explore else agent_act(obs)

        next_obs, reward, done, _info = safe_step(env, action)
        transitions.append((obs, action, float(reward), next_obs))

        if done or ((idx % tmax) == tmax - 1):
            env = make_lunar_env(seed=SEED_BASE + 333 + idx)
            obs = safe_reset(env)
        else:
            obs = next_obs

    return transitions


def run_CUSUMRL_single(variant="strong", budget=EPISODE_BUDGET):
    """Change-point test on offline transitions via split-wise fitted Q-iteration.

    800 transitions are collected, then split at 6 candidate points between
    10% and 90% of the data. For each split, Q-functions fitted on the left
    and right parts are compared on 20 probe states x 4 actions (mean and max
    absolute difference). Each statistic gets a p-value from 100 bootstrap
    replicates in which rewards are multiplied by random signs. The smallest
    p-value over all splits and both statistics selects the slot of a single
    synthetic failure: slot 1 if p < 0.01, slot ``0.2 * budget`` if p < 0.05,
    otherwise the last slot.

    Changes from the previous version:
    * the random signs come from a local ``RandomState(SEED_BASE + 334)``
      instead of the unseeded global NumPy RNG, so runs are repeatable;
    * the 20% slot is clamped to at least 1 (a budget below 5 used to index
      slot 0 - 1 = -1, i.e. the last slot), and ``budget == 0`` no longer
      raises IndexError.

    ``variant`` is not used in this function.

    Returns:
        The ``summarize_framework`` record plus ``"raw_path"``.
    """
    rng = np.random.RandomState(SEED_BASE + 334)

    tuples = _collect_offline(tuples=800, tmax=150)
    n = len(tuples)

    us = np.linspace(int(0.1 * n), int(0.9 * n), num=6, dtype=int)
    S_probe = [tuples[i][0] for i in np.linspace(0, n - 1, 20, dtype=int)]
    A_probe = list(range(4))

    best_p1, best_pinf = 1.0, 1.0
    boots = 100

    for u in us:
        left, right = tuples[:u], tuples[u:]

        QL = fitted_q_iteration(left)
        QR = fitted_q_iteration(right)

        TS1, TSinf = _cusum_stats(QL, QR, S_probe, A_probe)

        b_TS1, b_TSinf = [], []

        for _ in range(boots):
            # One sign per transition; left uses signs[:u], right uses signs[u:].
            signs = rng.choice([-1.0, 1.0], size=n, replace=True)

            Lb = [(s, a, r * signs[i], s2)
                  for i, (s, a, r, s2) in enumerate(left)]
            Rb = [(s, a, r * signs[u + i], s2)
                  for i, (s, a, r, s2) in enumerate(right)]

            QLb = fitted_q_iteration(Lb, iters=8)
            QRb = fitted_q_iteration(Rb, iters=8)

            t1, ti = _cusum_stats(QLb, QRb, S_probe, A_probe)
            b_TS1.append(t1)
            b_TSinf.append(ti)

        # Add-one p-values: (count of replicates >= observed, plus 1) / (boots + 1).
        p1 = float((np.sum(np.array(b_TS1) >= TS1) + 1) / (boots + 1))
        pinf = float((np.sum(np.array(b_TSinf) >= TSinf) + 1) / (boots + 1))

        best_p1 = min(best_p1, p1)
        best_pinf = min(best_pinf, pinf)

    p = min(best_p1, best_pinf)

    if p < 0.01:
        pos = 1
    elif p < 0.05:
        pos = max(1, int(0.2 * budget))
    else:
        pos = budget

    fails = [0] * budget
    if budget > 0:
        fails[pos - 1] = 1

    rows = [
        {"failure": f, "steps": 1, "wall_dt": 0.0}
        for f in fails
    ]

    raw_path = _save_raw("CUSUM-RL", rows)
    return summarize_framework("CUSUM-RL", rows) | {"raw_path": raw_path}

In [None]:
# Framework: DLBT
# =========================================================

import numpy as np
import time
import random
from collections import defaultdict

#
#  Abstract state
# ---------------------------------------------------------
def _abstract_state_lander(s, bins=5):

    arr = np.asarray(s, dtype=np.float32).ravel()
    if arr.shape[0] > 8:
        arr = arr[:8]

    arr = np.clip(arr, -2.0, 2.0)
    idx = np.floor((arr + 2.0) / 4.0 * bins).astype(int)
    idx = np.clip(idx, 0, bins - 1)
    return tuple(int(x) for x in idx)

# Learn abstract MDP from traces
# ---------------------------------------------------------
def _learn_abstract_mdp(traces, bins=5):
    """Estimate abstract transition probabilities from observation traces.

    Each trace is a list of ``(obs, action, reward)`` steps. Consecutive steps
    give transitions between abstract states; the last step of a trace is
    paired with itself, so it is counted as a self-transition.

    Returns:
        ``{state: {action: {next_state: probability}}}`` built from relative
        transition counts.
    """
    counts = {}

    for seq in traces:
        if not seq:
            continue
        successors = seq[1:] + seq[-1:]
        for (obs, action, _), (next_obs, _, _) in zip(seq, successors):
            src = _abstract_state_lander(obs, bins=bins)
            dst = _abstract_state_lander(next_obs, bins=bins)
            per_action = counts.setdefault(src, {}).setdefault(int(action), {})
            per_action[dst] = per_action.get(dst, 0) + 1

    mdp = {}
    for src, by_action in counts.items():
        mdp[src] = {}
        for action, dst_counts in by_action.items():
            total = sum(dst_counts.values())
            if total > 0:
                mdp[src][action] = {
                    dst: hits / total for dst, hits in dst_counts.items()
                }

    print(f"✓ _learn_abstract_mdp: built MDP with {len(mdp)} abstract states")
    return mdp

# Reachability policy over abstract MDP
# ---------------------------------------------------------
def _reach_policy(mdp, targets, horizon=20):
    V = {s: (1.0 if s in targets else 0.0) for s in mdp.keys()}

    for _ in range(horizon):
        V_new = {}
        for s, a_dict in mdp.items():
            best = 0.0
            for a, trans in a_dict.items():
                val = sum(p * V.get(s2, 0.0) for s2, p in trans.items())
                best = max(best, val)
            V_new[s] = best
        V = V_new

    pi = {}
    for s, a_dict in mdp.items():
        best_a, best = 0, -1.0
        for a, trans in a_dict.items():
            val = sum(p * V.get(s2, 0.0) for s2, p in trans.items())
            if val > best:
                best, best_a = val, a
        pi[s] = best_a

    print(f"✓ _reach_policy: built policy for {len(pi)} states")
    return pi

# Collect traces from MuZero agent
# ---------------------------------------------------------
def _collect_traces_lander(episodes=40, max_steps=200):
    """Roll out the agent and record ``(obs, action, reward)`` per step.

    Episode ``ep`` uses env seed ``SEED_BASE + 1200 + ep`` and stops when the
    env reports ``done`` or after ``max_steps`` steps (at least one step is
    taken). The observation stored with each step is the one the action was
    chosen from.

    Returns:
        A list with one step list per episode.
    """
    traces = []
    for ep in range(episodes):
        env = make_lunar_env(seed=SEED_BASE + 1200 + ep)
        obs = safe_reset(env)

        steps = []
        finished = False
        while not finished:
            action = int(agent_act(obs))
            next_obs, reward, done, _info = safe_step(env, action)
            steps.append((obs, action, float(reward)))
            obs = next_obs
            finished = done or len(steps) >= max_steps

        traces.append(steps)

    print(f"✓ _collect_traces_lander: collected {len(traces)} traces")
    return traces

#  DLBT main run function
# ---------------------------------------------------------
def run_DLBT_single(variant="strong", budget=EPISODE_BUDGET):
    """Steer episodes toward "risky" abstract states, then hand over to the agent.

    Steps:
    1. Collect 40 agent traces (up to 200 steps each) and learn an abstract MDP.
    2. Mark abstract states whose 3rd or 4th component is in bin 3 or above as
       risky (all states if none qualify) and compute a reachability policy.
    3. For each of ``budget`` episodes (env seed ``SEED_BASE + 1300 + epi``),
       follow the reachability policy for up to 60 steps, then let the agent
       act until ``done`` or ``MAX_STEPS``; score with the failure oracle.

    Changes from the previous version: the fallback for abstract states unseen
    during learning draws from a local ``random.Random(SEED_BASE + 1300)``
    instead of the unseeded global ``random`` module, and the no-policy
    fallback picks uniformly from actions 0..3 (as the other frameworks do)
    instead of relying on ``env.action_space``.

    ``variant`` is only echoed in the log line. Rows record ``"steps": 1`` for
    every episode.

    Returns:
        The ``summarize_framework`` record plus ``"raw_path"``.
    """
    print(f"[DLBT] Starting run for variant={variant}, budget={budget}")

    traces = _collect_traces_lander(episodes=40, max_steps=200)
    mdp = _learn_abstract_mdp(traces)

    risky_states = {
        s for s in mdp.keys()
        if (len(s) >= 4 and (s[2] >= 3 or s[3] >= 3))
    }
    if not risky_states:
        print("[DLBT] No risky states found; using all states.")
        risky_states = set(mdp.keys())

    pi_r = _reach_policy(mdp, risky_states, horizon=20)
    pi_keys = list(pi_r.keys())

    fallback_rng = random.Random(SEED_BASE + 1300)
    max_prefix = 60  # steps driven by the reachability policy

    fails = []
    wall  = []

    for epi in range(budget):
        env = make_lunar_env(seed=SEED_BASE + 1300 + epi)
        obs = safe_reset(env)
        cum, t = 0.0, 0
        t0 = time.time()
        done = False
        info = {}

        # Phase 1: reachability-policy prefix.
        while (not done) and (t < max_prefix):
            s_abs = _abstract_state_lander(obs)
            if s_abs in pi_r:
                a = int(pi_r[s_abs])
            elif pi_keys:
                # Unseen abstract state: borrow the action of a random known state.
                a = int(pi_r[fallback_rng.choice(pi_keys)])
            else:
                a = int(fallback_rng.randrange(4))

            obs, r, done, info = safe_step(env, a)
            cum += r
            t += 1

        # Phase 2: the agent under test finishes the episode.
        while (not done) and (t < MAX_STEPS):
            a = int(agent_act(obs))
            obs, r, done, info = safe_step(env, a)
            cum += r
            t += 1

        fail = failure_oracle_lander(cum, t, done, info, **ORACLE_KW)
        fails.append(int(fail))
        wall.append(time.time() - t0)

        if (epi + 1) % 100 == 0 or epi == budget - 1:
            print(f"[DLBT] Episode {epi+1}/{budget} done. FR_episode={fails[-1]}")

    rows = [
        {"failure": int(f), "steps": 1, "wall_dt": float(dt)}
        for f, dt in zip(fails, wall)
    ]

    raw_path = _save_raw("DLBT", rows)
    summary = summarize_framework("DLBT", rows)
    summary["raw_path"] = raw_path

    print(
        f"[DLBT] Summary: FR={summary['FR']:.4f}, "
        f"TTF_tests={summary['TTF_tests']}, "
        f"total_seconds={summary['total_seconds']:.2f}"
    )

    return summary

In [None]:
# RLMutation, μPRL, MDPMorph, MDPFuzz
# =========================================================

import time
import numpy as np

# RLMutation
# ---------------------------------------------------------
def run_RLMutation_single(variant="strong", budget=EPISODE_BUDGET):
    """Run ``budget`` episodes under rotating reward/observation perturbations.

    Episode ``i`` uses configuration ``i % 4``: a step wrapper (reward noise or
    observation noise) combined with an optional epsilon-random action chooser.
    Each finished episode is scored with the failure oracle and stored with its
    wall-clock time. ``variant`` is not used in this function.

    Returns:
        The ``summarize_framework`` record plus ``"raw_path"``.
    """

    # Shared by both step wrappers across all episodes.
    rng = np.random.RandomState(777)

    def step_RN(env, a):
        # Reward noise: a positive reward gets N(0, 2.0) added with probability 0.2.
        obs, r, done, info = safe_step(env, a)
        if (r > 0) and (rng.rand() < 0.2):
            r = float(r + rng.normal(0, 2.0))
        return obs, float(r), bool(done), info or {}

    def step_M(env, a):
        # Observation noise: with probability 0.2 the returned observation gets
        # N(0, 0.05) added per component.
        obs, r, done, info = safe_step(env, a)
        if rng.rand() < 0.2:
            o = np.asarray(obs, np.float32)
            o = o + rng.normal(0, 0.05, size=o.shape)
            obs = np.asarray(o, np.float32)
        return obs, float(r), bool(done), info or {}

    # Index i % 4 selects one wrapper and one epsilon setting.
    step_wrappers = [step_RN, step_M, step_RN, step_M]
    eps_cfg = [
        dict(eps=0.05),
        dict(eps=None),
        dict(eps=None),
        dict(eps=0.05),
    ]

    def chooser_factory(cfg):
        eps = cfg.get("eps", None)
        # NOTE(review): a fresh RandomState(2026) is built on every call, and the
        # factory is called once per episode, so each episode replays the same
        # sequence of exploration draws — confirm this is intended.
        rng_local = np.random.RandomState(2026)

        def choose(obs):
            # With probability eps take a uniformly random action from 0..3.
            if (eps is not None) and (rng_local.rand() < eps):
                return int(rng_local.randint(0, 4))
            return agent_act(obs)

        return choose

    rows = []

    for i in range(budget):
        cfg   = eps_cfg[i % 4]
        stepf = step_wrappers[i % 4]
        chooser = chooser_factory(cfg)

        env = make_lunar_env(seed=SEED_BASE + 1400 + i)
        obs = safe_reset(env)
        cum, t = 0.0, 0
        t0 = time.time()

        while True:
            a = int(chooser(obs))
            obs, r, done, info = stepf(env, a)
            # cum accumulates the (possibly perturbed) reward from the wrapper.
            cum += r
            t += 1
            if done or (t >= MAX_STEPS):
                fail = failure_oracle_lander(cum, t, done, info, **ORACLE_KW)
                rows.append({
                    "failure": int(fail),
                    "steps": 1,
                    "wall_dt": time.time() - t0,
                })
                break

    raw_path = _save_raw("RLMutation", rows)
    return summarize_framework("RLMutation", rows) | {"raw_path": raw_path}


# μPRL
# ---------------------------------------------------------
def run_MUPRL_single(variant="strong", budget=EPISODE_BUDGET):
    """μPRL-style runtime model proxies via horizon / ε perturbations.

    Episode ``i`` uses mutation ``i % 4`` from the table below; each mutation
    sets a step horizon and, optionally, an ε for random action overrides.

    Args:
        variant: Accepted for signature parity with the other runners; it is
            not read by this function.
        budget: Number of episodes to execute.

    Returns:
        ``summarize_framework("μPRL", rows)`` merged with a ``"raw_path"``
        entry holding whatever ``_save_raw`` returned.
    """
    # Shared across all episodes (unlike a per-episode generator).
    explore_rng = np.random.RandomState(4242)

    # (label, horizon, eps). The label is descriptive only; it is never read.
    mutations = (
        ("SDF(γ↓)",       int(0.8 * MAX_STEPS), None),
        ("SEC(entropy↑)", MAX_STEPS,            None),
        ("SMR(min-ε)",    MAX_STEPS,            0.05),
        ("SNR(rollout-)", int(0.9 * MAX_STEPS), None),
    )

    def make_chooser(eps=None):
        def choose(obs):
            explore = (eps is not None) and (explore_rng.rand() < eps)
            return int(explore_rng.randint(0, 4)) if explore else agent_act(obs)
        return choose

    rows = []

    for episode in range(budget):
        _label, horizon, eps = mutations[episode % len(mutations)]
        choose = make_chooser(eps)

        env = make_lunar_env(seed=SEED_BASE + 1500 + episode)
        obs = safe_reset(env)
        total_reward, n_steps = 0.0, 0
        started = time.time()

        # At least one step is always taken; stop on termination or horizon.
        finished = False
        while not finished:
            action = int(choose(obs))
            obs, reward, done, info = safe_step(env, action)
            total_reward += reward
            n_steps += 1
            finished = done or (n_steps >= horizon)

        failed = failure_oracle_lander(total_reward, n_steps, done, info, **ORACLE_KW)
        rows.append({
            "failure": int(failed),
            # NOTE(review): recorded as the constant 1, not n_steps - confirm.
            "steps": 1,
            "wall_dt": time.time() - started,
        })

    raw_path = _save_raw("μPRL", rows)
    summary = summarize_framework("μPRL", rows)
    return summary | {"raw_path": raw_path}


# MDPMorph
# ---------------------------------------------------------
def _mr_small_perturbation_same_outcome(chooser, base_seed, tmax):
    """Metamorphic relation: neighbouring seeds should give the same outcome.

    Runs one episode on an env seeded with ``base_seed`` and one on an env
    seeded with ``base_seed + 1``, both with the same ``chooser`` and ``tmax``.

    Returns:
        A row dict whose ``"failure"`` is 1 when exactly one of the two
        episodes has ``failure == 0`` (the relation is violated) and 0
        otherwise. ``"return"`` and ``"wall_dt"`` are the mean of the two
        episodes; ``"steps"`` is that mean truncated to ``int``.
    """
    # Sequential on purpose: build env, run it, then build and run the next.
    first, second = (
        run_one_episode(make_lunar_env(seed=base_seed + offset), chooser, tmax=tmax)
        for offset in (0, 1)
    )

    def pair_mean(key):
        return 0.5 * (first[key] + second[key])

    outcomes_differ = (first["failure"] == 0) != (second["failure"] == 0)

    return {
        "return":  pair_mean("return"),
        "steps":   int(pair_mean("steps")),
        "failure": int(outcomes_differ),
        "wall_dt": pair_mean("wall_dt"),
    }


def run_MDPMorph_single(variant="strong", budget=EPISODE_BUDGET):
    """MDPMorph proxy: `budget` metamorphic seed-pair tests on the plain agent.

    Test ``i`` checks seeds ``SEED_BASE + 1600 + i`` and the next one, so
    consecutive tests share a seed and each test costs two episodes.

    Args:
        variant: Accepted for signature parity with the other runners; it is
            not read by this function.
        budget: Number of metamorphic tests (seed pairs) to execute.

    Returns:
        ``summarize_framework("MDPMorph", rows)`` merged with a ``"raw_path"``
        entry holding whatever ``_save_raw`` returned.
    """
    rows = [
        _mr_small_perturbation_same_outcome(
            agent_act,
            SEED_BASE + 1600 + test_idx,
            tmax=MAX_STEPS,
        )
        for test_idx in range(budget)
    ]

    raw_path = _save_raw("MDPMorph", rows)
    summary = summarize_framework("MDPMorph", rows)
    return summary | {"raw_path": raw_path}


# MDPFuzz
# ---------------------------------------------------------
def run_MDPFuzz_single(variant="strong", budget=EPISODE_BUDGET):
    """MDPFuzz proxy: run the unmodified agent on `budget` freshly seeded envs.

    Episode ``j`` uses seed ``SEED_BASE + 1800 + j`` and is capped at
    ``MAX_STEPS`` via ``run_one_episode``.

    Args:
        variant: Accepted for signature parity with the other runners; it is
            not read by this function.
        budget: Number of episodes to execute.

    Returns:
        ``summarize_framework("MDPFuzz", rows)`` merged with a ``"raw_path"``
        entry holding whatever ``_save_raw`` returned.
    """
    rows = [
        run_one_episode(
            make_lunar_env(seed=SEED_BASE + 1800 + episode),
            agent_act,
            tmax=MAX_STEPS,
        )
        for episode in range(budget)
    ]

    raw_path = _save_raw("MDPFuzz", rows)
    summary = summarize_framework("MDPFuzz", rows)
    return summary | {"raw_path": raw_path}

In [None]:
#FS
#-------------------------------------------------------------------------
def run_FS_single(variant="strong", budget=EPISODE_BUDGET):
    """FS proxy: agent policy with a 5% chance of a uniformly random action.

    Episode ``j`` uses seed ``SEED_BASE + 1900 + j``. The exploration RNG is
    created once and shared by every episode of the run.

    Args:
        variant: Accepted for signature parity with the other runners; it is
            not read by this function.
        budget: Number of episodes to execute.

    Returns:
        ``summarize_framework("FS", rows)`` merged with a ``"raw_path"``
        entry holding whatever ``_save_raw`` returned.
    """
    explore_rng = np.random.RandomState(2042)

    def act(obs):
        # One uniform draw per decision; a second draw only when exploring.
        explore = explore_rng.rand() < 0.05
        return int(explore_rng.randint(0, 4)) if explore else agent_act(obs)

    rows = [
        run_one_episode(
            make_lunar_env(seed=SEED_BASE + 1900 + episode),
            act,
            tmax=MAX_STEPS,
        )
        for episode in range(budget)
    ]

    raw_path = _save_raw("FS", rows)
    summary = summarize_framework("FS", rows)
    return summary | {"raw_path": raw_path}


In [None]:
def run_policy_preserving_single(variant="strong", budget=EPISODE_BUDGET,
                                 name="policy_preserving"):
    """Run the unmodified agent for `budget` episodes (no mutation, no prioritisation).

    Episode ``j`` uses seed ``SEED_BASE + 2000 + j`` and is capped at
    ``MAX_STEPS`` via ``run_one_episode``.

    Args:
        variant: Accepted for signature parity with the other runners; it is
            not read by this function.
        budget: Number of episodes to execute.
        name: Label passed to ``_save_raw`` and ``summarize_framework``.
            Defaults to ``"policy_preserving"`` (the previous hard-coded
            value). Pass the framework name when several frameworks share
            this runner, so that they do not all save under the same label.

    Returns:
        ``summarize_framework(name, rows)`` merged with a ``"raw_path"``
        entry holding whatever ``_save_raw`` returned.
    """
    rows = []

    for j in range(budget):
        env = make_lunar_env(seed=SEED_BASE + 2000 + j)
        rows.append(
            run_one_episode(
                env,
                agent_act,
                tmax=MAX_STEPS,
            )
        )

    raw_path = _save_raw(name, rows)
    return summarize_framework(name, rows) | {"raw_path": raw_path}

In [None]:
# Registry of 18 Frameworks
# =========================================================

# Execution order of the study: frameworks are run in exactly this order.
FRAMEWORKS = [
    "AdaStop",
    "F-Oracle",
    "CUSUM-RL",
    "DLBT",
    "MDPMorph",
    "QD-Testing",
    "STARLA",
    "Indago",
    "AlphaTest",
    "NR-RL",
    "SBF",
    "MDPFuzz",
    "pi-Fuzz",
    "GMBT",
    "CureFuzz",
    "RLMutation",
    "FS",
    "μPRL",
]


def RUN_ONE(name):
    """Return the runner function registered for framework `name`.

    Frameworks with a dedicated runner are listed in the table below. Any
    other name falls back to ``run_policy_preserving_single`` (agent-based,
    no mutation, no prioritisation), so this never returns ``None``.
    """
    # Each entry is resolved lazily so only the selected runner is looked up.
    dedicated = (
        # --- statistical / white-box ---
        ("AdaStop",    lambda: run_AdaStop_single),
        ("F-Oracle",   lambda: run_FOracle_single),
        ("CUSUM-RL",   lambda: run_CUSUMRL_single),
        ("DLBT",       lambda: run_DLBT_single),
        ("MDPMorph",   lambda: run_MDPMorph_single),
        # --- policy-mutating ---
        ("RLMutation", lambda: run_RLMutation_single),
        ("FS",         lambda: run_FS_single),
        ("μPRL",       lambda: run_MUPRL_single),
        # --- policy-preserving ---
        ("MDPFuzz",    lambda: run_MDPFuzz_single),
    )

    for key, resolve in dedicated:
        if name == key:
            return resolve()

    # Remaining frameworks use policy-preserving execution.
    return run_policy_preserving_single

print("✓ Framework registry built with", len(FRAMEWORKS), "frameworks.")

✓ Framework registry built with 18 frameworks.


In [None]:
# Execute all frameworks
# =========================================================

import time
import numpy as np
import pandas as pd

import shutil  # cell-local: used to keep a per-framework copy of the raw episodes


def _fmt_fr(value):
    """Format a failure rate for the progress line; 'n/a' if it is not numeric."""
    try:
        return f"{float(value):.4f}"
    except (TypeError, ValueError):
        return "n/a"


rows = []

print(f"Running {len(FRAMEWORKS)} frameworks "
      f"(variant='strong', budget={EPISODE_BUDGET})")

for fw in FRAMEWORKS:
    print(f"\n=== [{fw}] starting ===")
    # RUN_ONE always returns a callable (unknown names fall back to
    # run_policy_preserving_single), so there is no None case to handle.
    runner = RUN_ONE(fw)
    start = time.time()

    # Only the runner call is guarded: a problem while reporting must not
    # throw away the metrics of a run that actually completed.
    try:
        out = runner(
            variant="strong",
            budget=EPISODE_BUDGET,
        )
    except Exception as e:
        elapsed = time.time() - start
        print(f"!!! [{fw}] ERROR after {elapsed/60:.2f} min:", e)

        # Placeholder row so the framework still appears in the table.
        out = {
            "framework": fw,
            "FR": np.nan,
            "APFD": np.nan,
            "APFD_time": np.nan,
            "TTF_tests": np.nan,
            "TTF_seconds": np.nan,
            "total_seconds": np.nan,
            "budget": 0,
            "raw_path": "",
            "runtime_sec": elapsed,
            "oracle_R_success": ORACLE_KW.get("R_SUCCESS"),
        }
    else:
        elapsed = time.time() - start
        out["framework"] = fw
        out["runtime_sec"] = elapsed
        out["oracle_R_success"] = ORACLE_KW.get("R_SUCCESS")

        # Frameworks that share run_policy_preserving_single all save their raw
        # episodes under the same "policy_preserving" label, so the next one
        # would overwrite them. The plotting cell reads
        # RAW_DIR / "<framework>_episodes.csv", so keep a copy under that name.
        raw_src = out.get("raw_path")
        raw_dst = RAW_DIR / f"{fw}_episodes.csv"
        if raw_src and Path(raw_src).is_file() and Path(raw_src).resolve() != raw_dst.resolve():
            try:
                shutil.copyfile(raw_src, raw_dst)
                out["raw_path"] = str(raw_dst)
            except OSError as copy_err:
                print(f"!!! [{fw}] could not copy raw episodes to {raw_dst}:", copy_err)

        print(
            f"=== [{fw}] done in {elapsed/60:.2f} min | "
            f"FR={_fmt_fr(out.get('FR'))}, "
            f"TTF={out.get('TTF_tests')} ==="
        )

    rows.append(out)

# Save metrics table
# ---------------------------------------------------------
SINGLE = pd.DataFrame(rows).sort_values("framework").reset_index(drop=True)

out_csv = TABLES_DIR / "lunar_single_row_metrics.csv"
SINGLE.to_csv(out_csv, index=False)

print("\n✓ All frameworks completed.")
print("✓ Saved metrics table →", out_csv)

display(
    SINGLE[
        [
            "framework",
            "FR",
            "APFD",
            "APFD_time",
            "TTF_tests",
            "TTF_seconds",
            "total_seconds",
            "budget",
            "runtime_sec",
        ]
    ]
)

Running 18 frameworks (variant='strong', budget=300)

=== [AdaStop] starting ===


Consider using tensor.detach() first. (Triggered internally at /pytorch/torch/csrc/autograd/generated/python_variable_methods.cpp:836.)
  torch.tensor([policy_logits[0][a] for a in actions]), dim=0


=== [AdaStop] done in 7.67 min | FR=0.0033, TTF=46.0 ===

=== [F-Oracle] starting ===
=== [F-Oracle] done in 20.47 min | FR=0.0033, TTF=210.0 ===

=== [CUSUM-RL] starting ===
=== [CUSUM-RL] done in 16.83 min | FR=0.0033, TTF=1.0 ===

=== [DLBT] starting ===
[DLBT] Starting run for variant=strong, budget=300


  return datetime.utcnow().replace(tzinfo=utc)


✓ _collect_traces_lander: collected 40 traces
✓ _learn_abstract_mdp: built MDP with 21 abstract states
✓ _reach_policy: built policy for 21 states
[DLBT] Episode 100/300 done. FR_episode=1
[DLBT] Episode 200/300 done. FR_episode=0
[DLBT] Episode 300/300 done. FR_episode=1
[DLBT] Summary: FR=0.7200, TTF_tests=3.0, total_seconds=6824.45
=== [DLBT] done in 126.70 min | FR=0.7200, TTF=3.0 ===

=== [MDPMorph] starting ===
=== [MDPMorph] done in 471.69 min | FR=0.2333, TTF=8.0 ===

=== [QD-Testing] starting ===
=== [QD-Testing] done in 243.19 min | FR=0.1100, TTF=12.0 ===

=== [STARLA] starting ===


In [None]:
# results table
# =========================================================

import pandas as pd

# Reload the metrics table from disk so the plotting cells also work in a
# session where the (long) execution cell was not re-run.
results_csv = TABLES_DIR / "lunar_single_row_metrics.csv"
assert results_csv.exists(), f"Results table not found: {results_csv}"

# Same ordering as when the table was written: alphabetical by framework.
SINGLE = (
    pd.read_csv(results_csv)
    .sort_values("framework")
    .reset_index(drop=True)
)

print("✓ Loaded LunarLander single-run results table")
display(SINGLE)

In [None]:
#  FR, CF-tests, CF-time
# =========================================================

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


# Per-framework raw episode CSVs are read from here by _load_raw.
RAW_DIR = RESULTS_DIR / "raw_single"
# parents=True matches the path-setup cell and avoids FileNotFoundError when
# the results directory does not exist yet.
FIGS_DIR.mkdir(parents=True, exist_ok=True)

# Curves are truncated to this many episodes per framework.
BUDGET = EPISODE_BUDGET

# Global matplotlib style for all figures produced below.
plt.rcParams.update({
    "figure.figsize": (10, 6),
    "figure.dpi": 120,
    "savefig.dpi": 300,
    "font.family": "serif",
    "font.serif": ["Times New Roman", "Times", "DejaVu Serif"],
    "font.size": 11,
    "axes.labelsize": 11,
    "axes.titlesize": 12,
    "legend.fontsize": 9,
    "axes.linewidth": 0.8,
    "axes.grid": True,
    "grid.alpha": 0.25,
    "grid.linestyle": "--",
    "lines.linewidth": 1.6,
    "legend.frameon": False,
    "axes.spines.top": False,
    "axes.spines.right": False,
})

def _col(name):
    """Colour for framework `name` from FRAMEWORK_COLORS, grey if unlisted."""
    fallback_grey = "#777777"
    return FRAMEWORK_COLORS.get(name, fallback_grey)


def _load_raw(framework):
    """Load the raw per-episode CSV of `framework`, prepared for plotting.

    Reads ``RAW_DIR / "<framework>_episodes.csv"``.

    Returns:
        ``None`` when the file is missing or has no ``failure`` column.
        Otherwise a DataFrame limited to the first ``BUDGET`` rows with:

        * ``failure`` coerced to int (unparseable values become 0)
        * ``time`` taken from ``wall_dt`` (unparseable values become 0.0),
          or the constant 1.0 when there is no ``wall_dt`` column
    """
    csv_path = RAW_DIR / f"{framework}_episodes.csv"
    if not csv_path.exists():
        return None

    episodes = pd.read_csv(csv_path)
    if "failure" not in episodes.columns:
        return None

    failure = pd.to_numeric(episodes["failure"], errors="coerce").fillna(0).astype(int)
    if "wall_dt" in episodes.columns:
        elapsed = pd.to_numeric(episodes["wall_dt"], errors="coerce").fillna(0.0)
    else:
        elapsed = 1.0

    # assign() returns a new frame, so the frame read from disk is not mutated.
    prepared = episodes.assign(failure=failure, time=elapsed)
    return prepared.iloc[:BUDGET].reset_index(drop=True)

def _cum(df):
    y = np.maximum.accumulate(df["failure"].to_numpy().cumsum())
    t = df["time"].to_numpy().cumsum()
    x = np.arange(1, len(df) + 1)
    return x, y, t

# 1) FR bar plot
# =========================================================
# One bar per framework, in the row order of SINGLE.
labels = SINGLE["framework"].tolist()
# Frameworks whose FR is NaN in the table are drawn (and labelled) as 0.
fr_vals = SINGLE["FR"].astype(float).fillna(0.0).to_numpy()
colors = [_col(f) for f in labels]

fr_fig, fr_ax = plt.subplots(figsize=(10.5, 4.2))
bars = fr_ax.bar(labels, fr_vals, color=colors, edgecolor="black", linewidth=0.6)
fr_ax.set_ylabel("Failure Rate (FR)")
fr_ax.set_title("LunarLander — Failure Rate (single run)")
for tick_label in fr_ax.get_xticklabels():
    tick_label.set_rotation(60)
    tick_label.set_ha("right")

# 15% headroom above the tallest bar, but never a y-range below 0.05.
ymax = max(0.05, fr_vals.max() * 1.15 if len(fr_vals) else 0.05)
fr_ax.set_ylim(0, ymax)

# Print each value just above its bar.
for b, v in zip(bars, fr_vals):
    fr_ax.text(
        b.get_x() + b.get_width() / 2,
        b.get_height() + 0.01 * ymax,
        f"{v:.3f}",
        ha="center",
        va="bottom",
        fontsize=9,
    )

fr_fig.tight_layout()
for ext in (".png", ".pdf", ".svg"):
    fr_fig.savefig(FIGS_DIR / f"lunarlander_FR{ext}", bbox_inches="tight")
plt.show()
plt.close(fr_fig)

# 2) CF-tests and CF-time
# =========================================================
# One figure per x-axis choice: cumulative failures against the number of
# tests, then against cumulative wall-clock time.
for axis in ("tests", "time"):
    plt.figure()

    # Fixed up front so the labels are defined even when no framework has raw
    # data (previously xlabel was only assigned inside the loop below).
    if axis == "tests":
        xlabel = "Tests (episodes)"
        axis_title = "Tests"
    else:
        xlabel = "Cumulative time (s)"
        axis_title = "Time"

    n_curves = 0
    for fw in labels:
        dfw = _load_raw(fw)
        if dfw is None or dfw.empty:
            # No raw episodes available for this framework: skip it.
            continue
        x, y, t = _cum(dfw)
        plt.plot(x if axis == "tests" else t, y, label=fw, color=_col(fw))
        n_curves += 1

    plt.xlabel(xlabel)
    plt.ylabel("Cumulative failures")
    plt.title(f"LunarLander — Cumulative Failures vs {axis_title}")
    if n_curves:
        # Legend sits outside the axes on the right; skipped when nothing was drawn.
        plt.legend(loc="center left", bbox_to_anchor=(1.0, 0.5), ncol=2)
    plt.grid(True, alpha=0.25)
    plt.tight_layout()

    for ext in (".png", ".pdf", ".svg"):
        plt.savefig(
            FIGS_DIR / f"lunarlander_CF_{axis}{ext}",
            bbox_inches="tight",
        )

    plt.show()
    plt.close()