In [1]:

G_COUNT = 0  # jupyter运行次数统计
G_CEREBRO = None  # 大脑引擎
G_RESULT_ONE = None  # 回测大脑返回
G_RESULTS_OPT = None  # 参数调优大脑返回
res_df = None  # 筛选后的参数优化结果


In [2]:
import os, sys
import re
import time
import backtrader as bt
import logging
import argparse
import pandas as pd
import numpy as np
from datetime import datetime
import global_variable as glv  # 全局变量模块
import MyStrategy as ms

glv.init()

# """命令行参数解析"""
def parse_args(pargs=None):
    """命令行参数解析"""
    kwargs = glv.get('kwargs', dict())
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description='Sample for Order Target')

    parser.add_argument('--data', required=False, default=kwargs['G_FILE_PATH'], help='Specific data to be read in')
    parser.add_argument('--dtformat', required=False, default=kwargs['G_DT_DTFORMAT'], help='Ending date in data datetime format')
    parser.add_argument('--fromdate', required=False, default=kwargs['G_DT_START'], help='Starting date in `dtformat` format')
    parser.add_argument('--todate', required=False, default=kwargs['G_DT_END'], help='Ending date in `dtformat` format')
    parser.add_argument('--timeframe', required=False, default=kwargs['G_DT_TIMEFRAME'], choices=['minutes', 'daily', 'weekly', 'monthly'], help='重新采样到的时间范围')
    parser.add_argument('--compression', required=False, type=int, default=kwargs['G_DT_COMPRESSION'], help='将 n 条压缩为 1, 最小周期为原数据周期')
    parser.add_argument('--kpr', required=False, type=dict, default=kwargs.get('G_P_PARAM', dict()).get('kpr'), help="当穿越关键价格后加仓限制，字典类型 {日期1:{'kps':[价格1,价格2]}, 日期2:{'kps':[价格1,价格2]},}"),
    parser.add_argument('--pwl', required=False, type=list, default=kwargs.get('G_P_PARAM', dict()).get('pwl'), help='--pwl 盈亏千分比'),
    parser.add_argument('--pw', required=False, type=list, default=kwargs.get('G_P_PARAM', dict()).get('pw'), help='--pw 盈利千分比'),
    parser.add_argument('--pl', required=False, type=list, default=kwargs.get('G_P_PARAM', dict()).get('pl'), help='--pl 亏损千分比'),
    parser.add_argument('--ojk', required=False, type=list, default=kwargs.get('G_P_PARAM', dict()).get('ojk'), help='--ojk 订单间隔bar周期数'),
    parser.add_argument('--opts', required=False, type=bool, default=kwargs.get('G_OPTS', False), help='是否策略优化')
    parser.add_argument('--quantstats', required=False, type=int, default=kwargs['G_QUANTSTATS'], help='是否使用 quantstats 分析测试结果')
    parser.add_argument('--maxcpus', '-m', type=int, required=False, default=15, help=('Number of CPUs to use in the optimization'
                                                                                       '\n  - 0 (default): 使用所有可用的 CPU\n   - 1 -> n: 使用尽可能多的指定\n'))
    parser.add_argument('--no-optdatas', action='store_true', required=False, help='优化中不优化数据预加载')
    parser.add_argument('--no-optreturn', action='store_true', required=False, help='不要优化返回值以节省时间,这避免了回传大量生成的数据，例如指标在回溯测试期间生成的值')

    # Plot options
    parser.add_argument('--plot', '-p', nargs='?', required=False, metavar='kwargs', const=True, default=kwargs['G_PLOT'], help='绘制应用传递的任何 kwargs 的读取数据\n\n例如:\n\n  --plot style="candle" (to plot candles)\n')
    parser.add_argument("-f", "--file", default="file")  # 接收这个-f参数
    if pargs is not None:
        return parser.parse_args(pargs)

    return parser.parse_args()


def runstrat(args=None):
    global G_RESULT_ONE, G_RESULTS_OPT, res_df  # 申明要使用全局变量
    strats = None
    result_one = glv.get('G_RESULT_ONE')
    results_opt = glv.get('G_RESULTS_OPT')

    args = parse_args(args)
    glv.set('args', args)
    kwargs = glv.get('kwargs')  # 参数字典
    kwargs['test_kwargs'] = dict()  # 回测参数字典

    file_path_abs = dt_start = dt_end = dt_format = dt_dtformat = dt_tmformat = ""

    if args.dtformat is not None:
        dt_format = args.dtformat
        dt_dtformat = dt_format[:dt_format.find('%d') + len('%d')]
        dt_tmformat = dt_format[dt_format.find('%H'):]
        # dkwargs['dtformat'] = dt_format
        # dkwargs['tmformat'] = dt_tmformat
    if args.fromdate is not None:
        dt_start = datetime.strptime(args.fromdate, dt_dtformat).date()
        # dkwargs['fromdate'] = dt_start
    if args.todate is not None:
        dt_end = datetime.strptime(args.todate, dt_dtformat).date()
        # dkwargs['todate'] = dt_end
    # 从文件路径加载数据
    if args.data is not None:
        file_path = args.data
        myQuant_ROOT = os.getcwd()[:os.getcwd().find("bt_backtrader\\") + len("bt_backtrader\\")]  # 获取项目中相对根路径
        file_path_abs = os.path.join(myQuant_ROOT, file_path)  # 文件路径
        file_path_hdf_abs = file_path_abs.replace('.csv', '.hdf')  # hdf文件路径
        # hdf文件的key, key=合约id_数据周期
        kwargs['hdf_key'] = hdf_key = str(kwargs['G_CONT_ID'])
        print("run time:", datetime.now())
        # print('dt_start:', dt_start, 'dt_end:', dt_end)
        print("dt_format:", dt_format, "dt_start:", datetime.strftime(dt_start, "%Y-%m-%d"), "dt_end:", datetime.strftime(dt_end, "%Y-%m-%d"))
        df_data = None
        # 从hdf文件加载数据 hdf文件加载速度要比用read_csv从csv文件中加载数据快很多
        if os.path.exists(file_path_hdf_abs):
            print(file_path_hdf_abs)
            # 读取hdf文件数据
            hdf_store = pd.HDFStore(file_path_hdf_abs, mode='r')
            # 从hdf文件中加载指定key的数据
            df_data = hdf_store.get(hdf_key)
            # 关闭hdf文件
            hdf_store.close()
            pass
        # 从csv文件中加载数据
        elif os.path.exists(file_path_abs):
            print(file_path_abs)
            # 加载数据
            df_data = pd.read_csv(filepath_or_buffer=file_path_abs,
                                  # parse_dates={'datetime': ['date', 'time']},  # 日期和时间分开的情况
                                  parse_dates=['datetime'],
                                  index_col='datetime',
                                  infer_datetime_format=True,
                                  usecols=['datetime', 'open', 'close'],
                                  )
            # df.sort_values(by=["datetime"], ascending=True, inplace=True)  # 按日期先后排序
            # df.sort_values(by=["date", "time"], ascending=True, inplace=True)  # 按日期时间列先后排序

            # df.index = pd.to_datetime(df.date + ' ' + df.time, format=dt_format)  # 方式1: 将日期列和时间合并后转换成日期类型,并设置成列索引
            # df.index = pd.to_datetime(df.date.astype(str) + ' ' + df.time.astype(str), format=dt_format)  # 方式2: 将日期列和时间合并后转换成日期类型,并设置成列索引
            # df.index = pd.to_datetime(df['date'] + ' ' + df['time'], format=dt_format)  # 方式3: 将日期列和时间合并后转换成日期类型,并设置成列索引
            # df.index = pd.to_datetime(df['date'], format=dt_dtformat) + pd.to_timedelta(df['time'])  # 方式4: 将日期列和时间合并后转换成日期类型,并设置成列索引
            # df.index = pd.to_datetime(df.pop('date')) + pd.to_timedelta(df.pop('time'))  # 方式5: 将日期列和时间合并后转换成日期类型,并设置成列索引
            # df.index = pd.to_datetime(df['date'].str.cat(df['time'], sep=' '), format=dt_format)  # 方式6: 将日期列和时间合并后转换成日期类型,并设置成列索引
            # df_data['openinterest'] = 0.00  # 增加一列openinterest
            # df_data = df_data[['open', 'high', 'low', 'close', 'volume']]  # 取出特定的列
            # df_data.rename(columns={"volume": "vol"}, inplace=True)  # 列名修改
            pass
            # 将当前周期数据保存到hdf文件中,创建hdf文件
            hdf_store = pd.HDFStore(file_path_hdf_abs, mode='w')
            # 存储数据到hdf中
            hdf_store[hdf_key] = df_data
            # 关闭hdf文件
            hdf_store.close()
            pass
        elif not os.path.exists(file_path_abs):
            raise Exception("数据源文件未找到！" + file_path_abs)

        # 截取时间段内样本数据
        df_data = df_data[dt_start:dt_end]  # 截取时间段内的数据
        # data = bt.feeds.GenericCSVData(dataname=file_path, **dkwargs)  # 使用GenericCSVData数据源创建交易数据集, 对于日期和时间是同一列的不太适用
        data = (bt.feeds.PandasData(dataname=df_data, fromdate=dt_start, todate=dt_end))  # 使用pandas数据源创建交易数据集
        # 由数据相对路径+合约id+数据周期+开始日期+结束时期+{参数字典}组成的文件名
        kwargs['file_name'] = ('{:}_{:}_{:}_{:}_{:}'
                               .format((str(kwargs['G_FILE_PATH'][:6]) + kwargs['G_CONT_ID']),
                                       (str(kwargs['G_DT_COMPRESSION']) + (kwargs['G_DT_TIMEFRAME'][:1])),
                                       kwargs['G_DT_START'], kwargs['G_DT_END'],
                                       (str(kwargs['G_P_PARAM']).replace('range', '')  # 替换路径中的range字符串
                                        .translate(str.maketrans({' ': '', '\'': '', ':': '', }))),  # 将路径中的空格':字符替换成''
                                       ) + ('_rs' if kwargs['G_OPTS_IS_USE'] else ''))

    if args.pwl is not None:
        kwargs['test_kwargs']['pwl'] = args.pwl
    if args.pw is not None:
        kwargs['test_kwargs']['pw'] = args.pw
    if args.pl is not None:
        kwargs['test_kwargs']['pl'] = args.pl
    if args.ojk is not None:
        kwargs['test_kwargs']['ojk'] = args.ojk
    if args.kpr is not None:
        kwargs['test_kwargs']['kpr'] = args.kpr

    # 重采样到更大时间框架
    if args.timeframe and args.compression:
        tframes = dict(
            minutes=bt.TimeFrame.Minutes,
            daily=bt.TimeFrame.Days,
            weekly=bt.TimeFrame.Weeks,
            monthly=bt.TimeFrame.Months)
        data.resample(timeframe=tframes[args.timeframe], compression=args.compression)
    # 初始化大脑
    cerebro = bt.Cerebro(stdstats=False)
    # 加载数据到大脑
    cerebro.adddata(data)
    # 设置投资金额1000000
    cerebro.broker.setcash(kwargs.get('G_INI_CASH', 10000 * 10))
    # 设置手续费
    commissioninfo(cerebro=cerebro)

    # 参数调优
    if args.opts:
        optimize(cerebro=cerebro)
        pass
    # 回测分析
    if not args.opts:
        backing(cerebro=cerebro)
        pass
    # plot绘图
    if args.plot and not args.opts:
        pkwargs = dict(style='bar')
        if args.plot is not True:  # evals to True but is not True
            npkwargs = eval('dict(' + args.plot + ')')  # args were passed
            pkwargs.update(npkwargs)

        cerebro.plot(volume=False, **pkwargs)  # 绘图BT观察器结果
        pyplot(result_one=result_one)  # 结合pyfolio工具 计算并绘制收益评价指标
    # 回测分析保存到文件
    if args.quantstats and not args.opts:
        # 使用quantstats 分析工具并保存到HTML文件
        quantstats_reports_html(result_one=glv.get('G_RESULT_ONE'))
        # 使用quantstats 分析工具并保存到HTML文件


# """设置手续费"""
def commissioninfo(cerebro):
    """设置手续费"""
    # # <editor-fold desc="折叠代码:交易手续费设置一">
    # cerebro.broker.setcommission(
    #     # 交易手续费，根据margin取值情况区分是百分比手续费还是固定手续费
    #     commission=0.0015,
    #     # commission=4,
    #     # 期货保证金，决定着交易费用的类型,只有在stocklike=False时起作用
    #     margin=0,
    #     # 乘数，盈亏会按该乘数进行放大
    #     mult=10.0,
    #     # 交易费用计算方式，取值有：
    #     # 1.CommInfoBase.COMM_PERC 百分比费用
    #     # 2.CommInfoBase.COMM_FIXED 固定费用
    #     # 3.None 根据 margin 取值来确定类型
    #     commtype=bt.CommInfoBase.COMM_PERC,
    #     # 当交易费用处于百分比模式下时，commission 是否为 % 形式
    #     # True，表示不以 % 为单位，0.XX 形式；False，表示以 % 为单位，XX% 形式
    #     percabs=True,
    #     # 是否为股票模式，该模式通常由margin和commtype参数决定
    #     # margin=None或COMM_PERC模式时，就会stocklike=True，对应股票手续费；
    #     # margin设置了取值或COMM_FIXED模式时,就会stocklike=False，对应期货手续费
    #     stocklike=False,
    #     # 计算持有的空头头寸的年化利息
    #     # days * price * abs(size) * (interest / 365)
    #     interest=0.0,
    #     # 计算持有的多头头寸的年化利息
    #     interest_long=False,
    #     # 杠杆比率，交易时按该杠杆调整所需现金
    #     leverage=1.0,
    #     # 自动计算保证金
    #     # 如果 False, 则通过margin参数确定保证金
    #     # 如果automargin<0, 通过mult*price确定保证金
    #     # 如果automargin>0, 如果 automargin*price确定保证金 automargin=mult*margin
    #     automargin=10 * 0.13,
    #     # 交易费用设置作用的数据集(也就是作用的标的)
    #     # 如果取值为None，则默认作用于所有数据集(也就是作用于所有assets)
    #     name=None
    # )
    # # </editor-fold>
    pass
    # <editor-fold desc="折叠代码:交易手续费设置方式二">
    from my_strategy_config import MyCommission  # 自定义合约信息
    comm = {
        'ZJIF13': MyCommission(commtype=bt.CommInfoBase.COMM_PERC, commission=0.00050, margin_rate=0.23, mult=300.0),  # 股指合约信息
        'SQRB13': MyCommission(commtype=bt.CommInfoBase.COMM_PERC, commission=0.00015, margin_rate=0.13, mult=10.0),  # 螺纹钢合约信息
        'SQCU13': MyCommission(commtype=bt.CommInfoBase.COMM_PERC, commission=0.00015, margin_rate=0.14, mult=5.0),  # 沪铜合约信息
        'DQC13': MyCommission(commtype=bt.CommInfoBase.COMM_FIXED, commission=2.4, margin_rate=0.10, mult=10.0),  # 玉米合约信息
        'ZQCF13': MyCommission(commtype=bt.CommInfoBase.COMM_FIXED, commission=4.0, margin_rate=0.11, mult=5.0),  # 棉花合约信息

    }
    # 添加进 broker
    cerebro.broker.addcommissioninfo(comm[kwargs['G_CONT_ID']], name=None)  # name 用于指定该交易费用函数适用的标的,未指定将适用所有标的
    # </editor-fold>
    pass


# """参数调优"""
def optimize(cerebro):
    """参数调优"""
    args = glv.get('args')
    kwargs = glv.get('kwargs')
    results_opt = glv.get('G_RESULTS_OPT')
    opts_kwargs = kwargs.get('G_P_PARAM')  # 优化参数字典
    kwargs['opts_path'] = (kwargs.get('file_name') + '_opt.csv')  # 优化结果保存路径
    print('opts_kwargs:', opts_kwargs)
    # clock the start of the process
    tstart = time.perf_counter()
    # 为Cerebro引擎添加策略, 优化策略
    from MyStrategy import MyStrategy  # 引用策略
    strats = cerebro.optstrategy(MyStrategy, **opts_kwargs)
    # 添加分析指标
    cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='timeReturn', timeframe=bt.TimeFrame.Years)  # 此分析器通过查看时间范围的开始和结束来计算回报
    cerebro.addanalyzer(bt.analyzers.Returns, _name="returns")  # 使用对数方法计算的总回报、平均回报、复合回报和年化回报
    cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyFolio')  # 添加PyFolio
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name="drawdown")  # 回撤
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name="sharpe")  # 夏普率
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="tradeAnalyzer")  # 提供有关平仓交易的统计信息（也保留未平仓交易的数量）
    from my_tradeanalyzer import My_TradeAnalyzer  # 自定义分析器
    cerebro.addanalyzer(My_TradeAnalyzer, _name="my_tradeAnalyzer")  # 自定义平仓交易的统计信息
    # Run over everything
    if kwargs['G_OPTS_IS_USE'] and glv.get('G_RESULTS_OPT'):  # 是否使用上次参数优化结果
        results_opt = glv.get('G_RESULTS_OPT')
        print("--------------- 上次参数优化结果 -----------------")
    else:
        results_opt = cerebro.run(
            maxcpus=args.maxcpus,
            optdatas=not args.no_optdatas,  # optdatas（默认值：True)如果和优化（以及系统可以和使用），数据预加载将只在主进程中完成一次，以节省时间和资源。
            optreturn=not args.no_optreturn,  # optreturn（默认值：True) 在大多数情况下，只有分析器和哪些参数是评估策略性能所需的东西,优化结果不是完整的对象,而是具有以下属性的对象（以及所有数据、指标、观察器等）。如果需要对（例如）指标的生成值进行详细分析，请将其关闭
            # optreturn=False,
            # stdstats=False,
        )
        glv.set('G_RESULTS_OPT', results_opt)
        print("--------------- 参数优化结果 -----------------")
    # clock the end of the process
    tend = time.perf_counter()
    # print out the results_opt
    print('\nTime used:', str(tend - tstart))

    res_df = pd.DataFrame()  # 新建一个空的pandas列表,内容由字典填充

    # 每个策略实例的结果以列表的形式保存在列表中。优化运行模式下，返回值是列表的列表,内列表只含一个元素，即策略实例
    for i, x in enumerate(results_opt):
        trade = x[0].analyzers.tradeAnalyzer.get_analysis()  # 交易分析引用
        my_trade = x[0].analyzers.my_tradeAnalyzer.get_analysis()  # 交易分析引用
        returns = x[0].analyzers.returns.get_analysis()  # 回报分析引用
        pyFolio = x[0].analyzers.pyFolio.get_analysis()  # pyFolio分析引用
        drawdown = x[0].analyzers.drawdown.get_analysis()  # 回撤分析引用
        sharpe = x[0].analyzers.sharpe.get_analysis()  # sharpe分析引用
        timeReturn = x[0].analyzers.timeReturn.get_analysis()  # timeReturn 分析引用

        if trade['total']['total'] == 0:
            continue  # 忽略交易次数为0 的数据

        returns_rort_ = returns['rtot'] * 100  # 总复合回报
        pyFolio_returns_ = sum(pyFolio['returns'].values()) * 100  # pyFolio总复合回报
        returns_rnorm100_ = returns['rnorm100'] * 100  # 年化归一化回报
        trade_won_ = (trade.get('won')['total'])  # 总盈利次数
        trade_lost_ = (trade.get('lost')['total'])  # 总亏损次数
        trade_total_ = trade['total']['total']  # 交易次数
        trade_win_rate = (trade_won_ / trade_total_) * 100  # 胜率
        drawdown_ = drawdown.get('max').get('drawdown', 0)  # 最大回撤
        sharpe_ = sharpe.get('sharperatio', 0)  # 夏普率
        trade_pnl_total_ = (trade.get('pnl').get('gross').get('total', 0))  # 总盈亏
        trade_pnl_net_ = (trade.get('pnl')['net']['total'])  # (净盈亏)总盈亏-手续费
        trade_pnl_comm_ = abs(trade_pnl_total_ - trade_pnl_net_)  # 手续费
        trade_comm_net_p = ((trade_pnl_comm_ / trade_pnl_net_) * 100) if trade_pnl_net_ != 0 else 0  # 手续费占比净盈亏百分比

        row = dict()
        for pk, pv in kwargs['G_P_PARAM'].items():  # 遍历参数列表,将需要优化的参数名和值添加到字典里
            if type(pv) == list or type(pv) == range:
                row[pk] = x[0].p._get(pk)
        row.update({
            'pwl': x[0].p.pwl,  # 参数
            'pw': x[0].p.pw,  # 参数
            'pl': x[0].p.pl,  # 参数
            'total': '{:0>4d}'.format(trade_total_),  # 交易次数
            'sharpe': sharpe_,  # 夏普率
            'rtot%': returns_rort_,  # 总复合回报
            'py_rt%': pyFolio_returns_,  # pyFolio总复合回报
            'won%': trade_win_rate,  # 胜率
            'rnorm%': returns_rnorm100_,  # 年化归一化回报
            'maxDD%': round(drawdown_, 3),  # 最大回撤
            'comm%': round(trade_comm_net_p, 3),  # 手续费占比净盈亏百分比
            'pnl_net': '{:8.2f}'.format(trade_pnl_net_),  # 总盈亏余额含手续费
        })
        for k, v in timeReturn.items():  # 把timeReturn统计的月度或年度复合回报添加在后面 # 月度或年度复合回报,由参数timeframe=bt.TimeFrame.Months控制
            row['{:%Y-%m}'.format(k)] = v
        res_df = res_df.append(row, ignore_index=True)
    res_df.loc[:, :'total'] = res_df.loc[:, :'total'].astype(int)  # 转换指定total列前的数据类型
    if bool(res_df.empty):
        print('回测数据不存在')
    if not ('pw' in opts_kwargs or 'pl' in opts_kwargs):
        # 删除未优化的参数列
        res_df = res_df.drop(labels=['pw', 'pl'], axis=1)
    if not ('pwl' in opts_kwargs):
        res_df = res_df.drop(labels=['pwl'], axis=1)

    res_df = res_df.dropna(how='any', axis=0)  # 删除所有带NaN的行
    # res_df[['pw', 'pl', 'total']] = res_df[['pw', 'pl', 'total']].apply(pd.to_numeric, downcast='signed', axis=1)  # 转换指定列数据类型为整形
    res_df[['rtot%', 'pnl_net']] = res_df[['rtot%', 'pnl_net']].apply(pd.to_numeric, errors='ignore', axis=1)

    res_df.sort_values(by=['pnl_net', 'rtot%'], ascending=False, inplace=True)  # 按累计盈亏和总复合回报排序
    # res_df.reset_index(drop=True, inplace=True)  # 重设索引id,删除旧索引,替换新索引
    res_df.index.name = 'id'  # 设置索引名称
    pd.set_option('precision', 3)  # 显示小数点后的位数
    pd.set_option('display.min_rows', 300)  # 确定显示的部分有多少行
    pd.set_option('display.max_rows', 300)  # 确定显示的部分有多少行
    pd.set_option('display.max_columns', 20)  # 确定显示的部分有多少列
    pd.set_option('display.float_format', '{:,.2f}'.format)  # 逗号格式化大值数字,设置数据精度
    pd.set_option('expand_frame_repr', False)  # True就是可以换行显示。设置成False的时候不允许换行
    pd.set_option('display.width', 180)  # 设置打印宽度(**重要**)

    res_df[res_df.columns[:5]].info()  # 显示前几列的数据类型
    if not res_df.empty:
        result_one = results_opt[res_df.index[0]]  # 返回第一个参数测试结果
        glv.set('G_RESULT_ONE', result_one)
    opts_path = kwargs['opts_path']
    print(opts_path)  # 打印文件路径
    print(res_df.loc[:, :'pnl_net'])  # 显示 开始列到'pnl_net'列的 参数优化结果
    res_df.to_csv(opts_path, sep='\t', float_format='%.2f')  # 保存分析数据到文件
    print("--------------- 优化结束 -----------------")
    pass


# """回测"""
def backing(cerebro):
    """回测"""
    kwargs = glv.get('kwargs')
    test_kwargs = kwargs['G_P_PARAM']  # 回测参数
    log_logger = None
    if kwargs.get('G_P_LOG_PRINT') or kwargs.get('G_P_LOG_FILE'):
        log_logger = logger_config(log_path=(kwargs.get('file_name') + '_log.txt'), log_name='交易日志')
    # 回测日志参数
    log_kwargs = dict(
        log_logger=log_logger,
        log_print=kwargs.get('G_P_LOG_PRINT', False),  # 是否打印日志到控制台
        log_save=kwargs.get('G_P_LOG_FILE', False),  # 是否保存日志到文件
    )
    print('test_kwargs:', test_kwargs)  # 回测参数
    print('log_kwargs:', log_kwargs)  # 日志参数
    # clock the start of the process
    tstart = time.perf_counter()

    # 添加策略和参数
    from MyStrategy import MyStrategy  # 引用策略
    cerebro.addstrategy(MyStrategy, log_kwargs=log_kwargs, **test_kwargs)
    # 添加观测器,绘制时显示
    cerebro.addobserver(bt.observers.Broker)
    cerebro.addobserver(bt.observers.Trades)
    cerebro.addobserver(bt.observers.BuySell)
    # cerebro.addobserver(bt.observers.TimeReturn)
    cerebro.addobserver(bt.observers.DrawDown)
    # 添加分析指标
    cerebro.addanalyzer(bt.analyzers.AnnualReturn, _name='annualReturn')  # 返回年初至年末的年度收益率
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawDown')  # 计算最大回撤相关指标
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns', tann=252)  # 计算年化收益：日度收益
    cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyFolio')  # 添加PyFolio
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpeRatio', timeframe=bt.TimeFrame.Days, annualize=True, riskfreerate=0)  # 计算年化夏普比率：日度收益
    cerebro.addanalyzer(bt.analyzers.SharpeRatio_A, _name='sharpeRatio_A')
    cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='timeReturn', )  # 添加收益率时序
    from my_tradeanalyzer import My_TradeAnalyzer  # 自定义分析器
    cerebro.addanalyzer(My_TradeAnalyzer, _name="my_tradeAnalyzer")  # 自定义平仓交易的统计信息

    # 引擎运行前打印期出资金
    print('组合期初资金: %s' % format(cerebro.broker.getvalue(), ',.2f'))
    # 启动回测
    result_one = cerebro.run()
    glv.set('G_RESULT_ONE', result_one)
    # clock the end of the process
    tend = time.perf_counter()
    # print out the result_one
    print('Time used:', str(tend - tstart))
    print("\n--------------- 回测结果 -----------------")
    # 引擎运行后打期末资金
    print('组合期末资金: %s' % format(cerebro.broker.getvalue(), ',.2f'), end='')
    # 回测结果提取分析
    result_analysis(result_one=result_one)
    # 在记录日志之后移除句柄
    if kwargs.get('G_P_LOG_PRINT') or kwargs.get('G_P_LOG_FILE'):
        log_logger.streamHandler.close()
        log_logger.fileHandler.close()
        logging.shutdown()  # 关闭日志系统
    pass


# """回测结果提取分析"""
def result_analysis(result_one):
    """回测结果提取分析"""
    # 提取结果
    print("\n--------------- 累计收益率 -----------------")
    returns = result_one[0].analyzers.returns.get_analysis()
    pyFolio = result_one[0].analyzers.pyFolio.get_analysis()
    print(" Cumulative Return: {:.2f}".format(returns['rtot'] * 100))
    print(" pyFolio Cumulative Return%: {:,.2f}".format(sum(pyFolio['returns'].values()) * 100))
    print("\n--------------- 年度收益率 -----------------")
    annualReturn = result_one[0].analyzers.annualReturn.get_analysis()
    # print(' 收益率k,v', get_analysis.items())
    for k, v in annualReturn.items():
        print((" [{:},{:.2f}]" if isinstance(v, float) else " [{:},{:}]").format(k, v), end='')
    print("\n--------------- 最大回撤 -----------------")
    drawDown = result_one[0].analyzers.drawDown.get_analysis()
    for k, v in drawDown.items():
        if not isinstance(v, dict):
            t = (" [{:},{:.2f}]" if isinstance(v, float) else " [{:},{:}]").format(k, v)
            print(t, end='')
        else:
            for kk, vv in v.items():
                t = (" [{:},{:.2f}]" if isinstance(vv, float) else " [{:},{:}]").format(kk, vv)
                print(t, end='')
    print("\n--------------- 年化收益：日度收益 -----------------")
    an_returns = result_one[0].analyzers.returns.get_analysis()
    for k, v in an_returns.items():
        print((" [{:},{:.2f}]" if isinstance(v, float) else " [{:},{:}]").format(k, v), end='')
    print("\n--------------- 年化夏普比率：日度收益 -----------------")
    sharpeRatio = (result_one[0].analyzers.sharpeRatio.get_analysis())
    for k, v in sharpeRatio.items():
        print((" [{:},{:.2f}]" if isinstance(v, float) else " [{:},{:}]").format(k, v), end='')
    print("\n--------------- SharpeRatio_A -----------------")
    sharpeRatio_A = result_one[0].analyzers.sharpeRatio_A.get_analysis()
    for k, v in sharpeRatio_A.items():
        print((" [{:},{:.2f}]" if isinstance(v, float) else " [{:},{:}]").format(k, v), end='')
    print("\n--------------- test end -----------------")
    pass


# """pyfolio分析结果绘图"""
def pyplot(result_one):
    """pyfolio分析结果绘图"""
    # 结合pyfolio工具 计算并绘制收益评价指标
    import pyfolio as pf
    # 绘制图形
    import matplotlib.pyplot as plt
    plt.rcParams['axes.unicode_minus'] = False  # 用来正常显示负号
    import matplotlib.ticker as ticker  # 导入设置坐标轴的模块
    # plt.style.use('seaborn')
    plt.style.use('dark_background')

    # 提取收益序列
    pnl = pd.Series(result_one[0].analyzers.timeReturn.get_analysis())
    # 计算累计收益
    cumulative = (pnl + 1).cumprod()
    # 计算回撤序列
    max_return = cumulative.cummax()
    drawdown = (cumulative - max_return) / max_return
    # 按年统计收益指标
    perf_stats_year = pnl.groupby(pnl.index.to_period('y')).apply(lambda data: pf.timeseries.perf_stats(data)).unstack()
    # 统计所有时间段的收益指标
    perf_stats_all = pf.timeseries.perf_stats(pnl).to_frame(name='all')
    perf_stats = pd.concat([perf_stats_year, perf_stats_all.T], axis=0)
    perf_stats_ = round(perf_stats, 4).reset_index()

    fig, (ax0, ax1) = plt.subplots(2, 1, gridspec_kw={'height_ratios': [3, 4]}, figsize=(24, 16))
    cols_names = ['date', 'Annual\nreturn', 'Cumulative\nreturns', 'Annual\n volatility',
                  'Sharpe\nratio', 'Calmar\nratio', 'Stability', 'Max\ndrawdown',
                  'Omega\nratio', 'Sortino\nratio', 'Skew', 'Kurtosis', 'Tail\nratio',
                  'Daily value\nat risk']
    # 绘制表格
    ax0.set_axis_off()  # 除去坐标轴
    table = ax0.table(cellText=perf_stats_.values,
                      bbox=(0, 0, 1, 1),  # 设置表格位置， (x0, y0, width, height)
                      rowLoc='left',  # 行标题居中
                      cellLoc='left',
                      colLabels=cols_names,  # 设置列标题
                      colLoc='left',  # 列标题居中
                      edges='open'  # 不显示表格边框
                      )
    table.set_fontsize(13)

    # 绘制累计收益曲线
    ax2 = ax1.twinx()
    ax1.yaxis.set_ticks_position('right')  # 将回撤曲线的 y 轴移至右侧
    ax2.yaxis.set_ticks_position('left')  # 将累计收益曲线的 y 轴移至左侧
    # 绘制回撤曲线
    drawdown.plot.area(ax=ax1, label='drawdown (right)', rot=0, alpha=0.3, fontsize=13, grid=False)
    # 绘制累计收益曲线
    cumulative.plot(ax=ax2, color='#F1C40F', lw=2.0, label='cumret (left)', rot=0, fontsize=13, grid=False)
    # 不然 x 轴留有空白
    ax2.set_xbound(lower=cumulative.index.min(), upper=cumulative.index.max())
    # 主轴定位器：每 5 个月显示一个日期：根据具体天数来做排版
    ax2.xaxis.set_major_locator(ticker.MultipleLocator(120))
    # 同时绘制双轴的图例
    h1, l1 = ax1.get_legend_handles_labels()
    h2, l2 = ax2.get_legend_handles_labels()

    plt.legend(h1 + h2, l1 + l2, fontsize=12, loc='upper left', ncol=1)
    fig.tight_layout()  # 规整排版
    plt.show()
    # 结合pyfolio工具 计算并绘制收益评价指标
    pass


# """quantstats分析报告html"""
def quantstats_reports_html(result_one):
    # 使用quantstats 分析工具并保存到HTML文件
    kwargs = glv.get('kwargs')
    portfolio_stats = result_one[0].analyzers.getbyname('pyFolio')
    returns, positions, transactions, gross_lev = portfolio_stats.get_pf_items()
    returns.index = returns.index.tz_convert(None)
    param_one = dict()
    for pk, pv in kwargs.get('G_P_PARAM').items():  # 遍历参数列表,将优化的参数名和值添加到字典里
        param_one[pk] = result_one[0].p._get(pk)
    # 将分析指标保存到HTML文件
    title_report = ('{:}-{:} st={:%Y-%m-%d} end={:%Y-%m-%d} pam={:} dt={:%H.%M.%S}'  # 优化结果网页标题
        .format(
        (kwargs.get('G_FILE_PATH').split('\\')[1].split('-')[0]),  # 合约名称
        str(kwargs.get('G_DT_COMPRESSION')) + (kwargs.get('G_DT_TIMEFRAME')[:1]),  # K线周期
        datetime.fromisoformat(kwargs.get('G_DT_START')), datetime.fromisoformat(kwargs.get('G_DT_END')),  # 开始结束时间
        str(param_one).replace('range', '').replace('datetime.date', '')  # 替换参数字典中的字符串
        .translate(str.maketrans({' ': '', '\'': '', ':': ''})),  # 替换参数字典中的字符
        datetime.now(),
    ))
    import quantstats
    quantstats.reports.html(returns, output='stats.html', title=title_report)
    print(title_report)
    print("quantstats 测试分析结果已保存至目录所在文件 quantstats-tearsheet.html")
    # 使用quantstats 分析工具并保存到HTML文件
    pass


# """日志配置"""
def logger_config(log_path, log_name):
    """
    配置log
    :param log_path: 输出log路径
    :param log_name: 记录中name，可随意
    :return:
    """
    '''
    logger是日志对象，handler是流处理器，console是控制台输出（没有console也可以，将不会在控制台输出，会在日志文件中输出）
    '''
    # 获取logger对象,取名
    logger = logging.getLogger(log_name)
    # 输出DEBUG及以上级别的信息，针对所有输出的第一层过滤
    logger.setLevel(level=logging.DEBUG)
    # 获取文件日志句柄并设置日志级别，第二层过滤
    if log_path:
        logger.fileHandler = logging.FileHandler(log_path, encoding='UTF-8')
        logger.fileHandler.setLevel(logging.INFO)  # 设置文件日志输出级别 设置 INFO 时debug信息将不显示
        # 生成并设置文件日志格式
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        logger.fileHandler.setFormatter(formatter)
        logger.addHandler(logger.fileHandler)  # 为logger对象添加句柄
    # console相当于控制台输出，handler文件输出。获取流句柄并设置日志级别，第二层过滤
    logger.streamHandler = logging.StreamHandler(stream=sys.stdout)  # stream=sys.stdout 不设置日志字体颜色是红色
    # logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
    logger.streamHandler.setLevel(logging.INFO)  # 设置控制台显示级别 日志级别： debug < info < warning < error < critical
    logger.addHandler(logger.streamHandler)  # 为logger对象添加句柄
    return logger



In [3]:

kwargs = dict()
# kwargs['G_FILE_PATH'] = "datas\\ZJIF13-5m-20100416-20220427.csv"
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2013-01-01', '2016-02-01'
kwargs['G_FILE_PATH'] = "datas\\DQC13-5m-20120709-20220330.csv"
kwargs['G_DT_START'], kwargs['G_DT_END'] = '2013-01-01', '2014-02-01'
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2017-01-01', '2022-02-01'
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2015-01-01', '2022-02-01'
# kwargs['G_FILE_PATH'] = "datas\\ZQCF13-5m-20121224-20220415.csv"
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2013-01-01', '2022-02-01'
# kwargs['G_FILE_PATH'] = "datas\\SQRB13-5m-20121224-20220330.csv"
# kwargs['G_FILE_PATH'] = "datas\\SQRBOC-5m-20090327-20211231.csv"
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2009-04-01', '2013-02-01'
# kwargs['G_FILE_PATH'] = "datas\\SQCU13-5m-20150625-20220427.csv"
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2015-06-25', '2019-02-01'

kwargs['G_DT_DTFORMAT'] = '%Y-%m-%d %H:%M:%S'
kwargs['G_CONT_ID'] = (re.findall(r"datas\\([\w]{2,6})", kwargs['G_FILE_PATH'])[0])  # 从文件名中提取2-6个字符由字母数字_组成的合约ID
kwargs['G_DT_TIMEFRAME'] = 'minutes'  # 重采样更大时间周期 choices=['minutes', 'daily', 'weekly', 'monthly']
kwargs['G_DT_COMPRESSION'] = 5  # 合成bar的周期数
kwargs['G_INI_CASH'] = 10000 * 10  # 初始金额
kwargs['G_PLOT'] = False  # 是否绘图,可提供绘图参数:'style="candle"'
kwargs['G_QUANTSTATS'] = True  # 是否使用 quantstats 分析测试结果
kwargs['G_P_LOG_FILE'] = False  # 是否输出日志到文件
kwargs['G_P_LOG_PRINT'] = False  # 是否输出日志到控制台
kwargs['G_OPTS'] = 1  # 是否参数调优
kwargs['G_OPTS_IS_USE'] = 0  # 是否使用上次优化结果
G_P_PW = [10, True, 2, 11, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_PL = [10, False, 2, 11, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_PWL = [10, False, 2, 5, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_OJK = [1, False, 1, 3, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_PO = [0, False, 0, 5, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_PP = [0, False, 0, 5, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_KPR = [True, {  # 关键价格[是否启用, {日期1: dict({'kps':[价格区间1]},日期2: {'kps':[价格区间2]},})]
    datetime(2013, 10, 30).date(): {'kps': [2448, 2323], },
    datetime(2015, 4, 30).date(): {'kps': [2323, 2450], },
}]
kwargs['G_P_PARAM'] = {
    'pw': (range(G_P_PW[2], G_P_PW[3], G_P_PW[4]) if kwargs['G_OPTS'] and G_P_PW[1] else G_P_PW[0]),
    'pl': (range(G_P_PL[2], G_P_PL[3], G_P_PL[4]) if kwargs['G_OPTS'] and G_P_PL[1] else G_P_PL[0]),
    # 'pwl': (range(G_P_PWL[2], G_P_PWL[3], G_P_PWL[4]) if G_OPTS and G_P_PWL[1] else G_P_PWL[0]),
    # 'ojk': (range(G_P_OJK[2], G_P_OJK[3], G_P_OJK[4]) if G_OPTS and G_P_OJK[1] else G_P_OJK[0]),
    # 'po': (range(G_P_PO[2], G_P_PO[3], G_P_PO[4]) if G_OPTS and G_P_PO[1] else G_P_PO[0]),
    # 'pp': (range(G_P_PP[2], G_P_PP[3], G_P_PP[4]) if G_OPTS and G_P_PP[1] else G_P_PP[0]),
    # 'kpr': G_P_KPR[1] if G_P_KPR[0] else None

}

"""-------主函数---------"""
if __name__ == '__main__':
    glv.set('kwargs', kwargs)
    runstrat()

run time: 2022-05-10 15:43:08.595942
dt_format: %Y-%m-%d %H:%M:%S dt_start: 2013-01-01 dt_end: 2014-02-01
D:\study\python-demo\myQuant\bt_backtrader\datas\DQC13-5m-20120709-20220330.hdf
opts_kwargs: {'pw': range(2, 11), 'pl': 10}
--------------- 参数优化结果 -----------------

Time used: 6.9400603000000025
<class 'pandas.core.frame.DataFrame'>
Int64Index: 9 entries, 7 to 0
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   pw      9 non-null      int32  
 1   pl      9 non-null      int32  
 2   total   9 non-null      int32  
 3   sharpe  9 non-null      float64
 4   rtot%   9 non-null      float64
dtypes: float64(2), int32(3)
memory usage: 324.0 bytes
datas\DQC13_5m_2013-01-01_2014-02-01_{pw(2,11),pl10}_opt.csv
    pw  pl  total  sharpe  rtot%  py_rt%  won%  rnorm%  maxDD%  comm%    pnl_net
id                                                                              
7    9  10     11   -1.96  -8.96   -8.56  0.00   -0.08   12.5

In [12]:

result_one_id = 7
G_RESULTS_OPT = glv.get('G_RESULTS_OPT')
# 使用quantstats 分析工具并保存到HTML文件
G_COUNT += 1
results_opt = G_RESULTS_OPT
# result_one_id = results_opt.index(G_RESULT_ONE)
# result_one_id = result_one_id if id < 0 else id
result_one = results_opt[result_one_id]
# 使用quantstats 分析工具并保存到HTML文件
import quantstats

quantstats_reports_html(result_one=result_one)


DQC13-5m st=2013-01-01 end=2014-02-01 pam={pw9,pl10} dt=01.08.50
quantstats 测试分析结果已保存至目录所在文件 quantstats-tearsheet.html


In [19]:

kwargs = dict()
# kwargs['G_FILE_PATH'] = "datas\\ZJIF13-5m-20100416-20220427.csv"
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2013-01-01', '2016-02-01'
kwargs['G_FILE_PATH'] = "datas\\DQC13-5m-20120709-20220330.csv"
kwargs['G_DT_START'], kwargs['G_DT_END'] = '2013-01-01', '2014-02-01'
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2017-01-01', '2022-02-01'
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2015-01-01', '2022-02-01'
# kwargs['G_FILE_PATH'] = "datas\\ZQCF13-5m-20121224-20220415.csv"
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2013-01-01', '2022-02-01'
# kwargs['G_FILE_PATH'] = "datas\\SQRB13-5m-20121224-20220330.csv"
# kwargs['G_FILE_PATH'] = "datas\\SQRBOC-5m-20090327-20211231.csv"
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2009-04-01', '2013-02-01'
# kwargs['G_FILE_PATH'] = "datas\\SQCU13-5m-20150625-20220427.csv"
# kwargs['G_DT_START'], kwargs['G_DT_END'] = '2015-06-25', '2019-02-01'

kwargs['G_DT_DTFORMAT'] = '%Y-%m-%d %H:%M:%S'
kwargs['G_COMM'] = 'comm_' + (re.findall(r"datas\\([\D]{2,4})", kwargs['G_FILE_PATH'])[0]).lower()  # 合约信息,提前预设好 保证金,手续费率,合约乘数等
kwargs['G_DT_TIMEFRAME'] = 'minutes'  # 重采样更大时间周期 choices=['minutes', 'daily', 'weekly', 'monthly']
kwargs['G_DT_COMPRESSION'] = 5  # 合成bar的周期数
kwargs['G_INI_CASH'] = 10000 * 10  # 初始金额
kwargs['G_PLOT'] = False  # 是否绘图,可提供绘图参数:'style="candle"'
kwargs['G_QUANTSTATS'] = True  # 是否使用 quantstats 分析测试结果
kwargs['G_P_LOG_FILE'] = False  # 是否输出日志到文件
kwargs['G_P_LOG_PRINT'] = False  # 是否输出日志到控制台
kwargs['G_OPTS'] = 1  # 是否参数调优
kwargs['G_OPTS_IS_USE'] = 0  # 是否使用上次优化结果
G_P_PW = [10, True, 2, 11, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_PL = [10, False, 2, 11, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_PWL = [10, False, 2, 5, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_OJK = [1, False, 1, 3, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_PO = [0, False, 0, 5, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_PP = [0, False, 0, 5, 1]  # 参数[默认值,是否优化,最小值,最大值,步长]
G_P_KPR = [True, {  # 关键价格[是否启用, {日期1: dict({'kps':[价格区间1]},日期2: {'kps':[价格区间2]},})]
    datetime(2013, 10, 30).date(): {'kps': [2448, 2323], },
    datetime(2015, 4, 30).date(): {'kps': [2323, 2450], },
}]
kwargs['G_P_PARAM'] = {
    'pw': (range(G_P_PW[2], G_P_PW[3], G_P_PW[4]) if kwargs['G_OPTS'] and G_P_PW[1] else G_P_PW[0]),
    'pl': (range(G_P_PL[2], G_P_PL[3], G_P_PL[4]) if kwargs['G_OPTS'] and G_P_PL[1] else G_P_PL[0]),
    # 'pwl': (range(G_P_PWL[2], G_P_PWL[3], G_P_PWL[4]) if G_OPTS and G_P_PWL[1] else G_P_PWL[0]),
    # 'ojk': (range(G_P_OJK[2], G_P_OJK[3], G_P_OJK[4]) if G_OPTS and G_P_OJK[1] else G_P_OJK[0]),
    # 'po': (range(G_P_PO[2], G_P_PO[3], G_P_PO[4]) if G_OPTS and G_P_PO[1] else G_P_PO[0]),
    # 'pp': (range(G_P_PP[2], G_P_PP[3], G_P_PP[4]) if G_OPTS and G_P_PP[1] else G_P_PP[0]),
    # 'kpr': G_P_KPR[1] if G_P_KPR[0] else None

}

"""-------主函数---------"""
if __name__ == '__main__':
    glv.set('kwargs', kwargs)
    runstrat()

D:\study\python-demo\myQuant\bt_backtrader\datas\SQCU13-5m-20150625-20220427.csv
dt_start: 2019-06-25 dt_end: 2022-02-01
opts_kwargs: {'pw': 6, 'pl': 5, 'ok': 30, 'po': range(-10, 11), 'pp': range(-10, 11)}
--------------- 参数优化结果 -----------------
Time used: 975.3814560000001
<class 'pandas.core.frame.DataFrame'>
Int64Index: 441 entries, 30 to 424
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype
---  ------  --------------  -----
 0   po      441 non-null    int32
 1   pp      441 non-null    int32
 2   pw      441 non-null    int32
 3   pl      441 non-null    int32
 4   total   441 non-null    int32
dtypes: int32(5)
memory usage: 12.1 KB
datas\SQCU13_5m_2019-06-25_2022-02-01_{pw6,pl5,ok30,po(-10,11),pp(-10,11)}_opt.csv
     po  pp  pw  pl  total  sharpe   rtot%   py_rt%  won%  rnorm%  maxDD%  comm%    pnl_net
id                                                                                         
30   -9  -1   6   5    440    0.98  189.62   225.90 38.18    0.34  

In [8]:
# 回测分析保存到文件
if kwargs['G_QUANTSTATS']:
    # 使用quantstats 分析工具并保存到HTML文件
    kwargs = glv.get('kwargs')
    portfolio_stats = result_one[0].analyzers.getbyname('pyFolio')
    returns, positions, transactions, gross_lev = portfolio_stats.get_pf_items()
    returns.index = returns.index.tz_convert(None)
    param_one = dict()
    for pk, pv in kwargs.get('G_P_PARAM').items():  # 遍历参数列表,将优化的参数名和值添加到字典里
        param_one[pk] = result_one[0].p._get(pk)

    # 将分析指标保存到HTML文件
    title_report = ('{:}-{:} st={:%Y-%m-%d} end={:%Y-%m-%d} pam={:} dt={:%H.%M.%S}'  # 优化结果网页标题
        .format(
        (kwargs.get('G_FILE_PATH').split('\\')[1].split('-')[0]),  # 合约名称
        str(kwargs.get('G_DT_COMPRESSION')) + (kwargs.get('G_DT_TIMEFRAME')[:1]),  #  K线周期
        datetime.fromisoformat(kwargs.get('G_DT_START')), datetime.fromisoformat(kwargs.get('G_DT_END')),  # 开始结束时间
        str(param_one).replace('range', '').replace('datetime.date', '')  # 替换参数字典中的字符串
        .translate(str.maketrans({' ': '', '\'': '', ':': ''})),  # 替换参数字典中的字符
        datetime.now(),
    ))
    import quantstats

    quantstats.reports.html(returns, output='stats.html', title=title_report)
    print(title_report)
    print("quantstats 测试分析结果已保存至目录所在文件 quantstats-tearsheet.html")
    # 使用quantstats 分析工具并保存到HTML文件
    pass

DQC13-5m st=2013-01-01 end=2019-02-01 pam={pw 9,pl 2,kpr {2013-01-31 2448,2013-10-31 2323,2015-03-31 2500}} dt=16.29.31
quantstats 测试分析结果已保存至目录所在文件 quantstats-tearsheet.html


In [None]:
global G_RESULTS_OPT, G_COUNT  # 申明要使用全局变量
results_opt = glv.get('G_RESULTS_OPT').copy()

for l1 in results_opt[:2]:
    print(list(l1)[0].analyzers.returns.rets)
for l2 in l1:
    # print(dir(l2))
    rets = l2.analyzers.returns.rets
print(' {:}, {:} '.format(l2.p.rpp, l2.p.spp)
      + ' {:.4f} '.format((l2.analyzers.returns.rets.get('rtot')))
      + ' {:} '.format(list(rets.keys())[0])
      + ' {:.4f} '.format(list(rets.values())[0])
      )