In [None]:
import sys

# NOTE(review): machine-specific absolute path; this cell only works on the
# original author's machine. The fallback cell below derives the root from
# the working directory instead.
PROJECT_ROOT = '/Users/shintarou/coding/topquartile'
# Guard the append so re-running the cell does not pile up duplicate entries.
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

from topquartile.modules.datamodule.dataloader import DataLoader


In [None]:
# Run only if you have errors loading the topquartile module

from pathlib import Path
import sys

# Candidate project root: three directory levels above the current working
# directory (assumes the notebook is launched from its own folder - confirm).
root = Path().resolve().parent.parent.parent

# sys.path entries must be plain strings: the import system skips any other
# type, so appending the Path object itself would have no effect on imports.
# The membership check keeps the cell safe to re-run.
if str(root) not in sys.path:
    sys.path.append(str(root))

In [None]:
# Sanity check: show the resolved project root and the current import search path.
print(root, sys.path, sep='\n')

In [None]:
from topquartile.modules.datamodule.dataloader import DataLoader
from topquartile.modules.datamodule.transforms.covariate import (TechnicalCovariateTransform, FundamentalCovariateTransform)
from topquartile.modules.datamodule.transforms.label import BinaryLabelTransform
from topquartile.modules.datamodule.partitions import PurgedTimeSeriesPartition

In [None]:
# Each config entry pairs a transform class with the keyword arguments it is
# constructed with.
covtrans_config = [
    (
        TechnicalCovariateTransform,
        {
            'sma': [20, 30],
            'ema': [20, 30],
            'momentum_change': True,
            'volatility': [20, 30],
        },
    ),
]

labeltrans_config = [
    (BinaryLabelTransform, {'label_duration': 20, 'quantile': 0.75}),
]

# NOTE(review): gap equals label_duration (20) - presumably so the purge gap
# covers the label horizon; confirm before changing either value.
partition_config = {
    'n_splits': 5,
    'gap': 20,
    'max_train_size': 504,
    'test_size': 60,
    'verbose': False,
}

In [None]:
# Build the loader for the 'dec2024' dataset with the transform and partition
# settings defined above.
dataloader = DataLoader(
    data_id='dec2024',
    covariate_transform=covtrans_config,
    label_transform=labeltrans_config,
    partition_class=PurgedTimeSeriesPartition,
    partition_kwargs=partition_config,
)

In [None]:
# Cross-validation folds from the loader. The next cell indexes each fold as
# [0] for the training frame and [1] for the validation frame.
folds = dataloader.get_cv_folds()

In [None]:
# Work with the first fold only: element 0 is the training frame, element 1 the validation frame.
train, valid = folds[0][0], folds[0][1]


In [None]:
# Drop rows containing any missing value, then remove the 'ticker' column.
# Written as non-mutating chains so the cell can be re-run: the previous
# in-place drop raised KeyError on a second execution because 'ticker' had
# already been removed from the rebound frames. errors='ignore' is what makes
# the re-run a no-op (it also tolerates a frame that never had 'ticker').
train = train.dropna(how='any').drop(columns='ticker', errors='ignore')
valid = valid.dropna(how='any').drop(columns='ticker', errors='ignore')

# Return and label columns that are dropped from the covariate matrices in the
# following cells (EXCESS_RETURN is used there as the regression target).
to_remove = ['INDEX_RETURN', 'EXCESS_RETURN', '20d_stock_return', 'label']

In [None]:
# Training features (everything except the return/label columns) and the
# regression target (excess return).
train_covariates = train.drop(columns=to_remove)
train_label = train['EXCESS_RETURN']

In [None]:
# Validation features (everything except the return/label columns) and the
# regression target (excess return).
valid_covariates = valid.drop(columns=to_remove)
valid_label = valid['EXCESS_RETURN']

In [None]:
from quantile_forest import RandomForestQuantileRegressor
# Fit a quantile regression forest on the training fold. The seed is fixed so
# the fitted forest - and therefore every predicted quantile - is reproducible
# after a kernel restart.
qrf = RandomForestQuantileRegressor(random_state=42)
qrf.fit(train_covariates.to_numpy(), train_label.to_numpy())
# One column per requested quantile: 2.5%, 50% (median) and 97.5%.
y_pred = qrf.predict(valid_covariates.to_numpy(), quantiles=[0.025, 0.5, 0.975])

In [None]:
# Display the predicted quantiles for the validation fold
# (columns follow the requested order: 0.025, 0.5, 0.975).
y_pred

## Evaluation

In [None]:
import pandas as pd
import numpy as np

# y_pred: shape (n_samples, 3) with quantiles [0.025, 0.5, 0.975]
df = (
    pd.DataFrame(y_pred, columns=['q025', 'q50', 'q975'])
    # Risk proxy: width of the interval between the 2.5% and 97.5% quantiles.
    .assign(risk=lambda frame: frame['q975'] - frame['q025'])
    # Sharpe-like score: median prediction per unit of interval width.
    .assign(sharpe=lambda frame: frame['q50'] / frame['risk'])
    # Zero-width intervals yield Inf/NaN scores; convert them to NaN and drop.
    .replace([np.inf, -np.inf], np.nan)
    .dropna()
    # Keep only samples whose interval width is meaningfully above zero.
    .loc[lambda frame: frame['risk'] > 1e-6]
)

# Distribution of the score over the remaining validation samples.
sharpe_scores = df['sharpe']
print("Sharpe-like score summary:")
print(sharpe_scores.describe())

# Samples strictly above the 90th percentile of the score (roughly the top 10%).
score_cutoff = sharpe_scores.quantile(0.9)
top_sharpes = df[sharpe_scores > score_cutoff]
print(f"Top {len(top_sharpes)} samples with high Sharpe-like scores")


## Optimization using the same parameters as the paper

In [None]:
import optuna
import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error
from quantile_forest import RandomForestQuantileRegressor

# NumPy views of the already-prepared train/validation covariates and labels,
# read as globals by the tuning objective below.
X_train, y_train = train_covariates.to_numpy(), train_label.to_numpy()
X_valid, y_valid = valid_covariates.to_numpy(), valid_label.to_numpy()

def objective(trial):
    """Score one hyperparameter configuration of the quantile forest.

    The forest is fitted on the first half of the module-level ``X_train`` /
    ``y_train`` arrays and evaluated on ``X_valid`` / ``y_valid``.

    Args:
        trial: Optuna trial used to sample ``n_estimators``,
            ``min_samples_leaf`` and ``max_features``.

    Returns:
        ``median(q50 / interval_width) + 2 * coverage - 0.05 * MAE`` computed
        over validation rows whose interval width exceeds 1e-6, or ``-inf``
        when no row survives that filter.
    """
    # Search space; the suggest calls run in the order listed.
    params = {
        'n_estimators': trial.suggest_int('n_estimators', 300, 600, step=100),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 5, 15),
        'max_features': trial.suggest_float('max_features', 0.3, 0.7),
    }
    forest = RandomForestQuantileRegressor(n_jobs=-1, random_state=42, **params)

    # Fit on the leading 50% of the training rows only, to keep each trial fast.
    half = int(len(X_train) * 0.5)
    forest.fit(X_train[:half], y_train[:half])
    quantile_preds = forest.predict(X_valid, quantiles=[0.025, 0.5, 0.975])

    results = pd.DataFrame(quantile_preds, columns=['q025', 'q50', 'q975'])
    results['y_true'] = y_valid
    # Risk proxy: width of the interval between the 2.5% and 97.5% quantiles.
    results['risk'] = results['q975'] - results['q025']
    results = results.replace([np.inf, -np.inf], np.nan).dropna()
    results = results[results['risk'] > 1e-6]

    # Nothing left to score: report the worst possible value for a maximised study.
    if results.empty:
        return -np.inf

    sharpe_like = results['q50'] / results['risk']
    # True where the observed value lies inside the predicted interval (bounds inclusive).
    inside_interval = results['y_true'].between(results['q025'], results['q975'])

    mae = mean_absolute_error(results['y_true'], results['q50'])
    sharpe_median = sharpe_like.median()
    coverage = inside_interval.astype(int).mean()

    return sharpe_median + (2 * coverage) - (0.05 * mae)

# Optimization
# Seed the sampler so the sequence of suggested hyperparameters is repeatable
# across runs. TPESampler is Optuna's default for single-objective studies, so
# only the seed changes here.
# NOTE: the 600 s timeout can still end the study after a different number of
# trials on a slower machine.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=20, timeout=600)

print("Best trial params:", study.best_trial.params)
print("Best score:", study.best_value)
