<a href="https://colab.research.google.com/github/olanigan/openLLM/blob/main/copy_of_mwl_sidecar_execengine_robusttests.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# MWL × Sidecar + Execution Engine — **Robust Test Suite**

This build adds a comprehensive **robustness harness** on top of the streaming executor:

**What’s included**
- Parameter sweeps over **transient/permanent** failure rates, **invoice counts**, and **class balance**
- **Chaos** modes: spontaneous state changes, eventual-consistency lag, lying tools
- **Adversarial driver** variants (lies, duplicates without idempotency, partial effects)
- **Fuzz tests** across random seeds
- **Metrics table**: pass rate, blocked per run, tool error stats, avg steps, retries

> Run top → bottom. The core pipeline is unchanged; we just instrument it and hammer it.


In [None]:

%pip -q install pyyaml
import math, random, json, yaml, time, re, uuid, statistics as stats
from copy import deepcopy
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from typing import Dict, Any, List, Tuple
torch.__version__

'2.8.0+cu126'

In [None]:

# ---------- MWL Core ----------
class MWLWorld:
    """Mutable world state that the execution engine reads and updates.

    The constructor deep-copies ``state`` so that changes made through this
    object never leak back into the dict the caller passed in.
    """

    def __init__(self, state: Dict[str, Any]):
        self.state = deepcopy(state)

    def get_invoice(self, inv_id: str) -> Dict[str, Any]:
        """Return the live (mutable) invoice dict whose ``"id"`` equals ``inv_id``.

        Raises:
            KeyError: if no invoice with that id exists.
        """
        for invoice in self.state["invoices"]:
            if invoice["id"] == inv_id:
                return invoice
        # A bare next() over a generator would raise StopIteration here, which
        # is silently swallowed (or turned into RuntimeError) when the lookup
        # happens inside another generator. KeyError is the honest signal.
        raise KeyError(f"Unknown invoice id: {inv_id!r}")

class MWLContracts:
    """Lookup table over a list of action-contract dicts, keyed by ``"name"``."""

    def __init__(self, contracts: List[Dict[str, Any]]):
        # The list is stored by reference, not copied.
        self.contracts = contracts

    def get(self, name: str) -> Dict[str, Any]:
        """Return the first contract whose ``"name"`` equals ``name``.

        Raises:
            KeyError: if no contract with that name is registered.
        """
        for contract in self.contracts:
            if contract["name"] == name:
                return contract
        # Raise KeyError rather than leaking StopIteration from a bare next(),
        # which misbehaves when the lookup runs inside a generator.
        raise KeyError(f"Unknown action contract: {name!r}")

def verify_success(state: Dict[str, Any]) -> bool:
    """Return True when every in-scope invoice in ``state`` is paid.

    An invoice is in scope when its currency is "USD" and its amount (treated
    as 0 when missing) is strictly below 10000. Any in-scope invoice whose
    status is not "paid" -- including "draft" -- makes the result False.
    An empty invoice list yields True.
    """
    for invoice in state["invoices"]:
        in_scope = invoice.get("currency") == "USD" and invoice.get("amount", 0) < 10000
        if not in_scope:
            continue
        if invoice["status"] != "paid":
            return False
    return True


In [None]:

# ---------- Contracts & Example World ----------
# Declarative action contracts consumed by the execution engine.
# Keys that the code in this notebook actually reads:
#   "name"    - lookup key used by MWLContracts.get
#   "pre"     - precondition strings matched verbatim by MWLExecutor._pre_ok
#   "effects" - effect strings pattern-matched by MWLExecutor._apply_effects_local
#   "driver"  - optional binding resolved by DriverRegistry.for_contract;
#               a contract without it is executed locally as a no-op
# NOTE(review): "inputs", "returns" and "risk" are not read by any code visible
# in this notebook -- presumably documentation/metadata; confirm before relying on them.
ACTION_CONTRACTS = [
  {
    "name":"submit_payment",
    "inputs":{"invoice_id":"string","method":"enum[ACH,CARD]"},
    "pre":["invoice(invoice_id).status == 'due'"],
    "effects":[ "invoice(invoice_id).status := 'paid'" ],
    "returns":{"receipt_id":"string"},
    "risk":{"level":"moderate","requires_hil_if":["invoice(invoice_id).amount > 5000"]},
    "driver":{"name":"mock.bank","op":"pay"}
  },
  {
    "name":"verify_payment",
    "inputs":{"invoice_id":"string"},
    "pre":["invoice(invoice_id).status in ['due','paid']"],
    "effects":[],
    "returns":{"ok":"bool"}
  }
]

# Minimal two-invoice world used only for the printout below.
EXAMPLE_STATE = {
  "invoices": [
    {"id":"inv_001","status":"due","amount":242.89,"currency":"USD","supplier":"ABC"},
    {"id":"inv_002","status":"paid","amount":51.00,"currency":"USD","supplier":"XYZ"}
  ]
}

# Show the first 300 characters of the YAML dump (keys are sorted by safe_dump)
# followed by the example state as pretty-printed JSON.
print("Contracts:", yaml.safe_dump(ACTION_CONTRACTS)[:300] + "...")
print("Example state:", json.dumps(EXAMPLE_STATE, indent=2))


Contracts: - driver:
    name: mock.bank
    op: pay
  effects:
  - invoice(invoice_id).status := 'paid'
  inputs:
    invoice_id: string
    method: enum[ACH,CARD]
  name: submit_payment
  pre:
  - invoice(invoice_id).status == 'due'
  returns:
    receipt_id: string
  risk:
    level: moderate
    requires_h...
Example state: {
  "invoices": [
    {
      "id": "inv_001",
      "status": "due",
      "amount": 242.89,
      "currency": "USD",
      "supplier": "ABC"
    },
    {
      "id": "inv_002",
      "status": "paid",
      "amount": 51.0,
      "currency": "USD",
      "supplier": "XYZ"
    }
  ]
}


In [None]:

# ---------- Synthetic Dataset + Sidecar Training (weighted + threshold tuning) ----------
# Value pools that random_invoice samples from with random.choice.
CURRENCIES = ["USD", "EUR", "JPY", "GBP"]
SUPPLIERS = ["ABC","XYZ","DEF","OMEGA","ALPHA","BETA"]

def random_invoice(i, p_due_usd=0.25):
    """Sample one synthetic invoice dict with id ``inv_<i zero-padded to 6>``.

    With probability ``p_due_usd`` the invoice is forced to be a due USD
    invoice with an amount in [5, 9999]. Otherwise amount (in [5, 20000]),
    currency, and status are drawn independently, so this branch can still
    happen to produce a due USD invoice.
    """
    invoice_id = f"inv_{i:06d}"

    if random.random() < p_due_usd:
        forced_amount = round(random.uniform(5, 9999), 2)
        return {
            "id": invoice_id,
            "status": "due",
            "amount": forced_amount,
            "currency": "USD",
            "supplier": random.choice(SUPPLIERS),
        }

    # Draw order (amount, currency, status, supplier) is kept fixed so that
    # seeded runs keep producing the same worlds.
    free_amount = round(random.uniform(5, 20000), 2)
    free_currency = random.choice(CURRENCIES)
    free_status = random.choice(["due","paid","draft"])
    free_supplier = random.choice(SUPPLIERS)
    return {
        "id": invoice_id,
        "status": free_status,
        "amount": free_amount,
        "currency": free_currency,
        "supplier": free_supplier,
    }

def sample_world(n_invoices=50, p_due_usd=0.25):
    """Build a world-state dict holding ``n_invoices`` freshly sampled invoices.

    Invoices are generated in index order 0..n_invoices-1, each through
    ``random_invoice`` with the given ``p_due_usd``.
    """
    invoices = []
    for index in range(n_invoices):
        invoices.append(random_invoice(index, p_due_usd=p_due_usd))
    return {"invoices": invoices}

def label_invoice(inv):
    """Return 1 if ``inv`` is a due USD invoice with amount below 10000, else 0."""
    should_pay = (
        inv["status"] == "due"
        and inv["currency"] == "USD"
        and inv["amount"] < 10000
    )
    return 1 if should_pay else 0

def encode_invoice(inv):
    """Encode an invoice as a 5-element float32 feature vector.

    Layout: [amount / 20000 capped at 1.0, is_USD, is_due, is_paid, is_draft].
    """
    scaled_amount = min(inv["amount"] / 20000.0, 1.0)
    usd_flag = float(inv["currency"] == "USD")
    status = inv["status"]
    features = [
        scaled_amount,
        usd_flag,
        float(status == "due"),
        float(status == "paid"),
        float(status == "draft"),
    ]
    return np.array(features, dtype=np.float32)

def build_dataset(n_worlds=240, n_invoices=64, p_due_usd=0.25):
    """Sample ``n_worlds`` worlds and flatten their invoices into (X, y).

    Returns:
        X: stacked ``encode_invoice`` vectors, one row per invoice.
        y: float32 array of ``label_invoice`` labels, aligned with X.
    """
    features = []
    labels = []
    for _world_idx in range(n_worlds):
        invoices = sample_world(n_invoices=n_invoices, p_due_usd=p_due_usd)["invoices"]
        features.extend(encode_invoice(invoice) for invoice in invoices)
        labels.extend(label_invoice(invoice) for invoice in invoices)
    return np.stack(features), np.array(labels, dtype=np.float32)

# Build the full synthetic dataset (240 worlds x 64 invoices each).
X, y = build_dataset(n_worlds=240, n_invoices=64, p_due_usd=0.25)
# Shuffle row indices, then hold out the last 20% as the evaluation split.
perm = np.random.permutation(len(X))
split = int(0.8*len(X))
tr_idx, te_idx = perm[:split], perm[split:]
Xtr, ytr = X[tr_idx], y[tr_idx]
Xte, yte = X[te_idx], y[te_idx]

class SidecarMLP(nn.Module):
    """Two-hidden-layer ReLU MLP that emits one logit per input row."""

    def __init__(self, in_dim=5, hidden=64):
        super().__init__()
        # Layer order is fixed so state_dict keys stay net.0 / net.2 / net.4.
        layers = [
            nn.Linear(in_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return raw logits with the trailing size-1 dimension squeezed away."""
        logits = self.net(x)
        return logits.squeeze(-1)

# Use the GPU when one is available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SidecarMLP().to(device)
opt = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)

# Weight the positive class by the neg/pos ratio of the training split;
# max(pos, 1.0) guards against division by zero when there are no positives.
pos = float((ytr == 1).sum()); neg = float((ytr == 0).sum())
pos_weight = torch.tensor([neg / max(pos, 1.0)], device=device)
loss_fn = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
print(f"Class balance (train): pos={int(pos)} neg={int(neg)} pos_weight={pos_weight.item():.2f}")

# Move both splits onto the chosen device once, as float tensors.
Xtr_t = torch.from_numpy(Xtr).float().to(device)
ytr_t = torch.from_numpy(ytr).float().to(device)
Xte_t = torch.from_numpy(Xte).float().to(device)
yte_t = torch.from_numpy(yte).float().to(device)

def eval_acc_f1_at_threshold(model, thr=0.5):
    """Return (accuracy, F1) of ``model`` on the held-out split at threshold ``thr``.

    Reads the module-level ``Xte_t`` tensor and ``yte`` label array. The model
    is switched to eval mode and left there. A small epsilon (1e-9) in the
    denominators keeps precision/recall/F1 finite when a count is zero.
    """
    model.eval()
    with torch.no_grad():
        scores = torch.sigmoid(model(Xte_t)).cpu().numpy()

    predicted = (scores > thr).astype(np.float32)
    predicted_pos = predicted == 1
    actual_pos = yte == 1

    true_pos = (predicted_pos & actual_pos).sum()
    false_pos = (predicted_pos & (yte == 0)).sum()
    false_neg = ((predicted == 0) & actual_pos).sum()

    accuracy = (predicted == yte).mean().item()
    precision = true_pos / (true_pos + false_pos + 1e-9)
    recall = true_pos / (true_pos + false_neg + 1e-9)
    f1_score = 2*precision*recall/(precision+recall+1e-9)
    return float(accuracy), float(f1_score)

def best_threshold(model, X_val, y_val):
    """Sweep 19 thresholds in [0.05, 0.95] and return ``(best_f1, best_thr)``.

    Ties keep the lowest threshold (only a strictly higher F1 replaces the
    incumbent). If no threshold achieves F1 > 0 the result is ``(0.0, 0.5)``.
    The model is switched to eval mode and left there.
    """
    model.eval()
    with torch.no_grad():
        inputs = torch.from_numpy(X_val).float().to(device)
        scores = torch.sigmoid(model(inputs)).cpu().numpy()

    top_f1, top_thr = 0.0, 0.5
    actual_pos = y_val == 1
    for candidate in np.linspace(0.05, 0.95, 19):
        predicted = (scores > candidate).astype(np.float32)
        predicted_pos = predicted == 1
        true_pos = (predicted_pos & actual_pos).sum()
        false_pos = (predicted_pos & (y_val == 0)).sum()
        false_neg = ((predicted == 0) & actual_pos).sum()
        precision = true_pos / (true_pos + false_pos + 1e-9)
        recall = true_pos / (true_pos + false_neg + 1e-9)
        f1_score = 2*precision*recall/(precision+recall+1e-9)
        if f1_score > top_f1:
            top_f1, top_thr = float(f1_score), float(candidate)
    return (top_f1, top_thr)

# Train for 8 epochs, tracking the epoch whose tuned threshold gives the best F1.
# NOTE(review): the threshold is tuned on the same held-out split that is used
# for reporting (Xte/yte), so the printed F1 is optimistic.
best = {"f1":0.0, "thr":0.5, "acc":0.0}
best_state = None  # snapshot of the weights that produced best["thr"]
for ep in range(1, 9):
    model.train()
    tot = 0.0
    for i in range(0, len(Xtr_t), 4096):
        xb, yb = Xtr_t[i:i+4096], ytr_t[i:i+4096]
        opt.zero_grad()
        logits = model(xb)
        loss = loss_fn(logits, yb)
        loss.backward()
        opt.step()
        tot += loss.item() * len(xb)
    acc05, f105 = eval_acc_f1_at_threshold(model, 0.5)
    f1_val, thr_val = best_threshold(model, Xte, yte)
    print(f"[ep {ep:02d}] train_loss={tot/len(Xtr_t):.4f}  eval_acc@0.5={acc05:.4f}  eval_f1@0.5={f105:.4f}  best_F1={f1_val:.4f}@thr={thr_val:.2f}")
    if f1_val > best["f1"]:
        best.update({"f1": f1_val, "thr": thr_val, "acc": acc05})
        # state_dict() returns references to the live parameters, so it must be
        # deep-copied or later optimizer steps would overwrite the snapshot.
        best_state = deepcopy(model.state_dict())

# The selected threshold was tuned for the best epoch's weights, not the last
# epoch's. Restore those weights so the threshold and the model agree.
if best_state is not None:
    model.load_state_dict(best_state)

SELECTED_THRESHOLD = best["thr"]
print("Selected threshold:", SELECTED_THRESHOLD)


Class balance (train): pos=3450 neg=8838 pos_weight=2.56
[ep 01] train_loss=0.9898  eval_acc@0.5=0.5479  eval_f1@0.5=0.5486  best_F1=0.5486@thr=0.50
[ep 02] train_loss=0.9600  eval_acc@0.5=0.7562  eval_f1@0.5=0.6927  best_F1=0.6927@thr=0.50
[ep 03] train_loss=0.9313  eval_acc@0.5=0.8154  eval_f1@0.5=0.7486  best_F1=0.7486@thr=0.50
[ep 04] train_loss=0.9029  eval_acc@0.5=0.8903  eval_f1@0.5=0.8336  best_F1=0.9248@thr=0.55
[ep 05] train_loss=0.8743  eval_acc@0.5=0.9131  eval_f1@0.5=0.8634  best_F1=0.9877@thr=0.55
[ep 06] train_loss=0.8446  eval_acc@0.5=0.9323  eval_f1@0.5=0.8903  best_F1=0.9718@thr=0.55
[ep 07] train_loss=0.8134  eval_acc@0.5=0.9688  eval_f1@0.5=0.9462  best_F1=0.9591@thr=0.55
[ep 08] train_loss=0.7797  eval_acc@0.5=0.9710  eval_f1@0.5=0.9499  best_F1=0.9499@thr=0.50
Selected threshold: 0.5499999999999999


In [None]:

# ---------- Deterministic TaskSpec Parser ----------
import re
def parse_currency(text):
    """Return the first USD/EUR/JPY/GBP code in ``text`` (case-insensitive), upper-cased.

    Defaults to "USD" when no whole-word currency code is present.
    """
    found = re.search(r'\b(USD|EUR|JPY|GBP)\b', text, re.I)
    if found is None:
        return "USD"
    return found.group(1).upper()
def parse_method(text):
    """Return the first ACH/CARD keyword in ``text`` (case-insensitive), upper-cased.

    Defaults to "ACH" when neither whole word is present.
    """
    found = re.search(r'\b(ACH|CARD)\b', text, re.I)
    if found is None:
        return "ACH"
    return found.group(1).upper()
def parse_amount(text):
    """Return the first number found in ``text`` as a float; 10000.0 if none.

    Accepts an optional leading ``$``, thousands separators ("10,000"), and a
    decimal part ("1,234.56" or "1234.56").
    """
    # The comma-grouped alternative requires at least one ",ddd" group (+, not *).
    # With * it matched the 1-3 digit prefix of an un-separated number, so
    # "$10000" was parsed as 100.0; plain numbers now use the second alternative.
    m = re.search(
        r'\$?\s*([0-9]{1,3}(?:,[0-9]{3})+(?:\.[0-9]+)?|[0-9]+(?:\.[0-9]+)?)',
        text,
    )
    if m is None:
        return 10000.0
    try:
        return float(m.group(1).replace(',', ''))
    except ValueError:
        # Defensive only: the pattern should always yield a valid float literal.
        return 10000.0

def generate_task_json(_tok, _mdl, user_text, debug=False):
    """Build a TaskSpec dict from free text using the regex parsers only.

    ``_tok`` and ``_mdl`` are accepted but never used. The result has the
    stripped text under "goal" and the parsed currency / max_amount / method
    under "constraints". With ``debug=True`` the spec is also printed.
    """
    goal = user_text.strip()
    constraints = {
        "currency": parse_currency(user_text),
        "max_amount": parse_amount(user_text),
        "method": parse_method(user_text),
    }
    task = {"goal": goal, "constraints": constraints}
    if debug:
        print("TaskSpec (deterministic):", json.dumps(task, indent=2))
    return task


In [None]:

# ---------- Sidecar inference helpers ----------
def encode_invoice(inv):
    """Turn an invoice dict into the 5-feature float32 vector the sidecar expects.

    Features, in order: amount / 20000 clipped to 1.0, USD flag, then a
    one-hot over the statuses "due", "paid", "draft".
    """
    scaled_amount = min(inv["amount"] / 20000.0, 1.0)
    usd_flag = 1.0 if inv["currency"] == "USD" else 0.0
    status_flags = [1.0 if inv["status"] == name else 0.0 for name in ("due", "paid", "draft")]
    return np.array([scaled_amount, usd_flag, *status_flags], dtype=np.float32)

def sidecar_select_ids(model, state: Dict[str,Any], task: Dict[str,Any], threshold=None):
    """Score every invoice with the sidecar model and pick the ones to pay.

    An invoice is selected when it passes the hard constraint mask (currency
    equals the task currency and amount <= max_amount) AND its predicted
    probability is strictly above the threshold.

    Args:
        model: torch module mapping a (N, 5) float tensor to N logits.
        state: world state dict with an "invoices" list.
        task: TaskSpec; missing constraints default to USD / 10000 / ACH.
        threshold: decision threshold; ``None`` uses ``SELECTED_THRESHOLD``.

    Returns:
        ``(selected_ids, probs, ids, method, thr)`` where ``probs`` and ``ids``
        are aligned lists covering every invoice in ``state``.
    """
    constraints = task.get("constraints", {})
    want_currency = constraints.get("currency", "USD")
    max_amount = float(constraints.get("max_amount", 10000))
    method = constraints.get("method", "ACH")
    thr = SELECTED_THRESHOLD if threshold is None else float(threshold)

    invoices = state["invoices"]
    if not invoices:
        # np.stack raises ValueError on an empty list; nothing to score anyway.
        return [], [], [], method, thr

    ids = [inv["id"] for inv in invoices]
    feats = np.stack([encode_invoice(inv) for inv in invoices])
    allowed = [
        inv.get("currency") == want_currency and inv.get("amount", 0) <= max_amount
        for inv in invoices
    ]

    # Run inference on whatever device the model's parameters live on.
    X_ = torch.from_numpy(feats).float().to(next(model.parameters()).device)
    with torch.no_grad():
        probs = torch.sigmoid(model(X_)).cpu().numpy()

    selected = [inv_id for inv_id, p, ok in zip(ids, probs, allowed) if ok and p > thr]
    return selected, probs.tolist(), ids, method, thr

def compile_plan(pay_ids: List[str], method: str = "ACH"):
    """Expand invoice ids into a flat submit-then-verify plan.

    Each invoice contributes two consecutive steps that share one running
    counter: ``p<k>`` (submit_payment) followed by ``v<k+1>`` (verify_payment,
    depending on ``p<k>``). The ids therefore run p1, v2, p3, v4, ...
    """
    steps = []
    for position, inv_id in enumerate(pay_ids):
        pay_no = 2 * position + 1
        pay_step_id = f"p{pay_no}"
        steps.append({
            "id": pay_step_id,
            "action": "submit_payment",
            "args": {"invoice_id": inv_id, "method": method},
            "depends_on": [],
        })
        steps.append({
            "id": f"v{pay_no + 1}",
            "action": "verify_payment",
            "args": {"invoice_id": inv_id},
            "depends_on": [pay_step_id],
        })
    return {"plan": steps}


In [None]:

# ---------- Execution Engine with Chaos/Adversarial Drivers ----------
from dataclasses import dataclass

class TransientToolError(Exception):
    """Tool failure that may succeed if the same call is retried."""
class PermanentToolError(Exception):
    """Tool failure that retrying the same call is not expected to fix."""

@dataclass
class ExecConfig:
    """Tunable limits and chaos knobs read by MWLExecutor."""
    # Maximum driver attempts per _call_with_retries invocation.
    max_retries: int = 3
    # Backoff before retry k is base_backoff_s * backoff_factor ** (k - 1), +/-25% jitter.
    base_backoff_s: float = 0.25
    backoff_factor: float = 2.0
    # When True, verify_success is evaluated after every executed step and logged.
    verify_after_each: bool = True
    # When True, _perception_refresh (which also injects chaos) runs after every step.
    perception_refresh: bool = True
    # Number of sidecar-driven replans allowed per execute_plan_with_reconciliation call.
    replan_budget: int = 2
    # Cap on the executor's cumulative step counter.
    max_steps: int = 1024
    # Transient failures per driver-op/invoice key before the call is failed permanently.
    circuit_break_after: int = 5
    # Wall-clock budget, in seconds, for one execute_plan_with_reconciliation call.
    wall_clock_limit_s: float = 30.0
    # Per-invoice, per-refresh probability that a due invoice spontaneously becomes paid.
    chaos_spontaneous_pay_p: float = 0.0
    # Per-invoice, per-refresh probability that a due invoice flips to draft.
    chaos_flip_status_p: float = 0.0

class ToolDriver:
    """Base interface for external tool drivers."""

    def op(self, name: str, **kwargs) -> dict:
        """Execute operation ``name``; concrete drivers must override this."""
        raise NotImplementedError

    def classify_error(self, exc: Exception) -> Exception:
        """Map a raw exception to a tool error; the default returns it unchanged."""
        return exc


class BankDriverStub(ToolDriver):
    """Mock bank with random failures and idempotency-key deduplication.

    Args:
        transient_p: probability that a fresh call raises TransientToolError.
        permanent_p: probability that a fresh call raises PermanentToolError.
    """

    def __init__(self, transient_p=0.15, permanent_p=0.03):
        self._ledger = {}   # idempotency_key -> result of the first successful call
        self._paid = set()  # invoice ids that have been paid at least once
        self.transient_p = transient_p
        self.permanent_p = permanent_p

    def op(self, name: str, **kwargs) -> dict:
        """Pay ``kwargs["invoice_id"]``; only the "pay" operation is supported.

        A repeated call with an ``idempotency_key`` that already succeeded
        replays the stored result with status "deduped" and consumes no
        randomness. Calls without a key are never deduplicated.

        Raises:
            PermanentToolError: unsupported op, or simulated bank rejection.
            TransientToolError: simulated bank timeout.
        """
        if name != "pay":
            raise PermanentToolError(f"Unsupported op: {name}")
        inv_id = kwargs["invoice_id"]
        method = kwargs.get("method", "ACH")
        idem = kwargs.get("idempotency_key")

        if idem is not None and idem in self._ledger:
            # "status" goes last: the stored result carries status "ok", which
            # would otherwise overwrite "deduped" and hide the replay.
            return {**self._ledger[idem], "status": "deduped"}

        r = random.random()
        if r < self.permanent_p:
            raise PermanentToolError(f"Invoice {inv_id} rejected by bank")
        if r < self.permanent_p + self.transient_p:
            raise TransientToolError("bank timeout")

        receipt = f"rcpt_{uuid.uuid4().hex[:10]}"
        self._paid.add(inv_id)
        res = {"status": "ok", "receipt_id": receipt, "method": method, "invoice_id": inv_id}
        if idem is not None:
            # Keyless calls must not share ledger slot None; otherwise every
            # later keyless call would replay the first keyless receipt.
            self._ledger[idem] = res
        return res

class BankDriverLiar(ToolDriver):
    """Adversarial bank that sometimes reports a payment that had no effect."""

    def __init__(self, ok_but_no_effect_p=0.2, transient_p=0.1):
        self.ok_but_no_effect_p = ok_but_no_effect_p
        self.transient_p = transient_p

    def op(self, name: str, **kwargs) -> dict:
        """Handle "pay": fail transiently, report a no-effect result, or succeed.

        One uniform draw decides the outcome: below ``transient_p`` raises
        TransientToolError, the next ``ok_but_no_effect_p`` slice returns status
        "ok_but_no_effect", and the remainder returns "ok" with a receipt id.
        """
        if name != "pay":
            raise PermanentToolError("Unsupported op")

        roll = random.random()
        if roll < self.transient_p:
            raise TransientToolError("network blip")

        invoice_id = kwargs.get("invoice_id")
        no_effect_cutoff = self.transient_p + self.ok_but_no_effect_p
        if roll < no_effect_cutoff:
            return {"status":"ok_but_no_effect","invoice_id":invoice_id}

        receipt_id = f"rcpt_{uuid.uuid4().hex[:10]}"
        return {"status":"ok","receipt_id":receipt_id,"invoice_id":invoice_id}

class BankDriverNoIdem(ToolDriver):
    """Adversarial bank that reports a duplicate charge on a repeated key."""

    def __init__(self, transient_p=0.1):
        self.seen = set()
        self.transient_p = transient_p

    def op(self, name: str, **kwargs) -> dict:
        """Handle "pay"; a key seen before yields status "duplicate_charge".

        Calls without an ``idempotency_key`` all share the key "none". A key
        is recorded only when the call gets past the transient-failure draw.
        """
        if name != "pay":
            raise PermanentToolError("Unsupported op")
        if random.random() < self.transient_p:
            raise TransientToolError("timeout")

        invoice_id = kwargs.get("invoice_id")
        key = kwargs.get("idempotency_key","none")
        if key in self.seen:
            return {"status":"duplicate_charge","invoice_id": invoice_id}

        self.seen.add(key)
        receipt_id = f"rcpt_{uuid.uuid4().hex[:10]}"
        return {"status":"ok","receipt_id":receipt_id,"invoice_id":invoice_id}

class DriverRegistry:
    """Maps driver names used in contracts to ToolDriver instances."""

    def __init__(self):
        self._drivers = {}

    def register(self, driver_name: str, driver: ToolDriver):
        """Bind ``driver`` to ``driver_name``, replacing any earlier binding."""
        self._drivers[driver_name] = driver

    def for_contract(self, contract: dict):
        """Resolve a contract's "driver" section to ``(driver, op_name)``.

        Returns ``(None, None)`` for contracts without a driver name, which
        the executor treats as local-only.

        Raises:
            PermanentToolError: the contract names a driver that was never registered.
        """
        binding = contract.get("driver", {})
        driver_name = binding.get("name")
        op_name = binding.get("op")
        if not driver_name:
            return None, None
        if driver_name not in self._drivers:
            raise PermanentToolError(f"No driver bound for {driver_name}")
        return self._drivers[driver_name], op_name

class MWLExecutor:
    """Executes plans against an MWLWorld with retries, chaos, and replanning.

    For every plan step the executor checks the contract's preconditions,
    calls the bound driver (if any) with an idempotency key and retry/backoff,
    applies the contract's effects to the local world state, and logs the
    outcome in ``exec_log`` keyed by step id. Invoices whose driver call fails
    permanently are remembered in ``_blocked``.

    The ``depends_on`` field of a step is not consulted; steps run in list order.
    State (``step_count``, ``_stats``, ``_blocked``, transient counters,
    adaptive threshold) persists across calls on the same instance.
    """

    def __init__(self, world: MWLWorld, contracts: MWLContracts, drivers: DriverRegistry, cfg: ExecConfig):
        self.world = world
        self.contracts = contracts
        self.drivers = drivers
        self.cfg = cfg
        self.exec_log = {}
        self.step_count = 0
        self._tool_transient_counts = {}
        self._blocked = set()
        self._stats = {"transient":0, "permanent":0, "deduped":0, "ok_no_effect":0, "duplicate_charge":0}
        self._adaptive_thr = None

    def _idempotency_key(self, step: dict) -> str:
        """Derive a deterministic key from the step's action and args."""
        payload = json.dumps({"action": step["action"], "args": step["args"]}, sort_keys=True)
        return f"idem_{uuid.uuid5(uuid.NAMESPACE_DNS, payload)}"

    def _pre_ok(self, step: dict, contract: dict) -> bool:
        """Return True when every precondition of ``contract`` holds for ``step``.

        Only the two precondition strings used by the built-in contracts are
        understood; any other expression is treated as not satisfied.
        """
        def invoice(inv_id):
            return self.world.get_invoice(inv_id)

        ok = True
        for expr in contract.get("pre", []):
            if expr == "invoice(invoice_id).status == 'due'":
                ok &= invoice(step["args"]["invoice_id"])["status"] == "due"
            elif expr == "invoice(invoice_id).status in ['due','paid']":
                ok &= invoice(step["args"]["invoice_id"])["status"] in ["due","paid"]
            else:
                ok &= False
        return ok

    def _apply_effects_local(self, step: dict, contract: dict, driver_result: dict):
        """Mirror a successful driver result into the local world state.

        Effects are applied only for status "ok" or "deduped", so results such
        as "ok_but_no_effect" or "duplicate_charge" leave the invoice untouched.
        """
        if not driver_result or driver_result.get("status") not in ("ok","deduped"):
            return
        for eff in contract.get("effects", []):
            if eff.startswith("invoice(") and "status := 'paid'" in eff:
                self.world.get_invoice(step["args"]["invoice_id"])["status"] = "paid"

    def _call_with_retries(self, driver: ToolDriver, op_name: str, **kwargs) -> dict:
        """Call ``driver.op`` with exponential backoff on transient failures.

        Transient failures are counted per "<DriverClass>.<op>:<invoice_id>"
        key across calls; reaching ``cfg.circuit_break_after`` trips a circuit
        breaker that surfaces as PermanentToolError.

        Raises:
            PermanentToolError: driver rejection, tripped circuit breaker, or
                ``cfg.max_retries`` attempts exhausted.
        """
        name = f"{driver.__class__.__name__}.{op_name}"
        inv = kwargs.get("invoice_id", "NA")
        key = f"{name}:{inv}"
        transient_count = self._tool_transient_counts.get(key, 0)

        for attempt in range(1, self.cfg.max_retries + 1):
            try:
                res = driver.op(op_name, **kwargs)
            except PermanentToolError:
                self._stats["permanent"] += 1
                raise
            except TransientToolError:
                self._stats["transient"] += 1
                transient_count += 1
                self._tool_transient_counts[key] = transient_count
                if transient_count >= self.cfg.circuit_break_after:
                    raise PermanentToolError(f"Circuit break for {key} after {transient_count} transients")
                backoff = self.cfg.base_backoff_s * (self.cfg.backoff_factor ** (attempt - 1))
                # Jitter is always drawn so seeded runs consume the same random stream.
                jitter = backoff * random.uniform(-0.25, 0.25)
                # Sleeping after the final attempt only delays the failure below.
                if attempt < self.cfg.max_retries:
                    time.sleep(max(0.0, backoff + jitter))
                continue
            else:
                st = res.get("status")
                if st == "deduped":
                    self._stats["deduped"] += 1
                if st == "ok_but_no_effect":
                    self._stats["ok_no_effect"] += 1
                if st == "duplicate_charge":
                    self._stats["duplicate_charge"] += 1
                return res
        raise PermanentToolError(f"Exhausted retries for {key}")

    def _perception_refresh(self):
        """Inject chaos: due invoices may spontaneously become paid or draft.

        Two random draws are made per invoice on every call, regardless of status.
        """
        for inv in self.world.state["invoices"]:
            if random.random() < self.cfg.chaos_spontaneous_pay_p and inv["status"] == "due":
                inv["status"] = "paid"
            if random.random() < self.cfg.chaos_flip_status_p and inv["status"] == "due":
                inv["status"] = "draft"

    def _eligible_due_ids(self, task_spec: dict) -> List[str]:
        """Return ids of due invoices matching the task's currency and amount cap."""
        constraints = task_spec["constraints"]
        return [inv["id"] for inv in self.world.state["invoices"]
                if inv["status"]=="due"
                and inv.get("currency")==constraints.get("currency","USD")
                and inv.get("amount",0) <= float(constraints.get("max_amount",10000))]

    def execute_plan_with_reconciliation(self, initial_plan: dict, sidecar_model=None, task_spec=None, threshold=None):
        """Run ``initial_plan``, then replan until no unblocked eligible invoice remains.

        Args:
            initial_plan: ``{"plan": [step, ...]}``; deep-copied before use.
            sidecar_model: model used to select invoices when replanning.
            task_spec: TaskSpec defining eligibility; without it the goal
                check passes after the first pass over the plan.
            threshold: fixed sidecar threshold; ``None`` lowers an adaptive
                threshold by 0.05 per replan (floor 0.1).

        Returns:
            Dict with "state", "exec_log", "replans", "stats" and "blocked".

        Raises:
            RuntimeError: step or wall-clock budget exceeded, or the goal is
                unmet and no replan is possible.
        """
        start = time.time()
        plan = deepcopy(initial_plan)
        replans_left = self.cfg.replan_budget

        while True:
            for step in plan.get("plan", []):
                if self.step_count >= self.cfg.max_steps or (time.time() - start) > self.cfg.wall_clock_limit_s:
                    raise RuntimeError("Execution budget exceeded")
                self.step_count += 1
                cid = step["id"]
                action = step["action"]
                contract = self.contracts.get(action)
                if not self._pre_ok(step, contract):
                    continue

                driver, op_name = self.drivers.for_contract(contract)
                if driver and op_name:
                    idem = self._idempotency_key(step)
                    try:
                        driver_result = self._call_with_retries(driver, op_name, idempotency_key=idem, **step["args"])
                    except PermanentToolError as e:
                        # Record the failure and move on; the invoice is
                        # excluded from later replans.
                        inv = step["args"].get("invoice_id")
                        if inv:
                            self._blocked.add(inv)
                        self.exec_log[cid] = {"action": action, "driver_result": {"status":"permanent_error","reason":str(e)}}
                        continue
                else:
                    # Contracts without a bound driver are local-only steps.
                    driver_result = {"status":"noop"}

                self._apply_effects_local(step, contract, driver_result)
                verified = verify_success(self.world.state) if self.cfg.verify_after_each else None
                self.exec_log[cid] = {"action": action, "driver_result": driver_result, "verified": verified}
                if self.cfg.perception_refresh:
                    self._perception_refresh()

            elig = self._eligible_due_ids(task_spec) if task_spec else []
            if len(elig)==0 or all(i in self._blocked for i in elig):
                return {"state": self.world.state, "exec_log": self.exec_log, "replans": self.cfg.replan_budget - replans_left, "stats": self._stats, "blocked": list(self._blocked)}
            if replans_left <= 0 or sidecar_model is None or task_spec is None:
                raise RuntimeError("Goal not met and no replans left")

            replans_left -= 1
            if threshold is None:
                previous_thr = self._adaptive_thr if self._adaptive_thr is not None else SELECTED_THRESHOLD
                self._adaptive_thr = max(0.1, previous_thr - 0.05)
                thr = self._adaptive_thr
            else:
                thr = float(threshold)
            sel_ids, probs, ids, method, _t = sidecar_select_ids(sidecar_model, self.world.state, task_spec, threshold=thr)
            sel_ids = [i for i in sel_ids if i in elig and i not in self._blocked]
            plan = compile_plan(sel_ids, method=task_spec["constraints"].get("method", "ACH"))

In [None]:

# ---------- Streaming Execution ----------
def execute_streaming_all(world: MWLWorld,
                          executor: MWLExecutor,
                          sidecar_model,
                          task: Dict[str,Any],
                          threshold=None) -> Tuple[bool, set]:
    """Pay eligible invoices one at a time until none remain or all are blocked.

    Each iteration recomputes the residual (due invoices matching the task's
    currency and amount cap), lets the sidecar pick a target (falling back to
    the first residual invoice), and runs a two-step submit/verify plan.

    An invoice is given up on ("blocked") when a PermanentToolError escapes
    the executor, or when it is still due after 3 attempts. Blocked invoices
    are excluded from later iterations, which guarantees termination.

    Returns:
        ``(ok, blocked)``: ``ok`` is True only when nothing was blocked;
        ``blocked`` is a set of ``(invoice_id, reason)`` tuples.
    """
    constraints = task["constraints"]
    max_attempts = 3
    attempts = {}
    blocked = set()
    blocked_ids = set()

    while True:
        residual = [inv for inv in world.state["invoices"]
                    if inv["status"] == "due"
                    and inv["currency"] == constraints["currency"]
                    and inv["amount"] <= float(constraints["max_amount"])
                    and inv["id"] not in blocked_ids]
        if not residual:
            return (not blocked), blocked

        residual_ids = {inv["id"] for inv in residual}
        sel_ids, probs, ids, method, thr = sidecar_select_ids(sidecar_model, world.state, task, threshold=threshold)
        sel_ids = [i for i in sel_ids if i in residual_ids]
        target = sel_ids[0] if sel_ids else residual[0]["id"]
        P = {"plan":[
            {"id":"p1","action":"submit_payment","args":{"invoice_id":target,"method":constraints["method"]},"depends_on":[]},
            {"id":"v1","action":"verify_payment","args":{"invoice_id":target},"depends_on":["p1"]}
        ]}

        failure = "no_progress"
        try:
            executor.execute_plan_with_reconciliation(P, sidecar_model=None, task_spec=task, threshold=threshold)
        except PermanentToolError as e:
            blocked.add((target, str(e)))
            blocked_ids.add(target)
            continue
        except Exception as e:
            # Deliberately broad: with sidecar_model=None the executor raises
            # RuntimeError whenever other eligible invoices remain, even if
            # this target was paid. Progress is judged from the world state.
            failure = type(e).__name__

        still_due = any(inv["id"] == target and inv["status"] == "due"
                        for inv in world.state["invoices"])
        if not still_due:
            continue

        # Count an attempt only when the target made no progress. This also
        # covers the case where the executor returns normally without paying it.
        attempts[target] = attempts.get(target, 0) + 1
        if attempts[target] >= max_attempts:
            blocked.add((target, f"exhausted:{failure}"))
            blocked_ids.add(target)


In [None]:

# ---------- Quick Demo with Chaos/Adversarial Toggles ----------
def run_demo(driver_kind="stub", transient_p=0.15, permanent_p=0.03,
             chaos_pay=0.0, chaos_flip=0.0, n_invoices=64, p_due_usd=0.25, seed=42):
    """Run one seeded end-to-end streaming execution and summarise it.

    Seeds ``random`` and ``numpy``, binds the requested mock bank driver to
    "mock.bank", samples a world, and streams payments for the fixed task
    "Pay all USD invoices under $10,000 via ACH today." using the
    module-level ``model``.

    Returns:
        Dict with "ok", "blocked", "stats", "final_paid" and "final_due".

    Raises:
        ValueError: ``driver_kind`` is not "stub", "liar" or "noidem".
    """
    random.seed(seed)
    np.random.seed(seed)

    # Factories are lazy so only the requested driver is constructed.
    driver_factories = {
        "stub": lambda: BankDriverStub(transient_p=transient_p, permanent_p=permanent_p),
        "liar": lambda: BankDriverLiar(ok_but_no_effect_p=0.2, transient_p=transient_p),
        "noidem": lambda: BankDriverNoIdem(transient_p=transient_p),
    }
    if driver_kind not in driver_factories:
        raise ValueError("driver_kind must be one of: stub|liar|noidem")
    drivers = DriverRegistry()
    drivers.register("mock.bank", driver_factories[driver_kind]())

    cfg = ExecConfig(
        max_retries=3,
        base_backoff_s=0.1,
        backoff_factor=2.0,
        verify_after_each=True,
        perception_refresh=True,
        replan_budget=2,
        max_steps=2048,
        circuit_break_after=5,
        wall_clock_limit_s=20.0,
        chaos_spontaneous_pay_p=chaos_pay,
        chaos_flip_status_p=chaos_flip,
    )

    task = generate_task_json(None, None, "Pay all USD invoices under $10,000 via ACH today.", debug=False)
    world = MWLWorld(sample_world(n_invoices=n_invoices, p_due_usd=p_due_usd))
    executor = MWLExecutor(world, MWLContracts(ACTION_CONTRACTS), drivers, cfg)

    ok, blocked = execute_streaming_all(world, executor, model, task, threshold=None)

    final_statuses = [inv["status"] for inv in world.state["invoices"]]
    return {
        "ok": ok,
        "blocked": list(blocked),
        "stats": executor._stats,
        "final_paid": final_statuses.count("paid"),
        "final_due": final_statuses.count("due"),
    }

# Smoke run: default stub driver, no chaos, seed 1.
print(run_demo(driver_kind="stub", transient_p=0.15, permanent_p=0.03, chaos_pay=0.0, chaos_flip=0.0, n_invoices=64, p_due_usd=0.25, seed=1))


{'ok': True, 'blocked': [], 'stats': {'transient': 3, 'permanent': 1, 'deduped': 0, 'ok_no_effect': 0, 'duplicate_charge': 0}, 'final_paid': 41, 'final_due': 11}


In [None]:

# ---------- Robustness Grid & Fuzz ----------
def robustness_grid(runs=5):
    """Sweep driver kind, failure rates, chaos rates, and world size.

    Every configuration is executed ``runs`` times with seeds 0..runs-1 via
    ``run_demo`` (``p_due_usd`` fixed at 0.25) and reduced to one row holding
    the configuration, the pass rate, and per-run averages of the blocked
    count and the executor's tool statistics.
    """
    configs = [
        (driver, tp, pp, chaos_pay, chaos_flip, n_inv)
        for driver in ["stub","liar","noidem"]
        for tp in [0.0, 0.1, 0.2, 0.3]
        for pp in [0.0, 0.03, 0.1]
        for chaos_pay in [0.0, 0.05]
        for chaos_flip in [0.0, 0.05]
        for n_inv in [32, 64, 256]
    ]

    def _mean(values):
        return sum(values)/len(values)

    # (output column, key in the executor stats dict)
    stat_columns = (
        ("avg_transient", "transient"),
        ("avg_permanent", "permanent"),
        ("avg_deduped", "deduped"),
        ("avg_ok_no_effect", "ok_no_effect"),
        ("avg_duplicate_charge", "duplicate_charge"),
    )

    results = []
    for driver, tp, pp, cp, cf, n_inv in configs:
        outcomes = [
            run_demo(driver_kind=driver, transient_p=tp, permanent_p=pp,
                     chaos_pay=cp, chaos_flip=cf, n_invoices=n_inv, p_due_usd=0.25, seed=seed)
            for seed in range(runs)
        ]
        row = {
            "driver":driver,"transient_p":tp,"permanent_p":pp,"chaos_pay":cp,"chaos_flip":cf,"n_invoices":n_inv,
            "pass_rate": _mean([1 if out["ok"] else 0 for out in outcomes]),
            "avg_blocked": _mean([len(out["blocked"]) for out in outcomes]),
        }
        for column, stat_key in stat_columns:
            row[column] = _mean([out["stats"].get(stat_key,0) for out in outcomes])
        results.append(row)
    return results

def print_topline(results, k=15):
    """Print the ``k`` worst rows: lowest pass rate first, larger worlds first on ties."""
    def worst_first(row):
        return (row["pass_rate"], -row["n_invoices"])

    ranked = sorted(results, key=worst_first)
    for row in ranked[:k]:
        print(row)

# Run the full sweep (5 seeds per configuration) and show the 15 worst rows.
grid = robustness_grid(runs=5)
print_topline(grid, k=15)


{'driver': 'stub', 'transient_p': 0.0, 'permanent_p': 0.0, 'chaos_pay': 0.0, 'chaos_flip': 0.0, 'n_invoices': 256, 'pass_rate': 1.0, 'avg_blocked': 0.0, 'avg_transient': 0.0, 'avg_permanent': 0.0, 'avg_deduped': 0.0, 'avg_ok_no_effect': 0.0, 'avg_duplicate_charge': 0.0}
{'driver': 'stub', 'transient_p': 0.0, 'permanent_p': 0.0, 'chaos_pay': 0.0, 'chaos_flip': 0.05, 'n_invoices': 256, 'pass_rate': 1.0, 'avg_blocked': 0.0, 'avg_transient': 0.0, 'avg_permanent': 0.0, 'avg_deduped': 0.0, 'avg_ok_no_effect': 0.0, 'avg_duplicate_charge': 0.0}
{'driver': 'stub', 'transient_p': 0.0, 'permanent_p': 0.0, 'chaos_pay': 0.05, 'chaos_flip': 0.0, 'n_invoices': 256, 'pass_rate': 1.0, 'avg_blocked': 0.0, 'avg_transient': 0.0, 'avg_permanent': 0.0, 'avg_deduped': 0.0, 'avg_ok_no_effect': 0.0, 'avg_duplicate_charge': 0.0}
{'driver': 'stub', 'transient_p': 0.0, 'permanent_p': 0.0, 'chaos_pay': 0.05, 'chaos_flip': 0.05, 'n_invoices': 256, 'pass_rate': 1.0, 'avg_blocked': 0.0, 'avg_transient': 0.0, 'avg_per