# In [None]:
# =========================== QuantConnect: External JSON Signal Consumer ===========================
# Put this in main.py

# IMPORTANT: wildcard import so the linter recognizes QCAlgorithm methods
from AlgorithmImports import *   # noqa

from collections import deque
import json, math
from datetime import datetime, timezone, timedelta


class ExternalSignalConsumer(QCAlgorithm):

    def Initialize(self):
        """Configure the backtest, read project parameters, subscribe data and schedule jobs.

        Project parameters read (Project → Parameters): ``Mode``, ``SignalsUrl``,
        ``Symbols``, ``PollingMinutes``, ``SizingMode``, ``WeightCap`` and
        ``ConfidenceFloor``. Missing values fall back to the defaults written
        next to each ``gp(...)`` call below.

        Raises:
            ValueError: if ``SignalsUrl`` is empty, if ``PollingMinutes`` is
                below 1, or if ``ConfidenceFloor`` is outside ``[0, 1)`` while
                the sizing mode is not ``"linear"``. ``int()``/``float()`` also
                raise ``ValueError`` for non-numeric parameter text.
        """
        # Backtest window (omit SetEndDate in live/paper)
        self.SetStartDate(2024, 1, 1)
        self.SetEndDate(2024, 12, 31)
        self.SetCash(100000)

        # ---- Parameters (Project → Parameters) ----
        gp = self.GetParameter
        self.mode          = (gp("Mode") or "json-live").strip().lower()
        self.json_url      = (gp("SignalsUrl") or "").strip()   # e.g. https://gist.githubusercontent.com/.../raw/live_signals.json
        self.symbols_param = (gp("Symbols") or "GE").strip()
        self.poll_minutes  = int(gp("PollingMinutes") or 2)

        # Sizing controls
        self.sizing_mode = (gp("SizingMode") or "threshold").strip().lower()   # "linear" | "threshold"
        self.w_cap       = float(gp("WeightCap") or 0.60)                      # max per-name weight
        self.conf_floor  = float(gp("ConfidenceFloor") or 0.55)                # threshold mode only

        if not self.json_url:
            raise ValueError("Parameter 'SignalsUrl' not provided. Set it in Project → Parameters.")

        # Fail fast on values that cannot work, instead of failing later mid-run.
        # A zero or negative interval cannot describe a polling schedule.
        if self.poll_minutes < 1:
            raise ValueError(
                f"Parameter 'PollingMinutes' must be >= 1, got {self.poll_minutes}."
            )
        # Threshold sizing divides by (1 - ConfidenceFloor); a floor of 1.0 would
        # divide by zero and anything above it could never trigger a buy.
        if self.sizing_mode != "linear" and not (0.0 <= self.conf_floor < 1.0):
            raise ValueError(
                f"Parameter 'ConfidenceFloor' must be in [0, 1), got {self.conf_floor}."
            )
        # Any value other than "linear" is sized with the threshold rule; say so
        # explicitly so a typo in the parameter does not go unnoticed.
        if self.sizing_mode not in ("linear", "threshold"):
            self.Log(f"[WARN] Unknown SizingMode '{self.sizing_mode}'; using threshold sizing.")

        # Broker/slippage
        self.SetSecurityInitializer(lambda s: s.SetSlippageModel(ConstantSlippageModel(0.01)))
        self.SetBrokerageModel(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)

        # Risk/execution guards (long-only; keep cash buffer)
        self.UniverseSettings.Leverage = 1.0
        self.Settings.FreePortfolioValuePercentage = 0.05
        self.Settings.MinimumOrderMarginPortfolioPercentage = 0.0
        self.Settings.RebalancePortfolioOnSecurityChanges = False

        # Symbols (minute resolution to match a 1m producer)
        self.symbols = {}
        requested = [s.strip().upper() for s in self.symbols_param.split(",") if s.strip()]
        if requested and requested != ["AUTO"]:
            for tkr in requested:
                self.symbols[tkr] = self.AddEquity(tkr, Resolution.MINUTE).Symbol
        # Symbols="AUTO" means: subscribe to whatever tickers the feed mentions.
        self._auto_symbols = (requested == ["AUTO"])
        self._added_from_feed = set()

        # Poll schedule
        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.Every(timedelta(minutes=self.poll_minutes)),
            self.PollJsonAndTrade
        )

        self.SetWarmup(5, Resolution.MINUTE)

        # Risk plots (SPY)
        self.spy = self.AddEquity("SPY", Resolution.DAILY).Symbol
        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.AfterMarketClose(self.spy, 1),
            self.PushDailyMetrics
        )

        # State
        self.fill_count = 0
        self.last_equity = float(self.Portfolio.TotalPortfolioValue)
        self.daily_returns = []
        self.valid_until_epoch = None
        self.last_poll_minute = None
        self.last_logged_sig = {}          # ticker -> last signal text that was logged
        self._order_ids, self._fills = set(), 0
        # Paired portfolio/benchmark daily returns for the rolling beta plot.
        self._p_rets, self._b_rets = deque(maxlen=63), deque(maxlen=63)
        self._last_bench_close = None
        self._last_eod_date = None
        self.SetBenchmark(self.spy)

        self.Log(
            f"Params OK | mode={self.mode} poll={self.poll_minutes}m sizing={self.sizing_mode} "
            f"cap={self.w_cap:.2f} floor={self.conf_floor:.2f} url={self.json_url[:60]}..."
        )

    # ======================= Poll + Trade =======================
    def PollJsonAndTrade(self):
        """Download the signal JSON, turn it into target weights and rebalance.

        Steps: skip during warm-up and on duplicate triggers within the same
        minute; download the feed with a cache-busting query parameter; reject
        empty or non-JSON bodies; skip stale payloads in ``json-live`` mode;
        optionally subscribe new tickers (``Symbols=AUTO``); compute a weight
        per tradable symbol; scale weights down to respect the free-cash
        buffer; call ``SetHoldings``.

        Symbols without a usable model entry, or whose security has no data,
        no positive price or a closed exchange, are left untouched (counted as
        holds). Malformed feed content is logged or ignored rather than raised,
        so a bad payload cannot stop the algorithm.
        """
        if self.IsWarmingUp:
            return

        # throttle duplicate triggers
        cur_minute = int(self.Time.timestamp()) // 60
        if self.last_poll_minute == cur_minute:
            return
        self.last_poll_minute = cur_minute

        # Cache-buster to avoid CDN caching
        base = self.json_url
        sep = '&' if '?' in base else '?'
        url = f"{base}{sep}t={int(self.UtcTime.timestamp())}"
        raw = self.Download(url)

        if not raw:
            if cur_minute % 10 == 0:
                self.Log(f"[WARN] Empty response from endpoint: {base[:80]}...")
            return

        # Defensive: ensure it's JSON (not HTML/404/rate-limit)
        if not raw.lstrip().startswith(("{", "[")):
            snippet = raw[:200].replace("\n", " ").replace("\r", " ")
            self.Log(f"[WARN] Non-JSON response ({len(raw)} bytes) from {base[:80]}... sample: {snippet}")
            return

        try:
            data = json.loads(raw)
        except (ValueError, RecursionError) as e:  # JSONDecodeError is a ValueError
            self.Log(f"[WARN] JSON parse error: {e}; first bytes: {raw[:120]!r}")
            return

        # The sniff test above lets a top-level array through; treat it as the
        # bare list of models (no freshness metadata) instead of crashing on .get().
        if isinstance(data, list):
            data = {"models": data}

        # Freshness guard
        valid_until_raw = data.get("valid_until_utc") or data.get("valid_until")
        vu_aware = self.ParseUtcAny(valid_until_raw)
        self.valid_until_epoch = self.ToUtcEpoch(vu_aware) if vu_aware else None
        if valid_until_raw and self.valid_until_epoch is None and cur_minute % 10 == 0:
            # The guard fails open on an unparseable timestamp; make that visible.
            self.Log(f"[WARN] Unparseable valid_until value {valid_until_raw!r}; freshness check skipped")
        if self.mode == "json-live" and self.valid_until_epoch:
            if self.ToUtcEpoch(self.UtcTime) > self.valid_until_epoch:
                if cur_minute % 10 == 0:
                    # Log whichever key actually supplied the timestamp.
                    self.Log(f"[STALE] now>{valid_until_raw}, skipping")
                return

        models = data.get("models") or []
        if not isinstance(models, (list, tuple)):
            models = []
        by_sym = {}
        for m in models:
            if not isinstance(m, dict):
                continue  # ignore entries that are not model objects
            sym = str(m.get("symbol") or "").strip().upper()
            if sym:
                by_sym[sym] = m

        # If Symbols="AUTO", subscribe new names on the fly
        if self._auto_symbols:
            for tkr in by_sym:
                if tkr not in self.symbols and tkr not in self._added_from_feed:
                    self.symbols[tkr] = self.AddEquity(tkr, Resolution.MINUTE).Symbol
                    self._added_from_feed.add(tkr)
                    self.Log(f"[AUTO] Subscribed {tkr} from feed")

        if not self.symbols:
            self.Log("[WARN] No symbols to act on yet.")
            return

        # Build weights
        weights, cnt_buy, cnt_sell, cnt_hold = {}, 0, 0, 0
        for tkr, sym in self.symbols.items():
            m = by_sym.get(tkr)
            if not m or m.get("error"):
                cnt_hold += 1
                continue

            sec = self.Securities[sym]
            if not sec.HasData or sec.Price <= 0 or not sec.Exchange.ExchangeOpen:
                cnt_hold += 1
                continue

            signal = str(m.get("signal") or "").strip().upper()
            conf = self._signal_confidence(m)

            if signal == "BUY":
                weights[sym] = self._buy_weight(conf)
                cnt_buy += 1
            elif signal == "SELL":
                weights[sym] = 0.0
                cnt_sell += 1
            else:
                weights[sym] = 0.0
                cnt_hold += 1

            # Log only when the signal for this ticker changes.
            if self.last_logged_sig.get(tkr) != signal:
                self.last_logged_sig[tkr] = signal
                self.Log(f"{tkr}: {signal} (conf={conf:.2f})")

        if not weights:
            return

        # Normalize vs cash buffer
        cap = 1.0 - self.Settings.FreePortfolioValuePercentage
        gross = sum(max(0.0, w) for w in weights.values())
        if gross > cap and gross > 0:
            scale = cap / gross
            weights = {s: w * scale for s, w in weights.items()}
            gross = sum(max(0.0, w) for w in weights.values())

        self.SetHoldings([PortfolioTarget(sym, w) for sym, w in weights.items()])
        self.Log(f"Rebalance: buys={cnt_buy}, sells={cnt_sell}, holds={cnt_hold}, gross={gross:.2f}")

    @staticmethod
    def _signal_confidence(m):
        """Return the model entry's confidence clamped to [0, 1].

        Uses ``confidence`` when present, else ``abs(action)``, else ``p_long``.
        Values that are not numbers, and NaN, yield 0.0 so that a malformed
        entry can never be sized as a full-confidence signal.
        """
        raw_conf = m.get("confidence")
        try:
            if raw_conf is not None:
                value = float(raw_conf or 0.0)
            elif m.get("action") is not None:
                value = abs(float(m.get("action")))
            else:
                value = float(m.get("p_long") or 0.0)
        except (TypeError, ValueError):
            return 0.0
        # min(1.0, nan) evaluates to 1.0, so NaN must be rejected before clamping.
        if math.isnan(value):
            return 0.0
        return max(0.0, min(1.0, value))

    def _buy_weight(self, conf):
        """Map a clamped confidence to a long weight in [0, WeightCap]."""
        if self.sizing_mode == "linear":
            w = self.w_cap * conf
        elif conf >= self.conf_floor:
            span = 1.0 - self.conf_floor
            # With a floor of exactly 1.0 only conf == 1.0 qualifies; give it the
            # full cap instead of dividing by zero.
            w = self.w_cap * (conf - self.conf_floor) / span if span > 0 else self.w_cap
        else:
            w = 0.0
        return max(0.0, min(self.w_cap, w))

    # ======================= Order events =======================
    def OnOrderEvent(self, orderEvent: OrderEvent):
        """Track order ids and, on each fill, update the fill and win/loss plots.

        Every event records its order id; only events whose status is
        ``Filled`` advance the counters and refresh the "Execs" chart.
        """
        self._order_ids.add(orderEvent.OrderId)

        # Anything that is not a fill only contributes its order id.
        if orderEvent.Status != OrderStatus.Filled:
            return

        self._fills += 1
        self.fill_count += 1
        self.Plot("Execs", "Fills", self.fill_count)

        # Closed trades with zero P&L are counted on the loss side.
        trades = list(self.TradeBuilder.ClosedTrades)
        n_wins = len([t for t in trades if t.ProfitLoss > 0])
        n_losses = len([t for t in trades if t.ProfitLoss <= 0])
        self.Plot("Execs", "ClosedWins", n_wins)
        self.Plot("Execs", "ClosedLosses", n_losses)

    # Use the new signature with a symbol to avoid the deprecation warning
    def OnEndOfDay(self, symbol):
        """Record the day's portfolio and SPY returns and plot a rolling beta.

        Acts only on the callback for ``self.spy`` and at most once per
        calendar date. The beta is plotted once at least 20 paired
        observations are held in the return deques.
        """
        if symbol != self.spy:
            return

        today = self.Time.date()
        if getattr(self, "_last_eod_date", None) == today:
            return
        self._last_eod_date = today

        # Portfolio return versus the previously stored equity value.
        equity_now = float(self.Portfolio.TotalPortfolioValue)
        port_ret = 0.0
        if getattr(self, "last_equity", 0.0) > 0:
            port_ret = (equity_now / self.last_equity) - 1.0
            self.daily_returns.append(port_ret)
            # Keep the history bounded: trim to the latest 300 once it passes 400.
            if len(self.daily_returns) > 400:
                self.daily_returns = self.daily_returns[-300:]
        self.last_equity = equity_now

        # Benchmark return; 0.0 until a non-zero previous close is available.
        bench_close = float(self.Securities[self.spy].Close)
        if getattr(self, "_last_bench_close", None) in (None, 0):
            bench_ret = 0.0
        else:
            bench_ret = bench_close / self._last_bench_close - 1.0
        self._last_bench_close = bench_close

        self._p_rets.append(port_ret)
        self._b_rets.append(bench_ret)

        n_obs = len(self._b_rets)
        if n_obs < 20:
            return

        # Sample covariance / sample variance of the paired return windows.
        mean_p = sum(self._p_rets) / len(self._p_rets)
        mean_b = sum(self._b_rets) / n_obs
        dof = max(n_obs - 1, 1)
        cov = sum((pr - mean_p) * (br - mean_b) for pr, br in zip(self._p_rets, self._b_rets)) / dof
        var_b = sum((br - mean_b) ** 2 for br in self._b_rets) / dof
        if var_b > 0:
            beta = cov / var_b
        else:
            beta = 0.0
        self.Plot("Risk", "Beta(63d)", beta)

    # ======================= Daily risk summary =======================
    def PushDailyMetrics(self):
        """Plot Sharpe, a PSR-style probability and the closed-trade win rate.

        Does nothing until at least 10 daily returns have been collected.
        The win rate is plotted only when there are closed trades.
        """
        rets = self.daily_returns
        count = len(rets)
        if count < 10:
            return

        avg = sum(rets) / count
        variance = sum((x - avg) ** 2 for x in rets) / max(count - 1, 1)
        # Variance is floored at 1e-12 before the square root.
        stdev = math.sqrt(max(variance, 1e-12))

        if stdev > 0:
            sharpe = (avg / stdev) * math.sqrt(252.0)   # annualised with 252 periods
            z = (avg / stdev) * math.sqrt(count)
        else:
            sharpe = 0.0
            z = 0.0
        # Standard normal CDF of z, expressed through erf.
        psr = 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))

        self.Plot("Risk", "Sharpe(rolling)", sharpe)
        self.Plot("Risk", "PSR_vs0", psr)

        trades = list(self.TradeBuilder.ClosedTrades)
        if not trades:
            return
        n_wins = sum(1 for t in trades if t.ProfitLoss > 0)
        self.Plot("Risk", "WinRate", n_wins / float(len(trades)))

    def OnEndOfAlgorithm(self):
        """Log a one-line run summary: orders, fills, closed trades and win rate."""
        trades = list(self.TradeBuilder.ClosedTrades)
        n_wins = sum(1 for t in trades if t.ProfitLoss > 0)
        # No closed trades -> NaN win rate.
        if trades:
            win_rate = n_wins / len(trades)
        else:
            win_rate = float('nan')
        self.Log(
            f"RUN_SUMMARY | orders={len(getattr(self, '_order_ids', []))} | "
            f"fills={getattr(self, '_fills', 0)} | closed_trades={len(trades)} | "
            f"win_rate={win_rate:.2%}"
        )

    # ======================= Helpers =======================
    def ParseUtcAny(self, s):
        """Parse a timestamp given as ISO-8601 text or as a Unix epoch.

        Args:
            s: ISO-8601 string (a ``Z`` suffix is accepted), a numeric string,
                or an ``int``/``float`` epoch. Numeric values are always read
                as epochs. Magnitudes beyond the largest second-resolution
                epoch a ``datetime`` can hold are read as milliseconds.

        Returns:
            A timezone-aware ``datetime``. Naive ISO text is assumed to be
            UTC. ISO text with an offset keeps that offset. Returns ``None``
            for falsy input, booleans and anything unparseable.
        """
        if not s or isinstance(s, bool):
            return None

        if isinstance(s, (int, float)):
            # JSON numbers are epochs; never route them through ISO parsing.
            try:
                epoch = float(s)
            except OverflowError:
                return None
        else:
            text = str(s).strip()
            try:
                dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
            except ValueError:
                dt = None
            if dt is not None:
                return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
            try:
                epoch = float(text)
            except ValueError:
                return None

        # 9999-12-31T23:59:59Z in seconds. Anything larger cannot be a
        # second-resolution epoch, so interpret it as milliseconds.
        max_epoch_seconds = 253402300799.0
        if abs(epoch) > max_epoch_seconds:
            epoch /= 1000.0

        try:
            return datetime.fromtimestamp(epoch, tz=timezone.utc)
        except (ValueError, OverflowError, OSError):
            return None

    def ToUtcEpoch(self, dt):
        """Convert a datetime or number-like value to epoch seconds.

        A naive ``datetime`` is interpreted as UTC. Any other value is passed
        to ``float()``. ``None`` and values ``float()`` rejects give ``None``.
        """
        if isinstance(dt, datetime):
            aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
            return aware.timestamp()

        if dt is None:
            return None

        try:
            epoch = float(dt)
        except Exception:
            epoch = None
        return epoch
