In [1]:
%matplotlib inline
%load_ext zipline

**注意**，使用`data.history`时，`price`会前向填充，但其他字段`("open", "high", "low", "close", "volume")`则不会。

In [2]:
%%zipline --start 2018-1-1 --end 2018-4-27

from zipline import get_calendar
from logbook import Logger
from zipline.api import (symbols, time_rules, schedule_function, date_rules,
                         attach_pipeline, pipeline_output,
                         order_target_percent, get_datetime, calendars)
from zipline.pipeline import Pipeline
from zipline.pipeline.factors import DailyReturns, Returns
from zipline.pipeline.builtin import IsResumed, SuccessiveSuspensionDays

import datetime
import pandas as pd
import numpy as np

log = Logger('回测')
day = get_calendar('SZSH').day
MIN_DAYS = 30 # 最少连续停牌交易日数量

def make_pipeline():

    return Pipeline(
        columns={
            'pct': DailyReturns(),
            'days': SuccessiveSuspensionDays(include=True)
        },
        screen=IsResumed())


def initialize(context):
    # 保存复牌股票清单，主键为asset，值为复牌后的天数
    context.resumed_assets = {}
    context.output = None
    context.report = None
    my_pipe = make_pipeline()
    attach_pipeline(my_pipe, 'my_pipeline')
    # 每天收盘后，处理报告
    schedule_function(
        handle_report,
        date_rules.every_day(),
        time_rules.market_close(minutes=1),
        calendar=calendars.US_EQUITIES)


def handle_data(context, data):
    #print(context.resumed_assets)
    pass


def update_roc(context, data, num):
    """更新roc列数值"""
    # 找出复牌后num天的股票
    stocks = []
    for stock, days in context.resumed_assets.items():
        if days == num:
            stocks.append(stock)
    # 不存在符合条件的，返回即可
    if len(stocks) == 0:
        return
    # 要想获得完整数据，需要多加1个交易日
    history = data.history(
        stocks, fields="close", bar_count=num + 1, frequency="1d")
    # 取最后一行，转置为以股票作为index
    roc = history.pct_change(num).iloc[-1, :].T
    col_name = 'afer_{}_days'.format(num)
    context.report.loc[roc.index, col_name] = roc.values


def update_report(context, data, df):
    """更新复牌后交易天数,整理报告"""
    # 添加新增复牌股票到上下文字典
    cond = df['days'] >= MIN_DAYS
    gt_30_stocks = df[cond].index
    
    # 如不在字典中，添加并赋值天数为0
    for stock in gt_30_stocks:
        if stock not in context.resumed_assets.keys():
            context.resumed_assets[stock] = 0

    # 更新字典中股票复牌后成交天数
    for stock in context.resumed_assets.keys():
        days = context.resumed_assets[stock] + 1
        context.resumed_assets[stock] = days
    
    to_add = df.loc[gt_30_stocks]
    # 如原始数据非空, 增加2列
    if not to_add.empty:
        to_add['afer_5_days'] = np.nan
        to_add['afer_10_days'] = np.nan
        # 添加停牌分析报告
        if context.report is None:
            # 如空，则直接赋值
            context.report = to_add
        else:
            # 如已有数据，行合并
            # 防止重复，只能添加新增加的股票，已经存在的无需添加
            # 这样，如果股票复牌后再次停牌，则只会统计前次停牌的数据，忽略后续停牌
            old_stocks = context.report.index
            new_add_stocks = to_add.index.difference(old_stocks)
            context.report = pd.concat([context.report, to_add.loc[new_add_stocks]], verify_integrity=True)

    # 更新复牌5天股票收益率
    update_roc(context, data, 5)
    # 更新复牌10天股票收益率
    update_roc(context, data, 10)


def handle_report(context, data):
    """处理分析报告"""
    output = pipeline_output('my_pipeline')
    d = get_datetime('Asia/Shanghai')
    if output is not None:
        #log.info('日期:{},复牌家数:{}'.format(d.date(), output.shape[0]))
        output['begin_date'] = (d - day).date()
        update_report(context, data, output)
        # 重复保存，只需要最终结果
        context.report.to_pickle('report.pkl')

ValueError: SQLite file '/home/ldf/.zipline/data/cndaily/2018-05-02T14;00;03.669934/assets-6.sqlite' doesn't exist.

In [None]:
import pandas as pd

In [None]:
df = pd.read_pickle('report.pkl')

In [None]:
df.sort_values('begin_date')

In [None]:
df.plot(y='pct',kind='hist',figsize=(8,6))

In [None]:
df.plot(y='afer_5_days',kind='hist',figsize=(8,6))

In [None]:
df.plot(y='afer_10_days',kind='hist',figsize=(8,6))