In [1]:
# nifty_ml_pipeline.py
"""
NIFTY50 ML pipeline for 3 horizons + crash risk.
Outputs per-horizon probability scores in [0,1].
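Requirements:
pip install yfinance pandas numpy ta scikit-learn xgboost lightgbm joblib imbalanced-learn matplotlib pyarrow
(pyarrow, or fastparquet, is required by DataFrame.to_parquet. On macOS, XGBoost also needs the
OpenMP runtime: brew install libomp)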
"""

import os
import time
import joblib
import math
import numpy as np
import pandas as pd
import yfinance as yf
from datetime import timedelta
from sklearn.model_selection import RandomizedSearchCV, TimeSeriesSplit
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, roc_auc_score
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
import warnings
warnings.filterwarnings("ignore")

# Required dependency: every technical indicator below is computed with `ta`.
try:
    import ta
except ImportError as err:
    # Only a genuinely missing package is turned into the install hint; any
    # other failure raised while importing `ta` propagates unchanged.
    # ImportError is still an Exception subclass, and the original error stays
    # chained as the cause.
    raise ImportError("Install 'ta' package: pip install ta") from err

# ------------ CONFIG ------------
# Ticker universe in yfinance notation (the ".NS" suffix is part of each symbol).
# NOTE(review): the list currently holds 45 symbols, not 50, and index
# membership changes over time - verify each symbol still resolves on Yahoo
# Finance and add/remove entries as necessary.
NIFTY50 = [
    "RELIANCE.NS", "TCS.NS", "HDFCBANK.NS", "INFY.NS", "HDFC.NS",
    "HINDUNILVR.NS", "ICICIBANK.NS", "KOTAKBANK.NS", "SBIN.NS", "BHARTIARTL.NS",
    "ITC.NS", "LT.NS", "AXISBANK.NS", "ASIANPAINT.NS", "HCLTECH.NS",
    "BAJAJ-AUTO.NS", "ULTRACEMCO.NS", "SUNPHARMA.NS", "POWERGRID.NS", "NTPC.NS",
    "TITAN.NS", "EICHERMOT.NS", "MARUTI.NS", "ONGC.NS", "NESTLEIND.NS",
    "TECHM.NS", "TATASTEEL.NS", "BRITANNIA.NS", "HINDALCO.NS", "DIVISLAB.NS",
    "ADANIENT.NS", "M&M.NS", "BPCL.NS", "GRASIM.NS", "INDUSINDBK.NS",
    "WIPRO.NS", "COALINDIA.NS", "SHREECEM.NS", "HDFCLIFE.NS", "CIPLA.NS",
    "IOC.NS", "JSWSTEEL.NS", "BAJAJFINSV.NS", "BRIDGESTONE.NS", "SBILIFE.NS",
]

# Output folders; both are created as soon as this code runs.
DATA_DIR = "./data_nifty"      # per-ticker feature/label parquet files
MODEL_DIR = "./models_nifty"   # fitted models saved with joblib
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)

# Look-ahead horizons, expressed as a number of rows (trading days).
HORIZONS = dict(
    very_short=5,    # about one trading week
    short=90,        # "3 months" target; 63-126 rows is the plausible range
    long=520,        # about two years (2 * 252 = 504 rows; 520 is used here)
)

# Minimum forward return for a positive label at each horizon (tunable).
RETURN_THRESHOLDS = dict(
    very_short=0.015,   # 1.5% within the week
    short=0.15,         # 15% within the short horizon
    long=0.7,           # 70% within the long horizon (0.5 is a softer choice)
)

# Risk model: a "crash" is a drop of at least this fraction within the horizon.
CRASH_DROP = 0.30

# ---------------- Utility functions ----------------
def fetch_price_and_info(ticker, period="max"):
    """Fetch price history and the metadata dict for one ticker via yfinance.

    Args:
        ticker: Symbol passed to ``yf.Ticker``.
        period: History span forwarded to ``Ticker.history``.

    Returns:
        ``(history, info)``. ``history`` is the DataFrame returned by
        ``Ticker.history`` with its index passed through ``pd.to_datetime``;
        ``info`` is ``Ticker.info`` or ``{}`` when reading it fails.
        When the history is empty a warning is printed and ``(None, {})``
        is returned.
    """
    handle = yf.Ticker(ticker)
    history = handle.history(period=period, actions=False)
    if history.empty:
        print(f"Warning: {ticker} returned empty history.")
        return None, {}

    history.index = pd.to_datetime(history.index)

    # Metadata is best-effort: a failure here must not discard the prices.
    try:
        metadata = handle.info
    except Exception:
        metadata = {}
    return history, metadata

def compute_technical_indicators(df):
    """Return a copy of ``df`` extended with technical-indicator columns.

    Reads the ``Close``, ``High``, ``Low`` and ``Volume`` columns. Columns are
    added in a fixed order (returns, SMA/EMA pairs, RSI, MACD, Bollinger
    width, ATR, volume averages/ratios, momentum). Every NaN in the result -
    including indicator warm-up rows - is replaced by 0 before returning.
    """
    out = df.copy()
    close = out["Close"]

    out["return_1d"] = close.pct_change()

    # Simple and exponential moving averages of the close.
    for span in (5, 10, 20, 50, 100, 200):
        out[f"sma_{span}"] = close.rolling(span).mean()
        out[f"ema_{span}"] = close.ewm(span=span, adjust=False).mean()

    # Oscillators and volatility measures from the `ta` package.
    out["rsi_14"] = ta.momentum.RSIIndicator(close, window=14, fillna=True).rsi()

    macd_indicator = ta.trend.MACD(close)
    out["macd"] = macd_indicator.macd()
    out["macd_signal"] = macd_indicator.macd_signal()

    bands = ta.volatility.BollingerBands(close=close, window=20)
    out["bb_width"] = (bands.bollinger_hband() - bands.bollinger_lband()) / close

    true_range = ta.volatility.AverageTrueRange(out["High"], out["Low"], close, window=14)
    out["atr_14"] = true_range.average_true_range()

    # Volume relative to its own rolling mean; the epsilon avoids division by 0.
    volume = out["Volume"]
    for span in (5, 20, 60, 200):
        average_volume = volume.rolling(span).mean()
        out[f"vol_sma_{span}"] = average_volume
        out[f"vol_ratio_{span}"] = volume / (average_volume + 1e-9)

    # Price momentum over 10 and 21 rows.
    for lag in (10, 21):
        out[f"mom_{lag}"] = close.pct_change(lag)

    return out.fillna(0)

def extract_fundamentals(info):
    """Pick a fixed set of fundamental fields out of a yfinance ``info`` dict.

    Args:
        info: Mapping supporting ``.get``; fields vary between tickers.

    Returns:
        Dict with exactly the keys listed below, in that order. A key that is
        absent from ``info`` maps to ``np.nan``; a key that is present keeps
        whatever value ``info`` holds for it.
    """
    wanted_fields = (
        "marketCap",
        "trailingPE",
        "forwardPE",
        "priceToBook",
        "pegRatio",
        "trailingEps",
        "earningsQuarterlyGrowth",
        "revenueGrowth",
    )
    # Balance-sheet metrics would need the financial statements; not read here.
    return {field: info.get(field, np.nan) for field in wanted_fields}

def build_features_for_ticker(ticker, period="max"):
    """Build the per-date feature frame for a single ticker.

    Downloads prices and metadata, adds the technical indicators, and attaches
    each fundamental as a constant ``f_<name>`` column (the same value on
    every row - these are not point-in-time values). A ``ticker`` column is
    added for later grouping.

    Returns:
        The feature DataFrame, or ``None`` when no price history was returned.
    """
    prices, info = fetch_price_and_info(ticker, period=period)
    if prices is None:
        return None

    frame = compute_technical_indicators(prices)

    # Broadcast each fundamental over all rows; None becomes NaN.
    for field, value in extract_fundamentals(info).items():
        frame[f"f_{field}"] = np.nan if value is None else value

    frame["ticker"] = ticker
    # Drop rows in which every single column is NaN.
    return frame.dropna(how="all")

# --------------- Labeling ----------------
def _future_min_close(closes, days):
    """Return, per row, the minimum of the next ``days`` closes.

    The window for row ``i`` is ``closes[i+1 : i+1+days]``; it shrinks near the
    end of the array, and a row with an empty window falls back to its own
    close.
    """
    n = len(closes)
    mins = np.empty(n, dtype=float)
    for i in range(n):
        window = closes[i + 1:i + 1 + days]
        mins[i] = window.min() if window.size else closes[i]
    return mins


def make_labels(df, horizons=None, thresholds=None, crash_drop=None):
    """Add forward-looking return and crash labels for every horizon.

    For each ``name -> days`` entry in ``horizons`` these columns are added:

    - ``label_<name>``: 1.0 if the return from this row's close to the close
      ``days`` rows later is >= ``thresholds[name]``, 0.0 if it is lower, and
      NaN when that future close does not exist (the last ``days`` rows).
    - ``min_future_<name>``: rolling-window version of the future minimum,
      kept for schema compatibility; it is 0 on the last ``days - 1`` rows, so
      prefer ``min_future_price_<name>``.
    - ``min_future_price_<name>``: minimum close over the next ``days`` rows
      (a shorter window near the end; the last row uses its own close).
    - ``future_max_drawdown_<name>``: ``(close - min_future_price) / close``.
    - ``crash_<name>``: 1.0 if that drawdown is >= ``crash_drop``; 0.0 if it is
      lower and the full ``days``-row window was available; NaN if it is
      lower but the window was cut short by the end of the data (the outcome
      is not known yet).

    Label and crash columns are float so that unknown outcomes can stay NaN;
    they used to be forced to 0, which turned the most recent rows into
    fabricated negative examples. All other columns have +/-inf replaced and
    NaN filled with 0, as before.

    Args:
        df: Frame with a ``Close`` column, ordered oldest to newest.
        horizons: Mapping name -> look-ahead in rows. ``None`` uses the
            module-level ``HORIZONS`` as it is at call time.
        thresholds: Mapping name -> minimum forward return. ``None`` uses
            ``RETURN_THRESHOLDS``.
        crash_drop: Drawdown fraction that counts as a crash. ``None`` uses
            ``CRASH_DROP``.

    Returns:
        A copy of ``df`` with the columns above appended.
    """
    # Resolve defaults at call time so later edits to the module-level config
    # (common in a notebook session) are picked up.
    horizons = HORIZONS if horizons is None else horizons
    thresholds = RETURN_THRESHOLDS if thresholds is None else thresholds
    crash_drop = CRASH_DROP if crash_drop is None else crash_drop

    df = df.copy()
    close = df["Close"]
    closes = close.to_numpy(dtype=float)
    n = len(closes)
    row_pos = np.arange(n)
    target_cols = []

    for name, days in horizons.items():
        future_close = close.shift(-days)
        future_return = (future_close - close) / close
        reached = (future_return >= thresholds[name]).astype(float)
        # A NaN return means the outcome is unknown, not that the target was missed.
        df[f"label_{name}"] = reached.where(future_return.notna())

        # Legacy rolling-window column (see docstring).
        df[f"min_future_{name}"] = (
            close.shift(-1).rolling(window=days, min_periods=1).min().shift(-(days - 1))
        )

        df[f"min_future_price_{name}"] = _future_min_close(closes, days)
        drawdown = (close - df[f"min_future_price_{name}"]) / close
        df[f"future_max_drawdown_{name}"] = drawdown

        crashed = drawdown >= crash_drop
        # A crash seen in a truncated window is still a crash; "no crash" is
        # only trustworthy when all `days` future rows exist.
        known = crashed.to_numpy() | (row_pos + days <= n - 1)
        df[f"crash_{name}"] = crashed.astype(float).where(known)

        target_cols += [f"label_{name}", f"crash_{name}"]

    df = df.replace([np.inf, -np.inf], np.nan)
    # Zero-fill everything except the targets so their NaN markers survive.
    fill_values = {col: 0 for col in df.columns if col not in target_cols}
    return df.fillna(value=fill_values)

# --------------- Aggregation across tickers ---------------
def build_dataset(tickers=NIFTY50, period="max", save_intermediate=True):
    """Download, featurize and label every ticker, then stack the results.

    Args:
        tickers: Iterable of yfinance symbols.
        period: History span forwarded to the downloader.
        save_intermediate: When true, each ticker's labelled frame is written
            to ``DATA_DIR`` as ``<ticker>.parquet`` ('/' replaced by '_').

    Returns:
        One DataFrame holding all tickers, sorted by index (date).

    Raises:
        RuntimeError: If no ticker produced any data.
    """
    # Derived from the configured horizons instead of being spelled out by hand.
    label_cols = [f"label_{name}" for name in HORIZONS]

    frames = []
    for ticker in tickers:
        print("Fetching:", ticker)
        try:
            df = build_features_for_ticker(ticker, period=period)
        except Exception as e:
            # Best-effort download: one failing ticker must not abort the run.
            print("Error for", ticker, e)
            df = None
        if df is None:
            continue

        df = make_labels(df)
        # Drop only rows that have no usable label for ANY horizon. Dropping on
        # "any label missing" would throw away rows that are still valid for
        # the shorter horizons; per-horizon filtering happens at training time.
        df = df.dropna(subset=label_cols, how="all")
        frames.append(df)

        if save_intermediate:
            df.to_parquet(os.path.join(DATA_DIR, f"{ticker.replace('/', '_')}.parquet"))

    if not frames:
        raise RuntimeError("No data fetched.")

    full = pd.concat(frames)
    # Stable sort: rows sharing a date keep their ticker order, so the stacked
    # dataset is laid out identically on every run.
    return full.sort_index(kind="mergesort")

# ---------------- Model training and backtest ----------------
def select_feature_columns(df):
    """Return the predictor column names present in ``df``.

    A column qualifies when its name starts with one of the known feature
    prefixes (indicator families and ``f_`` fundamentals). The raw ``Close``
    and ``Volume`` levels are appended afterwards when they exist. Matching
    columns keep their order of appearance in ``df``.
    """
    feature_prefixes = (
        "sma_", "ema_", "rsi_",
        "macd", "bb_", "atr_",
        "vol_ratio_", "mom_", "return_",
        "f_", "Close_Ratio", "vol_sma_",
    )
    selected = [col for col in df.columns if col.startswith(feature_prefixes)]
    # Price/volume levels are used as predictors as well.
    selected.extend(["Close", "Volume"])
    return [col for col in selected if col in df.columns]

def train_single_model(X, y, model_type="xgb", n_iter=20):
    """Tune and fit one binary classifier with a randomized, time-ordered search.

    Args:
        X: Feature matrix, rows in chronological order (the cross-validation
            is a ``TimeSeriesSplit`` over row positions).
        y: Binary target aligned with ``X``.
        model_type: ``"xgb"`` selects XGBoost; any other value selects LightGBM.
        n_iter: Number of parameter settings sampled by the search.

    Returns:
        The best estimator, refit on all of ``X``/``y`` by the search.
    """
    if model_type == "xgb":
        # `use_label_encoder` is no longer passed: it is a deprecated XGBoost
        # argument that current releases ignore with a warning.
        base = XGBClassifier(objective="binary:logistic", eval_metric="logloss", n_jobs=4)
        param_dist = {
            "n_estimators": [100, 300, 500],
            "max_depth": [3, 5, 7, 9],
            "learning_rate": [0.01, 0.05, 0.1, 0.2],
            "subsample": [0.6, 0.8, 1.0],
            "colsample_bytree": [0.6, 0.8, 1.0],
            "reg_alpha": [0, 0.1, 1],
            "reg_lambda": [1, 5, 10],
        }
    else:
        # LightGBM only applies `subsample` (bagging_fraction) when
        # `subsample_freq` (bagging_freq) is non-zero; without it the
        # `subsample` values searched below would all behave identically.
        base = LGBMClassifier(n_jobs=4, subsample_freq=1)
        param_dist = {
            "n_estimators": [100, 300, 500],
            "max_depth": [-1, 5, 10, 20],
            "learning_rate": [0.01, 0.05, 0.1],
            "num_leaves": [31, 50, 100],
            "subsample": [0.6, 0.8, 1.0],
        }

    # Expanding-window folds: each validation fold lies after its training rows.
    tscv = TimeSeriesSplit(n_splits=3)
    search = RandomizedSearchCV(
        base,
        param_dist,
        n_iter=n_iter,
        cv=tscv,
        scoring="precision",
        verbose=1,
        n_jobs=4,
        random_state=42,
    )
    search.fit(X, y)
    print("Best params:", search.best_params_)
    return search.best_estimator_

def walk_forward_backtest(df, feature_cols, label_col, model_type="xgb", initial_train_days=252*3, step_days=63, embargo_days=0):
    """Expanding-window walk-forward backtest over the unique index dates.

    Each iteration trains on every row dated before the test window (all
    tickers together), predicts the next ``step_days`` dates, then moves the
    window forward by ``step_days``.

    Args:
        df: Labelled dataset indexed by date; may hold several rows per date.
        feature_cols: Predictor column names.
        label_col: Binary target column; rows where it is NaN are ignored.
        model_type: Forwarded to ``train_single_model``.
        initial_train_days: Number of unique dates in the first training set.
        step_days: Number of unique dates in each test window.
        embargo_days: Number of unique dates removed from the END of each
            training set. Labels look ahead in time, so training rows right
            before the test window are labelled with prices from inside it;
            set this to the label's horizon to exclude them. The default 0
            keeps the previous behaviour.

    Returns:
        DataFrame indexed like the test rows with columns ``y_true``,
        ``y_pred`` (``prob >= 0.5``) and ``prob``; empty if no window was
        evaluated.
    """
    df = df.sort_index()
    unique_dates = sorted(df.index.unique())
    n_dates = len(unique_dates)
    embargo = max(0, int(embargo_days))

    results = []
    start = initial_train_days
    while start < n_dates - 5:
        train_dates = unique_dates[:max(0, start - embargo)]
        test_dates = unique_dates[start:start + step_days]
        # Advance now so that every `continue` below moves to the next window.
        start += step_days

        # Multi-stock: all tickers on the selected dates, labelled rows only.
        train_df = df[df.index.isin(train_dates)].dropna(subset=[label_col])
        test_df = df[df.index.isin(test_dates)].dropna(subset=[label_col])

        # Too little data for this window: skip it and try the next one.
        if len(train_df) < 200 or len(test_df) == 0:
            continue

        y_train = train_df[label_col]
        # A classifier cannot be fit (or be meaningful) on a single class.
        if y_train.nunique() < 2:
            print("Skipping window with single-class training labels | test:",
                  test_dates[0], "->", test_dates[-1])
            continue

        model = train_single_model(train_df[feature_cols], y_train, model_type=model_type, n_iter=10)
        prob = model.predict_proba(test_df[feature_cols])[:, 1]
        pred = (prob >= 0.5).astype(int)

        # Plain arrays avoid index alignment on the duplicated date index.
        results.append(pd.DataFrame({
            "y_true": test_df[label_col].to_numpy(),
            "y_pred": pred,
            "prob": prob,
        }, index=test_df.index))
        print("Backtest window:", train_dates[0], "->", train_dates[-1], "| test:", test_dates[0], "->", test_dates[-1], " | test_rows:", len(test_df))

    if results:
        return pd.concat(results)
    return pd.DataFrame()

# --------------- Save/load pipeline ----------------
def save_model(model, name):
    """Write ``model`` to ``MODEL_DIR`` as ``<name>.joblib``."""
    target_path = os.path.join(MODEL_DIR, f"{name}.joblib")
    joblib.dump(model, target_path)

def load_model(name):
    """Read back the model stored in ``MODEL_DIR`` as ``<name>.joblib``."""
    source_path = os.path.join(MODEL_DIR, f"{name}.joblib")
    # NOTE: joblib files are pickle-based - only load files you wrote yourself.
    return joblib.load(source_path)

# ---------------- Main flow ----------------
def main():
    """Run the whole pipeline: dataset, per-horizon models, crash-risk model.

    For each horizon a walk-forward backtest is run and summarized, then a
    final model is trained on all labelled rows and saved as
    ``model_<horizon>``. A LightGBM crash-risk model on ``crash_very_short``
    is saved as ``model_crash`` when enough usable data exists.
    """
    # Step 1: Build dataset (this fetches data and computes features/labels).
    print("Building dataset...")
    df = build_dataset(NIFTY50, period="max", save_intermediate=True)
    print("Dataset built with rows:", len(df))

    # Step 2: choose feature columns.
    feature_cols = select_feature_columns(df)
    print("Feature count:", len(feature_cols))

    # Step 3: backtest and train a final model for each horizon label.
    results_summary = {}
    for horizon in ["very_short", "short", "long"]:
        label_col = f"label_{horizon}"
        print("====== HOR: ", horizon, "label:", label_col)
        backtest_res = walk_forward_backtest(df, feature_cols, label_col, model_type="xgb",
                                            initial_train_days=252*2, step_days=63)
        if backtest_res.empty:
            print("No backtest results for", horizon)
            continue

        y_true = backtest_res["y_true"]
        y_pred = backtest_res["y_pred"]
        prec = precision_score(y_true, y_pred)
        rec = recall_score(y_true, y_pred)
        f1 = f1_score(y_true, y_pred)
        # ROC AUC is undefined (roc_auc_score raises) when only one class
        # occurs; report NaN instead of aborting after a long backtest.
        if y_true.nunique() > 1:
            auc = roc_auc_score(y_true, backtest_res["prob"])
        else:
            auc = float("nan")
        results_summary[horizon] = {"precision": prec, "recall": rec, "f1": f1, "auc": auc}
        print(f"Backtest metrics for {horizon}: precision={prec:.3f}, recall={rec:.3f}, f1={f1:.3f}, auc={auc:.3f}")

        # Final model on all rows that have this label.
        final_train = df.dropna(subset=[label_col])
        final_model = train_single_model(final_train[feature_cols], final_train[label_col], model_type="xgb", n_iter=30)
        save_model(final_model, f"model_{horizon}")
        print("Saved final model for", horizon)

    # Step 4: crash risk model (one horizon; others can be trained the same way).
    print("Training crash risk model...")
    crash_label = "crash_very_short"
    df_crash = df.dropna(subset=[crash_label])
    if len(df_crash) <= 500:
        print("Not enough samples to train reliable crash model. Need more data points.")
    elif df_crash[crash_label].nunique() < 2:
        # Row count alone is not enough: with the configured crash threshold
        # the label can easily contain no positive example at all.
        print("Crash label contains a single class; skipping crash risk model.")
    else:
        risk_model = train_single_model(df_crash[feature_cols], df_crash[crash_label], model_type="lgb", n_iter=30)
        save_model(risk_model, "model_crash")
        print("Saved crash risk model.")

    print("All done. Models saved in", MODEL_DIR)
    print("Summary:", results_summary)

# Run the pipeline only when this code is executed directly, not when it is
# imported as a module.
if __name__ == "__main__":
    main()


XGBoostError: 
XGBoost Library (libxgboost.dylib) could not be loaded.
Likely causes:
  * OpenMP runtime is not installed
    - vcomp140.dll or libgomp-1.dll for Windows
    - libomp.dylib for Mac OSX
    - libgomp.so for Linux and other UNIX-like OSes
    Mac OSX users: Run `brew install libomp` to install OpenMP runtime.

  * You are running 32-bit Python on a 64-bit OS

Error message(s): ["dlopen(/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/site-packages/xgboost/lib/libxgboost.dylib, 0x0006): Library not loaded: @rpath/libomp.dylib\n  Referenced from: <8E129FE8-EF1C-38EA-A9CF-202782564052> /Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/site-packages/xgboost/lib/libxgboost.dylib\n  Reason: tried: '/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file), '/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file)"]
