In [4]:
# Import libraries
import pandas as pd
import pandas_ta as ta
import numpy as np
from IPython.display import clear_output
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import seaborn as sns

In [5]:
# Vars
timeframe = "H1"
symbols = ["GBP_JPY"]
systems = ["Strat"]
starting_balance = 100
risk_per_trade = 0.02 # Risking 2% per trade
market_open_time = "09:00"
opening_range_hours = 5
latest_entry_time = "22:00"
trade_direction = "long"
timezone = "Europe/London"

In [6]:
# Load CSV File
def get_price_data(symbol):
    df = pd.read_csv(f"../data/{symbol}_{timeframe}.csv", parse_dates=['Datetime'], index_col='Datetime')
    df.index = pd.to_datetime(df.index, utc=True).tz_convert(timezone)  # Keeps local timezone without converting to UTC
    return df

In [7]:
def calculate_inputs(df):
  
    # Ensure the Datetime column is a datetime object
    df.index = pd.to_datetime(df.index)
    df['Date'] = df.index.date
    df['Time'] = df.index.time
    df['Last_Candle'] = df['Date'] != df['Date'].shift(-1)

    df['ATR'] = ta.atr(df['High'], df['Low'], df['Close'], length=14)

    # Calculate the opening range end time
    opening_range_start_time = (pd.Timestamp.combine(pd.Timestamp.today(), pd.to_datetime(market_open_time, format="%H:%M").time()) 
                            - pd.Timedelta(hours=opening_range_hours)).time()

    # Filter the DataFrame for the opening range period
    df_open_range = df[
        (df['Time'] >= opening_range_start_time) & 
        (df['Time'] < pd.to_datetime(market_open_time, format="%H:%M").time())
    ]

    # Group by each trading day and calculate the high and low for that range
    opening_range = df_open_range.groupby('Date').agg(
        Open_Range_High=('High', 'max'),
        Open_Range_Low=('Low', 'min')
    )

    # Merge the opening range back to the original DataFrame without resetting the index
    df = df.join(opening_range, on='Date')

    # Drop the 'Time' and 'Date' columns if no longer needed
    df = df.drop(['Time', 'Date'], axis=1)

    return df

In [9]:
def generate_signals(df, s, atr_sl, tp_ratio):
    # Get hour values
    df["Hour"] = df.index.hour
    
    # Various entry conditions
    c1_long = (df['Open'] <= df['Open_Range_High']) & (df['Close'] > df['Open_Range_High'])  # Breakout above opening range (long signal)
    c1_short = (df['Open'] >= df['Open_Range_Low']) & (df['Close'] < df['Open_Range_Low'])  # Breakout below opening range (short signal)
    
    c2 = df.index.time >= pd.to_datetime(market_open_time, format='%H:%M').time()
    c3 = df.index.time <= pd.to_datetime(latest_entry_time, format='%H:%M').time()
    c4 = df['Open_Range_High'].notna() # Check that the current candle actually has a range. The sunday candles on CFDs can open in the evening
    c5 = (df['High'] - df['Low']) < df['ATR'] * 2.5
    c6 = (df['High'] - df['Low']) > df['ATR'] * 1.0

    #generate entries and exits
    # Entries depend on the strategy
    if s == "Strat":
        if trade_direction == "long":
            # default entry rules
            df[f"{s}_Signal"] = c1_long.shift(1) & c2 & c3 & c4 & c5.shift(1) & c6.shift(1)
        elif trade_direction == "short":
            df[f"{s}_Signal"] = c1_short.shift(1) & c2 & c3 & c4 & c5.shift(1) & c6.shift(1)

    # Generate exits
    if trade_direction == "long":
        nominal_stop_dist = df['Open'] - df['Open_Range_Low']
        df['SL'] = df['Open'] - nominal_stop_dist * atr_sl
        stop_dist = df['Open'] - df['SL']
        df['TP'] = df['Open'] + stop_dist * tp_ratio
    elif trade_direction == "short":
        nominal_stop_dist = df['Open_Range_High'] - df['Open']
        df['SL'] = df['Open'] + nominal_stop_dist * atr_sl
        stop_dist = df['SL'] - df['Open']
        df['TP'] = df['Open'] - stop_dist * tp_ratio
   
    return df

In [10]:

def generate_trades(df, s):
    #create empty list for trades
    trades_list = []
    trade_open = False
    open_change = {}
    balance = starting_balance
    equity = starting_balance
    balance_history = []
    equity_history = []
    
    #iterate through rows to work out entries and exits
    for i, row in df.iterrows():
        #if there is currently no trade
        if not trade_open:
            #check if long signal was generated and enter at the same time
            if row[f"{s}_Signal"] == True:
                entry_date = i
                entry_price = row.Open
                sl = row.SL
                tp = row.TP
                # Calculate position size based on risk percentage
                risk_amount = balance * risk_per_trade
                if entry_price == sl:
                    position_size = 0.01
                else:
                    position_size = risk_amount / abs(entry_price - sl)  # Position size in units
                trade_open = True
        #check if a trade is already open
        if trade_open:
            #get price values
            low = row.Low
            high = row.High

            if trade_direction == "long":
                # Calculate unrealized PnL
                floating_pnl = (high - entry_price) * position_size
                equity = balance + floating_pnl  # Update equity dynamically

                # Calculate unrealized PnL
                #check if stop is hit
                if low <= sl:
                    #first check for a gap down
                    if row.Open <= sl:
                        exit_price = row.Open
                    else:
                        exit_price = sl
                    trade_open = False

                # Now do the same check for take profit
                elif high >= tp:
                    #first check for a gap up
                    if row.Open >= tp:
                        exit_price = row.Open
                    else:
                        exit_price = tp
                    trade_open = False

            elif trade_direction == "short":
                floating_pnl = (entry_price - low) * position_size
                equity = balance + floating_pnl  # Update equity dynamically

                # Calculate unrealized PnL
                #check if stop is hit
                if high >= sl:
                    #first check for a gap down
                    if row.Open >= sl:
                        exit_price = row.Open
                    else:
                        exit_price = sl
                    trade_open = False

                # Now do the same check for take profit
                elif low <= tp:
                    #first check for a gap up
                    if row.Open <= tp:
                        exit_price = row.Open
                    else:
                        exit_price = tp
                    trade_open = False

            if not trade_open: # If trade has been closed
                exit_date = i
                trade_open = False

                if trade_direction == "long":   
                    pnl = (exit_price - entry_price) * position_size  # PnL in currency terms
                elif trade_direction == "short":
                    pnl = -1 * (exit_price - entry_price) * position_size # PnL in currency terms
                balance += pnl  # Update balance with PnL
                

                #store trade data in a list
                trade = [entry_date, entry_price, exit_date, exit_price, position_size, pnl, balance, True]
                #append trade to overall trade list
                trades_list.append(trade)

        # Store balance and equity
        balance_history.append(balance)
        equity_history.append(equity)

    trades = pd.DataFrame(trades_list, columns=["Entry_Date", "Entry_Price", "Exit_Date", "Exit_Price", "Position_Size", "PnL", "Balance", "Sys_Trade"])
    
    #calculate return of each trade as well as the trade duration
    trades[f"{s}_Return"] = trades.Balance / trades.Balance.shift(1)
    dur = []
    for i, row in trades.iterrows():
        d1 = row.Entry_Date
        d2 = row.Exit_Date
        dur.append(np.busday_count(d1.date(), d2.date()) + 1)#Add 1 because formula doesn't include the end date otherwise
    
    trades[f"{s}_Duration"] = dur

    #create a new dataframe with an index of exit dfs
    returns = pd.DataFrame(index=trades.Exit_Date)
    #create a new dataframe with an index of entries to track entry price
    entries = pd.DataFrame(index=trades.Entry_Date)

    entries[f"{s}_Entry_Price"] = pd.Series(trades.Entry_Price).values
    #add the Return column to this new data frame
    returns[f"{s}_Ret"] = pd.Series(trades[f"{s}_Return"]).values
    returns[f"{s}_Trade"] = pd.Series(trades.Sys_Trade).values
    returns[f"{s}_Duration"] = pd.Series(trades[f"{s}_Duration"]).values
    returns[f"{s}_PnL"] = pd.Series(trades.PnL).values
    returns[f"{s}_Balance"] = pd.Series(trades.Balance).values
    change_ser = pd.Series(open_change, name=f"{s}_Change")

    #add the returns from the trades to the main data frame
    df = pd.concat([df, returns, entries, change_ser], axis=1)
    #fill all the NaN return values with 1 as there was no profit or loss on those days
    df[f"{s}_Ret"] = df[f"{s}_Ret"].fillna(1)
    #fill all the NaN trade values with False as there was no trade on those days
    df[f"{s}_Trade"] = df[f"{s}_Trade"].infer_objects(copy=False)
    #fill all the NaN return values with 1 as there was no loss on those days
    df[f"{s}_Change"] = df[f"{s}_Change"].astype(float).fillna(1)
    
    #use the updated balance and equity variables
    df[f"{s}_Bal"] = pd.Series(balance_history, index=df.index).ffill()
    df[f"{s}_Equity"] = pd.Series(equity_history, index=df.index).ffill()

    active_trades = np.where(df[f"{s}_Trade"] == True, True, False)
    df[f"{s}_In_Market"] = df[f"{s}_Trade"].copy()
    #populate trades column based on duration
    for count, t in enumerate(active_trades):
        if t == True:
            dur = df[f"{s}_Duration"].iat[count]
            for i in range(int(dur)):
                #starting from the exit date, move backwards and mark each trading day
                df[f"{s}_In_Market"].iat[count - i] = True
    
    return df, trades

In [11]:
def backtest(price, atr_sl, tp_ratio):
    #calculate strategy inputs
    price = calculate_inputs(price)

    for s in systems:
        #generate signals
        price = generate_signals(price, s, atr_sl, tp_ratio)

        #generate trades
        price, trades = generate_trades(price, s)

    for s in systems:
        #calculate drawdown
        price[f"{s}_Peak"] = price[f"{s}_Bal"].cummax()
        price[f"{s}_DD"] = price[f"{s}_Bal"] - price[f"{s}_Peak"]

    return price, trades

In [12]:
results = []
prog = 0
sl_range = np.arange(1.0, 1.5, 0.5)
tp_range = np.arange(1.5, 2.0, 0.5)
max_prog = len(symbols) * len(sl_range) * len(tp_range)
for sym in symbols:
    price = get_price_data(sym)
    for atr_sl in sl_range:
        for tp_ratio in tp_range:
            result, trades = backtest(price, atr_sl, tp_ratio)
            results.append(result)
            prog += 1
            clear_output(wait=True)
            print(f"Progress: {round((prog / max_prog) * 100)} %")

FileNotFoundError: [Errno 2] No such file or directory: '../data/GBP_JPY_H1.csv'