"""
Live Trading ‚Äî Overview
=======================

Runs an ML-driven, double-barrier strategy on cTrader OpenAPI using saved pipelines.
For each symbol and timeframe, it fetches fresh OHLCV, rebuilds the same core features,
predicts class labels **{0,1,2}** (0=down, 1=flat, 2=up), maps to signals **{‚àí1,0,+1}**,
and executes netting-friendly market orders.

How it works
------------
1) Bootstraps cTrader connection and verifies symbols are available.
2) Loads per-symbol pipelines from `models/<tf>_models/{SYMBOL}_{TF}_best_model.pkl`.
3) On each new bar:
   - Recomputes features over the last `N_BARS` candles.
   - Predicts the next `N_FORWARD` steps; persists them to `live_signals.db`.
   - Trades only the **first** signal: +1=BUY, ‚àí1=SELL, 0=FLAT.
4) Position management:
   - Before opening a new direction, closes the opposite side by sending an offsetting market order
     (works on **netting** accounts; on **hedging** accounts this will open an offset hedge).
   - Optional `CLOSE_ON_FLAT=True` will close any open position on a flat signal.

Key configuration
-----------------
- SYMBOLS: list of symbols (e.g., ["EURUSD","GBPUSD","AUDUSD"])
- TF: timeframe string ("M1","M5","M15","M30","H1","H4","D1")
- N_BARS: history length to compute features
- N_FORWARD: how many future steps to predict (only the first is traded)
- SLEEP_SEC: loop cadence; executions are gated to **new bars**
- CLOSE_ON_FLAT: close open positions when the signal is 0
- LOTS / DEFAULT_LOTS: order sizing; 1 lot = 10,000,000 native units
- SL_PIPS / TP_PIPS: optional protective distances for market orders (pips)
- MODEL_FOLDER: where trained pipelines are loaded from
- LOG_FILE: path to the runtime log

Persistence & logging
---------------------
- Signals are stored in SQLite (`live_signals.db`, table `signals`) with a uniqueness guard
  on (symbol, prediction, timestamp).
- Runtime activity is logged to `logs/live_trader.log`.

Notes & caveats
---------------
- The ‚Äúfuture timestamp‚Äù for saved predictions assumes an hourly offset; if `TF` ‚â† "H1",
  adjust the offset logic to match your timeframe.
- On hedging accounts, "close by opposite market order" **hedges** instead of closing.
  If you require true closure, add a dedicated Close Position RPC and call it in `_close_all()`.
- Pip / price math and symbol digits handling are implemented in `ctrader_client.py`.
- To pick up new models, restart the process so pipelines are reloaded from disk.
"""


In [1]:
import sqlite3, pandas as pd
from pathlib import Path

db_path = Path("../live_signals.db")
with sqlite3.connect(db_path) as con:
    cur = con.cursor()
    try:
        rows = cur.execute("SELECT COUNT(*) FROM signals").fetchone()[0]
        print("rows:", rows)
        df = pd.read_sql_query(
            "SELECT symbol, prediction, timestamp FROM signals ORDER BY id DESC LIMIT 10",
            con
        )
    except sqlite3.OperationalError as e:
        print("Table not found yet:", e)
        df = pd.DataFrame(columns=["symbol", "prediction", "timestamp"])

df


Table not found yet: no such table: signals


Unnamed: 0,symbol,prediction,timestamp


In [None]:
# ============================================
# Live Trading ‚Äî cTrader + saved ML pipelines
# ============================================

import os, time, logging, sqlite3, threading, warnings
from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import pandas as pd
import joblib

warnings.filterwarnings("ignore")

# --- our libs
from feature_engineering import add_core_features
from ctrader_client import (
    ensure_client_ready, get_ohlc_df, place_order, get_open_positions,
    symbol_name_to_id, wait_for_deferred, close_position,   # ‚Üê add
    client as CTR_CLIENT, ACCOUNT_ID as CTR_ACCOUNT_ID
)



# -------------------
# Config
# -------------------
SYMBOLS   = ["EURUSD", "GBPUSD", "AUDUSD"]
TF        = "H1"              # "M1","M5","M15","M30","H1","H4","D1"
N_BARS    = 2500              # history window to compute features
N_FORWARD = 3                 # save next N predictions
SLEEP_SEC = 60                # loop period (sec); we gate executions to new bar
CLOSE_ON_FLAT = True          # close any open position when signal == 0

# --- position policy ---
ALLOW_PYRAMIDING = False   # if False, never add to an existing same-side position
MAX_POS_PER_SIDE = 1       # only used if ALLOW_PYRAMIDING=True



TF_DELTA = {
    "M1":  pd.Timedelta(minutes=1),
    "M5":  pd.Timedelta(minutes=5),
    "M15": pd.Timedelta(minutes=15),
    "M30": pd.Timedelta(minutes=30),
    "H1":  pd.Timedelta(hours=1),
    "H4":  pd.Timedelta(hours=4),
    "D1":  pd.Timedelta(days=1),
}.get(TF, pd.Timedelta(hours=1))


# lots per symbol (float lots ‚Üí converted to native units in send)
LOTS = {"EURUSD": 0.10, "GBPUSD": 0.10, "AUDUSD": 0.10}
DEFAULT_LOTS = 0.10

# SL/TP for MARKET orders (in pips, optional; None to skip)
SL_PIPS = None
TP_PIPS = None

MODEL_FOLDER = Path(f"models/{TF.lower()}_models")
LOG_FILE     = Path("logs/live_trader.log")
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.INFO,
    format="%(asctime)s %(levelname)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

def log(msg):
    print(msg, flush=True)
    logging.info(msg)

# -------------------
# Helpers
# -------------------
def load_model_bundle(path: Path):
    """Supports {'pipeline','features'} or legacy {'model','scaler','features'}."""
    b = joblib.load(path)
    if isinstance(b, dict) and "pipeline" in b:
        return dict(kind="pipeline", pipeline=b["pipeline"], features=b.get("features"))
    if isinstance(b, dict) and "model" in b and "scaler" in b:
        return dict(kind="legacy", model=b["model"], scaler=b["scaler"], features=b.get("features"))
    # Also allow direct Pipeline (back-compat)
    if hasattr(b, "predict"):
        return dict(kind="pipeline", pipeline=b, features=None)
    raise ValueError(f"Unrecognized bundle at {path}")

def to_signals(preds: np.ndarray) -> np.ndarray:
    """Map {0,1,2} ‚Üí {-1,0,+1}; pass-through if already in -1/0/+1."""
    preds = np.asarray(preds)
    uniq = set(np.unique(preds))
    return preds - 1 if uniq.issubset({0,1,2}) else preds.astype(int)

def save_signal_to_db(symbol: str, prediction: int, timestamp: str):
    conn = sqlite3.connect("live_signals.db")
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS signals (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT, prediction INTEGER, timestamp TEXT,
            UNIQUE(symbol, prediction, timestamp)
        )
    """)
    try:
        c.execute("INSERT OR IGNORE INTO signals (symbol,prediction,timestamp) VALUES (?,?,?)",
                  (symbol, int(prediction), timestamp))
        conn.commit()
    except Exception as e:
        log(f"[DB] insert failed: {e}")
    finally:
        conn.close()

def lots_to_volume_units(lots: float) -> int:
    """1 lot = 10,000,000 native units in cTrader."""
    return int(round(lots * 10_000_000))

def most_recent_bar_time(df: pd.DataFrame) -> pd.Timestamp | None:
    return None if df.empty else pd.to_datetime(df.index[-1])

# -------------------
# Trader
# -------------------
class LiveTrader:
    def __init__(self, symbol: str, lots: float, model_path: Path):
        self.symbol = symbol.upper()
        self.lots = float(lots)
        self.model_path = model_path
        self.bundle = None
        self.feature_cols = None
        self.last_bar_ts: pd.Timestamp | None = None

    def load(self):
        if not self.model_path.exists():
            raise FileNotFoundError(f"No model file for {self.symbol}: {self.model_path}")
        self.bundle = load_model_bundle(self.model_path)
        self.feature_cols = self.bundle.get("features")
        log(f"‚úÖ {self.symbol}: loaded model ‚Üí {self.model_path.name}")

    def predict_multi(self, n_forward=N_FORWARD) -> tuple[np.ndarray, pd.Series]:
        """Return (preds[-N_FORWARD:], close_series aligned)."""
        df = get_ohlc_df(self.symbol, tf=TF, n=N_BARS)
        if df.empty:
            raise RuntimeError(f"{self.symbol}: no bars")
        df_feat = add_core_features(df.copy())
        # choose columns
        if self.feature_cols:
            use_cols = [c for c in self.feature_cols if c in df_feat.columns]
        else:
            core_cols = [
                "sma_20","ema_20","kama_10","rsi_14","macd_diff",
                "atr_14","obv","rolling_std_20","spread","fill","amplitude",
                "autocorr_1","autocorr_5","autocorr_10","market_regime","stationary_flag"
            ]
            use_cols = [c for c in core_cols if c in df_feat.columns]

        X = df_feat[use_cols].dropna()
        close = df_feat.loc[X.index, "close"]
        if X.empty:
            raise RuntimeError(f"{self.symbol}: no valid feature rows after FE")

        # predict
        if self.bundle["kind"] == "pipeline":
            preds_012 = self.bundle["pipeline"].predict(X)
        else:
            Xs = self.bundle["scaler"].transform(X)
            preds_012 = self.bundle["model"].predict(Xs)
        preds = to_signals(preds_012)

        # return last N_FORWARD signals and the aligned close series
        return preds[-n_forward:], close

    # ---- position administration (simple, netting-friendly) ----
    def _open_positions_for_symbol(self):
        pos = get_open_positions()
        return [p for p in pos if p.get("symbol_name","").upper() == self.symbol]

    def _has_dir(self, positions, side: str) -> bool:
        side = side.lower()
        return any((p.get("direction","").lower() == side) for p in positions)

    def _close_all(self) -> bool:
        """
        Close all open positions for this symbol using ClosePosition
        (works on hedging & netting).
        """
        positions = self._open_positions_for_symbol()
        if not positions:
            return True

        ok = True
        for p in positions:
            try:
                d = close_position(
                    client=CTR_CLIENT,
                    account_id=CTR_ACCOUNT_ID,
                    position_id=p["position_id"],
                    volume_units=p.get("volume_units"),  # full close in native units
                )
                res = wait_for_deferred(d, timeout=30)
                if isinstance(res, dict) and res.get("status") == "failed":
                    ok = False
                    log(f"[CLOSE] Fail pos {p['position_id']} {self.symbol}: {res}")
                else:
                    log(f"[CLOSE] OK pos {p['position_id']} {self.symbol}")
            except Exception as e:
                ok = False
                log(f"[CLOSE] Exception closing {self.symbol} pos {p.get('position_id')}: {e}")
        return ok


    def _ensure_direction(self, want: int):
        """
        Idempotent position policy (no pyramiding by default).
        want ‚àà {-1, 0, +1}: +1=LONG, -1=SHORT, 0=FLAT
        """
        n_buy, n_sell = self._side_counts()
        log(f"{self.symbol}: state n_buy={n_buy}, n_sell={n_sell}, want={want}")

        # 1) FLAT ‚Üí close everything (if configured)
        if want == 0:
            if (n_buy + n_sell) == 0:
                log(f"{self.symbol}: flat signal but already flat ‚Üí no action")
                return
            if CLOSE_ON_FLAT:
                log(f"{self.symbol}: flat signal ‚Üí closing open positions")
                self._close_all()
                self._wait_until_flat(timeout=20)
            else:
                log(f"{self.symbol}: flat signal, CLOSE_ON_FLAT=False ‚Üí holding")
            return

        # 2) LONG
        if want > 0:
            # If any SELL exists, flip: close all then wait to be flat
            if n_sell > 0:
                log(f"{self.symbol}: flipping SELL‚ÜíBUY, closing {n_sell} position(s)")
                self._close_all()
                if not self._wait_until_flat(timeout=20):
                    log(f"{self.symbol}: still not flat after timeout; skip open this bar")
                    return
                n_buy = n_sell = 0  # known flat now

            # Idempotency / pyramiding guard
            if not ALLOW_PYRAMIDING and n_buy > 0:
                log(f"{self.symbol}: already LONG ‚Üí no action")
                return
            if ALLOW_PYRAMIDING and n_buy >= MAX_POS_PER_SIDE:
                log(f"{self.symbol}: LONG cap reached ({n_buy}/{MAX_POS_PER_SIDE}) ‚Üí no action")
                return

            self._place_market("BUY")
            return

        # 3) SHORT (want < 0)
        if n_buy > 0:
            log(f"{self.symbol}: flipping BUY‚ÜíSELL, closing {n_buy} position(s)")
            self._close_all()
            if not self._wait_until_flat(timeout=20):
                log(f"{self.symbol}: still not flat after timeout; skip open this bar")
                return
            n_buy = n_sell = 0

        if not ALLOW_PYRAMIDING and n_sell > 0:
            log(f"{self.symbol}: already SHORT ‚Üí no action")
            return
        if ALLOW_PYRAMIDING and n_sell >= MAX_POS_PER_SIDE:
            log(f"{self.symbol}: SHORT cap reached ({n_sell}/{MAX_POS_PER_SIDE}) ‚Üí no action")
            return

        self._place_market("SELL")



    def step(self):
        """One loop step: only act once per new bar."""
        preds, close = self.predict_multi(n_forward=N_FORWARD)
        last_ts = most_recent_bar_time(close.to_frame())
        if last_ts is None:
            return

        # gate executions to new bar
        if (self.last_bar_ts is not None) and (last_ts <= self.last_bar_ts):
            return
        self.last_bar_ts = last_ts

        # persist N_FORWARD predictions with future timestamps (na√Øve hourly offset)
        for i, p in enumerate(preds):
            future_ts = (last_ts + TF_DELTA * (i + 1)).strftime("%Y-%m-%d %H:%M:%S")
            save_signal_to_db(self.symbol, int(p), timestamp=future_ts)

        # trade first prediction
        sig = int(preds[0])  # {-1,0,+1}
        self._ensure_direction(sig)
        log(f"{self.symbol}: bar={last_ts} | signal={sig} | preds_next={preds.tolist()}")


    def _current_side(self) -> int:
        """
        Return +1 if there's any BUY, -1 if any SELL, else 0.
        (If both exist on a hedging account, we consider it 0 ‚Üí 'mixed'.)
        """
        positions = self._open_positions_for_symbol()
        has_buy  = self._has_dir(positions, "buy")
        has_sell = self._has_dir(positions, "sell")
        if has_buy and not has_sell:
            return +1
        if has_sell and not has_buy:
            return -1
        return 0

    def _side_counts(self) -> tuple[int, int]:
        """Return (#buy, #sell) open positions for this symbol."""
        pos = self._open_positions_for_symbol()
        n_buy  = sum(1 for p in pos if p.get("direction","").lower() == "buy")
        n_sell = sum(1 for p in pos if p.get("direction","").lower() == "sell")
        return n_buy, n_sell

    def _wait_until_flat(self, timeout: float = 20.0, poll: float = 0.5) -> bool:
        """Poll reconcile until there are no open positions for this symbol."""
        t0 = time.time()
        while time.time() - t0 < timeout:
            n_buy, n_sell = self._side_counts()
            if (n_buy + n_sell) == 0:
                return True
            time.sleep(poll)
        return False

    def _place_market(self, side: str) -> bool:
        d = place_order(
            client=CTR_CLIENT, account_id=CTR_ACCOUNT_ID,
            symbol_id=symbol_name_to_id[self.symbol],
            order_type="MARKET", side=side.upper(),
            volume=lots_to_volume_units(self.lots),
            stop_loss=SL_PIPS, take_profit=TP_PIPS,
        )
        res = wait_for_deferred(d, timeout=30)
        failed = isinstance(res, dict) and res.get("status") == "failed"
        if failed:
            log(f"[ORDER] {self.symbol} {side.upper()} failed ‚Üí {res}")
            return False
        log(("üü¢ " if side.lower()=="buy" else "üî¥ ") + f"{self.symbol}: {side.upper()} sent")
        return True




# -------------------
# Main
# -------------------
if __name__ == "__main__":
    # 1) Make sure API is connected and symbols are loaded
    ensure_client_ready(timeout=20)
    log(f"cTrader ready. Symbols loaded: {len(symbol_name_to_id)}")

    # 2) Build traders per symbol and load models
    traders: dict[str, LiveTrader] = {}
    for sym in SYMBOLS:
        lots = LOTS.get(sym, DEFAULT_LOTS)
        model_path = MODEL_FOLDER / f"{sym}_{TF}_best_model.pkl"
        t = LiveTrader(sym, lots, model_path)
        t.load()
        traders[sym] = t

    log("‚ñ∂Ô∏è  Live loop started.")
    try:
        while True:
            for sym, t in traders.items():
                try:
                    t.step()
                except Exception as e:
                    log(f"‚ö†Ô∏è {sym}: step error ‚Üí {e}")
            time.sleep(SLEEP_SEC)
    except KeyboardInterrupt:
        log("‚èπ  Stopped by user.")


[BOOT] host_type = demo
[BOOT] account_id = 44015421
[BOOT] token_len = 43
[BOOT] token_head = LsMFXxrW_0TU
[BOOT] client_id_head = 16209_oM
[AUTH] token has accounts: [41761724, 44015421, 44089601]
[AUTH] OK: ACCOUNT_ID 44015421 is authorized by token.
[DEBUG] Loaded 6266 symbols (e.g. [(1, 'EURUSD'), (3, 'EURJPY'), (18, 'AUDCAD')])
cTrader ready. Symbols loaded: 6266
‚úÖ EURUSD: loaded model ‚Üí EURUSD_H1_best_model.pkl
‚úÖ GBPUSD: loaded model ‚Üí GBPUSD_H1_best_model.pkl
‚úÖ AUDUSD: loaded model ‚Üí AUDUSD_H1_best_model.pkl
‚ñ∂Ô∏è  Live loop started.
EURUSD: flat signal ‚Üí closing open positions
[CLOSE] OK pos 29214464 EURUSD
[CLOSE] OK pos 29262083 EURUSD
[CLOSE] OK pos 29264222 EURUSD
[CLOSE] OK pos 29264223 EURUSD
EURUSD: bar=2025-08-21 21:00:00+00:00 | signal=0 | preds_next=[0, 0, 0]
GBPUSD: bar=2025-08-21 21:00:00+00:00 | signal=0 | preds_next=[0, 0, 0]
AUDUSD: bar=2025-08-21 21:00:00+00:00 | signal=0 | preds_next=[0, 0, 0]
