In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
from datetime import datetime, timedelta
import backtrader as bt



In [2]:
def strat_decision(data, strat, threshold, safety_threshold, short_long_threshold, safety):
    """
    Take a stock data dictionary and a strategy with settings, calculate the strategy positions and add them to the
    data dictionary. Return the updated data dictionary.
    """
    #[data, strat, threshold, safety_threshold, short_long_threshold, safety] = args
    
    if strat=='SMA_MR':
        if safety==True:
            #SMA extension signal
            data['extension_position'] = np.nan
            data['extension_position'] = np.where((data['extension']<-threshold) & (data['extension']>-safety_threshold), 1, data['extension_position'])
            data['extension_position'] = np.where(data['extension']>-0.01, 0, data['extension_position'])
            data['extension_position'] = data['extension_position'].ffill().fillna(0)

            #Short-Long SMA signal
            data['short_long_position'] = np.nan
            data['short_long_position'] = np.where(data['SMA_short_long']<1-short_long_threshold, 1, data['short_long_position']) #adjusted threshold to 0.95 to leave a hold band around 1
            data['short_long_position'] = np.where(data['SMA_short_long']>1, 0, data['short_long_position'])
            data['short_long_position'] = data['short_long_position'].ffill().fillna(0)
            
            #Add signal to check if the market is in a mean-reverting regime (Ehlers/Kalman filters)
            
            #Add signal to check market volatility (see https://decodingmarkets.com/mean-reversion-trading-strategy/ step 7)
            
            #Set the position to 1 if both signal positions are 1. Else, set the position to 0.
            data['position'] = np.nan
            data['position'] = data['extension_position']*data['short_long_position']
        else:
            #SMA extension signal
            data['extension_position'] = np.nan
            #If extension is below our threshold, buy. Else, hold
            data['extension_position'] = np.where(data['extension']<-threshold, 1, data['extension_position'])
            #If we the extension is within 0.01 of a neutral value, sell. Else, hold
            data['extension_position'] = np.where(data['extension']>-threshold, 0, data['extension_position'])
            data['extension_position'] = data['extension_position'].ffill().fillna(0)

            #Short-Long SMA signal
            data['short_long_position'] = np.nan
            data['short_long_position'] = np.where(data['SMA_short_long']<1-short_long_threshold, 1, data['short_long_position'])
            data['short_long_position'] = np.where(data['SMA_short_long']>1, 0, data['short_long_position'])
            data['short_long_position'] = data['short_long_position'].ffill().fillna(0)

            #Set the position to 1 if both signal positions are 1. Else, set the position to 0.
            data['position'] = np.nan
            data['position'] = data['extension_position']*data['short_long_position']
    else:
        print('Please enter a valid strategy name. Currently supported strategies: "SMA_MR"')
    
    return data

In [3]:
def SMAMeanReversion_strat(strat_settings):
    """
    Evaluate a stock using two versions of SMA mean reversion strategy and make a buy/hold/sell decision
    
    Inputs
        ticker (str) - stock ticker
        sma (int) - rolling average window size, in days
        threshold (float) - mean reversion significance threshold, >0
        current_position (int) - our current position for this stock 
            1 means we own the stock, 0 means we do not
        safety (bool) - if True, activate safety latch
        safety_threshold (float) - safety latch threshold, >0
        short_term_sma (int) - window size for short sma
        long_term_sma (int) - window size for long sma
        short_long_threshold (float) - short_long ratio significance threshold
        tp_threshold (float) - percentage of expected profit to set take-profit order at, 0<x<1
        
        Must have safety_threshold > threshold
    Outputs
        decision (dataframe) - contains stock ticker, current extension, and strategy decision
        
    ticker, sma, threshold, current_position, safety=False, safety_threshold=0.25, short_term_sma=30, long_term_sma=90, short_long_threshold=0.05, tp_thresh=0.7
    """
    [ticker,sma,threshold,current_position,safety,safety_threshold,short_term_sma,long_term_sma,short_long_threshold,tp_threshold] = strat_settings
    
    strat = 'SMA_MR'
    
    #end_date = '2021-07-24'
    end_date = datetime.today().strftime('%Y-%m-%d')
    start_date = (datetime.today() - timedelta(days=2*long_term_sma)).strftime('%Y-%m-%d')
    
    #Can add a switch to use either yfinance or a different library. Only column we need is data['Close']
    #Get yahooFinance historical data
    yfObj = yf.Ticker(ticker)
    data = yfObj.history(start=start_date, end=end_date)
    
    #Calculate SMA and extension at the end of each day
    #current_price = yfObj.info['regularMarketPrice']
    data['SMA'] = data['Close'].rolling(sma).mean()
    data['extension'] = (data['Close'] - data['SMA']) / data['SMA']
    data['SMA_short_long'] = data['Close'].rolling(short_term_sma).mean() / data['Close'].rolling(long_term_sma).mean()
    
    #Add signal to check if the market is in a mean-reverting regime (Ehlers/Kalman filters)    
    #Add signal to check market volatility (see https://decodingmarkets.com/mean-reversion-trading-strategy/ step 7)
    
    #Calculate actual price and expected price based on average of SMA and extension
    ap = data['Close'][-1:] #Might replace with the instantaneous current price from the yfObj
    ext_ep = data['SMA'][-1:]
    sma_ep = data['Close'].rolling(long_term_sma).mean()[-1:]
    ep = (ext_ep + sma_ep)/2
    
    #Check the strategy criteria at the end of each day and adjust the position accordingly
    data = strat_decision(data, strat, threshold, safety_threshold, short_long_threshold, safety)

    #Create a combined criteria that requires both the extension and the short_long ratio to agree in order to make a move
    """
    if (data['extension'][-1:].values<-threshold) & (data['SMA_short_long'][-1:].values<1-short_long_threshold) & (current_position == 0):
        print('signal 1 current 0')
        movement = 'Buy'
        #take-profit threshold
        tp_price = ap + tp_threshold*(ep-ap)
    else:
        print('signal 0 current 0')
        movement = 'Hold'
        tp_price = 0
    #Could make this an OR statement to conservatively sell if either signal is triggered
    if (data['extension'][-1:].values>-threshold) & (data['SMA_short_long'][-1:].values>1) & (current_position == 1):
        print('signal 0 current 1')
        movement = 'Sell'
        tp_price = 0
    else:
        if movement != 'Buy':
            print('signal 1 current 1')
            movement = 'Hold'
            tp_price = ap + tp_threshold*(ep-ap)
    """
    if (current_position == 0):
        if (data['extension'][-1:].values<-threshold) & (data['SMA_short_long'][-1:].values<1-short_long_threshold):
            #print('signal 1 current 0')
            movement = 'Buy'
            #take-profit threshold
            tp_price = ap + tp_threshold*(ep-ap)
        else:
            #print('signal 0 current 0')
            movement = 'Hold'
            tp_price = 0
    elif (current_position == 1):
        #Could make this an OR statement to conservatively sell if either signal is triggered
        if (data['extension'][-1:].values>-threshold) & (data['SMA_short_long'][-1:].values>1):
            #print('signal 0 current 1')
            movement = 'Sell'
            tp_price = 0
        else:
            #print('signal 1 current 1')
            movement = 'Hold'
            tp_price = ap + tp_threshold*(ep-ap)
            
    #could implement a take-profit order where we sell when the price reaches the expected price (or a few percent below expected)
    
    #Get the extension & decision for this stock today and store it in a df
    decision_dict = {}
    decision_dict['ticker'] = ticker
    #decision_dict['live_price'] = yfObj.info['regularMarketPrice'] #comment out to improve speed
    decision_dict['latest_close_price'] = data['Close'][-1:]
    decision_dict['expected_price'] = ep
    decision_dict['extension'] = data['extension'][-1:]
    decision_dict['extension_position'] = data['extension_position'][-1:]
    decision_dict['short_long ratio'] = data['SMA_short_long'][-1:]
    decision_dict['short_long_position'] = data['short_long_position'][-1:]
    decision_dict['movement'] = movement
    decision_dict['take_profit_price'] = tp_price
    decision_dict['position'] = data['position'][-1:]
    decision = pd.DataFrame(decision_dict).round(3)
    
    return decision, data

In [4]:
def stratBacktest(strat_settings):
    """
    Perform a backtest of a specified strategy for a given ticker
    
    Inputs
        ticker (str) - stock ticker
        sma (int) - rolling average window size, in days
        threshold (float) - mean reversion significance threshold
        start_date (str)
        end_date (str)
    Outputs
        data (dataframe) - contains strategy returns and statistics data
    """
    [ticker, strat, sma, threshold, safety, safety_threshold, short_term_sma, long_term_sma, short_long_threshold, start_date,end_date] = strat_settings
    
    #Can add a switch to use either yfinance or a different library. Only column we need is data['Close']
    #Get yahooFinance historical data
    yfObj = yf.Ticker(ticker)
    warmup_date = (datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=2*long_term_sma)).strftime('%Y-%m-%d')
    data = yfObj.history(start=warmup_date, end=end_date)
    
    #Calculate SMA and extension at the end of each day
    data['SMA'] = data['Close'].rolling(int(sma)).mean()
    data['extension'] = (data['Close'] - data['SMA']) / data['SMA']
    data['SMA_short_long'] = data['Close'].rolling(int(short_term_sma)).mean() / data['Close'].rolling(int(long_term_sma)).mean()
    
    #Check the strategy criteria at the end of each day and adjust the position accordingly
    data = strat_decision(data, strat, threshold, safety_threshold, short_long_threshold, safety)
    
    #Get data from first valid day, i.e. first day where NaN due to warm-up doesn't occur
    first_valid_day = data['SMA_short_long'].first_valid_index()
    
    #Calculate returns and statistics
    data['returns'] = data['Close'] / data['Close'].shift(1)
    #data['returns'] = returns/return_normalization #Normalize the buy&hold returns for warm-up
    data['log_returns'] = np.log(data['returns'])
    data['strat_returns'] = data['position'].shift(1) * data['returns']
    data['strat_log_returns'] = data['position'].shift(1) * data['log_returns']
    #Normalize buy&hold returns for warm-up
    cum_returns = np.exp(data['log_returns'].cumsum())
    cum_returns_normalization = cum_returns.loc[first_valid_day]
    data['cum_returns'] = cum_returns/cum_returns_normalization

    data['strat_cum_returns'] = np.exp(data['strat_log_returns'].cumsum())
    data['peak'] = data['cum_returns'].cummax()
    data['strat_peak'] = data['strat_cum_returns'].cummax()    
    
    return data.dropna()

In [5]:
def getStratStats(data, verbose=False, risk_free_rate=0.02):
    sma_strat, buy_hold_strat = {}, {}
    
    #Total Returns
    sma_strat['tot_returns'] = np.exp(data['strat_log_returns'].sum()) - 1
    buy_hold_strat['tot_returns'] = np.exp(data['log_returns'].sum()) - 1

    #Mean Annual Returns
    sma_strat['annual_returns'] = np.exp(data['strat_log_returns'].mean()*252) - 1
    buy_hold_strat['annual_returns'] = np.exp(data['log_returns'].mean()*252) - 1

    #Annual Volatility
    sma_strat['annual_volatility'] = data['strat_log_returns'].std() * np.sqrt(252)
    buy_hold_strat['annual_volatility'] = data['log_returns'].std() * np.sqrt(252)
    
    #Sharpe Ratio
    #sma_strat['sharpe_ratio'] = (sma_strat['annual_returns'] - risk_free_rate) / sma_strat['annual_volatility']
    #buy_hold_strat['sharpe_ratio'] = (buy_hold_strat['annual_returns'] - risk_free_rate) / buy_hold_strat['annual_volatility']
    
    #Max Drawdown
    _strat_dd = data['strat_peak'] - data['strat_cum_returns']
    _buy_hold_dd = data['peak'] - data['cum_returns']
    sma_strat['max_drawdown'] = _strat_dd.max()
    buy_hold_strat['max_drawdown'] = _buy_hold_dd.max()
    
    try:
        #Error in gridsearch occurs somewhere in this block
        #Max Drawdown Duration
        strat_dd = _strat_dd[_strat_dd==0]
        strat_dd_diff = strat_dd.index[1:] - strat_dd.index[:-1]
        strat_dd_days = strat_dd_diff.map(lambda x: x.days).values
        strat_dd_days = np.hstack([strat_dd_days, (_strat_dd.index[-1] - strat_dd.index[-1]).days])

        sma_strat['max_drawdown_duration'] = strat_dd_days.max()
    except:
        if verbose:
            print('IndexError occured due to no drawdown occurences in strategy backtest')
        sma_strat['max_drawdown_duration'] = 0.0
        #probably have same error of no drawdown occurence, and need to set sma_strat['max_drawdown_duration'] = 0.0

    try:
        buy_hold_dd = _buy_hold_dd[_buy_hold_dd==0]
        buy_hold_dd_diff = buy_hold_dd.index[1:] - buy_hold_dd.index[:-1]
        buy_hold_dd_days = buy_hold_dd_diff.map(lambda x: x.days).values
        buy_hold_dd_days = np.hstack([buy_hold_dd_days, (_buy_hold_dd.index[-1] - buy_hold_dd.index[-1]).days])
        #Calculate max drawdown duration as largest difference between reaching a peak value and then coming back to that value after falling
        buy_hold_strat['max_drawdown_duration'] = buy_hold_dd_days.max()
    except IndexError:
        if verbose:
            print('IndexError occured due to no drawdown occurences in buy&hold backtest')
        buy_hold_strat['max_drawdown_duration'] = 0.0
    
    stats_dict = {'strat_stats': sma_strat, 'base_stats': buy_hold_strat}

    return stats_dict

In [6]:
def portfolio_DrawdownStats(data): 
    """
    Calculate the max drawdown and max drawdown duration of a portfolio 
    given the cumulative returns df 
    """
    
    sma_strat, buy_hold_strat = {}, {}
    
    #Get the peak values
    data['peak'] = data['cum_returns'].cummax()
    data['strat_peak'] = data['strat_cum_returns'].cummax()
    
    #Max Drawdown
    _strat_dd = data['strat_peak'] - data['strat_cum_returns']
    _buy_hold_dd = data['peak'] - data['cum_returns']
    sma_strat['max_drawdown'] = _strat_dd.max()
    buy_hold_strat['max_drawdown'] = _buy_hold_dd.max()
    
    #Max Drawdown Duration
    strat_dd = _strat_dd[_strat_dd==0]
    strat_dd_diff = strat_dd.index[1:] - strat_dd.index[:-1]
    strat_dd_days = strat_dd_diff.map(lambda x: x.days).values
    strat_dd_days = np.hstack([strat_dd_days, (_strat_dd.index[-1] - strat_dd.index[-1]).days])
    
    sma_strat['max_drawdown_duration'] = strat_dd_days.max()
    
    try:
        buy_hold_dd = _buy_hold_dd[_buy_hold_dd==0]
        buy_hold_dd_diff = buy_hold_dd.index[1:] - buy_hold_dd.index[:-1]
        buy_hold_dd_days = buy_hold_dd_diff.map(lambda x: x.days).values
        buy_hold_dd_days = np.hstack([buy_hold_dd_days, (_buy_hold_dd.index[-1] - buy_hold_dd.index[-1]).days])
        #Calculate max drawdown duration as largest difference between reaching a peak value and then coming back to that value after falling
        buy_hold_strat['max_drawdown_duration'] = buy_hold_dd_days.max()
    except IndexError:
        if verbose:
            print('IndexError occured due to no drawdown occurences')
        buy_hold_strat['max_drawdown_duration'] = 0.0
    
    dd_dict = {'strat_stats': sma_strat, 'base_stats': buy_hold_strat}
    
    return dd_dict

In [7]:
def applyPortfolioStrat(strat_settings):
    """
    Take a strategy function and apply it to a list of tickers and current positions.
    Return the decisions in a dataframe.
    
    Should eventually package all of the strategy settings into a strat_settings array variable to make this 
    easier to change, more readable, and robust enough to use different strat_func's with non-identical inputs. 
    Will pack the settings before using applyPortfolioStrat and then unpack inside of strat_func.
    
    applyPortfolioStrat(strat_func, strat_settings, tickers, current_positions)
    SMAMeanReversion_strat(strat_settings, ticker, current_position)
    
    strat_func, tickers, current_positions, SMA, threshold, safety=False, safety_threshold=0.25, short_term_sma=30, long_term_sma=90, short_long_threshold=0.05
    """
    #Unpack strat_settings
    [strat_func, tickers, current_positions, SMA, threshold, safety, safety_threshold, short_term_sma, long_term_sma, short_long_threshold, tp_threshold] = strat_settings
    
    #Apply the strategy to each ticker in the portfolio and append the decisions
    strat_setting_0 = [tickers[0],SMA,threshold,current_positions[0],safety,safety_threshold,short_term_sma,long_term_sma,short_long_threshold,tp_threshold]
    decisions, _ = strat_func(strat_setting_0)
    if len(tickers)>1:
        for i in range(1,len(tickers)):
            strat_setting_i = [tickers[i],SMA,threshold,current_positions[i],safety,safety_threshold,short_term_sma,long_term_sma,short_long_threshold,tp_threshold]
            decision, _ = strat_func(strat_setting_i)
            decisions = decisions.append(decision)
    return decisions

Add some styling to the decisions dataframe output table

In [8]:
def color_code_ext(value):
    if value<0:
        color='green'
    elif value>0:
        color='red'
    else:
        color='black'
    return 'color: %s' % color

def color_code_sl(value):
    if value>1:
        color='red'
    elif value<1:
        color='green'
    else:
        color='black'
    return 'color: %s' % color

def color_code_m(value):
    if value=='Buy':
        color='green'
    elif value=='Sell':
        color='red'
    else:
        color='blue'
    return 'color: %s' % color

In [9]:
def portfolio_dashboard(decisions):
    """
    Print out a dashboard displaying the strategy decisions and statistics for each ticker in a portfolio
    """
    decisions_dash = decisions.reset_index(drop=True).style.format(precision=3).applymap(color_code_ext, subset=['extension']).applymap(color_code_sl, subset=['short_long ratio']).applymap(color_code_m, subset=['movement'])
    display(decisions_dash)

Write a function that applies the backtest to a portfolio containing a list of stocks

In [10]:
def portfolioBacktest(strat_settings):
        """
        Apply the backtest function to every ticker in a list. Create a dictionary containing
        the running cumulative returns data of each ticker, and a dictionary containing the 
        statistical performance of each ticker.
        
        Need to handle the errors caused by:
            - stock didn't exist at the beginning of given time frame (caused by max drawdown length)
            - divide by zero error (caused by sharpe ratio)
            - index -1 out of range for array of size 0 (caused by max drawdown length)
            Update: all 3 of these errors seem to be fixed by removing sharpe ratio and adding a try-except for 
            stocks which don't experience buy&hold drawdown in the post-burn-in period. 
        """
        #tickers, strat, SMA, threshold, safety=True, safety_threshold=0.25, short_term_sma=30, long_term_sma=90, short_long_threshold=0.05, start_date='2000-01-01',end_date='2020-12-31', verbose=False
        #Unpack args
        [tickers, strat, SMA, threshold, safety, safety_threshold, short_term_sma, long_term_sma, short_long_threshold, start_date, end_date, verbose] = strat_settings
        
        #Store the returns and stats of each ticker in the portfolio
        portfolio_returns = {}
        portfolio_stats = {}
        for i in range(0,len(tickers)):
            if verbose:
                print('Backtesting', tickers[i])
            #Get the backtest returns and stats for ticker i
            strat_settings_i = [tickers[i], strat, SMA, threshold, safety, safety_threshold, short_term_sma, long_term_sma, short_long_threshold, start_date,end_date]
            data_df = stratBacktest(strat_settings_i)
            #Get data from first valid day, i.e. first day where NaN due to warm-up doesn't occur
            first_valid_day = data_df['SMA_short_long'].first_valid_index()
            if i==0:
                start_day = first_valid_day
            if i>0:
                #Check whether this is the latest first_valid_day value so far; If so, update it
                if first_valid_day.to_pydatetime()>start_day.to_pydatetime():
                    start_day = first_valid_day
            
        for i in range(0,len(tickers)):
            #Clip the data so that we evaluate performance from the first day that the strategy is warmed up and starts making decisions
            data_clipped = data_df[start_day:]
            stats_df = pd.DataFrame(getStratStats(data_clipped, verbose)).round(3)
            returns_df = data_clipped[['cum_returns','strat_cum_returns']]

            #Store the returns and stats of the ticker
            portfolio_returns[tickers[i]] = returns_df
            portfolio_stats[tickers[i]] = stats_df
        
        #Sum the returns and stats across the portfolio
        portfolio_sum_returns, portfolio_sum_stats = sum_metrics(tickers, portfolio_returns, portfolio_stats)
        
        return portfolio_sum_returns, portfolio_sum_stats

Write a function to take the portfolio_returns dictionary and sum the overall returns of the portfolio, then plot strat vs buy and hold

In [11]:
def sum_metrics(tickers, portfolio_returns, portfolio_stats):
    
    #Get the returns and stats from the first ticker
    portfolio_sum_returns = portfolio_returns[tickers[0]]
    portfolio_sum_stats = portfolio_stats[tickers[0]]
    #For each remaining ticker, add the returns and stats to the portfolio total sum
    if len(tickers)>1:
        for i in range(1,len(tickers)):
            """
            #Add the returns to the portfolio sum, handling differing start dates nan error
            #This fix caused another problem since a jump in cum_return occurs when a new ticker is introduced. Could
            #fix this with a normalization that divides the return by the total number of tickers active on a daily 
            #basis instead of the total number of tickers given.
            #For now I'll just stick with the start time being restricted by the newest ticker since I have more confidence in the results
            if portfolio_sum_returns.first_valid_index() == portfolio_returns[tickers[i]].first_valid_index():
                print(portfolio_sum_returns.first_valid_index())
                print(portfolio_returns[tickers[i]].first_valid_index())
                portfolio_sum_returns = portfolio_sum_returns + portfolio_returns[tickers[i]]
            else:
                print(portfolio_sum_returns.first_valid_index())
                print(portfolio_returns[tickers[i]].first_valid_index())
                #Leave values unchanged for rows where tickers[i] has nan values
                start_row = portfolio_sum_returns.first_valid_index()
                end_row = portfolio_returns[tickers[i]].first_valid_index()
                portfolio_sum_returns.loc[start_row:end_row,'cum_returns'][:-1] = portfolio_sum_returns.loc[start_row:end_row,'cum_returns'][:-1]
                portfolio_sum_returns.loc[start_row:end_row,'strat_cum_returns'][:-1] = portfolio_sum_returns.loc[start_row:end_row,'strat_cum_returns'][:-1]
                #Add the returns for rows where tickers[i] has real values
                print(portfolio_returns[tickers[i]]['cum_returns'])
                portfolio_sum_returns.loc[end_row:,'cum_returns'] = portfolio_sum_returns.loc[end_row:,'cum_returns'] + portfolio_returns[tickers[i]]['cum_returns']
            """
            portfolio_sum_returns = portfolio_sum_returns + portfolio_returns[tickers[i]]
            #Add the stats to the portfolio sum    
            portfolio_sum_stats = portfolio_sum_stats + portfolio_stats[tickers[i]]
    #Re-normalize the portfolio total sum
    portfolio_sum_returns = portfolio_sum_returns/len(tickers)
    #Re-normalize the portfolio return & volatility stats
    portfolio_sum_stats.iloc[0:3] = portfolio_sum_stats.iloc[0:3]/len(tickers)
    #Calculate the portfolio drawdown stats
    dd_stats = portfolio_DrawdownStats(portfolio_sum_returns)
    portfolio_sum_stats.iloc[3][0] = dd_stats['strat_stats']['max_drawdown']
    portfolio_sum_stats.iloc[4][0] = dd_stats['strat_stats']['max_drawdown_duration']
    portfolio_sum_stats.iloc[3][1] = dd_stats['base_stats']['max_drawdown']
    portfolio_sum_stats.iloc[4][1] = dd_stats['base_stats']['max_drawdown_duration']
    
    return portfolio_sum_returns, portfolio_sum_stats

In [12]:
def plot_portfolio_backtest(portfolio_returns):
    """
    There is an issue here where MRT returns only start after the averaging burn-in, while buy&hold starts 
    immediately, so the buy&hold returns are given a head start. This should be resolved when I add 
    a pre-burn-in period in stratBacktest.
    """
    #Normalize returns to the first valid day
    first_valid_day = portfolio_returns.first_valid_index()
    returns_normalized = portfolio_returns/portfolio_returns.loc[first_valid_day]

    fig, ax = plt.subplots(figsize=(12, 5))
    ax.plot(returns_normalized['strat_cum_returns'], label='Mean Reversion Strategy with Safety and Ensemble')
    ax.plot(returns_normalized['cum_returns'], label='Buy and Hold Strategy')
    ax.set_xlabel('Date')
    ax.set_ylabel('Returns (%)')
    ax.set_title('Cumulative Returns for Mean Reversion and Buy and Hold Strategies')

    ax.legend()
    plt.show()

Write a function that applies the portfolio backtest for each point on a grid of hyperparameters and returns a plot of each of the major strategy metrics (annual returns, volatility, max drawdown). This will probably be a corner plot where I can see the effect of each parameter and pick the desired min/max location.

Could start with just annual returns to avoid multi-objective optimization. Seems like a good metric to use.

Note: sharpe ratio and max_drawdown_duration both cause errors, and probably aren't the most relevant metrics anyways, so I won't use them.

Possibly use MCMC or another optimization algorithm for this?

In [2]:
def annual_return_diff(portfolio_stats):
    #Calculate annual return difference of the portfolio
    strat_annual_returns = portfolio_stats['strat_stats']['annual_returns']
    base_annual_returns = portfolio_stats['base_stats']['annual_returns']
    annual_return_diff = strat_annual_returns - base_annual_returns
    return annual_return_diff

def annual_return_ratio(portfolio_stats):
    #Calculate annual return difference of the portfolio
    strat_annual_returns = portfolio_stats['strat_stats']['annual_returns']
    base_annual_returns = portfolio_stats['base_stats']['annual_returns']
    annual_return_ratio = strat_annual_returns/base_annual_returns
    return annual_return_ratio

In [13]:
def hyperparam_gridsearch(tickers, N):
    """
    Perform a gridsearch to find the hyperparameters that optimize strategy performance
    
    tickers - array of stocks/cryptos to consider in our portfolio
    N - number of gridpoints to search in each hyperparameter
    """

    #Set strategy and dates
    start_date = '2018-05-01'
    end_date = '2021-10-17'
    strat='SMA_MR'
    #Keep these set for now. Might try comparing with & without the safety 
    verbose = False
    safety = True
    
    #Hyperparam bounds
    SMAb = [2,200]
    thresholdb = [0.01,1.0]
    safety_thresholdb = [0.01,1.0]
    short_term_smab = [2,90]
    long_term_smab = [10,200]
    short_long_thresholdb = [0.01,1.0]
    
    #Hyperparam vectors
    SMAv = np.linspace(SMAb[0],SMAb[1],N)
    thresholdv = np.linspace(thresholdb[0],thresholdb[1],N)
    safety_thresholdv = np.linspace(safety_thresholdb[0],safety_thresholdb[1],N)
    short_term_smav = np.linspace(short_term_smab[0],short_term_smab[1],N)
    long_term_smav = np.linspace(long_term_smab[0],long_term_smab[1],N)
    short_long_thresholdv = np.linspace(short_long_thresholdb[0],short_long_thresholdb[1],N)
    
    #Save the parameter vectors
    param_vecs = [SMAv, thresholdv, safety_thresholdv, short_term_smav, long_term_smav, short_long_thresholdv]
    #cols = ['SMA', 'threshold', 'safety_threshold', 'short_term_sma', 'long_term_sma', 'short_long_threshold']
    #params_df = pd.DataFrame(vecs, columns=cols) #ValueError: 6 columns passed, passed data had 3 columns
    
    #Initialize storage for statistic evaluated on grid
    stat_grid = np.empty([N, N, N, N, N, N])
    
    #Not sure if I can eventually vectorize this?
    #SMAg, thresholdg, safety_thresholdg, short_term_smag, long_term_smag, short_long_thresholdg = np.meshgrid(SMAv, thresholdv, safety_thresholdv, short_term_smav, long_term_smav, short_long_thresholdv)
    
    npts = N**6
    print('Performing hyperparameter gridsearch. Grid size=',npts)
    count = 0
    for i1 in range(0,N):
        for i2 in range(0,N):
            for i3 in range(0,N):
                print(count,' of ',npts,' gridpoints')
                for i4 in range(0,N):
                    for i5 in range(0,N):
                        for i6 in range(0,N):
                            count = count+1
                            #Get the hyperparams for this gridpoint
                            SMA = SMAv[i1]
                            threshold = thresholdv[i2]
                            safety_threshold = safety_thresholdv[i3]
                            short_term_sma = short_term_smav[i4]
                            long_term_sma = long_term_smav[i5]
                            short_long_threshold = short_long_thresholdv[i6]
                            #Check that this gridpoint meets our constraints. If yes, evaluate the model.
                            """
                            Constraints: 
                            safety_threshold > threshold
                            long_term_sma > short_term_sma
                            """
                            if (safety_threshold>threshold) and (long_term_sma>short_term_sma):
                                strat_settings = [tickers, strat, SMA, threshold, safety, safety_threshold, short_term_sma, long_term_sma, short_long_threshold, start_date, end_date, verbose]
                                
                                #Run the backtest on the entire portfolio
                                portfolio_returns, portfolio_stats = portfolioBacktest(strat_settings)
                                #Calculate annual return ratio of the portfolio
                                strat_annual_returns = portfolio_stats['strat_stats']['annual_returns']
                                base_annual_returns = portfolio_stats['base_stats']['annual_returns']
                                #annual_return_ratio = strat_annual_returns/base_annual_returns
                                annual_return = strat_annual_returns - base_annual_returns
                                
                                #Store the result
                                stat_grid[i1,i2,i3,i4,i5,i6] = annual_return
                                
    print('Gridsearch completed')
    return stat_grid, param_vecs

In [14]:
def clean_stat_grid(stat_grid, min_val=1e-5, max_val=1e2, neg_val=5):
    """
    Remove unreasonably large, small, and negative values from the gridsearch results
    """
    stat_grid = np.where(stat_grid > max_val, 0, stat_grid) #Drop unreasonably large values
    stat_grid = np.where(np.abs(stat_grid) < min_val, 0, stat_grid) #Drop unrealistically small values
    stat_grid = np.where(stat_grid < -neg_val, 0, stat_grid) #Drop negative values below a certain unreasonable cutoff
    #Also should remove nan and replace with 0
    
    return stat_grid

In [16]:
def save_gridsearch(param_vecs, stat_grid, N):
    """
    Save the results of a gridsearch to two CSV files
    """
    print('Saving Gridsearch Results...')
    
    #Reshape stat_grid to vector
    npts = N**6
    stat_vec = np.reshape(stat_grid, npts)

    #Save parameter vectors to csv
    param_name = 'GridsearchData/param_vecs_N{}.csv'.format(N)
    np.savetxt(param_name, param_vecs, delimiter =", ", fmt ='% s')
    #Save gridsearch output statistic vector to csv, with a header giving the array length so we can reshape it when loading it back
    head = str(N)
    stat_name = 'GridsearchData/stat_vec_N{}.csv'.format(N)
    np.savetxt(stat_name, stat_vec, delimiter =", ", header=head, fmt ='% s')
    print('Results Saved')

In [17]:
def load_gridsearch(N):
    """
    Load the results of a gridsearch from two CSV files
    """
    #Load parameter vectors
    param_name = 'GridsearchData/param_vecs_N{}.csv'.format(N)
    param_vecs = np.loadtxt(param_name, delimiter =", ") #Need to check if this is the right usage
    #Load output statistic vector
    stat_name = 'GridsearchData/stat_vec_N{}.csv'.format(N)
    stat_vec = np.loadtxt(stat_name, delimiter =", ")
    #Recover original 6D stat_grid
    stat_grid = np.reshape(stat_vec, [N, N, N, N, N, N])
    
    return param_vecs, stat_grid

In [3]:
def write_3d_nparray(filename, stat_grid):
    # Write the array to disk
    with open(filename, 'w') as outfile:
        # I'm writing a header here just for the sake of readability
        # Any line starting with "#" will be ignored by numpy.loadtxt
        outfile.write('# Array shape: {0}\n'.format(stat_grid.shape))

        # Iterating through a ndimensional array produces slices along
        # the last axis. This is equivalent to data[i,:,:] in this case
        for data_slice in stat_grid:

            # The formatting string indicates that I'm writing out
            # the values in left-justified columns 7 characters in width
            # with 2 decimal places.  
            np.savetxt(outfile, data_slice) #, fmt='%-7.2f'

            # Writing out a break to indicate different slices...
            outfile.write('# New slice\n')
        print(filename,'saved')
        
def load_3d_nparray(stat_name, N):
    # Read the array from disk
    data = np.loadtxt(stat_name)
    # Convert back to original 3D array shape
    data = data.reshape((N,N,N))
    
    return data

In [1]:
def plot_gridsearch_results(stat_grid, param_vecs, N):
    cols = ['SMA', 'threshold', 'safety_threshold', 'short_term_sma', 'long_term_sma', 'short_long_threshold']
    arr = np.array([0,1,2,3,4,5]) #List of dimensions
    stat_vecs = np.empty([6,N])

    for i in range(0,6):
        arr = arr[arr != i] #Specify which dimension to drop leave out of the averaging
        avg=np.apply_over_axes(np.nanmean, stat_grid, arr) #Average over all axes except the specified axis
        #Collapse extra dimensions
        if i==0:
            avg_flat = avg[:,0,0,0,0,0]
        if i==1:
            avg_flat = avg[0,:,0,0,0,0]
        if i==2:
            avg_flat = avg[0,0,:,0,0,0]
        if i==3:
            avg_flat = avg[0,0,0,:,0,0]
        if i==4:
            avg_flat = avg[0,0,0,0,:,0]
        if i==5:
            avg_flat = avg[0,0,0,0,0,:]
        stat_vecs[i,:] = avg_flat

        #Plot the averaged results
        plt.figure()
        plt.plot(param_vecs[i], stat_vecs[i],'o')
        plt.xlabel(cols[i])
        plt.ylabel('Annual Return Difference (Strat - B&H)')
    
    return stat_vecs

In [3]:
def addPortfolioToFeed(tickers, cerebro, start_date, end_date):
    
    for t in tickers:
        data = bt.feeds.PandasData(dataname=yf.download(t, start_date, end_date))
        #data.plotinfo.plotmaster = data
        cerebro.adddata(data, name = t)