<a href="https://colab.research.google.com/github/btomlinson237/Ukraine-War-Reserach/blob/master/Tomlinson_Stock_Analysis_with_Dual_Indicator_Back_Testing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install yfinance

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import yfinance as yf
import plotly.graph_objs as go
import pandas as pd
import numpy as np
from plotly.subplots import make_subplots

In [3]:
# The following creates a dataframe of daily market data of SPEU, going back 1 year; by default, the data will collect a year of daily opening prices,
    # daily closing prices, daily price highs, daily price lows, adjusted closing prices, and trading volume
fullData = yf.download(tickers = 'SPEU', period = '1y', interval = '1d', prepost = True)

[*********************100%***********************]  1 of 1 completed


In [4]:
fullData.tail() # Displays data

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2023-05-03,39.330002,39.560001,39.27,39.290001,39.290001,19500
2023-05-04,39.09,39.25,38.950001,39.099998,39.099998,25600
2023-05-05,39.240002,39.700001,39.200001,39.630001,39.630001,26900
2023-05-08,39.790001,39.790001,39.639999,39.689999,39.689999,23200
2023-05-09,39.150002,39.43,39.150002,39.34,39.34,104800


In [5]:
def MACDcalc(data):
  # MACD is a technical indicator that tracks the momentum of trends by demonstrating the relationship between two moving averages of the price of a stock
  # This calculation, and most calculations, of MACD track the relationship subtract a short (usually 12-period) exponential moving average of the price
      # by a long (usually 26-period) exponential moving average of the price.
  # MACD is usually compared against a "signal line" that represents a 9-period exponential moving average of the price, and the intersections between MACD
      # and the signal line indicate a shift from a bear market to a bull market, or vice-versa

  ShortEMA = data.Close.ewm(span=12, adjust = False).mean() # Calculates the 12-period EMA
  LongEMA = data.Close.ewm(span=26, adjust=False).mean() # Calculates the 26-period EMA
  MACD = ShortEMA - LongEMA # Subtracts short EMA by long EMA to arrive at MACD

  nineWeekEMA = MACD.ewm(span=9, adjust = False).mean() # Calculates the 9-period EMA to be compared against MACD
  data['MACD'] = MACD # adds MACD calculations to SPEU dataframe
  data['9-Week EMA'] = nineWeekEMA # Adds 9-week EMA to SPEU dataframe

MACDcalc(fullData)
fullData.tail()

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume,MACD,9-Week EMA
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-05-03,39.330002,39.560001,39.27,39.290001,39.290001,19500,0.437055,0.504106
2023-05-04,39.09,39.25,38.950001,39.099998,39.099998,25600,0.390029,0.481291
2023-05-05,39.240002,39.700001,39.200001,39.630001,39.630001,26900,0.39102,0.463236
2023-05-08,39.790001,39.790001,39.639999,39.689999,39.689999,23200,0.392126,0.449014
2023-05-09,39.150002,39.43,39.150002,39.34,39.34,104800,0.360604,0.431332


In [6]:
# ADX is a technical indicator that tracks the strength of a market trend. Note that it indicates only the magnitude of the trend, and not the direction;
    # thus, a higher ADX indicates only the presence of either a bearish or bullish market.

def adxCalc(data):
  high = data['High'] # Sets high equal to the daily price peaks of SPEU
  low = data['Low'] # Sets low equal to the daily price troughs of SPEU
  close = data['Close'] # Sets close equal to the daily closing prices of SPEU
  lookback = 14 # Sets the "lookback" period equal to 14 periods, which is standard

  plus_dm = high.diff() # Establishes the positive Directional Movement (+DM) of SPEU using price peaks
  minus_dm = low.diff() # Establishes the negative Directional Movement (-DM) of SPEU using price troughs
  plus_dm[plus_dm < 0] = 0 # Normalizes positive Directional Movement to only be positive
  minus_dm[minus_dm > 0] = 0 # Normalizes negative Directional Movement to only be negative

  # Creates 3 dataframes to measure three differences
  tr1 = pd.DataFrame(high - low) # Calculates difference between a day's price peak and the same day's price trough
  tr2 = pd.DataFrame(abs(high - close.shift(1))) # Calculates absolute difference between day's price high and next day's closing price
  tr3 = pd.DataFrame(abs(low - close.shift(1))) # Calculates absolute difference between day's price low and next day's closing price

  frames = [tr1, tr2, tr3] # Merges the 3 differences into one dataframe (3 columns)

  # Calculates the true range (TR) by selecting the maximum of the three differences for each index(day)
  tr = pd.concat(frames, axis = 1, join = 'inner').max(axis = 1) # consolidates the dataframe into one column representing the TR (max of differences)

  # Average true range (ATR) is calculated by taking the average true range of the lookback period (14 periods)
  atr = tr.rolling(lookback).mean()


  plus_di = 100 * (plus_dm.ewm(alpha = 1/lookback).mean() / atr) # Establishes positive Directional Index(+DI) = EMA of +DM / ATR
  minus_di = abs(100 * (minus_dm.ewm(alpha = 1/lookback).mean() / atr)) # Establishes negative Directional Index(-DI) = EMA of -DM / ATR

  dx = (abs(plus_di - minus_di) / abs(plus_di + minus_di)) * 100 # Calculates Directional Index(DI) using +DI and -DI 
  adx = ((dx.shift(1) * (lookback - 1)) + dx) / lookback # Calculates the Average Directional Index(ADX) using DI and the lookback period
  adx_smooth = adx.ewm(alpha = 1/lookback).mean() # Smooths ADX to provide more accurate values by using a custom moving average

  data['ADX'] = adx_smooth # Adds (smoothed) ADX calculations to SPEU dataframe

adxCalc(fullData)
fullData.tail()

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume,MACD,9-Week EMA,ADX
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2023-05-03,39.330002,39.560001,39.27,39.290001,39.290001,19500,0.437055,0.504106,24.382819
2023-05-04,39.09,39.25,38.950001,39.099998,39.099998,25600,0.390029,0.481291,23.524426
2023-05-05,39.240002,39.700001,39.200001,39.630001,39.630001,26900,0.39102,0.463236,22.124881
2023-05-08,39.790001,39.790001,39.639999,39.689999,39.689999,23200,0.392126,0.449014,21.592415
2023-05-09,39.150002,39.43,39.150002,39.34,39.34,104800,0.360604,0.431332,21.162817


In [7]:
# Visualizes data from two indicators with respect to SPEU

# Creates 2 separate plots (since MACD values don't visually scale well with ADX values) - a MACD plot and an ADX plot with a shared x-axis(time)
SPEUfig = make_subplots(rows = 2, cols =1, shared_xaxes = True, subplot_titles = ("SPEU Live Moving Average Convergence/Divergence", "SPEU Live Average Directional Movement Index"))

# Establishes MACD and EMASignal series for upper plot
SPEUfig.append_trace(go.Scatter(x=fullData.index, y = fullData['MACD'], line=dict(color='blue', width = .8), name = 'MACD'), row =1, col = 1)
SPEUfig.append_trace(go.Scatter(x=fullData.index, y = fullData['9-Week EMA'], line = dict(color='red', width = .8), name = '9-Week EMA'), row = 1, col = 1)

# Establishes ADX series for lower plot
SPEUfig.append_trace(go.Scatter(x=fullData.index, y = fullData['ADX'], line = dict(color='green', width = .8), name = "Average Directional Movement Index"), row = 2, col = 1)

# Allows viewer to dynamically adjust the time interval for the indicator of interest; applies to both plots simultaneously
SPEUfig.update_xaxes(
    rangeslider_visible=False,
    rangeselector=dict(
        buttons=list([
            dict(count=14, label="1 Week", step="day", stepmode="backward"),
            dict(count=40, label="1 Month", step="day", stepmode="backward"),
            dict(count=1, label="HTD", step="hour", stepmode="todate"),
            dict(count=1, label="1 Day", step="day", stepmode="backward"),
            dict(step="all")
        ])
    )
)

# Sets axis titles for each plot
SPEUfig['layout']['xaxis2']['title']= 'Date'
SPEUfig['layout']['yaxis']['title'] = 'MACD Value'
SPEUfig['layout']['yaxis2']['title'] = 'ADX Value'

# Sets title for the visual containing both plots
SPEUfig.update_layout(title_text='SPDR Portfolio Europe ETF: Live MACD and ADX Performance', xaxis_rangeslider_visible = False, height = 600)

# Displays the visual containing both plots
SPEUfig.show()

In [8]:
testData = yf.download('SPEU', start = "2022-03-01", end = "2023-3-01")
MACDcalc(testData)
adxCalc(testData)
testData.tail()

[*********************100%***********************]  1 of 1 completed


Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume,MACD,9-Week EMA,ADX
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2023-02-22,37.639999,37.740002,37.470001,37.490002,37.314678,20600,0.267777,0.401472,30.165377
2023-02-23,37.779999,37.779999,37.439999,37.740002,37.563507,23800,0.236601,0.368498,28.982623
2023-02-24,37.16,37.189999,36.98,37.189999,37.016075,21800,0.165605,0.327919,27.828484
2023-02-27,37.59,37.759998,37.59,37.68,37.503788,16300,0.147182,0.291772,26.074908
2023-02-28,37.630001,37.650002,37.330002,37.349998,37.175327,24900,0.104746,0.254367,25.157325


In [9]:
def setMACDSignal():
  hasEnteredMarket = False;
  for i in range(0,len(testData)):
    if testData['MACD'].iloc[i] > testData['9-Week EMA'].iloc[i] and testData['MACD'].iloc[i-1] < testData['9-Week EMA'].iloc[i-1]:
        macdBS.append(1)
        hasEnteredMarket = True;
    elif (testData['MACD'].iloc[i] < testData['9-Week EMA'].iloc[i] and testData['MACD'].iloc[i-1] > testData['9-Week EMA'].iloc[i-1]) and (hasEnteredMarket == True):
        macdBS.append(-1)
    else:
        macdBS.append(0)


macdBS= []
setMACDSignal()
testData['MACD B/S'] = macdBS

In [10]:
def setADXSignal():
  for i in range(0,len(testData)):
    if testData['ADX'].iloc[i] > 20 and testData['ADX'].iloc[i-1] < 20:
        adxBS.append(1)
    else:
        adxBS.append(0)

adxBS = []
setADXSignal()
testData['ADX B/S'] = adxBS

In [11]:
def setCombinedSignal(stopLossConstant, marketData):
  combinedBS = []
  currentlyHolding = False
  stopLoss = 0
  for i in range(0,len(testData)):
    if (testData['ADX B/S'].iloc[i-10:i].sum() == 1 and testData['MACD B/S'].iloc[i-10:i].sum() == 1) and (currentlyHolding == False):
      combinedBS.append(1)
      currentlyHolding = True
      stopLoss = stopLossConstant * testData['Close'].iloc[i]
    elif ((testData['ADX B/S'].iloc[i-10:i].sum() == 1) and (testData['MACD B/S'].iloc[i-10:i].sum() == -1)) and (currentlyHolding == True):
      combinedBS.append(-1)
      currentlyHolding = False
      stopLoss = 0
    elif ((testData['Close'].iloc[i] < stopLoss) and (currentlyHolding == True)):
      combinedBS.append(-1)
      currentlyHolding = False
      stopLoss = 0
    elif (i == len(testData) - 1) and (currentlyHolding == True):
      combinedBS.append(-1)
      currentlyHolding = False
      stopLoss = 0
    else:
        combinedBS.append(0)
  
  trialData = marketData
  trialData['Combined B/S'] = combinedBS
  return trialData

In [12]:
def calcReturns(marketData):
  entryPrices = []
  entryDates = []
  exitPrices = []
  exitDates = []
  inMarket = False

  for i in range(marketData.shape[0]):
    if (inMarket == False) and (marketData.iloc[i]["Combined B/S"] == 1):
      inMarket = True
      entryPrices.append((marketData.iloc[i]["Close"]))
      entryDates.append(marketData.iloc[i].name)
    if (inMarket == True) and (marketData.iloc[i]["Combined B/S"] == -1):
      inMarket = False
      exitPrices.append(marketData.iloc[i]["Close"])
      exitDates.append(marketData.iloc[i].name)
  
  if (len(entryPrices) > len(exitPrices)):
    exitPrices.append(marketData.iloc[-1]["Close"])
    exitDates.append(marketData.iloc[-1].name)

  trades = pd.DataFrame({'Entry Prices':entryPrices, 'Exit Prices':exitPrices})
  profits = trades['Profits'] = trades['Exit Prices'] - trades['Entry Prices']
  relativeProfits = trades['Relative Profits'] = trades['Profits'] / trades['Entry Prices']
  averageProfit = relativeProfits.mean()
  averageProfit = round(averageProfit,3)
  
  return averageProfit

In [13]:
def stopLossOptimization(marketData):
  initialStopLoss = 0.5
  loopBound = range(51)
  stopLosses = []
  stopLossReturns = []
  for i in loopBound:
    stopLoss = initialStopLoss + (0.01 * i)
    stopLoss = round(stopLoss,2)
    stopLosses.append(stopLoss)
    
    stopLossTest = setCombinedSignal(stopLoss,marketData)
    stopLossReturn = calcReturns(stopLossTest)
    stopLossReturns.append(stopLossReturn)

  stopLossResults = pd.DataFrame({'Potential Stop Losses':stopLosses, 'Average Return':stopLossReturns})

  maxReturn = stopLossResults['Average Return'].idxmax()
  maxReturnDuplicates = stopLossResults.loc[stopLossResults['Average Return'] == stopLossResults['Average Return'].max()]
  safestStopLoss = maxReturnDuplicates['Potential Stop Losses'].idxmax()
  optimizedStopLoss = stopLossResults.loc[safestStopLoss, 'Potential Stop Losses']
  
  return optimizedStopLoss

In [14]:
finalStopLoss = stopLossOptimization(testData)
testData = setCombinedSignal(finalStopLoss,testData)
testProfit = calcReturns(testData)

print("Average Return: ")
print("%.0000f%%" % (100 * testProfit))

Average Return: 
7%


In [97]:
testMarketData = go.Candlestick(x=testData.index, open = testData['Open'], high = testData['High'], low=testData['Low'], close = testData['Close'], name = 'Market Data')
testMACD = go.Scatter(x = testData.index, y = testData['MACD'], line=dict(color='blue', width = .8), name = 'MACD')
test9wEMA = go.Scatter(x = testData.index, y = testData['9-Week EMA'], line = dict(color='red', width = .8), name = '9-Week EMA')
testADX = go.Scatter(x = testData.index, y = testData['ADX'], line = dict(color = 'green', width =.8), name = 'ADX')


testFig = go.Figure()

testFig = make_subplots(rows = 3, cols =1, shared_xaxes = True, subplot_titles = ("Price Data", "SPEU Moving Average Convergence/Divergence",
                                                                                  "SPEU Average Directional Movement Index"))

testFig.add_trace(testMarketData, row = 1, col =1)
testFig.add_trace(testMACD, row =2, col =1)
testFig.add_trace(test9wEMA, row = 2, col =1)
testFig.add_trace(testADX, row = 3, col =1)

testFig.add_shape(type="line",
                  x0=testData.index[0],
                  y0=20,
                  x1=testData.index[-1],
                  y1=20,
                  line=dict(color="black", width=0.75, dash="dash"),
                  row=3, col=1)


testFig.update_xaxes(rangeslider_visible=False,
                 rangeselector=dict(buttons=list([
                     dict(count=15, label="15m", step="minute", stepmode="backward"),
                     dict(count=45, label="45m", step="minute", stepmode="backward"),
                     dict(count=1, label="HTD", step="hour", stepmode="todate"),
                     dict(count=3, label="3h", step="hour", stepmode="backward"),
                     dict(step="all")
                 ])))


testFig.add_trace(go.Scatter(x=testData.index[testData["Combined B/S"]==1], y = testData["Close"][testData["Combined B/S"]==1], 
                             mode = "markers", marker_color = "darkgreen", marker_symbol = "arrow-up", marker_size = 15,
                             name = "Buy Signal"), row = 1, col =1)

testFig.add_trace(go.Scatter(x=testData.index[testData["Combined B/S"]==-1], y = testData["Close"][testData["Combined B/S"]==-1], 
                             mode = "markers", marker_color = "darkred", marker_symbol = "arrow-down", marker_size = 15,
                             name = "Sell Signal"), row = 1, col = 1)

for signal in testData.index[testData["Combined B/S"] == 1]:
    testFig.add_shape(type="line", x0=signal, y0=testData["MACD"].min(), x1=signal, y1=testData["MACD"].max(),
                      line=dict(color="green", width=1, dash="dash"), row=2, col=1)

for signal in testData.index[testData["Combined B/S"] == -1]:
    testFig.add_shape(type="line", x0=signal, y0=testData["MACD"].min(), x1=signal, y1=testData["MACD"].max(),
                      line=dict(color="red", width=1, dash="dash"), row=2, col=1)
    
for signal in testData.index[testData["Combined B/S"] == 1]:
    testFig.add_shape(type="line", x0=signal, y0=testData["ADX"].min(), x1=signal, y1=testData["ADX"].max(),
                      line=dict(color="green", width=1, dash="dash"), row=3, col=1)

for signal in testData.index[testData["Combined B/S"] == -1]:
    testFig.add_shape(type="line", x0=signal, y0=testData["ADX"].min(), x1=signal, y1=testData["ADX"].max(),
                      line=dict(color="red", width=1, dash="dash"), row=3, col=1)

testFig.update_layout(title = 'SPEU Historical Stock Data',legend=dict(orientation="h", yanchor="top", y=1.15, xanchor="center", x = 0.5))

testFig.add_annotation(text="Market data for SPEU collected from: 3/1/2022 to 3/1/2023<br>Average Return for this MACD and ADX-based strategy is {:.0%}".format(testProfit),
                        xref="paper", yref="paper",
                        x=0.5, y=-0.2,
                        showarrow=False,
                        font=dict(size=14, color="black"))


testFig.show()

