In [251]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import pandas as pd

def load_data(ticker):

    filename = '../data/' + ticker + '_1min_firstratedata.csv'
    df = pd.read_csv(filename)
    df['ticker'] = ticker

    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["date"] = df["timestamp"].dt.date
    df["time"] = df["timestamp"].dt.time
    return df

ticker = 'SPY'
df = load_data(ticker)

In [252]:
import datetime

def daily_summary(ticker):

    # Data Preprocessing
    filename = "../data/" + ticker + "_1min_firstratedata.csv"
    df = pd.read_csv(filename)
    df["ticker"] = ticker

    # time handling
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["Date"] = df["timestamp"].dt.date
    df["time"] = df["timestamp"].dt.time
    df["timestamp"] = pd.to_datetime(df["timestamp"], format="%Y-%m-%d %H:%M:%S")
    
    # Crop to market hours
    df = df[(df["time"] >= datetime.time(9, 30)) & (df["time"] <= datetime.time(16, 0))]

    # Get the earliest and latest times for each day
    # Find the indices of the earliest and latest rows for each day
    earliest_indices = df.groupby("Date")["time"].idxmin()
    latest_indices = df.groupby("Date")["time"].idxmax()

    # Concatenate the indices and drop duplicates
    indices_to_keep = pd.concat([earliest_indices, latest_indices]).drop_duplicates()

    # Create a new DataFrame with only the selected rows
    df = df.loc[indices_to_keep]
    df = df.sort_values(by=["Date"])
    df.drop(["timestamp", "high", "low"], axis=1, inplace=True)

    df = df.groupby(["Date", "time"]).filter(lambda group: len(group) != 2)
    
    open_rows = df[df["time"] == datetime.time(9, 30)][["Date", "open", "time"]]
    close_rows = df[df["time"] == datetime.time(16, 0)][["Date", "close", "time"]]

    merged_df = pd.merge(open_rows, close_rows, on=["Date"])
    merged_df["dif"] = merged_df["close"] - merged_df["open"]
    merged_df["dif"] = merged_df["dif"].round(2)

    merged_df["percent_change"] = (merged_df["dif"] / merged_df["open"]).round(4)
    merged_df["percent_change"] = merged_df["percent_change"] * 100

    merged_df["up"] = merged_df["dif"] > 0
    merged_df["up"] = merged_df["up"].astype(int)

    merged_df.to_csv("../temp/merged.csv", index=False)

    return merged_df

In [253]:
# Add the RSI indicator with period n to a df

def RSI(df, n):
    "function to calculate RSI"
    delta = df["close"].diff()
    delta = delta[1:]
    up, down = delta.copy(), delta.copy()
    up[up < 0] = 0
    down[down > 0] = 0
    df["up"] = up.round(4)
    df["down"] = down.round(4)
    AVG_Gain = df["up"].rolling(window=n).mean()
    AVG_Loss = abs(df["down"].rolling(window=n).mean())
    RS = AVG_Gain / AVG_Loss
    RSI = 100.0 - (100.0 / (1.0 + RS))
    df["RSI_14"] = RSI.round(4)
    df = df.drop(columns=["up", "down"])

    return df

df = RSI(df, 14)

In [254]:
import datetime
import pandas as pd

def simulate_trade_RSI(date, entry, exit_high, exit_low, max_trades, df):
    df_day = df[(df["date"] == date) & (df["time"] >= datetime.time(9, 30, 0)) & (df["time"] <= datetime.time(16, 0, 0))]
    trade_log = pd.DataFrame(columns=["Date", "Direction", "Entry", "Exit", "TimeEnter", "TimeExit", "Gain%", "Gain$", "ExitCondition"])
    trades, in_trade = 0, False

    for i in range(1, len(df_day)):
        entry_rsi = (df_day["RSI_14"].iloc[i - 1] < entry and df_day["RSI_14"].iloc[i] >= entry)

        exit_profit = (df_day["RSI_14"].iloc[i - 1] >= exit_high and df_day["RSI_14"].iloc[i] < exit_high)
        exit_stoploss = (df_day["RSI_14"].iloc[i - 1] <= exit_low and df_day["RSI_14"].iloc[i] > exit_low)
        exit_endofday = (df_day["time"].iloc[i] == datetime.time(16, 0, 0))
        exit_condition = "MaxProfit" if exit_profit else "StopLoss" if exit_stoploss else "EndOfDay" if exit_endofday else None

        if not in_trade and entry_rsi:
            entry_price, in_trade, time_enter = df_day["close"].iloc[i], True, df_day["time"].iloc[i - 1]

        elif in_trade and (exit_profit or exit_stoploss or exit_endofday):
            exit_price = df_day["close"].iloc[i]
            trade_gain_percent = round(((exit_price - entry_price) / entry_price) * 100, 2)
            trade_gain_net = round(exit_price - entry_price, 2)

            trade_log.loc[len(trade_log)] = pd.Series({
                "Date": date,
                "Direction": "Long",
                "Entry": entry_price,
                "Exit": exit_price,
                "TimeEnter": time_enter,
                "TimeExit": df_day["time"].iloc[i],
                "Gain%": trade_gain_percent,
                "Gain$": trade_gain_net,
                "ExitCondition": exit_condition
            })

            trades, entry_price, in_trade = trades + 1, 0, False

            # Check if the maximum number of trades is reached
            if trades >= max_trades:
                break

    return trade_log


In [255]:
# Combine the simulated trades for a range of dates into a single dataframe

def simulate_date_range(start_date, end_date, entry, exit_high, exit_low, max_trades, df):
    day_datas = []

    for n in range((end_date - start_date).days + 1):
        date = start_date + datetime.timedelta(n)
        day_data = simulate_trade_RSI(date, entry, exit_high, exit_low, max_trades, df)
        if day_data is not None:
            day_datas.append(day_data)

    day_datas = pd.concat(day_datas, ignore_index=True)

    return day_datas

In [256]:
trades = simulate_date_range(datetime.date(2022, 9, 30), datetime.date(2023, 9, 30), 50, 70, 30, 1, df)

In [257]:
# chart the results of the simulated trades

import plotly.express as px
import plotly.graph_objects as go

# line chart of running gain%
def chart_gain_percent(trades):
    trades["RunningGain%"] = trades["Gain%"].cumsum()
    fig = px.line(trades, y="RunningGain%", title="Running Gain%")
    fig.show()

chart_gain_percent(trades)

In [258]:
# chart a scatter of the sum of the gain% for each day against 

import plotly.express as px
import plotly.graph_objects as go

underlying = daily_summary(ticker)
print(underlying.columns)
print(trades.columns)

combined = pd.merge(trades, underlying, on=["Date"])

#print(combined.columns)
#combined.head()

fig = px.scatter(combined, x="percent_change", y="Gain%", title="Gain% vs Underlying Gain%")
fig.show()

Index(['Date', 'open', 'time_x', 'close', 'time_y', 'dif', 'percent_change',
       'up'],
      dtype='object')
Index(['Date', 'Direction', 'Entry', 'Exit', 'TimeEnter', 'TimeExit', 'Gain%',
       'Gain$', 'ExitCondition', 'RunningGain%'],
      dtype='object')


Unnamed: 0,Date,Direction,Entry,Exit,TimeEnter,TimeExit,Gain%,Gain$,ExitCondition,RunningGain%,open,time_x,close,time_y,dif,percent_change,up
0,2022-09-30,Long,362.6,364.1,09:32:00,10:38:00,0.41,1.5,MaxProfit,0.41,361.8,09:30:00,357.87,16:00:00,-3.93,-1.09,0
1,2022-10-03,Long,361.04,360.9199,09:44:00,09:55:00,-0.03,-0.12,MaxProfit,0.38,361.08,09:30:00,366.77,16:00:00,5.69,1.58,1
2,2022-10-04,Long,376.87,376.389,10:15:00,10:27:00,-0.13,-0.48,StopLoss,0.25,372.4,09:30:00,377.74,16:00:00,5.34,1.43,1
3,2022-10-05,Long,374.44,372.29,09:50:00,10:14:00,-0.57,-2.15,StopLoss,-0.32,373.39,09:30:00,377.54,16:00:00,4.15,1.11,1
4,2022-10-06,Long,376.41,376.86,09:30:00,09:34:00,0.12,0.45,MaxProfit,-0.2,375.62,09:30:00,373.21,16:00:00,-2.41,-0.64,0


In [259]:
trades
# histogram of the trade gain%


fig = px.histogram(trades, x="Gain%", nbins=100, title="Histogram of Trade Gain%")
fig.update_layout(
    xaxis_title="Gain%",
    yaxis_title="Count",
    bargap=0.1
)
fig.show()

In [260]:
print("Total Trades:", len(trades))
print("Total Gain%:", round(trades["Gain%"].sum(), 2))
print("Average Gain%:", round(trades["Gain%"].mean(), 5))
print("Median Gain%:", round(trades["Gain%"].median(), 2))

print("Average Gain$:", round(trades["Gain$"].mean(), 5))
print("Median Gain$:", round(trades["Gain$"].median(), 2))


trades["Gain%"].describe()

Total Trades: 251
Total Gain%: -2.21
Average Gain%: -0.0088
Median Gain%: 0.04
Average Gain$: -0.03904
Median Gain$: 0.14


count     251.00
unique     85.00
top         0.08
freq       11.00
Name: Gain%, dtype: float64

In [261]:
trades.to_csv('../temp/trades.csv', index=False)

trades["ExitCondition"].value_counts()

ExitCondition
MaxProfit    160
StopLoss      91
Name: count, dtype: int64

In [262]:
fig = px.histogram(trades, x="TimeEnter", nbins=50, title="Histogram of Time to Enter Trade")
fig.update_layout(
    xaxis_title="Time",
    yaxis_title="Count",
    bargap=0.1
)
fig.show()

In [263]:
# scatter plot of entry time vs gain%

fig = px.scatter(trades, x="TimeEnter", y="Gain%", title="Entry Time vs Gain%")
fig.update_layout(
    xaxis_title="Time",
    yaxis_title="Gain%",
    bargap=0.1
)

fig.show()

In [264]:
def rsi_ranged_backtest_single_pnl(df, rsi_enter, rsi_exit, maxTrades, ticker):
    res = pd.DataFrame()
    res.loc[0, 'Ticker'] = ticker
    res.loc[0, 'StartDate'] = df['Date'].min()
    res.loc[0, 'EndDate'] = df['Date'].max()
    res.loc[0, 'Entry'] = rsi_enter
    res.loc[0, 'Exit'] = rsi_exit,
    res.loc[0, 'MaxTrades'] = maxTrades,
    res.loc[0, 'TotalTrades'] = len(df)
    res.loc[0, 'TotalPercentGain'] = round(df['Gain%'].sum(), 2)
    res.loc[0, 'AveragePercentGain'] = round(df['Gain%'].mean(), 2)
    res.loc[0, 'MaxPercentGain'] = round(df['Gain%'].max(), 2)
    res.loc[0, 'MinPercentGain'] = round(df['Gain%'].min(), 2)

    return res

In [265]:
def multi_rsi_backtest(tickers, max_trades, indicator_pairs):
    results = pd.DataFrame(
        columns=[
            "Ticker","StartDate","EndDate","Entry","Exit","TotalTrades",
            "TotalPercentGain","AveragePercentGain","MaxPercentGain","MinPercentGain"])

    for e in indicator_pairs:
        for f in max_trades:
            for g in tickers:

                df = load_data(g)
                df = RSI(df, 14)

                print("testing TICKER/ENTER/EXIT/MAXTRADES:", g, e[0], e[1], f)
                trades = simulate_date_range(datetime.date(2022, 9, 30), datetime.date(2023, 9, 30), e[0], e[1], f, df)
                pnl = rsi_ranged_backtest_single_pnl(trades, e[0], e[1], f, g)
                results.loc[len(results)] = pnl.loc[0]

    results = results.sort_values(by=["TotalPercentGain"], ascending=False)
    return results