In [None]:
import pandas as pd
import talib as ta
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
import logging
from backtesting import Backtest, Strategy
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor,VotingRegressor
from sklearn.metrics import mean_squared_error, r2_score
from xgboost import XGBRegressor
from sklearn.model_selection import GridSearchCV
# Short alias for pd.DataFrame, used in the type annotations of the functions below.
DataFrame = pd.DataFrame
# Show INFO-level log records (backtest() logs the number of predictions it processes).
logging.basicConfig(level=logging.INFO)

In [None]:
def preprocess_data(data:DataFrame):
    """Sort the price data by date and append technical-indicator columns.

    The added columns are SMA, EMA20, MACD, Signal, MACD_Histogram, RSI,
    UpperBB / MiddleBB / LowerBB and ATR. Rows that contain any missing value
    (for example the warm-up rows of the rolling indicators) are dropped.

    Args:
        data: Frame with ``close``, ``high`` and ``low`` columns that can be
            sorted by ``date``.

    Returns:
        A new, date-sorted frame with the indicator columns and no NaN rows.
    """
    frame = data.sort_values(by='date')
    close = frame['close']

    # Simple and exponential 20-period moving averages of the close.
    frame['SMA'] = close.rolling(window=20).mean()
    frame['EMA20'] = close.ewm(span=20, adjust=False).mean()

    # MACD line (12-span EMA minus 26-span EMA), its 9-span signal line and
    # the histogram (difference between the two).
    ema_fast = close.ewm(span=12, adjust=False).mean()
    ema_slow = close.ewm(span=26, adjust=False).mean()
    frame['MACD'] = ema_fast - ema_slow
    frame['Signal'] = frame['MACD'].ewm(span=9, adjust=False).mean()
    frame['MACD_Histogram'] = frame['MACD'] - frame['Signal']

    # TA-Lib indicators: RSI, Bollinger bands and average true range.
    frame['RSI'] = ta.RSI(close, timeperiod=14)
    upper_band, middle_band, lower_band = ta.BBANDS(close, timeperiod=20)
    frame['UpperBB'] = upper_band
    frame['MiddleBB'] = middle_band
    frame['LowerBB'] = lower_band
    frame['ATR'] = ta.ATR(frame['high'], frame['low'], close, timeperiod=14)

    return frame.dropna()

In [None]:
def create_features(data:DataFrame, lookahead = 5):
    """Build the model feature matrix and the forward-return target.

    The input frame is NOT modified: all derived columns are added to a
    private copy, so the function is safe to call on slices of a larger frame
    and can be re-run on the same input.

    Args:
        data: Frame with a ``close`` column plus the indicator columns
            ``MACD``, ``Signal``, ``RSI``, ``UpperBB``, ``MiddleBB`` and
            ``LowerBB``. Its index must be a ``DatetimeIndex`` because the
            calendar features read ``index.month`` / ``index.dayofweek``.
        lookahead: Horizon, in rows, of the forward return used as target.

    Returns:
        ``(features, target)``: the selected feature columns and the
        ``future_return`` series. Rows with any missing value are dropped and
        both results share a fresh 0..n-1 integer index (dates are discarded).
    """
    # Work on a copy so the caller's frame (possibly a slice) is never mutated.
    data = data.copy()

    # Lagged returns over 1, 3 and 5 rows.
    for lag in [1, 3, 5]:
        data[f'return_lag{lag}'] = data['close'].pct_change(lag)
    # Indicator-derived signals: MACD above/below its signal line, and the
    # Bollinger band width relative to the middle band.
    data['MACD_cross'] = np.where(data['MACD'] > data['Signal'], 1, -1)
    data['BB_width'] = (data['UpperBB'] - data['LowerBB']) / data['MiddleBB']
    # Target: return from row t to row t + lookahead, stored at row t.
    data['future_return'] = data['close'].pct_change(lookahead).shift(-lookahead)

    # 30-row rolling volatility of log returns.
    data['log_return'] = np.log(data['close']).diff()
    data['volatility_30'] = data['log_return'].rolling(30).std()
    # Calendar features (require a DatetimeIndex).
    data['month'] = data.index.month
    data['day_of_week'] = data.index.dayofweek

    # Columns actually fed to the model.
    selected_features = ['RSI', 'MACD','Signal', 'volatility_30',  'UpperBB', 'MiddleBB', 'LowerBB', 'BB_width',
                        'return_lag1', 'return_lag3', 'month']
    # Drop warm-up rows and the last `lookahead` rows (no target), then
    # re-index so features and target stay positionally aligned.
    data = data.dropna().reset_index(drop=True)
    return  data[selected_features], data['future_return']

In [None]:
def train_model(X, y):
    """Fit one scaled random-forest pipeline per time-series split.

    The data is split with ``TimeSeriesSplit(n_splits=5, test_size=21)``; for
    each split a fresh ``StandardScaler`` + ``RandomForestRegressor`` pipeline
    is fitted on the training rows and scored on both parts.

    The scores are R² values: they range from minus infinity to 1, and values
    closer to 1 mean more of the variance is explained. A very high training
    R² next to a low test R² hints at over-fitting.

    Args:
        X: Feature frame, indexed positionally via ``.iloc``.
        y: Target series aligned row-by-row with ``X``.

    Returns:
        Dict keyed by fold number; each value holds the fitted ``model``, the
        positional ``test_index`` of that fold, and ``train_score`` /
        ``test_score``.
    """
    splitter = TimeSeriesSplit(n_splits=5, test_size=21)

    def _new_pipeline():
        # A fresh, unfitted estimator for every fold.
        return Pipeline([
            ('scaler', StandardScaler()),
            ('regressor', RandomForestRegressor(
                n_estimators=200,
                max_depth=8,
                random_state=42
            ))
        ])

    fold_results = {}
    for fold_no, (train_rows, test_rows) in enumerate(splitter.split(X)):
        X_fit, y_fit = X.iloc[train_rows], y.iloc[train_rows]
        X_eval, y_eval = X.iloc[test_rows], y.iloc[test_rows]

        estimator = _new_pipeline()
        estimator.fit(X_fit, y_fit)

        fold_results[fold_no] = {
            "model": estimator,
            "test_index": test_rows,
            "train_score": estimator.score(X_fit, y_fit),
            "test_score": estimator.score(X_eval, y_eval)
        }
    return fold_results

In [None]:
def calculate_metrics(returns,trade_count, benchmark):
    """Summarise a series of per-period strategy returns.

    Args:
        returns: pandas Series of per-period returns.
        trade_count: Amount of position change over the period; divided by
            the number of periods to obtain the turnover figure.
        benchmark: Accepted for interface compatibility; not used by the
            current calculations.

    Returns:
        Dict with annualised return, annualised volatility, Sharpe ratio
        (all assuming 252 periods per year), maximum drawdown of the summed
        returns, win rate, average-win / average-loss ratio and turnover.
        Figures that cannot be computed (e.g. on an empty series) are NaN.
    """
    n_periods = len(returns)
    # Additive equity curve; computed once and reused for the drawdown.
    equity = returns.cumsum()
    winners = returns[returns > 0]
    losers = returns[returns < 0]
    metrics = {
        '年化收益率': returns.mean() * 252,
        '波动率': returns.std() * np.sqrt(252),
        '夏普比率': returns.mean() / returns.std() * np.sqrt(252),
        '最大回撤': (equity - equity.cummax()).min(),
        '胜率': (returns > 0).mean(),
        '盈亏比': winners.mean() / abs(losers.mean()),
        # Guard the empty case: a plain-int trade_count divided by zero
        # periods would otherwise raise ZeroDivisionError.
        '换手率': trade_count / n_periods if n_periods else float('nan')
    }
    return metrics

In [None]:
def backtest(results, X, y, initial_capital=100000, threshold=0.015, cost_rate=0.0005):
    """Turn out-of-fold predictions into positions and evaluate the strategy.

    Pipeline: collect each fold's predictions on its own test rows, smooth
    them with a 5-row mean, standardise the smoothed value against a 21-row
    rolling window (z-score), and go long / short / flat when the z-score is
    above ``threshold`` / below ``-threshold`` / in between.

    Args:
        results: Dict of fold results; each value needs a fitted ``model``
            (with ``predict``) and the positional ``test_index`` into ``X``.
        X: Feature frame.
        y: Target returns; must have exactly the same index as ``X``.
        initial_capital: Accepted for interface compatibility; the returned
            curves are growth factors starting near 1 and do not use it.
        threshold: Absolute z-score above which a position is opened.
        cost_rate: Cost charged per unit of position change (0.0005 = 0.05%).

    Returns:
        Dict with ``signals`` (positions), ``cumulative`` and ``benchmark``
        growth curves, ``metrics`` (from ``calculate_metrics``), per-period
        ``returns``, ``trades`` (total absolute position change),
        ``max_drawdown`` and ``sharpe_ratio``.

    Raises:
        ValueError: If ``X`` and ``y`` do not share the same index.

    NOTE(review): ``y`` is treated as the return earned per row. If it is a
    multi-row forward return, consecutive rows overlap and compounding them
    overstates performance -- confirm against how ``y`` is built.
    """
    if not X.index.equals(y.index):
        raise ValueError("特征数据与标签数据索引不匹配")
    all_preds = []
    all_dates = []
    for fold in results:
        model = results[fold]["model"]
        # Positional rows of this fold's test set (saved at training time).
        test_index = results[fold]["test_index"]
        # atleast_1d guarantees a DataFrame even for a single row position.
        X_test = X.iloc[np.atleast_1d(test_index)]
        all_preds.extend(model.predict(X_test))
        all_dates.extend(X_test.index)
    logging.info(f"Backtesting on {len(all_preds)} predictions")

    # 5-row smoothing of the raw predictions.
    smoothed = pd.Series(all_preds, index=all_dates).sort_index().rolling(5).mean()
    # Rolling z-score of the latest smoothed value. `.iloc[-1]` is required:
    # the window is a Series, and `w[-1]` would be a label lookup that fails
    # on an integer index.
    zscores = smoothed.rolling(window=21).apply(
        lambda w: (w.iloc[-1] - w.mean()) / w.std()
    )
    # Only rows with a full smoothing + z-score history carry a signal.
    valid_scores = zscores.dropna()
    signal_index = valid_scores.index

    # Discrete positions: +1 long, -1 short, 0 flat.
    positions = np.where(valid_scores > threshold, 1,
                         np.where(valid_scores < -threshold, -1, 0))
    # Align targets by label with the rows that actually have a signal.
    aligned_y = y.reindex(signal_index)

    # Position change per row (starting from flat); its sum is the trade count.
    position_changes = np.abs(np.diff(positions, prepend=0))
    trade_count = position_changes.sum()

    # Costs are charged only on the rows where the position changes.
    returns = aligned_y.to_numpy() * positions - position_changes * cost_rate

    cumulative_returns = pd.Series((1 + returns).cumprod(), index=signal_index)
    # Benchmark: buy and hold over the same rows.
    benchmark = (1 + aligned_y).cumprod()

    # Sharpe ratio: annualised mean return per unit of volatility; higher
    # means better reward for the risk taken.
    sharpe_ratio = np.sqrt(252) * returns.mean() / returns.std()
    # Maximum drawdown: the largest peak-to-trough loss of the equity curve.
    max_drawdown = (cumulative_returns / cumulative_returns.cummax() - 1).min()
    daily_returns = pd.Series(returns, index=signal_index)
    metrics = calculate_metrics(daily_returns, trade_count,
                              benchmark.pct_change().dropna())

    return {
        'signals': pd.Series(positions, index=signal_index),
        'cumulative': cumulative_returns,
        'benchmark': benchmark,
        'metrics': metrics,
        'returns': daily_returns,
        'trades': trade_count,
        'max_drawdown': max_drawdown,
        'sharpe_ratio':sharpe_ratio
    }

In [None]:
# 模型部署模块
def deploy_model(results, model_name='stock_predictor.pkl', X=None, y=None):
    """Combine the per-fold models into a voting ensemble and save it to disk.

    A ``VotingRegressor`` can only predict after its own ``fit`` has been
    called, even when the estimators passed to it are already fitted. Pass
    ``X`` and ``y`` to fit the ensemble before it is saved; without them the
    saved object is an unfitted ensemble definition.

    Args:
        results: Dict of fold results; each value needs a ``model`` entry.
        model_name: Path of the file written with ``joblib.dump``.
        X: Optional training features used to fit the ensemble.
        y: Optional training target used to fit the ensemble.
    """
    ensemble = VotingRegressor(
        [(f'model_{k}', v['model']) for k, v in results.items()]
    )
    if X is not None and y is not None:
        ensemble.fit(X, y)
    else:
        logging.warning(
            "deploy_model called without X/y: the saved VotingRegressor is "
            "unfitted and must be fitted before it can predict"
        )
    joblib.dump(ensemble, model_name)

from backtesting.test import GOOG

def walk_forward_validation(data, n_splits=5):
    """Evaluate the strategy on successive out-of-sample windows.

    The rows are cut into ``n_splits + 1`` consecutive segments. Window ``i``
    trains on every row before segment ``i + 1`` and is back-tested on
    segment ``i + 1``, so the test rows are never part of the training rows.

    Args:
        data: Pre-processed frame in chronological order. The index is kept
            as-is because ``create_features`` reads calendar fields from it.
        n_splits: Number of train/test windows.

    Returns:
        List with one ``backtest`` result dict per window.

    Raises:
        ValueError: If a segment would hold fewer than 30 rows.
    """
    n_rows = len(data)
    n_segments = n_splits + 1
    min_size = n_rows // n_segments
    if min_size < 30:  # each segment must hold a minimum amount of data
        raise ValueError(f"数据集太小({len(data)})，无法进行{n_splits}次分割")

    # Row positions of the segment boundaries: 0, ..., n_rows.
    bounds = [n_rows * k // n_segments for k in range(n_segments + 1)]

    wfa_results = []
    for i in range(n_splits):
        # Copies keep feature creation from writing into slices of `data`.
        train = data.iloc[:bounds[i + 1]].copy()
        test = data.iloc[bounds[i + 1]:bounds[i + 2]].copy()

        # Train; keep the model of the last fold, which saw the most rows.
        features_train, target_train = create_features(train)
        cv_results = train_model(features_train, target_train)
        final_model = cv_results[max(cv_results)]["model"]

        # Back-test that model on every row of the unseen window. The
        # training folds' test_index values refer to the training features
        # and must not be reused here.
        features_test, target_test = create_features(test)
        window_model = {
            0: {
                "model": final_model,
                "test_index": np.arange(len(features_test)),
            }
        }
        results = backtest(window_model, features_test, target_test)

        wfa_results.append(results)
    return wfa_results

In [None]:
# 可视化模块
def visualize_results(cumulative, benchmark):
    """Plot the strategy curve against the buy-and-hold benchmark.

    Args:
        cumulative: Series with the strategy's cumulative performance.
        benchmark: Series with the benchmark's cumulative performance.
    """
    figure = plt.figure(figsize=(12,6))
    axis = figure.gca()

    # Draw both curves on the same axes so they share scale and legend.
    cumulative.plot(ax=axis, label='Strategy')
    benchmark.plot(ax=axis, label='Buy & Hold')

    axis.set_title('Strategy vs Benchmark Performance')
    axis.set_xlabel('Date')
    axis.set_ylabel('Return')
    axis.legend()
    plt.show()
def visualize_advanced(results):
    """Draw a 2x2 diagnostic dashboard for a back-test.

    Panels: return distribution, year-by-month return heatmap, 63-period
    rolling Sharpe ratio, and the histogram of holding periods.

    Args:
        results: Mapping with ``returns`` (Series of per-period returns) and
            ``positions`` (Series plotted as the holding-period histogram).
            The heatmap is drawn only when ``returns`` has a
            ``DatetimeIndex``; otherwise that panel shows a short notice.
    """
    returns = results['returns']
    fig, axes = plt.subplots(2, 2, figsize=(15,10))
    ax_dist, ax_heat, ax_sharpe, ax_hold = axes.ravel()

    # Return distribution.
    sns.histplot(returns, kde=True, ax=ax_dist)
    ax_dist.set_title('收益分布')

    # Monthly returns as a year x month table. A plain Series.unstack() on a
    # single-level index raises, so the two levels are built explicitly.
    if isinstance(returns.index, pd.DatetimeIndex) and len(returns) > 0:
        dates = returns.index
        monthly_table = (
            returns
            .groupby([dates.year.rename('year'), dates.month.rename('month')])
            .sum()
            .unstack()
        )
        sns.heatmap(monthly_table, annot=True, fmt=".1%", ax=ax_heat)
    else:
        ax_heat.text(0.5, 0.5, 'Monthly heatmap requires a DatetimeIndex',
                     ha='center', va='center', transform=ax_heat.transAxes)
    ax_heat.set_title('月度收益热力图')

    # Rolling Sharpe ratio over 63 periods, annualised with 252 periods/year.
    rolling_sharpe = returns.rolling(63).mean() / returns.rolling(63).std() * np.sqrt(252)
    rolling_sharpe.plot(ax=ax_sharpe)
    ax_sharpe.set_title('滚动夏普比率')

    # Holding-period distribution.
    results['positions'].plot(kind='hist', bins=20, ax=ax_hold)
    ax_hold.set_title('持仓周期分布')

    fig.tight_layout()
    plt.show()

In [None]:
if __name__ == '__main__':
    # Load prices (indexed by date) and add the technical indicators.
    # preprocess_data already returns the rows sorted by date.
    data = pd.read_csv('stock.csv', index_col='date',parse_dates=True)
    data = preprocess_data(data)
    features, target = create_features(data)
    cv_results = train_model(features, target)
    # Cross-validation scores per fold.
    for fold in cv_results:
        print(f"Fold {fold}: Train R2={cv_results[fold]['train_score']:.3f}, Test R2={cv_results[fold]['test_score']:.3f}")
    # Back-test on the combined out-of-fold predictions.
    results = backtest(cv_results, features, target)
    # Strategy vs. benchmark.
    visualize_results(results['cumulative'], results['benchmark'])
    # Holding periods: each run of an unchanged signal gets one id via the
    # cumulative count of changes; the size of each run is its length.
    position_changes = results['signals'].diff().ne(0)
    results['positions'] = position_changes.cumsum().value_counts()
    visualize_advanced({
        'returns': results['returns'],
        'positions': results['positions']
    })
    # Key performance figures.
    print("策略绩效指标：")
    for k, v in results['metrics'].items():
        print(f"{k}: {v:.2f}" if isinstance(v, float) else f"{k}: {v}")
    # Walk-forward validation: compare the metrics of each window. Every
    # window result is the dict returned by backtest(), which already holds
    # the computed metrics.
    wfa_results = walk_forward_validation(data, n_splits=5)
    all_metrics = []
    for i, wfa_result in enumerate(wfa_results):
        metrics = dict(wfa_result['metrics'])
        metrics['period'] = f"Window {i+1}"
        all_metrics.append(metrics)
    pd.DataFrame(all_metrics).set_index('period').plot(kind='bar', subplots=True)
    plt.show()
    # Deploy the ensemble model.
    # deploy_model(cv_results)