In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import yfinance as yf
import datetime as dt
import copy
import time

## Utility Functions

Since we will be using 5m data, so we'll have to tweak our CAGR and volatitility functions accordingly to take into account 5m intraday data, since they were originally made for daily data. We have assumed 252 trading days in a year and 78 5M candles in a single trading session.

In [71]:
def ATR(DF,n):
    "function to calculate True Range and Average True Range"
    df = DF.copy()
    df['H-L']=abs(df['High']-df['Low'])
    df['H-PC']=abs(df['High']-df['Close'].shift(1))
    df['L-PC']=abs(df['Low']-df['Close'].shift(1))
    df['TR']=df[['H-L','H-PC','L-PC']].max(axis=1,skipna=False)
    df['ATR'] = df['TR'].rolling(n).mean()
    #df['ATR'] = df['TR'].ewm(span=n,adjust=False,min_periods=n).mean()
    df2 = df.drop(['H-L','H-PC','L-PC'],axis=1)
    return df2['ATR']

def CAGR(DF):
    "function to calculate the Cumulative Annual Growth Rate of a trading strategy"
    df = DF.copy()
    df["cum_return"] = (1 + df["Return"]).cumprod()
    n = len(df)/(252*78)
    CAGR = (df["cum_return"].tolist()[-1])**(1/n) - 1
    return CAGR

def volatility(DF):
    "function to calculate annualized volatility of a trading strategy"
    df = DF.copy()
    vol = df["Return"].std() * np.sqrt(252*78)
    return vol

def sharpe(DF,rf):
    "function to calculate sharpe ratio ; rf is the risk free rate"
    df = DF.copy()
    sr = (CAGR(df) - rf)/volatility(df)
    return sr
    

def max_dd(DF):
    "function to calculate max drawdown"
    df = DF.copy()
    df["cum_return"] = (1 + df["Return"]).cumprod()
    df["cum_roll_max"] = df["cum_return"].cummax()
    df["drawdown"] = df["cum_roll_max"] - df["cum_return"]
    df["drawdown_pct"] = df["drawdown"]/df["cum_roll_max"]
    max_dd = df["drawdown_pct"].max()
    return max_dd

## Getting Stock data
We are mainly focusing on stocks which see a lot of day to day action, ie. mostly tech stocks which have high trading volumes daily.

In [19]:
tickers = ["MSFT", "AAPL", "META", "AMZN", "INTC", "CSCO", "VZ", "IBM", "TSLA", "AMD"]
ohlc_intraday = {}  # Dictionary to store OHLC data for each stock

# Define start and end dates for data retrieval
end_date = dt.datetime.today()
start_date = end_date - dt.timedelta(37)  # Fetch data for at least 36 days

# Loop over tickers and retrieve intraday OHLC data
for ticker in tickers:
    ohlc_intraday[ticker] = yf.download(ticker, start=start_date, end=end_date, interval='5m')
    ohlc_intraday[ticker].dropna(inplace=True, how="all")

# Adjust datetime index to UTC timezone for consistency
for ticker in tickers:
    ohlc_intraday[ticker].index = ohlc_intraday[ticker].index.tz_convert('UTC')

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed


## Backtesting
The algorithm will be monitoring all the stocks candle by candle and if our criterias are met at a particular candle, the algorithm will place a buy/sell order along with a stop loss. We will be using candles closures above/below our levels to prevent fakeouts and getting trapped, as a candle closing above a certain support/resistance level gives more confidence of the trade playing out.

#### Strategy Rules :
1. Candle closures above the 20 period rolling max/min price.
2. Volume breaching 1.5 times the rolling max volume of the previous candle.
3. Generate a signal to go buy or sell and append to signal dictionary for the ticker.
4. The trailing stop loss will be set to close price of the candle plus minus 20 period ATR. 


In [26]:
# Calculating ATR and rolling max and min price as well as rolling max volume for each stock and consolidating this info by stock in a separate dataframe
ohlc_dict = copy.deepcopy(ohlc_intraday) # Creating a copy of our data so that we don't have to fetch it again
tickers_signal = {}
tickers_ret = {}
for ticker in tickers:
    ohlc_dict[ticker]['ATR'] = ATR(ohlc_dict[ticker],20)
    ohlc_dict[ticker]["Rolling_Max_P"] = ohlc_dict[ticker]['High'].rolling(20).max()
    ohlc_dict[ticker]['Rolling_Min_P'] = ohlc_dict[ticker]['Low'].rolling(20).min()
    ohlc_dict[ticker]['Rolling_Max_Volume'] = ohlc_dict[ticker]['Volume'].rolling(20).max()
    ohlc_dict[ticker].dropna(inplace = True)
    tickers_signal[ticker] = ""
    tickers_ret[ticker] = []

In [67]:
# Identifying signals and calculating return (stop loss factored in)
for ticker in tickers:
    for i in range(len(ohlc_dict[ticker])):
        
        if tickers_signal[ticker] == "":
            tickers_ret[ticker].append(0)
            
            if ohlc_dict[ticker]['Close'][i] > ohlc_dict[ticker]["Rolling_Max_P"][i] and \
            ohlc_dict[ticker]['Volume'][i] > 1.5*ohlc_dict[ticker]['Rolling_Max_Volume'][i-1]:
                tickers_signal[ticker] = "Buy"
            
            elif ohlc_dict[ticker]['Close'][i] < ohlc_dict[ticker]['Rolling_Min_P'][i] and \
            ohlc_dict[ticker]['Volume'][i] > 1.5*ohlc_dict[ticker]['Rolling_Max_Volume'][i-1]:
                tickers_signal[ticker] = 'Sell'
                
        elif tickers_signal[ticker] == "Buy":
            # Changing the signal from buy to no signal if stop loss hits, and appending return.
            if ohlc_dict[ticker]['Close'][i] < ohlc_dict[ticker]['Close'][i-1] - ohlc_dict[ticker]['ATR'][i-1]: 
                tickers_signal[ticker] = ""
                tickers_ret[ticker].append((ohlc_dict[ticker]['Close'][i])/(ohlc_dict[ticker]['Close'][i-1])-1)
            # Changing the signal from buy to sell if conditions are met and appending return.
            elif ohlc_dict[ticker]['Close'][i] < ohlc_dict[ticker]['Rolling_Min_P'][i] and \
            ohlc_dict[ticker]['Volume'][i] > 1.5*ohlc_dict[ticker]['Rolling_Max_Volume'][i-1]:
                tickers_signal[ticker] = 'Sell'
                tickers_ret[ticker].append((ohlc_dict[ticker]['Close'][i]/ohlc_dict[ticker]['Close'][i-1])-1)
            # Keeping the signal and simply appending the return candle by candle.
            else:
                tickers_ret[ticker].append((ohlc_dict[ticker]['Close'][i]/ohlc_dict[ticker]['Close'][i-1])-1) 
        elif tickers_signal[ticker] == "Sell":
            # Changing the signal from sell to no signal if stop loss hits, and appending return.
            if ohlc_dict[ticker]['Close'][i] > ohlc_dict[ticker]['Close'][i-1] + ohlc_dict[ticker]['ATR'][i-1]: 
                tickers_signal[ticker] = ""
                tickers_ret[ticker].append((ohlc_dict[ticker]['Close'][i])/(ohlc_dict[ticker]['Close'][i-1])-1)
            # Changing the signal from sell to buy if conditions are met and appending return.
            elif ohlc_dict[ticker]['Close'][i] > ohlc_dict[ticker]["Rolling_Max_P"][i] and \
                ohlc_dict[ticker]['Volume'][i] > 1.5*ohlc_dict[ticker]['Rolling_Max_Volume'][i-1]:
                tickers_signal[ticker] = "Buy"
                tickers_ret[ticker].append((ohlc_dict[ticker]['Close'][i]/ohlc_dict[ticker]['Close'][i-1])-1)
            # Keeping the signal and simply appending the return candle by candle.
            else:
                tickers_ret[ticker].append((ohlc_dict[ticker]['Close'][i]/ohlc_dict[ticker]['Close'][i-1])-1) 

    ohlc_dict[ticker]['Return'] = np.array(tickers_ret[ticker])

  if ohlc_dict[ticker]['Close'][i] > ohlc_dict[ticker]["Rolling_Max_P"][i] and \
  elif ohlc_dict[ticker]['Close'][i] < ohlc_dict[ticker]['Rolling_Min_P'][i] and \
  if ohlc_dict[ticker]['Close'][i] > ohlc_dict[ticker]["Rolling_Max_P"][i] and \
  elif ohlc_dict[ticker]['Close'][i] < ohlc_dict[ticker]['Rolling_Min_P'][i] and \
  if ohlc_dict[ticker]['Close'][i] > ohlc_dict[ticker]["Rolling_Max_P"][i] and \
  elif ohlc_dict[ticker]['Close'][i] < ohlc_dict[ticker]['Rolling_Min_P'][i] and \
  if ohlc_dict[ticker]['Close'][i] > ohlc_dict[ticker]["Rolling_Max_P"][i] and \
  elif ohlc_dict[ticker]['Close'][i] < ohlc_dict[ticker]['Rolling_Min_P'][i] and \
  if ohlc_dict[ticker]['Close'][i] > ohlc_dict[ticker]["Rolling_Max_P"][i] and \
  elif ohlc_dict[ticker]['Close'][i] < ohlc_dict[ticker]['Rolling_Min_P'][i] and \
  if ohlc_dict[ticker]['Close'][i] > ohlc_dict[ticker]["Rolling_Max_P"][i] and \
  elif ohlc_dict[ticker]['Close'][i] < ohlc_dict[ticker]['Rolling_Min_P'][i] and \
  if ohlc_dict[t

## Calculating Overall Strategy's KPIs

In [69]:
strategy_df = pd.DataFrame()
for ticker in tickers:
    strategy_df[ticker] = ohlc_dict[ticker]["Return"]
strategy_df["Return"] = strategy_df.mean(axis=1)

In [72]:
CAGR(strategy_df)
sharpe(strategy_df,0.025)
max_dd(strategy_df)  

  sr = (CAGR(df) - rf)/volatility(df)


0.0