In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import pickle
import optuna
import os
import cupy as cp
from sklearn.model_selection import train_test_split, StratifiedKFold
from sktime.forecasting.model_selection import SlidingWindowSplitter
from sklearn.model_selection import TimeSeriesSplit
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.pipeline import make_pipeline, Pipeline
from xgboost import XGBClassifier
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score

In [2]:
rs=121
stocks=['aapl','googl','nvda','dal','xom','cvx','vz']

In [3]:
init_dir_path = str(os.getcwd())
data=pd.read_csv(init_dir_path+'/stock_hmm_output.csv',engine='python').drop(['Unnamed: 0'],axis=1)
smooth_data=pd.read_csv(init_dir_path+'/stock_hmm_output_kalman.csv',engine='python').drop(['Unnamed: 0'],axis=1)
features=['Smooth_Return','Volatility','Smooth_HighR','Smooth_LowR']
target='State'
data[target] = smooth_data[target] 

In [4]:
final_test=[]
stocks_data=[]
ft_size=400
for x in data.groupby(['Ticker']):
    final_test.append(x[1][-ft_size:])
    stocks_data.append(x[1][:-ft_size])

In [6]:
testing_size=500
ns=5
tss=TimeSeriesSplit(n_splits=ns,test_size=testing_size)
spread_range = range(2, 25)

In [7]:
with open('saved_training_lists_kalman_unsmoothened.pkl', 'rb') as f:
    training_lists = pickle.load(f)

with open('saved_testing_lists_kalman_unsmoothened.pkl', 'rb') as f:
    testing_lists = pickle.load(f)

In [8]:
def accuracies_model(model_dict, window, stock_ind, map=True):
    a=0
    y_true_all=[]
    y_pred_all=[]
    if map:
        model = model_dict[window][stock_ind]
    else:
        model = model_dict
    for i in range(ns):
        model.fit(X=training_lists[window][stock_ind][i][0],y=training_lists[window][stock_ind][i][1])
        y_pred=model.predict(testing_lists[window][stock_ind][i][0])
        y_true=testing_lists[window][stock_ind][i][1]
        a+= accuracy_score(y_true,y_pred)
        y_true_all=y_true_all+list(y_true)
        y_pred_all=y_pred_all+list(y_pred)
    A=confusion_matrix(y_true=y_true_all,y_pred=y_pred_all)
    f1_score = (2*A[0][0]/(2*A[0][0]+A[0][1]+A[1][0]))
    a=a/ns
    return (a,f1_score)

In [10]:
def objective_with_args(spread, stock_ind):
    def inner(trial):
        param = {
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2),
            'n_estimators': trial.suggest_int('n_estimators', 100, 300),
            'subsample': trial.suggest_float('subsample', 0.5, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
            'reg_alpha': trial.suggest_categorical('reg_alpha', [0, 0.1, 0.5, 1.0]),
            'reg_lambda': trial.suggest_categorical('reg_lambda', [0.5, 1.0, 2.0, 5.0]),
            'min_split_loss': trial.suggest_categorical('min_split_loss', [0, 0.1, 0.3, 0.5]),
            'device': 'cuda'
        }
        model = XGBClassifier(**param)
        training_list = training_lists[spread][stock_ind]
        testing_list = testing_lists[spread][stock_ind]
        y_true_all = []
        y_pred_all = []
        for i in range(ns):
            X_train = cp.asarray(training_list[i][0])
            y_train = cp.asarray(training_list[i][1])
            X_test = cp.asarray(testing_list[i][0])
            y_test = testing_list[i][1]

            if X_test.shape[0] == 0:
                continue

            model.fit(X_train, y_train)
            y_pred_all.extend(model.predict(X_test))
            y_true_all.extend(y_test)

        return f1_score(y_true_all, y_pred_all)
    return inner


from functools import partial

best_XGB = {}
for spread in spread_range:
    best_XGB[spread] = {}
    for s_ind in range(len(stocks)):
        study = optuna.create_study(direction="maximize")
        study.optimize(objective_with_args(spread, s_ind), n_trials=50)
        best_params = study.best_params
        best_XGB[spread][s_ind] = XGBClassifier(**best_params, tree_method="hist", device="cuda")


[I 2025-04-16 14:06:00,817] A new study created in memory with name: no-name-f3b5e3ec-7ace-413a-ad19-ebfa5ff33391
[I 2025-04-16 14:06:02,960] Trial 0 finished with value: 0.7032967032967034 and parameters: {'max_depth': 5, 'learning_rate': 0.17083025143779224, 'n_estimators': 207, 'subsample': 0.6453707468594746, 'colsample_bytree': 0.8980158823108748, 'reg_alpha': 0.1, 'reg_lambda': 5.0, 'min_split_loss': 0.3}. Best is trial 0 with value: 0.7032967032967034.
[I 2025-04-16 14:06:03,913] Trial 1 finished with value: 0.7058823529411765 and parameters: {'max_depth': 3, 'learning_rate': 0.045110181778675824, 'n_estimators': 138, 'subsample': 0.5268957966453386, 'colsample_bytree': 0.956101210030712, 'reg_alpha': 0.1, 'reg_lambda': 1.0, 'min_split_loss': 0}. Best is trial 1 with value: 0.7058823529411765.
[I 2025-04-16 14:06:07,662] Trial 2 finished with value: 0.6996078431372549 and parameters: {'max_depth': 9, 'learning_rate': 0.0864746991346447, 'n_estimators': 260, 'subsample': 0.514183

KeyboardInterrupt: 

In [None]:
with open('best_xgb.pkl', 'wb') as f:
    pickle.dump(best_XGB, f)

In [11]:
from sklearn.linear_model import LogisticRegression

def objective_with_args_logreg(spread, stock_ind):
    def inner(trial):
        param = {
            'C': trial.suggest_float('C', 0.01, 10.0, log=True),
            'penalty': trial.suggest_categorical('penalty', ['l1', 'l2']),
            'solver': trial.suggest_categorical('solver', ['liblinear', 'saga']),
            'max_iter': trial.suggest_int('max_iter', 100, 500)
        }

        # Ensure penalty and solver are compatible
        if param['penalty'] == 'l1' and param['solver'] == 'liblinear':
            pass  # OK
        elif param['penalty'] == 'l2' and param['solver'] in ['liblinear', 'saga']:
            pass  # OK
        else:
            raise optuna.exceptions.TrialPruned()  # skip incompatible combinations

        model = LogisticRegression(**param)

        training_list = training_lists[spread][stock_ind]
        testing_list = testing_lists[spread][stock_ind]

        y_true_all = []
        y_pred_all = []

        for i in range(ns):
            X_train = training_list[i][0]
            y_train = training_list[i][1]
            X_test = testing_list[i][0]
            y_test = testing_list[i][1]

            model.fit(X_train, y_train)
            y_pred_all.extend(model.predict(X_test))
            y_true_all.extend(y_test)

        return f1_score(y_true_all, y_pred_all)
    return inner

from functools import partial

best_logistic = {}
for spread in spread_range:
    best_logistic[spread] = {}
    for s_ind in range(len(stocks)):
        study = optuna.create_study(direction="maximize")
        study.optimize(objective_with_args_logreg(spread, s_ind), n_trials=50)
        best_params = study.best_params
        best_logistic[spread][s_ind] = LogisticRegression(**best_params)


[I 2025-04-16 14:06:28,903] A new study created in memory with name: no-name-9b06c48e-ab2d-46ae-b77e-bf0f7498ed54
[I 2025-04-16 14:06:29,014] Trial 0 finished with value: 0.7048528241845664 and parameters: {'C': 1.1454068613921569, 'penalty': 'l2', 'solver': 'liblinear', 'max_iter': 340}. Best is trial 0 with value: 0.7048528241845664.
[I 2025-04-16 14:06:29,015] Trial 1 pruned. 
[I 2025-04-16 14:06:29,194] Trial 2 finished with value: 0.694888178913738 and parameters: {'C': 0.01921058279908107, 'penalty': 'l2', 'solver': 'saga', 'max_iter': 114}. Best is trial 0 with value: 0.7048528241845664.
[I 2025-04-16 14:06:29,195] Trial 3 pruned. 
[I 2025-04-16 14:06:29,417] Trial 4 finished with value: 0.7039106145251397 and parameters: {'C': 0.2271218903160774, 'penalty': 'l2', 'solver': 'saga', 'max_iter': 405}. Best is trial 0 with value: 0.7048528241845664.
[I 2025-04-16 14:06:29,478] Trial 5 finished with value: 0.701195219123506 and parameters: {'C': 0.07922049441056415, 'penalty': 'l1',

In [12]:
with open('best_logistic.pkl', 'wb') as f:
    pickle.dump(best_logistic, f)