# Backtesting with Zipline - Using the Pipeline API with ML-Driven Signals

## Loading Libraries

In [3]:
# Numerical Computing
import numpy as np

# Data Manipulation
import pandas as pd
import pandas_datareader.data as web

# Path, Time & Collections
from time import time
from pathlib import Path
from collections import defaultdict


# Data Visualization
import seaborn as sns
import matplotlib.pyplot as plt

# Logger
from logbook import Logger, StderrHandler, INFO

# ZipLine
from zipline import run_algorithm
from zipline.api import (attach_pipeline, pipeline_output,
                         date_rules, time_rules, record,
                         schedule_function, commission, slippage,
                         set_slippage, set_commission,
                         get_open_orders, cancel_order,
                         order_target, order_target_percent)

from zipline.data import bundles
from zipline.utils.run_algo import load_extensions
from zipline.pipeline import Pipeline, CustomFactor
from zipline.pipeline.data import Column, DataSet
from zipline.pipeline.domain import JP_EQUITIES
from zipline.pipeline.filters import StaticAssets
from zipline.pipeline.loaders.frame import DataFrameLoader

# PyFolio
import pyfolio as pf
from pyfolio.plotting import plot_rolling_returns, plot_rolling_sharpe
from pyfolio.timeseries import forecast_cone_bootstrap

# Warnings
import warnings

In [4]:
# Shorthand used for slicing MultiIndex-ed frames.
idx = pd.IndexSlice

# Plot theme for every figure in the notebook.
sns.set_style('whitegrid')

# Seed NumPy's global random number generator.
np.random.seed(42)

# Suppress all warnings for the rest of the session.
warnings.filterwarnings('ignore')

In [5]:
# Directory that holds the stored predictions read below and receives the
# figure saved in the plotting section.
results_path = Path('results', 'return_predictions')

# `exist_ok=True` makes the cell idempotent without a separate exists() check
# (the check-then-create form can fail if the directory appears in between).
results_path.mkdir(parents=True, exist_ok=True)

### Loading Zipline Extensions

In [6]:
# Load only the default extension file (no extra extension paths), failing
# loudly on errors because `strict=True`.
# NOTE(review): presumably this is where the 'stooq' bundle gets registered - confirm.
load_extensions(default=True, extensions=[], strict=True, environ=None)

In [7]:
# Send log records at INFO level and above to stderr, prefixed with a
# timestamp, the level name and the emitting function.
log_handler = StderrHandler(
    level=INFO,
    format_string=('[{record.time:%Y-%m-%d %H:%M:%S.%f}]: '
                   '{record.level_name}: {record.func_name}: {record.message}'),
)
log_handler.push_application()

# Logger available to the algorithm callbacks.
log = Logger('Algorithm')

### Algo Params

In [8]:
# Number of assets selected on the long and on the short side by the pipeline.
N_LONGS = N_SHORTS = 25

# New positions are opened only when both the long and the short selection
# contain more than this many assets (see `rebalance`).
MIN_POSITIONS = 15

### Loading Data

#### Stooq Bundle

In [9]:
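# NOTE(review): this load is disabled, yet `load_predictions` below takes a
# loaded bundle as its argument - one has to be loaded before it can be called.
# bundle_data = bundles.load('stooq')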

#### ML Predictions

In [10]:
def load_predictions(bundle, t=1):
    """Load the stored predictions and key them by the bundle's asset ids.

    Reads the 'train/NN' and 'test/NN' tables from `predictions.h5` in
    `results_path`, drops duplicated index entries and the `y_test` column,
    and averages the first five remaining columns into a single prediction.

    Parameters
    ----------
    bundle
        Loaded Zipline bundle; its `asset_finder` resolves tickers to assets.
    t : int, default 1
        Integer used as the zero-padded two-digit suffix of the HDF keys.

    Returns
    -------
    tuple
        `(predictions, assets)`: a frame indexed by UTC-localized date with
        one column per asset sid, and the list of assets that
        `lookup_symbols` returned for the tickers.
    """
    store = results_path / 'predictions.h5'
    df = pd.concat([pd.read_hdf(store, '{}/{:02}'.format(split, t))
                    for split in ('train', 'test')])
    df = df[~df.index.duplicated()].drop('y_test', axis=1)
    predictions = df.iloc[:, :5].mean(1).to_frame('predictions')

    tickers = predictions.index.get_level_values('ticker').unique().tolist()

    assets = bundle.asset_finder.lookup_symbols(tickers, as_of_date=None)
    # pd.Int64Index was deprecated in pandas 1.4 and removed in 2.0;
    # pd.Index(..., dtype='int64') is the version-independent spelling.
    predicted_sids = pd.Index([asset.sid for asset in assets], dtype='int64')
    ticker_map = dict(zip(tickers, predicted_sids))

    # Pivot tickers into columns, relabel them with sids, and drop the
    # outer 'predictions' column level left over from `to_frame`.
    return (predictions
            .unstack('ticker')
            .rename(columns=ticker_map)
            .predictions
            .tz_localize('UTC')), assets

In [11]:
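# NOTE(review): this call is disabled, yet later cells read `predictions` and
# `assets` (e.g. `compute_signals`, `initialize`, the backtest date range), so
# both names must be defined before those cells run.
# predictions, assets = load_predictions(bundle_data)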

#### Defining Custom Dataset

In [13]:
class SignalData(DataSet):
    """Custom Pipeline dataset that exposes the predictions as one column."""
    # Domain the dataset is declared for.
    domain = JP_EQUITIES
    # Float-valued column that carries the predictions.
    predictions = Column(dtype=float)

#### Defining Pipeline Loaders

In [15]:
# Later cells read `predictions`, `assets` and `signal_loader`; with this cell
# commented out a fresh-kernel Run All stops with a NameError, so all three
# are built here from the definitions above.
bundle_data = bundles.load('stooq')
predictions, assets = load_predictions(bundle_data)

# Serve the custom column from the in-memory prediction frame.
signal_loader = {SignalData.predictions:
                 DataFrameLoader(SignalData.predictions, predictions)}

### Pipeline Setup

#### Custom ML Factor

In [16]:
class MLSignal(CustomFactor):
    """Expose the stored predictions as a Factor.

    Wrapping the column in a Factor makes it possible to rank and filter
    on it inside a Pipeline.
    """
    window_length = 1
    inputs = [SignalData.predictions]

    def compute(self, today, assets, out, preds):
        """Write the input predictions straight into the output array."""
        out[...] = preds

#### Creating Pipeline

In [17]:
def compute_signals():
    """Build the Pipeline that selects long and short candidates.

    Returns a Pipeline with two boolean columns:

    - 'longs':  the top `N_LONGS` assets by signal among those with a
      positive signal;
    - 'shorts': the bottom `N_SHORTS` assets by signal among those with a
      negative signal.

    The pipeline is screened to the notebook-level `assets`.
    """
    signals = MLSignal()
    return Pipeline(
        columns={
            'longs': signals.top(N_LONGS, mask=signals > 0),
            'shorts': signals.bottom(N_SHORTS, mask=signals < 0),
        },
        screen=StaticAssets(assets),
    )

### Initializing Algorithm

In [18]:
def initialize(context):
    """
    Called once at the start of the algorithm.

    Stores the strategy parameters on `context`, configures the slippage and
    commission models, schedules the daily `rebalance` and `record_vars`
    callbacks and attaches the signal pipeline under the name 'signals'.
    """
    context.n_longs = N_LONGS
    context.n_shorts = N_SHORTS
    context.min_positions = MIN_POSITIONS
    context.universe = assets
    # Explicit dtype: a bare `pd.Series()` has a version-dependent default
    # dtype and emits a warning on some pandas versions.
    context.trades = pd.Series(dtype='int64')
    # `record_vars` reads these; give them defaults so it cannot hit an
    # AttributeError before `rebalance` has assigned them.
    context.longs = context.shorts = 0

    # Zero spread, and a per-share commission with a minimum cost per trade.
    set_slippage(slippage.FixedSlippage(spread=0.00))
    set_commission(commission.PerShare(cost=0.05, min_trade_cost=1))

    # Rebalance every day, 1h30 after the open ...
    schedule_function(rebalance,
                      date_rules.every_day(),
                      time_rules.market_open(hours=1, minutes=30))

    # ... and record the daily statistics at the close.
    schedule_function(record_vars,
                      date_rules.every_day(),
                      time_rules.market_close())

    pipeline = compute_signals()
    attach_pipeline(pipeline, 'signals')

#### Getting Daily Pipeline Results

In [19]:
def before_trading_start(context, data):
    """
    Called every day before market open.

    Converts the boolean 'longs' / 'shorts' pipeline columns into a single
    Series with exactly one entry per asset, stored on `context.trades`:
    +1 = go long, -1 = go short, 0 = neither.
    """
    output = pipeline_output('signals')
    # Subtracting the two indicator columns gives one value per asset.
    # The earlier append + drop_duplicates form compared (asset, value)
    # rows, so a selected asset kept both its +/-1 entry and a 0 entry from
    # the other column, and `rebalance` then also issued a close-out order
    # for it. (`Series.append` was additionally removed in pandas 2.0.)
    context.trades = (output['longs'].astype(int)
                      .sub(output['shorts'].astype(int)))

### Defining Rebalancing Logic

In [20]:
def rebalance(context, data):
    """
    Execute orders according to schedule_function() date & time rules.

    Cancels all open orders, closes every asset whose trade value is 0, and
    then either equal-weights the long and short selections or, when either
    side is too small, closes the selected assets that are currently held.
    Also stores the selection sizes on `context.longs` / `context.shorts`.
    """
    # Cancel whatever is still open from earlier sessions.
    for open_orders in get_open_orders().values():
        for open_order in open_orders:
            cancel_order(open_order)

    positions = context.portfolio.positions

    # Group assets by trade direction (+1 / -1); exit the ones marked 0.
    trades = defaultdict(list)
    for stock, trade in context.trades.items():
        if trade == 0:
            order_target(stock, target=0)
        else:
            trades[trade].append(stock)

    context.longs, context.shorts = len(trades[1]), len(trades[-1])
    # NOTE(review): the comparison is strict, so exactly `min_positions`
    # assets on a side is not enough to trade - confirm this is intended.
    if context.longs > context.min_positions and context.shorts > context.min_positions:
        # Equal weights within each side.
        for stock in trades[-1]:
            order_target_percent(stock, -1 / context.shorts)
        for stock in trades[1]:
            order_target_percent(stock, 1 / context.longs)
    else:
        # Too few candidates: close the selected assets that are held.
        for stock in trades[-1] + trades[1]:
            if stock in positions:
                order_target(stock, 0)

### Record Data Points

In [21]:
def record_vars(context, data):
    """
    Record account leverage and the long / short selection sizes for the day.
    """
    daily_stats = {
        'leverage': context.account.leverage,
        'longs': context.longs,
        'shorts': context.shorts,
    }
    record(**daily_stats)

### Running Algorithm

In [22]:
# Dates for which predictions are available.
dates = predictions.index.get_level_values('date')

# NOTE(review): `pd.DateOffset(day=1)` (singular) replaces the day-of-month
# with 1 rather than adding one day - confirm this snap to the start of the
# month is intended.
start_date, end_date = dates.min() + pd.DateOffset(day=1), dates.max()

In [23]:
# Show the calendar dates that bound the backtest.
print('Start:\t{}\nEnd:\t{}'.format(*[ts.date() for ts in (start_date, end_date)]))

In [24]:
# Wall-clock timer for the backtest.
start = time()

# NOTE(review): the original author's note says zipline has to be modified
# for `custom_loader` - confirm that the installed `run_algorithm` accepts it.
results = run_algorithm(
    start=start_date,
    end=end_date,
    capital_base=1e6,
    data_frequency='daily',
    bundle='stooq',
    initialize=initialize,
    before_trading_start=before_trading_start,
    custom_loader=signal_loader,
)

print('Duration: {:.2f}s'.format(time() - start))

### PyFolio Analysis

In [25]:
# Split the backtest result into the three inputs used by the pyfolio calls below.
(returns,
 positions,
 transactions) = pf.utils.extract_rets_pos_txn_from_zipline(results)

In [26]:
# Download the NIKKEI225 series from FRED and turn it into UTC-localized
# period-over-period returns.
benchmark = (web.DataReader('NIKKEI225', 'fred', start='2015', end='2020')
             .squeeze()
             .pct_change()
             .tz_localize('UTC'))

#### Custom Plots

In [27]:
fig, axes = plt.subplots(figsize=(16, 5), ncols=2)

# Left panel: cumulative strategy returns against the benchmark, with a
# bootstrapped forecast cone after the live start date.
plot_rolling_returns(
    returns,
    factor_returns=benchmark,
    live_start_date='2018-01-01',
    cone_function=forecast_cone_bootstrap,
    cone_std=2,
    volatility_match=False,
    logy=False,
    legend_loc='best',
    ax=axes[0],
)
axes[0].set_title('Cumulative Returns - In and Out-of-Sample')

# Right panel: rolling Sharpe ratio over a 63-observation window.
plot_rolling_sharpe(returns, rolling_window=63, ax=axes[1])
axes[1].set_title('Rolling Sharpe Ratio (3 Months)')

fig.tight_layout()
fig.savefig((results_path / 'pyfolio_out_of_sample').as_posix(), dpi=300)
plt.show()

### Tear Sheets

In [28]:
# Full pyfolio tear sheet, using the same live start date as the custom plots
# and including the round-trip analysis.
pf.create_full_tear_sheet(
    returns,
    benchmark_rets=benchmark,
    positions=positions,
    transactions=transactions,
    round_trips=True,
    live_start_date='2018-01-01',
)