# 限价订单簿（LOB）特征字典（共 110 维）

## v₁ 基础集合 (20 维)

| 特征名 | 数学符号 | 含义 |
|--------|----------|------|
| AskPrice_L1 … L5 | \(P^{ask}_i\) | 卖一 ~ 卖五 **报价** |
| AskVol_L1 … L5  | \(V^{ask}_i\) | 卖一 ~ 卖五 **挂单量** |
| BidPrice_L1 … L5 | \(P^{bid}_i\) | 买一 ~ 买五 报价 |
| BidVol_L1 … L5  | \(V^{bid}_i\) | 买一 ~ 买五 挂单量 |

---

## v₂ 时间无关集合 Ⅰ (10 维)

| 特征名 | 公式 | 说明 |
|--------|------|------|
| Spread_L1 … L5 | \(P^{ask}_i - P^{bid}_i\) | 各档 **买卖价差** |
| MidPrice_L1 … L5 | \(\tfrac{P^{ask}_i + P^{bid}_i}{2}\) | 各档 **中间价** |

---

## v₃ 时间无关集合 Ⅱ (12 维)

| 特征名 | 公式 | 说明 |
|--------|------|------|
| AskStackRange | \(P^{ask}_{L5} - P^{ask}_{L1}\) | 卖盘价格跨度 |
| BidStackRange | \(P^{bid}_{L1} - P^{bid}_{L5}\) | 买盘价格跨度 |
| AskGrad_L1_L2 … L4_L5 | \(|P^{ask}_i - P^{ask}_{i+1}|\) | 相邻 **卖档梯度** |
| BidGrad_L1_L2 … L4_L5 | \(|P^{bid}_i - P^{bid}_{i+1}|\) | 相邻 **买档梯度** |

---

## v₄ 均值价格与数量 (4 维)

| 特征名 | 公式 | 说明 |
|--------|------|------|
| MeanAskPrice | \(\frac{1}{5}\sum_i P^{ask}_i\) | 卖五档 **均价** |
| MeanBidPrice | \(\frac{1}{5}\sum_i P^{bid}_i\) | 买五档 均价 |
| MeanAskVol   | \(\frac{1}{5}\sum_i V^{ask}_i\) | 卖五档 **均量** |
| MeanBidVol   | \(\frac{1}{5}\sum_i V^{bid}_i\) | 买五档 均量 |

---

## v₅ 累计不平衡 (2 维)

| 特征名 | 公式 | 说明 |
|--------|------|------|
| CumPriceImb | \(\sum_i (P^{ask}_i - P^{bid}_i)\) | 总体 **价格不平衡** |
| CumVolImb   | \(\sum_i (V^{ask}_i - V^{bid}_i)\) | 总体 **数量不平衡** |

---

## v₆ 一阶导数（首尾差） (20 维)

> 以窗口首尾快照计算

| 特征名 | 说明 |
|--------|------|
| dAskPrice_L1 … L5 | 卖档 **价格斜率** |
| dBidPrice_L1 … L5 | 买档 价格斜率 |
| dAskVol_L1 … L5   | 卖档 **挂量斜率** |
| dBidVol_L1 … L5   | 买档 挂量斜率 |

---

## v₇ 平均 \|一阶导\| (20 维)

> 对窗口内每时刻的一阶导取绝对值，再对时间求均值。

| 特征名 | 说明 |
|--------|------|
| Mean_dAskPrice_L1 … L5 | 卖档价格平均变动速率 |
| Mean_dBidPrice_L1 … L5 | 买档价格平均变动速率 |
| Mean_dAskVol_L1 … L5   | 卖档挂量平均变动速率 |
| Mean_dBidVol_L1 … L5   | 买档挂量平均变动速率 |

---

## v₈ 相对强度指示符 (4 维, 0/1)

| 特征名 | 判别条件 | 意义 |
|--------|----------|------|
| RI_Price  | mean(dP<sub>ask</sub>) > mean(dP<sub>bid</sub>) | 价格趋势偏向卖方？ |
| RI_Vol    | MeanAskVol > MeanBidVol | 挂量偏向卖方？ |
| RI_dPrice | avg\|dP<sub>ask</sub>\| > avg\|dP<sub>bid</sub>\| | 价格波动更偏向卖方 |
| RI_dVol   | avg\|dV<sub>ask</sub>\| > avg\|dV<sub>bid</sub>\| | 挂量波动更偏向卖方 |

---

## v₉ 平均二阶导数 (20 维)

> 二阶导：

| 特征名 | 说明 |
|--------|------|
| Mean_ddAskPrice_L1 … L5 | 卖档价格 **加速度** |
| Mean_ddBidPrice_L1 … L5 | 买档价格 加速度 |
| Mean_ddAskVol_L1 … L5   | 卖档挂量 加速度 |
| Mean_ddBidVol_L1 … L5   | 买档挂量 加速度 |


In [1]:
pip install gplearn==0.4.2

Collecting gplearn==0.4.2
  Downloading gplearn-0.4.2-py3-none-any.whl.metadata (4.3 kB)
Downloading gplearn-0.4.2-py3-none-any.whl (25 kB)
Installing collected packages: gplearn
Successfully installed gplearn-0.4.2



参数：
   - `SEED`：随机种子  
   - `STRIDE`：滑动步长  
   - `TRAIN_FILE` / `VAL_FILE`：训练集与验证集路径  
   - `MODEL_DIR`：模型保存目录

我们定义的函数：

   - `ensure_ts_datetime`：确保时间列为日期时间格式
   - `purge`：去除空值与非有限值
   - `build_matrix`：构建价格-数量矩阵
   - `compute_v_features`：从窗口计算 v₁…v₉ 特征
   - `pretty_expr`：将程序树转化为可读表达式


In [None]:
# # =============================================================
# 有一些版本兼容问题
# =============================================================
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

import sklearn
from sklearn.base import BaseEstimator

def _no_tags(self): return {}
BaseEstimator.__sklearn_tags__ = _no_tags
BaseEstimator._get_tags        = _no_tags


import gplearn.genetic as _gp
for _cls_name in ("SymbolicRegressor","SymbolicClassifier","SymbolicTransformer"):
    cls = getattr(_gp, _cls_name)
    cls.__sklearn_tags__ = _no_tags
    cls._get_tags        = _no_tags

# ---------------- imports & I/O ----------------
import os, random, warnings, re, numpy as np, polars as pl, cloudpickle
from scipy.stats import spearmanr
from sklearn.utils import check_random_state
from gplearn.genetic import SymbolicTransformer as _ST_orig


SEED         = 123          # 固定随机种子
STRIDE       = 50           # stride设置成50，不然内存会崩
TRAIN_FILE   = "/content/drive/My Drive/train_unstandlized.parquet"
VAL_FILE     = "/content/drive/My Drive/val_unstandlized.parquet"
MODEL_DIR    = "/content/drive/My Drive/gp_models"
os.makedirs(MODEL_DIR, exist_ok=True)
################################################################

random.seed(SEED); np.random.seed(SEED)

# ---------- 兼容 gplearn 与 sklearn 1.3+ ----------------------
class SymbolicTransformer(_ST_orig):
    def __sklearn_tags__(self): return {}
    def _get_tags(self):        return {}

# ---------------- 固定超参/列名/工具 ----------------
STEP_MS, WINDOW_SIZE, HORIZON_STP = 500, 20, 20
GENERATIONS, POP_SIZE, TOURNAMENT_SZ, MAX_DEPTH = 1, 2500, 40, 5
N_LEVELS = 5
PRICE_COLS = [f"Buy{i}Price"  for i in range(1, N_LEVELS+1)] + \
             [f"Sell{i}Price" for i in range(1, N_LEVELS+1)]
QTY_COLS   = [f"Buy{i}OrderQty"  for i in range(1, N_LEVELS+1)] + \
             [f"Sell{i}OrderQty" for i in range(1, N_LEVELS+1)]
RAW_COLS   = PRICE_COLS + QTY_COLS

FEATURE_NAMES = [
    # v1–v6（66）
    "AskPrice_L1","AskPrice_L2","AskPrice_L3","AskPrice_L4","AskPrice_L5",
    "AskVol_L1","AskVol_L2","AskVol_L3","AskVol_L4","AskVol_L5",
    "BidPrice_L1","BidPrice_L2","BidPrice_L3","BidPrice_L4","BidPrice_L5",
    "BidVol_L1","BidVol_L2","BidVol_L3","BidVol_L4","BidVol_L5",
    "Spread_L1","Spread_L2","Spread_L3","Spread_L4","Spread_L5",
    "MidPrice_L1","MidPrice_L2","MidPrice_L3","MidPrice_L4","MidPrice_L5",
    "AskStackRange","BidStackRange",
    "AskGrad_L1_L2","AskGrad_L2_L3","AskGrad_L3_L4","AskGrad_L4_L5",
    "BidGrad_L1_L2","BidGrad_L2_L3","BidGrad_L3_L4","BidGrad_L4_L5",
    "MeanAskPrice","MeanBidPrice","MeanAskVol","MeanBidVol",
    "CumPriceImb","CumVolImb",
    "dAskPrice_L1","dAskPrice_L2","dAskPrice_L3","dAskPrice_L4","dAskPrice_L5",
    "dBidPrice_L1","dBidPrice_L2","dBidPrice_L3","dBidPrice_L4","dBidPrice_L5",
    "dAskVol_L1","dAskVol_L2","dAskVol_L3","dAskVol_L4","dAskVol_L5",
    "dBidVol_L1","dBidVol_L2","dBidVol_L3","dBidVol_L4","dBidVol_L5",
    # v7（20）
    "Mean_dAskPrice_L1","Mean_dAskPrice_L2","Mean_dAskPrice_L3","Mean_dAskPrice_L4","Mean_dAskPrice_L5",
    "Mean_dBidPrice_L1","Mean_dBidPrice_L2","Mean_dBidPrice_L3","Mean_dBidPrice_L4","Mean_dBidPrice_L5",
    "Mean_dAskVol_L1","Mean_dAskVol_L2","Mean_dAskVol_L3","Mean_dAskVol_L4","Mean_dAskVol_L5",
    "Mean_dBidVol_L1","Mean_dBidVol_L2","Mean_dBidVol_L3","Mean_dBidVol_L4","Mean_dBidVol_L5",
    # v8（4）
    "RI_Price", "RI_Vol", "RI_dPrice", "RI_dVol",
    # v9（20）
    "Mean_ddAskPrice_L1","Mean_ddAskPrice_L2","Mean_ddAskPrice_L3","Mean_ddAskPrice_L4","Mean_ddAskPrice_L5",
    "Mean_ddBidPrice_L1","Mean_ddBidPrice_L2","Mean_ddBidPrice_L3","Mean_ddBidPrice_L4","Mean_ddBidPrice_L5",
    "Mean_ddAskVol_L1","Mean_ddAskVol_L2","Mean_ddAskVol_L3","Mean_ddAskVol_L4","Mean_ddAskVol_L5",
    "Mean_ddBidVol_L1","Mean_ddBidVol_L2","Mean_ddBidVol_L3","Mean_ddBidVol_L4","Mean_ddBidVol_L5",
]

def ensure_ts_datetime(df: pl.DataFrame, col="ts") -> pl.DataFrame:
    if col not in df.columns:
        raise KeyError(f"列 '{col}' 不存在")
    if df[col].dtype in (pl.Datetime, pl.Date):
        return df
    # 字符串解析
    try:
        return df.with_columns(pl.col(col).str.strptime(pl.Datetime, strict=False, exact=False))
    except Exception:
        pass
    # epoch 毫秒
    try:
        return df.with_columns(pl.from_epoch(pl.col(col).cast(pl.Int64), unit="ms").alias(col))
    except Exception:
        pass
    # epoch 秒
    try:
        return df.with_columns(pl.from_epoch(pl.col(col).cast(pl.Int64), unit="s").alias(col))
    except Exception:
        pass
    warnings.warn(f"列 '{col}' 不是时间类型，后续 dt 操作可能失败。")
    return df

def purge(lf: pl.LazyFrame) -> pl.DataFrame:
    conds = [pl.col(c).is_finite() for c in RAW_COLS]
    return lf.drop_nulls(RAW_COLS).filter(pl.all_horizontal(conds)).collect(streaming=True)


# Construct an order book feature matrix from prices and quantities.
def build_matrix(df: pl.DataFrame) -> np.ndarray:
    sp = df.select([c for c in PRICE_COLS if "Sell" in c]).to_numpy()
    bp = df.select([c for c in PRICE_COLS if "Buy"  in c]).to_numpy()
    sq = df.select([c for c in QTY_COLS   if "Sell" in c]).to_numpy()
    bq = df.select([c for c in QTY_COLS   if "Buy"  in c]).to_numpy()
    return np.concatenate([sp, bp, sq, bq], axis=1).astype(np.float32)


# 手搓因子
def compute_v_features(window: np.ndarray, step_ms:int=STEP_MS) -> np.ndarray:
    n = N_LEVELS
    cur, prev = window[-1], window[0]
    P_ask, P_bid = cur[0:n],        cur[n:2*n]
    V_ask, V_bid = cur[2*n:3*n],    cur[3*n:4*n]
    P_ask0, P_bid0 = prev[0:n],     prev[n:2*n]
    V_ask0, V_bid0 = prev[2*n:3*n], prev[3*n:4*n]

    # v1–v6
    v1 = np.concatenate([P_ask, V_ask, P_bid, V_bid])
    spread, mid = P_ask - P_bid, 0.5*(P_ask + P_bid)
    v2 = np.concatenate([spread, mid])
    range_ask, range_bid = P_ask[-1]-P_ask[0], P_bid[0]-P_bid[-1]
    adj_diff = np.concatenate([np.abs(np.diff(P_ask)), np.abs(np.diff(P_bid))])
    v3 = np.concatenate([[range_ask, range_bid], adj_diff])
    v4 = np.array([P_ask.mean(), P_bid.mean(), V_ask.mean(), V_bid.mean()])
    v5 = np.array([(P_ask-P_bid).sum(), (V_ask-V_bid).sum()])
    delta_t = (window.shape[0]-1) * step_ms / 1000.0
    dP_ask, dP_bid = (P_ask-P_ask0)/delta_t, (P_bid-P_bid0)/delta_t
    dV_ask, dV_bid = (V_ask-V_ask0)/delta_t, (V_bid-V_bid0)/delta_t
    v6 = np.concatenate([dP_ask, dP_bid, dV_ask, dV_bid])

    # v7–v9（用完整窗口）
    dt = step_ms / 1000.0
    Pask_seq = window[:, 0:n];      Pbid_seq = window[:, n:2*n]
    Vask_seq = window[:, 2*n:3*n];  Vbid_seq = window[:, 3*n:4*n]
    dPask = np.diff(Pask_seq, axis=0) / dt
    dPbid = np.diff(Pbid_seq, axis=0) / dt
    dVask = np.diff(Vask_seq, axis=0) / dt
    dVbid = np.diff(Vbid_seq, axis=0) / dt
    v7 = np.concatenate([
        np.mean(np.abs(dPask), axis=0), np.mean(np.abs(dPbid), axis=0),
        np.mean(np.abs(dVask), axis=0), np.mean(np.abs(dVbid), axis=0),
    ])
    v8 = np.array([
        float(dPask.mean() > dPbid.mean()),
        float(V_ask.mean()  > V_bid.mean()),
        float(np.abs(dPask).mean() > np.abs(dPbid).mean()),
        float(np.abs(dVask).mean() > np.abs(dVbid).mean()),
    ], dtype=np.float32)
    ddPask = np.diff(dPask, axis=0) / dt
    ddPbid = np.diff(dPbid, axis=0) / dt
    ddVask = np.diff(dVask, axis=0) / dt
    ddVbid = np.diff(dVbid, axis=0) / dt
    v9 = np.concatenate([
        np.mean(ddPask, axis=0), np.mean(ddPbid, axis=0),
        np.mean(ddVask, axis=0), np.mean(ddVbid, axis=0),
    ])
    return np.concatenate([v1, v2, v3, v4, v5, v6, v7, v8, v9])

def pretty_expr(program, names):
    if hasattr(program, "export"):
        return program.export(feature_names=names)
    expr = str(program)
    def repl(m): return names[int(m.group(1))]
    return re.sub(r'\bX(\d+)\b', repl, expr)




## daily_ic_stats
- 按日期计算逐日 Spearman IC  
- 支持 1D   输入
- 返回：日均IC、日均|IC|、有效天数、每日IC映射

## select_best_component_daily
- 在多个组件中选择 **|日均IC| 最大** 的作为最优  



In [None]:

# ===================== 逐日 IC 计算 =====================

import numpy as np
from scipy.stats import spearmanr


#  逐日 Spearman IC 统计:
#       - 支持 1D
#       - 自动清理 NaN/Inf
#       - 返回: (日均IC, 日均|IC|, 有效天数, 每日IC字典)
# 例如：
# values = [0.12, 0.05, 0.33,    # 8月1日
#           0.15, 0.08, 0.30,    # 8月2日
#           0.10, 0.07, 0.28]    # 8月3日

# y =      [0.02, 0.01, 0.05,    # 8月1日
#           0.03, 0.02, 0.04,    # 8月2日
#           0.01, 0.00, 0.03]    # 8月3日

# dates =  ['2024-08-01'] * 3 + ['2024-08-02'] * 3 + ['2024-08-03'] * 3

#     """
def daily_ic_stats(values, y, dates):
    vals  = np.asarray(values)
    y     = np.asarray(y)
    dates = np.asarray(dates)

    day_map = {}
    for d in np.unique(dates):
        idx = (dates == d)
        ic = spearmanr(vals[idx], y[idx]).correlation
        if np.isfinite(ic):         #  避免全 NaN 把均值拖没
            day_map[d] = float(ic)

    if not day_map:
        return np.nan, np.nan, 0, {}

    ics = np.fromiter(day_map.values(), dtype=float)
    return float(ics.mean()), float(np.abs(ics).mean()), int(ics.size), day_map



def select_best_component_daily(comps: np.ndarray, y: np.ndarray, dates: np.ndarray):
    """
    用“|日均IC|”挑最优组件；若全 NaN，回退到 “日均|IC|”
    组件是 gplearn 生成的候选因子（数学表达式）的输出结果，用于评估和选择最优因子
    返回 (best_idx, summary_list)
    """
    k = comps.shape[1]
    stats = []
    for j in range(k):
        m_ic, m_abs, n_days, _ = daily_ic_stats(comps[:, j], y, dates)
        stats.append((j, m_ic, m_abs, n_days))
    scores = np.array([abs(s[1]) for s in stats], dtype=float)
    if not np.isfinite(scores).any():
        scores = np.array([s[2] for s in stats], dtype=float)  # 回退
    best_idx = int(np.nanargmax(scores))
    return best_idx, stats

# gplearn 因子训练与评估流程

## 1) 构造训练样本
- 读取并排序 Parquet 数据，标准化时间列  
- 计算未来 `HORIZON_STP` 步的中间价收益作为标签  
- 按 `STRIDE` 步长与 `WINDOW_SIZE` 窗口生成 v-特征矩阵  
-

## 2) 训练 SymbolicTransformer
- 遗传编程搜索候选因子（函数集含 `add`、`sub`、`mul`、`div`、`sin`、`cos`、`log`、`sqrt`）  
- 评估指标设为 `spearman`，生成 4 个候选因子  
- 固定随机种子保证可复现

## 3) 评估与选择最佳组件
- 逐日计算各组件 IC，选择 **|日均IC| 最大** 的作为最佳  
- 输出各组件统计并记录最佳表达式


In [None]:
# ============================================================
# 1) 读取训练集 & 构造样本
#   - 读取 Parquet，时间列标准化
#   - 构造标签：未来 HORIZON_STP 步的中间价收益
#   - 以 STRIDE 抽样，滑窗长度 WINDOW_SIZE，计算 v-特征
# ============================================================
import os, re, glob, json, time
import numpy as np
import pandas as pd

print(" 读取训练集")
df_tr = purge(pl.scan_parquet(TRAIN_FILE).sort("ts"))
df_tr = ensure_ts_datetime(df_tr, "ts").sort("ts")
print("完成，行数 =", len(df_tr))

# 原始矩阵 / 标签
X_raw_tr = build_matrix(df_tr)
mid_now_tr = (df_tr["Buy1Price"] + df_tr["Sell1Price"]) / 2
mid_fut_tr = mid_now_tr.shift(-HORIZON_STP)
ret_tr = ((mid_fut_tr - mid_now_tr) / mid_now_tr).fill_nan(0).to_numpy()

idx_tr = np.arange(WINDOW_SIZE-1, len(df_tr)-HORIZON_STP, STRIDE)
y_ic_tr = ret_tr[idx_tr]
X_tr = np.stack([compute_v_features(X_raw_tr[t-WINDOW_SIZE+1:t+1])
                 for t in idx_tr], axis=0).astype(np.float32)
dates_tr = df_tr["ts"].dt.date().to_numpy()[idx_tr]

print("训练特征矩阵:", X_tr.shape, "| 有效样本日期数(去重):", len(np.unique(dates_tr)))

# ============================================================
#  训练 gplearn SymbolicTransformer
#   - n_components=4：同时产出 4 个候选因子
# ============================================================
gp = SymbolicTransformer(
    generations=GENERATIONS, population_size=POP_SIZE,
    tournament_size=TOURNAMENT_SZ, init_depth=(2, MAX_DEPTH),
    function_set=['add','sub','mul','div','sin','cos','log','sqrt'],
    metric='spearman', const_range=(-1,1),
    parsimony_coefficient=5e-4, n_components=4,
    verbose=1, n_jobs=-1, random_state=SEED,  # 固定种子便于复现
)
gp.fit(X_tr, y_ic_tr)
gp.feature_names_ = FEATURE_NAMES

# ============================================================
# 训练集逐日IC评估，选择最佳组件（按“|日均IC|”）
#   - comps_tr: (N, n_components)
# 每天单独计算该日所有样本的 Spearman IC（因子值 vs 对应未来收益）。
# 得到一列 “每日 IC” 时间序列。
# 再对这些日度 IC 取均值（以及均值绝对值、标准差等），作为因子长期表现的统计指标。
#
# ============================================================
comps_tr = gp.transform(X_tr)   # (N, n_components)
best_idx, tr_stats = select_best_component_daily(comps_tr, y_ic_tr, dates_tr)
best_prog = gp._best_programs[best_idx]
expr_str  = pretty_expr(best_prog, FEATURE_NAMES)

# 打印训练集评估详情
best_mean_ic  = [s[1] for s in tr_stats][best_idx]   # 日均IC
best_mean_abs = [s[2] for s in tr_stats][best_idx]   # 日均|IC|
best_days     = [s[3] for s in tr_stats][best_idx]
print("\n====== 训练集（逐日IC）======")
for j, m_ic, m_abs, n_days in tr_stats:
    print(f"组件#{j}: 日均IC={m_ic:.4f} | 日均|IC|={m_abs:.4f} | 有效天数={n_days}")
print(f"\n▶ 选择组件 #{best_idx} 作为最佳（按“|日均IC|”）")
print("表达式：")
print(expr_str)

# 供后续写 Word/日志使用，避免 NameError
best_ic_tr      = float(best_mean_ic)         # 训练集 日均IC
best_abs_ic_tr  = float(abs(best_mean_ic))    # 训练集 |日均IC|


# 模型保存与验证流程

## 2c) 保存模型与元数据
- **新建运行目录**：若 `MODEL_DIR` 未定义，则以时间戳生成 `run_YYYYMMDD_HHMMSS` 目录  
- **分配模型编号**：`model_{idx}.pkl`，避免与现有文件冲突  
- **保存内容**：
  - `gp` 对象（gplearn 模型）
  - 特征名 `feature_names`
  - 最优组件索引 `best_idx`
  - 元数据 `meta`（训练区间、步长、窗口、预测周期、随机种子、函数集等）
- **维护 manifest.json**：
  - 每次保存更新清单，记录文件名、编号、最优组件、时间戳、特征数、元数据

## 3) 验证集评估
- **读取验证集**：
  - Parquet → 排序 → 时间列标准化
  - 调用 `purge` 清洗数据
- **防泄漏断言**：
  - 确保训练结束时间早于验证开始时间
- **构造验证样本**：
  - 与训练集相同的滑窗与步长逻辑
  - 计算 v-特征与未来收益标签
- **评估训练选出的最佳组件**：
  - 调用 `daily_ic_stats` 计算验证集日均 IC
  - 输出：
    - `|日均IC|`（主口径）

    - 有效天数
  - 打印最差/最好 5 天的 IC 以查看稳定性
- **回填 manifest**：
  - 将验证区间 `val_range` 写入对应模型记录，便于追溯


In [None]:

# ============================================================
# 2c) 统一保存：model_{idx}.pkl（含 best_idx / feature_names / meta）
#   - 保存到 <BASE_DIR>/models/run_YYYYMMDD_HHMMSS 或现有 MODEL_DIR

if "new_run_dir" not in globals():
    def new_run_dir(base_dir: str, subdir: str = "models") -> str:
        ts = time.strftime("%Y%m%d_%H%M%S")
        run_dir = os.path.join(base_dir, subdir, f"run_{ts}")
        os.makedirs(run_dir, exist_ok=True)
        globals()["MODEL_DIR"] = run_dir
        print(f"[MODEL_DIR] {run_dir}")
        return run_dir

if "save_gp_model_bundle" not in globals():
    import cloudpickle
    def _atomic_pickle_dump(obj, path):
        tmp = f"{path}.tmp"
        with open(tmp, "wb") as f:
            cloudpickle.dump(obj, f)
        os.replace(tmp, path)
    def _append_manifest(out_dir: str, rec: dict):
        path = os.path.join(out_dir, "manifest.json")
        if os.path.exists(path):
            try:
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
            except Exception:
                data = {"models": []}
        else:
            data = {"models": []}
        data.setdefault("models", []).append(rec)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"[MANIFEST] 已更新: {path}")
    def save_gp_model_bundle(gp_obj, idx, out_dir, feature_names, *, best_idx=None, meta=None, overwrite=False):
        os.makedirs(out_dir, exist_ok=True)
        save_path = os.path.join(out_dir, f"model_{int(idx)}.pkl")
        if os.path.exists(save_path) and not overwrite:
            raise FileExistsError(f"{save_path} 已存在；如需覆盖请传 overwrite=True")
        bundle = {
            "version": 1,
            "created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
            "gp": gp_obj,
            "feature_names": list(feature_names) if feature_names is not None else None,
            "best_idx": int(best_idx) if best_idx is not None else None,
            "meta": meta or {}
        }
        _atomic_pickle_dump(bundle, save_path)
        manifest_rec = {
            "file": os.path.basename(save_path),
            "idx": int(idx),
            "best_idx": bundle["best_idx"],
            "created_at": bundle["created_at"],
            "n_features": None if feature_names is None else len(feature_names),
            "meta": bundle["meta"]
        }
        _append_manifest(out_dir, manifest_rec)
        print(f"[SAVE] 模型已保存: {save_path}")
        return save_path

# —— 避免与已有 model_*.pkl 冲突
def _existing_model_indices(out_dir: str):
    files = glob.glob(os.path.join(out_dir, "model_*.pkl"))
    idxs = []
    for p in files:
        m = re.search(r"model_(\d+)\.pkl$", os.path.basename(p))
        if m:
            idxs.append(int(m.group(1)))
    return sorted(set(idxs))

def _allocate_model_idx(out_dir: str, prefer_idx=None):
    used = set(_existing_model_indices(out_dir))
    if prefer_idx is not None and int(prefer_idx) not in used:
        print(f"[INFO] 使用显式指定的 MODEL_IDX={int(prefer_idx)}")
        return int(prefer_idx)
    # 从 0 开始找第一个空位
    i = 0
    while i in used:
        i += 1
    if prefer_idx is not None:
        print(f"[INFO] 指定的 MODEL_IDX={int(prefer_idx)} 已存在，自动改用 {i}")
    else:
        print(f"[INFO] 自动分配 MODEL_IDX={i}")
    return i

# —— 确定输出目录（优先使用已有 MODEL_DIR；否则创建新 run 目录）
RUN_DIR = globals().get("MODEL_DIR", None)
if not RUN_DIR:
    RUN_DIR = new_run_dir(BASE_DIR)  # 会把全局 MODEL_DIR 指向该目录
os.makedirs(RUN_DIR, exist_ok=True)

# —— 分配编号；允许通过全局 MODEL_IDX 强制指定
MODEL_IDX = _allocate_model_idx(RUN_DIR, prefer_idx=globals().get("MODEL_IDX", None))

# —— 记录元数据（train/val 时间会在后面回填 val_range）
meta = {
    "train_range": [str(np.min(dates_tr)), str(np.max(dates_tr))],
    "val_range": None,  # 待回填
    "stride": int(STRIDE) if "STRIDE" in globals() else None,
    "window": int(WINDOW_SIZE) if "WINDOW_SIZE" in globals() else None,
    "horizon": int(HORIZON_STP) if "HORIZON_STP" in globals() else None,
    "random_state": int(SEED) if "SEED" in globals() else None,
    "function_set": list(getattr(gp, "function_set", []))
}

# —— 保存为 model_{idx}.pkl（携带 best_idx/feature_names/meta）
model_path = save_gp_model_bundle(
    gp_obj=gp,
    idx=MODEL_IDX,
    out_dir=RUN_DIR,
    feature_names=FEATURE_NAMES,
    best_idx=best_idx,
    meta=meta,
    overwrite=False
)

# ============================================================
# 3) 读取验证集 & 评估同一组件（与训练完全解耦的数据）
#   - 再次构造 v-特征
#   - 用训练时选出的 best_idx 在验证段做逐日IC
#
# ============================================================
print("\n 读取验证集 …")
df_val = purge(pl.scan_parquet(VAL_FILE).sort("ts"))
df_val = ensure_ts_datetime(df_val, "ts").sort("ts")
print("完成，行数 =", len(df_val))

# 训练区间应完全早于验证区间（防止泄漏）
tr_end = pd.to_datetime(df_tr["ts"].to_pandas()).max()
val_start = pd.to_datetime(df_val["ts"].to_pandas()).min()
assert tr_end < val_start, f"时间泄漏：train 结束 {tr_end} 不早于 val 开始 {val_start}"

# 验证集特征 / 标签
X_raw_val = build_matrix(df_val)
mid_now_val = (df_val["Buy1Price"] + df_val["Sell1Price"]) / 2
mid_fut_val = mid_now_val.shift(-HORIZON_STP)
ret_val = ((mid_fut_val - mid_now_val) / mid_now_val).fill_nan(0).to_numpy()

idx_val = np.arange(WINDOW_SIZE-1, len(df_val)-HORIZON_STP, STRIDE)
y_ic_val = ret_val[idx_val]
X_val = np.stack([compute_v_features(X_raw_val[t-WINDOW_SIZE+1:t+1])
                  for t in idx_val], axis=0).astype(np.float32)
dates_val = df_val["ts"].dt.date().to_numpy()[idx_val]

print("验证特征矩阵:", X_val.shape, "| 有效样本日期数(去重):", len(np.unique(dates_val)))

# 仅评估训练阶段选出的最佳组件
comps_val = gp.transform(X_val)
val_mean_ic, val_mean_abs, val_days, val_day_map = daily_ic_stats(
    comps_val[:, best_idx], y_ic_val, dates_val
)
# 兼容下游写报告/日志用的变量名
val_ic     = float(val_mean_ic)
val_abs_ic = float(abs(val_mean_ic))

print("\n====== 验证集（逐日IC）======")
print(f"验证集 |日均IC| = {abs(val_mean_ic):.4f}")   # 主口径：先均值，再取绝对值
print(f"验证集 日均|IC| = {val_mean_abs:.4f}")     # 参考口径
print(f"验证集 有效天数 = {val_days}")

# 打印前后各5天的逐日IC，便于快速查看稳定性
if val_day_map:
    kv = sorted(val_day_map.items(), key=lambda x: x[1])
    worst5 = kv[:5]
    best5  = kv[-5:]
    print("\n最差 5 天：")
    for d, ic in worst5: print(d, f"{ic:.4f}")
    print("\n最好 5 天：")
    for d, ic in best5:  print(d, f"{ic:.4f}")

# —— 回填验证区间到 manifest
try:
    manifest_path = os.path.join(RUN_DIR, "manifest.json")
    if os.path.exists(manifest_path):
        with open(manifest_path, "r", encoding="utf-8") as f:
            mani = json.load(f)
        for rec in mani.get("models", []):
            if rec.get("file") == os.path.basename(model_path):
                rec.setdefault("meta", {})
                rec["meta"]["val_range"] = [str(np.min(dates_val)), str(np.max(dates_val))]
        with open(manifest_path, "w", encoding="utf-8") as f:
            json.dump(mani, f, ensure_ascii=False, indent=2)
        print("[MANIFEST] 已回填验证区间 val_range")
except Exception as e:
    print("[WARN] 回填 manifest 失败：", e)


In [None]:
pip install python-docx

In [None]:
import os, re, pathlib, cloudpickle, json, docx

# ========= 1 基本路径 =========
BASE_DIR = "/content/drive/My Drive/gplearn"
pathlib.Path(BASE_DIR).mkdir(parents=True, exist_ok=True)  # 保证目录存在

# ========= 2 计算下一个编号 =========
def next_index(folder):
    nums = [int(m.group(1))
            for name in os.listdir(folder)
            if (m := re.match(r"model_(\d+)\.pkl$", name))]
    return max(nums) + 1 if nums else 1

idx = next_index(BASE_DIR)
print(f"即将保存为 model_{idx}.pkl")

# ========= 3 保存模型 =========
model_path = os.path.join(BASE_DIR, f"model_{idx}.pkl")
with open(model_path, "wb") as f:
    cloudpickle.dump(gp, f)
print(" 模型已保存:", model_path)

# ========= 4 追加 / 创建 Word 文档 =========
docx_path = os.path.join(BASE_DIR, "因子总表.docx")
doc = docx.Document(docx_path) if os.path.exists(docx_path) else docx.Document()

doc.add_heading(f'模型 {idx}', level=2)
doc.add_paragraph(expr_str)
doc.add_paragraph(f"训练集 IC = {best_ic_tr:.4f}")
doc.add_paragraph(f"验证集 IC = {val_ic:.4f}")
doc.add_paragraph("")   # 空行分隔

doc.save(docx_path)
print(" Word 已更新:", docx_path)


In [None]:
import os
BASE_DIR = "/content/drive/My Drive/gplearn"
print("当前目录文件：", os.listdir(BASE_DIR))


批量提取 gplearn 模型生成的因子，计算它们的Spearman 相关性矩阵，并找出高相关的因子对，同时将结果导出为 CSV 文件

In [None]:

import glob
import pandas as pd
import numpy as np
from scipy.stats import spearmanr


try:
    X_v_tr
except NameError:
    X_v_tr = X_tr
try:
    X_v_val
except NameError:
    X_v_val = X_val

# —— 模型目录：把当前 BASE_DIR 以及上面训练时的 MODEL_DIR 都纳入 ——
GP_DIRS = []
for d in {BASE_DIR, MODEL_DIR}:
    if isinstance(d, str) and os.path.isdir(d):
        GP_DIRS.append(d)
print(" 搜索模型目录：", GP_DIRS)

# —— 只取“最佳组件”，若未保存 best_idx 就默认取 #0 ——
# 因为我上周做的时候没有保存 best_idx，所以这里默认取 #0

MODE = "best"          # 可改成 "all" 抽出每个模型的全部组件
CORR_TH = 0.90         # 打印高相关对阈值
INCLUDE_ORIG = False   # True 时原始特征也一起进相关矩阵（矩阵会更大）

def load_gp_or_bundle(path):
    with open(path, "rb") as f:
        obj = cloudpickle.load(f)
    if isinstance(obj, dict) and "gp" in obj:
        return {
            "gp": obj["gp"],
            "best_idx": obj.get("best_idx"),
            "expr_str": obj.get("expr_str"),
            "feature_names": obj.get("feature_names"),
        }
    return {"gp": obj, "best_idx": None, "expr_str": None, "feature_names": None}

# 1) 收集所有模型文件
gp_paths = []
for d in GP_DIRS:
    gp_paths += glob.glob(os.path.join(d, "model_*.pkl"))
gp_paths = sorted(set(gp_paths))
print(f" 共发现 {len(gp_paths)} 个模型文件")

# 2) 抽取组件
# 因为之前有版本不兼容问题，所以这里进行详细的分类和跳过
features_tr_list, features_val_list, colnames_gp = [], [], []
for p in gp_paths:
    name = os.path.basename(p)
    try:
        bundle = load_gp_or_bundle(p)
        gp_obj  = bundle["gp"]
        # 是否训练过
        if not hasattr(gp_obj, "_best_programs") or not gp_obj._best_programs:
            print(f"  • {name:>14s} → 尚未训练（NotFitted），跳过")
            continue
        comps_tr  = gp_obj.transform(X_v_tr)  # (N_tr, k)
        comps_val = gp_obj.transform(X_v_val) # (N_val, k)
        k = comps_tr.shape[1]
    except Exception as e:
        print(f"  • {name:>14s} → transform 失败：{e.__class__.__name__}，跳过")
        continue

    if MODE == "best":
        bidx = 0 if bundle["best_idx"] is None else int(bundle["best_idx"])
        if bundle["best_idx"] is None:
            print(f"  • {name:>14s} → 未存 best_idx，默认用组件 #0")
        features_tr_list.append(comps_tr[:, bidx])
        features_val_list.append(comps_val[:, bidx])
        colnames_gp.append(f"{name.replace('.pkl','')}_best{bidx}")
        print(f"  • {name:>14s} → 用组件 #{bidx}")
    else:
        for j in range(k):
            features_tr_list.append(comps_tr[:, j])
            features_val_list.append(comps_val[:, j])
            colnames_gp.append(f"{name.replace('.pkl','')}_c{j}")
        print(f"  • {name:>14s} → 抽取全部 {k} 个组件")

if not features_tr_list:
    print("没有成功抽取到任何 gplearn 因子，结束。")
else:
    # 3) 组装 DataFrame
    df_gp_tr  = pd.DataFrame(np.column_stack(features_tr_list), columns=colnames_gp)
    df_gp_val = pd.DataFrame(np.column_stack(features_val_list), columns=colnames_gp)
    print(f" gplearn 因子（训练/验证）形状：{df_gp_tr.shape} / {df_gp_val.shape}")

    # 4) 相关性（训练集）
    if INCLUDE_ORIG:
        df_tr_all = pd.concat([pd.DataFrame(X_tr), df_gp_tr], axis=1)
        corr_df = df_tr_all.corr(method="spearman")
        out_csv = os.path.join(GP_DIRS[0], "all_features_spearman_corr_train.csv")
        print(" 已计算：原始+gplearn 全特征 Spearman 相关矩阵（训练集）")
    else:
        corr_df = df_gp_tr.corr(method="spearman")
        out_csv = os.path.join(GP_DIRS[0], "gp_factor_spearman_corr_train.csv")
        print(" 已计算：gplearn 因子 Spearman 相关矩阵（训练集）")

    # 5) 打印高相关对
    print(f"\n 高相关对（|corr| ≥ {CORR_TH}，不含对角）：")
    cols = corr_df.columns.tolist()
    high_pairs = []
    for i in range(len(cols)):
        for j in range(i+1, len(cols)):
            c = corr_df.iat[i, j]
            if np.isfinite(c) and abs(c) >= CORR_TH:
                high_pairs.append((cols[i], cols[j], float(c)))
    if high_pairs:
        for a, b, c in sorted(high_pairs, key=lambda x: -abs(x[2]))[:50]:
            print(f"  {a:>28s} ~ {b:<28s} : {c:+.4f}")
    else:
        print("  无。")

    # 6) 导出
    corr_df.to_csv(out_csv, index=True)
    print(" 相关矩阵已导出：", out_csv)


在评估前对 gplearn 因子数据做预处理，包括去除重复列名、清洗因子列表、统一评估参数配置，并准备好输出目录。

In [None]:

import os, json, time, re, glob, warnings
import numpy as np
import pandas as pd
import cloudpickle
from scipy.stats import spearmanr


# tqdm 进度条（无则自动安装）
try:
    from tqdm import tqdm
except Exception:
    !pip -q install tqdm
    from tqdm import tqdm

# 降噪：静默某些历史包的弃用警告（不影响结果）
warnings.filterwarnings("ignore", category=DeprecationWarning)

def banner(txt: str):
    """阶段标题打印"""
    print("\n" + "="*70)
    print(txt)
    print("="*70)

def save_csv(df: pd.DataFrame, path: str, index=False, head_n: int = 5, name: str = "表格"):
    """保存 CSV + 控制台预览（前 n 行）"""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    df.to_csv(path, index=index, encoding="utf-8-sig")
    print(f" {name} 已保存：{path}")
    with pd.option_context('display.max_rows', 5, 'display.max_columns', 12):
        print(f" {name} 预览（前{head_n}行）:")
        print(df.head(head_n))

def _dedup_columns_inplace(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """
    对 DataFrame 执行“列名去重”，仅保留每个重名的**首列**。
    返回一个新的 DataFrame；若无重复则原样返回。
    """
    dup_mask = pd.Index(df.columns).duplicated(keep='first')
    if dup_mask.any():
        dup_names = list(pd.Index(df.columns)[dup_mask])
        print(f"[WARN] {name} 存在重复列名 {dup_mask.sum()} 个，仅保留首列。示例: {dup_names[:10]}")
        return df.loc[:, ~dup_mask]
    return df

# ========== 目录/全局配置 ==========
EVAL_DIR = os.path.join(BASE_DIR, "eval_artifacts")
os.makedirs(EVAL_DIR, exist_ok=True)
print(f"评估产物目录：{EVAL_DIR}")

# 【统一入口】相关性阈值（所有步骤仅认这一处）
CORR_TH = 0.95
print(f"[CONF] 统一 CORR_TH = {CORR_TH}")

# 【统一分组数】
Q = 5

# ========== 预处理：去重列名，清理候选列表 ==========
# 1) 对训练段、验证段的因子矩阵做列名去重（避免 DataFrame['col'] 返回二维）
df_gp_tr  = _dedup_columns_inplace(df_gp_tr,  "df_gp_tr")
df_gp_val = _dedup_columns_inplace(df_gp_val, "df_gp_val")

# 2) 清洗因子候选列表：仅保留验证段里存在的列，并去重保持顺序
_col_before = len(colnames_gp)
colnames_gp = [c for c in colnames_gp if c in df_gp_val.columns]
colnames_gp = list(dict.fromkeys(colnames_gp))
if len(colnames_gp) != _col_before:
    print(f"[INFO] 因子名列表已清洗：原 { _col_before } → 现 { len(colnames_gp) }（去除重复/缺失）")





单因子 IC（信息系数）评估模块，分别在验证集和训练集上逐个计算因子的日均 IC 指标

In [None]:

# -----------------------------
# 步骤 1/6：单因子评估（Validation）
# -----------------------------
banner("步骤 1/6：单因子评估（Validation）")
t0 = time.time()
val_rows = []
for f in tqdm(colnames_gp, desc="逐因子计算 Validation IC"):
    m_ic, m_abs, n_days, _ = daily_ic_stats(df_gp_val[f].values, y_ic_val, dates_val)
    val_rows.append({
        "factor": f,
        "val_mean_ic": m_ic,           # 日均IC
        "val_abs_mean_ic": abs(m_ic),  # |日均IC|（排序指标）
        "val_mean_abs_ic": m_abs,      # 日均|IC|（仅仅作为参考）
        "val_days": n_days
    })
val_ic_df = pd.DataFrame(val_rows).sort_values("val_abs_mean_ic", ascending=False)
val_ic_path = os.path.join(EVAL_DIR, "single_factor_ic_val.csv")
save_csv(val_ic_df, val_ic_path, name="单因子 IC（Validation）")
print(f" 用时：{time.time()-t0:.2f}s")

# —— 方向性映射：按验证期 IC 的符号（后续分组收益/表达式/入库都用它）
VAL_IC_SIGN = dict(zip(val_ic_df["factor"].astype(str), np.sign(val_ic_df["val_mean_ic"].astype(float)).astype(int)))
print(f"[INFO] 方向映射（验证期 IC 符号）条数: {len(VAL_IC_SIGN)}")


# -----------------------------
# 步骤 1b：单因子评估（Train）
# -----------------------------
banner("（可选）步骤 1b：单因子评估（Train）")
t0 = time.time()
tr_rows = []
for f in tqdm(colnames_gp, desc="逐因子计算 Train IC"):
    m_ic, m_abs, n_days, _ = daily_ic_stats(df_gp_tr[f].values, y_ic_tr, dates_tr)
    tr_rows.append({
        "factor": f,
        "tr_mean_ic": m_ic,
        "tr_abs_mean_ic": abs(m_ic),
        "tr_mean_abs_ic": m_abs,
        "tr_days": n_days
    })
tr_ic_df = pd.DataFrame(tr_rows).sort_values("tr_abs_mean_ic", ascending=False)
tr_ic_path = os.path.join(EVAL_DIR, "single_factor_ic_train.csv")
save_csv(tr_ic_df, tr_ic_path, name="单因子 IC（Train）")
print(f" 用时：{time.time()-t0:.2f}s")



计算 gplearn 提取的因子之间的 Spearman 相关性矩阵，并找出高相关因子对供人工检查。

In [None]:

# -----------------------------
# 步骤 2/6：因子相关性矩阵（Validation）
#   * 这里再次确保相关矩阵只基于“唯一列名”的矩阵来计算，避免 loc 返回 Series
# -----------------------------
banner("步骤 2/6：因子相关性矩阵（Validation）")
t0 = time.time()

# 相关矩阵基于“去重后的验证集因子矩阵”
corr_df = df_gp_val.corr(method="spearman")
corr_path = os.path.join(EVAL_DIR, "gp_factor_spearman_corr_val.csv")
save_csv(corr_df, corr_path, index=True, name="相关性矩阵（Validation）")

# 打印 |ρ| >= CORR_TH 的 Top 20 高相关对，便于人工审阅
pairs = []
cols = corr_df.columns.tolist()
for i in range(len(cols)):
    for j in range(i+1, len(cols)):
        c = corr_df.iat[i, j]
        if np.isfinite(c) and abs(c) >= CORR_TH:
            pairs.append((cols[i], cols[j], float(c)))
print(f" |ρ| ≥ {CORR_TH} 的高相关对数量：{len(pairs)}")
for a, b, c in sorted(pairs, key=lambda x: -abs(x[2]))[:20]:
    print(f"  • {a}  ~  {b} : {c:+.4f}")
print(f" 用时：{time.time()-t0:.2f}s")



分组收益（Quantile Returns）评估模块，在验证集上根据因子值进行分位数分组，计算每组的平均收益，并结合验证期 IC 的方向来得到方向性调整后的多空收益。

In [None]:

# -----------------------------
# 步骤 3/6：分组收益（Validation，含“方向性”）
# -----------------------------
banner("步骤 3/6：分组收益（Validation，含方向性）")
t0 = time.time()

def quantile_returns(factor: np.ndarray, y: np.ndarray, dates: np.ndarray, q: int = Q):
    """
    按日分位分组收益（稳健版）：
      - 接受 factor/y 为 1D 或 2D；若为 2D，仅取第 1 列
      - 每日切片做 NaN/Inf 清理
      - 返回 (各分位收益均值列表, spread = Qq - Q1)，若存在缺失则 spread=None
    """
    # ---- 统一成 1D ----
    x_all = np.asarray(factor)
    y_all = np.asarray(y)

    if x_all.ndim == 2:
        x_all = x_all[:, 0]
    elif x_all.ndim != 1:
        raise ValueError(f"不支持的 factor 维度: {x_all.ndim}")

    if y_all.ndim == 2:
        y_all = y_all[:, 0]
    elif y_all.ndim != 1:
        raise ValueError(f"不支持的 y 维度: {y_all.ndim}")

    if len(x_all) != len(y_all) or len(x_all) != len(dates):
        raise ValueError(f"长度不一致：len(x)={len(x_all)}, len(y)={len(y_all)}, len(dates)={len(dates)}")

    uniq = np.unique(dates)
    buckets = [[] for _ in range(q)]

    for d in uniq:
        mask = (dates == d)
        if mask.sum() < q:
            continue

        x = x_all[mask].astype(float, copy=False)
        r = y_all[mask].astype(float, copy=False)

        ok = np.isfinite(x) & np.isfinite(r)
        if ok.sum() < q:
            continue
        x = x[ok]; r = r[ok]

        try:
            qs = np.quantile(x, np.linspace(0, 1, q + 1))
        except Exception:
            continue
        # 包含两端
        qs[0], qs[-1] = -np.inf, np.inf

        for k in range(q):
            sel = (x > qs[k]) & (x <= qs[k+1])  # 一维布尔
            if sel.sum() >= 3:
                buckets[k].append(float(np.nanmean(r[sel])))

    avg = [np.nan if len(b) == 0 else float(np.nanmean(b)) for b in buckets]
    spread = None if any(np.isnan(avg)) else float(avg[-1] - avg[0])
    return avg, spread

qr_rows = []
for f in tqdm(val_ic_df["factor"].tolist(), desc=f"按 {Q} 分位分组计算"):
    avg, spread = quantile_returns(df_gp_val[f].values, y_ic_val, dates_val, q=Q)
    # 方向：按验证期 val_mean_ic 的符号自动决定多空方向
    direction = VAL_IC_SIGN.get(str(f), 1)  # 找不到时默认 +1
    signed = None if spread is None else float(direction * spread)
    row = {
        "factor": f,
        **{f"Q{k+1}_mean": avg[k] for k in range(Q)},
        f"LongShort_Q{Q}_minus_Q1": spread,          # 原始 Q5-Q1
        "direction": int(direction),                 # +1 多（Q5-Q1），-1 空（Q1-Q5）
        "signed_spread": signed                      # 按方向后的收益
    }
    qr_rows.append(row)
qr_df = pd.DataFrame(qr_rows)
qr_path = os.path.join(EVAL_DIR, f"quantile_returns_Q{Q}_val.csv")
save_csv(qr_df, qr_path, name=f"分组收益（Q={Q}, 含方向性, Validation）")
print(f" 用时：{time.time()-t0:.2f}s")


基于验证集的因子剪枝，目的是在保留预测力较强因子的同时，去掉与已保留因子高度相关的冗余因子

In [None]:


# -----------------------------
# 步骤 4/6：剪枝（基于 Validation 的 |日均IC| 与相关性）
#   * 使用“安全取值”避免重复名/缺失导致的异常
# -----------------------------
banner("步骤 4/6：剪枝（基于 Validation 的 |日均IC| 与相关性）")
t0 = time.time()

def _get_corr_safe(cdf: pd.DataFrame, a: str, b: str) -> float:
    """从相关矩阵安全取标量，异常时返回 NaN。"""
    try:
        v = cdf.loc[a, b]
    except KeyError:
        return np.nan
    if isinstance(v, (pd.Series, np.ndarray)):  # 理论上不会发生，做兜底
        arr = np.asarray(v).reshape(-1)
        return float(arr[0]) if arr.size else np.nan
    try:
        return float(v)
    except Exception:
        return np.nan

# 延续你的排序逻辑：按 |日均IC|（val_abs_mean_ic）降序遍历
ranked = val_ic_df["factor"].tolist()
KEEP, DROP = [], []
present = set(corr_df.columns)  # 相关矩阵里真正可用的因子集合

for f in ranked:
    # 若该因子名不在相关矩阵（极少见），先“乐观保留”并提示
    if f not in present:
        print(f"[WARN] 剪枝：相关矩阵中找不到 {f}，暂时保留。")
        KEEP.append(f)
        continue

    keep_flag = True
    for g in KEEP:
        if g not in present:
            continue
        c = _get_corr_safe(corr_df, f, g)
        if np.isfinite(c) and abs(c) >= CORR_TH:
            # 若与已保留因子高度相关，则丢弃（贪心：越早进入 KEEP 的通常分数更高）
            keep_flag = False
            break
    if keep_flag:
        KEEP.append(f)
    else:
        DROP.append(f)

print(f" 剪枝前：{len(ranked)}  |  剪枝后保留：{len(KEEP)}  |  丢弃：{len(DROP)}（阈值 {CORR_TH}）")
print(" 保留前 10 个：", KEEP[:10])
print(" 丢弃前 10 个：", DROP[:10])

pruned_path = os.path.join(EVAL_DIR, f"selected_factors_after_pruning_corr{CORR_TH:.2f}.json")
with open(pruned_path, "w", encoding="utf-8") as f:
    json.dump({"keep": KEEP, "drop": DROP, "corr_threshold": CORR_TH}, f, ensure_ascii=False, indent=2)
print(" 剪枝清单已保存：", pruned_path)
print(f"用时：{time.time()-t0:.2f}s")




从已保留的 gplearn 因子中提取原始表达式、使用 Sympy 化简，并带上方向性信息，以便后续落地或入库。

In [None]:
# -----------------------------
# 步骤 5/6：表达式提取与化简（用于入库）
#   - 兼容模型文件命名（model_* 与 best_ic_gp_model_*）
#   - 化简表达式（Sympy），受保护算子：div/sqrt/log 等
#   - 输出 direction（用于后续落地）
# -----------------------------
banner("步骤 5/6：表达式提取与化简（用于入库）")
t0 = time.time()

# === 模型文件收集器（命名兼容 + 冲突取最新） ===
def collect_gp_model_paths(base_dir, model_dir=None):
    scan_dirs = {d for d in [base_dir, model_dir or base_dir] if isinstance(d, str) and os.path.isdir(d)}
    patterns = [
        "model_*.pkl",                   # 统一命名（新）
        "best_ic_gp_model_*.pkl",        # 历史命名（通用）
        "best_ic_gp_model_stride*.pkl",  # 历史命名（含 stride）
        "gp_model_*.pkl",                # 其他可能
    ]
    gp_paths = []
    for d in scan_dirs:
        for pat in patterns:
            gp_paths.extend(glob.glob(os.path.join(d, pat)))
    gp_paths = sorted(set(gp_paths))
    # 同名取最新（mtime）
    gp_map = {}
    for p in gp_paths:
        key = os.path.basename(p)
        if key not in gp_map or os.path.getmtime(p) > os.path.getmtime(gp_map[key]):
            gp_map[key] = p

    def latest_best_fallback():
        cands = [p for p in gp_paths if os.path.basename(p).startswith("best_ic_gp_model")]
        return max(cands, key=os.path.getmtime) if cands else None

    return gp_paths, gp_map, latest_best_fallback

GP_PATHS, GP_MAP, LATEST_BEST_FALLBACK = collect_gp_model_paths(BASE_DIR, globals().get("MODEL_DIR", BASE_DIR))
print(f"[INFO] 扫描到候选模型文件 {len(GP_PATHS)} 个")

# === 反序列化工具 ===
def load_bundle(path):
    with open(path, "rb") as f:
        obj = cloudpickle.load(f)
    # 兼容“直接存 gp 对象”的老格式
    return obj if (isinstance(obj, dict) and "gp" in obj) else {"gp": obj}

# === gplearn Program → 可读表达式（优先 export） ===
def pretty_expr(program, names):
    if hasattr(program, "export"):
        try:
            return program.export(feature_names=names)
        except Exception:
            pass
    expr = str(program)
    def repl(m):
        i = int(m.group(1))
        return names[i] if (0 <= i < len(names)) else f"X{i}"
    return re.sub(r'\bX(\d+)\b', repl, expr)

def gp_program_expr_str(gp_obj, comp_idx, feature_names):
    progs = getattr(gp_obj, "_best_programs", None)
    if not progs:
        return None
    if comp_idx >= len(progs):
        comp_idx = len(progs) - 1
    return pretty_expr(progs[comp_idx], feature_names)

# === 前缀表达式 → Sympy（含受保护算子） ===
import sympy as sp
EPS = sp.Float('1e-9')

_TOKEN = re.compile(r"\s*([A-Za-z_]\w*|[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?|\(|\)|,)")

def _tokenize(s: str):
    out = []; i, n = 0, len(s)
    while i < n:
        m = _TOKEN.match(s, i)
        if not m:
            raise ValueError(f"无法解析: '{s[i:i+32]}'")
        out.append(m.group(1)); i = m.end()
    return out

def _parse(tokens, i=0):
    if i >= len(tokens): raise ValueError("意外结束")
    tok = tokens[i]
    # 函数调用 name '(' args ')'
    if re.match(r"[A-Za-z_]\w*", tok) and i + 1 < len(tokens) and tokens[i+1] == '(':
        name = tok; i += 2
        args = []
        if tokens[i] == ')':
            i += 1
        else:
            while True:
                node, i = _parse(tokens, i)
                args.append(node)
                if tokens[i] == ',': i += 1; continue
                if tokens[i] == ')': i += 1; break
        return ('call', name, args), i
    # 数字
    if re.match(r"^[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?$", tok):
        return ('num', sp.Float(tok)), i + 1
    # 变量
    if re.match(r"^[A-Za-z_]\w*$", tok):
        return ('var', tok), i + 1
    raise ValueError(f"意外 token: {tok}")

def _var_symbol(var_token: str, feature_names=None):
    m = re.match(r"^[Xx](\d+)$", var_token)
    if m:
        idx = int(m.group(1))
        name = feature_names[idx] if (feature_names is not None and 0 <= idx < len(feature_names)) else f"X{idx}"
        return sp.Symbol(str(name), real=True)
    m = re.match(r"^[vV]_?(\d+)$", var_token)
    if m:
        return sp.Symbol(f"v{int(m.group(1))}", real=True)
    return sp.Symbol(var_token, real=True)

def _fold_bin(args, f2):
    if len(args) < 2: return args[0]
    acc = f2(args[0], args[1])
    for x in args[2:]:
        acc = f2(acc, x)
    return acc

def _ast_to_sympy(ast, feature_names=None):
    kind = ast[0]
    if kind == 'num': return ast[1]
    if kind == 'var': return _var_symbol(ast[1], feature_names)
    _, name, args_ast = ast
    args = [_ast_to_sympy(a, feature_names) for a in args_ast]
    lname = name.lower()

    if lname == 'add':  return _fold_bin(args, lambda a,b: a + b)
    if lname == 'sub':  return _fold_bin(args, lambda a,b: a - b)
    if lname == 'mul':  return _fold_bin(args, lambda a,b: a * b)
    if lname in ('div','pdiv','protected_division','safe_div'):
        return _fold_bin(args, lambda a,b: a / (b + EPS))  # 受保护除法

    if lname == 'neg':
        if len(args)!=1: raise ValueError("neg 需要 1 个参数")
        return -args[0]
    if lname in ('abs',):
        if len(args)!=1: raise ValueError("abs 需要 1 个参数")
        return sp.Abs(args[0])
    if lname in ('sqrt','psqrt','protected_sqrt'):
        if len(args)!=1: raise ValueError("sqrt 需要 1 个参数")
        return sp.sqrt(sp.Abs(args[0]))
    if lname in ('log','plog','protected_log'):
        if len(args)!=1: raise ValueError("log 需要 1 个参数")
        return sp.log(sp.Abs(args[0]) + EPS)
    if lname in ('exp',):
        if len(args)!=1: raise ValueError("exp 需要 1 个参数")
        return sp.exp(args[0])
    if lname in ('sin','cos','tan','sinh','cosh','tanh'):
        if len(args)!=1: raise ValueError(f"{name} 需要 1 个参数")
        return getattr(sp, lname)(args[0])
    if lname in ('inv','reciprocal'):
        if len(args)!=1: raise ValueError("inv 需要 1 个参数")
        return 1/(args[0] + EPS)
    if lname == 'pow':
        if len(args)!=2: raise ValueError("pow 需要 2 个参数")
        return args[0]**args[1]
    if lname == 'max':  return sp.Max(*args)
    if lname == 'min':  return sp.Min(*args)
    if lname == 'clip':
        if len(args)!=3: raise ValueError("clip 需要 3 个参数")
        x, lo, hi = args
        return sp.Min(sp.Max(x, lo), hi)
    if lname in ('sign','sgn'):
        if len(args)!=1: raise ValueError("sign 需要 1 个参数")
        return sp.sign(args[0])
    # 未知函数：保留为 Sympy 函数
    return sp.Function(name)(*args)

def expr_to_sympy(expr_str: str, feature_names=None, simplify_level: str = "fast"):
    tokens = _tokenize(expr_str)
    ast, j = _parse(tokens, 0)
    if j != len(tokens):
        raise ValueError("解析未到末尾，请检查括号/逗号配对")
    expr = _ast_to_sympy(ast, feature_names=feature_names)
    if simplify_level == "none":
        return sp.sstr(expr)
    expr = sp.simplify(expr)
    if simplify_level == "aggressive":
        expr = sp.simplify(sp.together(sp.cancel(expr)))
    return sp.sstr(expr)

# === 表达式提取主流程 ===
print(f"[INFO] 统一相关性阈值 CORR_TH = {CORR_TH}")

def _parse_factor_token(fac: str):
    """
    解析因子名，兼容 'model_12_best0' / 'model_7_c2' 等：
      返回 (model_key, tag, comp_idx)
      model_key: 'model_12.pkl' 之类；不存在时 None
      tag: 'best' 或 'c'
      comp_idx: 组件索引
    """
    m = re.search(r"(model_\d+)\s*_(best|c)(\d+)$", fac)
    if m:
        return m.group(1) + ".pkl", m.group(2), int(m.group(3))
    m2 = re.search(r"(best|c)(\d+)$", fac)
    comp_idx = int(m2.group(2)) if m2 else 0
    tag = m2.group(1) if m2 else "best"
    return None, tag, comp_idx

if "KEEP" not in globals():
    raise RuntimeError("未发现 KEEP（入库候选因子列表）。请先完成剪枝/筛选步骤。")

factor_expr_rows = []
ok_expr, ok_simpl = 0, 0

for fac in tqdm(KEEP, desc="提取/化简表达式"):
    model_key, tag, comp_idx = _parse_factor_token(str(fac))

    # 优先 'model_*.pkl'，找不到回退到最新 best_*（命名兼容）
    pkl_path = GP_MAP.get(model_key) if model_key else None
    if not pkl_path:
        pkl_path = LATEST_BEST_FALLBACK()

    if not pkl_path:
        print(f"[WARN] {fac}: 未找到对应的 .pkl（model_key={model_key}），跳过")
        factor_expr_rows.append({"factor": fac, "expr": None, "expr_simplified": None, "direction": 1})
        continue

    bundle = load_bundle(pkl_path)
    gp_obj = bundle.get("gp", None)
    if gp_obj is None:
        print(f"[WARN] {fac}: {os.path.basename(pkl_path)} 不含 'gp'，跳过")
        factor_expr_rows.append({"factor": fac, "expr": None, "expr_simplified": None, "direction": 1})
        continue

    # 列名来源：bundle.feature_names > 全局 FEATURE_NAMES > ORIGINAL_FEATURE_NAMES > X<i>
    f_names = (bundle.get("feature_names") or
               globals().get("FEATURE_NAMES") or
               globals().get("ORIGINAL_FEATURE_NAMES") or
               [f"X{i}" for i in range(256)])

    # 若是 best，且包里提供了 best_idx，以包内为准
    if tag == "best" and bundle.get("best_idx") is not None:
        comp_idx = int(bundle["best_idx"])

    # 提取 + 化简
    raw_expr = gp_program_expr_str(gp_obj, comp_idx, f_names)
    try:
        simp_str = expr_to_sympy(raw_expr, feature_names=f_names, simplify_level="fast") if raw_expr else None
    except Exception as e:
        print(f"[WARN] {fac}: 化简失败：{e}")
        simp_str = None

    if raw_expr: ok_expr += 1
    if simp_str is not None: ok_simpl += 1

    direction = VAL_IC_SIGN.get(str(fac), 1)  # 将方向带出用于入库/回测
    factor_expr_rows.append({
        "factor": str(fac),
        "expr": raw_expr,
        "expr_simplified": simp_str,
        "direction": int(direction),
        "corr_threshold_used": CORR_TH
    })

# —— 输出结果（以及完整打印不省略）——
import difflib

def _norm_expr(s):
    if s is None:
        return None
    # 统一比较用：去空白
    return re.sub(r"\s+", "", str(s))

expr_df = pd.DataFrame(factor_expr_rows)

# 标记是否变化 & 基本长度指标
expr_df["changed"] = (_norm_expr(expr_df["expr"]) != _norm_expr(expr_df["expr_simplified"]))
expr_df["len_raw"] = expr_df["expr"].map(lambda x: len(x) if isinstance(x, str) else pd.NA)
expr_df["len_simpl"] = expr_df["expr_simplified"].map(lambda x: len(x) if isinstance(x, str) else pd.NA)
expr_df["delta_len"] = expr_df["len_simpl"] - expr_df["len_raw"]

# 粗略操作符计数（可辅助判断复杂度是否降低）
_op_pat = re.compile(
    r"\b(add|sub|mul|div|pdiv|protected_division|safe_div|sqrt|psqrt|protected_sqrt|log|plog|protected_log|"
    r"exp|sin|cos|tan|sinh|cosh|tanh|min|max|clip|pow|abs|neg|sign|inv|reciprocal)\b",
    re.I
)
expr_df["ops_raw"] = expr_df["expr"].map(lambda s: len(_op_pat.findall(s)) if isinstance(s, str) else pd.NA)
expr_df["ops_simpl"] = expr_df["expr_simplified"].map(lambda s: len(_op_pat.findall(s)) if isinstance(s, str) else pd.NA)
expr_df["delta_ops"] = expr_df["ops_simpl"] - expr_df["ops_raw"]

expr_csv = os.path.join(EVAL_DIR, "selected_factor_expressions.csv")
save_csv(expr_df, expr_csv, name="入库候选表达式（含原始/化简/方向/复杂度）")

print(f" 表达式提取成功 {ok_expr}/{len(KEEP)}；化简成功 {ok_simpl}/{len(KEEP)}")
print(f" 化简后更短的个数：{int((expr_df['delta_len']<0).fillna(False).sum())} / {len(expr_df)}")
print(f" 用时：{time.time()-t0:.2f}s")

# 完整打印：展示 原始 vs 化简后（不截断）
display_cols = [
    "factor", "direction", "changed",
    "len_raw", "len_simpl", "delta_len",
    "ops_raw", "ops_simpl", "delta_ops",
    "expr", "expr_simplified"
]
with pd.option_context('display.max_colwidth', None, 'display.width', None, 'display.max_columns', None):
    try:
        from IPython.display import display
        display(expr_df[display_cols])
    except Exception:
        print(expr_df[display_cols].to_string(index=False))

# 另存一份纯文本，附带 unified diff（控制长度，避免过大）
expr_txt = os.path.join(EVAL_DIR, "selected_factor_expressions.txt")
with open(expr_txt, "w", encoding="utf-8") as f:
    for i, row in expr_df.iterrows():
        fac = row["factor"]; direc = row["direction"]
        raw = str(row["expr"]) if row["expr"] is not None else ""
        simp = str(row["expr_simplified"]) if row["expr_simplified"] is not None else ""
        f.write(
            f"[{i}] factor={fac}  direction={direc}  changed={bool(row['changed'])}  "
            f"len_raw={row['len_raw']}  len_simpl={row['len_simpl']}  delta_len={row['delta_len']}  "
            f"ops_raw={row['ops_raw']}  ops_simpl={row['ops_simpl']}  delta_ops={row['delta_ops']}\n"
        )
        f.write("—— RAW ——\n"); f.write(raw + "\n")
        f.write("—— SIMPLIFIED ——\n"); f.write(simp + "\n")

        diff_lines = list(difflib.unified_diff(
            raw.splitlines(keepends=False),
            simp.splitlines(keepends=False),
            fromfile="raw", tofile="simplified", lineterm=""
        ))
        if diff_lines:
            f.write("—— DIFF ——\n")
            # 最多写入 200 行 diff，避免文件过大
            for line in diff_lines[:200]:
                f.write(line + "\n")
        f.write("-" * 120 + "\n")

print(f"[SAVE] 纯文本（含原始/化简/小diff）已写入: {expr_txt}")


对每个因子分别做简单线性回归，评估它与目标变量（训练集收益）之间的拟合程度

In [None]:
# -----------------------------


# -----------------------------
# 步骤 6/6：单因子线性回归筛选

# -----------------------------
banner("预处理步骤：单因子线性回归筛选")
t0 = time.time()

# 导入所需库
import numpy as np
import pandas as pd
from sklearn.metrics import r2_score, mean_squared_error

# 存储线性回归结果
linear_reg_results = []

# 手动实现简单线性回归（避免依赖sklearn.LinearRegression）
def simple_linear_regression(X, y):
    """简单实现线性回归，返回系数和截距"""
    X = np.asarray(X).reshape(-1)
    y = np.asarray(y).reshape(-1)

    # 确保数据有效
    valid = np.isfinite(X) & np.isfinite(y)
    X, y = X[valid], y[valid]

    if len(X) < 5:  # 至少需要一些样本
        return 0, 0

    # 计算相关系数
    mean_x, mean_y = np.mean(X), np.mean(y)
    cov_xy = np.sum((X - mean_x) * (y - mean_y))
    var_x = np.sum((X - mean_x) ** 2)

    if var_x == 0:
        return 0, mean_y  # 如果X无方差，返回y的均值

    # 计算斜率和截距
    slope = cov_xy / var_x
    intercept = mean_y - slope * mean_x

    return slope, intercept

# 遍历所有因子进行单因子线性回归
for factor in tqdm(colnames_gp, desc="单因子线性回归"):
    try:
        # 提取单个因子作为特征
        X = df_gp_tr[factor].values
        y = y_ic_tr

        # 跳过含有NaN/Inf的因子
        if not np.all(np.isfinite(X)):
            print(f"跳过 {factor}：包含NaN或Inf值")
            continue

        # 使用自定义的简单线性回归
        coef, intercept = simple_linear_regression(X, y)

        # 计算预测值
        y_pred = coef * X + intercept

        # 计算评估指标
        r2 = r2_score(y, y_pred)
        mse = mean_squared_error(y, y_pred)

        # 保存结果
        linear_reg_results.append({
            "factor": factor,
            "r2": r2,
            "mse": mse,
            "coef": coef,
            "abs_coef": abs(coef)
        })

    except Exception as e:
        print(f"处理因子 {factor} 时出错: {e}")

# 转换为DataFrame并排序
if linear_reg_results:
    linear_reg_df = pd.DataFrame(linear_reg_results)

    # 按R²降序排序（更高表示更好的拟合）
    linear_reg_df = linear_reg_df.sort_values("r2", ascending=False)

    # 保存结果
    linear_reg_path = os.path.join(EVAL_DIR, "single_factor_linear_regression.csv")
    save_csv(linear_reg_df, linear_reg_path, name="单因子线性回归结果")

    # 可选：基于线性回归结果选择Top N个因子
    TOP_N = min(50, len(linear_reg_df))  # 可调整数量
    top_factors = linear_reg_df.head(TOP_N)["factor"].tolist()

    print(f"\n线性回归Top {TOP_N} 因子:")
    for i, (_, row) in enumerate(linear_reg_df.head(TOP_N).iterrows()):
        print(f"{i+1}. {row['factor']} - R²: {row['r2']:.4f}, MSE: {row['mse']:.6f}, 系数: {row['coef']:.6f}")

    # 可选：创建一个新的变量来存储线性回归筛选的因子
    LINEAR_REG_SELECTED = top_factors

    # 保存筛选的因子列表
    linear_selected_path = os.path.join(EVAL_DIR, "linear_regression_selected_factors.json")
    with open(linear_selected_path, "w", encoding="utf-8") as f:
        json.dump({"selected_factors": LINEAR_REG_SELECTED}, f, ensure_ascii=False, indent=2)
    print(f"线性回归筛选的因子已保存到: {linear_selected_path}")
else:
    print("没有得到有效的线性回归结果")
    LINEAR_REG_SELECTED = []

print(f"单因子线性回归筛选完成，用时: {time.time()-t0:.2f}s")


# colnames_gp = LINEAR_REG_SELECTED
print(f"继续使用原始的 {len(colnames_gp)} 个因子进行后续分析")

In [None]:


# -----------------------------
banner("入库清单与摘要报告")
t0 = time.time()

registry_path = os.path.join(BASE_DIR, "factor_registry.json")
registry = {"version": 1, "criteria": {
    "selection_set": "validation",
    "metric": "|mean_daily_IC|",     # 说明：排序采用 val_abs_mean_ic
    "corr_threshold": CORR_TH,
    "quantiles": Q,
}}

# 便于合并：转成 dict
val_ic_map = val_ic_df.set_index("factor").to_dict(orient="index")
qr_map     = qr_df.set_index("factor").to_dict(orient="index")

registry["factors"] = []
for f in KEEP:
    item = {"name": f}
    # 评估指标
    item["metrics"]  = val_ic_map.get(f, {})
    # 分组收益（含 signed_spread 与 direction）
    item["quantile"] = qr_map.get(f, {})
    # 表达式（若有）
    rec = expr_df[expr_df["factor"] == f]
    if not rec.empty:
        item["expr"] = rec.iloc[0]["expr"]
        item["expr_simplified"] = rec.iloc[0]["expr_simplified"]
        item["direction"] = int(rec.iloc[0]["direction"])
    # 统一附上当前阈值
    item["corr_threshold_used"] = CORR_TH
    registry["factors"].append(item)

with open(registry_path, "w", encoding="utf-8") as f:
    json.dump(registry, f, ensure_ascii=False, indent=2)
print(" 入库清单已更新：", registry_path)

md_path = os.path.join(EVAL_DIR, "evaluation_summary.md")
with open(md_path, "w", encoding="utf-8") as f:
    f.write("# 因子评估与入库摘要（Validation 驱动）\n\n")
    f.write(f"- 候选因子数：{len(colnames_gp)}\n")
    f.write(f"- 剪枝后保留：{len(KEEP)}（相关性阈值 {CORR_TH}）\n")
    f.write(f"- 单因子IC（Val）：{os.path.basename(os.path.join(EVAL_DIR, 'single_factor_ic_val.csv'))}\n")
    f.write(f"- 单因子IC（Train）：{os.path.basename(os.path.join(EVAL_DIR, 'single_factor_ic_train.csv'))}\n")
    f.write(f"- 分组收益（含方向）：{os.path.basename(qr_path)}\n")
    f.write(f"- 相关矩阵：{os.path.basename(corr_path)}\n")
    f.write(f"- 表达式表：{os.path.basename(expr_csv)}\n")
    f.write(f"- 入库清单：{os.path.basename(registry_path)}\n")
print(" 摘要报告已生成：", md_path)
print(f" 用时：{time.time()-t0:.2f}s")

print("\n 全流程完成。")



使用剪枝后的因子（KEEP）作为特征，构建 LightGBM 三分类模型，对上涨/下跌/不变进行回测评估。流程包括标签构建、模型训练、验证集评估、特征重要性分析等。



In [None]:
# -----------------------------
# 步骤 7/6：LightGBM 回测（三分类）- 修复版
# -----------------------------
banner("步骤 7/6：LightGBM 回测（三分类）- 修复版")
t0 = time.time()

# 设置 alpha 阈值用于构建三分类标签
ALPHA_THRESHOLD = 5e-5

# 1. 准备特征（使用保留的因子）
print(f"使用 {len(KEEP)} 个保留因子作为特征")

# 创建训练特征矩阵，只使用保留的因子
X_train_filtered = df_gp_tr[KEEP].copy()
X_val_filtered = df_gp_val[KEEP].copy()

# 2. 准备标签（三分类：上涨、下跌、不变）
def create_labels(returns, alpha=ALPHA_THRESHOLD):
    """根据阈值创建三分类标签：-1（下跌）、0（不变）、1（上涨）"""
    labels = np.zeros_like(returns)
    labels[returns > alpha] = 1    # 上涨
    labels[returns < -alpha] = -1  # 下跌
    return labels

# 创建训练和验证标签
y_train = create_labels(y_ic_tr)
y_val = create_labels(y_ic_val)

# 统计标签分布
train_label_dist = pd.Series(y_train).value_counts(normalize=True)
val_label_dist = pd.Series(y_val).value_counts(normalize=True)
print("训练集标签分布:")
print(train_label_dist)
print("\n验证集标签分布:")
print(val_label_dist)

# 3. 训练 LightGBM 模型
print("\n开始训练 LightGBM 模型...")

# 将标签转换为非负整数（LightGBM 要求）
# 映射: -1→0(下跌), 0→1(不变), 1→2(上涨)
y_train_lgb = y_train + 1
y_val_lgb = y_val + 1

# 转换为 LightGBM 数据集格式
train_data = lgb.Dataset(X_train_filtered, label=y_train_lgb)
val_data = lgb.Dataset(X_val_filtered, label=y_val_lgb, reference=train_data)

# 设置参数（三分类）
params = {
    'objective': 'multiclass',
    'num_class': 3,  # 对应类别: 0(下跌), 1(不变), 2(上涨)
    'metric': 'multi_logloss',
    'boosting_type': 'gbdt',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': -1
}

# 修复训练部分，使用兼容的 callbacks 参数
try:
    # 较新版本的 LightGBM
    callbacks = [
        lgb.early_stopping(20, verbose=True),
        lgb.log_evaluation(20)
    ]
    model = lgb.train(
        params,
        train_data,
        num_boost_round=100,
        valid_sets=[train_data, val_data],
        valid_names=['train', 'valid'],
        callbacks=callbacks
    )
except (TypeError, AttributeError):
    # 旧版本的 LightGBM
    model = lgb.train(
        params,
        train_data,
        num_boost_round=100,
        valid_sets=[train_data, val_data],
        valid_names=['train', 'valid'],
        verbose_eval=20
    )

# 4. 评估模型（验证集）
print("\n在验证集上评估模型...")

# 预测
probs = model.predict(X_val_filtered, num_iteration=model.best_iteration)
y_pred_lgb = np.argmax(probs, axis=1)
y_pred = y_pred_lgb - 1  # 将 [0,1,2] 映射回 [-1,0,1]

# 计算准确率
accuracy = accuracy_score(y_val, y_pred)
print(f"验证集准确率: {accuracy:.4f}")

# 自定义分类报告函数（避免使用有问题的 sklearn.metrics.classification_report）
def custom_classification_report(y_true, y_pred, target_names=None):
    """创建类似 sklearn 的分类报告，但不依赖其实现"""
    from collections import Counter
    import numpy as np

    # 获取唯一标签
    labels = sorted(set(np.concatenate([np.unique(y_true), np.unique(y_pred)])))
    if target_names is None:
        target_names = [str(l) for l in labels]

    # 计算每个类别的指标
    report_dict = {}
    for i, label in enumerate(labels):
        # 真阳性、假阳性、假阴性
        tp = np.sum((y_true == label) & (y_pred == label))
        fp = np.sum((y_true != label) & (y_pred == label))
        fn = np.sum((y_true == label) & (y_pred != label))
        support = np.sum(y_true == label)

        # 计算精确率、召回率、F1分数
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

        report_dict[i] = {
            'precision': precision,
            'recall': recall,
            'f1-score': f1,
            'support': support
        }

    # 计算加权平均
    total = len(y_true)
    weighted_avg = {
        'precision': sum(report_dict[i]['precision'] * report_dict[i]['support'] for i in range(len(labels))) / total,
        'recall': sum(report_dict[i]['recall'] * report_dict[i]['support'] for i in range(len(labels))) / total,
        'f1-score': sum(report_dict[i]['f1-score'] * report_dict[i]['support'] for i in range(len(labels))) / total,
        'support': total
    }
    report_dict['weighted avg'] = weighted_avg

    # 格式化输出
    result = "              precision    recall  f1-score   support\n\n"
    for i, name in enumerate(target_names):
        metrics = report_dict[i]
        result += f"{name:14} {metrics['precision']:.2f}      {metrics['recall']:.2f}      {metrics['f1-score']:.2f}     {metrics['support']}\n"

    result += "\n"
    metrics = report_dict['weighted avg']
    result += f"{'weighted avg':14} {metrics['precision']:.2f}      {metrics['recall']:.2f}      {metrics['f1-score']:.2f}     {metrics['support']}\n"

    return result

# 使用自定义函数打印分类报告
print("\n分类报告:")
print(custom_classification_report(y_val, y_pred, target_names=['下跌', '不变', '上涨']))

# 混淆矩阵
cm = confusion_matrix(y_val, y_pred)
print("\n混淆矩阵:")
print(cm)

# 5. 特征重要性分析
importance = model.feature_importance(importance_type='gain')
feature_importance = pd.DataFrame({
    'Feature': KEEP,
    'Importance': importance
}).sort_values(by='Importance', ascending=False)

print("\n特征重要性 Top 10:")
print(feature_importance.head(10))

# 保存特征重要性
importance_path = os.path.join(EVAL_DIR, "feature_importance_lightgbm.csv")
feature_importance.to_csv(importance_path, index=False)
print(f"特征重要性已保存到: {importance_path}")
print(f"LightGBM回测完成，用时: {time.time()-t0:.2f}s")