In [None]:
# === 1. 导入库和数据初步检视 ===
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns

# 设置绘图风格
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['font.size'] = 16
# --- 加载原始数据 (非转换过的) ---
# 假设 train_df 和 test_df 仍在内存中。
# 如果不在，需要先运行数据加载脚本。
try:
    DATA_PATH = './stocks-return-prediction'
    train_df = pd.read_pickle(os.path.join(DATA_PATH, 'train_data9.pkl'))
    print("--- 已加载原始训练数据 ---")
except (NameError, FileNotFoundError):
    print("--- 未找到原始数据，请先运行数据加载脚本 ---")
    exit()

print("\n--- 数据基本信息 (info) ---")
train_df.info()

print("\n--- 数据统计摘要 (describe) ---")
# .describe() 对于理解特征的尺度和分布至关重要
print(train_df.describe())


# === 2. 目标变量 `y` 分布分析 ===
print("\n--- 分析目标变量 y 的分布 ---")
plt.figure(figsize=(12, 6))
sns.histplot(train_df['y'], bins=100, kde=True)
plt.title('目标变量 (y) 的分布', fontsize=16)
plt.xlabel('y (收益率)', fontsize=12)
plt.ylabel('频数', fontsize=12)
# 检查峰度和偏度
skewness = train_df['y'].skew()
kurtosis = train_df['y'].kurt()
plt.text(0.05, 0.9, f'偏度 (Skewness): {skewness:.2f}\n峰度 (Kurtosis): {kurtosis:.2f}',
         transform=plt.gca().transAxes, bbox=dict(boxstyle='round,pad=0.5', fc='yellow', alpha=0.5))
plt.show()

print("分析: 目标变量呈现出典型的金融收益率分布：尖峰厚尾（高kurtosis），意味着极端事件（大涨大跌）比正态分布更常见。这解释了为什么基于排序的Rank IC是比RMSE更鲁棒的评估指标。")


# === 3. 特征 `f_0` ~ `f_27` 分布分析 ===
print("\n--- 分析特征 f_0 ~ f_27 的分布 ---")
features_to_plot = [f'f_{i}' for i in range(28)]
train_df[features_to_plot].hist(bins=50, figsize=(20, 15))
plt.suptitle('所有原始特征的分布', fontsize=20)
plt.tight_layout(rect=[0, 0, 1, 0.97])
plt.show()

print("分析: 从特征分布来看，大部分特征近似于正态分布，但尺度（scale）各不相同。有些特征（如f_1, f_15）的偏度较大。这表明对某些特征进行标准化或归一化可能对某些模型有益。")


# === 4. 相关性分析 ===
print("\n--- 相关性分析 ---")

# (1) 特征与目标 `y` 的相关性
feature_cols = [f'f_{i}' for i in range(28)]
correlations = train_df[feature_cols + ['y']].corr()['y'].drop('y')

plt.figure(figsize=(12, 8))
correlations.sort_values().plot(kind='barh')
plt.title('原始特征与目标y的斯皮尔曼相关系数', fontsize=16)
plt.xlabel('相关系数', fontsize=12)
plt.show()

print("分析: 可以看到，不同特征与目标y的相关性有正有负，且强度不一。这为特征选择提供了初步依据。我们可以优先考虑那些相关性绝对值较高的特征进行更复杂的衍生。")

# (2) 特征与特征之间的相关性
corr_matrix = train_df[feature_cols].corr()
plt.figure(figsize=(18, 15))
sns.heatmap(corr_matrix, cmap='viridis', annot=False)
plt.title('特征相关性热力图', fontsize=16)
plt.show()

print("分析: 热力图显示特征之间存在一定的相关性结构（一些小方块）。这意味着存在信息冗余。我们可以通过PCA降维，或者创建特征之间的比率、差值来提取更有效的信息。")


# === 5. 时间序列分析 ===
print("\n--- 时间序列分析 ---")
# 按日期计算平均收益率
mean_y_by_date = train_df.groupby('date')['y'].mean()

plt.figure(figsize=(15, 6))
mean_y_by_date.plot()
plt.title('每日平均目标收益率 (y) 的时间序列', fontsize=16)
plt.xlabel('日期', fontsize=12)
plt.ylabel('平均收益率', fontsize=12)
plt.show()

print("分析: 市场存在明显的波动性聚集现象，即某些时段市场整体波动较大。这表明，计算波动率相关的特征（如滚动标准差）可能会非常有效。")


# === 6. 新的特征工程思路与实现 ===
print("\n--- 基于EDA的高级特征工程策略 ---")
print("""
1.  **特征交互 (Interaction Features):**
    既然特征之间存在相关性，那么它们的组合可能包含新的信息。
    例如，可以创建特征的比率 (f_a / f_b) 或差值 (f_a - f_b)。这在量化中很常见，
    比如用一个动量因子除以一个波动率因子。

2.  **横截面排名 (Cross-Sectional Ranking):**
    这是量化策略中非常强大的一招！它不是看一个特征的绝对值，而是看它在
    当天所有股票中所处的相对位置。这可以消除市场的系统性影响，并使特征更稳定。
    我们可以创建一个自定义转换器来实现它。

3.  **更丰富的滚动特征 (Richer Rolling Features):**
    除了均值和标准差，我们还可以计算滚动窗口内的偏度(skew)和峰度(kurtosis)，
    这可以捕捉到近期收益分布形态的变化。

4.  **日期相关特征 (Date-based Features):**
    虽然在这个比赛中可能作用有限，但有时从日期中提取星期几(dayofweek)
    或月份(month)等信息可能会捕捉到微弱的周期性效应。
""")

# --- 示例：实现横截面排名转换器 ---
from sklearn.base import BaseEstimator, TransformerMixin

class CrossSectionalRank(BaseEstimator, TransformerMixin):
    def __init__(self, features_to_rank):
        self.features_to_rank = features_to_rank

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X_copy = X.copy()
        print("正在生成横截面排名特征...")
        for feature in self.features_to_rank:
            # 按日期分组，然后在每个组内对特征进行排名，并归一化到[-0.5, 0.5]
            X_copy[f'{feature}_rank'] = X_copy.groupby('date')[feature].rank(pct=True) - 0.5
        return X_copy

# --- 如何在你的Pipeline中使用它 ---
print("\n--- 如何将新特征整合到Pipeline中 ---")
print("""
# from feature_engineering_pipeline import feature_engineering_pipeline, LagFeatureGenerator, RollingWindowFeatureGenerator

# # 假设我们想对f_0到f_4进行排名
# features_to_process = [f'f_{i}' for i in range(5)]

# new_feature_pipeline = Pipeline(steps=[
#     ('lag_features', LagFeatureGenerator(features_to_lag=features_to_process, lag_periods=[1, 2, 3])),
#     ('rolling_features', RollingWindowFeatureGenerator(features_to_roll=features_to_process, window_sizes=[5, 10], aggregations=['mean', 'std'])),
#     ('rank_features', CrossSectionalRank(features_to_rank=features_to_process)) # <-- 在这里加入新步骤
# ])

# print("新的特征工程管道:")
# print(new_feature_pipeline)

# # 然后用这个新的pipeline去转换数据
# # all_df_transformed = new_feature_pipeline.transform(all_df)
""")

--- 已加载原始训练数据 ---

--- 数据基本信息 (info) ---
<class 'pandas.core.frame.DataFrame'>
Index: 4644139 entries, 1598 to 4642592
Data columns (total 31 columns):
 #   Column  Dtype  
---  ------  -----  
 0   code    object 
 1   date    int64  
 2   f_0     float64
 3   f_1     float64
 4   f_2     float64
 5   f_3     float64
 6   f_4     float64
 7   f_5     float64
 8   f_6     float64
 9   f_7     float64
 10  f_8     float64
 11  f_9     float64
 12  f_10    float64
 13  f_11    float64
 14  f_12    float64
 15  f_13    float64
 16  f_14    float64
 17  f_15    float64
 18  f_16    float64
 19  f_17    object 
 20  f_18    float64
 21  f_19    float64
 22  f_20    float64
 23  f_21    float64
 24  f_22    float64
 25  f_23    float64
 26  f_24    float64
 27  f_25    float64
 28  f_26    float64
 29  f_27    float64
 30  y       float64
dtypes: float64(28), int64(1), object(2)
memory usage: 1.1+ GB

--- 数据统计摘要 (describe) ---
               date           f_0           f_1           f_2    

In [3]:
# -特征工程管道 -*-
"""
特征工程管道 (v3 - Polars并行加速版)

说明:
本脚本是v2版本的性能优化版。我们引入了Polars库来并行处理
计算密集型的特征工程步骤（滚动窗口和横截面排名），从而
充分利用多核CPU，大幅提升处理速度。

核心改动：
1. 在转换器内部，将Pandas DataFrame转换为Polars DataFrame。
2. 使用Polars强大的表达式API进行并行计算。
3. 将计算结果转换回Pandas DataFrame，以保持与Scikit-learn管道的兼容性。

请先安装Polars:
pip install polars
"""

# === 1. 导入库和加载数据 ===
import pandas as pd
import polars as pl # 导入Polars
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
import os
import gc

# --- 加载原始数据 ---
try:
    DATA_PATH = './stocks-return-prediction'
    train_df = pd.read_pickle(os.path.join(DATA_PATH, 'train_data9.pkl'))
    test_df = pd.read_pickle(os.path.join(DATA_PATH, 'test_data9.pkl'))
    print("--- 成功加载原始训练和测试数据 ---")
except (NameError, FileNotFoundError):
    print("--- 未找到原始数据，请先运行数据加载脚本 ---")
    exit()


# === 2. 定义性能优化的特征转换器 (使用Polars) ===
print("\n--- 正在定义使用Polars加速的特征转换器 ---")

class DataCleaner(BaseEstimator, TransformerMixin):
    """
    一个用于数据清洗和内存优化的转换器，包含：
    1. 将指定列转换为数值类型。
    2. 将所有float64列降级为float32。
    3. 使用中位数填充所有列的NaN值。
    """
    def __init__(self, object_cols_to_numeric=None):
        self.object_cols_to_numeric = object_cols_to_numeric
        self.medians = None

    def fit(self, X, y=None):
        X_temp = X.copy()
        if self.object_cols_to_numeric:
            for col in self.object_cols_to_numeric:
                X_temp[col] = pd.to_numeric(X_temp[col], errors='coerce')

        # 降级精度
        float_cols = [c for c in X_temp.columns if X_temp[c].dtype == 'float64']
        for col in float_cols:
            X_temp[col] = X_temp[col].astype('float32')

        self.medians = X_temp.median(numeric_only=True)
        return self

    def transform(self, X):
        X_copy = X.copy()
        print("正在执行数据清洗和内存优化...")
        if self.object_cols_to_numeric:
            for col in self.object_cols_to_numeric:
                X_copy[col] = pd.to_numeric(X_copy[col], errors='coerce')

        float_cols = [c for c in X_copy.columns if X_copy[c].dtype == 'float64']
        if float_cols:
            print(f"正在转换 {len(float_cols)} 个浮点列为float32...")
            for col in float_cols:
                X_copy[col] = X_copy[col].astype('float32')

        X_copy.fillna(self.medians.astype('float32'), inplace=True)
        return X_copy

class FeatureInteraction(BaseEstimator, TransformerMixin):
    def __init__(self, interaction_pairs):
        self.interaction_pairs = interaction_pairs
    def fit(self, X, y=None): return self
    def transform(self, X):
        X_copy = X.copy()
        print("正在生成特征交互项...")
        for f1, f2, op in self.interaction_pairs:
            if op == 'ratio':
                new_col = (X_copy[f1] / (X_copy[f2] + 1e-6)).astype('float32')
                X_copy[f"{f1}_div_{f2}"] = new_col
            elif op == 'diff':
                new_col = (X_copy[f1] - X_copy[f2]).astype('float32')
                X_copy[f"{f1}_sub_{f2}"] = new_col
        return X_copy

class PolarsLagFeatureGenerator(BaseEstimator, TransformerMixin):
    def __init__(self, features_to_lag, lag_periods):
        self.features_to_lag = features_to_lag
        self.lag_periods = lag_periods
    def fit(self, X, y=None): return self
    def transform(self, X):
        print("正在生成滞后特征 (Polars)...")
        pl_df = pl.from_pandas(X).sort(['code', 'date'])

        lag_expressions = []
        for feature in self.features_to_lag:
            for lag in self.lag_periods:
                expr = pl.col(feature).shift(lag).over('code').cast(pl.Float32).alias(f"{feature}_lag_{lag}")
                lag_expressions.append(expr)

        pl_df = pl_df.with_columns(lag_expressions)
        return pl_df.to_pandas()

class PandasRollingWindowFeatureGenerator(BaseEstimator, TransformerMixin):
    def __init__(self, features_to_roll, window_sizes, aggregations):
        self.features_to_roll = features_to_roll
        self.window_sizes = window_sizes
        self.aggregations = aggregations
    def fit(self, X, y=None): return self
    def transform(self, X):
        X_copy = X.copy()
        print("正在生成滚动窗口特征 (Pandas - 稳定模式)...")
        grouped = X_copy.groupby('code')
        for feature in self.features_to_roll:
            for window in self.window_sizes:
                for agg in self.aggregations:
                    new_col_name = f"{feature}_rol_{window}_{agg}"
                    rolling_feature = grouped[feature].transform(
                        lambda s: s.rolling(window, min_periods=1).agg(agg)
                    ).astype('float32')
                    X_copy[new_col_name] = rolling_feature
        return X_copy

class PolarsCrossSectionalRank(BaseEstimator, TransformerMixin):
    def __init__(self, features_to_rank):
        self.features_to_rank = features_to_rank
    def fit(self, X, y=None): return self
    def transform(self, X):
        print("正在生成横截面排名特征 (Polars)...")
        pl_df = pl.from_pandas(X).sort(['date', 'code'])

        rank_expressions = []
        for feature in self.features_to_rank:
            expr = ((pl.col(feature).rank(method='ordinal').over('date') - 1) /
                    (pl.col(feature).count().over('date') - 1) - 0.5) \
                   .cast(pl.Float32).alias(f"{feature}_rank")
            rank_expressions.append(expr)

        pl_df = pl_df.with_columns(rank_expressions)
        return pl_df.to_pandas()


# === 3. 创建并配置新的并行化特征工程管道 ===
print("\n--- 正在创建Polars并行化特征工程管道 ---")

features_for_deep_eng = [
    'f_7', 'f_27', 'f_6', 'f_19', 'f_9',
    'f_15', 'f_16', 'f_11', 'f_5', 'f_22'
]
interaction_pairs = [
    ('f_7', 'f_15', 'ratio'), ('f_27', 'f_16', 'ratio'), ('f_6', 'f_11', 'diff')
]

feature_engineering_pipeline_v3 = Pipeline(steps=[
    ('cleaner', DataCleaner(object_cols_to_numeric=['f_17'])),
    ('lag_features', PolarsLagFeatureGenerator(features_to_lag=features_for_deep_eng, lag_periods=[1, 2, 3])),
    ('rolling_features', PandasRollingWindowFeatureGenerator(
        features_to_roll=features_for_deep_eng,
        window_sizes=[5, 10, 20],
        aggregations=['mean', 'std']
    )),
    ('rank_features', PolarsCrossSectionalRank(features_to_rank=features_for_deep_eng)),
    ('interaction_features', FeatureInteraction(interaction_pairs=interaction_pairs)),
])

print("混合动力管道创建成功:")
print(feature_engineering_pipeline_v3)


# === 4. 应用新管道到数据 ===
print("\n--- 正在应用新管道转换数据 ---")

all_df = pd.concat([train_df.drop('y', axis=1), test_df], ignore_index=True)
train_len = len(train_df) # 在删除前获取长度
y_original = train_df['y'].copy() # 提前保存y值
del train_df, test_df
gc.collect()

all_df_transformed = feature_engineering_pipeline_v3.fit_transform(all_df)
del all_df
gc.collect()

# 后续处理保持不变
all_df_transformed.fillna(0, inplace=True)

train_transformed_df = all_df_transformed.iloc[:train_len].copy()
test_transformed_df = all_df_transformed.iloc[train_len:].copy()
del all_df_transformed
gc.collect()

print("正在对齐y值...")
train_transformed_df['y'] = y_original.astype('float32').values
del y_original
gc.collect()

print("数据转换完成。")


# === 5. 查看转换后的数据 ===
print("\n--- 查看转换后的训练数据 ---")
print(f"转换后训练数据形状: {train_transformed_df.shape}")
print(train_transformed_df.info())


--- 成功加载原始训练和测试数据 ---

--- 正在定义使用Polars加速的特征转换器 ---

--- 正在创建Polars并行化特征工程管道 ---
混合动力管道创建成功:
Pipeline(steps=[('cleaner', DataCleaner(object_cols_to_numeric=['f_17'])),
                ('lag_features',
                 PolarsLagFeatureGenerator(features_to_lag=['f_7', 'f_27',
                                                            'f_6', 'f_19',
                                                            'f_9', 'f_15',
                                                            'f_16', 'f_11',
                                                            'f_5', 'f_22'],
                                           lag_periods=[1, 2, 3])),
                ('rolling_features',
                 PandasRollingWindowFeatureGenerator(aggregations=['mean',
                                                                   'std'],
                                                     features_to_roll=['f_7',
                                                                       'f_27',
          

In [4]:
# 10.5 模型训练管道 (v6 - 本地CPU最终版)
"""
模型训练管道 (v6 - 本地CPU最终版)

说明:
本脚本是为在本地计算机上运行而优化的最终版本。它不再进行超参数搜索，
而是直接使用我们已经找到的最优参数来训练模型并生成提交文件。

核心策略:
1.  **本地路径适配**: 所有文件路径均已更新为本地项目结构。
2.  **纯CPU训练**: 所有模型均配置为在CPU上运行，并会自动利用所有可用的CPU核心。
3.  **核心特征工程**: 坚持使用已被验证为最高效的截面排名 (Cross-Sectional Ranking)。
4.  **固化最优参数**: 直接使用LGBM, XGBoost, CatBoost的最优参数进行训练。
5.  **内存管理**: 保留了所有内存优化和错误处理的最佳实践。
"""

# === 1. 导入库和准备数据 ===
import pandas as pd
import numpy as np
import lightgbm as lgb
import xgboost as xgb
import catboost as cb
import os
import gc
import polars as pl
from tqdm.auto import tqdm
import traceback
import optuna
from joblib import parallel_backend

# === 2. 核心特征工程与数据准备 ===
print("--- 正在加载数据并执行截面排名特征工程 ---")

def preprocess_and_rank(df):
    """对原始DataFrame进行清洗和截面排名 (最稳定版本)"""
    for col in df.columns:
        if col not in ['code', 'date']:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    float_cols = [c for c in df.columns if df[c].dtype == 'float64']
    for col in float_cols:
        df[col] = df[col].astype('float32')

    pl_df = pl.from_pandas(df)
    feature_cols = [col for col in df.columns if col.startswith('f_')]

    pl_df = pl_df.with_columns([
        (
            (pl.col(feature).rank(method='ordinal').over('date') - 1) /
            (pl.col(feature).count().over('date') - 1)
        ).alias(feature)
        for feature in feature_cols
    ])

    final_df = pl_df.to_pandas()
    final_df.fillna(0.5, inplace=True)
    return final_df

try:
    # *** 本地路径适配 ***
    DATA_PATH = './stocks-return-prediction'
    train_df_raw = pd.read_pickle(os.path.join(DATA_PATH, 'train_data9.pkl'))
    test_df_raw = pd.read_pickle(os.path.join(DATA_PATH, 'test_data9.pkl'))

    train_processed_df = preprocess_and_rank(train_df_raw.copy())
    test_processed_df = preprocess_and_rank(test_df_raw.copy())

    original_test_df_identifiers = test_df_raw[['code', 'date']].copy()

    del train_df_raw, test_df_raw
    gc.collect()
    print("截面排名特征工程完成。")

except Exception as e:
    print("!!!!!!!! 数据加载或特征工程失败 !!!!!!!!")
    print(f"错误类型: {type(e).__name__}")
    print(f"错误信息: {e}")
    traceback.print_exc()
    exit()

# --- 定义特征列和目标列 ---
features = [f'f_{i}' for i in range(28)]
target = 'y'
print(f"使用的特征数量: {len(features)}")


# === 3. 定义评估指标 (Rank IC) - Polars加速版 ===
def calculate_rank_ic_polars(y_true, y_pred, dates):
    df = pl.DataFrame({'y_true': y_true, 'y_pred': y_pred, 'date': dates})
    daily_ic = df.group_by('date', maintain_order=True).agg(
        pl.corr('y_true', 'y_pred', method='spearman').fill_nan(0.0).alias('ic')
    )
    return daily_ic['ic'].mean()


# === 4. 创建时间序列验证集 (带每日抽样) ===
print("\n--- 创建时间序列训练/验证集 ---")
unique_dates = sorted(train_processed_df['date'].unique())
split_date = unique_dates[-300]

train_split_df = train_processed_df[train_processed_df['date'] < split_date]
val_split_df = train_processed_df[train_processed_df['date'] >= split_date]

# *** NEW: Sample from each date for hyperparameter tuning ***
print(f"为加速调优，将在每个日期随机抽取最多200条数据。")
X_train_sample = train_split_df.groupby('date').apply(
    lambda x: x.sample(n=min(len(x), 1000), random_state=42)
).reset_index(drop=True)
y_train_sample = X_train_sample[target]
X_train_sample = X_train_sample[features]

X_val = val_split_df[features]
y_val = val_split_df[target]
val_dates_for_ic = val_split_df['date']

print(f"调优训练集大小: {X_train_sample.shape}, 验证集大小: {X_val.shape}")


# === 5. 使用Optuna进行超参数调优 ===
def objective(trial, model_name):
    n_jobs_per_trial = -1
    if model_name == 'lgbm':
        params = { 'random_state': 42, 'n_jobs': n_jobs_per_trial, 'verbose': -1, 'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.1), 'n_estimators': trial.suggest_int('n_estimators', 200, 1000, step=100), 'num_leaves': trial.suggest_int('num_leaves', 20, 150), 'max_depth': trial.suggest_int('max_depth', 5, 15), 'feature_fraction': trial.suggest_float('feature_fraction', 0.5, 1.0), 'bagging_fraction': trial.suggest_float('bagging_fraction', 0.5, 1.0) }
        model = lgb.LGBMRegressor(**params)
    elif model_name == 'xgb':
        params = { 'random_state': 42, 'n_jobs': n_jobs_per_trial, 'tree_method': 'hist', 'n_estimators': trial.suggest_int('n_estimators', 200, 1000, step=100), 'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.05), 'max_depth': trial.suggest_int('max_depth', 3, 8) }
        model = xgb.XGBRegressor(**params)
    elif model_name == 'catboost':
        params = { 'thread_count': n_jobs_per_trial, 'random_seed': 42, 'verbose': 0, 'allow_writing_files': False, 'iterations': trial.suggest_int('iterations', 200, 1000, step=100), 'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.05), 'depth': trial.suggest_int('depth', 4, 10) }
        model = cb.CatBoostRegressor(**params)

    model.fit(X_train_sample, y_train_sample)
    y_pred = model.predict(X_val)
    return calculate_rank_ic_polars(y_val, y_pred, val_dates_for_ic)

# --- 运行调优 ---
models_to_tune = ['lgbm', 'xgb', 'catboost']
best_params_all = {}
N_TRIALS = 15
N_JOBS_OPTUNA = -1

print(f"\n--- 开始对 {len(models_to_tune)} 个模型进行并行化超参数调优 (每个模型 {N_TRIALS} 次试验) ---")
for model_name in tqdm(models_to_tune, desc="Overall Tuning Progress"):
    print(f"\n--- 正在调优: {model_name.upper()} ---")
    study = optuna.create_study(direction='maximize')
    with parallel_backend('loky', n_jobs=N_JOBS_OPTUNA):
        study.optimize(lambda trial: objective(trial, model_name), n_trials=N_TRIALS, n_jobs=N_JOBS_OPTUNA, show_progress_bar=True)
    best_params_all[model_name] = study.best_params
    print(f"调优完成。最优Rank IC: {study.best_value:.6f}")
    print(f"最优参数: {study.best_params}")


# === 6. 训练最终模型并进行集成预测 ===
print("\n--- 使用最优参数在完整数据上训练最终模型 ---")
full_train_df = train_processed_df[train_processed_df['date'] < split_date]

print(f"完整训练数据过大 ({len(full_train_df)} 行), 将在每个日期随机抽取最多1000条数据进行最终训练。")
final_train_df = full_train_df.groupby('date').apply(
    lambda x: x.sample(n=min(len(x), 1000), random_state=42)
).reset_index(drop=True)
print(f"采样后的最终训练集大小为 {len(final_train_df)} 行。")

X_full_train = final_train_df[features]
y_full_train = final_train_df[target]
X_test = test_processed_df[features]

del train_processed_df, test_processed_df, full_train_df, final_train_df
gc.collect()

predictions = []
final_models_order = ['lgbm', 'xgb', 'catboost']
for model_name in tqdm(final_models_order, desc="Final Model Training"):
    if model_name not in best_params_all:
        continue

    params = best_params_all[model_name]
    print(f"正在训练最终的 {model_name.upper()} 模型...")
    try:
        if model_name == 'lgbm':
            params.update({'random_state': 42, 'n_jobs': -1, 'verbose': -1})
            model = lgb.LGBMRegressor(**params)
        elif model_name == 'xgb':
            params.update({'random_state': 42, 'n_jobs': -1, 'tree_method': 'hist'})
            model = xgb.XGBRegressor(**params)
        elif model_name == 'catboost':
            params.update({'thread_count': -1, 'random_seed': 42, 'verbose': 0, 'allow_writing_files': False})
            model = cb.CatBoostRegressor(**params)

        model.fit(X_full_train, y_full_train)
        predictions.append(model.predict(X_test))
        print(f"{model_name.upper()} 预测完成。")
    except Exception as e:
        print(f"!!!!!!!! 训练模型 {model_name.upper()} 失败: {e} !!!!!!!!")
        print("本次集成将跳过该模型。")


# === 7. 生成提交文件 ===
if not predictions:
    print("所有模型训练均失败，无法生成提交文件。")
else:
    ensemble_prediction = np.mean(predictions, axis=0)
    print("\n集成预测完成。")
    print("\n--- 正在生成提交文件 ---")

    submission_df = original_test_df_identifiers.copy()
    submission_df['y_pred'] = ensemble_prediction

    submission_df['id'] = range(len(submission_df))

    submission_df = submission_df[['id', 'code', 'date', 'y_pred']]

    submission_file = 'submission.csv'
    submission_df.to_csv(submission_file, index=False)
    print(f"提交文件 '{submission_file}' 已生成。")
    print(submission_df.head())

--- 正在加载数据并执行截面排名特征工程 ---
截面排名特征工程完成。
使用的特征数量: 28

--- 创建时间序列训练/验证集 ---
为加速调优，将在每个日期随机抽取最多200条数据。


  X_train_sample = train_split_df.groupby('date').apply(


调优训练集大小: (1391000, 28), 验证集大小: (1048233, 28)

--- 开始对 3 个模型进行并行化超参数调优 (每个模型 15 次试验) ---


Overall Tuning Progress:   0%|          | 0/3 [00:00<?, ?it/s]


--- 正在调优: LGBM ---


  0%|          | 0/15 [00:00<?, ?it/s]

调优完成。最优Rank IC: 0.116930
最优参数: {'learning_rate': 0.04596139206940063, 'n_estimators': 200, 'num_leaves': 21, 'max_depth': 6, 'feature_fraction': 0.6955973740152936, 'bagging_fraction': 0.5835145377028257}

--- 正在调优: XGB ---


  0%|          | 0/15 [00:00<?, ?it/s]

调优完成。最优Rank IC: 0.116576
最优参数: {'n_estimators': 400, 'learning_rate': 0.02911348329537932, 'max_depth': 4}

--- 正在调优: CATBOOST ---


  0%|          | 0/15 [00:00<?, ?it/s]

调优完成。最优Rank IC: 0.117536
最优参数: {'iterations': 500, 'learning_rate': 0.042015955854694455, 'depth': 4}

--- 使用最优参数在完整数据上训练最终模型 ---
完整训练数据过大 (3595906 行), 将在每个日期随机抽取最多1000条数据进行最终训练。


  final_train_df = full_train_df.groupby('date').apply(


采样后的最终训练集大小为 1391000 行。


Final Model Training:   0%|          | 0/3 [00:00<?, ?it/s]

正在训练最终的 LGBM 模型...
LGBM 预测完成。
正在训练最终的 XGB 模型...
XGB 预测完成。
正在训练最终的 CATBOOST 模型...
CATBOOST 预测完成。

集成预测完成。

--- 正在生成提交文件 ---
提交文件 'submission.csv' 已生成。
         id    code  date    y_pred
4647483   0  s_2554  1691 -0.002699
4646572   1  s_1612  1691  0.002016
4646612   2  s_1911  1691  0.002158
4646498   3   s_814  1691 -0.002218
4647458   4   s_325  1691 -0.000881


In [1]:
# 9.2 === 模型训练管道 v12.1 (最终索引修复版) ===
import pandas as pd
import numpy as np
import xgboost as xgb
import os
import gc
import polars as pl
import warnings

# Scikit-learn & Scikit-optimize Imports
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import MinMaxScaler
from skopt import BayesSearchCV

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# --- 1. 评估指标与评分函数 ---

def calculate_rank_ic_polars(y_true, y_pred, dates):
    """使用Polars高效计算Rank IC"""
    if isinstance(y_true, pd.Series): y_true = y_true.to_numpy()
    if isinstance(y_pred, pd.Series): y_pred = y_pred.to_numpy()
    if isinstance(dates, pd.Series): dates = dates.to_numpy()

    df = pl.DataFrame({'y_true': y_true, 'y_pred': y_pred, 'date': dates})
    daily_ic = df.group_by('date', maintain_order=True).agg(
        pl.corr('y_true', 'y_pred', method='spearman').fill_nan(0.0).alias('ic')
    )
    return daily_ic['ic'].mean()

class RankICScorer:
    """(v1.2) 一个与scikit-learn兼容的自定义评分器 (最终索引修复版)"""
    def __init__(self, dates: pd.Series):
        # 核心修复: 移除 reset_index，保留原始索引以进行正确的匹配
        self.dates = dates

    def __call__(self, estimator, X, y_true) -> float:
        y_pred = estimator.predict(X)
        # 现在 y_true.index 可以正确地在 self.dates.index 中找到匹配项
        fold_dates = self.dates.loc[y_true.index]
        rank_ic = calculate_rank_ic_polars(y_true, y_pred, fold_dates)
        print(f"[RankICScorer] Fold Rank IC = {rank_ic:.6f}")
        return rank_ic if np.isfinite(rank_ic) else 0.0

# --- 2. 初始截面排名 (保持不变) ---
def preprocess_and_rank(df):
    """初始预处理，使用Polars进行截面排名"""
    for col in df.columns:
        if col not in ['code', 'date']:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    float_cols = [c for c in df.columns if df[c].dtype == 'float64']
    for col in float_cols:
        df[col] = df[col].astype('float32')
    pl_df = pl.from_pandas(df)
    feature_cols = [col for col in df.columns if col.startswith('f_')]
    pl_df = pl_df.with_columns([
        ((pl.col(feature).rank(method='ordinal').over('date') - 1) / (pl.col(feature).count().over('date') - 1)).alias(feature)
        for feature in feature_cols
    ])
    final_df = pl_df.to_pandas()
    final_df.fillna(0.5, inplace=True)
    return final_df

# --- 3. 定义Pipeline组件 ---

class FeatureEngineeringTransformerPolars(BaseEstimator, TransformerMixin):
    """(v4.5) 高级特征工程 (Polars最终修复版)."""
    def __init__(self):
        self.base_price_features = [f'f_{i}' for i in [0, 3, 6, 9, 12, 15, 18, 21, 24]]
        self.base_size_features = [f'f_{i}' for i in [1, 4, 7, 10, 13, 16, 19, 22, 25]]
        self.base_features = self.base_price_features + self.base_size_features

    def fit(self, X, y=None): return self

    def transform(self, X):
        pl_X = pl.from_pandas(X)

        value_expressions = [(pl.col(p_col) * pl.col(s_col)).alias(f'value_{p_col}') for p_col, s_col in zip(self.base_price_features, self.base_size_features)]
        pl_X = pl_X.with_columns(value_expressions)

        vwap_value_cols = [f'value_f_{i}' for i in [0, 3, 6, 9, 12, 15, 18, 21, 24]]
        sum_sizes = pl.sum_horizontal(pl.col(c) for c in self.base_size_features)
        pl_X = pl_X.with_columns(
            (pl.sum_horizontal(pl.col(c) for c in vwap_value_cols) / pl.when(sum_sizes == 0).then(None).otherwise(sum_sizes)).alias('vwap')
        )

        final_expressions = [(pl.col('f_0') / pl.when(pl.col('vwap') == 0).then(None).otherwise(pl.col('vwap')) - 1).alias('price_to_vwap_ratio')]

        for window in [5, 10, 21]:
            final_expressions.append(pl.col('f_0').rolling_std(window, min_periods=int(window*0.8)).over('code').alias(f'volatility_{window}'))
        for window in [1, 2, 5, 10]:
            shifted_price = pl.col('f_0').shift(window).over('code')
            final_expressions.append((pl.col('f_0') / pl.when(shifted_price == 0).then(None).otherwise(shifted_price) - 1).alias(f'momentum_{window}'))
        for col in self.base_features:
            final_expressions.append((pl.col(col) - pl.col(col).mean().over('date')).alias(f'{col}_market_diff'))

        X_with_features = pl_X.with_columns(final_expressions)

        X_filled = X_with_features.fill_nan(0).fill_null(0)

        float_cols = [c.name for c in X_filled if c.dtype in pl.FLOAT_DTYPES]
        X_cleaned = X_filled.with_columns([
            pl.when(pl.col(c).is_infinite()).then(pl.lit(0.0)).otherwise(pl.col(c)).alias(c)
            for c in float_cols
        ])

        X_pd = X_cleaned.to_pandas()
        X_pd.index = X.index # 确保原始索引被保留

        feature_cols = [c for c in X_pd.columns if X_pd[c].dtype in [np.float32, np.float64]]
        return X_pd[feature_cols]

# --- 4. 主执行流程 ---
def main():
    print("--- 正在加载数据 ---")
    DATA_PATH = './stocks-return-prediction'
    train_df_raw = pd.read_pickle(os.path.join(DATA_PATH, 'train_data9.pkl'))
    test_df_raw = pd.read_pickle(os.path.join(DATA_PATH, 'test_data9.pkl'))

    print("\n--- 1. 初始预处理 ---")
    train_df = preprocess_and_rank(train_df_raw.copy())
    test_df = preprocess_and_rank(test_df_raw.copy())
    train_df.sort_values(by=['date', 'code'], inplace=True, ignore_index=True)
    test_df.sort_values(by=['date', 'code'], inplace=True, ignore_index=True)

    print("\n--- 2. 使用稳健时间窗口划分数据 ---")
    unique_dates = sorted(train_df['date'].unique())
    val_start_date = unique_dates[-100]
    search_start_date = unique_dates[-300]

    val_df = train_df[train_df['date'] >= val_start_date]
    search_df = train_df[(train_df['date'] >= search_start_date) & (train_df['date'] < val_start_date)]
    feat_sel_df = train_df[train_df['date'] < search_start_date]

    print(f"特征筛选集: {feat_sel_df['date'].min()} - {feat_sel_df['date'].max()} ({feat_sel_df['date'].nunique()} 天)")
    print(f"超参数搜索集: {search_df['date'].min()} - {search_df['date'].max()} ({search_df['date'].nunique()} 天)")
    print(f"最终验证集: {val_df['date'].min()} - {val_df['date'].max()} ({val_df['date'].nunique()} 天)")

    del train_df_raw
    gc.collect()

    print("\n--- 3. 开始特征筛选 ---")
    feature_generator = FeatureEngineeringTransformerPolars()
    print("为筛选生成所有高级特征...")
    X_eng_all = feature_generator.transform(feat_sel_df.drop(columns=['y']))
    y_eng_all = feat_sel_df['y']

    print("训练基础XGBoost模型以评估特征重要性...")
    selector_model = xgb.XGBRegressor(random_state=42, tree_method='gpu_hist', n_jobs=1)
    selector_model.fit(X_eng_all, y_eng_all)

    importances = pd.Series(selector_model.feature_importances_, index=X_eng_all.columns)
    TOP_100_FEATURES = importances.nlargest(100).index.tolist()

    print(f"特征筛选完成，选出最重要的100个特征。")
    del X_eng_all, y_eng_all, selector_model, importances
    gc.collect()

    print("\n--- 4. 对所有数据进行一次性特征工程 ---")
    final_train_df = pd.concat([feat_sel_df, search_df])

    print("处理最终训练集...")
    X_train_final = feature_generator.transform(final_train_df.drop(columns=['y']))[TOP_100_FEATURES]
    y_train_final = final_train_df['y']

    print("处理验证集...")
    X_val_final = feature_generator.transform(val_df.drop(columns=['y']))[TOP_100_FEATURES]
    y_val_final = val_df['y']
    val_dates_final = val_df['date']

    print("处理测试集...")
    X_test_final = feature_generator.transform(test_df)[TOP_100_FEATURES]

    del feat_sel_df, final_train_df, val_df, train_df
    gc.collect()

    pipeline = Pipeline([
        ('scaler', MinMaxScaler()),
        ('model', xgb.XGBRegressor(random_state=42, tree_method='gpu_hist', n_jobs=1))
    ])

    search_spaces = {
        'model__n_estimators': (500, 2000),
        'model__learning_rate': (0.005, 0.05, 'log-uniform'),
        'model__max_depth': (3, 8),
        'model__subsample': (0.6, 1.0, 'uniform'),
        'model__colsample_bytree': (0.6, 1.0, 'uniform'),
        'model__gamma': (0.0, 1.0, 'uniform'),
        'model__min_child_weight': (1, 30),
        'model__reg_alpha': (1e-2, 1.0, 'log-uniform'),
        'model__reg_lambda': (1e-2, 1.0, 'log-uniform'),
    }

    X_search = feature_generator.transform(search_df.drop(columns=['y']))[TOP_100_FEATURES]
    y_search = search_df['y']

    tscv = TimeSeriesSplit(n_splits=3)
    rank_ic_scorer = RankICScorer(dates=search_df['date'])

    search = BayesSearchCV(
        estimator=pipeline,
        search_spaces=search_spaces,
        n_iter=30,
        scoring=rank_ic_scorer,
        cv=tscv,
        random_state=42,
        n_jobs=1,
        verbose=2
    )

    print(f"\n--- 5. 开始使用贝叶斯优化进行超参数搜索 (数据量: {len(X_search)}) ---")
    search.fit(X_search, y_search)

    print("\n--- 超参数搜索完成 ---")
    print(f"最佳参数: {search.best_params_}")
    print(f"最佳交叉验证得分 (Rank IC): {search.best_score_:.6f}")

    best_pipeline = search.best_estimator_

    print("\n--- 6. 在所有历史数据上训练最终模型 ---")
    best_pipeline.fit(X_train_final, y_train_final)
    print("最终模型训练完成。")

    print("\n--- 7. 在独立验证集上评估最终模型 ---")
    y_pred_val = best_pipeline.predict(X_val_final)
    final_rank_ic = calculate_rank_ic_polars(y_val_final, y_pred_val, val_dates_final)
    print(f"\n独立验证集上的最终Rank IC: {final_rank_ic:.6f}")

    print("\n--- 8. 生成最终提交文件 ---")
    predictions = best_pipeline.predict(X_test_final)

    submission_df = test_df[['code', 'date']].copy()
    submission_df['y_pred'] = predictions
    submission_df.reset_index(inplace=True)
    submission_df.rename(columns={'index': 'id'}, inplace=True)

    submission_df = submission_df[['id', 'code', 'date', 'y_pred']]
    submission_df.to_csv('submission_pipeline_v12.1.csv', index=False)
    print("提交文件 'submission_pipeline_v12.1.csv' 已生成.")
    print(submission_df.head())

if __name__ == '__main__':
    main()


--- 正在加载数据 ---

--- 1. 初始预处理 ---

--- 2. 使用稳健时间窗口划分数据 ---
特征筛选集: 0 - 1390 (1391 天)
超参数搜索集: 1391 - 1590 (200 天)
最终验证集: 1591 - 1690 (100 天)

--- 3. 开始特征筛选 ---
为筛选生成所有高级特征...
训练基础XGBoost模型以评估特征重要性...
特征筛选完成，选出最重要的100个特征。

--- 4. 对所有数据进行一次性特征工程 ---
处理最终训练集...
处理验证集...
处理测试集...

--- 5. 开始使用贝叶斯优化进行超参数搜索 (数据量: 693510) ---
Fitting 3 folds for each of 1 candidates, totalling 3 fits
[RankICScorer] Fold Rank IC = 0.058763
[CV] END model__colsample_bytree=0.7640415835413256, model__gamma=0.7277257431773251, model__learning_rate=0.04283886967006358, model__max_depth=5, model__min_child_weight=20, model__n_estimators=1121, model__reg_alpha=0.050334141977735516, model__reg_lambda=0.30130647758680273, model__subsample=0.7217853244146024; total time=  10.6s
[RankICScorer] Fold Rank IC = 0.078366
[CV] END model__colsample_bytree=0.7640415835413256, model__gamma=0.7277257431773251, model__learning_rate=0.04283886967006358, model__max_depth=5, model__min_child_weight=20, model__n_estimators=1121, model__r

In [1]:
# 9.5 === 模型训练管道 v13.1 (三驾马车集成版) ===
import pandas as pd
import numpy as np
import xgboost as xgb
import lightgbm as lgb
import catboost as cb
import os
import gc
import polars as pl
import warnings

# Scikit-learn & Scikit-optimize Imports
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import MinMaxScaler
from skopt import BayesSearchCV

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# --- 1. 评估指标与评分函数 ---

def calculate_rank_ic_polars(y_true, y_pred, dates):
    """使用Polars高效计算Rank IC"""
    if isinstance(y_true, pd.Series): y_true = y_true.to_numpy()
    if isinstance(y_pred, pd.Series): y_pred = y_pred.to_numpy()
    if isinstance(dates, pd.Series): dates = dates.to_numpy()

    df = pl.DataFrame({'y_true': y_true, 'y_pred': y_pred, 'date': dates})
    daily_ic = df.group_by('date', maintain_order=True).agg(
        pl.corr('y_true', 'y_pred', method='spearman').fill_nan(0.0).alias('ic')
    )
    return daily_ic['ic'].mean()

class RankICScorer:
    """(v1.2) 一个与scikit-learn兼容的自定义评分器"""
    def __init__(self, dates: pd.Series):
        self.dates = dates

    def __call__(self, estimator, X, y_true) -> float:
        y_pred = estimator.predict(X)
        fold_dates = self.dates.loc[y_true.index]
        rank_ic = calculate_rank_ic_polars(y_true, y_pred, fold_dates)
        print(f"[RankICScorer] Fold Rank IC = {rank_ic:.6f}")
        return rank_ic if np.isfinite(rank_ic) else 0.0

# --- 2. 核心特征工程: 截面排名 ---
def preprocess_and_rank(df):
    """初始预处理，只使用最核心的截面排名特征"""
    for col in df.columns:
        if col not in ['code', 'date']:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    float_cols = [c for c in df.columns if df[c].dtype == 'float64']
    for col in float_cols:
        df[col] = df[col].astype('float32')
    pl_df = pl.from_pandas(df)
    feature_cols = [col for col in df.columns if col.startswith('f_')]
    pl_df = pl_df.with_columns([
        ((pl.col(feature).rank(method='ordinal').over('date') - 1) / (pl.col(feature).count().over('date') - 1)).alias(feature)
        for feature in feature_cols
    ])
    final_df = pl_df.to_pandas()
    final_df.fillna(0.5, inplace=True)
    return final_df

# --- 3. 主执行流程 ---
def main():
    print("--- 正在加载数据 ---")
    DATA_PATH = './stocks-return-prediction'
    train_df_raw = pd.read_pickle(os.path.join(DATA_PATH, 'train_data9.pkl'))
    test_df_raw = pd.read_pickle(os.path.join(DATA_PATH, 'test_data9.pkl'))

    print("\n--- 1. 核心特征工程 ---")
    train_df = preprocess_and_rank(train_df_raw.copy())
    test_df = preprocess_and_rank(test_df_raw.copy())
    train_df.sort_values(by=['date', 'code'], inplace=True, ignore_index=True)
    test_df.sort_values(by=['date', 'code'], inplace=True, ignore_index=True)

    del train_df_raw
    gc.collect()

    print("\n--- 2. 使用稳健时间窗口划分数据 ---")
    unique_dates = sorted(train_df['date'].unique())
    val_start_date = unique_dates[-200]
    # 最终训练和搜索都用近500天数据，以提供更多信息
    search_start_date = unique_dates[-500]

    val_df = train_df[train_df['date'] >= val_start_date]
    search_df = train_df[train_df['date'] >= search_start_date]

    print(f"超参数搜索与最终训练集: {search_df['date'].min()} - {search_df['date'].max()} ({search_df['date'].nunique()} 天)")
    print(f"最终验证集: {val_df['date'].min()} - {val_df['date'].max()} ({val_df['date'].nunique()} 天)")

    features = [f'f_{i}' for i in range(28)]

    # --- 3. 独立调优三大模型 ---
    models_to_tune = ['lgbm', 'xgb', 'catboost']
    best_params_all = {}

    for model_name in models_to_tune:
        print(f"\n--- 3.{models_to_tune.index(model_name)+1} 开始为 {model_name.upper()} 进行贝叶斯优化 ---")

        if model_name == 'lgbm':
            model = lgb.LGBMRegressor(random_state=42, n_jobs=-1, verbose=-1)
            search_spaces = {
                'n_estimators': (500, 2000), 'learning_rate': (0.005, 0.05, 'log-uniform'),
                'num_leaves': (20, 100), 'max_depth': (5, 15),
                'feature_fraction': (0.6, 1.0, 'uniform'), 'bagging_fraction': (0.6, 1.0, 'uniform'),
            }
        elif model_name == 'xgb':
            model = xgb.XGBRegressor(random_state=42, tree_method='gpu_hist', n_jobs=1)
            search_spaces = {
                'n_estimators': (500, 2000), 'learning_rate': (0.005, 0.05, 'log-uniform'),
                'max_depth': (3, 8), 'subsample': (0.6, 1.0, 'uniform'),
                'colsample_bytree': (0.6, 1.0, 'uniform'), 'min_child_weight': (1, 30),
            }
        elif model_name == 'catboost':
            model = cb.CatBoostRegressor(random_state=42, verbose=0, thread_count=-1, allow_writing_files=False)
            search_spaces = {
                'iterations': (500, 2000), 'learning_rate': (0.005, 0.05, 'log-uniform'),
                'depth': (4, 10), 'l2_leaf_reg': (1, 10, 'uniform'),
            }

        pipeline = Pipeline([('scaler', MinMaxScaler()), ('model', model)])

        prefixed_search_spaces = {f'model__{k}': v for k, v in search_spaces.items()}

        tscv = TimeSeriesSplit(n_splits=3)
        rank_ic_scorer = RankICScorer(dates=search_df['date'])

        search = BayesSearchCV(
            estimator=pipeline, search_spaces=prefixed_search_spaces,
            n_iter=15, scoring=rank_ic_scorer, cv=tscv,
            random_state=42, n_jobs=1, verbose=2
        )

        search.fit(search_df[features], search_df['y'])
        best_params_all[model_name] = {k.replace('model__', ''): v for k, v in search.best_params_.items()}
        print(f"--- {model_name.upper()} 调优完成 ---")
        print(f"最佳交叉验证得分 (Rank IC): {search.best_score_:.6f}")
        print(f"最佳参数: {best_params_all[model_name]}")

    # --- 4. 训练最终模型并进行集成预测 ---
    print("\n--- 4. 使用最优参数在最终训练集上训练三驾马车模型 ---")

    X_train_final = search_df[features]
    y_train_final = search_df['y']
    X_val_final = val_df[features]
    y_val_final = val_df['y']
    X_test_final = test_df[features]

    predictions = []
    for model_name in models_to_tune:
        print(f"正在训练最终的 {model_name.upper()} 模型...")
        params = best_params_all[model_name]

        if model_name == 'lgbm':
            model = lgb.LGBMRegressor(random_state=42, n_jobs=-1, verbose=-1, **params)
        elif model_name == 'xgb':
            model = xgb.XGBRegressor(random_state=42, tree_method='gpu_hist', n_jobs=-1, **params)
        elif model_name == 'catboost':
            model = cb.CatBoostRegressor(random_state=42, verbose=0, thread_count=-1, allow_writing_files=False, **params)

        pipeline = Pipeline([('scaler', MinMaxScaler()), ('model', model)])
        pipeline.fit(X_train_final, y_train_final)
        predictions.append(pipeline.predict(X_test_final))
        print(f"{model_name.upper()} 预测完成。")

    # --- 5. 在验证集上评估并生成提交文件 ---
    print("\n--- 5. 在独立验证集上评估最终集成模型 ---")
    val_predictions = []
    # 重新在训练集上训练，并在验证集上预测，以进行公正的评估
    for model_name in models_to_tune:
        params = best_params_all[model_name]
        if model_name == 'lgbm': model = lgb.LGBMRegressor(random_state=42, n_jobs=-1, verbose=-1, **params)
        elif model_name == 'xgb': model = xgb.XGBRegressor(random_state=42, tree_method='gpu_hist', n_jobs=-1, **params)
        elif model_name == 'catboost': model = cb.CatBoostRegressor(random_state=42, verbose=0, thread_count=-1, allow_writing_files=False, **params)
        pipeline = Pipeline([('scaler', MinMaxScaler()), ('model', model)])
        pipeline.fit(X_train_final, y_train_final)
        val_predictions.append(pipeline.predict(X_val_final))

    ensemble_val_pred = np.mean(val_predictions, axis=0)
    final_rank_ic = calculate_rank_ic_polars(y_val_final, ensemble_val_pred, val_df['date'])
    print(f"\n独立验证集上的最终Rank IC: {final_rank_ic:.6f}")

    print("\n--- 6. 生成最终提交文件 ---")
    ensemble_prediction = np.mean(predictions, axis=0)

    submission_df = test_df[['code', 'date']].copy()
    submission_df['y_pred'] = ensemble_prediction
    submission_df.reset_index(inplace=True)
    submission_df.rename(columns={'index': 'id'}, inplace=True)

    submission_df = submission_df[['id', 'code', 'date', 'y_pred']]
    submission_df.to_csv('submission_pipeline_v13.1.csv', index=False)
    print("提交文件 'submission_pipeline_v13.1.csv' 已生成.")
    print(submission_df.head())

if __name__ == '__main__':
    main()


--- 正在加载数据 ---

--- 1. 核心特征工程 ---

--- 2. 使用稳健时间窗口划分数据 ---
超参数搜索与最终训练集: 1191 - 1690 (500 天)
最终验证集: 1491 - 1690 (200 天)

--- 3.1 开始为 LGBM 进行贝叶斯优化 ---
Fitting 3 folds for each of 1 candidates, totalling 3 fits
[RankICScorer] Fold Rank IC = 0.030189
[CV] END model__bagging_fraction=0.7640415835413256, model__feature_fraction=0.89109029727093, model__learning_rate=0.04283886967006358, model__max_depth=8, model__n_estimators=1505, model__num_leaves=53; total time=  25.1s
[RankICScorer] Fold Rank IC = 0.106550
[CV] END model__bagging_fraction=0.7640415835413256, model__feature_fraction=0.89109029727093, model__learning_rate=0.04283886967006358, model__max_depth=8, model__n_estimators=1505, model__num_leaves=53; total time=  32.2s
[RankICScorer] Fold Rank IC = 0.074066
[CV] END model__bagging_fraction=0.7640415835413256, model__feature_fraction=0.89109029727093, model__learning_rate=0.04283886967006358, model__max_depth=8, model__n_estimators=1505, model__num_leaves=53; total time= 1.2min
Fit

In [2]:
# 0.9 === 模型训练管道 v22.1 (错误修复与跳过搜索版) ===
# 核心变更:
# 1. (修复) 为CatBoost模型添加 bootstrap_type='MVS'，以解决与 subsample 参数的兼容性问题。
# 2. (优化) 硬编码已知的XGBoost最优参数，并在本次运行中跳过其超参数搜索过程以节省时间。

import pandas as pd
import numpy as np
import xgboost as xgb
import lightgbm as lgb
import catboost as cb
import os
import gc
import polars as pl
import warnings

# Scikit-learn & Scikit-optimize Imports
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import MinMaxScaler
from skopt import BayesSearchCV
from scipy.optimize import minimize

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

def calculate_rank_ic_polars(y_true, y_pred, dates):
    """使用Polars高效计算Rank IC"""
    if isinstance(y_true, pd.Series): y_true = y_true.to_numpy()
    if isinstance(y_pred, pd.Series): y_pred = y_pred.to_numpy()
    if isinstance(dates, pd.Series): dates = dates.to_numpy()

    df = pl.DataFrame({'y_true': y_true, 'y_pred': y_pred, 'date': dates})
    daily_ic = df.group_by('date', maintain_order=True).agg(
        pl.corr('y_true', 'y_pred', method='spearman').fill_nan(0.0).alias('ic')
    )
    return daily_ic['ic'].mean()

class RankICScorer:
    """一个与scikit-learn兼容的自定义评分器"""
    def __init__(self, dates: pd.Series):
        self.dates = dates.reset_index(drop=True)

    def __call__(self, estimator, X, y_true) -> float:
        fold_dates = self.dates.iloc[X.index]
        y_pred = estimator.predict(X)
        rank_ic = calculate_rank_ic_polars(y_true, y_pred, fold_dates)
        print(f"[RankICScorer] Fold Rank IC = {rank_ic:.6f}")
        return rank_ic if np.isfinite(rank_ic) else 0.0

# --- 自定义重叠滑动窗口交叉验证 ---
def sliding_window_cv(dates, n_splits=3, train_days=300, test_days=200, step_days=100):
    """生成重叠的、向前滚动的滑动窗口交叉验证索引"""
    unique_dates = sorted(dates.unique())
    last_split_test_end_idx = (n_splits - 1) * step_days + train_days + test_days

    if len(unique_dates) < last_split_test_end_idx:
        raise ValueError(
            f"数据不足以创建{n_splits}个重叠滑动窗口。"
            f"需要 {last_split_test_end_idx} 天, 但只有 {len(unique_dates)} 天。"
        )

    print(f"--- 创建重叠滑动窗口 (训练={train_days}天, 验证={test_days}天, 步长={step_days}天, 共{n_splits}折) ---")
    for i in range(n_splits):
        train_start_idx = i * step_days
        train_end_idx = train_start_idx + train_days
        test_start_idx = train_end_idx
        test_end_idx = test_start_idx + test_days
        train_dates = unique_dates[train_start_idx : train_end_idx]
        test_dates = unique_dates[test_start_idx : test_end_idx]
        print(f"Fold {i+1}: Train on {train_dates[0]} to {train_dates[-1]}, "
              f"Validate on {test_dates[0]} to {test_dates[-1]}")
        train_mask = dates.isin(train_dates)
        test_mask = dates.isin(test_dates)
        yield dates[train_mask].index.to_numpy(), dates[test_mask].index.to_numpy()

# --- 1. 核心特征工程 ---
def preprocess_and_rank(df):
    """截面排名特征"""
    for col in df.columns:
        if col not in ['code', 'date']:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    float_cols = [c for c in df.columns if df[c].dtype == 'float64']
    for col in float_cols:
        df[col] = df[col].astype('float32')
    pl_df = pl.from_pandas(df)
    feature_cols = [col for col in df.columns if col.startswith('f_')]
    pl_df = pl_df.with_columns([
        ((pl.col(feature).rank(method='ordinal').over('date') - 1) / (pl.col(feature).count().over('date') - 1)).alias(feature)
        for feature in feature_cols
    ])
    final_df = pl_df.to_pandas()
    final_df.fillna(0.5, inplace=True)
    return final_df

# --- 2. 主执行流程 ---
def main():
    print("--- 正在加载数据 ---")
    DATA_PATH = './stocks-return-prediction'
    train_df_raw = pd.read_pickle(os.path.join(DATA_PATH, 'train_data9.pkl'))
    test_df_raw = pd.read_pickle(os.path.join(DATA_PATH, 'test_data9.pkl'))

    print("\n--- 1. 核心特征工程 (仅截面排名) ---")
    train_df = preprocess_and_rank(train_df_raw.copy())
    test_df = preprocess_and_rank(test_df_raw.copy())
    train_df.sort_values(by=['date', 'code'], inplace=True, ignore_index=True)
    test_df.sort_values(by=['date', 'code'], inplace=True, ignore_index=True)
    del train_df_raw
    gc.collect()

    print("\n--- 2. 数据集划分 ---")
    unique_dates = sorted(train_df['date'].unique())

    # 最终训练集：最新的300天
    train_final_start_date = unique_dates[-300]
    train_final_df = train_df[train_df['date'] >= train_final_start_date].copy()

    # 超参数搜索集：最新的700天
    search_start_date = unique_dates[-700]
    search_df = train_df[train_df['date'] >= search_start_date].copy()

    print(f"超参数搜索集: {search_df['date'].min()} - {search_df['date'].max()} ({search_df['date'].nunique()} 天)")
    print(f"最终训练集: {train_final_df['date'].min()} - {train_final_df['date'].max()} ({train_final_df['date'].nunique()} 天)")

    features = [col for col in train_df.columns if col.startswith('f_')]
    print(f"\n--- 3. 使用全部 {len(features)} 个初始特征进行训练 ---")

    # --- 4. 独立调优三驾马车模型 (扩展搜索空间) ---
    search_df.reset_index(drop=True, inplace=True)
    X_search, y_search = search_df[features], search_df['y']
    custom_cv = list(sliding_window_cv(search_df['date'], n_splits=3, train_days=300, test_days=200, step_days=100))
    rank_ic_scorer = RankICScorer(dates=search_df['date'])

    # (优化) 硬编码XGBoost参数并跳过搜索
    best_params_all = {
        'xgb': {'colsample_bytree': 0.6, 'learning_rate': 0.012428019985097712, 'max_depth': 3, 'min_child_weight': 20, 'n_estimators': 1200, 'subsample': 0.6}
    }
    print("\n--- 使用已知的XGBoost最佳参数，跳过其搜索过程 ---")
    print(f"XGBoost Params: {best_params_all['xgb']}")

    # 本次运行只搜索CatBoost和LGBM
    models_to_tune = ['catboost', 'lgbm']

    for model_name in models_to_tune:
        print(f"\n--- 4.{models_to_tune.index(model_name)+1} 开始为 {model_name.upper()} 进行贝叶斯优化 ---")
        if model_name == 'xgb': # 这部分代码保留，但不会被执行
            model = xgb.XGBRegressor(random_state=42, tree_method='gpu_hist', n_jobs=-1)
            search_spaces = {
                'n_estimators': (500, 2000), 'learning_rate': (0.005, 0.05, 'log-uniform'), 'max_depth': (3, 10),
                'subsample': (0.6, 1.0, 'uniform'), 'colsample_bytree': (0.6, 1.0, 'uniform'), 'min_child_weight': (1, 30)
            }
        elif model_name == 'catboost':
            # (修复) 添加 bootstrap_type='MVS' 以兼容 subsample
            model = cb.CatBoostRegressor(random_state=42, verbose=0, allow_writing_files=False, task_type='GPU', bootstrap_type='MVS')
            search_spaces = {
                'iterations': (100, 1200), 'learning_rate': (0.005, 0.05, 'log-uniform'), 'depth': (4, 10),
                'l2_leaf_reg': (1, 10, 'uniform'), 'subsample': (0.6, 1.0, 'uniform')
            }
        elif model_name == 'lgbm':
            model = lgb.LGBMRegressor(random_state=42, device='gpu', n_jobs=-1, verbose=-1)
            search_spaces = {
                'n_estimators': (100, 1200), 'learning_rate': (0.005, 0.05, 'log-uniform'), 'num_leaves': (20, 150),
                'feature_fraction': (0.6, 1.0, 'uniform'), 'bagging_fraction': (0.6, 1.0, 'uniform'),
                'lambda_l1': (1e-8, 10.0, 'log-uniform'), 'lambda_l2': (1e-8, 10.0, 'log-uniform')
            }

        pipeline = Pipeline([('scaler', MinMaxScaler()), ('model', model)])
        prefixed_search_spaces = {f'model__{k}': v for k, v in search_spaces.items()}
        search = BayesSearchCV(
            estimator=pipeline, search_spaces=prefixed_search_spaces,
            n_iter=20, scoring=rank_ic_scorer, cv=custom_cv,
            random_state=42, n_jobs=1, verbose=2
        )
        search.fit(X_search, y_search)
        best_params_all[model_name] = {k.replace('model__', ''): v for k, v in search.best_params_.items()}
        print(f"--- {model_name.upper()} 调优完成, 最佳CV得分: {search.best_score_:.6f} ---")
        print(f"最佳参数: {best_params_all[model_name]}")

    # --- 5. 使用最优参数训练最终模型 ---
    print("\n--- 5. 使用最优参数在最终训练集上训练模型 ---")
    X_train_final, y_train_final = train_final_df[features], train_final_df['y']
    X_test_final = test_df[features]

    test_predictions = []
    # 最终训练时，使用所有模型
    all_models_for_final_training = ['xgb', 'catboost', 'lgbm']

    for model_name in all_models_for_final_training:
        print(f"正在训练并预测 {model_name.upper()} 模型...")
        params = best_params_all[model_name]

        if model_name == 'xgb':
            model = xgb.XGBRegressor(random_state=42, tree_method='gpu_hist', n_jobs=-1, **params)
        elif model_name == 'catboost':
            # (修复) 同样需要在此处添加
            model = cb.CatBoostRegressor(random_state=42, verbose=0, allow_writing_files=False, task_type='GPU', bootstrap_type='MVS', **params)
        elif model_name == 'lgbm':
            model = lgb.LGBMRegressor(random_state=42, device='gpu', n_jobs=-1, verbose=-1, **params)

        pipeline = Pipeline([('scaler', MinMaxScaler()), ('model', model)])
        pipeline.fit(X_train_final, y_train_final)

        test_predictions.append(pipeline.predict(X_test_final))
        print(f"{model_name.upper()} 预测完成。")

    # --- 6. 生成最终提交文件 (简单平均集成) ---
    print("\n--- 6. 使用简单平均法集成并生成最终提交文件 ---")
    ensemble_prediction = np.mean(test_predictions, axis=0)

    submission_df = test_df[['code', 'date']].copy()
    submission_df['y_pred'] = ensemble_prediction
    submission_df.reset_index(inplace=True)
    submission_df.rename(columns={'index': 'id'}, inplace=True)
    submission_df = submission_df[['id', 'code', 'date', 'y_pred']]

    submission_df.to_csv('submission_pipeline_v22_fix.csv', index=False)
    print("提交文件 'submission_pipeline_v22_fix.csv' 已生成.")
    print(submission_df.head())

if __name__ == '__main__':
    main()


--- 正在加载数据 ---

--- 1. 核心特征工程 (仅截面排名) ---

--- 2. 数据集划分 ---
超参数搜索集: 991 - 1690 (700 天)
最终训练集: 1391 - 1690 (300 天)

--- 3. 使用全部 28 个初始特征进行训练 ---
--- 创建重叠滑动窗口 (训练=300天, 验证=200天, 步长=100天, 共3折) ---
Fold 1: Train on 991 to 1290, Validate on 1291 to 1490
Fold 2: Train on 1091 to 1390, Validate on 1391 to 1590
Fold 3: Train on 1191 to 1490, Validate on 1491 to 1690

--- 使用已知的XGBoost最佳参数，跳过其搜索过程 ---
XGBoost Params: {'colsample_bytree': 0.6, 'learning_rate': 0.012428019985097712, 'max_depth': 3, 'min_child_weight': 20, 'n_estimators': 1200, 'subsample': 0.6}

--- 4.1 开始为 CATBOOST 进行贝叶斯优化 ---
Fitting 3 folds for each of 1 candidates, totalling 3 fits
[RankICScorer] Fold Rank IC = 0.072748
[CV] END model__depth=6, model__iterations=1592, model__l2_leaf_reg=9, model__learning_rate=0.010345931480630859, model__subsample=0.8680591793075738; total time=  33.1s
[RankICScorer] Fold Rank IC = 0.102335
[CV] END model__depth=6, model__iterations=1592, model__l2_leaf_reg=9, model__learning_rate=0.010345931

In [1]:
# === 模型训练管道 v23.4 (已修复XGBoost早停) ===
# 核心变更:
# 1. 在贝叶斯优化中移除了对 n_estimators/iterations 的搜索，因为最终轮数由早停机制确定。
# 2. 最终训练窗口为300天。
# 3. 交叉验证为2折，每折训练期300天。
# 4. 引入早停机制：在最近300天数据中，用前200天训练、后100天验证来找到最佳迭代次数，然后用此轮数在完整的300天数据上重新训练。
# 5. [修正] 使用 XGBoost 的 callback 接口实现早停，以解决 TypeError。

import pandas as pd
import numpy as np
import xgboost as xgb
import lightgbm as lgb
import catboost as cb
import os
import gc
import polars as pl
import warnings

# Scikit-learn & Scikit-optimize Imports
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import MinMaxScaler
from skopt import BayesSearchCV
from xgboost.callback import EarlyStopping # <--- 修正: 导入XGBoost的EarlyStopping回调

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

def calculate_rank_ic_polars(y_true, y_pred, dates):
    """使用Polars高效计算Rank IC"""
    if isinstance(y_true, pd.Series): y_true = y_true.to_numpy()
    if isinstance(y_pred, pd.Series): y_pred = y_pred.to_numpy()
    if isinstance(dates, pd.Series): dates = dates.to_numpy()

    df = pl.DataFrame({'y_true': y_true, 'y_pred': y_pred, 'date': dates})
    daily_ic = df.group_by('date', maintain_order=True).agg(
        pl.corr('y_true', 'y_pred', method='spearman').fill_nan(0.0).alias('ic')
    )
    return daily_ic['ic'].mean()

class RankICScorer:
    """一个与scikit-learn兼容的自定义评分器"""
    def __init__(self, dates: pd.Series):
        self.dates = dates.reset_index(drop=True)

    def __call__(self, estimator, X, y_true) -> float:
        fold_dates = self.dates.iloc[X.index]
        y_pred = estimator.predict(X)
        rank_ic = calculate_rank_ic_polars(y_true, y_pred, fold_dates)
        print(f"[RankICScorer] Fold Rank IC = {rank_ic:.6f}")
        return rank_ic if np.isfinite(rank_ic) else 0.0

# --- 自定义重叠滑动窗口交叉验证 ---
def sliding_window_cv(dates, n_splits=5, train_days=300, test_days=150, step_days=100):
    """生成重叠的、向前滚动的滑动窗口交叉验证索引"""
    unique_dates = sorted(dates.unique())
    last_split_test_end_idx = (n_splits - 1) * step_days + train_days + test_days

    if len(unique_dates) < last_split_test_end_idx:
        raise ValueError(
            f"数据不足以创建{n_splits}个重叠滑动窗口。"
            f"需要 {last_split_test_end_idx} 天, 但只有 {len(unique_dates)} 天。"
        )

    print(f"--- 创建重叠滑动窗口 (训练={train_days}天, 验证={test_days}天, 步长={step_days}天, 共{n_splits}折) ---")
    for i in range(n_splits):
        train_start_idx = i * step_days
        train_end_idx = train_start_idx + train_days
        test_start_idx = train_end_idx
        test_end_idx = test_start_idx + test_days
        train_dates = unique_dates[train_start_idx : train_end_idx]
        test_dates = unique_dates[test_start_idx : test_end_idx]
        print(f"Fold {i+1}: Train on {train_dates[0]} to {train_dates[-1]}, "
              f"Validate on {test_dates[0]} to {test_dates[-1]}")
        train_mask = dates.isin(train_dates)
        test_mask = dates.isin(test_dates)
        yield dates[train_mask].index.to_numpy(), dates[test_mask].index.to_numpy()

# --- 1. 核心特征工程 ---
def preprocess_and_rank(df):
    """截面排名特征"""
    for col in df.columns:
        if col not in ['code', 'date']:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    float_cols = [c for c in df.columns if df[c].dtype == 'float64']
    for col in float_cols:
        df[col] = df[col].astype('float32')
    pl_df = pl.from_pandas(df)
    feature_cols = [col for col in df.columns if col.startswith('f_')]
    pl_df = pl_df.with_columns([
        ((pl.col(feature).rank(method='ordinal').over('date') - 1) / (pl.col(feature).count().over('date') - 1)).alias(feature)
        for feature in feature_cols
    ])
    final_df = pl_df.to_pandas()
    final_df.fillna(0.5, inplace=True)
    return final_df

# --- 2. 主执行流程 ---
def main():
    print("--- 正在加载数据 ---")
    DATA_PATH = './stocks-return-prediction'
    train_df_raw = pd.read_pickle(os.path.join(DATA_PATH, 'train_data9.pkl'))
    test_df_raw = pd.read_pickle(os.path.join(DATA_PATH, 'test_data9.pkl'))

    print("\n--- 1. 核心特征工程 (仅截面排名) ---")
    train_df = preprocess_and_rank(train_df_raw.copy())
    test_df = preprocess_and_rank(test_df_raw.copy())
    train_df.sort_values(by=['date', 'code'], inplace=True, ignore_index=True)
    test_df.sort_values(by=['date', 'code'], inplace=True, ignore_index=True)
    del train_df_raw
    gc.collect()

    print("\n--- 2. 数据集划分 ---")
    unique_dates = sorted(train_df['date'].unique())

    # 最终训练集：最新的300天
    train_final_start_date = unique_dates[-300]
    train_final_df = train_df[train_df['date'] >= train_final_start_date].copy()

    # 超参数搜索集：最新的700天 (4*100步长 + 200训练 + 100验证 = 700天)
    search_start_date = unique_dates[-700]
    search_df = train_df[train_df['date'] >= search_start_date].copy()

    print(f"超参数搜索集: {search_df['date'].min()} - {search_df['date'].max()} ({search_df['date'].nunique()} 天)")
    print(f"最终训练集: {train_final_df['date'].min()} - {train_final_df['date'].max()} ({train_final_df['date'].nunique()} 天)")

    features = [col for col in train_df.columns if col.startswith('f_')]
    print(f"\n--- 3. 使用全部 {len(features)} 个初始特征进行训练 ---")

    # --- 4. 独立调优三驾马车模型 ---
    search_df.reset_index(drop=True, inplace=True)
    X_search, y_search = search_df[features], search_df['y']

    custom_cv = list(sliding_window_cv(search_df['date'], n_splits=2, train_days=300, test_days=150, step_days=150))
    rank_ic_scorer = RankICScorer(dates=search_df['date'])

    models_to_tune = ['xgb', 'catboost', 'lgbm']
    best_params_all = {}

    for model_name in models_to_tune:
        print(f"\n--- 4.{models_to_tune.index(model_name)+1} 开始为 {model_name.upper()} 进行贝叶斯优化 ---")
        # 为模型设置一个固定的、足够大的迭代次数，让优化器专注于其他参数
        fixed_iterations = 2000

        if model_name == 'xgb':
            model = xgb.XGBRegressor(n_estimators=fixed_iterations, random_state=42, tree_method='gpu_hist', n_jobs=-1)
            search_spaces = {
                'learning_rate': (0.005, 0.05, 'log-uniform'), 'max_depth': (3, 10),
                'subsample': (0.6, 1.0, 'uniform'), 'colsample_bytree': (0.6, 1.0, 'uniform'), 'min_child_weight': (1, 20)
            }
        elif model_name == 'catboost':
            model = cb.CatBoostRegressor(iterations=fixed_iterations, random_state=42, verbose=0, allow_writing_files=False, task_type='GPU', bootstrap_type='MVS')
            search_spaces = {
                'learning_rate': (0.005, 0.05, 'log-uniform'), 'depth': (4, 10),
                'l2_leaf_reg': (1, 10, 'uniform'), 'subsample': (0.6, 1.0, 'uniform')
            }
        elif model_name == 'lgbm':
            model = lgb.LGBMRegressor(n_estimators=fixed_iterations, random_state=42, device='gpu', n_jobs=-1, verbose=-1)
            search_spaces = {
                'learning_rate': (0.005, 0.05, 'log-uniform'), 'num_leaves': (20, 150),
                'feature_fraction': (0.6, 1.0, 'uniform'), 'bagging_fraction': (0.6, 1.0, 'uniform'),
                'lambda_l1': (1e-8, 10.0, 'log-uniform'), 'lambda_l2': (1e-8, 10.0, 'log-uniform')
            }

        pipeline = Pipeline([('scaler', MinMaxScaler()), ('model', model)])
        prefixed_search_spaces = {f'model__{k}': v for k, v in search_spaces.items()}
        search = BayesSearchCV(
            estimator=pipeline, search_spaces=prefixed_search_spaces,
            n_iter=10, scoring=rank_ic_scorer, cv=custom_cv,
            random_state=42, n_jobs=1, verbose=2
        )
        search.fit(X_search, y_search)
        best_params_all[model_name] = {k.replace('model__', ''): v for k, v in search.best_params_.items()}
        print(f"--- {model_name.upper()} 调优完成, 最佳CV得分: {search.best_score_:.6f} ---")
        print(f"最佳参数: {best_params_all[model_name]}")

    # --- 5. 使用最优参数和早停机制训练最终模型 ---
    print("\n--- 5. 使用最优参数和早停机制在最终训练集上训练模型 ---")
    X_train_final, y_train_final = train_final_df[features], train_final_df['y']
    X_test_final = test_df[features]

    # 为早停机制划分数据：在最近300天的数据中，用前200天训练，后100天验证
    es_split_date = unique_dates[-100]
    train_es_mask = train_final_df['date'] < es_split_date
    eval_es_mask = train_final_df['date'] >= es_split_date

    X_train_es, y_train_es = X_train_final[train_es_mask], y_train_final[train_es_mask]
    X_eval_es, y_eval_es = X_train_final[eval_es_mask], y_train_final[eval_es_mask]

    print(f"早停训练集: {train_final_df[train_es_mask]['date'].min()} - {train_final_df[train_es_mask]['date'].max()} ({train_final_df[train_es_mask]['date'].nunique()} 天)")
    print(f"早停验证集: {train_final_df[eval_es_mask]['date'].min()} - {train_final_df[eval_es_mask]['date'].max()} ({train_final_df[eval_es_mask]['date'].nunique()} 天)")

    test_predictions = []

    for model_name in models_to_tune:
        print(f"\n--- 正在为 {model_name.upper()} 模型寻找最佳迭代次数 ---")
        params = best_params_all[model_name].copy()

        # scaler需要单独fit和transform，以便正确准备eval_set
        scaler = MinMaxScaler()
        X_train_es_scaled = scaler.fit_transform(X_train_es)
        X_eval_es_scaled = scaler.transform(X_eval_es)

        best_iteration = 0

        if model_name == 'xgb':
            # 这里的n_estimators也设置为一个大数，让早停来决定最终值
            model = xgb.XGBRegressor(n_estimators=fixed_iterations, random_state=42, tree_method='gpu_hist', n_jobs=-1, **params)

            # --- 修正: 使用回调函数进行早停 ---
            model.fit(X_train_es_scaled, y_train_es,
                      eval_set=[(X_eval_es_scaled, y_eval_es)],
                      callbacks=[EarlyStopping(rounds=50, save_best=True)],
                      verbose=False)

            # --- 修正: 使用 model.best_ntree_limit 获取最佳迭代次数 ---
            best_iteration = model.best_ntree_limit
            params['n_estimators'] = best_iteration
            final_model = xgb.XGBRegressor(random_state=42, tree_method='gpu_hist', n_jobs=-1, **params)

        elif model_name == 'catboost':
            model = cb.CatBoostRegressor(iterations=fixed_iterations, random_state=42, allow_writing_files=False, task_type='GPU', bootstrap_type='MVS', **params)
            model.fit(X_train_es_scaled, y_train_es,
                      eval_set=[(X_eval_es_scaled, y_eval_es)],
                      early_stopping_rounds=50, verbose=0)
            best_iteration = model.get_best_iteration()
            params['iterations'] = best_iteration
            final_model = cb.CatBoostRegressor(random_state=42, verbose=0, allow_writing_files=False, task_type='GPU', bootstrap_type='MVS', **params)

        elif model_name == 'lgbm':
            model = lgb.LGBMRegressor(n_estimators=fixed_iterations, random_state=42, device='gpu', n_jobs=-1, verbose=-1, **params)
            # 假设使用 lightgbm >= 4.0.0，回调是推荐方式
            model.fit(X_train_es_scaled, y_train_es,
                      eval_set=[(X_eval_es_scaled, y_eval_es)],
                      callbacks=[lgb.early_stopping(50, verbose=False)])
            best_iteration = model.best_iteration_
            params['n_estimators'] = best_iteration
            final_model = lgb.LGBMRegressor(random_state=42, device='gpu', n_jobs=-1, verbose=-1, **params)

        print(f"找到最佳迭代次数: {best_iteration}")
        print(f"--- 使用 {best_iteration} 轮在完整的300天数据上重新训练 {model_name.upper()} ---")
        final_pipeline = Pipeline([('scaler', MinMaxScaler()), ('model', final_model)])
        final_pipeline.fit(X_train_final, y_train_final)

        test_predictions.append(final_pipeline.predict(X_test_final))
        print(f"{model_name.upper()} 预测完成。")

    # --- 6. 生成最终提交文件 (简单平均集成) ---
    print("\n--- 6. 使用简单平均法集成并生成最终提交文件 ---")
    ensemble_prediction = np.mean(test_predictions, axis=0)

    submission_df = test_df[['code', 'date']].copy()
    submission_df['y_pred'] = ensemble_prediction
    submission_df.reset_index(inplace=True)
    submission_df.rename(columns={'index': 'id'}, inplace=True)
    submission_df = submission_df[['id', 'code', 'date', 'y_pred']]

    submission_df.to_csv('submission_pipeline_v23_es_fix.csv', index=False)
    print("提交文件 'submission_pipeline_v23_es_fix.csv' 已生成.")
    print(submission_df.head())

if __name__ == '__main__':
    main()


--- 正在加载数据 ---

--- 1. 核心特征工程 (仅截面排名) ---

--- 2. 数据集划分 ---
超参数搜索集: 991 - 1690 (700 天)
最终训练集: 1391 - 1690 (300 天)

--- 3. 使用全部 28 个初始特征进行训练 ---
--- 创建重叠滑动窗口 (训练=300天, 验证=150天, 步长=150天, 共2折) ---
Fold 1: Train on 991 to 1290, Validate on 1291 to 1440
Fold 2: Train on 1141 to 1440, Validate on 1441 to 1590

--- 4.1 开始为 XGB 进行贝叶斯优化 ---
Fitting 2 folds for each of 1 candidates, totalling 2 fits
[RankICScorer] Fold Rank IC = 0.076353
[CV] END model__colsample_bytree=0.7640415835413256, model__learning_rate=0.026711344437355546, model__max_depth=10, model__min_child_weight=7, model__subsample=0.8680591793075738; total time= 1.2min
[RankICScorer] Fold Rank IC = 0.100175
[CV] END model__colsample_bytree=0.7640415835413256, model__learning_rate=0.026711344437355546, model__max_depth=10, model__min_child_weight=7, model__subsample=0.8680591793075738; total time= 1.2min
Fitting 2 folds for each of 1 candidates, totalling 2 fits
[RankICScorer] Fold Rank IC = 0.081712
[CV] END model__colsample_bytre

TypeError: XGBModel.fit() got an unexpected keyword argument 'early_stopping_rounds'