<a href="https://colab.research.google.com/github/racoope70/exploratory-daytrading/blob/main/ppo_alpaca_paper_trading_v6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Reset
!pip uninstall -y stable-baselines3 shimmy gymnasium gym alpaca-trade-api alpaca-py websockets PyYAML

# Match training stack (per your system_info.txt)
!pip install -q "stable-baselines3==2.7.0" "gymnasium==1.2.0" "shimmy==2.0.0"

# Trading deps
!pip install -q "alpaca-py" ta python-dotenv gym-anytrading pywavelets yfinance

# Only force these if you actually hit websocket/yaml issues
!pip install -q --upgrade --force-reinstall "websockets==15.0.1" "PyYAML==6.0.2"

import os; os.kill(os.getpid(), 9)

[0mFound existing installation: gymnasium 1.2.3
Uninstalling gymnasium-1.2.3:
  Successfully uninstalled gymnasium-1.2.3
Found existing installation: gym 0.25.2
Uninstalling gym-0.25.2:
  Successfully uninstalled gym-0.25.2
[0mFound existing installation: websockets 15.0.1
Uninstalling websockets-15.0.1:
  Successfully uninstalled websockets-15.0.1
Found existing installation: PyYAML 6.0.3
Uninstalling PyYAML-6.0.3:
  Successfully uninstalled PyYAML-6.0.3
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m187.2/187.2 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m944.3/944.3 kB[0m [31m11.7 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dopamine-rl 4.1.2 requires gym<=0.25.2, which is not installed.[0m[31m
[0m  Preparing metadata (setup.py) ..

In [1]:
import websockets, yaml
print(websockets.__version__)
print(yaml.__version__)

15.0.1
6.0.2


In [2]:
import torch, gymnasium, shimmy, stable_baselines3 as sb3
import alpaca, websockets, yfinance, pywt

print("torch:", torch.__version__)
print("gymnasium:", gymnasium.__version__)
print("shimmy:", shimmy.__version__)
print("stable-baselines3:", sb3.__version__)
print("alpaca-py:", alpaca.__version__)
print("websockets:", websockets.__version__)
print("yfinance:", yfinance.__version__)
print("pywavelets:", pywt.__version__)

torch: 2.10.0+cpu
gymnasium: 1.2.0
shimmy: 2.0.0
stable-baselines3: 2.7.0
alpaca-py: 0.43.2
websockets: 15.0.1
yfinance: 0.2.66
pywavelets: 1.8.0


In [3]:
"""
PPO Alpaca Paper Live Loop (alpaca-py) — End-to-end single-file script (02.27 + safety ports from 02.26)

✅ Keeps 02.27 alpaca-py client architecture:
   - TradingClient, StockHistoricalDataClient
   - init_clients()
   - resolve_credentials()

✅ Re-added / enforced from 02.26 (as requested):
A) STRICT START_FLAT flatten logic (CRITICAL)
   - list_open_orders()
   - cancel_open_orders_for_symbols()
   - wait_until_flat_positions_and_orders()
   - flatten_symbols_strict()

B) Hard-cap enforcement (prevents runaway weights)
   - enforce_position_caps_if_violated() runs BEFORE normal policy rebalance each bar

C) Market-closed behavior for Colab
   - _sleep_until_open_or_exit() (exit in Colab when closed if COLAB_EXIT_WHEN_CLOSED=1)

D) Exposure cap knobs (GROSS_CAP / NET_CAP)
   - parsed from env/config
   - enforcement behavior:
       * compute gross/net exposure % each cycle
       * if cap violated: block risk-increasing trades (only allow de-risking)
       * optional emergency de-risk (flatten) if EMERGENCY_FLATTEN_ON_EXPOSURE=1
"""

import csv
import sys
import gc
import json
import logging
import math
import os
import pickle
import re
import shutil
import time
import warnings
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from decimal import Decimal, ROUND_DOWN, ROUND_HALF_UP
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional, Tuple

# Scientific / data stack
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from dotenv import load_dotenv

# Fix seed for reproducibility
import random
import torch
os.environ["PYTHONHASHSEED"] = "0"
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)

# Alpaca (alpaca-py)
from alpaca.data.timeframe import TimeFrame
from alpaca.trading.client import TradingClient
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest, StockLatestTradeRequest, StockLatestQuoteRequest
from alpaca.trading.requests import GetPortfolioHistoryRequest, MarketOrderRequest, GetOrdersRequest
from alpaca.trading.enums import OrderSide, TimeInForce, OrderStatus

# RL models
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import VecNormalize


# ------------------------------------------------------------
# Colab detection
# ------------------------------------------------------------
IN_COLAB = False
try:
    import google.colab  # type: ignore
    from google.colab import drive  # type: ignore
    IN_COLAB = True
except Exception:
    IN_COLAB = False


# ============================================================
# Timeframe normalization / mapping
# ============================================================
def normalize_tf_key(tf: str) -> str:
    s = str(tf or "").strip().lower()
    s = s.replace(" ", "").replace("_", "")
    s = s.replace("minutes", "min").replace("minute", "min")
    s = s.replace("hours", "h").replace("hour", "h")
    return s

try:
    _MIN_UNIT = TimeFrame.Unit.Minute
except Exception:
    from alpaca.data.timeframe import TimeFrameUnit as _TFU
    _MIN_UNIT = _TFU.Minute

try:
    _TF_MAP = {
        "1min":  TimeFrame.Minute,
        "5min":  TimeFrame(5, _MIN_UNIT),
        "15min": TimeFrame(15, _MIN_UNIT),
        "1h":    TimeFrame.Hour,
        "60min": TimeFrame.Hour,
        "1d":    TimeFrame.Day,
        "1hour": TimeFrame.Hour,
        "1hr":   TimeFrame.Hour,
    }
except Exception:
    _TF_MAP = {}

_PF_TF_STR = {
    "1min":  "1Min",
    "5min":  "5Min",
    "15min": "15Min",
    "1h":    "1Hour",
    "60min": "1Hour",
    "1d":    "1Day",
    "1hour": "1Hour",
    "1hr":   "1Hour",
}

LIVE_TIMEFRAME = TimeFrame.Hour  # overwritten after config


# ============================================================
# Utils / Paths / Globals
# ============================================================
def round_to_cents(x: float) -> float:
    return float(Decimal(str(x)).quantize(Decimal("0.01"), rounding=ROUND_DOWN))

def _to_bool(x: str) -> bool:
    return str(x).strip().lower() in ("1", "true", "yes", "y", "on")

def to_2dp_str(x) -> str:
    return format(Decimal(str(x)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP), "f")

def to_6dp_str(x) -> str:
    return format(Decimal(str(x)).quantize(Decimal("0.000001"), rounding=ROUND_DOWN), "f")

if IN_COLAB:
    try:
        drive.mount("/content/drive", force_remount=False)
    except Exception:
        pass

if IN_COLAB:
    PROJECT_ROOT = Path("/content/drive/MyDrive/AlpacaPaper")
else:
    PROJECT_ROOT = Path.cwd() / "AlpacaPaper"
PROJECT_ROOT.mkdir(parents=True, exist_ok=True)

# Order throttling timestamps (per symbol)
_ORDER_EVENT_TS: Dict[str, float] = {}
_LAST_ORDER_TS: Dict[str, float] = {}

_FORCED_FIRST_BUY_DONE: Dict[str, bool] = {}
_NO_POS_CYCLE_COUNT: Dict[str, int] = {}
_REENTRY_BLOCK_UNTIL: Dict[str, float] = {}

_LAST_TARGET_W: Dict[str, float] = {}
_LAST_RAW_A: Dict[str, float] = {}
_LAST_CONF: Dict[str, float] = {}

_RUN_SUMMARY_HEADER_CACHE: Dict[str, List[str]] = {}
_LAST_BAR_TIME_SEEN: Dict[str, pd.Timestamp] = {}
_LAST_BAR_TIME_SKIPPED: Dict[str, pd.Timestamp] = {}

SESSION_OPEN_EQUITY: Optional[float] = None
_last_kill_ts: float = 0.0

_SEED_COOLDOWN_SEC = 10

# Exposure-cap state (D)
_EXPOSURE_CAPS_BLOCK_RISK: bool = False
_EXPOSURE_LAST: Dict[str, float] = {"gross": float("nan"), "net": float("nan")}

def begin_order_event(symbol: str, min_gap_sec: int) -> bool:
    now = time.time()
    last = _ORDER_EVENT_TS.get(symbol, 0.0)
    if (now - last) < float(min_gap_sec):
        return False
    _ORDER_EVENT_TS[symbol] = now
    return True

def stamp_order_event(symbol: str) -> None:
    ts = time.time()
    _ORDER_EVENT_TS[symbol] = ts
    _LAST_ORDER_TS[symbol] = ts

warnings.filterwarnings("default")

# Load env (PROJECT_ROOT/.env preferred)
env_candidates = [PROJECT_ROOT / ".env", Path(".env")]
for env_path in env_candidates:
    if env_path.exists():
        load_dotenv(dotenv_path=env_path, override=True)
        break
else:
    load_dotenv(override=True)

# Defaults
os.environ.setdefault("PH_TIMEOUT_SEC", "8")
os.environ.setdefault("EQUITY_TIMEFRAME", "5Min")
os.environ.setdefault("DEBUG_FORCE_SEED_IF_IDLE", "0")
os.environ.setdefault("DEBUG_SEED_IDLE_CYCLES", "10")
os.environ.setdefault("START_FLAT", "1")
os.environ.setdefault("COLAB_EXIT_WHEN_CLOSED", "1")

# Basic logger early (replaced after RESULTS_DIR exists)
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logging.getLogger().setLevel(getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO))
root = logging.getLogger()
root.handlers.clear()
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
handler.setLevel(getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO))
root.addHandler(handler)
root.setLevel(handler.level)
try:
    sys.stdout.reconfigure(line_buffering=True)
except Exception:
    pass


# ============================================================
# Credentials (keeps 02.27 pattern)
# ============================================================
def resolve_credentials() -> Tuple[str, str, str]:
    """
    Single source of truth for API keys + base URL.
    Looks in:
      - APCA_API_KEY_ID / APCA_API_SECRET_KEY / APCA_API_BASE_URL
      - ALPACA_API_KEY_ID / ALPACA_API_SECRET_KEY
      - PROJECT_ROOT/.env
    """
    base_url = (os.getenv("APCA_API_BASE_URL") or "https://paper-api.alpaca.markets").strip()

    key = (os.getenv("APCA_API_KEY_ID") or os.getenv("ALPACA_API_KEY_ID") or "").strip()
    sec = (os.getenv("APCA_API_SECRET_KEY") or os.getenv("ALPACA_API_SECRET_KEY") or "").strip()

    if not key or not sec:
        raise RuntimeError(
            "Missing Alpaca credentials. Set APCA_API_KEY_ID and APCA_API_SECRET_KEY (or ALPACA_* equivalents) "
            "in your environment or in PROJECT_ROOT/.env"
        )

    return key, sec, base_url


# ============================================================
# Config dataclass
# ============================================================
def _to_list_csv(x: str) -> list:
    return [s.strip().upper() for s in str(x).split(",") if s.strip()]

@dataclass
class Knobs:
    APCA_API_BASE_URL: str = "https://paper-api.alpaca.markets"
    DRY_RUN: bool = False
    AUTO_RUN_LIVE: bool = True
    INF_DETERMINISTIC: bool = True

    FLATTEN_INTO_CLOSE: bool = False
    FORCE_FIRST_BUY: bool = False
    FORCE_FLATTEN_ON_EXIT: bool = False

    EQUITY_LOG_THROTTLE_SEC: int = 900
    SKIP_EQUITY_WHEN_DRY_RUN: bool = True

    TICKERS: List[str] = field(default_factory=list)
    ARTIFACTS_DIR: str = ""
    RESULTS_ROOT: str = ""

    BARS_FEED: str = "iex"
    COOLDOWN_MIN: int = 10
    STALE_MAX_SEC: int = 4200

    SIZING_MODE: str = "threshold"  # "linear" | "threshold"
    WEIGHT_CAP: float = 0.35
    CONF_FLOOR: float = 0.15
    ENTER_CONF_MIN: float = 0.12
    ENTER_WEIGHT_MIN: float = 0.02
    EXIT_WEIGHT_MAX: float = 0.008
    REBALANCE_MIN_NOTIONAL: float = 10.00
    USE_FRACTIONALS: bool = True
    SEED_FIRST_SHARE: bool = True
    ALLOW_SHORTS: bool = False

    DELTA_WEIGHT_MIN: float = 0.002
    RAW_POS_MIN: float = 0.00
    RAW_NEG_MAX: float = 0.00

    TAKE_PROFIT_PCT: float = 0.05
    STOP_LOSS_PCT: float = 0.03

    TRAIN_TIMEFRAME: str = "1H"
    DATA_TIMEFRAME: str = "1H"
    EQUITY_TIMEFRAME: str = "5Min"

    MAX_DAILY_DRAWDOWN_PCT: float = 0.05
    KILL_SWITCH_COOLDOWN_MIN: int = 30

    # (D) Exposure caps
    GROSS_CAP: float = 1.00
    NET_CAP: float = 0.80
    EMERGENCY_FLATTEN_ON_EXPOSURE: bool = False

    EXIT_AFTER_CLOSE: bool = False

    APCA_API_KEY_ID: str = ""
    APCA_API_SECRET_KEY: str = ""
    STALE_BEST_WINDOW: str = ""

    @classmethod
    def from_env(
        cls,
        defaults: "Knobs",
        project_root: Path,
        env: Mapping[str, str],
        overrides: Mapping[str, object] = None,
    ):
        kv = {**defaults.__dict__}
        kv.update({
            "APCA_API_BASE_URL": env.get("APCA_API_BASE_URL", kv["APCA_API_BASE_URL"]),
            "AUTO_RUN_LIVE":     _to_bool(env.get("AUTO_RUN_LIVE", str(kv["AUTO_RUN_LIVE"]))),
            "DRY_RUN":           _to_bool(env.get("DRY_RUN", str(kv["DRY_RUN"]))),
            "INF_DETERMINISTIC": _to_bool(env.get("INF_DETERMINISTIC", str(kv["INF_DETERMINISTIC"]))),

            "FLATTEN_INTO_CLOSE": _to_bool(env.get("FLATTEN_INTO_CLOSE", str(kv.get("FLATTEN_INTO_CLOSE", False)))),
            "FORCE_FIRST_BUY": _to_bool(env.get("FORCE_FIRST_BUY", str(kv.get("FORCE_FIRST_BUY", False)))),
            "FORCE_FLATTEN_ON_EXIT": _to_bool(env.get("FORCE_FLATTEN_ON_EXIT", str(kv.get("FORCE_FLATTEN_ON_EXIT", False)))),

            "EQUITY_LOG_THROTTLE_SEC": int(env.get("EQUITY_LOG_THROTTLE_SEC", str(kv["EQUITY_LOG_THROTTLE_SEC"]))),
            "SKIP_EQUITY_WHEN_DRY_RUN": _to_bool(env.get("SKIP_EQUITY_WHEN_DRY_RUN", str(kv["SKIP_EQUITY_WHEN_DRY_RUN"]))),

            "USE_FRACTIONALS": _to_bool(env.get("USE_FRACTIONALS", str(kv["USE_FRACTIONALS"]))),
            "SEED_FIRST_SHARE": _to_bool(env.get("SEED_FIRST_SHARE", str(kv["SEED_FIRST_SHARE"]))),
            "ALLOW_SHORTS": _to_bool(env.get("ALLOW_SHORTS", str(kv["ALLOW_SHORTS"]))),

            "TICKERS": _to_list_csv(env.get("TICKERS", ",".join(kv["TICKERS"] or ["UNH", "GE"]))),
            "ARTIFACTS_DIR": env.get("ARTIFACTS_DIR", kv["ARTIFACTS_DIR"] or str(project_root / "artifacts")),
            "RESULTS_ROOT": env.get("RESULTS_ROOT", kv["RESULTS_ROOT"] or str(project_root / "results")),

            "BARS_FEED": env.get("BARS_FEED", kv["BARS_FEED"]),
            "COOLDOWN_MIN": int(env.get("COOLDOWN_MIN", str(kv["COOLDOWN_MIN"])) or kv["COOLDOWN_MIN"]),
            "STALE_MAX_SEC": int(env.get("STALE_MAX_SEC", str(kv["STALE_MAX_SEC"])) or kv["STALE_MAX_SEC"]),

            "SIZING_MODE": env.get("SIZING_MODE", kv["SIZING_MODE"]),
            "WEIGHT_CAP": float(env.get("WEIGHT_CAP", str(kv["WEIGHT_CAP"]))),
            "CONF_FLOOR": float(env.get("CONF_FLOOR", str(kv["CONF_FLOOR"]))),
            "ENTER_CONF_MIN": float(env.get("ENTER_CONF_MIN", str(kv["ENTER_CONF_MIN"]))),
            "ENTER_WEIGHT_MIN": float(env.get("ENTER_WEIGHT_MIN", str(kv["ENTER_WEIGHT_MIN"]))),
            "EXIT_WEIGHT_MAX": float(env.get("EXIT_WEIGHT_MAX", str(kv["EXIT_WEIGHT_MAX"]))),
            "REBALANCE_MIN_NOTIONAL": float(env.get("REBALANCE_MIN_NOTIONAL", str(kv["REBALANCE_MIN_NOTIONAL"]))),

            "TAKE_PROFIT_PCT": float(env.get("TAKE_PROFIT_PCT", str(kv["TAKE_PROFIT_PCT"]))),
            "STOP_LOSS_PCT": float(env.get("STOP_LOSS_PCT", str(kv["STOP_LOSS_PCT"]))),

            "DELTA_WEIGHT_MIN": float(env.get("DELTA_WEIGHT_MIN", str(kv.get("DELTA_WEIGHT_MIN", 0.002)))),
            "RAW_POS_MIN": float(env.get("RAW_POS_MIN", str(kv.get("RAW_POS_MIN", 0.0)))),
            "RAW_NEG_MAX": float(env.get("RAW_NEG_MAX", str(kv.get("RAW_NEG_MAX", 0.0)))),

            # (D)
            "GROSS_CAP": float(env.get("GROSS_CAP", str(kv.get("GROSS_CAP", 1.00)))),
            "NET_CAP":   float(env.get("NET_CAP",   str(kv.get("NET_CAP",   0.80)))),
            "EMERGENCY_FLATTEN_ON_EXPOSURE": _to_bool(env.get("EMERGENCY_FLATTEN_ON_EXPOSURE", str(kv.get("EMERGENCY_FLATTEN_ON_EXPOSURE", False)))),

            "EXIT_AFTER_CLOSE": _to_bool(env.get("EXIT_AFTER_CLOSE", str(kv.get("EXIT_AFTER_CLOSE", False)))),

            "STALE_BEST_WINDOW": env.get("STALE_BEST_WINDOW", kv.get("STALE_BEST_WINDOW", "")),
            "DATA_TIMEFRAME": env.get("DATA_TIMEFRAME", kv.get("DATA_TIMEFRAME", "1H")),
            "TRAIN_TIMEFRAME": env.get("TRAIN_TIMEFRAME", kv.get("TRAIN_TIMEFRAME", "1H")),
            "EQUITY_TIMEFRAME": env.get("EQUITY_TIMEFRAME", kv.get("EQUITY_TIMEFRAME", "5Min")),
        })

        kv["APCA_API_KEY_ID"] = env.get("APCA_API_KEY_ID") or env.get("ALPACA_API_KEY_ID", "") or ""
        kv["APCA_API_SECRET_KEY"] = env.get("APCA_API_SECRET_KEY") or env.get("ALPACA_API_SECRET_KEY", "") or ""

        if overrides:
            for k, v in overrides.items():
                key = str(k)
                if key.upper() == "TICKERS" and isinstance(v, str):
                    v = _to_list_csv(v)
                kv[key] = v

        return cls(**kv)

    def apply_to_globals(self):
        g = globals()
        g["BASE_URL"] = self.APCA_API_BASE_URL
        g["DRY_RUN"] = bool(self.DRY_RUN)
        g["INF_DETERMINISTIC"] = bool(self.INF_DETERMINISTIC)
        g["TICKERS"] = list(self.TICKERS or ["UNH", "GE"])

        g["ARTIFACTS_DIR"] = Path(self.ARTIFACTS_DIR)
        g["RESULTS_ROOT"] = Path(self.RESULTS_ROOT)
        g["RESULTS_DIR"] = g["RESULTS_ROOT"] / datetime.now(timezone.utc).strftime("%Y-%m-%d")
        g["LATEST_DIR"] = g["RESULTS_ROOT"] / "latest"

        for p in (g["ARTIFACTS_DIR"], g["RESULTS_DIR"], g["LATEST_DIR"]):
            p.mkdir(parents=True, exist_ok=True)

        g["BARS_FEED"] = str(self.BARS_FEED).strip()
        g["COOLDOWN_MIN"] = int(self.COOLDOWN_MIN)
        g["STALE_MAX_SEC"] = int(self.STALE_MAX_SEC)
        g["SIZING_MODE"] = self.SIZING_MODE
        g["WEIGHT_CAP"] = float(self.WEIGHT_CAP)
        g["ENTER_CONF_MIN"] = float(self.ENTER_CONF_MIN)
        g["ENTER_WEIGHT_MIN"] = float(self.ENTER_WEIGHT_MIN)
        g["EXIT_WEIGHT_MAX"] = float(self.EXIT_WEIGHT_MAX)
        g["REBALANCE_MIN_NOTIONAL"] = float(self.REBALANCE_MIN_NOTIONAL)
        g["USE_FRACTIONALS"] = bool(self.USE_FRACTIONALS)
        g["SEED_FIRST_SHARE"] = bool(self.SEED_FIRST_SHARE)
        g["ALLOW_SHORTS"] = bool(self.ALLOW_SHORTS)
        g["CONF_FLOOR"] = float(self.CONF_FLOOR)
        g["TAKE_PROFIT_PCT"] = float(self.TAKE_PROFIT_PCT)
        g["STOP_LOSS_PCT"] = float(self.STOP_LOSS_PCT)
        g["BEST_WINDOW_ENV"] = (self.STALE_BEST_WINDOW or None)

        # creds (resolved later too)
        g["API_KEY"] = self.APCA_API_KEY_ID or ""
        g["API_SECRET"] = self.APCA_API_SECRET_KEY or ""

        g["DELTA_WEIGHT_MIN"] = float(self.DELTA_WEIGHT_MIN)
        g["RAW_POS_MIN"] = float(self.RAW_POS_MIN)
        g["RAW_NEG_MAX"] = float(self.RAW_NEG_MAX)

        g["TRADE_LOG_CSV"] = g["RESULTS_DIR"] / "trade_log_master.csv"
        g["EQUITY_LOG_CSV"] = g["RESULTS_DIR"] / "equity_log.csv"
        g["PLOT_PATH"] = g["RESULTS_DIR"] / "equity_curve.png"
        g["PLOT_PATH_LATEST"] = g["LATEST_DIR"] / "equity_curve.png"
        g["EQUITY_LOG_LATEST"] = g["LATEST_DIR"] / "equity_log.csv"
        g["TRADE_LOG_LATEST"] = g["LATEST_DIR"] / "trade_log_master.csv"

        g["EQUITY_LOG_THROTTLE_SEC"] = int(self.EQUITY_LOG_THROTTLE_SEC)
        g["SKIP_EQUITY_WHEN_DRY_RUN"] = bool(self.SKIP_EQUITY_WHEN_DRY_RUN)
        g["_LAST_EQUITY_LOG_TS"] = 0
        g["_TRADE_EVENT_FLAG"] = False

        g["MAX_DAILY_DRAWDOWN_PCT"] = float(self.MAX_DAILY_DRAWDOWN_PCT)
        g["KILL_SWITCH_COOLDOWN_MIN"] = int(self.KILL_SWITCH_COOLDOWN_MIN)
        g["EXIT_AFTER_CLOSE"] = bool(self.EXIT_AFTER_CLOSE)

        g["FLATTEN_INTO_CLOSE"] = bool(self.FLATTEN_INTO_CLOSE)
        g["FORCE_FIRST_BUY"] = bool(self.FORCE_FIRST_BUY)
        g["FORCE_FLATTEN_ON_EXIT"] = bool(self.FORCE_FLATTEN_ON_EXIT)

        g["DATA_TIMEFRAME"] = str(self.DATA_TIMEFRAME)
        g["TRAIN_TIMEFRAME"] = str(self.TRAIN_TIMEFRAME)
        g["EQUITY_TIMEFRAME"] = str(self.EQUITY_TIMEFRAME)

        # (D)
        g["GROSS_CAP"] = float(self.GROSS_CAP)
        g["NET_CAP"] = float(self.NET_CAP)
        g["EMERGENCY_FLATTEN_ON_EXPOSURE"] = bool(self.EMERGENCY_FLATTEN_ON_EXPOSURE)

        os.environ["EXIT_AFTER_CLOSE"] = "1" if self.EXIT_AFTER_CLOSE else "0"
        os.environ["APCA_API_BASE_URL"] = self.APCA_API_BASE_URL
        os.environ["DRY_RUN"] = "1" if self.DRY_RUN else "0"
        os.environ["AUTO_RUN_LIVE"] = "1" if self.AUTO_RUN_LIVE else "0"
        os.environ["BARS_FEED"] = self.BARS_FEED

def configure_knobs(overrides: Mapping[str, object] = None) -> Knobs:
    defaults = Knobs(
        TICKERS=_to_list_csv(os.getenv("TICKERS", "UNH,GE")),
        ARTIFACTS_DIR=os.getenv("ARTIFACTS_DIR", str(PROJECT_ROOT / "artifacts")),
        RESULTS_ROOT=os.getenv("RESULTS_ROOT", str(PROJECT_ROOT / "results")),
        DATA_TIMEFRAME=os.getenv("DATA_TIMEFRAME", "1H"),
        TRAIN_TIMEFRAME=os.getenv("TRAIN_TIMEFRAME", "1H"),
        EQUITY_TIMEFRAME=os.getenv("EQUITY_TIMEFRAME", "5Min"),
    )
    cfg = Knobs.from_env(defaults, PROJECT_ROOT, os.environ, overrides=overrides)
    cfg.apply_to_globals()
    return cfg


# ============================================================
# Time helpers
# ============================================================
def ensure_utc(ts_like) -> pd.Timestamp:
    ts = pd.Timestamp(ts_like)
    if ts.tzinfo is None:
        return ts.tz_localize("UTC")
    return ts.tz_convert("UTC")

def now_utc() -> datetime:
    return datetime.now(timezone.utc)

def utcnow_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

def utc_ts(dt_like) -> int:
    ts = ensure_utc(dt_like)
    return int(ts.value // 10**9)

def _sleep_to_next_minute_block(n: int):
    n = max(1, int(n))
    now = now_utc()
    base = now.replace(second=0, microsecond=0)
    remainder = base.minute % n
    add = (n - remainder) % n
    if add == 0:
        add = n
    next_slot = base + timedelta(minutes=add)
    time.sleep(max(0.0, (next_slot - now).total_seconds()))

def is_hour_close(ts: pd.Timestamp) -> bool:
    ts = ensure_utc(ts)
    return (ts.minute == 0) and (ts.second == 0)


# ============================================================
# Trade logging (master)
# ============================================================
TRADE_FIELDS = ["datetime_utc", "ticker", "signal", "action", "price", "equity", "qty", "comment"]

def ensure_trade_log_header():
    if (not TRADE_LOG_CSV.exists()) or (TRADE_LOG_CSV.stat().st_size == 0):
        with TRADE_LOG_CSV.open("w", newline="", encoding="utf-8") as f:
            csv.DictWriter(f, fieldnames=TRADE_FIELDS).writeheader()

def log_trade(
    ticker: str,
    signal: float,
    action: str,
    price: float,
    equity: float,
    qty: float = None,
    comment: str = "",
):
    ensure_trade_log_header()
    row = {
        "datetime_utc": utcnow_iso(),
        "ticker": ticker,
        "signal": int(signal) if signal is not None else "",
        "action": action,
        "price": (float(price) if price is not None and np.isfinite(price) else ""),
        "equity": (float(equity) if equity is not None and np.isfinite(equity) else ""),
        "qty": (float(qty) if qty is not None and np.isfinite(qty) else ""),
        "comment": (str(comment) if comment else ""),
    }
    with TRADE_LOG_CSV.open("a", newline="", encoding="utf-8") as f:
        csv.DictWriter(f, fieldnames=TRADE_FIELDS).writerow(row)
    try:
        shutil.copy2(TRADE_LOG_CSV, TRADE_LOG_LATEST)
    except Exception:
        pass


# ============================================================
# Alpaca clients (single source of truth) — 02.27 pattern
# ============================================================
def init_clients() -> Tuple[TradingClient, StockHistoricalDataClient]:
    api_key, api_secret, base_url = resolve_credentials()
    # Keep paper=True, but also enforce BASE_URL later
    trading_api = TradingClient(api_key, api_secret, paper=True, url_override=base_url)
    data_api = StockHistoricalDataClient(api_key, api_secret)
    return trading_api, data_api


# ============================================================
# Timeout-safe wrapper
# ============================================================
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
_TIMEOUT_EXEC = ThreadPoolExecutor(max_workers=8)

def _call_with_timeout(func, timeout_sec: int, *args, **kwargs):
    fut = _TIMEOUT_EXEC.submit(func, *args, **kwargs)
    try:
        return fut.result(timeout=timeout_sec)
    except FuturesTimeoutError:
        raise TimeoutError(f"Timed out after {timeout_sec}s")


# ============================================================
# Portfolio history / equity snapshots
# ============================================================
def _resolve_pf_timeframe_str(timeframe, default: str = "1Hour") -> str:
    tf_raw = timeframe or os.getenv("EQUITY_TIMEFRAME", os.getenv("DATA_TIMEFRAME", default))
    key = normalize_tf_key(tf_raw)
    return _PF_TF_STR.get(key, default)

def _resolve_pf_timeframe_obj(timeframe, default_obj=None):
    tf_raw = timeframe or os.getenv("EQUITY_TIMEFRAME", os.getenv("DATA_TIMEFRAME", "1H"))
    key = normalize_tf_key(tf_raw)
    return _TF_MAP.get(key, default_obj or TimeFrame.Hour)

def get_portfolio_history_safe(
    trading_api: TradingClient,
    period: str = "1M",
    timeframe=None,
    extended_hours: bool = False,
    timeout_sec: int = 8,
    retries: int = 1,
):
    last_exc = None
    try:
        tf_obj = _resolve_pf_timeframe_obj(timeframe, default_obj=TimeFrame.Hour)
        req = GetPortfolioHistoryRequest(period=str(period), timeframe=tf_obj, extended_hours=bool(extended_hours))
    except Exception as e:
        last_exc = e
        tf_str = _resolve_pf_timeframe_str(timeframe, default="1Hour")
        req = GetPortfolioHistoryRequest(period=str(period), timeframe=tf_str, extended_hours=bool(extended_hours))

    for _ in range(max(1, retries + 1)):
        try:
            return _call_with_timeout(trading_api.get_portfolio_history, timeout_sec, req)
        except Exception as e:
            last_exc = e
            time.sleep(0.5)

    logging.warning(f"get_portfolio_history_safe failed: {last_exc}")
    return None

def fetch_portfolio_history(period="1D", timeframe=None, trading_api_in=None):
    trading_api = trading_api_in if trading_api_in is not None else globals().get("trading_api", None)
    if trading_api is None:
        return pd.DataFrame(columns=["timestamp_utc", "equity"])

    tf_raw = timeframe or os.getenv("EQUITY_TIMEFRAME", os.getenv("DATA_TIMEFRAME", "1H"))
    timeout_sec = int(os.getenv("PH_TIMEOUT_SEC", "8"))
    hist = get_portfolio_history_safe(
        trading_api,
        period=str(period),
        timeframe=tf_raw,
        extended_hours=False,
        timeout_sec=timeout_sec,
        retries=1,
    )

    if (not hist) or (not getattr(hist, "timestamp", None)) or (not getattr(hist, "equity", None)):
        if EQUITY_LOG_CSV.exists():
            try:
                df = pd.read_csv(EQUITY_LOG_CSV, parse_dates=["datetime_utc"])
                return df.rename(columns={"datetime_utc": "timestamp_utc"})[["timestamp_utc", "equity"]]
            except Exception:
                pass
        return pd.DataFrame(columns=["timestamp_utc", "equity"])

    return pd.DataFrame({
        "timestamp_utc": pd.to_datetime(hist.timestamp, unit="s", utc=True),
        "equity": pd.to_numeric(pd.Series(hist.equity), errors="coerce"),
    }).dropna()

def log_equity_snapshot(trading_api_in=None):
    trading_api = trading_api_in if trading_api_in is not None else globals().get("trading_api", None)
    if trading_api is None:
        return

    snap = fetch_portfolio_history(
        period="1D",
        timeframe=os.getenv("EQUITY_TIMEFRAME", "5Min"),
        trading_api_in=trading_api
    )
    if snap.empty:
        return

    latest = snap.iloc[-1:].copy().rename(columns={"timestamp_utc": "datetime_utc"})

    if EQUITY_LOG_CSV.exists() and EQUITY_LOG_CSV.stat().st_size > 0:
        try:
            df_old = pd.read_csv(EQUITY_LOG_CSV, parse_dates=["datetime_utc"])
        except Exception:
            df_old = pd.DataFrame(columns=["datetime_utc", "equity"])

        if (not df_old.empty) and (pd.to_datetime(df_old["datetime_utc"].iloc[-1]) == latest["datetime_utc"].iloc[0]):
            return

        out = (
            pd.concat([df_old, latest], ignore_index=True)
              .drop_duplicates(subset=["datetime_utc"], keep="last")
              .sort_values("datetime_utc")
        )
        out.to_csv(EQUITY_LOG_CSV, index=False)
    else:
        latest.to_csv(EQUITY_LOG_CSV, index=False)

    try:
        shutil.copy2(EQUITY_LOG_CSV, EQUITY_LOG_LATEST)
    except Exception:
        pass

def maybe_log_equity_snapshot(trading_api_in=None, reason=None):
    global _LAST_EQUITY_LOG_TS, _TRADE_EVENT_FLAG
    trading_api = trading_api_in if trading_api_in is not None else globals().get("trading_api", None)
    if trading_api is None:
        return

    if reason is None:
        reason = ("trade" if bool(_TRADE_EVENT_FLAG) else "cycle")

    if bool(globals().get("DRY_RUN", False)) and bool(globals().get("SKIP_EQUITY_WHEN_DRY_RUN", True)):
        return

    now_ts = time.time()
    force = reason in {"trade", "finalize", "close"}
    last_ts = float(_LAST_EQUITY_LOG_TS or 0.0)
    throttle = int(globals().get("EQUITY_LOG_THROTTLE_SEC", 900))

    if force or (now_ts - last_ts) >= throttle:
        try:
            log_equity_snapshot(trading_api_in=trading_api)
            _LAST_EQUITY_LOG_TS = now_ts
        except Exception as e:
            logging.debug(f"maybe_log_equity_snapshot failed: {e}")

    if reason == "trade":
        _TRADE_EVENT_FLAG = False


# ============================================================
# Run summary logging (append-only)
# ============================================================
def _safe_float(x):
    try:
        v = float(x)
        return v if np.isfinite(v) else float("nan")
    except Exception:
        return float("nan")

def _ensure_csv_header(path: Path, fieldnames: List[str]) -> None:
    if (not path.exists()) or (path.stat().st_size == 0):
        with path.open("w", newline="", encoding="utf-8") as f:
            csv.DictWriter(f, fieldnames=fieldnames).writeheader()

def _run_summary_fieldnames(tickers_u: List[str]) -> List[str]:
    syms = [str(s).upper() for s in tickers_u]
    fields = [
        "datetime_utc",
        "equity",
        "cash",
        "total_market_value",
        "gross_exposure_pct",
        "net_exposure_pct",
        "error",
    ]
    for s in syms:
        fields += [
            f"target_weight_{s}",
            f"actual_weight_{s}",
            f"position_qty_{s}",
            f"position_mv_{s}",
            f"raw_action_{s}",
            f"confidence_{s}",
        ]
    return fields

def append_run_summary(
    trading_api: TradingClient,
    tickers: List[str],
    results_dir: Path,
    latest_dir: Optional[Path] = None,
    error: str = "",
) -> None:
    try:
        ts = utcnow_iso()
        row: Dict[str, Any] = {"datetime_utc": ts, "error": (str(error)[:300] if error else "")}

        acct = None
        try:
            acct = trading_api.get_account()
            row["equity"] = _safe_float(getattr(acct, "equity", float("nan")))
            row["cash"] = _safe_float(getattr(acct, "cash", float("nan")))
        except Exception as e:
            row["equity"] = float("nan")
            row["cash"] = float("nan")
            row["error"] = (row["error"] + f" | get_account:{e}")[:300] if row["error"] else f"get_account:{e}"[:300]

        equity = _safe_float(row.get("equity", float("nan")))

        positions = []
        try:
            positions = trading_api.get_all_positions()
        except Exception as e:
            row["error"] = (row["error"] + f" | list_positions:{e}")[:300] if row["error"] else f"list_positions:{e}"[:300]
            positions = []

        all_pos_mv = []
        for p in positions or []:
            try:
                mv = _safe_float(getattr(p, "market_value", float("nan")))  # signed
                if np.isfinite(mv):
                    all_pos_mv.append(mv)
            except Exception:
                continue

        total_mv_all = float(np.nansum(all_pos_mv)) if all_pos_mv else 0.0
        gross_mv_all = float(np.nansum([abs(x) for x in all_pos_mv])) if all_pos_mv else 0.0
        row["total_market_value"] = _safe_float(total_mv_all)
        if np.isfinite(equity) and equity > 0:
            row["gross_exposure_pct"] = gross_mv_all / equity
            row["net_exposure_pct"] = total_mv_all / equity
        else:
            row["gross_exposure_pct"] = float("nan")
            row["net_exposure_pct"] = float("nan")

        tickers_u = [str(s).upper() for s in tickers]
        pos_qty: Dict[str, float] = {s: 0.0 for s in tickers_u}
        pos_mv: Dict[str, float] = {s: 0.0 for s in tickers_u}

        for p in positions or []:
            try:
                sym = str(getattr(p, "symbol", "")).upper()
                if sym not in pos_qty:
                    continue
                q = _safe_float(getattr(p, "qty", 0.0))
                mv = _safe_float(getattr(p, "market_value", 0.0))
                pos_qty[sym] = q
                pos_mv[sym] = mv
            except Exception:
                continue

        for s in tickers_u:
            tw = _safe_float(_LAST_TARGET_W.get(s, float("nan")))
            rw = _safe_float(_LAST_RAW_A.get(s, float("nan")))
            cf = _safe_float(_LAST_CONF.get(s, float("nan")))

            mv = _safe_float(pos_mv.get(s, 0.0))
            aq = _safe_float(pos_qty.get(s, 0.0))
            aw = (mv / equity) if (np.isfinite(equity) and equity > 0) else float("nan")

            row[f"target_weight_{s}"] = tw
            row[f"actual_weight_{s}"] = aw
            row[f"position_qty_{s}"] = aq
            row[f"position_mv_{s}"] = mv
            row[f"raw_action_{s}"] = rw
            row[f"confidence_{s}"] = cf

        out_path = results_dir / "run_summary.csv"
        fieldnames = _run_summary_fieldnames(tickers_u)
        _ensure_csv_header(out_path, fieldnames)
        out_key = str(out_path.resolve())

        old_header = _RUN_SUMMARY_HEADER_CACHE.get(out_key)
        if old_header is None:
            try:
                with out_path.open("r", newline="", encoding="utf-8") as f:
                    old_header = next(csv.reader(f))
            except Exception:
                old_header = []
            _RUN_SUMMARY_HEADER_CACHE[out_key] = list(old_header)

        if old_header and old_header != fieldnames:
            schema_stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
            rotated_main = results_dir / f"run_summary__oldschema__{schema_stamp}.csv"
            try:
                out_path.replace(rotated_main)
            except Exception:
                try:
                    shutil.copy2(out_path, rotated_main)
                    out_path.unlink(missing_ok=True)
                except Exception:
                    pass
            _ensure_csv_header(out_path, fieldnames)
            _RUN_SUMMARY_HEADER_CACHE[out_key] = list(fieldnames)

        with out_path.open("a", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=fieldnames)
            w.writerow({k: row.get(k, "") for k in fieldnames})

        _RUN_SUMMARY_HEADER_CACHE[out_key] = list(fieldnames)

        if latest_dir is not None:
            try:
                latest_dir.mkdir(parents=True, exist_ok=True)
                shutil.copy2(out_path, latest_dir / "run_summary.csv")
            except Exception:
                pass

    except Exception as fatal:
        try:
            out_path = results_dir / "run_summary.csv"
            fieldnames = _run_summary_fieldnames([str(s).upper() for s in tickers])
            _ensure_csv_header(out_path, fieldnames)
            fallback = {k: "" for k in fieldnames}
            fallback["datetime_utc"] = utcnow_iso()
            fallback["error"] = f"append_run_summary_fatal:{str(fatal)[:260]}"
            with out_path.open("a", newline="", encoding="utf-8") as f:
                csv.DictWriter(f, fieldnames=fieldnames).writerow(fallback)
            if latest_dir is not None:
                try:
                    latest_dir.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(out_path, latest_dir / "run_summary.csv")
                except Exception:
                    pass
        except Exception:
            pass


# ============================================================
# Plotting + metrics
# ============================================================
def plot_equity_curve(from_equity_csv: bool = True):
    with plt.ioff():
        if from_equity_csv and EQUITY_LOG_CSV.exists():
            df = pd.read_csv(EQUITY_LOG_CSV, parse_dates=["datetime_utc"]).sort_values("datetime_utc")
        else:
            df = fetch_portfolio_history(period="3M", timeframe=os.getenv("EQUITY_TIMEFRAME", "5Min")) \
                .rename(columns={"timestamp_utc": "datetime_utc"})

        if df.empty:
            print("No equity data to plot yet.")
            return

        fig, ax = plt.subplots(figsize=(10, 4))
        ax.plot(df["datetime_utc"], df["equity"])
        ax.set_title("Portfolio Value Over Time (Paper)")
        ax.set_xlabel("Time (UTC)")
        ax.set_ylabel("Equity ($)")
        fig.tight_layout()
        fig.savefig(PLOT_PATH, bbox_inches="tight")
        fig.savefig(PLOT_PATH_LATEST, bbox_inches="tight")
        plt.close(fig)
        print(f"Saved equity curve → {PLOT_PATH}")
        print(f"Updated latest copy → {PLOT_PATH_LATEST}")

def compute_performance_metrics(df_equity: pd.DataFrame):
    if df_equity.empty or df_equity["equity"].isna().all():
        return {"cum_return": np.nan, "sharpe": np.nan, "max_drawdown": np.nan}

    df = df_equity.sort_values("datetime_utc")
    e = df["equity"].astype(float)
    r = e.pct_change().dropna()
    if r.empty:
        return {"cum_return": 0.0, "sharpe": np.nan, "max_drawdown": np.nan}

    dt_sec = df["datetime_utc"].diff().dt.total_seconds().dropna().median()
    if not (isinstance(dt_sec, (int, float)) and dt_sec > 0):
        periods_per_year = 252 * 78
    else:
        periods_per_day = (6.5 * 3600) / dt_sec
        periods_per_year = 252 * periods_per_day

    sharpe = (r.mean() / (r.std() + 1e-12)) * math.sqrt(periods_per_year)
    cum = (1 + r).cumprod()
    peak = cum.cummax()
    dd = (cum / peak - 1.0).min()
    cum_return = e.iloc[-1] / e.iloc[0] - 1.0
    return {"cum_return": float(cum_return), "sharpe": float(sharpe), "max_drawdown": float(dd)}


# ============================================================
# Per-ticker CSV logging
# ============================================================
def _append_csv_row(path: Path, row: dict):
    fieldnames = list(row.keys())

    if not path.exists():
        with path.open("w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=fieldnames)
            w.writeheader()
            w.writerow(row)
        return

    try:
        with path.open("r", newline="", encoding="utf-8") as f:
            old_header = next(csv.reader(f))
    except Exception:
        old_header = []

    if old_header != fieldnames:
        tmp = path.with_suffix(".tmp")
        with tmp.open("w", newline="", encoding="utf-8") as wf, path.open("r", newline="", encoding="utf-8") as rf:
            r = csv.DictReader(rf) if old_header else None
            w = csv.DictWriter(wf, fieldnames=fieldnames)
            w.writeheader()
            if r:
                for old_row in r:
                    merged = {k: old_row.get(k, "") for k in fieldnames}
                    w.writerow(merged)
        tmp.replace(path)

    with path.open("a", newline="", encoding="utf-8") as f:
        csv.DictWriter(f, fieldnames=fieldnames).writerow(row)

def log_trade_symbol(
    symbol: str,
    bar_time,
    raw_action: float,
    weight: float,
    confidence: float,
    price: float,
    equity: float,
    dry_run: bool,
    note: str = "",
    order_submitted: int = 0,
    order_id: str = "",
    order_status: str = "",
    filled_qty: str = "",
):
    try:
        bt = pd.Timestamp(bar_time)
        if bt.tzinfo is None:
            bt = bt.tz_localize("UTC")
        else:
            bt = bt.tz_convert("UTC")
        bt_iso = bt.isoformat()
        age_sec = int((now_utc() - bt.to_pydatetime()).total_seconds())
    except Exception:
        bt_iso = ""
        age_sec = ""

    resolved_feed = (os.getenv("BARS_FEED", "").strip() or "default")

    if abs(float(weight)) <= float(globals().get("EXIT_WEIGHT_MAX", 0.0)):
        sig = "FLAT"
    elif float(weight) > 0:
        sig = "LONG"
    else:
        sig = "SHORT"

    row = {
        "log_time": now_utc().isoformat(),
        "symbol": symbol,
        "bar_time": bt_iso,
        "bar_age_sec": age_sec,
        "feed": resolved_feed,
        "signal": sig,
        "raw_action": float(raw_action) if raw_action is not None and np.isfinite(raw_action) else "",
        "weight": float(weight) if weight is not None and np.isfinite(weight) else "",
        "confidence": float(confidence) if confidence is not None and np.isfinite(confidence) else "",
        "price": float(price) if price is not None and np.isfinite(price) else "",
        "equity": float(equity) if equity is not None and np.isfinite(equity) else "",
        "dry_run": int(bool(dry_run)),
        "note": note,
        "order_submitted": int(order_submitted),
        "order_id": str(order_id or ""),
        "order_status": str(order_status or ""),
        "filled_qty": str(filled_qty or ""),
    }

    _append_csv_row(RESULTS_DIR / f"trade_log_{symbol}.csv", row)

    try:
        action = str(note)[:64]
        comment = str(note)[:200]
        master_sig = -1 if sig == "SHORT" else (1 if sig == "LONG" else 0)
        log_trade(
            ticker=symbol,
            signal=master_sig,
            action=action,
            price=float(price) if (price is not None and np.isfinite(price)) else None,
            equity=float(equity) if (equity is not None and np.isfinite(equity)) else None,
            qty=None,
            comment=comment,
        )
    except Exception as e:
        logging.debug("master trade log write failed: %s", e)


# ============================================================
# Artifacts: picker & loaders
# ============================================================
def _extract_window_idx(path: Path) -> Optional[int]:
    m = re.search(r"_window(\d+)", path.stem)
    return int(m.group(1)) if m else None

def _prefer_same_window(cands, w: Optional[int]):
    cands = list(cands)
    if not cands:
        return []
    if w is None:
        return sorted(cands)
    same = [p for p in cands if _extract_window_idx(p) == w]
    return sorted(same or cands)

def pick_artifacts_for_ticker(ticker: str, artifacts_dir: str, best_window: Optional[str] = None) -> Dict[str, Optional[Path]]:
    p = Path(artifacts_dir)
    if not p.exists():
        raise FileNotFoundError(f"Artifacts directory not found: {p.resolve()}")

    models = sorted(p.glob(f"ppo_{ticker}_window*_model*.zip"))
    if not models:
        models = (sorted(p.glob(f"ppo_{ticker}_model*.zip")) or sorted(p.glob(f"*{ticker}*model*.zip")))
    if not models:
        raise FileNotFoundError(f"No PPO model zip found for {ticker} in {p}")

    def _model_sort_key(path: Path):
        w = _extract_window_idx(path)
        return (w if w is not None else -1, " (1)" in path.stem)

    models = sorted(models, key=_model_sort_key)

    chosen: Optional[Path] = None
    if best_window:
        chosen = next((m for m in models if f"_window{best_window}_" in m.stem), None)
        if chosen is None:
            logging.warning("[%s] BEST_WINDOW=%s not found; falling back.", ticker, best_window)

    if chosen is None:
        with_idx = [(m, _extract_window_idx(m)) for m in models]
        with_idx = [(m, w) for (m, w) in with_idx if w is not None]
        chosen = max(with_idx, key=lambda t: t[1])[0] if with_idx else models[-1]

    chosen_w = _extract_window_idx(chosen)

    vec_candidates = list(p.glob(f"ppo_{ticker}_window*_vecnorm*.pkl"))
    feat_candidates = list(p.glob(f"ppo_{ticker}_window*_features*.json"))

    vecnorm = (_prefer_same_window(vec_candidates, chosen_w)[0] if vec_candidates else None)
    feats = (_prefer_same_window(feat_candidates, chosen_w)[0] if feat_candidates else None)

    logging.info("[%s] model=%s | window=%s | vecnorm=%s | features=%s",
                 ticker, chosen.name, chosen_w,
                 vecnorm.name if vecnorm else "None",
                 feats.name if feats else "None")
    return {"model": chosen, "vecnorm": vecnorm, "features": feats}

def load_vecnormalize(path: Optional[Path]):
    if path is None:
        return None
    try:
        with open(path, "rb") as f:
            return pickle.load(f)
    except Exception:
        pass
    try:
        return VecNormalize.load(str(path), venv=None)
    except Exception as e:
        logging.warning("VecNormalize load failed (%s). Proceeding without normalization.", e)
        return None

def load_features(path: Optional[Path]):
    if path is None:
        return None
    with open(path, "r") as f:
        return json.load(f)

def _const_schedule(val: float):
    return lambda _progress_remaining: float(val)

def load_ppo_model(model_path: Path):
    custom_objects = {
        "lr_schedule": _const_schedule(5e-5),
        "clip_range":  _const_schedule(0.2),
        "clip_range_vf": _const_schedule(0.2),
    }
    return PPO.load(str(model_path), custom_objects=custom_objects)


# ============================================================
# Market / account / orders helpers
# ============================================================
def ensure_market_open(trading_api: TradingClient) -> bool:
    try:
        return bool(trading_api.get_clock().is_open)
    except Exception:
        return False

def get_account_equity(trading_api: TradingClient) -> float:
    return float(trading_api.get_account().equity)

def get_position(trading_api: TradingClient, symbol: str):
    try:
        return trading_api.get_position(symbol)
    except Exception:
        return None

def asset_flags(trading_api: TradingClient, symbol: str) -> Tuple[bool, bool, bool]:
    try:
        a = trading_api.get_asset(symbol)
        return (
            bool(getattr(a, "tradable", True)),
            bool(getattr(a, "fractionable", False)),
            bool(getattr(a, "shortable", False)),
        )
    except Exception:
        return True, False, False

@lru_cache(maxsize=256)
def _asset_flags(symbol: str) -> Tuple[bool, bool, bool]:
    try:
        trading_api = globals().get("trading_api", None)
        if trading_api is None:
            return True, False, False
        a = trading_api.get_asset(symbol)
        return (
            bool(getattr(a, "tradable", True)),
            bool(getattr(a, "fractionable", False)),
            bool(getattr(a, "shortable", False)),
        )
    except Exception:
        return True, False, False

def can_short_symbol(trading_api: TradingClient, symbol: str) -> bool:
    if not bool(globals().get("ALLOW_SHORTS", False)):
        return False
    try:
        acct = trading_api.get_account()
        shorting_ok = bool(getattr(acct, "shorting_enabled", False))
    except Exception:
        shorting_ok = False
    try:
        a = trading_api.get_asset(symbol)
        asset_ok = bool(getattr(a, "shortable", False))
    except Exception:
        asset_ok = False
    return shorting_ok and asset_ok

def _side_enum(side: str) -> OrderSide:
    s = (side or "").strip().lower()
    if s in ("buy", "long"):
        return OrderSide.BUY
    if s in ("sell", "short"):
        return OrderSide.SELL
    raise ValueError(f"Invalid side: {side!r} (expected 'buy' or 'sell')")

def _truncate_toward_zero(x: float) -> int:
    # safer than floor for negatives (prevents -0.1 -> -1)
    return int(math.trunc(float(x)))

def market_order(trading_api: TradingClient, symbol: str, side: str, qty=None, notional: float = None):
    if qty is not None and notional is not None:
        logging.warning(f"[{symbol}] Both qty and notional provided; preferring notional.")
        qty = None
    if qty is None and notional is None:
        logging.warning(f"[{symbol}] No order size provided; skipping.")
        return None

    if bool(globals().get("DRY_RUN", False)):
        notional_str = to_2dp_str(notional) if notional is not None else None
        logging.info(f"[DRY_RUN] Would submit {side} {(('notional=$'+str(notional_str)) if notional_str else ('qty='+str(qty)))} {symbol}")
        globals()["_TRADE_EVENT_FLAG"] = True
        return None

    try:
        side_enum = _side_enum(side)

        qty_arg = None
        if qty is not None:
            q = float(qty)
            if not bool(globals().get("USE_FRACTIONALS", True)):
                q = _truncate_toward_zero(q)
                if q == 0:
                    logging.info(f"[{symbol}] qty truncates to 0 shares; skipping.")
                    return None
            qty_arg = float(to_6dp_str(q)) if bool(globals().get("USE_FRACTIONALS", True)) else int(q)

        notional_arg = None
        if notional is not None:
            notional_arg = float(to_2dp_str(float(notional)))

        req = MarketOrderRequest(
            symbol=str(symbol).upper(),
            side=side_enum,
            time_in_force=TimeInForce.DAY,
            qty=qty_arg,
            notional=notional_arg,
        )
        o = trading_api.submit_order(req)
        size_str = f"notional=${notional_arg}" if notional_arg is not None else f"qty={qty_arg}"
        logging.info(f"[{symbol}] Submitted {side} {size_str}")
        globals()["_TRADE_EVENT_FLAG"] = True
        return o
    except Exception as e:
        logging.error(f"[{symbol}] submit_order failed: {e}")
        return None

def market_order_to_qty(trading_api: TradingClient, symbol: str, side: str, qty):
    if qty is None:
        logging.warning(f"[{symbol}] qty is None; skipping.")
        return None
    try:
        q = float(qty)
    except Exception:
        logging.warning(f"[{symbol}] qty not numeric ({qty}); skipping.")
        return None

    if not np.isfinite(q) or q <= 0:
        logging.info(f"[{symbol}] Non-positive qty ({qty}); skipping.")
        return None

    use_fractionals = bool(globals().get("USE_FRACTIONALS", True))
    if not use_fractionals:
        q_int = _truncate_toward_zero(q)
        if q_int <= 0:
            logging.info(f"[{symbol}] qty truncates to 0 shares; skipping.")
            return None
        q = q_int

    if bool(globals().get("DRY_RUN", False)):
        logging.info(f"[DRY_RUN] Would submit {side} qty={q} {symbol}")
        globals()["_TRADE_EVENT_FLAG"] = True
        return None

    if side.strip().lower() == "sell" and (not can_short_symbol(trading_api, symbol)):
        have_qty = get_position_qty(trading_api, symbol)
        if have_qty <= 0:
            logging.info(f"[{symbol}] Sell skipped (no shares and shorting not allowed).")
            return None
        q = min(float(q), float(have_qty))
        if q <= 0:
            logging.info(f"[{symbol}] Sell qty clamped to 0; skipping.")
            return None

    try:
        side_enum = _side_enum(side)
        qty_arg = float(to_6dp_str(float(q))) if use_fractionals else int(q)
        req = MarketOrderRequest(symbol=str(symbol).upper(), side=side_enum, time_in_force=TimeInForce.DAY, qty=qty_arg)
        o = trading_api.submit_order(req)
        logging.info(f"[{symbol}] Submitted {side} qty={qty_arg}")
        globals()["_TRADE_EVENT_FLAG"] = True
        return o
    except Exception as e:
        logging.error(f"[{symbol}] submit_order(qty) failed: {e}")
        return None

def submit_fractional_rebalance(trading_api: TradingClient, symbol: str, delta_notional: float, price: float):
    dn = round_to_cents(abs(delta_notional))
    if dn < float(globals().get("REBALANCE_MIN_NOTIONAL", 0.0)):
        return None
    if delta_notional > 0:
        return market_order(trading_api, symbol, side="buy", notional=dn)
    qty = dn / max(float(price), 1e-9)
    if not can_short_symbol(trading_api, symbol):
        have_qty = get_position_qty(trading_api, symbol)
        if have_qty <= 0:
            logging.info(f"[{symbol}] Fractional sell skipped (no shares, no shorting).")
            return None
        qty = min(float(qty), float(have_qty))
        if qty <= 0:
            return None
    return market_order_to_qty(trading_api, symbol, side="sell", qty=qty)

NO_ORDER = {"order_submitted": 0, "order_id": "", "order_status": "", "filled_qty": ""}

def _order_info(order_obj) -> dict:
    if order_obj is None:
        return dict(NO_ORDER)
    return {
        "order_submitted": 1,
        "order_id": str(getattr(order_obj, "id", "") or ""),
        "order_status": str(getattr(order_obj, "status", "") or ""),
        "filled_qty": str(getattr(order_obj, "filled_qty", "") or ""),
    }

def get_position_qty(trading_api: TradingClient, symbol: str):
    use_fractionals = bool(globals().get("USE_FRACTIONALS", True))
    try:
        pos = trading_api.get_position(symbol)
    except Exception:
        pos = None
    if not pos:
        return 0.0 if use_fractionals else 0
    try:
        q = float(pos.qty)
        if use_fractionals:
            return q
        return _truncate_toward_zero(q)
    except Exception:
        return 0.0 if use_fractionals else 0

def get_last_price(trading_api: TradingClient, data_api: StockHistoricalDataClient, symbol: str) -> float:
    sym = str(symbol).upper()

    def _latest_map_get(resp, symbol: str, attr_name: str):
        sym2 = str(symbol).upper()
        try:
            if isinstance(resp, dict):
                return resp.get(sym2) or resp.get(symbol)
        except Exception:
            pass
        try:
            m = getattr(resp, attr_name, None)
            if isinstance(m, dict):
                return m.get(sym2) or m.get(symbol)
        except Exception:
            pass
        for alt in ("data", "raw", "result"):
            try:
                m = getattr(resp, alt, None)
                if isinstance(m, dict):
                    obj = m.get(sym2) or m.get(symbol)
                    if obj is not None:
                        return obj
            except Exception:
                continue
        return None

    try:
        resp = data_api.get_stock_latest_trade(StockLatestTradeRequest(symbol_or_symbols=sym))
        tr = _latest_map_get(resp, sym, "trades")
        if tr is not None:
            price = getattr(tr, "price", None)
            if price is None:
                price = getattr(tr, "p", None)
            if price is not None and np.isfinite(float(price)):
                return float(price)
    except Exception:
        pass

    try:
        resp = data_api.get_stock_latest_quote(StockLatestQuoteRequest(symbol_or_symbols=sym))
        qt = _latest_map_get(resp, sym, "quotes")
        if qt is not None:
            ap = getattr(qt, "ask_price", getattr(qt, "ap", None))
            bp = getattr(qt, "bid_price", getattr(qt, "bp", None))
            if ap is not None and bp is not None and np.isfinite(float(ap)) and np.isfinite(float(bp)):
                return float((float(ap) + float(bp)) / 2.0)
            if ap is not None and np.isfinite(float(ap)):
                return float(ap)
            if bp is not None and np.isfinite(float(bp)):
                return float(bp)
    except Exception:
        pass

    try:
        feed = os.getenv("BARS_FEED", "").strip() or None
        req = StockBarsRequest(symbol_or_symbols=sym, timeframe=globals().get("LIVE_TIMEFRAME", TimeFrame.Hour), limit=1, feed=feed)
        resp = data_api.get_stock_bars(req)
        if hasattr(resp, "df") and resp.df is not None and not resp.df.empty:
            df = resp.df.copy()
            if isinstance(df.index, pd.MultiIndex):
                try:
                    df = df.xs(sym, level=0)
                except Exception:
                    df = df.reset_index(level=0, drop=True)
            if "close" in df.columns:
                v = df["close"].iloc[-1]
                if v is not None and np.isfinite(float(v)):
                    return float(v)
    except Exception:
        pass

    try:
        pos = trading_api.get_position(sym)
        v = getattr(pos, "avg_entry_price", None)
        return float(v) if v is not None else float("nan")
    except Exception:
        return float("nan")

def print_position_summary(trading_api: TradingClient) -> float:
    try:
        positions = trading_api.get_all_positions()
        total_market_value = 0.0
        print("\nPosition Summary:")
        if not positions:
            print("  (no open positions)")
            print("\nTotal Market Value: $0.00")
            return 0.0
        for p in positions:
            sym = str(getattr(p, "symbol", "")).upper()
            qty = float(getattr(p, "qty", 0.0) or 0.0)
            px  = float(getattr(p, "current_price", float("nan")))
            mv  = float(getattr(p, "market_value", 0.0) or 0.0)  # signed
            total_market_value += mv
            px_str = f"${px:.2f}" if np.isfinite(px) else "n/a"
            print(f"  {sym}: {qty} shares @ {px_str} | Value: ${mv:,.2f}")
        print(f"\nTotal Market Value: ${total_market_value:,.2f}")
        return total_market_value
    except Exception as e:
        print(f"Could not summarize positions: {e}")
        return float("nan")


# ============================================================
# (A) STRICT START_FLAT: cancel orders + wait for flat (positions + orders)
# ============================================================
def list_open_orders(trading_api: TradingClient, symbols: Optional[List[str]] = None):
    try:
        req = GetOrdersRequest(status=OrderStatus.OPEN, limit=500)
        orders = trading_api.get_orders(req) or []
    except Exception:
        orders = []
    if symbols:
        want = {s.upper() for s in symbols}
        out = []
        for o in orders:
            try:
                if str(getattr(o, "symbol", "")).upper() in want:
                    out.append(o)
            except Exception:
                continue
        return out
    return orders

def cancel_open_orders_for_symbols(trading_api: TradingClient, symbols: List[str]) -> int:
    symbols_u = {s.upper() for s in symbols}
    orders = list_open_orders(trading_api, symbols=list(symbols_u))
    canceled = 0
    for o in orders:
        oid = getattr(o, "id", None)
        sym = str(getattr(o, "symbol", "")).upper()
        if sym not in symbols_u or not oid:
            continue
        try:
            trading_api.cancel_order_by_id(oid)
            canceled += 1
            logging.info("[%s] Canceled open order id=%s", sym, oid)
        except Exception as e:
            logging.warning("[%s] cancel_order_by_id failed (%s): %s", sym, oid, e)
    return canceled

def wait_until_flat_positions_and_orders(
    trading_api: TradingClient,
    symbols: List[str],
    dust_qty: float = 0.000001,
    mv_dust: float = 50.0,
    timeout_sec: int = 90,
    poll_sec: int = 2,
) -> bool:
    t0 = time.time()
    symbols_u = [s.upper() for s in symbols]
    while time.time() - t0 < timeout_sec:
        flat_positions = True
        try:
            pos = trading_api.get_all_positions() or []
            mv = {str(p.symbol).upper(): float(getattr(p, "market_value", 0.0) or 0.0) for p in pos}
            qty = {str(p.symbol).upper(): float(getattr(p, "qty", 0.0) or 0.0) for p in pos}
            still = []
            for s in symbols_u:
                if abs(qty.get(s, 0.0)) > dust_qty and abs(mv.get(s, 0.0)) > mv_dust:
                    still.append(s)
            if still:
                flat_positions = False
        except Exception:
            flat_positions = False

        open_orders = list_open_orders(trading_api, symbols=symbols_u)
        flat_orders = (len(open_orders) == 0)

        if flat_positions and flat_orders:
            return True

        time.sleep(max(0.25, float(poll_sec)))
    return False

def flatten_symbols_strict(
    trading_api: TradingClient,
    symbols: List[str],
    dust_qty: float = 0.000001,
    timeout_sec: int = 120,
):
    """
    Strict flatten:
    1) cancel open orders for symbols
    2) close positions for symbols (market close)
    3) cancel again (to catch newly created orders)
    4) wait until both positions AND orders are flat
    """
    symbols_u = [s.upper() for s in symbols]

    logging.warning("[START_FLAT] strict flatten begin for symbols=%s", symbols_u)
    try:
        cancel_open_orders_for_symbols(trading_api, symbols_u)
    except Exception:
        pass

    for s in symbols_u:
        try:
            trading_api.close_position(s)
            logging.info("[%s] close_position submitted.", s)
        except Exception as e:
            logging.info("[%s] close_position skipped/failed: %s", s, e)

    time.sleep(1.5)

    try:
        cancel_open_orders_for_symbols(trading_api, symbols_u)
    except Exception:
        pass

    ok = wait_until_flat_positions_and_orders(
        trading_api,
        symbols_u,
        dust_qty=dust_qty,
        timeout_sec=timeout_sec,
        poll_sec=2,
    )

    if ok:
        logging.warning("[START_FLAT] strict flatten complete: FLAT ✅")
    else:
        logging.warning("[START_FLAT] strict flatten timed out: NOT FULLY FLAT ⚠️ (check orders/positions)")
    return ok


# ============================================================
# Rebalance logic (SIGNED MV)
# ============================================================
def compute_target_qty_by_cash(equity: float, price: float, target_weight: float) -> int:
    if not np.isfinite(equity) or equity <= 0:
        return 0
    if not np.isfinite(price) or price <= 0:
        return 0

    w = float(target_weight)
    cap = float(globals().get("WEIGHT_CAP", 1.0))
    if cap > 0:
        w = max(-cap, min(cap, w))
    if not bool(globals().get("ALLOW_SHORTS", False)):
        w = max(0.0, w)

    target_notional = equity * w
    qty_f = target_notional / price
    return _truncate_toward_zero(qty_f)

def rebalance_to_weight(
    trading_api: TradingClient,
    data_api: StockHistoricalDataClient,
    symbol: str,
    equity: float,
    target_weight: float
) -> dict:
    price = get_last_price(trading_api, data_api, symbol)
    if not np.isfinite(price) or price <= 0:
        logging.warning(f"[{symbol}] Price unavailable; skipping rebalance.")
        return dict(NO_ORDER)

    tradable, fractionable, shortable = asset_flags(trading_api, symbol)
    if not tradable:
        logging.info(f"[{symbol}] Not tradable; skipping.")
        return dict(NO_ORDER)

    use_fractionals = bool(USE_FRACTIONALS and fractionable)

    pos = get_position(trading_api, symbol)
    have_qty = float(getattr(pos, "qty", 0.0) or 0.0) if pos else 0.0
    have_mv = None
    if pos is not None:
        try:
            have_mv = float(getattr(pos, "market_value", None))
        except Exception:
            have_mv = None

    if have_mv is not None and np.isfinite(have_mv):
        have_notional = float(have_mv)  # signed
    else:
        have_notional = float(have_qty) * float(price)

    target_notional = float(equity) * float(target_weight)
    delta_notional = target_notional - have_notional

    # If flipping long->short, flatten long first (reduces rejection risk)
    if (have_qty > 0) and (target_notional < 0):
        logging.info(f"[{symbol}] Flip long→short requested. Flattening long first (have_qty={have_qty}).")
        try:
            flatten_symbols_strict(trading_api, [symbol], timeout_sec=90)
        except Exception:
            pass
        return dict(NO_ORDER)

    if abs(delta_notional) < 1e-9:
        return dict(NO_ORDER)

    delta_weight = abs(delta_notional) / max(float(equity), 1e-9)
    if delta_weight < float(globals().get("DELTA_WEIGHT_MIN", 0.0)):
        return dict(NO_ORDER)

    if use_fractionals:
        dn = round_to_cents(abs(delta_notional))
        if dn < float(globals().get("REBALANCE_MIN_NOTIONAL", 0.0)):
            return dict(NO_ORDER)

        side = "buy" if delta_notional > 0 else "sell"
        shorting = (target_notional < 0) and (side == "sell")
        covering = (have_qty < 0) and (side == "buy")

        if shorting:
            if not shortable:
                logging.info(f"[{symbol}] Not shortable; skipping short rebalance.")
                return dict(NO_ORDER)
            qty = max(1, int(math.floor(dn / price))) if price > 0 else 1
            o = market_order_to_qty(trading_api, symbol, side="sell", qty=qty)
            return _order_info(o)

        if covering:
            qty = max(1, int(math.ceil(dn / price))) if price > 0 else 1
            qty = min(int(abs(have_qty)), qty) if have_qty < 0 else qty
            o = market_order_to_qty(trading_api, symbol, side="buy", qty=qty)
            return _order_info(o)

        o = submit_fractional_rebalance(trading_api, symbol, delta_notional=delta_notional, price=price)
        return _order_info(o)

    want_qty = compute_target_qty_by_cash(equity, price, target_weight)
    have_qty_int = _truncate_toward_zero(have_qty)
    delta_qty = want_qty - have_qty_int
    if delta_qty == 0:
        return dict(NO_ORDER)

    approx_delta_notional = abs(delta_qty) * price
    if equity > 0 and approx_delta_notional / equity < float(globals().get("DELTA_WEIGHT_MIN", 0.0)):
        return dict(NO_ORDER)
    if approx_delta_notional < float(globals().get("REBALANCE_MIN_NOTIONAL", 0.0)):
        return dict(NO_ORDER)

    side = "buy" if delta_qty > 0 else "sell"
    shorting = (target_notional < 0) and (side == "sell")
    if shorting and not shortable:
        logging.info(f"[{symbol}] Not shortable; skipping short rebalance.")
        return dict(NO_ORDER)

    o = market_order_to_qty(trading_api, symbol, side=side, qty=int(abs(delta_qty)))
    return _order_info(o)


# ============================================================
# (B) Hard cap enforcement (prevents runaway weights)
# ============================================================
def enforce_position_caps_if_violated(
    trading_api: TradingClient,
    data_api: StockHistoricalDataClient,
    symbol: str,
    equity: float,
    cap: float,
) -> dict:
    try:
        cap = float(cap)
        if cap <= 0:
            return dict(NO_ORDER)
        if not np.isfinite(equity) or equity <= 0:
            return dict(NO_ORDER)

        pos = get_position(trading_api, symbol)
        if not pos:
            return dict(NO_ORDER)

        mv = float(getattr(pos, "market_value", 0.0) or 0.0)  # signed
        if not np.isfinite(mv) or abs(mv) < 1e-9:
            return dict(NO_ORDER)

        w = mv / equity
        if abs(w) <= cap + 1e-6:
            return dict(NO_ORDER)

        target_w = float(np.sign(w) * cap)
        logging.warning("[%s] HARD_CAP breach: actual_weight=%.3f exceeds cap=%.3f -> rebalance to %.3f",
                        symbol, w, cap, target_w)
        return rebalance_to_weight(trading_api, data_api, symbol, equity, target_w)
    except Exception as e:
        logging.warning("[%s] enforce_position_caps_if_violated failed: %s", symbol, e)
        return dict(NO_ORDER)


# ============================================================
# (D) Exposure caps: compute + enforcement
# ============================================================
def compute_gross_net_exposure(trading_api: TradingClient, symbols: Optional[List[str]] = None) -> Tuple[float, float, float]:
    """
    Returns (gross_exposure_pct, net_exposure_pct, equity).
    gross = sum(abs(market_value)) / equity
    net   = sum(market_value) / equity
    If symbols provided, only includes those symbols.
    """
    acct = trading_api.get_account()
    equity = float(getattr(acct, "equity", float("nan")))
    if not np.isfinite(equity) or equity <= 0:
        return float("nan"), float("nan"), equity

    want = {s.upper() for s in symbols} if symbols else None
    pos = trading_api.get_all_positions() or []
    mvs = []
    for p in pos:
        try:
            sym = str(getattr(p, "symbol", "")).upper()
            if want is not None and sym not in want:
                continue
            mv = float(getattr(p, "market_value", 0.0) or 0.0)  # signed
            if np.isfinite(mv):
                mvs.append(mv)
        except Exception:
            continue

    gross = float(np.nansum([abs(x) for x in mvs])) / equity if mvs else 0.0
    net = float(np.nansum(mvs)) / equity if mvs else 0.0
    return gross, net, equity

def exposure_caps_update_and_flags(trading_api: TradingClient, symbols: List[str]) -> Tuple[bool, float, float]:
    """
    Updates global exposure state. Returns (block_risk, gross, net).
    block_risk=True => do NOT allow trades that increase gross or increase abs(net).
    """
    global _EXPOSURE_CAPS_BLOCK_RISK, _EXPOSURE_LAST

    gross, net, eq = compute_gross_net_exposure(trading_api, symbols=symbols)
    _EXPOSURE_LAST["gross"] = gross
    _EXPOSURE_LAST["net"] = net

    gcap = float(globals().get("GROSS_CAP", 1.0))
    ncap = float(globals().get("NET_CAP", 1.0))

    breach = False
    if np.isfinite(gross) and gcap > 0 and gross > gcap + 1e-9:
        breach = True
    if np.isfinite(net) and ncap > 0 and abs(net) > ncap + 1e-9:
        breach = True

    _EXPOSURE_CAPS_BLOCK_RISK = bool(breach)

    if breach:
        logging.warning("[EXPOSURE_CAP] breach: gross=%.3f (cap=%.3f) | net=%.3f (cap=%.3f) => BLOCK RISK-INCREASING TRADES",
                        gross, gcap, net, ncap)
    else:
        logging.info("[EXPOSURE_CAP] ok: gross=%.3f (cap=%.3f) | net=%.3f (cap=%.3f)",
                     gross, gcap, net, ncap)

    return _EXPOSURE_CAPS_BLOCK_RISK, gross, net

def trade_increases_risk(current_w: float, target_w: float) -> bool:
    """
    Conservative rule:
      - if |target| > |current| => gross likely increases
      - if abs(target) == abs(current) but moves away from 0 in signed sense, also increases abs(net) for that symbol
    """
    try:
        cw = float(current_w)
        tw = float(target_w)
    except Exception:
        return True
    return abs(tw) > abs(cw) + 1e-9

def maybe_emergency_flatten_on_exposure(trading_api: TradingClient, symbols: List[str]) -> bool:
    """
    Optional "big red button":
      if EMERGENCY_FLATTEN_ON_EXPOSURE=1 and caps breached -> strict flatten.
    """
    if not bool(globals().get("EMERGENCY_FLATTEN_ON_EXPOSURE", False)):
        return False
    if not bool(_EXPOSURE_CAPS_BLOCK_RISK):
        return False
    logging.warning("[EXPOSURE_CAP] EMERGENCY_FLATTEN_ON_EXPOSURE=1 -> flattening all configured symbols now.")
    try:
        flatten_symbols_strict(trading_api, symbols, timeout_sec=180)
        return True
    except Exception as e:
        logging.warning("[EXPOSURE_CAP] emergency flatten failed: %s", e)
        return False


# ============================================================
# Risk: TP/SL
# ============================================================
def check_tp_sl_and_maybe_flatten(trading_api: TradingClient, symbol: str) -> bool:
    if TAKE_PROFIT_PCT <= 0 and STOP_LOSS_PCT <= 0:
        return False
    pos = get_position(trading_api, symbol)
    if not pos:
        return False
    try:
        plpc = float(pos.unrealized_plpc)
    except Exception:
        return False
    if TAKE_PROFIT_PCT > 0 and plpc >= TAKE_PROFIT_PCT:
        logging.info(f"[{symbol}] TP hit ({plpc:.4f} >= {TAKE_PROFIT_PCT:.4f}). Flattening.")
        flatten_symbols_strict(trading_api, [symbol], timeout_sec=90)
        return True
    if STOP_LOSS_PCT > 0 and plpc <= -abs(STOP_LOSS_PCT):
        logging.info(f"[{symbol}] SL hit ({plpc:.4f} <= {-abs(STOP_LOSS_PCT):.4f}). Flattening.")
        flatten_symbols_strict(trading_api, [symbol], timeout_sec=90)
        return True
    return False


# ============================================================
# Bars + features + obs
# ============================================================
def get_recent_bars(data_api: StockHistoricalDataClient, symbol: str, limit: int = 200, timeframe=None) -> pd.DataFrame:
    timeframe = timeframe or globals().get("LIVE_TIMEFRAME", TimeFrame.Hour)
    feed = os.getenv("BARS_FEED", "").strip() or None

    def _normalize_df(resp) -> pd.DataFrame:
        if not hasattr(resp, "df"):
            return pd.DataFrame(columns=["Open", "High", "Low", "Close", "Volume"])
        df = resp.df.copy()
        if df.empty:
            return pd.DataFrame(columns=["Open", "High", "Low", "Close", "Volume"])
        if isinstance(df.index, pd.MultiIndex):
            try:
                df = df.xs(symbol, level=0)
            except Exception:
                df = df.reset_index(level=0, drop=True)
        df.index = pd.to_datetime(df.index, utc=True, errors="coerce")
        df = df.rename(columns={"open": "Open", "high": "High", "low": "Low", "close": "Close", "volume": "Volume"})
        cols = [c for c in ["Open", "High", "Low", "Close", "Volume"] if c in df.columns]
        out = df[cols].sort_index()
        return out.dropna(how="all")

    try:
        req = StockBarsRequest(symbol_or_symbols=symbol, timeframe=timeframe, limit=int(limit), feed=feed)
        resp = data_api.get_stock_bars(req)
        df = _normalize_df(resp)
        if not df.empty:
            return df
    except Exception as e:
        logging.warning(f"[{symbol}] get_stock_bars(limit={limit}) failed: {e}")

    try:
        end_dt = datetime.now(timezone.utc).replace(microsecond=0)
        start_dt = end_dt - timedelta(days=5)
        req = StockBarsRequest(symbol_or_symbols=symbol, timeframe=timeframe, start=start_dt, end=end_dt, feed=feed)
        resp = data_api.get_stock_bars(req)
        df = _normalize_df(resp)
        if not df.empty:
            return df
    except Exception as e:
        logging.warning(f"[{symbol}] get_stock_bars(start/end) failed: {e}")

    return pd.DataFrame(columns=["Open", "High", "Low", "Close", "Volume"])

def add_regime(df: pd.DataFrame) -> pd.DataFrame:
    df["Vol20"] = df["Close"].pct_change().rolling(20).std()
    df["Ret20"] = df["Close"].pct_change(20)
    vol_hi = (df["Vol20"] > df["Vol20"].median()).astype(int)
    trend_hi = (df["Ret20"].abs() > df["Ret20"].abs().median()).astype(int)
    df["Regime4"] = vol_hi * 2 + trend_hi
    return df

def denoise_wavelet(series: pd.Series, wavelet: str = "db1", level: int = 2) -> pd.Series:
    try:
        import pywt
    except Exception:
        return pd.Series(series).astype(float).ffill().bfill().ewm(span=5, adjust=False).mean()

    s = pd.Series(series).astype(float).ffill().bfill()
    arr = s.to_numpy()
    try:
        w = pywt.Wavelet(wavelet)
        maxlvl = pywt.dwt_max_level(len(arr), w.dec_len)
        lvl = int(max(0, min(level, maxlvl)))
        if lvl < 1:
            return s
        coeffs = pywt.wavedec(arr, w, mode="symmetric", level=lvl)
        for i in range(1, len(coeffs)):
            coeffs[i] = np.zeros_like(coeffs[i])
        rec = pywt.waverec(coeffs, w, mode="symmetric")
        return pd.Series(rec[:len(arr)], index=s.index)
    except Exception:
        return s.ewm(span=5, adjust=False).mean()

def add_features_live(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy().sort_index()
    cols_ci = {c.lower(): c for c in df.columns}
    rename = {}
    for final, alts in {
        "Open": ["open"],
        "High": ["high"],
        "Low": ["low"],
        "Close": ["close", "close*", "last"],
        "Adj Close": ["adj close", "adj_close", "adjclose", "adjusted close"],
        "Volume": ["volume", "vol"],
    }.items():
        for a in [final.lower()] + alts:
            if a in cols_ci:
                rename[cols_ci[a]] = final
                break
    df = df.rename(columns=rename)
    if "Adj Close" not in df.columns and "Close" in df.columns:
        df["Adj Close"] = df["Close"]

    df["SMA_20"] = df["Close"].rolling(20).mean()
    df["STD_20"] = df["Close"].rolling(20).std()
    df["Upper_Band"] = df["SMA_20"] + 2 * df["STD_20"]
    df["Lower_Band"] = df["SMA_20"] - 2 * df["STD_20"]

    df["Lowest_Low"] = df["Low"].rolling(14).min()
    df["Highest_High"] = df["High"].rolling(14).max()
    denom = (df["Highest_High"] - df["Lowest_Low"]).replace(0, np.nan)
    df["Stoch"] = ((df["Close"] - df["Lowest_Low"]) / denom) * 100

    df["ROC"] = df["Close"].pct_change(10)
    sign = np.sign(df["Close"].diff().fillna(0))
    df["OBV"] = (sign * df["Volume"].fillna(0)).cumsum()

    tp = (df["High"] + df["Low"] + df["Close"]) / 3.0
    sma_tp = tp.rolling(20).mean()
    md = (tp - sma_tp).abs().rolling(20).mean().replace(0, np.nan)
    df["CCI"] = (tp - sma_tp) / (0.015 * md)

    df["EMA_10"] = df["Close"].ewm(span=10, adjust=False).mean()
    df["EMA_50"] = df["Close"].ewm(span=50, adjust=False).mean()
    ema12 = df["Close"].ewm(span=12, adjust=False).mean()
    ema26 = df["Close"].ewm(span=26, adjust=False).mean()
    df["MACD_Line"] = ema12 - ema26
    df["MACD_Signal"] = df["MACD_Line"].ewm(span=9, adjust=False).mean()

    d = df["Close"].diff()
    gain = d.clip(lower=0)
    loss = (-d.clip(upper=0))
    avg_gain = gain.ewm(alpha=1 / 14, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1 / 14, adjust=False).mean()
    rs = avg_gain / avg_loss.replace(0, np.nan)
    df["RSI"] = 100 - (100 / (1 + rs))

    tr = pd.concat([
        (df["High"] - df["Low"]),
        (df["High"] - df["Close"].shift()).abs(),
        (df["Low"] - df["Close"].shift()).abs(),
    ], axis=1).max(axis=1)
    df["ATR"] = tr.ewm(alpha=1 / 14, adjust=False).mean()

    df["Volatility"] = df["Close"].pct_change().rolling(20).std()
    df["Denoised_Close"] = denoise_wavelet(df["Close"])

    df = add_regime(df)
    df["SentimentScore"] = 0.0
    df["Delta"] = df["Close"].pct_change(1).fillna(0.0)
    df["Gamma"] = df["Delta"].diff().fillna(0.0)

    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    return df

FEATURE_ALIASES = {"SMA_50": "Rolling_Mean_50", "Rolling_Mean_50": "SMA_50"}

def resolve_feature_alias(name: str, df: pd.DataFrame) -> Optional[str]:
    if name in df.columns:
        return name
    alt = FEATURE_ALIASES.get(name)
    if alt and alt in df.columns:
        return alt
    return None

def compute_art_feat_order(features_hint: Any, df: pd.DataFrame) -> List[str]:
    if features_hint is None:
        return [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
    feats = features_hint.get("features", features_hint) if isinstance(features_hint, dict) else list(features_hint)
    drop = {"datetime", "symbol", "target", "return"}
    resolved = []
    for f in feats:
        if f in drop:
            continue
        col = resolve_feature_alias(f, df)
        if col and pd.api.types.is_numeric_dtype(df[col]):
            resolved.append(col)
    return resolved

def expected_obs_shape(model, vecnorm) -> Optional[tuple]:
    for src in (model, vecnorm):
        try:
            shp = tuple(getattr(src, "observation_space", None).shape)
            if shp:
                return shp
        except Exception:
            pass
    return None

def _pick_columns_for_channels(features_hint: Any, df: pd.DataFrame, channels: int) -> List[str]:
    ordered = compute_art_feat_order(features_hint, df)
    if len(ordered) >= channels:
        return ordered[:channels]
    numeric = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
    pref = ["Close", "Volume", "Adj Close", "Open", "High", "Low"]
    cols = [c for c in pref if c in numeric]
    cols += [c for c in numeric if c not in cols]
    cols = cols[:channels]
    if cols:
        while len(cols) < channels:
            cols.append(cols[-1])
    return cols[:channels]

def prepare_observation_from_bars(
    bars_df: pd.DataFrame,
    features_hint: Any = None,
    min_required_rows: int = 60,
    expected_shape: Optional[tuple] = None,
    symbol: str = "",
) -> Tuple[np.ndarray, int]:
    feats_df = add_features_live(bars_df).replace([np.inf, -np.inf], np.nan)

    ts = ensure_utc(pd.Timestamp.utcnow())
    if not feats_df.empty:
        try:
            ts = ensure_utc(feats_df.index[-1])
        except Exception:
            pass
    obs_ts = int(ts.timestamp())

    if expected_shape is not None and len(expected_shape) == 2:
        lookback, channels = int(expected_shape[0]), int(expected_shape[1])
        cols = _pick_columns_for_channels(features_hint, feats_df, channels)
        window_df = feats_df[cols].tail(lookback).fillna(0.0)
        arr = window_df.to_numpy(dtype=np.float32)
        if arr.shape[0] < lookback:
            pad_rows = lookback - arr.shape[0]
            arr = np.vstack([np.zeros((pad_rows, channels), dtype=np.float32), arr])
        arr = arr[-lookback:, :channels]
        return arr.reshape(lookback, channels), obs_ts

    order = compute_art_feat_order(features_hint, feats_df)
    feats_df = feats_df.dropna(subset=order) if order else feats_df
    if len(feats_df) < max(20, min_required_rows):
        raise ValueError(f"Not enough bars to compute features robustly (have {len(feats_df)}).")
    last = feats_df.iloc[-1]
    vals = []
    for c in order:
        v = last.get(c, np.nan)
        vals.append(0.0 if (pd.isna(v) or v is None) else float(v))
    return np.asarray(vals, dtype=np.float32), obs_ts


# ============================================================
# Inference
# ============================================================
def action_to_weight(action) -> Tuple[float, float, float]:
    a = float(np.asarray(action).reshape(-1)[0])
    raw = a
    cap = float(globals().get("WEIGHT_CAP", 0.35))
    target_w = float(np.clip(a, -1, 1)) * cap
    conf = float(min(1.0, abs(a)))

    if not bool(globals().get("ALLOW_SHORTS", False)):
        target_w = max(0.0, target_w)

    if str(globals().get("SIZING_MODE", "linear")).lower() == "threshold":
        floor = float(globals().get("CONF_FLOOR", 0.15))
        if conf < floor:
            target_w = 0.0
        else:
            scale = (conf - floor) / max(1e-9, (1.0 - floor))
            target_w = np.sign(target_w) * cap * float(np.clip(scale, 0, 1))

    return float(target_w), float(conf), float(raw)

def infer_target_weight(model: PPO, vecnorm: Optional[VecNormalize], obs: np.ndarray) -> Tuple[float, float, float]:
    x = np.asarray(obs, dtype=np.float32)

    if vecnorm is not None and hasattr(vecnorm, "normalize_obs") and getattr(vecnorm, "obs_rms", None) is not None:
        try:
            x = vecnorm.normalize_obs(x)
        except Exception:
            try:
                x = vecnorm.normalize_obs(np.expand_dims(x, axis=0))[0]
            except Exception:
                pass

    try:
        action, _ = model.predict(x, deterministic=INF_DETERMINISTIC)
    except Exception:
        action, _ = model.predict(np.expand_dims(x, axis=0), deterministic=INF_DETERMINISTIC)
        if isinstance(action, (list, np.ndarray)):
            action = np.asarray(action)
            if action.ndim > 0:
                action = action[0]

    return action_to_weight(action)


# ============================================================
# (C) Market close behavior (Colab-safe)
# ============================================================
def _sleep_until_open_or_exit(trading_api: TradingClient) -> bool:
    """
    Returns True if we should continue looping, False if we should exit (Colab-safe default).
    """
    try:
        clock = trading_api.get_clock()
        if getattr(clock, "is_open", False):
            return True

        nxt = pd.to_datetime(getattr(clock, "next_open"), utc=True, errors="coerce")
        if pd.isna(nxt):
            time.sleep(60)
            return True

        wait = max(1, int((nxt - now_utc()).total_seconds()))
        logging.info("Market closed. Next open in %ds.", wait)

        if IN_COLAB and _to_bool(os.getenv("COLAB_EXIT_WHEN_CLOSED", "1")):
            logging.info("IN_COLAB and COLAB_EXIT_WHEN_CLOSED=1 -> exiting cleanly instead of long sleep.")
            return False

        chunk = int(os.getenv("CLOSED_SLEEP_CHUNK_SEC", "120"))
        remaining = wait
        while remaining > 0:
            s = min(chunk, remaining)
            logging.info("[CLOSED] sleeping %ds (remaining %ds)", s, remaining)
            time.sleep(s)
            remaining -= s
        return True

    except Exception:
        time.sleep(60)
        return True


# ============================================================
# One-symbol live step
# ============================================================
def run_live_once_for_symbol(
    trading_api: TradingClient,
    data_api: StockHistoricalDataClient,
    symbol: str,
    model: PPO,
    vecnorm: Optional[VecNormalize],
    features_hint: Optional[dict] = None,
    cycle_equity: Optional[float] = None,
):
    # If exposure caps are breached, we allow ONLY de-risking trades.
    block_risk = bool(globals().get("_EXPOSURE_CAPS_BLOCK_RISK", False)) or bool(_EXPOSURE_CAPS_BLOCK_RISK)

    shape = expected_obs_shape(model, vecnorm)
    lookback = int(shape[0]) if (shape and len(shape) == 2) else None
    bars_need = max(200, (lookback or 0) * 3)

    bars_df = get_recent_bars(data_api, symbol, limit=bars_need, timeframe=LIVE_TIMEFRAME)
    if bars_df is None or bars_df.empty:
        logging.warning("[%s] No recent bars; skipping.", symbol)
        return

    try:
        latest_bar_time = ensure_utc(pd.Timestamp(bars_df.index[-1]))
    except Exception:
        latest_bar_time = None

    if latest_bar_time is not None:
        prev = _LAST_BAR_TIME_SEEN.get(symbol)

        if str(globals().get("DATA_TIMEFRAME", "1H")).upper() in ("1H", "1HR", "60MIN", "1HOUR"):
            if not is_hour_close(latest_bar_time):
                eq = float(cycle_equity) if cycle_equity is not None else float(get_account_equity(trading_api))
                px = float(bars_df["Close"].iloc[-1])
                if _LAST_BAR_TIME_SKIPPED.get(symbol) != latest_bar_time:
                    _LAST_BAR_TIME_SKIPPED[symbol] = latest_bar_time
                    log_trade_symbol(symbol, latest_bar_time, _LAST_RAW_A.get(symbol, 0.0),
                                     _LAST_TARGET_W.get(symbol, 0.0), _LAST_CONF.get(symbol, 0.0),
                                     px, eq, DRY_RUN, note="heartbeat_skip_not_hour_close")
                return

        if prev is not None and latest_bar_time <= prev:
            eq = float(cycle_equity) if cycle_equity is not None else float(get_account_equity(trading_api))
            px = float(bars_df["Close"].iloc[-1])
            if _LAST_BAR_TIME_SKIPPED.get(symbol) != latest_bar_time:
                _LAST_BAR_TIME_SKIPPED[symbol] = latest_bar_time
                log_trade_symbol(symbol, latest_bar_time, _LAST_RAW_A.get(symbol, 0.0),
                                 _LAST_TARGET_W.get(symbol, 0.0), _LAST_CONF.get(symbol, 0.0),
                                 px, eq, DRY_RUN, note="heartbeat_skip_same_bar")
            return

        _LAST_BAR_TIME_SEEN[symbol] = latest_bar_time
        _LAST_BAR_TIME_SKIPPED.pop(symbol, None)

    block_until = _REENTRY_BLOCK_UNTIL.get(symbol, 0.0)
    if time.time() < block_until:
        remaining = int(max(0, block_until - time.time()))
        logging.info(f"[{symbol}] Re-entry cooldown active ({remaining}s left); skipping.")
        eq = float(cycle_equity) if cycle_equity is not None else float(get_account_equity(trading_api))
        px = float(bars_df["Close"].iloc[-1])
        log_trade_symbol(symbol, bars_df.index[-1], 0.0, 0.0, 0.0, px, eq, DRY_RUN, note="reentry_cooldown")
        return

    min_rows_needed = max(20, int(shape[0]) if (shape and len(shape) == 2) else 60)
    obs, obs_ts = prepare_observation_from_bars(
        bars_df,
        features_hint=features_hint,
        min_required_rows=min_rows_needed,
        expected_shape=shape,
        symbol=symbol,
    )

    _now_ts = utc_ts(now_utc())
    if _now_ts - obs_ts >= STALE_MAX_SEC:
        eq = float(cycle_equity) if cycle_equity is not None else float(get_account_equity(trading_api))
        px = float(bars_df["Close"].iloc[-1])
        logging.info(f"[{symbol}] Observation stale; skipping.")
        log_trade_symbol(symbol, bars_df.index[-1], 0.0, 0.0, 0.0, px, eq, DRY_RUN, note="skip_stale")
        return

    if check_tp_sl_and_maybe_flatten(trading_api, symbol):
        return

    target_w, conf, raw = infer_target_weight(model, vecnorm, obs)

    _LAST_TARGET_W[symbol] = float(target_w)
    _LAST_CONF[symbol] = float(conf)
    _LAST_RAW_A[symbol] = float(raw)

    logging.info("[%s] predict() ok → raw=%.4f target_w=%.4f conf=%.3f", symbol, raw, target_w, conf)

    eq = float(cycle_equity) if cycle_equity is not None else float(get_account_equity(trading_api))
    px = float(bars_df["Close"].iloc[-1])
    pos = get_position(trading_api, symbol)

    have_mv = float(getattr(pos, "market_value", 0.0) or 0.0) if pos else 0.0
    cur_w = (have_mv / eq) if (np.isfinite(eq) and eq > 0) else 0.0

    have_qty = float(getattr(pos, "qty", 0.0) or 0.0) if pos else 0.0
    has_pos = (abs(have_qty) > 1e-9)

    # gates
    RAW_POS_MIN_LOCAL = float(globals().get("RAW_POS_MIN", 0.0))
    if target_w > 0 and raw < RAW_POS_MIN_LOCAL:
        log_trade_symbol(symbol, bars_df.index[-1], raw, target_w, conf, px, eq, DRY_RUN, note="raw_gate_long")
        return

    RAW_NEG_GATE = float(globals().get("RAW_NEG_MAX", 0.0))
    if target_w < 0 and abs(raw) < RAW_NEG_GATE:
        log_trade_symbol(symbol, bars_df.index[-1], raw, target_w, conf, px, eq, DRY_RUN, note="raw_gate_short")
        return

    if abs(target_w) <= EXIT_WEIGHT_MAX and pos:
        logging.info(f"[{symbol}] Model near-flat (≤{EXIT_WEIGHT_MAX:.3f}); flattening.")
        flatten_symbols_strict(trading_api, [symbol], timeout_sec=90)
        log_trade_symbol(symbol, bars_df.index[-1], raw, target_w, conf, px, eq, DRY_RUN, note="flatten")
        return

    wants_trade = (abs(target_w) >= ENTER_WEIGHT_MIN and conf >= ENTER_CONF_MIN)
    if not wants_trade:
        log_trade_symbol(symbol, bars_df.index[-1], raw, target_w, conf, px, eq, DRY_RUN, note="no_trade_gate")
        return

    # (D) exposure-cap enforcement: if caps breached, block risk-increasing trades
    if block_risk and trade_increases_risk(cur_w, target_w):
        log_trade_symbol(symbol, bars_df.index[-1], raw, target_w, conf, px, eq, DRY_RUN, note="exposure_cap_block_risk_increase")
        return

    event_gap = _SEED_COOLDOWN_SEC if (SEED_FIRST_SHARE and not has_pos) else 30
    if not begin_order_event(symbol, event_gap):
        note = "order_event_cooldown_seed" if (SEED_FIRST_SHARE and not has_pos) else "order_event_cooldown_rebalance"
        log_trade_symbol(symbol, bars_df.index[-1], raw, target_w, conf, px, eq, DRY_RUN, note=note)
        return

    tradable, fractionable, _shortable = _asset_flags(symbol)
    if not tradable:
        log_trade_symbol(symbol, bars_df.index[-1], raw, target_w, conf, px, eq, DRY_RUN, note="not_tradable")
        return

    # (B) Hard-cap enforcement BEFORE policy rebalance
    cap = float(globals().get("WEIGHT_CAP", 0.40))
    cap_fix = enforce_position_caps_if_violated(trading_api, data_api, symbol, eq, cap)
    if int(cap_fix.get("order_submitted", 0)) == 1:
        stamp_order_event(symbol)
        log_trade_symbol(symbol, bars_df.index[-1], raw, target_w, conf, px, eq, DRY_RUN,
                         note="hard_cap_enforced_before_policy",
                         order_submitted=cap_fix.get("order_submitted", 0),
                         order_id=cap_fix.get("order_id", ""),
                         order_status=cap_fix.get("order_status", ""),
                         filled_qty=cap_fix.get("filled_qty", ""))
        return

    # Normal rebalance
    order_info = rebalance_to_weight(trading_api, data_api, symbol, eq, target_w)
    if int(order_info.get("order_submitted", 0)) == 1 or DRY_RUN:
        stamp_order_event(symbol)

    log_trade_symbol(symbol, bars_df.index[-1], raw, target_w, conf, px, eq, DRY_RUN,
                     note="rebalance_only",
                     order_submitted=order_info.get("order_submitted", 0),
                     order_id=order_info.get("order_id", ""),
                     order_status=order_info.get("order_status", ""),
                     filled_qty=order_info.get("filled_qty", ""))
    return


# ============================================================
# Live runner
# ============================================================
def run_live(tickers: List[str], trading_api: TradingClient, data_api: StockHistoricalDataClient):
    def minutes_to_close(trading_api: TradingClient) -> Optional[int]:
        clk = trading_api.get_clock()
        if getattr(clk, "is_open", False):
            close = pd.to_datetime(clk.next_close, utc=True)
            return int(max(0, (close - now_utc()).total_seconds() // 60))
        return None

    per_ticker: Dict[str, Tuple[PPO, Optional[VecNormalize], Optional[dict]]] = {}
    best = (globals().get("BEST_WINDOW_ENV") or None)

    for t in tickers:
        try:
            picks = pick_artifacts_for_ticker(t, os.getenv("ARTIFACTS_DIR", str(ARTIFACTS_DIR)), best_window=best)
            model = load_ppo_model(picks["model"])
            vecnorm = load_vecnormalize(picks.get("vecnorm"))
            if vecnorm is not None and hasattr(vecnorm, "training"):
                vecnorm.training = False
            if vecnorm is not None and hasattr(vecnorm, "norm_reward"):
                vecnorm.norm_reward = False
            feats = load_features(picks.get("features"))
            per_ticker[t] = (model, vecnorm, feats)
            logging.info("[%s] Artifacts loaded and ready.", t)
        except Exception as e:
            logging.exception("[%s] Failed to load artifacts: %s", t, e)

    if not per_ticker:
        raise RuntimeError("No models loaded for any ticker. Check artifacts directory and names.")

    loaded_syms = list(per_ticker.keys())
    logging.info("Starting live execution for (loaded): %s", loaded_syms)

    global _last_kill_ts
    cycle = 0
    last_plot_ts = 0
    flattened_today = False
    did_start_flat = False

    logging.info("Starting live trading loop")

    try:
        while True:
            if not ensure_market_open(trading_api):
                flattened_today = False
                globals()["SESSION_OPEN_EQUITY"] = None
                if not _sleep_until_open_or_exit(trading_api):
                    break
                continue

            if globals().get("SESSION_OPEN_EQUITY") is None:
                try:
                    globals()["SESSION_OPEN_EQUITY"] = float(trading_api.get_account().equity)
                    logging.info("Session open equity anchor set: %.2f", globals()["SESSION_OPEN_EQUITY"])
                except Exception as e:
                    logging.debug("Could not set SESSION_OPEN_EQUITY: %s", e)

            t_cycle_start = time.perf_counter()

            try:
                cycle_equity = float(trading_api.get_account().equity)
            except Exception as e:
                logging.warning("Could not fetch equity: %s", e)
                cycle_equity = float("nan")

            print(f"[HEARTBEAT] {now_utc().isoformat()} cycle={cycle} equity={cycle_equity:,.2f}", flush=True)
            _ = print_position_summary(trading_api)

            # (D) exposure caps: update state once per cycle (on loaded symbols)
            try:
                block_risk, gross, net = exposure_caps_update_and_flags(trading_api, loaded_syms)
                globals()["_EXPOSURE_CAPS_BLOCK_RISK"] = bool(block_risk)
                # Optional emergency flatten
                if maybe_emergency_flatten_on_exposure(trading_api, loaded_syms):
                    for s in loaded_syms:
                        _REENTRY_BLOCK_UNTIL[s] = time.time() + min(int(os.getenv("REENTRY_COOLDOWN_SEC", "300")), 120)
            except Exception as e:
                logging.warning("[EXPOSURE_CAP] update failed: %s", e)
                globals()["_EXPOSURE_CAPS_BLOCK_RISK"] = False

            # (A) START_FLAT strict (once, and only if there is anything to flatten)
            if (cycle == 0) and (not did_start_flat):
                did_start_flat = True
                start_flat = _to_bool(os.getenv("START_FLAT", "1"))

                need_flatten = False
                mv_dust = float(os.getenv("START_FLAT_MV_DUST", "50.0"))

                try:
                    pos = trading_api.get_all_positions() or []
                    mv_map = {str(p.symbol).upper(): float(getattr(p, "market_value", 0.0) or 0.0) for p in pos}
                    for s in loaded_syms:
                        if abs(mv_map.get(s.upper(), 0.0)) > mv_dust:
                            need_flatten = True
                            break
                except Exception:
                    pass

                try:
                    oo = list_open_orders(trading_api, symbols=loaded_syms)
                    if oo:
                        need_flatten = True
                except Exception:
                    pass

                if start_flat and need_flatten:
                    logging.warning("START_FLAT=1: strict flatten (cancel orders + close positions + wait flat).")
                    ok = flatten_symbols_strict(
                        trading_api,
                        loaded_syms,
                        dust_qty=float(os.getenv("DUST_QTY", "0.000001")),
                        timeout_sec=int(os.getenv("START_FLAT_TIMEOUT_SEC", "150")),
                    )
                    logging.info("Flatten wait complete: %s", ok)
                    for s in loaded_syms:
                        _REENTRY_BLOCK_UNTIL[s] = time.time() + min(int(os.getenv("REENTRY_COOLDOWN_SEC", "300")), 60)

            # per-symbol step
            for sym, (model, vecnorm, feat_hint) in per_ticker.items():
                t_sym_start = time.perf_counter()
                try:
                    _call_with_timeout(run_live_once_for_symbol, 15, trading_api, data_api, sym, model, vecnorm, feat_hint, cycle_equity)
                except Exception as e:
                    logging.warning("[%s] symbol step timeout/fail: %s", sym, e)
                finally:
                    logging.info("[TIMER] %s symbol work: %.3fs", sym, time.perf_counter() - t_sym_start)

            # equity snapshot + run summary
            maybe_log_equity_snapshot(trading_api_in=trading_api, reason=("trade" if globals().get("_TRADE_EVENT_FLAG", False) else "cycle"))
            try:
                append_run_summary(trading_api, loaded_syms, RESULTS_DIR, latest_dir=LATEST_DIR)
            except Exception as e:
                logging.warning("append_run_summary failed: %s", e)
                try:
                    append_run_summary(trading_api, loaded_syms, RESULTS_DIR, latest_dir=LATEST_DIR, error=f"callsite:{e}")
                except Exception:
                    pass

            # Kill-switch
            try:
                anchor = globals().get("SESSION_OPEN_EQUITY", None)
                if anchor is not None:
                    eq_now = float(trading_api.get_account().equity)
                    dd = (eq_now / max(1e-9, float(anchor))) - 1.0
                    max_dd = float(os.getenv("MAX_DAILY_DRAWDOWN_PCT", str(globals().get("MAX_DAILY_DRAWDOWN_PCT", 0.05))))
                    if dd <= -abs(max_dd):
                        if time.time() - _last_kill_ts > 60:
                            logging.warning("KILL-SWITCH: daily drawdown %.2f%% reached. Flattening & pausing.", 100.0 * dd)
                            flatten_symbols_strict(trading_api, loaded_syms, timeout_sec=150)
                            _last_kill_ts = time.time()
                            try:
                                maybe_log_equity_snapshot(trading_api_in=trading_api, reason="trade")
                                append_run_summary(trading_api, loaded_syms, RESULTS_DIR, latest_dir=LATEST_DIR, error="kill_switch_flatten")
                            except Exception:
                                pass
                            cooldown_min = int(os.getenv("KILL_SWITCH_COOLDOWN_MIN", str(globals().get("KILL_SWITCH_COOLDOWN_MIN", 30))))
                            if not DRY_RUN:
                                time.sleep(60 * cooldown_min)
                            continue
            except Exception as e:
                logging.debug("kill-switch check failed: %s", e)

            # Flatten into close
            m2c = minutes_to_close(trading_api)
            if FLATTEN_INTO_CLOSE and not flattened_today and m2c is not None and m2c <= 5:
                logging.info("Flattening into close.")
                flatten_symbols_strict(trading_api, loaded_syms, timeout_sec=150)
                for s in loaded_syms:
                    _REENTRY_BLOCK_UNTIL[s] = time.time() + int(os.getenv("REENTRY_COOLDOWN_SEC", "300"))
                maybe_log_equity_snapshot(trading_api_in=trading_api, reason="close")
                flattened_today = True
                if bool(globals().get("EXIT_AFTER_CLOSE", False)):
                    logging.info("EXIT_AFTER_CLOSE=True — exiting live loop after close flatten.")
                    break

            # plots + metrics
            now_ts = time.time()
            if now_ts - last_plot_ts >= 900:
                try:
                    plot_equity_curve(from_equity_csv=True)
                    df = pd.read_csv(EQUITY_LOG_CSV, parse_dates=["datetime_utc"])
                    m = compute_performance_metrics(df)
                    logging.info("Perf: cum_return=%.2f%% | sharpe=%.2f | maxDD=%.2f%%",
                                 100 * m["cum_return"], m["sharpe"], 100 * m["max_drawdown"])
                except Exception as e:
                    logging.warning("Plot/metrics failed: %s", e)
                last_plot_ts = now_ts

            logging.info("[TIMER] full-cycle active time: %.3fs (cooldown=%d min)",
                         time.perf_counter() - t_cycle_start, COOLDOWN_MIN)

            cycle += 1
            if (cycle % 12) == 0:
                gc.collect()

            _sleep_to_next_minute_block(COOLDOWN_MIN)

    except KeyboardInterrupt:
        logging.info("KeyboardInterrupt: stopping live loop.")
    except Exception as e:
        logging.exception("Live loop exception: %s", e)
        try:
            log_equity_snapshot(trading_api_in=trading_api)
        except Exception:
            pass
    finally:
        global _TIMEOUT_EXEC
        try:
            _TIMEOUT_EXEC.shutdown(wait=False, cancel_futures=True)
        except Exception:
            pass
        _TIMEOUT_EXEC = ThreadPoolExecutor(max_workers=8)

        try:
            if FORCE_FLATTEN_ON_EXIT:
                flatten_symbols_strict(trading_api, tickers, timeout_sec=180)
        except Exception as e:
            logging.warning("Flatten-on-exit skipped: %s", e)

        try:
            maybe_log_equity_snapshot(trading_api_in=trading_api, reason="finalize")
            plot_equity_curve(from_equity_csv=True)
        except Exception as e:
            logging.warning("Finalization failed: %s", e)

        logging.info("Live loop exited cleanly.")


# ============================================================
# Logging setup after paths
# ============================================================
def setup_logging_after_paths():
    warnings.filterwarnings("default")
    level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO)

    root = logging.getLogger()
    root.handlers.clear()
    root.setLevel(level)

    fmt = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")

    sh = logging.StreamHandler(sys.stdout)
    sh.setLevel(level)
    sh.setFormatter(fmt)
    root.addHandler(sh)

    log_path = RESULTS_DIR / "live_loop.log"
    fh = logging.FileHandler(log_path)
    fh.setLevel(level)
    fh.setFormatter(fmt)
    root.addHandler(fh)

    try:
        sys.stdout.reconfigure(line_buffering=True)
    except Exception:
        pass

def log_config_banner():
    try:
        artifacts_list = sorted(p.name for p in ARTIFACTS_DIR.iterdir()) if ARTIFACTS_DIR.exists() else []
    except Exception:
        artifacts_list = []
    logging.info("EXIT_AFTER_CLOSE      : %s", os.getenv("EXIT_AFTER_CLOSE", "0"))
    logging.info("FORCE_FIRST_BUY       : %s", FORCE_FIRST_BUY)
    logging.info("FORCE_FLATTEN_ON_EXIT : %s", FORCE_FLATTEN_ON_EXIT)
    logging.info("CONFIG")
    logging.info("Project root          : %s", PROJECT_ROOT)
    logging.info("ARTIFACTS_DIR         : %s", ARTIFACTS_DIR)
    logging.info("RESULTS_DIR           : %s", RESULTS_DIR)
    logging.info("Tickers               : %s", TICKERS)
    logging.info("API base              : %s", BASE_URL)
    logging.info("AUTO_RUN_LIVE         : %s", os.getenv("AUTO_RUN_LIVE", ""))
    logging.info("INF_DETERMINISTIC     : %s", INF_DETERMINISTIC)
    logging.info("ALLOW_SHORTS          : %s", ALLOW_SHORTS)
    logging.info("FLATTEN_INTO_CLOSE    : %s", FLATTEN_INTO_CLOSE)
    logging.info("DRY_RUN=%s | BARS_FEED=%s | USE_FRACTIONALS=%s | COOLDOWN_MIN=%s | STALE_MAX_SEC=%s",
                 DRY_RUN, BARS_FEED, USE_FRACTIONALS, COOLDOWN_MIN, STALE_MAX_SEC)
    logging.info("DATA_TIMEFRAME        : %s (model bars)", os.getenv("DATA_TIMEFRAME", "1H"))
    logging.info("EQUITY_TIMEFRAME      : %s (equity reporting)", os.getenv("EQUITY_TIMEFRAME", "5Min"))
    logging.info("MAX_DD_PCT: %.3f | KILL_SWITCH_COOLDOWN_MIN: %s",
                 float(globals().get("MAX_DAILY_DRAWDOWN_PCT", 0.05)),
                 os.getenv("KILL_SWITCH_COOLDOWN_MIN", str(globals().get("KILL_SWITCH_COOLDOWN_MIN", 30))))
    logging.info("WEIGHT_CAP: %.3f | SIZING_MODE: %s | ENTER_CONF_MIN: %.3f | ENTER_WEIGHT_MIN: %.3f | EXIT_WEIGHT_MAX: %.3f | REBALANCE_MIN_NOTIONAL: %.2f",
                 WEIGHT_CAP, SIZING_MODE, ENTER_CONF_MIN, ENTER_WEIGHT_MIN, EXIT_WEIGHT_MAX, REBALANCE_MIN_NOTIONAL)
    logging.info("TAKE_PROFIT_PCT: %.3f | STOP_LOSS_PCT: %.3f | BEST_WINDOW_ENV: %s",
                 TAKE_PROFIT_PCT, STOP_LOSS_PCT, (BEST_WINDOW_ENV or ""))
    logging.info("GROSS_CAP: %.3f | NET_CAP: %.3f | EMERGENCY_FLATTEN_ON_EXPOSURE: %s",
                 float(globals().get("GROSS_CAP", 1.0)), float(globals().get("NET_CAP", 1.0)),
                 bool(globals().get("EMERGENCY_FLATTEN_ON_EXPOSURE", False)))
    if artifacts_list:
        logging.info("Artifacts present (%d): %s", len(artifacts_list), ", ".join(artifacts_list))


# ============================================================
# Artifacts housekeeping
# ============================================================
def dedupe_artifacts_dir(artifacts_dir: Path) -> None:
    dupes = sorted(artifacts_dir.glob("* (1).*"))
    if not dupes:
        logging.info("No duplicate (1) artifacts found.")
        return
    backup = artifacts_dir / "_dupes_backup"
    backup.mkdir(parents=True, exist_ok=True)
    for p in dupes:
        try:
            p.rename(backup / p.name)
        except Exception:
            pass
    logging.warning("Moved %d duplicate artifacts to %s", len(dupes), backup)

def _is_colab_runtime() -> bool:
    # IN_COLAB is already set earlier in your file, but keep this robust.
    try:
        return bool(globals().get("IN_COLAB", False))
    except Exception:
        return False

def colab_interactive_upload_before_init(project_root: Path) -> None:
    if not _is_colab_runtime():
        return

    # Allow opt-out without changing any non-Colab behavior.
    if str(os.getenv("COLAB_INTERACTIVE_UPLOAD", "1")).strip().lower() in ("0", "false", "no", "off"):
        logging.info("COLAB_INTERACTIVE_UPLOAD=0 -> skipping Colab upload dialogs.")
        return

    try:
        from google.colab import files  # type: ignore
    except Exception:
        logging.warning("Colab detected but google.colab.files unavailable; skipping upload dialogs.")
        return

    project_root = Path(project_root)
    project_root.mkdir(parents=True, exist_ok=True)

    # IMPORTANT: ARTIFACTS_DIR is not configured yet, so we use env/default path now.
    artifacts_dir = Path(os.getenv("ARTIFACTS_DIR", str(project_root / "artifacts")))
    artifacts_dir.mkdir(parents=True, exist_ok=True)

    # ---- (1) Upload .env
    logging.info("[COLAB UPLOAD] Please upload your .env file (contains Alpaca API keys).")
    logging.info("[COLAB UPLOAD] If you already have %s, you can cancel/skip.", str(project_root / ".env"))
    try:
        uploaded_env = files.upload()  # opens upload dialog
    except Exception as e:
        logging.warning("[COLAB UPLOAD] .env upload dialog failed: %s", e)
        uploaded_env = {}

    env_saved = False
    if uploaded_env:
        # Prefer a file literally named ".env", otherwise take the first upload.
        chosen_name = ".env" if ".env" in uploaded_env else next(iter(uploaded_env.keys()))
        try:
            env_bytes = uploaded_env[chosen_name]
            env_path = project_root / ".env"
            env_path.write_bytes(env_bytes)
            logging.info("[COLAB UPLOAD] Saved .env -> %s", str(env_path.resolve()))
            env_saved = True
        except Exception as e:
            logging.warning("[COLAB UPLOAD] Failed saving .env: %s", e)

    if not env_saved:
        logging.info("[COLAB UPLOAD] No .env uploaded (or save failed). Will proceed with existing env/drive/local .env resolution.")

    # ---- (2) Upload artifacts
    logging.info("[COLAB UPLOAD] Now upload PPO artifact files (model .zip, vecnorm .pkl, features .json, etc.).")
    logging.info("[COLAB UPLOAD] Target artifacts directory: %s", str(artifacts_dir.resolve()))
    try:
        uploaded_artifacts = files.upload()  # opens upload dialog
    except Exception as e:
        logging.warning("[COLAB UPLOAD] Artifact upload dialog failed: %s", e)
        uploaded_artifacts = {}

    saved_files: List[str] = []
    skipped_files: List[str] = []

    if uploaded_artifacts:
        for name, data in uploaded_artifacts.items():
            try:
                # If user accidentally re-uploads .env here, keep it out of ARTIFACTS_DIR
                if str(name).strip() == ".env":
                    skipped_files.append(name)
                    continue

                out_path = artifacts_dir / Path(name).name
                out_path.write_bytes(data)
                saved_files.append(out_path.name)
            except Exception as e:
                logging.warning("[COLAB UPLOAD] Failed saving artifact %s: %s", name, e)

    if saved_files:
        logging.info("[COLAB UPLOAD] Saved %d artifact file(s) into %s",
                     len(saved_files), str(artifacts_dir.resolve()))
        logging.info("[COLAB UPLOAD] Files: %s", ", ".join(saved_files))
    else:
        logging.info("[COLAB UPLOAD] No artifacts uploaded. Will proceed using existing ARTIFACTS_DIR contents.")

    if skipped_files:
        logging.info("[COLAB UPLOAD] Skipped files (not treated as artifacts): %s", ", ".join(skipped_files))

# ============================================================
# Main
# ============================================================
if __name__ == "__main__":

    dedupe_artifacts_dir(Path(os.getenv("ARTIFACTS_DIR", str(PROJECT_ROOT / "artifacts"))))
    colab_interactive_upload_before_init(PROJECT_ROOT)
    cfg = configure_knobs(overrides={
        # data freshness
        "BARS_FEED": "",
        "STALE_MAX_SEC": 4200,

        # sizing
        "SIZING_MODE": "linear",
        "CONF_FLOOR": 0.00,
        "WEIGHT_CAP": 0.40,

        # entry/exit sensitivity
        "ENTER_CONF_MIN": 0.02,
        "ENTER_WEIGHT_MIN": 0.002,
        "EXIT_WEIGHT_MAX": 0.001,
        "DELTA_WEIGHT_MIN": 0.002,
        "REBALANCE_MIN_NOTIONAL": 25.00,

        # posture
        "ALLOW_SHORTS": True,
        "COOLDOWN_MIN": 10,

        # raw-action gates
        "RAW_POS_MIN": 0.00,
        "RAW_NEG_MAX": 0.00,

        # risk
        "TAKE_PROFIT_PCT": 0.05,
        "STOP_LOSS_PCT": 0.02,

        # logging cadence
        "EQUITY_LOG_THROTTLE_SEC": 300,
        "SKIP_EQUITY_WHEN_DRY_RUN": False,

        # kill-switch
        "MAX_DAILY_DRAWDOWN_PCT": 0.05,

        # (D) caps
        "GROSS_CAP": float(os.getenv("GROSS_CAP", "1.00")),
        "NET_CAP": float(os.getenv("NET_CAP", "0.80")),
        "EMERGENCY_FLATTEN_ON_EXPOSURE": _to_bool(os.getenv("EMERGENCY_FLATTEN_ON_EXPOSURE", "0")),
    })
    globals()["cfg"] = cfg

    setup_logging_after_paths()

    tf_key = normalize_tf_key(cfg.DATA_TIMEFRAME)
    LIVE_TIMEFRAME = _TF_MAP.get(tf_key, TimeFrame.Hour)
    globals()["LIVE_TIMEFRAME"] = LIVE_TIMEFRAME

    trained_key = normalize_tf_key(str(cfg.TRAIN_TIMEFRAME))
    live_key = normalize_tf_key(str(cfg.DATA_TIMEFRAME))
    if trained_key != live_key:
        logging.warning("Timeframe mismatch: trained=%s live=%s. Only change DATA_TIMEFRAME if you retrained.",
                        cfg.TRAIN_TIMEFRAME, cfg.DATA_TIMEFRAME)

    if cfg.AUTO_RUN_LIVE:
        assert "paper-api" in BASE_URL.lower()

    log_config_banner()
    logging.info("DATA_TIMEFRAME=%s -> LIVE_TIMEFRAME=%s", cfg.DATA_TIMEFRAME, LIVE_TIMEFRAME)

    # Save run config snapshot
    try:
        cfg_path = RESULTS_DIR / "run_config.json"
        payload = {
            "time": utcnow_iso(),
            "tickers": TICKERS,
            "dry_run": DRY_RUN,
            "bars_feed": BARS_FEED,
            "weight_cap": WEIGHT_CAP,
            "enter_conf_min": ENTER_CONF_MIN,
            "enter_weight_min": ENTER_WEIGHT_MIN,
            "exit_weight_max": EXIT_WEIGHT_MAX,
            "rebalance_min_notional": REBALANCE_MIN_NOTIONAL,
            "delta_weight_min": DELTA_WEIGHT_MIN,
            "tp": TAKE_PROFIT_PCT,
            "sl": STOP_LOSS_PCT,
            "allow_shorts": ALLOW_SHORTS,
            "gross_cap": float(globals().get("GROSS_CAP", 1.0)),
            "net_cap": float(globals().get("NET_CAP", 1.0)),
            "emergency_flatten_on_exposure": bool(globals().get("EMERGENCY_FLATTEN_ON_EXPOSURE", False)),
        }
        tmp = cfg_path.with_suffix(".tmp")
        tmp.write_text(json.dumps(payload, indent=2))
        tmp.replace(cfg_path)
    except Exception as e:
        logging.warning("Could not write run_config.json: %s", e)

    assert "paper-api" in BASE_URL.lower(), f"Refusing to trade: BASE_URL is not paper ({BASE_URL})"

    trading_api, data_api = init_clients()
    globals()["trading_api"] = trading_api
    globals()["data_api"] = data_api

    acct = trading_api.get_account()
    logging.info("SHORT CHECK (pre): ALLOW_SHORTS=%s | acct.shorting_enabled=%s",
                 globals().get("ALLOW_SHORTS", None), getattr(acct, "shorting_enabled", None))
    for s in TICKERS:
        try:
            a = trading_api.get_asset(s)
            logging.info("[%s] asset.shortable=%s | tradable=%s | fractionable=%s",
                         s, getattr(a, "shortable", None), getattr(a, "tradable", None), getattr(a, "fractionable", None))
        except Exception as e:
            logging.info("[%s] get_asset failed: %s", s, e)

    assert not bool(getattr(acct, "trading_blocked", False)), f"Trading is blocked: {getattr(acct,'status','')}"
    logging.info("Account status: %s | equity=%s | cash=%s", acct.status, acct.equity, acct.cash)

    if cfg.AUTO_RUN_LIVE:
        run_live(TICKERS, trading_api, data_api)
    else:
        logging.info("AUTO_RUN_LIVE disabled; live loop not started.")

Mounted at /content/drive
2026-02-27 17:37:37,683 | INFO | No duplicate (1) artifacts found.
2026-02-27 17:37:37,689 | INFO | [COLAB UPLOAD] Please upload your .env file (contains Alpaca API keys).
2026-02-27 17:37:37,690 | INFO | [COLAB UPLOAD] If you already have /content/drive/MyDrive/AlpacaPaper/.env, you can cancel/skip.


  return datetime.utcnow().replace(tzinfo=utc)


Saving Alpaca_keys.env.txt to Alpaca_keys.env.txt
2026-02-27 17:37:49,783 | INFO | [COLAB UPLOAD] Saved .env -> /content/drive/MyDrive/AlpacaPaper/.env
2026-02-27 17:37:49,784 | INFO | [COLAB UPLOAD] Now upload PPO artifact files (model .zip, vecnorm .pkl, features .json, etc.).
2026-02-27 17:37:49,786 | INFO | [COLAB UPLOAD] Target artifacts directory: /content/drive/MyDrive/AlpacaPaper/artifacts


Saving ppo_GE_window1_features.json to ppo_GE_window1_features.json
Saving ppo_GE_window1_model_info.json to ppo_GE_window1_model_info.json
Saving ppo_GE_window1_model.zip to ppo_GE_window1_model.zip
Saving ppo_GE_window1_probability_config.json to ppo_GE_window1_probability_config.json
Saving ppo_GE_window1_vecnorm.pkl to ppo_GE_window1_vecnorm.pkl
Saving ppo_UNH_window3_features.json to ppo_UNH_window3_features.json
Saving ppo_UNH_window3_model_info.json to ppo_UNH_window3_model_info.json
Saving ppo_UNH_window3_model.zip to ppo_UNH_window3_model.zip
Saving ppo_UNH_window3_probability_config.json to ppo_UNH_window3_probability_config.json
Saving ppo_UNH_window3_vecnorm.pkl to ppo_UNH_window3_vecnorm.pkl
2026-02-27 17:38:05,356 | INFO | [COLAB UPLOAD] Saved 10 artifact file(s) into /content/drive/MyDrive/AlpacaPaper/artifacts
2026-02-27 17:38:05,357 | INFO | [COLAB UPLOAD] Files: ppo_GE_window1_features.json, ppo_GE_window1_model_info.json, ppo_GE_window1_model.zip, ppo_GE_window1_prob

  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 17:38:06,246 | INFO | FORCE_FIRST_BUY       : False
2026-02-27 17:38:06,268 | INFO | FORCE_FLATTEN_ON_EXIT : False
2026-02-27 17:38:06,273 | INFO | CONFIG
2026-02-27 17:38:06,276 | INFO | Project root          : /content/drive/MyDrive/AlpacaPaper
2026-02-27 17:38:06,279 | INFO | ARTIFACTS_DIR         : /content/drive/MyDrive/AlpacaPaper/artifacts
2026-02-27 17:38:06,281 | INFO | RESULTS_DIR           : /content/drive/MyDrive/AlpacaPaper/results/2026-02-27
2026-02-27 17:38:06,283 | INFO | Tickers               : ['UNH', 'GE']
2026-02-27 17:38:06,286 | INFO | API base              : https://paper-api.alpaca.markets
2026-02-27 17:38:06,288 | INFO | AUTO_RUN_LIVE         : 1
2026-02-27 17:38:06,290 | INFO | INF_DETERMINISTIC     : True
2026-02-27 17:38:06,291 | INFO | ALLOW_SHORTS          : True
2026-02-27 17:38:06,293 | INFO | FLATTEN_INTO_CLOSE    : False
2026-02-27 17:38:06,294 | INFO | DRY_RUN=False | BARS_FEED= | USE_FRACTIONALS=True | COOLDOWN_MIN=10 | STALE_MAX_SEC=4200


  return datetime.utcnow().replace(tzinfo=utc)



Position Summary:
  GE: -7.0 shares @ $341.08 | Value: $-2,387.59
  UNH: -127.0 shares @ $292.57 | Value: $-37,157.03

Total Market Value: $-39,544.62
2026-02-27 17:38:13,012 | INFO | [EXPOSURE_CAP] ok: gross=0.415 (cap=1.000) | net=-0.415 (cap=0.800)
2026-02-27 17:38:13,126 | INFO | [UNH] close_position submitted.
2026-02-27 17:38:13,185 | INFO | [GE] close_position submitted.
2026-02-27 17:38:14,728 | INFO | Flatten wait complete: True
2026-02-27 17:38:14,879 | INFO | [UNH] Re-entry cooldown active (59s left); skipping.


  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 17:38:15,975 | INFO | [TIMER] UNH symbol work: 1.245s
2026-02-27 17:38:16,021 | INFO | [GE] Re-entry cooldown active (58s left); skipping.


  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 17:38:16,236 | INFO | [TIMER] GE symbol work: 0.260s
Saved equity curve → /content/drive/MyDrive/AlpacaPaper/results/2026-02-27/equity_curve.png
Updated latest copy → /content/drive/MyDrive/AlpacaPaper/results/latest/equity_curve.png
2026-02-27 17:38:18,157 | INFO | Perf: cum_return=-0.12% | sharpe=-28.36 | maxDD=-0.16%
2026-02-27 17:38:18,158 | INFO | [TIMER] full-cycle active time: 5.325s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T17:40:00.144732+00:00 cycle=1 equity=95,267.16

Position Summary:
  (no open positions)

Total Market Value: $0.00
2026-02-27 17:40:00,335 | INFO | [EXPOSURE_CAP] ok: gross=0.000 (cap=1.000) | net=0.000 (cap=0.800)
2026-02-27 17:40:00,489 | INFO | [TIMER] UNH symbol work: 0.152s
2026-02-27 17:40:00,593 | INFO | [TIMER] GE symbol work: 0.102s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 17:40:00,733 | INFO | [TIMER] full-cycle active time: 0.629s (cooldown=10 min)
[HEARTBEAT] 2026-02-27T17:50:00.142014+00:00 cycle=2 equity=95,267.16

Position Summary:
  (no open positions)

Total Market Value: $0.00
2026-02-27 17:50:00,237 | INFO | [EXPOSURE_CAP] ok: gross=0.000 (cap=1.000) | net=0.000 (cap=0.800)
2026-02-27 17:50:00,445 | INFO | [TIMER] UNH symbol work: 0.206s
2026-02-27 17:50:00,539 | INFO | [TIMER] GE symbol work: 0.092s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 17:50:00,937 | INFO | [TIMER] full-cycle active time: 0.832s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T18:00:00.140316+00:00 cycle=3 equity=95,267.16

Position Summary:
  (no open positions)

Total Market Value: $0.00
2026-02-27 18:00:00,238 | INFO | [EXPOSURE_CAP] ok: gross=0.000 (cap=1.000) | net=0.000 (cap=0.800)
2026-02-27 18:00:02,029 | INFO | [TIMER] UNH symbol work: 1.790s
2026-02-27 18:00:02,222 | INFO | [TIMER] GE symbol work: 0.192s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


Saved equity curve → /content/drive/MyDrive/AlpacaPaper/results/2026-02-27/equity_curve.png
Updated latest copy → /content/drive/MyDrive/AlpacaPaper/results/latest/equity_curve.png
2026-02-27 18:00:03,150 | INFO | Perf: cum_return=-0.13% | sharpe=-26.47 | maxDD=-0.16%
2026-02-27 18:00:03,152 | INFO | [TIMER] full-cycle active time: 3.045s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T18:10:00.222621+00:00 cycle=4 equity=95,267.16

Position Summary:
  (no open positions)

Total Market Value: $0.00
2026-02-27 18:10:00,412 | INFO | [EXPOSURE_CAP] ok: gross=0.000 (cap=1.000) | net=0.000 (cap=0.800)
2026-02-27 18:10:00,582 | INFO | [TIMER] UNH symbol work: 0.169s
2026-02-27 18:10:00,632 | INFO | [TIMER] GE symbol work: 0.049s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 18:10:00,947 | INFO | [TIMER] full-cycle active time: 0.841s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T18:20:00.179986+00:00 cycle=5 equity=95,267.16

Position Summary:
  (no open positions)

Total Market Value: $0.00
2026-02-27 18:20:00,305 | INFO | [EXPOSURE_CAP] ok: gross=0.000 (cap=1.000) | net=0.000 (cap=0.800)
2026-02-27 18:20:00,507 | INFO | [UNH] predict() ok → raw=-0.4382 target_w=-0.1753 conf=0.438


  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 18:20:00,727 | INFO | [UNH] Submitted sell qty=56.0
2026-02-27 18:20:00,760 | INFO | [TIMER] UNH symbol work: 0.454s
2026-02-27 18:20:00,884 | INFO | [GE] predict() ok → raw=0.1920 target_w=0.0768 conf=0.192
2026-02-27 18:20:01,017 | INFO | [GE] Submitted buy notional=$7317.58
2026-02-27 18:20:01,058 | INFO | [TIMER] GE symbol work: 0.296s


  return datetime.utcnow().replace(tzinfo=utc)


Saved equity curve → /content/drive/MyDrive/AlpacaPaper/results/2026-02-27/equity_curve.png
Updated latest copy → /content/drive/MyDrive/AlpacaPaper/results/latest/equity_curve.png
2026-02-27 18:20:01,908 | INFO | Perf: cum_return=-0.13% | sharpe=-24.23 | maxDD=-0.16%
2026-02-27 18:20:01,910 | INFO | [TIMER] full-cycle active time: 1.769s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T18:30:00.155003+00:00 cycle=6 equity=95,260.07

Position Summary:
  GE: 21.624054373 shares @ $338.37 | Value: $7,316.93
  UNH: -56.0 shares @ $293.15 | Value: $-16,416.68

Total Market Value: $-9,099.75
2026-02-27 18:30:00,308 | INFO | [EXPOSURE_CAP] ok: gross=0.249 (cap=1.000) | net=-0.096 (cap=0.800)
2026-02-27 18:30:00,474 | INFO | [TIMER] UNH symbol work: 0.164s
2026-02-27 18:30:00,603 | INFO | [TIMER] GE symbol work: 0.127s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 18:30:00,915 | INFO | [TIMER] full-cycle active time: 0.809s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T18:40:00.149203+00:00 cycle=7 equity=95,287.32

Position Summary:
  GE: 21.624054373 shares @ $337.89 | Value: $7,306.66
  UNH: -56.0 shares @ $292.49 | Value: $-16,379.16

Total Market Value: $-9,072.50
2026-02-27 18:40:00,272 | INFO | [EXPOSURE_CAP] ok: gross=0.249 (cap=1.000) | net=-0.095 (cap=0.800)
2026-02-27 18:40:00,495 | INFO | [TIMER] UNH symbol work: 0.221s
2026-02-27 18:40:00,565 | INFO | [TIMER] GE symbol work: 0.069s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


Saved equity curve → /content/drive/MyDrive/AlpacaPaper/results/2026-02-27/equity_curve.png
Updated latest copy → /content/drive/MyDrive/AlpacaPaper/results/latest/equity_curve.png
2026-02-27 18:40:01,423 | INFO | Perf: cum_return=-0.10% | sharpe=-17.72 | maxDD=-0.17%
2026-02-27 18:40:01,424 | INFO | [TIMER] full-cycle active time: 1.327s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T18:50:00.164013+00:00 cycle=8 equity=95,257.07

Position Summary:
  GE: 21.624054373 shares @ $338.27 | Value: $7,314.77
  UNH: -56.0 shares @ $293.17 | Value: $-16,417.52

Total Market Value: $-9,102.75
2026-02-27 18:50:00,293 | INFO | [EXPOSURE_CAP] ok: gross=0.249 (cap=1.000) | net=-0.096 (cap=0.800)
2026-02-27 18:50:00,437 | INFO | [TIMER] UNH symbol work: 0.142s
2026-02-27 18:50:00,601 | INFO | [TIMER] GE symbol work: 0.162s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 18:50:00,971 | INFO | [TIMER] full-cycle active time: 0.844s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T19:00:00.161303+00:00 cycle=9 equity=95,290.94

Position Summary:
  GE: 21.624054373 shares @ $338.27 | Value: $7,314.88
  UNH: -56.0 shares @ $292.60 | Value: $-16,385.60

Total Market Value: $-9,070.72
2026-02-27 19:00:00,260 | INFO | [EXPOSURE_CAP] ok: gross=0.249 (cap=1.000) | net=-0.095 (cap=0.800)
2026-02-27 19:00:00,469 | INFO | [TIMER] UNH symbol work: 0.208s
2026-02-27 19:00:00,526 | INFO | [TIMER] GE symbol work: 0.056s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


Saved equity curve → /content/drive/MyDrive/AlpacaPaper/results/2026-02-27/equity_curve.png
Updated latest copy → /content/drive/MyDrive/AlpacaPaper/results/latest/equity_curve.png
2026-02-27 19:00:01,197 | INFO | Perf: cum_return=-0.10% | sharpe=-15.89 | maxDD=-0.18%
2026-02-27 19:00:01,199 | INFO | [TIMER] full-cycle active time: 1.093s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T19:10:00.160597+00:00 cycle=10 equity=95,307.60

Position Summary:
  GE: 21.624054373 shares @ $338.82 | Value: $7,326.66
  UNH: -56.0 shares @ $292.48 | Value: $-16,378.88

Total Market Value: $-9,052.22
2026-02-27 19:10:00,307 | INFO | [EXPOSURE_CAP] ok: gross=0.249 (cap=1.000) | net=-0.095 (cap=0.800)
2026-02-27 19:10:00,457 | INFO | [TIMER] UNH symbol work: 0.148s
2026-02-27 19:10:00,508 | INFO | [TIMER] GE symbol work: 0.049s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 19:10:00,857 | INFO | [TIMER] full-cycle active time: 0.758s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T19:20:00.187311+00:00 cycle=11 equity=95,367.57

Position Summary:
  GE: 21.624054373 shares @ $339.34 | Value: $7,337.91
  UNH: -56.0 shares @ $291.61 | Value: $-16,330.16

Total Market Value: $-8,992.25
2026-02-27 19:20:00,285 | INFO | [EXPOSURE_CAP] ok: gross=0.248 (cap=1.000) | net=-0.094 (cap=0.800)
2026-02-27 19:20:00,455 | INFO | [UNH] predict() ok → raw=-0.4372 target_w=-0.1749 conf=0.437


  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 19:20:00,681 | INFO | [UNH] Submitted sell qty=57.0
2026-02-27 19:20:00,710 | INFO | [TIMER] UNH symbol work: 0.423s
2026-02-27 19:20:00,809 | INFO | [GE] predict() ok → raw=0.1924 target_w=0.0770 conf=0.192
2026-02-27 19:20:00,921 | INFO | [GE] Submitted buy notional=$7340.18
2026-02-27 19:20:00,953 | INFO | [TIMER] GE symbol work: 0.241s


  return datetime.utcnow().replace(tzinfo=utc)


Saved equity curve → /content/drive/MyDrive/AlpacaPaper/results/2026-02-27/equity_curve.png
Updated latest copy → /content/drive/MyDrive/AlpacaPaper/results/latest/equity_curve.png
2026-02-27 19:20:01,797 | INFO | Perf: cum_return=-0.02% | sharpe=-3.03 | maxDD=-0.18%
2026-02-27 19:20:01,800 | INFO | [TIMER] full-cycle active time: 1.647s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T19:30:00.219851+00:00 cycle=12 equity=95,369.92

Position Summary:
  GE: 43.249709903 shares @ $339.44 | Value: $14,680.47
  UNH: -113.0 shares @ $291.60 | Value: $-32,950.25

Total Market Value: $-18,269.78
2026-02-27 19:30:00,314 | INFO | [EXPOSURE_CAP] ok: gross=0.499 (cap=1.000) | net=-0.192 (cap=0.800)
2026-02-27 19:30:00,482 | INFO | [TIMER] UNH symbol work: 0.166s
2026-02-27 19:30:00,558 | INFO | [TIMER] GE symbol work: 0.074s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 19:30:00,920 | INFO | [TIMER] full-cycle active time: 0.733s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T19:40:00.156554+00:00 cycle=13 equity=95,355.69

Position Summary:
  GE: 43.249709903 shares @ $339.55 | Value: $14,685.44
  UNH: -113.0 shares @ $291.76 | Value: $-32,969.44

Total Market Value: $-18,284.01
2026-02-27 19:40:00,312 | INFO | [EXPOSURE_CAP] ok: gross=0.500 (cap=1.000) | net=-0.192 (cap=0.800)
2026-02-27 19:40:00,482 | INFO | [TIMER] UNH symbol work: 0.167s
2026-02-27 19:40:00,585 | INFO | [TIMER] GE symbol work: 0.102s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


Saved equity curve → /content/drive/MyDrive/AlpacaPaper/results/2026-02-27/equity_curve.png
Updated latest copy → /content/drive/MyDrive/AlpacaPaper/results/latest/equity_curve.png
2026-02-27 19:40:01,476 | INFO | Perf: cum_return=-0.04% | sharpe=-4.35 | maxDD=-0.18%
2026-02-27 19:40:01,478 | INFO | [TIMER] full-cycle active time: 1.378s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T19:50:00.268209+00:00 cycle=14 equity=95,350.27

Position Summary:
  GE: 43.249709903 shares @ $339.49 | Value: $14,682.84
  UNH: -113.0 shares @ $291.79 | Value: $-32,972.27

Total Market Value: $-18,289.43
2026-02-27 19:50:00,435 | INFO | [EXPOSURE_CAP] ok: gross=0.500 (cap=1.000) | net=-0.192 (cap=0.800)
2026-02-27 19:50:00,576 | INFO | [TIMER] UNH symbol work: 0.139s
2026-02-27 19:50:00,622 | INFO | [TIMER] GE symbol work: 0.045s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 19:50:01,008 | INFO | [TIMER] full-cycle active time: 0.904s (cooldown=10 min)
[HEARTBEAT] 2026-02-27T20:00:00.141235+00:00 cycle=15 equity=95,326.07

Position Summary:
  GE: 43.249709903 shares @ $339.27 | Value: $14,673.33
  UNH: -113.0 shares @ $291.92 | Value: $-32,986.96

Total Market Value: $-18,313.63
2026-02-27 20:00:00,241 | INFO | [EXPOSURE_CAP] ok: gross=0.500 (cap=1.000) | net=-0.192 (cap=0.800)
2026-02-27 20:00:00,363 | INFO | [TIMER] UNH symbol work: 0.121s
2026-02-27 20:00:00,422 | INFO | [TIMER] GE symbol work: 0.058s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


Saved equity curve → /content/drive/MyDrive/AlpacaPaper/results/2026-02-27/equity_curve.png
Updated latest copy → /content/drive/MyDrive/AlpacaPaper/results/latest/equity_curve.png
2026-02-27 20:00:01,197 | INFO | Perf: cum_return=-0.07% | sharpe=-7.78 | maxDD=-0.18%
2026-02-27 20:00:01,200 | INFO | [TIMER] full-cycle active time: 1.100s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T20:10:00.134002+00:00 cycle=16 equity=95,338.77

Position Summary:
  GE: 43.249709903 shares @ $340.63 | Value: $14,732.36
  UNH: -113.0 shares @ $292.33 | Value: $-33,033.29

Total Market Value: $-18,300.93
2026-02-27 20:10:00,292 | INFO | [EXPOSURE_CAP] ok: gross=0.501 (cap=1.000) | net=-0.192 (cap=0.800)
2026-02-27 20:10:00,461 | INFO | [TIMER] UNH symbol work: 0.168s
2026-02-27 20:10:00,564 | INFO | [TIMER] GE symbol work: 0.102s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 20:10:00,966 | INFO | [TIMER] full-cycle active time: 0.867s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T20:20:00.146818+00:00 cycle=17 equity=95,232.12

Position Summary:
  GE: 43.249709903 shares @ $340.22 | Value: $14,714.42
  UNH: -113.0 shares @ $293.12 | Value: $-33,122.00

Total Market Value: $-18,407.58
2026-02-27 20:20:00,274 | INFO | [EXPOSURE_CAP] ok: gross=0.502 (cap=1.000) | net=-0.193 (cap=0.800)
2026-02-27 20:20:00,450 | INFO | [UNH] predict() ok → raw=-0.4365 target_w=-0.1746 conf=0.436
2026-02-27 20:20:00,613 | INFO | [UNH] Submitted sell qty=56.0
2026-02-27 20:20:00,644 | INFO | [TIMER] UNH symbol work: 0.369s


  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 20:20:00,725 | INFO | [GE] predict() ok → raw=0.1926 target_w=0.0770 conf=0.193
2026-02-27 20:20:00,829 | INFO | [GE] Submitted buy notional=$7336.52
2026-02-27 20:20:00,859 | INFO | [TIMER] GE symbol work: 0.213s


  return datetime.utcnow().replace(tzinfo=utc)


Saved equity curve → /content/drive/MyDrive/AlpacaPaper/results/2026-02-27/equity_curve.png
Updated latest copy → /content/drive/MyDrive/AlpacaPaper/results/latest/equity_curve.png
2026-02-27 20:20:01,578 | INFO | Perf: cum_return=-0.16% | sharpe=-15.81 | maxDD=-0.20%
2026-02-27 20:20:01,580 | INFO | [TIMER] full-cycle active time: 1.477s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T20:30:00.167084+00:00 cycle=18 equity=95,196.08

Position Summary:
  GE: 64.809320823 shares @ $339.60 | Value: $22,009.25
  UNH: -169.0 shares @ $293.07 | Value: $-49,528.83

Total Market Value: $-27,519.58
2026-02-27 20:30:00,309 | INFO | [EXPOSURE_CAP] ok: gross=0.751 (cap=1.000) | net=-0.289 (cap=0.800)
2026-02-27 20:30:00,667 | INFO | [TIMER] UNH symbol work: 0.356s
2026-02-27 20:30:00,788 | INFO | [TIMER] GE symbol work: 0.121s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 20:30:01,118 | INFO | [TIMER] full-cycle active time: 1.014s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T20:40:00.150176+00:00 cycle=19 equity=95,161.92

Position Summary:
  GE: 64.809320823 shares @ $340.09 | Value: $22,041.00
  UNH: -169.0 shares @ $293.46 | Value: $-49,594.74

Total Market Value: $-27,553.74
2026-02-27 20:40:00,261 | INFO | [EXPOSURE_CAP] ok: gross=0.753 (cap=1.000) | net=-0.290 (cap=0.800)
2026-02-27 20:40:00,789 | INFO | [TIMER] UNH symbol work: 0.527s
2026-02-27 20:40:00,888 | INFO | [TIMER] GE symbol work: 0.094s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


Saved equity curve → /content/drive/MyDrive/AlpacaPaper/results/2026-02-27/equity_curve.png
Updated latest copy → /content/drive/MyDrive/AlpacaPaper/results/latest/equity_curve.png
2026-02-27 20:40:01,726 | INFO | Perf: cum_return=-0.24% | sharpe=-21.92 | maxDD=-0.28%
2026-02-27 20:40:01,727 | INFO | [TIMER] full-cycle active time: 1.624s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


[HEARTBEAT] 2026-02-27T20:50:00.187139+00:00 cycle=20 equity=95,174.72

Position Summary:
  GE: 64.809320823 shares @ $341.80 | Value: $22,151.83
  UNH: -169.0 shares @ $294.05 | Value: $-49,694.45

Total Market Value: $-27,542.62
2026-02-27 20:50:00,310 | INFO | [EXPOSURE_CAP] ok: gross=0.755 (cap=1.000) | net=-0.289 (cap=0.800)
2026-02-27 20:50:00,692 | INFO | [TIMER] UNH symbol work: 0.379s
2026-02-27 20:50:00,854 | INFO | [TIMER] GE symbol work: 0.159s


  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 20:50:01,225 | INFO | [TIMER] full-cycle active time: 1.097s (cooldown=10 min)


  return datetime.utcnow().replace(tzinfo=utc)


2026-02-27 21:00:00,145 | INFO | Market closed. Next open in 235799s.
2026-02-27 21:00:00,149 | INFO | IN_COLAB and COLAB_EXIT_WHEN_CLOSED=1 -> exiting cleanly instead of long sleep.


  return datetime.utcnow().replace(tzinfo=utc)


Saved equity curve → /content/drive/MyDrive/AlpacaPaper/results/2026-02-27/equity_curve.png
Updated latest copy → /content/drive/MyDrive/AlpacaPaper/results/latest/equity_curve.png
2026-02-27 21:00:00,640 | INFO | Live loop exited cleanly.


In [None]:
from pathlib import Path
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from alpaca.data.timeframe import TimeFrame

RESULTS_DIR = Path(globals().get("RESULTS_DIR", os.getenv("RESULTS_DIR", ".")))
LATEST_DIR  = Path(globals().get("LATEST_DIR",  os.getenv("LATEST_DIR",  str(RESULTS_DIR))))

eq_candidates = [
    globals().get("EQUITY_LOG_CSV"),
    globals().get("EQUITY_LOG_LATEST"),
    RESULTS_DIR / "equity_log.csv",
    LATEST_DIR / "equity_log.csv",
]

def _first_existing(paths):
    for p in paths:
        if p:
            p = Path(p)
            if p.exists() and p.is_file():
                return p
    return None

eq_path = _first_existing(eq_candidates)
if eq_path is None:
    all_eq = list(RESULTS_DIR.glob("equity_log*.csv")) + list(LATEST_DIR.glob("equity_log*.csv"))
    eq_path = max(all_eq, key=lambda p: p.stat().st_mtime, default=None)

if eq_path and eq_path.exists():
    print(f"[equity source] {eq_path}")
    try:
        eq = pd.read_csv(eq_path, parse_dates=["datetime_utc"]).sort_values("datetime_utc")
        if not eq.empty:
            r = eq["equity"].pct_change().dropna()
            sharpe_h = (r.mean() / (r.std() + 1e-12)) * np.sqrt(252 * 6.5) if len(r) else float("nan")
            print(
                f"\nEquity summary — last: ${eq['equity'].iloc[-1]:,.2f} | "
                f"n={len(eq)} pts | Sharpe(h): {sharpe_h:.2f} | src={eq_path}"
            )
        else:
            print(f"No rows in equity log: {eq_path}")
    except Exception as e:
        print(f"Could not summarize equity ({eq_path}): {e}")
else:
    print("No equity_log*.csv found in RESULTS_DIR/LATEST_DIR.")

rs_candidates = [
    RESULTS_DIR / "run_summary.csv",
    LATEST_DIR / "run_summary.csv",
]

rs_path = _first_existing(rs_candidates)
if rs_path and rs_path.exists():
    print(f"\n[run_summary source] {rs_path}")
    try:
        rs = pd.read_csv(rs_path)
        if not rs.empty:
            last = rs.iloc[-1]
            print(
                "Run summary — "
                f"last dt={last.get('datetime_utc', '')} | "
                f"equity={last.get('equity', np.nan)} | "
                f"cash={last.get('cash', np.nan)} | "
                f"gross_exposure_pct={last.get('gross_exposure_pct', np.nan)} | "
                f"net_exposure_pct={last.get('net_exposure_pct', np.nan)} | "
                f"error={str(last.get('error',''))[:120]}"
            )
            print(f"Rows (decision ticks) logged: {len(rs)}")
        else:
            print("run_summary.csv exists but is empty.")
    except Exception as e:
        print(f"Could not summarize run_summary ({rs_path}): {e}")
else:
    print("\nNo run_summary.csv found yet (it is created after the first live cycle completes).")

def _resolve_tickers():
    g = globals().get("TICKERS", None)
    # Base tickers from globals or env
    if isinstance(g, (list, tuple, set)):
        base = [str(x).upper() for x in g]
    else:
        env_val = os.getenv("TICKERS", (g if isinstance(g, str) else ""))
        base = [t.strip().upper() for t in str(env_val).split(",") if t.strip()]

    discovered = [
        p.stem.replace("trade_log_", "").upper()
        for p in list(RESULTS_DIR.glob("trade_log_*.csv")) + list(LATEST_DIR.glob("trade_log_*.csv"))
    ]

    # Filter out the master aggregate file so it doesn't become a "ticker"
    discovered = [t for t in discovered if t != "MASTER"]

    ticks = sorted(set(base) | set(discovered))
    return ticks if ticks else ["UNH", "GE"]

tickers_to_report = _resolve_tickers()
print("Tickers to report:", tickers_to_report)

print("\nTrade Summary:")
for ticker in tickers_to_report:
    trade_candidates = [
        RESULTS_DIR / f"trade_log_{ticker}.csv",
        LATEST_DIR / f"trade_log_{ticker}.csv",
    ]
    log_path = _first_existing(trade_candidates)
    if not log_path:
        #Tolerate Drive duplicates like "trade_log_XYZ (1).csv"
        any_logs = list(RESULTS_DIR.glob(f"trade_log_{ticker}*.csv")) + \
                   list(LATEST_DIR.glob(f"trade_log_{ticker}*.csv"))
        log_path = max(any_logs, key=lambda p: p.stat().st_mtime, default=None)

    if not log_path or not log_path.exists():
        print(f"{ticker}: no trades logged yet.")
        continue

    try:
        df = pd.read_csv(
            log_path,
            on_bad_lines="skip",
            engine="python",
            parse_dates=["log_time", "bar_time"],
        )
        key = "signal" if "signal" in df.columns else ("action" if "action" in df.columns else None)
        if key:
            counts = df[key].value_counts(dropna=False).to_dict()
            print(f"{ticker}: {counts} | src={log_path.name}")
        else:
            print(f"{ticker}: log present but missing 'signal'/'action' columns. src={log_path.name}")

        if "confidence" in df.columns and df["confidence"].notna().any():
            plt.figure(figsize=(8, 3.5))
            df["confidence"].dropna().plot(kind="hist", bins=10, edgecolor="black")
            plt.title(f"{ticker} - Confidence Distribution")
            plt.xlabel("confidence")
            plt.tight_layout()
            plt.show()

        for col in ["weight", "raw_action"]:
            if col in df.columns and df[col].notna().any():
                s = df[col].dropna()
                print(
                    f"{ticker} {col}: mean={s.mean():.3f}, std={s.std():.3f}, "
                    f"min={s.min():.3f}, max={s.max():.3f}"
                )
    except Exception as e:
        print(f"{ticker}: could not summarize trades ({log_path}): {e}")

try:
    api = api if "api" in globals() else init_alpaca()
    positions = api.list_positions()
    total_market_value = 0.0
    print("\nPosition Summary:")
    for p in positions:
        mv = float(p.market_value)
        total_market_value += mv
        print(f"  {p.symbol}: {p.qty} shares @ ${float(p.current_price):.2f} | Value: ${mv:,.2f}")
    print(f"\nTotal Market Value: ${total_market_value:,.2f}")
except Exception as e:
    print(f"Could not summarize positions: {e}")

def count_filled_orders_since(api, symbol: str, days: int = 14) -> int:
    # alpaca-py expects a datetime (not ISO string) for "after"
    after_dt = datetime.now(timezone.utc) - timedelta(days=days)

    req = GetOrdersRequest(
        status=OrderStatus.ALL,   # get everything, then filter
        after=after_dt,
        nested=True
    )

    orders = api.get_orders(req)  #alpaca-py

    sym = str(symbol).upper()
    filled_statuses = {"filled", "partially_filled"}

    return sum(
        1 for o in (orders or [])
        if str(getattr(o, "symbol", "")).upper() == sym
        and str(getattr(o, "status", "")).lower() in filled_statuses
    )

try:
    api_chk = api if "api" in globals() else init_alpaca()
    for sym in tickers_to_report:
        n = count_filled_orders_since(api_chk, sym, days=14)
        print(f"{sym}: {n} filled trades in last 14 days")
except Exception as e:
    print(f"Could not fetch filled orders: {e}")


In [None]:
#--- Export locally & download to your computer (Colab) ---
from pathlib import Path
from datetime import datetime, timezone
from google.colab import files   #<-- NEW: for browser download
import shutil, time, pandas as pd

#Drive root (same as before, to read your results)
ROOT = Path("/content/drive/MyDrive/AlpacaPaper")
TODAY = datetime.now(timezone.utc).strftime("%Y-%m-%d")

#Original sources in Drive (unchanged)
SRC_RESULTS = ROOT / "results" / TODAY         #e.g., /.../results/2025-10-13
SRC_EXPORT  = ROOT / "results_export" / TODAY  #rescue export folder (if used)

#=== CHANGE: write/export to LOCAL staging (in Colab VM), not Drive ===
DEST = Path("/content") / "exports" / f"{TODAY}_export"
DEST.mkdir(parents=True, exist_ok=True)

def copy_all(src_dir, dest_dir):
    if src_dir.exists():
        for p in src_dir.glob("*"):
            if p.is_file():
                shutil.copy2(p, dest_dir / p.name)
                print("Copied:", p.name, "from", src_dir.name)
    else:
        print("Missing source:", src_dir)

#Copy from both possible sources into local /content/exports/<today>_export
copy_all(SRC_RESULTS, DEST)
copy_all(SRC_EXPORT, DEST)
rs_local = DEST / "run_summary.csv"
print("Included run_summary.csv in export:", rs_local.exists(), "| path:", rs_local)

#Build/refresh trade_log_master.csv from per-symbol logs (in LOCAL DEST)
sym_logs = list(DEST.glob("trade_log_*.csv"))
if sym_logs:
    frames = []
    for p in sym_logs:
        try:
            df = pd.read_csv(p)
            df["symbol_file"] = p.stem.replace("trade_log_", "")
            frames.append(df)
        except Exception as e:
            print("Skip", p.name, "->", e)
    if frames:
        master = pd.concat(frames, ignore_index=True, sort=False)
        master_path = DEST / "trade_log_master.csv"
        master.to_csv(master_path, index=False)
        print("Wrote:", master_path)

#Zip LOCALLY under /content and trigger a browser download
zip_base = Path("/content") / f"results_{TODAY}_{int(time.time())}"
archive_path = shutil.make_archive(str(zip_base), "zip", DEST)
archive_path = str(Path(archive_path))  #ensure string for files.download

print("ZIP ->", archive_path)

#OPTIONAL: also keep a copy in Drive (uncomment if wanted)
#shutil.copy2(archive_path, ROOT / "results" / Path(archive_path).name)

#Prompt download to your computer
files.download(archive_path)

#Show what's in the LOCAL export folder
print("\nLocal export now contains:")
for p in sorted(DEST.iterdir()):
    print(" -", p.name)
