## Imports

In [63]:
import os
from pathlib import Path
import datetime
from typing import Optional, List, Callable, Tuple

from tqdm import tqdm
from dataclasses import dataclass, asdict

import polars as pl 
import numpy as np
from sklearn.linear_model import ElasticNetCV, LinearRegression, Lasso
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.base import clone, TransformerMixin

import kaggle_evaluation.default_inference_server

## Project Directory Structure

In [64]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Configurations

In [65]:
# ============ PATHS ============
DATA_PATH: Path = Path('kaggle/input/hull-tactical-market-prediction/')

# ============ RETURNS TO SIGNAL CONFIGS ============
MIN_SIGNAL: float = 0.0                         # Minimum value for the daily signal 
MAX_SIGNAL: float = 2.0                         # Maximum value for the daily signal 
SIGNAL_MULTIPLIER: float = 400.0                # Multiplier of the OLS market forward excess returns predictions to signal 

# ============ MODEL CONFIGS ============
CV: int = 10                                    # Number of cross validation folds in the model fitting
L1_RATIO: float = 0.5                           # ElasticNet mixing parameter
ALPHAS: np.ndarray = np.logspace(-4, 2, 100)    # Constant that multiplies the penalty terms
MAX_ITER: int = 1000000                         # The maximum number of iterations

## Dataclasses Helpers

In [66]:
@dataclass
class DatasetOutput:
    X_train : pl.DataFrame 
    X_test: pl.DataFrame
    y_train: pl.Series
    y_test: pl.Series
    scaler: TransformerMixin

@dataclass 
class ElasticNetParameters:
    l1_ratio : float 
    cv: int
    alphas: np.ndarray 
    max_iter: int 
    
    def __post_init__(self): 
        if self.l1_ratio < 0 or self.l1_ratio > 1: 
            raise ValueError("Wrong initializing value for ElasticNet l1_ratio")
        
@dataclass(frozen=True)
class RetToSignalParameters:
    signal_multiplier: float 
    min_signal : float = MIN_SIGNAL
    max_signal : float = MAX_SIGNAL

## Set the Parameters

In [67]:
ret_signal_params = RetToSignalParameters(
    signal_multiplier= SIGNAL_MULTIPLIER
)

enet_params = ElasticNetParameters(
    l1_ratio = L1_RATIO, 
    cv = CV, 
    alphas = ALPHAS, 
    max_iter = MAX_ITER
)



## Dataset Loading/Creating Helper Functions

In [68]:
def load_trainset() -> pl.DataFrame:
    """
    Loads and preprocesses the training dataset.

    Returns:
        pl.DataFrame: The preprocessed training DataFrame.
    """
    return (
        pl.read_csv(DATA_PATH / "train.csv")
        .rename({'market_forward_excess_returns':'target'})
        .with_columns(
            pl.exclude('date_id').cast(pl.Float64, strict=False)
        )
        .head(-10)
    )

def load_testset() -> pl.DataFrame:
    """
    Loads and preprocesses the testing dataset.

    Returns:
        pl.DataFrame: The preprocessed testing DataFrame.
    """
    return (
        pl.read_csv(DATA_PATH / "test.csv")
        .rename({'lagged_forward_returns':'target'})
        .with_columns(
            pl.exclude('date_id').cast(pl.Float64, strict=False)
        )
    )

def create_example_dataset(df: pl.DataFrame) -> pl.DataFrame:
    """
    Creates new features and cleans a DataFrame.

    Args:
        df (pl.DataFrame): The input Polars DataFrame.

    Returns:
        pl.DataFrame: The DataFrame with new features, selected columns, and no null values.
    """
    vars_to_keep: List[str] = [
        "S2", "E2", "E3", "P9", "S1", "S5", "I2", "P8",
        "P10", "P12", "P13", "U1", "U2"
    ]

    return (
        df.with_columns(
            (pl.col("I2") - pl.col("I1")).alias("U1"),
            (pl.col("M11") / ((pl.col("I2") + pl.col("I9") + pl.col("I7")) / 3)).alias("U2")
        )
        .select(["date_id", "target"] + vars_to_keep)
        .with_columns([
            pl.col(col).fill_null(pl.col(col).ewm_mean(com=0.5))
            for col in vars_to_keep
        ])
        .drop_nulls()
    )
    
def join_train_test_dataframes(train: pl.DataFrame, test: pl.DataFrame) -> pl.DataFrame:
    """
    Joins two dataframes by common columns and concatenates them vertically.

    Args:
        train (pl.DataFrame): The training DataFrame.
        test (pl.DataFrame): The testing DataFrame.

    Returns:
        pl.DataFrame: A single DataFrame with vertically stacked data from common columns.
    """
    common_columns: list[str] = [col for col in train.columns if col in test.columns]
    
    return pl.concat([train.select(common_columns), test.select(common_columns)], how="vertical")

def time_series_split(df: pl.DataFrame, validation_fraction: float = 0.2) -> Tuple[pl.DataFrame, pl.DataFrame]:
    """
    Splits a dataframe into training and validation sets by ordered ``date_id``.

    Args:
        df (pl.DataFrame): The input dataframe containing a ``date_id`` column.
        validation_fraction (float): Fraction of unique dates to allocate to validation.

    Returns:
        tuple[pl.DataFrame, pl.DataFrame]: Training and validation dataframes.
    """
    if not 0 < validation_fraction < 1:
        raise ValueError("validation_fraction must be between 0 and 1")

    unique_dates = sorted(df.get_column("date_id").unique())
    if len(unique_dates) < 2:
        raise ValueError("Need at least two unique date_id values to perform a split")

    split_idx = max(1, int(len(unique_dates) * (1 - validation_fraction)))
    train_dates = unique_dates[:split_idx]
    valid_dates = unique_dates[split_idx:]
    if not valid_dates:
        valid_dates = [train_dates.pop()]

    train_df = df.filter(pl.col("date_id").is_in(train_dates))
    valid_df = df.filter(pl.col("date_id").is_in(valid_dates))
    return train_df, valid_df

def split_dataset(train: pl.DataFrame, test: pl.DataFrame, features: list[str], scaler: Optional[TransformerMixin] = None) -> DatasetOutput: 
    """
    Splits the data into features (X) and target (y), and scales the features.

    Args:
        train (pl.DataFrame): The processed training DataFrame.
        test (pl.DataFrame): The processed testing DataFrame.
        features (list[str]): List of features to used in model. 
        scaler (Optional[TransformerMixin]): Optional scaler instance to apply to the features.

    Returns:
        DatasetOutput: A dataclass containing the scaled feature sets, target series, and the fitted scaler.
    """
    X_train = train.drop(['date_id','target']) 
    y_train = train.get_column('target')
    X_test = test.drop(['date_id','target']) 
    y_test = test.get_column('target')
    
    scaler = clone(scaler) if scaler is not None else StandardScaler()
    
    X_train_scaled_np = scaler.fit_transform(X_train)
    X_train = pl.from_numpy(X_train_scaled_np, schema=features)
    
    X_test_scaled_np = scaler.transform(X_test)
    X_test = pl.from_numpy(X_test_scaled_np, schema=features)
    
    
    return DatasetOutput(
        X_train = X_train,
        y_train = y_train, 
        X_test = X_test, 
        y_test = y_test,
        scaler = scaler
    )

## Converting Return Prediction to Signal

Here is an example of a potential function used to convert a prediction based on the market forward excess return to a daily signal position. 

In [69]:
def convert_ret_to_signal(
    ret_arr: np.ndarray,
    params: RetToSignalParameters
) -> np.ndarray:
    """
    Converts raw model predictions (expected returns) into a trading signal.

    Args:
        ret_arr (np.ndarray): The array of predicted returns.
        params (RetToSignalParameters): Parameters for scaling and clipping the signal.

    Returns:
        np.ndarray: The resulting trading signal, clipped between min and max values.
    """
    return np.clip(
        ret_arr * params.signal_multiplier + 1, params.min_signal, params.max_signal
    )

## Looking at the Data

In [70]:
train: pl.DataFrame = load_trainset()
test: pl.DataFrame = load_testset() 

print(train.tail(3)) 
print(test.head(3))

shape: (3, 98)
┌─────────┬─────┬─────┬─────┬───┬───────────┬─────────────────┬────────────────┬──────────┐
│ date_id ┆ D1  ┆ D2  ┆ D3  ┆ … ┆ V9        ┆ forward_returns ┆ risk_free_rate ┆ target   │
│ ---     ┆ --- ┆ --- ┆ --- ┆   ┆ ---       ┆ ---             ┆ ---            ┆ ---      │
│ i64     ┆ f64 ┆ f64 ┆ f64 ┆   ┆ f64       ┆ f64             ┆ f64            ┆ f64      │
╞═════════╪═════╪═════╪═════╪═══╪═══════════╪═════════════════╪════════════════╪══════════╡
│ 8977    ┆ 0.0 ┆ 0.0 ┆ 0.0 ┆ … ┆ -0.708599 ┆ 0.004187        ┆ 0.000162       ┆ 0.003713 │
│ 8978    ┆ 0.0 ┆ 0.0 ┆ 0.0 ┆ … ┆ -0.725858 ┆ 0.002279        ┆ 0.000162       ┆ 0.001805 │
│ 8979    ┆ 0.0 ┆ 0.0 ┆ 0.0 ┆ … ┆ -0.720092 ┆ 0.003541        ┆ 0.000161       ┆ 0.003068 │
└─────────┴─────┴─────┴─────┴───┴───────────┴─────────────────┴────────────────┴──────────┘
shape: (3, 99)
┌─────────┬─────┬─────┬─────┬───┬───────────┬───────────┬─────────────────────┬────────────────────┐
│ date_id ┆ D1  ┆ D2  ┆ D3  ┆ … ┆ is_scor

## Generating the Train and Test

In [71]:
df: pl.DataFrame = join_train_test_dataframes(train, test)
df = create_example_dataset(df=df) 

train_full: pl.DataFrame = df.filter(pl.col('date_id').is_in(train.get_column('date_id')))
kaggle_test: pl.DataFrame = df.filter(pl.col('date_id').is_in(test.get_column('date_id')))

FEATURES: list[str] = [col for col in kaggle_test.columns if col not in ['date_id', 'target']]

train_fit: pl.DataFrame
train_valid: pl.DataFrame
train_fit, train_valid = time_series_split(train_full, validation_fraction=0.2)

Please use `implode` to return to previous behavior.

See https://github.com/pola-rs/polars/issues/22149 for more information.
  train_full: pl.DataFrame = df.filter(pl.col('date_id').is_in(train.get_column('date_id')))
Please use `implode` to return to previous behavior.

See https://github.com/pola-rs/polars/issues/22149 for more information.
  kaggle_test: pl.DataFrame = df.filter(pl.col('date_id').is_in(test.get_column('date_id')))


## Scaling / Model Comparison
We evaluate scaler/model combinations using a time-based validation split carved from the training set to preserve temporal ordering.

In [72]:
scaler_factories: dict[str, Callable[[], TransformerMixin]] = {
    "standard": StandardScaler,
    "minmax": MinMaxScaler,
    "robust": RobustScaler,
}

model_factories: dict[str, Callable[[], object]] = {
    "elasticnet_cv": lambda: ElasticNetCV(**asdict(enet_params)),
    "lasso": lambda: Lasso(alpha=0.001, max_iter=MAX_ITER),
    "linear_regression": LinearRegression,
    "random_forest": lambda: RandomForestRegressor(n_estimators=200, random_state=42, n_jobs=-1),
}

evaluation_rows: list[dict[str, float]] = []

for scaler_name, scaler_factory in scaler_factories.items():
    dataset_variant = split_dataset(
        train=train_fit,
        test=train_valid,
        features=FEATURES,
        scaler=scaler_factory(),
    )
    X_train_variant = dataset_variant.X_train.to_numpy()
    X_valid_variant = dataset_variant.X_test.to_numpy()
    y_train_variant = dataset_variant.y_train.to_numpy()
    y_valid_variant = dataset_variant.y_test.to_numpy()

    for model_name, model_factory in model_factories.items():
        model_candidate = model_factory()
        model_candidate.fit(X_train_variant, y_train_variant)
        preds = model_candidate.predict(X_valid_variant)
        mse = mean_squared_error(y_valid_variant, preds)
        evaluation_rows.append({
            "scaler": scaler_name,
            "model": model_name,
            "mse": float(mse),
            "rmse": float(np.sqrt(mse)),
            "r2": float(r2_score(y_valid_variant, preds)),
        })

eval_df = pl.DataFrame(evaluation_rows).sort("rmse")
best_config = eval_df.row(0, named=True)
eval_df

scaler,model,mse,rmse,r2
str,str,f64,f64,f64
"""robust""","""elasticnet_cv""",0.00013,0.011393,0.000727
"""standard""","""elasticnet_cv""",0.00013,0.011396,0.000272
"""standard""","""lasso""",0.00013,0.0114,-0.000417
"""minmax""","""lasso""",0.00013,0.0114,-0.000417
"""robust""","""lasso""",0.00013,0.0114,-0.000419
…,…,…,…,…
"""minmax""","""linear_regression""",0.000131,0.01146,-0.011035
"""robust""","""linear_regression""",0.000131,0.01146,-0.011035
"""robust""","""random_forest""",0.000147,0.012115,-0.12997
"""standard""","""random_forest""",0.000147,0.012119,-0.130607


# Dummy Submission

Generate a baseline submission file with constant signal predictions. **Important:** The competition requires Parquet format (`submission.parquet`), not CSV.

In [84]:
dummy_test: pl.DataFrame = load_testset()
dummy_test_processed: pl.DataFrame = create_example_dataset(dummy_test)

constant_signal: float = float(convert_ret_to_signal(np.array([0.0]), ret_signal_params)[0])
dummy_submission: pl.DataFrame = (
    dummy_test_processed.select(["date_id"])
    .with_columns(pl.lit(constant_signal).alias("signal"))
)

# Write as Parquet (required by competition)
dummy_submission.write_parquet("submission.parquet")
print("✅ Created submission.parquet")
dummy_submission.head()

✅ Created submission.parquet


date_id,signal
i64,f64
8980,1.0
8981,1.0
8982,1.0
8983,1.0
8984,1.0


## Fitting the Model 

In [74]:
best_scaler_name: str = best_config["scaler"]
best_model_name: str = best_config["model"]
print(f"Best configuration -> scaler: {best_scaler_name}, model: {best_model_name}")

best_scaler: TransformerMixin = scaler_factories[best_scaler_name]()
dataset_full: DatasetOutput = split_dataset(
    train=train_full,
    test=kaggle_test,
    features=FEATURES,
    scaler=best_scaler,
 )

X_train = dataset_full.X_train.to_numpy()
X_test = dataset_full.X_test.to_numpy()
y_train = dataset_full.y_train.to_numpy()
y_test = dataset_full.y_test.to_numpy()
scaler = dataset_full.scaler

model = model_factories[best_model_name]()
model.fit(X_train, y_train)

Best configuration -> scaler: robust, model: elasticnet_cv


0,1,2
,l1_ratio,0.5
,eps,0.001
,n_alphas,'deprecated'
,alphas,array([1.0000...00000000e+02])
,fit_intercept,True
,precompute,'auto'
,max_iter,1000000
,tol,0.0001
,cv,10
,copy_X,True


## Prediction Function via Kaggle Server

In [83]:
def predict(test: pl.DataFrame) -> float:
    test = test.rename({'lagged_forward_returns':'target'})
    df: pl.DataFrame = create_example_dataset(test)
    X_test: pl.DataFrame = df.select(FEATURES)
    X_test_scaled_np: np.ndarray = scaler.transform(X_test)
    X_test: pl.DataFrame = pl.from_numpy(X_test_scaled_np, schema=FEATURES)
    raw_pred: float = model.predict(X_test)[0]
    return convert_ret_to_signal(raw_pred, ret_signal_params)

## Launch Server

In [82]:
inference_server = kaggle_evaluation.default_inference_server.DefaultInferenceServer(predict)

if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    inference_server.serve()
else:
    inference_server.run_local_gateway(('kaggle/input/hull-tactical-market-prediction/',))

