# Hydra V3 Enhanced - ML Model Training

## Enhancements over V2:
1. **Cross-Sectional Features**: Rank-based features across symbols
2. **Triple-Barrier Labels**: Realistic TP/SL/Time-based targets
3. **Optuna Hyperparameter Optimization**: Tuned once on the `up_high` regime and reused for all regime models
4. **21 Days of Data**: More robust training
5. **Memory Optimized**: Chunked processing, float32 throughout

In [None]:
!pip install pandas numpy requests pyarrow lightgbm scikit-learn tqdm scipy optuna -q

In [None]:
import pandas as pd
import numpy as np
import requests
import io
import zipfile
import time
import gc
import json
import joblib
import os
from datetime import datetime, timedelta, timezone
from tqdm import tqdm
from typing import List, Tuple, Dict, Optional
import warnings
warnings.filterwarnings('ignore')

from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error
from scipy import stats

import lightgbm as lgb
import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)

In [None]:
# Configuration
# Symbols to download and model.
PAIRS = [
    "BTCUSDT",
    "ETHUSDT",
    "SOLUSDT",
    "BNBUSDT",
    "XRPUSDT",
    "DOGEUSDT",
    "LTCUSDT",
    "ADAUSDT",
]

DAYS = 21  # Number of daily archives requested per symbol
FEE_PCT = 0.0004  # 0.04% per side
ROUND_TRIP_FEE = FEE_PCT * 2  # 0.08% (entry + exit)

# Triple Barrier Parameters (barrier distances are multiples of ATR)
TP_MULT, SL_MULT = 2.0, 1.0  # Take profit at 2x ATR, stop loss at 1x ATR
MAX_HOLDING_BARS = 1200  # 5 minutes max hold (300s / 0.25s per bar)

## 1. Data Fetching (Memory Optimized)

In [None]:
def fetch_aggtrades_day(symbol: str, date: datetime) -> Optional[pd.DataFrame]:
    """Download and parse one day of aggregated trades for ``symbol``.

    Args:
        symbol: Symbol name used in the archive path, e.g. ``"BTCUSDT"``.
        date: Day to fetch. Only ``strftime`` is used, so a ``date`` object
            works as well as a ``datetime``.

    Returns:
        The first CSV member of the daily archive as a DataFrame with an
        added ``symbol`` column, or ``None`` when the archive is unavailable
        (non-200 response) or cannot be downloaded/parsed.
    """
    date_str = date.strftime("%Y-%m-%d")
    url = (
        "https://data.binance.vision/data/futures/um/daily/aggTrades/"
        f"{symbol}/{symbol}-aggTrades-{date_str}.zip"
    )

    # Best effort by design: a missing or broken day must not abort the
    # whole multi-day download, so failures are reported and skipped.
    try:
        r = requests.get(url, timeout=30)
        if r.status_code != 200:
            return None

        # Context managers make sure the archive and the member handle are
        # closed even when CSV parsing raises.
        with zipfile.ZipFile(io.BytesIO(r.content)) as archive:
            members = archive.namelist()
            if not members:
                print(f"Error fetching {symbol} {date_str}: archive is empty")
                return None
            with archive.open(members[0]) as csv_file:
                df = pd.read_csv(csv_file)

        df["symbol"] = symbol
        return df
    except Exception as e:
        print(f"Error fetching {symbol} {date_str}: {e}")
        return None


def fetch_symbol_data(symbol: str, days: int) -> pd.DataFrame:
    """Download ``days`` consecutive daily trade archives for one symbol.

    The window starts ``days + 1`` days before today (UTC) and covers
    ``days`` calendar days, so the most recent day requested is two days
    before today. Days that fail to download are skipped.

    Returns:
        All trades sorted by ``timestamp`` with ``price``/``quantity`` as
        float32 and ``is_sell`` (renamed from ``is_buyer_maker``) as int8,
        or an empty DataFrame when no day could be fetched.
    """
    last_day = datetime.now(timezone.utc).date() - timedelta(days=1)
    first_day = last_day - timedelta(days=days)

    frames = []
    for offset in tqdm(range(days), desc=symbol):
        daily = fetch_aggtrades_day(symbol, first_day + timedelta(days=offset))
        if daily is None:
            continue
        frames.append(daily)

    if len(frames) == 0:
        return pd.DataFrame()

    # Normalise column names, then shrink to compact dtypes.
    trades = pd.concat(frames, ignore_index=True).rename(
        columns={"transact_time": "timestamp", "is_buyer_maker": "is_sell"}
    )
    trades["timestamp"] = pd.to_datetime(trades["timestamp"], unit="ms")
    trades = trades.astype(
        {"price": "float32", "quantity": "float32", "is_sell": "int8"}
    )

    return trades.sort_values("timestamp").reset_index(drop=True)

In [None]:
# Fetch data per symbol to manage memory
symbol_data = {}
for symbol in PAIRS:
    print(f"\nFetching {symbol}")
    symbol_data[symbol] = fetch_symbol_data(symbol, DAYS)
    print(f"  {len(symbol_data[symbol]):,} trades")

total_trades = sum(len(df) for df in symbol_data.values())
print(f"\nTotal trades: {total_trades:,}")

## 2. Enhanced Feature Engineering with Cross-Sectional

In [None]:
def compute_base_features(df_sym: pd.DataFrame, symbol: str) -> pd.DataFrame:
    """
    Compute base features for a single symbol on 250ms trade bars.
    Cross-sectional features are added later across all symbols.

    Args:
        df_sym: Trades with ``timestamp``, ``price``, ``quantity`` and
            ``is_sell`` columns. Rows with a truthy ``is_sell`` contribute
            negative signed volume. The input frame is not modified.
        symbol: Value written to the ``symbol`` column of the result.

    Returns:
        One row per 250ms interval that contains at least one trade. Empty
        intervals are dropped, so every rolling window below is counted in
        bars; the ``1s``/``5s``/``1m``/``5m`` names equal wall-clock time
        only when no interval is empty. Float64 columns are downcast to
        float32.
    """
    df_sym = df_sym.copy()
    df_sym["signed_qty"] = np.where(df_sym["is_sell"], -df_sym["quantity"], df_sym["quantity"])

    # Resample to 250ms bars; intervals without trades have no price and are
    # dropped, so no forward fill is needed afterwards.
    bars = (
        df_sym
        .set_index("timestamp")
        .resample("250ms")
        .agg(
            price=("price", "last"),
            qty=("quantity", "sum"),
            signed_qty=("signed_qty", "sum"),
            trade_count=("quantity", "count"),
        )
        .dropna(subset=["price"])
    )
    bars = bars.reset_index()
    bars["symbol"] = symbol

    # ============ ORDER FLOW FEATURES ============
    bars["MOI_250ms"] = bars["signed_qty"].rolling(1).sum()
    bars["MOI_1s"] = bars["signed_qty"].rolling(4).sum()
    bars["MOI_5s"] = bars["signed_qty"].rolling(20).sum()
    bars["MOI_std"] = bars["MOI_1s"].rolling(100).std()
    bars["MOI_z"] = bars["MOI_1s"].abs() / (bars["MOI_std"] + 1e-6)
    bars["delta_velocity"] = bars["MOI_1s"].diff()
    bars["delta_velocity_5s"] = bars["MOI_1s"].diff(20)

    # Aggression persistence: mean/std ratio of absolute 1s imbalance
    abs_moi = bars["MOI_1s"].abs()
    mean_moi = abs_moi.rolling(100).mean()
    std_moi = abs_moi.rolling(100).std()
    bars["AggressionPersistence"] = mean_moi / (std_moi + 1e-6)

    # MOI flip rate: number of sign changes over the last 240 bars
    moi_sign = np.sign(bars["MOI_1s"])
    sign_change = (moi_sign != moi_sign.shift(1)).astype(int)
    bars["MOI_flip_rate"] = sign_change.rolling(240).sum()

    # Order flow momentum (clipped because pct_change explodes near zero)
    bars["MOI_roc_1s"] = bars["MOI_1s"].pct_change(4).clip(-10, 10)
    bars["MOI_roc_5s"] = bars["MOI_1s"].pct_change(20).clip(-10, 10)
    bars["MOI_acceleration"] = bars["delta_velocity"].diff()

    # ============ ABSORPTION FEATURES ============
    # Floor the price move so flat bars do not divide by zero.
    price_change = bars["price"].diff().abs().clip(lower=1e-6)
    bars["absorption_raw"] = bars["qty"] / price_change
    bars["absorption_z"] = (
        (bars["absorption_raw"] - bars["absorption_raw"].rolling(500).mean()) /
        (bars["absorption_raw"].rolling(500).std() + 1e-6)
    )
    bars["price_impact"] = price_change / (bars["qty"] + 1e-6)
    bars["price_impact_z"] = (
        (bars["price_impact"] - bars["price_impact"].rolling(500).mean()) /
        (bars["price_impact"].rolling(500).std() + 1e-6)
    )

    # ============ VOLATILITY FEATURES ============
    bars["ret"] = bars["price"].pct_change()
    bars["vol_1m"] = bars["ret"].rolling(240).std()
    bars["vol_5m"] = bars["ret"].rolling(1200).std()
    bars["vol_ratio"] = bars["vol_1m"] / (bars["vol_5m"] + 1e-8)
    bars["vol_rank"] = bars["vol_5m"].rolling(2000).rank(pct=True)

    # ATR proxy for triple barrier: mean absolute return scaled to price units
    bars["atr_5m"] = bars["ret"].abs().rolling(1200).mean() * bars["price"]

    # Vol regime from the rolling percentile rank of 5m volatility
    bars["vol_regime"] = pd.cut(
        bars["vol_rank"],
        bins=[-np.inf, 0.3, 0.7, np.inf],
        labels=["LOW", "MID", "HIGH"]
    )

    # ============ STRUCTURE FEATURES ============
    # NOTE(review): BIN_SIZE is in absolute price units. For symbols priced
    # well below 10 every price rounds into bin 0, so LVN/POC collapse to a
    # single value - consider a price-relative bin width.
    BIN_SIZE = 10
    LVN_BLOCK = 1200

    bars["price_bin"] = (bars["price"] / BIN_SIZE).round() * BIN_SIZE
    lvn_price = np.full(len(bars), np.nan)
    poc_price = np.full(len(bars), np.nan)

    # Each block is tagged with the volume profile of the PREVIOUS block.
    # Using the block's own profile would leak future volume into every bar
    # of the block. The first block therefore has no LVN/POC (NaN).
    prev_lvn = prev_poc = np.nan
    for start in range(0, len(bars), LVN_BLOCK):
        stop = start + LVN_BLOCK
        lvn_price[start:stop] = prev_lvn
        poc_price[start:stop] = prev_poc

        window = bars.iloc[start:stop]
        if window["qty"].sum() == 0:
            # No volume means no usable profile for the following block.
            prev_lvn = prev_poc = np.nan
            continue
        vp = window.groupby("price_bin")["qty"].sum()
        prev_lvn, prev_poc = vp.idxmin(), vp.idxmax()

    bars["LVN_price"] = lvn_price
    bars["POC_price"] = poc_price
    bars["dist_lvn"] = (bars["price"] - bars["LVN_price"]).abs()
    bars["dist_poc"] = (bars["price"] - bars["POC_price"]).abs()
    bars["dist_lvn_atr"] = bars["dist_lvn"] / (bars["atr_5m"] + 1e-6)
    bars["dist_poc_atr"] = bars["dist_poc"] / (bars["atr_5m"] + 1e-6)

    # ============ TIME FEATURES ============
    bars["hour"] = bars["timestamp"].dt.hour
    bars["hour_sin"] = np.sin(2 * np.pi * bars["hour"] / 24)
    bars["hour_cos"] = np.cos(2 * np.pi * bars["hour"] / 24)
    bars["is_weekend"] = (bars["timestamp"].dt.dayofweek >= 5).astype(int)

    # ============ TRADE INTENSITY ============
    bars["trade_intensity"] = bars["trade_count"].rolling(100).mean()
    bars["trade_intensity_z"] = (
        (bars["trade_count"] - bars["trade_count"].rolling(500).mean()) /
        (bars["trade_count"].rolling(500).std() + 1e-6)
    )

    # ============ CUMULATIVE FEATURES ============
    bars["cum_delta_1m"] = bars["signed_qty"].rolling(240).sum()
    bars["cum_delta_5m"] = bars["signed_qty"].rolling(1200).sum()

    # Convert to float32 to halve the memory footprint
    float_cols = bars.select_dtypes(include=[np.float64]).columns
    bars[float_cols] = bars[float_cols].astype(np.float32)

    return bars

In [None]:
# Process all symbols
all_bars = {}

for symbol in PAIRS:
    print(f"Processing {symbol}")
    if len(symbol_data[symbol]) > 0:
        all_bars[symbol] = compute_base_features(symbol_data[symbol], symbol)
        print(f"  {len(all_bars[symbol]):,} bars")
    
    # Free memory
    del symbol_data[symbol]
    gc.collect()

del symbol_data
gc.collect()

In [None]:
def add_cross_sectional_features(all_bars: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """
    Add cross-sectional features: rank features across all symbols at each timestamp.

    This captures relative strength - which symbol is leading/lagging.

    For every feature in a fixed list, the value of each symbol is ranked
    (percentile, 0-1) against the other symbols that have a bar at the same
    250ms timestamp, and stored as ``<feature>_rank`` (float32). Missing
    ranks (feature not yet available for that bar) are filled with the
    neutral value 0.5.

    Args:
        all_bars: Mapping of symbol to its bar frame. Each frame needs a
            ``timestamp`` column and the ranked feature columns.

    Returns:
        The same dict, with each frame replaced by a copy carrying the
        additional rank columns.
    """
    print("Adding cross-sectional features...")

    if not all_bars:
        return all_bars

    # Features to rank across symbols
    rank_features = ["MOI_1s", "MOI_5s", "vol_5m", "absorption_z", "cum_delta_5m"]

    # Common join key (timestamps floored to the 250ms bar grid)
    for symbol, bars in all_bars.items():
        bars["ts_key"] = bars["timestamp"].dt.floor("250ms")

    symbols = list(all_bars)

    for feature in tqdm(rank_features, desc="Cross-sectional"):
        rank_col = f"{feature}_rank"

        # Build one wide frame: a row per timestamp, a column per symbol.
        cross_df = None
        for symbol in symbols:
            temp = all_bars[symbol][["ts_key", feature]].rename(columns={feature: symbol})
            if cross_df is None:
                cross_df = temp
            else:
                cross_df = cross_df.merge(temp, on="ts_key", how="outer")

        # Rank once per feature; NaNs are ignored by rank and stay NaN.
        ranks = cross_df[symbols].rank(axis=1, pct=True)

        for symbol in symbols:
            symbol_ranks = pd.DataFrame({
                "ts_key": cross_df["ts_key"],
                rank_col: ranks[symbol],
            })

            # Drop a stale rank column first so re-running the step does not
            # produce suffixed duplicates (<col>_x / <col>_y) in the merge.
            target = all_bars[symbol].drop(columns=[rank_col], errors="ignore")
            target = target.merge(symbol_ranks, on="ts_key", how="left")
            target[rank_col] = target[rank_col].fillna(0.5).astype(np.float32)
            all_bars[symbol] = target

        del cross_df, ranks
        gc.collect()

    # Remove the temporary join key
    for symbol in all_bars:
        all_bars[symbol] = all_bars[symbol].drop(columns=["ts_key"])

    return all_bars

In [None]:
all_bars = add_cross_sectional_features(all_bars)
# Report the column count from any available symbol: the first entry of PAIRS
# is not guaranteed to have bars (symbols without trades are skipped).
if all_bars:
    print(f"\nFeatures per symbol: {len(next(iter(all_bars.values())).columns)}")
else:
    print("\nNo symbols with data - nothing to report")

## 3. Feature Selection and Decision Points

In [None]:
# Extended feature columns with cross-sectional
FEATURE_COLS = [
    # Order flow (7)
    "MOI_250ms", "MOI_1s", "MOI_5s", "MOI_z",
    "delta_velocity", "delta_velocity_5s", "AggressionPersistence",
    
    # Order flow momentum (3)
    "MOI_roc_1s", "MOI_roc_5s", "MOI_acceleration",
    
    # Absorption (3)
    "absorption_z", "price_impact_z", "MOI_flip_rate",
    
    # Volatility (4)
    "vol_1m", "vol_5m", "vol_ratio", "vol_rank",
    
    # Structure (4)
    "dist_lvn", "dist_poc", "dist_lvn_atr", "dist_poc_atr",
    
    # Time (3)
    "hour_sin", "hour_cos", "is_weekend",
    
    # Trade intensity (2)
    "trade_intensity", "trade_intensity_z",
    
    # Cumulative (2)
    "cum_delta_1m", "cum_delta_5m",
    
    # Cross-sectional ranks (5) - NEW
    "MOI_1s_rank", "MOI_5s_rank", "vol_5m_rank", 
    "absorption_z_rank", "cum_delta_5m_rank",
]

print(f"Total features: {len(FEATURE_COLS)}")

In [None]:
# Create decision points with stricter filtering
all_decisions = []

for symbol in PAIRS:
    print(f"Creating decision points for {symbol}")
    
    bars_sym = all_bars[symbol].copy()
    bars_sym = bars_sym.dropna(subset=FEATURE_COLS)
    
    # Adaptive thresholds
    bars_sym["MOI_thresh"] = bars_sym["MOI_1s"].abs().rolling(2000).quantile(0.85)
    bars_sym["LVN_thresh"] = bars_sym["dist_lvn_atr"].rolling(2000).quantile(0.15)
    bars_sym["absorption_thresh"] = bars_sym["absorption_z"].abs().rolling(2000).quantile(0.85)
    
    # Decision mask: require stronger conditions
    decision_mask = (
        (bars_sym["dist_lvn_atr"] < bars_sym["LVN_thresh"]) |  # Near LVN
        (bars_sym["absorption_z"].abs() > bars_sym["absorption_thresh"]) |  # Absorption
        (bars_sym["MOI_1s"].abs() > bars_sym["MOI_thresh"]) |  # Strong flow
        (bars_sym["vol_ratio"] > 1.8)  # Vol expansion
    )
    
    df_decision_sym = bars_sym.loc[decision_mask].copy()
    df_decision_sym["bar_idx"] = df_decision_sym.index
    all_decisions.append(df_decision_sym)
    
    print(f"  {len(df_decision_sym):,} decision points ({100*len(df_decision_sym)/len(bars_sym):.1f}%)")

df_decision = pd.concat(all_decisions, ignore_index=True)
print(f"\nTotal decision points: {len(df_decision):,}")

del all_decisions
gc.collect()

In [None]:
# Convert features to float32
for col in FEATURE_COLS:
    if col in df_decision.columns:
        df_decision[col] = df_decision[col].astype(np.float32)

# One-hot encode symbols
pair_ohe = pd.get_dummies(df_decision["symbol"], prefix="pair", dtype="int8")

# Final feature columns
FEATURE_COLUMNS = FEATURE_COLS + pair_ohe.columns.tolist()
print(f"Final feature count: {len(FEATURE_COLUMNS)}")

# Create X matrix
X = np.hstack([
    df_decision[FEATURE_COLS].values,
    pair_ohe.values.astype(np.float32)
])
print(f"X shape: {X.shape}, dtype: {X.dtype}")
print(f"Memory: {X.nbytes / 1e9:.2f} GB")

del pair_ohe
gc.collect()

In [None]:
# Save feature columns
with open("feature_columns_v3.json", "w") as f:
    json.dump(FEATURE_COLUMNS, f)

## 4. Triple-Barrier Labeling

In [None]:
def triple_barrier_label(
    bars: pd.DataFrame,
    entry_idx: int,
    direction: str,  # "up" or "down"
    tp_mult: float = 2.0,
    sl_mult: float = 1.0,
    max_bars: int = 1200,
    fee_pct: float = 0.0008,  # Round trip
) -> Tuple[float, str]:
    """
    Triple-barrier labeling:
    - TP barrier: tp_mult * ATR
    - SL barrier: sl_mult * ATR
    - Time barrier: max_bars

    The bars after ``entry_idx`` are scanned in order; the first bar whose
    price touches either barrier closes the trade at the barrier price. If
    no barrier is touched, the trade closes at the price of the last bar
    inside the window.

    Args:
        bars: Frame with ``price`` and ``atr_5m`` columns. Rows are addressed
            by position, so the frame is expected to have a default 0..n-1
            index.
        entry_idx: Row of the entry bar.
        direction: ``"up"`` for a long trade; any other value is treated as
            a short trade.
        tp_mult: Take-profit distance in ATR multiples.
        sl_mult: Stop-loss distance in ATR multiples.
        max_bars: Maximum number of bars to hold.
        fee_pct: Fee subtracted from the fractional return.

    Returns:
    - score: Net return divided by the stop distance (positive = profitable)
    - exit_type: "TP", "SL", "TIME", or "SKIP" (score 0.0) when ATR, entry
      price or stop distance is missing or not positive
    """
    prices = bars["price"].to_numpy()
    entry_price = float(prices[entry_idx])
    atr = float(bars["atr_5m"].to_numpy()[entry_idx])

    # NaN compares False, so missing values are skipped here as well.
    if not (atr > 0 and entry_price > 0 and sl_mult > 0):
        return 0.0, "SKIP"

    # Stop distance as a fraction of the entry price; used to normalise pnl.
    risk_unit = sl_mult * atr / entry_price

    is_long = direction == "up"
    side = 1.0 if is_long else -1.0

    # Define barriers (mirrored for short trades)
    tp_price = entry_price * (1 + side * tp_mult * atr / entry_price)
    sl_price = entry_price * (1 - side * sl_mult * atr / entry_price)

    # Look forward over at most max_bars bars, clipped to the available data
    end_idx = min(entry_idx + max_bars, len(prices) - 1)
    window = prices[entry_idx + 1:end_idx + 1].astype(np.float64)

    if is_long:
        hit_tp = window >= tp_price
        hit_sl = window <= sl_price
    else:
        hit_tp = window <= tp_price
        hit_sl = window >= sl_price

    hit_any = hit_tp | hit_sl
    if hit_any.any():
        # argmax on a boolean array gives the first True, i.e. first touch.
        first = int(np.argmax(hit_any))
        if hit_tp[first]:
            exit_price, exit_type = tp_price, "TP"
        else:
            exit_price, exit_type = sl_price, "SL"
    else:
        # Time exit at the last bar of the window (the entry bar itself when
        # there is no later bar).
        exit_price, exit_type = float(prices[end_idx]), "TIME"

    pnl = side * (exit_price - entry_price) / entry_price - fee_pct
    return pnl / risk_unit, exit_type

In [None]:
def create_triple_barrier_labels(
    all_bars: Dict[str, pd.DataFrame],
    df_decision: pd.DataFrame,
    X: np.ndarray,
) -> Tuple[Dict, Dict]:
    """
    Create labels using triple-barrier method.
    Separates by direction and volatility regime.

    Every decision point is labeled in both directions; the sample is
    assigned to the direction with the better score, provided that score is
    above -0.5. Targets are clipped at 0 and log1p-transformed.

    Args:
        all_bars: Per-symbol bar frames (need ``price`` and ``atr_5m``).
        df_decision: Decision points with ``symbol``, ``bar_idx``,
            ``vol_regime`` and ``timestamp`` columns. Its index labels are
            used as row numbers into ``X``.
        X: Feature matrix aligned with ``df_decision``.

    Returns:
        ``(X_dict, y_dict)`` keyed by ``"<direction>_<regime>"``. Within each
        key the samples are sorted by timestamp, so index-based walk-forward
        splits are chronological across symbols.
    """
    keys = [
        f"{side}_{regime}"
        for side in ("up", "down")
        for regime in ("low", "mid", "high")
    ]
    X_dict = {key: [] for key in keys}
    y_dict = {key: [] for key in keys}
    ts_dict = {key: [] for key in keys}
    exit_stats = {"TP": 0, "SL": 0, "TIME": 0, "SKIP": 0}

    for symbol in PAIRS:
        if symbol not in all_bars:
            # Symbols without downloaded trades have no bars to label.
            continue

        print(f"Labeling {symbol}")

        bars_sym = all_bars[symbol].reset_index(drop=True)
        dec_sym = df_decision[df_decision["symbol"] == symbol]

        # Plain column iteration avoids building a Series per row.
        rows = zip(
            dec_sym.index,
            dec_sym["bar_idx"],
            dec_sym["vol_regime"],
            dec_sym["timestamp"].to_numpy(dtype="datetime64[ns]"),
        )

        for row_pos, bar_idx, regime, ts in tqdm(rows, total=len(dec_sym), desc=symbol):
            if pd.isna(regime):
                continue

            idx = int(bar_idx)

            # Try both directions, pick better one
            score_up, exit_up = triple_barrier_label(
                bars_sym, idx, "up", TP_MULT, SL_MULT, MAX_HOLDING_BARS, ROUND_TRIP_FEE
            )
            score_down, exit_down = triple_barrier_label(
                bars_sym, idx, "down", TP_MULT, SL_MULT, MAX_HOLDING_BARS, ROUND_TRIP_FEE
            )

            if exit_up == "SKIP" and exit_down == "SKIP":
                exit_stats["SKIP"] += 1
                continue

            if score_up > score_down and score_up > -0.5:  # Allow slightly negative
                side, score, exit_type = "up", score_up, exit_up
            elif score_down > -0.5:
                side, score, exit_type = "down", score_down, exit_down
            else:
                # Both directions lose more than half a stop distance.
                continue

            key = f"{side}_{regime.lower()}"
            X_dict[key].append(X[row_pos])
            y_dict[key].append(max(0, score))  # Clip to 0
            ts_dict[key].append(ts)
            exit_stats[exit_type] += 1

    # Convert to numpy, ordered by time. Samples were collected symbol by
    # symbol; without this sort an index-based split would train on one
    # symbol and validate on another instead of splitting by time.
    for key in keys:
        order = np.argsort(
            np.array(ts_dict[key], dtype="datetime64[ns]"), kind="stable"
        )
        X_dict[key] = np.array(X_dict[key], dtype=np.float32)[order]
        y_arr = np.array(y_dict[key], dtype=np.float32)[order]
        # Log transform for better distribution
        y_dict[key] = np.log1p(y_arr)
        print(f"{key}: {len(X_dict[key]):,} samples")

    print(f"\nExit stats: {exit_stats}")
    return X_dict, y_dict

In [None]:
# Build the per-direction / per-regime training sets from the decision points.
X_data, y_data = create_triple_barrier_labels(all_bars, df_decision, X)

## 5. Optuna Hyperparameter Optimization

In [None]:
def purged_walk_forward_splits(n: int, n_splits: int = 5, purge_pct: float = 0.01):
    """Yield expanding-window (train, validation) index arrays with a purge gap.

    The ``n`` samples are cut into ``n_splits + 1`` equal blocks. Split ``k``
    trains on everything before block boundary ``k`` and validates on the
    block that follows it. ``purge_pct`` of a block is removed on each side
    of the boundary so neighbouring samples do not straddle the split.
    """
    block = n // (n_splits + 1)
    gap = int(block * purge_pct)

    for k in range(1, n_splits + 1):
        boundary = block * k
        train_idx = np.arange(0, boundary - gap)
        valid_idx = np.arange(boundary + gap, boundary + block)
        yield train_idx, valid_idx


def objective(trial, X, y, feature_columns):
    """Optuna objective for hyperparameter tuning.

    Samples a LightGBM configuration, fits it on three purged walk-forward
    splits with early stopping, and returns the mean validation MAE.

    Args:
        trial: Optuna trial used to sample the hyperparameters.
        X: Feature matrix; rows are expected in chronological order.
        y: Target array aligned with ``X``.
        feature_columns: Column names for ``X``.

    Returns:
        Mean absolute error averaged over the validation folds.
    """
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 500, 1500),
        "max_depth": trial.suggest_int("max_depth", 5, 10),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.1, log=True),
        "subsample": trial.suggest_float("subsample", 0.6, 0.9),
        # LightGBM only applies row subsampling when the bagging frequency is
        # positive; with the default of 0 the tuned `subsample` is ignored.
        "subsample_freq": 1,
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 0.9),
        "min_child_samples": trial.suggest_int("min_child_samples", 30, 100),
        "reg_alpha": trial.suggest_float("reg_alpha", 0.01, 1.0, log=True),
        "reg_lambda": trial.suggest_float("reg_lambda", 0.01, 1.0, log=True),
        "objective": "huber",
        "alpha": 0.9,
        "random_state": 42,
        "n_jobs": -1,
        "verbose": -1,
    }

    X_df = pd.DataFrame(X, columns=feature_columns)

    maes = []
    for tr_idx, va_idx in purged_walk_forward_splits(len(X), n_splits=3):
        model = lgb.LGBMRegressor(**params)
        model.fit(
            X_df.iloc[tr_idx], y[tr_idx],
            eval_set=[(X_df.iloc[va_idx], y[va_idx])],
            callbacks=[lgb.early_stopping(50, verbose=False)],
        )
        preds = model.predict(X_df.iloc[va_idx])
        maes.append(mean_absolute_error(y[va_idx], preds))

    return np.mean(maes)


def optimize_hyperparameters(X, y, feature_columns, n_trials=30):
    """Search LightGBM hyperparameters with Optuna (minimising MAE).

    Returns the best parameter dict, or ``None`` when fewer than 5000
    samples are available and the search is skipped.
    """
    has_enough_data = len(X) >= 5000
    if not has_enough_data:
        print("Not enough data for optimization, using defaults")
        return None

    def _run_trial(trial):
        # Bind the dataset so Optuna only has to supply the trial.
        return objective(trial, X, y, feature_columns)

    study = optuna.create_study(direction="minimize")
    study.optimize(_run_trial, n_trials=n_trials, show_progress_bar=True)

    best = study.best_params
    print(f"Best MAE: {study.best_value:.4f}")
    print(f"Best params: {best}")

    return best

In [None]:
# Optimize for one regime to get base params (saves time)
print("Optimizing hyperparameters on up_high (largest dataset)...")
best_params = optimize_hyperparameters(
    X_data["up_high"], 
    y_data["up_high"], 
    FEATURE_COLUMNS,
    n_trials=25
)

## 6. Train Final Models with Optimized Params

In [None]:
def train_ensemble_model(
    X: np.ndarray,
    y: np.ndarray,
    name: str,
    feature_columns: List[str],
    best_params: Optional[Dict] = None,
    n_splits: int = 5
) -> Tuple[List, Dict]:
    """Train one LightGBM model per purged walk-forward fold.

    Args:
        X: Feature matrix; rows are expected in chronological order.
        y: Target array aligned with ``X``.
        name: Label used in the printed report.
        feature_columns: Column names for ``X``.
        best_params: Optional tuned hyperparameters. Only the eight tunable
            keys are read; missing keys fall back to the defaults.
        n_splits: Number of walk-forward folds, i.e. models in the ensemble.

    Returns:
        ``(models, metrics)`` where ``metrics`` holds per-fold MAE, RMSE and
        the mean actual target of the top 10% / top 25% predictions.
        Returns ``([], {})`` when fewer than 1000 samples are available.
    """
    if len(X) < 1000:
        print(f"Insufficient data for {name}: {len(X)} samples")
        return [], {}

    X_df = pd.DataFrame(X, columns=feature_columns)

    # Use best params or defaults
    defaults = {
        "n_estimators": 1000,
        "max_depth": 7,
        "learning_rate": 0.02,
        "subsample": 0.7,
        "colsample_bytree": 0.7,
        "min_child_samples": 50,
        "reg_alpha": 0.1,
        "reg_lambda": 0.1,
    }
    tuned = best_params or {}
    params = {key: tuned.get(key, default) for key, default in defaults.items()}
    params.update({
        # LightGBM ignores `subsample` unless the bagging frequency is
        # positive, so enable bagging on every iteration.
        "subsample_freq": 1,
        "objective": "huber",
        "alpha": 0.9,
        "n_jobs": -1,
        "verbose": -1,
    })

    models = []
    metrics = {"maes": [], "rmses": [], "top10_actual": [], "top25_actual": []}

    print(f"\n{'='*60}")
    print(f"Training {name} ({len(X):,} samples)")
    print(f"{'='*60}")

    for fold, (tr_idx, va_idx) in enumerate(purged_walk_forward_splits(len(X_df), n_splits)):
        # A different seed per fold diversifies the ensemble members.
        model = lgb.LGBMRegressor(**params, random_state=42 + fold)

        model.fit(
            X_df.iloc[tr_idx], y[tr_idx],
            eval_set=[(X_df.iloc[va_idx], y[va_idx])],
            eval_metric="l1",
            callbacks=[lgb.early_stopping(100, verbose=False)],
        )

        preds = model.predict(X_df.iloc[va_idx])
        actual = y[va_idx]

        mae = mean_absolute_error(actual, preds)
        rmse = np.sqrt(mean_squared_error(actual, preds))

        # Top percentile analysis: how good are the highest-ranked predictions?
        for q, key in [(90, "top10_actual"), (75, "top25_actual")]:
            thresh = np.percentile(preds, q)
            mask = preds >= thresh
            if mask.sum() > 0:
                metrics[key].append(actual[mask].mean())

        metrics["maes"].append(mae)
        metrics["rmses"].append(rmse)
        models.append(model)

        print(f"Fold {fold}: MAE={mae:.4f}, RMSE={rmse:.4f}")

    print(f"\n{name} Summary:")
    print(f"  Mean MAE: {np.mean(metrics['maes']):.4f}")
    print(f"  Mean RMSE: {np.mean(metrics['rmses']):.4f}")
    print(f"  Target STD: {np.std(y):.4f}")
    print(f"  MAE/STD: {np.mean(metrics['maes'])/np.std(y):.4f}")
    if metrics['top10_actual']:
        print(f"  Top 10% mean actual: {np.mean(metrics['top10_actual']):.4f}")
        print(f"  Top 25% mean actual: {np.mean(metrics['top25_actual']):.4f}")

    return models, metrics

In [None]:
# Train all models
models_all = {}

for key in ["up_low", "up_mid", "up_high", "down_low", "down_mid", "down_high"]:
    models, metrics = train_ensemble_model(
        X_data[key], y_data[key],
        f"{key.upper()}",
        FEATURE_COLUMNS,
        best_params=best_params,
    )
    models_all[key] = models

## 7. Feature Importance Analysis

In [None]:
def get_ensemble_feature_importance(models_dict: Dict, feature_cols: List[str]) -> pd.DataFrame:
    """Mean feature importance over every model of every ensemble.

    Each model contributes its ``feature_importances_`` aligned with
    ``feature_cols``. The result is indexed by feature name and sorted from
    most to least important (a pandas Series); when no model is available
    an empty DataFrame is returned.
    """
    frames = [
        pd.DataFrame({
            "feature": feature_cols,
            "importance": fitted.feature_importances_,
            "model": regime,
        })
        for regime, fitted_models in models_dict.items()
        for fitted in fitted_models
    ]

    if len(frames) == 0:
        return pd.DataFrame()

    stacked = pd.concat(frames)
    mean_importance = stacked.groupby("feature")["importance"].mean()
    return mean_importance.sort_values(ascending=False)

# Average importance over all trained models; empty when nothing was trained.
fi = get_ensemble_feature_importance(models_all, FEATURE_COLUMNS)
print("Top 20 Features:")
print(fi.head(20))

## 8. Save Models

In [None]:
os.makedirs("models_v3", exist_ok=True)

# Save all models (single horizon since triple-barrier is time-adaptive).
# Regimes without trained models are skipped.
for key, models in models_all.items():
    if not models:
        continue
    path = f"models_v3/models_{key}.pkl"
    joblib.dump(models, path)
    print(f"Saved {path}")

# Save best hyperparameters (only when the Optuna search produced any)
if best_params:
    with open("models_v3/best_params.json", "w") as params_file:
        params_file.write(json.dumps(best_params, indent=2))
    print("Saved best_params.json")

print("\nDone! Models saved to models_v3/")

## Summary

**V3 Enhancements:**

1. **Cross-Sectional Features**: 5 rank-based features comparing each symbol to others
2. **Triple-Barrier Labels**: Realistic TP (2x ATR), SL (1x ATR), Time (5 min) exits
3. **Optuna Optimization**: 25 trials to find best hyperparameters
4. **21 Days Data**: 50% more training data than V2
5. **Memory Optimized**: Per-symbol processing, float32 throughout

**To use:**
1. Copy `models_v3/*.pkl` to production
2. Copy the generated `feature_columns_v3.json` to production so the predictor builds features in the same order
3. Update predictor to compute cross-sectional features