Momentum Trading Strategy implemented in Python on a 500 Stocks universe [Beginner friendly]
By AlgoVibes (Youtube)
https://www.youtube.com/watch?v=L2nhNvIAyBI

Description:<br>
Get list of S&P500 stocks<br>
Download price data for all<br>
Concatenate into one dataframe<br>
Calculate percent change<br>
Resample to monthly period<br>

To Do:<br>
    Use start_date and end_date in process_ticker function, but just to trim returned data.<br>
    Change benchmark to use equal weighted assets used by the strategy<br>
    Allow for holding 100% cash if all other assets have negative returns.<br>
    v3: Add trailing stop: try monthly, then daily<br>
        After stop triggers, don't get back in until next month<br>
        Alternatively, get back in after things go positive<br>

In [None]:
#Import libraries
import sys
import datetime as dt
import yfinance as yf
import pandas as pd
import numpy as np
from pandas.tseries.offsets import MonthEnd
%matplotlib inline
import quantstats as qs
qs.extend_pandas() # # extend pandas functionality with metrics, etc.
sys.path.append('/home/lantro/Documents/Algo Trading/Stock Price DB')
#from get_ticker_data_from_db_v2 import process_ticker
from StockPriceData import process_ticker

In [None]:
# Notebook-wide configuration.

# Directory the price / return CSV caches are written to and read from.
data_folder = '/home/lantro/Documents/Algo Trading/Data'

# Date window handed to process_ticker (an earlier run started at '1962-01-02').
start_date, end_date = '2020-01-01', '2022-08-23'

# True: fetch fresh prices from the DB (slow) and rewrite the CSV caches.
# False: reuse the CSV files written by a previous run.
get_latest_data = True

# Symbols the strategy may hold.
tickers = [
    'AAPL', 'AMZN', 'NFLX', 'AMD', 'NVDA',
    'MSFT', 'WMT', 'IBM', 'KO',
]

# Index symbol(s); a list because get_prices iterates over its argument.
benchmark = ['^GSPC']


In [None]:
#NOTE: when we get the price df, we may want to replace all values below a certain threshold with 0 so that our algo doesn't
# trade them, since they may appear to have great returns at times even though they're uninvestable at that time, such as when they first list.
# We should replace small values from the beginning of the df until the price meets our threshold. We probably want to leave in prices when they go below our threshold later.
def get_prices(tickers, cache_file='all_sp500_prices.csv'):
    """Return a DataFrame of adjusted close prices, one column per ticker.

    When the module-level flag ``get_latest_data`` is truthy, each ticker is
    fetched with ``process_ticker(ticker, start_date, end_date)``, its
    ``'Adj_Close'`` column is kept, the columns are concatenated side by side
    and the result is written to ``{data_folder}/{cache_file}``.  Otherwise
    the frame is read back from that CSV.

    Args:
        tickers: Iterable of ticker symbols.
        cache_file: File name (inside ``data_folder``) of the CSV cache.
            Each distinct ticker set needs its own file; sharing one file
            lets a later call overwrite the cache of an earlier one.

    Returns:
        DataFrame of prices whose columns are the tickers that returned data.

    Raises:
        ValueError: If fresh data was requested but no ticker returned any.
    """
    cache_path = f"{data_folder}/{cache_file}"

    if not get_latest_data:
        # The cached CSV is expected to hold the date index in a 'Date' column.
        return pd.read_csv(cache_path, index_col='Date', parse_dates=True)

    prices, symbols = [], []
    for ticker in tickers:
        print(f'Processing {ticker}')
        try:
            df = process_ticker(ticker, start_date, end_date)['Adj_Close']
        except Exception as exc:
            # Best effort: one failing ticker must not abort the whole run,
            # but report why it failed instead of hiding the cause.
            print(f'Unable to download data for {ticker}: {exc}')
            continue
        if not df.empty:
            print(f'Appending {ticker} to prices and symbols lists')
            prices.append(df)
            symbols.append(ticker)

    if not prices:
        raise ValueError(f'No price data could be loaded for any of: {list(tickers)}')

    all_prices = pd.concat(prices, axis=1)  # One column per ticker
    all_prices.columns = symbols  # Name the columns after their tickers
    all_prices.to_csv(cache_path, index=True)  # Index is date
    return all_prices


all_prices = get_prices(tickers)
# Separate cache file so the benchmark does not overwrite the ticker prices.
benchmark_prices = get_prices(benchmark, cache_file='benchmark_prices.csv')

In [None]:
# Show the last rows of the benchmark price frame.
benchmark_prices.tail()

In [None]:
# Peek at both ends of the raw price table.
all_prices.head()
all_prices.tail(10)

# Rows without any missing value, i.e. dates on which every ticker has a price.
fully_populated = all_prices.dropna()
# The last of those dates is where the table stops being complete.
latest_date_for_all = fully_populated.index[-1]

# Keep everything up to and including that date (.loc label slices are end-inclusive).
all_prices = all_prices.loc[:latest_date_for_all]
all_prices.tail()

In [None]:
# See how much data we have for each ticker: first and last date with a positive price.
for column in all_prices.columns:  # every ticker, the last column included
    prices = all_prices[column][all_prices[column] > 0]
    if prices.empty:
        # No positive price at all: there is no first/last date to report.
        print(f'{column}:  no positive prices')
        continue
    print(f'{column}:  {prices.index[0]} - {prices.index[-1]}')

In [None]:
# Monthly returns per ticker: recomputed from prices, or reloaded from the CSV cache.
if get_latest_data:
    daily_ret = all_prices.pct_change()
    # Compound the daily returns that fall in each month: prod(1 + r) - 1.
    all_mtl_ret = daily_ret.resample('M').agg(lambda month: (month + 1).prod() - 1)
    # Zero-return column the strategy can fall back to when everything else is negative.
    all_mtl_ret['CASH'] = 0
    all_mtl_ret.to_csv(f"{data_folder}/all_mtl_ret.csv", index=True)  # Index is date
else:
    all_mtl_ret = pd.read_csv(
        f"{data_folder}/all_mtl_ret.csv",
        index_col='Date',
        parse_dates=True,
    )

# IMPORTANT: prices at or near zero produce NaN / infinite percentage changes
# (e.g. a move from 0 to anything, or the reverse), so neutralise them to 0.
non_finite = [np.inf, -np.inf, np.nan]
all_mtl_ret = all_mtl_ret.replace(non_finite, 0)

# Idea: build a price frame of only the companies priced on the earliest date:
#earliest_prices = all_prices.loc[:, all_prices.iloc[0] > 0]


In [None]:
# Inspect the monthly return table: its date index, then its last rows.
all_mtl_ret.index
all_mtl_ret.tail()

In [None]:
# The benchmark is the equal-weighted average of our own assets rather than
# an external index such as the S&P 500.
# Index-based alternatives, kept for reference:
#benchmark_returns = benchmark_prices.pct_change().dropna()
#benchmark_monthly_returns = benchmark_prices.pct_change().resample('M').agg(lambda x : (x + 1).prod() -1)

# Daily: drop every date that has a missing return, then average across tickers.
daily_asset_returns = all_prices.pct_change().dropna()
benchmark_returns = daily_asset_returns.mean(axis=1)

# Monthly: average across every column of the monthly return table.
# NOTE(review): a freshly computed all_mtl_ret carries the CASH column, so this
# average includes it while the daily average does not; confirm that is intended.
benchmark_monthly_returns = all_mtl_ret.mean(axis='columns')

In [None]:
# Look at the daily benchmark series and the first rows of the monthly one.
benchmark_returns
benchmark_monthly_returns.head()

Put it all together into a function that takes a lookback period (instead of just 12 months)

In [None]:
def mom_long(all_mtl_ret, lookback, top_n=2):
    """Back-test a long-only momentum rotation and return its monthly returns.

    At every month-end (the formation date) the trailing ``lookback``-month
    compounded return of each column is computed.  The ``top_n`` columns with
    the highest non-negative trailing return are held, equally weighted,
    during the following month.  Summary statistics from QuantStats are
    printed as a side effect.

    Args:
        all_mtl_ret: DataFrame of monthly simple returns, one column per
            asset.  The holding month is looked up with
            ``formation_date + MonthEnd(1)``, so the index is expected to
            consist of month-end timestamps.
        lookback: Number of months in the trailing ranking window.
        top_n: Number of top-ranked assets to hold each month.

    Returns:
        pandas Series (float64) of strategy returns, indexed by the month in
        which each return was earned.
    """
    # Compounded return over the trailing window, per asset.
    all_mtl_ret_lb = all_mtl_ret.rolling(lookback).agg(lambda x: (x + 1).prod() - 1)
    # See https://stackoverflow.com/questions/67168187/cannot-called-a-function-using-agg-method-in-pandas
    # The first lookback-1 rows have an incomplete window and are NaN.
    all_mtl_ret_lb = all_mtl_ret_lb.dropna()

    rets = {}
    # The final row is skipped: there is no following month to hold through.
    for formation_date, trailing in all_mtl_ret_lb.iloc[:-1].iterrows():
        # Only assets whose trailing return is >= 0 are eligible.
        candidates = trailing[trailing.ge(0)]
        win = candidates.nlargest(top_n)
        holding_month = formation_date + MonthEnd(1)
        # Key by the holding month, not the formation date, so the series
        # lines up date-for-date with other monthly return series.
        if win.empty:
            # Nothing eligible (and no zero-return column to pick): stay flat.
            rets[holding_month] = 0.0
            continue
        win_ret = all_mtl_ret.loc[holding_month, win.index]
        rets[holding_month] = win_ret.mean()  # Equal-weighted return of the picks

    ret_series = pd.Series(rets, dtype='float64')

    # Summary statistics from QuantStats.
    tot_ret = round(qs.stats.comp(ret_series),2)
    sharpe_ratio = round(qs.stats.sharpe(ret_series),2)
    max_dd = round(qs.stats.max_drawdown(ret_series),2)
    print(f'Lookback:  {lookback},  Tot Ret: {tot_ret},  Sharpe Ratio:  {sharpe_ratio},  Max DD:  {max_dd}')
    return ret_series
    


In [None]:
# Run the momentum back-test once for each lookback window.
monthly_periods = None  # e.g. 120 to keep only the first 120 rows; None keeps the whole dataset
lookback_periods = [3, 6, 12]
period_weights = [0.3333, 0.3333, 0.3333]
# Improvement: keep each lookback's return series in a dictionary so that
# different weightings of the lookback periods can be compared afterwards:
# (period 1 return * weight) + (period 2 return * weight), etc.
'''You can compute a weighted average by multiplying its relative proportion or percentage by its value in sequence and
 adding those sums together. Thus, if a portfolio is made up of 55% stocks, 40% bonds, and 5% cash, those weights would be
  multiplied by their annual performance to get a weighted average return.'''

# Alternative trim by start date instead of row count:
#all_mtl_ret_limited = all_mtl_ret.loc['2000-09-01':].copy()
all_mtl_ret_limited = all_mtl_ret.iloc[:monthly_periods]  # Optional trim to the first N periods

n_months = len(all_mtl_ret_limited)
first_month = all_mtl_ret_limited.index[0].date()
last_month = all_mtl_ret_limited.index[-1].date()
print(f'Returns for {n_months} months ({first_month} - {last_month})')

# Maps each lookback length to the return series mom_long produced for it.
lookback_returns = {}
for lookback in lookback_periods:
    lookback_returns[lookback] = mom_long(all_mtl_ret_limited, lookback)
    

In [None]:
# Empty frame on the monthly date index; the weighted return column of each
# lookback period is added to it in a later cell.
weighted_returns_index = all_mtl_ret_limited.index # Reuse the monthly date index so added columns align on it
weighted_returns = pd.DataFrame(index = weighted_returns_index)
weighted_returns.head()

In [None]:
# Blend the per-lookback return series into one strategy return series.
if len(period_weights) != len(lookback_periods):
    raise ValueError('period_weights and lookback_periods must have the same length')

for lookback, weight in zip(lookback_periods, period_weights):
    print(f'Period weight for {lookback} period lookback:  {weight}')
    # Column assignment aligns on the date index; months the series does not
    # cover become NaN in that column.
    weighted_returns[lookback] = lookback_returns[lookback] * weight

# Missing or non-finite entries must contribute nothing to the blend.
weighted_returns = weighted_returns.replace([np.inf, -np.inf, np.nan], 0)
returns = weighted_returns.sum(axis=1) # Add up weighted returns to get total return


In [None]:
# Show the last 20 rows of the per-lookback weighted returns.
weighted_returns.tail(20)

In [None]:
# Show the first 11 rows of the blended strategy returns.
returns.head(11)


In [None]:
# Pass the strategy returns and the equal-weighted monthly benchmark to QuantStats for a basic metrics report.
# Alternative: benchmark given as a ticker symbol:
#qs.reports.metrics(returns, '^GSPC', mode='basic')
qs.reports.metrics(returns, benchmark_monthly_returns, mode='basic') # Compare strategy returns to the benchmark
# Alternative: compare the blended returns to a single lookback period's series:
#qs.reports.metrics(returns, lookback_returns[3], mode='basic')

In [None]:
# QuantStats plots (mode='full') for the strategy against the same monthly benchmark.
qs.reports.plots(returns, benchmark_monthly_returns, mode='full')
# Alternative: plot against the 3-period lookback series instead:
#qs.reports.plots(returns, lookback_returns[3], mode='full')