In [None]:
import alpaca_trade_api as alpaca
from dotenv import load_dotenv
import os
import pandas as pd
import talib as ta
import matplotlib.pyplot as plt
import datetime as dt
import statistics

In [None]:
# Build the Alpaca REST client used by the data-download helpers below.
# load_dotenv() pulls the variables from .env into the environment; a missing
# variable raises KeyError right here instead of failing later on a request.
load_dotenv()
api_key, api_secret_key = (
    os.environ[variable_name]
    for variable_name in ('APCA-API-KEY-ID', 'APCA-API-SECRET-KEY')
)
api_base_url = 'https://paper-api.alpaca.markets'
api = alpaca.REST(api_key, api_secret_key, api_base_url)

In [None]:
def get_SP500():
    """Fetch the S&P 500 constituents table from Wikipedia.

    Returns the first HTML table pandas parses from the
    "List of S&P 500 companies" article as a DataFrame.  Callers in this
    notebook read its 'Symbol' column.  Requires network access.
    """
    wiki_url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
    tables = pd.read_html(wiki_url)
    constituents = tables[0]
    return constituents

def get_ticker_data(ticker, start='2016-01-01', sma_days=(10, 20, 50, 100, 200)):
    """Download daily bars for one ticker and attach simple-moving-average columns.

    For every window ``d`` in ``sma_days`` two columns are added:
    ``"{d} day SMA"`` (TA-Lib SMA of the close) and ``"close - {d} day SMA"``.

    All SMAs are computed on the full downloaded series and incomplete rows
    are dropped once afterwards, so only the warm-up of the longest window is
    lost.  (Filtering inside the loop would stack the warm-up periods of all
    windows and would assign columns onto a filtered slice.)

    Args:
        ticker: Symbol passed to the Alpaca ``get_bars`` call.
        start: First date requested from the API.
        sma_days: SMA window lengths, in bars.

    Returns:
        DataFrame with columns ``ticker``, ``date`` (``datetime.date``),
        ``close`` and, per window in order, the SMA and the close-minus-SMA
        gap.  Empty (but with those columns) when the API returns no bars.
    """
    print(f"Getting {ticker} data...")

    columns = ['ticker', 'date', 'close']
    sma_columns = []
    for d in sma_days:
        sma_columns.append(f"{d} day SMA")
        columns += [f"{d} day SMA", f"close - {d} day SMA"]

    bars = api.get_bars(ticker, alpaca.TimeFrame.Day, start, adjustment='split').df
    if bars.empty:
        # Nothing to compute on; hand back a frame that still concatenates cleanly.
        return pd.DataFrame(columns=columns)

    for d in sma_days:
        bars[f"{d} day SMA"] = ta.SMA(bars['close'], timeperiod=d)
        bars[f"close - {d} day SMA"] = bars['close'] - bars[f"{d} day SMA"]

    # Keep only rows where every SMA is defined.
    bars = bars.dropna(subset=sma_columns)

    bars = bars.reset_index().rename(columns={'timestamp': 'date'})
    bars['date'] = pd.to_datetime(bars['date']).dt.date
    bars['ticker'] = ticker
    return bars[columns]

def write_SP500_data():
    """Download bars for every S&P 500 symbol and save them to 'backtesting_data.csv'.

    Symbols come from the 'Symbol' column of get_SP500(); each one is fetched
    with get_ticker_data() and the per-ticker frames are concatenated into a
    single CSV written without the index.
    """
    symbols = get_SP500()['Symbol']
    frames = []
    for symbol in symbols:
        frames.append(get_ticker_data(symbol))
    combined = pd.concat(frames)
    combined.to_csv('backtesting_data.csv', index=False)

In [None]:
def add_business_days(start_date: dt.date, n: int) -> dt.date:
    """Shift a date by ``n`` business days (Monday-Friday), forwards or backwards.

    Only weekends are skipped; market holidays are not taken into account.

    A start date that falls on a weekend is first snapped to the adjacent
    business day on the side being left (the preceding Friday when moving
    forward, the following Monday when moving backward), so that e.g.
    Saturday + 1 is Monday and Saturday - 1 is Friday.  With ``n == 0`` the
    date is returned unchanged, even on a weekend.

    Args:
        start_date: Date to offset.
        n: Number of business days to add; negative values move backwards.

    Returns:
        The shifted date.
    """
    weekday = start_date.weekday()
    if weekday > 4 and n != 0:
        if n > 0:
            start_date -= dt.timedelta(days=weekday - 4)  # back to Friday
        else:
            start_date += dt.timedelta(days=7 - weekday)  # forward to Monday

    # Floor division keeps extra_days in 0..4 for negative n as well, so the
    # remainder is always applied as a forward step.
    weeks, extra_days = divmod(n, 5)
    result_date = start_date + dt.timedelta(weeks=weeks)

    if extra_days != 0:
        # Crossing Friday means jumping over Saturday and Sunday.
        if result_date.weekday() + extra_days > 4:
            extra_days += 2
        result_date += dt.timedelta(days=extra_days)
    return result_date

In [None]:
def SMA_slope(bars, days):
    """Average per-bar change of the ``days``-day SMA across the last five bars.

    Reads the "{days} day SMA" column of the final row of ``bars`` and of the
    row five positions before it, and returns their difference divided by 5.
    ``bars`` therefore needs at least six rows.
    """
    lookback = 5
    column = f"{days} day SMA"
    newest = bars.iloc[-1][column]
    oldest = bars.iloc[-1 - lookback][column]
    return (newest - oldest) / lookback

def get_troughs(bars, days, n_troughs, radius):
    """Select the lowest "close - {days} day SMA" rows that are spaced apart in time.

    Rows are visited in ascending order of the "close - {days} day SMA"
    column.  A row is accepted unless its date lies within ``radius``
    business days (bounds inclusive, computed with add_business_days) of a
    date accepted earlier.  The scan stops as soon as the number of accepted
    rows reaches ``n_troughs``; that check runs after each acceptance.

    Returns the accepted rows of ``bars``, sorted by index.
    """
    ranked = bars.sort_values(f"close - {days} day SMA")
    accepted_dates = []
    accepted_positions = []
    for position in range(len(ranked)):
        candidate = ranked.iloc[position]['date']
        too_close = any(
            add_business_days(accepted, -radius) <= candidate <= add_business_days(accepted, radius)
            for accepted in accepted_dates
        )
        if too_close:
            continue
        accepted_dates.append(candidate)
        accepted_positions.append(position)
        if len(accepted_dates) >= n_troughs:
            break
    return ranked.iloc[accepted_positions].sort_index()

def get_trading_days(bars, start_day, end_day, reference_ticker='META'):
    """List the trading days in a date range, taken from one ticker's bars.

    The dates on which ``reference_ticker`` has a row in ``bars`` serve as the
    trading calendar; days on which that ticker has no bar are not listed.

    Args:
        bars: DataFrame with at least 'ticker' and 'date' columns.
        start_day: First date to include (inclusive).
        end_day: Last date to include (inclusive).
        reference_ticker: Ticker whose dates define the calendar.

    Returns:
        Tuple ``(dates, date_to_index)``: the dates in the order they appear
        in ``bars``, and a dict mapping each date to its position in that list.
    """
    reference = bars[bars['ticker'] == reference_ticker]
    in_range = (reference['date'] >= start_day) & (reference['date'] <= end_day)
    dates = reference.loc[in_range, 'date'].to_list()
    date_to_index = {day: position for position, day in enumerate(dates)}
    return (dates, date_to_index)

def back_test(SMA_days):
    """Back-test a dip-buying strategy on daily bars and print summary statistics.

    For every symbol returned by get_SP500() that has rows in
    'backtesting_data.csv', the reference trading days between 2018-01-02 and
    2024-11-22 (from get_trading_days) are walked in order.  At most one
    position per symbol is held at a time.

    Entry (no position held) requires all of the following, evaluated on the
    100 bars preceding the current day:
      * the 50-day SMA slope, divided by today's close and multiplied by 365,
        is at least 0.2;
      * the troughs picked by get_troughs(n_troughs=3, radius=10) on
        "close - {SMA_days} day SMA" span at most 4% of today's close;
      * today's close-minus-SMA gap is no higher than the mean trough gap
        minus 1% of today's close;
      * today's close is within 2% of each of the two previous closes.
    The position is opened at today's close.

    Exit (position held): a close above cost basis * 1.07 is booked as a
    profit, a close below cost basis * 0.95 as a loss.  Positions still open
    after the last day are not booked.

    Side effects: fetches the constituents list over the network, reads
    'backtesting_data.csv', writes the per-day tallies to 'test.csv', and
    prints progress and summary lines.  Summary ratios that have no data
    (e.g. no closed positions) are printed as nan.

    Args:
        SMA_days: SMA window whose "{SMA_days} day SMA" and
            "close - {SMA_days} day SMA" columns drive the entry test.

    Returns:
        None.
    """
    # --- strategy parameters ---
    timeframe = 100  # number of bars in the look-back window before the current day

    longterm_trend_SMA_days = 50
    longterm_SMA_slope_yearly_threshold = 0.2

    troughs_range_threshold = 0.04
    troughs_mean_adjustment = 0.01

    stability_percent_threshold = 0.02

    reward = 0.07
    risk = 0.05

    start_day = dt.date(2018, 1, 2)
    end_day = dt.date(2024, 11, 22)

    sma_column = f"{SMA_days} day SMA"
    gap_column = f"close - {SMA_days} day SMA"

    tickers = get_SP500()['Symbol']
    bars = pd.read_csv('backtesting_data.csv')
    bars['date'] = pd.to_datetime(bars['date']).dt.date # string to datetime.date object
    (dates, date_to_index) = get_trading_days(bars, start_day, end_day)

    # Per-day tallies, aligned with `dates`.
    n_open_positions = [0] * len(dates)
    n_positions_opened = [0] * len(dates)
    n_positions_closed_profit = [0] * len(dates)
    n_positions_closed_loss = [0] * len(dates)
    closed_profit = [0] * len(dates)
    closed_loss = [0] * len(dates)

    holding_durations = []

    for ticker in tickers:
        print(f"Checking {ticker}...")
        ticker_data = bars[bars['ticker'] == ticker]
        if len(ticker_data) == 0:
            continue

        first_index = ticker_data.iloc[0].name
        first_date = ticker_data.iloc[0]['date']

        # Row label of the first bar on each date, built once per ticker so
        # the daily lookup is a dict access instead of a scan of the frame.
        row_label_by_date = {}
        for row_label, bar_date in zip(ticker_data.index, ticker_data['date']):
            row_label_by_date.setdefault(bar_date, row_label)

        holding_position = 0
        cost_basis = 0
        hold_duration = 0
        for date in dates:
            index = date_to_index[date]
            if date < first_date:
                continue
            right_index = row_label_by_date.get(date)
            if right_index is None:
                # The ticker has no bar on this reference trading day, so
                # nothing can be evaluated; an open position stays open.
                if holding_position == 1:
                    hold_duration += 1
                    n_open_positions[index] += 1
                continue
            # Row labels are used arithmetically below; this relies on the
            # ticker's rows carrying consecutive integer labels.
            left_index = right_index - timeframe
            if left_index < first_index:
                continue

            today = ticker_data.loc[right_index]
            current_price = today['close']
            if holding_position == 1:
                hold_duration += 1
                if current_price > cost_basis * (1 + reward):
                    holding_position = 0
                    profit = (current_price / cost_basis) - 1
                    n_positions_closed_profit[index] += 1
                    closed_profit[index] += profit
                    holding_durations.append(hold_duration)
                    hold_duration = 0
                elif current_price < cost_basis * (1 - risk):
                    holding_position = 0
                    loss = (cost_basis - current_price) / cost_basis
                    n_positions_closed_loss[index] += 1
                    closed_loss[index] += loss
                    holding_durations.append(hold_duration)
                    hold_duration = 0
            else:
                # Look-back window: the `timeframe` bars before today (.loc is label-inclusive).
                timeframe_bars = ticker_data.loc[left_index:right_index - 1]
                longterm_SMA_slope_percentage = SMA_slope(timeframe_bars, longterm_trend_SMA_days) / current_price
                # NOTE(review): the slope is per bar but is scaled by 365 rather
                # than the number of trading days in a year; left as is because
                # the threshold was presumably tuned with this factor.
                if longterm_SMA_slope_percentage * 365 < longterm_SMA_slope_yearly_threshold:
                    continue
                troughs = get_troughs(timeframe_bars, SMA_days, 3, 10)
                trough_gaps = troughs[gap_column]
                troughs_mean = trough_gaps.mean()
                troughs_range = trough_gaps.max() - trough_gaps.min()
                last_sma = today[sma_column]
                if troughs_range / current_price > troughs_range_threshold:
                    continue
                if current_price - last_sma > troughs_mean - current_price * troughs_mean_adjustment:
                    continue

                # check that the price remained stable for at least 2 days
                prev_price = timeframe_bars.iloc[-1]['close']
                prev2_price = timeframe_bars.iloc[-2]['close']
                if (abs(current_price - prev_price) / prev_price) > stability_percent_threshold:
                    continue
                if (abs(current_price - prev2_price) / prev2_price) > stability_percent_threshold:
                    continue

                holding_position = 1
                cost_basis = current_price
                n_positions_opened[index] += 1
            n_open_positions[index] += holding_position

    df = pd.DataFrame({
        'date':dates,
        'open positions': n_open_positions,
        'positions opened': n_positions_opened,
        'positions closed for profit': n_positions_closed_profit,
        'positions closed for loss': n_positions_closed_loss,
        'profit from closed positions': closed_profit,
        'loss from closed positions': closed_loss
    })
    df.to_csv('test.csv', index=False)

    total_profit = df['profit from closed positions'].sum()
    total_loss = df['loss from closed positions'].sum()
    n_wins = df['positions closed for profit'].sum()
    n_losses = df['positions closed for loss'].sum()
    n_closed = n_wins + n_losses
    nan = float('nan')  # printed for ratios that have no data behind them

    print(f"cumulative profit: {total_profit}")
    print(f"cumulative loss: {total_loss}")
    # Sum of per-trade fractional returns, not a compounded portfolio return.
    print(f"roi: {total_profit - total_loss}")
    print(f"winrate: {n_wins / n_closed if n_closed else nan}")
    avg1 = total_profit / n_wins if n_wins else nan
    avg2 = total_loss / n_losses if n_losses else nan
    print(f"pl ratio: {avg1 / avg2}")
    print(f"total closed positions: {n_closed}")
    print(f"average positions opened per day: {df['positions opened'].mean()}")
    print(f"average active positions: {df['open positions'].mean()}")
    # statistics.mean raises on an empty list, which would discard the whole run's output.
    avg_holding = statistics.mean(holding_durations) if holding_durations else nan
    print(f"average holding duration: {avg_holding}")

In [None]:
# Run the back-test with the 20-day SMA as the reference average;
# back_test prints its summary and writes the per-day tallies to test.csv.
back_test(20)

In [None]:
# Profit and loss booked after 1 January 2024, read back from the back-test output.
df = (
    pd.read_csv('test.csv')
    .assign(date=lambda frame: pd.to_datetime(frame['date']).dt.date)
    .loc[lambda frame: frame['date'] > dt.date(2024, 1, 1)]
)
print(df['profit from closed positions'].sum())
print(df['loss from closed positions'].sum())