In [1]:
import numpy as np
import pandas as pd
import bt
import copy
from pykalman import KalmanFilter

class Tester():
    def __init__(self, pairs, data):
        self.pairs = pairs
        self.data = data
    
    def zscore(stocks):
        """ Basic function to compute z-score. """
        return (stocks - stocks.mean()) / np.std(stocks)
    
    def calc_kalman(self, stock1_prices, stock2_prices):
        """
        Utilise the Kalman Filter from the pyKalman package
        to calculate the slope and intercept of the regressed
        ETF prices.
        """
        delta = 1e-5
        trans_cov = delta / (1 - delta) * np.eye(2)
        obs_mat = np.vstack(
            [stock1_prices, np.ones(stock1_prices.shape)]
        ).T[:, np.newaxis]

        kf = KalmanFilter(
            n_dim_obs=1, 
            n_dim_state=2,
            initial_state_mean=np.zeros(2),
            initial_state_covariance=np.ones((2, 2)),
            transition_matrices=np.eye(2),
            observation_matrices=obs_mat,
            observation_covariance=1.0,
            transition_covariance=trans_cov
        )
        state_means, state_covs = kf.filter(stock2_prices.values)
        return state_means, state_covs
    
    def get_beta_dy(self, stock1_price, stock2_price):
        """Return dynamic beta hr of a pair."""
        beta_means, beta_covs = self.calc_kalman(stock1_price, stock2_price)
        betas = []  
        for beta in beta_means:
            betas.append(beta[0])
        betas = pd.Series(betas)
        betas.index = stock2_price.index
        return betas
    
    def create_zscore_roll(self, spread):
        """Create rolling z-score of spread"""
        spread_mavg1 = spread.rolling(5).mean()
        # 60 day rolling
        spread_mavg2 = spread.rolling(60).mean()
        spread_std = spread.rolling(60).std()
        zscore_roll = (spread_mavg1 - spread_mavg2)/spread_std
        return zscore_roll

        
    def create_sig(self, zscore_roll, upper, lower):
        """Create signal matrix with specified parameters for backtest"""
        sig = zscore_roll.copy()
        sig = sig.to_frame()
        sig.columns = ['z_score']
        sig['z_score'].loc[(sig['z_score'].isnull())] = 0
        sig['z_score'].loc[(sig['z_score'].between(lower, upper))] = 0
        sig['z_score'].loc[(sig['z_score'] < lower)] = -1
        sig['z_score'].loc[(sig['z_score'] > upper)] = 1
        return sig
    
    def run_backtest(self, display):
        weigh_equally = 2*len(self.pairs)
        weights = copy.deepcopy(self.data)
        for pair in self.pairs:
            betas = self.get_beta_dy(np.log(self.data[pair[0]]), np.log(self.data[pair[1]]))
            spread = np.log(self.data[pair[0]]) - (betas * np.log(self.data[pair[1]]))
            zscore_roll = self.create_zscore_roll(spread)
             
#             if display:

#                 # rolling z-score of spread
#                 zscore_roll.plot(label = 'Rolling z score')
#                 plt.title("Rolling Z-Score of Spread")
#                 plt.axhline(0, color='black')
#                 plt.axhline(1, c='r',ls='--')
#                 plt.axhline(-1, c='r',ls='--')
#                 # slope and intercept changes from Kalman Filter
#                 draw_slope_intercept_changes(data[pair[0]], beta_means)
                
            sig = self.create_sig(zscore_roll, pair[2], pair[3])
            weights[pair[0]] = -sig / weigh_equally
            weights[pair[1]] = sig / weigh_equally
            
        pair_trade = bt.Strategy('Pairs Trading ', [bt.algos.WeighTarget(weights),
                                        bt.algos.Rebalance(),
                                        bt.algos.RunAfterDays(100),
                                        ])
        benchmark = bt.Strategy('Benchmark', [bt.algos.SelectAll(), 
                                              bt.algos.WeighEqually(),
                                              bt.algos.Rebalance(),
                                              bt.algos.RunAfterDays(100)
                                        ])
        t = bt.Backtest(pair_trade, self.data)
        b = bt.Backtest(benchmark, self.data)
        res = bt.run(t,b)
        if display:
            res.display()
            res.plot()
        strat_sharpe = res[0].stats['monthly_sharpe']
        bench_sharpe = res[1].stats['monthly_sharpe']
        return strat_sharpe     