In [1]:
import backtrader as bt
import backtrader.feeds as btfeeds
import backtrader.indicators as btind
import backtrader.analyzers as btanalyzers
from enum import Enum
import pandas as pd
import numpy as np
import statsmodels.api as sm
import matplotlib

import math
import datetime

In [None]:
# Reference: https://github.com/polakowo/vectorbt/blob/master/examples/PairsTrading.ipynb

In [None]:
# This is the pandas data feed that is passed to the backtesting library
class PandasData(btfeeds.PandasData):
    """Pandas data feed with the column mapping used in this notebook.

    Possible values for ``datetime`` (which must always be present):

    * ``None``  - datetime is the index of the pandas DataFrame
    * ``-1``    - autodetect position or case-wise equal name
    * ``>= 0``  - numeric index of the column in the DataFrame
    * string    - column name (as index) in the DataFrame
    """

    params = (
        ('datetime', None),  # take the datetime from the DataFrame index

        ('open', 'Open'),
        ('high', 'High'),
        ('low', 'Low'),
        ('close', 'Adj Close'),  # the close line reads the 'Adj Close' column
        ('openinterest', None),
    )

In [None]:
class Log(btind.Indicator):
    """Indicator exposing the natural logarithm of its input as line ``log``."""
    lines = ('log',)

    def next(self):
        # Evaluated bar by bar: math.log works on the current scalar value.
        current = self.data[0]
        self.lines.log[0] = math.log(current)

class OLSSlopeIntercept(btind.PeriodN):
    """Rolling OLS regression of ``data0`` on ``data1``.

    On every bar the last ``period`` values of both feeds are fitted with
    statsmodels as ``data0 = intercept + slope * data1``; the two fitted
    coefficients are written to the ``slope`` and ``intercept`` lines.

    Params:
        period: number of most recent bars used for each fit (default 10).
    """
    _mindatas = 2  # ensure at least 2 data feeds are passed

    lines = ('slope', 'intercept',)
    params = (
        ('period', 10),
    )

    def next(self):
        endog = pd.Series(self.data0.get(size=self.p.period))
        regressor = pd.Series(self.data1.get(size=self.p.period))

        # add_constant() defaults to has_constant='skip': when the regressor
        # window is flat (a constant, non-zero column) no intercept column is
        # added, the fit returns a single coefficient and the two-value
        # unpacking below raises ValueError. Forcing 'add' guarantees that
        # exactly two coefficients come back, intercept first.
        exog = sm.add_constant(regressor, has_constant='add')
        intercept, slope = sm.OLS(endog, exog).fit().params

        self.lines.slope[0] = slope
        self.lines.intercept[0] = intercept

class OLSSpread(btind.PeriodN):
    """Spread of the log prices around their rolling OLS fit, plus z-score.

    Lines:
        slope: slope of the rolling regression of log(data0) on log(data1).
        spread: log(data0) minus the value fitted from log(data1).
        spread_mean: simple moving average of ``spread`` over ``period``.
        spread_std: standard deviation of ``spread`` over ``period``.
        zscore: ``(spread - spread_mean) / spread_std``.
    """
    _mindatas = 2  # ensure at least 2 data feeds are passed
    lines = ('slope', 'spread', 'spread_mean', 'spread_std', 'zscore',)
    params = (('period', 10),)

    def __init__(self):
        window = self.params.period

        # Regress the log of the first feed on the log of the second one.
        log0 = Log(self.data0)
        log1 = Log(self.data1)
        regression = OLSSlopeIntercept(log0, log1, period=window)

        # Residual of the first feed against the fitted value.
        fitted = regression.slope * log1 + regression.intercept
        residual = log0 - fitted

        self.lines.spread = residual
        self.lines.slope = regression.slope

        # Rolling statistics of the residual; the z-score is built from this
        # indicator's own bound lines.
        self.lines.spread_mean = btind.SMA(residual, period=window)
        self.lines.spread_std = btind.StdDev(residual, period=window)
        centered = residual - self.lines.spread_mean
        self.lines.zscore = centered / self.lines.spread_std

In [None]:
class Status(Enum):
    """Direction of the pair position last opened by the strategy."""
    LONG = 1   # set after buying data0 and shorting data1
    SHORT = 2  # set after shorting data0 and buying data1
    NONE = 3   # initial state, before any position has been opened
    
class PairTradingStrategy(bt.Strategy):
    """Basic pair trading strategy driven by the z-score of an OLS spread.

    The z-score of ``OLSSpread(data0, data1)`` is compared with two
    thresholds on every bar:

    * z-score above ``upper``: short ``data0`` and buy ``data1``.
    * z-score below ``lower``: buy ``data0`` and short ``data1``.

    A position is only reversed when the z-score crosses the opposite
    threshold; it is not closed when the z-score reverts (see the
    commented-out alternative in ``next``).

    Params:
        period: look-back window passed to ``OLSSpread``.
        order_pct1: target portfolio fraction for ``data0``.
        order_pct2: target portfolio fraction for ``data1``.
        printout: when true, ``log`` and ``stop`` print to stdout.
        upper: z-score threshold above which the pair is shorted.
        lower: z-score threshold below which the pair is bought.
        symbol1: name of the first symbol (read by ``OrderAnalyzer``).
        symbol2: name of the second symbol.
    """

    # These are just default values, pass in these values while initialising strategy
    params = dict(
        period=100,
        order_pct1=0.1,
        order_pct2=0.1,
        printout=True,
        upper=2,
        # NOTE(review): `next` goes long when zscore < lower, so a default
        # equal to `upper` looks unintended (presumably -2) -- confirm.
        lower=2,
        symbol1="SYMBOL1",
        symbol2="SYMBOL2"
    )

    def __init__(self):
        # To control operation entries
        # NOTE(review): nothing in this class ever assigns a non-None value
        # to orderid, so the guard at the top of `next` never triggers.
        self.orderid = None
        self.order_pct1 = self.p.order_pct1
        self.order_pct2 = self.p.order_pct2
        self.upper = self.p.upper
        self.lower = self.p.lower

        self.transform = OLSSpread(self.data0, self.data1, period=self.p.period)

        self.spread = self.transform.spread
        self.zscore = self.transform.zscore
        self.slope = self.transform.slope

        self.status = Status.NONE
        self.symbol1 = self.p.symbol1
        self.symbol2 = self.p.symbol2

    def log(self, txt, dt=None):
        """Print ``txt`` prefixed with an ISO timestamp if ``printout`` is set.

        ``dt`` is a backtrader numeric datetime; when omitted the datetime
        of the current bar of the first data feed is used.
        """
        if self.p.printout:
            dt = dt or self.data.datetime[0]
            dt = bt.num2date(dt)
            print('%s, %s' % (dt.isoformat(), txt))

    def notify_order(self, order):
        """Log completed and failed orders, then allow new orders."""
        if order.status in [bt.Order.Submitted, bt.Order.Accepted]:
            return  # Await further notifications

        if order.status == order.Completed:
            side = 'BUY' if order.isbuy() else 'SELL'
            text = '{} COMPLETE {}, size = {:.2f}, price = {:.2f}'.format(
                side, order.data._name,
                order.executed.size, order.executed.price)
            self.log(text, order.executed.dt)

        elif order.status in [order.Expired, order.Canceled,
                              order.Margin, order.Rejected]:
            # Rejected is reported as well so that a refused order does not
            # disappear from the log without a trace.
            self.log('%s ,' % order.Status[order.status])

        # Allow new orders
        self.orderid = None

    def next(self):
        """Open or reverse the pair position according to the z-score."""
        if self.orderid:
            return  # if an order is active, no new orders are allowed

        if self.zscore[0] > self.upper and self.status != Status.SHORT:
            # Short sell stock 1
            self.order_target_percent(data=self.data0, target=-self.order_pct1)
            # Buy stock 2
            self.order_target_percent(data=self.data1, target=self.order_pct2)

            self.status = Status.SHORT

        elif self.zscore[0] < self.lower and self.status != Status.LONG:
            # Short sell stock 2
            self.order_target_percent(data=self.data1, target=-self.order_pct2)
            # Buy stock 1
            self.order_target_percent(data=self.data0, target=self.order_pct1)

            self.status = Status.LONG

#         # Sample strategy does not close position when zscore reaches zero again,
#         # only reverses position when it swings the other way
#         # uncomment below if desired behaviour is to close at z-score = 0
#         elif self.zscore[0] <= 0 and self.status == Status.SHORT:
#             # Close position
#             self.order_target_percent(data=self.data1, target=0)
#             self.order_target_percent(data=self.data0, target=0)
#             self.status = Status.NONE

#         elif self.zscore[0] >= 0 and self.status == Status.LONG:
#             # Close position
#             self.order_target_percent(data=self.data1, target=0)
#             self.order_target_percent(data=self.data0, target=0)
#             self.status = Status.NONE

    def stop(self):
        """Print the starting cash and the final portfolio value."""
        if self.p.printout:
            print('==================================================')
            print('Starting Value - %.2f' % self.broker.startingcash)
            print('Ending   Value - %.2f' % self.broker.getvalue())
            print('==================================================')

In [None]:
# This can be added to Cerebro to allow fractional shares (for things like crypto).
# By default backtrader doesn't allow that.
class CommInfoFloat(bt.CommInfoBase):
    """Commission scheme whose order size is kept as a float."""
    params = (
        ('stocklike', True),
        ('commtype', bt.CommInfoBase.COMM_PERC),
        ('percabs', True),
    )

    def getsize(self, price, cash):
        """Return the size that ``cash`` can buy at ``price``, as a float.

        Stock-like assets divide the cash by the price; otherwise the cash
        is divided by the margin reported for that price. The result is
        scaled by the ``leverage`` parameter.
        """
        unit_cost = price if self._stocklike else self.get_margin(price)
        return self.p.leverage * (cash / unit_cost)


class DataAnalyzer(bt.analyzers.Analyzer):
    """Analyzer recording the OHLCV values of both data feeds on every bar."""

    def create_analysis(self):
        # One dict per feed, keyed by the strategy datetime of the bar.
        self.rets0 = {}
        self.rets1 = {}

    def next(self):
        stamp = self.strategy.datetime.datetime()
        feeds_and_stores = (
            (self.data0, self.rets0),
            (self.data1, self.rets1),
        )
        for feed, store in feeds_and_stores:
            # Stored as [open, high, low, close, volume].
            store[stamp] = [
                feed.open[0],
                feed.high[0],
                feed.low[0],
                feed.close[0],
                feed.volume[0],
            ]

    def get_analysis(self):
        """Return the pair ``(records of data0, records of data1)``."""
        return self.rets0, self.rets1

class CashValueAnalyzer(bt.analyzers.Analyzer):
    """Analyzer recording the broker's portfolio value per datetime."""

    def create_analysis(self):
        self.rets = {}

    def notify_cashvalue(self, cash, value):
        # The notified arguments are not used; the value is read from the
        # strategy's broker and stored under the current strategy datetime.
        portfolio_value = self.strategy.broker.getvalue()
        stamp = self.strategy.datetime.datetime()
        self.rets[stamp] = portfolio_value

    def get_analysis(self):
        """Return the dict mapping datetime to portfolio value."""
        return self.rets
    
class OrderAnalyzer(bt.analyzers.Analyzer):
    """Analyzer recording price, size, value and commission of filled orders."""

    def create_analysis(self):
        # rets0 collects orders on the strategy's symbol1, rets1 all others.
        self.rets0 = {}
        self.rets1 = {}

    def notify_order(self, order):
        if order.status != order.Completed:
            return  # only completed orders are recorded

        on_first_symbol = order.data._name == self.strategy.symbol1
        bucket = self.rets0 if on_first_symbol else self.rets1

        fill = order.executed
        # Stored as (price, size, signed value, commission); the value is
        # negative for positive sizes and positive for negative sizes.
        record = (
            fill.price,
            fill.size,
            -fill.size * fill.price,
            fill.comm,
        )
        bucket[self.strategy.datetime.datetime()] = record

    def get_analysis(self):
        """Return the pair ``(orders of symbol1, orders of the other feed)``."""
        return self.rets0, self.rets1