<a href="https://colab.research.google.com/github/jadenfix/mft_crypto_research/blob/main/GPU_Optimized_ML_Trading_Script.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install polars pandas scikit-learn pyarrow catboost optuna lightgbm xgboost ta Backtesting matplotlib
import polars as pl
import pandas as pd
import numpy as np
from sklearn.model_selection import KFold, cross_val_score
from sklearn.preprocessing import StandardScaler # Not used in this version, but kept if needed later
from sklearn.metrics import classification_report, mean_absolute_error # classification_report not used
import ta
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor
from catboost import CatBoostRegressor
import lightgbm as lgb
import optuna
from backtesting import Backtest, Strategy
import matplotlib.pyplot as plt
import os
import tempfile
import traceback

# --- Configuration Section ---
# Define your intended TRAIN_PATHS and BACKTEST_PATHS.
# If your files are located elsewhere, use absolute paths here.
# Training CSV for each asset label.
INTENDED_TRAIN_PATHS = dict(
    ada="processed_data_ada_long.csv",
    btc="processed_data_btc.csv",
    eth="processed_data_eth_long.csv",
    sol="processed_data_solana_long.csv",
)

# Backtest CSV for each asset label.
INTENDED_BACKTEST_PATHS = dict(
    ada="2024_to_april_2025_ada_data.csv",
    btc="2024_to_april_2025_btc_data.csv",
    eth="2024_to_april_2025_eth_data.csv",
    sol="2024_to_april_2025_solana_data.csv",
)

# Column-name stem of each asset's OHLCV columns (e.g. "solana_close").
PREFIX = dict(ada="ada", btc="btc", eth="eth", sol="solana")

# Regression target column of each asset: "target_next_close_<stem>".
TARGET_COL = {label: f"target_next_close_{stem}" for label, stem in PREFIX.items()}

# --- Global temporary directory for dummy files ---
# Dummy CSVs go to a fresh temp directory; if that cannot be created the
# current directory is used instead.
try:
    _temp_dir_for_dummy_files = tempfile.mkdtemp(prefix="pydummy_ml_data_")
    print(f"INFO: Dummy files, if created, will be stored in: {_temp_dir_for_dummy_files}")
except Exception as mkdtemp_error:
    _temp_dir_for_dummy_files = "."
    print(f"WARNING: Could not create a global temporary directory ({mkdtemp_error}). Falling back to current directory for dummy files, which might fail if read-only.")


# --- Dummy file creation function (OSError Fix) ---
def create_dummy_csv_if_not_exists(asset_label, original_path_from_config, is_training, base_temp_dir):
    """
    Return a usable CSV path for an asset, generating synthetic data if needed.

    If a file exists at ``original_path_from_config`` that path is returned
    unchanged. Otherwise a 500-row CSV of random minute-bar OHLCV data is
    written into ``base_temp_dir`` under the original file name and the path
    of that dummy file is returned.

    Args:
        asset_label: Key into ``PREFIX`` (and ``TARGET_COL`` for training data).
        original_path_from_config: Configured location of the real CSV.
        is_training: When true the dummy file gets a single ``timestamp``
            column and the asset's target column; otherwise it gets separate
            ``date_only`` / ``time_only`` columns and no target.
        base_temp_dir: Directory that receives the dummy file.

    Returns:
        The path of the CSV to load (real or dummy).

    Raises:
        ValueError: If a dummy file is needed but ``asset_label`` is missing
            from ``PREFIX`` (or from ``TARGET_COL`` when ``is_training``).
        OSError: If the dummy file cannot be written.
    """
    if os.path.exists(original_path_from_config):
        print(f"INFO: File found at {original_path_from_config}. Using existing file for asset '{asset_label}'.")
        return original_path_from_config

    original_filename = os.path.basename(original_path_from_config)
    dummy_file_path = os.path.join(base_temp_dir, original_filename)
    print(f"WARNING: File {original_path_from_config} for asset '{asset_label}' not found. Creating a dummy CSV at: {dummy_file_path}")

    num_rows = 500  # Enough rows for the longest rolling window used downstream
    if asset_label not in PREFIX:
        raise ValueError(f"Asset label '{asset_label}' not found in PREFIX. Cannot generate dummy columns.")
    if is_training and asset_label not in TARGET_COL:
        raise ValueError(f"Asset label '{asset_label}' not found in TARGET_COL. Cannot generate dummy target column.")

    asset_p = PREFIX[asset_label]
    dummy_data = {}

    # 'min' is the supported minute alias; the single-letter 'T' is deprecated
    # in current pandas releases.
    if is_training:
        minute_index = pd.date_range(start='2022-01-01', periods=num_rows, freq='min')
        dummy_data['timestamp'] = minute_index.strftime('%Y-%m-%dT%H:%M:%S.%f')
    else:
        minute_index = pd.date_range(start='2024-01-01', periods=num_rows, freq='min')
        dummy_data['date_only'] = minute_index.strftime('%Y-%m-%d')
        dummy_data['time_only'] = minute_index.strftime('%H:%M:%S')

    # Opens are independent draws around 100, floored at 1 to stay positive.
    price_open = np.abs(np.random.normal(loc=100, scale=20, size=num_rows))
    price_open[0] = 100  # Start at a defined point
    price_open = np.maximum(price_open, 1)

    # High/low sit a positive random distance above/below the open; low is
    # additionally floored at 0.1 so it stays positive.
    price_high = price_open + np.abs(np.random.normal(loc=2, scale=1, size=num_rows))
    price_low = price_open - np.abs(np.random.normal(loc=2, scale=1, size=num_rows))
    price_low = np.maximum(np.minimum(price_low, price_open), 0.1)
    price_high = np.maximum(price_high, price_open)

    dummy_data[f'{asset_p}_open'] = price_open
    dummy_data[f'{asset_p}_high'] = price_high
    dummy_data[f'{asset_p}_low'] = price_low
    # Close is drawn uniformly inside the bar's [low, high] range.
    dummy_data[f'{asset_p}_close'] = np.random.uniform(price_low, price_high)
    dummy_data[f'{asset_p}_volume'] = np.abs(np.random.normal(loc=1000, scale=500, size=num_rows)) + 100

    if is_training:
        # Target is the close perturbed by ~1% multiplicative noise.
        target_col_name = TARGET_COL[asset_label]
        dummy_data[target_col_name] = dummy_data[f'{asset_p}_close'] * (1 + np.random.normal(loc=0, scale=0.01, size=num_rows))

    df = pd.DataFrame(dummy_data)
    try:
        # dirname is '' when base_temp_dir is empty; os.makedirs('') would
        # raise, so fall back to the current directory in that case.
        os.makedirs(os.path.dirname(dummy_file_path) or ".", exist_ok=True)
        df.to_csv(dummy_file_path, index=False)
        print(f"INFO: Successfully created dummy file for asset '{asset_label}': {dummy_file_path}")
        return dummy_file_path
    except OSError as e:
        print(f"CRITICAL ERROR: Could not write dummy CSV for asset '{asset_label}' to {dummy_file_path}. OS error: {e}")
        raise

# --- Paths to be used by the script (potentially updated to dummy file paths) ---
# Start from the configured locations; each entry is replaced below by
# whatever create_dummy_csv_if_not_exists returns for it.
ACTUAL_TRAIN_PATHS = dict(INTENDED_TRAIN_PATHS)
ACTUAL_BACKTEST_PATHS = dict(INTENDED_BACKTEST_PATHS)

print("\n--- Checking for training data files and creating dummies if necessary ---")
for asset_key, original_path in INTENDED_TRAIN_PATHS.items():
    ACTUAL_TRAIN_PATHS[asset_key] = create_dummy_csv_if_not_exists(
        asset_key,
        original_path,
        is_training=True,
        base_temp_dir=_temp_dir_for_dummy_files,
    )

print("\n--- Checking for backtest data files and creating dummies if necessary ---")
for asset_key, original_path in INTENDED_BACKTEST_PATHS.items():
    ACTUAL_BACKTEST_PATHS[asset_key] = create_dummy_csv_if_not_exists(
        asset_key,
        original_path,
        is_training=False,
        base_temp_dir=_temp_dir_for_dummy_files,
    )

# --- Data Loading Functions ---
def load_training(asset):
    """Read an asset's training CSV and return it sorted by a parsed ``ts`` column.

    The path comes from ``ACTUAL_TRAIN_PATHS``. The ``timestamp`` string column
    is parsed non-strictly into a Datetime column named ``ts``.
    """
    print(f"Loading training data for {asset} from: {ACTUAL_TRAIN_PATHS[asset]}")
    frame = pl.read_csv(ACTUAL_TRAIN_PATHS[asset], infer_schema_length=100_000)
    parsed_ts = pl.col("timestamp").str.strptime(
        pl.Datetime, format="%Y-%m-%dT%H:%M:%S%.f", strict=False
    )
    frame = frame.with_columns(parsed_ts.alias("ts"))
    return frame.sort("ts")

def load_backtest(asset):
    """Read an asset's backtest CSV and return it sorted by a parsed ``ts`` column.

    The path comes from ``ACTUAL_BACKTEST_PATHS``. ``date_only`` and
    ``time_only`` are joined with a space and parsed non-strictly into a
    Datetime column named ``ts``.
    """
    print(f"Loading backtest data for {asset} from: {ACTUAL_BACKTEST_PATHS[asset]}")
    frame = pl.read_csv(ACTUAL_BACKTEST_PATHS[asset], infer_schema_length=100_000)
    date_and_time = pl.concat_str([pl.col("date_only"), pl.lit(" "), pl.col("time_only")])
    parsed_ts = date_and_time.str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S", strict=False)
    frame = frame.with_columns(parsed_ts.alias("ts"))
    return frame.sort("ts")

print("\n--- Loading data ---")
# One polars frame per asset label, keyed like the path dictionaries.
raw_train = {asset_label: load_training(asset_label) for asset_label in ACTUAL_TRAIN_PATHS}
raw_bt = {asset_label: load_backtest(asset_label) for asset_label in ACTUAL_BACKTEST_PATHS}


# --- Feature Engineering ---
def engineer_features(df: pl.DataFrame, asset: str, lags: int = 5) -> pl.DataFrame:
    """Build the feature table for one asset from its OHLCV columns.

    OHLCV columns are looked up as ``<PREFIX[asset]>_<name>`` first and as the
    bare ``<name>`` second. From them the function derives momentum, trend,
    volatility and volume indicators (via ``ta``), rolling statistics over
    20/60/120 rows, and ``lags`` lagged copies of close and RSI. The OHLCV
    columns themselves are dropped from the result.

    Args:
        df: Raw asset frame.
        asset: Asset label, a key of ``PREFIX``.
        lags: Number of lagged close/RSI columns to add.

    Returns:
        A frame with one row per input row and only engineered columns, or an
        empty ``pl.DataFrame()`` when ``df`` is empty. Lagged columns hold
        nulls in their first rows; rows are not dropped here.

    Raises:
        ValueError: If high, low or close cannot be located in ``df``.
    """
    if df.is_empty():
        print(f"Warning: Input DataFrame for engineer_features for asset '{asset}' is empty. Returning empty DataFrame.")
        return pl.DataFrame()

    pref = PREFIX[asset]
    ohlcv_names = ("open", "high", "low", "close", "volume")

    # Maps the column name found in df to its canonical OHLCV name.
    rename_map_for_ta = {}
    for col_type in ohlcv_names:
        prefixed_name = f"{pref}_{col_type}"
        if prefixed_name in df.columns:
            rename_map_for_ta[prefixed_name] = col_type
        elif col_type in df.columns:  # Fallback to generic name
            rename_map_for_ta[col_type] = col_type

    if not all(ct in rename_map_for_ta.values() for ct in ["high", "low", "close"]):
        raise ValueError(f"Asset {asset}: Could not find required HLC columns for TA. Found mappings: {rename_map_for_ta}. Available df columns: {df.columns}")

    base = df.select(list(rename_map_for_ta.keys())).rename(rename_map_for_ta)
    p = base.to_pandas()

    for col_name in ohlcv_names:
        if col_name in p.columns:
            p[col_name] = pd.to_numeric(p[col_name], errors='coerce')

    # Forward-fill gaps, then back-fill whatever is still missing at the start.
    # ffill()/bfill() replace the deprecated fillna(method=...) spelling.
    p = p.ffill().bfill()

    if p["close"].isnull().all() or p["high"].isnull().all() or p["low"].isnull().all():
        print(f"Warning: Asset {asset} - HLC columns are all NaN after processing. TA features will be NaN.")
        for ta_f in ("rsi14", "stochk", "roc10", "macd", "bb_w", "atr14", "obv"):
            p[ta_f] = np.nan
    else:
        p["rsi14"] = ta.momentum.RSIIndicator(p["close"], 14, fillna=True).rsi()
        p["stochk"] = ta.momentum.StochasticOscillator(p["high"], p["low"], p["close"], 14, fillna=True).stoch()
        p["roc10"] = ta.momentum.ROCIndicator(p["close"], 10, fillna=True).roc()
        p["macd"] = ta.trend.MACD(p["close"], fillna=True).macd()
        p["bb_w"] = ta.volatility.BollingerBands(p["close"], fillna=True).bollinger_wband()
        p["atr14"] = ta.volatility.AverageTrueRange(p["high"], p["low"], p["close"], 14, fillna=True).average_true_range()
        # OBV needs a usable volume series; otherwise emit a constant column.
        if "volume" in p.columns and not p["volume"].isnull().all() and (p["volume"] != 0).any():
            p["obv"] = ta.volume.OnBalanceVolumeIndicator(p["close"], p["volume"], fillna=True).on_balance_volume()
        else:
            p["obv"] = 0.0

    f = pl.from_pandas(p)
    cols_to_drop_after_ta = list(rename_map_for_ta.values())

    # Rolling mean/std of close and mean bar range; a window starts producing
    # values once half of it is available.
    # NOTE(review): recent polars releases appear to rename `min_periods` to
    # `min_samples` - confirm against the installed version.
    rolling_exprs = []
    for w in (20, 60, 120):
        min_obs = max(1, w // 2)
        rolling_exprs.extend([
            pl.col("close").rolling_mean(w, min_periods=min_obs).alias(f"sma{w}"),
            pl.col("close").rolling_std(w, min_periods=min_obs).alias(f"vol{w}"),
            (pl.col("high") - pl.col("low")).rolling_mean(w, min_periods=min_obs).alias(f"hl_range{w}"),
        ])
    f = f.with_columns(rolling_exprs)

    # Lagged close and RSI, added lag by lag to keep the column order
    # close_lag1, rsi_lag1, close_lag2, ...
    for l_val in range(1, lags + 1):
        f = f.with_columns([
            pl.col("close").shift(l_val).alias(f"close_lag{l_val}"),
            pl.col("rsi14").shift(l_val).alias(f"rsi_lag{l_val}"),
        ])

    return f.drop(cols_to_drop_after_ta)

# --- Verify counts ---
print("\n--- Verifying feature counts ---")
# Dry-run feature engineering per asset and report how many columns it
# produces; failures are reported but do not stop the script.
for asset_key in ACTUAL_TRAIN_PATHS:
    if asset_key not in raw_train or raw_train[asset_key].is_empty():
        print(f"{asset_key}: Raw training data is empty or not loaded for feature count verification.")
        continue

    try:
        engineered_df = engineer_features(raw_train[asset_key], asset_key)
        if engineered_df.is_empty():
            print(f"{asset_key}: Feature engineering resulted in an empty DataFrame.")
        else:
            feat_cols = engineered_df.columns
            print(f"{asset_key}: {len(feat_cols)} engineered vars → {feat_cols[:8]}")
    except Exception as e_eng:
        print(f"Error engineering features for {asset_key}: {e_eng}")
        traceback.print_exc()


# --- Build X / y ---
def build_matrix(asset: str):
    """Return ``(X, y)`` numpy arrays for one asset's training data.

    Features come from ``engineer_features``; the target column named by
    ``TARGET_COL[asset]`` is attached row-by-row from the raw training frame,
    and every row containing a null is dropped before conversion.

    Returns:
        ``X`` (2-D feature array) and ``y`` (1-D target array). When no usable
        rows exist, ``X`` has zero rows and ``y`` is empty.

    Raises:
        ValueError: If the target column is absent from the raw training frame.
    """
    def no_rows(n_cols=0):
        # Empty result with a well-formed 2-D X.
        return np.array([]).reshape(0, n_cols), np.array([])

    if asset not in raw_train or raw_train[asset].is_empty():
        print(f"Warning: Asset {asset} - Raw training data is empty for build_matrix.")
        return no_rows()

    source = raw_train[asset]
    feats = engineer_features(source, asset)
    if feats.is_empty():
        print(f"Warning: Asset {asset} - Features DataFrame is empty before combining with target.")
        return no_rows()

    target_column_name = TARGET_COL[asset]
    if target_column_name not in source.columns:
        raise ValueError(f"Target column '{target_column_name}' not found in raw_train['{asset}']. Available: {raw_train[asset].columns}")

    # The target is attached positionally, then incomplete rows are removed.
    target_series = source[target_column_name].alias(target_column_name)
    mat = feats.with_columns(target_series).drop_nulls()

    if mat.is_empty():
        print(f"Warning: Asset {asset} - DataFrame is empty after combining features, target, and drop_nulls().")
        return no_rows(feats.width)

    X = mat.drop(target_column_name).to_numpy()
    y = mat[target_column_name].to_numpy()
    return X, y

print("\n--- Building datasets (X, y matrices) ---")


def _has_samples(X_mat, y_vec):
    # True when X is a non-empty 2-D array and y a non-empty 1-D array.
    return X_mat.ndim == 2 and X_mat.shape[0] > 0 and y_vec.ndim == 1 and y_vec.shape[0] > 0


datasets = {}
for asset_label in ACTUAL_TRAIN_PATHS.keys():
    datasets[asset_label] = build_matrix(asset_label)

for asset_label, (X_mat, y_vec) in datasets.items():
    n_features = X_mat.shape[1] if X_mat.ndim == 2 and X_mat.shape[0] > 0 else 0
    print(f"{asset_label}: {n_features} features  |  samples {X_mat.shape[0]}")

# Sample count of every usable dataset; the smallest one sets the trim length.
lengths = {}
for asset_label, (X_mat, y_vec) in datasets.items():
    if _has_samples(X_mat, y_vec):
        lengths[asset_label] = y_vec.shape[0]

if lengths:
    min_n = min(lengths.values())
else:
    print("CRITICAL: All datasets are empty or have zero length after build_matrix. Cannot proceed with trimming or training.")
    min_n = 0

print("Samples before trim (from valid datasets):", lengths)
print(f"→ Trimming all to {min_n} rows (smallest non-empty dataset's length)")

# Keep the last min_n rows of every usable dataset; others pass through as-is.
datasets_even = {}
for asset_label, (X_mat, y_vec) in datasets.items():
    trimmable = _has_samples(X_mat, y_vec) and X_mat.shape[0] >= min_n and min_n > 0
    if trimmable:
        datasets_even[asset_label] = (X_mat[-min_n:], y_vec[-min_n:])
    else:
        datasets_even[asset_label] = (X_mat, y_vec)

for asset_label, (X_mat, y_vec) in datasets_even.items():
    n_features = X_mat.shape[1] if X_mat.ndim == 2 and X_mat.shape[0] > 0 else 0
    print(f"{asset_label}: {X_mat.shape[0]} samples, {n_features} features after trimming attempt")

datasets = datasets_even


# ───────────────────────────────────────── 2. Model Tuning ─────────────────────────────────────────
print("\n--- Model Tuning Setup ---")

class PurgedKFold(KFold):
    """Unshuffled K-fold splitter that trains only on rows well before each test fold.

    For every fold produced by ``KFold``, the training indices are reduced to
    those strictly below ``test_idx[0] - embargo``. Rows after the test fold
    are therefore discarded too, and a fold whose reduced training set is
    empty is not yielded, so fewer than ``n_splits`` splits may be produced.
    """

    def __init__(self, n_splits=8, embargo=10):
        super().__init__(n_splits=n_splits, shuffle=False)
        # Number of rows immediately before the test fold excluded from training.
        self.embargo = embargo

    def split(self, X, y=None, groups=None):
        """Yield ``(train_idx, test_idx)`` pairs; yields nothing if there are fewer samples than splits."""
        array_like = hasattr(X, 'shape') and hasattr(X, '__len__')
        samples = X if array_like else np.asarray(X)

        if len(samples) < self.get_n_splits(samples, y, groups):
            return

        for fit_rows, eval_rows in super().split(samples, y, groups):
            cutoff = eval_rows[0] - self.embargo
            kept_rows = fit_rows[fit_rows < cutoff]
            if len(kept_rows) > 0:
                yield kept_rows, eval_rows

def tune_model(model_cls, model_name: str, param_space, X, y, cv, gpu_params=None):
    """Search ``param_space`` with Optuna and return an unfitted model using the best parameters.

    Args:
        model_cls: Estimator class to instantiate.
        model_name: Label used in log messages.
        param_space: Mapping of parameter name to a list (categorical choice),
            a ``(low, high, 'log')`` tuple (log-uniform float) or a
            ``(low, high)`` tuple (uniform float).
        X, y: Training data passed to ``cross_val_score``.
        cv: Cross-validation splitter.
        gpu_params: Fixed parameters merged over the sampled ones.

    Returns:
        ``model_cls`` instantiated with the study's best parameters, the fixed
        parameters and ``random_state=42``. The model is not fitted.
    """
    fixed_params = {} if gpu_params is None else gpu_params

    def with_fixed_params(sampled):
        # Fixed parameters win over sampled ones; CatBoost additionally gets
        # silenced and its depth coerced to int.
        merged = {**sampled, **fixed_params}
        if model_cls == CatBoostRegressor:
            merged['verbose'] = 0
            if merged.get('depth') is not None:
                merged['depth'] = int(merged['depth'])
        return merged

    def objective(trial):
        sampled = {}
        for name, spec in param_space.items():
            if isinstance(spec, list):
                sampled[name] = trial.suggest_categorical(name, spec)
                continue
            arity = len(spec) if isinstance(spec, tuple) else None
            if arity == 3 and spec[2] == 'log':
                sampled[name] = trial.suggest_float(name, spec[0], spec[1], log=True)
            elif arity == 2:
                sampled[name] = trial.suggest_float(name, spec[0], spec[1])
            else:
                print(f"Warning: Unsupported param_space config for {name}: {spec} in {model_name}.")

        current_params = with_fixed_params(sampled)
        model = model_cls(**current_params, random_state=42)
        try:
            scores = cross_val_score(model, X, y, cv=cv, scoring='neg_mean_absolute_error', error_score='raise')
        except Exception as cv_error:
            print(f"Debug: cross_val_score failed for {model_name} with params {current_params}. Error: {cv_error}")
            return np.nan

        # Scores are negated MAE; report the positive mean MAE to minimise.
        if len(scores) == 0 or np.all(np.isnan(scores)):
            return np.nan
        return -np.nanmean(scores)

    sampler = optuna.samplers.TPESampler(seed=42, multivariate=True)
    study = optuna.create_study(direction='minimize', sampler=sampler)
    # At most 30 trials or 600 seconds per model.
    study.optimize(objective, n_trials=30, timeout=600, show_progress_bar=True)

    best_params_from_study = with_fixed_params(study.best_params)
    return model_cls(**best_params_from_study, random_state=42)


def fit_models_on_gpu(X, y, asset_name_logging=""):
    """Tune and fit the four regressors (LightGBM, CatBoost, XGBoost, RandomForest) on ``(X, y)``.

    Each model is tuned with ``tune_model`` under ``PurgedKFold`` and then
    refitted on all rows. If tuning or the final fit raises, the model is
    fitted with default parameters instead. If the splitter yields no splits
    at all, every model is fitted with default parameters without tuning.

    Args:
        X: 2-D feature array.
        y: 1-D target array.
        asset_name_logging: Asset label used only in log messages.

    Returns:
        Dict of model name to fitted estimator; models whose fit failed are absent.
    """
    cv = PurgedKFold(n_splits=8, embargo=10)
    num_potential_splits = 0
    if X.shape[0] >= cv.get_n_splits():
        num_potential_splits = sum(1 for _ in cv.split(X, y))

    # Fixed device parameters per library.
    # NOTE(review): newer XGBoost releases document `device='cuda'` with
    # `tree_method='hist'` instead of `gpu_hist`/`gpu_id` - confirm against
    # the installed version.
    xgb_gpu_params = {'tree_method': 'gpu_hist', 'gpu_id': 0}
    lgbm_gpu_params = {'device': 'gpu', 'gpu_platform_id': 0, 'gpu_device_id': 0}
    catboost_gpu_params = {'task_type': 'GPU', 'devices': '0'}

    # name -> (estimator class, Optuna search space, fixed parameters)
    model_configs = {
        "LGBM_GPU": (lgb.LGBMRegressor, {
            "num_leaves": [31, 63, 127], "learning_rate": (1e-3, 0.2, 'log'),
            "max_depth": [-1, 6, 12], "n_estimators": [100, 200, 400]
        }, lgbm_gpu_params),
        "CatBoost_GPU": (CatBoostRegressor, {
            "depth": [4, 6, 8], "learning_rate": (1e-3, 0.2, 'log'),
            "iterations": [100, 200, 400]
        }, catboost_gpu_params),
        "XGB_GPU": (XGBRegressor, {
            "n_estimators": [100, 200, 400], "learning_rate": (0.01, 0.2, 'log'),
            "max_depth": [3, 6, 9]
        }, xgb_gpu_params),
        "RF_CPU": (RandomForestRegressor, {  # RandomForest stays on CPU
            "n_estimators": [100, 200, 300], "max_depth": [8, 16, None]
        }, {}),
    }

    def baseline_params(model_cls, fixed, space=None):
        # Fixed parameters plus a tree/iteration count: the first option of the
        # search space when one is supplied and lists it, otherwise 100.
        params = fixed.copy()
        if model_cls == CatBoostRegressor:
            params['verbose'] = 0
            count_key = 'iterations'
        elif model_cls in (XGBRegressor, lgb.LGBMRegressor, RandomForestRegressor):
            count_key = 'n_estimators'
        else:
            return params
        if count_key not in params:
            if space and count_key in space:
                params[count_key] = space[count_key][0]
            else:
                params[count_key] = 100
        return params

    if num_potential_splits == 0:
        print(f"Warning [{asset_name_logging}]: PurgedKFold yields no valid splits. Fitting models with default parameters (GPU where applicable).")
        fitted_models_fallback = {}
        for name, (model_cls, space, current_gpu_params) in model_configs.items():
            print(f"  Fitting fallback {name} for asset {asset_name_logging}...")
            try:
                params_with_gpu = baseline_params(model_cls, current_gpu_params, space)
                model_instance = model_cls(**params_with_gpu, random_state=42)
                model_instance.fit(X, y)
                fitted_models_fallback[name] = model_instance
            except Exception as e_fit_fb:
                print(f"    ✗ Error fitting fallback {name} for asset {asset_name_logging}: {e_fit_fb}")
                traceback.print_exc()
        return fitted_models_fallback

    fitted_models = {}
    for name, (model_cls, param_space, current_gpu_params) in model_configs.items():
        print(f"  Tuning {name} for asset {asset_name_logging}...")
        try:
            # tune_model hands back an unfitted estimator carrying the best parameters.
            tuned_model_instance = tune_model(model_cls, name, param_space, X, y, cv, gpu_params=current_gpu_params)
            print(f"    Final fitting for {name} on asset {asset_name_logging} with params: {tuned_model_instance.get_params()}")
            tuned_model_instance.fit(X, y)
            fitted_models[name] = tuned_model_instance
            print(f"    ✓ Tuned and fitted {name} for asset {asset_name_logging}.")
            continue
        except Exception as e_tune_fit:
            print(f"    ✗ Error during tuning or final fit for {name} on asset {asset_name_logging}: {type(e_tune_fit).__name__} - {e_tune_fit}")
            traceback.print_exc()

        # Tuning path failed: try once more with default parameters.
        print(f"    Attempting to fit {name} with default parameters (and GPU where applicable) for asset {asset_name_logging}.")
        try:
            default_params_with_gpu = baseline_params(model_cls, current_gpu_params)
            default_model = model_cls(**default_params_with_gpu, random_state=42)
            default_model.fit(X, y)
            fitted_models[name] = default_model
            print(f"    ✓ Fitted {name} with default parameters for asset {asset_name_logging}.")
        except Exception as e_def_fit:
            print(f"      ✗ Failed default fit {name} for {asset_name_logging}: {e_def_fit}")
            traceback.print_exc()
    return fitted_models

# ──────────────── Train per Asset (Using GPU function) ────────────────
print("\n--- Training models per asset (GPU where applicable) ---")
# asset label -> {model name: fitted estimator}; an empty dict marks an asset
# that was skipped or for which nothing could be fitted.
asset_models = {}
for a in datasets.keys():
    print(f"\n--- Processing asset for training: {a} ---")
    try:
        skip_reason = None
        if a not in datasets or not isinstance(datasets[a], tuple) or len(datasets[a]) != 2:
            skip_reason = "Dataset not found correctly in 'datasets' dict."
        else:
            X_current, y_current = datasets[a]
            x_ok = isinstance(X_current, np.ndarray) and X_current.ndim == 2 and X_current.shape[0] != 0
            y_ok = isinstance(y_current, np.ndarray) and y_current.ndim == 1 and y_current.shape[0] != 0
            if not x_ok:
                skip_reason = f"X data is empty or invalid. Shape: {getattr(X_current, 'shape', 'N/A')}."
            elif not y_ok:
                skip_reason = f"y data is empty or invalid. Shape: {getattr(y_current, 'shape', 'N/A')}."

        if skip_reason is not None:
            print(f"  ✗ {a} failed: {skip_reason} Skipping.")
            asset_models[a] = {}
            continue

        print(f"Fitting models for {a} … (X shape: {X_current.shape}, y shape: {y_current.shape})")
        asset_models[a] = fit_models_on_gpu(X_current, y_current, asset_name_logging=a)

        if asset_models.get(a):
            print(f"  ✓ {a} processing done. Models: {list(asset_models[a].keys())}")
        else:
            print(f"  ✗ {a} processing done, but no models were successfully fitted.")
            asset_models[a] = {}
    except Exception as e_asset_loop:
        print(f"  ✗ {a} failed in main asset loop → {type(e_asset_loop).__name__}: {e_asset_loop}")
        traceback.print_exc()
        asset_models[a] = {}

print("\n=== Trained Models Overview: ===")
if asset_models:
    # Stays True only if no asset ended up with at least one fitted model.
    all_empty = True
    for asset_k, models_v in asset_models.items():
        has_models = bool(models_v and isinstance(models_v, dict) and models_v.keys())
        if has_models:
            print(f"  Asset {asset_k}: Models - {list(models_v.keys())}")
            all_empty = False
        else:
            print(f"  Asset {asset_k}: No models successfully fitted.")
    if all_empty:
        print("No models were successfully trained for any asset.")
else:
    print("No assets processed or no models trained.")


# ───────────────────────────────── Backtesting Section ─────────────────────────────────
print("\n--- Preparing for Backtesting ---")
# Use IPython's rich display when available; plain print otherwise.
try:
    from IPython.display import display
except ImportError:
    display = print

def prepare_live_df(asset: str) -> pd.DataFrame:
    """Build the backtest frame for one asset: standardised OHLCV plus engineered features.

    The raw backtest frame's OHLCV columns are renamed to ``Open``, ``High``,
    ``Low``, ``Close`` and ``Volume``, the columns from ``engineer_features``
    are appended row-by-row, and the parsed ``ts`` column (when present)
    becomes the index under the name ``timestamp_dt``.

    Args:
        asset: Asset label, a key of ``raw_bt`` and ``PREFIX``.

    Returns:
        The prepared frame, or an empty ``pd.DataFrame()`` when the raw data is
        missing or empty, feature engineering yields nothing, or every OHLC
        value is NaN.

    Raises:
        ValueError: If any of Open/High/Low/Close is missing after renaming.
    """
    print(f"\nPreparing live dataframe for asset: {asset}")
    if asset not in raw_bt or raw_bt[asset].is_empty():
        print(f"Critical error: Raw backtest data for asset {asset} is missing or empty.")
        return pd.DataFrame()

    feats_pl = engineer_features(raw_bt[asset], asset)
    if feats_pl.is_empty():
        print(f"Warning: Feature engineering for backtest data of asset {asset} resulted in an empty DataFrame. Backtest may fail or be meaningless.")
        # An empty frame signals the problem clearly to the caller.
        return pd.DataFrame()

    # Both frames are non-empty from here on (checked above), so no further
    # emptiness handling is needed.
    feats_pd = feats_pl.to_pandas().reset_index(drop=True)
    live_pd_full = raw_bt[asset].to_pandas()

    # Keep the trailing rows of the raw data that correspond to the feature rows.
    live_pd_aligned = live_pd_full.iloc[-len(feats_pd):].copy().reset_index(drop=True)

    pref = PREFIX[asset]
    ohlcv_std_names = ("Open", "High", "Low", "Close", "Volume")
    rename_map = {}
    for col_std in ohlcv_std_names:
        col_prefixed = f"{pref}_{col_std.lower()}"
        if col_prefixed in live_pd_aligned.columns:
            rename_map[col_prefixed] = col_std
        elif col_std.lower() in live_pd_aligned.columns:  # Fallback to generic lower-case name
            rename_map[col_std.lower()] = col_std

    final_df = live_pd_aligned.rename(columns=rename_map)

    for col in feats_pd.columns:
        if col in ohlcv_std_names:
            # Never let a feature replace the price/volume data the backtest runs on.
            print(f"Warning: Feature col '{col}' for {asset} clashes with a standard OHLCV column. Skipping feature.")
            continue
        if col in final_df.columns:
            print(f"Warning: Feature col '{col}' for {asset} clashes. Overwriting.")
        final_df[col] = feats_pd[col].values

    if 'ts' in final_df.columns:
        final_df['timestamp_dt'] = pd.to_datetime(final_df['ts'])
        final_df = final_df.set_index('timestamp_dt', drop=True)
    else:
        print(f"Warning: 'ts' col not for DatetimeIndex in {asset} backtest data.")

    required_bt_cols = ["Open", "High", "Low", "Close"]
    missing_bt_cols = [col for col in required_bt_cols if col not in final_df.columns]
    if missing_bt_cols:
        raise ValueError(f"Asset {asset}: Missing OHLC for Backtesting.py: {missing_bt_cols}. Cols: {final_df.columns.tolist()}")

    # Coerce OHLC to numeric; give up only if nothing usable remains.
    for col in required_bt_cols:
        final_df[col] = pd.to_numeric(final_df[col], errors='coerce')
    if final_df[required_bt_cols].isnull().all().all():
        print(f"Critical: All OHLC data for {asset} is NaN. Returning empty DataFrame.")
        return pd.DataFrame()

    print(f"Prepared live df for {asset}. Shape: {final_df.shape}, Index: {type(final_df.index)}")
    return final_df

def build_meta_strategy(asset):
    """Build a Backtesting.py ``Strategy`` class that trades on an equal-weight model vote.

    The fitted models for ``asset`` are read from the module-level
    ``asset_models`` mapping; entries that are ``None`` are discarded and the
    remaining models each receive weight ``1 / n``.

    Args:
        asset: Key into ``asset_models``.

    Returns:
        The ``MetaVoteStrategy`` class (not an instance), or ``None`` when no
        usable model exists for ``asset``.
    """
    models_for_asset = asset_models.get(asset)
    if not models_for_asset or not isinstance(models_for_asset, dict):
        print(f"Warning: No models for {asset}. Cannot build strategy."); return None

    # Filter out any models that might be None if fitting failed for them
    valid_models = {name: model for name, model in models_for_asset.items() if model is not None}
    if not valid_models:
        print(f"Warning: No valid (non-None) models found for asset {asset} after filtering. Cannot build strategy."); return None

    weights = {k: 1/len(valid_models) for k in valid_models}

    # Columns of the backtest frame that are never model inputs.
    non_feature_cols = {'Open', 'High', 'Low', 'Close', 'Volume', 'ts', 'timestamp_dt'}

    def _names_recorded_on(model):
        """Return the feature names stored on a fitted model as a list, or None."""
        if hasattr(model, 'feature_name_'):  # LightGBM scikit-learn wrapper
            names = model.feature_name_
            # `feature_name_` is documented as a property (a list), not a method;
            # calling it unconditionally raised TypeError.
            if callable(names): names = names()
        elif hasattr(model, 'feature_names_in_'):  # scikit-learn style attribute
            names = model.feature_names_in_
        elif hasattr(model, 'get_booster'):  # XGBoost
            names = getattr(model.get_booster(), 'feature_names', None)
        else:
            names = None
        # Normalise to a plain list: an ndarray here would make `if not names`
        # raise "truth value of an array is ambiguous".
        return None if names is None else list(names)

    class MetaVoteStrategy(Strategy):
        # Closure variables from outer scope
        _models_to_use_cl = valid_models
        _model_weights_cl = weights
        _trade_threshold_cl = 0.00 # Example: 0.00 for any signal, adjust as needed (e.g. 0.001 for 0.1% predicted return)
        _feature_names_list_cl = None  # Resolved once per strategy class, then reused

        def _resolve_feature_names(self):
            """Return the cached feature-name list, computing it on first use.

            Names recorded on the first model are used only when every one of
            them is a column of the backtest frame; otherwise all non-OHLCV
            columns of the frame are used, in frame order.
            """
            if MetaVoteStrategy._feature_names_list_cl is None and self.models_to_use:
                frame = getattr(getattr(self, 'data', None), 'df', None)
                available = list(frame.columns) if frame is not None and not frame.empty else None

                names = _names_recorded_on(next(iter(self.models_to_use.values())))
                # A model fitted on a bare NumPy array carries placeholder names
                # (e.g. "Column_0") that do not exist in the frame; looking those
                # up would raise KeyError on every bar.
                if names is not None and available is not None and not set(names) <= set(available):
                    names = None
                if names is None and available is not None:
                    # NOTE(review): assumes the frame's feature columns are in the
                    # same order as the training matrix; confirm against prepare_live_df.
                    names = [col for col in available if col not in non_feature_cols]
                MetaVoteStrategy._feature_names_list_cl = names or None
            return MetaVoteStrategy._feature_names_list_cl

        def init(self):
            self.models_to_use = self._models_to_use_cl
            self.model_weights = self._model_weights_cl
            self.trade_threshold = self._trade_threshold_cl

            # One-time setup for feature names (retried lazily in _pred if it fails here)
            self.feature_names_list_ = self._resolve_feature_names()
            if not self.feature_names_list_:
                print("Warning: Could not determine feature names at init in MetaVoteStrategy.")

        def _pred(self):
            """Return the weighted mean predicted return for the latest bar (0 when unavailable)."""
            if not self.feature_names_list_:
                # Attempt to set up feature names if not done in init (e.g. self.data.df wasn't ready)
                self.feature_names_list_ = self._resolve_feature_names()
                if not self.feature_names_list_:
                    print("CRITICAL Error: Still cannot determine feature names for prediction in MetaVoteStrategy _pred.")
                    return 0

            try:
                frame = self.data.df  # fetch the frame once per bar instead of once per feature
                current_feature_values = [frame[fn].iloc[-1] for fn in self.feature_names_list_]
            except KeyError as e: print(f"KeyError in _pred for feature {e}. Available: {self.data.df.columns.tolist()}"); return 0
            except IndexError: print("IndexError in _pred, likely empty data slice."); return 0

            if np.isnan(current_feature_values).any(): return 0

            feature_array = np.array(current_feature_values).reshape(1, -1)
            preds = {}
            for n, m in self.models_to_use.items():
                try: preds[n] = m.predict(feature_array)[0]
                except Exception as e_p: print(f"Err predicting {n}: {e_p}. Assuming no change."); preds[n] = self.data.Close[-1]

            last_price = self.data.Close[-1]
            if last_price == 0 or np.isnan(last_price): return 0

            # Models predict the next close; convert each to a return relative to the last close.
            rets = {n: (p - last_price) / last_price if not np.isnan(p) else 0 for n, p in preds.items()}
            return sum(self.model_weights[n] * rets[n] for n in rets)

        def next(self):
            if not self.models_to_use: return
            # Heuristic buffer for TA stability in features.
            # If features are precomputed and NaNs handled, this might be less critical.
            min_bars = max(60, globals().get('lags_from_config', 5)) + 5  # largest window + lags
            if len(self.data.Close) < min_bars:
                return
            signal = self._pred()
            if np.isnan(signal): signal = 0 # Handle NaN signal as neutral

            if signal > self.trade_threshold and not self.position.is_long:
                self.position.close(); self.buy()
            elif signal < -self.trade_threshold and not self.position.is_short:
                self.position.close(); self.sell()
    return MetaVoteStrategy

# ---------- run back‑test per asset ----------
print("\n--- Running Backtests ---")
stats = {}
equity_curves = {}
# Use keys from asset_models as these are the assets for which training was attempted
# This ensures we only try to backtest assets that might have models.
assets_to_backtest = list(asset_models.keys())

for asset_key_bt in assets_to_backtest:
    print(f"\n--- Running Backtest for: {asset_key_bt} ---")
    # Require at least one non-None model. The check is explicit (`is not None`)
    # rather than relying on estimator truthiness, which would invoke `__len__`
    # on ensemble estimators; it also matches the filter in build_meta_strategy.
    models_for_asset_bt = asset_models.get(asset_key_bt)
    if not models_for_asset_bt or not any(m is not None for m in models_for_asset_bt.values()):
        print(f"Skipping backtest for {asset_key_bt}: No valid models were trained or found.")
        continue

    try:
        live_df_prepared = prepare_live_df(asset_key_bt)
        # Backtesting needs a non-empty frame on a DatetimeIndex with complete OHLC.
        if live_df_prepared.empty or not isinstance(live_df_prepared.index, pd.DatetimeIndex):
            print(f"Skipping backtest for {asset_key_bt}: Live DataFrame unusable."); continue
        if live_df_prepared[['Open', 'High', 'Low', 'Close']].isnull().values.any():
            print(f"Skipping backtest for {asset_key_bt}: Live DataFrame OHLC NaNs."); continue

        StratBuilder = build_meta_strategy(asset_key_bt)
        if StratBuilder is None: print(f"Skipping {asset_key_bt}: Strategy not built."); continue

        bt_instance = Backtest(live_df_prepared, StratBuilder, cash=1_000_000, commission=0.0015, exclusive_orders=False)
        s_result = bt_instance.run()
        stats[asset_key_bt] = s_result
        if '_equity_curve' in s_result and not s_result['_equity_curve'].empty:
             equity_curves[asset_key_bt] = s_result['_equity_curve']['Equity']
        # Optional plot: bt_instance.plot(filename=f"backtest_{asset_key_bt}.html", open_browser=False)
    except Exception as e_bt_general:
        # One asset failing must not abort the remaining backtests.
        print(f"Error during backtest for {asset_key_bt}: {type(e_bt_general).__name__} - {e_bt_general}"); traceback.print_exc()

# ---------- summary table & equity curves ----------
if not stats:
    print("\nNo backtesting statistics were generated.")
else:
    summary_data = {}
    for asset_label_sm, run_stats in stats.items():
        # Only mapping-like results (Series/dict from Backtesting.py) expose .get
        if not hasattr(run_stats, 'get'):
            continue
        win_rate_pct = run_stats.get('Win Rate [%]')
        exposure_pct = run_stats.get('Exposure Time [%]')
        summary_data[asset_label_sm] = {
            "APR": run_stats.get('Return (Ann.) [%]', np.nan),
            "Sharpe": run_stats.get('Sharpe Ratio', np.nan),
            "MaxDD": run_stats.get('Max. Drawdown [%]', np.nan),
            # Percentages are rescaled to fractions; missing values stay NaN.
            "WinRate": win_rate_pct / 100 if pd.notna(win_rate_pct) else np.nan,
            "Trades": run_stats.get('# Trades', 0),
            "Exposure": exposure_pct / 100 if pd.notna(exposure_pct) else np.nan,
        }
    # One row per asset, one column per metric.
    summary = pd.DataFrame(summary_data).T.round(3)
    print("\n=== Hold‑out performance ===")
    if summary.empty:
        print("No summary data to display.")
    else:
        display(summary)

if not equity_curves:
    print("No equity curves to plot.")
else:
    plt.figure(figsize=(12,7))
    # One line per asset; empty curves are left out of the chart.
    for asset_label_eq, curve_data in equity_curves.items():
        if curve_data.empty:
            continue
        plt.plot(curve_data.index, curve_data, label=asset_label_eq.upper())
    plt.title("Equity Curves ‑ Hold‑out Period")
    plt.xlabel("Date")
    plt.ylabel("Equity ($)")
    plt.legend()
    plt.grid(True)
    # Save plot instead of showing if in non-interactive environment
    # plt.savefig("equity_curves.png")
    plt.show()

print("\n--- Script End ---")

# Optional: Clean up the global temporary directory
# import shutil
# if '_temp_dir_for_dummy_files' in globals() and os.path.exists(_temp_dir_for_dummy_files) and _temp_dir_for_dummy_files != ".":
#     try:
#         print(f"INFO: Removing temporary directory: {_temp_dir_for_dummy_files}")
#         shutil.rmtree(_temp_dir_for_dummy_files)
#     except Exception as e_shutil:
#         print(f"Warning: Could not remove temporary directory {_temp_dir_for_dummy_files}: {e_shutil}")


Collecting catboost
  Downloading catboost-1.2.8-cp311-cp311-manylinux2014_x86_64.whl.metadata (1.2 kB)
Collecting optuna
  Downloading optuna-4.3.0-py3-none-any.whl.metadata (17 kB)
Collecting ta
  Downloading ta-0.11.0.tar.gz (25 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting Backtesting
  Downloading backtesting-0.6.4-py3-none-any.whl.metadata (7.0 kB)
Collecting alembic>=1.5.0 (from optuna)
  Downloading alembic-1.16.1-py3-none-any.whl.metadata (7.3 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Downloading catboost-1.2.8-cp311-cp311-manylinux2014_x86_64.whl (99.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.2/99.2 MB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading optuna-4.3.0-py3-none-any.whl (386 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m386.6/386.6 kB[0m [31m26.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading backtesting-0.6.4-py3-none-any.

  datetime_series = pd.to_datetime(pd.date_range(start='2024-01-01', periods=num_rows, freq='T'))
  datetime_series = pd.to_datetime(pd.date_range(start='2024-01-01', periods=num_rows, freq='T'))
  datetime_series = pd.to_datetime(pd.date_range(start='2024-01-01', periods=num_rows, freq='T'))


INFO: Dummy files, if created, will be stored in: /tmp/pydummy_ml_data_iw_5d0ho

--- Checking for training data files and creating dummies if necessary ---
INFO: File found at processed_data_ada_long.csv. Using existing file for asset 'ada'.
INFO: File found at processed_data_btc.csv. Using existing file for asset 'btc'.
INFO: File found at processed_data_eth_long.csv. Using existing file for asset 'eth'.
INFO: File found at processed_data_solana_long.csv. Using existing file for asset 'sol'.

--- Checking for backtest data files and creating dummies if necessary ---
INFO: File found at 2024_to_april_2025_ada_data.csv. Using existing file for asset 'ada'.
INFO: Successfully created dummy file for asset 'btc': /tmp/pydummy_ml_data_iw_5d0ho/2024_to_april_2025_btc_data.csv
INFO: Successfully created dummy file for asset 'eth': /tmp/pydummy_ml_data_iw_5d0ho/2024_to_april_2025_eth_data.csv
INFO: Successfully created dummy file for asset 'sol': /tmp/pydummy_ml_data_iw_5d0ho/2024_to_april_202

ComputeError: found more fields than defined in 'Schema'

Consider setting 'truncate_ragged_lines=True'.

Initialize and load packages and data

In [None]:
!pip install polars pandas scikit-learn pyarrow catboost optuna lightgbm xgboost ta Backtesting matplotlib
import polars as pl
import pandas as pd
import numpy as np
from sklearn.model_selection import KFold, cross_val_score
from sklearn.preprocessing import StandardScaler # Not used in this version, but kept if needed later
from sklearn.metrics import classification_report, mean_absolute_error # classification_report not used
import ta
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor
from catboost import CatBoostRegressor
import lightgbm as lgb
import optuna
from backtesting import Backtest, Strategy
import matplotlib.pyplot as plt
import os
import tempfile
import traceback

# --- Configuration Section ---
# Per-asset settings, keyed by the short asset label ("ada", "btc", "eth", "sol").
# Paths may be relative or absolute; use absolute paths if the files live elsewhere.
# A path that does not exist is replaced by a generated dummy CSV further below.

# Training CSVs. The loader parses a `timestamp` column from these files.
INTENDED_TRAIN_PATHS = {
    "ada": "ml_training_ada.csv",
    "btc": "ml_training_btc.csv",
    "eth": "ml_training_eth.csv",
    "sol": "ml_training_solana.csv",
}

# Hold-out / backtest CSVs. The loader combines their `date_only` and
# `time_only` columns into a single timestamp.
INTENDED_BACKTEST_PATHS = {
    "ada": "2024_to_april_2025_ada_data.csv",
    "btc": "2024_to_april_2025_btc_data.csv",
    "eth": "2024_to_april_2025_eth_data.csv",
    "sol": "2024_to_april_2025_solana_data.csv",
}

# Name of the regression target column inside each training CSV.
TARGET_COL = {
    "ada": "target_next_close_ada",
    "btc": "target_next_close_btc",
    "eth": "target_next_close_eth",
    "sol": "target_next_close_solana",
}

# Column-name prefix of each asset's OHLCV columns (e.g. "<prefix>_close").
# Note that the "sol" label maps to the longer "solana" prefix.
PREFIX = {
    "ada": "ada",
    "btc": "btc",
    "eth": "eth",
    "sol": "solana",
}


In [None]:

# --- Global temporary directory for dummy files ---
# Generated CSVs are written to a fresh temp directory; the current directory
# is used only as a last resort when that directory cannot be created.
try:
    _temp_dir_for_dummy_files = tempfile.mkdtemp(prefix="pydummy_ml_data_")
    print(f"INFO: Dummy files, if created, will be stored in: {_temp_dir_for_dummy_files}")
except Exception as tmp_dir_err:
    _temp_dir_for_dummy_files = "."
    print(f"WARNING: Could not create a global temporary directory ({tmp_dir_err}). Falling back to current directory for dummy files, which might fail if read-only.")


# --- Dummy file creation function (OSError Fix) ---
def create_dummy_csv_if_not_exists(asset_label, original_path_from_config, is_training, base_temp_dir, num_rows=500):
    """
    Return a usable CSV path for an asset, generating a dummy file when the real one is missing.

    If a file exists at ``original_path_from_config`` that path is returned
    unchanged. Otherwise a CSV of random one-minute OHLCV bars is written to
    ``base_temp_dir`` under the original file name and its path is returned.

    Args:
        asset_label: Key into ``PREFIX`` (and ``TARGET_COL`` for training files).
        original_path_from_config: Configured location of the real CSV.
        is_training: When true the dummy gets a ``timestamp`` column and the
            asset's target column; otherwise ``date_only`` / ``time_only`` columns.
        base_temp_dir: Directory that receives the dummy file.
        num_rows: Number of rows to generate (default 500, enough for the
            longest rolling window used by the feature engineering).

    Returns:
        The path of the existing file, or of the newly written dummy file.

    Raises:
        ValueError: If ``asset_label`` is missing from ``PREFIX`` / ``TARGET_COL``
            or ``num_rows`` is smaller than 1.
        OSError: If the dummy file cannot be written.
    """
    if os.path.exists(original_path_from_config):
        print(f"INFO: File found at {original_path_from_config}. Using existing file for asset '{asset_label}'.")
        return original_path_from_config

    original_filename = os.path.basename(original_path_from_config)
    dummy_file_path = os.path.join(base_temp_dir, original_filename)
    print(f"WARNING: File {original_path_from_config} for asset '{asset_label}' not found. Creating a dummy CSV at: {dummy_file_path}")

    if asset_label not in PREFIX:
        raise ValueError(f"Asset label '{asset_label}' not found in PREFIX. Cannot generate dummy columns.")
    if is_training and asset_label not in TARGET_COL:
         raise ValueError(f"Asset label '{asset_label}' not found in TARGET_COL. Cannot generate dummy target column.")
    if num_rows < 1:
        raise ValueError(f"num_rows must be at least 1 to generate dummy data, got {num_rows}.")

    asset_p = PREFIX[asset_label]
    dummy_data = {}

    # 'min' is the supported minute alias; the older 'T' spelling emits a
    # deprecation warning on recent pandas releases.
    if is_training:
        stamps = pd.date_range(start='2022-01-01', periods=num_rows, freq='min')
        dummy_data['timestamp'] = stamps.strftime('%Y-%m-%dT%H:%M:%S.%f')
    else:
        stamps = pd.date_range(start='2024-01-01', periods=num_rows, freq='min')
        dummy_data['date_only'] = stamps.strftime('%Y-%m-%d')
        dummy_data['time_only'] = stamps.strftime('%H:%M:%S')

    # Random but internally consistent bars: low <= open <= high and low <= close <= high.
    price_open = np.abs(np.random.normal(loc=100, scale=20, size=num_rows))
    price_open[0] = 100 # Start at a defined point
    price_open = np.maximum(price_open, 1) # Ensure positive prices

    # The offsets are non-negative, so high >= open and low <= open by construction.
    price_high = price_open + np.abs(np.random.normal(loc=2, scale=1, size=num_rows))
    price_low = price_open - np.abs(np.random.normal(loc=2, scale=1, size=num_rows))
    price_low = np.maximum(price_low, 0.1) # Ensure positive low (open >= 1 keeps low <= open)

    dummy_data[f'{asset_p}_open'] = price_open
    dummy_data[f'{asset_p}_high'] = price_high
    dummy_data[f'{asset_p}_low'] = price_low
    # Close price is within low and high
    dummy_data[f'{asset_p}_close'] = np.random.uniform(price_low, price_high)
    dummy_data[f'{asset_p}_volume'] = np.abs(np.random.normal(loc=1000, scale=500, size=num_rows)) + 100 # Ensure positive volume

    if is_training:
        # Target: the close perturbed by roughly 1% noise.
        target_col_name = TARGET_COL[asset_label]
        dummy_data[target_col_name] = dummy_data[f'{asset_p}_close'] * (1 + np.random.normal(loc=0, scale=0.01, size=num_rows))

    df = pd.DataFrame(dummy_data)
    try:
        # dirname is '' when base_temp_dir is empty; os.makedirs('') would raise.
        target_dir = os.path.dirname(dummy_file_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        df.to_csv(dummy_file_path, index=False)
        print(f"INFO: Successfully created dummy file for asset '{asset_label}': {dummy_file_path}")
        return dummy_file_path
    except OSError as e:
        print(f"CRITICAL ERROR: Could not write dummy CSV for asset '{asset_label}' to {dummy_file_path}. OS error: {e}")
        raise

# --- Paths to be used by the script (potentially updated to dummy file paths) ---
# Start from the configured paths; each entry is then replaced by whatever
# create_dummy_csv_if_not_exists returns (the real file or a generated dummy).
ACTUAL_TRAIN_PATHS = dict(INTENDED_TRAIN_PATHS)
ACTUAL_BACKTEST_PATHS = dict(INTENDED_BACKTEST_PATHS)

print("\n--- Checking for training data files and creating dummies if necessary ---")
for asset_key, original_path in INTENDED_TRAIN_PATHS.items():
    ACTUAL_TRAIN_PATHS[asset_key] = create_dummy_csv_if_not_exists(
        asset_key,
        original_path,
        is_training=True,
        base_temp_dir=_temp_dir_for_dummy_files,
    )

print("\n--- Checking for backtest data files and creating dummies if necessary ---")
for asset_key, original_path in INTENDED_BACKTEST_PATHS.items():
    ACTUAL_BACKTEST_PATHS[asset_key] = create_dummy_csv_if_not_exists(
        asset_key,
        original_path,
        is_training=False,
        base_temp_dir=_temp_dir_for_dummy_files,
    )

# --- Data Loading Functions ---
def load_training(asset):
    """Read an asset's training CSV and return it sorted by a parsed ``ts`` column.

    The ``timestamp`` text column is parsed (non-strictly) into a Polars
    ``Datetime`` column named ``ts``; the original columns are kept.
    """
    csv_path = ACTUAL_TRAIN_PATHS[asset]
    print(f"Loading training data for {asset} from: {csv_path}")

    frame = pl.read_csv(
        csv_path,
        infer_schema_length=100_000,
        truncate_ragged_lines=True,  # tolerate rows with more fields than the header
    )
    parsed_ts = pl.col("timestamp").str.strptime(
        pl.Datetime, format="%Y-%m-%dT%H:%M:%S%.f", strict=False
    )
    return frame.with_columns(parsed_ts.alias("ts")).sort("ts")

def load_backtest(asset):
    """Read an asset's backtest CSV and return it sorted by a parsed ``ts`` column.

    ``date_only`` and ``time_only`` are joined with a space and parsed
    (non-strictly) into a Polars ``Datetime`` column named ``ts``.
    """
    csv_path = ACTUAL_BACKTEST_PATHS[asset]
    print(f"Loading backtest data for {asset} from: {csv_path}")

    frame = pl.read_csv(
        csv_path,
        infer_schema_length=100_000,
        truncate_ragged_lines=True,  # tolerate rows with more fields than the header
    )
    date_and_time = pl.concat_str([pl.col("date_only"), pl.lit(" "), pl.col("time_only")])
    parsed_ts = date_and_time.str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S", strict=False)
    return frame.with_columns(parsed_ts.alias("ts")).sort("ts")

print("\n--- Loading data ---")
# One Polars frame per asset label, for the training and the hold-out period.
raw_train = {asset_label: load_training(asset_label) for asset_label in ACTUAL_TRAIN_PATHS}
raw_bt = {asset_label: load_backtest(asset_label) for asset_label in ACTUAL_BACKTEST_PATHS}


# --- Feature Engineering ---
def engineer_features(df: pl.DataFrame, asset: str, lags: int = 5) -> pl.DataFrame:
    """Derive technical-analysis, rolling-window and lag features from an asset's OHLCV columns.

    Args:
        df: Raw frame containing the asset's price columns, named either
            ``"<prefix>_<field>"`` (prefix from ``PREFIX[asset]``) or plainly
            ``open`` / ``high`` / ``low`` / ``close`` / ``volume``.
        asset: Key into ``PREFIX``.
        lags: Number of lagged ``close`` and ``rsi14`` columns to add.

    Returns:
        A frame holding only engineered columns (the OHLCV inputs are dropped):
        ``rsi14, stochk, roc10, macd, bb_w, atr14, obv``, then ``sma{w}``,
        ``vol{w}``, ``hl_range{w}`` for w in (20, 60, 120), then
        ``close_lag{k}`` / ``rsi_lag{k}`` for k in 1..lags. No rows are dropped
        here, so the leading nulls produced by the shifts remain. An empty
        input yields an empty frame.

    Raises:
        ValueError: If high, low or close cannot be located in ``df``.
    """
    if df.is_empty():
        print(f"Warning: Input DataFrame for engineer_features for asset '{asset}' is empty. Returning empty DataFrame.")
        return pl.DataFrame()

    pref = PREFIX[asset]
    rename_map_for_ta = {} # This will map original column names to "open", "high", "low", "close", "volume"

    for col_type in ["open", "high", "low", "close", "volume"]:
        prefixed_name = f"{pref}_{col_type}"
        if prefixed_name in df.columns:
            rename_map_for_ta[prefixed_name] = col_type
        elif col_type in df.columns: # Fallback to generic name
            rename_map_for_ta[col_type] = col_type

    if not all(ct in rename_map_for_ta.values() for ct in ["high", "low", "close"]):
        raise ValueError(f"Asset {asset}: Could not find required HLC columns for TA. Found mappings: {rename_map_for_ta}. Available df columns: {df.columns}")

    # The `ta` indicators operate on pandas Series, so do that part in pandas.
    base = df.select(list(rename_map_for_ta.keys())).rename(rename_map_for_ta)
    p = base.to_pandas()

    for col_name in ["open", "high", "low", "close", "volume"]:
        if col_name in p.columns:
            p[col_name] = pd.to_numeric(p[col_name], errors='coerce')

    # ffill()/bfill() replace the deprecated fillna(method=...) spelling.
    p.ffill(inplace=True) # Fill NaNs before TA
    p.bfill(inplace=True) # Fill remaining NaNs at the beginning

    if p["close"].isnull().all() or p["high"].isnull().all() or p["low"].isnull().all():
        print(f"Warning: Asset {asset} - HLC columns are all NaN after processing. TA features will be NaN.")
        ta_feature_names = ["rsi14", "stochk", "roc10", "macd", "bb_w", "atr14", "obv"]
        for ta_f in ta_feature_names: p[ta_f] = np.nan
    else:
        p["rsi14"]  = ta.momentum.RSIIndicator(p["close"], 14, fillna=True).rsi()
        p["stochk"] = ta.momentum.StochasticOscillator(p["high"], p["low"], p["close"], 14, fillna=True).stoch()
        p["roc10"]  = ta.momentum.ROCIndicator(p["close"], 10, fillna=True).roc()
        p["macd"]   = ta.trend.MACD(p["close"], fillna=True).macd()
        p["bb_w"]   = ta.volatility.BollingerBands(p["close"], fillna=True).bollinger_wband()
        p["atr14"]  = ta.volatility.AverageTrueRange(p["high"], p["low"], p["close"], 14, fillna=True).average_true_range()
        # OBV needs a usable volume series; otherwise emit a constant column so
        # the feature set keeps the same shape.
        if "volume" in p.columns and not p["volume"].isnull().all() and (p["volume"] != 0).any() :
            p["obv"] = ta.volume.OnBalanceVolumeIndicator(p["close"], p["volume"], fillna=True).on_balance_volume()
        else:
            p["obv"] = 0.0

    f = pl.from_pandas(p)
    cols_to_drop_after_ta = list(rename_map_for_ta.values())

    # Rolling statistics; a window is considered valid once half of it is filled.
    for w in (20, 60, 120):
        f = f.with_columns([
            pl.col("close").rolling_mean(w, min_periods=max(1, w//2)).alias(f"sma{w}"),
            pl.col("close").rolling_std(w, min_periods=max(1, w//2)).alias(f"vol{w}"),
            (pl.col("high") - pl.col("low")).rolling_mean(w, min_periods=max(1, w//2)).alias(f"hl_range{w}")
        ])
    for l_val in range(1, lags + 1):
        f = f.with_columns([
            pl.col("close").shift(l_val).alias(f"close_lag{l_val}"),
            pl.col("rsi14").shift(l_val).alias(f"rsi_lag{l_val}")
        ])
    return f.drop(cols_to_drop_after_ta)

# --- Verify counts ---
# Smoke test: run the feature engineering once per asset and report how many
# columns it produced. Failures are logged and do not stop the loop.
print("\n--- Verifying feature counts ---")
for asset_key in ACTUAL_TRAIN_PATHS:
    if asset_key not in raw_train or raw_train[asset_key].is_empty():
        print(f"{asset_key}: Raw training data is empty or not loaded for feature count verification.")
        continue

    try:
        engineered_df = engineer_features(raw_train[asset_key], asset_key)
        if engineered_df.is_empty():
            print(f"{asset_key}: Feature engineering resulted in an empty DataFrame.")
        else:
            feat_cols = engineered_df.columns
            print(f"{asset_key}: {len(feat_cols)} engineered vars → {feat_cols[:8]}")
    except Exception as e_eng:
        print(f"Error engineering features for {asset_key}: {e_eng}")
        traceback.print_exc()


# --- Build X / y ---
def build_matrix(asset: str):
    """Assemble the training design matrix and target vector for one asset.

    Engineered features are combined row-by-row with the asset's target
    column, rows containing any null are dropped, and the result is split
    into NumPy arrays.

    Returns:
        ``(X, y)``. When no usable rows exist, ``X`` has zero rows and ``y``
        is an empty 1-D array.

    Raises:
        ValueError: If the target column is missing from the raw training frame.
    """
    if asset not in raw_train or raw_train[asset].is_empty():
        print(f"Warning: Asset {asset} - Raw training data is empty for build_matrix.")
        return np.array([]).reshape(0,0), np.array([])

    source_frame = raw_train[asset]
    feats = engineer_features(source_frame, asset)
    if feats.is_empty():
        print(f"Warning: Asset {asset} - Features DataFrame is empty before combining with target.")
        return np.array([]).reshape(0,0), np.array([])

    target_column_name = TARGET_COL[asset]
    if target_column_name not in source_frame.columns:
        raise ValueError(f"Target column '{target_column_name}' not found in raw_train['{asset}']. Available: {raw_train[asset].columns}")

    num_feat_cols = feats.width

    # The target is attached positionally, which relies on engineer_features
    # returning one row per input row in the original order. Dropping nulls
    # afterwards removes the warm-up rows of the lag/rolling features together
    # with their targets, so X and y stay aligned.
    target_series = source_frame[target_column_name]
    mat = feats.with_columns(target_series.alias(target_column_name)).drop_nulls()

    if mat.is_empty():
        print(f"Warning: Asset {asset} - DataFrame is empty after combining features, target, and drop_nulls().")
        return np.array([]).reshape(0, num_feat_cols), np.array([])

    feature_part = mat.drop(target_column_name)
    target_part = mat[target_column_name]
    return feature_part.to_numpy(), target_part.to_numpy()

print("\n--- Building datasets (X, y matrices) ---")
datasets = {asset_name: build_matrix(asset_name) for asset_name in ACTUAL_TRAIN_PATHS.keys()}

for asset_name, (X_mat, y_vec) in datasets.items():
    n_features = X_mat.shape[1] if X_mat.ndim == 2 and X_mat.shape[0] > 0 else 0
    print(f"{asset_name}: {n_features} features  |  samples {X_mat.shape[0]}")

# Sample counts of the datasets that actually contain rows.
lengths = {}
for asset_name, (X_mat, y_vec) in datasets.items():
    if X_mat.ndim == 2 and X_mat.shape[0] > 0 and y_vec.ndim == 1 and y_vec.shape[0] > 0:
        lengths[asset_name] = y_vec.shape[0]

if lengths:
    min_n = min(lengths.values())
else:
    print("CRITICAL: All datasets are empty or have zero length after build_matrix. Cannot proceed with trimming or training.")
    min_n = 0

print("Samples before trim (from valid datasets):", lengths)
print(f"→ Trimming all to {min_n} rows (smallest non-empty dataset's length)")

# Keep the most recent min_n rows of every non-empty dataset; anything else
# (empty or malformed) is passed through untouched.
datasets_even = {}
for asset_name, (X_mat, y_vec) in datasets.items():
    trimmable = (
        X_mat.ndim == 2 and X_mat.shape[0] > 0
        and y_vec.ndim == 1 and y_vec.shape[0] > 0
        and X_mat.shape[0] >= min_n and min_n > 0
    )
    datasets_even[asset_name] = (X_mat[-min_n:], y_vec[-min_n:]) if trimmable else (X_mat, y_vec)

for asset_name, (X_mat, y_vec) in datasets_even.items():
    n_features = X_mat.shape[1] if X_mat.ndim==2 and X_mat.shape[0]>0 else 0
    print(f"{asset_name}: {X_mat.shape[0]} samples, {n_features} features after trimming attempt")

datasets = datasets_even


Model Tuning

In [None]:

# ───────────────────────────────────────── 2. Model Tuning ─────────────────────────────────────────
print("\n--- Model Tuning Setup ---")

class PurgedKFold(KFold):
    """Unshuffled K-fold splitter that trains only on samples preceding each test fold.

    For every fold produced by ``KFold``, training indices are kept only if
    they lie more than ``embargo`` positions before the first test index, so
    samples at or after the test fold, and the ``embargo`` samples just before
    it, are discarded. Folds whose purged training set is empty are skipped,
    so fewer than ``n_splits`` folds may be yielded.
    """

    def __init__(self, n_splits=8, embargo=10):
        super().__init__(n_splits=n_splits, shuffle=False)
        self.embargo = embargo

    def split(self, X, y=None, groups=None):
        """Yield ``(train_indices, test_indices)`` pairs; yields nothing if X has fewer rows than splits."""
        has_array_api = hasattr(X, 'shape') and hasattr(X, '__len__')
        samples = X if has_array_api else np.asarray(X)

        if len(samples) < self.get_n_splits(samples, y, groups):
            return

        for train_idx, test_idx in super().split(samples, y, groups):
            cutoff = test_idx[0] - self.embargo
            kept_train_idx = train_idx[train_idx < cutoff]
            if len(kept_train_idx) > 0:
                yield kept_train_idx, test_idx

def tune_model(model_cls, model_name: str, param_space, X, y, cv, gpu_params=None):
    """Tune ``model_cls`` with Optuna and return an unfitted model using the best parameters.

    Each trial samples hyper-parameters from ``param_space``, merges them with
    the fixed ``gpu_params`` and scores the model by cross-validated mean
    absolute error (lower is better). The study runs up to 30 trials or 600
    seconds.

    Args:
        model_cls: Estimator class, instantiated as ``model_cls(**params, random_state=42)``.
        model_name: Label used in log messages.
        param_space: Mapping of parameter name to a search spec:
            a list (categorical choice), a ``(low, high)`` tuple (uniform
            float) or a ``(low, high, 'log')`` tuple (log-uniform float).
            Any other spec is reported and left unset.
        X, y: Training data passed to ``cross_val_score``.
        cv: Cross-validation splitter passed to ``cross_val_score``.
        gpu_params: Fixed keyword arguments added to every model; they
            override sampled values with the same name.

    Returns:
        An unfitted ``model_cls`` instance. If no trial completed (every trial
        failed or returned NaN), the model is built from ``gpu_params`` and the
        estimator's defaults only.
    """
    if gpu_params is None: gpu_params = {}

    def _with_fixed_params(sampled):
        """Merge sampled parameters with gpu_params and apply CatBoost-specific fixes."""
        merged = {**sampled, **gpu_params}
        if model_cls is CatBoostRegressor:
            merged['verbose'] = 0
            if merged.get('depth') is not None: merged['depth'] = int(merged['depth'])
        return merged

    def objective(trial):
        params = {}
        for k, v_config in param_space.items():
            if isinstance(v_config, list): params[k] = trial.suggest_categorical(k, v_config)
            elif isinstance(v_config, tuple) and len(v_config) == 3 and v_config[2] == 'log': params[k] = trial.suggest_float(k, v_config[0], v_config[1], log=True)
            elif isinstance(v_config, tuple) and len(v_config) == 2: params[k] = trial.suggest_float(k, v_config[0], v_config[1])
            else: print(f"Warning: Unsupported param_space config for {k}: {v_config} in {model_name}.")

        current_params = _with_fixed_params(params)

        model = model_cls(**current_params, random_state=42)
        try:
            scores = cross_val_score(model, X, y, cv=cv, scoring='neg_mean_absolute_error', error_score='raise')
        except Exception as e_cv_score:
            # A failing configuration is reported and scored as NaN rather than aborting the study.
            print(f"Debug: cross_val_score failed for {model_name} with params {current_params}. Error: {e_cv_score}")
            return np.nan
        if len(scores) == 0 or np.all(np.isnan(scores)): return np.nan
        return -np.nanmean(scores)  # scores are negated MAE; flip the sign to minimise MAE

    study = optuna.create_study(direction='minimize', sampler=optuna.samplers.TPESampler(seed=42, multivariate=True))
    # Reduced n_trials for faster example, increase for real tuning
    study.optimize(objective, n_trials=30, timeout=600, show_progress_bar=True)

    try:
        tuned_params = study.best_params
    except ValueError:
        # Optuna raises ValueError when no trial completed, e.g. every trial
        # returned NaN. Fall back to defaults instead of crashing.
        print(f"Warning: No successful Optuna trial for {model_name}. Using default hyper-parameters.")
        tuned_params = {}

    return model_cls(**_with_fixed_params(tuned_params), random_state=42)


def _build_default_model(model_cls, param_space, gpu_params):
    """Return an unfitted ``model_cls`` configured with fallback parameters.

    The fixed device parameters are copied (never mutated) and completed with:
      * ``verbose=0`` for CatBoost,
      * an ensemble size (``iterations`` for CatBoost, ``n_estimators`` for
        XGBoost / LightGBM / RandomForest) taken from the first option listed in
        ``param_space`` when present, otherwise 100.

    Both fallback paths of ``fit_models_on_gpu`` use this helper so they cannot
    drift apart.
    """
    params = dict(gpu_params)

    if model_cls is CatBoostRegressor:
        params['verbose'] = 0
        size_key = 'iterations'  # iterations is n_estimators for CatBoost
    elif model_cls in (XGBRegressor, lgb.LGBMRegressor, RandomForestRegressor):
        size_key = 'n_estimators'
    else:
        size_key = None

    if size_key is not None and size_key not in params:
        options = param_space.get(size_key)
        params[size_key] = options[0] if options else 100  # 100 is a general default

    return model_cls(**params, random_state=42)


def fit_models_on_gpu(X, y, asset_name_logging=""):
    """Tune and fit the model zoo (LightGBM, CatBoost, XGBoost, RandomForest) on ``X``/``y``.

    Each model is tuned through ``tune_model`` using a ``PurgedKFold`` splitter and
    then refitted on the full data. If the splitter yields no splits, tuning is
    skipped and every model is fitted with fallback parameters instead. If tuning
    or the final fit of a single model raises, that model alone is retried with
    fallback parameters.

    Args:
        X: Feature matrix; only ``X.shape[0]`` is inspected here, the rest is
            passed through to the splitter and the estimators.
        y: Target values aligned with ``X``.
        asset_name_logging: Label used in progress/error messages only.

    Returns:
        dict mapping model name (e.g. ``"LGBM_GPU"``) to a fitted estimator.
        Models whose every fitting attempt failed are omitted; the dict may be empty.
    """
    cv = PurgedKFold(n_splits=8, embargo=10)  # n_splits=5 for smaller datasets if needed
    num_potential_splits = 0
    if X.shape[0] >= cv.get_n_splits():
        # Count the folds the splitter actually yields for this data.
        num_potential_splits = sum(1 for _ in cv.split(X, y))

    # GPU parameters
    # NOTE(review): XGBoost >= 2.0 documents `device='cuda'` + `tree_method='hist'`
    # in place of `gpu_hist`/`gpu_id` — confirm against the installed version.
    xgb_gpu_params = {'tree_method': 'gpu_hist', 'gpu_id': 0}
    # Check platform/device IDs if multiple GPUs/platforms
    lgbm_gpu_params = {'device': 'gpu', 'gpu_platform_id': 0, 'gpu_device_id': 0}
    catboost_gpu_params = {'task_type': 'GPU', 'devices': '0'}

    # name -> (estimator class, Optuna search space, fixed device parameters)
    model_configs = {
        "LGBM_GPU": (lgb.LGBMRegressor, {
            "num_leaves": [31, 63, 127], "learning_rate": (1e-3, 0.2, 'log'),
            "max_depth": [-1, 6, 12], "n_estimators": [100, 200, 400]
        }, lgbm_gpu_params),
        "CatBoost_GPU": (CatBoostRegressor, {
            "depth": [4, 6, 8], "learning_rate": (1e-3, 0.2, 'log'),
            "iterations": [100, 200, 400]  # iterations is n_estimators for CatBoost
        }, catboost_gpu_params),
        "XGB_GPU": (XGBRegressor, {
            "n_estimators": [100, 200, 400], "learning_rate": (0.01, 0.2, 'log'),
            "max_depth": [3, 6, 9]
        }, xgb_gpu_params),
        "RF_CPU": (RandomForestRegressor, {  # RandomForest remains on CPU
            "n_estimators": [100, 200, 300], "max_depth": [8, 16, None]
        }, {}),  # Empty dict for gpu_params for RF
    }

    if num_potential_splits == 0:
        print(f"Warning [{asset_name_logging}]: PurgedKFold yields no valid splits. Fitting models with default parameters (GPU where applicable).")
        fitted_models_fallback = {}
        for name, (model_cls, param_space, current_gpu_params) in model_configs.items():
            print(f"  Fitting fallback {name} for asset {asset_name_logging}...")
            try:
                model_instance = _build_default_model(model_cls, param_space, current_gpu_params)
                model_instance.fit(X, y)
                fitted_models_fallback[name] = model_instance
            except Exception as e_fit_fb:
                # Best effort: one failing model must not prevent the others from fitting.
                print(f"    ✗ Error fitting fallback {name} for asset {asset_name_logging}: {e_fit_fb}")
                traceback.print_exc()
        return fitted_models_fallback

    fitted_models = {}
    for name, (model_cls, param_space, current_gpu_params) in model_configs.items():
        print(f"  Tuning {name} for asset {asset_name_logging}...")
        try:
            # tune_model returns an unfitted model with best_params (including GPU params)
            tuned_model_instance = tune_model(model_cls, name, param_space, X, y, cv, gpu_params=current_gpu_params)
            print(f"    Final fitting for {name} on asset {asset_name_logging} with params: {tuned_model_instance.get_params()}")
            tuned_model_instance.fit(X, y)  # Fit on the full dataset
            fitted_models[name] = tuned_model_instance
            print(f"    ✓ Tuned and fitted {name} for asset {asset_name_logging}.")
        except Exception as e_tune_fit:
            print(f"    ✗ Error during tuning or final fit for {name} on asset {asset_name_logging}: {type(e_tune_fit).__name__} - {e_tune_fit}")
            traceback.print_exc()
            print(f"    Attempting to fit {name} with default parameters (and GPU where applicable) for asset {asset_name_logging}.")
            try:
                default_model = _build_default_model(model_cls, param_space, current_gpu_params)
                default_model.fit(X, y)
                fitted_models[name] = default_model
                print(f"    ✓ Fitted {name} with default parameters for asset {asset_name_logging}.")
            except Exception as e_def_fit:
                print(f"      ✗ Failed default fit {name} for {asset_name_logging}: {e_def_fit}")
                traceback.print_exc()
    return fitted_models


Training per asset

In [None]:

# ──────────────── Train per Asset (Using GPU function) ────────────────
# Builds `asset_models`: asset key -> {model name: fitted model}. Every asset in
# `datasets` gets an entry; an empty dict marks an asset for which nothing was fitted.
print("\n--- Training models per asset (GPU where applicable) ---")
asset_models = {}
for a in datasets.keys():
    print(f"\n--- Processing asset for training: {a} ---")
    try:
        # The entry has to be an (X, y) 2-tuple before it can be unpacked.
        if not (a in datasets and isinstance(datasets[a], tuple) and len(datasets[a]) == 2):
            print(f"  ✗ {a} failed: Dataset not found correctly in 'datasets' dict. Skipping.")
            asset_models[a] = {}
            continue

        X_current, y_current = datasets[a]

        # X must be a non-empty 2-D NumPy array.
        if not (isinstance(X_current, np.ndarray) and X_current.ndim == 2 and X_current.shape[0] != 0):
            print(f"  ✗ {a} failed: X data is empty or invalid. Shape: {getattr(X_current, 'shape', 'N/A')}. Skipping.")
            asset_models[a] = {}
            continue

        # y must be a non-empty 1-D NumPy array.
        if not (isinstance(y_current, np.ndarray) and y_current.ndim == 1 and y_current.shape[0] != 0):
            print(f"  ✗ {a} failed: y data is empty or invalid. Shape: {getattr(y_current, 'shape', 'N/A')}. Skipping.")
            asset_models[a] = {}
            continue

        print(f"Fitting models for {a} … (X shape: {X_current.shape}, y shape: {y_current.shape})")
        asset_models[a] = fit_models_on_gpu(X_current, y_current, asset_name_logging=a)  # Call GPU version

        if not asset_models.get(a):
            print(f"  ✗ {a} processing done, but no models were successfully fitted.")
            asset_models[a] = {}
        else:
            print(f"  ✓ {a} processing done. Models: {list(asset_models[a].keys())}")
    except Exception as e_asset_loop:
        # Keep going with the remaining assets; record this one as having no models.
        print(f"  ✗ {a} failed in main asset loop → {type(e_asset_loop).__name__}: {e_asset_loop}")
        traceback.print_exc()
        asset_models[a] = {}

print("\n=== Trained Models Overview: ===")
if asset_models:
    all_empty = True
    for asset_k, models_v in asset_models.items():
        if not (models_v and isinstance(models_v, dict) and models_v.keys()):
            print(f"  Asset {asset_k}: No models successfully fitted.")
            continue
        print(f"  Asset {asset_k}: Models - {list(models_v.keys())}")
        all_empty = False
    if all_empty:
        print("No models were successfully trained for any asset.")
else:
    print("No assets processed or no models trained.")



Backtesting

In [None]:

# ───────────────────────────────── Backtesting Section ─────────────────────────────────
print("\n--- Preparing for Backtesting ---")
# Rich table rendering when IPython is installed; plain print otherwise.
try:
    from IPython.display import display
except ImportError:
    display = print

def prepare_live_df(asset: str) -> pd.DataFrame:
    """Assemble the hold-out frame handed to Backtesting.py for ``asset``.

    Steps: run ``engineer_features`` on ``raw_bt[asset]``, keep the last
    ``len(features)`` raw rows, rename the price/volume columns to
    ``Open``/``High``/``Low``/``Close``/``Volume``, copy every feature column
    onto the frame, and index by the parsed ``ts`` column when it exists.

    Returns an empty DataFrame when the raw data or the engineered features are
    empty, or when every OHLC value is NaN after numeric coercion.

    Raises:
        ValueError: if any of Open/High/Low/Close is absent after renaming.

    NOTE(review): the tail alignment assumes feature engineering only drops
    leading rows — confirm against ``engineer_features``.
    """
    print(f"\nPreparing live dataframe for asset: {asset}")
    if asset not in raw_bt or raw_bt[asset].is_empty():
        print(f"Critical error: Raw backtest data for asset {asset} is missing or empty.")
        return pd.DataFrame()

    engineered = engineer_features(raw_bt[asset], asset)
    if engineered.is_empty():
        # An empty frame signals the problem clearly to the backtest loop.
        print(f"Warning: Feature engineering for backtest data of asset {asset} resulted in an empty DataFrame. Backtest may fail or be meaningless.")
        return pd.DataFrame()

    feature_frame = engineered.to_pandas().reset_index(drop=True)
    raw_frame = raw_bt[asset].to_pandas()
    if raw_frame.empty:
        print(f"Critical Error: Raw backtest data for {asset} empty.")
        return pd.DataFrame()

    have_features = not feature_frame.empty
    if have_features:
        # Features may be shorter than the raw data; keep the matching tail.
        aligned = raw_frame.iloc[-len(feature_frame):].copy().reset_index(drop=True)
    else:
        # Should be rare given the emptiness check on the engineered frame above.
        print(f"Warning: Features DataFrame is empty for {asset} during alignment. Using full live data for OHLCV, but features will be missing.")
        aligned = raw_frame.copy().reset_index(drop=True)

    # Prefer the asset-prefixed column (e.g. "<prefix>_close"); else the generic lowercase one.
    pref = PREFIX[asset]
    rename_map = {}
    for std_name in ("Open", "High", "Low", "Close", "Volume"):
        generic_name = std_name.lower()
        prefixed_name = f"{pref}_{generic_name}"
        if prefixed_name in aligned.columns:
            rename_map[prefixed_name] = std_name
        elif generic_name in aligned.columns:
            rename_map[generic_name] = std_name

    final_df = aligned.rename(columns=rename_map).copy()

    if have_features:
        standard_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
        for col in feature_frame.columns:
            # Every feature column is assigned; the warning is only printed for
            # clashes with columns other than the standard OHLCV names.
            if col in final_df.columns and col not in standard_cols:
                print(f"Warning: Feature col '{col}' for {asset} clashes. Overwriting.")
            final_df[col] = feature_frame[col].values

    if 'ts' not in final_df.columns:
        print(f"Warning: 'ts' col not for DatetimeIndex in {asset} backtest data.")
    else:
        final_df['timestamp_dt'] = pd.to_datetime(final_df['ts'])
        final_df = final_df.set_index('timestamp_dt', drop=True)

    required_bt_cols = ["Open", "High", "Low", "Close"]
    missing_bt_cols = [name for name in required_bt_cols if name not in final_df.columns]
    if missing_bt_cols:
        raise ValueError(f"Asset {asset}: Missing OHLC for Backtesting.py: {missing_bt_cols}. Cols: {final_df.columns.tolist()}")

    # Coerce OHLC to numeric; unparseable entries become NaN.
    for name in required_bt_cols:
        final_df[name] = pd.to_numeric(final_df[name], errors='coerce')

    all_ohlc_missing = final_df[required_bt_cols].isnull().all().all()
    if all_ohlc_missing:
        print(f"Critical: All OHLC data for {asset} is NaN. Returning empty DataFrame.")
        return pd.DataFrame()

    print(f"Prepared live df for {asset}. Shape: {final_df.shape}, Index: {type(final_df.index)}")
    return final_df

def build_meta_strategy(asset):
    """Build a Backtesting.py ``Strategy`` class that trades an equal-weight model vote.

    The returned class predicts the next close with every fitted model stored in
    ``asset_models[asset]``, converts each prediction into a return relative to
    the latest close, averages those returns with equal weights, and goes long
    (short) when the average is above (below) the trade threshold.

    Feature names are resolved once per strategy class: names recorded by the
    first model are used only if every one of them is a column of the backtest
    data; otherwise all columns except OHLCV/timestamp columns are used.

    Args:
        asset: Key into the module-level ``asset_models`` dict.

    Returns:
        The ``MetaVoteStrategy`` class, or ``None`` when the asset has no usable models.
    """
    if not asset_models.get(asset) or not isinstance(asset_models[asset], dict) or not asset_models[asset].keys():
        print(f"Warning: No models for {asset}. Cannot build strategy."); return None

    # Filter out any models that might be None if fitting failed for them
    valid_models = {name: model for name, model in asset_models[asset].items() if model is not None}
    if not valid_models:
        print(f"Warning: No valid (non-None) models found for asset {asset} after filtering. Cannot build strategy."); return None

    weights = {k: 1 / len(valid_models) for k in valid_models}
    non_feature_cols = {'Open', 'High', 'Low', 'Close', 'Volume', 'ts', 'timestamp_dt'}

    def _names_from_model(model):
        """Best-effort read of the feature names a fitted model recorded, or None."""
        try:
            if hasattr(model, 'feature_name_'):  # LightGBM
                names = model.feature_name_
                # `feature_name_` is a property on the sklearn wrapper, not a method;
                # calling it unconditionally raises TypeError on the returned list.
                if callable(names):
                    names = names()
            elif hasattr(model, 'feature_names_in_'):  # scikit-learn style
                names = model.feature_names_in_
            elif hasattr(model, 'get_booster'):  # XGBoost
                names = getattr(model.get_booster(), 'feature_names', None)
            else:
                names = None
        except Exception as e_names:
            print(f"Warning: Could not read feature names from model: {e_names}")
            return None
        # Normalise to a plain list: NumPy arrays have no unambiguous truth value.
        return None if names is None else list(names)

    def _resolve_feature_names(models, data_columns):
        """Pick the feature columns to feed the models; [] if the data has no columns yet."""
        columns = list(data_columns)
        if not columns:
            return []
        available = set(columns)
        first_model = next(iter(models.values()))
        model_names = _names_from_model(first_model)
        if model_names:
            missing = [n for n in model_names if n not in available]
            if not missing:
                return model_names
            # Presumably auto-generated names from a model fitted on a bare array;
            # using them would raise KeyError on every bar.
            print(f"Warning: {len(missing)} model feature name(s) not found in backtest data "
                  f"(e.g. {missing[0]!r}). Falling back to non-OHLCV columns.")
        names = [col for col in columns if col not in non_feature_cols]
        expected = getattr(first_model, 'n_features_in_', None)
        if expected is not None and names and expected != len(names):
            print(f"Warning: Model expects {expected} features but {len(names)} candidate "
                  f"feature columns were found; predictions may fail.")
        return names

    class MetaVoteStrategy(Strategy):
        # Closure variables from outer scope
        _models_to_use_cl = valid_models
        _model_weights_cl = weights
        # 0.00 trades on any non-zero signal; e.g. 0.001 requires a 0.1% predicted return
        _trade_threshold_cl = 0.00
        _feature_names_list_cl = None  # resolved lazily, cached on the class

        def init(self):
            self.models_to_use = self._models_to_use_cl
            self.model_weights = self._model_weights_cl
            self.trade_threshold = self._trade_threshold_cl
            self.feature_names_list_ = self._lookup_feature_names()
            if not self.feature_names_list_:
                print("Warning: Could not determine feature names at init in MetaVoteStrategy.")

        def _lookup_feature_names(self):
            """Return the cached feature names, resolving them from the data if needed."""
            cached = MetaVoteStrategy._feature_names_list_cl
            if cached:
                return cached
            if not self.models_to_use:
                return cached
            try:
                data_columns = list(self.data.df.columns)
            except AttributeError:
                data_columns = []
            resolved = _resolve_feature_names(self.models_to_use, data_columns)
            if resolved:
                MetaVoteStrategy._feature_names_list_cl = resolved
            return resolved

        def _pred(self):
            """Return the weighted mean predicted return for the latest bar (0 = neutral)."""
            if not self.feature_names_list_:
                # Retry in case the data frame was not ready during init.
                self.feature_names_list_ = self._lookup_feature_names()
                if not self.feature_names_list_:
                    print("CRITICAL Error: Still cannot determine feature names for prediction in MetaVoteStrategy _pred.")
                    return 0

            # Read the frame once per bar instead of once per feature.
            frame = self.data.df
            try:
                current_feature_values = [frame[fn].iloc[-1] for fn in self.feature_names_list_]
            except KeyError as e: print(f"KeyError in _pred for feature {e}. Available: {frame.columns.tolist()}"); return 0
            except IndexError: print("IndexError in _pred, likely empty data slice."); return 0

            try:
                feature_array = np.asarray(current_feature_values, dtype=float).reshape(1, -1)
            except (TypeError, ValueError) as e_cast:
                print(f"Non-numeric feature value in _pred: {e_cast}"); return 0
            if np.isnan(feature_array).any(): return 0

            last_price = self.data.Close[-1]
            preds = {}
            for n, m in self.models_to_use.items():
                try: preds[n] = m.predict(feature_array)[0]
                except Exception as e_p: print(f"Err predicting {n}: {e_p}. Assuming no change."); preds[n] = last_price

            if last_price == 0 or np.isnan(last_price): return 0

            rets = {n: (p - last_price) / last_price if not np.isnan(p) else 0 for n, p in preds.items()}
            return sum(self.model_weights[n] * rets[n] for n in rets)

        def next(self):
            if not self.models_to_use: return
            # Heuristic buffer for TA stability in features.
            # If features are precomputed and NaNs handled, this might be less critical.
            if len(self.data.Close) < max(60, lags_from_config if 'lags_from_config' in globals() else 5) + 5:
                return
            signal = self._pred()
            if np.isnan(signal): signal = 0  # Handle NaN signal as neutral

            if signal > self.trade_threshold and not self.position.is_long:
                self.position.close(); self.buy()
            elif signal < -self.trade_threshold and not self.position.is_short:
                self.position.close(); self.sell()
    return MetaVoteStrategy


Backtesting per asset

In [None]:

# ---------- run back‑test per asset ----------
print("\n--- Running Backtests ---")
stats = {}
equity_curves = {}
# Only assets for which training was attempted (keys of asset_models) are
# considered, so we never backtest an asset that cannot have models.
assets_to_backtest = list(asset_models.keys())

for asset_key_bt in assets_to_backtest:
    print(f"\n--- Running Backtest for: {asset_key_bt} ---")
    # Re-check that at least one truthy model exists for this asset.
    if not (asset_models.get(asset_key_bt) and any(asset_models[asset_key_bt].values())):
        print(f"Skipping backtest for {asset_key_bt}: No valid models were trained or found.")
        continue

    try:
        live_df_prepared = prepare_live_df(asset_key_bt)
        if live_df_prepared.empty or not isinstance(live_df_prepared.index, pd.DatetimeIndex):
            print(f"Skipping backtest for {asset_key_bt}: Live DataFrame unusable.")
            continue
        if live_df_prepared[['Open', 'High', 'Low', 'Close']].isnull().values.any():
            print(f"Skipping backtest for {asset_key_bt}: Live DataFrame OHLC NaNs.")
            continue

        StratBuilder = build_meta_strategy(asset_key_bt)
        if StratBuilder is None:
            print(f"Skipping {asset_key_bt}: Strategy not built.")
            continue

        bt_instance = Backtest(
            live_df_prepared,
            StratBuilder,
            cash=1_000_000,
            commission=0.0015,
            exclusive_orders=False,
        )
        s_result = bt_instance.run()
        stats[asset_key_bt] = s_result
        if '_equity_curve' in s_result and not s_result['_equity_curve'].empty:
            equity_curves[asset_key_bt] = s_result['_equity_curve']['Equity']
        # Optional plot: bt_instance.plot(filename=f"backtest_{asset_key_bt}.html", open_browser=False)
    except Exception as e_bt_general:
        # One failing asset must not stop the remaining backtests.
        print(f"Error during backtest for {asset_key_bt}: {type(e_bt_general).__name__} - {e_bt_general}")
        traceback.print_exc()

# ---------- summary table & equity curves ----------
if not stats:
    print("\nNo backtesting statistics were generated.")
else:
    summary_data = {}
    for a, s_val in stats.items():
        if not hasattr(s_val, 'get'):  # Only Series/dict-like results can be summarised
            continue
        win_rate_pct = s_val.get('Win Rate [%]')
        exposure_pct = s_val.get('Exposure Time [%]')
        summary_data[a] = {
            "APR": s_val.get('Return (Ann.) [%]', np.nan),
            "Sharpe": s_val.get('Sharpe Ratio', np.nan),
            "MaxDD": s_val.get('Max. Drawdown [%]', np.nan),
            # Percentages are reported as fractions; missing values stay NaN.
            "WinRate": win_rate_pct / 100 if pd.notna(win_rate_pct) else np.nan,
            "Trades": s_val.get('# Trades', 0),
            "Exposure": exposure_pct / 100 if pd.notna(exposure_pct) else np.nan,
        }
    summary = pd.DataFrame(summary_data).T.round(3)
    print("\n=== Hold‑out performance ===")
    if summary.empty:
        print("No summary data to display.")
    else:
        display(summary)

if not equity_curves:
    print("No equity curves to plot.")
else:
    plt.figure(figsize=(12, 7))
    for asset_label_eq, curve_data in equity_curves.items():
        if curve_data.empty:
            continue
        plt.plot(curve_data.index, curve_data, label=asset_label_eq.upper())
    plt.title("Equity Curves ‑ Hold‑out Period")
    plt.xlabel("Date")
    plt.ylabel("Equity ($)")
    plt.legend()
    plt.grid(True)
    # Save plot instead of showing if in non-interactive environment
    # plt.savefig("equity_curves.png")
    plt.show()

print("\n--- Script End ---")

# Optional: Clean up the global temporary directory
# import shutil
# if '_temp_dir_for_dummy_files' in globals() and os.path.exists(_temp_dir_for_dummy_files) and _temp_dir_for_dummy_files != ".":
#     try:
#         print(f"INFO: Removing temporary directory: {_temp_dir_for_dummy_files}")
#         shutil.rmtree(_temp_dir_for_dummy_files)
#     except Exception as e_shutil:
#         print(f"Warning: Could not remove temporary directory {_temp_dir_for_dummy_files}: {e_shutil}")
