In [1]:
import os
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import statsmodels.api as sm
import tushare as ts
from talib import abstract

# Configure the Tushare client. The token is read from the TUSHARE_TOKEN
# environment variable so the credential is never stored in the notebook
# source or in its saved outputs.
_tushare_token = os.environ.get('TUSHARE_TOKEN')
if not _tushare_token:
    raise RuntimeError(
        'TUSHARE_TOKEN environment variable is not set; '
        'export your Tushare Pro token before running this notebook.'
    )
ts.set_token(_tushare_token)
pro = ts.pro_api()

# Fetch monthly closes for the CSI 300 and CSI 1000 indices
def get_index_data():
    """Download monthly closing levels for both indices and join them by date.

    Returns:
        DataFrame with columns ``date`` (parsed with ``pd.to_datetime``),
        ``hs300_close`` and ``csi1000_close``. The two series are outer-joined
        on ``date`` and the result is sorted ascending by ``date``.
    """
    def _monthly_close(ts_code, close_label):
        # One request per index; only the trade date and the close are fetched.
        frame = pro.index_monthly(ts_code=ts_code,
                                  start_date='20050101',
                                  end_date='20240630',
                                  fields='trade_date,close')
        frame = frame.rename(columns={'trade_date': 'date', 'close': close_label})
        frame['date'] = pd.to_datetime(frame['date'])
        return frame

    large_cap = _monthly_close('000300.SH', 'hs300_close')    # CSI 300
    small_cap = _monthly_close('000852.SH', 'csi1000_close')  # CSI 1000

    # Keep dates that appear in either series, then order chronologically.
    combined = large_cap.merge(small_cap, on='date', how='outer')
    return combined.sort_values('date')

# Fetch macro-economic series (CPI and M2 year-over-year growth)
def get_macro_data():
    """Download monthly CPI and M2 year-over-year series and join them by month.

    The CPI request asks for ``nt_yoy`` (the nationwide year-over-year column
    of ``cn_cpi``); the previously requested ``cpi_yoy`` is not an output
    field of that endpoint, so the ``cpi同比`` column was never produced.

    Returns:
        DataFrame with columns ``date`` (first day of each month),
        ``cpi同比`` and ``m2同比``, outer-joined on ``date``, sorted ascending
        by ``date`` with a fresh 0..n-1 index.
    """
    # CPI year-over-year (nationwide)
    cpi = pro.cn_cpi(start_m='200501', end_m='202406',
                     fields='month,nt_yoy')
    cpi = cpi.rename(columns={'month': 'date', 'nt_yoy': 'cpi同比'})
    # The API reports months as 'YYYYMM'; pin each one to the 1st of the month.
    cpi['date'] = pd.to_datetime(cpi['date'] + '01', format='%Y%m%d')

    # M2 year-over-year
    m2 = pro.cn_m(start_m='200501', end_m='202406',
                  fields='month,m2_yoy')
    m2 = m2.rename(columns={'month': 'date', 'm2_yoy': 'm2同比'})
    m2['date'] = pd.to_datetime(m2['date'] + '01', format='%Y%m%d')

    # Keep months present in either series and return them chronologically,
    # independent of the order the API delivered them in.
    macro = pd.merge(cpi, m2, on='date', how='outer')
    return macro.sort_values('date').reset_index(drop=True)

# Fetch CSI 1000 daily closes (input for the MACD calculation)
def get_daily_data():
    """Download daily closing prices of the CSI 1000 index.

    Returns:
        DataFrame with columns ``date`` (parsed with ``pd.to_datetime``) and
        ``price``, sorted ascending by ``date`` with a fresh 0..n-1 index.
    """
    daily = pro.index_daily(ts_code='000852.SH',
                            start_date='20050101',
                            end_date='20240630',
                            fields='trade_date,close')
    daily = daily.rename(columns={'trade_date': 'date', 'close': 'price'})
    daily['date'] = pd.to_datetime(daily['date'])
    # Time-series indicators need oldest-first rows; do not rely on the order
    # in which the API happens to return them.
    return daily.sort_values('date').reset_index(drop=True)

# Assemble the modelling dataset from index, macro and daily data
def prepare_data():
    """Build one row per month with index returns, macro series and MACD.

    The three inputs carry different day-of-month stamps: the index frame uses
    whatever date the monthly endpoint reports, the macro frame uses the 1st of
    the month, and the daily MACD is reduced to one value per month. They are
    therefore joined on a 'YYYY-MM' month key instead of on the exact date;
    joining on the exact date leaves every macro/MACD value NaN and the final
    ``dropna`` removes all rows.

    Returns:
        DataFrame with columns ``date``, ``hs300_close``, ``csi1000_close``,
        ``cpi同比``, ``m2同比``, ``hs300_ret``, ``csi1000_ret``,
        ``actual_winner`` and ``macd_diff``; rows containing any NaN are
        dropped and the index is reset.
    """
    index_data = get_index_data()
    macro_data = get_macro_data()
    daily_data = get_daily_data()

    # Month key shared by every source.
    index_data = index_data.assign(date=pd.to_datetime(index_data['date'])).sort_values('date')
    index_data = index_data.assign(month=index_data['date'].dt.strftime('%Y-%m'))
    macro_monthly = (
        macro_data
        .assign(month=pd.to_datetime(macro_data['date']).dt.strftime('%Y-%m'))
        .drop(columns='date')
    )

    # Attach macro series to the index rows of the same calendar month.
    df = pd.merge(index_data, macro_monthly, on='month', how='left')

    # Monthly simple returns; fill_method=None keeps gaps as NaN instead of
    # forward-filling prices across missing months.
    df['hs300_ret'] = df['hs300_close'].pct_change(fill_method=None)
    df['csi1000_ret'] = df['csi1000_close'].pct_change(fill_method=None)

    # Label the index with the higher return in each month.
    df['actual_winner'] = np.where(df['csi1000_ret'] > df['hs300_ret'], 'csi1000', 'hs300')

    # MACD on daily closes. The indicator is order-dependent, so sort oldest
    # first, and expose the price under the 'close' key that the TA-Lib
    # abstract API reads by default.
    daily_close = (
        daily_data
        .assign(date=pd.to_datetime(daily_data['date']))
        .sort_values('date')
        .set_index('date')
        .rename(columns={'price': 'close'})
        .astype({'close': float})
    )
    macd = abstract.MACD(daily_close, fastperiod=12, slowperiod=26, signalperiod=9)

    # Last available MACD reading of each calendar month.
    monthly_macd = macd.groupby(macd.index.strftime('%Y-%m')).last()
    monthly_macd['macd_diff'] = monthly_macd['macd'] - monthly_macd['macdsignal']
    monthly_macd = monthly_macd[['macd_diff']].rename_axis('month').reset_index()

    df = pd.merge(df, monthly_macd, on='month', how='left').drop(columns='month')

    # Make sure the date column is datetime-typed for downstream formatting.
    df['date'] = pd.to_datetime(df['date'])

    return df.dropna().reset_index(drop=True)

ModuleNotFoundError: No module named 'talib'

In [None]:
def calculate_factors(df):
    """Derive the lagged predictor columns used by the rolling Logit model.

    Every factor is shifted by one row so that the value on a given row only
    uses information from the previous row. This includes ``smb``: the
    same-row return spread is exactly what defines ``actual_winner``, so using
    it unlagged leaks the label into the features.

    Args:
        df: DataFrame with columns ``csi1000_ret``, ``hs300_ret``, ``cpi同比``,
            ``m2同比`` and ``macd_diff``, ordered oldest first. It is not
            modified.

    Returns:
        A new DataFrame with the added columns ``smb``, ``cpi_环比``,
        ``m2_环比`` and ``macd``. Infinite values are replaced by the last
        finite value, rows whose factors are still undefined (the leading rows
        created by differencing and shifting) are dropped, and the index is
        reset.
    """
    factor_cols = ['smb', 'cpi_环比', 'm2_环比', 'macd']
    out = df.copy()

    # Small-minus-big return spread, lagged one period.
    out['smb'] = (out['csi1000_ret'] - out['hs300_ret']).shift(1)

    # Period-over-period change of the CPI YoY series, lagged one period.
    out['cpi_环比'] = out['cpi同比'].pct_change(fill_method=None).shift(1)

    # Period-over-period change of the M2 YoY series, lagged one period.
    out['m2_环比'] = out['m2同比'].pct_change(fill_method=None).shift(1)

    # MACD spread, lagged one period.
    out['macd'] = out['macd_diff'].shift(1)

    # pct_change divides by the previous value, so a zero base yields +/-inf;
    # treat those as missing and carry the last valid observation forward.
    out = out.replace([np.inf, -np.inf], np.nan).ffill()

    # Forward-fill cannot fill the leading rows; drop them so the model never
    # sees NaN predictors.
    return out.dropna(subset=factor_cols).reset_index(drop=True)

In [None]:
def rolling_logit_predict(df, window=96):
    """Walk forward through ``df``, fitting a Logit on the trailing window.

    For each row ``i >= window`` a Logit model is fitted on rows
    ``[i - window, i)`` and used to predict the probability that row ``i`` is
    a ``'csi1000'`` month.

    Args:
        df: DataFrame with columns ``date``, ``smb``, ``cpi_环比``, ``m2_环比``,
            ``macd``, ``actual_winner``, ``csi1000_ret`` and ``hs300_ret``.
        window: Number of rows in each training window.

    Returns:
        DataFrame with columns ``date``, ``pred_winner``, ``prob``,
        ``actual_winner``, ``csi1000_ret`` and ``hs300_ret``; one row per
        successfully predicted period. The columns are present even when no
        prediction could be made.
    """
    feature_cols = ['smb', 'cpi_环比', 'm2_环比', 'macd']
    result_cols = ['date', 'pred_winner', 'prob', 'actual_winner',
                   'csi1000_ret', 'hs300_ret']

    # Not enough history for a single training window.
    if len(df) <= window:
        return pd.DataFrame(columns=result_cols)

    # Build the numeric design matrix once. has_constant='add' guarantees the
    # intercept column exists even if a feature happens to be constant, so the
    # training and prediction rows always have the same width.
    design = sm.add_constant(df[feature_cols].astype(float), has_constant='add')
    target = (df['actual_winner'] == 'csi1000').astype(int)  # small-cap win = 1

    results = []
    for i in range(window, len(df)):
        X_train = design.iloc[i - window:i]
        y_train = target.iloc[i - window:i]
        row = df.iloc[i]

        try:
            model = sm.Logit(y_train, X_train).fit(disp=0, method='bfgs')
        except Exception as exc:
            # Best effort: a window that cannot be fitted (singular design,
            # separation, NaN inputs, ...) is skipped, but not silently.
            warnings.warn(f"Logit fit failed for {row['date']}: {exc!r}; period skipped")
            continue

        # Predict from the same numeric design row used for training columns.
        prob = float(np.asarray(model.predict(design.iloc[[i]]))[0])
        if not np.isfinite(prob):
            warnings.warn(f"Non-finite probability for {row['date']}; period skipped")
            continue
        pred_winner = 'csi1000' if prob > 0.5 else 'hs300'

        results.append({
            'date': row['date'],
            'pred_winner': pred_winner,
            'prob': prob,
            'actual_winner': row['actual_winner'],
            'csi1000_ret': row['csi1000_ret'],
            'hs300_ret': row['hs300_ret']
        })

    return pd.DataFrame(results, columns=result_cols)

pandas.core.frame.DataFrame

In [None]:
def backtest_strategy(pred_df, transaction_cost=0.005):
    """Turn per-period predictions into strategy and benchmark return paths.

    Each period the strategy earns the return of the predicted index
    (column ``f'{pred_winner}_ret'``). Whenever the predicted index differs
    from the previous period's, ``transaction_cost`` is subtracted from that
    period's return; the very first position is entered without a charge.

    All work is positional, so the frame may carry any index.

    Args:
        pred_df: DataFrame with columns ``pred_winner``, ``csi1000_ret`` and
            ``hs300_ret``, ordered oldest first. Modified in place.
        transaction_cost: Return deducted on every switch (default 0.5%).

    Returns:
        The same ``pred_df`` object with added columns ``strategy_ret``,
        ``strategy_cum``, ``hs300_cum`` and ``csi1000_cum``.
    """
    winners = pred_df['pred_winner'].to_numpy()
    n_periods = len(pred_df)

    # Return of whichever index was predicted in each period.
    strategy_ret = np.zeros(n_periods, dtype=float)
    for name in pd.unique(winners):
        held = winners == name
        strategy_ret[held] = pred_df[f'{name}_ret'].to_numpy(dtype=float)[held]

    # A switch happens when the prediction differs from the previous period;
    # the first period has no previous holding and is never charged.
    switched = np.zeros(n_periods, dtype=bool)
    if n_periods > 1:
        switched[1:] = winners[1:] != winners[:-1]
    strategy_ret = strategy_ret - transaction_cost * switched

    pred_df['strategy_ret'] = strategy_ret

    # Cumulative (compounded) returns for the strategy and both benchmarks.
    pred_df['strategy_cum'] = (1 + pred_df['strategy_ret']).cumprod() - 1
    pred_df['hs300_cum'] = (1 + pred_df['hs300_ret']).cumprod() - 1
    pred_df['csi1000_cum'] = (1 + pred_df['csi1000_ret']).cumprod() - 1

    return pred_df

def evaluate_performance(df):
    """Print a performance report, plot the return curves and return metrics.

    Args:
        df: Backtest table with columns ``date``, ``pred_winner``,
            ``actual_winner``, ``strategy_cum``, ``hs300_cum`` and
            ``csi1000_cum``; one row per month, oldest first.

    Returns:
        dict with keys ``accuracy``, ``strategy_cum``, ``strategy_cagr`` and
        ``max_drawdown``.

    Raises:
        ValueError: if ``df`` has no rows (nothing to evaluate).
    """
    if len(df) == 0:
        raise ValueError('evaluate_performance requires at least one backtest period')

    # Share of periods where the predicted winner matched the realised one.
    accuracy = (df['pred_winner'] == df['actual_winner']).mean()

    # Annualise from the final cumulative return, treating each row as a month.
    months = len(df)

    def _annualised(total_return):
        return (1 + total_return) ** (12 / months) - 1

    strategy_cagr = _annualised(df['strategy_cum'].iloc[-1])
    hs300_cagr = _annualised(df['hs300_cum'].iloc[-1])
    csi1000_cagr = _annualised(df['csi1000_cum'].iloc[-1])

    # Maximum drawdown of the strategy equity curve. The running peak is
    # floored at 1.0 (the starting capital) so that a loss in the very first
    # period counts as a drawdown instead of becoming the initial peak.
    cum_series = df['strategy_cum'] + 1
    peak = cum_series.cummax().clip(lower=1.0)
    drawdown = (cum_series - peak) / peak
    max_drawdown = drawdown.min()

    # Text report
    print(f"回测周期: {df['date'].iloc[0].strftime('%Y-%m')} 至 {df['date'].iloc[-1].strftime('%Y-%m')}")
    print(f"预测准确率: {accuracy:.2%}")
    print(f"策略累计收益: {df['strategy_cum'].iloc[-1]:.2%}")
    print(f"策略年化收益: {strategy_cagr:.2%}")
    print(f"沪深300年化收益: {hs300_cagr:.2%}")
    print(f"中证1000年化收益: {csi1000_cagr:.2%}")
    print(f"最大回撤: {max_drawdown:.2%}")

    # Cumulative-return curves: strategy versus both benchmarks
    plt.figure(figsize=(12, 6))
    plt.plot(df['date'], df['strategy_cum'], label='轮动策略')
    plt.plot(df['date'], df['hs300_cum'], label='沪深300', linestyle='--')
    plt.plot(df['date'], df['csi1000_cum'], label='中证1000', linestyle='--')
    plt.title('大小盘轮动策略收益对比')
    plt.ylabel('累计收益')
    plt.legend()
    plt.grid(True)
    plt.show()

    return {
        'accuracy': accuracy,
        'strategy_cum': df['strategy_cum'].iloc[-1],
        'strategy_cagr': strategy_cagr,
        'max_drawdown': max_drawdown
    }

In [None]:
# Pipeline entry point
def main():
    """Run the full pipeline: data, factors, rolling prediction, backtest, report.

    Returns:
        Tuple ``(backtest_table, metrics)`` holding the value returned by
        ``backtest_strategy`` and the value returned by
        ``evaluate_performance``.
    """
    # Stage 1: download and assemble the raw dataset
    print("正在从Tushare获取数据...")
    dataset = prepare_data()

    # Stage 2: derive the predictor columns
    print("计算因子特征...")
    features = calculate_factors(dataset)

    # Stage 3: walk-forward Logit predictions
    print("执行滚动预测...")
    forecasts = rolling_logit_predict(features, window=96)

    # Stage 4: simulate the rotation strategy
    print("回测策略绩效...")
    backtest_table = backtest_strategy(forecasts)

    # Stage 5: report
    print("\n策略绩效报告:")
    metrics = evaluate_performance(backtest_table)

    return backtest_table, metrics

# Run the backtest when executed directly
if __name__ == "__main__":
    output_csv = 'size_rotation_strategy_results.csv'
    results_df, perf_metrics = main()

    # Persist the per-period backtest table as CSV (no index column)
    results_df.to_csv(output_csv, index=False)
    print(f"结果已保存到 {output_csv}")

===== 开始执行大小盘风格轮动策略 =====

===== 获取大盘指数数据 =====


ModuleNotFoundError: No module named 'numpy._core'