# Inkswarm DetectLab — Step Runner Notebook (D-0022+)

Run the pipeline step-by-step with **clear visibility**, **artifact reuse**, and **restart safety**.

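## Steps
The numbers below match the `Step N` labels on the cells and the `DO_STEP_N_*` toggles.

- **Step 0** — Bootstrap & imports (0A–0D), then wiring / paths check (dry-run, 0E)
- **Step 1** — Dataset (raw + splits) — compute or reuse
- **Step 1b** — Labels + sanity checks (lightweight)
- **Step 2** — Features — compute or reuse (shared cache optional)
- **Step 3** — Baselines — compute or reuse
- **Step 4** — Eval — slice + stability reports
- **Step 5** — Export — UI summary, exec summary, UI bundle, handover, (optional) evidence
- **Step 6** — Step summary (latest attempt per step)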

> **Tip:** Run cells top-to-bottom. If you restart the kernel, rerun from the top.


In [1]:
# Step 0A — Bootstrap: locate repo root + config path robustly

from __future__ import annotations

import os
from pathlib import Path

def _find_repo_root(start: Path | None = None) -> Path:
    """Return the nearest ancestor directory that looks like the repo root.

    The search begins at ``start`` (or the current working directory when
    ``start`` is not given) and inspects that directory plus up to five of
    its ancestors. A directory qualifies when it contains ``configs`` and at
    least one of ``src``, ``pyproject.toml`` or ``README.md``.

    Returns:
        The first qualifying directory, or the resolved current working
        directory when none of the inspected directories qualifies.
    """
    base = (start or Path.cwd()).resolve()
    # `configs` is mandatory; any one of these companions confirms the match.
    companions = ("src", "pyproject.toml", "README.md")

    # The starting directory plus its ancestors, capped at six levels in total.
    for candidate in [base, *base.parents][:6]:
        if not (candidate / "configs").exists():
            continue
        if any((candidate / marker).exists() for marker in companions):
            return candidate

    return Path.cwd().resolve()

REPO_ROOT = _find_repo_root()
# Work from the repo root so relative paths (configs/, runs/, etc.) resolve.
os.chdir(REPO_ROOT)

print("REPO_ROOT:", REPO_ROOT)

# Config selection:
# - tiny run:           configs/skynet_smoke.yaml
# - fuller run:         configs/skynet_mvp.yaml
# - currently selected: configs/skynet_mvp_heavy.yaml
CFG_PATH = Path("configs/skynet_mvp_heavy.yaml")

# If the relative path does not exist, fall back to the same file name under
# REPO_ROOT/configs when that one does.
if not CFG_PATH.exists() and (REPO_ROOT / "configs" / CFG_PATH.name).exists():
    CFG_PATH = REPO_ROOT / "configs" / CFG_PATH.name
print("CFG_PATH:", CFG_PATH, "(exists=", CFG_PATH.exists(), ")")


REPO_ROOT: C:\Users\Martín\Desktop\inkswarm-core\usul-inkswarm-detectlab
CFG_PATH: configs\skynet_mvp_heavy.yaml (exists= True )


In [2]:
# Step 0B — Core imports (kept in one place for restart safety)

import json
import inspect

from inkswarm_detectlab.config import load_config
from inkswarm_detectlab.ui.steps import StepRecorder
from inkswarm_detectlab.ui.notebook_tools import print_run_tree, tail_text

from inkswarm_detectlab.ui.step_runner import (
    resolve_run_id,
    wire_check,
    step_dataset,
    step_features,
    step_baselines,
    step_eval,
    step_export,
)

print("Imports OK")


Imports OK


In [3]:
# Step 0C — User inputs / toggles
#
# Naming used below:
# - DO_STEP_N_*    gates whether the matching step cell calls its step_* function.
# - FORCE_STEP_N_* is passed as force= to that step_* function.

# Run identifier handed to resolve_run_id(...) in Step 0D.
# If None, run_id is generated from config fingerprint (recommended for ad-hoc runs).
# Or set explicitly, e.g.: RUN_ID = "RR2_MVP_GIT_B_0002"
RUN_ID = "RUN_2025"

# Per-step policy toggles
# Passed as reuse_if_exists= to every step_* call in the cells below.
REUSE_IF_EXISTS = False

DO_STEP_1_DATASET = True
FORCE_STEP_1_DATASET = False

DO_STEP_2_FEATURES = True
FORCE_STEP_2_FEATURES = False
# Forwarded to step_features() under whichever cache keyword its installed
# signature accepts (see the Step 2 cell).
USE_SHARED_FEATURE_CACHE = True
WRITE_SHARED_FEATURE_CACHE = True

DO_STEP_3_BASELINES = True
FORCE_STEP_3_BASELINES = False  # If outputs already exist, set True.

DO_STEP_4_EVAL = True
FORCE_STEP_4_EVAL = False

DO_STEP_5_EXPORT = True
FORCE_STEP_5_EXPORT = False

print("Toggles loaded.")


Toggles loaded.


In [4]:
# Step 0D — Resolve cfg + run_id

# resolve_run_id(...) yields the config object (cfg) and the run_id; both are
# reused by every step cell below.
cfg, run_id = resolve_run_id(CFG_PATH, run_id=RUN_ID)
print("cfg_path:", CFG_PATH)
print("run_id:", run_id)

# Recorder (records steps when you use the step_* wrappers below)
# The same instance is passed as rec= to each step_* call and rendered with
# rec.to_markdown() in the step summary cell.
rec = StepRecorder()


cfg_path: configs\skynet_mvp_heavy.yaml
run_id: RUN_2025


In [5]:
# Step 0E — Wiring / paths check (Dry-run)

# wire_check(...) returns a mapping that is dumped as JSON here; its
# ["paths"]["run_dir"] entry is read just below.
check = wire_check(CFG_PATH, run_id=run_id)
print(json.dumps(check, indent=2))

# rdir is reused by the Step 1b cell to locate the train split.
rdir = Path(check["paths"]["run_dir"])
print("\nRun tree (quick):")
print_run_tree(rdir)


{
  "run_id": "RUN_2025",
  "paths": {
    "run_dir": "runs\\RUN_2025",
    "raw_login_attempt": "runs\\RUN_2025\\raw\\login_attempt.parquet",
    "raw_checkout_attempt": "runs\\RUN_2025\\raw\\checkout_attempt.parquet",
    "dataset_train": "runs\\RUN_2025\\dataset\\login_attempt\\train.parquet",
    "dataset_time_eval": "runs\\RUN_2025\\dataset\\login_attempt\\time_eval.parquet",
    "dataset_user_holdout": "runs\\RUN_2025\\dataset\\login_attempt\\user_holdout.parquet",
    "features_login": "runs\\RUN_2025\\features\\login_attempt\\features.parquet",
    "baselines_dir": "runs\\RUN_2025\\models\\login_attempt\\baselines",
    "eval_reports_dir": "runs\\RUN_2025\\reports",
    "share_dir": "runs\\RUN_2025\\share",
    "manifest": "runs\\RUN_2025\\manifest.json"
  }
}

Run tree (quick):
- share/logs: runs\RUN_2025\share\logs (missing)
- share/reports: runs\RUN_2025\share\reports (missing)
- models: runs\RUN_2025\models (missing)
- reports: runs\RUN_2025\reports (missing)
- logs: runs\R

In [6]:
# Utility: consistent prints for step outcomes

def _print_outcome(out):
    """Print a step result in a fixed, human-readable layout.

    Reads ``name``, ``status``, ``decision`` (with ``mode``, ``reason``,
    ``used_manifest`` and ``forced``), ``notes``, ``outputs`` and ``summary``
    from ``out``. The notes, outputs and summary sections are printed only
    when the corresponding attribute is truthy.

    Each entry of ``out.outputs`` may be a mapping with ``"path"`` and
    ``"exists"`` keys or an object carrying attributes of the same names.
    """
    print(f"\n=== STEP: {out.name} ===")
    print("status:", out.status)

    decision = out.decision
    print("decision:", f"{decision.mode} — {decision.reason}")
    print("decision_meta:", f"used_manifest={decision.used_manifest} forced={decision.forced}")

    notes = out.notes
    if notes:
        print("notes:")
        for note in notes:
            print(" -", note)

    artifacts = out.outputs
    if artifacts:
        print("outputs:")
        for key, artifact in artifacts.items():
            try:
                # Mapping-style artifact.
                path, present = artifact.get("path"), artifact.get("exists")
                print(f" - {key}: {path} (exists={present})")
            except Exception:
                # Anything that fails the mapping access falls back to attributes.
                print(f" - {key}: {getattr(artifact, 'path', '')} (exists={getattr(artifact, 'exists', None)})")

    summary = out.summary
    if summary:
        print("summary:", summary)


In [7]:
# Step 1 — Raw + Dataset (or reuse)
#
# out1 stays None when the step is switched off via DO_STEP_1_DATASET.

out1 = None
if not DO_STEP_1_DATASET:
    print("Skipped (DO_STEP_1_DATASET=False)")
else:
    out1 = step_dataset(
        cfg,
        cfg_path=CFG_PATH,
        run_id=run_id,
        rec=rec,
        reuse_if_exists=REUSE_IF_EXISTS,
        force=FORCE_STEP_1_DATASET,
    )
    _print_outcome(out1)



=== STEP: dataset ===
status: ok
decision: compute — Generated raw+dataset artifacts via pipeline.run_all(...).
decision_meta: used_manifest=False forced=False
outputs:
 - run_dir: runs\RUN_2025 (exists=True)
 - raw_login_attempt: runs\RUN_2025\raw\login_attempt.parquet (exists=True)
 - raw_checkout_attempt: runs\RUN_2025\raw\checkout_attempt.parquet (exists=True)
 - dataset_train: runs\RUN_2025\dataset\login_attempt\train.parquet (exists=True)
 - dataset_time_eval: runs\RUN_2025\dataset\login_attempt\time_eval.parquet (exists=True)
 - dataset_user_holdout: runs\RUN_2025\dataset\login_attempt\user_holdout.parquet (exists=True)


In [8]:
# Step 1b — Labels + sanity checks (lightweight)
#
# Prints the label definition table, then the train split size and the mean
# of every `label_*` column in the train split.

from inkswarm_detectlab.synthetic.label_defs import as_markdown_table
from inkswarm_detectlab.io.tables import read_auto

print("\nLabel definitions (synthetic scenarios):")
print(as_markdown_table())

# rdir comes from the wiring check cell (Step 0E).
train_path = rdir / "dataset" / "login_attempt" / "train.parquet"
if not train_path.exists():
    print("Train split missing; run Step 1 first.")
else:
    df = read_auto(train_path)
    label_cols = [col for col in df.columns if col.startswith("label_")]
    print("\nTrain split size:", len(df))
    if not label_cols:
        print("No label_ columns found in dataset table (unexpected).")
    else:
        print("Label prevalence (train):")
        for col in sorted(label_cols):
            print(f" - {col}: {float(df[col].mean()):.4f}")



Label definitions (synthetic scenarios):
| label key | title | intent | how generated |
|---|---|---|---|
| `label_replicators` | Replicators | Automated or semi-automated high-volume login attempts (spray / scripted). | Assigned in the synthetic generator based on a per-row 'is_attacker' flag and an attack-campaign intensity bucket; deterministic given the generator seed. |
| `label_the_mule` | The Mule | Credential-stuffing / account takeover style attempts with stronger success bias. | Assigned in the synthetic generator from the same attacker campaign scaffold; deterministic given the generator seed. |
| `label_the_chameleon` | The Chameleon | Evasive attacker behavior designed to blend into normal traffic. | Assigned from the attacker campaign scaffold with higher 'stealth' characteristics; deterministic given the generator seed. |

Train split size: 1018018
Label prevalence (train):
 - label_benign: 0.9200
 - label_replicators: 0.0576
 - label_the_chameleon: 0.0405
 - label_the_

In [9]:
# Step 2 — Features (or reuse cache/artifacts)
#
# NOTE: step_features() signature has changed across patches. This cell adapts to what's installed
# by inspecting the signature and only passing supported kwargs.

import inspect

# Inspect the signature before the toggle check: the diagnostic print at the
# end of the cell runs unconditionally, so `sig` must be bound even when the
# step is skipped (binding it only inside the `if` raised NameError on a fresh
# kernel with DO_STEP_2_FEATURES=False).
sig = inspect.signature(step_features)
params = sig.parameters

out2 = None
if DO_STEP_2_FEATURES:
    kwargs = dict(
        cfg_path=CFG_PATH,
        run_id=run_id,
        rec=rec,
        reuse_if_exists=REUSE_IF_EXISTS,
        force=FORCE_STEP_2_FEATURES,
    )

    # Shared feature cache toggles (compat across versions): for each toggle,
    # pass it under the first alias (in priority order) that the installed
    # signature accepts; pass nothing when no alias is accepted.
    for aliases, value in (
        (("use_shared_feature_cache", "use_shared_cache", "shared_cache"), USE_SHARED_FEATURE_CACHE),
        (("write_shared_feature_cache", "write_shared_cache", "write_cache"), WRITE_SHARED_FEATURE_CACHE),
    ):
        supported = next((name for name in aliases if name in params), None)
        if supported is not None:
            kwargs[supported] = value

    out2 = step_features(cfg, **kwargs)
    _print_outcome(out2)

else:
    print("Skipped (DO_STEP_2_FEATURES=False)")

print("step_features signature:", sig)


  df[rname] = (df[name] / denom).fillna(0.0)
  df[ps] = sums[payment_col]
  df[pm] = (df[ps] / denom).fillna(0.0)
  df[cnt_name] = sums["_one"]
  df[name] = sums[oc]
  df[rname] = (df[name] / denom).fillna(0.0)
  df[name] = sums[oc]
  df[rname] = (df[name] / denom).fillna(0.0)
  df[name] = sums[oc]
  df[rname] = (df[name] / denom).fillna(0.0)
  df[name] = sums[oc]
  df[rname] = (df[name] / denom).fillna(0.0)
  df[ps] = sums[payment_col]
  df[pm] = (df[ps] / denom).fillna(0.0)
  df[cnt_name] = sums["_one"]
  df[name] = sums[oc]
  df[rname] = (df[name] / denom).fillna(0.0)
  df[name] = sums[oc]
  df[rname] = (df[name] / denom).fillna(0.0)
  df[name] = sums[oc]
  df[rname] = (df[name] / denom).fillna(0.0)
  df[name] = sums[oc]
  df[rname] = (df[name] / denom).fillna(0.0)
  df[ps] = sums[payment_col]
  df[pm] = (df[ps] / denom).fillna(0.0)
  df[cnt_name] = sums["_one"]
  df[name] = sums[oc]
  df[rname] = (df[name] / denom).fillna(0.0)
  df[name] = sums[oc]
  df[rname] = (df[name] / denom).


=== STEP: features ===
status: ok
decision: compute — Built features via features.runner.build_login_features_for_run(...).
decision_meta: used_manifest=False forced=False
outputs:
 - features_login: runs\RUN_2025\features\login_attempt\features.parquet (exists=True)
summary: {'cache_hit': False, 'cache_key': None}
step_features signature: (cfg: 'Any', *, cfg_path: 'Optional[Path]' = None, run_id: 'str', rec: 'Optional[StepRecorder]' = None, reuse_if_exists: 'bool' = True, force: 'bool' = False, use_shared_feature_cache: 'bool' = True, write_shared_feature_cache: 'bool' = True, use_cache: 'Optional[bool]' = None, write_cache: 'Optional[bool]' = None, use_shared_cache: 'Optional[bool]' = None, write_shared_cache: 'Optional[bool]' = None) -> 'StepResult'


In [None]:
# Step 3 — Baselines (or reuse artifacts)
#
# out3 stays None when the step is switched off via DO_STEP_3_BASELINES.

out3 = None
if DO_STEP_3_BASELINES:
    # Guard only the step call: the hint below is about existing baseline
    # outputs, so it should not fire for errors raised while printing the
    # outcome or reading metrics.json.
    try:
        out3 = step_baselines(
            cfg,
            cfg_path=CFG_PATH,
            run_id=run_id,
            rec=rec,
            reuse_if_exists=REUSE_IF_EXISTS,
            force=FORCE_STEP_3_BASELINES,
        )
    except FileExistsError:
        print("Baselines output already exists. Set FORCE_STEP_3_BASELINES=True to overwrite.")
        raise  # surface the original error after printing the hint

    _print_outcome(out3)

    # Show the "meta" section of metrics.json when the file is present.
    metrics_path = Path(cfg.paths.runs_dir) / run_id / "models" / "login_attempt" / "baselines" / "metrics.json"
    if metrics_path.exists():
        metrics = json.loads(metrics_path.read_text(encoding="utf-8"))
        print("\nBaseline summary (meta):")
        print(json.dumps(metrics.get("meta", {}), indent=2))
    else:
        print("\nmetrics.json not found (ok on partial runs).")
else:
    print("Skipped (DO_STEP_3_BASELINES=False)")


In [None]:
# Step 4 — Eval (slice + stability reports)
#
# After the step runs, reports whether each expected report file exists
# under the run directory.

out4 = None
if not DO_STEP_4_EVAL:
    print("Skipped (DO_STEP_4_EVAL=False)")
else:
    out4 = step_eval(
        cfg,
        cfg_path=CFG_PATH,
        run_id=run_id,
        rec=rec,
        reuse_if_exists=REUSE_IF_EXISTS,
        force=FORCE_STEP_4_EVAL,
    )
    _print_outcome(out4)

    for rel in (
        "reports/eval_slices_login_attempt.md",
        "reports/eval_stability_login_attempt.md",
    ):
        p = Path(cfg.paths.runs_dir) / run_id / rel
        state = "OK" if p.exists() else "missing"
        print(f" - {rel}: {state} ({p})")


In [None]:
# Step 5 — Export (UI summary + exec summary + UI bundle + handover + optional evidence)
#
# out5 stays None when the step is switched off via DO_STEP_5_EXPORT.

out5 = None
if not DO_STEP_5_EXPORT:
    print("Skipped (DO_STEP_5_EXPORT=False)")
else:
    out5 = step_export(
        cfg,
        cfg_path=CFG_PATH,
        run_id=run_id,
        rec=rec,
        reuse_if_exists=REUSE_IF_EXISTS,
        force=FORCE_STEP_5_EXPORT,
    )
    _print_outcome(out5)

    print("\nRun tree (after export):")
    run_root = Path(cfg.paths.runs_dir) / run_id
    print_run_tree(run_root)

    # Optional evidence bundle: best-effort on purpose. Any failure (including
    # the import) is reported and the cell carries on.
    try:
        from inkswarm_detectlab.share.evidence import export_evidence_bundle
        export_evidence_bundle(run_dir=run_root)
        print("Evidence bundle: OK")
    except Exception as exc:
        print("Evidence bundle skipped:", exc)


In [None]:
# Step 6 — Step summary table (shows latest attempt per step)

print(rec.to_markdown())

# Extra: quick artifact checks for RR
# Each label maps to the path, relative to the run directory, whose existence is reported.
rdir2 = Path(cfg.paths.runs_dir) / run_id
checks = {
    label: rdir2.joinpath(*parts).exists()
    for label, parts in {
        "EXEC_SUMMARY.md": ("reports", "EXEC_SUMMARY.md"),
        "summary.md": ("reports", "summary.md"),
        "mvp_handover.md": ("reports", "mvp_handover.md"),
        "ui_bundle_dir": ("share", "reports"),
    }.items()
}
print("\nRR artifact checks:")
for label, present in checks.items():
    print(f" - {label}: {'OK' if present else 'missing'}")
