In [None]:
from datetime import timedelta

import matplotlib.pyplot as plt
import mplfinance as mpf
import pandas as pd
import ta

In [None]:
# resampling params
# Bar size as a pandas offset alias; keep it in sync with timedelta_value below.
resample_str = "1h"
# Duration of one bar; passed to the backtest, which adds it to a prediction bar's
# timestamp before acting on that prediction.
timedelta_value = timedelta(hours=1)

In [None]:
def load_data(year_range, resample_rule=None):
    """Load HistData EURUSD 1-minute CSV files and resample them into OHLCV bars.

    Parameters
    ----------
    year_range : iterable of int
        Years to load. One semicolon-separated CSV per year is read from
        ``data/histdata/HISTDATA_COM_ASCII_EURUSD_M1_<year>/``.
    resample_rule : str, optional
        Pandas offset alias for the bar size. When omitted, the notebook-level
        ``resample_str`` is used so the bars match ``timedelta_value``, which the
        backtest uses as the bar duration.

    Returns
    -------
    tuple of (DataFrame, DataFrame)
        The concatenated 1-minute data and the resampled bars (empty bars dropped).
    """
    if resample_rule is None:
        # The bar size used to be hard-coded to "15min", which silently disagreed
        # with the resampling configuration defined at the top of the notebook.
        resample_rule = resample_str

    # Read one file per year and stack them into a single 1-minute frame.
    data = pd.concat(
        [
            pd.read_csv(
                f"data/histdata/HISTDATA_COM_ASCII_EURUSD_M1_{year}/DAT_ASCII_EURUSD_M1_{year}.csv",
                header=None,
                index_col="timestamp",
                delimiter=";",
                names=["timestamp", "open", "high", "low", "close", "volume"],
                parse_dates=["timestamp"],
                date_format="%Y%m%d %H%M%S",
            )
            for year in year_range
        ]
    )

    # Aggregate to the requested bar size; bars with no ticks come out as NaN
    # and are dropped.
    resampled_data = (
        data.resample(resample_rule)
        .agg(
            {
                "open": "first",
                "high": "max",
                "low": "min",
                "close": "last",
                "volume": "sum",
            }
        )
        .dropna()
    )

    print(f"リサンプリング後のデータサイズ: {len(resampled_data)}")
    print(f"データ期間: {resampled_data.index[0]} から {resampled_data.index[-1]}")

    return data, resampled_data


# Chronological splits: 2010-2020 for training, 2021-2022 and 2023-2024 for validation.
# NOTE(review): later cells use test_data / test_data_1min, but no cell visible here
# loads them - confirm where the test split is supposed to come from.
train_data_1min, train_data = load_data(range(2010, 2021))
val1_data_1min, val1_data = load_data([2021, 2022])
val2_data_1min, val2_data = load_data([2023, 2024])

In [None]:
def add_labels(data, window_days=20, mode="left2"):
    """Attach a ``labels`` column marking local extremes of ``close``.

    Each bar's close is compared with a window of neighbouring closes. The label
    is ``0`` when the close equals the window maximum, ``1`` when it equals the
    window minimum and ``0.5`` otherwise (the maximum wins if both hold). Bars
    without a complete window get NaN.

    Parameters
    ----------
    data : DataFrame
        Must contain a ``close`` column.
    window_days : int
        Window reach in rows (despite the name, it counts bars, not days).
    mode : {"left", "center", "left2", "center2"}
        ``left``    - window is the bar itself plus the next ``window_days`` bars.
        ``center``  - window spans ``window_days`` bars on both sides of the bar.
        ``left2`` / ``center2`` - as above, but bars whose window range
        (max - min) is below the median range are reset to ``0.5``.

    Returns
    -------
    DataFrame
        A labelled copy of ``data``; the input frame is not modified.

    Raises
    ------
    ValueError
        If ``mode`` is not one of the supported values (previously the frame was
        returned without a ``labels`` column).
    """
    # mode -> (centered window?, suppress low-range windows?)
    mode_table = {
        "left": (False, False),
        "center": (True, False),
        "left2": (False, True),
        "center2": (True, True),
    }
    if mode not in mode_table:
        raise ValueError(f"Unknown mode: {mode}")
    centered, suppress_flat = mode_table[mode]

    data = data.copy()
    close = data["close"]

    if centered:
        windows = close.rolling(window_days * 2 + 1, center=True)
    else:
        # Shifting by -window_days makes the trailing window at row t cover
        # close[t] .. close[t + window_days].
        windows = close.shift(-window_days).rolling(window_days + 1)

    # Rolling max/min return actual elements of the window, so exact equality
    # against the bar's own close is safe.
    window_max = windows.max()
    window_min = windows.min()

    labels = (
        pd.Series(0.5, index=data.index)
        .mask(close == window_min, 1.0)
        .mask(close == window_max, 0.0)  # applied last: the maximum takes priority
        .where(window_max.notna())  # incomplete windows stay NaN
    )

    if suppress_flat:
        hl_range = window_max - window_min
        # NaN ranges compare False, so unlabeled rows remain NaN.
        labels = labels.mask(hl_range < hl_range.median(), 0.5)

    data["labels"] = labels
    return data


# Label every split with the default window and mode.
# NOTE(review): test_data is not defined by any cell visible above, so the last
# line raises NameError on a fresh kernel - confirm where the test split is loaded.
train_data = add_labels(train_data)
val1_data = add_labels(val1_data)
val2_data = add_labels(val2_data)
test_data = add_labels(test_data)

In [None]:
def plot_date_range(data, date_start_str, date_end_str, additional_columns=[]):
    """Draw a candlestick chart for the bars between two calendar dates (inclusive).

    ``additional_columns`` names columns of ``data`` that are drawn as extra line
    plots in a second panel below the price panel.
    """
    first_day = pd.Timestamp(date_start_str).date()
    last_day = pd.Timestamp(date_end_str).date()

    # Compare on the calendar date of each bar so both end days are fully included.
    bar_days = data.index.date
    in_range = (bar_days >= first_day) & (bar_days <= last_day)
    window = data[in_range]

    # One add-plot per extra column, all placed in panel 1.
    extra_plots = []
    for column in additional_columns:
        extra_plots.append(
            mpf.make_addplot(window[column], panel=1, ylabel=column, secondary_y=False)
        )

    chart_options = dict(
        type="candle",
        style="charles",
        ylabel="Price",
        figsize=(24, 8),
        addplot=extra_plots,
    )
    mpf.plot(window, **chart_options)

    plt.show()

In [None]:
def plot_first_n_bars(data, n_bars, additional_columns=[]):
    """Draw a candlestick chart of the first ``n_bars`` rows of ``data``.

    ``additional_columns`` names columns of ``data`` that are drawn as extra line
    plots in a second panel below the price panel.
    """
    # Positional slice: the leading n_bars rows, whatever their timestamps are.
    head = data.iloc[:n_bars]

    # One add-plot per extra column, all placed in panel 1.
    extra_plots = []
    for column in additional_columns:
        extra_plots.append(
            mpf.make_addplot(head[column], panel=1, ylabel=column, secondary_y=False)
        )

    chart_options = dict(
        type="candle",
        style="charles",
        ylabel="Price",
        figsize=(24, 8),
        addplot=extra_plots,
    )
    mpf.plot(head, **chart_options)

    plt.show()

In [None]:
# Visual sanity check: first 300 training bars with the labels in a second panel.
plot_first_n_bars(train_data, 300, ["labels"])

In [None]:
import ta


def fib_list(start=1, stop=233):
    """Return the sequence 1, 2, 3, 5, 8, ... restricted to ``start <= value <= stop``.

    The sequence starts from the seeds 1 and 2; every later term is the sum of the
    two before it. Both bounds are inclusive. Unlike the previous implementation,
    the seeds themselves are also checked against ``stop`` (``fib_list(stop=1)``
    used to return ``[1, 2]``).
    """
    values = []
    current, upcoming = 1, 2
    while current <= stop:
        if current >= start:
            values.append(current)
        current, upcoming = upcoming, current + upcoming
    return values


def create_features(data, windows=(2, 3, 5, 10, 20, 30, 50, 100)):
    """Return a copy of ``data`` with ``ft_``-prefixed price-deviation features added.

    Parameters
    ----------
    data : DataFrame
        Must contain ``open``, ``high``, ``low`` and ``close`` columns.
    windows : iterable of int, optional
        Look-back lengths (in bars) for the rolling features.

    Returns
    -------
    DataFrame
        Copy of ``data`` with, in this column order: ``ft_open``, ``ft_high``,
        ``ft_low`` and then, per window ``i``, ``ft_deviation_close_i``,
        ``ft_deviation_high_i``, ``ft_deviation_low_i``, ``ft_deviation_sma_i``.
        Rows without a full look-back window hold NaN in the rolling features.
    """
    data = data.copy()
    close = data["close"]

    # Shape of the current bar relative to its own close.
    data["ft_open"] = data["open"] - close
    data["ft_high"] = data["high"] - close
    data["ft_low"] = data["low"] - close

    for i in windows:
        # Change of the close over the last i bars.
        data[f"ft_deviation_close_{i}"] = close - close.shift(i)

        # Distance of the close from the highest high / lowest low of the last
        # i bars. The low previously used rolling max by mistake.
        data[f"ft_deviation_high_{i}"] = close - data["high"].rolling(i).max()
        data[f"ft_deviation_low_{i}"] = close - data["low"].rolling(i).min()

        # Distance from the simple moving average; a plain rolling mean is what
        # ta.trend.sma_indicator computes when fillna is left at False.
        data[f"ft_deviation_sma_{i}"] = close - close.rolling(i).mean()

    return data


# Add the ft_* feature columns to every split.
train_data = create_features(train_data)
val1_data = create_features(val1_data)
val2_data = create_features(val2_data)
test_data = create_features(test_data)

In [None]:
def drop_na(data):
    """Keep only the rows of ``data`` that have no missing value.

    Returns the filtered frame together with the boolean row mask (indexed like
    the unfiltered input) that was used to select those rows.
    """
    # A row is complete when none of its cells is NaN.
    complete_rows = ~data.isna().any(axis=1)
    return data.loc[complete_rows], complete_rows


# Drop incomplete rows from every split; each *_notna mask is indexed like the
# frame before filtering.
train_data, train_notna = drop_na(train_data)
val1_data, val1_notna = drop_na(val1_data)
val2_data, val2_notna = drop_na(val2_data)
test_data, test_notna = drop_na(test_data)

In [None]:
# Every column created by create_features carries the "ft_" prefix; column order
# defines the feature order used from here on.
features = [col for col in train_data.columns.to_list() if col.startswith("ft_")]

import joblib

# Persist the feature names to features.joblib.
joblib.dump(features, "features.joblib")

In [None]:
import numpy as np


def fit_normalizers(data, features):
    """Compute one scaling divisor per feature column.

    The divisor is the population standard deviation (``np.std``) of the column;
    a column with zero deviation gets ``1.0`` so that dividing by it is a no-op.
    The returned list follows the order of ``features``.
    """

    def _divisor(column):
        spread = np.std(data[column].values)
        return 1.0 if spread == 0 else spread

    return [_divisor(column) for column in features]


# Scaling divisors are fitted on the training split only and reused for the others.
normalizers = fit_normalizers(train_data, features)

import joblib

# Persist the divisors to normalizers.joblib.
joblib.dump(normalizers, "normalizers.joblib")

In [None]:
def normalize(data, features, normalizers):
    """Return a copy of ``data`` with every feature column divided by its divisor.

    ``normalizers[k]`` is the divisor for ``features[k]``; columns that are not
    listed in ``features`` are left as they are.
    """
    scaled = data.copy()
    for position in range(len(features)):
        column = features[position]
        divisor = normalizers[position]
        scaled[column] = scaled[column].div(divisor)
    return scaled


# Scale the feature columns of every split with the same divisors.
train_data = normalize(train_data, features, normalizers)
val1_data = normalize(val1_data, features, normalizers)
val2_data = normalize(val2_data, features, normalizers)
test_data = normalize(test_data, features, normalizers)

In [None]:
def extract_xy(data, features):
    """Split ``data`` into a feature matrix and a target vector.

    Returns ``(x, y)`` where ``x`` holds the ``features`` columns in the given
    order and ``y`` holds the ``labels`` column.
    """
    feature_matrix = data.loc[:, features].values
    targets = data.loc[:, "labels"].values
    return feature_matrix, targets


# Feature matrices and label vectors for every split.
x_train, y_train = extract_xy(train_data, features)
x_val1, y_val1 = extract_xy(val1_data, features)
x_val2, y_val2 = extract_xy(val2_data, features)
x_test, y_test = extract_xy(test_data, features)

In [None]:
from typing import List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import KFold, cross_val_score


def feature_selection_by_validation_loss(
    x_train: np.ndarray,
    y_train: np.ndarray,
    x_val: np.ndarray,
    y_val: np.ndarray,
    features: List[str],
    model_params: dict = None,
    selection_method: str = "forward",  # 'forward', 'backward', 'importance'
    min_features: int = 5,
    max_features: int = None,
    verbose: bool = True,
) -> Tuple[List[int], List[str], dict]:
    """
    Select features by comparing validation loss, delegating to one strategy.

    Parameters:
    -----------
    x_train, y_train, x_val, y_val :
        Training and validation arrays, handed to the strategy unchanged.
    features : list of str
        Feature names, aligned with the columns of ``x_train``.
    model_params : dict, optional
        Keyword arguments for the model; a built-in default is used when omitted.
    selection_method : str
        - 'forward': forward selection
        - 'backward': backward elimination
        - 'importance': selection based on feature importance
    min_features, max_features : int
        Bounds on the number of features; ``max_features`` defaults to the number
        of columns in ``x_train``.
    verbose : bool
        Forwarded to the strategy to control progress printing.

    Returns the strategy's result: (selected indices, selected names, info dict).
    Raises ValueError for an unknown ``selection_method``.
    """

    params = model_params
    if params is None:
        # Default configuration used when the caller supplies none.
        params = dict(
            n_estimators=100,
            max_depth=5,
            learning_rate=0.01,
            subsample=0.8,
            colsample_bytree=0.8,
            random_state=42,
            n_jobs=-1,
            objective="reg:logistic",
            eval_metric="logloss",
            early_stopping_rounds=20,
            tree_method="hist",
            device="cuda",
        )

    column_count = x_train.shape[1]
    feature_cap = column_count if max_features is None else max_features

    # Resolve the strategy first, then make the single call with shared arguments.
    if selection_method == "forward":
        strategy = forward_selection
    elif selection_method == "backward":
        strategy = backward_selection
    elif selection_method == "importance":
        strategy = importance_based_selection
    else:
        raise ValueError(f"Unknown selection method: {selection_method}")

    return strategy(
        x_train,
        y_train,
        x_val,
        y_val,
        features,
        params,
        min_features,
        feature_cap,
        verbose,
    )


def forward_selection(
    x_train,
    y_train,
    x_val,
    y_val,
    features,
    model_params,
    min_features,
    max_features,
    verbose,
):
    """Greedy forward selection driven by the model's best validation score.

    Each step tries every feature not yet chosen, fits a model on the chosen set
    plus that candidate, and keeps the candidate with the lowest ``best_score``.
    The search ends after ``min(max_features, n_columns)`` steps, or earlier once
    at least ``min_features`` steps have run and the score has risen on the last
    two steps in a row.

    Returns (selected column indices, selected names, {"scores": per-step scores}).
    """

    def _validation_score(columns):
        # Fit on the given columns only and report the model's best eval score.
        estimator = xgb.XGBRegressor(**model_params)
        estimator.fit(
            x_train[:, columns],
            y_train,
            eval_set=[(x_val[:, columns], y_val)],
            verbose=False,
        )
        return estimator.best_score

    column_count = x_train.shape[1]
    chosen = []
    candidates = list(range(column_count))
    score_trace = []

    for step in range(min(max_features, column_count)):
        winner = None
        winner_score = float("inf")

        for candidate in candidates:
            score = _validation_score(chosen + [candidate])
            if score < winner_score:
                winner, winner_score = candidate, score

        if winner is None:
            # No candidate produced a comparable score; nothing is added this step.
            continue

        chosen.append(winner)
        candidates.remove(winner)
        score_trace.append(winner_score)

        if verbose:
            print(
                f"Step {step+1}: Added feature {features[winner]}, "
                f"Val Score: {winner_score:.6f}"
            )

        # Early stopping: two consecutive deteriorations after the minimum size.
        may_stop = len(score_trace) >= 3 and step >= min_features - 1
        if may_stop and score_trace[-1] > score_trace[-2] > score_trace[-3]:
            if verbose:
                print("Early stopping: validation score not improving")
            break

    chosen_names = [features[column] for column in chosen]

    return chosen, chosen_names, {"scores": score_trace}


def backward_selection(
    x_train,
    y_train,
    x_val,
    y_val,
    features,
    model_params,
    min_features,
    max_features,
    verbose,
):
    """Greedy backward elimination driven by the model's best validation score.

    Starting from all columns, each round drops the feature whose removal yields
    the lowest ``best_score``. Elimination continues until ``min_features``
    remain. It may stop earlier, but only once no more than ``max_features``
    remain and the score has risen on the last two removals in a row.

    The previous version broke out of the loop as soon as the feature count was
    ``<= max_features``. With the default ``max_features`` (all columns) that
    meant exactly one feature was ever removed.

    Returns (remaining column indices, remaining names, {"scores": per-round scores}).
    """

    n_features = x_train.shape[1]
    selected_features = list(range(n_features))
    scores_history = []

    while len(selected_features) > min_features:
        worst_feature = None
        best_score = float("inf")

        for feature_idx in selected_features:
            current_features = [f for f in selected_features if f != feature_idx]

            # Fit without this feature and read the best validation score.
            model = xgb.XGBRegressor(**model_params)
            model.fit(
                x_train[:, current_features],
                y_train,
                eval_set=[(x_val[:, current_features], y_val)],
                verbose=False,
            )
            val_score = model.best_score

            if val_score < best_score:
                best_score = val_score
                worst_feature = feature_idx

        if worst_feature is None:
            # No removal produced a comparable score (e.g. all NaN); stop rather
            # than repeat the same round forever.
            break

        selected_features.remove(worst_feature)
        scores_history.append(best_score)

        if verbose:
            print(
                f"Removed feature {features[worst_feature]}, "
                f"Val Score: {best_score:.6f}, "
                f"Remaining: {len(selected_features)}"
            )

        # Early stopping is only allowed once the max_features cap is satisfied.
        if (
            len(selected_features) <= max_features
            and len(scores_history) >= 3
            and scores_history[-1] > scores_history[-2] > scores_history[-3]
        ):
            if verbose:
                print("Early stopping: validation score not improving")
            break

    selected_feature_names = [features[idx] for idx in selected_features]

    return selected_features, selected_feature_names, {"scores": scores_history}


def importance_based_selection(
    x_train,
    y_train,
    x_val,
    y_val,
    features,
    model_params,
    min_features,
    max_features,
    verbose,
):
    """Pick the top-N features by model importance, with N chosen on validation score.

    A model fitted on all columns ranks the features by ``feature_importances_``.
    For every N from ``min_features`` up to ``min(max_features, len(features))`` a
    model is fitted on the N top-ranked columns; the N with the lowest
    ``best_score`` wins (``min_features`` if no N is evaluated).

    Returns (selected column indices, selected names,
    {"scores": score per N, "importance_df": ranking table}).
    """

    def _fit(columns=None):
        # Fit on all columns when none are given, otherwise on the subset.
        train_part = x_train if columns is None else x_train[:, columns]
        val_part = x_val if columns is None else x_val[:, columns]
        estimator = xgb.XGBRegressor(**model_params)
        estimator.fit(train_part, y_train, eval_set=[(val_part, y_val)], verbose=False)
        return estimator

    # Rank every feature using a model trained on the full column set.
    ranking = pd.DataFrame(
        {
            "feature": features,
            "importance": _fit().feature_importances_,
            "idx": range(len(features)),
        }
    ).sort_values("importance", ascending=False)
    ranked_columns = ranking["idx"].values

    # Evaluate each candidate feature count on the validation set.
    score_trace = []
    winning_score = float("inf")
    winning_count = min_features
    upper = min(max_features, len(features))

    for count in range(min_features, upper + 1):
        score = _fit(ranked_columns[:count]).best_score
        score_trace.append(score)

        if score < winning_score:
            winning_score, winning_count = score, count

        if verbose:
            print(f"Top {count} features, Val Score: {score:.6f}")

    # Keep the best-scoring number of top-ranked features.
    picked = ranked_columns[:winning_count].tolist()
    picked_names = [features[column] for column in picked]

    if verbose:
        print(f"\nBest number of features: {winning_count}")
        print(f"Best validation score: {winning_score:.6f}")

    return (
        picked,
        picked_names,
        {"scores": score_trace, "importance_df": ranking},
    )


# Run feature selection (default method), validating on the first validation split.
selected_indices, selected_features, info = feature_selection_by_validation_loss(
    x_train, y_train, x_val1, y_val1, features, verbose=True
)

# Visualise the results: validation score recorded at each selection step.
if "scores" in info:
    plt.figure(figsize=(10, 6))
    plt.plot(info["scores"])
    plt.xlabel("Number of Features")
    plt.ylabel("Validation Loss")
    plt.title(f"Feature Selection")
    plt.grid(True)
    plt.show()

# Only the importance-based method returns "importance_df".
if "importance_df" in info:
    # Show the 20 most important features.
    plt.figure(figsize=(10, 8))
    top_features = info["importance_df"].head(20)
    plt.barh(range(len(top_features)), top_features["importance"])
    plt.yticks(range(len(top_features)), top_features["feature"])
    plt.xlabel("Importance")
    plt.title("Top 20 Feature Importances")
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.show()

print(f"\n選択された特徴量数: {len(selected_features)}")
print(f"選択された特徴量: {selected_features[:10]}...")  # show the first 10

# Persist the selected names and column indices.
joblib.dump(selected_features, "selected_features.joblib")
joblib.dump(selected_indices, "selected_indices.joblib")

In [None]:
# Keep only the selected feature columns.
x_train_selected = x_train[:, selected_indices]
x_val1_selected = x_val1[:, selected_indices]
x_val2_selected = x_val2[:, selected_indices]
x_test_selected = x_test[:, selected_indices]

# Retrain the model on the selected features (replaces the earlier In[21] cell).
model_optimized = xgb.XGBRegressor(
    n_estimators=1000,
    max_depth=5,
    learning_rate=0.01,
    subsample=0.8,
    colsample_bytree=0.8,
    min_child_weight=1,
    gamma=0.1,
    reg_alpha=0.1,
    reg_lambda=1.0,
    random_state=42,
    n_jobs=-1,
    objective="reg:logistic",
    eval_metric="logloss",
    early_stopping_rounds=50,
)

# The first validation split is the eval set that drives early stopping.
model_optimized.fit(
    x_train_selected, y_train, eval_set=[(x_val1_selected, y_val1)], verbose=True
)

# Persist the trained model (the original comment said "selected features").
import joblib

joblib.dump(model_optimized, "model.joblib")

In [None]:
# Score the second validation split and store the outputs next to its bars.
# NOTE(review): val2_notna / test_notna were built by drop_na() on the frames
# before the NaN rows were removed, while val2_data / test_data are the filtered
# frames; the assignment relies on pandas aligning the mask to the filtered
# index - confirm this is intended.
predictions = model_optimized.predict(x_val2_selected)
val2_data.loc[val2_notna, "predictions"] = predictions

# Same for the test split.
predictions = model_optimized.predict(x_test_selected)
test_data.loc[test_notna, "predictions"] = predictions

In [None]:
# Visual check: first 300 bars of the second validation split with the predictions.
plot_first_n_bars(val2_data, 300, ["predictions"])

In [None]:
import itertools
import json
import multiprocessing as mp
from datetime import datetime
from functools import partial

import numpy as np
import pandas as pd
from tqdm import tqdm


def calculate_evaluation_metrics(result, weights=None):
    """Turn a backtest result into per-metric values and one weighted score.

    Parameters
    ----------
    result : dict
        Backtest output. The keys read here are ``balance_history``,
        ``final_balance``, ``max_drawdown``, ``win_rate``, ``profit_factor`` and
        ``total_trades``; a missing key falls back to a neutral default.
    weights : dict, optional
        Weight per metric name. Defaults to 0.3 Sharpe, 0.3 profit/drawdown,
        0.2 Calmar, 0.1 win rate, 0.1 profit factor. Metrics absent from the
        mapping contribute nothing.

    Returns
    -------
    dict
        ``metrics`` (raw values), ``normalized_metrics`` (each within 0-1),
        ``weighted_score`` and ``weights_used``.
    """

    def _clip01(value):
        # Clamp a normalised value into the 0-1 range.
        return min(max(value, 0), 1)

    # Default weights
    if weights is None:
        weights = {
            'sharpe_ratio': 0.3,
            'profit_dd_ratio': 0.3,
            'calmar_ratio': 0.2,
            'win_rate': 0.1,
            'profit_factor': 0.1
        }

    metrics = {}

    # 1. Sharpe ratio of the balance increments.
    # NOTE(review): the sqrt(252) factor assumes one balance point per trading
    # day - confirm the sampling of balance_history matches that.
    balance_hist = result.get("balance_history", [])
    if len(balance_hist) > 1:
        returns = np.diff(balance_hist)
        if len(returns) > 0 and np.std(returns) > 0:
            # Negative Sharpe ratios are kept as they are.
            metrics['sharpe_ratio'] = np.mean(returns) / np.std(returns) * np.sqrt(252)
        else:
            metrics['sharpe_ratio'] = 0
    else:
        metrics['sharpe_ratio'] = 0

    # 2. Profit / drawdown ratio
    final_balance = result.get('final_balance', 0)
    max_drawdown = result.get('max_drawdown', 0.00001)  # avoids division by zero

    if max_drawdown > 0:
        profit_dd_ratio = final_balance / max_drawdown
    else:
        profit_dd_ratio = final_balance * 100 if final_balance > 0 else 0
    metrics['profit_dd_ratio'] = profit_dd_ratio

    # 3. Calmar ratio (annualised return / maximum drawdown)
    # NOTE(review): the day count treats balance_history as one entry per
    # minute - confirm; if it holds one entry per closed trade, total_days is
    # not a real duration.
    if len(balance_hist) > 1:
        total_days = len(balance_hist) / (24 * 60)
        if total_days > 0:
            annual_return = (final_balance / total_days) * 252
            calmar_ratio = annual_return / max_drawdown if max_drawdown > 0 else 0
        else:
            calmar_ratio = 0
    else:
        calmar_ratio = 0
    metrics['calmar_ratio'] = calmar_ratio

    # 4. Win rate
    metrics['win_rate'] = result.get('win_rate', 0)

    # 5. Profit factor (an infinite value is capped)
    profit_factor = result.get('profit_factor', 0)
    if profit_factor == float('inf'):
        profit_factor = 10
    metrics['profit_factor'] = profit_factor

    # Normalise every metric into the 0-1 range.
    normalized_metrics = {}

    # Sharpe ratio: expected range -2 .. 4
    normalized_metrics['sharpe_ratio'] = _clip01((metrics['sharpe_ratio'] + 2) / 6)

    # Profit/DD ratio: expected range 0 .. 10. The lower clamp was missing, so a
    # losing run produced a negative "normalised" value.
    normalized_metrics['profit_dd_ratio'] = _clip01(metrics['profit_dd_ratio'] / 10)

    # Calmar ratio: expected range -1 .. 3
    normalized_metrics['calmar_ratio'] = _clip01((metrics['calmar_ratio'] + 1) / 4)

    # Win rate is already within 0 .. 1
    normalized_metrics['win_rate'] = metrics['win_rate']

    # Profit factor: expected range 0 .. 5
    normalized_metrics['profit_factor'] = _clip01(metrics['profit_factor'] / 5)

    # Weighted overall score
    weighted_score = sum(
        normalized_metrics[key] * weights.get(key, 0)
        for key in normalized_metrics
    )

    # Penalty: fewer than 200 trades scales the score down linearly, with a
    # floor of 0.1 so it never reaches zero.
    total_trades = result.get('total_trades', 0)
    if total_trades < 200:
        trade_penalty_factor = max(total_trades / 200.0, 0.1)
        weighted_score *= trade_penalty_factor

    # Penalty: very large maximum drawdown.
    if max_drawdown > 0.5:
        weighted_score *= 0.7

    return {
        'metrics': metrics,
        'normalized_metrics': normalized_metrics,
        'weighted_score': weighted_score,
        'weights_used': weights
    }

In [None]:
def run_single_backtest(params, data, data_1min, timedelta_value, evaluation_weights=None):
    """
    Run one backtest for a single parameter tuple and score it.

    ``params`` is ``(sl_range, tp_ratio, min_tp_range, prediction_margin)``. The
    backtest runs with a fixed spread of 0.00010 and its result is scored by
    ``calculate_evaluation_metrics`` using ``evaluation_weights``.

    Returns a dict with "params", "score" (the weighted score),
    "evaluation_details" and "result".
    """
    # Unpacking enforces that exactly four values were supplied.
    sl_range, tp_ratio, min_tp_range, prediction_margin = params
    strategy_params = {
        "sl_range": sl_range,
        "tp_ratio": tp_ratio,
        "min_tp_range": min_tp_range,
        "prediction_margin": prediction_margin,
    }

    # Simulate the strategy with these parameters.
    backtest_result = backtest_strategy_modified(
        spread=0.00010,
        data=data,
        data_1min=data_1min,
        timedelta_value=timedelta_value,
        **strategy_params,
    )

    # Combine the individual metrics into a single score.
    scoring = calculate_evaluation_metrics(backtest_result, weights=evaluation_weights)

    return {
        "params": strategy_params,
        "score": scoring['weighted_score'],
        "evaluation_details": scoring,
        "result": backtest_result,
    }

In [None]:
def backtest_strategy_modified(
    sl_range=0.1,
    tp_ratio=0.8,
    min_tp_range=0.001,
    prediction_margin=0.05,
    spread=0.00010,
    data=None,
    data_1min=None,
    timedelta_value=None,
):
    """Simulate the prediction-driven long/short strategy on 1-minute bars.

    The 1-minute bars are walked in order. On every bar:

    1. An open position is closed at its stop price when the bar's low (long) or
       high (short) touches it; otherwise the stop is trailed once the close has
       moved at least ``min_tp_range`` in favour, locking in ``tp_ratio`` of the
       open gain (the stop only ever tightens).
    2. Once the bar's timestamp is later than the pending prediction's timestamp
       plus ``timedelta_value``, that prediction is consumed: a value above
       ``0.5 + prediction_margin`` means "be long", below
       ``0.5 - prediction_margin`` means "be short", anything else leaves the
       position alone. Switching sides closes the opposite position at the bar's
       close ("reversal") and opens the new one there, with the initial stop
       ``sl_range`` away.

    The walk ends when the bars or the predictions run out; a position that is
    still open is closed at the last 1-minute close ("end_of_data"). ``spread``
    is subtracted from the P&L of every closed trade.

    Parameters
    ----------
    sl_range, tp_ratio, min_tp_range, prediction_margin, spread : float
        Strategy parameters described above (price units, except ``tp_ratio``
        and ``prediction_margin``).
    data : DataFrame
        Prediction bars with a ``predictions`` column, indexed by bar timestamp.
    data_1min : DataFrame
        1-minute bars with ``high``, ``low`` and ``close`` columns.
    timedelta_value : timedelta
        Added to a prediction's timestamp to get the earliest time it may be used.

    Returns
    -------
    dict
        Summary statistics (``final_balance``, ``total_trades``, ``win_rate``,
        ``profit_factor``, ``max_drawdown``, ...), the ``balance_history`` with
        one entry per closed trade, the ``trades`` list and the ``parameters``.

    Raises
    ------
    ValueError
        If ``data``, ``data_1min`` or ``timedelta_value`` is missing.
    """
    if data is None or data_1min is None or timedelta_value is None:
        raise ValueError("data, data_1min and timedelta_value are required")

    # Account statistics
    balance = 0
    balance_hist = []
    trades = []
    trade_count = 0
    win_count = 0
    loss_count = 0
    total_profit = 0
    total_loss = 0
    max_drawdown = 0
    peak_balance = 0

    # Open-position state: 1 = long, -1 = short, 0 = flat.
    position = 0
    entry_price = None
    sl_price = None

    def _close_position(exit_price, exit_reason, exit_time):
        """Book the open position as a closed trade and go flat."""
        nonlocal balance, trade_count, win_count, loss_count
        nonlocal total_profit, total_loss, max_drawdown, peak_balance, position

        if position == 1:
            pnl = exit_price - entry_price - spread
        else:
            pnl = entry_price - exit_price - spread

        balance += pnl
        balance_hist.append(balance)
        trade_count += 1
        if pnl > 0:
            win_count += 1
            total_profit += pnl
        else:
            loss_count += 1
            total_loss += abs(pnl)
        trades.append(
            {
                "type": "buy" if position == 1 else "sell",
                "entry_price": entry_price,
                "exit_price": exit_price,
                "pnl": pnl,
                "exit_reason": exit_reason,
                "exit_time": exit_time,
            }
        )

        # Track the drawdown on every balance change. Sampling it once per
        # minute (before the reversal step) skipped some balance points,
        # including the final close-out.
        if balance > peak_balance:
            peak_balance = balance
        drawdown = peak_balance - balance
        if drawdown > max_drawdown:
            max_drawdown = drawdown

        position = 0

    n_predictions = len(data)
    if n_predictions > 0:
        # Pull the columns out once instead of building a row object per minute.
        prediction_values = data["predictions"].to_numpy()
        prediction_times = data.index
        next_prediction_index = 0

        minute_bars = zip(
            data_1min.index,
            data_1min["high"].to_numpy(),
            data_1min["low"].to_numpy(),
            data_1min["close"].to_numpy(),
        )
        for timestamp, high, low, close in minute_bars:
            # --- Exit handling for the open position ---
            if position == 1:
                if low <= sl_price:
                    _close_position(sl_price, "stop_loss", timestamp)
                else:
                    trailed = entry_price + (close - entry_price) * tp_ratio
                    if close >= entry_price + min_tp_range and sl_price < trailed:
                        sl_price = trailed
            elif position == -1:
                if high >= sl_price:
                    _close_position(sl_price, "stop_loss", timestamp)
                else:
                    trailed = entry_price - (entry_price - close) * tp_ratio
                    if close <= entry_price - min_tp_range and sl_price > trailed:
                        sl_price = trailed

            # --- Entry handling: act on the first 1-minute bar after the
            # prediction bar has closed ---
            if timestamp > prediction_times[next_prediction_index] + timedelta_value:
                prediction = prediction_values[next_prediction_index]
                if prediction > 0.5 + prediction_margin:
                    target = 1
                elif prediction < 0.5 - prediction_margin:
                    target = -1
                else:
                    target = 0

                if target != 0 and position != target:
                    if position != 0:
                        # Holding the opposite side: close it before reversing.
                        _close_position(close, "reversal", timestamp)
                    position = target
                    entry_price = close
                    if target == 1:
                        sl_price = close - sl_range
                    else:
                        sl_price = close + sl_range

                next_prediction_index += 1
                if next_prediction_index >= n_predictions:
                    # Every prediction has been consumed.
                    break

    # Close whatever is still open at the last available 1-minute close.
    if position != 0:
        _close_position(
            data_1min["close"].iloc[-1], "end_of_data", data_1min.index[-1]
        )

    # Assemble the result
    result = {
        "final_balance": balance,
        "total_trades": trade_count,
        "winning_trades": win_count,
        "losing_trades": loss_count,
        "win_rate": win_count / trade_count if trade_count > 0 else 0,
        "total_profit": total_profit,
        "total_loss": total_loss,
        "profit_factor": total_profit / total_loss if total_loss > 0 else float("inf"),
        "average_win": total_profit / win_count if win_count > 0 else 0,
        "average_loss": total_loss / loss_count if loss_count > 0 else 0,
        "max_drawdown": max_drawdown,
        "balance_history": balance_hist,
        "trades": trades,
        "parameters": {
            "sl_range": sl_range,
            "tp_ratio": tp_ratio,
            "min_tp_range": min_tp_range,
            "prediction_margin": prediction_margin,
            "spread": spread,
        },
    }

    return result


In [None]:
def optimize_parameters_parallel(
    data, data_1min, timedelta_value, param_ranges=None, n_processes=None
):
    """
    Grid-search the strategy parameters, running the backtests in a process pool.

    Parameters:
    -----------
    data : DataFrame
        Prediction data
    data_1min : DataFrame
        1-minute bar data
    timedelta_value : timedelta
        Time offset handed through to each backtest
    param_ranges : dict
        Candidate values per parameter ("sl_range", "tp_ratio", "min_tp_range",
        "prediction_margin"); a built-in grid is used when omitted
    n_processes : int
        Number of worker processes (CPU count minus one, at least 1, when None)

    Returns (summary dict for the best-scoring run, all results sorted by score,
    highest first).
    """

    # Built-in search grid
    grid = param_ranges
    if grid is None:
        grid = {
            "sl_range": [0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1],
            "tp_ratio": [0.5, 0.75, 0.9],
            "min_tp_range": [0.001, 0.002, 0.005, 0.01, 0.02],
            "prediction_margin": [0.01, 0.02, 0.05, 0.1, 0.2],
        }

    # Cartesian product in the positional order run_single_backtest unpacks.
    axis_order = ("sl_range", "tp_ratio", "min_tp_range", "prediction_margin")
    param_combinations = list(itertools.product(*[grid[axis] for axis in axis_order]))

    print(f"総パラメータ組み合わせ数: {len(param_combinations)}")

    # Worker count
    if n_processes is None:
        n_processes = max(1, mp.cpu_count() - 1)

    print(f"使用プロセス数: {n_processes}")

    # Bind the shared inputs so each task only carries its parameter tuple.
    worker = partial(
        run_single_backtest,
        data=data,
        data_1min=data_1min,
        timedelta_value=timedelta_value,
    )

    # Run the grid, wrapping the result iterator in tqdm for progress display.
    started_at = datetime.now()

    with mp.Pool(processes=n_processes) as pool:
        progress = tqdm(
            pool.imap(worker, param_combinations),
            total=len(param_combinations),
            desc="Optimizing parameters",
        )
        results = list(progress)

    elapsed_time = (datetime.now() - started_at).total_seconds()

    # Highest score first
    results_sorted = list(results)
    results_sorted.sort(key=lambda entry: entry["score"], reverse=True)

    top = results_sorted[0]

    summary = {
        "best_parameters": top["params"],
        "best_score": top["score"],
        "best_result_details": top["result"],
        "total_combinations": len(param_combinations),
        "optimization_time_seconds": elapsed_time,
        "top_10_results": results_sorted[:10],
    }

    return summary, results_sorted

In [None]:
# Grid-search the strategy parameters on the second validation split (default grid).
summary, results_sorted = optimize_parameters_parallel(
    val2_data, val2_data_1min, timedelta_value
)

In [None]:
# Print the best run's statistics, leaving out the two list-valued entries.
print('Summary:', {
    key: value
    for key, value in summary["best_result_details"].items()
    if key not in ["balance_history", "trades"]
})

In [None]:
# One row per parameter combination: the scalar statistics of the run followed by
# the parameters that produced it, in descending score order.
result_df = pd.DataFrame([
    {
        **{k: v for k, v in r['result'].items() if k not in ['trades', 'balance_history', 'parameters']},
        **{k: v for k, v in r['result']['parameters'].items()}
    }
    for r in results_sorted])

In [None]:
# Export the full grid-search table to result_df.xlsx.
result_df.to_excel('result_df.xlsx')

In [None]:
import seaborn as sns

# Balance after each closed trade for the best parameter set on validation.
sns.lineplot(summary["best_result_details"]["balance_history"])

In [None]:
# Re-run the backtest on the test split with the best validation parameters.
# NOTE(review): test_data / test_data_1min are not loaded by any cell visible in
# this notebook - confirm where they come from.
params = summary['best_parameters']
test_result = backtest_strategy_modified(
    **params,
    spread=0.00010,
    data=test_data,
    data_1min=test_data_1min,
    timedelta_value=timedelta_value,
)

In [None]:
import seaborn as sns

# Balance after each closed trade on the test split.
sns.lineplot(test_result["balance_history"])