## Resampling 1-minute data to the desired timeframe

In [None]:
import os
import pandas as pd
import config as c

def prepare_price_data(
    csv_path: str,  # full path of the input 1-minute CSV file
    datasource: str = 'bybit_btcusdt',
    factor: str = 'price',
    timeframe: str = '1D',
    delay_minutes: int = 0
):
    """
    Resample 1-minute candle data to ``timeframe`` and save the result as CSV.

    The input CSV must contain a 'Time' column plus 'Open', 'High', 'Low',
    'Close', 'Volume' and 'Turnover' columns. Timestamps can optionally be
    shifted before resampling. Buckets with any missing aggregated value are
    dropped, and a 'start_time' column holding the bucket timestamp in
    milliseconds since the Unix epoch is added.

    When the result is non-empty it is written to
    ``<current working directory>/data/resample_{datasource}_{timeframe}.csv``;
    the ``data`` directory is created if it does not exist yet.

    Args:
        csv_path: Full path of the 1-minute input CSV file.
        datasource: Data source name (e.g. ``bybit_btcusdt``); used in the
            output filename.
        factor: Factor name (e.g. ``price``). Accepted for interface
            compatibility; it is not used by the current implementation.
        timeframe: Target pandas resample frequency, e.g. ``'1D'``.
        delay_minutes: Minutes to shift the time index by before resampling
            (positive = later, negative = earlier).

    Returns:
        The resampled pandas DataFrame (also returned when it is empty, in
        which case no file is written).
    """
    # 1. Read the CSV and let pandas parse the 'Time' column as datetimes.
    df = pd.read_csv(csv_path, parse_dates=['Time'])

    # 2. Index by time so the frame can be resampled.
    df = df.set_index('Time')

    # 3. Optional time shift (delay / advance).
    if delay_minutes != 0:
        df.index = df.index + pd.Timedelta(minutes=delay_minutes)

    # 4. Aggregation rule for each candle column.
    ohlc_dict = {
        'Open': 'first',
        'High': 'max',
        'Low': 'min',
        'Close': 'last',
        'Volume': 'sum',
        'Turnover': 'sum'
    }

    # 5. Resample and drop buckets where any aggregated value is missing.
    df_resampled = df.resample(timeframe).agg(ohlc_dict).dropna(how='any')

    # Bucket start as Unix time in milliseconds. Dividing the offset from the
    # epoch by a 1 ms Timedelta is independent of the index's internal
    # resolution (ns/us/ms/s), unlike casting to int64 and dividing by 10**6.
    epoch = pd.Timestamp(0, tz=df_resampled.index.tz)
    df_resampled['start_time'] = (df_resampled.index - epoch) // pd.Timedelta(milliseconds=1)

    # 6. Nothing to save: report it and hand back the empty frame.
    if df_resampled.empty:
        print("⚠️ Resampled DataFrame 為空，未產生輸出檔案！")
        return df_resampled

    # 7. Build the output path under the current working directory.
    output_filename = f"./data/resample_{datasource}_{timeframe}.csv"
    output_path = os.path.join(os.getcwd(), output_filename)

    # 8. Make sure the target directory exists, then write the CSV.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df_resampled.to_csv(output_path)
    print(f"✅ 檔案已儲存：{output_path}")

    return df_resampled

# Resample the raw 1-minute candle file with the timeframe and time shift
# taken from config, then preview the first rows of the result.
df_r = prepare_price_data(
    "./data/bybit_btcusdt_price_1m_2020-01-01.csv",
    timeframe=c.candle_timeframe,
    delay_minutes=c.candle_delay,
    datasource='bybit_btc',
    factor='price',
)

print(df_r.head())

In [None]:
from utilsnumpy import load_data, data_processing
import config as c

# Load the candle + factor data and keep only the columns needed to inspect
# the configured factor next to the close price.
unselected_df = load_data(c.candle_file, c.factor_file)
df = unselected_df[["Time", "start_time", "Close", c.factor]].copy()

# Last expression of the cell: shows the first ten rows of those columns.
df.head(10)[["Time", "start_time", "Close", c.factor]]

## Data Visualization of Raw Data

In [None]:
from utilsnumpy import load_data, data_processing
import matplotlib.pyplot as plt
import config as c

# Load the data and keep the timestamp, close price and configured factor.
unselected_df = load_data(c.candle_file, c.factor_file)
df = unselected_df[["start_time", "Close", c.factor]].copy()
df.columns = ["start_time", "close", c.factor]

# Apply the "diff" preprocessing to the factor column.
# (Alternative that was tried: data_processing(df, "cbrt", factor).)
df = data_processing(df, "diff", c.factor)

# Plot the factor on its own; the close price is not needed for this chart.
fig, ax1 = plt.subplots(figsize=(15, 8))
ax1.plot(
    df['start_time'],
    df[c.factor],
    label=c.factor,
    color='green',
    linewidth=2,
)
ax1.set_xlabel("Date", fontsize=12)
ax1.set_ylabel(c.factor, fontsize=12, color='green')
ax1.tick_params(axis='y', labelcolor='green')

# Title, layout and grid; ax1 is the only (and therefore current) axes.
ax1.set_title(f"Raw Data of {c.factor}", fontsize=16)
fig.tight_layout()  # keep labels from overlapping
ax1.grid(True)
plt.show()

## Split Train Backtest + show heatmap of Split backtest

In [None]:
from tqdm import tqdm
from itertools import product
from utilsnumpy import backtest , load_data, load_single_data, combine_factors
import pandas as pd
import numpy as np
import plotly.express as px
import matplotlib.pyplot as plt
import seaborn as sns
import json
import config as c


# Number of bars in one year for each supported candle interval
# (365-day year; 52 weeks; 12 months).
annualizer_dict = {
    '1m': 365 * 24 * 60,
    '5m': 365 * 24 * 12,
    '15m': 365 * 24 * 4,
    '30m': 365 * 24 * 2,
    '1h': 365 * 24,
    '4h': 365 * 6,
    '1d': 365,
    '1w': 52,
    '1M': 12,
}

# Accumulators filled by main(): one report list per (model, entry) run, and
# the (model, entry, DataFrame) tuples consumed by plot_heatmaps().
all_backtest_results, plot_data = [], []

# ✅ 第三階段（細跑）：從手動記錄讀取特定 models & entrys
# ✅ 自動分類 models & entrys
def parse_manual_selection(filepath, all_models):
    """
    Parse a hand-written selection file into a ``{model: [entries]}`` mapping.

    The file is read line by line; blank lines are ignored and surrounding
    whitespace is stripped. A line that is contained in ``all_models`` starts
    (or resumes) a model section; every other line is recorded as an entry of
    the most recent model. Lines appearing before the first model line are
    ignored.

    If the same model name appears more than once, its entries are merged
    instead of the earlier ones being discarded.

    :param filepath: Path of the manual selection file. It is decoded as
        UTF-8; a leading byte-order mark, if present, is skipped so it cannot
        hide the first model name.
    :param all_models: Collection of known model names, used to tell model
        lines apart from entry lines.
    :return: Dictionary ``{model: [entry1, entry2, ...]}`` in file order.
    """
    models_entrys = {}
    current_model = None

    with open(filepath, "r", encoding="utf-8-sig") as file:
        for raw_line in file:
            line = raw_line.strip()
            if not line:
                continue  # skip blank lines

            if line in all_models:
                # Known model: open its entry list, keeping entries that an
                # earlier section for the same model already collected.
                current_model = line
                models_entrys.setdefault(current_model, [])
            elif current_model is not None:
                # Any other line is an entry of the current model.
                models_entrys[current_model].append(line)

    return models_entrys

def main(data1, data2, data3, factor, factor2, interval, operation,preprocess, model, entry, window_end, window_step, threshold_end, threshold_step):
    """
    Grid-search backtest of one (model, entry) pair on the training period.

    Loads and merges the input data, optionally combines ``factor`` with a
    second factor, then runs ``backtest`` for every combination of rolling
    window and threshold on the first three years of rows. The list of
    reports is appended to the module-level ``all_backtest_results`` and a
    ``(model, entry, DataFrame)`` tuple is appended to ``plot_data`` for
    ``plot_heatmaps``.

    The run is skipped (a message is printed and nothing is appended) when
    more than 3% of the factor values are NaN.

    Args:
        data1, data2: Inputs forwarded to ``load_data``.
        data3: Input forwarded to ``load_single_data``; only used when
            ``operation`` is not ``'none'``.
        factor: Name of the factor column to test.
        factor2: Name of the second factor column (used with ``data3``).
        interval: Candle interval key of ``annualizer_dict`` (e.g. ``'1d'``).
        operation: ``'none'`` or an operation understood by
            ``combine_factors``.
        preprocess, model, entry: Forwarded to ``backtest``.
        window_end, window_step: Rolling windows tested are
            ``range(5, window_end, window_step)``.
        threshold_end, threshold_step: Thresholds tested are
            ``np.arange(0.0, threshold_end, threshold_step)``.

    Raises:
        ValueError: If ``interval`` is not a key of ``annualizer_dict``.
    """
    # Validate the interval first: an unknown key would otherwise surface as
    # an opaque "NoneType * int" TypeError when the train split is computed.
    annualizer = annualizer_dict.get(interval)
    if annualizer is None:
        raise ValueError(
            f"Unsupported interval {interval!r}; expected one of {list(annualizer_dict)}"
        )

    # Load data
    unselected_df = load_data(data1, data2)
    # Select the wanted columns and normalise the price column name
    df = unselected_df[["Time", "start_time", "Close", factor]].copy()
    df.columns = ["Time", "start_time", "close", factor]

    # Load data3 and combine the two factors if an operation is requested
    if operation != 'none':
        df1 = load_single_data(data3, factor2)
        # Attach the nearest-in-time row of the second factor to each candle
        df = pd.merge_asof(df, df1.sort_values('start_time'), on="start_time", direction="nearest")
        df, new_column_name = combine_factors(df, factor, factor2, operation)
        factor = new_column_name

    # Skip factors with too many missing values; otherwise drop the NaN rows
    num_nans = int(df[factor].isna().to_numpy().sum())
    total_rows = len(df)

    if num_nans > total_rows * 0.03:
        print(f"Skipping factor {factor}: {num_nans} NaNs exceed 3% Threshold.")
        return
    df = df.dropna()

    # Parameter grid settings
    window_start = 5
    threshold_start = 0.0
    train_split = annualizer * 3  # three years of bars at this interval

    # Train set: first three years of rows (the remainder is the test set,
    # which is not used here)
    df_train = df.iloc[:train_split].reset_index(drop=True).copy()

    # The threshold grid does not depend on the window, so build it once
    thresholds = np.arange(threshold_start, threshold_end, threshold_step)

    # Backtest every (rolling_window, threshold) combination
    backtest_report = [
        backtest(df_train, rolling_window, threshold, preprocess, entry, annualizer, model, factor, interval, "sr")
        for rolling_window in range(window_start, window_end, window_step)
        for threshold in thresholds
    ]
    all_backtest_results.append(backtest_report)

    # Keep the reports as a DataFrame so plot_heatmaps() can pivot the SR values
    backtest_df = pd.DataFrame(backtest_report)
    plot_data.append((model, entry, backtest_df))

def plot_heatmaps(sr_threshold=1.5, short_sr_threshold=1.2):
    """
    Draw one Sharpe-ratio heatmap per (model, entry) stored in ``plot_data``.

    Each stored DataFrame is pivoted to rolling_window (rows) x threshold
    (columns) using the mean 'SR'. A heatmap is skipped when every SR value is
    NaN or when no SR value exceeds the applicable threshold.

    Args:
        sr_threshold: Minimum SR that must be exceeded somewhere in the grid
            for the heatmap to be drawn.
        short_sr_threshold: Threshold used instead of ``sr_threshold`` when
            the entry name contains ``'short'``.

    When ``c.save_plot`` is truthy the figure is also written to
    ``{model}_{entry}_heatmap.png`` in the current directory.
    """
    for model, entry, backtest_df in plot_data:
        # Entries containing 'short' use their own threshold
        srthreshold = short_sr_threshold if 'short' in entry else sr_threshold

        # Mean SR per (rolling_window, threshold), windows as rows
        sr_pivot_data = backtest_df.groupby(['rolling_window', 'threshold'])['SR'].mean().unstack()

        # Nothing to show if the whole grid is NaN
        if sr_pivot_data.isna().all().all():
            print(f"⚠️ Skipping {model}_{entry} heatmap: All SR values are NaN.")
            continue

        # Only plot grids with at least one SR above the threshold
        if not np.any(sr_pivot_data.to_numpy() > srthreshold):
            print(f"⚠️ Skipping {model}_{entry} heatmap: No SR value exceeds {srthreshold}.")
            continue

        plt.figure(figsize=(18, 14))
        sns.heatmap(sr_pivot_data, annot=True, fmt=".2f", cmap="RdYlGn", linewidths=0.3, cbar_kws={'label': 'Sharpe Ratio'})
        # Heatmap cell i spans [i, i + 1]; put each label at the cell centre
        # rather than on the cell's leading edge.
        plt.xticks(ticks=np.arange(len(sr_pivot_data.columns)) + 0.5, labels=[f"{col:.2f}" for col in sr_pivot_data.columns], rotation=45)
        plt.yticks(ticks=np.arange(len(sr_pivot_data.index)) + 0.5, labels=[f"{row:.2f}" for row in sr_pivot_data.index], rotation=0)
        plt.title(f"{model}_{c.preprocess}_{entry} Train Period BackTest SR Heatmap", fontsize=14)

        # Save BEFORE showing: once plt.show() returns the figure may already
        # be closed (inline / non-interactive backends), and saving afterwards
        # would write a blank image.
        if c.save_plot:
            plt.savefig(f"{model}_{entry}_heatmap.png", dpi=300, bbox_inches='tight')
            print(f"已儲存 {model}_{entry}_heatmap.png")

        plt.show()
        plt.close()  # free the figure's memory after each plot

# Run mode: True  -> stage 1, coarse sweep over every model and entry;
#           False -> stage 3, fine sweep over the hand-picked combinations only.
USE_ALL_MODELS = True

if not USE_ALL_MODELS:
    # Stage 3: read the manual selection and use a finer parameter grid.
    entry_map = parse_manual_selection("manual_selected.txt", c.ALL_MODELS)
    models = list(entry_map)
    window_step, threshold_step = 10, 0.1
else:
    # Stage 1: every model runs every entry, on a coarser parameter grid.
    models = c.ALL_MODELS
    entry_map = dict.fromkeys(c.ALL_MODELS, c.ALL_ENTRYS)
    window_step, threshold_step = 20, 0.2

total_combinations = sum(len(entry_map[name]) for name in models)

# One progress-bar tick per (model, entry) strategy; the postfix shows which
# pair is currently running.
with tqdm(total=total_combinations, desc="🔍 Backtesting Strategies", unit="strategy", leave=True) as pbar:
    for model in models:
        for entry in entry_map[model]:  # only the entries mapped to this model
            pbar.set_postfix({"Model": model, "Entry": entry})
            main(
                c.candle_file, c.factor_file, c.factor2_file,
                c.factor, c.factor2,
                c.interval, c.operation, c.preprocess,
                model, entry,
                window_end=301, window_step=window_step,
                threshold_end=3.51, threshold_step=threshold_step,
            )
            pbar.update()

# After all backtests: plot the heatmaps, using 1.5 as the SR threshold.
plot_heatmaps(1.5)
   
# 生成一個json file 儲存全部 models 和 entry 的 metrics
# output_filename = f"{factor}_{interval}_split_backtest.json" 
# output_backtest_data = {"backtests": all_backtest_results}
# with open(output_filename, "w") as json_file:
#     json.dump(output_backtest_data, json_file, indent=4)

## Split Forward Testing Period

In [None]:
from utilsnumpy import backtest , load_data, load_single_data, combine_factors
import pandas as pd
import numpy as np
import plotly.express as px
import matplotlib.pyplot as plt
import seaborn as sns
import talib
import math
import json
import config as c

# Number of bars in one year for each supported candle interval
# (365-day year; 52 weeks; 12 months).
annualizer_dict = {
    '1m': 365 * 24 * 60,
    '5m': 365 * 24 * 12,
    '15m': 365 * 24 * 4,
    '30m': 365 * 24 * 2,
    '1h': 365 * 24,
    '4h': 365 * 6,
    '1d': 365,
    '1w': 52,
    '1M': 12,
}

def main(data1, data2, data3, factor, factor2, interval, operation, preprocess, model, entry, window, threshold):
    """
    Run one backtest on the forward-test period and plot price vs. cumulative PnL.

    Loads and merges the input data, optionally combines ``factor`` with a
    second factor, skips the first three years of rows (the training period)
    and runs ``backtest`` once on the remaining rows with the given window
    and threshold. The report is printed as JSON and a chart of close price
    and cumulative PnL is shown.

    When ``c.save_plot`` is truthy the forward-test frame, the chart and the
    JSON report are also written to the current directory.

    Args:
        data1, data2: Inputs forwarded to ``load_data``.
        data3: Input forwarded to ``load_single_data``; only used when
            ``operation`` is not ``'none'``.
        factor: Name of the factor column to test.
        factor2: Name of the second factor column (used with ``data3``).
        interval: Candle interval key of ``annualizer_dict`` (e.g. ``'1d'``).
        operation: ``'none'`` or an operation understood by
            ``combine_factors``.
        preprocess, model, entry: Forwarded to ``backtest``.
        window: Rolling window forwarded to ``backtest``.
        threshold: Threshold forwarded to ``backtest``.

    Raises:
        ValueError: If ``interval`` is not a key of ``annualizer_dict``, or
            if no rows remain after the three-year training period.
    """
    # Validate the interval first: an unknown key would otherwise surface as
    # an opaque "NoneType * int" TypeError when the split point is computed.
    annualizer = annualizer_dict.get(interval)
    if annualizer is None:
        raise ValueError(
            f"Unsupported interval {interval!r}; expected one of {list(annualizer_dict)}"
        )
    train_split = annualizer * 3  # three years of bars at this interval

    # Merge data
    unselected_df = load_data(data1, data2)
    # Select the wanted columns and normalise the price column name
    df = unselected_df[["Time", "start_time", "Close", factor]].copy()
    df.columns = ["Time", "start_time", "close", factor]

    # Load data3 and combine the two factors if an operation is requested
    if operation != 'none':
        df1 = load_single_data(data3, factor2)
        # Attach the nearest-in-time row of the second factor to each candle
        df = pd.merge_asof(df, df1.sort_values('start_time'), on="start_time", direction="nearest")
        df, new_column_name = combine_factors(df, factor, factor2, operation)
        factor = new_column_name

    # Test set: everything after the three-year training period
    df_test = df.iloc[train_split:].reset_index(drop=True).copy()
    if df_test.empty:
        raise ValueError(
            f"No forward-test rows: data has {len(df)} rows but the training period needs {train_split}"
        )

    forwardtest_report = [
        backtest(df_test, window, threshold, preprocess, entry, annualizer, model, factor, interval)
    ]

    print(json.dumps(forwardtest_report, indent=4))

    # Save the forward-test frame. df_test (not df) is the frame that was
    # handed to backtest() and that is plotted below, so it is the one that
    # belongs in the "forward_test_df" file.
    if c.save_plot:
        df_test.to_csv(f"{c.alpha_id}_forward_test_df.csv", index=False)
        print(f"已儲存 {c.alpha_id}_forward_test_df.csv")

    # Convert epoch milliseconds to datetimes for the x-axis and the title
    df_test['start_time'] = pd.to_datetime(df_test['start_time'], unit='ms')
    start_date = df_test['start_time'].min().strftime('%Y-%m-%d')
    end_date = df_test['start_time'].max().strftime('%Y-%m-%d')
    # Plot close price on the left y-axis
    fig, ax1 = plt.subplots(figsize=(15, 8))
    ax1.plot(df_test['start_time'], df_test['close'], label='Close Price', color='green', linewidth=2)
    ax1.set_xlabel("Date", fontsize=12)
    ax1.set_ylabel("Close Price", fontsize=12, color='green')
    ax1.tick_params(axis='y', labelcolor='green')
    # Plot cumulative PnL on the right y-axis
    # (the 'cumu_pnl' column is not created in this function; presumably
    # backtest() adds it to df_test in place)
    ax2 = ax1.twinx()
    ax2.plot(df_test['start_time'], df_test['cumu_pnl'], label='Cumulative PnL', color='blue', linewidth=2)
    ax2.set_ylabel("Cumulative PnL", fontsize=12, color='blue')
    ax2.tick_params(axis='y', labelcolor='blue')
    # Add title and grid
    plt.title(f"Close Price and Cumulative PnL Plot (Forward Test Period)-({start_date} ~ {end_date})", fontsize=16)
    fig.tight_layout()  # Adjust layout to prevent overlap
    plt.grid(True)

    # Save the equity curve and the JSON report (before show(), while the
    # figure still exists)
    if c.save_plot:
        plt.savefig(f"Forwardtest_Equity_Curve_{start_date}_{end_date}", dpi=300, bbox_inches='tight')
        print(f"已儲存 Forwardtest_Equity_Curve_{start_date}_{end_date}.png")
        output_forwardtest_data = {"forward_test": forwardtest_report}
        with open(f"{c.alpha_id}_final_{factor}_{interval}_forward_test.json", "w") as json_file:
            json.dump(output_forwardtest_data, json_file, indent=4)
    plt.show()

# Run the forward test with the strategy parameters taken from config.
main(
    data1=c.candle_file,
    data2=c.factor_file,
    data3=c.factor2_file,
    factor=c.factor,
    factor2=c.factor2,
    interval=c.interval,
    operation=c.operation,
    preprocess=c.preprocess,
    model=c.model,
    entry=c.entry,
    window=c.window,
    threshold=c.threshold,
)

## Split Train backtest (For cumuPNL Graph)

In [None]:
from utilsnumpy import backtest , load_data, load_single_data, combine_factors
import pandas as pd
import numpy as np
import plotly.express as px
import matplotlib.pyplot as plt
import seaborn as sns
import talib
import math
import json
import config as c

# Number of bars in one year for each supported candle interval
# (365-day year; 52 weeks; 12 months).
annualizer_dict = {
    '1m': 365 * 24 * 60,
    '5m': 365 * 24 * 12,
    '15m': 365 * 24 * 4,
    '30m': 365 * 24 * 2,
    '1h': 365 * 24,
    '4h': 365 * 6,
    '1d': 365,
    '1w': 52,
    '1M': 12,
}

def main(data1, data2, data3, factor, factor2, interval, operation, preprocess, model, entry, window, threshold):
    """
    Run one backtest on the training period and plot price vs. cumulative PnL.

    Loads and merges the input data, optionally combines ``factor`` with a
    second factor, takes the first three years of rows (the training period)
    and runs ``backtest`` once on them with the given window and threshold.
    The report is printed as JSON and a chart of close price and cumulative
    PnL is shown.

    When ``c.save_plot`` is truthy the training frame, the chart and the JSON
    report are also written to the current directory.

    Args:
        data1, data2: Inputs forwarded to ``load_data``.
        data3: Input forwarded to ``load_single_data``; only used when
            ``operation`` is not ``'none'``.
        factor: Name of the factor column to test.
        factor2: Name of the second factor column (used with ``data3``).
        interval: Candle interval key of ``annualizer_dict`` (e.g. ``'1d'``).
        operation: ``'none'`` or an operation understood by
            ``combine_factors``.
        preprocess, model, entry: Forwarded to ``backtest``.
        window: Rolling window forwarded to ``backtest``.
        threshold: Threshold forwarded to ``backtest``.

    Raises:
        ValueError: If ``interval`` is not a key of ``annualizer_dict``.
    """
    # Validate the interval first: an unknown key would otherwise surface as
    # an opaque "NoneType * int" TypeError when the split point is computed.
    annualizer = annualizer_dict.get(interval)
    if annualizer is None:
        raise ValueError(
            f"Unsupported interval {interval!r}; expected one of {list(annualizer_dict)}"
        )
    train_split = annualizer * 3  # three years of bars at this interval

    # Merge data
    unselected_df = load_data(data1, data2)
    # Select the wanted columns and normalise the price column name
    df = unselected_df[["Time", "start_time", "Close", factor]].copy()
    df.columns = ["Time", "start_time", "close", factor]

    # Load data3 and combine the two factors if an operation is requested
    if operation != 'none':
        df1 = load_single_data(data3, factor2)
        # Attach the nearest-in-time row of the second factor to each candle
        df = pd.merge_asof(df, df1.sort_values('start_time'), on="start_time", direction="nearest")
        df, new_column_name = combine_factors(df, factor, factor2, operation)
        factor = new_column_name

    # Train set: first three years of rows (the remainder is the test set,
    # which is not used here).
    # NOTE(review): unlike the grid-search main(), NaN rows are not dropped
    # before slicing, so the two training slices can differ - confirm intent.
    df_train = df.iloc[:train_split].reset_index(drop=True).copy()

    backtest_report = [
        backtest(df_train, window, threshold, preprocess, entry, annualizer, model, factor, interval)
    ]

    print(json.dumps(backtest_report, indent=4))

    # Save the training frame. df_train (not df) is the frame that was handed
    # to backtest() and that is plotted below, so it is the one that belongs
    # in the "backtest_df" file.
    if c.save_plot:
        df_train.to_csv(f"{c.alpha_id}_backtest_df.csv", index=False)
        print(f"已儲存 {c.alpha_id}_backtest_df.csv")

    # Convert epoch milliseconds to datetimes for the x-axis and the title
    df_train['start_time'] = pd.to_datetime(df_train['start_time'], unit='ms')
    start_date = df_train['start_time'].min().strftime('%Y-%m-%d')
    end_date = df_train['start_time'].max().strftime('%Y-%m-%d')
    # Plot close price on the left y-axis
    fig, ax1 = plt.subplots(figsize=(15, 8))
    ax1.plot(df_train['start_time'], df_train['close'], label='Close Price', color='green', linewidth=2)
    ax1.set_xlabel("Date", fontsize=12)
    ax1.set_ylabel("Close Price", fontsize=12, color='green')
    ax1.tick_params(axis='y', labelcolor='green')
    # Plot cumulative PnL on the right y-axis
    # (the 'cumu_pnl' column is not created in this function; presumably
    # backtest() adds it to df_train in place)
    ax2 = ax1.twinx()
    ax2.plot(df_train['start_time'], df_train['cumu_pnl'], label='Cumulative PnL', color='blue', linewidth=2)
    ax2.set_ylabel("Cumulative PnL", fontsize=12, color='blue')
    ax2.tick_params(axis='y', labelcolor='blue')
    # Add title and grid
    plt.title(f"Close Price and Cumulative PnL Plot (Split Backtest Period)-({start_date} ~ {end_date})", fontsize=16)
    fig.tight_layout()  # Adjust layout to prevent overlap
    plt.grid(True)
    # Save the equity curve and the JSON report (before show(), while the
    # figure still exists)
    if c.save_plot:
        plt.savefig(f"Backtest_Equity_Curve_{start_date}_{end_date}", dpi=300, bbox_inches='tight')
        print(f"已儲存 Backtest_Equity_Curve_{start_date}_{end_date}.png")
        output_backtest_data = {"back_test": backtest_report}
        with open(f"{c.alpha_id}_final_{factor}_{interval}_back_test.json", "w") as json_file:
            json.dump(output_backtest_data, json_file, indent=4)
    plt.show()

# Run the training-period backtest with the strategy parameters from config.
main(
    data1=c.candle_file,
    data2=c.factor_file,
    data3=c.factor2_file,
    factor=c.factor,
    factor2=c.factor2,
    interval=c.interval,
    operation=c.operation,
    preprocess=c.preprocess,
    model=c.model,
    entry=c.entry,
    window=c.window,
    threshold=c.threshold,
)

## Backtest (No Permutation) (Hand Test)

In [None]:
from utilsnumpy import backtest , load_data, load_single_data, combine_factors
import pandas as pd
import numpy as np
import plotly.express as px
import matplotlib.pyplot as plt
import seaborn as sns
import talib
import math
import json
import modin
import config as c

# Number of bars in one year for each supported candle interval
# (365-day year; 52 weeks; 12 months).
annualizer_dict = {
    '1m': 365 * 24 * 60,
    '5m': 365 * 24 * 12,
    '15m': 365 * 24 * 4,
    '30m': 365 * 24 * 2,
    '1h': 365 * 24,
    '4h': 365 * 6,
    '1d': 365,
    '1w': 52,
    '1M': 12,
}

def main(data1, data2, data3, factor, factor2, interval, operation, preprocess, model, entry, window, threshold):
    """
    Run one backtest over the full data range and plot price vs. cumulative PnL.

    Loads and merges the input data, optionally combines ``factor`` with a
    second factor, and runs ``backtest`` once on every row with the given
    window and threshold. The report is printed as JSON and a chart of close
    price and cumulative PnL is shown. When ``c.save_plot`` equals True the
    frame is also written to ``{c.alpha_id}_full_time_backtest_df.csv``.

    Args:
        data1, data2: Inputs forwarded to ``load_data``.
        data3: Input forwarded to ``load_single_data``; only used when
            ``operation`` is not ``'none'``.
        factor: Name of the factor column to test.
        factor2: Name of the second factor column (used with ``data3``).
        interval: Candle interval; looked up in ``annualizer_dict`` (an
            unknown key yields ``None``, which is passed on to ``backtest``).
        operation: ``'none'`` or an operation understood by
            ``combine_factors``.
        preprocess, model, entry: Forwarded to ``backtest``.
        window: Rolling window forwarded to ``backtest``.
        threshold: Threshold forwarded to ``backtest``.
    """
    # Merge the inputs, keep the needed columns, normalise the price name
    merged = load_data(data1, data2)
    df = merged[["Time", "start_time", "Close", factor]].copy()
    df.columns = ["Time", "start_time", "close", factor]

    # Optionally bring in the second factor and combine it with the first
    if operation != 'none':
        secondary = load_single_data(data3, factor2)
        df = pd.merge_asof(df, secondary.sort_values('start_time'), on="start_time", direction="nearest")
        df, factor = combine_factors(df, factor, factor2, operation)

    # Bars per year for the interval (None when the interval is unknown)
    annualizer = annualizer_dict.get(interval)

    # Single backtest over the whole frame
    backtest_report = [
        backtest(df, window, threshold, preprocess, entry, annualizer, model, factor, interval)
    ]
    print(json.dumps(backtest_report, indent=4))

    # Save the full-length frame as CSV when requested
    if c.save_plot == True:
        df.to_csv(f"{c.alpha_id}_full_time_backtest_df.csv", index=False)
        print(f"已儲存 {c.alpha_id}_full_time_backtest_df.csv")

    # Epoch milliseconds -> datetimes, used for the x-axis and the title
    df['start_time'] = pd.to_datetime(df['start_time'], unit='ms')
    timeline = df['start_time']
    start_date = timeline.min().strftime('%Y-%m-%d')
    end_date = timeline.max().strftime('%Y-%m-%d')

    # Left y-axis: close price
    fig, price_ax = plt.subplots(figsize=(15, 8))
    price_ax.plot(timeline, df['close'], label='Close Price', color='green', linewidth=2)
    price_ax.set_xlabel("Date", fontsize=12)
    price_ax.set_ylabel("Close Price", fontsize=12, color='green')
    price_ax.tick_params(axis='y', labelcolor='green')

    # Right y-axis: cumulative PnL
    pnl_ax = price_ax.twinx()
    pnl_ax.plot(timeline, df['cumu_pnl'], label='Cumulative PnL', color='blue', linewidth=2)
    pnl_ax.set_ylabel("Cumulative PnL", fontsize=12, color='blue')
    pnl_ax.tick_params(axis='y', labelcolor='blue')

    # Title, layout and grid, then display
    plt.title(f"Close Price and Cumulative PnL Plot (Full Length)-({start_date} ~ {end_date})", fontsize=16)
    fig.tight_layout()  # keep labels from overlapping
    plt.grid(True)
    plt.show()

# Run the full-length backtest with the strategy parameters from config.
main(
    data1=c.candle_file,
    data2=c.factor_file,
    data3=c.factor2_file,
    factor=c.factor,
    factor2=c.factor2,
    interval=c.interval,
    operation=c.operation,
    preprocess=c.preprocess,
    model=c.model,
    entry=c.entry,
    window=c.window,
    threshold=c.threshold,
)