优化版Backtrader回测程序
主要优化点：
1. 启用多核处理(maxcpus=16)提高并行计算能力
2. 优化数据预处理，减少重复操作
3. 使用更高效的数据结构和查询方式
4. 减少不必要的日志输出
5. 添加性能监控


In [1]:
import backtrader as bt
import pandas as pd
import numpy as np
import datetime
import time
from copy import deepcopy

# 小试一下

In [3]:
# 实例化 cerebro
cerebro = bt.Cerebro()
# Cerbro在后台自动创建Broker对象，用于管理交易账户和资金
# 打印初始资金
print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())
# 启动回测
cerebro.run()
# 打印回测完成后的资金
print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue())

Starting Portfolio Value: 10000.00
Final Portfolio Value: 10000.00


# 一、数据准备

## 1.1 读取日度行情表

表内字段就是 Backtrader 默认情况下要求输入的 7 个字段： 'datetime' 、'open'、'high'、'low'、'close'、'volume'、'openinterest'，外加一个 'sec_code' 股票代码字段。

Backtrader 的核心组件：
1. 数据加载器：DataFeeds
2. 策略：Strategy
3. 回测大脑：Cerebro
4. 经纪人：Broker
5. 指标：Indicators
6. 订单：Order
7. 仓位管理：Position
8. 交易信号：Signals
9. 性能评估：Analyzer


In [4]:
daily_price = pd.read_csv("./data/daily_price.csv", parse_dates=['datetime'])
daily_price

Unnamed: 0,datetime,sec_code,open,high,low,close,volume,openinterest
0,2019-01-02,600466.SH,33.064891,33.496709,31.954503,32.386321,10629352,0
1,2019-01-02,603228.SH,50.660230,51.458513,50.394136,51.120778,426147,0
2,2019-01-02,600315.SH,148.258423,150.480132,148.258423,149.558935,2138556,0
3,2019-01-02,000750.SZ,49.512579,53.154883,48.715825,51.561375,227557612,0
4,2019-01-02,002588.SZ,36.608672,36.608672,35.669988,35.763857,2841517,0
...,...,...,...,...,...,...,...,...
255967,2021-01-28,600717.SH,121.489201,122.011736,120.705400,120.966667,6022213,0
255968,2021-01-28,300558.SZ,134.155888,137.600704,130.700970,131.569750,5330301,0
255969,2021-01-28,600171.SH,39.774873,39.830040,38.864630,38.947380,12354183,0
255970,2021-01-28,600597.SH,47.190201,49.243025,46.250355,46.423484,32409940,0


In [6]:
daily_price.query("sec_code=='600466.SH'")

Unnamed: 0,datetime,sec_code,open,high,low,close,volume,openinterest
0,2019-01-02,600466.SH,33.064891,33.496709,31.954503,32.386321,10629352,0
546,2019-01-03,600466.SH,32.262944,32.941515,31.399309,31.831127,8602646,0
1211,2019-01-04,600466.SH,31.399309,33.558397,31.337621,33.496709,12768116,0
1700,2019-01-07,600466.SH,33.496709,34.360344,33.373332,33.620085,10584321,0
2136,2019-01-08,600466.SH,33.311644,34.113591,32.694762,33.743462,10012902,0
...,...,...,...,...,...,...,...,...
253953,2021-01-22,600466.SH,30.245430,30.312942,29.502796,29.772845,17184055,0
253973,2021-01-25,600466.SH,29.570309,29.570309,28.827675,28.962699,23646174,0
254574,2021-01-26,600466.SH,28.962699,29.232748,28.692651,28.760163,9963442,0
255079,2021-01-27,600466.SH,28.760163,29.232748,28.692651,28.895187,12929331,0


In [66]:
# 以 datetime 为 index，类型为 datetime 或 date 类型，Datafeeds 默认情况下是将 index 匹配给 datetime 字段；
daily_price = daily_price.set_index(['datetime'])

## 1.2 读取调仓信息表

表内数据说明：

+ trade_date： 调仓期（每月最后一个交易日）;

+ sec_code：持仓成分股；

+ weight：持仓权重。

In [67]:
trade_info = pd.read_csv("./data/trade_info.csv", parse_dates=['trade_date'])
trade_info

Unnamed: 0,trade_date,sec_code,weight
0,2019-01-31,000006.SZ,0.007282
1,2019-01-31,000008.SZ,0.009783
2,2019-01-31,000025.SZ,0.006928
3,2019-01-31,000090.SZ,0.007234
4,2019-01-31,000536.SZ,0.003536
...,...,...,...
2490,2021-01-28,603712.SH,0.007630
2491,2021-01-28,603737.SH,0.019291
2492,2021-01-28,603816.SH,0.022646
2493,2021-01-28,603866.SH,0.018611


# 二、 选股回测

 选股策略：定期按持仓权重调仓 。

In [68]:
# 回测策略 - 优化版
class TestStrategy(bt.Strategy):
    params = (
        ('buy_stocks', None),  # 传入各个调仓日的股票列表和相应的权重
        ('verbose', False),    # 是否打印详细日志，设为False可提高性能
    )
    
    def log(self, txt, dt=None):
        ''' 日志函数 - 仅在verbose=True时打印 '''
        if self.params.verbose:
            dt = dt or self.datas[0].datetime.date(0)
            print('{}, {}'.format(dt.isoformat(), txt))

    def __init__(self):
        # 预处理调仓信息，提高查询效率
        self.trade_dates = set(pd.to_datetime(self.p.buy_stocks['trade_date'].unique()).tolist())
        
        # 创建高效的查询字典
        self.trade_info_dict = {}
        for date in self.trade_dates:
            stocks_data = self.p.buy_stocks[self.p.buy_stocks['trade_date'] == date]
            self.trade_info_dict[date] = {
                row['sec_code']: row['weight'] 
                for _, row in stocks_data.iterrows()
            }
        
        self.order_list = []  # 记录以往订单
        self.buy_stocks_pre = []  # 记录上一期持仓
        
        # 记录当前日期，避免每次next()都重新获取
        self.current_date = None
    
    def next(self):
        # 获取当前的回测时间点
        dt = self.datas[0].datetime.date(0)
        self.current_date = dt
        
        # 仅在verbose模式下打印资产信息
        if self.params.verbose:
            self.log('当前总资产 %.2f' % (self.broker.getvalue()))
        
        # 使用set成员检查提高效率
        if dt in self.trade_dates:
            if self.params.verbose:
                print("--------------{} 为调仓日----------".format(dt))
            
            # 取消未完成订单
            if len(self.order_list) > 0:
                if self.params.verbose:
                    print("--------------- 撤销未完成的订单 -----------------")
                for od in self.order_list:
                    self.cancel(od)
                self.order_list = []
            
            # 使用预处理的字典获取当前调仓信息
            current_holdings = self.trade_info_dict[dt]
            long_list = list(current_holdings.keys())
            
            if self.params.verbose:
                print('long_list', long_list)
            
            # 卖出不再持有的股票
            sell_stock = [i for i in self.buy_stocks_pre if i not in long_list]
            if sell_stock and self.params.verbose:
                print('sell_stock', sell_stock)
                print("-----------对不再持有的股票进行平仓--------------")
            
            for stock in sell_stock:
                data = self.getdatabyname(stock)
                if self.getposition(data).size > 0:
                    od = self.close(data=data)
                    self.order_list.append(od)
            
            # 买入或调整持仓
            if self.params.verbose:
                print("-----------买入此次调仓期的股票--------------")
            
            for stock in long_list:
                weight = current_holdings[stock]  # 直接从字典获取权重，避免查询
                data = self.getdatabyname(stock)
                order = self.order_target_percent(data=data, target=weight*0.95)
                self.order_list.append(order)
            
            self.buy_stocks_pre = long_list
    
    def notify_order(self, order):
        # 未被处理的订单
        if order.status in [order.Submitted, order.Accepted]:
            return
        
        # 已被处理的订单 - 仅在verbose模式下打印详细信息
        if self.params.verbose and order.status in [order.Completed, order.Canceled, order.Margin]:
            if order.isbuy():
                self.log(
                    'BUY EXECUTED, ref:%.0f，Price: %.2f, Cost: %.2f, Comm %.2f, Size: %.2f, Stock: %s' %
                    (order.ref,
                     order.executed.price,
                     order.executed.value,
                     order.executed.comm,
                     order.executed.size,
                     order.data._name))
            else:  # Sell
                self.log(
                    'SELL EXECUTED, ref:%.0f, Price: %.2f, Cost: %.2f, Comm %.2f, Size: %.2f, Stock: %s' %
                    (order.ref,
                     order.executed.price,
                     order.executed.value,
                     order.executed.comm,
                     order.executed.size,
                     order.data._name))

In [69]:
# 实例化大脑，启用多核
cerebro_ = bt.Cerebro(maxcpus=16)

# 预处理所有数据
data_dict = {}
unique_dates = daily_price.index.unique()
required_columns = ['open', 'high', 'low', 'close', 'volume', 'openinterest']

for stock in daily_price['sec_code'].unique():
    # 提取数据并一次性完成所有处理
    df = daily_price.query(f"sec_code=='{stock}'")[required_columns]
    data_ = pd.DataFrame(index=unique_dates)
    data_ = pd.merge(data_, df, left_index=True, right_index=True, how='left')
    
    # 使用更高效的数据处理方式
    data_ = data_.assign(
        volume=data_['volume'].fillna(0),
        openinterest=data_['openinterest'].fillna(0),
        **data_[['open', 'high', 'low', 'close']].ffill().fillna(0)
    )
    
    # 创建数据源并添加到cerebro
    datafeed = bt.feeds.PandasData(
        dataname=data_,
        fromdate=datetime.datetime(2019,1,2),
        todate=datetime.datetime(2021,1,28)
    )
    cerebro_.adddata(datafeed, name=stock)
    print(f"{stock} Done!")

600466.SH Done!
603228.SH Done!
600315.SH Done!
000750.SZ Done!
002588.SZ Done!
002926.SZ Done!
603816.SH Done!
002517.SZ Done!
600366.SH Done!
001914.SZ Done!
000732.SZ Done!
600733.SH Done!
000930.SZ Done!
002093.SZ Done!
603056.SH Done!
002078.SZ Done!
600978.SH Done!
600329.SH Done!
601872.SH Done!
600058.SH Done!
601019.SH Done!
600497.SH Done!
002563.SZ Done!
600699.SH Done!
601608.SH Done!
002051.SZ Done!
002603.SZ Done!
000636.SZ Done!
000980.SZ Done!
002217.SZ Done!
600291.SH Done!
600827.SH Done!
603369.SH Done!
000829.SZ Done!
002317.SZ Done!
002509.SZ Done!
002557.SZ Done!
002212.SZ Done!
000415.SZ Done!
000860.SZ Done!
600317.SH Done!
600060.SH Done!
600500.SH Done!
300474.SZ Done!
600273.SH Done!
000681.SZ Done!
603707.SH Done!
600967.SH Done!
600415.SH Done!
002056.SZ Done!
600267.SH Done!
600161.SH Done!
002670.SZ Done!
002127.SZ Done!
002815.SZ Done!
002223.SZ Done!
002690.SZ Done!
600039.SH Done!
600126.SH Done!
601678.SH Done!
000543.SZ Done!
002185.SZ Done!
600410.S

In [70]:
# 使用原始cerebro_而不是深度复制，避免额外的内存开销
cerebro = cerebro_  # 直接使用已经导入数据的cerebro_

# 设置broker参数
cerebro.broker.setcash(100000000.0)
cerebro.broker.setcommission(commission=0.0003)
cerebro.broker.set_slippage_perc(perc=0.0001)

# 预处理trade_info数据，提高查询效率
# 将trade_info转换为更高效的查询结构
trade_info_dict = {}
for date in pd.to_datetime(trade_info['trade_date'].unique()):
    stocks_data = trade_info[trade_info['trade_date'] == date]
    trade_info_dict[date] = {
        row['sec_code']: row['weight'] 
        for _, row in stocks_data.iterrows()
    }

# 添加策略，设置verbose=False来减少日志输出提高性能
cerebro.addstrategy(
    TestStrategy,
    buy_stocks=trade_info,
    verbose=False  # 关闭详细日志输出以提高性能
)

# 添加性能监控
print("\n开始回测...")
print(f"处理的股票数量: {len(daily_price['sec_code'].unique())}")
print(f"时间范围: {daily_price.index.min()} 到 {daily_price.index.max()}")
print(f"总交易天数: {len(daily_price.index.unique())}")

# 添加分析器
cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='pnl')
cerebro.addanalyzer(bt.analyzers.AnnualReturn, _name='_AnnualReturn')
cerebro.addanalyzer(bt.analyzers.SharpeRatio, riskfreerate=0.003, annualize=True, _name='_SharpeRatio')
cerebro.addanalyzer(bt.analyzers.DrawDown, _name='_DrawDown')

# 添加观测器 - 仅在需要可视化时添加，否则会影响性能
cerebro.addobserver(bt.observers.Value)

# 启动回测
print("开始回测...")
import time
start_time = time.time()
result = cerebro.run()
end_time = time.time()
print(f"回测完成，耗时: {end_time - start_time:.2f} 秒")


开始回测...
处理的股票数量: 510
时间范围: 2019-01-02 00:00:00 到 2021-01-28 00:00:00
总交易天数: 506
开始回测...
回测完成，耗时: 88.39 秒


In [71]:
strat = result[0]
print("--------------- AnnualReturn -----------------")
print(strat.analyzers._AnnualReturn.get_analysis())
print("--------------- SharpeRatio -----------------")
print(strat.analyzers._SharpeRatio.get_analysis())
print("--------------- DrawDown -----------------")
print(strat.analyzers._DrawDown.get_analysis())
print("--------------- Portfolio Value -----------------")
# 打印回测完成后的资金
print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue())

--------------- AnnualReturn -----------------
OrderedDict([(2019, 0.0), (2020, 0.0), (2021, 0.0)])
--------------- SharpeRatio -----------------
OrderedDict([('sharperatio', None)])
--------------- DrawDown -----------------
AutoOrderedDict([('len', 0), ('drawdown', 0.0), ('moneydown', 0.0), ('max', AutoOrderedDict([('len', 0.0), ('drawdown', 0.0), ('moneydown', 0.0)]))])
--------------- Portfolio Value -----------------
Final Portfolio Value: 100000000.00
