In [7]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import quantstats as qs
import statistics as st
from datetime import datetime, timedelta
from matplotlib.colors import DivergingNorm
from scipy.signal import convolve2d
import import_ipynb
from IPython.core.display import HTML
HTML("""<style>.output_png img {display: block;margin-left: auto;margin-right: auto;} </style>""")

In [None]:
qs.extend_pandas()
plt.rcParams['figure.figsize'] = (26,12)
plt.rcParams['text.color'] = 'w'
plt.rcParams['legend.facecolor'] = '#2f3540'
plt.rcParams['xtick.color'] = 'w'
plt.rcParams['ytick.color'] = 'w'
plt.rcParams['axes.labelcolor'] = 'w'

SMALL_SIZE = 13
MEDIUM_SIZE = 15
BIG_SIZE = 17
plt.rcParams['font.size'] = SMALL_SIZE
plt.rcParams['axes.titlesize'] = BIG_SIZE
plt.rcParams['axes.labelsize'] = MEDIUM_SIZE
plt.rcParams['xtick.labelsize'] = MEDIUM_SIZE
plt.rcParams['ytick.labelsize'] = MEDIUM_SIZE
plt.rcParams['legend.fontsize'] = SMALL_SIZE
plt.rcParams['figure.titlesize'] = BIG_SIZE

np.set_printoptions(edgeitems=10, linewidth=1000) 

In [13]:
# Load DF with SP500 data
def get_sp500_data(start_date="1970-01-02", from_local_file=True, save_to_file=False):
    if from_local_file == True:
        data = pd.read_pickle('data/SP500_hist_data.pkl')
        return data
    else:
        # Download data from yfinance
        data = yf.download("^GSPC", auto_adjust=True, start=start_date)
        if save_to_file == True:
            data.to_pickle("data/SP500_hist_data.pkl")
        return data

In [13]:
# Define if the strategy position (1=fast_ma higher than slow_ma, -1=short)
def get_strategy_position(df, fast_ma=1, slow_ma=1):
    full_df = get_sp500_data()
    df['fast_ma'] = full_df['Close'].rolling(window=fast_ma).mean()
    df['slow_ma'] = full_df['Close'].rolling(window=slow_ma).mean()
    df['fast-slow'] = df['fast_ma'] - df['slow_ma']
    return np.where(df['fast-slow'] < 0, -1, 1)

In [15]:
def print_periods(IS_start_years, IS_end_years, OOS_start_years, OOS_end_years):
    print("\tIn SAMPLE\t\tOOS")
    for iss, ie, oi, oe in zip(IS_start_years, IS_end_years, OOS_start_years, OOS_end_years):
        print("{:%Y-%m-%d} {:%Y-%m-%d} \t {:%Y-%m-%d} {:%Y-%m-%d}".format(iss, ie, oi, oe))

In [42]:
def print_backtest_stats(df, fast_ma, slow_ma, ret_strat=np.nan, sr_strat=np.nan):
    ini_equity = 100
    if 'Market_cum_ret' not in df.columns:
        df['Market_cum_ret'] = df['Market_daily_ret'].add(1).cumprod().mul(ini_equity)
        df.loc[df.index.values[0], 'Market_cum_ret'] = ini_equity + df.loc[first_row, 'Market_daily_ret'] * 100
        
    ret_market = (df.loc[df.index.values[-1], 'Market_cum_ret'] / ini_equity) * 100
    sr_market = df['Market_cum_ret'].sharpe()
    
    print("\tPeriod: {:%Y-%m-%d} to {:%Y-%m-%d}".format(df.index[0], df.index[-1]))
    print("\tOverall return of SP500: {:.2f} %. SR of SP500: {:.2f}".format(ret_market, sr_market))
    if fast_ma >= slow_ma:
        print("\tOverall return of long only: {:.2f} %. Sharpe ratio strategy: {:.2f}".format(ret_strat, sr_strat))
    else:
        print("\tOverall return of {}-{} MA crossover: {:.2f} %. Sharpe ratio strategy: {:.2f}".format(fast_ma, slow_ma, ret_strat, sr_strat))

    return

In [40]:
def backtest_ma_strat(df, fast_ma, slow_ma, ini_equity=100, commision=0.001):
    """
    backtest_ma_strat does the backtest of an MA crossover strategy. It adds the following columns to the received dataframe:
    - Strat_position: Position of the strategy on the index. 1:long, -1:short
    - Market_daily_ret: daily returns of the benchmark
    - Strat_daily_ret: daily returns of the strategy
    - Market_cum_ret: daily cummulative returns of the benchmark
    - Strat_cum_ret: daily cummulative returns of the strategy

    Args:  
        df (DataFrame): df with 'Close' returns of the benchmark
        fast_ma (int): fast moving average
        slow_ma (int): slow moving average
        ini_equity (int): initial equity to be invested in the benchmark and strategy. 100 by default

    Returns
        df['Strat_daily_ret'] (pd.Series): Daily returns of the strategy
        ret_strat (float): final value of the investment on the strategy
        sr_strat (float): Sharpe Ratio of the strategy
    """
    if (fast_ma >= slow_ma):
        df['Strat_position'] = 1
    else:
        df['Strat_position'] = get_strategy_position(df.copy(), fast_ma, slow_ma)

    # Create columns daily returns of market and strategy. In percentage. Also cummulative returns starting with an equity of 100$
    df['Strat_daily_ret'] = df['Market_daily_ret'].mul(
        df['Strat_position'].shift(1).fillna(method='bfill', limit=1))

    if commision > 0:
        df['Strat_daily_ret'].where(df['Strat_position'] == df['Strat_position'].shift(1).fillna(method='bfill', limit=1),
                                    other=df['Strat_daily_ret'].sub(commision),
                                    inplace=True)

    first_day = df.index.values[0]
    df['Market_cum_ret'] = df['Market_daily_ret'].add(1).cumprod().mul(ini_equity)
    df['Strat_cum_ret'] = df['Strat_daily_ret'].add(1).cumprod().mul(ini_equity)
    df.loc[first_day, 'Market_cum_ret'] = ini_equity + df.loc[first_day, 'Market_daily_ret'] * ini_equity
    df.loc[first_day, 'Strat_cum_ret'] = ini_equity + df.loc[first_day,'Market_daily_ret'] * df.loc[first_day, 'Strat_position'] * ini_equity

    ret_strat = df['Strat_cum_ret'][-1]
    sr_strat = df['Strat_cum_ret'].sharpe()

    return df['Strat_daily_ret'], ret_strat, sr_strat

In [20]:
def run_all_combinations(df, fast_ma, slow_ma):
    """
    Runs a backtest with all possible combinations and returns 2 matrices, one with pnl results, and one with SR
    """
    results_pnl = np.zeros((len(fast_ma),len(slow_ma)))
    results_sharpe = np.zeros((len(fast_ma),len(slow_ma)))
    
    _, pnl_SP, sharpe_SP = backtest_ma_strat(in_sample, fast_ma=0, slow_ma=0)
    
    for i, fast in enumerate(fast_ma):
        for j, slow in enumerate(slow_ma):
            if fast < slow:
                _, pnl, sharpe = backtest_ma_strat(in_sample, fast, slow)
                results_pnl[i,j] = pnl
                results_sharpe[i,j] = sharpe
            else:
                results_pnl[i,j] = pnl_SP
                results_sharpe[i,j] = sharpe_SP
                
    results_pnl = np.round(results_pnl, 3)    
    results_sharpe = np.round(results_sharpe, 3) 
    
    return results_pnl, results_sharpe

In [18]:
def get_best_combination(results_sharpe):
    n = len(results_sharpe)
    
    sharpe_neighbors = convolve2d(results_sharpe, np.ones((3,3)),'same') - results_sharpe
    n_neighbors = np.full((n,n), 8)
    n_neighbors[[0,n-1], :] = n_neighbors[:, [0,n-1]] = 5
    n_neighbors[[0,n-1], [0,n-1]] = n_neighbors[[n-1,0], [0,n-1]] = 3
    results_sharpe_neighbors = np.divide(sharpe_neighbors, n_neighbors)
    
    # (Individual SR + neighbors SR) / 2
    results_sharpe_combined = np.round(np.divide(np.add(results_sharpe, results_sharpe_neighbors), 2), 3)
    
    # Get index from best SR
#    fast_index_ind, slow_index_ind = np.unravel_index(np.argmax(results_sharpe, axis=None), results_sharpe.shape)
    fast_index, slow_index = np.unravel_index(np.argmax(results_sharpe_combined, axis=None), results_sharpe_combined.shape)

#     print("Individual: {}-{}".format(fast_ma[fast_index_ind], slow_ma[slow_index_ind]))
#     print("With NN: {}-{}".format(fast_ma[fast_index], slow_ma[slow_index]))
    
    return fast_index, slow_index, results_sharpe_combined

In [24]:
def show_plot(df, start='1975', end='2020', norm=True, benchmark=True, position=True, MAs=False):
    df_plot = df.copy()
    #start = df_plot.index.values[0]
    #end = df_plot.index.values[-1]
    
    cols = ['Strat_cum_ret']
    title = 'MA crossover strategy'
    
    if (benchmark == True):
        title = 'SP500 vs ' + title  
        if norm == False:
            cols.append('Close')
        else:
            cols.append('Market_cum_ret')
    if norm == False:
        df_plot['Strat_cum_ret'] = df_plot['Strat_cum_ret'] * df_plot.loc[df_plot.index.values[0], 'Close'] / df_plot.loc[df_plot.index.values[0], 'Strat_cum_ret']
            
    if MAs == True:
        cols.append('fast_ma')
        cols.append('slow_ma')
        
    df_plot.loc[start:end, cols].plot(title=title, grid=True)
    
    if position == True:
        df_plot.loc[start:end, 'Strat_position'].plot(secondary_y=True, legend='Strategy Position')

In [41]:
def show_oos_plot(results_df):
    # Calculate returns from benchmark
    results_df['Market_cum_ret'] = (results_df['Market_daily_ret'] + 1).cumprod() * ini_equity
    
    results_df.loc[results_df.index.values[0], 'OOS_daily_ret'] = results_df.loc[results_df.index.values[0], 'Market_daily_ret'] # Ret of first day = ret of market
    results_df['OOS_cum_ret'] = (results_df['OOS_daily_ret'] + 1).cumprod() * ini_equity

    results_df[['Market_cum_ret', 'OOS_cum_ret']].plot(title='OOS: SP500 vs Optimized MA crossover strategy', grid=True)
    results_df['Strat_position'].plot(secondary_y=True, legend='Strategy Position')

In [12]:
# Plots a heatmap with data from a matrix. 
def show_heatmap(data, plot_title, x_title, x_values, y_title, y_values):
    # Flip matrix vertically for better visualization
    data = np.flip(data, axis=0)
    fig, ax = plt.subplots(figsize=(11, 9))

    rdgn = sns.diverging_palette(h_neg=10, h_pos=130, as_cmap=True, s=80, l=50)
#    divnorm = DivergingNorm(vmin=data.min(), vcenter=0, vmax=data.max())
#    sns.heatmap(data, cmap=rdgn, norm=divnorm, annot=True, fmt ='.2', 
    fig = sns.heatmap(data, cmap=rdgn, annot=True, fmt =".2f", 
                vmin=-1.0, center=0, vmax=1.0,
                linecolor='black', cbar=True, ax=ax,
                xticklabels=x_values, yticklabels=np.flip(y_values))
    
    ax.set(title=plot_title, xlabel=x_title, ylabel=y_title)
    
    plt.yticks(rotation=0)

    plt.show()