In [2]:
%reload_ext autoreload
%autoreload 2
from fun import *
import polars as pl
import pandas as pd
import datetime as dt
import time 
import logging

logging = get_logger(log_file='回测.log')
start_date = dt.date(2021,1,1)
end_date = dt.datetime.today()

# 获取指定日期的日线数据
stock_data = read_day_data(start_date=start_date,end_date=end_date,file_path='ts_stock_all_data')
stock_data = stock_data.drop_nulls(subset=['open','close','pre_close','limit_up','limit_down']).sort(['code','trading_date'])
market_value = read_day_data(start_date=start_date,end_date=end_date,file_path='ts_daily_basic')
market_value = market_value.with_columns([
   ( pl.col('free_share')*pl.col('close')/1e4).alias('free_float_mv')
])
stock_data = stock_data.join(market_value.select(['code','trading_date','free_float_mv']),on=['code','trading_date'],how='left')
#stock_data.schema

In [3]:
# 1.涨停标记(返回按照code,trading_date排序)
# 标记涨停状态：limit_status
stock_data = mark_limit_status(stock_data,db_days=3) #三天之内都算断板
# 标记涨停描述：limit_desc
stock_data = mark_limit_desc(stock_data)
# 记录最近的一次涨停描述：last_limit_desc
stock_data = mark_last_limit_desc(stock_data)

# 2.均线特征
# 计算均线:sma_{window}
stock_data = add_sma(stock_data, window=5)
stock_data = add_sma(stock_data, window=7)
"""
#stock_data = add_ema(stock_data, window=4)
#stock_data = add_ema(stock_data, window=7)
#stock_data = add_ewma_volatility(stock_data, window=8)
"""
# 3.计算开盘涨幅:open_chg,乖离率,成交均价vwap
stock_data = stock_data.with_columns(
    (
        (pl.col("open") - pl.col("pre_close")) 
        / pl.col("pre_close") 
        * 100
    ).alias("open_pct"),  # 开盘涨幅百分比
    ((pl.col("close") - pl.col("sma_7")) / pl.col("sma_7") * 100).alias("close_sma7_pct"), #乖离率
    (pl.col("amount")*100 / pl.col("volume")).alias("vwap"),
    ((pl.col("low") <= pl.col("limit_down")*1.015)).alias("touch_limit_down"), # 是否触及跌停
)
stock_data = cal_n_lowest(stock_data)
stock_data = add_pre_n_ratio(stock_data,field='volume',n=5)


# 3.k线特征
# 计算量比,k线特征
#stock_data = cal_kline_pattern_features(stock_data)


# 筛选stock_data行 为买入信号 1. 昨日涨停or断板or炸板   2.今日低开-3%至-4%  3.昨日收盘在昨日日五线上   4. 最近一次涨停描述 !=一天一板 或者 非空
# 1. 先确保数据按股票和日期排序
stock_data = stock_data.sort(["code", "trading_date"])

# 2. 在每个股票组内计算移位数据（关键步骤）
stock_data = stock_data.with_columns([
    # 同一股票内的前一天涨停状态
    pl.col("limit_status").shift(1).over("code").alias("prev_limit_status"),
    # 同一股票内的前一天5日均线
    pl.col("sma_7").shift(1).over("code").alias("prev_sma_7"),
    #pl.col("ema_7").shift(1).over("code").alias("prev_ema_7"),
    #pl.col("volume_ratio_5").shift(1).over("code").alias("pre_volume_ratio_5"),
    pl.col("pct").shift(1).over("code").alias("pre_pct"),
    pl.col("vwap").shift(1).over("code").alias("pre_vwap"),
    #pl.col("ewma_volatility_8").shift(1).over("code").alias("pre_ewma_volatility_8"),
    pl.col("close_sma7_pct").shift(1).over("code").alias("pre_close_sma7_pct"),
    
    # k线特征
    # pl.col("body_ratio").shift(1).over("code").alias("pre_body_ratio"),
    # pl.col("upper_shadow_ratio").shift(1).over("code").alias("pre_upper_shadow_ratio"),
    # pl.col("lower_shadow_ratio").shift(1).over("code").alias("pre_lower_shadow_ratio"),
    # pl.col("candle_direction").shift(1).over("code").alias("pre_candle_direction")
])


回测信号参数: {'low': -5, 'high': -2.5, 'mv_min': 35, 'mv_max': 1000, 'prev_limit_status': ['断板', '炸板']}


N字战法:
1. 低位板(m天n板,n<4)放量涨停
2. 缩量断板回调天数大于等于2天,不超过5天


In [None]:
# 信号构建
# 基于组内移位后的数据筛选买入信号
# 条件参数字典
params_dict={
    'high':-2.5,
    'low':-5,
    'mv_min':35,
    'mv_max':1000,
    'prev_limit_status':['断板','炸板']
}

信号文件 = stock_data.filter(
    # 非st,创业,科创
    ~(pl.col("type").is_not_null() & (pl.col("type") == "ST")) &
    ~(pl.col("code").str.split(".").list[1].str.starts_with("30") | pl.col("code").str.split(".").list[1].str.starts_with("688")) &

    # 1. 昨日or断板or炸板（使用组内移位后的数据）
    (pl.col("prev_limit_status").is_in(params_dict['prev_limit_status'])) &
    
    # 2. 今日低开-3%至-4%
    (pl.col("open_pct") >= params_dict["low"]) & (pl.col("open_pct") <= params_dict["high"]) &
    
    # 3. 昨日收盘在昨日5日均线上
    (pl.col("pre_close") >= pl.col("prev_sma_7")) &
    
    # 4. 最近一次涨停描述 != 一天一板 或者 空缺
    (
        (pl.col("last_limit_desc") != "1天1板") &
        #(pl.col("last_limit_desc") == "2天1板") 
        (pl.col("last_limit_desc").is_not_null())
    ) &

    # 5. 自由流通值大于
    ((pl.col("free_float_mv") >= params_dict["mv_min"]) & (pl.col("free_float_mv") <= params_dict["mv_max"])) 

    # 6. 绝对位置不能太高，不能触发严重异动
    & ((pl.col("open")/pl.col("lowest_30")) <=3 )
    # 均线偏离度大于11或小于9
    #& ((pl.col("pre_close_sma7_pct") <= 9))
)
    

logging.info(f"回测信号参数: {params_dict}")

In [None]:
from trade_fun import *
start_date_str = start_date.strftime("%Y-%m-%d")
end_date_str = end_date.strftime("%Y-%m-%d")
#end_date_str = '2025-11-17'

logging.info(f"回测时间区间: {start_date_str} 至 {end_date_str}")
result_df,merged_df = cal_trade_info(信号文件,trade_fun=trade,start_date=start_date_str,end_date=end_date_str)


In [3]:
import tinyshare as tns
ts_token = 'YzAEH11Yc7jZCHjeJa63fnbpSt3k9Je3GvWn0390oiBKO95bVJjP7u5L34e2ff6b'
ts =tns.pro_api(ts_token)
a=ts.index_basic(category='策略指数')

In [None]:
def report_backtest_full(
    result_df: pd.DataFrame,
    start_date,
    end_date,
    profit_col: str = 'profit',
    buy_date_col:str = 'buy_time',
    sell_date_col: str = 'buy_time',
    holding_days_col:str = 'holding_days',
    benchmark_code: str = "399300.SZ",
    risk_free_rate: float = 0.02,
    return_method = 'compound',
    plot = True,
    second_y = True
):
    """
    result_df 交割单:包括策略的 1.卖出信息 2.利润信息 的交割单
    回测结果汇报函数（含净值曲线、最大回撤、夏普比率、超额收益等）
    """
    from fun import get_logger
    logging = get_logger(log_file='回测.log')

    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    import tinyshare as tns
    from mapping import convert_code_format,clean_stocks_data
    ts_token = 'YzAEH11Yc7jZCHjeJa63fnbpSt3k9Je3GvWn0390oiBKO95bVJjP7u5L34e2ff6b'
    ts =tns.pro_api(ts_token)

    # 1. 策略净值曲线计算
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)
    result_df[sell_date_col] = pd.to_datetime(result_df[sell_date_col])
    result_df = result_df[
        (result_df[sell_date_col] >= start_date) & 
        (result_df[sell_date_col] <= end_date)
    ]
    result_df = result_df.sort_values(sell_date_col).reset_index(drop=True)
    result_df['sell_date_date'] = pd.to_datetime(result_df[sell_date_col]).dt.date
    result_df['buy_date_date'] = pd.to_datetime(result_df[buy_date_col]).dt.date # 提取买入日期（仅日期部分）
    daily_returns = result_df.groupby('sell_date_date')[profit_col].mean() / 100  # 转为小数
    net_values = (1 + daily_returns).cumprod()
    if return_method!='compound':
        net_values = 1 + daily_returns.cumsum()  # 单利：初始净值1 + 每日收益累计和
    strategy_curve = net_values.copy()
    #strategy_curve.index = pd.to_datetime(strategy_curve.index).normalize()

    # 2. 获取指数净值曲线
    index_data = ts.index_daily(ts_code=convert_code_format(benchmark_code,format='suffix'), start_date=start_date.strftime('%Y%m%d'), end_date=end_date.strftime('%Y%m%d'))
    index_data = clean_stocks_data(index_data)
    index_df = index_data
    index_df['trading_date_date'] = pd.to_datetime(index_df['trading_date']).dt.date
    index_df = index_df.sort_values('trading_date_date').reset_index(drop=True)
    if not index_df.empty:
        if 'pct' not in index_df.columns:
            index_df['pct'] = index_df['close'].pct_change()
        index_df['net_value'] = (1 + index_df['pct']/100).cumprod()
        index_curve = index_df.set_index('trading_date_date')['net_value']
    else:
        raise ValueError("未获取到有效的指数数据")

    # 3. 对齐日期
    strategy_curve = strategy_curve.sort_index()
    index_curve = index_curve.sort_index()
    strategy_curve = strategy_curve.reindex(index_curve.index, method='ffill').fillna(1)

    # 4. 核心指标
    total_return = strategy_curve.iloc[-1] - 1 if len(strategy_curve) > 0 else 0
    if len(strategy_curve) >= 2:
        first_date = strategy_curve.index[0]
        last_date = strategy_curve.index[-1]
        total_days = (last_date - first_date).days
        years = total_days / 365
    else:
        years = 0
    annualized_return = (strategy_curve.iloc[-1]) ** (1 / years) - 1 if years > 0 and strategy_curve.iloc[-1] > 0 else 0

    # 最大回撤
    roll_max = strategy_curve.cummax()
    drawdown = (strategy_curve - roll_max) / roll_max
    max_drawdown = drawdown.min()
    # 找到最大回撤的开始和结束时间
    max_drawdown_end = drawdown.idxmin()
    # 找到最大回撤开始时间（即之前的最高点）
    max_drawdown_start = roll_max.loc[:max_drawdown_end].idxmax()

    # 夏普比率
    daily_ret = strategy_curve.pct_change().dropna()
    daily_drawdown = daily_ret.where(daily_ret < 0, 0)
    rf_daily = risk_free_rate / 252
    excess_daily = daily_ret - rf_daily
    sharpe_ratio = (excess_daily.mean() / excess_daily.std()) * np.sqrt(252) if excess_daily.std() > 0 else 0

    # 超额收益
    bench_years = (index_curve.index[-1] - index_curve.index[0]).days / 365 if len(index_curve) > 1 else 0
    bench_annual = (index_curve.iloc[-1]) ** (1 / bench_years) - 1 if bench_years > 0 else 0
    excess_return = annualized_return - bench_annual

    # 胜率和盈亏比
    # 筛选出结果不等于0的记录（排除盈亏平衡的情况）
    non_zero_df = result_df[result_df[profit_col] != 0]

    # 计算赢率：大于0的笔数 / 不等于0的笔数
    win_rate = (non_zero_df[profit_col] > 0).mean()
    avg_win = result_df[result_df[profit_col] > 0][profit_col].mean()
    avg_loss = abs(result_df[result_df[profit_col] < 0][profit_col].mean())
    profit_loss_ratio = avg_win / avg_loss if avg_loss != 0 else float('inf')

    # 平均开仓个数
    daily_buy_count = result_df.groupby('buy_date_date')['code'].nunique()
    # 计算平均值（排除无买入的日期，仅统计有交易的日期）
    avg_daily_buy_count = daily_buy_count.mean() if not daily_buy_count.empty else 0

    # 平均持仓天数
    valid_holding_days = result_df[
        result_df[holding_days_col].notna() &  # 排除空值
        (result_df[holding_days_col] > 0)      # 排除0或负数（异常数据）
    ][holding_days_col]
    avg_holding_days = valid_holding_days.mean() if not valid_holding_days.empty else 0

    # 8. 将结果整理成DataFrame并返回
    metrics_df = pd.DataFrame({
        '指标名称': [
            '回测开始日期', '回测结束日期', '策略胜率', '策略盈亏比',
            '每单位风险期望收益', '策略总收益率', '策略年化收益率',
            '最大回撤', '最大回撤开始日期', '最大回撤结束日期',
            '夏普比率', '策略超额年化收益率',
            '最终净值','每日平均买入股票个数', '平均持仓天数',
        ],
        '指标值': [
            first_date, last_date, f"{win_rate:.2%}",
            f"{profit_loss_ratio:.2f}", f"{win_rate*(profit_loss_ratio+1) - 1 :.4f}",
            f"{total_return:.2%}", f"{annualized_return:.2%}", f"{max_drawdown:.2%}",
            max_drawdown_start, max_drawdown_end,
            f"{sharpe_ratio:.2f}", f"{excess_return:.2%}", f"{strategy_curve.iloc[-1]:.4f}",
            f"{avg_daily_buy_count:.2f}", f"{avg_holding_days:.2f} 天"
        ]
    })

    if not plot:
        return metrics_df
    # 5. 输出结果
    logging.info(
        f"\n回测时间:{strategy_curve.index[0]} - {strategy_curve.index[-1]}\n"
        f"策略胜率: {win_rate:.2%}\n"
        f"策略盈亏比: {profit_loss_ratio:.2f}\n"
        f"每日平均开仓个数: {avg_daily_buy_count:.2f}\n"
        f"平均持仓天数: {avg_holding_days:.2f} 天\n"
        f"每单位风险期望收益:{win_rate*(profit_loss_ratio+1) -1 :.4f}\n"
        f"策略总收益率: {total_return:.2%}\n"
        f"策略年化收益率: {annualized_return:.2%}\n"
        f"最大回撤: {max_drawdown:.2%}\n"
        f"最大回撤阶段: {max_drawdown_start} 至 {max_drawdown_end}\n"
        f"夏普比率: {sharpe_ratio:.2f}\n"
        f"策略超额年化收益率: {excess_return:.2%}\n"
        f"最终净值: {strategy_curve.iloc[-1]:.4f}"
    )

    
    # 6. 绘制净值曲线
    plt.figure(figsize=(14, 7))
    plt.plot(strategy_curve.index, strategy_curve.values, label='Strategy Net Value')
    plt.plot(index_curve.index, index_curve.values, label='Index Net Value')
    plt.xlabel('Date')
    plt.ylabel('Net Value')
    plt.title('Strategy Net Value vs Index Net Value')
    plt.legend()
    plt.grid()
    plt.show()
    
    # 7. 使用plotly绘制可交互净值曲线（新增回撤直方图）
    # 创建图形 - 修改为3个子图：净值曲线、收益直方图、回撤直方图
    fig = make_subplots(
        rows=3, cols=1,
        shared_xaxes=True,
        vertical_spacing=0.08,
        subplot_titles=('净值曲线对比', '策略每日收益率', '策略每日回撤'),
        specs=[
            [{"secondary_y": True}],  # 净值曲线（双Y轴）
            [{"secondary_y": False}], # 收益直方图
            [{"secondary_y": False}]  # 回撤直方图
        ],
        row_heights=[0.5, 0.25, 0.13]  # 调整子图高度比例
    )

    # 7.1 添加策略净值曲线（左Y轴）
    fig.add_trace(
        go.Scatter(
            x=strategy_curve.index, 
            y=strategy_curve.values, 
            name='策略净值',
            line=dict(color='#1f77b4', width=2),
            hovertemplate='日期: %{x}<br>策略净值: %{y:.4f}<extra></extra>'
        ),
        row=1, col=1,
        secondary_y=False
    )

    # 7.2 添加指数净值曲线（右Y轴）
    fig.add_trace(
        go.Scatter(
            x=index_curve.index, 
            y=index_curve.values, 
            name='指数净值',
            line=dict(color='#ff7f0e', width=2),
            hovertemplate='日期: %{x}<br>指数净值: %{y:.4f}<extra></extra>'
        ),
        row=1, col=1,
        secondary_y=second_y
    )

    # 7.3 添加每日收益率直方图
    # 准备收益率数据和颜色
    ret_colors = ['#d62728' if x > 0 else '#2ca02c' for x in daily_ret.values]
    
    fig.add_trace(
        go.Bar(
            x=daily_ret.index, 
            y=daily_ret.values, 
            name='策略每日收益率',
            marker_color=ret_colors,
            hovertemplate='日期: %{x}<br>收益率: %{y:.2%}<extra></extra>'
        ),
        row=2, col=1
    )

    # 7.4 添加每日回撤直方图
    fig.add_trace(
        go.Bar(
            x=daily_drawdown.index, 
            y=daily_drawdown.values, 
            name='策略每日回撤',
            marker_color='#2ca02c',  # 绿色
            hovertemplate='日期: %{x}<br>回撤率: %{y:.2%}<extra></extra>'
        ),
        row=3, col=1
    )

    # 7.5 更新布局
    fig.update_layout(
        height=800,  # 增加高度以容纳3个子图
        title_text="回测结果可视化",
        title_font=dict(size=16, weight='bold'),
        hovermode="x unified",
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1,
            font=dict(size=12)
        ),
        plot_bgcolor='rgba(248,248,248,1)',  # 浅灰色背景
        paper_bgcolor='white'
    )

    # 7.6 更新轴标签和样式
    # 净值曲线轴标签
    fig.update_yaxes(title_text="策略净值", secondary_y=False, row=1, col=1, 
                     title_font=dict(size=12), tickfont=dict(size=10))
    fig.update_yaxes(title_text="指数净值", secondary_y=True, row=1, col=1,
                     title_font=dict(size=12), tickfont=dict(size=10))
    
    # 收益率轴标签
    fig.update_yaxes(title_text="收益率", row=2, col=1,
                     title_font=dict(size=12), tickfont=dict(size=10),
                     tickformat='.2%')  # 百分比格式
    
    # 回撤轴标签
    fig.update_yaxes(title_text="回撤率", row=3, col=1,
                     title_font=dict(size=12), tickfont=dict(size=10),
                     tickformat='.2%')  # 百分比格式
    
    # X轴样式
    fig.update_xaxes(
        title_text="日期", row=3, col=1,
        tickfont=dict(size=10, family='Arial', color='gray'),
        #tickangle=-45,
        title_font=dict(size=12)
    )

    # 7.7 添加网格线
    fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(200,200,200,0.2)', row=1)
    fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(200,200,200,0.2)', row=2)
    fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(200,200,200,0.2)', row=3)
    fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(200,200,200,0.2)', row=1)
    fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(200,200,200,0.2)', row=2)
    fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(200,200,200,0.2)', row=3)

    # 显示图形
    fig.show()
    
    return metrics_df

from fun import *
from trade_fun import adjust_weight_by_near_n # 触及跌停调仓

print(pl.__version__)
#print(type(merged_df))
# 构建文件名
filename = f"信号文件/断板低开{params_dict['low']}-{params_dict['high']} {dt.datetime.now().strftime('%Y%m%d_%H%M%S')}(sma7).csv"

# 仓位控制风控(merged_df_with_weight)
merged_df_with_weight = adjust_weight_by_near_n(merged_df)
#merged_df_with_weight.write_csv(filename, include_bom=True)
merged_df_with_weight = merged_df_with_weight.with_columns(
    (pl.col("profit") * pl.col("weight")).alias("weight_profit")
)

# 买点下移风控(merged_df_with_weight_adjust)
rate_move = 0.02
# 增加买点下移的逻辑（当weight=0时,如果low<=open*(1-rate_move)时,买点下移rate_move（也就是profit+rate_move*100），否则profit=0）
merged_df_with_weight_adjust = merged_df_with_weight.with_columns(
    pl.when(pl.col("weight") != 0)
    # 条件1：weight≠0 → 保持原profit不变
    .then(pl.col("profit"))
    # 条件2：weight==0 → 进一步判断low是否满足下移条件
    .otherwise(
        pl.when(pl.col("low") <= pl.col("open") * (1 - rate_move))
        # 子条件1：满足下移 → profit+rate_move*100
        .then(pl.col("profit") + rate_move * 100)
        # 子条件2：不满足下移 → profit=0
        .otherwise(0)
    ).alias("profit")  # 覆盖原profit列
) #买点下移回测
merged_df_with_weight_adjust = merged_df_with_weight_adjust.with_columns(
    (pl.col("profit") *0.4).alias("weight_profit")
)

result_df['weight_profit'] = result_df['profit'] *0.4

merged_df_with_weight_pd = merged_df_with_weight.to_pandas() # merged_df_with_weight_pd:买点下移结果
merged_df_with_weight_pd = cal_industry_concentration(merged_df_with_weight_pd, window=3)
# 回测结果汇报
back_result =report_backtest_full(merged_df_with_weight_pd, start_date = '2025-01-01', end_date='2025-12-20',profit_col='weight_profit',return_method='simple')
