In [12]:
# ======================================================================================
# Celda 00 v2.0.3 — Run Manifest + Paths + Canonical Schema
# Política institucional (FINAL):
#   - Por defecto SIEMPRE crea un run nuevo (NEW_RUN_DEFAULT).
#   - Solo reutiliza run si:
#       A) TREND_M5_RUN_ID está seteado (FORCED_RUN_ID)
#       B) TREND_M5_RESUME_LATEST=1 y existe _latest_run.txt (RESUME_LATEST)
#
# Nota:
#   - "outputs" NO es "cargar corridas anteriores": es el directorio de salida del run.
# ======================================================================================

from __future__ import annotations

import os
import json
import sys
import platform
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

# ---------------------------
# Helpers
# ---------------------------
def _now_utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")

def _safe_mkdir(p: Path) -> None:
    p.mkdir(parents=True, exist_ok=True)

def _write_json(path: Path, obj: Dict[str, Any]) -> None:
    _safe_mkdir(path.parent)
    path.write_text(json.dumps(obj, indent=2, ensure_ascii=False), encoding="utf-8")

def _read_json(path: Path) -> Dict[str, Any]:
    return json.loads(path.read_text(encoding="utf-8"))

def _write_text(path: Path, text: str) -> None:
    _safe_mkdir(path.parent)
    path.write_text(text, encoding="utf-8")

def _read_text(path: Path) -> str:
    return path.read_text(encoding="utf-8").strip()

def _sha1(s: str) -> str:
    return hashlib.sha1(s.encode("utf-8")).hexdigest()

def _env(name: str, default: Optional[str] = None) -> Optional[str]:
    v = os.getenv(name)
    return v if v not in (None, "") else default

def _env_bool(name: str, default: bool = False) -> bool:
    v = os.getenv(name, "")
    if v is None or v.strip() == "":
        return default
    return v.strip().lower() in ("1", "true", "yes", "y")

# ---------------------------
# Detectar PROJECT_ROOT (repo raíz) — determinístico
# ---------------------------
def _detect_project_root() -> Path:
    forced = _env("TREND_M5_ROOT") or _env("MT5_PROJECT_ROOT") or _env("MT5_DE_PROJECT_ROOT")
    if forced:
        return Path(forced).resolve()

    start = Path.cwd().resolve()
    target = "mt5_data_extraction"
    for p in [start] + list(start.parents):
        if p.name.lower() == target:
            return p
    # fallback: cwd (controlado)
    return start

PROJECT_ROOT = _detect_project_root()

# ---------------------------
# OUTPUTS_ROOT (salida del strategy notebook)
# ---------------------------
WORKDIR = Path.cwd().resolve()  # normalmente ...\ER_STRATEGY_LAB\notebooks
OUTPUTS_ROOT = Path(_env("TREND_M5_OUTPUTS_ROOT", str(WORKDIR / "outputs" / "trend_m5_strategy" / "v2"))).resolve()
LATEST_RUN_MARKER = OUTPUTS_ROOT / "_latest_run.txt"

# ---------------------------
# RUN_ID policy (FINAL)
# ---------------------------
FORCED_RUN_ID = (_env("TREND_M5_RUN_ID") or "").strip() or None
RESUME_LATEST = _env_bool("TREND_M5_RESUME_LATEST", default=False)

def _new_run_id() -> str:
    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    salt = _sha1(f"{ts}|{platform.node()}|{os.getpid()}")[:8]
    return f"{ts}_{salt}"

if FORCED_RUN_ID:
    RUN_MODE = "FORCED_RUN_ID"
    RUN_ID = FORCED_RUN_ID
elif RESUME_LATEST and LATEST_RUN_MARKER.exists():
    RUN_MODE = "RESUME_LATEST"
    RUN_ID = _read_text(LATEST_RUN_MARKER) or _new_run_id()
else:
    RUN_MODE = "NEW_RUN_DEFAULT"
    RUN_ID = _new_run_id()

RUN_DIR = OUTPUTS_ROOT / f"run_{RUN_ID}"
RUN_MANIFEST_PATH = RUN_DIR / "run_manifest_v2.json"
RUN_MANIFEST_LATEST_PATH = OUTPUTS_ROOT / "run_manifest_v2_latest.json"

# ---------------------------
# Versionado
# ---------------------------
SCHEMA_VERSION = "v2.0.3"
ENGINE_VERSION = "v2.0.3"
COST_MODEL_VERSION = "v2.0.3"
WFO_VERSION = "v2.0.3"

# ---------------------------
# Canonical Schema (contrato)
# ---------------------------
CANONICAL_SCHEMA = {
    "ohlcv_m5": {
        "required_columns": ["time_utc", "open", "high", "low", "close", "volume", "spread", "symbol"],
        "notes": "UTC. time_utc monotónico por símbolo. M5 = 300s."
    },
    "engine_trades": {
        "required_columns": [
            "symbol","fold_id","segment","side",
            "signal_time_utc","entry_time_utc","exit_time_utc",
            "entry_price","exit_price",
            "gross_pnl","net_pnl_base","net_pnl_stress",
            "hold_bars","exit_reason"
        ],
        "notes": "Mon–Fri se aplica sobre entry_time_utc (t+1)."
    }
}

# ---------------------------
# Artifacts (salidas del run)
# ---------------------------
def _build_artifacts(run_dir: Path) -> Dict[str, str]:
    return {
        "instrument_specs": str(run_dir / "instrument_specs_v2.parquet"),
        "instrument_specs_snapshot": str(run_dir / "instrument_specs_snapshot_v2.json"),

        "ohlcv_clean": str(run_dir / "ohlcv_clean_m5.parquet"),
        "data_qa_report": str(run_dir / "data_qa_report_v2.json"),

        "cost_model_snapshot": str(run_dir / "cost_model_snapshot_v2.json"),
        "wfo_folds": str(run_dir / "wfo_folds_v2.parquet"),
        "wfo_folds_snapshot": str(run_dir / "wfo_folds_snapshot_v2.json"),

        "features_m5": str(run_dir / "features_m5_v2.parquet"),
        "regime_params_by_fold": str(run_dir / "regime_params_by_fold_v2.parquet"),
        "regime_params_snapshot": str(run_dir / "regime_params_snapshot_v2.json"),

        "trades_baseline": str(run_dir / "trades_baseline_v2.parquet"),
        "summary_baseline": str(run_dir / "summary_baseline_v2.parquet"),

        "alpha_multi_horizon_report": str(run_dir / "alpha_multi_horizon_report_v2.parquet"),
        "alpha_multi_horizon_snapshot": str(run_dir / "alpha_multi_horizon_snapshot_v2.json"),

        "trades_engine": str(run_dir / "trades_engine_v2.parquet"),
        "summary_engine": str(run_dir / "summary_engine_v2.parquet"),
        "equity_engine": str(run_dir / "equity_curve_engine_v2.parquet"),
        "engine_qa_report": str(run_dir / "engine_qa_report_v2.json"),
        "engine_report_snapshot": str(run_dir / "engine_report_snapshot_v2.json"),
    }

def _build_manifest() -> Dict[str, Any]:
    return {
        "schema_version": SCHEMA_VERSION,
        "engine_version": ENGINE_VERSION,
        "cost_model_version": COST_MODEL_VERSION,
        "wfo_version": WFO_VERSION,
        "run_mode": RUN_MODE,
        "run_id": RUN_ID,
        "created_utc": _now_utc_iso(),
        "project_root": str(PROJECT_ROOT),
        "workdir": str(WORKDIR),
        "outputs_root": str(OUTPUTS_ROOT),
        "run_dir": str(RUN_DIR),
        "artifacts": _build_artifacts(RUN_DIR),
        "canonical_schema": CANONICAL_SCHEMA,
        "runtime": {
            "python": sys.version.replace("\n", " "),
            "platform": platform.platform(),
            "node": platform.node(),
            "pid": os.getpid(),
        },
    }

# ---------------------------
# Guardado (solo carga manifest si el modo es RESUME/forced y existe)
# ---------------------------
_safe_mkdir(RUN_DIR)
_safe_mkdir(OUTPUTS_ROOT)

manifest: Dict[str, Any]
if RUN_MANIFEST_PATH.exists() and RUN_MODE in ("RESUME_LATEST", "FORCED_RUN_ID"):
    manifest = _read_json(RUN_MANIFEST_PATH)
    # normaliza/bump versiones
    manifest["schema_version"] = SCHEMA_VERSION
    manifest["engine_version"] = ENGINE_VERSION
    manifest["cost_model_version"] = COST_MODEL_VERSION
    manifest["wfo_version"] = WFO_VERSION
    manifest["run_mode"] = RUN_MODE
    manifest["project_root"] = str(PROJECT_ROOT)
    manifest["workdir"] = str(WORKDIR)
    manifest["outputs_root"] = str(OUTPUTS_ROOT)
    manifest["canonical_schema"] = CANONICAL_SCHEMA
    manifest["artifacts"] = _build_artifacts(Path(manifest.get("run_dir", str(RUN_DIR))))
    _write_json(RUN_MANIFEST_PATH, manifest)
    print(f"[Celda 00 v2.0.3] Manifest CARGADO (resume/forced) y normalizado: {RUN_MANIFEST_PATH}")
else:
    manifest = _build_manifest()
    _write_json(RUN_MANIFEST_PATH, manifest)
    print(f"[Celda 00 v2.0.3] Manifest CREADO (nuevo run): {RUN_MANIFEST_PATH}")

# latest pointers
_write_text(LATEST_RUN_MARKER, RUN_ID)
_write_json(RUN_MANIFEST_LATEST_PATH, manifest)

# RUN object (downstream)
RUN: Dict[str, Any] = {
    "RUN_ID": manifest["run_id"],
    "RUN_MODE": manifest["run_mode"],
    "RUN_DIR": Path(manifest["run_dir"]),
    "PROJECT_ROOT": Path(manifest["project_root"]),
    "WORKDIR": Path(manifest["workdir"]),
    "OUTPUTS_ROOT": Path(manifest["outputs_root"]),
    "ARTIFACTS": {k: Path(v) for k, v in manifest["artifacts"].items()},
    "SCHEMA_VERSION": manifest["schema_version"],
    "ENGINE_VERSION": manifest["engine_version"],
    "CANONICAL_SCHEMA": manifest["canonical_schema"],
}

# ---------------------------
# PRINTS exhaustivos
# ---------------------------
print("\n--- Celda 00 v2.0.3 | Estado final ---")
print("RUN_MODE             :", RUN["RUN_MODE"])
print("PROJECT_ROOT         :", RUN["PROJECT_ROOT"])
print("WORKDIR              :", RUN["WORKDIR"])
print("OUTPUTS_ROOT         :", RUN["OUTPUTS_ROOT"])
print("RUN_ID               :", RUN["RUN_ID"])
print("RUN_DIR              :", RUN["RUN_DIR"])
print("RUN_MANIFEST_PATH    :", RUN_MANIFEST_PATH)
print("RUN_MANIFEST_LATEST  :", RUN_MANIFEST_LATEST_PATH)
print("LATEST_RUN_MARKER    :", LATEST_RUN_MARKER)
print("SCHEMA_VERSION       :", RUN["SCHEMA_VERSION"])
print("ENGINE_VERSION       :", RUN["ENGINE_VERSION"])

keys = sorted(RUN["ARTIFACTS"].keys())
print("\n--- ARTIFACTS keys ---")
print("N_KEYS:", len(keys))
print(keys)

critical = ["instrument_specs","instrument_specs_snapshot","ohlcv_clean","data_qa_report"]
missing_critical = [k for k in critical if k not in RUN["ARTIFACTS"]]
if missing_critical:
    raise RuntimeError(f"[Celda 00 v2.0.3] ERROR: faltan artifacts críticos: {missing_critical}")

print("\n--- Dependencias ---")
import polars as pl
print("polars:", pl.__version__)
try:
    import pandas as pd
    print("pandas:", pd.__version__)
except Exception as e:
    print("pandas: no disponible:", e)

print("\n[Celda 00 v2.0.3] OK — NEW_RUN por defecto + RUN listo.")


[Celda 00 v2.0.3] Manifest CREADO (nuevo run): C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2\run_20251223_164647_172c5b29\run_manifest_v2.json

--- Celda 00 v2.0.3 | Estado final ---
RUN_MODE             : NEW_RUN_DEFAULT
PROJECT_ROOT         : C:\Quant\MT5_Data_Extraction
WORKDIR              : C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks
OUTPUTS_ROOT         : C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2
RUN_ID               : 20251223_164647_172c5b29
RUN_DIR              : C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2\run_20251223_164647_172c5b29
RUN_MANIFEST_PATH    : C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2\run_20251223_164647_172c5b29\run_manifest_v2.json
RUN_MANIFEST_LATEST  : C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2\run_manifest_v2_latest.json
LATEST_RUN_MARKER    : C:\Q

In [13]:
# ======================================================================================
# Celda 01 v2.0.3 — Universe & Instrument Specs (por símbolo)
# Propósito:
#   - Crear instrument_specs en la ruta EXACTA del RUN actual.
#   - Prints completos: paths, keys, preview, tamaños.
#
# Inputs:
#   - RUN (Celda 00 v2.0.3)
#
# Outputs:
#   - RUN["ARTIFACTS"]["instrument_specs"]
#   - RUN["ARTIFACTS"]["instrument_specs_snapshot"]
# ======================================================================================

from __future__ import annotations

import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import polars as pl

if "RUN" not in globals():
    raise RuntimeError("[Celda 01 v2.0.3] ERROR: No existe RUN. Ejecuta Celda 00 v2.0.3 primero.")

ARTIFACTS: Dict[str, Path] = RUN["ARTIFACTS"]

print("\n--- Celda 01 v2.0.3 | Preflight ---")
print("RUN_ID   :", RUN["RUN_ID"])
print("RUN_MODE :", RUN["RUN_MODE"])
print("RUN_DIR  :", RUN["RUN_DIR"])
print("ARTIFACTS keys:", sorted(list(ARTIFACTS.keys())))

for k in ["instrument_specs", "instrument_specs_snapshot"]:
    if k not in ARTIFACTS:
        raise RuntimeError(f"[Celda 01 v2.0.3] ERROR: falta clave '{k}' en RUN['ARTIFACTS'].")

OUT_SPECS_PARQUET = ARTIFACTS["instrument_specs"]
OUT_SPECS_SNAPSHOT = ARTIFACTS["instrument_specs_snapshot"]

print("OUT_SPECS_PARQUET :", OUT_SPECS_PARQUET)
print("OUT_SPECS_SNAPSHOT:", OUT_SPECS_SNAPSHOT)

def _t(h: int, m: int = 0) -> str:
    return f"{h:02d}:{m:02d}"

def _session(weekdays_only: bool, windows_utc: Optional[List[Dict[str, str]]]) -> Dict[str, Any]:
    return {"weekdays_only": bool(weekdays_only), "windows_utc": windows_utc or []}

# Universo base (v2)
UNIVERSE = ["BNBUSD", "BTCUSD", "LVMH", "XAUAUD"]

# Costos (bps)
COSTS_BPS = {
    "BNBUSD": {"base_bps": 8.0, "stress_bps": 16.0},
    "BTCUSD": {"base_bps": 8.0, "stress_bps": 16.0},
    "LVMH":   {"base_bps": 12.0, "stress_bps": 25.0},
    "XAUAUD": {"base_bps": 4.0, "stress_bps": 8.0},
}

ASSET_CLASS = {
    "BNBUSD": "crypto",
    "BTCUSD": "crypto",
    "LVMH": "equity",
    "XAUAUD": "fx_metal",
}

WEEKEND_POLICY = {
    "BNBUSD": {"entry_weekdays_only": True, "flatten_before_weekend": False},
    "BTCUSD": {"entry_weekdays_only": True, "flatten_before_weekend": False},
    "LVMH":   {"entry_weekdays_only": True, "flatten_before_weekend": True},
    "XAUAUD": {"entry_weekdays_only": True, "flatten_before_weekend": True},
}

SESSION_BY_SYMBOL = {
    "BNBUSD": _session(weekdays_only=WEEKEND_POLICY["BNBUSD"]["entry_weekdays_only"], windows_utc=[]),
    "BTCUSD": _session(weekdays_only=WEEKEND_POLICY["BTCUSD"]["entry_weekdays_only"], windows_utc=[]),
    "XAUAUD": _session(weekdays_only=WEEKEND_POLICY["XAUAUD"]["entry_weekdays_only"], windows_utc=[]),
    # Equity: ventana explícita (UTC) para no romper operatividad
    "LVMH": _session(weekdays_only=True, windows_utc=[{"start": _t(8, 0), "end": _t(16, 30)}]),
}

MICROSTRUCTURE = {s: {"tick_size_hint": None, "contract_hint": None} for s in UNIVERSE}

rows: List[Dict[str, Any]] = []
for sym in UNIVERSE:
    print(f"[Celda 01 v2.0.3] building row for {sym} ...")

    for req in (COSTS_BPS, ASSET_CLASS, WEEKEND_POLICY, SESSION_BY_SYMBOL):
        if sym not in req:
            raise RuntimeError(f"[Celda 01 v2.0.3] ERROR: specs incompletos para {sym}")

    wp = WEEKEND_POLICY[sym]
    sess = SESSION_BY_SYMBOL[sym]
    c = COSTS_BPS[sym]
    ms = MICROSTRUCTURE[sym]

    research_only = False
    research_reason = None
    if ASSET_CLASS[sym] == "equity" and not sess.get("windows_utc"):
        research_only = True
        research_reason = "Equity sin sesión definida (windows_utc vacío)."

    rows.append({
        "symbol": sym,
        "asset_class": ASSET_CLASS[sym],
        "base_cost_bps": float(c["base_bps"]),
        "stress_cost_bps": float(c["stress_bps"]),
        "entry_weekdays_only": bool(wp["entry_weekdays_only"]),
        "flatten_before_weekend": bool(wp["flatten_before_weekend"]),
        "session_weekdays_only": bool(sess["weekdays_only"]),
        "session_windows_utc_json": json.dumps(sess["windows_utc"], ensure_ascii=False),
        "research_only": bool(research_only),
        "research_reason": research_reason,
        "tick_size_hint": ms.get("tick_size_hint"),
        "contract_hint": ms.get("contract_hint"),
    })

specs = pl.DataFrame(rows)

print("\n--- Celda 01 v2.0.3 | Specs DF construido ---")
print("shape:", specs.shape)
print(specs)

# Gates duros
bad_costs = specs.filter(
    (pl.col("base_cost_bps") <= 0) |
    (pl.col("stress_cost_bps") <= 0) |
    (pl.col("stress_cost_bps") < pl.col("base_cost_bps"))
)
print("\n[Celda 01 v2.0.3] Gate costos inválidos rows:", bad_costs.height)
if bad_costs.height > 0:
    raise RuntimeError(f"[Celda 01 v2.0.3] ERROR: costos inválidos:\n{bad_costs}")

n_unique = specs.select(pl.col("symbol").n_unique()).item()
print("[Celda 01 v2.0.3] Gate unique symbols:", n_unique, "vs rows", specs.height)
if n_unique != specs.height:
    raise RuntimeError("[Celda 01 v2.0.3] ERROR: símbolos duplicados en instrument_specs.")

# Persistencia
OUT_SPECS_PARQUET.parent.mkdir(parents=True, exist_ok=True)
specs.write_parquet(OUT_SPECS_PARQUET)

snapshot = {
    "cell": "01 v2.0.3",
    "created_utc": datetime.now(timezone.utc).isoformat(timespec="seconds"),
    "run_id": RUN["RUN_ID"],
    "run_mode": RUN["RUN_MODE"],
    "run_dir": str(RUN["RUN_DIR"]),
    "rows": specs.to_dicts(),
}
OUT_SPECS_SNAPSHOT.write_text(json.dumps(snapshot, indent=2, ensure_ascii=False), encoding="utf-8")

print("\n[Celda 01 v2.0.3] OK — instrument_specs guardado:")
print("  parquet :", OUT_SPECS_PARQUET, "| exists:", OUT_SPECS_PARQUET.exists())
print("  snapshot:", OUT_SPECS_SNAPSHOT, "| exists:", OUT_SPECS_SNAPSHOT.exists())



--- Celda 01 v2.0.3 | Preflight ---
RUN_ID   : 20251223_164647_172c5b29
RUN_MODE : NEW_RUN_DEFAULT
RUN_DIR  : C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2\run_20251223_164647_172c5b29
ARTIFACTS keys: ['alpha_multi_horizon_report', 'alpha_multi_horizon_snapshot', 'cost_model_snapshot', 'data_qa_report', 'engine_qa_report', 'engine_report_snapshot', 'equity_engine', 'features_m5', 'instrument_specs', 'instrument_specs_snapshot', 'ohlcv_clean', 'regime_params_by_fold', 'regime_params_snapshot', 'summary_baseline', 'summary_engine', 'trades_baseline', 'trades_engine', 'wfo_folds', 'wfo_folds_snapshot']
OUT_SPECS_PARQUET : C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2\run_20251223_164647_172c5b29\instrument_specs_v2.parquet
OUT_SPECS_SNAPSHOT: C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2\run_20251223_164647_172c5b29\instrument_specs_snapshot_v2.json
[Celda 01 v2.0.3] building

In [14]:
# ======================================================================================
# Celda 02 v2.0.4 — Load M5 (m5_clean) + Canonicalize + QA (AUTO-RUTAS estilo v1)
# Propósito:
#   - Construir ohlcv_clean_m5.parquet (schema canónico) desde tu M5 limpio REAL (v1).
#   - QA mínimo institucional: dedup, monotonic, gaps total + intraday, share_300s.
#
# Inputs:
#   - RUN (Celda 00 v2.0.3)
#   - instrument_specs (Celda 01)
#
# Política de rutas (FINAL):
#   - Si defines TREND_M5_M5_CLEAN_DIR -> usa esa (prioridad absoluta).
#   - Si no, autodetecta en candidatos reales (como v1):
#       * <PROJECT_ROOT>/data/historical_data/m5_clean
#       * <PROJECT_ROOT>/data/rates_5m
#       * <PROJECT_ROOT>/data/historical_data/rates_5m
#       * <PROJECT_ROOT>/data/bulk_data/m5_raw   (último fallback)
#
# Outputs:
#   - RUN["ARTIFACTS"]["ohlcv_clean"]
#   - RUN["ARTIFACTS"]["data_qa_report"]
# ======================================================================================

from __future__ import annotations

import os
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import itertools

import polars as pl

if "RUN" not in globals():
    raise RuntimeError("[Celda 02 v2.0.4] ERROR: No existe RUN. Ejecuta Celda 00 v2.0.3 primero.")

ARTIFACTS: Dict[str, Path] = RUN["ARTIFACTS"]
PROJECT_ROOT: Path = RUN["PROJECT_ROOT"]
RUN_DIR: Path = RUN["RUN_DIR"]

SPECS_PATH = ARTIFACTS["instrument_specs"]
OUT_OHLCV = ARTIFACTS["ohlcv_clean"]
OUT_QA = ARTIFACTS["data_qa_report"]

print("\n--- Celda 02 v2.0.4 | Preflight ---")
print("RUN_ID     :", RUN["RUN_ID"])
print("RUN_MODE   :", RUN.get("RUN_MODE"))
print("PROJECT_ROOT:", PROJECT_ROOT)
print("RUN_DIR    :", RUN_DIR)
print("SPECS_PATH :", SPECS_PATH)
print("OUT_OHLCV  :", OUT_OHLCV)
print("OUT_QA     :", OUT_QA)

if not SPECS_PATH.exists():
    raise RuntimeError(f"[Celda 02 v2.0.4] ERROR: Falta instrument_specs: {SPECS_PATH}. Ejecuta Celda 01 primero.")

# -----------------------------
# Config / constants
# -----------------------------
EXPECTED_BAR_SECONDS = 300  # M5
FORCED_M5_DIR = os.getenv("TREND_M5_M5_CLEAN_DIR", "").strip()

# -----------------------------
# Helpers
# -----------------------------
TIME_CANDS = ["time_utc", "datetime", "timestamp", "time", "date"]
O_CANDS = ["open", "o"]
H_CANDS = ["high", "h"]
L_CANDS = ["low", "l"]
C_CANDS = ["close", "c"]
V_CANDS = ["volume", "vol", "tick_volume"]

def _pick_col(cols: List[str], cands: List[str]) -> Optional[str]:
    m = {c.lower(): c for c in cols}
    for x in cands:
        if x.lower() in m:
            return m[x.lower()]
    return None

def _count_parquets_quick(p: Path, limit: int = 2000) -> int:
    if not p.exists():
        return 0
    it = p.rglob("*.parquet")
    return sum(1 for _ in itertools.islice(it, limit))

def _detect_m5_clean_dir() -> Tuple[Path, str]:
    """
    Retorna: (dir, mode)
      mode:
        - FORCED_ENV
        - AUTO_CANDIDATE
    """
    if FORCED_M5_DIR:
        d = Path(FORCED_M5_DIR).resolve()
        return d, "FORCED_ENV"

    # candidatos estilo v1
    candidates = [
        PROJECT_ROOT / "data" / "historical_data" / "m5_clean",
        PROJECT_ROOT / "data" / "rates_5m",
        PROJECT_ROOT / "data" / "historical_data" / "rates_5m",
        PROJECT_ROOT / "data" / "bulk_data" / "m5_raw",
    ]

    print("\n[Celda 02 v2.0.4] Candidatos M5 probados (exist/parquets):")
    best = None
    best_n = -1
    for c in candidates:
        n = _count_parquets_quick(c)
        print(f"  - {c} | exists={c.exists()} | parquet_count~={n}")
        if c.exists() and n > best_n:
            best = c
            best_n = n

    if best is None or not best.exists() or best_n <= 0:
        raise RuntimeError(
            "[Celda 02 v2.0.4] ERROR: No pude detectar el directorio real de datos M5.\n"
            "Solución: define TREND_M5_M5_CLEAN_DIR apuntando a tu carpeta m5_clean.\n"
            "Ejemplo (PowerShell):\n"
            "  $env:TREND_M5_M5_CLEAN_DIR = 'C:\\Quant\\MT5_Data_Extraction\\data\\historical_data\\m5_clean'\n"
        )
    return best, "AUTO_CANDIDATE"

def _coerce_time_expr(col: str, dtype: pl.DataType) -> pl.Expr:
    if dtype in (pl.Int64, pl.Int32, pl.UInt64, pl.UInt32):
        # epoch detect (best-effort)
        return pl.when(pl.col(col) > 10**17).then(pl.from_epoch(pl.col(col), time_unit="ns")) \
                 .when(pl.col(col) > 10**14).then(pl.from_epoch(pl.col(col), time_unit="us")) \
                 .when(pl.col(col) > 10**11).then(pl.from_epoch(pl.col(col), time_unit="ms")) \
                 .otherwise(pl.from_epoch(pl.col(col), time_unit="s")) \
                 .alias("time_utc")
    return pl.col(col).cast(pl.Datetime, strict=False).alias("time_utc")

def _gap_stats_total(times: pl.Series) -> Dict[str, Any]:
    if times.len() < 2:
        return {"gap_count": 0, "gap_rate": 0.0, "missing_bars_est": 0, "max_gap_seconds": 0}
    dt = times.diff().dt.total_seconds().drop_nulls()
    if dt.len() == 0:
        return {"gap_count": 0, "gap_rate": 0.0, "missing_bars_est": 0, "max_gap_seconds": 0}
    gap_count = int((dt > EXPECTED_BAR_SECONDS).sum())
    max_gap = int(dt.max() or 0)
    missing = ((dt // EXPECTED_BAR_SECONDS) - 1).clip_min(0)
    missing_est = int(missing.sum() or 0)
    gap_rate = float(gap_count / max(dt.len(), 1))
    return {"gap_count": gap_count, "gap_rate": gap_rate, "missing_bars_est": missing_est, "max_gap_seconds": max_gap}

def _gap_stats_intraday(df_times: pl.DataFrame) -> Dict[str, Any]:
    if df_times.height < 2:
        return {"gap_count": 0, "gap_rate": 0.0, "missing_bars_est": 0, "max_gap_seconds": 0, "share_300s": 1.0}
    tmp = (
        df_times
        .with_columns(pl.col("time_utc").dt.truncate("1d").alias("_day"))
        .select(((pl.col("time_utc").diff().dt.total_seconds()).over("_day")).alias("dt_sec"))
        .drop_nulls()
    )
    if tmp.height == 0:
        return {"gap_count": 0, "gap_rate": 0.0, "missing_bars_est": 0, "max_gap_seconds": 0, "share_300s": 1.0}
    dt = tmp["dt_sec"]
    gap_count = int((dt > EXPECTED_BAR_SECONDS).sum())
    max_gap = int(dt.max() or 0)
    missing = ((dt // EXPECTED_BAR_SECONDS) - 1).clip_min(0)
    missing_est = int(missing.sum() or 0)
    gap_rate = float(gap_count / max(dt.len(), 1))
    share_300s = float((dt == EXPECTED_BAR_SECONDS).sum() / max(dt.len(), 1))
    return {"gap_count": gap_count, "gap_rate": gap_rate, "missing_bars_est": missing_est, "max_gap_seconds": max_gap, "share_300s": share_300s}

def _expected_bars_data_driven(df_times: pl.DataFrame) -> int:
    if df_times.height == 0:
        return 0
    per_day = (
        df_times
        .with_columns(pl.col("time_utc").dt.truncate("1d").alias("_day"))
        .group_by("_day")
        .agg([pl.min("time_utc").alias("t0"), pl.max("time_utc").alias("t1")])
        .with_columns(((pl.col("t1") - pl.col("t0")).dt.total_seconds() // EXPECTED_BAR_SECONDS + 1).cast(pl.Int64).alias("exp"))
    )
    return int(per_day.select(pl.col("exp").sum()).item() or 0)

# -----------------------------
# Detect M5 dir
# -----------------------------
M5_DIR, M5_DIR_MODE = _detect_m5_clean_dir()
print("\n[Celda 02 v2.0.4] M5_DIR seleccionado:", M5_DIR)
print("[Celda 02 v2.0.4] M5_DIR_MODE       :", M5_DIR_MODE)

# -----------------------------
# Universe
# -----------------------------
specs = pl.read_parquet(SPECS_PATH)
universe = specs.select("symbol").to_series().to_list()
print("\n[Celda 02 v2.0.4] Universe:", universe)

# -----------------------------
# Detect layout: hive partition symbol=...
# -----------------------------
symbol_partitions = [d for d in M5_DIR.iterdir() if d.is_dir() and d.name.lower().startswith("symbol=")]
IS_HIVE = len(symbol_partitions) > 0
print("\n[Celda 02 v2.0.4] Layout detectado:")
print("  IS_HIVE(symbol=...):", IS_HIVE)
if IS_HIVE:
    print("  sample partitions:", [d.name for d in symbol_partitions[:5]])

# -----------------------------
# Cargar por símbolo + canonicalizar
# -----------------------------
required_cols = ["time_utc","open","high","low","close","volume","spread","symbol"]
dfs: List[pl.DataFrame] = []
qa_rows: List[Dict[str, Any]] = []

for sym in universe:
    print("\n" + "-"*100)
    print(f"[Celda 02 v2.0.4] Loading symbol={sym}")

    if IS_HIVE:
        sym_dir = M5_DIR / f"symbol={sym}"
        if not sym_dir.exists():
            raise RuntimeError(f"[Celda 02 v2.0.4] ERROR: No existe partición {sym_dir}")
        glob = str(sym_dir / "**" / "*.parquet")
        lf = pl.scan_parquet(glob)
    else:
        glob = str(M5_DIR / "**" / "*.parquet")
        lf = pl.scan_parquet(glob)

    schema = lf.collect_schema()
    cols = schema.names()
    print("[Celda 02 v2.0.4] Columns(sample):", cols[:20], ("..." if len(cols) > 20 else ""))

    tcol = _pick_col(cols, TIME_CANDS)
    ocol = _pick_col(cols, O_CANDS)
    hcol = _pick_col(cols, H_CANDS)
    lcol = _pick_col(cols, L_CANDS)
    ccol = _pick_col(cols, C_CANDS)
    vcol = _pick_col(cols, V_CANDS)

    if tcol is None or ocol is None or hcol is None or lcol is None or ccol is None:
        raise RuntimeError(
            f"[Celda 02 v2.0.4] ERROR: columnas OHLCV faltantes en {sym}. "
            f"time={tcol}, open={ocol}, high={hcol}, low={lcol}, close={ccol}. cols={cols}"
        )

    spread_col = next((c for c in cols if c.lower() == "spread"), None)
    sym_col = next((c for c in cols if c.lower() == "symbol"), None)

    # construir select canónico
    time_expr = _coerce_time_expr(tcol, schema[tcol])

    volume_expr = (pl.col(vcol).cast(pl.Float64).alias("volume")) if vcol else pl.lit(0.0).cast(pl.Float64).alias("volume")

    if spread_col:
        spread_expr = pl.col(spread_col).cast(pl.Float64).alias("spread")
    else:
        spread_expr = pl.lit(0.0).cast(pl.Float64).alias("spread")

    if IS_HIVE:
        # en hive particionado, puede no existir 'symbol' dentro del parquet
        sym_expr = (pl.col(sym_col).cast(pl.Utf8).alias("symbol")) if sym_col else pl.lit(sym).cast(pl.Utf8).alias("symbol")
    else:
        # si no es hive, debería existir symbol o filtramos por columna si existe
        sym_expr = (pl.col(sym_col).cast(pl.Utf8).alias("symbol")) if sym_col else pl.lit(sym).cast(pl.Utf8).alias("symbol")

    lf2 = lf.select([
        time_expr,
        pl.col(ocol).cast(pl.Float64).alias("open"),
        pl.col(hcol).cast(pl.Float64).alias("high"),
        pl.col(lcol).cast(pl.Float64).alias("low"),
        pl.col(ccol).cast(pl.Float64).alias("close"),
        volume_expr,
        spread_expr,
        sym_expr,
    ]).drop_nulls(["time_utc"])

    if (not IS_HIVE) and sym_col:
        lf2 = lf2.filter(pl.col("symbol") == sym)
    elif IS_HIVE:
        lf2 = lf2.with_columns(pl.lit(sym).alias("symbol"))

    df = lf2.collect()
    print("[Celda 02 v2.0.4] Loaded rows (raw):", df.height)

    # sort + dedup
    n_before = df.height
    df = df.sort("time_utc").unique(subset=["time_utc"], keep="last")
    n_after = df.height
    dup_removed = n_before - n_after
    print("[Celda 02 v2.0.4] After dedup rows:", n_after, "| dup_removed:", dup_removed)

    if n_after == 0:
        raise RuntimeError(f"[Celda 02 v2.0.4] ERROR: {sym} quedó vacío tras limpiar.")

    # monotonic sanity
    min_dt = df.select(pl.col("time_utc").diff().dt.total_seconds().min()).item()
    if min_dt is not None and float(min_dt) < 0:
        raise RuntimeError(f"[Celda 02 v2.0.4] ERROR: {sym} no es monotónico (dt_min={min_dt}).")

    times = df["time_utc"]
    start = times.min()
    end = times.max()

    total_gaps = _gap_stats_total(times)
    intraday_gaps = _gap_stats_intraday(df.select(["time_utc"]))

    expected_intraday = _expected_bars_data_driven(df.select(["time_utc"]))
    coverage_intraday_pct = float(n_after / expected_intraday * 100.0) if expected_intraday > 0 else 0.0

    print("[Celda 02 v2.0.4] start_utc:", start)
    print("[Celda 02 v2.0.4] end_utc  :", end)
    print("[Celda 02 v2.0.4] intraday share_300s:", intraday_gaps["share_300s"])
    print("[Celda 02 v2.0.4] coverage_intraday_pct:", f"{coverage_intraday_pct:.2f}%")
    print("[Celda 02 v2.0.4] gaps_total   :", total_gaps)
    print("[Celda 02 v2.0.4] gaps_intraday:", intraday_gaps)

    # Gate duro: intraday M5 consistente
    if float(intraday_gaps["share_300s"]) < 0.90:
        raise RuntimeError(
            f"[Celda 02 v2.0.4] ERROR: {sym} intraday share_300s={intraday_gaps['share_300s']:.3f} < 0.90. "
            "Tu dataset NO es M5 consistente intradía."
        )

    qa_rows.append({
        "symbol": sym,
        "m5_dir": str(M5_DIR),
        "layout_hive": bool(IS_HIVE),
        "rows": int(n_after),
        "dup_removed": int(dup_removed),
        "start_utc": str(start),
        "end_utc": str(end),
        "coverage_intraday_pct": float(coverage_intraday_pct),
        "gaps_total": total_gaps,
        "gaps_intraday": intraday_gaps,
    })

    dfs.append(df.select(required_cols))

# Concatenar todo (schema canónico)
ohlcv = pl.concat(dfs, how="vertical").sort(["symbol", "time_utc"])

missing_cols = [c for c in required_cols if c not in ohlcv.columns]
if missing_cols:
    raise RuntimeError(f"[Celda 02 v2.0.4] ERROR: dataset no canónico, faltan columnas: {missing_cols}")

# Persistir
OUT_OHLCV.parent.mkdir(parents=True, exist_ok=True)
ohlcv.write_parquet(OUT_OHLCV)

qa_report = {
    "cell": "02 v2.0.4",
    "created_utc": datetime.now(timezone.utc).isoformat(timespec="seconds"),
    "run_id": RUN["RUN_ID"],
    "run_dir": str(RUN_DIR),
    "project_root": str(PROJECT_ROOT),
    "m5_dir": str(M5_DIR),
    "m5_dir_mode": M5_DIR_MODE,
    "layout_hive": bool(IS_HIVE),
    "expected_bar_seconds": EXPECTED_BAR_SECONDS,
    "n_rows_total": int(ohlcv.height),
    "n_symbols": int(ohlcv.select(pl.col("symbol").n_unique()).item()),
    "per_symbol": qa_rows,
    "notes": [
        "QA intraday evita penalizar overnight/weekend.",
        "Gate duro: share_300s >= 0.90 para consistencia M5 intradía.",
        "Si no detecta M5_DIR, setea TREND_M5_M5_CLEAN_DIR explícitamente."
    ],
}
OUT_QA.write_text(json.dumps(qa_report, indent=2, ensure_ascii=False), encoding="utf-8")

print(f"\n[Celda 02 v2.0.4] OK — ohlcv_clean creado: {OUT_OHLCV} | exists: {OUT_OHLCV.exists()}")
print(f"[Celda 02 v2.0.4] OK — data_qa_report creado: {OUT_QA} | exists: {OUT_QA.exists()}")

print("\n--- Preview (head) ---")
print(ohlcv.head(5))

print("\n--- QA (resumen) ---")
print(pl.DataFrame(qa_rows).select(["symbol","rows","dup_removed","coverage_intraday_pct"]))



--- Celda 02 v2.0.4 | Preflight ---
RUN_ID     : 20251223_164647_172c5b29
RUN_MODE   : NEW_RUN_DEFAULT
PROJECT_ROOT: C:\Quant\MT5_Data_Extraction
RUN_DIR    : C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2\run_20251223_164647_172c5b29
SPECS_PATH : C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2\run_20251223_164647_172c5b29\instrument_specs_v2.parquet
OUT_OHLCV  : C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2\run_20251223_164647_172c5b29\ohlcv_clean_m5.parquet
OUT_QA     : C:\Quant\MT5_Data_Extraction\ER_STRATEGY_LAB\notebooks\outputs\trend_m5_strategy\v2\run_20251223_164647_172c5b29\data_qa_report_v2.json

[Celda 02 v2.0.4] Candidatos M5 probados (exist/parquets):
  - C:\Quant\MT5_Data_Extraction\data\historical_data\m5_clean | exists=True | parquet_count~=2000
  - C:\Quant\MT5_Data_Extraction\data\rates_5m | exists=False | parquet_count~=0
  - C:\Quant\MT5_Data_Extraction\da

RuntimeError: [Celda 02 v2.0.4] ERROR: columnas OHLCV faltantes en BNBUSD. time=None, open=open, high=high, low=low, close=close. cols=['timestamp_utc', 'timestamp_gye', 'symbol', 'open', 'high', 'low', 'close', 'tick_volume', 'real_volume', 'spread_points', 'broker', 'server_tz']

In [None]:
# ======================================================================================
# Celda 03 v2.0.1 — Cost Model (base/stress + slippage proxy + gap proxy) [RETURNS POR SÍMBOLO OK]
# Propósito:
#   - Costos reproducibles net-of-costs:
#       * base_cost_bps / stress_cost_bps (instrument_specs)
#       * slippage proxy: spread (si existe) o proxy por volatilidad (abs-return) POR SÍMBOLO
#       * gap proxy (equity/fx) como add-on conservador
#   - Prints explícitos por símbolo (componentes y totales).
#
# Inputs:
#   - RUN (Celda 00)
#   - instrument_specs_v2.parquet (Celda 01)
#   - ohlcv_clean_m5.parquet (Celda 02)
#   - data_qa_report.json (Celda 02) [preferible v2.0.4]
#
# Outputs:
#   - cost_model_snapshot.json
#   - cost_model_v2.parquet
#
# ENV:
#   - TREND_M5_FORCE_REBUILD_COST_MODEL=1 => fuerza rebuild
# ======================================================================================

from __future__ import annotations

import os
import json
from pathlib import Path
from datetime import datetime, timezone
from typing import Any, Dict, List

import polars as pl

# -----------------------------
# Preflight
# -----------------------------
if "RUN" not in globals():
    raise RuntimeError("[Celda 03 v2.0.1] ERROR: No existe RUN en memoria. Ejecuta primero Celda 00 v2.0.")

RUN_DIR: Path = RUN["RUN_DIR"]
ARTIFACTS: Dict[str, Path] = RUN["ARTIFACTS"]

INSTRUMENT_SPECS_PATH = RUN_DIR / "instrument_specs_v2.parquet"
OHLCV_CLEAN_PATH = ARTIFACTS["ohlcv_clean"]
QA_REPORT_PATH = ARTIFACTS["data_qa_report"]

if not INSTRUMENT_SPECS_PATH.exists():
    raise RuntimeError("[Celda 03 v2.0.1] ERROR: Falta instrument_specs_v2.parquet. Ejecuta Celda 01 v2.0.")
if not OHLCV_CLEAN_PATH.exists():
    raise RuntimeError("[Celda 03 v2.0.1] ERROR: Falta ohlcv_clean_m5.parquet. Ejecuta Celda 02.")

OUT_COST_SNAPSHOT = ARTIFACTS["cost_model_snapshot"]
OUT_COST_TABLE = RUN_DIR / "cost_model_v2.parquet"

FORCE_REBUILD_COST = os.getenv("TREND_M5_FORCE_REBUILD_COST_MODEL", "").strip().lower() in ("1", "true", "yes")

def _now_utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")

# -----------------------------
# Cache
# -----------------------------
if OUT_COST_SNAPSHOT.exists() and OUT_COST_TABLE.exists() and (not FORCE_REBUILD_COST):
    print(f"[Celda 03 v2.0.1] Cache detectado. Usando cost model existente:\n  - {OUT_COST_SNAPSHOT}\n  - {OUT_COST_TABLE}")
    snap = json.loads(OUT_COST_SNAPSHOT.read_text(encoding="utf-8"))
    print("\n--- Cost Model Snapshot (resumen) ---")
    for r in snap.get("per_symbol", []):
        print(f"  {r['symbol']}: total_base_bps={r['total_base_bps']:.2f}, total_stress_bps={r['total_stress_bps']:.2f} "
              f"(slip_base={r['slippage_base_bps']:.2f}, slip_stress={r['slippage_stress_bps']:.2f}, gap_base={r['gap_base_bps']:.2f}, gap_stress={r['gap_stress_bps']:.2f})")
    print("\n[Celda 03 v2.0.1] OK — cost model listo.")
else:
    specs = pl.read_parquet(INSTRUMENT_SPECS_PATH)

    # QA flag (no bloquea, solo imprime)
    qa_session_aware = False
    if QA_REPORT_PATH.exists():
        try:
            qa = json.loads(QA_REPORT_PATH.read_text(encoding="utf-8"))
            qa_session_aware = bool(qa.get("cell", "").startswith("02 v2."))
        except Exception:
            qa_session_aware = False
    print(f"[Celda 03 v2.0.1] QA detected: {qa_session_aware} | QA path: {QA_REPORT_PATH}")

    # -----------------------------
    # Microstructure proxies (POR SÍMBOLO)
    # -----------------------------
    # Nota institucional:
    # - spread_bps proxy depende de que spread exista.
    # - abs_ret_bps proxy usa retornos close-to-close POR SÍMBOLO.
    # - Si el archivo no estuviera ordenado por symbol/time, esto debería fallar en QA previo. Aquí imprimimos sanity.
    sanity = (
        pl.scan_parquet(OHLCV_CLEAN_PATH)
        .select(["symbol", "time_utc"])
        .group_by("symbol")
        .agg([
            (pl.col("time_utc").diff().dt.total_seconds().min()).alias("min_dt_sec"),
            (pl.col("time_utc").diff().dt.total_seconds().max()).alias("max_dt_sec"),
            pl.len().alias("n_rows"),
        ])
        .collect()
        .sort("symbol")
    )
    print("\n--- Sanity order/spacing (time_utc diff stats) ---")
    print(sanity)
    bad_order = sanity.filter(pl.col("min_dt_sec") < 0)
    if bad_order.height > 0:
        raise RuntimeError(f"[Celda 03 v2.0.1] ERROR: time_utc no está ordenado (min_dt_sec<0) en: {bad_order.select('symbol').to_series().to_list()}")

    df_stats = (
        pl.scan_parquet(OHLCV_CLEAN_PATH)
        .select(["symbol", "time_utc", "close", "spread"])
        .with_columns([
            pl.col("close").shift(1).over("symbol").alias("close_prev"),
        ])
        .with_columns([
            pl.when(pl.col("close_prev").is_not_null() & (pl.col("close_prev") > 0))
              .then((pl.col("close") / pl.col("close_prev") - 1.0).abs())
              .otherwise(None)
              .alias("abs_ret"),
        ])
        .with_columns([
            (pl.col("abs_ret") * 10_000).alias("abs_ret_bps"),
            pl.when(pl.col("spread").is_not_null() & (pl.col("close") > 0))
              .then((pl.col("spread") / pl.col("close")) * 10_000)
              .otherwise(None)
              .alias("spread_bps"),
        ])
        .group_by("symbol")
        .agg([
            pl.len().alias("n_rows"),
            pl.col("spread_bps").drop_nulls().len().alias("n_spread_nonnull"),
            pl.col("spread_bps").median().alias("spread_med_bps"),
            pl.col("spread_bps").quantile(0.95, "nearest").alias("spread_p95_bps"),
            pl.col("abs_ret_bps").median().alias("vol_med_absret_bps"),
            pl.col("abs_ret_bps").quantile(0.95, "nearest").alias("vol_p95_absret_bps"),
        ])
        .collect()
        .sort("symbol")
    )

    print("\n--- Microstructure proxies (from OHLCV clean) ---")
    print(df_stats)

    # -----------------------------
    # Construir cost model por símbolo (con prints explícitos)
    # -----------------------------
    rows: List[Dict[str, Any]] = []

    for r in specs.to_dicts():
        sym = r["symbol"]
        asset_class = r["asset_class"]
        base_bps = float(r["base_cost_bps"])
        stress_bps = float(r["stress_cost_bps"])

        s = df_stats.filter(pl.col("symbol") == sym)
        if s.height != 1:
            raise RuntimeError(f"[Celda 03 v2.0.1] ERROR: no encuentro stats para {sym} en OHLCV clean.")
        srow = s.row(0, named=True)

        n_spread_nonnull = int(srow["n_spread_nonnull"])
        n_rows = int(srow["n_rows"])
        spread_med = srow["spread_med_bps"]
        spread_p95 = srow["spread_p95_bps"]
        vol_med = srow["vol_med_absret_bps"]
        vol_p95 = srow["vol_p95_absret_bps"]

        spread_usable = (n_spread_nonnull / max(n_rows, 1)) >= 0.10

        if spread_usable and (spread_med is not None):
            slip_base = float(max(spread_med, 0.0)) * 0.50
            slip_stress = float(max(spread_p95 or spread_med, 0.0)) * 0.75
            slip_method = "spread_bps_proxy"
        else:
            slip_base = float(max(vol_med or 0.0, 0.0)) * 0.10
            slip_stress = float(max(vol_p95 or vol_med or 0.0, 0.0)) * 0.15
            slip_method = "vol_absret_bps_proxy"

        if asset_class == "equity":
            gap_base = 2.0
            gap_stress = 6.0
        elif asset_class == "fx_metal":
            gap_base = 1.0
            gap_stress = 3.0
        else:
            gap_base = 0.0
            gap_stress = 0.0

        total_base = base_bps + slip_base + gap_base
        total_stress = stress_bps + slip_stress + gap_stress

        # Print explícito por símbolo
        print("\n" + "-" * 100)
        print(f"[Celda 03 v2.0.1] {sym} | asset_class={asset_class}")
        print(f"  base_cost_bps={base_bps:.2f} | stress_cost_bps={stress_bps:.2f}")
        print(f"  slippage_method={slip_method} | spread_coverage_pct={(n_spread_nonnull/max(n_rows,1))*100.0:.2f}%")
        print(f"  spread_med_bps={spread_med} | spread_p95_bps={spread_p95}")
        print(f"  vol_med_absret_bps={vol_med} | vol_p95_absret_bps={vol_p95}")
        print(f"  slippage_base_bps={slip_base:.3f} | slippage_stress_bps={slip_stress:.3f}")
        print(f"  gap_base_bps={gap_base:.2f} | gap_stress_bps={gap_stress:.2f}")
        print(f"  >>> TOTAL_BASE_BPS={total_base:.3f} | TOTAL_STRESS_BPS={total_stress:.3f}")

        rows.append({
            "symbol": sym,
            "asset_class": asset_class,
            "base_cost_bps": base_bps,
            "stress_cost_bps": stress_bps,
            "slippage_base_bps": slip_base,
            "slippage_stress_bps": slip_stress,
            "gap_base_bps": gap_base,
            "gap_stress_bps": gap_stress,
            "total_base_bps": total_base,
            "total_stress_bps": total_stress,
            "slippage_method": slip_method,
            "spread_med_bps": float(spread_med) if spread_med is not None else None,
            "spread_p95_bps": float(spread_p95) if spread_p95 is not None else None,
            "vol_med_absret_bps": float(vol_med) if vol_med is not None else None,
            "vol_p95_absret_bps": float(vol_p95) if vol_p95 is not None else None,
            "spread_coverage_pct": float(n_spread_nonnull / max(n_rows, 1) * 100.0),
        })

    cost_table = pl.DataFrame(rows).sort("symbol")

    # Gates
    bad = cost_table.filter(
        (pl.col("total_base_bps") <= 0) |
        (pl.col("total_stress_bps") <= 0) |
        (pl.col("total_stress_bps") < pl.col("total_base_bps"))
    )
    if bad.height > 0:
        raise RuntimeError(f"[Celda 03 v2.0.1] ERROR: cost model inválido:\n{bad}")

    OUT_COST_TABLE.parent.mkdir(parents=True, exist_ok=True)
    cost_table.write_parquet(OUT_COST_TABLE)

    snapshot = {
        "cell": "03 v2.0.1",
        "created_utc": _now_utc_iso(),
        "qa_detected": qa_session_aware,
        "notes": [
            "total_*_bps = base/stress (spec) + slippage proxy + gap proxy.",
            "slippage proxy: usa spread si existe; caso contrario, proxy por abs-return POR SÍMBOLO.",
            "gap proxy es add-on conservador; se refina más adelante si se requiere.",
        ],
        "per_symbol": cost_table.to_dicts(),
    }

    OUT_COST_SNAPSHOT.parent.mkdir(parents=True, exist_ok=True)
    OUT_COST_SNAPSHOT.write_text(json.dumps(snapshot, indent=2, ensure_ascii=False), encoding="utf-8")

    print(f"\n[Celda 03 v2.0.1] OK — cost model guardado:")
    print(f"  - {OUT_COST_SNAPSHOT}")
    print(f"  - {OUT_COST_TABLE}")

    print("\n--- Cost Model Table (v2.0.1) ---")
    print(cost_table)

    warn = cost_table.filter(pl.col("slippage_method") == "vol_absret_bps_proxy")
    if warn.height > 0:
        print("\n[Celda 03 v2.0.1] AVISO: spread no utilizable; slippage estimado por proxy de volatilidad:")
        print(warn.select(["symbol", "slippage_method", "spread_coverage_pct", "vol_med_absret_bps", "slippage_base_bps", "slippage_stress_bps", "total_base_bps", "total_stress_bps"]))

    print("\n[Celda 03 v2.0.1] OK — costos listos para net-of-costs en baseline/alpha/engine.")
 

[Celda 03 v2.0.1] Cache detectado. Usando cost model existente:
  - C:\Quant\MT5_Data_Extraction\artifacts\v2\run_20251222_161231_844455a0\cost_model_snapshot.json
  - C:\Quant\MT5_Data_Extraction\artifacts\v2\run_20251222_161231_844455a0\cost_model_v2.parquet

--- Cost Model Snapshot (resumen) ---
  BNBUSD: total_base_bps=8.78, total_stress_bps=21.59 (slip_base=0.78, slip_stress=5.59, gap_base=0.00, gap_stress=0.00)
  BTCUSD: total_base_bps=8.61, total_stress_bps=20.56 (slip_base=0.61, slip_stress=4.56, gap_base=0.00, gap_stress=0.00)
  LVMH: total_base_bps=14.75, total_stress_bps=35.64 (slip_base=0.75, slip_stress=4.64, gap_base=2.00, gap_stress=6.00)
  XAUAUD: total_base_bps=5.26, total_stress_bps=12.64 (slip_base=0.26, slip_stress=1.64, gap_base=1.00, gap_stress=3.00)

[Celda 03 v2.0.1] OK — cost model listo.


In [None]:
# ======================================================================================
# Celda 04 v2.0.1 — WFO Builder (≥6 folds + embargo/purge) [SESSION-AWARE GATES]
# Fix vs v2.0:
#   - Gate de tamaño ahora es por asset_class y además por "trading days" (más defendible).
#   - Evita bloquear equities session-only (LVMH) con thresholds 24/7.
# ======================================================================================

from __future__ import annotations

import os
import json
import calendar
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional

import polars as pl

# -----------------------------
# Preflight
# -----------------------------
if "RUN" not in globals():
    raise RuntimeError("[Celda 04 v2.0.1] ERROR: No existe RUN en memoria. Ejecuta primero Celda 00 v2.0.")

RUN_DIR: Path = RUN["RUN_DIR"]
ARTIFACTS: Dict[str, Path] = RUN["ARTIFACTS"]

INSTRUMENT_SPECS_PATH = RUN_DIR / "instrument_specs_v2.parquet"
OHLCV_CLEAN_PATH = ARTIFACTS["ohlcv_clean"]
QA_REPORT_PATH = ARTIFACTS["data_qa_report"]

if not INSTRUMENT_SPECS_PATH.exists():
    raise RuntimeError("[Celda 04 v2.0.1] ERROR: Falta instrument_specs_v2.parquet. Ejecuta Celda 01 v2.0.")
if not OHLCV_CLEAN_PATH.exists():
    raise RuntimeError("[Celda 04 v2.0.1] ERROR: Falta ohlcv_clean_m5.parquet. Ejecuta Celda 02.")

OUT_WFO_FOLDS = ARTIFACTS["wfo_folds"]
OUT_WFO_SNAPSHOT = ARTIFACTS["wfo_folds_snapshot"]

FORCE_REBUILD = os.getenv("TREND_M5_FORCE_REBUILD_WFO", "1").strip().lower() in ("1", "true", "yes")  # default=1 aquí
WFO_MODE = os.getenv("TREND_M5_WFO_MODE", "expanding").strip().lower()
if WFO_MODE not in ("expanding", "rolling"):
    raise ValueError("[Celda 04 v2.0.1] ERROR: TREND_M5_WFO_MODE debe ser 'expanding' o 'rolling'.")

MIN_FOLDS = int(os.getenv("TREND_M5_MIN_FOLDS", "6"))
MAX_HOLD_BARS = int(os.getenv("TREND_M5_MAX_HOLD_BARS", "2016"))  # ~1 semana
EXPECTED_BAR_SECONDS = 300  # M5
EMBARGO = timedelta(seconds=MAX_HOLD_BARS * EXPECTED_BAR_SECONDS)

def _now_utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")

def add_months(dt: datetime, months: int) -> datetime:
    y = dt.year
    m = dt.month + months
    while m > 12:
        y += 1
        m -= 12
    while m < 1:
        y -= 1
        m += 12
    last_day = calendar.monthrange(y, m)[1]
    d = min(dt.day, last_day)
    return dt.replace(year=y, month=m, day=d)

@dataclass
class WFOConfig:
    is_months: int
    oos_months: int
    step_months: int

CANDIDATE_CONFIGS_BY_ASSET = {
    "crypto": [
        WFOConfig(18, 3, 3),
        WFOConfig(12, 3, 3),
        WFOConfig(12, 2, 2),
    ],
    "equity": [
        WFOConfig(18, 3, 3),  # se mantiene; el gate ahora es session-aware
        WFOConfig(12, 3, 3),
        WFOConfig(12, 2, 2),
    ],
    "fx_metal": [
        WFOConfig(18, 3, 3),
        WFOConfig(12, 3, 3),
        WFOConfig(12, 2, 2),
    ],
}

# -----------------------------
# Gates session-aware (por asset_class)
# -----------------------------
# Días son el gate principal (defendible). Barras son sanity.
GATES = {
    "crypto":   {"min_is_days": 365, "min_oos_days": 60, "min_is_bars": 70_000, "min_oos_bars": 20_000},
    "fx_metal": {"min_is_days": 365, "min_oos_days": 60, "min_is_bars": 80_000, "min_oos_bars": 12_000},
    "equity":   {"min_is_days": 250, "min_oos_days": 60, "min_is_bars": 35_000, "min_oos_bars": 6_000},
    "unknown":  {"min_is_days": 250, "min_oos_days": 60, "min_is_bars": 35_000, "min_oos_bars": 6_000},
}

# -----------------------------
# Construcción (sin cache por defecto)
# -----------------------------
specs = pl.read_parquet(INSTRUMENT_SPECS_PATH)
universe = specs.select("symbol").to_series().to_list()
spec_map = {r["symbol"]: r for r in specs.to_dicts()}

df_ranges = (
    pl.scan_parquet(OHLCV_CLEAN_PATH)
    .group_by("symbol")
    .agg([
        pl.min("time_utc").alias("start_utc"),
        pl.max("time_utc").alias("end_utc"),
        pl.len().alias("n_rows"),
    ])
    .collect()
    .sort("symbol")
)

print("[Celda 04 v2.0.1] Universe:", universe)
print("\n--- Data ranges (ohlcv_clean) ---")
print(df_ranges)

qa_cell = None
if QA_REPORT_PATH.exists():
    try:
        qa_cell = json.loads(QA_REPORT_PATH.read_text(encoding="utf-8")).get("cell")
    except Exception:
        qa_cell = None

# Precompute daily calendar por símbolo (más eficiente que re-scan por fold)
daily = (
    pl.scan_parquet(OHLCV_CLEAN_PATH)
    .select([
        pl.col("symbol"),
        pl.col("time_utc").dt.truncate("1d").alias("day"),
    ])
    .unique()
    .collect()
)

def count_bars(sym: str, t0: datetime, t1: datetime) -> int:
    return int(
        pl.scan_parquet(OHLCV_CLEAN_PATH)
        .filter((pl.col("symbol") == sym) & (pl.col("time_utc") >= t0) & (pl.col("time_utc") < t1))
        .select(pl.len())
        .collect()
        .item()
    )

def count_days(sym: str, t0: datetime, t1: datetime) -> int:
    return int(
        daily.filter((pl.col("symbol") == sym) & (pl.col("day") >= t0.replace(hour=0, minute=0, second=0, microsecond=0))
                     & (pl.col("day") <  t1.replace(hour=0, minute=0, second=0, microsecond=0)))
        .select(pl.len())
        .item()
    )

def possible_fold_count(start: datetime, end: datetime, cfg: WFOConfig, embargo: timedelta) -> int:
    is_start = start
    is_end = add_months(is_start, cfg.is_months)
    n = 0
    while True:
        oos_start = is_end + embargo
        oos_end = add_months(oos_start, cfg.oos_months)
        if oos_end > end:
            break
        n += 1
        is_end = add_months(is_end, cfg.step_months)
        if is_end >= end:
            break
    return n

all_folds_rows: List[Dict[str, Any]] = []
snapshot_per_symbol: List[Dict[str, Any]] = []

for sym in universe:
    r = df_ranges.filter(pl.col("symbol") == sym)
    start_dt = r.select("start_utc").item()
    end_dt = r.select("end_utc").item()
    n_rows = int(r.select("n_rows").item())

    asset_class = spec_map[sym].get("asset_class", "unknown")
    cfg_candidates = CANDIDATE_CONFIGS_BY_ASSET.get(asset_class, [WFOConfig(12, 3, 3)])

    chosen_cfg: Optional[WFOConfig] = None
    chosen_count = 0

    for cfg in cfg_candidates:
        cnt = possible_fold_count(start_dt, end_dt, cfg, EMBARGO)
        if cnt >= MIN_FOLDS:
            chosen_cfg = cfg
            chosen_count = cnt
            break
        if cnt > chosen_count:
            chosen_cfg = cfg
            chosen_count = cnt

    if chosen_cfg is None or chosen_count < 3:
        raise RuntimeError(f"[Celda 04 v2.0.1] ERROR: WFO indefendible para {sym}. folds_possible={chosen_count}")

    print("\n" + "-" * 100)
    print(f"[Celda 04 v2.0.1] {sym} | asset_class={asset_class}")
    print(f"  data: start={start_dt}  end={end_dt}  n_rows={n_rows:,}")
    print(f"  chosen_cfg: IS={chosen_cfg.is_months}m  OOS={chosen_cfg.oos_months}m  STEP={chosen_cfg.step_months}m")
    print(f"  embargo_days={EMBARGO.total_seconds()/86400.0:.2f} | folds_possible={chosen_count} | mode={WFO_MODE}")

    fold_id = 1
    is_start = start_dt
    is_end = add_months(is_start, chosen_cfg.is_months)

    while True:
        embargo_start = is_end
        embargo_end = is_end + EMBARGO
        oos_start = embargo_end
        oos_end = add_months(oos_start, chosen_cfg.oos_months)

        if oos_end > end_dt:
            break

        if WFO_MODE == "rolling":
            is_start_eff = add_months(is_start, (fold_id - 1) * chosen_cfg.step_months)
            is_end_eff = is_end
        else:
            is_start_eff = is_start
            is_end_eff = is_end

        is_bars = count_bars(sym, is_start_eff, is_end_eff)
        oos_bars = count_bars(sym, oos_start, oos_end)

        is_days = count_days(sym, is_start_eff, is_end_eff)
        oos_days = count_days(sym, oos_start, oos_end)

        all_folds_rows.append({
            "symbol": sym,
            "fold_id": int(fold_id),
            "asset_class": asset_class,
            "wfo_mode": WFO_MODE,
            "is_start_utc": is_start_eff,
            "is_end_utc": is_end_eff,
            "embargo_start_utc": embargo_start,
            "embargo_end_utc": embargo_end,
            "oos_start_utc": oos_start,
            "oos_end_utc": oos_end,
            "is_bars": int(is_bars),
            "oos_bars": int(oos_bars),
            "is_days": int(is_days),
            "oos_days": int(oos_days),
            "cfg_is_months": int(chosen_cfg.is_months),
            "cfg_oos_months": int(chosen_cfg.oos_months),
            "cfg_step_months": int(chosen_cfg.step_months),
            "embargo_bars": int(MAX_HOLD_BARS),
        })

        print(f"  fold={fold_id:02d} | IS bars={is_bars:,} days={is_days} | OOS bars={oos_bars:,} days={oos_days}")

        fold_id += 1
        is_end = add_months(is_end, chosen_cfg.step_months)
        if is_end >= end_dt:
            break

    snapshot_per_symbol.append({
        "symbol": sym,
        "asset_class": asset_class,
        "config": {"is_months": chosen_cfg.is_months, "oos_months": chosen_cfg.oos_months, "step_months": chosen_cfg.step_months},
        "wfo_mode": WFO_MODE,
        "n_folds": int(fold_id - 1),
        "embargo_bars": int(MAX_HOLD_BARS),
        "embargo_days": float(EMBARGO.total_seconds() / 86400.0),
        "data_start_utc": str(start_dt),
        "data_end_utc": str(end_dt),
        "n_rows": int(n_rows),
    })

wfo_df = pl.DataFrame(all_folds_rows).sort(["symbol", "fold_id"])

# Gate A: folds por símbolo
folds_by_sym = wfo_df.group_by("symbol").agg(pl.len().alias("n_folds")).sort("symbol")
print("\n--- Folds por símbolo ---")
print(folds_by_sym)

too_few = folds_by_sym.filter(pl.col("n_folds") < 3)
if too_few.height > 0:
    raise RuntimeError(f"[Celda 04 v2.0.1] ERROR: símbolos con <3 folds: {too_few}")

# Gate B: session-aware (por asset_class)
def gate_row(asset_class: str) -> Dict[str, int]:
    g = GATES.get(asset_class, GATES["unknown"])
    return {k: int(v) for k, v in g.items()}

bad_rows = []
for row in wfo_df.iter_rows(named=True):
    g = gate_row(row["asset_class"])
    if (row["is_days"] < g["min_is_days"]) or (row["oos_days"] < g["min_oos_days"]) or (row["is_bars"] < g["min_is_bars"]) or (row["oos_bars"] < g["min_oos_bars"]):
        bad_rows.append({
            "symbol": row["symbol"],
            "fold_id": row["fold_id"],
            "asset_class": row["asset_class"],
            "is_bars": row["is_bars"], "oos_bars": row["oos_bars"],
            "is_days": row["is_days"], "oos_days": row["oos_days"],
            **{f"gate_{k}": v for k, v in g.items()}
        })

if bad_rows:
    bad_df = pl.DataFrame(bad_rows).sort(["symbol", "fold_id"])
    print("\n[Celda 04 v2.0.1] Detalle folds que NO pasan gates session-aware (se detiene):")
    print(bad_df)
    raise RuntimeError(
        "[Celda 04 v2.0.1] ERROR: Hay folds que no cumplen mínimos day-aware y bar-aware por asset_class. "
        "Si esto pasa, tu data o tus ventanas son insuficientes para WFO defendible."
    )

# Gate C: no OOS overlap por símbolo
bad_overlap = []
for sym in wfo_df.select("symbol").unique().to_series().to_list():
    s = wfo_df.filter(pl.col("symbol") == sym).sort("fold_id")
    prev_end = None
    for row in s.iter_rows(named=True):
        if prev_end is not None and row["oos_start_utc"] < prev_end:
            bad_overlap.append((sym, row["fold_id"]))
        prev_end = row["oos_end_utc"]
if bad_overlap:
    raise RuntimeError(f"[Celda 04 v2.0.1] ERROR: OOS overlap detectado: {bad_overlap}")

# Persistir
OUT_WFO_FOLDS.parent.mkdir(parents=True, exist_ok=True)
wfo_df.write_parquet(OUT_WFO_FOLDS)

snapshot = {
    "cell": "04 v2.0.1",
    "created_utc": _now_utc_iso(),
    "qa_cell_detected": qa_cell,
    "wfo_mode": WFO_MODE,
    "min_folds_target": MIN_FOLDS,
    "max_hold_bars": MAX_HOLD_BARS,
    "embargo_days": float(EMBARGO.total_seconds() / 86400.0),
    "gates": GATES,
    "notes": [
        "Gates cambiados a day-aware + bar-aware por asset_class (session-aware).",
        "Esto evita bloquear equities session-only con umbrales 24/7.",
    ],
    "per_symbol": snapshot_per_symbol,
    "folds_path": str(OUT_WFO_FOLDS),
}

OUT_WFO_SNAPSHOT.parent.mkdir(parents=True, exist_ok=True)
OUT_WFO_SNAPSHOT.write_text(json.dumps(snapshot, indent=2, ensure_ascii=False), encoding="utf-8")

print(f"\n[Celda 04 v2.0.1] OK — WFO folds guardados:")
print(f"  - {OUT_WFO_FOLDS}")
print(f"  - {OUT_WFO_SNAPSHOT}")

print("\n--- WFO Folds (preview) ---")
print(wfo_df.head(12))

print("\n[Celda 04 v2.0.1] OK — Se permite avanzar a Celda 05.")
 

[Celda 04 v2.0.1] Universe: ['BNBUSD', 'BTCUSD', 'LVMH', 'XAUAUD']

--- Data ranges (ohlcv_clean) ---
shape: (4, 4)
┌────────┬─────────────────────┬─────────────────────┬────────┐
│ symbol ┆ start_utc           ┆ end_utc             ┆ n_rows │
│ ---    ┆ ---                 ┆ ---                 ┆ ---    │
│ str    ┆ datetime[ms]        ┆ datetime[ms]        ┆ u32    │
╞════════╪═════════════════════╪═════════════════════╪════════╡
│ BNBUSD ┆ 2021-11-19 00:00:00 ┆ 2025-12-02 05:50:00 ┆ 409320 │
│ BTCUSD ┆ 2021-11-19 00:00:00 ┆ 2025-12-02 23:50:00 ┆ 337024 │
│ LVMH   ┆ 2021-11-19 10:00:00 ┆ 2025-12-02 18:25:00 ┆ 104341 │
│ XAUAUD ┆ 2021-11-19 01:05:00 ┆ 2025-12-02 05:50:00 ┆ 283032 │
└────────┴─────────────────────┴─────────────────────┴────────┘

----------------------------------------------------------------------------------------------------
[Celda 04 v2.0.1] BNBUSD | asset_class=crypto
  data: start=2021-11-19 00:00:00  end=2025-12-02 05:50:00  n_rows=409,320
  chosen_cfg: IS=18m 

In [None]:
# ======================================================================================
# Celda 05 v2.0.3 — Feature Set (Causal): Trendiness + Direction [FIX nested windows]
# Fix vs v2.0.2:
#   - Elimina "window dentro de rolling/window": primero materializa columnas base (ret, true_range, abs_diff),
#     luego aplica rollings/EMAs usando pl.col("...") (sin anidar ventanas).
#   - Mantiene Lazy-friendly y sin ColumnNotFound.
# ======================================================================================

from __future__ import annotations

import os
import json
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict

import polars as pl

# -----------------------------
# Preflight
# -----------------------------
if "RUN" not in globals():
    raise RuntimeError("[Celda 05 v2.0.3] ERROR: No existe RUN en memoria. Ejecuta primero Celda 00 v2.0.")

RUN_DIR: Path = RUN["RUN_DIR"]
ARTIFACTS: Dict[str, Path] = RUN["ARTIFACTS"]

OHLCV_CLEAN_PATH = ARTIFACTS.get("ohlcv_clean", RUN_DIR / "ohlcv_clean_m5.parquet")
QA_REPORT_PATH = ARTIFACTS.get("data_qa_report", RUN_DIR / "data_qa_report.json")
WFO_FOLDS_PATH = ARTIFACTS.get("wfo_folds", RUN_DIR / "wfo_folds.parquet")

if not OHLCV_CLEAN_PATH.exists():
    raise RuntimeError("[Celda 05 v2.0.3] ERROR: Falta ohlcv_clean_m5.parquet. Ejecuta Celda 02.")
if not WFO_FOLDS_PATH.exists():
    raise RuntimeError("[Celda 05 v2.0.3] ERROR: Falta wfo_folds.parquet. Ejecuta Celda 04.")

OUT_FEATURES = RUN_DIR / "features_m5_v2.parquet"
OUT_SNAPSHOT = RUN_DIR / "features_snapshot_v2.json"

RUN["ARTIFACTS"]["features_m5"] = OUT_FEATURES
RUN["ARTIFACTS"]["features_snapshot"] = OUT_SNAPSHOT

FORCE_REBUILD = os.getenv("TREND_M5_FORCE_REBUILD_FEATURES", "").strip().lower() in ("1", "true", "yes")

def _now_utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")

# -----------------------------
# Parámetros
# -----------------------------
EXPECTED_BAR_SECONDS = 300

ER_WIN = int(os.getenv("TREND_M5_ER_WIN", "288"))
VOL_WIN = int(os.getenv("TREND_M5_VOL_WIN", "288"))
MOM_WIN = int(os.getenv("TREND_M5_MOM_WIN", "288"))
ATR_WIN = int(os.getenv("TREND_M5_ATR_WIN", "96"))

EMA_FAST = int(os.getenv("TREND_M5_EMA_FAST", "200"))
EMA_SLOW = int(os.getenv("TREND_M5_EMA_SLOW", "600"))
SLOPE_WIN = int(os.getenv("TREND_M5_SLOPE_WIN", "50"))

EPS = 1e-12

# -----------------------------
# Cache
# -----------------------------
if OUT_FEATURES.exists() and OUT_SNAPSHOT.exists() and (not FORCE_REBUILD):
    print(f"[Celda 05 v2.0.3] Cache detectado. Usando features existentes:\n  - {OUT_FEATURES}\n  - {OUT_SNAPSHOT}")
    snap = json.loads(OUT_SNAPSHOT.read_text(encoding="utf-8"))
    print("\n--- Features Snapshot (resumen) ---")
    print("  params:", snap.get("params", {}))
    print("  symbols:", snap.get("symbols", []))
    print("  schema_cols(sample):", snap.get("schema_cols", [])[:20], "...")
    print("\n[Celda 05 v2.0.3] OK — features listos.")
else:
    lf0 = (
        pl.scan_parquet(OHLCV_CLEAN_PATH)
        .select(["symbol", "time_utc", "open", "high", "low", "close", "volume", "spread"])
        .sort(["symbol", "time_utc"])
    )

    cols = lf0.collect_schema().names()
    required = ["symbol", "time_utc", "open", "high", "low", "close"]
    missing = [c for c in required if c not in cols]
    if missing:
        raise RuntimeError(f"[Celda 05 v2.0.3] ERROR: faltan columnas en ohlcv_clean: {missing}")

    # ============================================================
    # Stage 1: columnas base (permitido usar .over aquí)
    #   - IMPORTANT: no usar estas expresiones dentro de rolling posteriormente.
    # ============================================================
    close_prev = pl.col("close").shift(1).over("symbol")

    ret_expr = (
        pl.when(close_prev.is_not_null() & (close_prev > 0))
        .then(pl.col("close") / close_prev - 1.0)
        .otherwise(None)
    )

    # abs_diff por símbolo (evita diff().over, y evita nested windows)
    abs_diff_expr = (pl.col("close") - pl.col("close").shift(1).over("symbol")).abs()

    # true range base (por símbolo)
    tr_expr = pl.max_horizontal([
        (pl.col("high") - pl.col("low")),
        (pl.col("high") - pl.col("close").shift(1).over("symbol")).abs(),
        (pl.col("low")  - pl.col("close").shift(1).over("symbol")).abs(),
    ])

    lf1 = (
        lf0.with_columns([
            close_prev.alias("close_prev"),
            ret_expr.alias("ret"),
            pl.col("ret").abs().alias("abs_ret"),
            abs_diff_expr.alias("abs_diff"),
            tr_expr.alias("true_range"),
        ])
    )

    # ============================================================
    # Stage 2: rollings sobre columnas materializadas (NO nested windows)
    # ============================================================
    lf2 = (
        lf1.with_columns([
            (pl.col("ret").rolling_std(window_size=VOL_WIN, min_samples=VOL_WIN).over("symbol") * 10_000)
                .alias(f"vol_bps_{VOL_WIN}"),

            (pl.col("true_range").rolling_mean(window_size=ATR_WIN, min_samples=ATR_WIN).over("symbol") / pl.col("close") * 10_000)
                .alias(f"atr_bps_{ATR_WIN}"),

            ((pl.col("close") / pl.col("close").shift(MOM_WIN).over("symbol") - 1.0) * 10_000)
                .alias(f"mom_bps_{MOM_WIN}"),
        ])
    )

    lf3 = (
        lf2.with_columns([
            (pl.col(f"mom_bps_{MOM_WIN}").abs() / (pl.col(f"vol_bps_{VOL_WIN}") + EPS))
                .alias(f"mom_eff_{MOM_WIN}"),

            ((pl.col("close") - pl.col("close").shift(ER_WIN).over("symbol")).abs() /
             (pl.col("abs_diff").rolling_sum(window_size=ER_WIN, min_samples=ER_WIN).over("symbol") + EPS))
                .alias(f"er_{ER_WIN}"),
        ])
    )

    # ============================================================
    # Stage 3: EMAs y dirección (columnas materializadas => combinaciones seguras)
    # ============================================================
    lf4 = (
        lf3.with_columns([
            pl.col("close").ewm_mean(span=EMA_FAST, adjust=False).over("symbol").alias(f"ema_{EMA_FAST}"),
            pl.col("close").ewm_mean(span=EMA_SLOW, adjust=False).over("symbol").alias(f"ema_{EMA_SLOW}"),
        ])
        .with_columns([
            pl.when(pl.col(f"ema_{EMA_FAST}") > pl.col(f"ema_{EMA_SLOW}")).then(1)
              .when(pl.col(f"ema_{EMA_FAST}") < pl.col(f"ema_{EMA_SLOW}")).then(-1)
              .otherwise(0)
              .alias("trend_dir"),

            (((pl.col(f"ema_{EMA_FAST}") - pl.col(f"ema_{EMA_SLOW}")).abs() / pl.col("close")) * 10_000)
              .alias("trend_strength_bps"),

            (((pl.col(f"ema_{EMA_SLOW}") / pl.col(f"ema_{EMA_SLOW}").shift(SLOPE_WIN).over("symbol")) - 1.0) * 10_000)
              .alias(f"trend_slope_bps_{SLOPE_WIN}"),
        ])
    )

    # -----------------------------
    # Final select (canónico)
    # -----------------------------
    lf_feat = (
        lf4.select([
            "symbol",
            "time_utc",
            "open", "high", "low", "close",
            "volume", "spread",
            "ret",
            f"vol_bps_{VOL_WIN}",
            f"atr_bps_{ATR_WIN}",
            f"mom_bps_{MOM_WIN}",
            f"mom_eff_{MOM_WIN}",
            f"er_{ER_WIN}",
            f"ema_{EMA_FAST}",
            f"ema_{EMA_SLOW}",
            "trend_dir",
            "trend_strength_bps",
            f"trend_slope_bps_{SLOPE_WIN}",
        ])
        .sort(["symbol", "time_utc"])
    )

    df_feat = lf_feat.collect()

    # -----------------------------
    # QA / sanity
    # -----------------------------
    mono = (
        df_feat.group_by("symbol")
        .agg(pl.col("time_utc").is_sorted().alias("is_sorted"))
        .sort("symbol")
    )
    if mono.filter(pl.col("is_sorted") == False).height > 0:
        raise RuntimeError(f"[Celda 05 v2.0.3] ERROR: time_utc no está ordenado en features:\n{mono}")

    key_cols = [
        f"er_{ER_WIN}",
        f"vol_bps_{VOL_WIN}",
        f"atr_bps_{ATR_WIN}",
        f"mom_bps_{MOM_WIN}",
        f"ema_{EMA_FAST}",
        f"ema_{EMA_SLOW}",
    ]
    null_report = (
        df_feat.group_by("symbol")
        .agg([(pl.col(c).is_null().mean() * 100.0).alias(f"null_pct_{c}") for c in key_cols])
        .sort("symbol")
    )

    OUT_FEATURES.parent.mkdir(parents=True, exist_ok=True)
    df_feat.write_parquet(OUT_FEATURES)

    qa_cell = None
    if QA_REPORT_PATH.exists():
        try:
            qa_cell = json.loads(QA_REPORT_PATH.read_text(encoding="utf-8")).get("cell")
        except Exception:
            qa_cell = None

    snapshot = {
        "cell": "05 v2.0.3",
        "created_utc": _now_utc_iso(),
        "qa_cell_detected": qa_cell,
        "params": {
            "ER_WIN": ER_WIN,
            "VOL_WIN": VOL_WIN,
            "MOM_WIN": MOM_WIN,
            "ATR_WIN": ATR_WIN,
            "EMA_FAST": EMA_FAST,
            "EMA_SLOW": EMA_SLOW,
            "SLOPE_WIN": SLOPE_WIN,
            "EXPECTED_BAR_SECONDS": EXPECTED_BAR_SECONDS,
        },
        "schema_cols": df_feat.columns,
        "symbols": df_feat.select("symbol").unique().to_series().to_list(),
        "notes": [
            "Fix nested windows: primero columnas base, luego rollings/EMAs sobre pl.col(...) materializadas.",
            "Features causales (<=t) alineadas con entrada t+1.",
        ],
    }
    OUT_SNAPSHOT.write_text(json.dumps(snapshot, indent=2, ensure_ascii=False), encoding="utf-8")

    print(f"[Celda 05 v2.0.3] OK — features guardados:")
    print(f"  - {OUT_FEATURES}")
    print(f"  - {OUT_SNAPSHOT}")

    print("\n--- Features preview ---")
    print(df_feat.head(8))

    print("\n--- Monotonicidad time_utc por símbolo ---")
    print(mono)

    print("\n--- Null% (warmup) en features clave ---")
    print(null_report)

    dist = (
        df_feat.group_by(["symbol", "trend_dir"])
        .agg(pl.len().alias("n"))
        .sort(["symbol", "trend_dir"])
    )
    print("\n--- Distribución trend_dir (sanity) ---")
    print(dist)

    print("\n[Celda 05 v2.0.3] OK — Se permite avanzar a Celda 06 (Regime Gate ON/OFF + hysteresis).")


ColumnNotFoundError: unable to find column "ret"; valid columns: ["symbol", "time_utc", "open", "high", "low", "close", "volume", "spread"]

Resolved plan until failure:

	---> FAILED HERE RESOLVING 'sink' <---
SORT BY [col("symbol"), col("time_utc")]
  SELECT [col("symbol"), col("time_utc"), col("open"), col("high"), col("low"), col("close"), col("volume"), col("spread")]
    Parquet SCAN [C:\Quant\MT5_Data_Extraction\artifacts\v2\run_20251222_161231_844455a0\ohlcv_clean_m5.parquet]
    PROJECT */8 COLUMNS
    ESTIMATED ROWS: 1133717