In [6]:
import pandas as pd
import numpy as np
import yfinance as yf
import ta
from datetime import datetime, timedelta, date, time
from dateutil.relativedelta import relativedelta

# Third-Party Library Imports
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import yfinance as yf
import xgboost as xgb

# Scikit-learn Imports
from sklearn.ensemble import AdaBoostClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    f1_score,
    mean_absolute_error,
    mean_squared_error,
    roc_curve,
    roc_auc_score
)
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import ParameterGrid

In [7]:
def load_and_process_stock_data(ticker: str, start_date: datetime, end_date: datetime) -> pd.DataFrame:
    """Download price data for one ticker and return it as a flat DataFrame.

    No technical indicators or sentiment alignment are computed here; the
    function only downloads, flattens the column index and simplifies the date.

    Parameters:
        ticker: Symbol passed to ``yf.download``.
        start_date: First day of the requested window.
        end_date: Last day of the requested window. One day is added before
            the request is sent (presumably because yfinance treats ``end`` as
            exclusive -- confirm against the installed version).

    Returns:
        DataFrame with a ``Date`` column holding ``datetime.date`` objects,
        followed by the price columns returned by yfinance.
    """
    request_end = end_date + timedelta(days=1)

    # Download stock data
    stock_data = yf.download(ticker, start=start_date, end=request_end)

    # Only flatten when the columns really are two-level; an unconditional
    # droplevel raises on a flat column index.
    if isinstance(stock_data.columns, pd.MultiIndex):
        stock_data.columns = stock_data.columns.droplevel(level=1)

    # reset_index() without inplace keeps the downloaded frame untouched.
    stock_data = stock_data.reset_index()

    # Simplify date: keep the calendar day only
    stock_data['Date'] = stock_data['Date'].dt.date

    return stock_data

In [26]:
def split_train_test_pred(data, lag=5, n_test=30):
    """Split a time-ordered sequence into a training head and a hold-out tail.

    Parameters:
        data: Any sliceable, sized object (DataFrame, list, array) in
            chronological order.
        lag: Unused; kept so existing callers that pass ``lag=`` keep working.
        n_test: Number of trailing rows reserved for the hold-out set
            (default 30).

    Returns:
        (train, test): ``data`` without its last ``n_test`` rows, and those
        last ``n_test`` rows.

    Raises:
        ValueError: If ``n_test`` is negative or ``data`` has fewer than
            ``n_test`` rows. Without this check a short input makes
            ``n_train`` negative and the slices silently wrap around.
    """
    if n_test < 0:
        raise ValueError(f"n_test must be non-negative, got {n_test}")
    if len(data) < n_test:
        raise ValueError(
            f"need at least {n_test} rows to build the test set, got {len(data)}"
        )

    n_train = len(data) - n_test

    train = data[:n_train]
    test = data[n_train:]

    return train, test


In [205]:
# for training

def make_sequences(data: pd.DataFrame, lag: int = 5):
    """Convert a price frame into sliding windows of features and next-day labels.

    Window ``i`` covers rows ``i .. i + lag - 1``. Its label is 1 when the
    close of the row right after the window (``i + lag``) is strictly higher
    than the close of the window's last row (``i + lag - 1``), otherwise 0.

    Parameters:
        data: Frame with 'Close', 'High', 'Low', 'Open' and 'Volume' columns,
            in chronological order.
        lag: Window length in rows; must be at least 1.

    Returns:
        (X, y): ``X`` has shape ``(len(data) - lag, lag, 5)`` and ``y`` has
        shape ``(len(data) - lag,)``. When ``data`` has ``lag`` rows or fewer
        both arrays are empty but keep those ranks.

    Raises:
        ValueError: If ``lag`` is smaller than 1.
    """
    if lag < 1:
        raise ValueError(f"lag must be at least 1, got {lag}")

    features = ['Close', 'High', 'Low', 'Open', 'Volume']

    # Every window needs one following row to derive its label from.
    n_sequences = len(data) - lag
    if n_sequences <= 0:
        return np.empty((0, lag, len(features))), np.empty((0,), dtype=int)

    # Pull the columns out once instead of doing per-row iloc lookups.
    values = data[features].values
    closes = data['Close'].values

    X = np.array([values[start:start + lag] for start in range(n_sequences)])
    # closes[lag - 1:-1] are the last closes of each window, closes[lag:] the
    # closes of the day that follows each window.
    y = (closes[lag - 1:-1] < closes[lag:]).astype(int)

    return X, y

In [None]:
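# Experiment design:
# - retrain the models every week
# - validate on the most recent 30 rows (trading days) of each ticker's history
# - use 3 years of historical prices per training run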

In [9]:
def simulate_intraday_long_short(preds, price_data_open, price_data_close, initial_cash=10000):
    """
    Replay a one-trade-per-day long/short strategy driven by class predictions.

    For every position ``i``:
    - prediction 1 -> long: buy at ``price_data_open[i]``, sell at ``price_data_close[i]``
    - prediction 0 -> short: sell at ``price_data_open[i]``, buy back at ``price_data_close[i]``
    - any other value -> no trade that day

    The position size is ``cash // open_price`` (floor division) and each
    day's profit or loss is added to the cash used on the following day.

    Parameters:
        preds: Predicted classes, indexed by position
        price_data_open: Entry prices aligned with preds
        price_data_close: Exit prices aligned with preds
        initial_cash: Starting capital

    Returns:
        final_cash: Cash balance after the last trade
        trades: List of dicts, one per executed trade
    """
    balance = initial_cash
    trade_log = []

    for day in range(len(preds)):
        entry_price = price_data_open[day]
        exit_price = price_data_close[day]
        signal = preds[day]

        # Map the predicted class to a trade direction; skip anything else.
        if signal == 1:
            side = 'long'
        elif signal == 0:
            side = 'short'
        else:
            continue

        quantity = balance // entry_price
        if side == 'long':
            pnl = quantity * (exit_price - entry_price)
        else:
            pnl = quantity * (entry_price - exit_price)
        balance += pnl

        trade_log.append({
            'day': day,
            'action': side,
            'open_price': entry_price,
            'close_price': exit_price,
            'shares': quantity,
            'profit': pnl
        })

    return balance, trade_log

In [132]:
def XGBoost(X_train_flat, y_train, X_test_flat, max_depth, min_child_weight, gamma, learning_rate, n_estimators):
    """Fit an ``xgb.XGBClassifier`` and predict labels for the test features.

    The five tuning arguments are forwarded to the classifier unchanged; the
    classifier is always built with ``use_label_encoder=False``,
    ``eval_metric='logloss'`` and ``verbosity=0``.

    Returns:
        (y_pred, model): predictions for ``X_test_flat`` and the fitted model.
    """
    tuned_params = dict(
        max_depth=max_depth,
        min_child_weight=min_child_weight,
        gamma=gamma,
        learning_rate=learning_rate,
        n_estimators=n_estimators,
    )
    fixed_params = dict(use_label_encoder=False, eval_metric='logloss', verbosity=0)

    model = xgb.XGBClassifier(**fixed_params, **tuned_params)
    model.fit(X_train_flat, y_train)

    return model.predict(X_test_flat), model

def Ada_Boost(X_train_flat, y_train, X_test_flat, estimator__max_depth, learning_rate, n_estimators):
    """Fit an AdaBoost ensemble of decision trees and predict the test labels.

    Parameters:
        X_train_flat, y_train: Training features and labels.
        X_test_flat: Features to predict.
        estimator__max_depth: ``max_depth`` of the DecisionTreeClassifier used
            as the base estimator.
        learning_rate, n_estimators: Forwarded to AdaBoostClassifier.

    Returns:
        (y_pred, model): predictions for ``X_test_flat`` and the fitted model.
    """
    model = AdaBoostClassifier(
        estimator=DecisionTreeClassifier(max_depth=estimator__max_depth),
        n_estimators=n_estimators,
        learning_rate=learning_rate,
    )
    model.fit(X_train_flat, y_train)

    predictions = model.predict(X_test_flat)
    return predictions, model

In [145]:
def metrics(y_test, y_pred, test_open_prices, test_close_prices):
    """Score a set of predictions and replay them through the trading simulation.

    Returns a 6-tuple:
        accuracy, F1 score (``zero_division=0``), a ``{class: count}`` dict of
        the predicted classes, the result of ``simulate_intraday_long_short``
        for these predictions and prices, and finally ``y_pred`` and
        ``y_test`` as passed in.
    """
    scores = (
        accuracy_score(y_test, y_pred),
        f1_score(y_test, y_pred, zero_division=0),
    )

    predicted_classes, counts = np.unique(y_pred, return_counts=True)
    class_dist = {label: count for label, count in zip(predicted_classes, counts)}

    simulation = simulate_intraday_long_short(y_pred, test_open_prices, test_close_prices)

    return (*scores, class_dist, simulation, y_pred, y_test)

In [207]:
def create_data(tickers, end_date, lag: int = 5, history_years: int = 3):
    """Download price history for each ticker and build train/test sequences.

    Parameters:
        tickers: Iterable of ticker symbols.
        end_date: Last day of the history window (a date/datetime-like object;
            its time-of-day part is dropped).
        lag: Window length forwarded to ``split_train_test_pred`` and
            ``make_sequences`` (default 5, the value previously hard-coded).
        history_years: Length of the history window in years (default 3, the
            value previously hard-coded).

    Returns:
        (data_split, data_split_seq):
            ``data_split[ticker]`` is the ``(train, test)`` pair returned by
            ``split_train_test_pred``; ``data_split_seq[ticker]`` is
            ``(make_sequences(train), make_sequences(test))``.
    """
    start_date = end_date - relativedelta(years=history_years)

    # Normalise both bounds to midnight so the download window is day-aligned.
    start_datetime = datetime.combine(start_date, time.min)
    end_datetime = datetime.combine(end_date, time.min)

    data_split = {}
    data_split_seq = {}

    for ticker in tickers:
        prices = load_and_process_stock_data(ticker, start_datetime, end_datetime)

        split = split_train_test_pred(prices, lag=lag)
        data_split[ticker] = split

        train_seq = make_sequences(split[0], lag=lag)
        test_seq = make_sequences(split[1], lag=lag)
        data_split_seq[ticker] = (train_seq, test_seq)

    return data_split, data_split_seq

In [237]:
def run_models(tickers, data_split, data_split_seq, lag: int = 5):
    """Grid-search XGBoost and AdaBoost per ticker and pick the most profitable fit.

    For every ticker and every hyperparameter combination the model is trained
    on the training sequences, scored on the test sequences with ``metrics``
    and recorded as one row.

    Parameters:
        tickers: Ticker symbols to process; each must be a key of both dicts.
        data_split: ``{ticker: (train_frame, test_frame)}``; only the test
            frame's 'Close' column is read here.
        data_split_seq: ``{ticker: ((X_train, y_train), (X_test, y_test))}``.
        lag: Window length the sequences were built with. Row ``lag - 1`` of
            the test frame is the last day of the first window and row ``lag``
            the day its label refers to (default 5 matches the previous
            hard-coded offsets 4 and 5).

    Returns:
        (final_df, best_models):
            ``final_df`` has one row per (ticker, model, params) with columns
            'ticker', 'model_name', 'model', 'params', 'accuracy', 'f1_score',
            'class_distribution', 'profit' and 'trading_simulation'.
            'model' holds the fitted estimator and 'model_name' its label.
            'profit' stores the first element returned by the trading
            simulation. ``best_models`` keeps, per ticker, the row with the
            highest 'profit'. Both frames are empty when nothing was run.
    """
    xgb_hyperparams = {
        'max_depth': [3, 5],
        'min_child_weight': [1, 3],
        'gamma': [0, 0.2],
        'learning_rate': [0.05, 0.1],
        'n_estimators': [100, 150]
    }

    ada_hyperparams = {
        'estimator__max_depth': [1, 3, 4, 5],
        'learning_rate': [0.5, 1.0],
        'n_estimators': [50, 100]
    }

    # (label, training function, search space) for every model family.
    candidates = [
        ("XGBoost", XGBoost, xgb_hyperparams),
        ("AdaBoost", Ada_Boost, ada_hyperparams),
    ]

    results = []

    for ticker in tickers:
        X_train, y_train = data_split_seq[ticker][0]
        X_test, y_test = data_split_seq[ticker][1]

        # Entry price = close of each window's last day; exit price = close of
        # the following day. Both come from the 'Close' column, so trades are
        # close-to-close and match how the labels are defined.
        test_frame = data_split[ticker][1]
        test_open_prices = np.array(test_frame.iloc[lag - 1:lag - 1 + len(y_test)]['Close'])
        test_close_prices = np.array(test_frame.iloc[lag:]['Close'])

        # Flatten (samples, lag, features) into (samples, lag * features).
        X_train_flat = X_train.reshape(X_train.shape[0], -1)
        X_test_flat = X_test.reshape(X_test.shape[0], -1)

        for model_name, train_fn, hyperparams in candidates:
            for params in ParameterGrid(hyperparams):
                y_pred, model = train_fn(X_train_flat, y_train, X_test_flat, **params)

                # y_test is deliberately not rebound from the return value so
                # every grid point is scored against the same labels.
                acc, f1, class_dist, trading_simulation, _, _ = metrics(
                    y_test, y_pred, test_open_prices, test_close_prices
                )

                print(
                    f"Ticker: {ticker}, Model: {model_name}, Params: {params}, "
                    f"Accuracy: {acc}, F1 Score: {f1}, Class Distribution: {class_dist}, "
                    f"Trading Simulation: {trading_simulation}"
                )

                # 'model' keeps the fitted estimator (consumers call
                # .predict on it); the label gets its own key instead of
                # being overwritten by a duplicate 'model' entry.
                results.append({
                    'ticker': ticker,
                    'model_name': model_name,
                    'model': model,
                    'params': params,
                    'accuracy': acc,
                    'f1_score': f1,
                    'class_distribution': class_dist,
                    'profit': trading_simulation[0],
                    'trading_simulation': trading_simulation[1]
                })

    final_df = pd.DataFrame(results)

    # Nothing to rank when no ticker was processed.
    if final_df.empty:
        return final_df, final_df.copy()

    # Find the most profitable model per ticker
    best_models = final_df.loc[
        final_df.groupby("ticker")["profit"].idxmax()
    ].reset_index(drop=True)

    return final_df, best_models


In [238]:
def simulate_week_trading(best_models, end_date, ticker, lag = 5):
    """Trade one ticker over the days leading up to ``end_date`` with its best model.

    Downloads ``7 + lag`` calendar days of prices ending at ``end_date``,
    builds sequences of length ``lag`` from them, predicts with the fitted
    estimator stored for ``ticker`` in ``best_models`` and scores the result
    with ``metrics``.

    Parameters:
        best_models: DataFrame with a 'ticker' column and a 'model' column
            holding fitted estimators (objects with a ``predict`` method).
        end_date: Last day of the download window (date/datetime-like; the
            time-of-day part is dropped).
        ticker: Symbol to download and trade.
        lag: Window length. It is used for the download window, for the
            sequences and for aligning the prices with the labels, so it must
            match the ``lag`` the model was trained with.

    Returns:
        The tuple returned by ``metrics`` for the predicted window.

    Raises:
        IndexError: If ``best_models`` has no row for ``ticker``.
    """
    start_date = end_date - relativedelta(days=7 + lag)

    start_datetime = datetime.combine(start_date, time.min)
    end_datetime = datetime.combine(end_date, time.min)

    data = load_and_process_stock_data(ticker, start_datetime, end_datetime)

    X_test, y_test = make_sequences(data, lag=lag)

    # Flatten (samples, lag, features) into (samples, lag * features).
    X_test_flat = X_test.reshape(X_test.shape[0], -1)

    # Entry price = close of each window's last row (index lag - 1 onwards);
    # exit price = close of the row after it. Both are taken from 'Close'.
    test_open_prices = np.array(data.iloc[lag - 1:lag - 1 + len(y_test)]['Close'])
    test_close_prices = np.array(data.iloc[lag:]['Close'])

    assert len(test_open_prices) == len(test_close_prices)
    assert len(test_open_prices) == len(y_test)

    model = best_models[best_models['ticker'] == ticker]['model'].iloc[0]
    y_pred = model.predict(X_test_flat)

    return metrics(
        y_test, y_pred, test_open_prices, test_close_prices
    )

In [None]:
# Walk-forward evaluation window. Both bounds are fixed dates (they are not
# derived from "today"), so re-running the cell covers the same weeks.
end_date = datetime(2025, 9, 20)
start_date = datetime(2022, 12, 10)#end_date - timedelta(weeks=52*3)

# One timestamp per week, anchored on Saturday ('W-SAT'), inside the window.
saturdays = pd.date_range(start=start_date, end=end_date, freq='W-SAT')

# Tickers evaluated on every Saturday.
tickers = ['TSLA', 'MSFT', 'PG', 'META', 'AMZN', 'GOOG', 'AMD', 'AAPL']

# One record per (Saturday, ticker); grows across the whole loop.
all_results = []

for dt in saturdays:
    print(f"Processing date: {dt.date()}")

    # Download the history ending on this Saturday and build train/test sequences
    data_split, data_split_seq = create_data(tickers, dt)

    # Grid-search the models and keep the most profitable one per ticker
    final_df, best_models = run_models(tickers, data_split, data_split_seq)

    # Trade the window ending 7 days after this Saturday with each ticker's best model
    for ticker in tickers:
        acc, f1, class_dist, trading_simulation, y_pred, y_test = simulate_week_trading(best_models, dt + relativedelta(days=7), ticker)
        all_results.append({
            # 'date' is the training cut-off Saturday, not the traded days.
            'date': dt.date(),
            'ticker': ticker,
            'model': 'best',
            'accuracy': acc,
            'f1_score': f1,
            # First element of the simulation result: the cash balance returned
            # by simulate_intraday_long_short, not a net gain.
            'profit': trading_simulation[0],
            'trading_simulation': trading_simulation[1],
            'predictions' : y_pred,
            'test_labels' : y_test
        })

    # Rebuild the combined DataFrame from everything collected so far
    results_df = pd.DataFrame(all_results)

    # Still inside the weekly loop: results.csv is overwritten after every
    # Saturday (presumably as a checkpoint for this long-running cell).
    results_df.to_csv('results.csv', index=False)