In [14]:
from numba import njit
import numpy as np
import pandas as pd
import datetime
import collections
import math
import pytz
import scipy.stats as st
import vectorbt as vbt

import backtrader as bt
import backtrader.feeds as btfeeds

class OrderAnalyzer(bt.analyzers.Analyzer):
    """Analyzer to extract order price, size, value, and paid commission."""
    def create_analysis(self):
        self.rets0 = {}
        self.rets1 = {}

    def notify_order(self, order):
        if order.status == order.Completed:
            if order.data._name == SYMBOL1:
                rets = self.rets0
            else:
                rets = self.rets1
            rets[self.strategy.datetime.datetime()] = (
                order.executed.price,
                order.executed.size,
                -order.executed.size * order.executed.price,
                order.executed.comm
            )

    def get_analysis(self):
        return self.rets0, self.rets1

class CashValueAnalyzer(bt.analyzers.Analyzer):
    """Analyzer to extract cash and value."""
    def create_analysis(self):
        self.rets = {}

    def notify_cashvalue(self, cash, value):
        self.rets[self.strategy.datetime.datetime()] = (cash, value)

    def get_analysis(self):
        return self.rets

class DataAnalyzer(bt.analyzers.Analyzer):
    """Analyzer to extract OHLCV."""
    def create_analysis(self):
        self.rets0 = {}
        self.rets1 = {}

    def next(self):
        self.rets0[self.strategy.datetime.datetime()] = [
            self.data0.open[0],
            self.data0.high[0],
            self.data0.low[0],
            self.data0.close[0],
            self.data0.volume[0]
        ]
        self.rets1[self.strategy.datetime.datetime()] = [
            self.data1.open[0],
            self.data1.high[0],
            self.data1.low[0],
            self.data1.close[0],
            self.data1.volume[0]
        ]

    def get_analysis(self):
        return self.rets0, self.rets1

class PandasData(btfeeds.PandasData):
    params = (
        # Possible values for datetime (must always be present)
        #  None : datetime is the "index" in the Pandas Dataframe
        #  -1 : autodetect position or case-wise equal name
        #  >= 0 : numeric index to the colum in the pandas dataframe
        #  string : column name (as index) in the pandas dataframe
        ('datetime', None),

        ('open', 'Open'),
        ('high', 'High'),
        ('low', 'Low'),
        ('close', 'Close'),
        ('volume', 'Volume'),
        ('openinterest', None),
    )

class CommInfoFloat(bt.CommInfoBase):
    """Commission schema that keeps size as float."""
    params = (
        ('stocklike', True),
        ('commtype', bt.CommInfoBase.COMM_PERC),
        ('percabs', True),
      )
    
    def getsize(self, price, cash):
        if not self._stocklike:
            return self.p.leverage * (cash / self.get_margin(price))

        return self.p.leverage * (cash / price)

class PairTradingStrategy(bt.Strategy):
    """Basic pair trading strategy."""
    params = dict(
        period=PERIOD,
        order_pct1=ORDER_PCT1,
        order_pct2=ORDER_PCT2,
        printout=True,
        upper=UPPER,
        lower=LOWER,
        mode=MODE
    )

    def log(self, txt, dt=None):
        if self.p.printout:
            dt = dt or self.data.datetime[0]
            dt = bt.num2date(dt)
            print('%s, %s' % (dt.isoformat(), txt))

    def notify_order(self, order):
        if order.status in [bt.Order.Submitted, bt.Order.Accepted]:
            return  # Await further notifications

        if order.status == order.Completed:
            if order.isbuy():
                buytxt = 'BUY COMPLETE {}, size = {:.2f}, price = {:.2f}'.format(
                    order.data._name, order.executed.size, order.executed.price)
                self.log(buytxt, order.executed.dt)
            else:
                selltxt = 'SELL COMPLETE {}, size = {:.2f}, price = {:.2f}'.format(
                    order.data._name, order.executed.size, order.executed.price)
                self.log(selltxt, order.executed.dt)

        elif order.status in [order.Expired, order.Canceled, order.Margin]:
            self.log('%s ,' % order.Status[order.status])
            pass  # Simply log

        # Allow new orders
        self.orderid = None

@njit
def rolling_logret_zscore_nb(a, b, period):
    """Calculate the log return spread."""
    spread = np.full_like(a, np.nan, dtype=np.float_)
    spread[1:] = np.log(a[1:] / a[:-1]) - np.log(b[1:] / b[:-1])
    zscore = np.full_like(a, np.nan, dtype=np.float_)
    for i in range(a.shape[0]):
        from_i = max(0, i + 1 - period)
        to_i = i + 1
        if i < period - 1:
            continue
        spread_mean = np.mean(spread[from_i:to_i])
        spread_std = np.std(spread[from_i:to_i])
        zscore[i] = (spread[i] - spread_mean) / spread_std
    return spread, zscore

@njit
def ols_spread_nb(a, b):
    """Calculate the OLS spread."""
    a = np.log(a)
    b = np.log(b)
    _b = np.vstack((b, np.ones(len(b)))).T
    slope, intercept = np.dot(np.linalg.inv(np.dot(_b.T, _b)), np.dot(_b.T, a))
    spread = a - (slope * b + intercept)
    return spread[-1]
    
@njit
def rolling_ols_zscore_nb(a, b, period):
    """Calculate the z-score of the rolling OLS spread."""
    spread = np.full_like(a, np.nan, dtype=np.float_)
    zscore = np.full_like(a, np.nan, dtype=np.float_)
    for i in range(a.shape[0]):
        from_i = max(0, i + 1 - period)
        to_i = i + 1
        if i < period - 1:
            continue
        spread[i] = ols_spread_nb(a[from_i:to_i], b[from_i:to_i])
        spread_mean = np.mean(spread[from_i:to_i])
        spread_std = np.std(spread[from_i:to_i])
        zscore[i] = (spread[i] - spread_mean) / spread_std
    return spread, zscore

def prepare_cerebro(data0, data1, use_analyzers=True, **params):
    # Create a cerebro
    cerebro = bt.Cerebro()

    # Add the 1st data to cerebro
    cerebro.adddata(data0)

    # Add the 2nd data to cerebro
    cerebro.adddata(data1)

    # Add the strategy
    cerebro.addstrategy(PairTradingStrategy, **params)

    # Add the commission - only stocks like a for each operation
    cerebro.broker.setcash(CASH)

    # Add the commission - only stocks like a for each operation
    comminfo = CommInfoFloat(commission=COMMPERC)
    cerebro.broker.addcommissioninfo(comminfo)

    if use_analyzers:
        # Add analyzers    
        cerebro.addanalyzer(DataAnalyzer)
        cerebro.addanalyzer(CashValueAnalyzer)
        cerebro.addanalyzer(OrderAnalyzer)
    
    return cerebro

SYMBOL1 = 'PEP'
SYMBOL2 = 'KO'
FROMDATE = datetime.datetime(2017, 1, 1, tzinfo=pytz.utc)
TODATE = datetime.datetime(2019, 1, 1, tzinfo=pytz.utc)
PERIOD = 100
CASH = 100000
COMMPERC = 0.005  # 0.5%
ORDER_PCT1 = 0.1
ORDER_PCT2 = 0.1
UPPER = st.norm.ppf(1 - 0.05 / 2)
LOWER = -st.norm.ppf(1 - 0.05 / 2)
MODE = 'OLS'  # OLS, log_return


start_date = FROMDATE.replace(tzinfo=pytz.utc)
end_date = TODATE.replace(tzinfo=pytz.utc)
data = vbt.YFData.download([SYMBOL1, SYMBOL2], start=start_date, end=end_date)
data = data.loc[(data.wrapper.index >= start_date) & (data.wrapper.index < end_date)]

print(data.data[SYMBOL1].iloc[[0, -1]])
print(data.data[SYMBOL2].iloc[[0, -1]])


# Create the 1st data
data0 = PandasData(dataname=data.data[SYMBOL1], name=SYMBOL1)

# Create the 2nd data
data1 = PandasData(dataname=data.data[SYMBOL2], name=SYMBOL2)

# Prepare a cerebro
cerebro = prepare_cerebro(data0, data1)

# And run it
bt_strategy = cerebro.run()[0]



# Extract OHLCV
bt_s1_rets, bt_s2_rets = bt_strategy.analyzers.dataanalyzer.get_analysis()
data_cols = ['open', 'high', 'low', 'close', 'volume']
bt_s1_ohlcv = pd.DataFrame.from_dict(bt_s1_rets, orient='index', columns=data_cols)
bt_s2_ohlcv = pd.DataFrame.from_dict(bt_s2_rets, orient='index', columns=data_cols)

print(bt_s1_ohlcv.shape)
print(bt_s2_ohlcv.shape)

print(bt_s1_ohlcv.iloc[[0, -1]])
print(bt_s2_ohlcv.iloc[[0, -1]])


# Calculate OLS z-score using Numba for a nice speedup
if MODE == 'OLS':
    vbt_spread, vbt_zscore = rolling_ols_zscore_nb(
        bt_s1_ohlcv['close'].values, 
        bt_s2_ohlcv['close'].values, 
        PERIOD
    )
elif MODE == 'log_return':
    vbt_spread, vbt_zscore = rolling_logret_zscore_nb(
        bt_s1_ohlcv['close'].values, 
        bt_s2_ohlcv['close'].values, 
        PERIOD
    )
else:
    raise ValueError("Unknown mode")
vbt_spread = pd.Series(vbt_spread, index=bt_s1_ohlcv.index, name='spread')
vbt_zscore = pd.Series(vbt_zscore, index=bt_s1_ohlcv.index, name='zscore')

                                Open       High       Low      Close   Volume  \
Date                                                                            
2017-01-03 05:00:00+00:00  86.424916  86.548445  85.82371  86.186081  3741200   
2018-12-31 05:00:00+00:00  96.724638  97.170731  95.62251  96.637169  5019100   

                           Dividends  Stock Splits  
Date                                                
2017-01-03 05:00:00+00:00        0.0           0.0  
2018-12-31 05:00:00+00:00        0.0           0.0  
                                Open       High        Low      Close  \
Date                                                                    
2017-01-03 05:00:00+00:00  33.463263  33.713231  33.285867  33.705166   
2018-12-31 05:00:00+00:00  40.949931  40.993045  40.492918  40.829208   

                             Volume  Dividends  Stock Splits  
Date                                                          
2017-01-03 05:00:00+00:00  14711000        0