In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
import talib  
import xgboost as xgb
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

# Ticker universe used by every later cell: five large-cap tech names plus SPY.
tickers = ['META', 'AAPL', 'AMZN', 'NFLX', 'GOOGL', 'SPY']  # FAANG portfolio, with SPY as the market proxy

In [None]:
class DataPreprocessor:
    """Download, clean and sanity-check daily OHLCV bars for a list of tickers.

    After `run_pipeline` the result is stored in `self.data`: one frame indexed
    by ('ticker', 'date') with the columns open/high/low/close/volume.
    """

    # Columns kept from each download; everything else is discarded.
    REQUIRED_COLUMNS = ['open', 'high', 'low', 'close', 'volume']

    def __init__(self, tickers,):
        self.tickers = tickers
        self.data = None

    def run_pipeline(self, start_date='2015-01-01', end_date='2024-12-31'):
        """Download, clean and validate the data, then return the combined frame."""
        self._download_data(start_date, end_date)
        self._clean_data()
        self._validate_data()
        return self.data

    def _download_data(self, start_date, end_date):
        """Fetch each ticker with yfinance and stack the results into `self.data`.

        Raises:
            ValueError: if no ticker returned any rows.
        """
        all_data = []

        for ticker in self.tickers:
            # auto_adjust=True makes yfinance return adjusted prices.
            df = yf.download(ticker, start=start_date, end=end_date, progress=False, auto_adjust=True)

            if df.empty:
                # Skip instead of failing later on missing columns.
                print(f"No data returned for {ticker}; skipping")
                continue

            # Normalise column names to lower case; for MultiIndex columns keep
            # only the first level.
            if isinstance(df.columns, pd.MultiIndex):
                df.columns = [col[0].lower() for col in df.columns]
            else:
                df.columns = [col.lower() for col in df.columns]

            df.index.names = [name.lower() if name else 'date' for name in df.index.names]

            # .copy() so the assignment below writes to an independent frame
            # rather than to a slice of the downloaded one.
            df = df[self.REQUIRED_COLUMNS].copy()

            df['ticker'] = ticker
            df = df.reset_index().set_index(['ticker', 'date'])

            all_data.append(df)

        if not all_data:
            raise ValueError("No data downloaded for any ticker!")

        self.data = pd.concat(all_data, axis=0).sort_index()

    def _clean_data(self):
        """Forward-fill gaps within each ticker, then drop rows that still contain NaN.

        Only forward fill is used: a backward fill would copy later prices into
        earlier rows and leak future information into the backtest. Leading NaN
        rows (nothing earlier to fill from) are removed by the dropna.

        Raises:
            ValueError: if no data has been downloaded yet.
        """
        if self.data is None:
            raise ValueError("No data to clean!")

        # Group by the first index level (ticker) so values never cross tickers.
        self.data = self.data.groupby(level=0).ffill().dropna()

    def _validate_data(self):
        """Print whether every ticker covers exactly the same set of dates.

        Raises:
            ValueError: if no data has been downloaded yet.
        """
        if self.data is None:
            raise ValueError("No data to validate!")

        ticker_level = self.data.index.get_level_values(0).unique()
        date_indexes = [self.data.xs(ticker, level=0).index for ticker in ticker_level]

        # Compare the dates themselves, not just their counts: two tickers can
        # have equally many rows while missing different days.
        aligned = bool(date_indexes) and all(idx.equals(date_indexes[0]) for idx in date_indexes[1:])

        if aligned:
            print(f"All tickers have {len(date_indexes[0])} trading days")
        else:
            print("tickers have different dates")
# Create an instance and run the pipeline.
# run_pipeline() downloads, cleans and validates the data (default range
# 2015-01-01 to 2024-12-31) and returns the combined (ticker, date) frame.
preprocessor = DataPreprocessor(tickers)
data = preprocessor.run_pipeline()

In [None]:
class FeatureEngineer:
    """Build per-ticker technical features and the next-day-return target.

    Works on a copy of the input frame, which must have an index level named
    'ticker' and the columns open/high/low/close/volume. Every calculation is
    applied ticker by ticker, so rolling windows and shifts never cross from
    one ticker into another. Rows are assumed to be in chronological order
    within each ticker.

    `finished_features` accumulates the names of the model input columns in
    the order the feature methods are run; the target is not included.
    """

    def __init__(self, data):
        self.data = data.copy()
        self.finished_features = []

    def run_pipeline(self):
        """Run every feature step in order and return the enriched frame."""
        self.core_price_features()
        self.math_rule_features()
        self.momentum_features()
        self.volatility_features()
        self.volume_features()
        self.lagged_features()
        self.target_variable()
        return self.data

    def _apply_by_ticker(self, func):
        """Apply `func` to each ticker's sub-frame and stitch the results back together.

        Each group is copied first: the `calc` helpers add columns in place,
        and pandas does not support mutating the object passed to
        `groupby.apply`.
        """
        return self.data.groupby(level='ticker', group_keys=False).apply(
            lambda group: func(group.copy())
        )

    @staticmethod
    def _parkinson_vol(high, low, window=20):
        """Rolling Parkinson high-low volatility estimate.

        sigma = sqrt( mean_over_window( ln(high/low)^2 ) / (4 ln 2) )

        The square root is taken after averaging the squared log ranges.
        """
        squared_log_range = np.log(high / low) ** 2
        return np.sqrt(squared_log_range.rolling(window).mean() / (4 * np.log(2)))

    def core_price_features(self):
        """Add returns, realised volatility, ATR and price/SMA ratios."""
        def calc(df):
            close, high, low, open_ = df['close'], df['high'], df['low'], df['open']

            # Returns
            df['log_return'] = np.log(close / close.shift(1))
            df['overnight_return'] = np.log(open_ / close.shift(1))
            df['intraday_return'] = np.log(close / open_)
            # Volatility (20-day std of log returns, annualised with sqrt(252))
            df['volatility_20d'] = df['log_return'].rolling(20).std() * np.sqrt(252)
            df['atr_14'] = talib.ATR(high, low, close, timeperiod=14)
            # SMAs
            sma10 = talib.SMA(close, timeperiod=10)
            sma20 = talib.SMA(close, timeperiod=20)
            sma50 = talib.SMA(close, timeperiod=50)
            # Ratios
            df['price_sma20_ratio'] = close / sma20
            df['price_sma50_ratio'] = close / sma50
            df['sma10_sma20_ratio'] = sma10 / sma20
            return df

        self.data = self._apply_by_ticker(calc)
        self.finished_features += ['log_return', 'overnight_return', 'intraday_return', 'volatility_20d', 'atr_14', 'price_sma20_ratio', 'price_sma50_ratio', 'sma10_sma20_ratio']

    def math_rule_features(self):
        """Add the 0/1 rule flags consumed by the rule-based model.

        Requires `core_price_features` to have run (reads 'log_return').

        NOTE(review): comparisons against an indicator that is still NaN
        during its warm-up window evaluate to False, so these flags are 0
        (not NaN) on early rows, e.g. the first ~200 rows for 'golden_cross'.
        """
        def calc(df):
            close, high, low = df['close'], df['high'], df['low']
            # TA-Lib works on float64 arrays; cast integer share volume
            # explicitly rather than relying on wrapper-side conversion.
            volume = df['volume'].astype(float)

            # SMAs
            sma10 = talib.SMA(close, timeperiod=10)
            sma20 = talib.SMA(close, timeperiod=20)
            sma50 = talib.SMA(close, timeperiod=50)
            sma200 = talib.SMA(close, timeperiod=200)

            # Trends
            df['golden_cross'] = (sma50 > sma200).astype(int)
            df['short_uptrend'] = (sma10 > sma20).astype(int)
            df['price_above_sma20'] = (close > sma20).astype(int)
            df['price_above_sma50'] = (close > sma50).astype(int)

            # Momentum
            rsi = talib.RSI(close, timeperiod=14)
            macd, signal, _ = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
            stoch_k, _ = talib.STOCH(high, low, close, fastk_period=14, slowk_period=3, slowd_period=3)
            roc = talib.ROC(close, timeperiod=10)
            df['rsi_oversold'] = (rsi < 30).astype(int)
            df['rsi_overbought'] = (rsi > 70).astype(int)
            df['macd_bullish'] = (macd > signal).astype(int)
            df['roc_positive'] = (roc > 0).astype(int)
            df['stoch_oversold'] = (stoch_k < 20).astype(int)

            # Volatility/Reversion
            upper, _, lower = talib.BBANDS(close, timeperiod=20, nbdevup=2, nbdevdn=2)
            bb_pos = (close - lower) / (upper - lower)
            vol_20d = df['log_return'].rolling(20).std() * np.sqrt(252)
            # Expanding quantile: the threshold at each row only uses history
            # up to and including that row.
            vol_75pct = vol_20d.expanding().quantile(0.75)
            df['bb_oversold'] = (bb_pos < 0.2).astype(int)
            df['bb_overbought'] = (bb_pos > 0.8).astype(int)
            df['high_volatility'] = (vol_20d > vol_75pct).astype(int)

            # Volume
            vol_sma20 = talib.SMA(volume, timeperiod=20)
            vol_ratio = volume / vol_sma20
            price_up = close > close.shift(1)
            price_down = close < close.shift(1)
            df['volume_spike'] = (vol_ratio > 1.5).astype(int)
            df['volume_confirmation'] = (price_up & (vol_ratio > 1)).astype(int)
            df['volume_divergence'] = (price_down & (vol_ratio > 1)).astype(int)
            return df

        self.data = self._apply_by_ticker(calc)
        self.finished_features += ['golden_cross', 'short_uptrend', 'price_above_sma20', 'price_above_sma50','rsi_oversold', 'rsi_overbought', 'macd_bullish', 'roc_positive', 'stoch_oversold','bb_oversold', 'bb_overbought', 'high_volatility','volume_spike', 'volume_confirmation', 'volume_divergence']

    def momentum_features(self):
        """Add continuous momentum indicators (RSI, MACD histogram, stochastic %K, Williams %R, ROC)."""
        def calc(df):
            close, high, low = df['close'], df['high'], df['low']

            df['rsi_14'] = talib.RSI(close, timeperiod=14)
            _, _, hist = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
            df['macd_histogram'] = hist
            df['stoch_k'], _ = talib.STOCH(high, low, close, fastk_period=14, slowk_period=3, slowd_period=3)
            df['williams_r'] = talib.WILLR(high, low, close, timeperiod=14)
            df['roc_10'] = talib.ROC(close, timeperiod=10)
            return df

        self.data = self._apply_by_ticker(calc)
        self.finished_features += ['rsi_14', 'macd_histogram', 'stoch_k', 'williams_r', 'roc_10']

    def volatility_features(self):
        """Add Bollinger-band position/width and the 20-day Parkinson volatility."""
        def calc(df):
            close, high, low = df['close'], df['high'], df['low']

            upper, middle, lower = talib.BBANDS(close, timeperiod=20, nbdevup=2, nbdevdn=2)
            df['bb_position'] = (close - lower) / (upper - lower)
            df['bb_width'] = (upper - lower) / middle
            df['parkinson_vol'] = self._parkinson_vol(high, low, window=20)
            return df

        self.data = self._apply_by_ticker(calc)
        self.finished_features += ['bb_position', 'bb_width', 'parkinson_vol',]

    def volume_features(self):
        """Add volume ratio to its 20-day mean, on-balance volume and a 20-day volume z-score."""
        def calc(df):
            close = df['close']
            # See math_rule_features: TA-Lib is given float64 input explicitly.
            volume = df['volume'].astype(float)

            vol_sma20 = talib.SMA(volume, timeperiod=20)
            df['volume_ratio'] = volume / vol_sma20
            df['obv'] = talib.OBV(close, volume)
            df['volume_zscore'] = (volume - volume.rolling(20).mean()) / volume.rolling(20).std()
            return df

        self.data = self._apply_by_ticker(calc)
        self.finished_features += ['volume_ratio', 'obv', 'volume_zscore']

    def lagged_features(self):
        """Add one-row lags of selected features (requires the earlier feature steps)."""
        lag_cols = ['log_return', 'rsi_14', 'volume_ratio', 'macd_histogram', 'bb_position', 'atr_14']

        def calc(df):
            for col in lag_cols:
                df[f'{col}_lag1'] = df[col].shift(1)
            return df

        self.data = self._apply_by_ticker(calc)
        self.finished_features += [f'{col}_lag1' for col in lag_cols]

    def target_variable(self):
        """Add 'target': the next row's log return within the same ticker.

        The last row of every ticker has no following row and therefore gets
        NaN. 'target' is deliberately not added to `finished_features`.
        """
        def calc(df):
            df['target'] = df['log_return'].shift(-1)  # Predict tomorrow's return
            return df
        self.data = self._apply_by_ticker(calc)

# Build all features and the next-day-return target on a copy of `data`.
fe = FeatureEngineer(data)
features_data = fe.run_pipeline()
# finished_features lists model inputs only; 'target' is excluded from it.
print(f"Features created: {len(fe.finished_features)}")
print(f"Data shape: {features_data.shape}")
#features_data.groupby(level='ticker').head(3)

In [None]:
class RuleBasedModel:
    """Weighted-vote trading rule built on the 0/1 rule flags.

    Operates on a copy of `data`, which must have an index level named
    'ticker', the binary feature columns listed in `binary_features`, and a
    'log_return' column. `run_pipeline` adds the columns 'rule_score',
    'rule_score_norm', 'signal' and 'strategy_return' to `self.data`.
    """

    def __init__(self, data):
        self.data = data.copy()
        self.binary_features = [
            'golden_cross', 'short_uptrend', 'price_above_sma20', 'price_above_sma50','rsi_oversold', 'rsi_overbought', 'macd_bullish', 'roc_positive',
            'stoch_oversold','bb_oversold', 'bb_overbought', 'high_volatility','volume_spike', 'volume_confirmation', 'volume_divergence'
        ]
        # Default weights: positive = bullish signal, negative = bearish signal
        self.weights = {
            # Trend (bullish)
            'golden_cross': 2.0,
            'short_uptrend': 1.5,
            'price_above_sma20': 1.0,
            'price_above_sma50': 1.0,
            # Momentum
            'rsi_oversold': 1.5,        # Oversold = buy opportunity
            'rsi_overbought': -1.5,     # Overbought = sell signal
            'macd_bullish': 1.5,
            'roc_positive': 1.0,
            'stoch_oversold': 1.0,
            # Volatility/Reversion
            'bb_oversold': 1.5,         # Mean reversion buy
            'bb_overbought': -1.5,      # Mean reversion sell
            'high_volatility': -0.5,    # High vol = reduce risk
            # Volume
            'volume_spike': 0.5,
            'volume_confirmation': 1.5, # Price up + volume = strong
            'volume_divergence': -1.5   # Price down + volume = weak
        }
        self.results = None

    def run_pipeline(self):
        """Score every row, then derive signals and strategy returns (in place on `self.data`)."""
        self._calculate_scores()
        self._generate_signals()

    def _apply_by_ticker(self, func):
        """Apply `func` to each ticker's sub-frame and concatenate the results."""
        return self.data.groupby(level='ticker', group_keys=False).apply(func)

    def _calculate_scores(self):
        """Compute the weighted rule score and scale it into [-1, 1].

        Positive scores are divided by the sum of all positive weights and
        negative scores by the absolute sum of all negative weights, so +1 / -1
        correspond to every bullish / every bearish rule firing at once.
        """
        self.data['rule_score'] = sum(
            self.data[feat] * self.weights[feat] for feat in self.binary_features
        )
        # Normalize to [-1, 1] range
        max_pos = sum(w for w in self.weights.values() if w > 0)
        max_neg = abs(sum(w for w in self.weights.values() if w < 0))

        score = self.data['rule_score']
        # Vectorised version of the piecewise scaling; zero (and NaN) scores map to 0.
        scaled = pd.Series(0.0, index=score.index)
        positive, negative = score > 0, score < 0
        scaled[positive] = score[positive] / max_pos
        scaled[negative] = score[negative] / max_neg
        self.data['rule_score_norm'] = scaled

    def _generate_signals(self):
        """Turn normalised scores into -1/0/+1 signals and compute strategy returns.

        A signal formed on one row is applied to the next row's return of the
        same ticker; the first row of each ticker therefore has a NaN
        strategy return.
        """
        # Thresholds for signal generation
        long_threshold = 0.3
        short_threshold = -0.3

        self.data['signal'] = 0
        self.data.loc[self.data['rule_score_norm'] > long_threshold, 'signal'] = 1
        self.data.loc[self.data['rule_score_norm'] < short_threshold, 'signal'] = -1

        # Shift within each ticker: a plain .shift(1) on the stacked frame would
        # carry the last signal of one ticker into the first row of the next.
        previous_signal = self.data.groupby(level='ticker')['signal'].shift(1)
        self.data['strategy_return'] = previous_signal * self.data['log_return']



In [None]:
class XGBoostModel:
    """XGBoost regressor on next-day returns, converted into -1/0/+1 signals.

    `train_data` and `test_data` must contain the `feature_cols` columns;
    `train_data` additionally needs 'target' and `test_data` needs
    'log_return' and an index level named 'ticker'. `test_data` is copied;
    `run_pipeline` adds 'xgb_pred', 'signal' and 'strategy_return' to it.
    """

    def __init__(self, train_data, test_data, feature_cols):
        self.train_data = train_data
        self.test_data = test_data.copy()
        self.feature_cols = feature_cols
        self.model = None
        self.results = None

        # Hyperparameters
        self.n_estimators = 100
        self.learning_rate = 0.1
        self.max_depth = 3
        self.subsample = 0.8
        self.colsample_bytree = 0.8
        self.threshold = 0.0005  # 0.05% daily return threshold

    def run_pipeline(self):
        """Fit the model, predict on the test set, and derive signals and strategy returns."""
        self._train_model()
        self._generate_predictions()
        self._generate_signals()
        self._calculate_returns()

    def _train_model(self):
        """Fit an XGBRegressor on the training features against 'target'."""
        X_train = self.train_data[self.feature_cols]
        y_train = self.train_data['target']

        self.model = xgb.XGBRegressor(
            n_estimators=self.n_estimators,
            learning_rate=self.learning_rate,
            max_depth=self.max_depth,
            subsample=self.subsample,
            colsample_bytree=self.colsample_bytree,
            random_state=42,  # fixed seed so row/column subsampling is reproducible
        )

        self.model.fit(X_train, y_train)

    def _generate_predictions(self):
        """Store the model's predictions for the test rows in 'xgb_pred'."""
        X_test = self.test_data[self.feature_cols]
        self.test_data['xgb_pred'] = self.model.predict(X_test)

    def _generate_signals(self): # convert predictions to trading signals
        """Go long above +threshold, short below -threshold, otherwise stay flat."""
        self.test_data['signal'] = 0
        self.test_data.loc[self.test_data['xgb_pred'] > self.threshold, 'signal'] = 1
        self.test_data.loc[self.test_data['xgb_pred'] < -self.threshold, 'signal'] = -1

    def _calculate_returns(self):
        """Apply each row's signal to the next row's log return of the same ticker.

        The shift is done per ticker: a plain .shift(1) on the stacked frame
        would carry the last signal of one ticker into the first row of the
        next. The first row of each ticker gets a NaN strategy return.
        """
        previous_signal = self.test_data.groupby(level='ticker')['signal'].shift(1)
        self.test_data['strategy_return'] = previous_signal * self.test_data['log_return']

# Prepare train/test split
feature_cols = fe.finished_features
# Keep only rows where every feature and the target are available.
model_df = features_data.dropna(subset=feature_cols + ['target']).copy()

# Window boundaries: training is inclusive on both ends, testing excludes test_end.
train_start = pd.Timestamp('2015-01-01')
train_end = pd.Timestamp('2021-01-01')
test_start = pd.Timestamp('2022-01-01')
test_end = pd.Timestamp('2024-01-01')

# Look up the date level once and build both row masks from it.
_row_dates = model_df.index.get_level_values('date')
train_mask = (_row_dates >= train_start) & (_row_dates <= train_end)
test_mask = (_row_dates >= test_start) & (_row_dates < test_end)

train_data = model_df.loc[train_mask]
test_data = model_df.loc[test_mask]

# The rule-based model needs no fitting, so it only sees the test window.
rb_model = RuleBasedModel(test_data)
rb_model.run_pipeline()

# XGBoost is fitted on the training window and evaluated on the same test window.
xgb_model = XGBoostModel(train_data, test_data, feature_cols)
xgb_model.run_pipeline()


In [None]:
# Model Comparison: XGBoost vs Rule-Based

def calculate_metrics(returns_col, signal_col, df):
    """Compute per-ticker performance metrics and a pooled Sharpe ratio.

    Args:
        returns_col: name of the column holding daily strategy log returns.
        signal_col: name of the column holding the -1/0/+1 signals.
        df: frame with an index level named 'ticker' and the columns
            `returns_col`, `signal_col` and 'log_return'.

    Returns:
        (results, overall_sharpe). `results` maps each ticker to a dict with
        keys 'return', 'bh_return', 'sharpe', 'max_dd', 'win_rate', 'trades'.
        `overall_sharpe` is computed from all tickers' daily strategy returns
        pooled into one sample.
    """
    results = {}
    for ticker in df.index.get_level_values('ticker').unique():
        ticker_data = df.xs(ticker, level='ticker').dropna(subset=[returns_col, 'log_return'])
        if len(ticker_data) == 0:
            continue

        strat_ret = ticker_data[returns_col]
        buy_hold = ticker_data['log_return']
        signals = ticker_data[signal_col]

        # Cumulative returns (log returns add up; exp converts back to simple return)
        strat_cum = np.exp(strat_ret.sum()) - 1
        bh_cum = np.exp(buy_hold.sum()) - 1
        sharpe = (strat_ret.mean() / strat_ret.std()) * np.sqrt(252) if strat_ret.std() > 0 else 0

        # Max drawdown, measured against the running peak of cumulative log
        # return. The peak is floored at 0 (the starting equity) so a strategy
        # that loses from its first day is measured against its initial capital
        # rather than against its first, already negative, value.
        cum_log_rets = strat_ret.cumsum()
        running_max = cum_log_rets.cummax().clip(lower=0)
        drawdown = cum_log_rets - running_max
        max_dd = np.exp(drawdown.min()) - 1

        # Win rate: profitable days divided by days with a non-zero signal.
        # NOTE(review): 'trades' counts rows with a non-zero signal, not
        # position changes, and is taken from the unshifted signal column.
        wins = (strat_ret > 0).sum()
        trades = (signals != 0).sum()
        win_rate = wins / trades if trades > 0 else 0

        results[ticker] = {
            'return': strat_cum,
            'bh_return': bh_cum,
            'sharpe': sharpe,
            'max_dd': max_dd,
            'win_rate': win_rate,
            'trades': trades
        }

    # Pooled Sharpe: every ticker-day is treated as one observation.
    all_returns = df.dropna(subset=[returns_col])[returns_col]
    overall_sharpe = (all_returns.mean() / all_returns.std()) * np.sqrt(252) if all_returns.std() > 0 else 0

    return results, overall_sharpe

# Both models expose their strategy returns/signals under the same column
# names, so the same metric function is run on each model's output frame.
xgb_results, xgb_overall = calculate_metrics('strategy_return', 'signal', xgb_model.test_data)
rb_results, rb_overall = calculate_metrics('strategy_return', 'signal', rb_model.data)

# Iterates over the XGBoost result keys; rb_results is indexed with the same
# ticker, so it is expected to contain every ticker present in xgb_results.
for ticker in sorted(xgb_results.keys()): #Print results in a table
    xgb_metrics = xgb_results[ticker]
    rb_metrics = rb_results[ticker]

    # Fraction-valued metrics are multiplied by 100 for display as percentages;
    # the last column is XGBoost minus Rule-Based.
    print(f"\n{ticker}:")
    print(f"  {'Metric':<20} {'XGBoost':>12} {'Rule-Based':>12} {'Difference':>12}")
    print(f"  {'-'*20} {'-'*12} {'-'*12} {'-'*12}")
    print(f"  {'Strategy Return':<20} {xgb_metrics['return']*100:>11.2f}% {rb_metrics['return']*100:>11.2f}% {(xgb_metrics['return']-rb_metrics['return'])*100:>11.2f}%")
    print(f"  {'Buy & Hold':<20} {xgb_metrics['bh_return']*100:>11.2f}% {rb_metrics['bh_return']*100:>11.2f}%")
    print(f"  {'Sharpe Ratio':<20} {xgb_metrics['sharpe']:>12.2f} {rb_metrics['sharpe']:>12.2f} {xgb_metrics['sharpe']-rb_metrics['sharpe']:>12.2f}")
    print(f"  {'Max Drawdown':<20} {xgb_metrics['max_dd']*100:>11.2f}% {rb_metrics['max_dd']*100:>11.2f}% {(xgb_metrics['max_dd']-rb_metrics['max_dd'])*100:>11.2f}%")
    print(f"  {'Win Rate':<20} {xgb_metrics['win_rate']*100:>11.1f}% {rb_metrics['win_rate']*100:>11.1f}% {(xgb_metrics['win_rate']-rb_metrics['win_rate'])*100:>11.1f}%")
    print(f"  {'Trades':<20} {xgb_metrics['trades']:>12} {rb_metrics['trades']:>12} {xgb_metrics['trades']-rb_metrics['trades']:>12}")

# Sharpe ratio computed by calculate_metrics over all tickers' returns pooled together.
print(f"\n{'='*80}")
print(f"OVERALL SHARPE RATIO:")
print(f"  XGBoost:    {xgb_overall:>6.3f}")
print(f"  Rule-Based: {rb_overall:>6.3f}")


In [None]:
#Data Visualization
tickers = ['META', 'AAPL', 'AMZN', 'NFLX', 'GOOGL', 'SPY']

# Keep prompting until the user names one of the known tickers. Input is
# trimmed and upper-cased so 'aapl' or ' AAPL ' are accepted as well.
continued_input = True
while continued_input:
    input_ticker = input("Please enter a stock ('AAPL') for example: " ).strip().upper()
    if input_ticker in tickers:
        print("Done")
        continued_input = False

# Rule-based model: cumulative strategy return (%) for the chosen ticker.
graphed_df = rb_model.data.xs(input_ticker, level = 0)

strategy_graphed_return_pct = (np.exp(graphed_df['strategy_return'].cumsum()) - 1) * 100

dates = graphed_df.index

# XGBoost model: cumulative strategy return (%) computed per ticker. A grouped
# cumsum keeps the original (ticker, date) index, so the chosen ticker can be
# selected by name afterwards.
xgb_graphed_cum_return_pct = (
    np.exp(xgb_model.test_data.groupby(level='ticker')['strategy_return'].cumsum()) - 1
) * 100

# Select the chosen ticker's curve. Slicing the first len(dates) rows of the
# stacked series instead would always pick the first ticker in the index.
xgb_graphed_returns = xgb_graphed_cum_return_pct.xs(input_ticker, level='ticker')


class data_analysis:
    """Draw cumulative-return charts for the XGBoost and rule-based strategies.

    Constructing an instance immediately shows three figures: XGBoost alone,
    rule-based alone, and both overlaid. The curves drawn are exactly the x/y
    sequences passed in.

    Args:
        x_XGBoost, y_XGBoost: x values (dates) and cumulative returns (%) of the XGBoost strategy.
        x_RB, y_RB: x values (dates) and cumulative returns (%) of the rule-based strategy.
        rb_model: the rule-based model instance, stored as `self.rbmodel`.
        ticker: label printed in the heading; defaults to the notebook-level
            `input_ticker` when omitted.
    """
    line_dict = dict(marker = ".", markersize = 3)

    def __init__(self, x_XGBoost, y_XGBoost, x_RB, y_RB, rb_model, ticker=None):
        self.rbmodel = rb_model
        self.ticker = input_ticker if ticker is None else ticker
        print("Graphs Based on", self.ticker, "Stock")

        self.xgboost_graph(x_XGBoost, y_XGBoost)
        self.rulebased_model_graph(x_RB, y_RB)
        self.combined_graph(x_XGBoost, y_XGBoost, x_RB, y_RB)

    def _draw(self, title, ylabel, curves, legend=False):
        """Plot each (x, y, extra_kwargs) curve on a fresh figure and show it."""
        fig, ax = plt.subplots()
        ax.set_title(title)
        ax.set_xlabel("Dates")
        ax.set_ylabel(ylabel)

        for x_values, y_values, extra in curves:
            ax.plot(x_values, y_values, **extra, **self.line_dict)

        if legend:
            ax.legend()
        ax.tick_params(axis = 'x', which = 'major', labelsize = 8)

        plt.show()

    def xgboost_graph(self, x_XGBoost, y_XGBoost):
        """Show the XGBoost strategy's cumulative return curve."""
        self._draw(
            "XGBoost Performance Graph",
            "XGBoost Returns (%)",
            [(x_XGBoost, y_XGBoost, {})],
        )

    def rulebased_model_graph(self, x_RB, y_RB):
        """Show the rule-based strategy's cumulative return curve."""
        self._draw(
            "Rule Based Model Performance Graph",
            "Rule Based Returns (%)",
            [(x_RB, y_RB, dict(color='orange'))],
        )

    def combined_graph(self, x_XGBoost, y_XGBoost, x_RB, y_RB):
        """Overlay both strategies' cumulative return curves on one chart."""
        self._draw(
            "Combined Performance Graph",
            "Strategy Returns (%)",
            [
                (x_XGBoost, y_XGBoost, dict(label='XGBoost')),
                (x_RB, y_RB, dict(color='orange', label='Rule-Based')),
            ],
            legend=True,
        )


# Pass the chosen ticker's own curves so all three charts describe the same stock.
analyse_data = data_analysis(
    xgb_graphed_returns.index,
    xgb_graphed_returns,
    dates,
    strategy_graphed_return_pct,
    rb_model,
    ticker=input_ticker,
)