# 机器学习策略演示

本notebook演示如何使用机器学习方法构建一个简单的交易策略。主要步骤包括：

1. 数据获取与预处理
2. 简单因子/特征构造
3. 目标变量（下期收益）的定义
4. 训练简单的线性回归模型
5. 策略回测
6. 使用Backtrader进行回测

## 0. 导入依赖包

In [None]:
import yfinance as yf
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

# 设置显示选项
pd.set_option('display.float_format', lambda x: '%.4f' % x)
plt.style.use('seaborn')

# 设置中文显示
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False

## 1. 数据获取与预处理

我们获取TSLA过去5年的日线数据。

In [None]:
# 设定时间范围（从现在往前推5年）
end_date = datetime.now()
start_date = end_date - timedelta(days=5*365)

print(f"获取数据时间范围：{start_date.strftime('%Y-%m-%d')} 到 {end_date.strftime('%Y-%m-%d')}")

# 下载特斯拉数据
data = yf.download('TSLA', start=start_date, end=end_date)

print("\n数据概览：")
print(data.head())

print("\n数据基本信息：")
print(data.info())

# 绘制收盘价走势图
plt.figure(figsize=(15, 6))
plt.plot(data.index, data['Close'], label='TSLA收盘价')
plt.title('特斯拉股价走势')
plt.xlabel('日期')
plt.ylabel('价格（美元）')
plt.legend()
plt.grid(True)
plt.show()

## 2. 简单因子/特征构造

构建两个简单的因子：
1. 动量因子：过去5日涨跌幅
2. 成交量比值：最近5日均量vs最近10日均量

In [None]:
# 复制数据
df = data.copy()

# 动量因子: 过去5日涨跌幅
df['momentum_5'] = df['Close'] / df['Close'].shift(5) - 1

# 成交量因子: (最近5日平均成交量) / (最近10日平均成交量) - 1
df['vol_ratio'] = (df['Volume'].rolling(5).mean()) / (df['Volume'].rolling(10).mean()) - 1

# 查看新添加的列
print("因子数据预览：")
print(df[['Close', 'momentum_5', 'vol_ratio']].tail(10))

# 绘制因子分布图
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

sns.histplot(df['momentum_5'].dropna(), bins=50, ax=ax1)
ax1.set_title('动量因子分布')
ax1.set_xlabel('5日动量')

sns.histplot(df['vol_ratio'].dropna(), bins=50, ax=ax2)
ax2.set_title('成交量比值分布')
ax2.set_xlabel('成交量比值')

plt.tight_layout()
plt.show()

## 3. 目标变量的定义

定义下期1日收益率作为目标变量。

In [None]:
# 计算下期收益率
df['future_ret_1d'] = df['Close'].pct_change().shift(-1)

# 去掉NaN值
df.dropna(inplace=True)

print("添加目标变量后的数据预览：")
print(df[['Close', 'momentum_5', 'vol_ratio', 'future_ret_1d']].head(10))

# 绘制目标变量分布
plt.figure(figsize=(10, 5))
sns.histplot(df['future_ret_1d'], bins=50)
plt.title('下期收益率分布')
plt.xlabel('收益率')
plt.show()

# 计算因子与目标变量的相关性
corr = df[['momentum_5', 'vol_ratio', 'future_ret_1d']].corr()

plt.figure(figsize=(8, 6))
sns.heatmap(corr, annot=True, cmap='coolwarm', center=0)
plt.title('因子与目标变量相关性')
plt.show()

## 4. 划分训练集与测试集

按照时间顺序，使用前80%的数据作为训练集，后20%作为测试集。

In [None]:
# 计算分割点
split_idx = int(len(df) * 0.8)
split_date = df.index[split_idx]

train_data = df.iloc[:split_idx].copy()
test_data = df.iloc[split_idx:].copy()

print("训练集范围:", train_data.index.min(), "→", train_data.index.max())
print("测试集范围:", test_data.index.min(), "→", test_data.index.max())
print("\n训练集样本数:", len(train_data))
print("测试集样本数:", len(test_data))

# 可视化训练集和测试集的划分
plt.figure(figsize=(15, 6))
plt.plot(train_data.index, train_data['Close'], label='训练集', color='blue')
plt.plot(test_data.index, test_data['Close'], label='测试集', color='red')
plt.axvline(split_date, color='black', linestyle='--', label='划分点')
plt.title('训练集和测试集划分')
plt.xlabel('日期')
plt.ylabel('价格（美元）')
plt.legend()
plt.grid(True)
plt.show()

## 5. 训练线性回归模型

In [None]:
# 准备特征和目标变量
features = ['momentum_5', 'vol_ratio']
X_train = train_data[features].values
y_train = train_data['future_ret_1d'].values

X_test = test_data[features].values
y_test = test_data['future_ret_1d'].values

# 训练模型
model = LinearRegression()
model.fit(X_train, y_train)

# 预测
y_pred_train = model.predict(X_train)
y_pred_test = model.predict(X_test)

# 评估
train_mse = mean_squared_error(y_train, y_pred_train)
test_mse = mean_squared_error(y_test, y_pred_test)

print("模型评估：")
print(f"训练集MSE: {train_mse:.8f}")
print(f"测试集MSE:  {test_mse:.8f}")
print("\n模型参数：")
for feature, coef in zip(features, model.coef_):
    print(f"{feature}: {coef:.6f}")
print(f"截距: {model.intercept_:.6f}")

# 绘制预测值vs实际值的散点图
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

ax1.scatter(y_train, y_pred_train, alpha=0.5)
ax1.plot([y_train.min(), y_train.max()], [y_train.min(), y_train.max()], 'r--')
ax1.set_title('训练集：预测值 vs 实际值')
ax1.set_xlabel('实际收益率')
ax1.set_ylabel('预测收益率')

ax2.scatter(y_test, y_pred_test, alpha=0.5)
ax2.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--')
ax2.set_title('测试集：预测值 vs 实际值')
ax2.set_xlabel('实际收益率')
ax2.set_ylabel('预测收益率')

plt.tight_layout()
plt.show()

## 6. 策略回测

在测试集上进行简单的策略回测。

In [None]:
# 在测试集上生成预测
test_data['pred_ret_1d'] = model.predict(test_data[features])

# 生成交易信号
test_data['signal'] = (test_data['pred_ret_1d'] > 0).astype(int)

# 计算策略收益
test_data['strategy_ret'] = test_data['signal'] * test_data['future_ret_1d']

# 计算累积收益
test_data['strategy_cum'] = (1 + test_data['strategy_ret']).cumprod()
test_data['buy_and_hold'] = test_data['Close'] / test_data['Close'].iloc[0]

# 计算策略评估指标
strategy_return = test_data['strategy_cum'].iloc[-1] - 1
bh_return = test_data['buy_and_hold'].iloc[-1] - 1

strategy_sharpe = np.sqrt(252) * test_data['strategy_ret'].mean() / test_data['strategy_ret'].std()
bh_sharpe = np.sqrt(252) * test_data['Close'].pct_change().mean() / test_data['Close'].pct_change().std()

strategy_drawdown = (test_data['strategy_cum'] / test_data['strategy_cum'].cummax() - 1).min()
bh_drawdown = (test_data['buy_and_hold'] / test_data['buy_and_hold'].cummax() - 1).min()

print("策略评估指标：")
print(f"策略总收益率: {strategy_return:.2%}")
print(f"买入持有收益率: {bh_return:.2%}")
print(f"\n策略夏普比率: {strategy_sharpe:.2f}")
print(f"买入持有夏普比率: {bh_sharpe:.2f}")
print(f"\n策略最大回撤: {strategy_drawdown:.2%}")
print(f"买入持有最大回撤: {bh_drawdown:.2%}")

# 绘制策略收益曲线
plt.figure(figsize=(15, 6))
plt.plot(test_data.index, test_data['strategy_cum'], label='策略收益')
plt.plot(test_data.index, test_data['buy_and_hold'], label='买入持有')
plt.title('策略收益 vs 买入持有')
plt.xlabel('日期')
plt.ylabel('累积收益')
plt.legend()
plt.grid(True)
plt.show()

# 绘制月度收益热力图
monthly_returns = test_data['strategy_ret'].groupby([test_data.index.year, test_data.index.month]).mean()
monthly_returns = monthly_returns.unstack()

plt.figure(figsize=(12, 6))
sns.heatmap(monthly_returns, annot=True, fmt='.2%', cmap='RdYlGn', center=0)
plt.title('策略月度收益热力图')
plt.show()

## 7. 使用Backtrader进行回测

In [None]:
import backtrader as bt

class MLFactorStrategy(bt.Strategy):
    params = (
        ('model', None),  # 预训练的模型
    )

    def __init__(self):
        self.model = self.p.model
        self.momentum_5 = bt.indicators.PercentChange(self.data.close, period=5)
        self.vol_5 = bt.indicators.SMA(self.data.volume, period=5)
        self.vol_10 = bt.indicators.SMA(self.data.volume, period=10)
        
    def next(self):
        # 计算因子值
        momentum = self.momentum_5[0]
        vol_ratio = (self.vol_5[0] / self.vol_10[0]) - 1 if self.vol_10[0] != 0 else 0
        
        # 构建特征向量
        X = [[momentum, vol_ratio]]
        pred_ret = self.model.predict(X)[0]
        
        # 交易逻辑
        if pred_ret > 0:
            if not self.position:
                self.buy()
        else:
            if self.position:
                self.close()

# 准备数据
data = bt.feeds.PandasData(
    dataname=test_data,
    datetime=None,  # 使用索引作为日期
    open=0,
    high=1,
    low=2,
    close=3,
    volume=5,
    openinterest=-1
)

# 初始化cerebro
cerebro = bt.Cerebro()
cerebro.adddata(data)
cerebro.addstrategy(MLFactorStrategy, model=model)

# 设置初始资金和手续费
initial_cash = 100000.0
cerebro.broker.setcash(initial_cash)
cerebro.broker.setcommission(commission=0.001)

# 添加分析器
cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')

# 运行回测
print(f"初始资金: ${initial_cash:.2f}")
results = cerebro.run()
final_value = cerebro.broker.getvalue()
print(f"最终资金: ${final_value:.2f}")
print(f"总收益率: {(final_value/initial_cash - 1):.2%}")

# 获取分析结果
strat = results[0]
print(f"\n夏普比率: {strat.analyzers.sharpe.get_analysis()['sharperatio']:.2f}")
print(f"最大回撤: {strat.analyzers.drawdown.get_analysis()['max']['drawdown']:.2%}")

# 绘制回测结果
cerebro.plot(style='candlestick')