In [1]:
import os
# Change the working directory to strategy_lab
os.chdir("/Users/sugang/Desktop/projects/2024coin/strategy_lab")


import pandas as pd
import numpy as np
from handle_candle import resample_df
from datetime import datetime
from performance_utils import get_performance


In [2]:


def backtest(df, lookback_period):
    df = df.copy()
    # Calculate rolling high and low
    df[f"high_{lookback_period}"] = df["high"].rolling(window=lookback_period).max()
    df[f"low_{lookback_period}"] = df["low"].rolling(window=lookback_period).min()
    
    # Initialize the signal column with default value 0
    df["signal"] = 0
    
    # Generate buy signal: 1 when price breaks above the high
    df.loc[df["close"] > df[f"high_{lookback_period}"].shift(1), "signal"] = 1
    
    # Generate sell signal: -1 when price breaks below the low
    df.loc[df["close"] < df[f"low_{lookback_period}"].shift(1), "signal"] = -1
    
    # Initialize the position column with default value 0
    df["position"] = 0
    
    # Variable to track the current position status
    current_position = 0
    
    # Iterate over the rows and set the position
    for i in range(1, len(df)):
        if df.loc[i-1, "signal"] == 1:  # Enter long position
            if df.loc[i, "signal"] == -1:
                df.loc[i, "position"] = 1
                current_position = 0
                continue
            elif df.loc[i, "signal"] == 1:
                df.loc[i, "signal"] = 0
            current_position = 1
        elif df.loc[i, "signal"] == -1:  # Exit position
            df.loc[i, "position"] = current_position
            current_position = 0
            continue
        elif current_position == 1:
            df.loc[i, "signal"] = 0
        df.loc[i, "position"] = current_position
    
    # Calculate the strategy returns (only when in a long position)
    df["strategy_returns"] = df["position"] * df["close"].pct_change()
    df["strategy_returns2"] = df["strategy_returns"]
    
    # Adjust for trading fees (buy with 0.2% fee, sell with 0.2% fee)
    df["buy_price"] = df["close"].shift(1) * np.where(df["signal"].shift(1) == 1, 1.002, 1)
    df["sell_price"] = df["close"] * np.where(df["signal"] == -1, 0.998, 1)
    
    # Calculate strategy returns with fees
    df["strategy_returns2"] = np.where(df["position"] == 1, df["sell_price"] / df["buy_price"] - 1, 0)
    
    # Calculate the cumulative returns
    df["cumulative_returns"] = (1 + df["strategy_returns"]).cumprod()
    df["cumulative_returns2"] = (1 + df["strategy_returns2"]).cumprod()
    performance = get_performance(df)
    df.to_csv(f"2_trading_range_breakouts/temp_{lookback_period}.csv")
    return {"strategy": f"trb_{lookback_period}_v1", "stoploss": None, **performance}

def backtest_stoploss(df, lookback_period, stop_loss_pct):
    df = df.copy()
    # Calculate rolling high and low
    df[f"high_{lookback_period}"] = df["high"].rolling(window=lookback_period).max()
    df[f"low_{lookback_period}"] = df["low"].rolling(window=lookback_period).min()
    
    # Initialize the signal column with default value 0
    df["signal"] = 0
    
    # Generate buy signal: 1 when price breaks above the high
    df.loc[df["close"] > df[f"high_{lookback_period}"].shift(1), "signal"] = 1
    
    # Generate sell signal: -1 when price breaks below the low
    df.loc[df["close"] < df[f"low_{lookback_period}"].shift(1), "signal"] = -1
    
    # Initialize the position column with default value 0
    df["position"] = 0
    df["highest_price"] = np.nan
    
    # Variable to track the current position status
    current_position = 0
    
    # Iterate over the rows and set the position
    for i in range(1, len(df)):
        if df.loc[i-1, "signal"] == 1:  # Enter long position
            df.loc[i, "highest_price"] = max(df.loc[i-1, "close"], df.loc[i, "close"])
            if df.loc[i, "signal"] == -1:
                df.loc[i, "position"] = 1
                current_position = 0
                continue
            elif df.loc[i, "signal"] == 1:
                df.loc[i, "signal"] = 0
                
            if df.loc[i, "close"] <= df.loc[i, "highest_price"] * (1 - stop_loss_pct):
                df.loc[i, "position"] = 1
                df.loc[i, "signal"] = -1
                current_position = 0
                continue
            current_position = 1
        elif df.loc[i, "signal"] == -1 and current_position == 1:  # Exit position
            df.loc[i, "highest_price"] = max(df.loc[i-1, "highest_price"], df.loc[i, "close"])
            df.loc[i, "position"] = current_position
            current_position = 0
            continue
        
        elif current_position == 1:  # Check current_position instead of df position
            df.loc[i, "highest_price"] = max(df.loc[i-1, "highest_price"], df.loc[i, "close"])
            df.loc[i, "signal"] = 0
            if df.loc[i, "close"] <= df.loc[i, "highest_price"] * (1 - stop_loss_pct):
                df.loc[i, "position"] = 1
                df.loc[i, "signal"] = -1
                current_position = 0
                continue
        
        if current_position == 0 and df.loc[i, "close"] > df.loc[i, f"high_{lookback_period}"]:
            df.loc[i, "signal"] = 1
        df.loc[i, "position"] = current_position
    
    # Calculate the strategy returns (only when in a long position)
    df["strategy_returns"] = df["position"] * df["close"].pct_change()
    df["strategy_returns2"] = df["strategy_returns"]
    
    # Adjust for trading fees (buy with 0.2% fee, sell with 0.2% fee)
    df["buy_price"] = df["close"].shift(1) * np.where(df["signal"].shift(1) == 1, 1.002, 1)
    df["sell_price"] = df["close"] * np.where(df["signal"] == -1, 0.998, 1)
    
    # Calculate strategy returns with fees
    df["strategy_returns2"] = np.where(df["position"] == 1, df["sell_price"] / df["buy_price"] - 1, 0)
    
    # Calculate the cumulative returns
    df["cumulative_returns"] = (1 + df["strategy_returns"]).cumprod()
    df["cumulative_returns2"] = (1 + df["strategy_returns2"]).cumprod()
    performance = get_performance(df)
    df.to_csv(f"2_trading_range_breakouts/temp_stoploss_{lookback_period}_{stop_loss_pct}_v1.csv")
    return {"strategy": f"trb_{lookback_period}_v1", "stoploss": stop_loss_pct, **performance}
    

def backtest_benchmark(df):
    df = df.copy()
    df["benchmark_returns2"] = df["close"].pct_change()
    df["cumulative_returns2"] = (1 + df["benchmark_returns2"]).cumprod()
    performance = get_performance(df)
    
    return {"strategy": "benchmark", "stoploss": None, **performance}


In [3]:
def backtest_v2(df, lookback_period):
    df = df.copy()
    # Calculate rolling high and low
    df[f"high_{lookback_period}"] = df["high"].rolling(window=lookback_period).max()
    df[f"low_{lookback_period}"] = df["low"].rolling(window=lookback_period).min()
    
    # Initialize the signal column with default value 0
    df["signal"] = 0
    
    # Generate buy signal: 1 when price breaks above the high
    df.loc[df["high"] > df[f"high_{lookback_period}"].shift(1), "signal"] = 1
    
    # Generate sell signal: -1 when price breaks below the low
    df.loc[df["close"] < df[f"low_{lookback_period}"].shift(1), "signal"] = -1
    
    # Initialize the position column with default value 0
    df["position"] = 0
    
    # Variable to track the current position status
    current_position = 0
    
    # Iterate over the rows and set the position
    for i in range(1, len(df)):
        if df.loc[i-1, "signal"] == 1:  # Enter long position
            if df.loc[i, "signal"] == -1:
                df.loc[i, "position"] = 1
                current_position = 0
                continue
            elif df.loc[i, "signal"] == 1:
                df.loc[i, "signal"] = 0
            current_position = 1
        elif df.loc[i, "signal"] == -1:  # Exit position
            df.loc[i, "position"] = current_position
            current_position = 0
            continue

        elif current_position == 1:  # Check current_position instead of df position
            df.loc[i, "signal"] = 0
        df.loc[i, "position"] = current_position
    
    # Calculate the strategy returns (only when in a long position)
    df["strategy_returns"] = df["position"] * df["close"].pct_change()
    df["strategy_returns2"] = df["strategy_returns"]
    
    # Adjust for trading fees (buy with 0.2% fee, sell with 0.2% fee)
    df["buy_price"] = np.where(df["signal"].shift(1) == 1, df[f"high_{lookback_period}"].shift(1) * 1.002, df["close"].shift(1))
    df["sell_price"] = df["close"] * np.where(df["signal"] == -1, 0.998, 1)
    
    # Calculate strategy returns with fees
    df["strategy_returns2"] = np.where(df["position"] == 1, df["sell_price"] / df["buy_price"] - 1, 0)
    
    # Calculate the cumulative returns
    df["cumulative_returns"] = (1 + df["strategy_returns"]).cumprod()
    df["cumulative_returns2"] = (1 + df["strategy_returns2"]).cumprod()
    performance = get_performance(df)
    df.to_csv(f"2_trading_range_breakouts/temp_v2_{lookback_period}.csv")
    return {"strategy": f"trb_{lookback_period}_v2", "stoploss": None, **performance}

def backtest_stoploss_v2(df, lookback_period, stop_loss_pct):
    df = df.copy()
    # Calculate rolling high and low
    df[f"high_{lookback_period}"] = df["high"].rolling(window=lookback_period).max()
    df[f"low_{lookback_period}"] = df["low"].rolling(window=lookback_period).min()
    
    # Initialize the signal column with default value 0
    df["signal"] = 0
    
    # Generate buy signal: 1 when price breaks above the high
    df.loc[df["high"] > df[f"high_{lookback_period}"].shift(1), "signal"] = 1
    
    # Generate sell signal: -1 when price breaks below the low
    df.loc[df["close"] < df[f"low_{lookback_period}"].shift(1), "signal"] = -1
    
    # Initialize the position column with default value 0
    df["position"] = 0
    df["highest_price"] = np.nan
    
    # Variable to track the current position status
    current_position = 0
    
    # Iterate over the rows and set the position
    for i in range(1, len(df)):
        if df.loc[i-1, "signal"] == 1:  # Enter long position
            df.loc[i, "highest_price"] = max(df.loc[i-1, "close"], df.loc[i, "close"])
            if df.loc[i, "signal"] == -1:
                df.loc[i, "position"] = 1
                current_position = 0
                continue
            elif df.loc[i, "signal"] == 1:
                df.loc[i, "signal"] = 0
                
            if df.loc[i, "close"] <= df.loc[i, "highest_price"] * (1 - stop_loss_pct):
                df.loc[i, "position"] = 1
                df.loc[i, "signal"] = -1
                current_position = 0
                continue
            current_position = 1
        elif df.loc[i, "signal"] == -1 and current_position == 1:  # Exit position
            df.loc[i, "highest_price"] = max(df.loc[i-1, "highest_price"], df.loc[i, "close"])
            df.loc[i, "position"] = current_position
            current_position = 0
            continue
        
        elif current_position == 1:  # Check current_position instead of df position
            df.loc[i, "highest_price"] = max(df.loc[i-1, "highest_price"], df.loc[i, "close"])
            df.loc[i, "signal"] = 0
            if df.loc[i, "close"] <= df.loc[i, "highest_price"] * (1 - stop_loss_pct):
                df.loc[i, "position"] = 1
                df.loc[i, "signal"] = -1
                current_position = 0
                continue
        
        if current_position == 0 and df.loc[i, "high"] > df.loc[i, f"high_{lookback_period}"]:
            df.loc[i, "signal"] = 1
        df.loc[i, "position"] = current_position
    
    # Calculate the strategy returns (only when in a long position)
    df["strategy_returns"] = df["position"] * df["close"].pct_change()
    df["strategy_returns2"] = df["strategy_returns"]
    
    # Adjust for trading fees (buy with 0.2% fee, sell with 0.2% fee)
    df["buy_price"] = np.where(df["signal"].shift(1) == 1, df[f"high_{lookback_period}"].shift(1) * 1.002, df["close"].shift(1))
    df["sell_price"] = df["close"] * np.where(df["signal"] == -1, 0.998, 1)
    
    # Calculate strategy returns with fees
    df["strategy_returns2"] = np.where(df["position"] == 1, df["sell_price"] / df["buy_price"] - 1, 0)
    
    # Calculate the cumulative returns
    df["cumulative_returns"] = (1 + df["strategy_returns"]).cumprod()
    df["cumulative_returns2"] = (1 + df["strategy_returns2"]).cumprod()
    performance = get_performance(df)
    df.to_csv(f"2_trading_range_breakouts/temp_stoploss_{lookback_period}_{stop_loss_pct}_v2.csv")
    return {"strategy": f"trb_{lookback_period}_v2", "stoploss": stop_loss_pct, **performance}
    

def backtest_benchmark(df):
    df = df.copy()
    df["benchmark_returns"] = df["close"].pct_change()
    df["cumulative_returns2"] = (1 + df["benchmark_returns"]).cumprod()
    performance = get_performance(df)
    
    return {"strategy": "benchmark", "stoploss": None, **performance}

# # Parameters
# lookback_periods = [50, 150, 200]
# stop_loss_pcts = [0.03, 0.05, 0.1, 0.2]
# hours = range(0, 24, 1)

# # Store results in a list
# results = []
# # Read the data
# df = pd.read_csv("temp.csv", index_col=0)
# df_copy = df.copy()
# execution_time = datetime.strptime(f"00:00", "%H:%M")
# df_copy = resample_df(df_copy, execution_time=execution_time)
# print(backtest_benchmark(df_copy))
# print(backtest_v2(df_copy, 50))
# print(backtest_stoploss_v2(df_copy, 50, 0.03))


In [4]:


def backtest_v3(df, lookback_period):
    df = df.copy()
    # Calculate rolling high and low
    df[f"high_{lookback_period}"] = df["high"].rolling(window=lookback_period).max()
    df[f"low_{lookback_period}"] = df["low"].rolling(window=lookback_period).min()
    
    # Initialize the signal column with default value 0
    df["signal"] = 0
    
    # Generate buy signal: 1 when price breaks above the high
    df.loc[df["high"] > df[f"high_{lookback_period}"].shift(1), "signal"] = 1
    
    # Generate sell signal: -1 when price breaks below the low
    df.loc[df["low"] < df[f"low_{lookback_period}"].shift(1), "signal"] = -1
    
    # Initialize the position column with default value 0
    df["position"] = 0
    
    # Variable to track the current position status
    current_position = 0
    
    # Iterate over the rows and set the position
    for i in range(1, len(df)):
        if df.loc[i-1, "signal"] == 1:  # Enter long position
            if df.loc[i, "signal"] == -1:
                df.loc[i, "position"] = 1
                current_position = 0
                continue
            elif df.loc[i, "signal"] == 1:
                df.loc[i, "signal"] = 0
            current_position = 1
        elif df.loc[i, "signal"] == -1:  # Exit position
            df.loc[i, "position"] = current_position
            current_position = 0
            continue
        elif current_position == 1:
            df.loc[i, "signal"] = 0
        df.loc[i, "position"] = current_position
    
    # Calculate the strategy returns (only when in a long position)
    df["strategy_returns"] = df["position"] * df["close"].pct_change()
    df["strategy_returns2"] = df["strategy_returns"]
    
    # Adjust for trading fees (buy with 0.2% fee, sell with 0.2% fee)
    df["buy_price"] = np.where(df["signal"].shift(1) == 1, df[f"high_{lookback_period}"].shift(1) * 1.002, df["close"].shift(1))
    df["sell_price"] = np.where((df["signal"] == -1) & (df["low"] < df[f"low_{lookback_period}"]), df[f"low_{lookback_period}"] * 0.998, df["close"])
    
    # Calculate strategy returns with fees
    df["strategy_returns2"] = np.where(df["position"] == 1, df["sell_price"] / df["buy_price"] - 1, 0)
    
    # Calculate the cumulative returns
    df["cumulative_returns"] = (1 + df["strategy_returns"]).cumprod()
    df["cumulative_returns2"] = (1 + df["strategy_returns2"]).cumprod()
    performance = get_performance(df)
    df.to_csv(f"2_trading_range_breakouts/temp_v3_{lookback_period}.csv")
    return {"strategy": f"trb_{lookback_period}_v3", "stoploss": None, **performance}

def backtest_stoploss_v3(df, lookback_period, stop_loss_pct):
    df = df.copy()
    # Calculate rolling high and low
    df[f"high_{lookback_period}"] = df["high"].rolling(window=lookback_period).max()
    df[f"low_{lookback_period}"] = df["low"].rolling(window=lookback_period).min()
    
    # Initialize the signal column with default value 0
    df["signal"] = 0
    
    # Generate buy signal: 1 when price breaks above the high
    df.loc[df["high"] > df[f"high_{lookback_period}"].shift(1), "signal"] = 1
    
    # Generate sell signal: -1 when price breaks below the low
    df.loc[df["low"] < df[f"low_{lookback_period}"].shift(1), "signal"] = -1
    
    # Initialize the position column with default value 0
    df["position"] = 0
    df["highest_price"] = np.nan
    
    # Variable to track the current position status
    current_position = 0
    
    # Iterate over the rows and set the position
    for i in range(1, len(df)):
        if df.loc[i-1, "signal"] == 1:  # Enter long position
            df.loc[i, "highest_price"] = max(df.loc[i-1, "close"], df.loc[i, "close"])
            if df.loc[i, "signal"] == -1:
                df.loc[i, "position"] = 1
                current_position = 0
                continue
            elif df.loc[i, "signal"] == 1:
                df.loc[i, "signal"] = 0
                
            if df.loc[i, "close"] <= df.loc[i, "highest_price"] * (1 - stop_loss_pct):
                df.loc[i, "position"] = 1
                df.loc[i, "signal"] = -1
                current_position = 0
                continue
            current_position = 1
        elif df.loc[i, "signal"] == -1 and current_position == 1:  # Exit position
            df.loc[i, "highest_price"] = max(df.loc[i-1, "highest_price"], df.loc[i, "close"])
            df.loc[i, "position"] = current_position
            current_position = 0
            continue
        
        elif current_position == 1:  # Check current_position instead of df position
            df.loc[i, "highest_price"] = max(df.loc[i-1, "highest_price"], df.loc[i, "close"])
            df.loc[i, "signal"] = 0
            if df.loc[i, "close"] <= df.loc[i, "highest_price"] * (1 - stop_loss_pct):
                df.loc[i, "position"] = 1
                df.loc[i, "signal"] = -1
                current_position = 0
                continue
        
        if current_position == 0 and df.loc[i, "high"] > df.loc[i, f"high_{lookback_period}"]:
            df.loc[i, "signal"] = 1
        df.loc[i, "position"] = current_position
    
    # Calculate the strategy returns (only when in a long position)
    df["strategy_returns"] = df["position"] * df["close"].pct_change()
    df["strategy_returns2"] = df["strategy_returns"]
    
    # Adjust for trading fees (buy with 0.2% fee, sell with 0.2% fee)
    df["buy_price"] = np.where(
        df["signal"].shift(1) == 1, 
        df[f"high_{lookback_period}"].shift(1) * 1.002, 
        df["close"].shift(1)
    )
    df["sell_price"] = np.where(
        (df["signal"] == -1) & (df["low"] < df[f"low_{lookback_period}"]),
        df[f"low_{lookback_period}"] * 0.998,
        np.where(
            (df["signal"] == -1) & (df["close"] <= df["highest_price"] * (1 - stop_loss_pct)),
            df["close"] * 0.998, 
            df["close"]
        )
    )
    
    # Calculate strategy returns with fees
    df["strategy_returns2"] = np.where(df["position"] == 1, df["sell_price"] / df["buy_price"] - 1, 0)
    
    # Calculate the cumulative returns
    df["cumulative_returns"] = (1 + df["strategy_returns"]).cumprod()
    df["cumulative_returns2"] = (1 + df["strategy_returns2"]).cumprod()
    performance = get_performance(df)
    df.to_csv(f"2_trading_range_breakouts/temp_stoploss_{lookback_period}_{stop_loss_pct}_v3.csv")
    return {"strategy": f"trb_{lookback_period}_v3", "stoploss": stop_loss_pct, **performance}
    

def backtest_benchmark(df):
    df = df.copy()
    df["benchmark_returns"] = df["close"].pct_change()
    df["cumulative_returns2"] = (1 + df["benchmark_returns"]).cumprod()
    performance = get_performance(df)
    
    return {"strategy": "benchmark", "stoploss": None, **performance}

# # Parameters
# lookback_periods = [50, 150, 200]
# stop_loss_pcts = [0.03, 0.05, 0.1, 0.2]
# hours = range(0, 24, 1)

# # Store results in a list
# results = []
# # Read the data
# df = pd.read_csv("temp.csv", index_col=0)
# df_copy = df.copy()
# execution_time = datetime.strptime(f"00:00", "%H:%M")
# df_copy = resample_df(df_copy, execution_time=execution_time)
# print(backtest_benchmark(df_copy))
# print(backtest_v3(df_copy, 50))
# print(backtest_stoploss_v3(df_copy, 50, 0.03))

In [5]:
# Parameters
lookback_periods = [50, 150, 200]
stop_loss_pcts = [0.03, 0.05, 0.1, 0.2]
hours = range(0, 24, 1)

# Store results in a list
results = []
# Read the data
df = pd.read_csv("temp.csv", index_col=0)
df_copy = df.copy()

# Run backtests and collect results
results.append(backtest_benchmark(df))
for hour in hours:
    df_copy = df.copy()
    execution_time = datetime.strptime(f"{hour:02d}:00", "%H:%M")
    df_copy = resample_df(df_copy, execution_time=execution_time)
    for lookback_period in lookback_periods:
        results.append({
            "execution_time": execution_time.strftime("%H:%M"),
            **backtest(df_copy, lookback_period)
        })
        results.append({
            "execution_time": execution_time.strftime("%H:%M"),
            **backtest_v2(df_copy, lookback_period)
        })
        results.append({
            "execution_time": execution_time.strftime("%H:%M"),
            **backtest_v3(df_copy, lookback_period)
        })
        for stop_loss_pct in stop_loss_pcts:
            results.append({
                "execution_time": execution_time.strftime("%H:%M"),
                **backtest_stoploss(df_copy, lookback_period, stop_loss_pct)
            })
            results.append({
                "execution_time": execution_time.strftime("%H:%M"),
                **backtest_stoploss_v2(df_copy, lookback_period, stop_loss_pct)
            })
            results.append({
                "execution_time": execution_time.strftime("%H:%M"),
                **backtest_stoploss_v3(df_copy, lookback_period, stop_loss_pct)
            })


In [6]:
# Convert results to DataFrame and save to CSV
results_df = pd.DataFrame(results)
results_df.to_csv("2_trading_range_breakouts/results.csv", index=False)
print("Results saved to results.csv")

Results saved to results.csv
