In [1]:
import polars as pl
import statsmodels.api as sm
from pathlib import Path
import numpy as np
from sqlalchemy import create_engine
import numba # 引入 Numba JIT 编译器

# =================================================================== #
#                           【1. 数据加载配置】                         #
# =================================================================== #
# --- 数据库连接配置 ---
username = "panjinhe"
password = "20020112p"
host = "localhost"
port = "5432"
database = "pbroe"
table_name = 'pbroech6'
schema_name = 'pbroe'

# --- 日期范围 ---
start_date = '2005-04-01'
end_date = '2025-03-31'

# --- 输出文件配置 ---
OUTPUT_DIR = Path(r'E:\PBROE\ch7pl')
OUTPUT_FILENAME = 'pbroe7.1_residuals_and_quantiles_pure_polars.csv'
OUTPUT_DIR.mkdir(parents=True, exist_ok=True) # 确保输出目录存在


# =================================================================== #
#                       【2. 数据加载 (Polars)】                        #
# =================================================================== #
dfpbroech7 = None
try:
    print("--- 步骤 1: 从PostgreSQL数据库加载数据 ---")
    connection_string = f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}"
    engine = create_engine(connection_string)

    # 在数据库层面直接过滤，效率更高
    sql_query = f"""
    SELECT * FROM {schema_name}.{table_name}
    WHERE "trdmnt" >= '{start_date}' AND "trdmnt" <= '{end_date}'
    """

    # 使用 pl.read_database 执行查询
    dfpbroech7 = pl.read_database(query=sql_query, connection=engine)

    print(f"数据加载成功！共加载 {dfpbroech7.height} 条记录。")
    print("数据概览 (describe):")
    print(dfpbroech7.describe())

except Exception as e:
    print(f"数据加载失败: {e}")
finally:
    if 'engine' in locals() and engine:
        engine.dispose()

# =================================================================== #
#          【3. Numba JIT 加速的滚动分位数计算辅助函数】          #
# =================================================================== #

@numba.jit(nopython=True)
def rolling_rank_get_last_quantile(arr: np.ndarray) -> float:
    """
    一个被 Numba JIT 编译的函数，用于高速计算滚动窗口中最后一个值的百分位排名。
    它接收一个NumPy数组，返回一个浮点数。
    """
    if len(arr) == 0:
        return np.nan

    # 获取窗口中的最后一个值
    last_val = arr[-1]

    # 计算排名（采用 'average' 方法的逻辑）
    less_count = 0
    equal_count = 0
    for x in arr:
        if x < last_val:
            less_count += 1
        elif x == last_val:
            equal_count += 1

    # rank = (小于last_val的数量) + (等于last_val的数量 + 1) / 2
    rank = less_count + (equal_count + 1) / 2.0

    # 返回百分位排名
    return rank / len(arr)


# =================================================================== #
#                  【4. 基于Polars的数据处理主函数】                  #
# =================================================================== #

def process_data_with_polars(df_pl: pl.DataFrame):
    """
    使用纯Polars对输入数据进行完整的处理流程。
    """
    if not isinstance(df_pl, pl.DataFrame) or df_pl.is_empty():
        print("错误: 输入的不是一个有效的Polars DataFrame或DataFrame为空。")
        return

    print("\n--- 步骤 2: 数据清洗与特征工程 (Polars) ---")
    cleaned_df = (
        df_pl
        .filter(pl.col('if_st') != 1)
        .filter(
            (pl.col('PB') > 0) &
            (pl.col('roe_ttm') <= 0.5) & (pl.col('roe_ttm') >= -0.1) &
            (pl.col('roic') <= 0.5) & (pl.col('roic') >= -0.1)
        )
        .drop_nulls(subset=['roe_ttm', 'roic', 'PB', 'indnme1'])
        .with_columns([
            pl.col('PB').log().alias('lnPB'),
            (pl.col('roe_ttm') - pl.col('roic')).alias('leverage_spread')
        ])
    )
    print(f"数据清洗后，剩余 {cleaned_df.height} 条有效记录用于回归分析。")


    print("\n--- 步骤 3: 执行月度行业回归与残差计算 (全向量化) ---")
    # 1. 定义回归变量
    y_col = 'lnPB'
    x1_col = 'roic'
    x2_col = 'leverage_spread'
    group_cols = ['trdmnt', 'indnme1']

    # 2. 在每个分组内，计算OLS回归所需的所有聚合统计量
    stats_df = cleaned_df.group_by(group_cols).agg(
        pl.mean(y_col).alias(f'mean_{y_col}'),
        pl.mean(x1_col).alias(f'mean_{x1_col}'),
        pl.mean(x2_col).alias(f'mean_{x2_col}'),
        pl.var(x1_col).alias(f'var_{x1_col}'),
        pl.var(x2_col).alias(f'var_{x2_col}'),
        pl.cov(x1_col, y_col).alias(f'cov_{x1_col}_{y_col}'),
        pl.cov(x2_col, y_col).alias(f'cov_{x2_col}_{y_col}'),
        pl.cov(x1_col, x2_col).alias(f'cov_{x1_col}_{x2_col}'),
        pl.len().alias('n') # 【代码修正】使用 pl.len() 替代已弃用的 pl.count()
    ).filter(pl.col('n') >= 15)

    # 3. 根据OLS公式，利用聚合统计量计算回归系数
    denominator = (pl.col(f'var_{x1_col}') * pl.col(f'var_{x2_col}')) - (pl.col(f'cov_{x1_col}_{x2_col}') ** 2)
    safe_denominator = pl.when(denominator.abs() < 1e-9).then(1e-9).otherwise(denominator)
    stats_df = stats_df.with_columns(
        b1=((pl.col(f'var_{x2_col}') * pl.col(f'cov_{x1_col}_{y_col}')) - (pl.col(f'cov_{x1_col}_{x2_col}') * pl.col(f'cov_{x2_col}_{y_col}'))) / safe_denominator,
        b2=((pl.col(f'var_{x1_col}') * pl.col(f'cov_{x2_col}_{y_col}')) - (pl.col(f'cov_{x1_col}_{x2_col}') * pl.col(f'cov_{x1_col}_{y_col}'))) / safe_denominator
    ).with_columns(
        b0=pl.col(f'mean_{y_col}') - (pl.col('b1') * pl.col(f'mean_{x1_col}')) - (pl.col('b2') * pl.col(f'mean_{x2_col}'))
    )

    # 4. 将计算出的系数连接回原始数据集
    regression_df = cleaned_df.join(stats_df.select(group_cols + ['b0', 'b1', 'b2']), on=group_cols, how='inner')

    # 5. 计算预测值和残差
    regression_df = regression_df.with_columns(
        predicted_y = pl.col('b0') + (pl.col('b1') * pl.col(x1_col)) + (pl.col('b2') * pl.col(x2_col))
    ).with_columns(
        residual = pl.col(y_col) - pl.col('predicted_y')
    )

    # 6. 计算残差的Z-score标准化值
    regression_results_df = regression_df.with_columns(
        residual_zscore=(
            (pl.col('residual') - pl.col('residual').mean().over(group_cols)) /
            pl.col('residual').std(ddof=0).over(group_cols)
        ).fill_nan(0.0)
    )
    print(f"全向量化回归与残差计算完成，共得到 {regression_results_df.height} 条有效记录。")


    print("\n--- 步骤 4: 计算时序残差分位数 (Numba 加速) ---")
    if regression_results_df.is_empty():
        print("没有有效的残差数据，跳过时序分位数计算。")
        return

    periods = [10]
    df_sorted = regression_results_df.sort('stkcd', 'trdmnt')

    quantile_exprs = []
    for period in periods:
        # =================================================================== #
        #              【代码优化 - 使用 Numba 加速 rolling_map】             #
        # =================================================================== #
        expr = (
            pl.col('residual_zscore')
              .rolling_map(
                  function=lambda s: rolling_rank_get_last_quantile(s.to_numpy()), # 调用高速Numba函数
                  window_size=period,
                  min_samples=1
              )
              .over('stkcd')
              .alias(f'residual_quantile_{period}m')
        )
        quantile_exprs.append(expr)

    df_with_quantiles = df_sorted.with_columns(quantile_exprs)
    print(f"已为 {periods} 周期计算时序残差分位数。")


    print("\n--- 步骤 5: 格式化并保存最终结果 ---")
    final_data = df_with_quantiles.with_columns(
        (pl.col('trdmnt').str.to_date('%Y-%m', strict=False)
         .dt.offset_by('1mo')
         .dt.strftime('%Y-%m-%d'))
        .alias('调入日期')
    )

    output_columns = [
        '调入日期', 'stkcd', 'shortname', 'indnme1',
        'roe_ttm', 'roic', 'PB', 'residual_zscore',
        'residual_quantile_10m'
    ]
    final_output_df = final_data.select([col for col in output_columns if col in final_data.columns])
    output_path = OUTPUT_DIR / OUTPUT_FILENAME
    final_output_df.write_csv(output_path, float_precision=6)
    print(f"\n策略数据已成功生成并保存为 '{output_path}'。")


# =================================================================== #
#                           【5. 执行入口】                           #
# =================================================================== #

if __name__ == '__main__' and dfpbroech7 is not None:
    process_data_with_polars(dfpbroech7)
elif dfpbroech7 is None:
    print("\n数据未能成功加载，处理流程终止。")


--- 步骤 1: 从PostgreSQL数据库加载数据 ---
数据加载成功！共加载 664336 条记录。
数据概览 (describe):
shape: (9, 13)
┌────────────┬────────┬─────────┬────────────┬───┬────────────┬────────────┬───────────┬───────────┐
│ statistic  ┆ stkcd  ┆ trdmnt  ┆ accper     ┆ … ┆ market_cap ┆ PB         ┆ roe_ttm   ┆ roic      │
│ ---        ┆ ---    ┆ ---     ┆ ---        ┆   ┆ ---        ┆ ---        ┆ ---       ┆ ---       │
│ str        ┆ str    ┆ str     ┆ str        ┆   ┆ f64        ┆ f64        ┆ f64       ┆ f64       │
╞════════════╪════════╪═════════╪════════════╪═══╪════════════╪════════════╪═══════════╪═══════════╡
│ count      ┆ 664336 ┆ 664336  ┆ 664336     ┆ … ┆ 664336.0   ┆ 662266.0   ┆ 635519.0  ┆ 653819.0  │
│ null_count ┆ 0      ┆ 0       ┆ 0          ┆ … ┆ 0.0        ┆ 2070.0     ┆ 28817.0   ┆ 10517.0   │
│ mean       ┆ null   ┆ null    ┆ 2017-06-06 ┆ … ┆ 1.6867e10  ┆ 4.107753   ┆ 0.056615  ┆ 0.031215  │
│            ┆        ┆         ┆ 12:03:53.3 ┆   ┆            ┆            ┆           ┆           │
│  

In [11]:
import polars as pl
from sqlalchemy import create_engine
import numpy as np
from pathlib import Path
import time
import statsmodels.api as sm

# =================================================================== #
#                       【1. 全局配置】                               #
# =================================================================== #

# --- 数据库连接配置 ---
DB_USERNAME = "panjinhe"
DB_PASSWORD = "20020112p"
DB_HOST = "localhost"
DB_PORT = "5432"
DB_DATABASE = "pbroe"
DB_TABLE_NAME = 'pbroech4' # 数据源明确为 pbroech4
DB_SCHEMA_NAME = 'pbroe'

# --- 日期范围 ---
START_DATE = '2005-04-01'
END_DATE = '2025-03-31'

# --- 文件路径配置 ---
# 目标文件：我们将读取此文件，并向其中添加新列后，再覆盖保存
TARGET_FACTOR_FILE = Path("E:/PBROE/ch7pl/pbroe7.1_residuals_and_quantiles_pure_polars.csv")


# =================================================================== #
#                  【2. 数据加载与准备】                              #
# =================================================================== #

def load_base_data() -> pl.DataFrame:
    """从数据库加载计算所需的基础数据。"""
    print("--- 步骤 1: 从PostgreSQL数据库加载 pbroech4 数据 ---")
    engine = None
    try:
        connection_string = f"postgresql+psycopg2://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_DATABASE}"
        engine = create_engine(connection_string)

        sql_query = f"""
        SELECT * FROM {DB_SCHEMA_NAME}.{DB_TABLE_NAME}
        WHERE "trdmnt" >= '{START_DATE}' AND "trdmnt" <= '{END_DATE}'
        """

        df_filtered = pl.read_database(query=sql_query, connection=engine)

        print(f"数据加载成功！共加载 {df_filtered.height} 条记录。")
        return df_filtered

    except Exception as e:
        print(f"数据加载失败: {e}")
        return None
    finally:
        if engine:
            engine.dispose()

# =================================================================== #
#                  【3. 核心计算模块】                                #
# =================================================================== #

def run_price_model_on_group(group_tuple) -> pl.DataFrame:
    """
    (辅助函数) 对给定的分组数据(DataFrame)，执行价格模型回归计算 b_adj_ps。
    """
    _ , group_df = group_tuple # group_tuple is (group_keys, group_dataframe)

    if group_df.height < 10:
        return None

    independent_vars = ['onaps', 'arps', 'invps', 'iaps', 'eps_ttm_core']
    dependent_var = 'price'

    regression_sample = group_df.select([dependent_var] + independent_vars).drop_nulls()

    if regression_sample.height < 10:
        return None

    Y = regression_sample.get_column(dependent_var).to_numpy()
    X = regression_sample.select(independent_vars).to_numpy()
    X = sm.add_constant(X, has_constant='add')

    try:
        model = sm.OLS(Y, X).fit()
        params = model.params

        beta_onaps = params[1]
        beta_arps = params[2]
        beta_invps = params[3]
        beta_iaps = params[4]

        if beta_onaps > 0:
            vc_arps = beta_arps / beta_onaps
            vc_invps = beta_invps / beta_onaps
            vc_iaps = beta_iaps / beta_onaps

            b_adj_ps_series = (
                group_df.get_column('onaps') +
                vc_arps * group_df.get_column('arps') +
                vc_invps * group_df.get_column('invps') +
                vc_iaps * group_df.get_column('iaps')
            )
        else:
            b_adj_ps_series = pl.lit(None, dtype=pl.Float64)

        return group_df.with_columns(b_adj_ps_series.alias('b_adj_ps'))

    except Exception:
        return group_df.with_columns(pl.lit(None, dtype=pl.Float64).alias('b_adj_ps'))


def calculate_pbadj(df: pl.DataFrame) -> pl.DataFrame:
    """
    在一个流程中，计算 b_adj_ps 和 PBadj。
    """
    # --- 3.1 数据预处理 ---
    print("\n--- 步骤 2: 数据预处理 ---")
    df_clean = df.drop_nulls(subset=['eps_ttm_core'])

    # --- 3.2 计算 b_adj_ps ---
    print("\n--- 步骤 3: 计算质量调整后净资产 (b_adj_ps) ---")

    # 为彻底规避 map_groups 的顽固错误，我们采用手动迭代分组的方式。
    group_cols = ['trdmnt', 'indcd1']

    # 1. 使用 group_by 得到一个迭代器
    grouped = df_clean.group_by(group_cols)

    # 2. 使用列表推导式，对每个分组应用回归函数
    results_list = [run_price_model_on_group(group) for group in grouped]

    # 3. 过滤掉因数据不足而返回 None 的结果，并合并
    df_with_badj = pl.concat([res for res in results_list if res is not None])

    df_with_badj = df_with_badj.drop_nulls(subset=['b_adj_ps'])
    print("b_adj_ps 计算完成。")

    # --- 3.3 计算 PBadj ---
    print("\n--- 步骤 4: 计算质量调整后市净率 (PBadj) ---")
    df_with_pbadj = df_with_badj.with_columns(
        (pl.col('price') / pl.col('b_adj_ps')).alias('PBadj')
    ).filter(
        pl.col('PBadj').is_finite() & (pl.col('PBadj') > 0)
    )
    print("PBadj 计算完成。")

    return df_with_pbadj


# =================================================================== #
#                         【4. 合并与保存】                           #
# =================================================================== #

def main():
    """主执行函数"""
    start_time = time.time()

    # 1. 加载基础数据
    base_data = load_base_data()
    if base_data is None:
        return

    # 2. 计算 PBadj 因子
    df_new_factors = calculate_pbadj(base_data)

    # 3. 准备用于合并的新因子数据
    # =================================================================== #
    #                【代码修正 - 转换字符串为日期】                #
    # =================================================================== #
    # 在使用 .dt 之前，必须先将 'trdmnt' 字符串列按 'YYYY-MM' 格式转换为日期类型。
    df_to_merge = df_new_factors.with_columns(
        pl.col('trdmnt').str.to_date(format='%Y-%m', strict=False).dt.offset_by('1mo').alias('调入日期')
    ).select(
        pl.col('stkcd').cast(pl.Utf8).str.zfill(6),
        '调入日期',
        'PBadj'
    )

    # 4. 加载原始因子文件
    print(f"\n--- 步骤 5: 加载原始因子文件 '{TARGET_FACTOR_FILE}' ---")
    try:
        df_original_factors = pl.read_csv(TARGET_FACTOR_FILE).with_columns(
            pl.col('调入日期').str.to_date(format='%Y-%m-%d'),
            pl.col('stkcd').cast(pl.Utf8).str.zfill(6)
        )
    except Exception as e:
        print(f"错误: 无法加载原始因子文件: {e}")
        return

    # 5. 执行左连接合并
    print("\n--- 步骤 6: 合并新计算的因子 ---")
    if 'PBadj' in df_original_factors.columns:
        df_original_factors = df_original_factors.drop('PBadj')

    df_merged = df_original_factors.join(
        df_to_merge,
        on=['stkcd', '调入日期'],
        how='left'
    )

    # 6. 保存覆盖原文件
    print(f"\n--- 步骤 7: 保存合并后的文件至 '{TARGET_FACTOR_FILE}' ---")
    try:
        df_merged.write_csv(TARGET_FACTOR_FILE)
        print("文件保存成功！")
    except Exception as e:
        print(f"错误: 文件保存失败: {e}")

    end_time = time.time()
    print(f"\n--- 所有任务完成！总耗时: {end_time - start_time:.2f} 秒 ---")


if __name__ == "__main__":
    main()


--- 步骤 1: 从PostgreSQL数据库加载 pbroech4 数据 ---
数据加载成功！共加载 665293 条记录。

--- 步骤 2: 数据预处理 ---

--- 步骤 3: 计算质量调整后净资产 (b_adj_ps) ---
b_adj_ps 计算完成。

--- 步骤 4: 计算质量调整后市净率 (PBadj) ---
PBadj 计算完成。

--- 步骤 5: 加载原始因子文件 'E:\PBROE\ch7pl\pbroe7.1_residuals_and_quantiles_pure_polars.csv' ---

--- 步骤 6: 合并新计算的因子 ---

--- 步骤 7: 保存合并后的文件至 'E:\PBROE\ch7pl\pbroe7.1_residuals_and_quantiles_pure_polars.csv' ---
文件保存成功！

--- 所有任务完成！总耗时: 71.58 秒 ---


In [14]:
# 计算残差adj
# =================================================================== #
#                       【1. 全局配置】                               #
# =================================================================== #

# --- 文件路径配置 ---
# 输入和输出都是同一个文件
FACTOR_FILE = Path("E:/PBROE/ch7pl/pbroe7.1_residuals_and_quantiles_pure_polars.csv")


# =================================================================== #
#                  【2. 核心计算模块】                                #
# =================================================================== #

def calculate_residual_adj(df: pl.DataFrame) -> pl.DataFrame:
    """
    使用 PBadj 运行回归，计算并返回 residual_adj 因子。
    """
    # --- 2.1 数据预处理与特征工程 ---
    print("\n--- 步骤 1: 数据预处理与特征工程 ---")

    # 确保所有必需列都存在且无缺失值
    required_cols = ['PBadj', 'roic', 'roe_ttm', 'trdmnt', 'indnme1']
    df_clean = df.drop_nulls(subset=required_cols).filter(
        pl.col('PBadj') > 0 # PBadj 必须为正才能取对数
    )

    df_clean = df_clean.with_columns(
        pl.col('PBadj').log().alias('lnPBadj'),
        (pl.col('roe_ttm') - pl.col('roic')).alias('leverage_spread')
    )
    print("数据预处理完成。")

    # --- 2.2 全向量化回归 ---
    print("\n--- 步骤 2: 执行全向量化回归 ---")
    y_col = 'lnPBadj'
    x1_col = 'roic'
    x2_col = 'leverage_spread'
    group_cols = ['trdmnt', 'indnme1']

    # 1. 一次性计算所有回归所需的统计量
    stats_df = df_clean.group_by(group_cols).agg(
        pl.mean(y_col).alias(f'mean_{y_col}'),
        pl.mean(x1_col).alias(f'mean_{x1_col}'),
        pl.mean(x2_col).alias(f'mean_{x2_col}'),
        pl.var(x1_col).alias(f'var_{x1_col}'),
        pl.var(x2_col).alias(f'var_{x2_col}'),
        pl.cov(x1_col, y_col).alias(f'cov_{x1_col}_{y_col}'),
        pl.cov(x2_col, y_col).alias(f'cov_{x2_col}_{y_col}'),
        pl.cov(x1_col, x2_col).alias(f'cov_{x1_col}_{x2_col}'),
        pl.len().alias('n')
    ).filter(pl.col('n') >= 15)

    # 2. 根据OLS公式计算回归系数
    denominator = (pl.col(f'var_{x1_col}') * pl.col(f'var_{x2_col}')) - (pl.col(f'cov_{x1_col}_{x2_col}') ** 2)
    safe_denominator = pl.when(denominator.abs() < 1e-9).then(1e-9).otherwise(denominator)

    stats_df = stats_df.with_columns(
        b1=((pl.col(f'var_{x2_col}') * pl.col(f'cov_{x1_col}_{y_col}')) - (pl.col(f'cov_{x1_col}_{x2_col}') * pl.col(f'cov_{x2_col}_{y_col}'))) / safe_denominator,
        b2=((pl.col(f'var_{x1_col}') * pl.col(f'cov_{x2_col}_{y_col}')) - (pl.col(f'cov_{x1_col}_{x2_col}') * pl.col(f'cov_{x1_col}_{y_col}'))) / safe_denominator
    ).with_columns(
        b0=pl.col(f'mean_{y_col}') - (pl.col('b1') * pl.col(f'mean_{x1_col}')) - (pl.col('b2') * pl.col(f'mean_{x2_col}'))
    )

    # 3. 将系数连接回数据集
    df_regged = df_clean.join(stats_df.select(group_cols + ['b0', 'b1', 'b2']), on=group_cols, how='inner')

    # 4. 计算预测值和残差
    df_regged = df_regged.with_columns(
        predicted_y = pl.col('b0') + (pl.col('b1') * pl.col(x1_col)) + (pl.col('b2') * pl.col(x2_col))
    ).with_columns(
        residual = pl.col(y_col) - pl.col('predicted_y')
    )

    # 5. 计算标准化残差 (residual_adj)
    df_final = df_regged.with_columns(
        residual_adj=(
            (pl.col('residual') - pl.col('residual').mean().over(group_cols)) /
            pl.col('residual').std(ddof=0).over(group_cols)
        ).fill_nan(0.0)
    )
    print("residual_adj 计算完成。")

    return df_final


# =================================================================== #
#                         【3. 主函数】                               #
# =================================================================== #

def main():
    """主执行函数"""
    start_time = time.time()

    # 1. 加载已包含 PBadj 的因子文件
    print(f"--- 步骤 0: 加载因子文件 '{FACTOR_FILE}' ---")
    try:
        # =================================================================== #
        #                【代码修正 - 统一日期类型】                #
        # =================================================================== #
        # 在加载时，立即将 '调入日期' 从字符串转换为日期类型，以避免后续的类型不匹配错误。
        df_original_factors = pl.read_csv(FACTOR_FILE).with_columns(
            pl.col('调入日期').str.to_date(format='%Y-%m-%d'), # 直接转换为日期
            pl.col('stkcd').cast(pl.Utf8).str.zfill(6)
        )
    except Exception as e:
        print(f"错误: 无法加载原始因子文件: {e}")
        return

    # 准备用于计算的数据框，添加 trdmnt 列
    df_for_calculation = df_original_factors.with_columns(
        pl.col('调入日期').dt.offset_by('-1mo').alias('trdmnt')
    )

    # 2. 计算 residual_adj
    df_new_factors = calculate_residual_adj(df_for_calculation)

    # 3. 准备用于合并的新因子数据
    df_to_merge = df_new_factors.select(
        pl.col('stkcd'),
        pl.col('调入日期'), # 此列现在是日期类型
        'residual_adj'
    )

    # 4. 执行左连接合并
    print("\n--- 步骤 3: 合并新计算的 residual_adj 因子 ---")

    # 如果 residual_adj 列已存在，先删除
    if 'residual_adj' in df_original_factors.columns:
        df_original_factors = df_original_factors.drop('residual_adj')

    # 此时，两边的 '调入日期' 列都是日期类型，可以成功合并
    df_merged = df_original_factors.join(
        df_to_merge,
        on=['stkcd', '调入日期'],
        how='left'
    )

    # 5. 保存最终结果
    print(f"\n--- 步骤 4: 保存合并后的文件至 '{FACTOR_FILE}' ---")
    try:
        df_merged.write_csv(FACTOR_FILE)
        print("文件保存成功！")
    except Exception as e:
        print(f"错误: 文件保存失败: {e}")

    end_time = time.time()
    print(f"\n--- 所有任务完成！总耗时: {end_time - start_time:.2f} 秒 ---")


if __name__ == "__main__":
    main()


--- 步骤 0: 加载因子文件 'E:\PBROE\ch7pl\pbroe7.1_residuals_and_quantiles_pure_polars.csv' ---

--- 步骤 1: 数据预处理与特征工程 ---
数据预处理完成。

--- 步骤 2: 执行全向量化回归 ---
residual_adj 计算完成。

--- 步骤 3: 合并新计算的 residual_adj 因子 ---

--- 步骤 4: 保存合并后的文件至 'E:\PBROE\ch7pl\pbroe7.1_residuals_and_quantiles_pure_polars.csv' ---
文件保存成功！

--- 所有任务完成！总耗时: 0.88 秒 ---


In [34]:
# 新ROEttm计算
import polars as pl
import os

# --- 配置 ---
# 定义输入和输出文件的路径
file_path = r'E:\PBROE\ch7pl\pbroe7.1_residuals_and_quantiles_pure_polars.csv'
# 定义要计算的窗口期 (以年为单位)
windows = {'roe_vol_2y': 2} #'roe_vol_3y': 3, 'roe_vol_5y': 5

# --- 数据处理 ---
try:
    # 检查文件是否存在
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"错误：文件不存在于路径 {file_path}")

    print("--- 开始基于年度数据计算ROE波动率 (Polars实现) ---")

    # 1. 将CSV文件加载到Polars DataFrame中
    df = pl.read_csv(file_path, schema_overrides={'stkcd': pl.Utf8})

    print("\n--- 步骤 1: 数据预处理 ---")
    # 创建临时的时间列，并筛选非空roe_ttm
    df_processed = df.with_columns(
        pl.col("调入日期").str.to_date(format="%Y-%m-%d").alias("date")
    ).with_columns(
        pl.col("date").dt.year().alias("year"),
        pl.col("date").dt.month().alias("month")
    ).drop_nulls(subset=["roe_ttm"])
    print("数据预处理完成。")

    print("\n--- 步骤 2: 提取年度 ROE 数据 (每年5月) ---")
    # 筛选出每年5月份的数据，并按股票和年份分组取最后一条，确保唯一性
    annual_roe = df_processed.filter(
        pl.col("month") == 5
    ).group_by(["stkcd", "year"]).agg(
        pl.all().last()
    ).sort("stkcd", "year")
    print(f"已提取 {len(annual_roe)} 条唯一的年度ROE记录用于计算。")

    print("\n--- 步骤 3: 计算年度滚动波动率 ---")
    # 对每个stkcd分组，在年度数据上进行滚动计算
    vol_expressions = []
    for col_name, window_size in windows.items():
        # min_samples=window_size 确保窗口内有足够的数据点
        vol_expressions.append(
            pl.col("roe_ttm")
            .rolling_std(window_size=window_size, min_samples=window_size)
            .over("stkcd")
            .alias(col_name)
        )
        print(f"已配置 {window_size} 年滚动波动率的计算。")

    # 在年度数据上计算出波动率
    annual_vol = annual_roe.with_columns(vol_expressions)
    print("年度波动率计算完成。")

    print("\n--- 步骤 4: 将年度波动率合并回原始月度数据 ---")
    # 选择需要合并的列 (键 + 新计算的波动率)
    columns_to_merge = ["stkcd", "year"] + list(windows.keys())
    vol_to_merge_df = annual_vol.select(columns_to_merge)

    # 从原始df中删除可能已存在的旧波动率列，以避免合并冲突
    # 注意：这里我们使用原始的df，而不是df_processed
    existing_vol_cols = [col for col in windows.keys() if col in df.columns]
    if existing_vol_cols:
        df = df.drop(existing_vol_cols)
        print(f"已从原始数据中移除旧的波动率列: {existing_vol_cols}")

    # 将年度波动率数据合并到原始的月度DataFrame中
    # 需要先给原始df也加上'year'列用于连接
    final_df = df.with_columns(
        pl.col("调入日期").str.to_date(format="%Y-%m-%d").dt.year().alias("year")
    ).join(
        vol_to_merge_df, on=["stkcd", "year"], how="left"
    ).drop("year") # 合并后移除临时的'year'列
    print("合并完成。")

    print(f"\n--- 步骤 5: 保存结果到文件 ---")
    # 将更新后的DataFrame写回到原始CSV文件中，覆盖原有内容
    final_df.write_csv(file_path)
    print(f"结果已成功保存到: {file_path}")

    print("\n最终数据预览 (包含波动率的非空行):")
    # 显示包含计算结果的尾部数据作为示例
    preview_df = final_df.drop_nulls(subset=list(windows.keys()))
    print(preview_df.select(['stkcd', '调入日期', 'roe_ttm'] + list(windows.keys())).tail())

except FileNotFoundError as e:
    print(e)
except Exception as e:
    print(f"处理过程中发生未知错误: {e}")


--- 开始基于年度数据计算ROE波动率 (Polars实现) ---

--- 步骤 1: 数据预处理 ---
数据预处理完成。

--- 步骤 2: 提取年度 ROE 数据 (每年5月) ---
已提取 40704 条唯一的年度ROE记录用于计算。

--- 步骤 3: 计算年度滚动波动率 ---
已配置 2 年滚动波动率的计算。
年度波动率计算完成。

--- 步骤 4: 将年度波动率合并回原始月度数据 ---
合并完成。

--- 步骤 5: 保存结果到文件 ---
结果已成功保存到: E:\PBROE\ch7pl\pbroe7.1_residuals_and_quantiles_pure_polars.csv

最终数据预览 (包含波动率的非空行):
shape: (5, 4)
┌────────┬────────────┬──────────┬────────────┐
│ stkcd  ┆ 调入日期   ┆ roe_ttm  ┆ roe_vol_2y │
│ ---    ┆ ---        ┆ ---      ┆ ---        │
│ str    ┆ str        ┆ f64      ┆ f64        │
╞════════╪════════════╪══════════╪════════════╡
│ 689009 ┆ 2024-08-01 ┆ 0.13665  ┆ 0.004849   │
│ 689009 ┆ 2024-09-01 ┆ 0.13665  ┆ 0.004849   │
│ 689009 ┆ 2024-10-01 ┆ 0.176322 ┆ 0.004849   │
│ 689009 ┆ 2024-11-01 ┆ 0.176322 ┆ 0.004849   │
│ 689009 ┆ 2024-12-01 ┆ 0.176322 ┆ 0.004849   │
└────────┴────────────┴──────────┴────────────┘


In [31]:
# ROEttm波动率计算
import polars as pl
import os

# --- 配置 ---
# 定义输入和输出文件的路径
file_path = r'E:\PBROE\ch7pl\pbroe7.1_residuals_and_quantiles_pure_polars.csv'
# 定义滚动波动率计算的窗口大小 (2年 = 24个月)
window_size = 24
# 定义计算波动率所需的最少数据点数量 (这里设置为12个月)
min_samples = 12 # 注意: 'min_periods' 已更新为 'min_samples'

# --- 数据处理 ---
try:
    # 检查文件是否存在
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"错误：文件不存在于路径 {file_path}")

    # 1. 将CSV文件加载到Polars DataFrame中
    # 将'stkcd'作为字符串读取，以保留可能存在的前导零
    # 注意: 'dtypes' 已更新为 'schema_overrides'
    df = pl.read_csv(file_path, schema_overrides={'stkcd': pl.Utf8})

    # 2. 数据准备
    # 将日期列从字符串转换为Polars的Date类型
    # 按股票代码和日期对DataFrame进行排序，以确保窗口计算的顺序正确
    df = df.with_columns(
        pl.col("调入日期").str.to_date(format="%Y-%m-%d", strict=True)
    ).sort("stkcd", "调入日期")

    # 3. 计算两年滚动波动率
    # 对每个股票('stkcd')，计算'roe_ttm'的滚动标准差
    # '.over("stkcd")' 表达式是一个窗口函数，它按股票对数据进行分区
    df_with_volatility = df.with_columns(
        pl.col("roe_ttm")
        .rolling_std(window_size=window_size, min_samples=min_samples) # 注意: 'min_periods' 已更新为 'min_samples'
        .over("stkcd")
        .alias("roe_vol_2y") # 为新列命名
    )

    # 4. 保存结果
    # 将更新后的DataFrame写回到原始CSV文件中，覆盖原有内容
    df_with_volatility.write_csv(file_path)

    print(f"成功计算了两年期ROE波动率，并已更新文件：{file_path}")
    print("\n更新后数据的预览：")
    # 打印包含新列的DataFrame的前几行以供查验
    print(df_with_volatility.head())

except FileNotFoundError as e:
    print(e)
except Exception as e:
    print(f"处理过程中发生未知错误: {e}")



成功计算了两年期ROE波动率，并已更新文件：E:\PBROE\ch7pl\pbroe7.1_residuals_and_quantiles_pure_polars.csv

更新后数据的预览：
shape: (5, 12)
┌────────────┬────────┬───────────┬────────────┬───┬────────────┬──────────┬───────────┬───────────┐
│ 调入日期   ┆ stkcd  ┆ shortname ┆ indnme1    ┆ … ┆ residual_q ┆ PBadj    ┆ residual_ ┆ roe_volat │
│ ---        ┆ ---    ┆ ---       ┆ ---        ┆   ┆ uantile_10 ┆ ---      ┆ adj       ┆ ility_2y  │
│ date       ┆ str    ┆ str       ┆ str        ┆   ┆ m          ┆ f64      ┆ ---       ┆ ---       │
│            ┆        ┆           ┆            ┆   ┆ ---        ┆          ┆ f64       ┆ f64       │
│            ┆        ┆           ┆            ┆   ┆ f64        ┆          ┆           ┆           │
╞════════════╪════════╪═══════════╪════════════╪═══╪════════════╪══════════╪═══════════╪═══════════╡
│ 2012-10-01 ┆ 000001 ┆ 深发展A   ┆ 货币金融服 ┆ … ┆ 1.0        ┆ 0.880136 ┆ -0.029223 ┆ null      │
│            ┆        ┆           ┆ 务         ┆   ┆            ┆          ┆           ┆     

In [15]:
# 残差
# pbroe7_backtest_engine_polars.py
# 一个为PB-ROE系列策略设计的、支持分组回测的通用引擎
# 版本：Polars全向量化版 (高性能、高稳定性)

import polars as pl
import numpy as np
from pathlib import Path
import time

# =================================================================== #
#                       【1. 核心回测模块 (Polars)】                    #
# =================================================================== #

def run_grouped_backtest_polars(config):
    """
    (核心函数) 使用 Polars 全向量化方法，对所有分组一次性完成回测。
    """
    print("--- 步骤 1: 加载数据 (Polars) ---")
    try:
        # 加载策略分组数据
        strategy_df = pl.read_csv(config['RESIDUAL_FILE']).with_columns(
            pl.col('调入日期').str.to_date(format='%Y-%m-%d'),
            pl.col('stkcd').cast(pl.Utf8).str.zfill(6)
        )

        # 加载收益率数据
        returns_df = pl.read_csv(config['RETURNS_FILE']).select(
            pl.col('Stkcd').cast(pl.Utf8).str.zfill(6).alias('stkcd'),
            pl.col('Trdmnt').str.to_date(format='%Y-%m').alias('date'),
            pl.col('Mretwd').cast(pl.Float64, strict=False).alias('stock_return')
        )

        # 加载基准数据
        all_benchmarks_df = pl.read_csv(config['BENCHMARK_FILE'])
        benchmark_df = all_benchmarks_df.filter(
            pl.col('Indexcd').cast(pl.Utf8).str.zfill(6) == config['BENCHMARK_CODE']
        ).select(
            pl.col('Month').str.to_date(format='%Y-%m').alias('date'),
            pl.col('Idxrtn').alias('benchmark_return')
        )
        print("所有数据加载成功。")

    except Exception as e:
        print(f"错误: 加载数据时出错: {e}。程序终止。")
        return None, None

    print("\n--- 步骤 2: 构建投资组合并执行回测 (Polars) ---")

    # 1. 创建分组
    num_groups = config['NUM_GROUPS']
    strategy_df = strategy_df.with_columns(
        # 使用 qcut 创建分组，类似 pandas.qcut
        pl.col('residual_zscore').qcut(num_groups, labels=[f"G{i}" for i in range(1, num_groups + 1)])
          .alias('residual_group')
    ).drop_nulls('residual_group').rename({'调入日期': 'date'})

    # 2. 筛选回测周期
    start_date = pl.lit(config['BACKTEST_START_DATE']).str.to_date()
    end_date = pl.lit(config['BACKTEST_END_DATE']).str.to_date()
    strategy_df = strategy_df.filter(pl.col('date').is_between(start_date, end_date))

    # 3. 【核心向量化步骤】将策略数据与收益数据合并
    merged_df = strategy_df.join(returns_df, on=['date', 'stkcd'], how='inner')

    # 4. 【核心向量化步骤】一次性计算所有组在所有月份的平均收益
    monthly_returns = merged_df.group_by(['date', 'residual_group']).agg(
        pl.col('stock_return').mean()
    )

    # 5. 【核心向量化步骤】将结果从长格式转换为宽格式 (pivot)
    #    【代码修正】根据Polars API更新，将 'columns' 参数重命名为 'on'
    portfolio_returns_df = monthly_returns.pivot(
        index='date',
        on='residual_group',
        values='stock_return'
    ).sort('date')

    # 重命名列以匹配后续格式
    new_cols = {'date': 'date'}
    for i in range(1, num_groups + 1):
        if f'G{i}' in portfolio_returns_df.columns:
            new_cols[f'G{i}'] = f'portfolio_return_g{i}'
    portfolio_returns_df = portfolio_returns_df.rename(new_cols)

    # 填充缺失月份，确保时间序列连续
    full_date_range = pl.date_range(
        portfolio_returns_df.get_column('date').min(),
        portfolio_returns_df.get_column('date').max(),
        interval="1mo",
        eager=True
    ).alias("date")

    portfolio_returns_df = pl.DataFrame(full_date_range).join(
        portfolio_returns_df, on='date', how='left'
    ).fill_null(0.0)

    print(f"向量化回测完成，已生成 {len(portfolio_returns_df)} 条月度收益记录。\n")

    return portfolio_returns_df, benchmark_df


# =================================================================== #
#                   【2. 绩效计算与保存 (Polars)】                    #
# =================================================================== #

def calculate_performance_and_save_polars(portfolio_returns_df, benchmark_df, config):
    """为所有分组计算绩效并保存结果。"""
    print("--- 步骤 3: 计算并保存所有分组的绩效 ---")

    output_dir = config['OUTPUT_DIR']
    output_dir.mkdir(parents=True, exist_ok=True)
    returns_output_file = output_dir / f"{config['STRATEGY_NAME']}_returns.csv"
    performance_output_file = output_dir / f"{config['STRATEGY_NAME']}_performance.csv"

    num_groups = config['NUM_GROUPS']
    risk_free_rate = config['RISK_FREE_RATE']

    # 合并基准收益
    final_returns_df = portfolio_returns_df.join(benchmark_df, on='date', how='left').fill_null(0)

    # 计算所有分组的累计收益
    cumulative_exprs = []
    for i in range(1, num_groups + 1):
        return_col = f'portfolio_return_g{i}'
        if return_col in final_returns_df.columns:
            cumulative_exprs.append(
                (1 + pl.col(return_col)).cum_prod().alias(f'cumulative_return_g{i}')
            )
    final_returns_df = final_returns_df.with_columns(cumulative_exprs)

    # 计算各项绩效指标
    all_metrics = []
    total_months = len(final_returns_df)

    # 计算基准的年化收益率，用于后续计算超额收益
    annualized_benchmark_return = ((1 + final_returns_df['benchmark_return']).product() ** (12 / total_months) - 1)

    for group_id in range(1, num_groups + 1):
        return_col = f'portfolio_return_g{group_id}'
        cum_return_col = f'cumulative_return_g{group_id}'
        if return_col not in final_returns_df.columns: continue

        # 使用 Polars 表达式一次性计算所有指标
        group_perf = final_returns_df.select(
            pl.col(return_col).alias('return'),
            pl.col(cum_return_col).alias('cum_return'),
            (pl.col(return_col) - pl.col('benchmark_return')).alias('excess_return')
        ).select(
            annualized_return = ((pl.col('cum_return').last()) ** (12 / total_months) - 1),
            annualized_volatility = (pl.col('return').std() * np.sqrt(12)),
            max_drawdown = (((pl.col('cum_return') / pl.col('cum_return').cum_max()) - 1).min()),
            tracking_error = (pl.col('excess_return').std() * np.sqrt(12))
        ).row(0, named=True)

        annualized_return = group_perf['annualized_return']
        annualized_volatility = group_perf['annualized_volatility']
        sharpe_ratio = (annualized_return - risk_free_rate) / annualized_volatility if annualized_volatility != 0 else 0
        annualized_excess_return = annualized_return - annualized_benchmark_return
        tracking_error = group_perf['tracking_error']
        information_ratio = annualized_excess_return / tracking_error if tracking_error != 0 else 0

        metrics = {
            'group': f"Group {group_id}",
            '年化收益率': annualized_return,
            '年化波动率': annualized_volatility,
            '夏普比率': sharpe_ratio,
            '最大回撤': group_perf['max_drawdown'],
            '累计收益率': final_returns_df[cum_return_col].last() - 1,
            '年化超额收益率': annualized_excess_return,
            '信息比率': information_ratio,
            '跟踪误差': tracking_error
        }
        all_metrics.append(metrics)

    performance_df = pl.DataFrame(all_metrics)

    # 【代码修正】为基准计算完整的绩效指标，并创建一个形状匹配的DataFrame
    benchmark_perf = final_returns_df.select(
        pl.col('benchmark_return').alias('return')
    ).with_columns(
        (1 + pl.col('return')).cum_prod().alias('cum_return')
    ).select(
        annualized_return = (((pl.col('cum_return').last()) ** (12 / total_months)) - 1),
        annualized_volatility = (pl.col('return').std() * np.sqrt(12)),
        max_drawdown = (((pl.col('cum_return') / pl.col('cum_return').cum_max()) - 1).min()),
        total_cumulative_return = pl.col('cum_return').last()
    ).row(0, named=True)

    benchmark_row_data = {
        'group': '基准 (沪深300)',
        '年化收益率': benchmark_perf['annualized_return'],
        '年化波动率': benchmark_perf['annualized_volatility'],
        '夏普比率': (benchmark_perf['annualized_return'] - risk_free_rate) / benchmark_perf['annualized_volatility'] if benchmark_perf['annualized_volatility'] != 0 else 0,
        '最大回撤': benchmark_perf['max_drawdown'],
        '累计收益率': benchmark_perf['total_cumulative_return'] - 1,
        '年化超额收益率': None,
        '信息比率': None,
        '跟踪误差': None
    }
    benchmark_row = pl.DataFrame([benchmark_row_data])

    # 使用 concat 进行安全的合并
    performance_df = pl.concat([performance_df, benchmark_row], how='vertical')

    final_returns_df.write_csv(returns_output_file, float_precision=6)
    print(f"\n所有分组的月度收益率详情已保存至: {returns_output_file}")
    performance_df.write_csv(performance_output_file, float_precision=6)
    print(f"所有分组的绩效指标已保存至: {performance_output_file}")
    print(f"\n--- {config['STRATEGY_NAME']} 各分组绩效简报 ---")
    print(performance_df)


# =================================================================== #
#                          【3. 主函数执行】                          #
# =================================================================== #

def main(config):
    """主执行函数"""
    start_time = time.time()

    # 步骤 1 & 2: 执行全向量化回测
    portfolio_returns_df, benchmark_df = run_grouped_backtest_polars(config)

    if portfolio_returns_df is None:
        print("回测失败，程序终止。")
        return

    # 步骤 3: 计算并保存绩效
    calculate_performance_and_save_polars(portfolio_returns_df, benchmark_df, config)

    end_time = time.time()
    print(f"\n--- 所有任务完成！总耗时: {end_time - start_time:.2f} 秒 ---")

# =================================================================== #
#                       【4. 脚本执行入口】                              #
# =================================================================== #

if __name__ == "__main__":

    # --- 定义 pbroe7.1 分组回测的配置 ---
    CONFIG_PBROE7_GROUPED = {
        # --- 策略与输出配置 ---
        "STRATEGY_NAME": "pbroe7.1_grouped_backtest_polars",
        "OUTPUT_DIR": Path("E:/PBROE/ch7pl/backtest_results_polars"),

        # --- 输入文件路径 ---
        "RESIDUAL_FILE": Path("E:/PBROE/ch7pl/pbroe7.1_residuals_and_quantiles_pure_polars.csv"),
        "RETURNS_FILE": Path("E:/PBROE/data/TRDNEW_Mnth.csv"),
        "BENCHMARK_FILE": Path("E:/PBROE/data/benchmark_indices.csv"),

        # --- 策略核心参数 ---
        "NUM_GROUPS": 10,

        # --- 通用回测参数 ---
        "BACKTEST_START_DATE": '2010-05-01',
        "BACKTEST_END_DATE": '2025-04-30',
        "BENCHMARK_CODE": '000300',
        "RISK_FREE_RATE": 0.03
    }

    # --- 执行回测 ---
    # 调用主函数，并传入配置字典
    main(CONFIG_PBROE7_GROUPED)


--- 步骤 1: 加载数据 (Polars) ---
所有数据加载成功。

--- 步骤 2: 构建投资组合并执行回测 (Polars) ---
向量化回测完成，已生成 180 条月度收益记录。

--- 步骤 3: 计算并保存所有分组的绩效 ---

所有分组的月度收益率详情已保存至: E:\PBROE\ch7pl\backtest_results_polars\pbroe7.1_grouped_backtest_polars_returns.csv
所有分组的绩效指标已保存至: E:\PBROE\ch7pl\backtest_results_polars\pbroe7.1_grouped_backtest_polars_performance.csv

--- pbroe7.1_grouped_backtest_polars 各分组绩效简报 ---
shape: (11, 9)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ group     ┆ 年化收益  ┆ 年化波动  ┆ 夏普比率  ┆ … ┆ 累计收益  ┆ 年化超额  ┆ 信息比率  ┆ 跟踪误差 │
│ ---       ┆ 率        ┆ 率        ┆ ---       ┆   ┆ 率        ┆ 收益率    ┆ ---       ┆ ---      │
│ str       ┆ ---       ┆ ---       ┆ f64       ┆   ┆ ---       ┆ ---       ┆ f64       ┆ f64      │
│           ┆ f64       ┆ f64       ┆           ┆   ┆ f64       ┆ f64       ┆           ┆          │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ Group 1   ┆ 0.150373  ┆ 0.25

In [16]:
# 分位数残差
# pbroe7_backtest_engine_polars.py
# 一个为PB-ROE系列策略设计的、支持分组回测的通用引擎
# 版本：Polars全向量化版 (高性能、高稳定性)

import polars as pl
import numpy as np
from pathlib import Path
import time

# =================================================================== #
#                       【1. 核心回测模块 (Polars)】                    #
# =================================================================== #

def run_grouped_backtest_polars(config):
    """
    (核心函数) 使用 Polars 全向量化方法，对所有分组一次性完成回测。
    """
    print("--- 步骤 1: 加载数据 (Polars) ---")
    try:
        # 加载策略分组数据
        strategy_df = pl.read_csv(config['RESIDUAL_FILE']).with_columns(
            pl.col('调入日期').str.to_date(format='%Y-%m-%d'),
            pl.col('stkcd').cast(pl.Utf8).str.zfill(6)
        )

        # 加载收益率数据
        returns_df = pl.read_csv(config['RETURNS_FILE']).select(
            pl.col('Stkcd').cast(pl.Utf8).str.zfill(6).alias('stkcd'),
            pl.col('Trdmnt').str.to_date(format='%Y-%m').alias('date'),
            pl.col('Mretwd').cast(pl.Float64, strict=False).alias('stock_return')
        )

        # 加载基准数据
        all_benchmarks_df = pl.read_csv(config['BENCHMARK_FILE'])
        benchmark_df = all_benchmarks_df.filter(
            pl.col('Indexcd').cast(pl.Utf8).str.zfill(6) == config['BENCHMARK_CODE']
        ).select(
            pl.col('Month').str.to_date(format='%Y-%m').alias('date'),
            pl.col('Idxrtn').alias('benchmark_return')
        )
        print("所有数据加载成功。")

    except Exception as e:
        print(f"错误: 加载数据时出错: {e}。程序终止。")
        return None, None

    print("\n--- 步骤 2: 构建投资组合并执行回测 (Polars) ---")

    # 1. 创建分组
    num_groups = config['NUM_GROUPS']
    grouping_col = config['GROUPING_COLUMN'] # 从配置中读取分组列

    # =================================================================== #
    #         【代码修正 - 使用 when/then 手动实现 cut】         #
    # =================================================================== #
    # 由于 cut 函数在不同 Polars 版本中 API 不稳定，我们使用更稳健的
    # when/then 表达式链来手动实现精确的区间分组。

    # 动态构建 when/then 表达式链
    # (0, 0.1] -> G1, (0.1, 0.2] -> G2, ...
    # when/then 链会按顺序执行，第一个满足条件的分支会被采纳。
    grouping_expr = pl.when(pl.col(grouping_col) <= (1 / num_groups)).then(pl.lit("G1"))
    for i in range(2, num_groups + 1):
        upper_bound = i / num_groups
        grouping_expr = grouping_expr.when(pl.col(grouping_col) <= upper_bound).then(pl.lit(f"G{i}"))

    strategy_df = strategy_df.with_columns(
        grouping_expr.otherwise(pl.lit(None)).alias('residual_group')
    ).drop_nulls('residual_group').rename({'调入日期': 'date'})


    # 2. 筛选回测周期
    start_date = pl.lit(config['BACKTEST_START_DATE']).str.to_date()
    end_date = pl.lit(config['BACKTEST_END_DATE']).str.to_date()
    strategy_df = strategy_df.filter(pl.col('date').is_between(start_date, end_date))

    # 3. 【核心向量化步骤】将策略数据与收益数据合并
    merged_df = strategy_df.join(returns_df, on=['date', 'stkcd'], how='inner')

    # 4. 【核心向量化步骤】一次性计算所有组在所有月份的平均收益
    monthly_returns = merged_df.group_by(['date', 'residual_group']).agg(
        pl.col('stock_return').mean()
    )

    # 5. 【核心向量化步骤】将结果从长格式转换为宽格式 (pivot)
    portfolio_returns_df = monthly_returns.pivot(
        index='date',
        on='residual_group',
        values='stock_return'
    ).sort('date')

    # 重命名列以匹配后续格式
    new_cols = {'date': 'date'}
    for i in range(1, num_groups + 1):
        if f"G{i}" in portfolio_returns_df.columns:
            new_cols[f"G{i}"] = f'portfolio_return_g{i}'
    portfolio_returns_df = portfolio_returns_df.rename(new_cols)

    # 填充缺失月份，确保时间序列连续
    if portfolio_returns_df.is_empty():
        print("警告：在指定的回测周期内没有生成任何投资组合收益。")
        return None, None

    full_date_range = pl.date_range(
        portfolio_returns_df.get_column('date').min(),
        portfolio_returns_df.get_column('date').max(),
        interval="1mo",
        eager=True
    ).alias("date")

    portfolio_returns_df = pl.DataFrame(full_date_range).join(
        portfolio_returns_df, on='date', how='left'
    ).fill_null(0.0)

    print(f"向量化回测完成，已生成 {len(portfolio_returns_df)} 条月度收益记录。\n")

    return portfolio_returns_df, benchmark_df


# =================================================================== #
#                   【2. 绩效计算与保存 (Polars)】                    #
# =================================================================== #

def calculate_performance_and_save_polars(portfolio_returns_df, benchmark_df, config):
    """为所有分组计算绩效并保存结果。"""
    print("--- 步骤 3: 计算并保存所有分组的绩效 ---")

    output_dir = config['OUTPUT_DIR']
    output_dir.mkdir(parents=True, exist_ok=True)
    returns_output_file = output_dir / f"{config['STRATEGY_NAME']}_returns.csv"
    performance_output_file = output_dir / f"{config['STRATEGY_NAME']}_performance.csv"

    num_groups = config['NUM_GROUPS']
    risk_free_rate = config['RISK_FREE_RATE']

    # 合并基准收益
    final_returns_df = portfolio_returns_df.join(benchmark_df, on='date', how='left').fill_null(0)

    # 计算所有分组的累计收益
    cumulative_exprs = []
    for i in range(1, num_groups + 1):
        return_col = f'portfolio_return_g{i}'
        if return_col in final_returns_df.columns:
            cumulative_exprs.append(
                (1 + pl.col(return_col)).cum_prod().alias(f'cumulative_return_g{i}')
            )
    final_returns_df = final_returns_df.with_columns(cumulative_exprs)

    # 计算各项绩效指标
    all_metrics = []
    total_months = len(final_returns_df)

    if total_months == 0:
        print("错误：无法计算绩效，因为没有有效的月度收益数据。")
        return

    # 计算基准的年化收益率，用于后续计算超额收益
    annualized_benchmark_return = ((1 + final_returns_df['benchmark_return']).product() ** (12 / total_months) - 1)

    for group_id in range(1, num_groups + 1):
        return_col = f'portfolio_return_g{group_id}'
        cum_return_col = f'cumulative_return_g{group_id}'
        if return_col not in final_returns_df.columns: continue

        # 使用 Polars 表达式一次性计算所有指标
        group_perf = final_returns_df.select(
            pl.col(return_col).alias('return'),
            pl.col(cum_return_col).alias('cum_return'),
            (pl.col(return_col) - pl.col('benchmark_return')).alias('excess_return')
        ).select(
            annualized_return = ((pl.col('cum_return').last()) ** (12 / total_months) - 1),
            annualized_volatility = (pl.col('return').std() * np.sqrt(12)),
            max_drawdown = (((pl.col('cum_return') / pl.col('cum_return').cum_max()) - 1).min()),
            tracking_error = (pl.col('excess_return').std() * np.sqrt(12))
        ).row(0, named=True)

        annualized_return = group_perf['annualized_return']
        annualized_volatility = group_perf['annualized_volatility']
        sharpe_ratio = (annualized_return - risk_free_rate) / annualized_volatility if annualized_volatility != 0 else 0
        annualized_excess_return = annualized_return - annualized_benchmark_return
        tracking_error = group_perf['tracking_error']
        information_ratio = annualized_excess_return / tracking_error if tracking_error != 0 else 0

        metrics = {
            'group': f"Group {group_id}",
            '年化收益率': annualized_return,
            '年化波动率': annualized_volatility,
            '夏普比率': sharpe_ratio,
            '最大回撤': group_perf['max_drawdown'],
            '累计收益率': final_returns_df[cum_return_col].last() - 1,
            '年化超额收益率': annualized_excess_return,
            '信息比率': information_ratio,
            '跟踪误差': tracking_error
        }
        all_metrics.append(metrics)

    performance_df = pl.DataFrame(all_metrics)

    # 为基准计算完整的绩效指标
    benchmark_perf = final_returns_df.select(
        pl.col('benchmark_return').alias('return')
    ).with_columns(
        (1 + pl.col('return')).cum_prod().alias('cum_return')
    ).select(
        annualized_return = (((pl.col('cum_return').last()) ** (12 / total_months)) - 1),
        annualized_volatility = (pl.col('return').std() * np.sqrt(12)),
        max_drawdown = (((pl.col('cum_return') / pl.col('cum_return').cum_max()) - 1).min()),
        total_cumulative_return = pl.col('cum_return').last()
    ).row(0, named=True)

    benchmark_row_data = {
        'group': '基准 (沪深300)',
        '年化收益率': benchmark_perf['annualized_return'],
        '年化波动率': benchmark_perf['annualized_volatility'],
        '夏普比率': (benchmark_perf['annualized_return'] - risk_free_rate) / benchmark_perf['annualized_volatility'] if benchmark_perf['annualized_volatility'] != 0 else 0,
        '最大回撤': benchmark_perf['max_drawdown'],
        '累计收益率': benchmark_perf['total_cumulative_return'] - 1,
        '年化超额收益率': None,
        '信息比率': None,
        '跟踪误差': None
    }
    benchmark_row = pl.DataFrame([benchmark_row_data])

    # 使用 concat 进行安全的合并
    performance_df = pl.concat([performance_df, benchmark_row], how='vertical')

    final_returns_df.write_csv(returns_output_file, float_precision=6)
    print(f"\n所有分组的月度收益率详情已保存至: {returns_output_file}")
    performance_df.write_csv(performance_output_file, float_precision=6)
    print(f"所有分组的绩效指标已保存至: {performance_output_file}")
    print(f"\n--- {config['STRATEGY_NAME']} 各分组绩效简报 ---")
    print(performance_df)


# =================================================================== #
#                          【3. 主函数执行】                          #
# =================================================================== #

def main(config):
    """主执行函数"""
    start_time = time.time()

    # 步骤 1 & 2: 执行全向量化回测
    portfolio_returns_df, benchmark_df = run_grouped_backtest_polars(config)

    if portfolio_returns_df is None:
        print("回测失败，程序终止。")
        return

    # 步骤 3: 计算并保存绩效
    calculate_performance_and_save_polars(portfolio_returns_df, benchmark_df, config)

    end_time = time.time()
    print(f"\n--- 所有任务完成！总耗时: {end_time - start_time:.2f} 秒 ---")

# =================================================================== #
#                       【4. 脚本执行入口】                              #
# =================================================================== #

if __name__ == "__main__":

    # --- 定义 pbroe7.2 时序分位数回测的配置 ---
    CONFIG_PBROE7_QUANTILE = {
        # --- 策略与输出配置 ---
        "STRATEGY_NAME": "pbroe7.2_quantile_backtest_polars",
        "OUTPUT_DIR": Path("E:/PBROE/ch7pl/backtest_results_polars"),

        # --- 输入文件路径 ---
        "RESIDUAL_FILE": Path("E:/PBROE/ch7pl/pbroe7.1_residuals_and_quantiles_pure_polars.csv"),
        "RETURNS_FILE": Path("E:/PBROE/data/TRDNEW_Mnth.csv"),
        "BENCHMARK_FILE": Path("E:/PBROE/data/benchmark_indices.csv"),

        # --- 策略核心参数 ---
        "GROUPING_COLUMN": "residual_quantile_10m", # 明确指定分组列
        "NUM_GROUPS": 10,

        # --- 通用回测参数 ---
        "BACKTEST_START_DATE": '2010-05-01',
        "BACKTEST_END_DATE": '2025-04-30',
        "BENCHMARK_CODE": '000300',
        "RISK_FREE_RATE": 0.03
    }

    # --- 执行回测 ---
    # 调用主函数，并传入配置字典
    main(CONFIG_PBROE7_QUANTILE)


--- 步骤 1: 加载数据 (Polars) ---
所有数据加载成功。

--- 步骤 2: 构建投资组合并执行回测 (Polars) ---
向量化回测完成，已生成 180 条月度收益记录。

--- 步骤 3: 计算并保存所有分组的绩效 ---

所有分组的月度收益率详情已保存至: E:\PBROE\ch7pl\backtest_results_polars\pbroe7.2_quantile_backtest_polars_returns.csv
所有分组的绩效指标已保存至: E:\PBROE\ch7pl\backtest_results_polars\pbroe7.2_quantile_backtest_polars_performance.csv

--- pbroe7.2_quantile_backtest_polars 各分组绩效简报 ---
shape: (11, 9)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ group     ┆ 年化收益  ┆ 年化波动  ┆ 夏普比率  ┆ … ┆ 累计收益  ┆ 年化超额  ┆ 信息比率  ┆ 跟踪误差 │
│ ---       ┆ 率        ┆ 率        ┆ ---       ┆   ┆ 率        ┆ 收益率    ┆ ---       ┆ ---      │
│ str       ┆ ---       ┆ ---       ┆ f64       ┆   ┆ ---       ┆ ---       ┆ f64       ┆ f64      │
│           ┆ f64       ┆ f64       ┆           ┆   ┆ f64       ┆ f64       ┆           ┆          │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ Group 1   ┆ 0.182689  ┆ 0

In [17]:
# pbroe7_intersection_backtest_polars.py
# 一个为PB-ROE系列策略设计的、专门用于回测双因子交集的引擎
# 版本：Polars全向量化版 (高性能、高稳定性)

import polars as pl
import numpy as np
from pathlib import Path
import time

# =================================================================== #
#                       【1. 核心回测模块 (Polars)】                    #
# =================================================================== #

def run_intersection_backtest_polars(config):
    """
    (核心函数) 使用 Polars 全向量化方法，回测双因子交集。
    """
    print("--- 步骤 1: 加载数据 (Polars) ---")
    try:
        # 加载策略分组数据
        strategy_df = pl.read_csv(config['RESIDUAL_FILE']).with_columns(
            pl.col('调入日期').str.to_date(format='%Y-%m-%d'),
            pl.col('stkcd').cast(pl.Utf8).str.zfill(6)
        )

        # 加载收益率数据
        returns_df = pl.read_csv(config['RETURNS_FILE']).select(
            pl.col('Stkcd').cast(pl.Utf8).str.zfill(6).alias('stkcd'),
            pl.col('Trdmnt').str.to_date(format='%Y-%m').alias('date'),
            pl.col('Mretwd').cast(pl.Float64, strict=False).alias('stock_return')
        )

        # 加载基准数据
        all_benchmarks_df = pl.read_csv(config['BENCHMARK_FILE'])
        benchmark_df = all_benchmarks_df.filter(
            pl.col('Indexcd').cast(pl.Utf8).str.zfill(6) == config['BENCHMARK_CODE']
        ).select(
            pl.col('Month').str.to_date(format='%Y-%m').alias('date'),
            pl.col('Idxrtn').alias('benchmark_return')
        )
        print("所有数据加载成功。")

    except Exception as e:
        print(f"错误: 加载数据时出错: {e}。程序终止。")
        return None, None

    print("\n--- 步骤 2: 构建交集投资组合 ---")

    # =================================================================== #
    #                【代码修正 - 调整筛选标准】                #
    # =================================================================== #
    # 1. 构建 residual_zscore 的前 20% 组合
    #    使用 qcut(5, ...) 进行五等分分组，选出残差最低的20%
    df_zscore_g1 = strategy_df.with_columns(
        group_z = pl.col('residual_zscore').qcut(5, labels=[f"G{i}" for i in range(1, 6)])
    ).filter(pl.col('group_z') == 'G1').rename({'调入日期': 'date'})
    print(f"已构建 Z-Score 最低 20% 组合，共 {len(df_zscore_g1)} 条记录。")

    # 2. 构建 residual_quantile_10m 的前 10% 组合
    #    选出时序分位数在 (0, 0.1] 区间的股票
    df_quantile_g1 = strategy_df.with_columns(
        group_q = pl.when(pl.col('residual_quantile_10m') <= 0.10).then(pl.lit("G1"))
    ).filter(pl.col('group_q') == 'G1').rename({'调入日期': 'date'})
    print(f"已构建 Quantile 最低 10% 组合，共 {len(df_quantile_g1)} 条记录。")

    # 3. 找出两个组合在每个调仓日的交集
    intersection_portfolio = df_zscore_g1.join(
        df_quantile_g1.select(['date', 'stkcd']), # 只需关键列即可
        on=['date', 'stkcd'],
        how='inner'
    )
    print(f"已构建交集组合，共 {len(intersection_portfolio)} 条记录。")

    # 4. 筛选回测周期
    start_date = pl.lit(config['BACKTEST_START_DATE']).str.to_date()
    end_date = pl.lit(config['BACKTEST_END_DATE']).str.to_date()
    intersection_portfolio = intersection_portfolio.filter(pl.col('date').is_between(start_date, end_date))

    # 5. 【核心向量化步骤】将交集组合与收益数据合并
    merged_df = intersection_portfolio.join(returns_df, on=['date', 'stkcd'], how='inner')

    # 6. 【核心向量化步骤】计算交集组合的等权月度收益
    portfolio_returns_df = merged_df.group_by('date').agg(
        pl.col('stock_return').mean().alias('portfolio_return')
    ).sort('date')

    # 填充缺失月份，确保时间序列连续
    if portfolio_returns_df.is_empty():
        print("警告：在指定的回测周期内没有生成任何投资组合收益。")
        return None, None

    full_date_range = pl.date_range(
        portfolio_returns_df.get_column('date').min(),
        portfolio_returns_df.get_column('date').max(),
        interval="1mo",
        eager=True
    ).alias("date")

    portfolio_returns_df = pl.DataFrame(full_date_range).join(
        portfolio_returns_df, on='date', how='left'
    ).fill_null(0.0)

    print(f"向量化回测完成，已生成 {len(portfolio_returns_df)} 条月度收益记录。\n")

    return portfolio_returns_df, benchmark_df


# =================================================================== #
#                   【2. 绩效计算与保存 (Polars)】                    #
# =================================================================== #

def calculate_performance_and_save_polars(portfolio_returns_df, benchmark_df, config):
    """为交集组合计算绩效并保存结果。"""
    print("--- 步骤 3: 计算并保存交集组合的绩效 ---")

    output_dir = config['OUTPUT_DIR']
    output_dir.mkdir(parents=True, exist_ok=True)
    returns_output_file = output_dir / f"{config['STRATEGY_NAME']}_returns.csv"
    performance_output_file = output_dir / f"{config['STRATEGY_NAME']}_performance.csv"

    risk_free_rate = config['RISK_FREE_RATE']

    # 合并基准收益
    final_returns_df = portfolio_returns_df.join(benchmark_df, on='date', how='left').fill_null(0)

    # 计算组合和基准的累计收益
    final_returns_df = final_returns_df.with_columns(
        (1 + pl.col('portfolio_return')).cum_prod().alias('cumulative_portfolio_return'),
        (1 + pl.col('benchmark_return')).cum_prod().alias('cumulative_benchmark_return')
    )

    # 计算各项绩效指标
    total_months = len(final_returns_df)
    if total_months == 0:
        print("错误：无法计算绩效，因为没有有效的月度收益数据。")
        return

    # 计算组合指标
    portfolio_perf = final_returns_df.select(
        pl.col('portfolio_return').alias('return'),
        pl.col('cumulative_portfolio_return').alias('cum_return'),
        (pl.col('portfolio_return') - pl.col('benchmark_return')).alias('excess_return')
    ).select(
        annualized_return = ((pl.col('cum_return').last()) ** (12 / total_months) - 1),
        annualized_volatility = (pl.col('return').std() * np.sqrt(12)),
        max_drawdown = (((pl.col('cum_return') / pl.col('cum_return').cum_max()) - 1).min()),
        tracking_error = (pl.col('excess_return').std() * np.sqrt(12))
    ).row(0, named=True)

    # 计算基准指标
    benchmark_perf = final_returns_df.select(
        pl.col('benchmark_return').alias('return'),
        pl.col('cumulative_benchmark_return').alias('cum_return')
    ).select(
        annualized_return = ((pl.col('cum_return').last()) ** (12 / total_months) - 1),
        annualized_volatility = (pl.col('return').std() * np.sqrt(12)),
        max_drawdown = (((pl.col('cum_return') / pl.col('cum_return').cum_max()) - 1).min())
    ).row(0, named=True)

    annualized_excess_return = portfolio_perf['annualized_return'] - benchmark_perf['annualized_return']
    tracking_error = portfolio_perf['tracking_error']
    information_ratio = annualized_excess_return / tracking_error if tracking_error != 0 else 0

    # 整理并合并结果
    all_metrics = [
        {
            'group': '交集策略 (0.2+0.1)',
            '年化收益率': portfolio_perf['annualized_return'],
            '年化波动率': portfolio_perf['annualized_volatility'],
            '夏普比率': (portfolio_perf['annualized_return'] - risk_free_rate) / portfolio_perf['annualized_volatility'] if portfolio_perf['annualized_volatility'] != 0 else 0,
            '最大回撤': portfolio_perf['max_drawdown'],
            '累计收益率': final_returns_df['cumulative_portfolio_return'].last() - 1,
            '年化超额收益率': annualized_excess_return,
            '信息比率': information_ratio,
            '跟踪误差': tracking_error
        },
        {
            'group': '基准 (沪深300)',
            '年化收益率': benchmark_perf['annualized_return'],
            '年化波动率': benchmark_perf['annualized_volatility'],
            '夏普比率': (benchmark_perf['annualized_return'] - risk_free_rate) / benchmark_perf['annualized_volatility'] if benchmark_perf['annualized_volatility'] != 0 else 0,
            '最大回撤': benchmark_perf['max_drawdown'],
            '累计收益率': final_returns_df['cumulative_benchmark_return'].last() - 1,
            '年化超额收益率': None, '信息比率': None, '跟踪误差': None
        }
    ]
    performance_df = pl.DataFrame(all_metrics)

    final_returns_df.write_csv(returns_output_file, float_precision=6)
    print(f"\n交集组合的月度收益率详情已保存至: {returns_output_file}")
    performance_df.write_csv(performance_output_file, float_precision=6)
    print(f"交集组合的绩效指标已保存至: {performance_output_file}")
    print(f"\n--- {config['STRATEGY_NAME']} 绩效简报 ---")
    print(performance_df)


# =================================================================== #
#                          【3. 主函数执行】                          #
# =================================================================== #

def main(config):
    """主执行函数"""
    start_time = time.time()
    portfolio_returns_df, benchmark_df = run_intersection_backtest_polars(config)
    if portfolio_returns_df is not None:
        calculate_performance_and_save_polars(portfolio_returns_df, benchmark_df, config)
    end_time = time.time()
    print(f"\n--- 所有任务完成！总耗时: {end_time - start_time:.2f} 秒 ---")

# =================================================================== #
#                       【4. 脚本执行入口】                              #
# =================================================================== #

if __name__ == "__main__":

    # --- 定义 pbroe7.4 (0.2+0.1)交集策略回测的配置 ---
    CONFIG_INTERSECTION_20_10 = {
        # --- 策略与输出配置 ---
        "STRATEGY_NAME": "pbroe7.4_intersection_20_10_backtest",
        "OUTPUT_DIR": Path("E:/PBROE/ch7pl/backtest_results_polars"),

        # --- 输入文件路径 ---
        "RESIDUAL_FILE": Path("E:/PBROE/ch7pl/pbroe7.1_residuals_and_quantiles_pure_polars.csv"),
        "RETURNS_FILE": Path("E:/PBROE/data/TRDNEW_Mnth.csv"),
        "BENCHMARK_FILE": Path("E:/PBROE/data/benchmark_indices.csv"),

        # --- 通用回测参数 ---
        "BACKTEST_START_DATE": '2010-05-01',
        "BACKTEST_END_DATE": '2025-04-30',
        "BENCHMARK_CODE": '000300',
        "RISK_FREE_RATE": 0.03
    }

    # --- 执行回测 ---
    main(CONFIG_INTERSECTION_20_10)


--- 步骤 1: 加载数据 (Polars) ---
所有数据加载成功。

--- 步骤 2: 构建交集投资组合 ---
已构建 Z-Score 最低 20% 组合，共 105479 条记录。
已构建 Quantile 最低 10% 组合，共 83307 条记录。
已构建交集组合，共 26958 条记录。
向量化回测完成，已生成 180 条月度收益记录。

--- 步骤 3: 计算并保存交集组合的绩效 ---

交集组合的月度收益率详情已保存至: E:\PBROE\ch7pl\backtest_results_polars\pbroe7.4_intersection_20_10_backtest_returns.csv
交集组合的绩效指标已保存至: E:\PBROE\ch7pl\backtest_results_polars\pbroe7.4_intersection_20_10_backtest_performance.csv

--- pbroe7.4_intersection_20_10_backtest 绩效简报 ---
shape: (2, 9)
┌────────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬──────────┬──────────┐
│ group      ┆ 年化收益  ┆ 年化波动  ┆ 夏普比率  ┆ … ┆ 累计收益  ┆ 年化超额  ┆ 信息比率 ┆ 跟踪误差 │
│ ---        ┆ 率        ┆ 率        ┆ ---       ┆   ┆ 率        ┆ 收益率    ┆ ---      ┆ ---      │
│ str        ┆ ---       ┆ ---       ┆ f64       ┆   ┆ ---       ┆ ---       ┆ f64      ┆ f64      │
│            ┆ f64       ┆ f64       ┆           ┆   ┆ f64       ┆ f64       ┆          ┆          │
╞════════════╪═══════════╪═══════════╪════

In [56]:
import polars as pl
from pathlib import Path

# =================================================================== #
#                      【1. 配置文件】                                #
# =================================================================== #

# --- 输入文件路径 (请确保与您的回测脚本一致) ---
RESIDUAL_FILE = Path("E:/PBROE/ch7pl/pbroe7.1_residuals_and_quantiles_pure_polars.csv")


# =================================================================== #
#                  【2. 持仓数量统计】                            #
# =================================================================== #

def analyze_holding_counts(residual_file_path: Path):
    """
    分析并打印 (0.2+0.1) 交集策略在每个调仓日的持仓数量。
    """
    print("--- 步骤 1: 加载数据 ---")
    try:
        strategy_df = pl.read_csv(residual_file_path).with_columns(
            pl.col('调入日期').str.to_date(format='%Y-%m-%d'),
            pl.col('stkcd').cast(pl.Utf8).str.zfill(6)
        )
        print("数据加载成功。")
    except Exception as e:
        print(f"错误: 加载数据文件 '{residual_file_path}' 时出错: {e}。")
        return

    print("\n--- 步骤 2: 构建交集投资组合 (与 0.2+0.1 回测脚本逻辑一致) ---")

    # 1. 构建 residual_zscore 的前 20% 组合
    #    使用 qcut(5, ...) 进行五等分分组，选出残差最低的20%
    df_zscore_g1 = strategy_df.with_columns(
        group_z = pl.col('residual_zscore').qcut(5, labels=[f"G{i}" for i in range(1, 6)])
    ).filter(pl.col('group_z') == 'G1').rename({'调入日期': 'date'})

    # 2. 构建 residual_quantile_10m 的前 10% 组合
    #    选出时序分位数在 (0, 0.1] 区间的股票
    df_quantile_g1 = strategy_df.with_columns(
        group_q = pl.when(pl.col('residual_quantile_10m') <= 0.1).then(pl.lit("G1"))
    ).filter(pl.col('group_q') == 'G1').rename({'调入日期': 'date'})

    # 3. 找出两个组合在每个调仓日的交集
    intersection_portfolio = df_zscore_g1.join(
        df_quantile_g1.select(['date', 'stkcd']),
        on=['date', 'stkcd'],
        how='inner'
    )

    print("\n--- 步骤 3: 统计每月持仓数量 ---")
    if intersection_portfolio.is_empty():
        print("交集组合为空，没有任何持仓记录。")
        return

    # 按调仓日期分组，并计算每组的股票数量
    holding_counts = (
        intersection_portfolio
        .group_by('date')
        .len() # .len() 是 .agg(pl.len()) 的简写
        .rename({'len': 'holding_count'})
        .sort('date')
    )

    print("交集策略 (0.2+0.1) 每月持仓数量如下：")
    # 设置 Polars 的打印选项，以显示所有行
    with pl.Config(tbl_rows=-1):
        print(holding_counts)

# =================================================================== #
#                           【4. 执行】                               #
# =================================================================== #

if __name__ == "__main__":
    analyze_holding_counts(RESIDUAL_FILE)



--- 步骤 1: 加载数据 ---
数据加载成功。

--- 步骤 2: 构建交集投资组合 (与 0.2+0.1 回测脚本逻辑一致) ---

--- 步骤 3: 统计每月持仓数量 ---
交集策略 (0.2+0.1) 每月持仓数量如下：
shape: (230, 2)
┌────────────┬───────────────┐
│ date       ┆ holding_count │
│ ---        ┆ ---           │
│ date       ┆ u32           │
╞════════════╪═══════════════╡
│ 2006-03-01 ┆ 35            │
│ 2006-04-01 ┆ 40            │
│ 2006-05-01 ┆ 40            │
│ 2006-06-01 ┆ 48            │
│ 2006-07-01 ┆ 47            │
│ 2006-08-01 ┆ 57            │
│ 2006-09-01 ┆ 39            │
│ 2006-10-01 ┆ 48            │
│ 2006-11-01 ┆ 40            │
│ 2006-12-01 ┆ 39            │
│ 2007-01-01 ┆ 43            │
│ 2007-02-01 ┆ 51            │
│ 2007-03-01 ┆ 53            │
│ 2007-04-01 ┆ 58            │
│ 2007-05-01 ┆ 46            │
│ 2007-06-01 ┆ 69            │
│ 2007-07-01 ┆ 66            │
│ 2007-08-01 ┆ 65            │
│ 2007-09-01 ┆ 56            │
│ 2007-10-01 ┆ 53            │
│ 2007-11-01 ┆ 67            │
│ 2007-12-01 ┆ 43            │
│ 2008-01-01 ┆ 50         

In [74]:
# 三交集
import polars as pl
import numpy as np
from pathlib import Path
import time

# =================================================================== #
#                       【1. 核心回测模块 (Polars)】                    #
# =================================================================== #

def run_intersection_backtest_polars(config):
    """
    (核心函数) 使用 Polars 全向量化方法，回测三因子交集策略。
    """
    print("--- 步骤 1: 加载数据 (Polars) ---")
    try:
        # 加载策略分组数据
        strategy_df = pl.read_csv(config['RESIDUAL_FILE']).with_columns(
            pl.col('调入日期').str.to_date(format='%Y-%m-%d'),
            pl.col('stkcd').cast(pl.Utf8).str.zfill(6)
        )

        # --- 鲁棒性检查：确保所有需要的因子列都存在 ---
        required_factors = list(config['FACTOR_THRESHOLDS'].keys())
        missing_factors = [f for f in required_factors if f not in strategy_df.columns]
        if missing_factors:
            print(f"错误: 输入文件 '{config['RESIDUAL_FILE']}' 缺少以下必需的因子列: {missing_factors}。程序终止。")
            return None, None

        # 加载收益率数据
        returns_df = pl.read_csv(config['RETURNS_FILE']).select(
            pl.col('Stkcd').cast(pl.Utf8).str.zfill(6).alias('stkcd'),
            pl.col('Trdmnt').str.to_date(format='%Y-%m').alias('date'),
            pl.col('Mretwd').cast(pl.Float64, strict=False).alias('stock_return')
        )

        # 加载基准数据
        all_benchmarks_df = pl.read_csv(config['BENCHMARK_FILE'])
        benchmark_df = all_benchmarks_df.filter(
            pl.col('Indexcd').cast(pl.Utf8).str.zfill(6) == config['BENCHMARK_CODE']
        ).select(
            pl.col('Month').str.to_date(format='%Y-%m').alias('date'),
            pl.col('Idxrtn').alias('benchmark_return')
        )
        print("所有数据加载成功。")

    except Exception as e:
        print(f"错误: 加载数据时出错: {e}。程序终止。")
        return None, None

    print("\n--- 步骤 2: 构建三因子交集投资组合 ---")

    thresholds = config['FACTOR_THRESHOLDS']

    # 1. 构建 residual_quantile_10m 组合
    quantile_threshold = thresholds['residual_quantile_10m']
    df_quantile = strategy_df.filter(
        pl.col('residual_quantile_10m') <= quantile_threshold
    ).rename({'调入日期': 'date'})
    print(f"已构建 Quantile <= {quantile_threshold:.0%} 组合，共 {len(df_quantile)} 条记录。")

    # 2. 构建 residual_zscore 组合
    zscore_threshold = thresholds['residual_zscore']
    num_groups_z = int(1 / zscore_threshold)
    df_zscore = strategy_df.with_columns(
        group_z = pl.col('residual_zscore').qcut(num_groups_z, labels=[f"G{i}" for i in range(1, num_groups_z + 1)])
    ).filter(pl.col('group_z') == 'G1').rename({'调入日期': 'date'})
    print(f"已构建 Z-Score 最低 {zscore_threshold:.0%} 组合，共 {len(df_zscore)} 条记录。")

    # 3. 构建 residual_adj 组合
    adj_threshold = thresholds['residual_adj']
    num_groups_adj = int(1 / adj_threshold)
    df_adj = strategy_df.with_columns(
        group_adj = pl.col('residual_adj').qcut(num_groups_adj, labels=[f"G{i}" for i in range(1, num_groups_adj + 1)])
    ).filter(pl.col('group_adj') == 'G1').rename({'调入日期': 'date'})
    print(f"已构建 Adj-Residual 最低 {adj_threshold:.0%} 组合，共 {len(df_adj)} 条记录。")

    # 4. 找出三个组合在每个调仓日的交集
    intersection_portfolio = df_quantile.join(
        df_zscore.select(['date', 'stkcd']), on=['date', 'stkcd'], how='inner'
    ).join(
        df_adj.select(['date', 'stkcd']), on=['date', 'stkcd'], how='inner'
    )
    print(f"已构建三因子交集组合，共 {len(intersection_portfolio)} 条记录。")

    # 5. 筛选回测周期
    start_date = pl.lit(config['BACKTEST_START_DATE']).str.to_date()
    end_date = pl.lit(config['BACKTEST_END_DATE']).str.to_date()
    intersection_portfolio = intersection_portfolio.filter(pl.col('date').is_between(start_date, end_date))

    # 6. 【核心向量化步骤】将交集组合与收益数据合并
    merged_df = intersection_portfolio.join(returns_df, on=['date', 'stkcd'], how='inner')

    # 7. 【核心向量化步骤】计算交集组合的等权月度收益
    portfolio_returns_df = merged_df.group_by('date').agg(
        pl.col('stock_return').mean().alias('portfolio_return')
    ).sort('date')

    # 8. 填充缺失月份，确保时间序列连续
    if portfolio_returns_df.is_empty():
        print("警告：在指定的回测周期内没有生成任何投资组合收益。")
        return None, None

    full_date_range = pl.date_range(
        portfolio_returns_df.get_column('date').min(),
        portfolio_returns_df.get_column('date').max(),
        interval="1mo",
        eager=True
    ).alias("date")

    portfolio_returns_df = pl.DataFrame(full_date_range).join(
        portfolio_returns_df, on='date', how='left'
    ).fill_null(0.0)

    print(f"向量化回测完成，已生成 {len(portfolio_returns_df)} 条月度收益记录。\n")

    return portfolio_returns_df, benchmark_df


# =================================================================== #
#                   【2. 绩效计算与保存 (Polars)】                    #
# =================================================================== #

def calculate_performance_and_save_polars(portfolio_returns_df, benchmark_df, config):
    """为交集组合计算绩效并保存结果。"""
    print("--- 步骤 3: 计算并保存交集组合的绩效 ---")

    output_dir = config['OUTPUT_DIR']
    output_dir.mkdir(parents=True, exist_ok=True)
    returns_output_file = output_dir / f"{config['STRATEGY_NAME']}_returns.csv"
    performance_output_file = output_dir / f"{config['STRATEGY_NAME']}_performance.csv"

    risk_free_rate = config['RISK_FREE_RATE']

    # 合并基准收益
    final_returns_df = portfolio_returns_df.join(benchmark_df, on='date', how='left').fill_null(0)

    # 计算组合和基准的累计收益
    final_returns_df = final_returns_df.with_columns(
        (1 + pl.col('portfolio_return')).cum_prod().alias('cumulative_portfolio_return'),
        (1 + pl.col('benchmark_return')).cum_prod().alias('cumulative_benchmark_return')
    )

    # 计算各项绩效指标
    total_months = len(final_returns_df)
    if total_months == 0:
        print("错误：无法计算绩效，因为没有有效的月度收益数据。")
        return

    # 计算组合指标
    portfolio_perf = final_returns_df.select(
        pl.col('portfolio_return').alias('return'),
        pl.col('cumulative_portfolio_return').alias('cum_return'),
        (pl.col('portfolio_return') - pl.col('benchmark_return')).alias('excess_return')
    ).select(
        annualized_return = ((pl.col('cum_return').last()) ** (12 / total_months) - 1),
        annualized_volatility = (pl.col('return').std() * np.sqrt(12)),
        max_drawdown = (((pl.col('cum_return') / pl.col('cum_return').cum_max()) - 1).min()),
        tracking_error = (pl.col('excess_return').std() * np.sqrt(12))
    ).row(0, named=True)

    # 计算基准指标
    benchmark_perf = final_returns_df.select(
        pl.col('benchmark_return').alias('return'),
        pl.col('cumulative_benchmark_return').alias('cum_return')
    ).select(
        annualized_return = ((pl.col('cum_return').last()) ** (12 / total_months) - 1),
        annualized_volatility = (pl.col('return').std() * np.sqrt(12)),
        max_drawdown = (((pl.col('cum_return') / pl.col('cum_return').cum_max()) - 1).min())
    ).row(0, named=True)

    annualized_excess_return = portfolio_perf['annualized_return'] - benchmark_perf['annualized_return']
    tracking_error = portfolio_perf['tracking_error']
    information_ratio = annualized_excess_return / tracking_error if tracking_error != 0 else 0

    # 整理并合并结果
    all_metrics = [
        {
            'group': '三因子交集策略',
            '年化收益率': portfolio_perf['annualized_return'],
            '年化波动率': portfolio_perf['annualized_volatility'],
            '夏普比率': (portfolio_perf['annualized_return'] - risk_free_rate) / portfolio_perf['annualized_volatility'] if portfolio_perf['annualized_volatility'] != 0 else 0,
            '最大回撤': portfolio_perf['max_drawdown'],
            '累计收益率': final_returns_df['cumulative_portfolio_return'].last() - 1,
            '年化超额收益率': annualized_excess_return,
            '信息比率': information_ratio,
            '跟踪误差': tracking_error
        },
        {
            'group': '基准 (沪深300)',
            '年化收益率': benchmark_perf['annualized_return'],
            '年化波动率': benchmark_perf['annualized_volatility'],
            '夏普比率': (benchmark_perf['annualized_return'] - risk_free_rate) / benchmark_perf['annualized_volatility'] if benchmark_perf['annualized_volatility'] != 0 else 0,
            '最大回撤': benchmark_perf['max_drawdown'],
            '累计收益率': final_returns_df['cumulative_benchmark_return'].last() - 1,
            '年化超额收益率': None, '信息比率': None, '跟踪误差': None
        }
    ]
    performance_df = pl.DataFrame(all_metrics)

    final_returns_df.write_csv(returns_output_file, float_precision=6)
    print(f"\n交集组合的月度收益率详情已保存至: {returns_output_file}")
    performance_df.write_csv(performance_output_file, float_precision=6)
    print(f"交集组合的绩效指标已保存至: {performance_output_file}")
    print(f"\n--- {config['STRATEGY_NAME']} 绩效简报 ---")
    print(performance_df)


# =================================================================== #
#                          【3. 主函数执行】                          #
# =================================================================== #

def main(config):
    """主执行函数"""
    start_time = time.time()
    portfolio_returns_df, benchmark_df = run_intersection_backtest_polars(config)
    if portfolio_returns_df is not None:
        calculate_performance_and_save_polars(portfolio_returns_df, benchmark_df, config)
    end_time = time.time()
    print(f"\n--- 所有任务完成！总耗时: {end_time - start_time:.2f} 秒 ---")

# =================================================================== #
#                       【4. 脚本执行入口】                              #
# =================================================================== #

if __name__ == "__main__":

    # --- 定义 pbroe7.5 三因子交集策略回测的配置 ---
    CONFIG_3FACTOR_INTERSECTION = {
        # --- 策略与输出配置 ---
        "STRATEGY_NAME": "pbroe7.5_3factor_intersection_backtest",
        "OUTPUT_DIR": Path("E:/PBROE/ch7pl/backtest_results_polars"),

        # --- 输入文件路径 ---
        "RESIDUAL_FILE": Path("E:/PBROE/ch7pl/pbroe7.1_residuals_and_quantiles_pure_polars.csv"),
        "RETURNS_FILE": Path("E:/PBROE/data/TRDNEW_Mnth.csv"),
        "BENCHMARK_FILE": Path("E:/PBROE/data/benchmark_indices.csv"),

        # --- 策略核心参数 ---
        "FACTOR_THRESHOLDS": {
            "residual_quantile_10m": 0.1, # 时序残差分位数 <= 10%
            "residual_zscore": 0.2,       # 横截面残差 <= 20%
            "residual_adj": 0.2          # 调整后残差 <= 20%
        },

        # --- 通用回测参数 ---
        "BACKTEST_START_DATE": '2010-05-01',
        "BACKTEST_END_DATE": '2025-04-30',
        "BENCHMARK_CODE": '000300',
        "RISK_FREE_RATE": 0.03
    }

    # --- 执行回测 ---
    main(CONFIG_3FACTOR_INTERSECTION)


--- 步骤 1: 加载数据 (Polars) ---
所有数据加载成功。

--- 步骤 2: 构建三因子交集投资组合 ---
已构建 Quantile <= 10% 组合，共 83307 条记录。
已构建 Z-Score 最低 20% 组合，共 105479 条记录。
已构建 Adj-Residual 最低 20% 组合，共 90461 条记录。
已构建三因子交集组合，共 14859 条记录。
向量化回测完成，已生成 180 条月度收益记录。

--- 步骤 3: 计算并保存交集组合的绩效 ---

交集组合的月度收益率详情已保存至: E:\PBROE\ch7pl\backtest_results_polars\pbroe7.5_3factor_intersection_backtest_returns.csv
交集组合的绩效指标已保存至: E:\PBROE\ch7pl\backtest_results_polars\pbroe7.5_3factor_intersection_backtest_performance.csv

--- pbroe7.5_3factor_intersection_backtest 绩效简报 ---
shape: (2, 9)
┌────────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬──────────┬──────────┐
│ group      ┆ 年化收益  ┆ 年化波动  ┆ 夏普比率  ┆ … ┆ 累计收益  ┆ 年化超额  ┆ 信息比率 ┆ 跟踪误差 │
│ ---        ┆ 率        ┆ 率        ┆ ---       ┆   ┆ 率        ┆ 收益率    ┆ ---      ┆ ---      │
│ str        ┆ ---       ┆ ---       ┆ f64       ┆   ┆ ---       ┆ ---       ┆ f64      ┆ f64      │
│            ┆ f64       ┆ f64       ┆           ┆   ┆ f64       ┆ f64       ┆          ┆  

In [73]:
# 四交集
import polars as pl
import numpy as np
from pathlib import Path
import time

# =================================================================== #
#                       【1. 核心回测模块 (Polars)】                    #
# =================================================================== #

def run_volatility_split_backtest_polars(config):
    """
    (核心函数) 使用 Polars 全向量化方法，回测在三因子交集的基础上，
    根据【年度】ROE波动率高低分组的策略。
    """
    print("--- 步骤 1: 加载数据 (Polars) ---")
    try:
        # 加载策略分组数据
        strategy_df = pl.read_csv(config['RESIDUAL_FILE'], schema_overrides={'stkcd': pl.Utf8}).with_columns(
            pl.col('调入日期').str.to_date(format='%Y-%m-%d')
        )

        # --- 鲁棒性检查：确保所有需要的因子列都存在 ---
        # 使用新的年度波动率列 roe_vol_2y
        required_factors = list(config['FACTOR_THRESHOLDS'].keys()) + ['roe_vol_2y']
        missing_factors = [f for f in required_factors if f not in strategy_df.columns]
        if missing_factors:
            print(f"错误: 输入文件 '{config['RESIDUAL_FILE']}' 缺少以下必需的因子列: {missing_factors}。程序终止。")
            return None, None

        # 加载收益率数据
        returns_df = pl.read_csv(config['RETURNS_FILE']).select(
            pl.col('Stkcd').cast(pl.Utf8).str.zfill(6).alias('stkcd'),
            pl.col('Trdmnt').str.to_date(format='%Y-%m').alias('date'),
            pl.col('Mretwd').cast(pl.Float64, strict=False).alias('stock_return')
        )

        # 加载基准数据
        all_benchmarks_df = pl.read_csv(config['BENCHMARK_FILE'])
        benchmark_df = all_benchmarks_df.filter(
            pl.col('Indexcd').cast(pl.Utf8).str.zfill(6) == config['BENCHMARK_CODE']
        ).select(
            pl.col('Month').str.to_date(format='%Y-%m').alias('date'),
            pl.col('Idxrtn').alias('benchmark_return')
        )
        print("所有数据加载成功。")

    except Exception as e:
        print(f"错误: 加载数据时出错: {e}。程序终止。")
        return None, None

    print("\n--- 步骤 2: 构建投资组合 ---")

    # 1. 找出三因子交集
    thresholds = config['FACTOR_THRESHOLDS']
    intersection_portfolio = strategy_df.filter(
        (pl.col('residual_quantile_10m') <= thresholds['residual_quantile_10m']) &
        (pl.col('residual_zscore').rank().over('调入日期') / pl.len().over('调入日期') <= thresholds['residual_zscore']) &
        (pl.col('residual_adj').rank().over('调入日期') / pl.len().over('调入日期') <= thresholds['residual_adj'])
    ).rename({'调入日期': 'date'})
    print(f"已构建三因子交集组合，共 {len(intersection_portfolio)} 条记录。")

    # 2. 【核心修改】根据【年度】ROE波动率分组 (roe_vol_2y)
    #    首先，计算每个调仓日，交集内部股票的 roe_vol_2y 的中位数
    vol_median_df = intersection_portfolio.group_by('date').agg(
        pl.col('roe_vol_2y').median().alias('vol_median')
    )
    #    然后，将中位数信息合并回交集组合
    intersection_with_vol = intersection_portfolio.join(vol_median_df, on='date', how='left')

    #    根据与中位数的比较，分为高、低波动率两组
    low_vol_portfolio = intersection_with_vol.filter(pl.col('roe_vol_2y') <= pl.col('vol_median'))
    high_vol_portfolio = intersection_with_vol.filter(pl.col('roe_vol_2y') > pl.col('vol_median'))
    print(f"已将交集组合拆分为低波动组 ({len(low_vol_portfolio)} 条) 和高波动组 ({len(high_vol_portfolio)} 条)。")


    # 3. 筛选回测周期
    start_date = pl.lit(config['BACKTEST_START_DATE']).str.to_date()
    end_date = pl.lit(config['BACKTEST_END_DATE']).str.to_date()

    portfolios_to_backtest = {
        "low_vol_portfolio": low_vol_portfolio.filter(pl.col('date').is_between(start_date, end_date)),
        "high_vol_portfolio": high_vol_portfolio.filter(pl.col('date').is_between(start_date, end_date))
    }

    # 4. 【核心向量化步骤】并行计算各组的月度收益
    portfolio_returns_dfs = {}
    for name, portfolio_df in portfolios_to_backtest.items():
        merged_df = portfolio_df.join(returns_df, on=['date', 'stkcd'], how='inner')

        returns_calc_df = merged_df.group_by('date').agg(
            pl.col('stock_return').mean().alias(f'{name}_return')
        ).sort('date')

        portfolio_returns_dfs[name] = returns_calc_df

    # 5. 合并所有收益率序列并填充缺失月份
    if not portfolio_returns_dfs:
        print("警告：在指定的回测周期内没有生成任何投资组合收益。")
        return None, None

    # 创建完整日期范围
    all_dates = pl.concat([df.get_column('date') for df in portfolio_returns_dfs.values()]).unique()
    full_date_range = pl.date_range(
        all_dates.min(), all_dates.max(), interval="1mo", eager=True
    ).alias("date")

    # 合并所有组合收益
    final_portfolio_returns = pl.DataFrame(full_date_range)
    for name, df in portfolio_returns_dfs.items():
        final_portfolio_returns = final_portfolio_returns.join(df, on='date', how='left')

    final_portfolio_returns = final_portfolio_returns.fill_null(0.0)

    print(f"向量化回测完成，已生成 {len(final_portfolio_returns)} 条月度收益记录。\n")

    return final_portfolio_returns, benchmark_df


# =================================================================== #
#                   【2. 绩效计算与保存 (Polars)】                    #
# =================================================================== #

def calculate_performance_and_save_polars(portfolio_returns_df, benchmark_df, config):
    """为多个组合计算绩效并保存结果。"""
    print("--- 步骤 3: 计算并保存各组绩效 ---")

    output_dir = config['OUTPUT_DIR']
    output_dir.mkdir(parents=True, exist_ok=True)
    returns_output_file = output_dir / f"{config['STRATEGY_NAME']}_returns.csv"
    performance_output_file = output_dir / f"{config['STRATEGY_NAME']}_performance.csv"

    risk_free_rate = config['RISK_FREE_RATE']

    # 合并基准收益
    final_returns_df = portfolio_returns_df.join(benchmark_df, on='date', how='left').fill_null(0)

    # --- 循环计算每个策略组的绩效 ---
    all_metrics = []
    portfolio_names = [col for col in final_returns_df.columns if col.endswith('_return')]

    for portfolio_col_name in portfolio_names:
        strategy_name_cn = {
            "low_vol_portfolio_return": "三因子交集-低波动组",
            "high_vol_portfolio_return": "三因子交集-高波动组"
        }.get(portfolio_col_name, portfolio_col_name)

        # 计算累计收益
        cumulative_col_name = f"cumulative_{portfolio_col_name}"
        final_returns_df = final_returns_df.with_columns(
            (1 + pl.col(portfolio_col_name)).cum_prod().alias(cumulative_col_name)
        )

        # 计算绩效指标
        total_months = len(final_returns_df)
        if total_months == 0: continue

        perf = final_returns_df.select(
            annualized_return = ((pl.col(cumulative_col_name).last()) ** (12 / total_months) - 1),
            annualized_volatility = (pl.col(portfolio_col_name).std() * np.sqrt(12)),
            max_drawdown = (((pl.col(cumulative_col_name) / pl.col(cumulative_col_name).cum_max()) - 1).min()),
            annualized_excess_return = (((pl.col(cumulative_col_name).last()) ** (12 / total_months) - 1) - ((1 + pl.col('benchmark_return')).cum_prod().last() ** (12 / total_months) - 1)),
            tracking_error = ((pl.col(portfolio_col_name) - pl.col('benchmark_return')).std() * np.sqrt(12))
        ).row(0, named=True)

        perf['information_ratio'] = perf['annualized_excess_return'] / perf['tracking_error'] if perf['tracking_error'] != 0 else 0
        perf['sharpe_ratio'] = (perf['annualized_return'] - risk_free_rate) / perf['annualized_volatility'] if perf['annualized_volatility'] != 0 else 0
        perf['cumulative_return'] = final_returns_df[cumulative_col_name].last() - 1

        all_metrics.append({
            'group': strategy_name_cn,
            '年化收益率': perf['annualized_return'], '年化波动率': perf['annualized_volatility'],
            '夏普比率': perf['sharpe_ratio'], '最大回撤': perf['max_drawdown'],
            '累计收益率': perf['cumulative_return'], '年化超额收益率': perf['annualized_excess_return'],
            '信息比率': perf['information_ratio'], '跟踪误差': perf['tracking_error']
        })

    # --- 计算基准的绩效 ---
    final_returns_df = final_returns_df.with_columns(
        (1 + pl.col('benchmark_return')).cum_prod().alias('cumulative_benchmark_return')
    )
    benchmark_perf = final_returns_df.select(
        annualized_return = ((pl.col('cumulative_benchmark_return').last()) ** (12 / total_months) - 1),
        annualized_volatility = (pl.col('benchmark_return').std() * np.sqrt(12)),
        max_drawdown = (((pl.col('cumulative_benchmark_return') / pl.col('cumulative_benchmark_return').cum_max()) - 1).min())
    ).row(0, named=True)
    all_metrics.append({
        'group': '基准 (沪深300)',
        '年化收益率': benchmark_perf['annualized_return'], '年化波动率': benchmark_perf['annualized_volatility'],
        '夏普比率': (benchmark_perf['annualized_return'] - risk_free_rate) / benchmark_perf['annualized_volatility'] if benchmark_perf['annualized_volatility'] != 0 else 0,
        '最大回撤': benchmark_perf['max_drawdown'], '累计收益率': final_returns_df['cumulative_benchmark_return'].last() - 1,
        '年化超额收益率': None, '信息比率': None, '跟踪误差': None
    })

    performance_df = pl.DataFrame(all_metrics)

    final_returns_df.write_csv(returns_output_file, float_precision=6)
    print(f"\n各组月度收益率详情已保存至: {returns_output_file}")
    performance_df.write_csv(performance_output_file, float_precision=6)
    print(f"各组绩效指标已保存至: {performance_output_file}")
    print(f"\n--- {config['STRATEGY_NAME']} 绩效简报 ---")
    print(performance_df)


# =================================================================== #
#                          【3. 主函数执行】                          #
# =================================================================== #

def main(config):
    """主执行函数"""
    start_time = time.time()
    portfolio_returns_df, benchmark_df = run_volatility_split_backtest_polars(config)
    if portfolio_returns_df is not None:
        calculate_performance_and_save_polars(portfolio_returns_df, benchmark_df, config)
    end_time = time.time()
    print(f"\n--- 所有任务完成！总耗时: {end_time - start_time:.2f} 秒 ---")

# =================================================================== #
#                       【4. 脚本执行入口】                              #
# =================================================================== #

if __name__ == "__main__":

    # --- 定义 pbroe7.7 年度波动率分组策略回测的配置 ---
    CONFIG_VOLATILITY_SPLIT = {
        # --- 策略与输出配置 ---
        "STRATEGY_NAME": "pbroe7.6_annual_vol_split_backtest",
        "OUTPUT_DIR": Path("E:/PBROE/ch7pl/backtest_results_polars"),

        # --- 输入文件路径 ---
        "RESIDUAL_FILE": Path("E:/PBROE/ch7pl/pbroe7.1_residuals_and_quantiles_pure_polars.csv"),
        "RETURNS_FILE": Path("E:/PBROE/data/TRDNEW_Mnth.csv"),
        "BENCHMARK_FILE": Path("E:/PBROE/data/benchmark_indices.csv"),

        # --- 策略核心参数 ---
        "FACTOR_THRESHOLDS": {
            "residual_quantile_10m": 0.1,  # 时序残差分位数 <= 10%
            "residual_zscore": 0.2,        # 横截面残差 <= 20%
            "residual_adj": 0.2           # 调整后残差 <= 20%
        },

        # --- 通用回测参数 ---
        "BACKTEST_START_DATE": '2010-05-01', # 确保有足够的年度数据来计算初期的波动率
        "BACKTEST_END_DATE": '2025-04-30',
        "BENCHMARK_CODE": '000300',
        "RISK_FREE_RATE": 0.03
    }

    # --- 执行回测 ---
    main(CONFIG_VOLATILITY_SPLIT)


--- 步骤 1: 加载数据 (Polars) ---
所有数据加载成功。

--- 步骤 2: 构建投资组合 ---
已构建三因子交集组合，共 16092 条记录。
已将交集组合拆分为低波动组 (7368 条) 和高波动组 (7259 条)。
向量化回测完成，已生成 176 条月度收益记录。

--- 步骤 3: 计算并保存各组绩效 ---

各组月度收益率详情已保存至: E:\PBROE\ch7pl\backtest_results_polars\pbroe7.6_annual_vol_split_backtest_returns.csv
各组绩效指标已保存至: E:\PBROE\ch7pl\backtest_results_polars\pbroe7.6_annual_vol_split_backtest_performance.csv

--- pbroe7.6_annual_vol_split_backtest 绩效简报 ---
shape: (4, 9)
┌────────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬──────────┬──────────┐
│ group      ┆ 年化收益  ┆ 年化波动  ┆ 夏普比率  ┆ … ┆ 累计收益  ┆ 年化超额  ┆ 信息比率 ┆ 跟踪误差 │
│ ---        ┆ 率        ┆ 率        ┆ ---       ┆   ┆ 率        ┆ 收益率    ┆ ---      ┆ ---      │
│ str        ┆ ---       ┆ ---       ┆ f64       ┆   ┆ ---       ┆ ---       ┆ f64      ┆ f64      │
│            ┆ f64       ┆ f64       ┆           ┆   ┆ f64       ┆ f64       ┆          ┆          │
╞════════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪══════════╪