# Hull Tactical Market Prediction

### Import Libralies

In [43]:
# Utilities
import os
from pathlib import Path
import pandas as pd
import numpy as np

# Visualization
from colorama import Fore, Style
from IPython.display import display, HTML
import matplotlib.pyplot as plt
from dataclasses import dataclass, asdict

# Models
import lightgbm as lgb

# Submission
import polars as pl
import kaggle_evaluation.default_inference_server

In [44]:
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

### Definition

In [45]:
# ============ RETURNS TO SIGNAL CONFIGS ============
MIN_SIGNAL: float = 0.0                         # Minimum value for the daily signal 
MAX_SIGNAL: float = 2.0                         # Maximum value for the daily signal 
SIGNAL_MULTIPLIER: float = 7.5                 # Multiplier of the OLS market forward excess returns predictions to signal 

@dataclass(frozen=True)
class RetToSignalParameters:
    signal_multiplier: float 
    min_signal : float = MIN_SIGNAL
    max_signal : float = MAX_SIGNAL

ret_signal_params = RetToSignalParameters(
    signal_multiplier= SIGNAL_MULTIPLIER
)

def convert_ret_to_signal(
    ret_arr: np.ndarray,
    params: RetToSignalParameters,
    signal_multiplier=None
) -> np.ndarray:
    """
    Converts raw model predictions (expected returns) into a trading signal.

    Args:
        ret_arr (np.ndarray): The array of predicted returns.
        params (RetToSignalParameters): Parameters for scaling and clipping the signal.

    Returns:
        np.ndarray: The resulting trading signal, clipped between min and max values.
    """
    
    # 予測値を基準に，投資戦略シグナルに変換
    # ret * signal_multiplier + 1 を min_signal ~ max_signal の範囲にクリップ
    if signal_multiplier is not None:
        ret = np.clip(
            ret_arr * signal_multiplier + 1, params.min_signal, params.max_signal
        )
    else:    
        ret = np.clip(
            ret_arr * params.signal_multiplier + 1, params.min_signal, params.max_signal
        )

    try:
        if ret.size < 20:
            print(f"submit::{ret}")
    except:
        pass
    return ret

In [46]:
# ============ LOAD DATA ============
# プラットフォームがkaggleかローカルかで分岐
if os.getenv('KAGGLE_KERNEL_RUN_TYPE') is not None:
    # Kaggle上
    DATA_PATH: Path = Path('/kaggle/input/hull-tactical-market-prediction/')
else:
    BASE_PATH = Path.cwd()
    DATA_PATH: Path = BASE_PATH / 'data'


train = pd.read_csv(DATA_PATH / "train.csv")
test = pd.read_csv(DATA_PATH / "test.csv")

### Scoreing

In [47]:
class ParticipantVisibleError(Exception):
    # Custom error to show messages to participants
    pass

def score(solution: pd.DataFrame, submission: pd.DataFrame, row_id_column_name: str, intermediate_res:list = None) -> float:
    """
    Calculates a custom evaluation metric (volatility-adjusted Sharpe ratio).

    This metric penalizes strategies that take on significantly more volatility
    than the underlying market.

    Returns:
        float: The calculated adjusted Sharpe ratio.
    """
    solution = solution.copy().reset_index(drop=True)
    submission = submission.copy().reset_index(drop=True)
    solution['position'] = submission['prediction']

    # ありえない値を除外する (0 <= position <= 2)
        # 0 means that we don't invest in S & P at all but get only the risk-free rate.
        # 1 means that we invest all our money in S & P.
        # 2 means that we invest twice our capital in S & P while taking a credit at the risk-free rate.
        # -> つまり，普通に預金するか，S&Pに投資するか，S&Pに2倍レバレッジで投資するか（借金）の割合
    if solution['position'].max() > MAX_SIGNAL:
        raise ParticipantVisibleError(f'Position of {solution["position"].max()} exceeds maximum of {MAX_SIGNAL}')
    if solution['position'].min() < MIN_SIGNAL:
        raise ParticipantVisibleError(f'Position of {solution["position"].min()} below minimum of {MIN_SIGNAL}')

    # Calculate strategy returns
    # フェデラルファンド金利(利息) * (1-予測値) + 予測値 * S&P500の翌日のリターン = 戦略のリターン(割合)
    solution['strategy_returns'] = solution['risk_free_rate'] * (1 - solution['position']) + solution['position'] * solution['forward_returns']

    # Calculate strategy's Sharpe ratio
    # リターンとその標準偏差を用いてシャープレシオ（リスクあたりの効率）を計算
    strategy_excess_returns = solution['strategy_returns'] - solution['risk_free_rate'] # 超過リターン -> 今回の戦略で得た割合から，リスクフリー時の割合を引いた分
    strategy_excess_cumulative = (1 + strategy_excess_returns).prod() # 累積超過リターン -> 全期間の超過リターンをかけ合わせた分(1+で倍率に変換)
    strategy_mean_excess_return = (strategy_excess_cumulative) ** (1 / len(solution)) - 1 # 平均超過リターン -> 複利は幾何平均で求める． また，倍率から割合に戻してる
    strategy_std = solution['strategy_returns'].std() # リターンの標準偏差

    trading_days_per_yr = 252 # 1年あたりの取引日数(固定値)
    if strategy_std == 0:
        raise ZeroDivisionError
    sharpe = strategy_mean_excess_return / strategy_std * np.sqrt(trading_days_per_yr) # 年率換算したシャープレシオ. sqrt(252)をかけることで年率換算している（統計的な性質らしい）
    strategy_volatility = float(strategy_std * np.sqrt(trading_days_per_yr) * 100)  # 年率換算したボラティリティ(価格変動率)

    # Calculate market return and volatility
    # S&P500に投資し続けた場合のリターンとボラティリティを計算
    market_excess_returns = solution['forward_returns'] - solution['risk_free_rate'] # S&P500が利息を上回る割合
    market_excess_cumulative = (1 + market_excess_returns).prod() # ↑の累積
    market_mean_excess_return = (market_excess_cumulative) ** (1 / len(solution)) - 1 # train: 0.0003066067595838273 幾何平均，割合化
    market_std = solution['forward_returns'].std() # S&P500のリターンの標準偏差
    
    market_volatility = float(market_std * np.sqrt(trading_days_per_yr) * 100) # train: 16.748459963166347 %
    
    # Calculate the volatility penalty
    # ボラティリティペナルティを計算
    # -> 市場のボラティリティの1.2倍を超える場合のペナルティ
    excess_vol = max(0, strategy_volatility / market_volatility - 1.2) if market_volatility > 0 else 0
    vol_penalty = 1 + excess_vol

    # Calculate the return penalty
    # リターンペナルティを計算
    # -> 市場のリターンを下回る場合のペナルティ
    return_gap = max(
        0,
        (market_mean_excess_return - strategy_mean_excess_return) * 100 * trading_days_per_yr,
    )
    return_penalty = 1 + (return_gap**2) / 100

    # Adjust the Sharpe ratio by the volatility and return penalty
    # ペナルティ値の反映
    adjusted_sharpe = sharpe / (vol_penalty * return_penalty)

    # print("strategy_excess_returns NaN数:", solution['strategy_returns'].isna().sum())
    # print("strategy_std:", strategy_std)
    # print("strategy_excess_cumulative:", strategy_excess_cumulative)
    # print("market_excess_cumulative:", market_excess_cumulative)
    # print("adjusted_sharpe:", adjusted_sharpe)
    try:
        intermediate_res.append((strategy_mean_excess_return, strategy_std, sharpe, vol_penalty, return_penalty)) # 各値を記録(debug)
        return min(float(adjusted_sharpe), 1_000_000), intermediate_res # float変換，上限100万
    except NameError:
        return min(float(adjusted_sharpe), 1_000_000) # float変換，上限100万

### Training

In [48]:
# inner validation は各 fold の train の末尾 180 を推奨（固定 val を使い回すより過学習が減る）
inner_val_len = 180

def _best_m_from_inner_val(y_pred_inner, sol_inner, m_grid=None, lambda_reg=0.0):
    # m を粗く走査 → 近傍で微調整（高速で十分）
    if m_grid is None:
        m_grid = np.concatenate([
            np.linspace(0.05, 0.5, 10),      # 小さめ
            np.linspace(0.5, 2.0, 16),       # 中程度
            np.linspace(2.0, 5.0, 7)         # 大きめ
        ])
    best_m, best_score = None, -np.inf
    for m in m_grid:
        alloc = convert_ret_to_signal(m * y_pred_inner, ret_signal_params)
        sub = pd.DataFrame({"prediction": alloc}).reset_index(drop=True)
        s, _ = score(sol_inner, sub, "", [])
        s -= lambda_reg * (m**2)  # 過学習抑制したい場合
        if s > best_score:
            best_score, best_m = s, m

    # 近傍で微調整（任意）
    lo = max(0.01, best_m * 0.5); hi = best_m * 1.5
    for m in np.linspace(lo, hi, 15):

        
        alloc = convert_ret_to_signal(m * y_pred_inner, ret_signal_params)
        sub = pd.DataFrame({"prediction": alloc}).reset_index(drop=True)
        s, _ = score(sol_inner, sub, "", [])
        s -= lambda_reg * (m**2)
        if s > best_score:
            best_score, best_m = s, m
    return float(best_m), float(best_score)


In [None]:
score_list_dict = {}
def cross_validate(allocation_model, label="", min_train_size=1500, test_size=180):
    """
    時系列を考慮した交差検証を行う
    検証インデックス：
    range(len(train) - test_size, min_train_size, - test_size)
    例えば、trainデータが2000行、test_size=120、min_train_size=1500の場合、
    検証用のインデックスは：
    range(2000 - 120, 1500, -120) = range(1880, 1500, -120)
    = (1880, 1760, 1640, 1520)
    となり、各foldで未来のデータを使用せずに評価することができる．
    trainサイズはfoldが進む毎に減少し，min_train_sizeに達したら終了する．
    減少させているのは未来リークを防ぐため．
    """
    n = len(train)
    oof = np.full(n, np.nan, dtype=float)
    score_list = []
    intermediate_res = []
    val_list = []

    # ===== 前処理（1回だけ） =====
    # 学習に使わない列を除いた特徴量列を一度だけ確定
    drop_cols = ["date_id", "forward_returns", "risk_free_rate", "market_forward_excess_returns"]
    feature_cols = [c for c in train.columns if c not in drop_cols]

    # 必要列を配列/ビューで保持（コピー最小化）
    X_all = train[feature_cols]                  # DataFrame（ループ内は iloc でビュー切り）
    y_all = train["forward_returns"].to_numpy()  # 1D ndarray
    rfr_all = train["risk_free_rate"].to_numpy() # 1D ndarray

    # バリデーション（固定で最後180）を一度だけ切り出し
    val_idx_start = max(0, n - 180)
    X_val = X_all.iloc[val_idx_start:]
    y_val = y_all[val_idx_start:]
    v_sol = pd.DataFrame(
        {
            "forward_returns": y_all[val_idx_start:],
            "risk_free_rate":  rfr_all[val_idx_start:],
        }
    ).reset_index(drop=True)

    for fold, test_start in enumerate(
        range(n - test_size, min_train_size, -test_size)
    ):
        print(Fore.CYAN + f"=== Fold {fold} Test start at {test_start} ===" + Style.RESET_ALL)
        # 1. データ分割（中間DataFrameを作らず直接切る）
        test_end = test_start + test_size

        X_train = X_all.iloc[:test_start]
        y_train = y_all[:test_start]

        X_test = X_all.iloc[test_start:test_end]
        y_test = y_all[test_start:test_end]  # 使わないが念のため残す（デバッグ等）
        
        # 評価用（score用のソリューション; 必要最小限のDF生成）
        solution = pd.DataFrame(
            {
                "forward_returns": y_all[test_start:test_end],
                "risk_free_rate":  rfr_all[test_start:test_end],
            }
        ).reset_index(drop=True)

        # 1.5. SIGNALE_MULTIPLIER の最適化
        # --- inner validation を train_fold の末尾から作る ---
        inner_start = max(0, test_start - inner_val_len)
        X_inner = X_all.iloc[inner_start:test_start]
        y_inner = y_all[inner_start:test_start]
        sol_inner = pd.DataFrame({
            "forward_returns": y_all[inner_start:test_start],
            "risk_free_rate":  rfr_all[inner_start:test_start],
        }).reset_index(drop=True)

        # 2. モデル学習
        allocation_model.fit(X_train, y_train)

        # 2.5. inner validation で best_m を決定
        y_pred_inner = allocation_model.predict(X_inner)
        best_m, _ = _best_m_from_inner_val(y_pred_inner, sol_inner)

        # 3. 予測
        y_pred = allocation_model.predict(X_test)
        # allocation_list = np.clip(y_pred, 0, 2)  # 投資比率は0から2の間にクリップ
        allocation_list = convert_ret_to_signal(y_pred, ret_signal_params, signal_multiplier=best_m)

        # 4. 評価
        submission = pd.DataFrame({"prediction": allocation_list}).reset_index(drop=True)
        validation_score, intermediate_res = score(
            solution, submission, "", intermediate_res
        )

        pred_val = allocation_model.predict(X_val)
        val_allocation_list = convert_ret_to_signal(pred_val, ret_signal_params, signal_multiplier=best_m)
        val_submission = pd.DataFrame({"prediction": val_allocation_list}).reset_index(drop=True)
        val_score, _ = score(
            v_sol, val_submission, "", intermediate_res
        )
        
        vol_penalty = intermediate_res[-1][3]   # ボラティリティペナルティ
        return_penalty = intermediate_res[-1][4]# リターンペナルティ
        
        display(HTML(
            f"<p  style='color: orange'>"
            f"train(:{test_start:4}) test({test_start:4}:{test_end:4})<br>"
            f"val_score: {validation_score:6.3f} {vol_penalty=:.2f} {return_penalty=:.2f}<br>"
            f"score(submission) : {val_score}<br>"
            f"best_multi : {best_m}<br>"
            f"</p>"
        ))
        
        oof[test_start:test_end] = allocation_list
        score_list.append(validation_score)
        val_list.append(val_score)

        # 最初のfold modelを保存しておく
        # if fold == 0:
        #     submit_model = allocation_model
        # else :
        #     break

    # ===== 集計表示 =====
    submit_model = allocation_model
    display(HTML('<h2 style="text-align:center;color:orange">======== Result ========</h2>'))
    avg_validation_score = float(np.nanmean(score_list)) if len(score_list) else np.nan
    print(f"{label} Average Validation Score: {avg_validation_score:.6f}")

    # 全体スコア（インデックス揃え）s
    # mask = np.isfinite(oof)
    # solution_all = train.loc[mask, ['forward_returns','risk_free_rate']].reset_index(drop=True)
    # submission_all = pd.DataFrame({'prediction': oof[mask]}).reset_index(drop=True)
    # overall_score, intermediate_res = score(solution_all, submission_all, '', intermediate_res)
    # if intermediate_res:
    #     vol_penalty = intermediate_res[-1][3]
    #     return_penalty = intermediate_res[-1][4]
    # else:
    #     vol_penalty = return_penalty = np.nan
    # print(f"{label} Overall Validation Score: {overall_score:.6f} vol_penalty={vol_penalty:.2f} return_penalty={return_penalty:.2f}")

    # 全体スコア（インデックス揃え）
    mask = np.isfinite(oof)
    if np.any(mask):
        solution_all = pd.DataFrame(
            {
                "forward_returns": y_all[mask],
                "risk_free_rate":  rfr_all[mask],
            }
        ).reset_index(drop=True)
        submission_all = pd.DataFrame({'prediction': oof[mask]}).reset_index(drop=True)
        overall_score, intermediate_res = score(solution_all, submission_all, '', intermediate_res)
        vol_penalty = intermediate_res[-1][3] if intermediate_res else np.nan
        return_penalty = intermediate_res[-1][4] if intermediate_res else np.nan
        print(f"{label} Overall Validation Score: {overall_score:.6f} vol_penalty={vol_penalty:.2f} return_penalty={return_penalty:.2f}")
    else:
        print(f"{label} Overall Validation Score: NaN (no valid OOF)")



    score_list_dict[label] = score_list
    # 1回目のfoldのスコアを示す
    if score_list:
        print(f"{label} First(Test) Fold Validation Score: {score_list[0]:.6f}")

    if val_list:
        print(Fore.YELLOW + f"All(Test) Fold Validation Score : {(sum(val_list) / len(val_list)):6.3f}" + Style.RESET_ALL)
        

    # 分布可視化
    vals = oof[mask]
    if len(vals):
        vmin, vmax = float(np.min(vals)), float(np.max(vals))
        if vmin == vmax:
            vmax = vmin + 1e-6
        bins = np.linspace(vmin, vmax, 50)
        plt.figure(figsize=(6, 2))
        plt.hist(vals, bins=bins, density=False, color='c', edgecolor='k', linewidth=0.5)
        plt.title(f'Allocation histogram of {label}')
        plt.gca().get_yaxis().set_visible(False)
        plt.xlim(vmin, vmax)
        plt.show()

    print(f"Range of predictions: [{vmin:.6f}, {vmax:.6f}]")
    
    # SIGNAL_MULTIPLIER の算出（元ロジック踏襲）
    # ※ oofの分布に依存するため、maskチェックを入れる
    if np.any(mask):
        span = min(MAX_SIGNAL - 1.0, 1.0 - MIN_SIGNAL)  # 0-2なら span=1.0
        q = np.percentile(np.abs(oof[mask]), 99)        # 上位1%に合わせる
        SIGNAL_MULTIPLIER = (0.95 * span) / max(q, 1e-12)
        print(f"multi::{SIGNAL_MULTIPLIER}")
    else:
        print("multi::NaN (no valid OOF)")
    

    return submit_model

In [51]:
# 単純なLightGBMモデルで試す

allocation_model = lgb.LGBMRegressor(
    n_estimators=1000,
    learning_rate=0.05,
    num_leaves=31,
    colsample_bytree=0.8,
    subsample=0.8,
    random_state=42,
)

submit_model = cross_validate(allocation_model, label="LightGBM Model")

[36m=== Fold 0 Test start at 8810 ===[0m
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.003775 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 21575
[LightGBM] [Info] Number of data points in the train set: 8810, number of used features: 94
[LightGBM] [Info] Start training from score 0.000468
[Inner Optimization] k=0.511, alpha=2.000, score=10.7940


[36m=== Fold 1 Test start at 8630 ===[0m
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.002163 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 21573
[LightGBM] [Info] Number of data points in the train set: 8630, number of used features: 94
[LightGBM] [Info] Start training from score 0.000460
[Inner Optimization] k=1.133, alpha=1.000, score=10.0767


[36m=== Fold 2 Test start at 8450 ===[0m
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.002814 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 21575
[LightGBM] [Info] Number of data points in the train set: 8450, number of used features: 94
[LightGBM] [Info] Start training from score 0.000453


KeyboardInterrupt: 

### Submissiohn
- time-series streaming形式
- Kaggle サーバーから1batchずつ送られるデータからsubmission.parquetを返す
- 返り値検証があるため，指定された形式で返す
- 指定形式
  - 

In [None]:
def predict(test: pl.DataFrame) -> float:
    """Replace this function with your inference code."""
    test_pd = test.to_pandas()
    # display(test_pd.info())
    if len(test_pd.columns) > 94:
        test_pd = test_pd.drop(
            ["date_id", "is_scored", "lagged_forward_returns", "lagged_risk_free_rate", "lagged_market_forward_excess_returns"], 
            axis = 1)
    
    preds = submit_model.predict(test_pd)
    raw_pred: float = float(preds[0])
    print(f"predict:{raw_pred}")
    
    # --- 出力（float or ndarray）---
    # KaggleのAPI仕様上、float単体かSeries/DataFrameで返す必要あり float(preds[0]) if len(preds) == 1 else preds　
    return convert_ret_to_signal(raw_pred, ret_signal_params)

In [None]:
# サーバー上でpredict(test_batch)を動かす
inference_server = kaggle_evaluation.default_inference_server.DefaultInferenceServer(predict)

if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    inference_server.serve()
else:
    inference_server.run_local_gateway(('/kaggle/input/hull-tactical-market-prediction/',))

                This exceeds the startup time limit of 900 seconds that the gateway will enforce
                during the rerun on the hidden test set. Start the server before performing any time consuming steps.


GatewayRuntimeError: (<GatewayRuntimeErrorType.GATEWAY_RAISED_EXCEPTION: 5>, 'Traceback (most recent call last):\n  File "/home/masa1357/Dockerdata/kaggle/Kaggle_Hull-Tactical---Market-Prediction/kaggle_evaluation/core/base_gateway.py", line 134, in run\n    predictions, row_ids = self.get_all_predictions()\n  File "/home/masa1357/Dockerdata/kaggle/Kaggle_Hull-Tactical---Market-Prediction/kaggle_evaluation/core/base_gateway.py", line 109, in get_all_predictions\n    for data_batch, row_ids in self.generate_data_batches():\n  File "/home/masa1357/Dockerdata/kaggle/Kaggle_Hull-Tactical---Market-Prediction/kaggle_evaluation/default_gateway.py", line 29, in generate_data_batches\n    test = pl.read_csv(self.competition_data_dir / \'test.csv\')\n  File "/usr/local/lib/python3.10/dist-packages/polars/_utils/deprecation.py", line 128, in wrapper\n    return function(*args, **kwargs)\n  File "/usr/local/lib/python3.10/dist-packages/polars/_utils/deprecation.py", line 128, in wrapper\n    return function(*args, **kwargs)\n  File "/usr/local/lib/python3.10/dist-packages/polars/_utils/deprecation.py", line 128, in wrapper\n    return function(*args, **kwargs)\n  File "/usr/local/lib/python3.10/dist-packages/polars/io/csv/functions.py", line 549, in read_csv\n    df = _read_csv_impl(\n  File "/usr/local/lib/python3.10/dist-packages/polars/io/csv/functions.py", line 697, in _read_csv_impl\n    pydf = PyDataFrame.read_csv(\nFileNotFoundError: No such file or directory (os error 2): /kaggle/input/hull-tactical-market-prediction/test.csv\n')