# Hull Tactical Market Prediction – Lazy-Fit Streaming Baseline

Kaggleの評価API(DefaultInferenceServer)にそのまま投げられるようにしたノートブックのスターターです。初回の予測時に train.csv を動的に学習し、以降は受け取ったバッチに対して 1 つの数値を返します。


## 0. Optional: install extra packages

Kaggle Notebook には numpy/pandas/scikit-learn/polars が同棒機ですが、反復用中には下記を差し込んで使用してください。


In [None]:
# Kaggle の Python インスタンスでは必要ない場合がほとんどです。
# 必要ならコメントアウトを外してください。
# !pip install polars --quiet


## 1. ライブラリの読み込みとユーティリティ

下記は Kaggle で提供される DefaultInferenceServer 用のベースラインコードです。


In [None]:
# === Hull Tactical - Market Prediction: lazy-fit baseline for evaluation API ===
#  - 初回 predict 呼び出し時に train.csv を読み込んで学習（Ridge回帰 + 標準化 + 欠損中央値補完）
#  - 以降は受け取ったバッチ（1タイムステップ想定）に対して予測スカラを返す
#  - 数値列のみを特徴量として自動選択（ID/日付/目的変数は除外）
#
# 返り値仕様：
#   - スカラ float を返す（タイムステップ1つにつき1値の予測）
#   - 万一、評価側が複数行を1度に渡す場合は、行ごとの予測を平均してスカラに畳み返却
#
# 依存：
#   - scikit-learn が利用可能な環境を想定（不可なら numpy 最小自前実装にフォールバック）
# -----------------------------------------------------------------------------

import os
import warnings
from dataclasses import dataclass
from typing import Iterable, Optional

import numpy as np
import pandas as pd

try:
    import polars as pl
except Exception:
    pl = None

try:
    from sklearn.linear_model import Ridge
    from sklearn.preprocessing import StandardScaler
    _HAS_SK = True
except Exception:
    Ridge = StandardScaler = None
    _HAS_SK = False

from kaggle_evaluation.default_inference_server import DefaultInferenceServer

warnings.filterwarnings("ignore")

TARGET = os.getenv("HTMP_TARGET", "forward_returns")
ID_CANDIDATES = {"row_id", "id", "ID"}
DATE_CANDIDATES = {"date", "timestamp", "time", "Date"}
DATA_ROOTS = (
    "/kaggle/input/hull-tactical-market-prediction",
    "/kaggle/input/hull-tactical-market-prediction/",
    "../input/hull-tactical-market-prediction",
    "../input/hull-tactical-market-prediction/",
    "data/raw",
    "./",
)


@dataclass(slots=True)
class ModelArtifacts:
    feature_columns: list[str]
    medians: dict[str, float]
    scaler: Optional[StandardScaler]
    model: object


def resolve_path(fname: str) -> Optional[str]:
    for root in DATA_ROOTS:
        candidate = os.path.join(root, fname)
        if os.path.exists(candidate):
            return candidate
    if os.path.exists(fname):
        return fname
    return None


def read_csv(path: str) -> pd.DataFrame:
    if pl is not None:
        return pl.read_csv(path).to_pandas()
    return pd.read_csv(path)


def infer_numeric_columns(df: pd.DataFrame) -> list[str]:
    exclude = set()
    exclude.update(col for col in df.columns if col in ID_CANDIDATES)
    exclude.update(col for col in df.columns if col in DATE_CANDIDATES)
    if TARGET in df.columns:
        exclude.add(TARGET)
    numeric_cols = [
        col for col in df.columns if col not in exclude and pd.api.types.is_numeric_dtype(df[col])
    ]
    return numeric_cols


def median_impute(df: pd.DataFrame, medians: dict[str, float]) -> pd.DataFrame:
    for col, value in medians.items():
        df[col] = df[col].fillna(value)
    return df


def fit_manual_ridge(X: np.ndarray, y: np.ndarray, lam: float = 1.0):
    mu = X.mean(axis=0)
    std = X.std(axis=0) + 1e-8
    Xs = (X - mu) / std
    a = Xs.T @ Xs + lam * np.eye(Xs.shape[1])
    b = Xs.T @ y
    weights = np.linalg.solve(a, b)
    bias = float(y.mean() - (mu / std) @ weights)
    return weights, bias, mu, std


class LazyBaseline:
    def __init__(self) -> None:
        self.artifacts: Optional[ModelArtifacts] = None

    def _fit(self) -> None:
        train_path = resolve_path("train.csv")
        if train_path is None:
            raise FileNotFoundError("train.csv が見つかりません。DATA_ROOTS を確認してください。")

        train_df = read_csv(train_path)
        if TARGET not in train_df.columns:
            raise KeyError(
                f"目的変数カラム '{TARGET}' が train.csv に見つかりません。"
            )

        numeric_cols = infer_numeric_columns(train_df)
        features = train_df[numeric_cols].copy()
        y = train_df[TARGET].astype(float).values

        medians = {
            col: (features[col].median() if pd.api.types.is_numeric_dtype(features[col]) else 0.0)
            for col in numeric_cols
        }
        features = median_impute(features, medians)

        if _HAS_SK:
            scaler = StandardScaler()
            Xs = scaler.fit_transform(features.values)
            model = Ridge(alpha=1.0, random_state=42)
            model.fit(Xs, y)
        else:
            scaler = None
            model = fit_manual_ridge(features.values, y)

        self.artifacts = ModelArtifacts(
            feature_columns=numeric_cols,
            medians=medians,
            scaler=scaler,
            model=model,
        )

    def _ensure_fitted(self) -> ModelArtifacts:
        if self.artifacts is None:
            self._fit()
        assert self.artifacts is not None
        return self.artifacts

    def _prepare_features(self, batch: pd.DataFrame, artifacts: ModelArtifacts) -> pd.DataFrame:
        aligned = pd.DataFrame(index=batch.index)
        for col in artifacts.feature_columns:
            if col in batch.columns and pd.api.types.is_numeric_dtype(batch[col]):
                aligned[col] = batch[col]
            else:
                aligned[col] = np.nan
        aligned = median_impute(aligned, artifacts.medians.copy())
        return aligned

    def predict(self, batch) -> float:
        artifacts = self._ensure_fitted()

        if isinstance(batch, pl.DataFrame):
            frame = batch.to_pandas()
        elif isinstance(batch, pd.DataFrame):
            frame = batch
        else:
            frame = pd.DataFrame(batch)

        features = self._prepare_features(frame, artifacts)
        values = features.values

        if _HAS_SK and artifacts.scaler is not None:
            scaled = artifacts.scaler.transform(values)
            preds = artifacts.model.predict(scaled)
        else:
            weights, bias, mu, std = artifacts.model
            scaled = (values - mu) / std
            preds = scaled @ weights + bias

        return float(np.mean(preds))


BASELINE = LazyBaseline()


def predict(batch) -> float:
    return BASELINE.predict(batch)


server = DefaultInferenceServer(predict)


## 2. ローカル実行 or コンペ提出

Kaggle 環境で `KAGGLE_IS_COMPETITION_RERUN` が設定されていれば `server.serve()` が呼ばれます。
Notebook ではローカルゲートウェイでテスト可能です。


In [None]:
if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    server.serve()
else:
    # Notebook 上ではローカルゲートウェイに接続して動作確認できます。
    server.run_local_gateway(('/kaggle/input/hull-tactical-market-prediction/',))
