In [1]:
# Import Packages
import pandas as pd
import numpy as np 
import os 
import glob
import math
from collections import OrderedDict
from datetime import timedelta

# Import date and time zone libraries
import datetime
import pytz

# Import visualization
import matplotlib.pyplot as plt

# This ensures our graphs will be shown properly in the notebook
%matplotlib inline


In [3]:
class Backtest(object):
    """
    Backtest Engine
    """  
    def __init__(self, symbol, initial_capital, start_date, end_date, max_buy=1, max_sell=1, leverage,
                 root_path, file_name, ftc=0.0, ptc=0.0, verbose=True):
        
        self.symbol = symbol # Underlying to trade
        self.initial_capital = initial_capital # Initial Capital
        self.cash = initial_capital # Current cash on hand
        self.start_date = start_date
        self.end_date = end_date
        self.ftc = ftc # Fixed transition cost 
        self.ptc = ptc # Percentage transition cost
        self.verbose = verbose # Control to print output or not
        self.max_buy = max_buy 
        self.max_sell = max_sell
        self.leverage = leverage # Set the maximum leverage of portfolio
        
        # Output path
        self.root_path = root_path
        self.file_name = file_name
        self.log_output_path = os.path.join(self.root_path, self.file_name + '.txt') # Log output path

        self.cur_datetime = 0
        self.cur_date = 0
        self.order_dict = OrderedDict()
        self.position = 0
        self.trades = 0
        self.total = 0
        self.holding = 0
        
        self.buy_barIndex = [] # Record bar index of "Buy" event
        self.sell_barIndex = [] # Record bar index of "Sell" event
        
        self.pnl = 0
        self.list_of_pnl = []
        self.portfolio_pnl = [initial_capital]
        
        
    def go_long(self, barIndex, order_info): #price, units=None, amount=None):
        """
        Go Long position (buy)
        -----
        Args:
            barIndex(int) : current bar index
            order_info(dict) : dictionary of all order info (e.g. symbol, price, units)
        """
        self.symbol = order_info['Symbol']
        price = order_info['RequestPrice']
        units = order_info['Unit']
        amount = order_info['Amount']
        stopProfit = order_info['StopProfit']
        stopLoss = order_info['StopLoss']
        stopTimeBar = order_info['StopTimeBar']
        
        # Store the order info another dictionary
        self.order_dict[barIndex] = {'Datetime' : self.cur_datetime,
                                     'Symbol' : self.symbol, 
                                     'RequestPrice' : price,
                                     'ExecutionPrice' : price,
                                     'Unit' : units,
                                     'Amount' : amount,
                                     'LongShort': 1, 
                                     'StopProfit' : stopProfit, 
                                     'StopLoss' : stopLoss, 
                                     'StopTimeBar' : stopTimeBar}
        
        # Check if having short position
        if self.position < 0:
            # Check any Short position on same underlying
            for order in self.order_dict.copy():
                order_symbol = self.order_dict[order]['Symbol']
                LongShort = self.order_dict[order]['LongShort']
                if (order_symbol == self.symbol) and (LongShort == -1):
                    self.close_out(order, price) 
        # Check trading on units or amount
        if units: 
            self.place_long_order(barIndex, price, units=units) 
        elif amount: 
            if amount == 'All': 
                amount = self.cash 
            self.place_long_order(barIndex, price, amount=amount)
                
        
    def go_short(self, barIndex, order_info): #price, units=None, amount=None):
        """
        Go Short position (sell)
        -----
        Args:
            barIndex(int) : current bar index
            order_info(dict) : dictionary of all order info (e.g. symbol, price, units)
        """
        self.symbol = order_info['Symbol']
        price = order_info['RequestPrice']
        units = order_info['Unit']
        amount = order_info['Amount']
        stopProfit = order_info['StopProfit']
        stopLoss = order_info['StopLoss']
        stopTimeBar = order_info['StopTimeBar']
        
        # Store the order info another dictionary
        self.order_dict[barIndex] = {'Datetime' : self.cur_datetime,
                                     'Symbol' : self.symbol,
                                     'RequestPrice' : price,
                                     'ExecutionPrice' : price,
                                     'Unit' : units,
                                     'Amount' : amount,
                                     'LongShort': -1,
                                     'StopProfit' : stopProfit, 
                                     'StopLoss' : stopLoss, 
                                     'StopTimeBar' : stopTimeBar}
        
        # Check if having long postion
        if self.position > 0:
            # Check any Long position on same underlying
            for order in self.order_dict.copy():
                order_symbol = self.order_dict[order]['Symbol']
                LongShort = self.order_dict[order]['LongShort']
                if (order_symbol == self.symbol) and (LongShort == 1):
                    self.close_out(order, price) 
        # Check trading on units or amount
        if units:
            self.place_sell_order(barIndex, price, units=units) 
        elif amount:
            if amount == 'All':
                amount = self.cash
            self.place_sell_order(barIndex, price, amount=amount)  

    
    def close_out(self, orderID, price, stop_all=None):
        """ 
        Closing out a long or short position 
        -----
        Args:
            orderID(int) : bar index as orderID
            price(float) : order price
        """
        # Check if closing out all existing possitions
        if stop_all:
            # Loop over each order
            for order in self.order_dict.copy():
                
                executedPrice = self.order_dict[order]['ExecutionPrice']
                executedUnit = self.order_dict[order]['Unit']
                
                if self.order_dict[order]['LongShort'] == 1: # Close Long Order
                    gain = (price-executedPrice) * executedUnit * self.Product_Info(price)['leverage'] - (executedPrice*executedUnit)*self.ptc - self.ftc*executedUnit
                    self.trades += 1
                    self.position = self.position - executedUnit
                    self.cash += (executedPrice*executedUnit) + gain
                    self.holding = self.position * price
                    self.total = self.cash + self.holding
                    # Store pnl value
                    self.pnl += gain
                    self.portfolio_pnl.append(self.initial_capital + self.pnl)
                    self.list_of_pnl.append([self.cur_datetime, self.cur_date, gain])
                    self.sell_barIndex.append(order) # Record for visualization
                    if self.verbose:
                        self.write_log('%s | short %s unit at %f | pnl: %f'% (self.cur_datetime, 
                                                                                        self.order_dict[order]['Unit'],
                                                                                        price, gain))
                else: # Close Short Order
                    gain = (executedPrice-price)*executedUnit*self.Product_Info(price)['leverage'] - (executedPrice*executedUnit)*self.ptc - self.ftc*executedUnit
                    self.trades += 1
                    self.position = self.position + executedUnit
                    self.cash +=  (executedPrice*executedUnit) + gain
                    self.holding = abs(self.position * price)
                    self.total = self.cash + self.holding
                    # Store pnl value
                    self.pnl += gain
                    self.portfolio_pnl.append(self.initial_capital + self.pnl)
                    self.list_of_pnl.append([self.cur_datetime, self.cur_date, gain])
                    self.buy_barIndex.append(order) # Record for visualization
                    if self.verbose:
                        self.write_log('%s | long %s unit at %f | pnl: %f'% (self.cur_datetime, 
                                                                                        self.order_dict[order]['Unit'],
                                                                                        price, gain))
                del self.order_dict[order]
                self.print_balance()
        else:
            executedPrice = self.order_dict[orderID]['ExecutionPrice']
            executedUnit = self.order_dict[orderID]['Unit']
            
            if self.order_dict[orderID]['LongShort'] == 1: # Close Long Order on specific one trade
                gain = (price-executedPrice) * executedUnit * self.Product_Info(price)['leverage'] - (executedPrice*executedUnit)*self.ptc - self.ftc*executedUnit
                self.trades += 1
                self.position = self.position - executedUnit
                self.cash += (executedPrice*executedUnit) + gain
                self.holding = self.position * price
                self.total = self.cash + self.holding
                # Store pnl value
                self.pnl += gain
                self.portfolio_pnl.append(self.initial_capital + self.pnl)
                self.list_of_pnl.append([self.cur_datetime, self.cur_date, gain])
                self.sell_barIndex.append(orderID) # Record for visualization
                if self.verbose:
                    self.write_log('%s | short %s unit at %f | pnl: %f'% (self.cur_datetime, 
                                                                                     self.order_dict[orderID]['Unit'],
                                                                                     price, gain))
            else: # Close Short Order
                gain = (executedPrice-price) * executedUnit * self.Product_Info(price)['leverage'] - (executedPrice*executedUnit)*self.ptc - self.ftc*executedUnit
                self.trades += 1
                self.position = self.position + executedUnit
                self.cash +=  (executedPrice*executedUnit) + gain
                self.holding = abs(self.position * price)
                self.total = self.cash + self.holding
                # Store pnl value
                self.pnl += gain
                self.portfolio_pnl.append(self.initial_capital + self.pnl)
                self.list_of_pnl.append([self.cur_datetime, self.cur_date, gain])
                self.buy_barIndex.append(orderID) # Record for visualization
                if self.verbose:
                    self.write_log('%s | long %s unit at %f | pnl: %f'% (self.cur_datetime, 
                                                                                    self.order_dict[orderID]['Unit'],
                                                                                    price, gain))
            del self.order_dict[orderID]
            self.print_balance()
        
        
    def place_long_order(self, barIndex, price, units=None, amount=None):
        """ 
        Place a Long (buy) order. 
        -----
        Args:
            barIndex(int) : bar index as OrderID
            price(float) : order price
            units(int) : trading units
            amount(int) : $ cash amount used for trade
        """
        if units is None:
            # Calculate the units to trade
            units = math.floor(self.cash / price) 
        
        self.position += units 
        # Cash (Available capital)
        self.cash -= (units * price) # * (1 + self.ptc) + self.ftc 
        # Holding (position amount)
        self.holding += (units*price)
        self.total = self.cash + self.holding
        self.buy_barIndex.append(barIndex) # Record for visualization
        if self.verbose:
            self.write_log('%s | long %s unit at %f '% (self.cur_datetime, units, price))
        self.print_balance()

        
    def place_sell_order(self, barIndex, price, units=None, amount=None):
        """ 
        Place a Short (sell) order.
        -----
        Args:
            barIndex(int) : bar index as OrderID
            price(float) : order price 
            units(int) : trading units
            amount(int) : $ cash amount used for trade
        """
        if units is None:
            # Calculate the units to trade
            units = math.floor(self.cash / price) 
            
        self.position -= units
        # Cash (Available capital)
        self.cash -= (units * price) # * (1 + self.ptc) + self.ftc 
        # Holding (position amount)
        self.holding += abs(units*price)
        self.total = self.cash + self.holding
        self.sell_barIndex.append(barIndex) # Record for visualization
        if self.verbose:
            self.write_log('%s | short %s units at %f '% (self.cur_datetime, units, price))
        self.print_balance()
        
            
    def Market_Data_System(self, cur_datetime):
        """ 
        Check the current datetime. 
        -----
        Args:
            cur_datetime(pd.datetime) : current datetime
        -----
        Returns:
            boolean : can trade or not
        """
        # the HSI time should be > 09:30:00 and < 16:00:00
        market_time = {"HSIF": [' 09:30:00', ' 16:00:00'],
                       "MHI": [' 09:30:00', ' 16:00:00'],
                       "Stocks": [' 09:30:00', ' 16:00:00']}
        
        market_open = market_time[self.symbol][0]
        market_close = market_time[self.symbol][1]

        cur_date = pd.to_datetime(cur_datetime, format= '%Y-%m-%d %H:%M:%s').strftime('%Y-%m-%d')
        self.cur_datetime = cur_datetime
        self.cur_date = cur_date
        
        if (cur_datetime >= pd.to_datetime(cur_date+market_open)) and \
                                (cur_datetime <= pd.to_datetime(cur_date+market_close)):
            return True
        else:
            return False    
     
    
    def Product_Info(self, price):
        """
        Get the underlying info 
        -----
        Args:
            price(float) : Current price of the underlying
        -----
        Return:
            dict : dictionary of the product info  
        """
        product_info = {'HSIF': {'required_capital': 100000, 'leverage': 50},
                        'MHI': {'required_capital': 50000, 'leverage': 10},
                        'Stocks': {'required_capital': price, 'leverage': 1}}
        return product_info[self.symbol]
    
    
    def Risk_Management_System(self, cur_datetime, price):
        """ 
        Manage the strategy risk (E.g. Max Buy/ Sell, Leverage and global risk, product margin) 
        -----
        Args:
            price(float) : current price
        -----
        Returns:
            Boolean : tradable or not
        """
        cur_date = pd.to_datetime(cur_datetime, format= '%Y-%m-%d %H:%M:%s').strftime('%Y-%m-%d')
        self.cur_datetime = cur_datetime
        self.cur_date = cur_date
        
        # Strategy level risk 
        if (self.position >= self.max_buy) or (self.position <= -self.max_sell):
            return False
        
        product_dict = self.Product_Info(price)
        if (((price*product_dict['leverage'])/ self.initial_capital)*self.position) > self.leverage:
            return False
    
        # Global level risk - Margin Requirement
        if self.cash < product_dict['required_capital']:
            self.write_log('%s | Balances %f | not enough money on %f '%(self.cur_datetime, self.cash, price))
            return False
        return True

            
    def create_log(self):
        """ Create Log File for each backtest. """
        log = open(self.log_output_path, "w") 
        log.close()
        
        
    def write_log(self, message):
        """
        Write Log
        -----
        Args:
            message(str) : message to write into log
        """
        log = open(self.log_output_path, "a") 
        log.write(str(message))
        log.write('\n')
        log.close()
    
    
    def start_of_day(self):
        """ Perform sth on the start of day. """
        pass
    
    
    def end_of_day(self, cur_datetime, cut_off_datetime, Price):
        """ 
        Check end of day and close all position.
        -----
        Args:
            cur_datetime : 
            cut_off_datetime : 
            Price : 
        """
        cur_date = pd.to_datetime(cur_datetime, format= '%Y-%m-%d %H:%M:%s').strftime('%Y-%m-%d')
        # Check time and current postion
        if (cur_datetime >= pd.to_datetime(cur_date+' '+cut_off_datetime)) & (self.position!=0):
            self.close_out(self.Bar, Price, 'All')
            self.position = 0 
                
    
    def resampling(self, df_data, timeframe='1T'):
        """ 
        Resampling to OHLC format 
        -----
        Args:
            df_data(pd.DataFrame) : price dataframe in OHLC format
        -----
        Returns:
            pd.DataFrame : Resamplied data
        """
        df_open = df_data.Open.resample(timeframe).first().reset_index(name='Open')
        df_high = df_data.High.resample(timeframe).max().reset_index(name='High')
        df_low = df_data.Low.resample(timeframe).min().reset_index(name='Low')
        df_close = df_data.Close.resample(timeframe).last().reset_index(name='Close')
        
        df = df_open.copy()
        df['High'] = df_high['High']
        df['Low'] = df_low['Low']
        df['Close'] = df_close['Close']
        return df
    
    
    def print_balance(self, date=''):
        """ Print out current cash balance info. """
        self.write_log('%s | Total balances %8d' % (self.cur_datetime, self.total))
        self.write_log('-' * 50)
        
      
    def get_pnl(self):
        """ Return list of pnl on all trades """
        return self.list_of_pnl
    
    
    def get_cur_balance(self):
        """ Return current balance info. """
        return self.total
        
        
    def get_order_info(self):
        """ Return current order info. """
        return self.order_dict
    
    
    def get_cur_position(self):
        """ Return current position. """
        return self.position
    
        
    def buy_sell_signals_plot(self, price):
        """
        Output plot to show the buy/sell signals on the price data
        -----
        Args:
            price(pd.DataFrame) : a series of price data
        """
        # Output path
        png_name = os.path.join(self.root_path, self.file_name+'_signals_plot' + '.png')

        # Visualize
        fig = plt.figure(figsize = (15,5))
        plt.plot(price, color='r', lw=2.)
        plt.plot(price, '^', markersize=10, color='m', label = 'buying signal', markevery = self.buy_barIndex)
        plt.plot(price, 'v', markersize=10, color='k', label = 'selling signal', markevery = self.sell_barIndex)
        plt.title('Total No. of Trades  ( %f )'%(self.trades))
        plt.legend()
        plt.savefig(png_name)
        plt.show()
        
        
    def potfolio_pnl_plot(self):
        """
        Output PnL diagram
        -----
        Args:
            png_name : output path
        """
        # Output path
        png_name = os.path.join(self.root_path, self.file_name+'_portfolioPnL_plot' + '.png')

        # Visualize
        fig = plt.figure(figsize = (15,5))
        # Plot the equity curve (PnL curve)
        plt.plot(self.portfolio_pnl, color='r', lw=2.)
        
        # Sharpe Ratio
        df_sharpe = pd.DataFrame(self.list_of_pnl)
        df_sharpe = df_sharpe.reset_index(drop=True)
        df_sharpe.columns = ['Datetime', 'Date', 'PnL']
        #df_sharpe = df_sharpe.groupby('Date')['PnL'].sum().reset_index(name='Daily_PnL')
        mean_sharpe = df_sharpe['PnL'].mean()
        std_sharpe = df_sharpe['PnL'].std() 
        #Sharpe = (self.pnl/ (std_sharpe* np.sqrt(252)) )
        sharpe_ratio = (mean_sharpe / std_sharpe) * np.sqrt(252)
        SQN = (mean_sharpe / std_sharpe) * np.sqrt(len(df_sharpe))
        
        # Winning ratio
        list_of_pnl_values = [value[2] for value in self.list_of_pnl]
        winning_trades = [trade_pnl for trade_pnl in list_of_pnl_values if trade_pnl >= 0]
        winning_ratio = len(winning_trades) / len(list_of_pnl_values)
        
        # Averge win / average loss
        lossing_trades = [trade_pnl for trade_pnl in list_of_pnl_values if trade_pnl < 0]
        avg_win_loss = np.mean(winning_trades) / (-np.mean(lossing_trades))
        
        # Max. Drawdown by Trades 
        max_drawdown_trade = 0
        count_drawdown = 0
        for trade_pnl in list_of_pnl_values:
            if trade_pnl < 0:
                count_drawdown += trade_pnl
            else:
                count_drawdown = 0
            if count_drawdown < max_drawdown_trade:
                max_drawdown_trade = count_drawdown
                
        # Max. Drawdown by day
        max_drawdown_day = 0
        count_drawdown = 0
        df_sharpe = df_sharpe.groupby('Date')['PnL'].sum().reset_index(name='Daily_PnL')
        for value in df_sharpe['Daily_PnL'].tolist():
            if value < 0:
                count_drawdown += value
            else:
                count_drawdown = 0
            if count_drawdown < max_drawdown_day:
                max_drawdown_day = count_drawdown
        
        # Plot
        plt.title('Cumulative PnL ( %f) | Sharpe Ratio (%f) | SQN (%f) | Winning Ratio (%f) | Avg. win/loss (%f) | Max. Drawdown(trades) (%f) | Max. Drawdown(days) (%f) '%(self.pnl, sharpe_ratio, 
                                                                                                           SQN,
                                                                                                           winning_ratio,
                                                                                                           avg_win_loss, max_drawdown_trade, max_drawdown_day))     
        plt.legend()
        plt.savefig(png_name)
        plt.show()

        
    def PnL_plot(self):
        """
        Output Returns by trades diagram
        -----
        Args:
            png_name : output path
        """
        # Output path
        png_name = os.path.join(self.root_path, self.file_name+'_tradePnL_plot' + '.png')

        # Visualize
        fig = plt.figure(figsize = (15,5))
        # Get the returns (PnL)
        list_of_pnl_values = [value[2] for value in self.list_of_pnl]
        
        # Plot
        plt.bar(range(len(list_of_pnl_values)), list_of_pnl_values, width = 0.3, color='r')
        plt.savefig(png_name)
        plt.show()
        
        
    def Output_Pnl_Report(self):
        """
        Output PnL Report by trades, daily and monthly
        """    
        # Output path
        trade_Pnl_output_path = os.path.join(self.root_path, self.file_name+'_trade' + '.txt')
        daily_pnl_output_path = os.path.join(self.root_path, self.file_name+'_daily' + '.txt')
        monthly_pnl_output_path = os.path.join(self.root_path, self.file_name+'_monthly' + '.txt')
        
        # Convert list of pnl into dataframe
        df_pnl = pd.DataFrame(self.list_of_pnl)
        df_pnl = df_pnl.reset_index(drop=True)
        df_pnl.columns = ['Datetime', 'Date', 'PnL']
        df_pnl.to_csv(trade_Pnl_output_path, sep='\t', index=False)
        
        # Daily Pnl
        df_daily_pnl = df_pnl.groupby('Date')['PnL'].sum().reset_index(name='Daily_PnL')
        df_daily_pnl.to_csv(daily_pnl_output_path, sep='\t', index=False)
        
        # Monthly PnL
        df_monthly_pnl = df_daily_pnl.copy()
        df_monthly_pnl['Month'] = pd.to_datetime(df_monthly_pnl['Date'], format= '%Y-%m-%d').dt.strftime('%Y-%m')
        df_monthly_pnl = df_monthly_pnl.groupby('Month')['Daily_PnL'].sum().reset_index(name='Monthly_PnL')
        df_monthly_pnl.to_csv(monthly_pnl_output_path, sep='\t', index=False)
        print(df_monthly_pnl)

        