In [44]:
import pandas as pd
import numpy as np

# Constants - Market Data
DATE = 'date'
OPEN = 'open'
HIGH = 'high'
LOW = 'low'
CLOSE = 'close'
ADJ_CLOSE = 'adj_close'
VOLUME = 'volume'

# Constants - Position
BULLISH = 'bullish'
BEARISH = 'bearish'

# Constants - Operator
EQUAL = 'equal'
LARGER_THAN = 'larger_than'
LESS_THAN = 'less_than'
LARGER_THAN_OR_EQUAL = 'larger_than_or_equal'
LESS_THAN_OR_EQUAL = 'less_than_or_equal'

# Constants - Strategy Events
OPEN_POSITION = 'open_position'
CLOSE_POSITION = 'close_position'
STOP_GAIN = 'stop_gain'
STOP_LOSS = 'stop_loss'
NO_EVENT = 'no_event'

In [45]:
class DataCleaner():
    
    def __init__(self):
        self.dropna_method = 'all'
        self.is_fill_missing = True
    
    def clean(self, raw_data):
        raw_data = raw_data.dropna(how=self.dropna_method)
        if self.is_fill_missing:
            raw_data = self.fill_missing_date(raw_data)
        return raw_data
    
    @staticmethod
    def fill_missing_date(raw_data):
        cols = list(raw_data.columns)
        cols.remove(DATE)
        for col in cols:
            for i in raw_data.index:
                element = raw_data.at[i, col]
                if np.isnan(element) or element == 0:
                    missing_data_date = raw_data.at[i, DATE]
                    ref_data_date = raw_data.at[i-1, 'date']
                    raw_data.at[i, col] = raw_data.at[i-1, col]
                    print('[CLEAN] Copy data ({}, {}) to ({}, {}))'
                          .format(ref_data_date, col, missing_data_date, col))
        return raw_data

In [46]:
class CCIParam():
    def __init__(self, period, coeff):
        self.period = period
        self.coeff = coeff


class CCI():
    
    def __init__(self, param):
        self.param = param
        
    def compute(self, data):
        
        # typical price
        data['typical'] = data[[HIGH, LOW, CLOSE]].apply(lambda row: self.typical_price(row), axis=1)
        
        # moving average of typical price
        data['MA'] = data['typical'].rolling(window=self.param.period).mean()
        
        # absolute difference between typical price and MA
        data['abs_diff'] = data[['typical', 'MA']].apply(lambda row: abs(row['typical'] - row['MA']), axis=1)
        
        # mean deviation
        data['MD'] = data['abs_diff'].rolling(window=self.param.period).mean()
        
        # CCI
        data['CCI'] = data[['typical', 'MA', 'MD']].apply(lambda row: self.cci_formula(row), axis=1)
        
        # drop columns for intermediate steps
        data = data.drop(columns=['typical', 'MA', 'abs_diff', 'MD'])
        
        print('[DATA] Finished computation of CCI')
        return data
        
    @staticmethod
    def typical_price(row):
        return (row[HIGH] + row[LOW] + row[CLOSE]) / 3
    
    def cci_formula(self, row):
        return (row['typical'] - row['MA']) / (self.param.coeff * row['MD'])

In [47]:
class Rule():
    def __init__(self, name, indicators_required, f):
        self.name = name
        self.indicators_required = indicators_required
        self.f = f

class OpenPositionParam():
    def __init__(self, rules):
        self.rules = rules  # rule: (data, entry_index=None, current_index) -> boolean

class ClosePositionParam():
    def __init__(self, rules):
        self.rules = rules  # rule : (data, entry_index, current_index) -> boolean


class Strategy():

    def __init__(self, name, open_position_param, close_position_param, position):
        self.name = name
        self.open_position = open_position_param
        self.close_position = close_position_param
        self.position = position
        self.have_position = False
        self.entry_index = None
        
    def check_event(self, data, current_index):       
        if not self.have_position:  # check for open position
            rule_triggered = self.check_rules(data, current_index, self.open_position.rules)
            if rule_triggered:
                self.have_position = True
                self.entry_index = current_index
                return OPEN_POSITION, rule_triggered
            
        else:  # check for close positioin
            rule_triggered = self.check_rules(data, current_index, self.close_position.rules)
            if rule_triggered:
                self.have_position = False
                self.entry_index = None
                return CLOSE_POSITION, rule_triggered
        
        return NO_EVENT, None
    
    def check_rules(self, data, current_index, rules):
        # logical OR on all the rules
        entry_index = self.entry_index
        for rule in rules:
            if rule.f(data, entry_index, current_index):
                return rule.name
        return False

In [50]:
class BackTest():
    
    def __init__(self, raw_data, strategies):
        self.data = raw_data
        self.trade_record = pd.DataFrame()
        self.report = pd.DataFrame()
        self.cleaner = DataCleaner()
        self.strategies = strategies
        self.cci = CCI(CCIParam(20, 0.015))
        
    def initialize(self):
        self.initialize_trade_record()
        self.initialize_report()
        print('[BACKTEST] Initializated')
    
    def initialize_trade_record(self):
        self.trade_record = pd.DataFrame()
        self.trade_record['date'] = None
        self.trade_record['strategy_name'] = None
        self.trade_record['position'] = None
        self.trade_record['event'] = None
        self.trade_record['rule'] = None
        self.trade_record['price'] = None
        self.trade_record['trade_return'] = None
        
    def initialize_report(self):
        self.report = pd.DataFrame()
        self.report['strategy_name'] = None
        self.report['occurrence'] = None
        self.report['num_profit'] = None
        self.report['num_loss'] = None
        self.report['return_average'] = None
        self.report['return_std'] = None
        
    def data_preprocess(self):
        self.data = self.cleaner.clean(self.data)
        print('[BACKTEST] Finished data pre-process')
        
    def compute_technical_indicator(self):
        self.data = self.cci.compute(self.data)
        print('[BACKTEST] Finished computation on technical indicators')
    
    # ensure the indicators data are available
    def validate_indicator_data(self):
        indicators = set()
        for strategy in self.strategies:
            for rule in strategy.open_position.rules:
                indicators.update(rule.indicators_required)
            for rule in strategy.close_position.rules:
                indicators.update(rule.indicators_required)
                
        if indicators.issubset(self.data.columns):
            print('[BACKTEST] Validated availability of indicator data: {}'.format(indicators))
        else:
            raise ValueError('[ERROR] Missing indicator data: {}'.format(indicators - set(self.data.columns)))
        
    def back_test(self):
        for strategy in self.strategies:
            for i in self.data.index:
                event, rule = strategy.check_event(self.data, i)
                if event is not NO_EVENT:
                    date = self.data.at[i, DATE]
                    price = self.data.at[i, CLOSE]
                    self.record_event(date, strategy, event, rule, price)
            print('[BACKTEST] Finished backtest on {}'.format(strategy.name))
        self.calculate_return()
        print('[BACKTEST] Finished all backtest')
    
    def record_event(self, date, strategy, event, rule, price):
        new_row = {'date': date,
                   'strategy_name': strategy.name,
                   'position': strategy.position,
                   'event': event,
                   'rule': rule,
                   'price': price}
        self.trade_record = self.trade_record.append(new_row, ignore_index=True)
        
    def calculate_return(self):
        self.trade_record.loc[self.trade_record['event'] == OPEN_POSITION, 'trade_return'] = np.NaN
        
        for strategy in self.strategies:
            
            strategy_trade_record = self.trade_record[self.trade_record['strategy_name'] == strategy.name]
            strategy_trade_record['original_id'] = strategy_trade_record.index
            strategy_trade_record = strategy_trade_record.reset_index()
            
            for i in strategy_trade_record.index:
                
                if strategy_trade_record.at[i, 'event'] == CLOSE_POSITION:
                    date = strategy_trade_record.at[i, DATE]
                    entry_price = strategy_trade_record.at[i-1, 'price']
                    exit_price = strategy_trade_record.at[i, 'price']
                    trade_return = self.compute_return(entry_price, exit_price, strategy.position)
                    original_id = strategy_trade_record.at[i, 'original_id']
                    self.trade_record.at[original_id, 'trade_return'] = trade_return
                    
    @staticmethod
    def compute_return(entry_price, exit_price, position):
        if position == BULLISH:
            return (exit_price - entry_price) / entry_price
        elif position == BEARISH:
            return -1 * (exit_price - entry_price) / entry_price
        else:
            raise ValueError('[ERROR] Unknown Position')
    
    def gen_report(self):
        for strategy in self.strategies:
            df = self.trade_record[self.trade_record['strategy_name'] == strategy.name]
            
            occurrence = df[df['event'] == CLOSE_POSITION].shape[0]
            num_profit = df[df['trade_return'] > 0].shape[0]
            num_loss = df[df['trade_return'] < 0].shape[0]
            return_average = df['trade_return'].mean()
            return_std = df['trade_return'].std()
            
            new_row = {'strategy_name': strategy.name,
                       'occurrence': occurrence,
                       'num_profit': num_profit,
                       'num_loss': num_loss,
                       'return_average': return_average,
                       'return_std': return_std}
            self.report = self.report.append(new_row, ignore_index=True)
            
        self.gen_report_summary_row()
        print('[BACKTEST] Generated back test report')
        
    def gen_report_summary_row(self):
        new_row = {'strategy_name': 'summary',
                   'occurrence': self.report['occurrence'].sum(),
                   'num_profit': self.report['num_profit'].sum(),
                   'num_loss': self.report['num_loss'].sum(),
                   'return_average': self.trade_record['trade_return'].mean(),
                   'return_std': self.trade_record['trade_return'].std()}
        self.report = self.report.append(new_row, ignore_index=True)
    
    def run(self):
        self.initialize()
        self.data_preprocess()
        self.compute_technical_indicator()
        self.validate_indicator_data()
        self.back_test()
        self.gen_report()
        return

In [57]:
rule = Rule('basic', ['CCI'], lambda data, entry_i, i: data.at[i, 'CCI'] < -198)
open_position_param = OpenPositionParam(rules=[rule])

rule = Rule('basic', ['CCI'], lambda data, entry_i, i: data.at[i, 'CCI'] > 150)
close_position_param = ClosePositionParam(rules=[rule])

CCI_bull_strategy = Strategy('CCI_bull_strategy', open_position_param, close_position_param, BULLISH)

rule = Rule('basic', ['CCI'], lambda data, entry_i, i: data.at[i, 'CCI'] > 150)
open_position_param = OpenPositionParam(rules=[rule])

rule = Rule('basic', ['CCI'], lambda data, entry_i, i: data.at[i, 'CCI'] < -198)
close_position_param = ClosePositionParam(rules=[rule])

CCI_bear_strategy = Strategy('CCI_bear_strategy', open_position_param, close_position_param, BEARISH)


df = df = pd.read_csv('hsi.csv')
strategies = [CCI_bull_strategy, CCI_bear_strategy]
backtest = BackTest(df, strategies)
backtest.run()

[BACKTEST] Initializated
[CLEAN] Copy data (2020-10-12, open) to (2020-10-13, open))
[CLEAN] Copy data (2020-10-12, high) to (2020-10-13, high))
[CLEAN] Copy data (2020-10-12, low) to (2020-10-13, low))
[CLEAN] Copy data (2020-10-12, close) to (2020-10-13, close))
[CLEAN] Copy data (2020-10-12, adj_close) to (2020-10-13, adj_close))
[CLEAN] Copy data (2019-12-17, volume) to (2019-12-18, volume))
[CLEAN] Copy data (2020-02-25, volume) to (2020-02-26, volume))
[CLEAN] Copy data (2020-03-20, volume) to (2020-03-23, volume))
[CLEAN] Copy data (2020-05-05, volume) to (2020-05-06, volume))
[CLEAN] Copy data (2020-07-22, volume) to (2020-07-23, volume))
[CLEAN] Copy data (2020-10-12, volume) to (2020-10-13, volume))
[CLEAN] Copy data (2020-10-14, volume) to (2020-10-15, volume))
[BACKTEST] Finished data pre-process
[DATA] Finished computation of CCI
[BACKTEST] Finished computation on technical indicators
[BACKTEST] Validated availability of indicator data: {'CCI'}
[BACKTEST] Finished backtest

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [58]:
backtest.trade_record

Unnamed: 0,date,strategy_name,position,event,rule,price,trade_return
0,2020-03-09,CCI_bull_strategy,bullish,open_position,basic,25040.46094,
1,2020-07-06,CCI_bull_strategy,bullish,close_position,basic,26339.16016,0.051864
2,2019-12-13,CCI_bear_strategy,bearish,open_position,basic,27687.75977,
3,2020-03-09,CCI_bear_strategy,bearish,close_position,basic,25040.46094,0.095613
4,2020-07-06,CCI_bear_strategy,bearish,open_position,basic,26339.16016,


In [59]:
backtest.report

Unnamed: 0,strategy_name,occurrence,num_profit,num_loss,return_average,return_std
0,CCI_bull_strategy,1,1,0,0.051864,
1,CCI_bear_strategy,1,1,0,0.095613,
2,summary,2,2,0,0.073738,0.030935


In [113]:
df

Unnamed: 0,date,strategy_name,position,event,price,trade_return
0,2019-12-04,CCI_bull_strategy,bullish,open_position,26062.56055,
1,2019-12-12,CCI_bull_strategy,bullish,close_position,26994.14063,0.035744
2,2020-01-23,CCI_bull_strategy,bullish,open_position,27909.11914,
3,2020-04-14,CCI_bull_strategy,bullish,close_position,24435.40039,-0.124465
4,2020-05-22,CCI_bull_strategy,bullish,open_position,22930.14063,
5,2020-06-08,CCI_bull_strategy,bullish,close_position,24776.76953,0.080533
6,2020-09-04,CCI_bull_strategy,bullish,open_position,24695.44922,


In [115]:
df['trade_return'].std()

0.10777858143237284

In [38]:
s1 = set({'a', 'b'})
s2 = set({'c', 'b'})
s3 = ['a', 'd', 'c']

In [41]:
s1 - set(s3)

{'b'}