TRADING STRATEGY 2: Simple Moving Average - SMA (Stocks)

STEP 1: RETRIEVE TRADING 212 TICKERS - Use Trading 212 API to import stocks tickers and other data.

In [129]:

# Extracting ticker info from Trading212
import requests
import pandas as pd
import numpy as np
import yfinance as yf
import json

# url for Trading 212 web page used to extract ticker info (stores the API endpoint string in the variable).
url = "https://demo.trading212.com/api/v0/equity/metadata/instruments"

# Python dictionary with one key-value pair (API token used to access the ticker info).
headers = {"Authorization": "37572290ZhZqNnrNqreeZBJMBIzeLtusvWwxY"}

# Uses requests.get() to send a GET request to the Trading212 API.
response = requests.get(url, headers=headers)

if response.status_code == 200:

    # Convert response to JSON
    data = response.json()  

    # Convert to Pandas DataFrame
    dfs = pd.DataFrame(data)

    # Save as CSV
    dfs.to_csv("trading212_data.csv", index=False)

    print("✅ Data saved to trading212_data.csv")

else:
    print("❌ Failed to fetch data:", response.status_code)


✅ Data saved to trading212_data.csv


In [130]:

# Sort info from df dataframe into stocks and ETFs
stocks = dfs[ dfs["type"] == "STOCK" ]

# Extract "shortName" (ticker symbols) from "stocks" and assign to Pandas series
stock_tickers_212 = stocks[ "shortName" ]

# Removing duplicate tickers
stock_tickers_212 = np.unique(stock_tickers_212).tolist()

# First 156 tickers are removed as no pricing data is available for them on Yahoo Finance
stock_tickers_212 = stock_tickers_212[156:]

# Implement the previous steps for ETFs
etfs = dfs[ dfs["type"] == "ETF" ]
etf_tickers_212 = etfs[ "shortName" ]
etf_tickers_212 = np.unique(etf_tickers_212).tolist()

# Retrieve US stock sectors from CSV file
us_stock_sectors = pd.read_csv("../us_stock_sectors.csv")

# Declare dictionary for US stock sectors
us_stock_sect_dict = {}

# Create a key for each stock sectors
# This will be needed later to assign stock data to the correct sector
for sector in us_stock_sectors.values:

    us_stock_sect_dict[sector[0]] = []

# "all_tickers_with_info" is a list containing all tickers from Trading 212 that have retrievable stock data on Yahoo Finance
all_tickers_with_info = pd.read_csv("../all_tickers_with_info.csv")

# Convert the DataFrame "all_tickers_with_info" to a list
all_tickers_with_info = all_tickers_with_info.iloc[:, 0].tolist()


In [131]:

# Importing Packages and Libraries
import yfinance as yf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
from io import StringIO

plt.style.use('Solarize_Light2')

def get_sp500_tickers():
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36"
    }

    # Fetch page with user-agent
    response = requests.get(url, headers=headers)
    response.raise_for_status()

    # Use StringIO → avoids FutureWarning
    html = StringIO(response.text)

    # Read all tables
    tables = pd.read_html(html)

    # Find the table that contains the S&P 500 tickers
    table = None
    for t in tables:
        if "Symbol" in t.columns or "Ticker symbol" in t.columns:
            table = t
            break

    if table is None:
        raise ValueError("Could not find S&P 500 table in Wikipedia page")

    # Standardize column names
    table = table.rename(columns={
        "Symbol": "Symbol",
        "Ticker symbol": "Symbol",
        "GICS Sector": "GICS Sector",
        "GICS sector": "GICS Sector"
    })

    # Clean tickers for Yahoo Finance
    table["Symbol"] = table["Symbol"].str.replace(".", "-", regex=False)

    # Only keep needed columns
    return table[["Symbol", "GICS Sector"]]


# Initialize sector dictionary
sector_dict = {}

# Fetch all S&P 500 tickers and their sectors
sp500_data = get_sp500_tickers()

# Extract tickers only, convert to a list, and sort alphabetically
sp500_tickers = sorted(list(sp500_data["Symbol"]))

ticker_data = []


sp500_tickers.append("RIOT")

In [132]:

import random

five_tickers = random.sample( sp500_tickers, 4)
five_tickers.append("RIOT")
five_tickers

['FE', 'JBHT', 'GE', 'F', 'RIOT']

STEP 2: DOWNLOAD STOCK DATA - Import libraries, specify timeframe, and download stock data.

In [133]:

# Importing libraries
import yfinance as yf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pandas_ta as ta
from datetime import datetime
plt.style.use('Solarize_Light2')

# Function which calculates the Average True Range (AVR) for each time period (needed for setting a stop loss).
def calculate_atr(avr, period=14):
    
    # Finding the Average True Range (AVR)
    avr['H-L'] = avr['High'] - avr['Low']
    avr['H-PC'] = abs(avr['High'] - avr['Close'].shift(1))
    avr['L-PC'] = abs(avr['Low'] - avr['Close'].shift(1))
    avr['TR'] = avr[['H-L', 'H-PC', 'L-PC']].max(axis=1)
    avr['ATR'] = avr['TR'].rolling(window=period).mean()

# Function which calculates the stop loss value for short positions using ATR multiplier.
def calculate_stop_loss_ATR(entry_price, atr, multiplier, position_type):

    stop_loss = entry_price - (atr * multiplier) if position_type == "Long" else entry_price + (atr * multiplier)
    
    return stop_loss

# Range of different ATR multipliers
atr_mult = [ 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75 ]

# Function which calculates the stop loss value for a specified limit (e.g. 10% above or below entry price).
def calculate_stop_loss(entry_price, limit, position_type):

    stop_loss = entry_price - entry_price * limit if position_type == "Long" else entry_price + entry_price * limit

    return stop_loss

# Range of different stop loss limits
sl_lim = [ 0.08, 0.1, 0.12, 0.14, 0.16, 0.18, 0.2, 0.22 ]

# Prevents accidental modifications to the original dataframe.
pd.options.mode.copy_on_write = True
# Show all rows when displaying pandas dataframes
pd.set_option("display.max_rows", None)

#Specifying length of moving averages
SMA1 = 50
SMA2 = 100

# Specifying the start-date and end-date
start_date = "2023-08-01"
end_date = "2025-01-01"

# This list stores the dataframes for each ticker.
a = []
df_ticker = []
avail_ticker = []
tolerance_days = 3

for ticker in range(len(five_tickers)):

    try:

        current_ticker = five_tickers[ticker]
        print(f"\nCurrent ticker: {current_ticker}")
        
        # Downloading ticker data from Yahoo Finance
        current_ticker_data = yf.download( current_ticker, start = start_date, end = end_date, auto_adjust = False )

        if current_ticker_data.empty:
            
            print(f"No price data available for {current_ticker}. Skipping...")
            continue  # Exit loop if no data
        
        earliest_date = current_ticker_data.index.min().date()
    
        if ( ( earliest_date - datetime.strptime(start_date, "%Y-%m-%d").date() ).days > tolerance_days ):
            
            print(f"Start date ({start_date}) is earlier than available data ({earliest_date}) for {current_ticker}. Skipping...")
            continue  # Exit loop if start date is too far back

        a.append(current_ticker_data)
        calculate_atr(current_ticker_data)
        
        df = current_ticker_data["Adj Close"]
        df["SMA_50"] = ta.sma( df.iloc[ :, 0 ], length = SMA1 )
        df["SMA_100"] = ta.sma( df.iloc[ :, 0 ], length = SMA2 )
        df["Long"] = ["--"]*len(df["SMA_50"])
        df["Short"] = ["--"]*len(df["SMA_50"])
        df = pd.concat([ df , current_ticker_data[ "ATR" ] ], axis=1)
        df = df[SMA2 - 2:] 
        stop_loss_col = ["--"]*( len(df["SMA_50"]) )
        df.insert(loc=4, column='SL-l', value=stop_loss_col)
        df.insert(loc=6, column='SL-s', value=stop_loss_col)

        df_ticker.append( df )
        avail_ticker.append(current_ticker)

    except Exception as e:

        print(f"Failed to download data for {current_ticker}. Reason: {e}")
        continue
    
def sort_and_combine(data, keys):
    sorted_data = [[] for _ in range(len(keys))]
    for i, key in enumerate(keys):
        for ticker_data in data:
            for df in ticker_data:
                if str(key) in df.columns.name:
                    sorted_data[i].append(df)
    combined = []
    for group in sorted_data:
        group_dfs = []
        for df in group:
            dft = df.copy()
            dft.index = [dft.index.name]
            dft.index.name = None
            group_dfs.append(dft)
        combined.append(pd.concat(group_dfs))
    return combined

def sort_dfs(dfs, columns):
    return {col: [df.sort_values(by=col, ascending=False) for df in dfs] for col in columns}



Current ticker: FE


[*********************100%***********************]  1 of 1 completed



Current ticker: JBHT


[*********************100%***********************]  1 of 1 completed



Current ticker: GE


[*********************100%***********************]  1 of 1 completed



Current ticker: F


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


Current ticker: RIOT





STEP 3: APPLY STRATEGY - Indicate where buy and sell postions should execute

In [134]:

# This take profit strategy exits positions based on either a signal reversal or a predefined indicator-based condition.
# Long trigger: Buy stock when current price > SA1 and when SA1 > SA2. Take profit when any of these conditions become false.
# Short trigger: Short stock when current price < SA1 and when SA1 < SA2. Take profit when any of these conditions become false.

df_triggers = []

for d in range(len(df_ticker)):

    df = df_ticker[d]
    
    for item in range(len(df)):
    
        # Calculating returns for long positions
        if ( (df.iloc[item, 0] > df.iloc[item, 1]) and (df.iloc[item, 1] > df.iloc[item, 2]) ):
    
            df.iloc[item, 3] = 1
            
        # Calculating returns for short positions
        if ( (df.iloc[item, 0] < df.iloc[item, 1]) and (df.iloc[item, 1] < df.iloc[item, 2]) ):
    
            df.iloc[item, 5] = 1

    df_triggers.append( df )

print("Done")
df_triggers[0]
# By isolating this section of code, you can dynamically modify the boolean expressions based on which strategy is being used.
# This code will change for different strategies whereas the code for different take profit methods should not. 

Done


Unnamed: 0_level_0,FE,SMA_50,SMA_100,Long,SL-l,Short,SL-s,ATR
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-12-19,33.954269,33.400543,,--,--,--,--,0.700715
2023-12-20,33.282913,33.429236,33.014306,--,--,--,--,0.717143
2023-12-21,33.411671,33.452501,32.997556,--,--,--,--,0.702857
2023-12-22,33.319695,33.476837,32.983216,--,--,--,--,0.713571
2023-12-26,33.420864,33.49774,32.982936,--,--,--,--,0.71
2023-12-27,33.402473,33.508089,32.986838,--,--,--,--,0.686428
2023-12-28,33.650784,33.524859,32.994859,1,--,--,--,0.687143
2023-12-29,33.715153,33.547646,33.00025,1,--,--,--,0.663571
2024-01-02,34.39571,33.586954,33.015084,1,--,--,--,0.695714
2024-01-03,34.726795,33.639796,33.032229,1,--,--,--,0.694286


STEP 4: IMPLEMENT STRATEGY

STEP 4A: TAKE PROFIT METHOD 1 - Signal reversal or indicator-based exit

In [135]:

# This take profit strategy exits positions based on either a signal reversal or a predefined indicator-based condition.
# Long trigger: Buy stock when current price > SA1 and when SA1 > SA2. Take profit when any of these conditions become false.
# Short trigger: Short stock when current price < SA1 and when SA1 < SA2. Take profit when any of these conditions become false.
# Only exit trades of any of the stated relationships for long and short positions become false.

# "ticker_data_a" stores the dataframe for each ticker which shows where buy/sell signals have been triggered. This will primarily be used for testing. 
dfa_all_ticker_data = []

def Signal_Reversal( df, direction, entry_col, exit_col, stop_loss_func, returns_dict, sl_type, mult  ):

    entry_price, exit_price = None, None
    en_val, ex_val = 1, 1
    
    for item in range(len(df)):

        prev_atr = first_atr_val if item == 0 else df.iloc[item - 1, -1]

        if df.iloc[item, entry_col] == 1 and entry_price is None:
            
            entry_price = df.iloc[item, 0]
            df.iloc[item, entry_col] = f"EN{en_val}"
            sl = stop_loss_func(entry_price, prev_atr, mult, direction) if sl_type == "ATR" else stop_loss_func(entry_price, mult, direction)
            df.iloc[item, exit_col] = sl
            en_val += 1

        if entry_price is not None:

            long_cond = ( df.iloc[item, 0] < df.iloc[item, 1] ) or ( df.iloc[item, 1] < df.iloc[item, 2] )
            short_cond = ( df.iloc[item, 0] > df.iloc[item, 1] ) or ( df.iloc[item, 1] > df.iloc[item, 2] )
            exit_cond = long_cond if direction == "Long" else short_cond
            current_price = df.iloc[ item, 0 ]
            sl_hit = current_price < sl if direction == "Long" else current_price > sl

            if exit_cond or sl_hit:
    
                exit_price = current_price
                ret = ((exit_price/entry_price) - 1) if direction == "Long" else ((exit_price/entry_price) - 1)*(-1)
                returns_a[ direction ].append( float(ret) )
        
                p_or_l = exit_price > entry_price if direction == "Long" else exit_price < entry_price
                df.iloc[item, entry_col] = f"TP{ex_val}" if p_or_l else f"CL{ex_val}"
                ex_val += 1
                entry_price, exit_price = None, None
        

returns_atr_mult_a, returns_sl_lim_a = [], []

for d in range(len(df_ticker)):
    
    returns_dfa = None
    
    for item in range(2):

        sl_type = "ATR" if item == 0 else "LIM"
        sl_function = calculate_stop_loss_ATR if item == 0 else calculate_stop_loss
        stop_loss_vals = atr_mult if item == 0 else sl_lim
        returns_dfa_temp = []

        for sl in range(len(stop_loss_vals)):

            all_returns_a = []

            # Returns is a dictionary used to store returns from both long and short positions
            returns_a = {"Long": [], "Short": []}
            
            # Making a copy of df so the original dataframe is not manipulated each time a different take profit method is tested.
            dfa = df_triggers[d].copy()
            first_atr_val = float(dfa.iloc[ 0, -1 ])
            dfa = dfa[1:]

            # Function call for short and long positions
            Signal_Reversal(dfa, "Long", 3, 4, sl_function, returns_a, sl_type, stop_loss_vals[sl] )
            Signal_Reversal(dfa, "Short", 5, 6, sl_function, returns_a, sl_type, stop_loss_vals[sl] )
            
            # Finding the number of trades for long and short positions
            long_len_a = len(returns_a["Long"])
            short_len_a = len(returns_a["Short"])
            
            # Finding mean return for long and short positions for full time frame
            long_mean_a = np.mean(returns_a["Long"]) if long_len_a > 0 else np.nan
            short_mean_a = np.mean(returns_a["Short"]) if short_len_a > 0 else np.nan
            
            # Finding mean return for a dollar-neutral portfolio (combining long and short positions)
            if long_len_a + short_len_a > 0:
                overall_return_a = ((long_mean_a * long_len_a if long_len_a > 0 else 0) + (short_mean_a * short_len_a if short_len_a > 0 else 0)) / (long_len_a + short_len_a)
            else:
                overall_return_a = np.nan
            
            dfa.columns.name = sl_type + ": " + str(stop_loss_vals[sl])
            dfa_all_ticker_data.append( dfa )
            dfa = None
            
            all_returns_a.append([
                round(long_mean_a, 6) if not np.isnan(long_mean_a) else np.nan,
                round(short_mean_a, 6) if not np.isnan(short_mean_a) else np.nan,
                round(overall_return_a, 6) if not np.isnan(overall_return_a) else np.nan
            ])
            
            returns_dfa = pd.DataFrame(
                all_returns_a,
                columns=["Long", "Short", "Overall"],
            )
            returns_dfa.columns.name = sl_type + ": " + str(stop_loss_vals[sl])
            returns_dfa.index.name = avail_ticker[d]
            returns_dfa_temp.append(returns_dfa)

        (returns_atr_mult_a if sl_type == "ATR" else returns_sl_lim_a).append(returns_dfa_temp)
            

dfa_all_ticker_data = [dfa_all_ticker_data[i:i + 2*len(sl_lim)] for i in range(0, len(dfa_all_ticker_data), 2*len(sl_lim))] 

combined_atr_a = sort_and_combine(returns_atr_mult_a, atr_mult)
combined_sl_a = sort_and_combine(returns_sl_lim_a, sl_lim)

columns = ["Long", "Short", "Overall"]
# "dfa_sorted_atr" and "dfa_sorted_sl" are Python dictionaries with 3 key-value pairs.
# Long, Short and Overall (Long and Short at the same time) are shown for a range of different ATR (average-true-range) values.
dfa_sorted_atr = sort_dfs(combined_atr_a, columns)
# Long, Short and Overall (Long and Short at the same time) are shown for a range of different SL (stop-loss) values.
dfa_sorted_sl = sort_dfs(combined_sl_a, columns)

#pd.set_option('display.max_colwidth', None)
print( "Done" )
dfa_sorted_atr["Long"]

Done


[ATR: 1.0      Long     Short   Overall
 GE        0.034358       NaN  0.034358
 JBHT      0.004758 -0.005493  0.001341
 FE        0.004645       NaN  0.004645
 F        -0.056351 -0.021962 -0.034244
 RIOT     -0.073056 -0.007240 -0.049123,
 ATR: 1.25      Long     Short   Overall
 GE         0.034358       NaN  0.034358
 JBHT       0.010336 -0.005493  0.004400
 FE         0.004645       NaN  0.004645
 F         -0.056351 -0.022656 -0.035616
 RIOT      -0.075984 -0.007240 -0.050986,
 ATR: 1.5      Long     Short   Overall
 GE        0.034358       NaN  0.034358
 JBHT      0.010336 -0.005493  0.004400
 FE        0.004645       NaN  0.004645
 F        -0.060547 -0.022656 -0.035286
 RIOT     -0.075880 -0.007240 -0.048424,
 ATR: 1.75      Long     Short   Overall
 GE         0.034358       NaN  0.034358
 JBHT       0.010336 -0.005493  0.004400
 FE         0.004645       NaN  0.004645
 F         -0.060547 -0.022656 -0.035286
 RIOT      -0.075880 -0.007240 -0.048424,
 ATR: 2.0      Long     

STEP 4B: TAKE PROFIT METHOD 2 - Time-based exit

In [136]:

# The exit time refers to the number of days after entering a position that you will close. 
# e.g. if exit-time = 3 then you will close three days after entry. 

# All returns shows the profitability of long-only, short-only and long/short positions for different time-based exits
# The chosen time-based exits are 1, 2, 5, 10, 20, 50, 100, 200
exit_times = [ 1, 2, 5, 10, 20, 50, 100, 200 ]
dfb_all_ticker_data = []


def Time_Based_Exit(df, direction, entry_col, exit_col, stop_loss_func, returns_dict, sl_type, mult, extime):
    entry_price, exit_price, entry_index = None, None, None
    n_val = 1
    holding = False  # Whether we're currently in a trade

    for item in range(len(df)):
        if not holding and df.iloc[item, entry_col] == 1:
            # Enter trade
            entry_price = df.iloc[item, 0]
            entry_index = item
            df.iloc[item, entry_col] = f"EN{n_val}"

            prev_atr = df.iloc[entry_index - 1, -1] if entry_index > 0 else df.iloc[entry_index, -1]
            sl = stop_loss_func(entry_price, prev_atr, mult, direction) if sl_type == "ATR" else stop_loss_func(entry_price, mult, direction)
            df.iloc[item, exit_col] = sl

            holding = True
            continue

        if holding:
            current_price = df.iloc[item, 0]
            sl_hit = current_price < sl if direction == "Long" else current_price > sl
            exit_day = entry_index + exit_times[extime]

            # Check stop loss or time-based exit
            if sl_hit or item == exit_day:
                exit_price = current_price
                #label = f"TP{n_val}" if ((exit_price > entry_price) if direction == "Long" else (exit_price < entry_price)) else f"SL{n_val}"
                label = f"SL{n_val}" if sl_hit else f"TP{n_val}" if ((exit_price > entry_price) if direction == "Long" else (exit_price < entry_price)) else f"CL{n_val}"

                df.iloc[item, entry_col] = label

                ret = (exit_price / entry_price - 1) if direction == "Long" else (1 - exit_price / entry_price)
                returns_dict[direction].append(ret)

                # Reset state
                entry_price, exit_price, entry_index = None, None, None
                holding = False
                n_val += 1


# Compute returns for each exit time [ 1, 2, 5, 10, 20, 50, 100, 200 ] using the ATR and LIMIT stop loss methods
# Test 1: Exit Times [ 1, 2, 5, 10, 20, 50, 100, 200 ] vs ATR [ 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0 ].
# Test 2: Exit Times [ 1, 2, 5, 10, 20, 50, 100, 200 ] vs LIMIT [ 8, 10, 12, 14, 16, 18, 20, 22 ].
# This will be in the form of an 8 x 8 matrix for each test.
# Carry out both tests for each ticker in "avail_tickers".
returns_atr_mult_b, returns_sl_lim_b = [], []

for d in range(len(df_ticker)):

    dfb_current_ticker_data = []
    returns_dfb = None

    for item in range(2):
        
        sl_type = "ATR" if item == 0 else "LIM"
        sl_function = calculate_stop_loss_ATR if item == 0 else calculate_stop_loss
        stop_loss_vals = atr_mult if item == 0 else sl_lim
        returns_dfb_temp = []

        for sl in range(len(atr_mult)):

            all_returns_b = []

            for et in range(len(exit_times)):
            
                returns_b = {"Long": [], "Short": []}
                
                dfb = df_triggers[d].copy()
                first_atr_val = float(dfb.iloc[ 0, -1 ])
                dfb = dfb[1:]
                
                Time_Based_Exit( dfb, "Long", 3, 4, sl_function, returns_b, sl_type, stop_loss_vals[sl], et )
                Time_Based_Exit( dfb, "Short", 5, 6, sl_function, returns_b, sl_type, stop_loss_vals[sl], et )
            
                # Finding the number of trades for long and short positions
                long_len_b = len(returns_b["Long"])
                short_len_b = len(returns_b["Short"])

                # Finding mean return for long and short positions for full time frame          
                long_mean_b = np.mean(returns_b["Long"]) if long_len_b > 0 else np.nan
                short_mean_b = np.mean(returns_b["Short"]) if short_len_b > 0 else np.nan
                
                # Weighted average of all returns
                if long_len_b + short_len_b > 0:
                    overall_return_b = ((long_mean_b * long_len_b if long_len_b > 0 else 0) + (short_mean_b * short_len_b if short_len_b > 0 else 0)) / (long_len_b + short_len_b)
                else:
                    overall_return_b = np.nan

                dfb.columns.name = sl_type + ": " + str(stop_loss_vals[sl]) + "  --  ET: " + str(exit_times[et])
                dfb_current_ticker_data.append( dfb )
                dfb = None
            
                all_returns_b.append([
                    round(long_mean_b, 6) if not np.isnan(long_mean_b) else np.nan,
                    round(short_mean_b, 6) if not np.isnan(short_mean_b) else np.nan,
                    round(overall_return_b, 6) if not np.isnan(overall_return_b) else np.nan
                ])
        
            dfb_all_ticker_data.append( dfb_current_ticker_data )
            
            # Convert to DataFrame for better readability
            returns_dfb = pd.DataFrame(
                all_returns_b,
                columns=["Long", "Short", "Overall"],
                index=exit_times
            )
            returns_dfb.columns.name = sl_type + ": " + str(stop_loss_vals[sl])
            returns_dfb.index.name = avail_ticker[ d ]
            returns_dfb_temp.append(returns_dfb)
        (returns_atr_mult_b if sl_type == "ATR" else returns_sl_lim_b).append(returns_dfb_temp)

# Assuming returns_atr_mult has exactly 40 elements
dfb_all_ticker_data = [dfb_all_ticker_data[i:i + 2*len(sl_lim)] for i in range(0, len(dfb_all_ticker_data), 2*len(sl_lim))] 

def rearrange( returns_b ):

    returns_b_all_exit_times = []
    
    for ext_val in exit_times:
        current_ext_time = [[] for _ in range(5)]
        
        for ticker, ticker_data in enumerate(returns_b):
            for idx, data in enumerate(ticker_data):
                for ext_idx in range(len(data)):

                    current_val = data.iloc[[ext_idx]].copy()
                    col_name = current_val.columns.name
                    current_val.columns.name = col_name + "   Ext: " + str(current_val.index[0])
                    col_name = None
                    
                    if current_val.index[0] == ext_val:
                        current_ext_time[ticker].append(current_val)
        
        returns_b_all_exit_times.append(current_ext_time)
        
    return returns_b_all_exit_times

returns_atr_mult_b_all_ext_times = rearrange( returns_atr_mult_b )
returns_sl_lim_b_all_ext_times = rearrange( returns_sl_lim_b )

# There are 8 exit times so each of these lists should contain 8 elements (each index representing a dictionary for a particular exit time). 
# "dfb_exit_times_sorted_atr" and "dfb_exit_times_sorted_sl" are Python dictionaries.
# Long, Short and Overall (Long and Short at the same time) are shown for a range of different exit times and ATR (average-true-range) values.
dfb_exit_times_sorted_atr = {}
# Long, Short and Overall (Long and Short at the same time) are shown for a range of different exit times and SL (stop-loss) values.
dfb_exit_times_sorted_sl = {}

columns = ["Long", "Short", "Overall"]

for ex in range(len(exit_times)):

    dfa_sorted_atr, dfa_sorted_sl = None, None
    
    combined_atr_b = sort_and_combine( returns_atr_mult_b_all_ext_times[ex], atr_mult )
    combined_sl_b = sort_and_combine( returns_sl_lim_b_all_ext_times[ex], sl_lim )

    dfa_sorted_atr = sort_dfs(combined_atr_b, columns)
    dfa_sorted_sl = sort_dfs(combined_sl_b, columns)

    key_label = f"ext{exit_times[ex]}"
    dfb_exit_times_sorted_atr[ key_label ] =  dfa_sorted_atr
    dfb_exit_times_sorted_sl[ key_label ] = dfa_sorted_sl 

print("Done")

# Below the long returns for each stock are shown when a stop loss value of 0.08 is applied with an exit time of 10 days.
list((dfb_exit_times_sorted_sl.items()))[3][1]["Long"][0]

Done


LIM: 0.08 Ext: 10,Long,Short,Overall
GE,0.015453,,0.015453
JBHT,0.008685,-0.014775,-0.000975
FE,0.005177,0.026464,0.006241
F,-0.03051,-0.022374,-0.026203
RIOT,-0.062697,-0.005062,-0.02602


STEP 4C: TAKE PROFIT METHOD 3 - Fixed Profit Target ( e.g. +10% or +$100 )

In [137]:

# In this section, trades are closed once the price reaches a certain percentage above (long positions) or below (short positions) the entry price. 

# This function takes in the following input parameters.
# "df" is the dataframe containing the stock info.
# "direction" is the position type (long or short).
# "entry col" and "exit col" are the numbers of the columns in df that show the trade triggers and SL info for long and short positions.
# "stop_loss_func" calculates the stop loss for long and short positions.
# "threshold mult" indicates when a trade should be closed.
# "returns_dict" stores the mean return for long and short positions.
# "mult" is the ATR multiplier.

# "dfc_ticker_data" stores the dataframe for each ticker which shows where buy/sell signals have been triggered. This will primarily be used for testing.
dfc_all_ticker_data = []

def FixedProfitTarget( df, direction, entry_col, exit_col, stop_loss_func, threshold_mult, returns_dict, sl_type ,mult ):

    entry_price = exit_price = None
    n_val = 1
    
    for i in range(len(df)):
        
        prev_atr = first_atr_val if i == 0 else df.iloc[i - 1, -1]
        signal = df.iloc[i, entry_col]

        if signal == 1 and entry_price is None:
            entry_price = df.iloc[i, 0]
            df.iloc[i, entry_col] = f"EN{n_val}"

            if ( sl_type == "ATR" ):
                sl = stop_loss_func(entry_price, prev_atr, mult, direction)
            else:
                sl = stop_loss_func(entry_price, mult, direction)

            df.iloc[i, exit_col] = sl

        if entry_price is not None:
            current_price = df.iloc[i, 0]
            #next_price = df.iloc[i + 1, 0]

            # Define thresholds
            tp = entry_price * (1 + threshold_mult) if direction == "Long" else entry_price * (1 - threshold_mult)
            sl_hit = current_price < sl if direction == "Long" else current_price > sl
            #tp_hit_next = next_price > tp if direction == "Long" else next_price < tp
            tp_hit_now = current_price > tp if direction == "Long" else current_price < tp

            # if tp_hit_next:
            #     exit_price = next_price
            #     df.iloc[i + 1, entry_col] = f"TP{n_val}"
            if tp_hit_now:
                exit_price = current_price
                df.iloc[i, entry_col] = f"TP{n_val}"
            elif sl_hit:
                exit_price = current_price
                df.iloc[i, entry_col] = f"SL{n_val}"

            if exit_price is not None:
                ret = (exit_price / entry_price - 1) if direction == "Long" else ( (exit_price / entry_price - 1)*(-1) )
                returns_dict[direction].append(float(ret))
                entry_price = exit_price = None
                n_val += 1

        
returns_atr_mult_c, returns_sl_lim_c = [], []

for d in range( len(df_ticker) ):

    returns_dfc = None

    for item in range(2):
        
        sl_type = "ATR" if item == 0 else "LIM"
        sl_function = calculate_stop_loss_ATR if item == 0 else calculate_stop_loss
        stop_loss_vals = atr_mult if item == 0 else sl_lim
        returns_dfc_temp = []

        for sl in range(len(atr_mult)):

            all_returns_c = []

            # Returns is a dictionary used to store returns from both long and short positions
            returns_c = {"Long": [], "Short": []}

            # Making a copy of df so the original dataframe is not manipulated each time a different take profit method is tested.
            dfc = df_triggers[d].copy()
            first_atr_val = float(dfc.iloc[ 0, -1 ])
            dfc = dfc[1: ]
            
            # Processing returns for long and short positions
            # It may be worthwhile to investigate how different values for the threshold multiplier and ATR multiplier could improve profitability.
            # Also investigate a different method for stop loss calculation (e.g. 10%, 15%, or 20% above or below entry price).
            FixedProfitTarget( dfc, "Long", 3, 4, sl_function, 0.1, returns_c, sl_type, stop_loss_vals[sl] )
            FixedProfitTarget( dfc, "Short", 5, 6, sl_function, 0.1, returns_c, sl_type, stop_loss_vals[sl] )
            
            # Finding the number of trades for long and short positions
            long_len_c = len(returns_c["Long"])
            short_len_c = len(returns_c["Short"])
            
            # Finding mean return for long and short positions for full time frame
            long_mean_c = np.mean(returns_c["Long"]) if long_len_c > 0 else np.nan
            short_mean_c = np.mean(returns_c["Short"]) if short_len_c > 0 else np.nan
            
            # Finding mean return for a dollar-neutral portfolio (combining long and short positions)
            if long_len_c + short_len_c > 0:
                overall_return_c = ((long_mean_c * long_len_c if long_len_c > 0 else 0) + (short_mean_c * short_len_c if short_len_c > 0 else 0)) / (long_len_c + short_len_c)
            else:
                overall_return_c = np.nan

            dfc.columns.name = sl_type + ": " + str(stop_loss_vals[sl])
            dfc_all_ticker_data.append( dfc )
            dfc = None

            all_returns_c.append([
                round(long_mean_c, 6) if not np.isnan(long_mean_c) else np.nan,
                round(short_mean_c, 6) if not np.isnan(short_mean_c) else np.nan,
                round(overall_return_c, 6) if not np.isnan(overall_return_c) else np.nan
            ])
            
            returns_dfc = pd.DataFrame(
                all_returns_c,
                columns=["Long", "Short", "Overall"],
            )
            returns_dfc.columns.name = sl_type + ": " + str(stop_loss_vals[sl])
            returns_dfc.index.name = avail_ticker[d]
            returns_dfc_temp.append(returns_dfc)
            
        (returns_atr_mult_c if sl_type == "ATR" else returns_sl_lim_c).append(returns_dfc_temp)
            
dfc_all_ticker_data = [dfc_all_ticker_data[i:i + 2*len(sl_lim)] for i in range(0, len(dfc_all_ticker_data), 2*len(sl_lim))] 

combined_atr_c = sort_and_combine(returns_atr_mult_c, atr_mult)
combined_sl_c = sort_and_combine(returns_sl_lim_c, sl_lim)

columns = ["Long", "Short", "Overall"]
# "dfc_sorted_atr" and "dfc_sorted_sl" are Python dictionaries with 3 key-value pairs.
# Long, Short and Overall (Long and Short at the same time) are shown for a range of different ATR (average-true-range) values when a Fixed Profit Target method is applied.
dfc_sorted_atr = sort_dfs(combined_atr_c, columns)
# Long, Short and Overall (Long and Short at the same time) are shown for a range of different SL (stop-loss) values when a Fixed Profit Target method is applied.
dfc_sorted_sl = sort_dfs(combined_sl_c, columns)

print( "Done" )
dfc_sorted_sl["Short"]

Done


[LIM: 0.08      Long     Short   Overall
 RIOT      -0.035188 -0.009942 -0.022565
 F         -0.026347 -0.057175 -0.043162
 JBHT       0.009352 -0.080718 -0.008662
 FE         0.039923       NaN  0.039923
 GE         0.080530       NaN  0.080530,
           Long     Short   Overall
 RIOT -0.023844  0.004303 -0.007760
 RIOT -0.027117 -0.007423 -0.016690
 RIOT -0.033183 -0.010751 -0.021307
 RIOT -0.023844 -0.011226 -0.016634
 RIOT -0.047589 -0.024898 -0.035704
 F     0.003450 -0.046336 -0.027666
 F     0.003450 -0.051982 -0.031195
 F    -0.017444 -0.053367 -0.035405
 F     0.003450 -0.061490 -0.037137
 F     0.003450 -0.112543 -0.062832
 JBHT  0.003254 -0.113172 -0.020031
 JBHT  0.030798 -0.156725 -0.016083
 JBHT  0.026924 -0.156725 -0.018988
 JBHT  0.003710 -0.185645 -0.043629
 JBHT  0.003710 -0.185645 -0.043629
 FE    0.101985       NaN  0.101985
 FE    0.101985       NaN  0.101985
 FE    0.101985       NaN  0.101985
 FE    0.101985       NaN  0.101985
 FE    0.101985       NaN  0.1019

STEP 4D: TAKE PROFIT METHOD D - A dynamic stop loss that follows the price as it moves in your favour

In [138]:

dfd_all_ticker_data = []

def TrailingStop( df, direction, entry_col, exit_col, stop_loss_func, returns_dict, sl_type ,mult ):

    entry_price = exit_price = None
    h_l_price = 0
    n_val = 1
    
    for i in range(len(df)):
        
        prev_atr = first_atr_val if i == 0 else df.iloc[i - 1, -1]
        signal = df.iloc[i, entry_col]

        if signal == 1 and entry_price is None:
            entry_price = df.iloc[i, 0]
            h_l_price = entry_price
            df.iloc[i, entry_col] = f"EN{n_val}"

            sl = stop_loss_func(entry_price, prev_atr, mult, direction) if sl_type == "ATR" else stop_loss_func(entry_price, mult, direction)
            df.iloc[i, exit_col] = sl

        h_l_price_cond = df.iloc[i, 0] > h_l_price if direction == "Long" else df.iloc[i, 0] < h_l_price
        
        if h_l_price_cond and entry_price is not None:
             
            h_l_price = df.iloc[i, 0]  
            sl = stop_loss_func(h_l_price, prev_atr, mult, direction) if sl_type == "ATR" else stop_loss_func(h_l_price, mult, direction)
            df.iloc[i, exit_col] = sl

        if entry_price is not None:
            
            current_price = df.iloc[i, 0]

            # Define thresholds
            sl_hit = current_price < sl if direction == "Long" else current_price > sl
            sl_p_or_l = current_price < entry_price if direction == "Long" else current_price > entry_price

            if (sl_hit):

                exit_price = current_price
                
                if( sl_p_or_l ):
                    df.iloc[i, entry_col] = f"SL{n_val}"
                else:
                    df.iloc[i, entry_col] = f"TP{n_val}"
                    
            if exit_price is not None:
                ret = (exit_price / entry_price - 1) if direction == "Long" else ( (exit_price / entry_price - 1)*(-1) )
                returns_dict[direction].append(float(ret))
                entry_price = exit_price = None
                n_val += 1


returns_atr_mult_d, returns_sl_lim_d = [], []

for d in range(len(df_ticker)):
    
    returns_dfd = None
    
    for item in range(2):

        sl_type = "ATR" if item == 0 else "LIM"
        sl_function = calculate_stop_loss_ATR if item == 0 else calculate_stop_loss
        stop_loss_vals = atr_mult if item == 0 else sl_lim
        returns_dfd_temp = []

        for sl in range(len(atr_mult)):

            all_returns_d = []          

            # Returns is a dictionary used to store returns from both long and short positions
            returns_d = {"Long": [], "Short": []}

            # Making a copy of df so the original dataframe is not manipulated each time a different take profit method is tested.
            dfd = df_triggers[d].copy()
            first_atr_val = float(dfd.iloc[ 0, -1 ])
            dfd = dfd[1:]
            
            TrailingStop( dfd, "Long", 3, 4, sl_function, returns_d, sl_type, stop_loss_vals[sl] )
            TrailingStop( dfd, "Short", 5, 6, sl_function, returns_d, sl_type, stop_loss_vals[sl] )
            
            # Finding the number of trades for long and short positions
            long_len_d = len(returns_d["Long"])
            short_len_d = len(returns_d["Short"])
            
            # Finding mean return for long and short positions for full time frame
            long_mean_d = np.mean(returns_d["Long"]) if long_len_d > 0 else np.nan
            short_mean_d = np.mean(returns_d["Short"]) if short_len_d > 0 else np.nan
            
            # Finding mean return for a dollar-neutral portfolio (combining long and short positions)
            if long_len_d + short_len_d > 0:
                overall_return_d = ((long_mean_d * long_len_d if long_len_d > 0 else 0) + (short_mean_d * short_len_d if short_len_d > 0 else 0)) / (long_len_d + short_len_d)
            else:
                overall_return_d = np.nan

            dfd.columns.name = sl_type + ": " + str(stop_loss_vals[sl])
            dfd_all_ticker_data.append( dfd )
            dfd = None
            
            all_returns_d.append([
                round(long_mean_d, 6) if not np.isnan(long_mean_d) else np.nan,
                round(short_mean_d, 6) if not np.isnan(short_mean_d) else np.nan,
                round(overall_return_d, 6) if not np.isnan(overall_return_d) else np.nan
            ])
 
            returns_dfd = pd.DataFrame(
                all_returns_d,
                columns=["Long", "Short", "Overall"],
            )
            returns_dfd.columns.name = sl_type + ": " + str(stop_loss_vals[sl])
            returns_dfd.index.name = avail_ticker[d]
            returns_dfd_temp.append(returns_dfd)
            
        (returns_atr_mult_d if sl_type == "ATR" else returns_sl_lim_d).append(returns_dfd_temp)

dfd_all_ticker_data = [dfd_all_ticker_data[i:i + 2*len(sl_lim)] for i in range(0, len(dfd_all_ticker_data), 2*len(sl_lim))] 

combined_atr_d = sort_and_combine(returns_atr_mult_d, atr_mult)
combined_sl_d = sort_and_combine(returns_sl_lim_d, sl_lim)

columns = ["Long", "Short", "Overall"]
# "dfd_sorted_atr" and "dfd_sorted_sl" are Python dictionaries with 3 key-value pairs.
# Long, Short and Overall (Long and Short at the same time) are shown for a range of different ATR (average-true-range) values when a Dynamic Stop Loss method is applied.
dfd_sorted_atr = sort_dfs(combined_atr_d, columns)
# Long, Short and Overall (Long and Short at the same time) are shown for a range of different SL (stop-loss) values when a Dynamic Stop Loss method is applied.
dfd_sorted_sl = sort_dfs(combined_sl_d, columns)

print( "Done" )
dfd_sorted_sl["Long"]

Done


[LIM: 0.08      Long     Short   Overall
 GE         0.366222       NaN  0.366222
 FE         0.154843       NaN  0.154843
 JBHT      -0.006990 -0.054048 -0.022676
 RIOT      -0.045831 -0.008000 -0.022803
 F         -0.047561 -0.062073 -0.056631,
           Long     Short   Overall
 GE    0.722360       NaN  0.722360
 GE    0.683447       NaN  0.683447
 GE    0.633607       NaN  0.633607
 GE    0.344743       NaN  0.344743
 FE    0.116339       NaN  0.116339
 JBHT  0.016507 -0.078217 -0.030855
 JBHT -0.010866 -0.088130 -0.049498
 JBHT -0.032080 -0.061641 -0.041933
 F    -0.041295 -0.137837 -0.113701
 F    -0.041295 -0.146425 -0.120143
 F    -0.047585 -0.099067 -0.081906
 JBHT -0.049810 -0.080718 -0.065264
 F    -0.050704 -0.073419 -0.064901
 F    -0.052739 -0.111040 -0.087720
 RIOT -0.064850  0.014522 -0.020754
 RIOT -0.067988  0.001796 -0.033096
 RIOT -0.071174  0.054337 -0.018878
 RIOT -0.096023  0.014188 -0.040918
 RIOT -0.110780  0.014188 -0.048296
 JBHT -0.126972 -0.113172 -0.1200

STEP 5: STATISTICAL ARBITRAGE: Investing how a Dollar Neutral Zero-Cost Strategy could be applied.

In [139]:
# Dollar Neutral Zero-Cost Strategy: Simulataneously go long the top decile and short the bottom decile
long_stock_prices = [32, 60, 55, 40.5, 20, 12, 8, 90, 89, 28]
short_stock_prices = [14, 15, 67, 15, 71, 43, 55, 78, 99, 102]

# It = total investment level, Il = Investment in short stocks, Is = Investment in long stocks
Il = 600
Is = 400
It = Il + Is

wl = 1/(2*len(long_stock_prices))
ws = 1/(2*len(short_stock_prices))

num_long_shares = []
num_short_shares = []

def num_shares(a, b):

    x = Il*wl
    y = Is*ws

    for price in range(len(long_stock_prices)):
        
        num_long_shares.append( round( (x/a[price]), 3) )
        num_short_shares.append( round( (y/b[price]), 3) )

    return 

num_shares(long_stock_prices, short_stock_prices)
print("Investment level for each long stock: ", Il*wl)
print("Investment level for each short stock: ", Is*ws)
print(num_long_shares)
print(num_short_shares)


Investment level for each long stock:  30.0
Investment level for each short stock:  20.0
[0.938, 0.5, 0.545, 0.741, 1.5, 2.5, 3.75, 0.333, 0.337, 1.071]
[1.429, 1.333, 0.299, 1.333, 0.282, 0.465, 0.364, 0.256, 0.202, 0.196]
