# Backtesting with zipline - Pipeline API with Custom Data

> This notebook requires the `conda` environment `backtest`. Please see the [installation instructions](../installation/README.md) for running the latest Docker image or alternative ways to set up your environment.

## Imports & Settings

In [8]:
import warnings

from data.data_loader import data_loader

# Silence all warnings to keep the notebook output compact.
# NOTE(review): this also hides deprecation and runtime warnings raised by the libraries used below.
warnings.filterwarnings('ignore')

In [9]:
from pathlib import Path
from time import time

import numpy as np
import pandas as pd
import pandas_datareader.data as web
from logbook import Logger, StderrHandler, INFO, WARNING

from zipline import run_algorithm
from zipline.api import (attach_pipeline, pipeline_output,
                         date_rules, time_rules, record,
                         schedule_function, commission, slippage,
                         set_slippage, set_commission, set_max_leverage,
                         order_target, order_target_percent,
                         get_open_orders, cancel_order)
from zipline.data import bundles
from zipline.utils.run_algo import load_extensions
from zipline.pipeline import Pipeline, CustomFactor
from zipline.pipeline.data import Column, DataSet
from zipline.pipeline.domain import US_EQUITIES
from zipline.pipeline.filters import StaticAssets
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.pipeline.loaders.frame import DataFrameLoader
from trading_calendars import get_calendar

import pyfolio as pf
from pyfolio.plotting import plot_rolling_returns, plot_rolling_sharpe
from pyfolio.timeseries import forecast_cone_bootstrap

from alphalens.tears import (create_returns_tear_sheet,
                             create_summary_tear_sheet,
                             create_full_tear_sheet)

from alphalens.performance import mean_return_by_quantile
from alphalens.plotting import plot_quantile_returns_bar
from alphalens.utils import get_clean_factor_and_forward_returns, rate_of_return

import matplotlib.pyplot as plt
import seaborn as sns

In [10]:
# Use seaborn's white-grid style for the plots in this notebook.
sns.set_style('whitegrid')

# Seed NumPy's global random number generator.
np.random.seed(42)
# Shorthand for building MultiIndex slices inside .loc[] lookups.
idx = pd.IndexSlice

In [11]:
# Results directory: the predictions file is read from here and the pyfolio figure is saved here.
results_path = Path('results_5min_2yrs_nocut_v3')

## Alphalens Analysis

In [12]:
# Location of an HDF5 store relative to the notebook directory.
# NOTE(review): no executed code in this notebook reads DATA_STORE — confirm it is still needed.
DATA_STORE = Path('..', 'data', 'crypto.h5')

In [13]:
def get_trade_prices(tickers):
    """Return next-bar open prices for `tickers` in wide format.

    Loads bars via `data_loader(timeframe='5Min')`, selects the `open`
    column for the requested tickers and pivots to one column per ticker.
    The frame is shifted by one row (`shift(-1)`) so that each timestamp
    carries the open of the following bar, and the index is localized to UTC.

    Args:
        tickers: ticker labels to select from the first index level
            (after the level swap).

    Returns:
        DataFrame with a UTC-localized index and one column per ticker.
    """
    # Only the price frame is used; the other two return values are ignored.
    prices, _metadata, _categories = data_loader(timeframe='5Min')
    prices = prices.swaplevel().sort_index()
    open_prices = prices.loc[idx[tickers, :], 'open']
    return (open_prices
            .unstack('ticker')
            .sort_index()
            .shift(-1)
            .tz_localize('UTC'))

In [14]:
# Read the frame stored under key 'predictions' in test_preds.h5 and summarize its structure.
original_predictions = pd.read_hdf(results_path / 'test_preds.h5', 'predictions')
original_predictions.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 9220542 entries, ('atom-usd', Timestamp('2019-04-25 18:15:00')) to ('zec-usd', Timestamp('2021-06-17 04:55:00'))
Data columns (total 10 columns):
 #   Column  Dtype  
---  ------  -----  
 0   0       float32
 1   1       float32
 2   2       float32
 3   3       float32
 4   4       float32
 5   5       float32
 6   6       float32
 7   7       float32
 8   8       float32
 9   9       float32
dtypes: float32(10)
memory usage: 406.7+ MB


In [15]:
# Average the first three prediction columns row-wise into a single 'prediction' column.
predictions = (original_predictions
               .iloc[:, :3]
               .mean(1)
               .to_frame('prediction'))

In [16]:
# Reshape the predictions into the (date, ticker) long format passed to alphalens below:
# pivot tickers to columns, keep only timestamps on a regular 60-minute grid
# (asfreq selects values, it does not aggregate), drop timestamps without any
# prediction, stack back to long format and localize the date level to UTC.
factor = (predictions
          .unstack('ticker')
          .asfreq('60Min')
          .dropna(how='all')
          .stack()
          .tz_localize('UTC', level='date')
          .sort_index())
# Unique tickers that have at least one factor value.
tickers = factor.index.get_level_values('ticker').unique()

In [17]:
# Wide frame of open prices for the factor's tickers.
trade_prices = get_trade_prices(tickers)

# Display the factor for visual inspection.
factor

28
Index(['atom-usd', 'bat-usd', 'bnt-usd', 'bsv-usd', 'btc-usd', 'btg-usd',
       'btt-usd', 'dash-usd', 'dgb-usd', 'enj-usd', 'eos-usd', 'etc-usd',
       'eth-usd', 'leo-usd', 'ltc-usd', 'miota-usd', 'mkr-usd', 'neo-usd',
       'omg-usd', 'qtum-usd', 'trx-usd', 'vet-usd', 'wbtc-usd', 'xlm-usd',
       'xmr-usd', 'xrp-usd', 'xtz-usd', 'zec-usd'],
      dtype='object', name='ticker')
Number tickers 28
Index(['atom-usd', 'bat-usd', 'bnt-usd', 'bsv-usd', 'btc-usd', 'btg-usd',
       'btt-usd', 'dash-usd', 'dgb-usd', 'enj-usd', 'eos-usd', 'etc-usd',
       'eth-usd', 'leo-usd', 'ltc-usd', 'miota-usd', 'mkr-usd', 'neo-usd',
       'omg-usd', 'qtum-usd', 'trx-usd', 'vet-usd', 'wbtc-usd', 'xlm-usd',
       'xmr-usd', 'xrp-usd', 'xtz-usd', 'zec-usd'],
      dtype='object', name='ticker')
<class 'pandas.core.frame.DataFrame'>
MultiIndex: 437388 entries, (Timestamp('2019-06-14 12:00:00', freq='60T'), 'atom-usd') to (Timestamp('2021-03-26 08:00:00', freq='60T'), 'zec-usd')
Data columns (total

Unnamed: 0_level_0,Unnamed: 1_level_0,prediction
date,ticker,Unnamed: 2_level_1
2017-11-06 17:40:00+00:00,btc-usd,0.002092
2017-11-06 17:40:00+00:00,btg-usd,0.006738
2017-11-06 17:40:00+00:00,dash-usd,-0.000272
2017-11-06 17:40:00+00:00,eos-usd,0.001442
2017-11-06 17:40:00+00:00,etc-usd,0.001525
...,...,...
2021-06-17 04:40:00+00:00,xlm-usd,0.000039
2021-06-17 04:40:00+00:00,xmr-usd,0.000102
2021-06-17 04:40:00+00:00,xrp-usd,0.000250
2021-06-17 04:40:00+00:00,xtz-usd,-0.000014


In [18]:
# Combine the factor with forward returns for horizons of 1, 5, 10 and 21 periods
# and bucket the factor values into 5 quantiles.
# max_loss=0.3 lets alphalens drop up to 30% of the factor rows before raising.
factor_data = get_clean_factor_and_forward_returns(factor=factor,
                                                   prices=trade_prices,
                                                   quantiles=5,
                                                   max_loss=0.3,
                                                   periods=(1, 5, 10, 21),
                                                   cumulative_returns=False).sort_index()
factor_data.info()

2


ValueError: Factor and prices indices don't match: make sure they have the same convention in terms of datetimes and symbol-names

In [None]:
# Render the alphalens summary tear sheet for the prepared factor data.
create_summary_tear_sheet(factor_data)

In [None]:
# Render the alphalens returns tear sheet for the prepared factor data.
create_returns_tear_sheet(factor_data)

### Load zipline extensions

This step is only needed in the notebook so that zipline can find the bundle.

In [None]:
# Load zipline's default extension file (no additional extensions) so bundles registered there are available.
load_extensions(default=True,
                extensions=[],
                strict=True,
                environ=None)

In [None]:
# Send log records of level WARNING and above to stderr with a timestamped format.
log_handler = StderrHandler(format_string='[{record.time:%Y-%m-%d %H:%M:%S.%f}]: ' +
                            '{record.level_name}: {record.func_name}: {record.message}',
                            level=WARNING)
# Install the handler for the whole application.
log_handler.push_application()
# Named logger available to the algorithm functions below.
log = Logger('Algorithm')

## Algo Params

In [None]:
# Maximum number of assets selected for the long side of the pipeline.
N_LONGS = 25
# Maximum number of assets selected for the short side of the pipeline.
N_SHORTS = 25
# Both sides must contain more than this many assets, otherwise all holdings are divested.
MIN_POSITIONS = 10

## Load Data

### Quandl Wiki Bundle

In [None]:
# Load the zipline data bundle registered under the name 'quandl'.
# NOTE(review): the prediction tickers displayed earlier are crypto pairs such as 'btc-usd' —
# confirm this bundle actually contains them, otherwise the symbol lookup below cannot succeed.
bundle_data = bundles.load('quandl')

### ML Predictions

In [None]:
def load_predictions(bundle):
    """Load the model predictions and label them with the bundle's asset ids.

    Reads the 'predictions' key of `test_preds.h5` under `results_path`,
    averages the first three prediction columns into a single signal and
    pivots the result to one column per asset, labelled by zipline sid.

    Args:
        bundle: loaded zipline bundle; its `asset_finder` resolves the tickers.

    Returns:
        Tuple `(predictions, assets)`: the wide, UTC-localized prediction
        frame with sid column labels, and the assets returned by
        `lookup_symbols` for the tickers found in the predictions.
    """
    predictions = (pd.read_hdf(results_path / 'test_preds.h5', 'predictions')
                   .iloc[:, :3]
                   .mean(1)
                   .to_frame('prediction'))
    tickers = predictions.index.get_level_values('ticker').unique().tolist()

    assets = bundle.asset_finder.lookup_symbols(tickers, as_of_date=None)
    # pd.Int64Index is deprecated since pandas 1.4 and removed in pandas 2.0;
    # pd.Index with an explicit dtype builds the same index on every version.
    predicted_sids = pd.Index([asset.sid for asset in assets], dtype='int64')
    ticker_map = dict(zip(tickers, predicted_sids))

    return (predictions
            .unstack('ticker')
            .rename(columns=ticker_map)
            .prediction
            .tz_localize('UTC')), assets

In [None]:
# NOTE: this rebinds `predictions`, replacing the long-format frame used in the Alphalens section
# with the wide, sid-labelled frame returned by load_predictions.
predictions, assets = load_predictions(bundle_data)

In [None]:
# Summarize the wide prediction frame.
predictions.info()

### Define Custom Dataset

In [None]:
class SignalData(DataSet):
    """Custom pipeline dataset exposing the model predictions as a column."""
    # Float-valued column that the pipeline reads the predictions from.
    predictions = Column(dtype=float)
    # NOTE(review): the dataset is tied to the US equities domain — confirm this matches the traded assets.
    domain = US_EQUITIES

### Define Pipeline Loaders

In [None]:
# Map the dataset column to a loader that serves values from the in-memory predictions frame.
signal_loader = {SignalData.predictions:
                     DataFrameLoader(SignalData.predictions, predictions)}

## Pipeline Setup

### Custom ML Factor

In [None]:
class MLSignal(CustomFactor):
    """Expose the stored predictions as a pipeline Factor.

    Wrapping the dataset column in a Factor makes ranking and filtering
    (e.g. `top` / `bottom`) available in the Pipeline.
    """
    inputs = [SignalData.predictions]
    # One-row lookback window: compute() receives a single row of predictions.
    window_length = 1

    def compute(self, today, assets, out, predictions):
        # Pass the predictions through unchanged into the output buffer.
        out[:] = predictions

### Create Pipeline

In [None]:
def compute_signals():
    """Build the pipeline that flags long and short candidates.

    The long column marks the `N_LONGS` highest signals among assets with a
    positive signal; the short column marks the `N_SHORTS` lowest signals
    among assets with a negative signal. The pipeline is screened to the
    assets that have predictions.
    """
    ml_signal = MLSignal()
    is_positive = ml_signal > 0
    is_negative = ml_signal < 0
    columns = {
        'longs': ml_signal.top(N_LONGS, mask=is_positive),
        'shorts': ml_signal.bottom(N_SHORTS, mask=is_negative),
    }
    return Pipeline(columns=columns, screen=StaticAssets(assets))

## Initialize Algorithm

In [None]:
def initialize(context):
    """
    Set up the algorithm state once, before the simulation starts.

    Resets the long/short selections, configures slippage, schedules the
    daily rebalance and recording callbacks, and attaches the signal pipeline.
    """
    context.longs = None
    context.shorts = None

    # Fixed slippage with a zero spread. No commission model is set here
    # (a PerShare override was tried earlier and left disabled).
    set_slippage(slippage.FixedSlippage(spread=0.00))

    # Rebalance every day, 1h30m after the market opens.
    schedule_function(func=rebalance,
                      date_rule=date_rules.every_day(),
                      time_rule=time_rules.market_open(hours=1, minutes=30))

    # Record the tracked variables every day at the market close.
    schedule_function(func=record_vars,
                      date_rule=date_rules.every_day(),
                      time_rule=time_rules.market_close())

    attach_pipeline(compute_signals(), 'signals')

### Get daily Pipeline results

In [None]:
def before_trading_start(context, data):
    """
    Called every day before market open.

    Reads the day's pipeline output once and stores on `context`:
      * `longs` / `shorts`: index of assets flagged for each side, or an empty
        index when either side has MIN_POSITIONS or fewer flagged assets;
      * `divest`: current holdings that are in neither selection (all current
        holdings when the selections are empty).
    """
    # Fetch the pipeline result a single time and reuse it for both columns.
    output = pipeline_output('signals')
    longs = output.longs.astype(int)
    shorts = output.shorts.astype(int)
    holdings = context.portfolio.positions.keys()

    if longs.sum() > MIN_POSITIONS and shorts.sum() > MIN_POSITIONS:
        context.longs = longs[longs != 0].index
        context.shorts = shorts[shorts != 0].index
        context.divest = holdings - set(context.longs) - set(context.shorts)
    else:
        # Too few candidates on at least one side: trade nothing and exit all positions.
        context.longs = context.shorts = pd.Index([])
        context.divest = set(holdings)

## Define Rebalancing Logic

In [None]:
def rebalance(context, data):
    """
    Place the day's orders; invoked by the schedule set up in initialize().

    Cancels every open order, closes positions marked for divestment, and
    then spreads equal weights across the short and long selections.
    """
    # Cancel everything still open before placing new orders.
    for pending_orders in get_open_orders().values():
        for pending in pending_orders:
            cancel_order(pending)

    # Exit holdings that are no longer part of either selection.
    for stock in context.divest:
        order_target(stock, target=0)

    # Nothing to allocate when both selections are empty.
    if context.longs.empty and context.shorts.empty:
        return

    n_shorts = len(context.shorts)
    for stock in context.shorts:
        order_target_percent(stock, -1 / n_shorts)

    n_longs = len(context.longs)
    for stock in context.longs:
        order_target_percent(stock, 1 / n_longs)

## Record Data Points

In [None]:
def record_vars(context, data):
    """
    Record the account leverage and the current long/short selections.

    Invoked by the schedule set up in initialize().
    """
    tracked = {
        'leverage': context.account.leverage,
        'longs': context.longs,
        'shorts': context.shorts,
    }
    record(**tracked)

## Run Algorithm

In [None]:
# Backtest window: first and last timestamp for which predictions exist.
dates = predictions.index.get_level_values('date')
start_date, end_date = dates.min(), dates.max()

In [None]:
# Show the calendar dates bounding the backtest.
print('Start: {}\nEnd:   {}'.format(start_date.date(), end_date.date()))

In [None]:
# Run the backtest over the prediction window and time it.
# NOTE(review): the simulation runs at daily frequency on the 'quandl' bundle, while the
# predictions file name and timestamps indicate 5-minute data — confirm this is intended.
start = time()
results = run_algorithm(start=start_date,
                        end=end_date,
                        initialize=initialize,
                        before_trading_start=before_trading_start,
                        capital_base=1e5,
                        data_frequency='daily',
                        bundle='quandl',
                        custom_loader=signal_loader)  # need to modify zipline

# Report the wall-clock duration of the run.
print('Duration: {:.2f}s'.format(time() - start))

## PyFolio Analysis

In [None]:
# Split the zipline results into the returns, positions and transactions passed to pyfolio below.
returns, positions, transactions = pf.utils.extract_rets_pos_txn_from_zipline(results)

In [None]:
# Download the S&P 500 series from FRED and convert it to UTC-localized percentage changes.
# NOTE(review): the window is hard-coded to 2014-2018, while the predictions shown earlier
# span 2019-2021 — confirm the benchmark range overlaps the backtest period.
benchmark = web.DataReader('SP500', 'fred', '2014', '2018').squeeze()
benchmark = benchmark.pct_change().tz_localize('UTC')

### Custom Plots

In [None]:
# Date passed to pyfolio as the start of the out-of-sample ("live") period.
# NOTE(review): this date precedes the 2019-2021 prediction range shown earlier — confirm it is still correct.
LIVE_DATE = '2016-11-30'

In [None]:
# Two-panel figure: cumulative returns with a bootstrap forecast cone (left)
# and the rolling Sharpe ratio over a 63-observation window (right).
fig, axes = plt.subplots(ncols=2, figsize=(16, 5))
plot_rolling_returns(returns,
                     factor_returns=benchmark,
                     live_start_date=LIVE_DATE,
                     logy=False,
                     cone_std=2,
                     legend_loc='best',
                     volatility_match=False,
                     cone_function=forecast_cone_bootstrap,
                     ax=axes[0])
plot_rolling_sharpe(returns, ax=axes[1], rolling_window=63)
axes[0].set_title('Cumulative Returns - In and Out-of-Sample')
axes[1].set_title('Rolling Sharpe Ratio (3 Months)')
sns.despine()
fig.tight_layout()
# Save the figure into the results directory at 300 dpi.
fig.savefig((results_path / 'pyfolio_out_of_sample').as_posix(), dpi=300)

### Tear Sheets

In [None]:
# Full pyfolio tear sheet, including round-trip analysis, split at LIVE_DATE and compared to the benchmark.
pf.create_full_tear_sheet(returns, 
                          positions=positions, 
                          transactions=transactions,
                          benchmark_rets=benchmark,
                          live_start_date=LIVE_DATE, 
                          round_trips=True)