<a href="https://colab.research.google.com/github/nessb26/certificate_tutorial_week6_aglo/blob/main/TestProject_week6_AlgoTrading.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Nessim BOUALLAI - Algo Trading certificate - week6

In [2]:
import numpy as np
import pandas as pd
from pylab import plt
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.metrics import accuracy_score

In [15]:
class BackTest_Event:
  def __init__(self, symbol, start, end, amount,ftc=0.0, ptc=0.0, splitRatio=0.7,lags=5,verbose=True):
        self.symbol = symbol
        self.start = start
        self.end = end
        self.initial_amount = amount  
        self.amount = amount  
        self.ftc = ftc  
        self.ptc = ptc  
        self.units = 0  
        self.position = 0  
        self.trades = 0  
        self.verbose = verbose  
        self.get_data()
        self.get_features(splitRatio,lags)
        self.accuracy={}
        self.accuracy["accuracy_test"]={}
        self.accuracy["accuracy_train"]={}
  
  def reinit(self,amount):
    self.initial_amount = amount  
    self.amount = amount  
    self.units = 0  
    self.position = 0  
    self.trades = 0 

  def get_date_price(self, bar):
        ''' Return date and price for bar. '''
        date = str(self.data.index[bar])[:10]
        price = self.data.price.iloc[bar]
        return date, price

  def print_balance(self, bar):
        ''' Print out current cash balance info.'''
        date, price = self.get_date_price(bar)
        print(f'{date} | current balance {self.amount:.2f}')

  def print_net_wealth(self, bar):
        ''' Print out current cash balance info.    '''
        date, price = self.get_date_price(bar)
        net_wealth = self.units * price + self.amount
        print(f'{date} | current net wealth {net_wealth:.2f}')

  def place_buy_order(self, bar, units=None, amount=None):
        ''' Place a buy order.
        '''
        date, price = self.get_date_price(bar)  
        if units is None:  
            units = int(amount / price)  
        self.amount -= (units * price) * (1 + self.ptc) + self.ftc  
        self.units += units  
        self.trades += 1  
        if self.verbose:  
            print(f'{date} | buying {units} units at {price:.2f}')  
            self.print_balance(bar)  
            self.print_net_wealth(bar)  
  
  def place_sell_order(self, bar, units=None, amount=None):
        ''' Place a sell order.
        '''
        date, price = self.get_date_price(bar)
        if units is None:
            units = int(amount / price)
        self.amount += (units * price) * (1 - self.ptc) - self.ftc  
        self.units -= units  
        self.trades += 1
        if self.verbose:
            print(f'{date} | selling {units} units at {price:.2f}')
            self.print_balance(bar)
            self.print_net_wealth(bar)
  
  def close_out(self, bar):
        ''' Closing out a long or short position.
        '''
        date, price = self.get_date_price(bar)
        self.amount += self.units * price
        self.units = 0
        self.trades += 1
        if self.verbose:
            print(f'{date} | inventory {self.units} units at {price:.2f}')
            print('=' * 55)
        print('Final balance   [$] {:.2f}'.format(self.amount))
        perf = ((self.amount - self.initial_amount) /
                self.initial_amount * 100)
        print('Net Performance [%] {:.2f}'.format(perf))
        print('Trades Executed [#] {:.2f}'.format(self.trades))
  
  def go_long(self, bar, units=None, amount=None):  
        if self.position == -1:  
            self.place_buy_order(bar, units=-self.units)  
        if units:  
            self.place_buy_order(bar, units=units)  
        elif amount:  
            if amount == 'all':  
                amount = self.amount  
            self.place_buy_order(bar, amount=amount)  

  def go_short(self, bar, units=None, amount=None):
        if self.position == 1:
            self.place_sell_order(bar, units=self.units)
        if units:
            self.place_sell_order(bar, units=units)
        elif amount:
            if amount == 'all':
                amount = self.amount
            self.place_sell_order(bar, amount=amount)

  def get_data(self):
        ''' Retrieves and prepares the data.
        '''
        url='http://hilpisch.com/pyalgo_eikon_eod_data.csv'
        raw = pd.read_csv(url, index_col=0, parse_dates=True).dropna()
        raw = pd.DataFrame(raw[self.symbol])
        raw = raw.loc[self.start:self.end]
        raw.rename(columns={self.symbol: 'price'}, inplace=True)
        self.data = raw.dropna()
  
  def get_indicators(self,SMA1=20,SMA2=60):
    self.data['r'] = np.log(self.data["price"] / self.data["price"].shift(1))
    self.data['d'] = np.where(self.data['r'] > 0, 1, -1)

    bins = [-0.0075, -0.005, 0.005, 0.0075]
    self.data['d_'] = np.digitize(self.data['r'], bins=bins)
    
    self.data['SMA1'] = self.data["price"].rolling(SMA1).mean()
    self.data['SMA2'] = self.data["price"].rolling(SMA2).mean()
    self.data['SMA_'] = self.data['SMA1'] - self.data['SMA2']
    self.data['EWMA1'] = self.data["price"].ewm(halflife=SMA1).mean()
    self.data['EWMA2'] = self.data["price"].ewm(halflife=SMA2).mean()
    self.data['EWMA_'] = self.data['EWMA1'] - self.data['EWMA2']
    self.data['V1'] = self.data['r'].rolling(SMA1).std()
    self.data['V2'] = self.data['r'].rolling(SMA2).std()
    self.data.dropna(inplace=True)

  def normalize(self,x, mu, std):
    return (x - mu) / std

  def get_features(self, SMA1=20,SMA2=60,splitRatio=0.7,lags=5):
    self.get_indicators(SMA1=20,SMA2=60)
    
    split = int(len(self.data) * splitRatio)
    self.train = self.data.iloc[:split].copy()
    self.test = self.data.iloc[split:].copy()
    
    cols = ['r','d','d_','SMA1','SMA2','SMA_','EWMA1','EWMA2','EWMA_','V1','V2']
    #exclude for normalisation
    exclude = ['r', 'd', 'd_', 'V1', 'V2']

    #normalisation
    mu, std = self.train.mean(), self.train.std()

    cols_ = list()
    for col in cols:
      for lag in range(1, lags + 1):
        col_ = col + f'_lag_{lag}'
        if col not in exclude:
           # Lag feature and normalize using mu and std previously calculated
          self.train[col_] = self.normalize(self.train[col].shift(lag),mu[col], std[col])
          self.test[col_] = self.normalize(self.test[col].shift(lag),mu[col], std[col])
        else:
          # Lag feature
          self.train[col_] = self.train[col].shift(lag)
          self.test[col_] = self.test[col].shift(lag)
        cols_.append(col_)

    self.train.dropna(inplace=True)
    self.test.dropna(inplace=True)
    self.features=cols_
    
  def backtest(self,m):
    models = {'gauss': GaussianNB(),
          'logreg': LogisticRegression(C=1, solver='lbfgs', max_iter=500),
          'dtc': DecisionTreeClassifier(max_depth=10),
          'svm': SVC(C=1, gamma='auto', kernel='linear'),
          'mlp': MLPClassifier(hidden_layer_sizes=[64], shuffle=False,max_iter=1000)
          }
    model = models[m]
    model.fit(self.train[self.features], self.train['d'])

    self.train['p_' + m] = model.predict(self.train[self.features])
    self.train['p_' + m] = np.where(self.train['p_' + m] > 0, 1, -1)
    self.accuracy["accuracy_train"][m]=accuracy_score(self.train['d'],self.train['p_' + m])

    self.test['p_' + m] = model.predict(self.test[self.features])
    self.test['p_' + m] = np.where(self.test['p_' + m] > 0, 1, -1)
    self.accuracy["accuracy_test"][m]=accuracy_score(self.test['d'],self.test['p_' + m])

    for bar in range(0, len(self.test)):
      if self.position == 0:  
        if (self.test['p_' + m].iloc[bar]==1):  
          self.go_long(bar, amount=self.initial_amount)  
          self.position = 1  
        elif (self.test['p_' + m].iloc[bar] == -1):  
          self.go_short(bar, amount=self.initial_amount)
          self.position = -1  
      elif self.position == 1:  
          if (self.test['p_' + m].iloc[bar]==-1):  
            self.place_sell_order(bar, units=2*self.units)  
            self.position = 1  
      elif self.position == -1:  
        if (self.test['p_' + m].iloc[bar]==1):  
            self.place_buy_order(bar, units=-2*self.units)  
            self.position = 1
      
    self.close_out(bar)



In [16]:
symbol="EUR="
start="2010-01-01"
end="2019-12-31"

amount=100.0
ftc=0.00
ptc=0.00
verbose=False
splitRatio=0.7
lags=5
models=['gauss','logreg','dtc','svm','mlp']
a=BackTest_Event(symbol, start, end, amount,ftc, ptc, splitRatio,lags,verbose)

for m in models:
  print("Backtest model "+m)
  a.backtest(m)
  print('Accuracy train   {:.3f}'.format(a.accuracy["accuracy_train"][m]))
  print('Accuracy test    {:.3f}'.format(a.accuracy["accuracy_test"][m]))
  print('=' * 55)
  a.reinit(amount=amount)




Backtest model gauss
Final balance   [$] 99.52
Net Performance [%] -0.48
Trades Executed [#] 565.00
Accuracy train   0.536
Accuracy test    0.507
Backtest model logreg
Final balance   [$] 101.77
Net Performance [%] 1.77
Trades Executed [#] 210.00
Accuracy train   0.543
Accuracy test    0.496
Backtest model dtc
Final balance   [$] 122.12
Net Performance [%] 22.12
Trades Executed [#] 271.00
Accuracy train   0.663
Accuracy test    0.477
Backtest model svm
Final balance   [$] 89.00
Net Performance [%] -11.00
Trades Executed [#] 210.00
Accuracy train   0.538
Accuracy test    0.507
Backtest model mlp
Final balance   [$] 109.08
Net Performance [%] 9.08
Trades Executed [#] 538.00
Accuracy train   0.653
Accuracy test    0.469
