# Kaggle Submission – TabPFN 单体版

该 Notebook 完全使用 TabPFN，不依赖项目内模块文件。每个代码单元封装为相对独立的“模块”，便于后续拆分成独立 py 文件。

In [1]:
# 0) 环境设置与路径管理
# 从 __future__ 导入 annotations，确保类型提示的向前兼容性
from __future__ import annotations
# 导入数据类，用于创建结构化的数据容器
from dataclasses import dataclass
# 导入 pathlib，用于面向对象的文件系统路径操作
from pathlib import Path
# 导入 typing 中的高级类型提示
from typing import Optional, Dict, List, Sequence, Tuple
# 导入 os 模块，用于与操作系统交互，如此处设置环境变量
import os
# 导入 json 模块，用于处理 JSON 数据
import json
# 导入 numpy，用于高效的数值计算
import numpy as np
# 导入 pandas，用于数据处理和分析
import pandas as pd

# --- Global constants and environment configuration ---

# Treat the run as a Kaggle session when /kaggle/working exists.
IS_KAGGLE = Path('/kaggle/working').exists()
# Current working directory at import time.
CWD = Path.cwd()
# Base directory of input datasets: the sibling "input" folder when the CWD is
# named "working", otherwise the relative path ../input.
BASE_INPUT = (CWD.parent / "input") if CWD.name == "working" else (Path("..") / "input")
# Hugging Face cache location; expected to be provided by an attached dataset.
HF_CACHE = BASE_INPUT / "package-hf-cache-tabpfn" / "hf_cache"
# Local TabPFN regressor checkpoint inside that cache.
LOCAL_CKPT = HF_CACHE / "tabpfn-v2-regressor.ckpt"
# Point Hugging Face at the local cache unless HF_HOME is already set.
os.environ.setdefault("HF_HOME", str(HF_CACHE))
# Default the Hugging Face Hub to offline mode unless already configured.
os.environ.setdefault("HF_HUB_OFFLINE", "1")

# Container for every filesystem location the notebook reads or writes.
@dataclass
class Paths:
    """Bundle of resolved paths used across the pipeline.

    Attributes:
        data_root: Root directory of the competition dataset.
        train_csv: Training data CSV file.
        test_csv: Test data CSV file.
        out_dir: Directory that receives all outputs.
        model_dir: Directory where the model artefacts are stored.
        submission_csv: Submission file in CSV format.
        submission_parquet: Submission file in Parquet format.
        metadata_path: JSON file holding model/calibration metadata.
    """

    data_root: Path
    train_csv: Path
    test_csv: Path
    out_dir: Path
    model_dir: Path
    submission_csv: Path
    submission_parquet: Path
    metadata_path: Path

# Resolve and assemble every path the pipeline needs.
def resolve_paths(verbose: bool = True) -> Paths:
    """Build the ``Paths`` bundle from ``BASE_INPUT`` and ``CWD``.

    Args:
        verbose: When true, print each resolved path for debugging.

    Returns:
        A populated ``Paths`` instance. Outputs go to the current working
        directory; model artefacts go to its ``tabpfn_model`` subdirectory.
    """
    competition_dir = (BASE_INPUT / "hull-tactical-market-prediction").resolve()
    working_dir = CWD.resolve()
    artifacts_dir = working_dir / "tabpfn_model"

    resolved = Paths(
        data_root=competition_dir,
        train_csv=competition_dir / "train.csv",
        test_csv=competition_dir / "test.csv",
        out_dir=working_dir,
        model_dir=artifacts_dir,
        submission_csv=working_dir / "submission.csv",
        submission_parquet=working_dir / "submission.parquet",
        metadata_path=artifacts_dir / "metadata.json",
    )

    if not verbose:
        return resolved

    print("=== Paths ===")
    for field_name, location in vars(resolved).items():
        print(f"{field_name:18s}: {location}")
    return resolved

In [2]:
# On Kaggle, install the extra packages from offline wheels (the original author
# notes that Kaggle already ships a GPU build of torch).
if IS_KAGGLE:
    import os
    import subprocess
    from pathlib import Path

    # Dataset holding the wheels; overridable through TABPFN_WHEEL_DATASET.
    wheel_ds = os.environ.get("TABPFN_WHEEL_DATASET", "kaggle-linux-wheels-tabpfn")
    wheel_dir = Path(f"/kaggle/input/{wheel_ds}")
    # Some dataset layouts nest the wheels one directory deeper.
    if (wheel_dir / "kaggle-linux-wheels-tabpfn").exists():
        wheel_dir = wheel_dir / "kaggle-linux-wheels-tabpfn"
    print("Wheel dir:", wheel_dir)
    print("提示：本赛提交需要 /kaggle/working/submission.parquet；请不要在 Kaggle 里 pip 安装/升级 pyarrow。")

    def pip_install(*packages: str, no_deps: bool = False) -> None:
        """Install *packages* from ``wheel_dir`` only (``--no-index``).

        Raises ``subprocess.CalledProcessError`` if pip exits non-zero.
        """
        cmd = ["pip", "install", "--no-index", f"--find-links={wheel_dir}"]
        if no_deps:
            cmd.append("--no-deps")
        cmd += list(packages)
        subprocess.check_call(cmd)

    # Key point: --no-deps keeps pip from resolving or changing torch and nvidia-* packages.
    pip_install("backoff==2.2.1", "distro==1.9.0", no_deps=True)
    pip_install("posthog==6.7.4", no_deps=True)
    pip_install("tabpfn-common-utils==0.2.10", "kditransform==1.2.0", "eval-type-backport==0.3.1", no_deps=True)
    pip_install("tabpfn==6.0.6", no_deps=True)

    # setdefault leaves any value that is already set untouched.
    os.environ.setdefault("HF_HOME", str(wheel_dir))
    os.environ.setdefault("HF_HUB_OFFLINE", "1")
    os.environ.setdefault("TABPFN_MODEL_PATH", str(wheel_dir / "tabpfn-v2-regressor.ckpt"))
else:
    print("Not Kaggle environment; skip online install block")


Wheel dir: /kaggle/input/kaggle-linux-wheels-tabpfn
提示：本赛提交需要 /kaggle/working/submission.parquet；请不要在 Kaggle 里 pip 安装/升级 pyarrow。
Looking in links: /kaggle/input/kaggle-linux-wheels-tabpfn
Processing /kaggle/input/kaggle-linux-wheels-tabpfn/backoff-2.2.1-py3-none-any.whl
Installing collected packages: backoff
Successfully installed backoff-2.2.1
Looking in links: /kaggle/input/kaggle-linux-wheels-tabpfn
Processing /kaggle/input/kaggle-linux-wheels-tabpfn/posthog-6.7.4-py3-none-any.whl
Installing collected packages: posthog
Successfully installed posthog-6.7.4
Looking in links: /kaggle/input/kaggle-linux-wheels-tabpfn
Processing /kaggle/input/kaggle-linux-wheels-tabpfn/tabpfn_common_utils-0.2.10-py3-none-any.whl
Processing /kaggle/input/kaggle-linux-wheels-tabpfn/kditransform-1.2.0-py3-none-any.whl
Processing /kaggle/input/kaggle-linux-wheels-tabpfn/eval_type_backport-0.3.1-py3-none-any.whl
Installing collected packages: kditransform, tabpfn-common-utils, eval-type-backport
Successfully

In [None]:
# 0.5) Resource monitoring (RAM / VRAM)
# Call log_resources("tag") at key points to print the current memory usage.
import os
import time
import shutil
import subprocess

try:
    import psutil  # optional dependency; _rss_mb falls back to /proc/self/status without it
except Exception:
    psutil = None

# time.monotonic() value of the last emitted resource log; used for rate limiting.
_RESOURCE_LAST_T = 0.0

def _rss_mb() -> float | None:
    # Prefer psutil
    try:
        if psutil is not None:
            return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
    except Exception:
        pass

    # Linux fallback (/proc)
    try:
        with open('/proc/self/status', 'r', encoding='utf-8') as f:
            for line in f:
                if line.startswith('VmRSS:'):
                    return int(line.split()[1]) / 1024
    except Exception:
        pass

    return None

def log_resources(tag: str = '', *, force: bool = False, min_interval_s: float | None = None, include_nvidia_smi: bool = False) -> None:
    """Print one line of RAM/VRAM usage, rate limited.

    Args:
        tag: Free-form label placed after the ``[RES]`` prefix.
        force: Print regardless of the time elapsed since the last print.
        min_interval_s: Minimum seconds between prints; when ``None`` the env
            var ``LOG_RESOURCES_MIN_INTERVAL_S`` is used (default 10s).
        include_nvidia_smi: Also query ``nvidia-smi`` (slower).

    Setting the env var ``LOG_RESOURCES`` to ``0``/``false``/``False``
    disables all output. Failures while collecting metrics are ignored.
    """
    global _RESOURCE_LAST_T

    disabled = os.environ.get('LOG_RESOURCES', '1') in ('0', 'false', 'False')
    if disabled:
        return

    now = time.monotonic()
    if not force:
        interval = min_interval_s
        if interval is None:
            configured = os.environ.get('LOG_RESOURCES_MIN_INTERVAL_S', '10')
            try:
                interval = float(configured)
            except Exception:
                interval = 10.0
        elapsed = now - _RESOURCE_LAST_T
        if elapsed < float(interval):
            return
    _RESOURCE_LAST_T = now

    fields = [f"[RES] {tag}".strip()]

    resident = _rss_mb()
    if resident is not None:
        fields.append(f"RSS={resident:.0f}MB")

    # GPU metrics are appended one at a time so that a failure part-way
    # through keeps whatever was already collected.
    try:
        import torch
        if torch.cuda.is_available():
            free, total = torch.cuda.mem_get_info()
            fields.append(f"VRAM_free={free/1024**3:.2f}GB")
            fields.append(f"VRAM_total={total/1024**3:.2f}GB")
            fields.append(f"alloc={torch.cuda.memory_allocated()/1024**3:.2f}GB")
            fields.append(f"reserved={torch.cuda.memory_reserved()/1024**3:.2f}GB")
            try:
                fields.append(f"max_alloc={torch.cuda.max_memory_allocated()/1024**3:.2f}GB")
            except Exception:
                pass
    except Exception:
        pass

    if include_nvidia_smi:
        if shutil.which('nvidia-smi'):
            query = ['nvidia-smi', '--query-gpu=memory.used,memory.total', '--format=csv,noheader,nounits']
            try:
                out = subprocess.check_output(query, text=True).strip()
                fields.append(f"nvidia-smi(MB)={out}")
            except Exception:
                pass

    print(' | '.join(fields))


In [3]:
# 1) 数据加载与预处理
from dataclasses import dataclass

# Container bundling the loaded train set, test set and overlapping dates.
@dataclass
class DataBundle:
    """Loaded competition data.

    Attributes:
        train: Training frame.
        test: Test frame.
        overlap_dates: ``date_id`` values associated with the train/test overlap.
    """

    train: pd.DataFrame
    test: pd.DataFrame
    overlap_dates: List[int]

# Load the train/test CSV files and strip the time-overlapping rows.
def load_data(paths: Paths) -> DataBundle:
    """Read both CSV files and remove training rows that overlap the test period.

    Training rows whose ``date_id`` is greater than or equal to the smallest
    test ``date_id`` are dropped to avoid temporal leakage. Both frames are
    returned sorted by ``date_id`` with a fresh index.

    Args:
        paths: Provides ``train_csv`` and ``test_csv``.

    Returns:
        A ``DataBundle`` with the cleaned frames and the sorted list of
        ``date_id`` values that were removed from the training set.
    """
    raw_train = pd.read_csv(paths.train_csv)
    raw_test = pd.read_csv(paths.test_csv)

    # Everything from the first test date onwards counts as leaked.
    first_test_date = int(raw_test["date_id"].min())
    overlaps_test = raw_train["date_id"] >= first_test_date
    overlap_dates = sorted(raw_train.loc[overlaps_test, "date_id"].unique().tolist())

    kept_rows = raw_train.loc[~overlaps_test]
    cleaned_train = kept_rows.sort_values("date_id").reset_index(drop=True)
    cleaned_test = raw_test.sort_values("date_id").reset_index(drop=True)

    print(f"Train rows: {len(cleaned_train)} | Test rows: {len(cleaned_test)} | Overlap dates: {len(overlap_dates)}")
    return DataBundle(train=cleaned_train, test=cleaned_test, overlap_dates=overlap_dates)

In [4]:
# 2) 特征工程

# --- Constants ---
# Name of the target column.
TARGET_COL = "forward_returns"

# Source columns from which lagged features are derived.
LAG_SOURCE_COLUMNS: Tuple[str, ...] = (
    "forward_returns",                 # forward return
    "market_forward_excess_returns",   # market forward excess return
    "risk_free_rate",                  # risk-free rate
)

# Keywords marking a column as forward-looking; such columns are kept out of
# the feature set to avoid leakage.
FORWARD_KEYWORDS: Tuple[str, ...] = ("forward", "future", "lead", "next")
# Safe prefixes: a column starting with one of these (e.g. 'lagged_forward_returns')
# is not treated as forward-looking.
SAFE_FORWARD_PREFIXES: Tuple[str, ...] = ("lagged_",)
# Columns always excluded from the feature set (the target and related columns).
EXCLUDE_COLUMNS: Tuple[str, ...] = (
    "forward_returns",
    "risk_free_rate",
    "market_forward_excess_returns",
)

# Helper: decide whether a column name denotes forward-looking information.
def _is_forward_looking(name: str) -> bool:
    """Return True when *name* contains a forward keyword and has no safe prefix.

    The comparison is case-insensitive. A name starting with any entry of
    ``SAFE_FORWARD_PREFIXES`` is never forward-looking; otherwise it is
    forward-looking when any entry of ``FORWARD_KEYWORDS`` occurs in it.
    """
    lowered = name.lower()

    for safe_prefix in SAFE_FORWARD_PREFIXES:
        if lowered.startswith(safe_prefix):
            return False

    for keyword in FORWARD_KEYWORDS:
        if keyword in lowered:
            return True
    return False

# Container bundling every feature-related DataFrame.
@dataclass
class FeatureBundle:
    """Output of feature engineering.

    Attributes:
        feature_columns: Ordered list of feature column names.
        train_features: Training feature matrix (X_train).
        test_features: Test feature matrix (X_test).
        train_with_target: Training features plus the target column.
        calibration_frame: Columns needed later for Sharpe calibration.
    """

    feature_columns: List[str]
    train_features: pd.DataFrame
    test_features: pd.DataFrame
    train_with_target: pd.DataFrame
    calibration_frame: pd.DataFrame

# Main entry point of feature engineering.
def build_features(bundle: DataBundle) -> FeatureBundle:
    """Derive lagged features, select feature columns and assemble the frames.

    Steps:
      1. Add ``lagged_<col>`` to the training frame for each entry of
         ``LAG_SOURCE_COLUMNS`` (previous row's value) and drop rows where
         any lag is missing.
      2. Keep columns present in both train and test that are neither in
         ``EXCLUDE_COLUMNS`` nor forward-looking; sort them by name so the
         order is stable between runs.
      3. Reindex both frames to those columns and fill missing values with 0.

    Args:
        bundle: Cleaned train/test data.

    Returns:
        A ``FeatureBundle`` with aligned feature frames, the training frame
        with target, and the calibration frame.
    """
    train_df = bundle.train.copy()
    test_df = bundle.test.copy()

    # Lag features: shift(1) exposes the previous row's value on the current row.
    lag_columns = []
    for source in LAG_SOURCE_COLUMNS:
        lag_name = f"lagged_{source}"
        train_df[lag_name] = train_df[source].shift(1)
        lag_columns.append(lag_name)

    # shift(1) leaves NaN in the first row, so rows lacking a lag are removed.
    train_df = train_df.dropna(subset=lag_columns).reset_index(drop=True)

    excluded = set(EXCLUDE_COLUMNS)
    feature_columns = sorted(
        col
        for col in train_df.columns
        if col in test_df.columns
        and col not in excluded
        and not _is_forward_looking(col)
    )

    def _align(frame):
        # Same columns in the same order for both frames; gaps become 0.
        return frame.reindex(columns=feature_columns, fill_value=np.nan).fillna(0)

    train_features = _align(train_df)
    test_features = _align(test_df)

    # Training features with the target appended as an extra column.
    train_with_target = train_features.copy()
    train_with_target[TARGET_COL] = train_df[TARGET_COL].to_numpy(dtype=np.float64)

    # Only the columns the Sharpe calibration needs.
    calibration_frame = train_df[["date_id", "forward_returns", "risk_free_rate"]].copy()

    print(f"Feature columns: {len(feature_columns)}")
    return FeatureBundle(
        feature_columns=feature_columns,
        train_features=train_features,
        test_features=test_features,
        train_with_target=train_with_target,
        calibration_frame=calibration_frame,
    )

# Helper used at prediction time: same column selection and fill rule as in training.
def prepare_features(df: pd.DataFrame, feature_columns: Sequence[str]) -> pd.DataFrame:
    """Return *df* restricted to *feature_columns*, in that order.

    Columns missing from *df* are created, and every missing value is
    replaced with 0.
    """
    aligned = df.reindex(columns=feature_columns, fill_value=np.nan)
    filled = aligned.fillna(0)
    return filled

In [5]:
# 3) Sharpe Ratio 校准工具
# 本模块的目标是找到一个线性变换 (y = scale * x + shift) 应用于模型的原始预测，
# 使得变换后的头寸（position）在历史数据上能最大化一个自定义的“调整后夏普比率”。
# 这是一种后处理步骤，旨在使模型的输出更符合竞赛的特定评分标准。
from dataclasses import dataclass
from scipy.optimize import minimize, Bounds # 从 SciPy 导入优化函数和边界类
from warnings import warn

# Container for the outcome of a calibration run.
@dataclass
class CalibrationResult:
    """Linear calibration parameters and the scores before and after.

    Attributes:
        scale: Multiplicative factor applied to raw predictions.
        shift: Additive offset applied after scaling.
        raw_sharpe: Score of the uncalibrated predictions.
        adjusted_sharpe: Score of the calibrated predictions.
    """

    scale: float
    shift: float
    raw_sharpe: float
    adjusted_sharpe: float

    @property
    def params(self):
        """Return the transform as a ``(scale, shift)`` tuple."""
        scale_and_shift = (self.scale, self.shift)
        return scale_and_shift

# Penalised, annualised Sharpe-style score for a series of positions.
def adjusted_sharpe(solution: pd.DataFrame, positions: np.ndarray, min_position: float, max_position: float) -> float:
    """Score *positions* against the returns in *solution*.

    Positions are clipped to ``[min_position, max_position]``. The strategy
    return of a row is ``rf * (1 - position) + position * forward_returns``.
    The annualised ratio of geometric mean excess return to the standard
    deviation of strategy returns is divided by two penalties:

    * a volatility penalty when strategy volatility exceeds 1.2x market volatility;
    * a return penalty when the strategy's mean excess return trails the market's.

    Args:
        solution: Frame with ``forward_returns`` and ``risk_free_rate`` columns.
        positions: One position per row of *solution*.
        min_position: Lower clipping bound.
        max_position: Upper clipping bound.

    Returns:
        The score capped at 1,000,000, or 0.0 when *solution* is empty or the
        strategy returns have zero standard deviation.
    """
    if solution.empty:
        return 0.0

    annual_days = 252  # days used for annualisation
    n_obs = len(solution)

    df = solution[["forward_returns", "risk_free_rate"]].copy()
    df["position"] = np.clip(np.asarray(positions, dtype=np.float64), min_position, max_position)

    # Part of the capital earns the risk-free rate, the rest follows the market.
    strategy_returns = df["risk_free_rate"] * (1 - df["position"]) + df["position"] * df["forward_returns"]
    excess = strategy_returns - df["risk_free_rate"]

    # Geometric mean of the excess returns.
    mean_excess = float(np.prod(1 + excess)) ** (1 / n_obs) - 1
    std_excess = float(strategy_returns.std(ddof=0))
    if std_excess == 0:
        return 0.0
    sharpe = mean_excess / std_excess * np.sqrt(annual_days)

    # --- Penalty terms ---
    market_excess = df["forward_returns"] - df["risk_free_rate"]
    market_mean = float(np.prod(1 + market_excess)) ** (1 / n_obs) - 1
    market_std = float(market_excess.std(ddof=0))

    # Annualised volatilities in percent.
    market_vol = market_std * np.sqrt(annual_days) * 100
    strat_vol = std_excess * np.sqrt(annual_days) * 100

    # 1. Volatility penalty: only above 1.2x the market volatility.
    if market_vol > 0:
        excess_vol_penalty = 1 + max(0, strat_vol / market_vol - 1.2)
    else:
        excess_vol_penalty = 1

    # 2. Return-gap penalty: only when the strategy trails the market.
    return_gap = max(0, (market_mean - mean_excess) * 100 * annual_days)
    return_penalty = 1 + (return_gap ** 2) / 100

    score = sharpe / (excess_vol_penalty * return_penalty)
    return float(min(score, 1_000_000))

# Apply the calibration parameters (scale and shift) to predictions.
def apply_calibration(values: np.ndarray, params: Tuple[float, float], min_position: float, max_position: float) -> np.ndarray:
    """Return ``values * scale + shift`` clipped to ``[min_position, max_position]``.

    Args:
        values: Raw predictions; converted to a float64 array.
        params: ``(scale, shift)`` pair.
        min_position: Lower clipping bound.
        max_position: Upper clipping bound.
    """
    scale, shift = params
    transformed = np.asarray(values, dtype=np.float64) * scale + shift
    return np.clip(transformed, min_position, max_position)

# Fit the linear calibration of the model predictions.
def calibrate_predictions(train_frame: pd.DataFrame, base_predictions: pd.Series, window: int, bounds: Bounds, min_position: float, max_position: float) -> CalibrationResult:
    """Search for the ``(scale, shift)`` pair that maximises ``adjusted_sharpe``.

    Only the last *window* rows (ordered by ``date_id``) are used. The search
    uses SciPy's Powell method starting from the identity transform. If the
    optimiser reports failure or raises, a warning is emitted and the
    identity transform ``(1.0, 0.0)`` is used.

    Args:
        train_frame: Frame with ``date_id``, ``forward_returns`` and
            ``risk_free_rate``; rows must line up positionally with
            *base_predictions*.
        base_predictions: Raw model predictions, one per row of *train_frame*.
        window: Maximum number of most recent rows used for calibration.
        bounds: Search bounds for ``(scale, shift)``.
        min_position: Lower clipping bound for positions.
        max_position: Upper clipping bound for positions.

    Returns:
        A ``CalibrationResult`` with the chosen parameters and the scores of
        the raw and calibrated predictions on the calibration window.
    """
    if base_predictions is None or len(base_predictions) == 0:
        return CalibrationResult(scale=1.0, shift=0.0, raw_sharpe=0.0, adjusted_sharpe=0.0)

    recent = train_frame[["date_id", "forward_returns", "risk_free_rate"]].copy()
    recent["model_pred"] = base_predictions.values
    recent = recent.sort_values("date_id")
    # Restrict calibration to the trailing window.
    if len(recent) > window:
        recent = recent.tail(window)
    recent = recent.set_index("date_id")

    market = recent[["forward_returns", "risk_free_rate"]]
    raw_preds = recent["model_pred"].values

    def _score(positions) -> float:
        return adjusted_sharpe(market, positions, min_position, max_position)

    # minimize() looks for a minimum, hence the sign flip on the score.
    def objective(params: np.ndarray) -> float:
        candidate = apply_calibration(raw_preds, tuple(params), min_position, max_position)
        return -_score(candidate)

    identity = np.array([1.0, 0.0])  # initial guess: scale=1, shift=0
    try:
        outcome = minimize(
            objective,
            x0=identity,
            method="Powell",
            bounds=bounds,
            options={"xtol": 1e-4, "ftol": 1e-4, "maxiter": 500},
        )
        if outcome.success:
            scale, shift = outcome.x
        else:
            warn(f"Calibration did not converge: {outcome.message}")
            scale, shift = identity
    except Exception as exc:
        warn(f"Calibration failed, using identity transform: {exc}")
        scale, shift = identity

    # Scores before and after calibration, for reference.
    calibrated = apply_calibration(raw_preds, (scale, shift), min_position, max_position)
    raw_score = _score(raw_preds)
    adj_score = _score(calibrated)
    return CalibrationResult(scale=float(scale), shift=float(shift), raw_sharpe=raw_score, adjusted_sharpe=adj_score)

In [6]:
# 4) TabPFN 模型服务封装
# 这个类将 TabPFN 模型的所有操作（训练、加载、预测、校准）封装起来，提供一个简洁的接口。
from tabpfn import TabPFNRegressor
import joblib # 用于保存和加载 Python 对象（如此处的模型）
import torch
import os
from scipy.optimize import Bounds
import numpy as np

class TabPFNService:
    """Train, persist, load, run and calibrate a TabPFN regressor behind one interface.

    Attributes:
        paths: Resolved filesystem locations (model directory, metadata file).
        seed: Seed for the regressor and for training-row subsampling.
        min_position: Lower clipping bound for calibrated predictions.
        max_position: Upper clipping bound for calibrated predictions.
        calibration_window: Number of most recent rows used for calibration.
        max_training_rows: Optional cap on rows passed to ``fit``; ``None`` disables it.
        model: Fitted or loaded regressor; ``None`` until ``fit``/``load`` succeeds.
        feature_columns: Feature columns, in the order the model was trained with.
        calibration: Result of ``ensure_calibration``; ``None`` until it has run.
        model_source: ``"trained"``, ``"pretrained"`` or ``"unknown"``.
    """

    def __init__(self, paths: Paths, seed: int = 42, min_position: float = 0.0, max_position: float = 2.0, calibration_window: int = 180, max_training_rows: Optional[int] = 20000):
        self.paths = paths
        self.seed = seed
        self.min_position = min_position
        self.max_position = max_position
        self.calibration_window = calibration_window
        self.max_training_rows = max_training_rows  # limit training samples for runtime control
        self.model: Optional[TabPFNRegressor] = None
        self.feature_columns: List[str] = []
        self.calibration: Optional[CalibrationResult] = None
        self.model_source: str = "unknown"

    @staticmethod
    def _log(tag: str, **kwargs) -> None:
        """Forward to the notebook-level ``log_resources`` when that cell has been run."""
        if "log_resources" in globals():
            globals()["log_resources"](tag, **kwargs)

    @property
    def device(self) -> str:
        """Return ``"cuda"`` when a CUDA device is available, else ``"cpu"``."""
        return "cuda" if torch.cuda.is_available() else "cpu"

    def fit(self, features: FeatureBundle) -> None:
        """Train a TabPFN regressor and save it with its feature columns.

        When ``max_training_rows`` is set and exceeded, a seeded random subset
        of rows (drawn without replacement) is used. The fitted model is
        written to ``<model_dir>/tabpfn_model.joblib``.
        """
        self.paths.model_dir.mkdir(parents=True, exist_ok=True)
        # The checkpoint comes from TABPFN_MODEL_PATH when set, otherwise LOCAL_CKPT.
        # NOTE(review): per the original author, n_estimators is the number of
        # ensemble members averaged at inference and ignore_pretraining_limits
        # lifts TabPFN's built-in feature/sample limits - confirm against the
        # installed tabpfn version.
        model = TabPFNRegressor(model_path=os.environ.get("TABPFN_MODEL_PATH", str(LOCAL_CKPT)), n_estimators=2, device=self.device, random_state=self.seed, ignore_pretraining_limits=True)
        train_X = features.train_features
        train_y = features.train_with_target[TARGET_COL]
        original_rows = len(train_X)
        if self.max_training_rows is not None and original_rows > self.max_training_rows:
            rng = np.random.default_rng(self.seed)
            idx = rng.choice(original_rows, self.max_training_rows, replace=False)
            train_X = train_X.iloc[idx].reset_index(drop=True)
            train_y = train_y.iloc[idx].reset_index(drop=True)
            print(f"TabPFN fit: downsampled from {original_rows} -> {len(train_X)} rows for faster training")
        model.fit(train_X, train_y)
        # Persist the model together with the column order it expects.
        payload = {"model": model, "feature_columns": features.feature_columns}
        joblib.dump(payload, self.paths.model_dir / "tabpfn_model.joblib")
        self.model = model
        self.feature_columns = list(features.feature_columns)
        self.model_source = "trained"

    def load(self) -> bool:
        """Load a previously saved model; return False when no file exists."""
        model_path = self.paths.model_dir / "tabpfn_model.joblib"
        if not model_path.exists():
            return False
        # SECURITY: joblib.load unpickles arbitrary objects. Only load files
        # that this notebook (or a trusted source) wrote.
        saved = joblib.load(model_path)
        self.model = saved["model"]
        self.feature_columns = list(saved["feature_columns"])
        self.model_source = "pretrained"
        return True

    def predict_raw(self, df: pd.DataFrame) -> pd.Series:
        """Predict in fixed-size batches to limit peak GPU memory.

        A failing batch is reported and filled with zeros so one bad batch
        does not abort the run; a summary line is printed if any batch failed.

        Args:
            df: Frame containing (at least some of) the feature columns.

        Returns:
            Uncalibrated predictions as a Series named ``TARGET_COL``,
            indexed like the prepared feature frame.

        Raises:
            RuntimeError: If no model has been fitted or loaded. Without this
                check the AttributeError on ``None.predict`` would be swallowed
                per batch and every prediction would silently become 0.
        """
        if self.model is None:
            raise RuntimeError("TabPFNService has no model; call fit() or load() before predicting.")

        import gc  # imported once here rather than on every loop iteration

        feats = prepare_features(df, self.feature_columns)

        # Batch size the original author considers safe on a Kaggle T4 GPU.
        BATCH_SIZE = 512
        preds_list = []
        failed_batches = 0
        total_rows = len(feats)

        self._log(f"predict_raw start rows={total_rows} batch={BATCH_SIZE}", force=True)
        print(f"Starting batched prediction: {total_rows} rows, Batch Size: {BATCH_SIZE}")

        for start_idx in range(0, total_rows, BATCH_SIZE):
            end_idx = min(start_idx + BATCH_SIZE, total_rows)
            batch_df = feats.iloc[start_idx:end_idx]

            try:
                batch_pred = self.model.predict(batch_df)
            except Exception as e:
                # Deliberate best-effort: keep going with a neutral fallback.
                failed_batches += 1
                print(f"!!! Batch {start_idx}-{end_idx} failed: {e}")
                batch_pred = np.zeros(len(batch_df), dtype=np.float64)
            preds_list.append(batch_pred)

            # Release cached GPU memory and Python garbage between batches.
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()

            self._log(f"predict_raw batch {start_idx}-{end_idx}")

        if failed_batches:
            print(f"!!! {failed_batches} batch(es) failed and were filled with zeros")

        final_preds = np.concatenate(preds_list) if preds_list else np.array([])

        self._log("predict_raw done", force=True)
        return pd.Series(final_preds, index=feats.index, name=TARGET_COL)

    def ensure_calibration(self, features: FeatureBundle) -> None:
        """Fit the prediction calibration and write metadata to disk.

        Predictions on the training features are calibrated against
        ``features.calibration_frame``. The result is stored on
        ``self.calibration`` and written, with the model source and feature
        columns, to ``paths.metadata_path`` as JSON.
        """
        # Search bounds for (scale, shift).
        bounds = Bounds([0.8, -0.5], [1.2, 0.5])
        train_preds = self.predict_raw(features.train_features)
        calib = calibrate_predictions(
            features.calibration_frame,
            train_preds,
            window=self.calibration_window,
            bounds=bounds,
            min_position=self.min_position,
            max_position=self.max_position,
        )
        self.calibration = calib
        # Metadata kept for traceability and later analysis.
        payload = {
            "source": self.model_source,
            "feature_columns": self.feature_columns,
            "calibration": {
                "scale": calib.scale,
                "shift": calib.shift,
                "raw_sharpe": calib.raw_sharpe,
                "adjusted_sharpe": calib.adjusted_sharpe,
                "window": self.calibration_window,
            },
        }
        self.paths.model_dir.mkdir(parents=True, exist_ok=True)
        (self.paths.metadata_path).write_text(json.dumps(payload, indent=2), encoding="utf-8")

    def predict(self, df: pd.DataFrame) -> pd.Series:
        """Return calibrated predictions, or raw ones when no calibration exists."""
        raw = self.predict_raw(df)
        if self.calibration is None:
            return raw
        adjusted = apply_calibration(
            raw.values,
            self.calibration.params,
            min_position=self.min_position,
            max_position=self.max_position,
        )
        return pd.Series(adjusted, index=raw.index, name=raw.name)


In [7]:
# 5) 端到端执行：训练/加载 + 推理 + 提交

# Run the whole flow: load/train, calibrate, predict, write the submission.
def run_pipeline(train_if_needed: bool = True, max_training_rows: Optional[int] = 20000):
    """Run the pipeline with an optional training-row cap for faster experiments.

    Args:
        train_if_needed: Train a new model when no saved model is found;
            when false a missing model raises ``RuntimeError``.
        max_training_rows: Forwarded to ``TabPFNService``.

    Returns:
        The submission DataFrame (row id column plus ``prediction``), which is
        also written to ``paths.submission_parquet``. If the Parquet export
        fails, a CSV copy is written and the exception is re-raised.
    """

    def _checkpoint(tag: str) -> None:
        # Resource logging is optional: only when the helper cell was run.
        if "log_resources" in globals():
            log_resources(tag, force=True)

    # 1. Resolve paths
    paths = resolve_paths(verbose=True)
    _checkpoint("pipeline: paths")

    # 2-4. Load data, build features, create the model service
    bundle = load_data(paths)
    features = build_features(bundle)
    service = TabPFNService(paths, max_training_rows=max_training_rows)

    # 5. Load a saved model, or train one
    if service.load():
        print(f"Loaded pretrained model from {paths.model_dir}")
    else:
        if not train_if_needed:
            raise RuntimeError("No pretrained TabPFN model found and training is disabled.")
        print("No saved model found -> training TabPFN ...")
        _checkpoint("pipeline: before fit")
        service.fit(features)
        _checkpoint("pipeline: after fit")

    # 6. Calibrate
    service.ensure_calibration(features)
    print("Calibration scale/shift:", service.calibration.scale, service.calibration.shift)
    _checkpoint("pipeline: after calibration")

    # 7. Final predictions on the test set
    _checkpoint("pipeline: before test predict")
    test_preds = service.predict(features.test_features)

    # 8. Build the submission frame. Prefer batch_id as the row id; otherwise
    #    use the first column of test.csv.
    test_columns = bundle.test.columns
    if "batch_id" in test_columns:
        row_col = "batch_id"
    elif len(test_columns):
        row_col = test_columns[0]
    else:
        row_col = "row_id"

    if row_col in test_columns:
        ids = bundle.test[row_col].to_numpy()
    else:
        ids = np.arange(len(test_preds))
    submission = pd.DataFrame({row_col: ids, "prediction": np.asarray(test_preds)})

    # 9. Save as Parquet via pyarrow; on failure keep a CSV copy and re-raise.
    try:
        submission.to_parquet(paths.submission_parquet, engine="pyarrow", index=False)
        print(f"Saved submission Parquet (pyarrow): {submission.shape} -> {paths.submission_parquet}")
        _checkpoint("pipeline: after parquet")
    except Exception as exc:
        print(f"FATAL: Parquet export failed: {exc}")
        submission.to_csv(paths.submission_csv, index=False)
        raise

    return submission


In [8]:
import time

# Time one end-to-end pipeline run (training is allowed when no saved model exists).
t0 = time.perf_counter()
submission_df = run_pipeline(train_if_needed=True)
elapsed = time.perf_counter() - t0
print(f"耗时：{elapsed:.2f} 秒")
# Last expression of the cell: preview the first rows of the submission.
submission_df.head()


=== Paths ===
data_root         : /kaggle/input/hull-tactical-market-prediction
train_csv         : /kaggle/input/hull-tactical-market-prediction/train.csv
test_csv          : /kaggle/input/hull-tactical-market-prediction/test.csv
out_dir           : /kaggle/working
model_dir         : /kaggle/working/tabpfn_model
submission_csv    : /kaggle/working/submission.csv
submission_parquet: /kaggle/working/submission.parquet
metadata_path     : /kaggle/working/tabpfn_model/metadata.json
Train rows: 8980 | Test rows: 10 | Overlap dates: 68
Feature columns: 98
No saved model found -> training TabPFN ...
Calibration scale/shift: 1.1998252107984386 -0.015470320717753984
Saved submission CSV: (10, 2) -> /kaggle/working/submission.csv
Saved submission Parquet: polars -> /kaggle/working/submission.parquet
耗时：118.75 秒


Unnamed: 0,date_id,prediction
0,8980,0.0
1,8981,0.0
2,8982,0.0
3,8983,0.0
4,8984,0.0
