## Backtesting Base Class ##

In [1]:
#
# Python Script with Base Class
# for Event-based Backtesting
#
# Python for Algorithmic Trading
# (c) Dr. Yves J. Hilpisch
# The Python Quants GmbH
#
import numpy as np
import pandas as pd
from pylab import mpl, plt
plt.style.use('seaborn-v0_8')
mpl.rcParams['font.family'] = 'serif'


class BacktestBase(object):
    ''' Base class for event-based backtesting of trading strategies.

    Attributes
    ==========
    symbol: str
        TR RIC (financial instrument) to be used
    start: str
        start date for data selection
    end: str
        end date for data selection
    amount: float
        amount to be invested either once or per trade
    ftc: float
        fixed transaction costs per trade (buy or sell)
    ptc: float
        proportional transaction costs per trade (buy or sell)

    Methods
    =======
    get_data:
        retrieves and prepares the base data set
    plot_data:
        plots the closing price for the symbol
    get_date_price:
        returns the date and price for the given bar
    print_balance:
        prints out the current (cash) balance
    print_net_wealth:
        prints auf the current net wealth
    place_buy_order:
        places a buy order
    place_sell_order:
        places a sell order
    close_out:
        closes out a long or short position
    '''

    raw = None
    
    def __init__(self, symbol, start, end, amount,
                 ftc=0.0, ptc=0.0, verbose=True):
        self.symbol = symbol
        self.start = start
        self.end = end
        self.initial_amount = amount
        self.amount = amount
        self.ftc = ftc
        self.ptc = ptc
        self.units = 0
        self.position = 0
        self.trades = 0
        self.verbose = verbose
        self.get_data()

    def get_data(self):
        ''' Retrieves and prepares the data.
        '''
        
        if self.raw is None:
            print("Retrieving data...")
            self.raw = pd.read_csv('http://hilpisch.com/pyalgo_eikon_eod_data.csv',
                              index_col=0, parse_dates=True).dropna()
        self.data = pd.DataFrame(self.raw[self.symbol].copy())
        self.data = self.data.loc[self.start:self.end]
        self.data.rename(columns={self.symbol: 'price'}, inplace=True)
        self.data['return'] = np.log(self.data['price'] / self.data['price'].shift(1))
        self.data.dropna(inplace=True)



    def select_data(self, start, end):
        ''' Selects sub-sets of the financial data.
        '''
        data = self.data[(self.data.index >= start) &
                         (self.data.index <= end)].copy()
        return data

    def plot_data(self, cols=None):
        ''' Plots the closing prices for symbol.
        '''
        if cols is None:
            cols = ['price']
        self.data['price'].plot(figsize=(10, 6), title=self.symbol)

    def get_date_price(self, bar):
        ''' Return date and price for bar.
        '''
        date = str(self.data.index[bar])[:10]
        price = self.data.price.iloc[bar]
        return date, price

    def print_balance(self, bar):
        ''' Print out current cash balance info.
        '''
        date, price = self.get_date_price(bar)
        print(f'{date} | current balance {self.amount:.2f}')

    def print_net_wealth(self, bar):
        ''' Print out current cash balance info.
        '''
        date, price = self.get_date_price(bar)
        net_wealth = self.units * price + self.amount
        print(f'{date} | current net wealth {net_wealth:.2f}')

    def place_buy_order(self, bar, units=None, amount=None):
        ''' Place a buy order.
        '''
        date, price = self.get_date_price(bar)
        if units is None:
            units = int(amount / price)
        self.amount -= (units * price) * (1 + self.ptc) + self.ftc
        self.units += units
        self.trades += 1
        if self.verbose:
            print(f'{date} | BUY {units} units at {price:.2f} - value: {(price*units):.2f}')
            self.print_balance(bar)
            self.print_net_wealth(bar)

    def place_sell_order(self, bar, units=None, amount=None):
        ''' Place a sell order.
        '''
        date, price = self.get_date_price(bar)
        if units is None:
            units = int(amount / price)
        self.amount += (units * price) * (1 - self.ptc) - self.ftc
        self.units -= units
        self.trades += 1
        if self.verbose:
            print(f'{date} | SELL {units} units at {price:.2f} - value: {(price*units):.2f}')
            self.print_balance(bar)
            self.print_net_wealth(bar)

    def close_out(self, bar):
        ''' Closing out a long or short position.
        '''
        date, price = self.get_date_price(bar)
        self.amount += self.units * price
        self.units = 0
        self.trades += 1
        if self.verbose:
            print(f'{date} | *** CLOSING OUT FINAL POSITION ***')
            print(f'{date} | Liquidating inventory: {self.units} units at {price:.2f}')
            print('=' * 57)
        print('Final balance   [$] {:.2f}'.format(self.amount))
        perf = ((self.amount - self.initial_amount) /
                self.initial_amount * 100)
        print('Net Performance [%] {:.2f}'.format(perf))
        print('=' * 57)


#if __name__ == '__main__':
#    bb = BacktestBase('AAPL.O', '2010-1-1', '2019-12-31', 10000)
#    print(bb.data.info())
#    print(bb.data.tail())
#    bb.plot_data()
#    # plt.savefig('../../images/ch06/backtestbaseplot.png')


In [2]:
bt = BacktestBase("EUR=", start='2012-01-01', end='2015-12-31', amount=10000, ftc=3)

Retrieving data...


In [3]:
#bt.raw

In [4]:
#bt.data

## (Interim) basic long/short event based backtest

In [6]:
#smabt = BacktestML("EUR=", start='2012-01-01', end='2015-12-31', amount=10000, ftc=3)

In [7]:
#smabt.run_sma_strategy(42, 252)

## ML event based backtesting

In [8]:
from BacktestBase import *
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier

In [33]:
class BacktestML(BacktestBase):
    
    models = None
    train_test_split_percentage = 0.7
    lags = 7
    base_features = []
    lagged_features = []
    
    def setup_models(self):
        print("Setting up ML models...")
        self.models = {'gauss': GaussianNB(),
                       'logreg': LogisticRegression(C=1, solver='lbfgs', max_iter=500),
                       'dtc': DecisionTreeClassifier(max_depth=7),
                       'svm': SVC(C=1, gamma='auto', kernel='linear'),
                       'mlp': MLPClassifier(hidden_layer_sizes=[64], shuffle=False,
                                max_iter=1000)}
        
    
    def go_long(self, bar, units=None, amount=None):
        if self.position == -1:
            self.place_buy_order(bar, units=-self.units)
        if units:
            self.place_buy_order(bar, units=units)
        elif amount:
            if amount == 'all':
                amount = self.amount
            self.place_buy_order(bar, amount=amount)

    def go_short(self, bar, units=None, amount=None):
        if self.position == 1:
            self.place_sell_order(bar, units=self.units)
        if units:
            self.place_sell_order(bar, units=units)
        elif amount:
            if amount == 'all':
                amount = self.amount
            self.place_sell_order(bar, amount=amount)

    # *** This sma strategy is mainly used for testing of core logic while developing,
    # *** but not part of ML strategies
    def run_sma_strategy(self, SMA1, SMA2):
        msg = f'\n\nRunning SMA strategy | {self.symbol} | SMA1={SMA1} & SMA2={SMA2}'
        msg += f'\nfixed costs {self.ftc} | '
        msg += f'proportional costs {self.ptc}'
        print(msg)
        print('=' * 57)
        self.position = 0  # initial neutral position
        self.trades = 0  # no trades yet
        self.amount = self.initial_amount  # reset initial capital
        self.data['SMA1'] = self.data['price'].rolling(SMA1).mean()
        self.data['SMA2'] = self.data['price'].rolling(SMA2).mean()

        for bar in range(SMA2, len(self.data)):
            if self.position in [0, -1]:
                if self.data['SMA1'].iloc[bar] > self.data['SMA2'].iloc[bar]:
                    self.go_long(bar, amount='all')
                    self.position = 1  # long position
            elif self.position in [0, 1]:
                if self.data['SMA1'].iloc[bar] < self.data['SMA2'].iloc[bar]:
                    self.go_short(bar, amount='all')
                    self.position = -1  # short position
        self.close_out(bar)

    def prepare_features(self):
        print("Prepring features...")
        self.base_features = ['SMA1',
                'SMA2',
                'SMA_dif',
                'EWMA1',
                'EWMA2',
                'EWMA_dif',
                'VOLAT1',
                'VOLAT2']
        SMA1 = 42
        SMA2 = 252
        EWMA1 = 37
        EWMA2 = 157
        VOLAT1 = 10
        VOLAT2 = 50
        
        self.data['SMA1'] = self.data['price'].rolling(SMA1).mean()
        self.data['SMA2'] = self.data['price'].rolling(SMA2).mean()
        self.data['SMA_dif'] = self.data['SMA1'] - self.data['SMA2']
        self.data['EWMA1'] = self.data['price'].ewm(halflife=EWMA1).mean()
        self.data['EWMA2'] = self.data['price'].ewm(halflife=EWMA2).mean()
        self.data['EWMA_dif'] = self.data['EWMA1'] - self.data['EWMA2']
        self.data['VOLAT1'] = self.data['return'].rolling(VOLAT1).std()
        self.data['VOLAT2'] = self.data['return'].rolling(VOLAT2).std()
        self.data.dropna(inplace=True)
        
        # Apply train/test split, then calculate mu and std for normalization based on train data
        # Then apply calculated mu & std to feature columns of entire data set
        
        split = int(len(self.data) * self.train_test_split_percentage)
        train = self.data.iloc[:split]
        
        mu, std = train[self.base_features].mean(), train[self.base_features].std()
        self.data[self.base_features] -= mu
        self.data[self.base_features] /= std
        
        # Add direction,
        self.data['d'] = np.where(self.data['return'] > 0, 1, 0)
        self.data['d'] = self.data['d'].astype(int)

        bins = [-0.025, -0.02, -0.015, -0.01, -0.005, 0.005, 0.01, 0.015, 0.02, 0.025,]
        self.data['d_bins'] = np.digitize(self.data['return'], bins=bins)
        
#        if self.end is not None:
#            self.data = self.data.iloc[:self.end - self.start]
#            self.data_ = self.data_.iloc[:self.end - self.start]

        self.base_features.extend(['return', 'd', 'd_bins'])

    def add_lags_to_features(self):
        print(f'Adding {self.lags} lags to all features...')
        lagged_features = []
        for col in self.base_features:
            for lag in range(1, self.lags + 1):
                col_ = col + f'_lag_{lag}'
                #train[col_] = train[col].shift(lag)
                #test[col_] = test[col].shift(lag)
                self.data[col_] = self.data[col].shift(lag)
                lagged_features.append(col_)
        #train.dropna(inplace=True)
        #test.dropna(inplace=True)
        self.data.dropna(inplace=True)
    
    def create_train_test_data_split(self):
        print(f'Creating training ({self.train_test_split_percentage*100:.0f}%) and testing ({(1-self.train_test_split_percentage)*100:.0f}%) data split...')
        split = int(len(self.data) * self.train_test_split_percentage)
        self.train_data = self.data.iloc[:split].copy()
        self.test_data = self.data.iloc[split:].copy()

    
    def run_all_ml_strategies(self):
        if self.models is None:
            self.setup_models()
        self.prepare_features()
        print(f'Base features: {self.base_features}')
        self.add_lags_to_features()
        self.create_train_test_data_split()
#        for m in self.models:
#            self.run_ml_strategy(m)
 
    
    def run_ml_strategy(self, model):
        msg = f"\n\nRunning ML strategy: {model} | {self.symbol}"
        msg += f'\nfixed costs {self.ftc} | '
        msg += f'proportional costs {self.ptc}'
        print(msg)
        print('=' * 57)
        self.position = 0  # initial neutral position
        self.trades = 0  # no trades yet
        self.amount = self.initial_amount  # reset initial capital
        self.data['momentum'] = self.data['return'].rolling(momentum).mean()
        for bar in range(momentum, len(self.data)):
            if self.position in [0, -1]:
                if self.data['momentum'].iloc[bar] > 0:
                    self.go_long(bar, amount='all')
                    self.position = 1  # long position
            elif self.position in [0, 1]:
                if self.data['momentum'].iloc[bar] <= 0:
                    self.go_short(bar, amount='all')
                    self.position = -1  # short position
        self.close_out(bar)

In [34]:
mlbt = BacktestML("EUR=", start='2012-01-01', end='2015-12-31', amount=10000, ftc=3)

Retrieving data...


In [35]:
mlbt.run_all_ml_strategies()

Setting up ML models...
Prepring features...
Base features: ['SMA1', 'SMA2', 'SMA_dif', 'EWMA1', 'EWMA2', 'EWMA_dif', 'VOLAT1', 'VOLAT2', 'return', 'd', 'd_bins']
Adding 7 lags to all features...
Creating training (70%) and testing (30%) data split...


In [None]:
#mlbt.data