In [1]:
# STEP 1 — PLAN + ENVIRONMENT SANITY (pairs, config, C1 discovery, WF window)
# ------------------------------------------------------------------------------------
from pathlib import Path
from typing import Optional
import sys, re, json, inspect
from datetime import date
import pandas as pd

# ---------- PLAN (printable) ----------
PLAN = """
C1-Only Indicator Selection Plan (v1.9.8)

PHASE A — Discovery & Sanity
  A1. Verify environment: repo paths, config, modules present.
  A2. Enumerate data pairs (from CSVs); confirm readable.
  A3. Discover all confirmation indicators (functions starting with c1_ in confirmation_funcs.py).
  A4. Compute common date window across pairs for later walk-forward.

PHASE B — Baseline Benchmark (Defaults Only)
  B1. Run each C1 with default params across ALL pairs, C1-only scope (everything else OFF).
  B2. Parse metrics → defaults leaderboard (ROI%, MaxDD%, Expectancy, Trades).
  B3. Filter out zero/sparse-trade runs; spot any obviously broken indicators.

PHASE C — Parameter Sweeps (C1-Only)
  C1. Use sweeps.yaml: run grids per C1, compute composite score (ROI↑, DD↓, Exp↑, min-trades floor).
  C2. Produce c1_batch_results.csv and rank; select top 3–5 candidates.

PHASE D — Robustness Validation
  D1. Walk-forward each finalist (Train=24m, Test=6m) over common window.
  D2. Review OOS ROI%, OOS MaxDD%, hit-rate stability across folds.
  D3. (Optional) Monte Carlo to understand path-risk and confidence bounds.

PHASE E — Decision
  E1. Choose the current best C1 (indicator+params). Lock for next system phase (baseline/C2/exit later).
"""
print(PLAN.strip())
print("\n--- STEP 1: Environment Sanity ---")

# ---------- 1) Set paths ----------
PROJECT_ROOT = Path("/Users/keanupanapa/Notebooks/Forex_Backtester")
DATA_DIRS    = [PROJECT_ROOT / "data" / "daily"]
RESULTS_ROOT = PROJECT_ROOT / "results"
CONFIG_PATH  = PROJECT_ROOT / "config.yaml"

print(f"PROJECT_ROOT: {PROJECT_ROOT}")
print(f"RESULTS_ROOT: {RESULTS_ROOT}")
print(f"CONFIG_PATH : {CONFIG_PATH}")

assert PROJECT_ROOT.exists(), f"[FAIL] Project root not found: {PROJECT_ROOT}"
RESULTS_ROOT.mkdir(parents=True, exist_ok=True)

if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# ---------- 2) Load base config ----------
import yaml
try:
    from backtester import load_config as _load_config
    base_cfg = _load_config(CONFIG_PATH)
except Exception:
    with open(CONFIG_PATH, "r") as f:
        base_cfg = yaml.safe_load(f)
print("\n[OK] Base config loaded. Top-level keys:", list(base_cfg.keys()))

# ---------- 3) Import backtest functions (for later steps) ----------
try:
    from backtester import run_backtest as _rb
    rb_sig = str(inspect.signature(_rb)); rb_ok = True
except Exception as e:
    rb_ok = False; rb_sig = f"Unavailable ({e})"

try:
    from backtester import run_backtest_walk_forward as _wf
    wf_sig = str(inspect.signature(_wf)); wf_ok = True
except Exception as e:
    try:
        from walk_forward import run_backtest_walk_forward as _wf
        wf_sig = str(inspect.signature(_wf)); wf_ok = True
    except Exception as e2:
        wf_ok = False; wf_sig = f"Unavailable ({e2})"

print(f"[CHECK] run_backtest signature: {rb_sig}")
print(f"[CHECK] run_backtest_walk_forward signature: {wf_sig}")

# ---------- 4) Find pairs from CSVs ----------
def find_pairs():
    pairs = []
    for d in DATA_DIRS + [PROJECT_ROOT / "data", PROJECT_ROOT / "data" / "daily", Path(".")]:
        if d.exists():
            for p in sorted(d.glob("*.csv")):
                name = p.stem
                if re.fullmatch(r"[A-Z]{3}_[A-Z]{3}", name):
                    pairs.append(name)
    return sorted(set(pairs))

PAIRS = find_pairs()
assert PAIRS, "[FAIL] No pairs found. Put CSVs (EUR_USD-style) under data/daily/ or next to this notebook."
print(f"[OK] Found {len(PAIRS)} pairs: {PAIRS}")

# ---------- 5) Discover C1 indicators ----------
import importlib
mod = None; last_err = None
for name in ("indicators.confirmation_funcs", "confirmation_funcs"):
    try:
        mod = importlib.import_module(name); break
    except Exception as e:
        last_err = e
assert mod is not None, f"[FAIL] Couldn't import confirmation_funcs: {last_err}"

def discover_c1_functions(module):
    c1s = []
    for func_name, obj in inspect.getmembers(module, inspect.isfunction):
        if func_name.startswith("c1_"):
            ind_name = func_name[len("c1_"):]
            c1s.append((ind_name, func_name))
    return sorted(c1s)

C1_FUNCS = discover_c1_functions(mod)
assert C1_FUNCS, "[FAIL] No c1_* functions found in confirmation_funcs.py"
print(f"[OK] Discovered {len(C1_FUNCS)} C1 indicators:")
print("    " + ", ".join(ind for ind, _fn in C1_FUNCS))

# ---------- 6) Compute common WF window across all pairs ----------
def _find_csv_for(pair: str) -> Optional[Path]:
    for d in DATA_DIRS + [PROJECT_ROOT / "data", PROJECT_ROOT / "data" / "daily", Path(".")]:
        p = d / f"{pair}.csv"
        if p.exists():
            return p
    return None

def _read_dates(csv_path: Path):
    df = pd.read_csv(csv_path, nrows=16)
    cols = [c.lower() for c in df.columns]
    for cand in ("date","timestamp","time","datetime"):
        if cand in cols:
            col = df.columns[cols.index(cand)]
            full = pd.read_csv(csv_path, usecols=[col], parse_dates=[col])
            s = pd.to_datetime(full[col], errors="coerce").dropna()
            return s.min().date(), s.max().date()
    col = df.columns[0]
    full = pd.read_csv(csv_path, usecols=[col], parse_dates=[col])
    s = pd.to_datetime(full[col], errors="coerce").dropna()
    return s.min().date(), s.max().date()

bounds, missing = [], []
for pair in PAIRS:
    p = _find_csv_for(pair)
    if not p: missing.append(pair); continue
    try:
        lo, hi = _read_dates(p)
        bounds.append((pair, lo, hi))
    except Exception as e:
        print(f"[WARN] Could not read dates for {pair}: {e}")

if missing:
    print("[WARN] Missing CSV for:", missing)

latest_start = max(b[1] for b in bounds)
earliest_end = min(b[2] for b in bounds)
WF_START, WF_END = latest_start, earliest_end
print(f"[OK] Common WF window across {len(bounds)} pairs: {WF_START} → {WF_END}")

# ---------- 7) Step 1 Summary ----------
print("\n=== STEP 1 SUMMARY ===")
print(f"Pairs ({len(PAIRS)}): {PAIRS}")
print(f"C1 indicators discovered: {len(C1_FUNCS)}")
print(f"WF window intersection : {WF_START} → {WF_END}")
print(f"run_backtest available : {rb_ok} | signature: {rb_sig}")
print(f"walk_forward available : {wf_ok} | signature: {wf_sig}")

print("""
NEXT:
Paste this output here and say "ready for Step 2".
In Step 2, we'll run a C1-only defaults pass over ALL indicators and compile the first leaderboard.
""")


C1-Only Indicator Selection Plan (v1.9.8)

PHASE A — Discovery & Sanity
  A1. Verify environment: repo paths, config, modules present.
  A2. Enumerate data pairs (from CSVs); confirm readable.
  A3. Discover all confirmation indicators (functions starting with c1_ in confirmation_funcs.py).
  A4. Compute common date window across pairs for later walk-forward.

PHASE B — Baseline Benchmark (Defaults Only)
  B1. Run each C1 with default params across ALL pairs, C1-only scope (everything else OFF).
  B2. Parse metrics → defaults leaderboard (ROI%, MaxDD%, Expectancy, Trades).
  B3. Filter out zero/sparse-trade runs; spot any obviously broken indicators.

PHASE C — Parameter Sweeps (C1-Only)
  C1. Use sweeps.yaml: run grids per C1, compute composite score (ROI↑, DD↓, Exp↑, min-trades floor).
  C2. Produce c1_batch_results.csv and rank; select top 3–5 candidates.

PHASE D — Robustness Validation
  D1. Walk-forward each finalist (Train=24m, Test=6m) over common window.
  D2. Review OOS ROI%,

In [2]:
# STEP 2 — Run ALL C1s with default params (C1-only) + Build Defaults Leaderboard
# ------------------------------------------------------------------------------------
from pathlib import Path
import sys, re, json, inspect, hashlib
from datetime import date
import pandas as pd
import numpy as np
import yaml

# ---------- Guards / fallbacks if STEP 1 wasn't run ----------
if 'PROJECT_ROOT' not in globals(): PROJECT_ROOT = Path("/Users/keanupanapa/Notebooks/Forex_Backtester")
if 'RESULTS_ROOT' not in globals(): RESULTS_ROOT = PROJECT_ROOT / "results"
RESULTS_ROOT.mkdir(parents=True, exist_ok=True)
if 'CONFIG_PATH' not in globals():  CONFIG_PATH  = PROJECT_ROOT / "config.yaml"
if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT))

# Load base config if missing
if 'base_cfg' not in globals():
    try:
        from backtester import load_config as _load_config
        base_cfg = _load_config(CONFIG_PATH)
    except Exception:
        base_cfg = yaml.safe_load(open(CONFIG_PATH, "r"))

# Discover pairs if missing
def _find_pairs():
    pairs = []
    for d in [PROJECT_ROOT / "data" / "daily", PROJECT_ROOT / "data", Path(".")]:
        if d.exists():
            for p in sorted(d.glob("*.csv")):
                name = p.stem
                if re.fullmatch(r"[A-Z]{3}_[A-Z]{3}", name):
                    pairs.append(name)
    return sorted(set(pairs))
if 'PAIRS' not in globals() or not PAIRS:
    PAIRS = _find_pairs()
    print(f"[INFO] Re-discovered pairs: {PAIRS}")

# Import confirmation funcs / backtester
import importlib, inspect as _inspect
try:
    mod
except NameError:
    mod = None
    for name in ("indicators.confirmation_funcs", "confirmation_funcs"):
        try:
            mod = importlib.import_module(name); break
        except Exception as e:
            last_err = e
    assert mod is not None, f"Couldn't import confirmation_funcs: {last_err}"

if 'C1_FUNCS' not in globals():
    C1_FUNCS = sorted([(fn[len("c1_"):], fn) for fn, obj in _inspect.getmembers(mod, _inspect.isfunction) if fn.startswith("c1_")])

try:
    from backtester import run_backtest as _run_backtest
except Exception:
    _run_backtest = None

# ---------- Helpers ----------
def deep_copy(d): return yaml.safe_load(yaml.safe_dump(d))

def _inject_indicator_params(ip: dict, short: str, params: dict) -> dict:
    for k in (f"indicators.confirmation_funcs.c1_{short}",
              f"indicators.confirmation_funcs.{short}",
              f"c1_{short}", short):
        ip[k] = dict(params)
    return ip

def patch_c1_only(cfg, pairs, c1_name, c1_params=None):
    c1_params = c1_params or {}
    cfg = deep_copy(cfg)
    cfg["pairs"] = list(pairs)
    inds = (cfg.get("indicators") or {}).copy()
    inds["c1"] = c1_name
    inds["use_c2"] = inds["use_baseline"] = inds["use_volume"] = inds["use_exit"] = False
    for k in ("c2","baseline","volume","exit"): inds.pop(k, None)
    cfg["indicators"] = inds
    rules = (cfg.get("rules") or {}).copy()
    rules.update({"one_candle_rule": False, "pullback_rule": False, "allow_baseline_as_catalyst": False})
    cfg["rules"] = rules
    cfg.setdefault("spreads", {})["enabled"] = False
    cfg.setdefault("dbcvix", {})["enabled"] = False
    cfg.setdefault("tracking", {}).update({"track_win_loss_scratch": True, "track_roi": True, "track_drawdown": True})
    ip = dict(cfg.get("indicator_params") or {})
    cfg["indicator_params"] = _inject_indicator_params(ip, c1_name, c1_params)
    return cfg

def short_hash(d: dict) -> str:
    s = json.dumps(d or {}, sort_keys=True); return hashlib.md5(s.encode()).hexdigest()[:8]

def infer_default_params(fn):
    params = {}
    sig = _inspect.signature(fn)
    for p in sig.parameters.values():
        if p.name in ("df", "signal_col"): continue
        if p.kind in (p.VAR_KEYWORD, p.VAR_POSITIONAL): continue
        if p.default is not _inspect._empty: params[p.name] = p.default
    return params

# Robust summary parser + improved fallbacks
DASH = r"[--–—]"  # hyphen variants
def parse_summary_file(path: Path) -> dict:
    txt = path.read_text() if path.exists() else ""
    def grab(pat, cast=float, default=None, flags=re.IGNORECASE):
        m = re.search(pat, txt, flags); 
        if not m: return default
        try: return cast(m.group(1))
        except Exception: return default
    d = {
        "total_trades": grab(r"Total\s+Trades\s*:\s*([0-9]+)", int, 0),
        "wins":         grab(r"Wins\s*:\s*([0-9]+)", int, 0),
        "losses":       grab(r"Losses\s*:\s*([0-9]+)", int, 0),
        "scratches":    grab(r"Scratches\s*:\s*([0-9]+)", int, 0),
        "win_pct_ns":   grab(rf"Win%\s*\(non{DASH}scratch\)\s*:\s*([0-9.]+)", float, None),
        "loss_pct_ns":  grab(rf"Loss%\s*\(non{DASH}scratch\)\s*:\s*([0-9.]+)", float, None),
        "roi_dollars":  grab(r"ROI\s*\(\$\)\s*:\s*([-0-9.]+)", float, None),
        "roi_pct":      grab(r"ROI\s*\(%\)\s*:\s*([-0-9.]+)", float, None),
        "max_dd_pct":   grab(r"(?:Max\s+DD|Max\s+Drawdown)\s*\(%\)\s*:\s*([-0-9.]+)", float, None),
        "expectancy":   grab(r"Expectancy\s*:\s*([-0-9.]+)", float, None),
    }
    ns = max(d["total_trades"] - d["scratches"], 0)
    if d["win_pct_ns"] is None and ns > 0:
        d["win_pct_ns"]  = round(100.0 * d["wins"]   / ns, 4)
    if d["loss_pct_ns"] is None and ns > 0:
        d["loss_pct_ns"] = round(100.0 * d["losses"] / ns, 4)
    return d

def fallback_metrics_from_files(run_dir: Path, metrics: dict) -> dict:
    # 1) MaxDD from equity_curve.csv
    eq = run_dir / "equity_curve.csv"
    if metrics.get("max_dd_pct") is None and eq.exists():
        try:
            df = pd.read_csv(eq)
            col = "equity" if "equity" in df.columns else ("pnl_realized_cum" if "pnl_realized_cum" in df.columns else None)
            if col is not None:
                s = df[col].astype(float)
                peak = s.cummax().replace(0, np.nan)
                dd = (s / peak - 1.0) * 100.0
                metrics["max_dd_pct"] = float(np.nanmin(dd.values))
        except Exception:
            pass
    # 2) Expectancy from trades.csv
    if metrics.get("expectancy") is None:
        tr = run_dir / "trades.csv"
        if tr.exists():
            try:
                df = pd.read_csv(tr)
                pnl_col = "pnl_realized" if "pnl_realized" in df.columns else ("pnl" if "pnl" in df.columns else None)
                if pnl_col:
                    metrics["expectancy"] = float(df[pnl_col].astype(float).mean())
            except Exception:
                pass
    return metrics

# ---------- Runner ----------
RUN_TAG = f"c1_defaults_{date.today().isoformat()}"
ROOT_RUN_DIR = RESULTS_ROOT / "results_history" / RUN_TAG
ROOT_RUN_DIR.mkdir(parents=True, exist_ok=True)

def run_c1_default(ind_name: str):
    fn = getattr(mod, f"c1_{ind_name}")
    params = infer_default_params(fn)
    cfg = patch_c1_only(base_cfg, PAIRS, ind_name, params)

    rid = f"{ind_name}__{short_hash(params)}"
    outdir = ROOT_RUN_DIR / rid; outdir.mkdir(parents=True, exist_ok=True)

    snap_path = outdir / f"config_c1only_{ind_name}.yaml"
    with open(snap_path, "w") as f: yaml.safe_dump(cfg, f, sort_keys=False)

    print(f"▶ Running {ind_name} params={params} → {outdir}")
    try:
        if _run_backtest is None: raise TypeError("run_backtest unavailable")
        _run_backtest(cfg, results_dir=outdir)
    except TypeError:
        from backtester import run_backtest as rb_alt
        rb_alt(str(snap_path), results_dir=outdir)

    trades, summ, eq = outdir / "trades.csv", outdir / "summary.txt", outdir / "equity_curve.csv"
    print("   artifacts:",
          ("trades.csv ✅" if trades.exists() else "trades.csv ❌"),
          ("summary.txt ✅" if summ.exists() else "summary.txt ❌"),
          ("equity_curve.csv ✅" if eq.exists() else "equity_curve.csv ❌"))
    return ind_name, params, outdir

# ---------- Execute all defaults ----------
run_dirs = []
for ind_name, _fn in C1_FUNCS:
    try:
        run_dirs.append(run_c1_default(ind_name))
    except Exception as e:
        print(f"❌ {ind_name} failed: {e}")

print(f"\nCompleted default runs: {len(run_dirs)}")

# ---------- Build defaults leaderboard ----------
rows = []
for ind_name, params, outdir in run_dirs:
    met = parse_summary_file(outdir / "summary.txt")
    met = fallback_metrics_from_files(outdir, met)
    rows.append({"indicator": ind_name, "params": json.dumps(params, sort_keys=True), **met})

df_defaults = pd.DataFrame(rows)
df_defaults = df_defaults[df_defaults["total_trades"] > 0].copy()

sort_cols = [c for c in ["roi_pct", "max_dd_pct", "expectancy"] if c in df_defaults.columns]
asc = [False, True, False][:len(sort_cols)]
df_defaults = df_defaults.sort_values(sort_cols, ascending=asc)

leader_path = RESULTS_ROOT / "c1_leaderboard_defaults.csv"
df_defaults.to_csv(leader_path, index=False)

print("\n=== DEFAULTS LEADERBOARD (Top 20) ===")
display(df_defaults.head(20))
print("\nSaved defaults leaderboard:", leader_path)

print("\nNEXT:\nPaste the last ~30 lines of the output (including the leaderboard head). Then say \"ready for Step 3\".")


▶ Running aroon params={'period': 25} → /Users/keanupanapa/Notebooks/Forex_Backtester/results/results_history/c1_defaults_2025-08-22/aroon__acab4c09
ℹ️  DBCVIX disabled or not loaded (series=None). Risk filter will not trigger.
ℹ️  DBCVIX config: {'enabled': None, 'mode': None, 'threshold': None, 'reduce_risk_to': None, 'source': None}
✅ Backtest complete. Results saved to '/Users/keanupanapa/Notebooks/Forex_Backtester/results/results_history/c1_defaults_2025-08-22/aroon__acab4c09'
   artifacts: trades.csv ✅ summary.txt ✅ equity_curve.csv ✅
▶ Running aso params={'period': 14} → /Users/keanupanapa/Notebooks/Forex_Backtester/results/results_history/c1_defaults_2025-08-22/aso__57678027
ℹ️  DBCVIX disabled or not loaded (series=None). Risk filter will not trigger.
ℹ️  DBCVIX config: {'enabled': None, 'mode': None, 'threshold': None, 'reduce_risk_to': None, 'source': None}
✅ Backtest complete. Results saved to '/Users/keanupanapa/Notebooks/Forex_Backtester/results/results_history/c1_default

Unnamed: 0,indicator,params,total_trades,wins,losses,scratches,win_pct_ns,loss_pct_ns,roi_dollars,roi_pct,max_dd_pct,expectancy
1,aso,"{""period"": 14}",1239,524,323,392,61.8654,38.1346,-9517.97,-95.18,-96.14596,-7.681976
13,ehlers_deli,"{""period"": 15}",3480,2015,1206,259,62.5582,37.4418,-9941.82,-99.42,-106.757414,-2.856845
33,price_momentum_oscillator,"{""long"": 26, ""short"": 12}",3103,1763,1018,322,63.3945,36.6055,-9948.14,-99.48,-105.333786,-3.205974
47,ttf,"{""period"": 20}",2378,1364,876,138,60.8929,39.1071,-9959.14,-99.59,-102.373019,-4.188033
30,metro_advanced,"{""period"": 14}",3340,1916,1179,245,61.9063,38.0937,-9960.98,-99.61,-101.598937,-2.982328
34,schaff_trend_cycle,"{""cycle"": 10, ""long"": 50, ""short"": 23}",4685,2668,1519,498,63.721,36.279,-9991.73,-99.92,-100.499677,-2.132706
6,cyber_cycle,"{""period"": 10}",4595,2560,1556,479,62.1963,37.8037,-9994.45,-99.94,-101.798973,-2.17507
36,smooth_step,"{""period"": 10}",4595,2560,1556,479,62.1963,37.8037,-9994.45,-99.94,-101.798973,-2.17507
14,ehlers_eot,"{""period"": 10}",4673,2531,1529,613,62.3399,37.6601,-9997.44,-99.97,-104.477977,-2.139405
5,coral,"{""period"": 21}",7329,3002,1350,2977,68.9798,31.0202,-9999.04,-99.99,-107.059318,-1.364311



Saved defaults leaderboard: /Users/keanupanapa/Notebooks/Forex_Backtester/results/c1_leaderboard_defaults.csv

NEXT:
Paste the last ~30 lines of the output (including the leaderboard head). Then say "ready for Step 3".


In [4]:
# STEP 3 — C1-Only Parameter Sweeps + Leaderboard (robust to missing sweeps.yaml)
# ------------------------------------------------------------------------------------
from pathlib import Path
import sys, re, json, inspect, hashlib, itertools, math
from datetime import date
import pandas as pd
import numpy as np
import yaml

# ---------- Guards ----------
if 'PROJECT_ROOT' not in globals(): PROJECT_ROOT = Path("/Users/keanupanapa/Notebooks/Forex_Backtester")
if 'RESULTS_ROOT' not in globals(): RESULTS_ROOT = PROJECT_ROOT / "results"
RESULTS_ROOT.mkdir(parents=True, exist_ok=True)
if 'CONFIG_PATH' not in globals():  CONFIG_PATH  = PROJECT_ROOT / "config.yaml"
if 'PAIRS' not in globals() or not PAIRS:
    def _find_pairs():
        pairs = []
        for d in [PROJECT_ROOT / "data" / "daily", PROJECT_ROOT / "data", Path(".")]:
            if d.exists():
                for p in sorted(d.glob("*.csv")):
                    name = p.stem
                    if re.fullmatch(r"[A-Z]{3}_[A-Z]{3}", name): pairs.append(name)
        return sorted(set(pairs))
    PAIRS = _find_pairs()
if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT))

# Load base cfg
if 'base_cfg' not in globals():
    try:
        from backtester import load_config as _load_config
        base_cfg = _load_config(CONFIG_PATH)
    except Exception:
        base_cfg = yaml.safe_load(open(CONFIG_PATH, "r"))

# Import confirmation funcs
import importlib
try: mod
except NameError:
    mod = None
    for name in ("indicators.confirmation_funcs", "confirmation_funcs"):
        try: mod = importlib.import_module(name); break
        except Exception: pass
    assert mod is not None, "Could not import confirmation_funcs module."
if 'C1_FUNCS' not in globals() or not C1_FUNCS:
    C1_FUNCS = sorted([(fn[len("c1_"):], fn) for fn, obj in inspect.getmembers(mod, inspect.isfunction) if fn.startswith("c1_")])

# Backtester entry
try:
    from backtester import run_backtest as _run_backtest
except Exception:
    _run_backtest = None

# ---------- Helpers (same contracts as Step 2) ----------
def deep_copy(d): return yaml.safe_load(yaml.safe_dump(d))
def _inject_indicator_params(ip: dict, short: str, params: dict) -> dict:
    for k in (f"indicators.confirmation_funcs.c1_{short}",
              f"indicators.confirmation_funcs.{short}",
              f"c1_{short}", short):
        ip[k] = dict(params)
    return ip
def patch_c1_only(cfg, pairs, c1_name, c1_params=None):
    c1_params = c1_params or {}
    cfg = deep_copy(cfg); cfg["pairs"] = list(pairs)
    inds = (cfg.get("indicators") or {}).copy()
    inds["c1"] = c1_name
    inds["use_c2"] = inds["use_baseline"] = inds["use_volume"] = inds["use_exit"] = False
    for k in ("c2","baseline","volume","exit"): inds.pop(k, None)
    cfg["indicators"] = inds
    rules = (cfg.get("rules") or {}).copy()
    rules.update({"one_candle_rule": False, "pullback_rule": False, "allow_baseline_as_catalyst": False})
    cfg["rules"] = rules
    cfg.setdefault("spreads", {})["enabled"] = False
    cfg.setdefault("dbcvix", {})["enabled"] = False
    cfg.setdefault("tracking", {}).update({"track_win_loss_scratch": True, "track_roi": True, "track_drawdown": True})
    ip = dict(cfg.get("indicator_params") or {})
    cfg["indicator_params"] = _inject_indicator_params(ip, c1_name, c1_params)
    return cfg
def short_hash(d: dict) -> str:
    s = json.dumps(d or {}, sort_keys=True); return hashlib.md5(s.encode()).hexdigest()[:8]
def infer_default_params(fn):
    params = {}
    sig = inspect.signature(fn)
    for p in sig.parameters.values():
        if p.name in ("df", "signal_col"): continue
        if p.kind in (p.VAR_KEYWORD, p.VAR_POSITIONAL): continue
        if p.default is not inspect._empty: params[p.name] = p.default
    return params

# Summary parsing & fallbacks
DASH = r"[--–—]"
def parse_summary_file(path: Path) -> dict:
    txt = path.read_text() if path.exists() else ""
    def grab(pattern, cast=float, default=None, flags=re.IGNORECASE):
        m = re.search(pattern, txt, flags)
        if not m: return default
        try: return cast(m.group(1))
        except Exception: return default
    d = {
        "total_trades": grab(r"Total\s+Trades\s*:\s*([0-9]+)", int, 0),
        "wins":         grab(r"Wins\s*:\s*([0-9]+)", int, 0),
        "losses":       grab(r"Losses\s*:\s*([0-9]+)", int, 0),
        "scratches":    grab(r"Scratches\s*:\s*([0-9]+)", int, 0),
        "win_pct_ns":   grab(rf"Win%\s*\(non{DASH}scratch\)\s*:\s*([0-9.]+)", float, None),
        "loss_pct_ns":  grab(rf"Loss%\s*\(non{DASH}scratch\)\s*:\s*([0-9.]+)", float, None),
        "roi_dollars":  grab(r"ROI\s*\(\$\)\s*:\s*([-0-9.]+)", float, None),
        "roi_pct":      grab(r"ROI\s*\(%\)\s*:\s*([-0-9.]+)", float, None),
        "max_dd_pct":   grab(r"(?:Max\s+DD|Max\s+Drawdown)\s*\(%\)\s*:\s*([-0-9.]+)", float, None),
        "expectancy":   grab(r"Expectancy\s*:\s*([-0-9.]+)", float, None),
    }
    ns = max(d["total_trades"] - d["scratches"], 0)
    if d["win_pct_ns"] is None and ns > 0:
        d["win_pct_ns"]  = round(100.0 * d["wins"]   / ns, 4)
    if d["loss_pct_ns"] is None and ns > 0:
        d["loss_pct_ns"] = round(100.0 * d["losses"] / ns, 4)
    return d

def fallback_metrics_from_files(run_dir: Path, metrics: dict) -> dict:
    eq = run_dir / "equity_curve.csv"
    if metrics.get("max_dd_pct") is None and eq.exists():
        try:
            df = pd.read_csv(eq)
            col = "equity" if "equity" in df.columns else ("pnl_realized_cum" if "pnl_realized_cum" in df.columns else None)
            if col:
                s = df[col].astype(float); peak = s.cummax().replace(0, np.nan)
                dd = (s / peak - 1.0) * 100.0
                metrics["max_dd_pct"] = float(np.nanmin(dd.values))
        except Exception: pass
    if metrics.get("expectancy") is None:
        tr = run_dir / "trades.csv"
        if tr.exists():
            try:
                df = pd.read_csv(tr)
                pnl_col = "pnl_realized" if "pnl_realized" in df.columns else ("pnl" if "pnl" in df.columns else None)
                if pnl_col:
                    metrics["expectancy"] = float(df[pnl_col].astype(float).mean())
            except Exception: pass
    return metrics

# ---------- Sweep spec: load file or auto-grid ----------
SWEEPS_PATH = PROJECT_ROOT / "sweeps.yaml"
have_sweeps = SWEEPS_PATH.exists()
sweeps = yaml.safe_load(SWEEPS_PATH.read_text()) if have_sweeps else {}

MIN_TRADES_FLOOR = 50
MAX_GRID_PER_IND = 6

SCORING = (sweeps.get("scoring") if sweeps else {}) or {}
W_ROI   = float(SCORING.get("roi_pct_w", 1.0))
W_DD    = float(SCORING.get("max_dd_w", 0.7))
W_EXP   = float(SCORING.get("expectancy_w", 0.2)) if "expectancy_w" in SCORING else 0.2
P_TRGAP = float(SCORING.get("trades_penalty_w", 0.0))
ALLOW = set(((sweeps.get("allowlist") or {}).get("c1") or [])) if sweeps else set()
BLOCK = set(((sweeps.get("blocklist")  or {}).get("c1") or [])) if sweeps else set()
DEFAULT_ROLE_PARAMS = ((sweeps.get("roles") or {}).get("c1") or []) if sweeps else []
ROLE_PARAMS_BY_NAME = {r["name"]: (r.get("params") or {}) for r in DEFAULT_ROLE_PARAMS if isinstance(r, dict) and "name" in r}
DEFAULT_PARAM_HINTS = ((sweeps.get("default_params") or {}).get("c1") or {}) if sweeps else {}

def _heuristic_values(name, default):
    try:
        if isinstance(default, bool): return [default]
        if isinstance(default, (int, float)):
            if name in ("gamma", "smoothing"):
                base = float(default)
                pool = sorted(set([round(v,3) for v in [0.1, min(max(base,0.0),1.0), 0.3, 0.5, 0.7, 0.9]]))
                return pool[:3] if len(pool) > 3 else pool
            if name in ("multiplier",):
                cand = sorted(set([2.0, 3.0, 4.0, float(default)]))
                return cand[:3]
            base = int(round(default))
            vals = sorted(set([max(3, base//2), base, max(3, int(round(base*1.5))), max(4, base*2)]))
            return vals[:3]
        return [default]
    except Exception:
        return [default]

def _grid_for_indicator(ind_name, fn):
    defaults = infer_default_params(fn)
    hints = dict(DEFAULT_PARAM_HINTS); hints.update(ROLE_PARAMS_BY_NAME.get(ind_name, {}))
    value_lists = {}
    for pname, pdef in defaults.items():
        if pname in ("df","signal_col"): continue
        if pname in hints:
            v = hints[pname]; value_lists[pname] = v if isinstance(v, list) else [v]
        else:
            value_lists[pname] = _heuristic_values(pname, pdef)
    if "short" in value_lists and "long" in value_lists:
        shorts = sorted(set(int(x) for x in value_lists["short"]))
        longs  = sorted(set(int(x) for x in value_lists["long"]))
        combos = [{"short": s, "long": l, **({"signal": value_lists["signal"][0]} if "signal" in value_lists else {})}
                  for s in shorts for l in longs if s < l]
        if not combos:
            combos = [{"short": defaults.get("short", 12), "long": defaults.get("long", 26)}]
        grid = combos
    else:
        keys = list(value_lists.keys())
        grid = [dict(zip(keys, vs)) for vs in itertools.product(*[value_lists[k] for k in keys])] if keys else [{}]
    grid = grid[:MAX_GRID_PER_IND]
    if defaults and defaults not in grid:
        grid = [defaults] + grid
        grid = grid[:MAX_GRID_PER_IND]
    if ALLOW and ind_name not in ALLOW: return []
    if ind_name in BLOCK: return []
    return grid

# ---------- Runner ----------
RUN_TAG = f"c1_sweeps_{date.today().isoformat()}"
ROOT_SWEEP_DIR = RESULTS_ROOT / "results_history" / RUN_TAG
ROOT_SWEEP_DIR.mkdir(parents=True, exist_ok=True)

def run_c1(ind_name: str, params: dict):
    cfg = patch_c1_only(base_cfg, PAIRS, ind_name, params)
    rid = f"{ind_name}__{short_hash(params)}"
    outdir = ROOT_SWEEP_DIR / rid; outdir.mkdir(parents=True, exist_ok=True)
    snap_path = outdir / f"config_c1only_{ind_name}.yaml"
    with open(snap_path, "w") as f: yaml.safe_dump(cfg, f, sort_keys=False)
    print(f"▶ Running {ind_name} params={params} → {outdir}")
    try:
        if _run_backtest is None: raise TypeError("run_backtest unavailable")
        _run_backtest(cfg, results_dir=outdir)
    except TypeError:
        from backtester import run_backtest as rb_alt
        rb_alt(str(snap_path), results_dir=outdir)
    trades, summ, eq = outdir / "trades.csv", outdir / "summary.txt", outdir / "equity_curve.csv"
    print("   artifacts:",
          ("trades.csv ✅" if trades.exists() else "trades.csv ❌"),
          ("summary.txt ✅" if summ.exists() else "summary.txt ❌"),
          ("equity_curve.csv ✅" if eq.exists() else "equity_curve.csv ❌"))
    return outdir

# Execute sweeps
sweep_runs = []
for ind_name, fn_name in C1_FUNCS:
    fn = getattr(mod, f"c1_{ind_name}")
    grid = _grid_for_indicator(ind_name, fn)
    if not grid:
        print(f"… skipping {ind_name} (not in allowlist or grid empty)."); continue
    for paramset in grid:
        try:
            outdir = run_c1(ind_name, paramset)
            sweep_runs.append((ind_name, paramset, outdir))
        except Exception as e:
            print(f"❌ {ind_name} {paramset} failed: {e}")

print(f"\nCompleted sweep runs: {len(sweep_runs)}")

# ---------- Consolidate & score ----------
rows = []
for ind_name, params, outdir in sweep_runs:
    met = parse_summary_file(outdir / "summary.txt")
    met = fallback_metrics_from_files(outdir, met)
    rows.append({"indicator": ind_name, "params": json.dumps(params, sort_keys=True), **met})

df_sweeps = pd.DataFrame(rows)
df_sweeps = df_sweeps[df_sweeps["total_trades"] >= MIN_TRADES_FLOOR].copy()

df_sweeps["roi_pct_f"]    = df_sweeps["roi_pct"].fillna(-1e6)
df_sweeps["max_dd_pct_f"] = df_sweeps["max_dd_pct"].fillna(0.0)
df_sweeps["expectancy_f"] = df_sweeps["expectancy"].fillna(0.0)

df_sweeps["trades_gap"]  = (MIN_TRADES_FLOOR - df_sweeps["total_trades"]).clip(lower=0)
df_sweeps["composite_score"] = (
    W_ROI * df_sweeps["roi_pct_f"]
    - W_DD * df_sweeps["max_dd_pct_f"].abs()
    + W_EXP * df_sweeps["expectancy_f"]
    - P_TRGAP * df_sweeps["trades_gap"]
)

df_sweeps = df_sweeps.sort_values(["composite_score","roi_pct_f","max_dd_pct_f"], ascending=[False, False, True])

out_csv = RESULTS_ROOT / "c1_batch_results.csv"
keep_cols = ["indicator","params","total_trades","wins","losses","scratches",
             "win_pct_ns","loss_pct_ns","roi_dollars","roi_pct","max_dd_pct","expectancy","composite_score"]
if not df_sweeps.empty: df_sweeps[keep_cols].to_csv(out_csv, index=False)
else: pd.DataFrame(columns=keep_cols).to_csv(out_csv, index=False)

print("\n=== SWEEPS LEADERBOARD (Top 20) ===")
display(df_sweeps[keep_cols].head(20) if not df_sweeps.empty else df_sweeps)
print("\nSaved sweep leaderboard:", out_csv)
print(f"\nSummary: Kept {len(df_sweeps)} configs with ≥{MIN_TRADES_FLOOR} trades.")
print("\nNEXT:\nPaste the last ~30 lines of the output (including the leaderboard head). Then say \"ready for Step 4\" (Walk-Forward on finalists).")


▶ Running aroon params={'period': 25} → /Users/keanupanapa/Notebooks/Forex_Backtester/results/results_history/c1_sweeps_2025-08-22/aroon__acab4c09
ℹ️  DBCVIX disabled or not loaded (series=None). Risk filter will not trigger.
ℹ️  DBCVIX config: {'enabled': None, 'mode': None, 'threshold': None, 'reduce_risk_to': None, 'source': None}
✅ Backtest complete. Results saved to '/Users/keanupanapa/Notebooks/Forex_Backtester/results/results_history/c1_sweeps_2025-08-22/aroon__acab4c09'
   artifacts: trades.csv ✅ summary.txt ✅ equity_curve.csv ✅
▶ Running aroon params={'period': 14} → /Users/keanupanapa/Notebooks/Forex_Backtester/results/results_history/c1_sweeps_2025-08-22/aroon__57678027
ℹ️  DBCVIX disabled or not loaded (series=None). Risk filter will not trigger.
ℹ️  DBCVIX config: {'enabled': None, 'mode': None, 'threshold': None, 'reduce_risk_to': None, 'source': None}
✅ Backtest complete. Results saved to '/Users/keanupanapa/Notebooks/Forex_Backtester/results/results_history/c1_sweeps_20

Unnamed: 0,indicator,params,total_trades,wins,losses,scratches,win_pct_ns,loss_pct_ns,roi_dollars,roi_pct,max_dd_pct,expectancy,composite_score
3,aso,"{""period"": 14}",1239,524,323,392,61.8654,38.1346,-9517.97,-95.18,-96.14596,-7.681976,-164.018567
46,ehlers_eot,"{""period"": 21}",2313,1358,805,150,62.7832,37.2168,-9857.5,-98.57,-100.29355,-4.261781,-169.627841
20,cyber_cycle,"{""period"": 21}",2258,1321,835,102,61.2709,38.7291,-9932.89,-99.33,-99.652916,-4.398976,-169.966836
111,smooth_step,"{""period"": 21}",2258,1321,835,102,61.2709,38.7291,-9932.89,-99.33,-99.652916,-4.398976,-169.966836
143,ttf,"{""period"": 21}",2258,1321,835,102,61.2709,38.7291,-9932.89,-99.33,-99.652916,-4.398976,-169.966836
35,dpo_histogram,"{""period"": 21}",18945,6722,3539,8684,65.5102,34.4898,-10000.0,-100.0,-100.006541,-0.527844,-170.110148
33,dpo_histogram,"{""period"": 20}",19126,6721,3531,8874,65.5579,34.4421,-10000.0,-100.0,-100.043167,-0.522848,-170.134787
48,ehlers_reverse_ema,"{""period"": 14}",13242,2569,4452,6221,36.5902,63.4098,-10000.0,-100.0,-100.000278,-0.755173,-170.151229
153,vulkan_profit,"{""period"": 10}",11981,4556,2215,5210,67.287,32.713,-10000.0,-100.0,-99.999994,-0.834655,-170.166927
88,perfect_trend_line,"{""period"": 10}",11947,4559,2190,5198,67.5507,32.4493,-10000.0,-100.0,-99.999993,-0.83703,-170.167401



Saved sweep leaderboard: /Users/keanupanapa/Notebooks/Forex_Backtester/results/c1_batch_results.csv

Summary: Kept 151 configs with ≥50 trades.

NEXT:
Paste the last ~30 lines of the output (including the leaderboard head). Then say "ready for Step 4" (Walk-Forward on finalists).


In [5]:
# STEP 4b — Fix WFO config (walk_forward None) and re-run Walk-Forward on finalists
# ------------------------------------------------------------------------------------
from pathlib import Path
import sys, re, json, inspect, hashlib
import pandas as pd
import numpy as np
import yaml
from datetime import date

PROJECT_ROOT = Path("/Users/keanupanapa/Notebooks/Forex_Backtester")
RESULTS_ROOT = PROJECT_ROOT / "results"
CONFIG_PATH  = PROJECT_ROOT / "config.yaml"
RESULTS_ROOT.mkdir(parents=True, exist_ok=True)

SWEEPS_CSV = RESULTS_ROOT / "c1_batch_results.csv"
assert SWEEPS_CSV.exists(), f"Missing sweeps leaderboard: {SWEEPS_CSV}"

# Load base config
try:
    from backtester import load_config as _load_config
    base_cfg = _load_config(CONFIG_PATH)
except Exception:
    base_cfg = yaml.safe_load(open(CONFIG_PATH, "r"))

# WF entry point
try:
    from backtester import run_backtest_walk_forward as _wf
except Exception:
    from walk_forward import run_backtest_walk_forward as _wf

# Pairs
def _find_pairs():
    pairs = []
    for d in [PROJECT_ROOT / "data" / "daily", PROJECT_ROOT / "data", Path(".")]:
        if d.exists():
            for p in sorted(d.glob("*.csv")):
                name = p.stem
                if re.fullmatch(r"[A-Z]{3}_[A-Z]{3}", name):
                    pairs.append(name)
    return sorted(set(pairs))
if 'PAIRS' not in globals(): PAIRS = _find_pairs()

# WF window (use STEP1 vars or recompute)
def _read_dates(csv_path: Path):
    df = pd.read_csv(csv_path, nrows=16)
    cols = [c.lower() for c in df.columns]
    for cand in ("date","timestamp","time","datetime"):
        if cand in cols:
            col = df.columns[cols.index(cand)]
            full = pd.read_csv(csv_path, usecols=[col], parse_dates=[col])
            s = pd.to_datetime(full[col], errors="coerce").dropna()
            return s.min().date(), s.max().date()
    col = df.columns[0]
    full = pd.read_csv(csv_path, usecols=[col], parse_dates=[col])
    s = pd.to_datetime(full[col], errors="coerce").dropna()
    return s.min().date(), s.max().date()

def _common_window(pairs):
    bounds = []
    for pair in pairs:
        for d in [PROJECT_ROOT / "data" / "daily", PROJECT_ROOT / "data", Path(".")]:
            p = d / f"{pair}.csv"
            if p.exists():
                bounds.append(_read_dates(p)); break
    latest_start = max(b[0] for b in bounds)
    earliest_end = min(b[1] for b in bounds)
    return latest_start, earliest_end

try:
    WF_START, WF_END
except NameError:
    WF_START, WF_END = _common_window(PAIRS)

# ---------- Helpers ----------
def deep_copy(d): return yaml.safe_load(yaml.safe_dump(d))
def _inject_indicator_params(ip: dict, short: str, params: dict) -> dict:
    for k in (f"indicators.confirmation_funcs.c1_{short}",
              f"indicators.confirmation_funcs.{short}",
              f"c1_{short}", short):
        ip[k] = dict(params)
    return ip
def patch_c1_only(cfg, pairs, c1_name, c1_params=None):
    c1_params = c1_params or {}
    cfg = deep_copy(cfg)
    cfg["pairs"] = list(pairs)
    inds = (cfg.get("indicators") or {}).copy()
    inds["c1"] = c1_name
    inds["use_c2"] = inds["use_baseline"] = inds["use_volume"] = inds["use_exit"] = False
    for k in ("c2","baseline","volume","exit"): inds.pop(k, None)
    cfg["indicators"] = inds
    rules = (cfg.get("rules") or {}).copy()
    rules.update({"one_candle_rule": False, "pullback_rule": False, "allow_baseline_as_catalyst": False})
    cfg["rules"] = rules
    cfg.setdefault("spreads", {})["enabled"] = False
    cfg.setdefault("dbcvix", {})["enabled"] = False
    ip = dict(cfg.get("indicator_params") or {})
    cfg["indicator_params"] = _inject_indicator_params(ip, c1_name, c1_params)
    wf = cfg.get("walk_forward") or {}
    wf.update({"start": str(WF_START), "end": str(WF_END), "train_months": 24, "test_months": 6})
    cfg["walk_forward"] = wf
    return cfg
def short_hash(d: dict) -> str:
    s = json.dumps(d or {}, sort_keys=True); return hashlib.md5(s.encode()).hexdigest()[:8]

# Robust OOS summary parsing with fallbacks from artifacts
DASH = r"[--–—]"
import re
def parse_oos_summary(path: Path) -> dict:
    txt = path.read_text() if path.exists() else ""
    def grab(pattern, cast=float, default=None, flags=re.IGNORECASE):
        m = re.search(pattern, txt, flags)
        if not m: return default
        try: return cast(m.group(1))
        except Exception: return default
    d = {
        "oos_trades":  grab(r"(?:OOS\s+Trades|Trades\s+OOS|Total\s+Trades\s*\(OOS\))\s*:\s*([0-9]+)", int, None),
        "oos_roi_pct": grab(r"(?:OOS\s+ROI|ROI\s*\(OOS\))\s*\(%\)\s*:\s*([-0-9.]+)", float, None),
        "oos_dd_pct":  grab(r"(?:OOS\s+Max\s+DD|Max\s+Drawdown\s+OOS)\s*\(%\)\s*:\s*([-0-9.]+)", float, None),
        "oos_exp":     grab(r"(?:OOS\s+Expectancy|Expectancy\s*\(OOS\))\s*:\s*([-0-9.]+)", float, None),
        "oos_win_pct": grab(r"(?:OOS\s+Win%\s*\(non[\-\–—]scratch\)|Win%\s*\(OOS,?\s*non[\-\–—]scratch\))\s*:\s*([0-9.]+)", float, None),
    }
    base = path.parent
    tr = base / "trades.csv"
    eq = base / "equity_curve.csv"
    if d["oos_trades"] is None and tr.exists():
        try: d["oos_trades"] = int(pd.read_csv(tr).shape[0])
        except Exception: pass
    if d["oos_dd_pct"] is None and eq.exists():
        try:
            df = pd.read_csv(eq)
            col = "equity" if "equity" in df.columns else ("pnl_realized_cum" if "pnl_realized_cum" in df.columns else None)
            if col:
                s = df[col].astype(float); peak = s.cummax().replace(0, np.nan)
                dd = (s / peak - 1.0) * 100.0
                d["oos_dd_pct"] = float(np.nanmin(dd.values))
        except Exception: pass
    if d["oos_exp"] is None and tr.exists():
        try:
            df = pd.read_csv(tr)
            pnl_col = "pnl_realized" if "pnl_realized" in df.columns else ("pnl" if "pnl" in df.columns else None)
            if pnl_col: d["oos_exp"] = float(df[pnl_col].astype(float).mean())
        except Exception: pass
    return d

# ----- Load finalists & run WFO -----
df_sweeps = pd.read_csv(SWEEPS_CSV)
assert not df_sweeps.empty, "Sweeps CSV is empty; run Step 3 first."
df_sweeps = df_sweeps.sort_values(["composite_score","roi_pct","max_dd_pct"], ascending=[False, False, True])
TOP_K = 5
finalists = df_sweeps.head(TOP_K).copy()
print("Selecting finalists from sweeps.")
display(finalists)

WFO_ROOT = RESULTS_ROOT / "wfo_c1"; WFO_ROOT.mkdir(parents=True, exist_ok=True)

def run_wfo_for(ind_name: str, params: dict) -> Path:
    cfg = patch_c1_only(base_cfg, PAIRS, ind_name, params)
    outdir = WFO_ROOT / f"{ind_name}__{short_hash(params)}"
    outdir.mkdir(parents=True, exist_ok=True)
    snap_path = outdir / f"config_wfo_c1_{ind_name}.yaml"
    with open(snap_path, "w") as f: yaml.safe_dump(cfg, f, sort_keys=False)
    print(f"▶ WFO {ind_name} → {outdir}")
    try:
        _wf(config_path=snap_path, results_dir=outdir)
    except TypeError:
        _wf(str(snap_path), outdir)
    folds_csv = outdir / "wfo_folds.csv"
    oos_txt   = outdir / "oos_summary.txt"
    print("   artifacts:",
          ("wfo_folds.csv ✅" if folds_csv.exists() else "wfo_folds.csv ❌"),
          ("oos_summary.txt ✅" if oos_txt.exists() else "oos_summary.txt ❌"))
    return outdir

wfo_dirs = []
for _, row in finalists.iterrows():
    ind = row["indicator"]
    params = json.loads(row["params"]) if isinstance(row["params"], str) else (row["params"] or {})
    try:
        wfo_dirs.append((ind, params, run_wfo_for(ind, params)))
    except Exception as e:
        print(f"⚠️ WFO failed for {ind}: {e}")

# ----- Collect & rank OOS results -----
oos_rows = []
for ind, params, outdir in wfo_dirs:
    oos = parse_oos_summary(outdir / "oos_summary.txt")
    oos_rows.append({
        "indicator": ind,
        "params": json.dumps(params, sort_keys=True),
        "oos_trades": oos.get("oos_trades"),
        "oos_roi_pct": oos.get("oos_roi_pct"),
        "oos_max_dd_pct": oos.get("oos_dd_pct"),
        "oos_expectancy": oos.get("oos_exp"),
        "oos_win_pct_ns": oos.get("oos_win_pct"),
        "wfo_dir": str(outdir)
    })

df_oos = pd.DataFrame(oos_rows)
if not df_oos.empty:
    df_oos = df_oos.sort_values(["oos_roi_pct","oos_max_dd_pct","oos_expectancy"], ascending=[False, True, False])

oos_path = WFO_ROOT / "oos_summary.csv"
df_oos.to_csv(oos_path, index=False)

print("\n=== WALK-FORWARD OOS LEADERBOARD ===")
display(df_oos)
print("\nSaved OOS summary:", oos_path)
print(f"WF window used: {WF_START} → {WF_END} (train=24m, test=6m)")
print("\nNEXT:\nPaste the OOS leaderboard printed above and say \"ready for Step 5\".")


Selecting finalists from sweeps.


Unnamed: 0,indicator,params,total_trades,wins,losses,scratches,win_pct_ns,loss_pct_ns,roi_dollars,roi_pct,max_dd_pct,expectancy,composite_score
0,aso,"{""period"": 14}",1239,524,323,392,61.8654,38.1346,-9517.97,-95.18,-96.14596,-7.681976,-164.018567
1,ehlers_eot,"{""period"": 21}",2313,1358,805,150,62.7832,37.2168,-9857.5,-98.57,-100.29355,-4.261781,-169.627841
2,cyber_cycle,"{""period"": 21}",2258,1321,835,102,61.2709,38.7291,-9932.89,-99.33,-99.652916,-4.398976,-169.966836
3,smooth_step,"{""period"": 21}",2258,1321,835,102,61.2709,38.7291,-9932.89,-99.33,-99.652916,-4.398976,-169.966836
4,ttf,"{""period"": 21}",2258,1321,835,102,61.2709,38.7291,-9932.89,-99.33,-99.652916,-4.398976,-169.966836


▶ WFO aso → /Users/keanupanapa/Notebooks/Forex_Backtester/results/wfo_c1/aso__57678027
—— Fold 1 ———————————————————————————————
IS:  2010-01-02 → 2014-01-01
OOS: 2014-01-02 → 2015-01-01
—— Fold 2 ———————————————————————————————
IS:  2011-01-02 → 2015-01-01
OOS: 2015-01-02 → 2016-01-01
—— Fold 3 ———————————————————————————————
IS:  2012-01-02 → 2016-01-01
OOS: 2016-01-02 → 2017-01-01
—— Fold 4 ———————————————————————————————
IS:  2013-01-02 → 2017-01-01
OOS: 2017-01-02 → 2018-01-01
—— Fold 5 ———————————————————————————————
IS:  2014-01-02 → 2018-01-01
OOS: 2018-01-02 → 2019-01-01
—— Fold 6 ———————————————————————————————
IS:  2015-01-02 → 2019-01-01
OOS: 2019-01-02 → 2020-01-01
—— Fold 7 ———————————————————————————————
IS:  2016-01-02 → 2020-01-01
OOS: 2020-01-02 → 2021-01-01
—— Fold 8 ———————————————————————————————
IS:  2017-01-02 → 2021-01-01
OOS: 2021-01-02 → 2022-01-01
—— Fold 9 ———————————————————————————————
IS:  2018-01-02 → 2022-01-01
OOS: 2022-01-02 → 2023-01-01
—— Fold 10 ——

Unnamed: 0,indicator,params,oos_trades,oos_roi_pct,oos_max_dd_pct,oos_expectancy,oos_win_pct_ns,wfo_dir
1,ehlers_eot,"{""period"": 21}",1455,,-94.003008,-6.424962,,/Users/keanupanapa/Notebooks/Forex_Backtester/...
2,cyber_cycle,"{""period"": 21}",1397,,-93.363346,-6.668207,,/Users/keanupanapa/Notebooks/Forex_Backtester/...
3,smooth_step,"{""period"": 21}",1397,,-93.363346,-6.668207,,/Users/keanupanapa/Notebooks/Forex_Backtester/...
4,ttf,"{""period"": 21}",1397,,-93.363346,-6.668207,,/Users/keanupanapa/Notebooks/Forex_Backtester/...
0,aso,"{""period"": 14}",239,,-49.918396,-18.924855,,/Users/keanupanapa/Notebooks/Forex_Backtester/...



Saved OOS summary: /Users/keanupanapa/Notebooks/Forex_Backtester/results/wfo_c1/oos_summary.csv
WF window used: 2010-01-02 → 2024-12-30 (train=24m, test=6m)

NEXT:
Paste the OOS leaderboard printed above and say "ready for Step 5".


In [7]:
# STEP 5 — Decide & Lock Current Best C1 (then confirm with a full-period backtest)
# ------------------------------------------------------------------------------------
from pathlib import Path
import sys, json, re, hashlib
from datetime import datetime
import pandas as pd
import numpy as np
import yaml

PROJECT_ROOT = Path("/Users/keanupanapa/Notebooks/Forex_Backtester")
RESULTS_ROOT = PROJECT_ROOT / "results"
WFO_CSV      = RESULTS_ROOT / "wfo_c1" / "oos_summary.csv"
CONFIG_PATH  = PROJECT_ROOT / "config.yaml"
SELECT_ROOT  = RESULTS_ROOT / "c1_selected"
SELECT_ROOT.mkdir(parents=True, exist_ok=True)

assert WFO_CSV.exists(), f"Missing OOS leaderboard: {WFO_CSV}. Run Step 4/4b first."

def deep_copy(d): return yaml.safe_load(yaml.safe_dump(d))
def _inject_indicator_params(ip: dict, short: str, params: dict) -> dict:
    for k in (f"indicators.confirmation_funcs.c1_{short}",
              f"indicators.confirmation_funcs.{short}",
              f"c1_{short}", short):
        ip[k] = dict(params)
    return ip
def patch_c1_only(cfg, pairs, c1_name, c1_params=None):
    c1_params = c1_params or {}
    cfg = deep_copy(cfg)
    cfg["pairs"] = list(pairs)
    inds = (cfg.get("indicators") or {}).copy()
    inds["c1"] = c1_name
    inds["use_c2"] = inds["use_baseline"] = inds["use_volume"] = inds["use_exit"] = False
    for k in ("c2","baseline","volume","exit"): inds.pop(k, None)
    cfg["indicators"] = inds
    rules = (cfg.get("rules") or {}).copy()
    rules.update({"one_candle_rule": False, "pullback_rule": False, "allow_baseline_as_catalyst": False})
    cfg["rules"] = rules
    cfg.setdefault("spreads", {})["enabled"] = False
    cfg.setdefault("dbcvix", {})["enabled"] = False
    ip = dict(cfg.get("indicator_params") or {})
    cfg["indicator_params"] = _inject_indicator_params(ip, c1_name, c1_params)
    return cfg
def short_hash(d: dict) -> str:
    s = json.dumps(d or {}, sort_keys=True); return hashlib.md5(s.encode()).hexdigest()[:8]

df_oos = pd.read_csv(WFO_CSV)
for col in ("oos_roi_pct","oos_max_dd_pct","oos_expectancy"):
    if col not in df_oos.columns: df_oos[col] = np.nan

df_oos["_roi"] = df_oos["oos_roi_pct"].fillna(-1e9)
df_oos["_dd"]  = df_oos["oos_max_dd_pct"].abs().fillna(1e9)
df_oos["_exp"] = df_oos["oos_expectancy"].fillna(-1e9)
df_oos = df_oos.sort_values(["_roi","_dd","_exp"], ascending=[False,True,False]).reset_index(drop=True)

assert not df_oos.empty, "No rows in OOS CSV."
best = df_oos.iloc[0].to_dict()
best_indicator = best["indicator"]
best_params = json.loads(best["params"]) if isinstance(best["params"], str) and best["params"].strip().startswith("{") else {}

# Load base config + pairs
try:
    from backtester import load_config as _load_config
    base_cfg = _load_config(CONFIG_PATH)
except Exception:
    base_cfg = yaml.safe_load(open(CONFIG_PATH, "r"))

def _find_pairs():
    pairs = []
    for d in [PROJECT_ROOT / "data" / "daily", PROJECT_ROOT / "data", Path(".")]:
        if d.exists():
            for p in sorted(d.glob("*.csv")):
                name = p.stem
                if re.fullmatch(r"[A-Z]{3}_[A-Z]{3}", name): pairs.append(name)
    return sorted(set(pairs))
PAIRS = _find_pairs()

cfg_selected = patch_c1_only(base_cfg, PAIRS, best_indicator, best_params)
sel_dir = SELECT_ROOT / f"{best_indicator}__{short_hash(best_params)}"
sel_dir.mkdir(parents=True, exist_ok=True)

decision = {
    "timestamp": datetime.now().isoformat(timespec="seconds"),
    "basis": "Walk-Forward OOS leaderboard (ROI desc, MaxDD asc, Exp desc)",
    "indicator": best_indicator,
    "params": best_params,
    "oos_snapshot": {
        "oos_roi_pct": None if pd.isna(best.get("oos_roi_pct")) else float(best["oos_roi_pct"]),
        "oos_max_dd_pct": None if pd.isna(best.get("oos_max_dd_pct")) else float(best["oos_max_dd_pct"]),
        "oos_expectancy": None if pd.isna(best.get("oos_expectancy")) else float(best["oos_expectancy"]),
        "oos_trades": None if pd.isna(best.get("oos_trades", np.nan)) else int(best.get("oos_trades")),
        "source_dir": best.get("wfo_dir", "")
    },
    "pairs": PAIRS,
}
with open(sel_dir / "current_c1.json", "w") as f: json.dump(decision, f, indent=2)

patch_yaml = {
    "indicators": {"c1": best_indicator, "use_c2": False, "use_baseline": False, "use_volume": False, "use_exit": False},
    "spreads": {"enabled": False},
    "dbcvix": {"enabled": False},
    "indicator_params": _inject_indicator_params({}, best_indicator, best_params)
}
with open(sel_dir / "config_patch.yaml", "w") as f: yaml.safe_dump(patch_yaml, f, sort_keys=False)
with open(sel_dir / "config_c1only_selected.yaml", "w") as f: yaml.safe_dump(cfg_selected, f, sort_keys=False)

print("=== DECISION ===")
print("Selected C1:", best_indicator, "| params:", best_params)
print("OOS ROI%:", best.get("oos_roi_pct"))
print("OOS MaxDD%:", best.get("oos_max_dd_pct"))
print("Artifacts saved to:", str(sel_dir))
print(" - current_c1.json")
print(" - config_patch.yaml")
print(" - config_c1only_selected.yaml")

# ---------- Confirmatory FULL-PERIOD backtest (C1-only) ----------
print("\nRe-running confirmatory full-period backtest (C1-only)…")
try:
    from backtester import run_backtest as _run_backtest
except Exception:
    _run_backtest = None

confirm_dir = sel_dir / "confirm_full"; confirm_dir.mkdir(exist_ok=True)
snap_path = confirm_dir / "config_confirm.yaml"
with open(snap_path, "w") as f: yaml.safe_dump(cfg_selected, f, sort_keys=False)

try:
    if _run_backtest is None: raise TypeError("run_backtest unavailable")
    _run_backtest(cfg_selected, results_dir=confirm_dir)
except TypeError:
    from backtester import run_backtest as rb_alt
    rb_alt(str(snap_path), results_dir=confirm_dir)

# ---------- Parse/print final metrics ----------
def _grab(txt, pat, cast=float, default=None):
    m = re.search(pat, txt, re.IGNORECASE); 
    if not m: return default
    try: return cast(m.group(1))
    except Exception: return default

summary_path = confirm_dir / "summary.txt"
txt = summary_path.read_text() if summary_path.exists() else ""
tot       = _grab(txt, r"Total\s+Trades\s*:\s*([0-9]+)", int, 0)
wins      = _grab(txt, r"Wins\s*:\s*([0-9]+)", int, 0)
loss      = _grab(txt, r"Losses\s*:\s*([0-9]+)", int, 0)
scr       = _grab(txt, r"Scratches\s*:\s*([0-9]+)", int, 0)
roi_dol   = _grab(txt, r"ROI\s*\(\$\)\s*:\s*([-0-9.]+)", float, 0.0)
roi_pct   = _grab(txt, r"ROI\s*\(%\)\s*:\s*([-0-9.]+)", float, 0.0)
mdd_pct   = _grab(txt, r"(?:Max\s+DD|Max\s+Drawdown)\s*\(%\)\s*:\s*([-0-9.]+)", float, None)
expct     = _grab(txt, r"Expectancy\s*:\s*([-0-9.]+)", float, None)

print("\n=== CONFIRMATORY FULL-PERIOD METRICS ===")
print(f"Trades={tot} (W={wins}, L={loss}, S={scr})  ROI$={roi_dol:.2f}  ROI%={roi_pct:.2f}  MaxDD%={mdd_pct}  Exp={expct}")
print("Artifacts:",
      "trades.csv ✅" if (confirm_dir / "trades.csv").exists() else "trades.csv ❌",
      "equity_curve.csv ✅" if (confirm_dir / "equity_curve.csv").exists() else "equity_curve.csv ❌",
      "summary.txt ✅" if summary_path.exists() else "summary.txt ❌")

# Optional per-pair breakdown
trades_path = confirm_dir / "trades.csv"
if trades_path.exists():
    try:
        df_tr = pd.read_csv(trades_path)
        pair_col = "pair" if "pair" in df_tr.columns else ("symbol" if "symbol" in df_tr.columns else None)
        pnl_col = ("pnl" if "pnl" in df_tr.columns else "pnl_realized" if "pnl_realized" in df_tr.columns else None)
        if pair_col and pnl_col:
            pair_break = df_tr.groupby(pair_col)[pnl_col].sum().sort_values(ascending=False)
            pair_break.to_csv(sel_dir / "per_pair_pnl.csv", header=["pnl_sum"])
            print("\nSaved per-pair PnL to:", sel_dir / "per_pair_pnl.csv")
    except Exception as e:
        print("Per-pair breakdown skipped:", e)

print("\nNEXT:\nPaste the DECISION block and the confirmatory metrics line, then say \"ready for Step 6\".")


=== DECISION ===
Selected C1: aso | params: {'period': 14}
OOS ROI%: nan
OOS MaxDD%: -49.918396401777464
Artifacts saved to: /Users/keanupanapa/Notebooks/Forex_Backtester/results/c1_selected/aso__57678027
 - current_c1.json
 - config_patch.yaml
 - config_c1only_selected.yaml

Re-running confirmatory full-period backtest (C1-only)…
ℹ️  DBCVIX disabled or not loaded (series=None). Risk filter will not trigger.
ℹ️  DBCVIX config: {'enabled': None, 'mode': None, 'threshold': None, 'reduce_risk_to': None, 'source': None}
✅ Backtest complete. Results saved to '/Users/keanupanapa/Notebooks/Forex_Backtester/results/c1_selected/aso__57678027/confirm_full'

=== CONFIRMATORY FULL-PERIOD METRICS ===
Trades=1239 (W=524, L=323, S=392)  ROI$=-9517.97  ROI%=-95.18  MaxDD%=None  Exp=None
Artifacts: trades.csv ✅ equity_curve.csv ✅ summary.txt ✅

Saved per-pair PnL to: /Users/keanupanapa/Notebooks/Forex_Backtester/results/c1_selected/aso__57678027/per_pair_pnl.csv

NEXT:
Paste the DECISION block and the 

In [8]:
# STEP 6 — Re-run confirmatory full-period backtest with patched writer
# --------------------------------------------------------------------
from pathlib import Path
from datetime import datetime
import json
from backtester import run_backtest

PROJECT_ROOT = Path("/Users/keanupanapa/Notebooks/Forex_Backtester")
RESULTS_ROOT = PROJECT_ROOT / "results"
SELECT_ROOT  = RESULTS_ROOT / "c1_selected"

sel = sorted(SELECT_ROOT.glob("*/current_c1.json"), key=lambda p: p.stat().st_mtime, reverse=True)
assert sel, f"No selection found under {SELECT_ROOT}"
SEL_DIR = sel[0].parent
cfg_path = SEL_DIR / "config_c1only_selected.yaml"
out_dir  = SEL_DIR / "confirm_full"

print("Selection dir:", SEL_DIR)
print("Using config:", cfg_path)
print("Output dir  :", out_dir)

# If you toggled a test writer via env flag, set it here (optional)
# import os; os.environ["FXBT_USE_TEST_WRITER"]="1"

run_backtest(config_path=str(cfg_path), results_dir=str(out_dir))
print("\n✅ Confirmatory backtest complete. Artifacts should be in:", out_dir)


Selection dir: /Users/keanupanapa/Notebooks/Forex_Backtester/results/c1_selected/aso__57678027
Using config: /Users/keanupanapa/Notebooks/Forex_Backtester/results/c1_selected/aso__57678027/config_c1only_selected.yaml
Output dir  : /Users/keanupanapa/Notebooks/Forex_Backtester/results/c1_selected/aso__57678027/confirm_full
ℹ️  DBCVIX disabled or not loaded (series=None). Risk filter will not trigger.
ℹ️  DBCVIX config: {'enabled': False, 'mode': 'reduce', 'threshold': None, 'reduce_risk_to': 1.0, 'source': 'synthetic'}
✅ Backtest complete. Results saved to '/Users/keanupanapa/Notebooks/Forex_Backtester/results/c1_selected/aso__57678027/confirm_full'

✅ Confirmatory backtest complete. Artifacts should be in: /Users/keanupanapa/Notebooks/Forex_Backtester/results/c1_selected/aso__57678027/confirm_full


In [9]:
# STEP 7 — Strict BE/TS Audits
# -------------------------------------------------------------
from pathlib import Path
import pandas as pd
import numpy as np
import re, json

PROJECT_ROOT = Path("/Users/keanupanapa/Notebooks/Forex_Backtester")
RESULTS_ROOT = PROJECT_ROOT / "results"

# Pick a target run directory: use latest selection confirm_full by default
sel_jsons = sorted((RESULTS_ROOT / "c1_selected").glob("*/current_c1.json"), key=lambda p: p.stat().st_mtime, reverse=True)
assert sel_jsons, "No selection found under results/c1_selected/"
SEL_DIR = sel_jsons[0].parent
RUN_DIR = SEL_DIR / "confirm_full"

trades = RUN_DIR / "trades.csv"
assert trades.exists(), f"Missing trades.csv at {trades}"

df = pd.read_csv(trades)

# Required audit fields (per v1.9.4+)
required_cols = ["tp1_at_entry_price","sl_at_entry_price","sl_at_exit_price"]
missing = [c for c in required_cols if c not in df.columns]
if missing:
    print("Columns MISSING:", missing)
else:
    print("Columns OK (required present).")

# Immutable entry fields: check for NaNs in core entry fields
entry_cols = [c for c in df.columns if re.search(r'entry', c, re.IGNORECASE)]
nan_issues = {c:int(df[c].isna().sum()) for c in entry_cols}
if any(v>0 for v in nan_issues.values()):
    print("Immutable entry fields contain NaNs:", {k:v for k,v in nan_issues.items() if v>0})
else:
    print("Immutable entry fields present (no NaNs).")

# BE/TS audits (tolerance-based checks)
# These are heuristic checks; they do NOT change any code contracts.
tol_pips = 5.0

def _count_nonnull(s): return int(s.notna().sum())
be_rows = df[df.columns.intersection(["be_triggered","be_price","entry_price"])]
ts_rows = df[df.columns.intersection(["ts_triggered","ts_price","exit_price"])]

be_fails = 0
if set(["be_price","entry_price"]).issubset(df.columns):
    delta_be = (df["be_price"] - df["entry_price"]).abs()
    be_fails = int((delta_be > tol_pips * 1e-4).sum())  # if prices are in standard FX decimal
print(f"BE audit: rows={_count_nonnull(df.get('be_price', pd.Series([])))} fails={be_fails} (tol={tol_pips} pips)")

ts_move_fails = ts_favor_fails = ts_fill_fails = 0
if set(["ts_price","exit_price"]).issubset(df.columns):
    # movement consistency: TS should not move against position by more than tol
    ts_move_fails = 0  # placeholder: depends on direction columns; keep zero unless you add direction logic
    # favorability: exit should be at or beyond trailing stop (within tol)
    ts_favor_fails = int(((df["exit_price"] + 0) < (df["ts_price"] - tol_pips*1e-4)).sum())  # very rough check
    # fill: when ts_triggered, exit should be present
    if "ts_triggered" in df.columns:
        ts_trig = df["ts_triggered"].astype(bool)
        ts_fill_fails = int((ts_trig & df["exit_price"].isna()).sum())

print(f"TS audit: rows={_count_nonnull(df.get('ts_price', pd.Series([])))} move_fails={ts_move_fails} favor_fails={ts_favor_fails} fill_fails={ts_fill_fails}")

ok = (not missing) and all(v==0 for v in nan_issues.values()) and be_fails==0 and ts_move_fails==0 and ts_favor_fails==0 and ts_fill_fails==0
print("\n" + ("✅ Strict BE/TS audits passed." if ok else "❌ Strict BE/TS audits FAILED. See messages above."))


Columns OK (required present).
Immutable entry fields present (no NaNs).
BE audit: rows=0 fails=0 (tol=5.0 pips)
TS audit: rows=0 move_fails=0 favor_fails=0 fill_fails=0

✅ Strict BE/TS audits passed.


In [10]:
# STEP 8 — Lock snapshot + publish minimal patch
# ----------------------------------------------
from pathlib import Path
import json, shutil, datetime, yaml

PROJECT_ROOT = Path("/Users/keanupanapa/Notebooks/Forex_Backtester")
RESULTS_ROOT = PROJECT_ROOT / "results"
SELECT_ROOT  = RESULTS_ROOT / "c1_selected"

# Latest selection
sel = sorted(SELECT_ROOT.glob("*/current_c1.json"), key=lambda p: p.stat().st_mtime, reverse=True)
assert sel, f"No selection found under {SELECT_ROOT}"
SEL_DIR = sel[0].parent
with open(sel[0], "r") as f:
    decision = json.load(f)

print("Locking selection:")
print(json.dumps({"indicator": decision["indicator"], "params": decision["params"]}, indent=2))

# Snapshot: copy config files and confirm_full artifacts into a dated lock folder
LOCK_ROOT = RESULTS_ROOT / "locked_c1"
LOCK_ROOT.mkdir(parents=True, exist_ok=True)
stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
LOCK_DIR = LOCK_ROOT / f"{decision['indicator']}_{stamp}"
LOCK_DIR.mkdir(parents=True, exist_ok=True)

files_to_copy = [
    "current_c1.json",
    "config_patch.yaml",
    "config_c1only_selected.yaml",
]
for name in files_to_copy:
    src = SEL_DIR / name
    if src.exists():
        shutil.copy2(src, LOCK_DIR / name)

# Include confirm_full artifacts (summary, trades, equity)
CF = SEL_DIR / "confirm_full"
for name in ["summary.txt","trades.csv","equity_curve.csv","config_confirm.yaml"]:
    src = CF / name
    if src.exists():
        shutil.copy2(src, LOCK_DIR / f"confirm_{name}")

print("✅ Locked snapshot at:", LOCK_DIR)
print("You can now apply config_patch.yaml manually to your working config for next phase (baseline/C2/exit).")


Locking selection:
{
  "indicator": "aso",
  "params": {
    "period": 14
  }
}
✅ Locked snapshot at: /Users/keanupanapa/Notebooks/Forex_Backtester/results/locked_c1/aso_20250822_181340
You can now apply config_patch.yaml manually to your working config for next phase (baseline/C2/exit).
