# 03 - 策略原型开发 (Strategy Prototype)

本 Notebook 用于快速验证策略想法，在正式编写回测代码前进行概念验证。

内容包括：
1. 策略逻辑设计
2. 信号生成与可视化
3. 简易收益计算
4. 参数敏感性分析
5. 策略优化方向

## 1. 环境准备

In [None]:
# 导入必要的库
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
from datetime import datetime, timedelta
from itertools import product

# 设置中文显示
matplotlib.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
matplotlib.rcParams['axes.unicode_minus'] = False

# 设置图表样式
plt.style.use('seaborn-v0_8-whitegrid')

# 掘金 SDK
from gm.api import *

print("Environment ready!")

In [None]:
# 设置 Token
TOKEN = 'your_token_here'
set_token(TOKEN)

print("Token set successfully!")

## 2. 指标计算工具

In [None]:
# ==============================================================================
# 技术指标函数
# ==============================================================================

def sma(series, period):
    return series.rolling(window=period).mean()

def ema(series, period):
    return series.ewm(span=period, adjust=False).mean()

def macd(close, fast=12, slow=26, signal=9):
    exp1 = ema(close, fast)
    exp2 = ema(close, slow)
    dif = exp1 - exp2
    dea = ema(dif, signal)
    histogram = 2 * (dif - dea)
    return dif, dea, histogram

def rsi(close, period=14):
    delta = close.diff()
    gain = delta.where(delta > 0, 0)
    loss = (-delta).where(delta < 0, 0)
    avg_gain = gain.ewm(span=period, adjust=False).mean()
    avg_loss = loss.ewm(span=period, adjust=False).mean()
    rs = avg_gain / avg_loss
    return 100 - (100 / (1 + rs))

def bollinger_bands(close, period=20, std_dev=2):
    middle = sma(close, period)
    std = close.rolling(window=period).std()
    upper = middle + std_dev * std
    lower = middle - std_dev * std
    return upper, middle, lower

def atr(high, low, close, period=14):
    tr1 = high - low
    tr2 = abs(high - close.shift(1))
    tr3 = abs(low - close.shift(1))
    tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
    return tr.rolling(window=period).mean()

def crossover(series1, series2):
    """判断 series1 上穿 series2"""
    return (series1 > series2) & (series1.shift(1) <= series2.shift(1))

def crossunder(series1, series2):
    """判断 series1 下穿 series2"""
    return (series1 < series2) & (series1.shift(1) >= series2.shift(1))

print("指标函数已加载")

## 3. 获取测试数据

In [None]:
# 获取测试数据
symbol = 'SHSE.600000'
start_date = '2022-01-01'
end_date = '2023-12-31'

df = history(
    symbol=symbol,
    frequency='1d',
    start_time=start_date,
    end_time=end_date,
    fields='open,high,low,close,volume',
    adjust=ADJUST_PREV,
    df=True
)

print(f"获取 {symbol} 数据: {len(df)} 条")
print(f"时间范围: {df.index[0]} 至 {df.index[-1]}")
df.head()

---
## 策略原型 1: 双均线交叉策略

**策略逻辑**:
- 买入: 短期均线上穿长期均线 (金叉)
- 卖出: 短期均线下穿长期均线 (死叉)

**参数**:
- 短期均线周期 (fast_period)
- 长期均线周期 (slow_period)

In [None]:
# ==============================================================================
# 策略 1: 双均线交叉
# ==============================================================================

def strategy_dual_ma(df, fast_period=5, slow_period=20):
    """双均线交叉策略"""
    data = df.copy()
    
    # 计算均线
    data['ma_fast'] = sma(data['close'], fast_period)
    data['ma_slow'] = sma(data['close'], slow_period)
    
    # 生成信号
    data['signal'] = 0
    data.loc[data['ma_fast'] > data['ma_slow'], 'signal'] = 1
    data.loc[data['ma_fast'] < data['ma_slow'], 'signal'] = -1
    
    # 买卖点
    data['buy'] = crossover(data['ma_fast'], data['ma_slow'])
    data['sell'] = crossunder(data['ma_fast'], data['ma_slow'])
    
    return data

# 运行策略
result1 = strategy_dual_ma(df, fast_period=5, slow_period=20)

print(f"买入信号: {result1['buy'].sum()} 次")
print(f"卖出信号: {result1['sell'].sum()} 次")

In [None]:
# 可视化
fig, ax = plt.subplots(figsize=(14, 7))

ax.plot(result1.index, result1['close'], label='收盘价', color='black', linewidth=1.5)
ax.plot(result1.index, result1['ma_fast'], label='MA5', color='red', linewidth=1)
ax.plot(result1.index, result1['ma_slow'], label='MA20', color='blue', linewidth=1)

# 标记买卖点
buy_points = result1[result1['buy']]
sell_points = result1[result1['sell']]

ax.scatter(buy_points.index, buy_points['close'], marker='^', color='red', s=100, label='买入', zorder=5)
ax.scatter(sell_points.index, sell_points['close'], marker='v', color='green', s=100, label='卖出', zorder=5)

ax.set_ylabel('价格', fontsize=12)
ax.set_xlabel('日期', fontsize=12)
ax.set_title('策略1: 双均线交叉 (MA5/MA20)', fontsize=14)
ax.legend(loc='upper left')
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

---
## 策略原型 2: RSI 均值回归策略

**策略逻辑**:
- 买入: RSI < 30 (超卖)
- 卖出: RSI > 70 (超买)

**参数**:
- RSI 周期
- 超买阈值
- 超卖阈值

In [None]:
# ==============================================================================
# 策略 2: RSI 均值回归
# ==============================================================================

def strategy_rsi_reversion(df, rsi_period=14, oversold=30, overbought=70):
    """RSI 均值回归策略"""
    data = df.copy()
    
    # 计算 RSI
    data['rsi'] = rsi(data['close'], rsi_period)
    
    # 生成信号
    data['signal'] = 0
    data['buy'] = data['rsi'] < oversold
    data['sell'] = data['rsi'] > overbought
    
    # 持仓状态
    position = 0
    positions = []
    for i in range(len(data)):
        if data['buy'].iloc[i] and position == 0:
            position = 1
        elif data['sell'].iloc[i] and position == 1:
            position = 0
        positions.append(position)
    data['position'] = positions
    
    return data

# 运行策略
result2 = strategy_rsi_reversion(df, rsi_period=14, oversold=30, overbought=70)

print(f"超卖信号: {result2['buy'].sum()} 次")
print(f"超买信号: {result2['sell'].sum()} 次")

In [None]:
# 可视化
fig, axes = plt.subplots(2, 1, figsize=(14, 10), sharex=True,
                         gridspec_kw={'height_ratios': [2, 1]})

# 价格图
ax1 = axes[0]
ax1.plot(result2.index, result2['close'], label='收盘价', color='black', linewidth=1.5)

# 持仓区间着色
in_position = result2['position'] == 1
ax1.fill_between(result2.index, result2['close'].min(), result2['close'].max(),
                 where=in_position, alpha=0.2, color='green', label='持仓区间')

ax1.set_ylabel('价格', fontsize=12)
ax1.set_title('策略2: RSI 均值回归', fontsize=14)
ax1.legend(loc='upper left')
ax1.grid(True, alpha=0.3)

# RSI 图
ax2 = axes[1]
ax2.plot(result2.index, result2['rsi'], color='purple', linewidth=1)
ax2.axhline(y=70, color='red', linestyle='--', linewidth=1)
ax2.axhline(y=30, color='green', linestyle='--', linewidth=1)
ax2.fill_between(result2.index, 70, 100, color='red', alpha=0.1)
ax2.fill_between(result2.index, 0, 30, color='green', alpha=0.1)
ax2.set_ylim(0, 100)
ax2.set_ylabel('RSI', fontsize=12)
ax2.set_xlabel('日期', fontsize=12)
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

---
## 策略原型 3: 布林带突破策略

**策略逻辑**:
- 买入: 价格从下方突破下轨后回升至中轨
- 卖出: 价格从上方突破上轨后回落至中轨

**参数**:
- 布林带周期
- 标准差倍数

In [None]:
# ==============================================================================
# 策略 3: 布林带突破
# ==============================================================================

def strategy_bollinger_breakout(df, period=20, std_dev=2):
    """布林带突破策略"""
    data = df.copy()
    
    # 计算布林带
    data['bb_upper'], data['bb_middle'], data['bb_lower'] = bollinger_bands(
        data['close'], period, std_dev
    )
    
    # 计算 %B
    data['pctB'] = (data['close'] - data['bb_lower']) / (data['bb_upper'] - data['bb_lower'])
    
    # 信号: 价格触及下轨后回升买入，触及上轨后回落卖出
    data['touch_lower'] = data['close'] < data['bb_lower']
    data['touch_upper'] = data['close'] > data['bb_upper']
    
    # 状态机
    position = 0
    waiting_buy = False
    waiting_sell = False
    positions = []
    buy_signals = []
    sell_signals = []
    
    for i in range(len(data)):
        buy = False
        sell = False
        
        if data['touch_lower'].iloc[i]:
            waiting_buy = True
        if data['touch_upper'].iloc[i]:
            waiting_sell = True
        
        # 价格回升到中轨 -> 买入
        if waiting_buy and position == 0 and data['close'].iloc[i] > data['bb_middle'].iloc[i]:
            position = 1
            waiting_buy = False
            buy = True
        
        # 价格回落到中轨 -> 卖出
        if waiting_sell and position == 1 and data['close'].iloc[i] < data['bb_middle'].iloc[i]:
            position = 0
            waiting_sell = False
            sell = True
        
        positions.append(position)
        buy_signals.append(buy)
        sell_signals.append(sell)
    
    data['position'] = positions
    data['buy'] = buy_signals
    data['sell'] = sell_signals
    
    return data

# 运行策略
result3 = strategy_bollinger_breakout(df, period=20, std_dev=2)

print(f"买入信号: {sum(result3['buy'])} 次")
print(f"卖出信号: {sum(result3['sell'])} 次")

In [None]:
# 可视化
fig, ax = plt.subplots(figsize=(14, 7))

ax.plot(result3.index, result3['close'], label='收盘价', color='black', linewidth=1.5)
ax.plot(result3.index, result3['bb_upper'], label='上轨', color='red', linewidth=1, linestyle='--')
ax.plot(result3.index, result3['bb_middle'], label='中轨', color='blue', linewidth=1)
ax.plot(result3.index, result3['bb_lower'], label='下轨', color='green', linewidth=1, linestyle='--')
ax.fill_between(result3.index, result3['bb_upper'], result3['bb_lower'], alpha=0.1, color='gray')

# 标记买卖点
buy_points = result3[result3['buy']]
sell_points = result3[result3['sell']]

ax.scatter(buy_points.index, buy_points['close'], marker='^', color='red', s=100, label='买入', zorder=5)
ax.scatter(sell_points.index, sell_points['close'], marker='v', color='green', s=100, label='卖出', zorder=5)

ax.set_ylabel('价格', fontsize=12)
ax.set_xlabel('日期', fontsize=12)
ax.set_title('策略3: 布林带突破', fontsize=14)
ax.legend(loc='upper left')
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

---
## 4. 简易回测框架

In [None]:
# ==============================================================================
# 简易回测函数
# ==============================================================================

def backtest(df, signal_column='signal', initial_capital=100000):
    """简易回测
    
    Parameters:
    -----------
    df : DataFrame
        包含 'close' 和信号列的数据
    signal_column : str
        信号列名 (1=做多, 0=空仓, -1=做空)
    initial_capital : float
        初始资金
        
    Returns:
    --------
    dict : 回测结果
    """
    data = df.copy()
    
    # 计算收益率
    data['returns'] = data['close'].pct_change()
    
    # 策略收益 (使用前一天的信号)
    data['strategy_returns'] = data[signal_column].shift(1) * data['returns']
    
    # 累计收益
    data['cumulative_returns'] = (1 + data['returns']).cumprod()
    data['cumulative_strategy'] = (1 + data['strategy_returns']).cumprod()
    
    # 资金曲线
    data['equity'] = initial_capital * data['cumulative_strategy']
    data['benchmark'] = initial_capital * data['cumulative_returns']
    
    # 计算指标
    total_return = data['cumulative_strategy'].iloc[-1] - 1
    benchmark_return = data['cumulative_returns'].iloc[-1] - 1
    
    # 年化收益
    days = len(data)
    annual_return = (1 + total_return) ** (252 / days) - 1
    
    # 波动率
    volatility = data['strategy_returns'].std() * np.sqrt(252)
    
    # 夏普比率
    sharpe = annual_return / volatility if volatility > 0 else 0
    
    # 最大回撤
    rolling_max = data['cumulative_strategy'].expanding().max()
    drawdown = data['cumulative_strategy'] / rolling_max - 1
    max_drawdown = drawdown.min()
    
    # 胜率 (只考虑非零收益)
    strategy_returns = data['strategy_returns'].dropna()
    strategy_returns = strategy_returns[strategy_returns != 0]
    win_rate = (strategy_returns > 0).sum() / len(strategy_returns) if len(strategy_returns) > 0 else 0
    
    results = {
        'data': data,
        'total_return': total_return,
        'benchmark_return': benchmark_return,
        'annual_return': annual_return,
        'volatility': volatility,
        'sharpe': sharpe,
        'max_drawdown': max_drawdown,
        'win_rate': win_rate,
        'final_equity': data['equity'].iloc[-1]
    }
    
    return results

def print_results(results, name="策略"):
    """打印回测结果"""
    print("=" * 50)
    print(f"{name} 回测结果")
    print("=" * 50)
    print(f"总收益率: {results['total_return']*100:.2f}%")
    print(f"基准收益率: {results['benchmark_return']*100:.2f}%")
    print(f"年化收益率: {results['annual_return']*100:.2f}%")
    print(f"年化波动率: {results['volatility']*100:.2f}%")
    print(f"夏普比率: {results['sharpe']:.2f}")
    print(f"最大回撤: {results['max_drawdown']*100:.2f}%")
    print(f"胜率: {results['win_rate']*100:.2f}%")
    print(f"最终资金: {results['final_equity']:,.2f}")

print("回测框架已加载")

In [None]:
# 回测三个策略

# 策略1 - 双均线
result1['signal'] = 0
result1.loc[result1['ma_fast'] > result1['ma_slow'], 'signal'] = 1
bt1 = backtest(result1, 'signal')
print_results(bt1, "策略1: 双均线交叉")

print()

# 策略2 - RSI
result2['signal'] = result2['position']
bt2 = backtest(result2, 'signal')
print_results(bt2, "策略2: RSI 均值回归")

print()

# 策略3 - 布林带
result3['signal'] = result3['position']
bt3 = backtest(result3, 'signal')
print_results(bt3, "策略3: 布林带突破")

In [None]:
# 收益曲线对比
fig, ax = plt.subplots(figsize=(14, 7))

ax.plot(bt1['data'].index, bt1['data']['cumulative_returns'], 
        label='持有策略', color='black', linewidth=1.5, linestyle='--')
ax.plot(bt1['data'].index, bt1['data']['cumulative_strategy'], 
        label='策略1: 双均线', color='blue', linewidth=1.5)
ax.plot(bt2['data'].index, bt2['data']['cumulative_strategy'], 
        label='策略2: RSI', color='red', linewidth=1.5)
ax.plot(bt3['data'].index, bt3['data']['cumulative_strategy'], 
        label='策略3: 布林带', color='green', linewidth=1.5)

ax.axhline(y=1, color='gray', linestyle='-', alpha=0.5)
ax.set_ylabel('累计收益', fontsize=12)
ax.set_xlabel('日期', fontsize=12)
ax.set_title('策略收益曲线对比', fontsize=14)
ax.legend(loc='upper left')
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

---
## 5. 参数敏感性分析

In [None]:
# 双均线策略参数优化
fast_range = [3, 5, 8, 10, 15]
slow_range = [15, 20, 30, 40, 60]

results_grid = []

for fast in fast_range:
    for slow in slow_range:
        if fast >= slow:
            continue
        
        # 运行策略
        result = strategy_dual_ma(df, fast_period=fast, slow_period=slow)
        result['signal'] = 0
        result.loc[result['ma_fast'] > result['ma_slow'], 'signal'] = 1
        
        # 回测
        bt = backtest(result, 'signal')
        
        results_grid.append({
            'fast': fast,
            'slow': slow,
            'return': bt['total_return'],
            'sharpe': bt['sharpe'],
            'max_dd': bt['max_drawdown']
        })

param_df = pd.DataFrame(results_grid)
print("参数优化结果:")
print(param_df.sort_values('sharpe', ascending=False).head(10))

In [None]:
# 参数热力图
pivot_return = param_df.pivot(index='fast', columns='slow', values='return')
pivot_sharpe = param_df.pivot(index='fast', columns='slow', values='sharpe')

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# 收益率热力图
ax1 = axes[0]
im1 = ax1.imshow(pivot_return, cmap='RdYlGn', aspect='auto')
ax1.set_xticks(range(len(pivot_return.columns)))
ax1.set_yticks(range(len(pivot_return.index)))
ax1.set_xticklabels(pivot_return.columns)
ax1.set_yticklabels(pivot_return.index)
ax1.set_xlabel('慢线周期', fontsize=12)
ax1.set_ylabel('快线周期', fontsize=12)
ax1.set_title('总收益率', fontsize=14)

# 添加数值
for i in range(len(pivot_return.index)):
    for j in range(len(pivot_return.columns)):
        val = pivot_return.iloc[i, j]
        if not pd.isna(val):
            ax1.text(j, i, f'{val*100:.1f}%', ha='center', va='center', fontsize=9)

plt.colorbar(im1, ax=ax1)

# 夏普比率热力图
ax2 = axes[1]
im2 = ax2.imshow(pivot_sharpe, cmap='RdYlGn', aspect='auto')
ax2.set_xticks(range(len(pivot_sharpe.columns)))
ax2.set_yticks(range(len(pivot_sharpe.index)))
ax2.set_xticklabels(pivot_sharpe.columns)
ax2.set_yticklabels(pivot_sharpe.index)
ax2.set_xlabel('慢线周期', fontsize=12)
ax2.set_ylabel('快线周期', fontsize=12)
ax2.set_title('夏普比率', fontsize=14)

for i in range(len(pivot_sharpe.index)):
    for j in range(len(pivot_sharpe.columns)):
        val = pivot_sharpe.iloc[i, j]
        if not pd.isna(val):
            ax2.text(j, i, f'{val:.2f}', ha='center', va='center', fontsize=9)

plt.colorbar(im2, ax=ax2)

plt.tight_layout()
plt.show()

---
## 6. 策略改进方向

In [None]:
# ==============================================================================
# 改进策略: 双均线 + RSI 过滤 + ATR 止损
# ==============================================================================

def strategy_enhanced(df, fast_period=5, slow_period=20, rsi_period=14, 
                      rsi_lower=40, rsi_upper=60, atr_multiplier=2):
    """增强型策略"""
    data = df.copy()
    
    # 计算指标
    data['ma_fast'] = sma(data['close'], fast_period)
    data['ma_slow'] = sma(data['close'], slow_period)
    data['rsi'] = rsi(data['close'], rsi_period)
    data['atr'] = atr(data['high'], data['low'], data['close'], 14)
    
    # 信号条件
    ma_bullish = data['ma_fast'] > data['ma_slow']
    rsi_neutral = (data['rsi'] > rsi_lower) & (data['rsi'] < rsi_upper)
    
    # 进场信号: 均线金叉 + RSI 在中性区间
    entry_signal = crossover(data['ma_fast'], data['ma_slow']) & rsi_neutral
    
    # 出场信号: 均线死叉 或 ATR 止损
    exit_signal = crossunder(data['ma_fast'], data['ma_slow'])
    
    # 模拟交易
    position = 0
    entry_price = 0
    positions = []
    
    for i in range(len(data)):
        # ATR 止损检查
        if position == 1:
            stop_price = entry_price - atr_multiplier * data['atr'].iloc[i]
            if data['close'].iloc[i] < stop_price:
                position = 0  # 止损出场
        
        # 进场
        if entry_signal.iloc[i] and position == 0:
            position = 1
            entry_price = data['close'].iloc[i]
        
        # 出场
        if exit_signal.iloc[i] and position == 1:
            position = 0
        
        positions.append(position)
    
    data['signal'] = positions
    
    return data

# 运行增强策略
result_enhanced = strategy_enhanced(df)
bt_enhanced = backtest(result_enhanced, 'signal')

print_results(bt_enhanced, "增强策略: 双均线 + RSI过滤 + ATR止损")

In [None]:
# 对比原始双均线和增强策略
fig, ax = plt.subplots(figsize=(14, 7))

ax.plot(bt1['data'].index, bt1['data']['cumulative_returns'], 
        label='持有策略', color='black', linewidth=1.5, linestyle='--')
ax.plot(bt1['data'].index, bt1['data']['cumulative_strategy'], 
        label='原始双均线', color='blue', linewidth=1.5, alpha=0.7)
ax.plot(bt_enhanced['data'].index, bt_enhanced['data']['cumulative_strategy'], 
        label='增强策略', color='red', linewidth=2)

ax.axhline(y=1, color='gray', linestyle='-', alpha=0.5)
ax.set_ylabel('累计收益', fontsize=12)
ax.set_xlabel('日期', fontsize=12)
ax.set_title('原始策略 vs 增强策略', fontsize=14)
ax.legend(loc='upper left')
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# 绩效对比表
print("\n" + "="*60)
print("策略绩效对比")
print("="*60)
print(f"{'指标':<15} {'原始双均线':>15} {'增强策略':>15}")
print("-"*60)
print(f"{'总收益率':<15} {bt1['total_return']*100:>14.2f}% {bt_enhanced['total_return']*100:>14.2f}%")
print(f"{'夏普比率':<15} {bt1['sharpe']:>15.2f} {bt_enhanced['sharpe']:>15.2f}")
print(f"{'最大回撤':<15} {bt1['max_drawdown']*100:>14.2f}% {bt_enhanced['max_drawdown']*100:>14.2f}%")
print(f"{'胜率':<15} {bt1['win_rate']*100:>14.2f}% {bt_enhanced['win_rate']*100:>14.2f}%")

---
## 7. 小结

本 Notebook 展示了策略原型开发的完整流程:

| 步骤 | 内容 |
|------|------|
| 1. 策略设计 | 明确买卖逻辑和参数 |
| 2. 信号生成 | 实现策略逻辑 |
| 3. 可视化 | 验证信号合理性 |
| 4. 简易回测 | 快速评估绩效 |
| 5. 参数优化 | 寻找最优参数组合 |
| 6. 策略改进 | 添加过滤器、止损等 |

**策略开发建议**:
1. 先在 Notebook 中快速验证想法
2. 使用简易回测评估可行性
3. 确认有效后再转换为正式策略代码
4. 正式回测使用掘金的 `run()` 函数

**下一步**: 将验证过的策略原型转换为正式的策略文件 (参考 `test/` 目录中的策略模板)。