In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import logging
from new_strategy import Asset, BetSizingMethod, get_bet_sizing
import nbimporter
from backtest import Backtest
from meta_strategy import MetaLabelingStrategy
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.calibration import CalibratedClassifierCV
from xgboost import XGBClassifier
import matplotlib.pyplot as plt
from lightgbm import LGBMClassifier
import shap
import os
from datetime import datetime
import optuna
from sklearn.model_selection import cross_val_score
from sklearn.metrics import make_scorer, f1_score
from sklearn.model_selection import TimeSeriesSplit

%load_ext autoreload
%autoreload 2

In [None]:
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# ---------------------- MetaModelHandler ---------------------- #
class MetaModelHandler:
    def __init__(self):
        self.long_model = None
        self.short_model = None
        self.long_scaler = None
        self.short_scaler = None
        self.feature_cols = []

    def train(
        self, 
        trades_df: pd.DataFrame, 
        long_feature_cols: list, 
        short_feature_cols: list,
        asset_name: str,
        method_name: str
    ):
        self.long_feature_cols = long_feature_cols
        self.short_feature_cols = short_feature_cols

        trades_df = trades_df.dropna(subset=['meta_label'])

        long_trades = trades_df[trades_df['direction'] == 'long'].dropna(subset=long_feature_cols)
        short_trades = trades_df[trades_df['direction'] == 'short'].dropna(subset=short_feature_cols)

        def preprocess(df, cols):
            X = df[cols]
            y = df['meta_label']
            scaler = StandardScaler()
            X_scaled = scaler.fit_transform(X)
            return X_scaled, y, scaler

        X_long, y_long, self.long_scaler = preprocess(long_trades, long_feature_cols)
        X_short, y_short, self.short_scaler = preprocess(short_trades, short_feature_cols)

        def _sanity_check(X, y, label):
            print(f"\n📊 Sanity Check for {label} dataset")
            print("  → Shape:", X.shape)
            print("  → NaNs in X:", np.isnan(X).sum())
            print("  → All-zero columns:", (X == 0).all(axis=0).sum())
            print("  → y balance:", np.bincount(y.astype(int)) if len(np.unique(y)) == 2 else y.value_counts())

        _sanity_check(X_long, y_long, "LONG")
        _sanity_check(X_short, y_short, "SHORT")


        # Tune long model
        print("[Optuna] Tuning LONG model...")
        best_long_params = optimize_model(X_long, y_long, n_trials=50)
        self.long_model = LGBMClassifier(**best_long_params)
        self.long_model.fit(X_long, y_long)

        # Tune short model
        print("[Optuna] Tuning SHORT model...")
        best_short_params = optimize_model(X_short, y_short, n_trials=50)
        self.short_model = LGBMClassifier(**best_short_params)
        self.short_model.fit(X_short, y_short)

        self.plot_feature_importance(self.long_model, long_feature_cols, "Long Trades")
        self.plot_feature_importance(self.short_model, short_feature_cols, "Short Trades")

                # Convert to DataFrame for SHAP
        X_long_df = pd.DataFrame(X_long, columns=long_feature_cols)
        X_short_df = pd.DataFrame(X_short, columns=short_feature_cols)

        # Convert to DataFrame for SHAP
        X_long_df = pd.DataFrame(X_long, columns=long_feature_cols)
        X_short_df = pd.DataFrame(X_short, columns=short_feature_cols)

        # Use self.long_model and self.short_model
        self.plot_shap_values(self.long_model, X_long_df, long_feature_cols, "Long", asset_name, method_name)
        self.plot_shap_values(self.short_model, X_short_df, short_feature_cols, "Short", asset_name, method_name)

    def plot_feature_importance(self, model, feature_names, title):
        importance = model.feature_importances_
        sorted_idx = importance.argsort()[::-1]
        sorted_names = [feature_names[i] for i in sorted_idx]
        sorted_importance = importance[sorted_idx]

        plt.figure(figsize=(10, 6))
        plt.barh(sorted_names, sorted_importance)
        plt.title(f"🔍 Feature Importance — {title}")
        plt.gca().invert_yaxis()
        plt.tight_layout()
        plt.show()

    def plot_shap_values(self, model, X, feature_names, title, asset_name, method_name):

        plt.close()
        plt.style.use('default')

        print(f"[SHAP] Generating plot for: {title}")
        explainer = shap.TreeExplainer(model)
        shap_values = explainer(X)

        shap.plots.bar(shap_values, max_display=len(feature_names), show=False)

        fig = plt.gcf()
        fig.suptitle(
            f"SHAP Feature Importance — {title.capitalize()} — {asset_name.upper()} — {method_name.upper()}",
            fontsize=14
        )
        plt.tight_layout(rect=[0, 0, 1, 0.95])

        # Create output directory
        output_dir = "results_metalabel/shap"
        os.makedirs(output_dir, exist_ok=True)

        # Format filename with asset + method + long/short
        safe_title = title.lower().replace(" ", "_")
        filename = f"shap_{asset_name.lower()}_{method_name.lower()}_{safe_title}.png"
        full_path = os.path.join(output_dir, filename)

        plt.savefig(full_path, dpi=300)
        plt.close()
        print(f"[SHAP] Saved to {full_path}")

        # Compute mean absolute SHAP values per feature
        mean_abs_shap = np.abs(shap_values.values).mean(axis=0)
        shap_summary_df = pd.DataFrame({
            'feature': feature_names,
            'mean_abs_shap_value': mean_abs_shap
        }).sort_values(by='mean_abs_shap_value', ascending=False)

        # Save the summary
        summary_filename = f"shap_summary_{asset_name.lower()}_{method_name.lower()}_{title.lower()}.csv"
        summary_path = os.path.join("results_metalabel/shap", summary_filename)
        shap_summary_df.to_csv(summary_path, index=False)
        print(f"[SHAP] Summary CSV saved to {summary_path}")

    def is_trade_approved(self, features: dict, direction: str, threshold: float = 0.5) -> bool:
        if direction == 'long':
            feature_list = self.long_feature_cols
            model = self.long_model
            scaler = self.long_scaler
        else:
            feature_list = self.short_feature_cols
            model = self.short_model
            scaler = self.short_scaler

        cleaned = {}
        for k in feature_list:
            val = features.get(k, 0)
            if pd.isna(val) or val in [np.inf, -np.inf]:
                cleaned[k] = 0
            else:
                cleaned[k] = val

        df = pd.DataFrame([cleaned])[feature_list]
        X = scaler.transform(df)
        prob = model.predict_proba(X)[0, 1]

        print(f"[MetaModel] Direction: {direction}, Prob: {prob:.3f}, Threshold: {threshold}, Approved: {prob >= threshold}")
        return prob >= threshold

def train_meta_model(train_df: pd.DataFrame, long_feature_cols: list, short_feature_cols: list,asset, method) -> MetaModelHandler:
    # Shift rolling metrics to avoid lookahead bias
    rolling_cols = [
        'rolling_f1', 'rolling_accuracy', 'rolling_precision', 'rolling_recall',
        'n_total_seen', 'n_window_obs'
    ]
    for col in rolling_cols:
        if col in train_df.columns:
            train_df[col] = train_df.groupby('session')[col].shift(1)
    meta_model = MetaModelHandler()
    meta_model.train(train_df, long_feature_cols, short_feature_cols,asset.value, method.value)
    return meta_model

def optimize_model(X, y, n_trials=150):
    def objective(trial):
        params = {
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2),
            "num_leaves": trial.suggest_int("num_leaves", 20, 64),
            "max_depth": trial.suggest_int("max_depth", 3, 10),
            "random_state": 42,
            "n_jobs": -1,
            "verbosity": -1
        }


        model = LGBMClassifier(**params)

        tscv = TimeSeriesSplit(n_splits=5)
        score = cross_val_score(model, X, y, cv=tscv, scoring=make_scorer(f1_score)).mean()
        return score

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=n_trials)

    print("✅ Best parameters:", study.best_params)
    print("📈 Best F1 score:", study.best_value)
    return study.best_params
