In [22]:
import numpy as np
import pandas as pd
import yfinance as yf
import plotly.graph_objs as go
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt
from scipy.signal import find_peaks as fp
import matplotlib.dates as mdates
import time
import vectorbt as vbt
import datetime as dt

In [389]:
def download_data(tickers, start, end, interval):
  fx = {}
  for ticker in tickers:
      fx[ticker] = yf.download(tickers=ticker, start=start, end=end, interval=interval)
      if fx[ticker].empty:
          fx.pop(ticker)
  return fx

# Data download
# Resolutions: 1/2/5/15/30/60/90m | 1h | 1/5d | 1wk | 1/3mo
# 1d --> Unlimited probably
# 1h --> 730 days
# 2/5/15/90m --> 60 days
# 1m --> 7 days
tickers = [
  'EURUSD=X',
  'GBPUSD=X', 
  'JPYUSD=X', 
  'AUDUSD=X',
  'CADUSD=X',
  'BTC-USD', 
  'SPY',]
end = dt.datetime.now() 
start = end - dt.timedelta(days = 730)
interval = '1h'
data = download_data(tickers, start, end, interval)

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed


[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed


In [101]:
tickers = [
  'ES=F',
  'YM=F',
  'NQ=F',
  'SPY']
end = dt.datetime.now() 
start = end - dt.timedelta(days = 730)
interval = '1h'
data = download_data(tickers, start, end, interval)

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed


In [3]:
# Plotting Functions
def MAobj(single_data, period):
  data = single_data.copy()
  data[f'{period}MA'] = data['Open'].rolling(period).mean()
  ma_obj = go.Scatter(x=data.index, y=data[f'{period}MA'], mode='lines', name=f'{period}MA') # line=dict(color='blue'))
  return ma_obj

def PTobj(single_data, pt, pt_string):
  data = single_data.copy()['Open']
  index = data.index
  if pt_string == 'peak':
    pt_obj = go.Scatter(x=index[pt], y=data[pt], mode='markers', name='Peaks', line=dict(color='green'))
  else:
    pt_obj = go.Scatter(x=index[pt], y=data[pt], mode='markers', name='Troughs', line=dict(color='red'))
  return pt_obj

def vbtPriceobj(vbt_data):
  data = vbt_data.copy()
  vbtPrice_obj = go.Scatter(x=data.index, y=data, mode='lines', name='Prices')
  return vbtPrice_obj

def Candleobj(single_data):
  data = single_data.copy()
  candle_obj = go.Candlestick(
        x = data.index,
        open = data['Open'],
        high = data['High'],
        low= data['Low'],
        close= data['Close'],
        name = 'market data'
        )
  return candle_obj

def Colobj(single_data, col, color):
  data = single_data.copy()
  col_obj = go.Scatter(x=data.index, y=data[col], mode='lines', name=col, line=dict(color=color))
  return col_obj

def XYobj(index,y,name, color):
  obj = go.Scatter(x=index, y=y, mode='lines',name=name, line=dict(color=color))
  return obj

def plotObjects(objList, indList):
  layout = go.Layout(
      title='Plot',
      xaxis=dict(title='Date', rangeslider=dict(visible=True)),
      yaxis=dict(title='Price'),
      xaxis_fixedrange=False,  # Enable scrolling on the x-axis
      yaxis_fixedrange=False,  # Enable scrolling on the y-axis
      dragmode='pan',  # Enable panning
  )
  rows = len(indList)
  if rows == 0: # No indicators
    fig = go.Figure(data=objList, layout=layout)
  else:
    fig = make_subplots(rows=rows+1, cols=1, shared_xaxes=True, vertical_spacing=0.3)
    for obj in objList:
      fig.add_trace(obj, row=1, col=1)
    
    for i, ind in enumerate(indList):
      for n in range(len(ind)):
        fig.add_trace(ind[n], row=i+2, col=1)
    fig.update_layout(layout)

  # fig = go.Figure(data=objList, layout=layout)
  fig.show()

In [24]:
def regressionLine(x, y):
  coef = np.polyfit(x,y,1)
  return coef

def crossover(df1, df2): # goes above
  cur = df1 > df2
  return cur

def split(closing_price, sigs, ratio = 0.5):
  split_index = int(round(len(closing_price) * ratio, 0))

  insample_sigs = {
    'long_entries': sigs['long_entries'][:split_index],
    'long_exits': sigs['long_exits'][:split_index],
    'short_entries': sigs['short_entries'][:split_index],
    'short_exits': sigs['short_exits'][:split_index],
  }
  insample = {
    'closing_prices': closing_price[:split_index],
    'sigs': insample_sigs,
  }
  
  outsample_sigs = {
    'long_entries': sigs['long_entries'][split_index:],
    'long_exits': sigs['long_exits'][split_index:],
    'short_entries': sigs['short_entries'][split_index:],
    'short_exits': sigs['short_exits'][split_index:],
  }
  outsample = {
    'closing_prices': closing_price[split_index:],
    'sigs': outsample_sigs,
  }

  return insample, outsample, split_index

def get_SPYret(spy_data, datatype):
  split_index = int(round(len(spy_data) * 0.5,0))
  if datatype == 'insample':
    spy_close = spy_data[:split_index]['Close']
  elif datatype == 'outsample':
    spy_close = spy_data[split_index:]['Close']
  else:
    spy_close = spy_data['Close']
  
  spy_ret = spy_close.vbt.to_returns()
  spy_ret = spy_ret.values
  spy_ret[0] = 0
  spy_ret = np.cumprod(1+spy_ret)[-1] - 1
  
  return spy_ret

def dynamicLookback(close, x, max, min):
  opt_lookback = 0
  opt_sd = np.inf
  for lookback in range(min,max+1):
    lookback_y = close[x-lookback:x].values # 1st: 0 to 49 inclusive
    price_sd = np.std(lookback_y)
    if price_sd < opt_sd:
      opt_sd = price_sd
      opt_lookback = lookback
  return opt_lookback

def percentInChannel(close, x_val, m, b):
  prices = close[x_val].values
  price_sd = np.std(prices)
  upper = m * x_val + b + 1 * price_sd
  lower = m * x_val + b - 1 * price_sd
  upper_bool = prices < upper
  lower_bool = prices > lower
  real = upper_bool & lower_bool
  perc = np.sum(real) / len(real)
  return perc > 0.68

def aroonUp(close, x_val): # >70 is uptrend, long only
  index = close.index[x_val]
  prices = close.loc[index].values
  period = len(x_val)
  highprice_ind = np.argmax(prices)
  bars_since_high = period - highprice_ind - 1
  aroon_ind = (period - bars_since_high) * 100 / period
  return aroon_ind 

def aroonDown(close, x_val): # >70 is downtrend, shorts only
  index = close.index[x_val]
  prices = close.loc[index].values
  period = len(x_val)
  lowprice_ind = np.argmin(prices)
  bars_since_low = period - lowprice_ind - 1
  aroon_ind = (period - bars_since_low) * 100 / period
  return aroon_ind 

def adj_mask(sig, adj_bool):
  df = pd.DataFrame({'a': sig, 'b': adj_bool})
  df['a and b'] = df[['a', 'b']].all(axis=1)
  masked_list = pd.Series(df['a and b'])
  return masked_list

def reverse_ticker(ticker):
  # EURUSD=X to EUR=X
  index = ticker.find('USD')
  new_ticker = ticker[:index] + ticker[index+3:]

  return new_ticker

def sigToPos(signal): # Long only 
  state = 0
  entry = []
  exit = []
  for i in range(len(signal)):
    if state == 0:
      if signal[i] == 1:
        entry.append(True)
        exit.append(False)
        state = 1
      else:
        entry.append(False)
        exit.append(False)
    elif state == 1:
      if signal[i] == -1:
        exit.append(True)
        entry.append(False)
        state = 0
      else:
        entry.append(False)
        exit.append(False)
  return entry, exit

In [25]:
def getSharpe(pf):
  hours_per_year = 252*24
  hourly_rf = np.power(1.0467,1/hours_per_year)-1
  retdiff = pf.returns() - hourly_rf
  mult = np.sqrt(hours_per_year)
  sharpe = mult * np.mean(retdiff) / np.std(retdiff)
  return sharpe

def getCAGR(pf,portfolio_period):
  trading_period = 252*24
  final_ret = pf.cumulative_returns().iloc[-1]
  cagr = np.power((1+final_ret),trading_period/portfolio_period)-1
  return cagr

def getMAR(pf, portfolio_period):
  cagr = getCAGR(pf, portfolio_period)
  mdd = pf.max_drawdown()
  mar = cagr/np.abs(mdd)
  return mar


In [26]:
def LinearRegressionChannel(closing_prices, params):
  close = closing_prices.copy() # For vbt data
  index = closing_prices.index

  bfl_y = []
  U_y = []
  L_y = []
  lookback_list = []

  lookback = params['lookback']
  max_lookback = params['max_lookback']
  min_lookback = params['min_lookback']
  dynlookback = params['dynlookback']
  sd_mult = params['sd_mult']
  if dynlookback:
    all_x = np.arange(max_lookback, len(close))
  else:
    all_x = np.arange(lookback, len(close))
  close_comp = pd.Series(close[lookback:], index=index[all_x])

  slopes = []
  slopes_bool = [] # True if can trade, False if no trade
  slope_adj = params['slope_adj']

  pic_bool = []
  pic_adj = params['pic_adj']

  aroon_adj = params['aroon_adj']
  aroonLong = []
  aroonShort = []
  aroonUptrend = []
  aroonDowntrend = []

  for x,y in enumerate(close):
    if (x >= lookback and not dynlookback) or (x >= max_lookback and dynlookback):
      if dynlookback:
        lookback = dynamicLookback(close, x, max_lookback, min_lookback)
      lookback_list.append(lookback)
      lookback_y = close[x-lookback:x].values # 1st: 0 to 49 inclusive
      lookback_x = np.arange(x-lookback,x)
      price_coef = regressionLine(lookback_x, lookback_y)

      if slope_adj: # Valid for slope_adj
        m = price_coef[0]
        m_thresh = params['m_thresh']
        if m > m_thresh or m < -m_thresh:
          slopes_bool.append(False)
        else: 
          slopes_bool.append(True)
      
      if pic_adj: # Valid for pic_adj
        pic_bool.append(percentInChannel(close, lookback_x, price_coef[0], price_coef[1]))
      
      if aroon_adj:
        aroon_lb = params['aroon_lb']
        if x >= aroon_lb:
          aroonlookback_x = np.arange(x-aroon_lb,x)
          uptrend = aroonUp(close, aroonlookback_x)
          downtrend = aroonDown(close, aroonlookback_x)
          aroonUptrend.append(uptrend)
          aroonDowntrend.append(downtrend)

          u_line = params['lineuthresh']
          d_line = params['linedthresh']

          if uptrend > u_line and downtrend < d_line: # Long only
            aroonLong.append(True)
            aroonShort.append(False)
          elif downtrend > u_line and uptrend < d_line: # Short only
            aroonLong.append(False)
            aroonShort.append(True)
          else: # Both
            aroonLong.append(True)
            aroonShort.append(True)
        else:
          aroonLong.append(True)
          aroonShort.append(True)

      price_sd = np.std(lookback_y)
      mean_y = price_coef[0] * x + price_coef[1]
      upper_y = mean_y + sd_mult * price_sd
      lower_y = mean_y - sd_mult * price_sd

      # Check if channel is volatile or not OR check if 90% of data is within the channel
      U_y.append(upper_y)
      L_y.append(lower_y)
      bfl_y.append(mean_y)
      slopes.append(price_coef[0])
  
  # Strategy
  upper_series = pd.Series(L_y, index=index[all_x])
  lower_series = pd.Series(U_y, index=index[all_x])
  long_entries = crossover(lower_series, close_comp) # crossover(close_comp, pd.Series(L_y, index=index[all_x])) # crossover(pd.Series(L_y, index=index[all_x]), close_comp)
  long_exits = crossover(close_comp, upper_series) # Change here
  short_exits = crossover(lower_series, close_comp) # crossover(pd.Series(L_y, index=index[all_x]), close_comp) # Change here
  short_entries = crossover(close_comp, upper_series) # crossunder(close_comp, pd.Series(U_y, index=index[all_x]))# crossover(close_comp, pd.Series(U_y, index=index[all_x]))

  if slope_adj:
    slopes_bool = pd.Series(slopes_bool, index=index[all_x])
    long_entries = adj_mask(long_entries, slopes_bool)
    short_entries = adj_mask(long_entries, slopes_bool)
  
  if pic_adj:
    pic_bool = pd.Series(pic_bool, index=index[all_x])
    long_entries = adj_mask(long_entries, pic_bool)
    short_entries = adj_mask(long_entries, pic_bool)
  
  if aroon_adj:
    aroonLong = pd.Series(aroonLong, index=index[all_x])
    aroonShort = pd.Series(aroonShort, index=index[all_x])
    long_entries = adj_mask(long_entries, aroonLong)
    short_entries = adj_mask(short_entries, aroonShort)

  sigs = {
    'long_entries': long_entries,
    'long_exits': long_exits,
    'short_entries': short_entries,
    'short_exits': short_exits
  }

  adjustment_list = [
    slopes,
    lookback_list,
    (aroonUptrend, aroonDowntrend),
  ]

  return U_y, L_y, bfl_y, all_x, sigs, adjustment_list

In [425]:
# Fixed
binance_fee = 0.001
fx_fee = 0.00002
params = {
    'ticker': 'BTC-USD',
    'split_ratio': 0.5,
    'lookback': 36,
    'sd_mult': 2.0,
    'asset_type': 'fx', # 'crypto'
    'data_type': 'all',
    'sl_stop': 0.1,
    'slope_adj': False, # Slope mask
    'm_thresh': None,
    'pic_adj': False, # PercentInChannel mask
    'aroon_adj': False, # Aroon indicator
    'lineuthresh': 70,
    'linedthresh': 30,
    'aroon_lb': 720,
    'dynlookback': False, # Dynamic lookback
    'max_lookback': 70,
    'min_lookback': 50,
}
if params['asset_type'] == 'fx':
    fee_type = fx_fee
elif params['asset_type'] == 'crypto':
    fee_type = binance_fee

In [426]:
fxdata = data[params['ticker']] 
closing_prices = fxdata['Close']
log_prices = np.log(fxdata['Close'])
U_y, L_y, bfl_y, all_x, sigs, adjustment_list = LinearRegressionChannel(closing_prices, params)
U_y = np.array(U_y)
L_y = np.array(L_y)
all_x = np.array(all_x)
index = fxdata.index[all_x]

if params['dynlookback']:
  index_req = fxdata.index[:params['max_lookback']]
else:
  index_req = fxdata.index[:params['lookback']]
lookback_cover = pd.Series(False, index=index_req)
sigs['long_entries'] = pd.concat([lookback_cover, sigs['long_entries']])
sigs['long_exits'] = pd.concat([lookback_cover, sigs['long_exits']])
sigs['short_entries'] = pd.concat([lookback_cover, sigs['short_entries']])
sigs['short_exits'] = pd.concat([lookback_cover, sigs['short_exits']])

insample, outsample, split_index = split(closing_prices, sigs, params['split_ratio'])

In [427]:
plot = 0
aroonUptrend, aroonDowntrend = adjustment_list[2]

if plot:
  obj_list = []
  ind_list = []
  obj_list.append(XYobj(index, U_y, 'Upper bound', 'green')) # Upper bound object
  obj_list.append(XYobj(index, L_y, 'Lower bound', 'red')) # Lower bound object
  # obj_list.append(XYobj(index, bfl_y, 'Best fit line')) # Lower bound object
  # obj_list.append(XYobj(index, slopes, 'Slopes')) # Slopes
  # obj_list.append(Candleobj(fxdata)) # Candle data
  obj_list.append(Colobj(fxdata, 'Close', 'black'))
  # obj_list.append(XYobj(log_prices.index, log_prices, 'Log prices', 'black')) # Log price

  lineuthresh = params['lineuthresh']
  linedthresh = params['linedthresh']
  lineU = np.ones(len(index)) * lineuthresh
  lineD = np.ones(len(index)) * linedthresh
  aroon = [
    XYobj(index, aroonUptrend, 'Aroon Up', 'green'), 
    XYobj(index, aroonDowntrend, 'Aroon Down', 'red'), 
    XYobj(index, lineU, f'{lineuthresh}-threshold line', 'orange'),
    XYobj(index, lineD, f'{linedthresh}-threshold line', 'orange'),
  ]
  ind_list.append(aroon)
  plotObjects(obj_list, ind_list)

In [428]:
# Backtest in-sample/out-sample
data_type = params['data_type']
spy_split = int(round(len(data['SPY'])*params['split_ratio'],0))

if data_type == 'insample':
    close = insample['closing_prices']
    long_entries = insample['sigs']['long_entries']
    long_exits = insample['sigs']['long_exits']
    short_entries = insample['sigs']['short_entries']
    short_exits = insample['sigs']['short_exits']
    spy_close = data['SPY'][:spy_split]['Close']
elif data_type == 'outsample':
    close = outsample['closing_prices']
    long_entries = outsample['sigs']['long_entries']
    long_exits = outsample['sigs']['long_exits']
    short_entries = outsample['sigs']['short_entries']
    short_exits = outsample['sigs']['short_exits']
    spy_close = data['SPY'][spy_split:]['Close']
elif data_type == 'all':
    close = closing_prices
    long_entries = sigs['long_entries']
    long_exits = sigs['long_exits']
    short_entries = sigs['short_entries']
    short_exits = sigs['short_exits']
    spy_close = data['SPY']['Close']

metrics = [
    'start', 
    'end', 
    'period', 
    'start_value', 
    'end_value', 
    'total_return', 
    'benchmark_return',
    'total_fees_paid',
    'max_dd',
    'max_dd_duration',
    'total_trades',
    'total_closed_trades',
    'total_open_trades',
    'win_rate',
    'best_trade',
    'worst_trade',
]
    

In [429]:
# Perform backtest
spy_pf = vbt.Portfolio.from_signals(
    close = spy_close,
    init_cash = 1_000_000,
    freq = interval,
)
pf = vbt.Portfolio.from_signals(
    close = close,
    entries = long_entries,
    exits = long_exits, 
    short_entries = short_entries, 
    short_exits = short_exits, 
    # direction = 'both',
    # size = None,
    # size_type = None,
    freq = interval,
    fees=fee_type,
    # slippage = None,
    lock_cash = True,
    sl_stop = params['sl_stop'],
    init_cash = 1_000_000,
    # accumulate = 'addonly',
)
pf.stats(metrics=metrics)
# spy_pf.stats(metrics=metrics)

Start                    2022-05-24 13:00:00+00:00
End                      2024-05-23 05:00:00+00:00
Period                           727 days 21:00:00
Start Value                              1000000.0
End Value                            660966.965612
Total Return [%]                        -33.903303
Benchmark Return [%]                    139.202625
Total Fees Paid                       12243.328253
Max Drawdown [%]                         47.976929
Max Drawdown Duration            577 days 07:00:00
Total Trades                                   360
Total Closed Trades                            359
Total Open Trades                                1
Win Rate [%]                             67.966574
Best Trade [%]                           11.744108
Worst Trade [%]                         -13.982128
dtype: object

In [430]:
cagr = getCAGR(pf, len(close))
sharpe = getSharpe(pf)
mar = getMAR(pf, len(close))
# spy_cagr = getCAGR(spy_pf, len(spy_close))
# spy_sharpe = getSharpe(spy_pf)

print(f'----------data period: {data_type}----------')
print(params['ticker'])
print('lookback:', params['lookback'])
print('sd_mult:', params['sd_mult'])
print('sl_stop:', params['sl_stop'])
print('aroon_lb:', params['aroon_lb'])
print()
print(f'Strategy return over period: {round(pf.cumulative_returns().iloc[-1] * 100, 3)}%') # (SPY return: {round(spy_pf.total_return() * 100, 3)}%)')
# print(f'Sharpe: {round(sharpe,3)}') # (SPY Sharpe: {round(spy_sharpe,3)})')
print(f'MAR: {round(mar,3)}') 
print(f'MDD: {round(pf.max_drawdown()*100,3)}%') # (SPY MDD: {round(spy_pf.max_drawdown()*100,3)}%)')
# print(f'Alpha: {round(pf.alpha(),3)}') # (SPY Alpha: {round(spy_pf.alpha()*100,3)})
# print(f'Beta: {round(pf.beta(),4)}') # (SPY Beta: {round(spy_pf.alpha()*100,4)})
print(f'CAGR: {round(cagr * 100,2)}%') # (SPY CAGR: {round(spy_cagr * 100,2)}%)')
print(pf.stats('win_rate'))

----------data period: all----------
BTC-USD
lookback: 36
sd_mult: 2.0
sl_stop: 0.1
aroon_lb: 720

Strategy return over period: -33.903%
MAR: -0.278
MDD: -47.977%
CAGR: -13.35%
Win Rate [%]    67.966574
dtype: float64


In [431]:
print(f'mar: {round(mar,3)} | cagr: {round(cagr * 100,2)}% | mdd: {round(pf.max_drawdown()*100,3)}%')
# mar: -0.3607075557 | cagr: -4.021408% | mdd: -11.148666%

mar: -0.278 | cagr: -13.35% | mdd: -47.977%


In [432]:
pf.plot().show()

In [433]:
pf.plot_drawdowns()

FigureWidget({
    'data': [{'line': {'color': '#9467bd'},
              'name': 'Value',
              'showlegend': True,
              'type': 'scatter',
              'uid': '62133d86-523c-4c71-a7bd-f6ef4a44a4f0',
              'x': array([datetime.datetime(2022, 5, 24, 13, 0, tzinfo=datetime.timezone.utc),
                          datetime.datetime(2022, 5, 24, 14, 0, tzinfo=datetime.timezone.utc),
                          datetime.datetime(2022, 5, 24, 15, 0, tzinfo=datetime.timezone.utc),
                          ..., datetime.datetime(2024, 5, 23, 3, 0, tzinfo=datetime.timezone.utc),
                          datetime.datetime(2024, 5, 23, 4, 0, tzinfo=datetime.timezone.utc),
                          datetime.datetime(2024, 5, 23, 5, 0, tzinfo=datetime.timezone.utc)],
                         dtype=object),
              'y': array([1000000.        , 1000000.        , 1000000.        , ...,
                           661396.63504547,  660119.39526549,  660966.96561234])},
 