# ステップ 1: 環境準備とデータ取得・準備 (10分足、可能な限り過去から)


In [1]:
# ステップ 1: 環境準備とデータ取得・準備 (10分足、可能な限り過去から)

# 1. ライブラリのインポート
import MetaTrader5 as mt5
import pandas as pd
from datetime import datetime, timedelta
import os
import backtrader as bt

print("ライブラリをインポートしました。")
print(f"MetaTrader5 version: {mt5.__version__}")
print(f"pandas version: {pd.__version__}")
print(f"backtrader version: {bt.__version__}")

# --- データ取得パラメータ設定 ---
symbol = "USDJPYs"       # 取得したい通貨ペア
timeframe_mt5 = mt5.TIMEFRAME_M10 # 時間軸を10分足に変更
# 取得期間: 可能な限り過去から現在まで
utc_to = datetime.utcnow() # 現在時刻 (UTC)
# 非常に古い日付を開始日に設定 (ブローカーが持つ最古のデータから取得するため)
utc_from = datetime(2000, 1, 1) # 例: 2000年1月1日

# 保存するファイル名
# 時間軸名を分かりやすく変換 (M10を追加)
tf_map = {
    mt5.TIMEFRAME_M1: 'M1', mt5.TIMEFRAME_M5: 'M5', mt5.TIMEFRAME_M10: 'M10', # M10を追加
    mt5.TIMEFRAME_M15: 'M15', mt5.TIMEFRAME_M30: 'M30', mt5.TIMEFRAME_H1: 'H1',
    mt5.TIMEFRAME_H4: 'H4', mt5.TIMEFRAME_D1: 'D1', mt5.TIMEFRAME_W1: 'W1',
    mt5.TIMEFRAME_MN1: 'MN1'
}
timeframe_str = tf_map.get(timeframe_mt5, 'UnknownTF')
file_path = f'{symbol}_{timeframe_str}_data.csv' # ファイル名も変更

print(f"\n--- データ取得設定 ---")
print(f"通貨ペア: {symbol}")
print(f"時間軸: {timeframe_str}")
print(f"取得開始 (指定): {utc_from} (可能な限り過去から取得)")
print(f"取得終了 (UTC): {utc_to}")
print(f"保存ファイル名: {file_path}")

# 2. MT5ターミナルへの接続
if not mt5.initialize():
    print("initialize() failed, error code =", mt5.last_error())
    mt5_initialized = False
else:
    print("\nMT5ターミナルに接続しました。")
    mt5_initialized = True

# 3. ヒストリカルデータの取得
rates = None
if mt5_initialized:
    print("データを取得中... (データ量が多い場合、時間がかかることがあります)")
    # copy_rates_range を使用
    rates = mt5.copy_rates_range(symbol, timeframe_mt5, utc_from, utc_to)

    # 4. MT5ターミナルからの切断
    mt5.shutdown()
    print("MT5ターミナルとの接続を終了しました。")

    if rates is None:
        print(f"{symbol} のデータ取得に失敗しました。ターミナル設定やデータ提供状況を確認してください。")
    elif len(rates) == 0:
        print(f"{symbol} の指定期間のデータが見つかりませんでした。")
    else:
        print(f"{len(rates)} 件のデータを取得しました。")

# 5. Pandas DataFrameへの変換と整形
df = None
if rates is not None and len(rates) > 0:
    df = pd.DataFrame(rates)
    df['time'] = pd.to_datetime(df['time'], unit='s')
    df.rename(columns={'time': 'datetime', 'tick_volume': 'volume'}, inplace=True)
    df['openinterest'] = -1
    df = df[['datetime', 'open', 'high', 'low', 'close', 'volume', 'openinterest']]
    print("\nDataFrameへの変換とカラム調整が完了しました。")
    print("データの一部:")
    print(df.head()) # 最初の数行を表示
    print("...")
    print(df.tail()) # 最後の数行を表示


# 6. CSVファイルへの保存
if df is not None:
    try:
        print(f"\nデータを {file_path} に保存中...")
        df.to_csv(file_path, index=False)
        print(f"データを {file_path} に保存しました。")
        # ファイルサイズの確認 (目安)
        file_size = os.path.getsize(file_path) / (1024 * 1024) # MB単位
        print(f"ファイルサイズ: {file_size:.2f} MB")
    except Exception as e:
        print(f"\nCSVファイルへの保存中にエラーが発生しました: {e}")

# 7. CSVファイルの読み込みと確認 (数値確認)
if os.path.exists(file_path):
    print(f"\n--- 保存したCSVファイルの確認 ({file_path}) ---")
    try:
        # データ量が大きい場合、読み込みにも時間がかかる可能性あり
        print("CSVファイルを読み込み中...")
        df_loaded = pd.read_csv(file_path, index_col='datetime', parse_dates=True)
        print("読み込み完了。")

        print("\n--- データ概要 ---")
        print(f"行数: {df_loaded.shape[0]}, 列数: {df_loaded.shape[1]}")
        print(f"期間: {df_loaded.index.min()} から {df_loaded.index.max()}") # 実際に取得できた期間

        print("\n--- データ型と欠損値 ---")
        df_loaded.info(memory_usage='deep') # メモリ使用量も表示してみる
        print(f"\n欠損値の合計:\n{df_loaded.isnull().sum()}")

        # head/tailは既に変換時に表示しているので省略可
        # print("\n--- 最初の5行 ---")
        # print(df_loaded.head())
        # print("\n--- 最後の5行 ---")
        # print(df_loaded.tail())

        print("\n--- 基本統計量 (一部) ---")
        print(df_loaded[['close', 'volume']].describe()) # 全カラム出すと多いので絞る

    except Exception as e:
        print(f"\nCSVファイルの読み込みまたは確認中にエラーが発生しました: {e}")
else:
    print(f"\nファイル {file_path} が見つかりません。データ取得または保存に失敗した可能性があります。")

  - callback(msg, \*args, \*\*kwargs)
  - callback(data, status, \*args, \*\*kwargs)


ライブラリをインポートしました。
MetaTrader5 version: 5.0.4874
pandas version: 2.2.3
backtrader version: 1.9.78.123

--- データ取得設定 ---
通貨ペア: USDJPYs
時間軸: M10
取得開始 (指定): 2000-01-01 00:00:00 (可能な限り過去から取得)
取得終了 (UTC): 2025-04-18 18:59:46.073571
保存ファイル名: USDJPYs_M10_data.csv

MT5ターミナルに接続しました。
データを取得中... (データ量が多い場合、時間がかかることがあります)
MT5ターミナルとの接続を終了しました。
85583 件のデータを取得しました。

DataFrameへの変換とカラム調整が完了しました。
データの一部:
             datetime     open     high      low    close  volume  \
0 2023-01-02 01:00:00  130.933  130.953  130.818  130.822      71   
1 2023-01-02 01:10:00  130.822  130.842  130.739  130.802      59   
2 2023-01-02 01:20:00  130.802  130.812  130.762  130.765      39   
3 2023-01-02 01:30:00  130.767  130.813  130.761  130.764      29   
4 2023-01-02 01:40:00  130.764  130.779  130.762  130.779      14   

   openinterest  
0            -1  
1            -1  
2            -1  
3            -1  
4            -1  
...
                 datetime     open     high      low    close  volume  \
85578 2025-04

  utc_to = datetime.utcnow() # 現在時刻 (UTC)


データを USDJPYs_M10_data.csv に保存しました。
ファイルサイズ: 4.87 MB

--- 保存したCSVファイルの確認 (USDJPYs_M10_data.csv) ---
CSVファイルを読み込み中...
読み込み完了。

--- データ概要 ---
行数: 85583, 列数: 6
期間: 2023-01-02 01:00:00 から 2025-04-18 09:50:00

--- データ型と欠損値 ---
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 85583 entries, 2023-01-02 01:00:00 to 2025-04-18 09:50:00
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   open          85583 non-null  float64
 1   high          85583 non-null  float64
 2   low           85583 non-null  float64
 3   close         85583 non-null  float64
 4   volume        85583 non-null  int64  
 5   openinterest  85583 non-null  int64  
dtypes: float64(4), int64(2)
memory usage: 4.6 MB

欠損値の合計:
open            0
high            0
low             0
close           0
volume          0
openinterest    0
dtype: int64

--- 基本統計量 (一部) ---
              close        volume
count  85583.000000  85583.000000
mean     146.669278    5

# ステップ 2: Backtrader基本構造の作成とデータ読み込み


In [2]:
# ステップ 2: Backtrader基本構造の作成とデータ読み込み

# 1. ライブラリのインポート
import backtrader as bt
import pandas as pd
import os

print("ライブラリをインポートしました。")
print(f"backtrader version: {bt.__version__}")
print(f"pandas version: {pd.__version__}")

# 2. CSVファイルのパス指定
# ステップ1で保存したファイル名を指定してください
csv_file_path = 'USDJPYs_M10_data.csv'

# ファイルが存在するか確認
if not os.path.exists(csv_file_path):
    print(f"エラー: ファイルが見つかりません - {csv_file_path}")
    # ここで処理を中断するか、前のステップに戻る必要があることを示す
    # raise FileNotFoundError(f"データファイルが見つかりません: {csv_file_path}")
else:
    print(f"データファイル '{csv_file_path}' を使用します。")

    # 3. データフィードの作成
    # CSVファイルをPandasで読み込み (datetimeをインデックスにし、パースする)
    dataframe = pd.read_csv(
        csv_file_path,
        index_col='datetime',
        parse_dates=True
        )

    # Backtrader用のデータフィードを作成
    # カラム名はステップ1で調整済みのはず (datetime, open, high, low, close, volume, openinterest)
    data_feed = bt.feeds.PandasData(
        dataname=dataframe,
        name="USDJPYs_M10" # データフィードに名前をつける (オプション)
        )
    print("Backtrader用データフィードを作成しました。")

    # 4. シンプルなStrategyクラスの定義 (まだ取引しない)
    class MyFirstStrategy(bt.Strategy):
        def log(self, txt, dt=None):
            ''' このストラテジー用のログ関数 '''
            dt = dt or self.datas[0].datetime.date(0) # データフィードの日付を取得
            print(f'{dt.isoformat()}, {txt}')

        def __init__(self):
            # 最初のデータポイントのclose価格を保持 (動作確認用)
            self.dataclose = self.datas[0].close
            self.log('MyFirstStrategy Initialized')

        def next(self):
            # このメソッドは各バーで呼び出される
            # self.log(f'Close Price: {self.dataclose[0]:.3f}') # 毎バー出力すると大量になるのでコメントアウト
            # まだ取引ロジックは実装しない
            pass

        def stop(self):
            # バックテスト終了時に呼び出される
            self.log('MyFirstStrategy Finished')

    print("シンプルなStrategyクラスを定義しました。")

    # 5. Cerebroインスタンスの作成
    cerebro = bt.Cerebro()
    print("Cerebroエンジンを作成しました。")

    # 6. 初期資金の設定
    initial_cash = 1000000.0 # 例: 100万円
    cerebro.broker.setcash(initial_cash)
    print(f"初期資金を {initial_cash:,.0f} に設定しました。")

    # 7. データフィードの追加
    cerebro.adddata(data_feed)
    print("データフィードをCerebroに追加しました。")

    # 8. Strategyの追加
    cerebro.addstrategy(MyFirstStrategy)
    print("StrategyをCerebroに追加しました。")

    # 9. バックテストの実行
    print("\nバックテストを実行します...")
    # 実行前のポートフォリオ価値を表示
    print(f"実行前のポートフォリオ価値: {cerebro.broker.getvalue():,.0f}")

    results = cerebro.run() # バックテストを実行

    # 10. 結果の数値確認
    print("\nバックテストが完了しました。")
    # 実行後のポートフォリオ価値を表示
    final_value = cerebro.broker.getvalue()
    print(f"実行後のポートフォリオ価値: {final_value:,.0f}")

    print("\n--- 数値確認 ---")
    print(f"初期資金 (確認): {initial_cash:,.0f}")
    print(f"最終資金 (確認): {final_value:,.0f}")

    # 最終資金が初期資金と同じであることを確認 (まだ取引していないため)
    if abs(final_value - initial_cash) < 0.01: # 浮動小数点誤差を考慮
        print("最終資金は初期資金と同じです（正常）。")
    else:
        print("警告: 最終資金が初期資金と異なります。予期せぬ動作の可能性があります。")

ライブラリをインポートしました。
backtrader version: 1.9.78.123
pandas version: 2.2.3
データファイル 'USDJPYs_M10_data.csv' を使用します。
Backtrader用データフィードを作成しました。
シンプルなStrategyクラスを定義しました。
Cerebroエンジンを作成しました。
初期資金を 1,000,000 に設定しました。
データフィードをCerebroに追加しました。
StrategyをCerebroに追加しました。

バックテストを実行します...
実行前のポートフォリオ価値: 1,000,000
2025-04-18, MyFirstStrategy Initialized
2025-04-18, MyFirstStrategy Finished

バックテストが完了しました。
実行後のポートフォリオ価値: 1,000,000

--- 数値確認 ---
初期資金 (確認): 1,000,000
最終資金 (確認): 1,000,000
最終資金は初期資金と同じです（正常）。


# ステップ 3: 単純な売買戦略の実装（固定ロット）


In [3]:
# ステップ 3: 単純な売買戦略の実装（固定ロット）

# 1. ライブラリのインポート (再掲)
import backtrader as bt
import pandas as pd
import os

print("ライブラリをインポートしました。")

# 2. CSVファイルのパス指定 (再掲)
csv_file_path = 'USDJPYs_M10_data.csv'

if not os.path.exists(csv_file_path):
    print(f"エラー: ファイルが見つかりません - {csv_file_path}")
else:
    print(f"データファイル '{csv_file_path}' を使用します。")

    # --- データフィード作成 (ステップ2と同様) ---
    dataframe = pd.read_csv(csv_file_path, index_col='datetime', parse_dates=True)
    data_feed = bt.feeds.PandasData(dataname=dataframe, name="USDJPYs_M10")
    print("データフィードを作成しました。")

    # --- 単純な売買戦略クラス ---
    class SmaCrossStrategy(bt.Strategy):
        # パラメータ: 短期/長期SMAの期間、固定ロットサイズ
        params = (
            ('sma_short_period', 10), # 短期SMA期間 (例: 10期間)
            ('sma_long_period', 30),  # 長期SMA期間 (例: 30期間)
            ('fixed_size', 0.1),    # 固定ロットサイズ (例: 0.1ロット)
        )

        def log(self, txt, dt=None):
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()}, {txt}')

        def __init__(self):
            self.log(f'Strategy Parameters: SMA Short={self.params.sma_short_period}, '
                     f'SMA Long={self.params.sma_long_period}, Fixed Size={self.params.fixed_size}')

            # 短期と長期の単純移動平均線 (SMA) を計算
            self.sma_short = bt.indicators.SimpleMovingAverage(
                self.datas[0].close, period=self.params.sma_short_period)
            self.sma_long = bt.indicators.SimpleMovingAverage(
                self.datas[0].close, period=self.params.sma_long_period)

            # クロスオーバーシグナルを計算 (-1: デッドクロス, +1: ゴールデンクロス)
            self.crossover = bt.indicators.CrossOver(self.sma_short, self.sma_long)

            self.order = None # アクティブな注文を保持する変数

            self.log('SmaCrossStrategy Initialized')


        def notify_order(self, order):
             # オーダー状態の通知（デバッグ用）
             if order.status in [order.Submitted, order.Accepted]:
                 # オーダーが送信/受理された -> 何もしない
                 return

             # オーダーが完了したかチェック
             if order.status in [order.Completed]:
                 if order.isbuy():
                     self.log(f'BUY EXECUTED, Price: {order.executed.price:.3f}, Size: {order.executed.size:.2f}, Cost: {order.executed.value:.2f}, Comm: {order.executed.comm:.2f}')
                 elif order.issell():
                     self.log(f'SELL EXECUTED, Price: {order.executed.price:.3f}, Size: {order.executed.size:.2f}, Cost: {order.executed.value:.2f}, Comm: {order.executed.comm:.2f}')
                 self.bar_executed = len(self) # 実行されたバーのインデックス

             elif order.status in [order.Canceled, order.Margin, order.Rejected]:
                 self.log(f'Order Canceled/Margin/Rejected - Status: {order.getstatusname()}')

             # 他のアクティブなオーダーがないことを確認
             self.order = None

        def notify_trade(self, trade):
            # 取引完了の通知（デバッグ用）
            if not trade.isclosed:
                return
            self.log(f'OPERATION PROFIT, GROSS: {trade.pnl:.2f}, NET: {trade.pnlcomm:.2f}')


        def next(self):
            # self.log(f'Close Price: {self.data.close[0]:.3f}') # デバッグ用

            # ポジションがない場合のみエントリーを検討
            if not self.position:
                # ゴールデンクロスが発生した場合 (crossoverが+1)
                if self.crossover > 0:
                    self.log(f'BUY CREATE, Price: {self.data.close[0]:.3f}')
                    # 買い注文を出す
                    self.order = self.buy(size=self.params.fixed_size)

                # デッドクロスが発生した場合 (crossoverが-1)
                # ※売りからも入る場合は以下のコメントを外す
                # elif self.crossover < 0:
                #     self.log(f'SELL CREATE, Price: {self.data.close[0]:.3f}')
                #     # 売り注文を出す
                #     self.order = self.sell(size=self.params.fixed_size)

            # ポジションがある場合
            else:
                # 買いポジションを持っている状態でデッドクロスが発生した場合
                if self.position.size > 0 and self.crossover < 0:
                    self.log(f'CLOSE CREATE (From Buy), Price: {self.data.close[0]:.3f}')
                    # ポジションをクローズする
                    self.order = self.close()

                # 売りポジションを持っている状態でゴールデンクロスが発生した場合
                # ※売りからも入る場合は以下のコメントを外す
                # elif self.position.size < 0 and self.crossover > 0:
                #     self.log(f'CLOSE CREATE (From Sell), Price: {self.data.close[0]:.3f}')
                #     # ポジションをクローズする
                #     self.order = self.close()

        def stop(self):
             self.log('SmaCrossStrategy Finished')

    print("単純な売買戦略 (SmaCrossStrategy) クラスを定義しました。")

    # --- Cerebroの設定 ---
    cerebro = bt.Cerebro()
    initial_cash = 1000000.0
    cerebro.broker.setcash(initial_cash)

    # データフィードとStrategyを追加
    cerebro.adddata(data_feed)
    cerebro.addstrategy(SmaCrossStrategy, sma_short_period=12, sma_long_period=26, fixed_size=0.1) # パラメータを指定

    # --- アナライザーの追加 ---
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='tradeanalyzer')
    cerebro.addanalyzer(bt.analyzers.Transactions, _name='transactions')
    print("アナライザー (TradeAnalyzer, Transactions) を追加しました。")

    # --- ブローカー手数料の設定 (オプション) ---
    # 例: 1ロットあたり往復$7の手数料、レバレッジ100倍、証拠金通貨JPYの場合
    # cerebro.broker.setcommission(commission=7.0, margin=None, mult=100000) # 正確な設定は要確認
    # print("ブローカー手数料を設定しました (例)。")


    # --- バックテストの実行 ---
    print("\nバックテストを実行します...")
    print(f"初期資金: {cerebro.broker.getvalue():,.0f}")
    results = cerebro.run()
    strat = results[0] # 結果からStrategyインスタンスを取得
    print("\nバックテストが完了しました。")

    # --- 結果の数値確認 ---
    print("\n--- 数値確認 (単純戦略・固定ロット) ---")
    final_value = cerebro.broker.getvalue()
    print(f"最終資金: {final_value:,.2f}") # 小数点以下も表示

    # アナライザーから結果を取得
    try:
        transactions_analysis = strat.analyzers.transactions.get_analysis()
        # 取引履歴があるか確認 (transactions_analysis は OrderedDict)
        total_trades = 0
        if transactions_analysis:
             # 取引履歴は 日時 -> [ [size, price, value], ... ] のようなリストのリストになっている
             # ここでは単純にキー(日時)の数でなく、実際のトランザクション数を数える方が正確だが、
             # 簡単のため、ここでは完了したトレード数で代用する (TradeAnalyzerを使う)
             pass # TradeAnalyzerで見るのでここでは数えない

    except KeyError:
        print("Transactions アナライザーの結果がありません。")

    try:
        trade_analysis = strat.analyzers.tradeanalyzer.get_analysis()
        if trade_analysis and hasattr(trade_analysis, 'total') and trade_analysis.total.closed > 0:
            total_closed_trades = trade_analysis.total.closed
            total_pnl = trade_analysis.pnl.net.total
            print(f"完了した総取引回数: {total_closed_trades}")
            print(f"総損益 (Net PnL): {total_pnl:.2f}")
        else:
             print("完了した取引がありませんでした。")

    except KeyError:
         print("TradeAnalyzer の結果がありません。")
    except AttributeError:
         print("TradeAnalyzer の分析結果の構造が予期したものと異なります。")

ライブラリをインポートしました。
データファイル 'USDJPYs_M10_data.csv' を使用します。
データフィードを作成しました。
単純な売買戦略 (SmaCrossStrategy) クラスを定義しました。
アナライザー (TradeAnalyzer, Transactions) を追加しました。

バックテストを実行します...
初期資金: 1,000,000
2025-04-18, Strategy Parameters: SMA Short=12, SMA Long=26, Fixed Size=0.1
2025-04-18, SmaCrossStrategy Initialized
2023-01-02, BUY CREATE, Price: 130.927
2023-01-02, BUY EXECUTED, Price: 130.927, Size: 0.10, Cost: 13.09, Comm: 0.00
2023-01-02, CLOSE CREATE (From Buy), Price: 130.876
2023-01-02, SELL EXECUTED, Price: 130.872, Size: -0.10, Cost: 13.09, Comm: 0.00
2023-01-02, OPERATION PROFIT, GROSS: -0.01, NET: -0.01
2023-01-02, BUY CREATE, Price: 130.773
2023-01-02, BUY EXECUTED, Price: 130.759, Size: 0.10, Cost: 13.08, Comm: 0.00
2023-01-02, CLOSE CREATE (From Buy), Price: 130.719
2023-01-02, SELL EXECUTED, Price: 130.719, Size: -0.10, Cost: 13.08, Comm: 0.00
2023-01-02, OPERATION PROFIT, GROSS: -0.00, NET: -0.00
2023-01-03, BUY CREATE, Price: 130.961
2023-01-03, BUY EXECUTED, Price: 130.966, Size:

# ステップ 4: 分解モンテカルロ法のロジック実装 (単体テスト)


In [4]:
# ステップ 4: 分解モンテカルロ法のロジック実装 (単体テスト)

import math # 分解ルールでの切り捨て/切り上げ用に使うかも (今回は整数除算で対応)

class DecompositionMonteCarloLogic:
    """
    分解モンテカルロ法のロジックを管理するクラス。
    Backtraderから独立してテスト可能。
    """
    def __init__(self):
        self.sequence = [] # 数列リスト
        self.reset_cycle() # 初期化時に最初のサイクルを開始

    def reset_cycle(self):
        """数列を初期状態 [0, 1] にリセットする"""
        self.sequence = [0, 1]
        # print("DEBUG: Cycle Reset ->", self.sequence) # デバッグ用

    def get_unit_size(self) -> int:
        """次の取引の単位ロット数を計算する"""
        if len(self.sequence) >= 2:
            return self.sequence[0] + self.sequence[-1]
        elif len(self.sequence) == 1:
            return self.sequence[0]
        else: # 数列が空 (サイクル完了直後など)
            return 0

    def _apply_decomposition(self):
        """数列が1つになった場合に分解ルールを適用する (内部メソッド)"""
        if len(self.sequence) == 1:
            val = self.sequence[0]
            if val > 1:
                # 分解ルール: 例として整数除算とその残りを使う
                # 例: 2 -> [1, 1], 3 -> [1, 2], 4 -> [2, 2], 5 -> [2, 3]
                half1 = val // 2
                half2 = val - half1
                self.sequence = [half1, half2]
                # print(f"DEBUG: Decomposition Applied: {val} -> {self.sequence}") # デバッグ用
            # elif val == 1:
                # 1の場合は分解しない (ルールに基づき)
                # print(f"DEBUG: Decomposition Skipped for value 1")
                pass


    def update_sequence(self, is_win: bool, traded_unit_size: int):
        """取引結果に基づいて数列を更新する"""
        if self.is_cycle_complete():
            print("警告: サイクル完了後にupdate_sequenceが呼ばれました。")
            return

        if is_win:
            # 勝ちの場合
            if len(self.sequence) >= 2:
                self.sequence.pop(0)  # 左端を削除
                self.sequence.pop(-1) # 右端を削除
            elif len(self.sequence) == 1:
                # 最後の1つで勝った場合
                if traded_unit_size == self.sequence[0]:
                     self.sequence.pop(0) # 残りの要素を削除
                else:
                    # 通常ありえないが、念のためログ出し＆削除
                    print(f"警告: 数列[1]の時に想定外ロット({traded_unit_size} vs {self.sequence[0]})で勝利。要素を削除します。")
                    self.sequence.pop(0)
            # print(f"DEBUG: Win update -> {self.sequence}") # デバッグ用

        else:
            # 負けの場合
            # 取引した単位ロット数を右端に追加
            self.sequence.append(traded_unit_size)
            # print(f"DEBUG: Lose update -> {self.sequence}") # デバッグ用

        # 勝ち負け処理後に分解ルールを適用
        self._apply_decomposition()

        # サイクル完了チェック (完了したらリセットするかは呼び出し元で判断)
        if self.is_cycle_complete():
            # print("DEBUG: Cycle now complete.") # デバッグ用
            pass


    def is_cycle_complete(self) -> bool:
        """現在のサイクルが完了したか（数列が空か）を返す"""
        return len(self.sequence) == 0

# --- 単体テストの実行 ---
print("--- 分解モンテカルロ法 ロジック単体テスト ---")

# ヘルパー関数でテストを見やすく
def run_test_step(logic, step_name, is_win):
    print(f"\n{step_name}:")
    print(f"  Sequence Before: {logic.sequence}")
    unit_size = logic.get_unit_size()
    print(f"  Calculated Unit Size: {unit_size}")
    if unit_size == 0 and not logic.is_cycle_complete():
         print("  (Cannot trade, unit size is 0)")
         return

    print(f"  Action: {'Win' if is_win else 'Lose'} with {unit_size} units")
    logic.update_sequence(is_win, unit_size)
    print(f"  Sequence After : {logic.sequence}")
    if logic.is_cycle_complete():
        print("  Cycle Complete!")
    else:
         next_size = logic.get_unit_size()
         print(f"  Next Unit Size: {next_size}")


# テストケース1: 最短サイクル (勝ち)
print("\n=== Test Case 1: Shortest Cycle (Win) ===")
logic1 = DecompositionMonteCarloLogic()
run_test_step(logic1, "Step 1", is_win=True)

# テストケース2: 簡単なサイクル (負け -> 勝ち -> 勝ち)
print("\n=== Test Case 2: Simple Cycle (Lose -> Win -> Win) ===")
logic2 = DecompositionMonteCarloLogic()
run_test_step(logic2, "Step 1", is_win=False)
run_test_step(logic2, "Step 2", is_win=True)
run_test_step(logic2, "Step 3", is_win=True)

# テストケース3: 分解ルール (2 -> 1,1) が発生するケース (負け -> 負け -> 勝ち -> 勝ち)
# 資料の例: [0,1] L1 -> [0,1,1] L1 -> [0,1,1,1] W2 -> [1,1] W2 -> []
print("\n=== Test Case 3: Decomposition Rule (2 -> 1,1) ===")
logic3 = DecompositionMonteCarloLogic()
run_test_step(logic3, "Step 1", is_win=False) # L1 -> [0, 1, 1]
run_test_step(logic3, "Step 2", is_win=False) # L1 -> [0, 1, 1, 1]
run_test_step(logic3, "Step 3", is_win=True)  # W2 (0+1) -> [1, 1] (ここで分解は発生しないはず)
run_test_step(logic3, "Step 4", is_win=True)  # W2 (1+1) -> []

# テストケース4: 分解ルール (3 -> 1,2) が発生するケース (負け -> 負け -> 負け -> 勝ち -> 勝ち -> 勝ち)
# [0,1] L1->[0,1,1] L1->[0,1,1,1] L1->[0,1,1,1,1] W1(0+1)->[1,1,1] W2(1+1)->[1] 分解->[0,1]? or [1]->[1]? -> W1(1) -> []
# 分解ルール (3->[1,2]採用): [0,1] L1->[0,1,1] L1->[0,1,1,1] L1->[0,1,1,1,1] W1->[1,1,1] W2->[1] 分解->[0,1]? No. [1]のままのはず
#                                                                                       W2->[1]
# 再考: [1,1,1] で W2(1+1) -> [1] が残る -> 分解ルールは適用されない(値が1のため)
# 次のトレード: get_unit_size=1 -> W1 -> [] で完了
print("\n=== Test Case 4: Decomposition Rule (Testing with 1 left) ===")
logic4 = DecompositionMonteCarloLogic()
run_test_step(logic4, "Step 1", is_win=False) # L1 -> [0, 1, 1]
run_test_step(logic4, "Step 2", is_win=False) # L1 -> [0, 1, 1, 1]
run_test_step(logic4, "Step 3", is_win=False) # L1 -> [0, 1, 1, 1, 1]
run_test_step(logic4, "Step 4", is_win=True)  # W1 (0+1) -> [1, 1, 1]
run_test_step(logic4, "Step 5", is_win=True)  # W2 (1+1) -> [1]
run_test_step(logic4, "Step 6", is_win=True)  # W1 (1) -> []

# テストケース5: 連敗が続く場合
print("\n=== Test Case 5: Consecutive Losses ===")
logic5 = DecompositionMonteCarloLogic()
run_test_step(logic5, "Step 1", is_win=False) # L1 -> [0, 1, 1]
run_test_step(logic5, "Step 2", is_win=False) # L1 -> [0, 1, 1, 1]
run_test_step(logic5, "Step 3", is_win=False) # L1 -> [0, 1, 1, 1, 1]
run_test_step(logic5, "Step 4", is_win=False) # L1 -> [0, 1, 1, 1, 1, 1]
run_test_step(logic5, "Step 5", is_win=False) # L1 -> [0, 1, 1, 1, 1, 1, 1]
# ... ここから勝ち始めたらどうなるか？
run_test_step(logic5, "Step 6", is_win=True)  # W1 (0+1) -> [1, 1, 1, 1, 1]
run_test_step(logic5, "Step 7", is_win=True)  # W2 (1+1) -> [1, 1, 1]
run_test_step(logic5, "Step 8", is_win=True)  # W2 (1+1) -> [1]
run_test_step(logic5, "Step 9", is_win=True)  # W1 (1) -> []

# テストケース6: 分解(値が大きい場合) 5 -> [2, 3]
# [0,1] L1 -> [0,1,1] L1 -> [0,1,1,1] L2(0+1) -> [0,1,1,1,2] W2(0+2)->[1,1,1] W2(1+1)->[1] W1->[] (これは分解しない)
# [0,1] L1 -> [0,1,1] L1 -> [0,1,1,1] W1(0+1) -> [1,1] W2(1+1) -> [] (これも分解しない)
# どうすれば大きな数が残るか？ 勝ち負けを繰り返す？
# [0,1] L1 -> [0,1,1] W1(0+1) -> [1] W1(1) -> []
# [0,1] L1 -> [0,1,1] L1 -> [0,1,1,1] L2(0+1) -> [0,1,1,1,2] L2(0+2) -> [0,1,1,1,2,2] W2(0+2)->[1,1,1,2] W3(1+2)->[1,1] W2(1+1)->[]
print("\n=== Test Case 6: Decomposition Rule (Larger number, e.g., 3->[1,2]) ===")
logic6 = DecompositionMonteCarloLogic()
run_test_step(logic6, "Step 1", is_win=False) # L1 -> [0, 1, 1]
run_test_step(logic6, "Step 2", is_win=False) # L1 -> [0, 1, 1, 1]
run_test_step(logic6, "Step 3", is_win=False) # L2 (0+1=1) -> [0, 1, 1, 1, 1] # ここまで Test Case 4 と同じ
run_test_step(logic6, "Step 4", is_win=False) # L2 (0+1=1) -> [0, 1, 1, 1, 1, 1] # Test Case 5 と同じ
run_test_step(logic6, "Step 5", is_win=False) # L2 (0+1=1) -> [0, 1, 1, 1, 1, 1, 1]
# ここで大きな単位ロットで負ける場合 (例: 3単位)
logic6.sequence = [1, 1, 1] # 強制的に状態変更
run_test_step(logic6, "Step 6", is_win=False) # L2 (1+1=2) -> [1, 1, 1, 2]
run_test_step(logic6, "Step 7", is_win=False) # L3 (1+2=3) -> [1, 1, 1, 2, 3]
# ここから勝つ
run_test_step(logic6, "Step 8", is_win=True) # W4 (1+3=4) -> [1, 1, 2]
run_test_step(logic6, "Step 9", is_win=True) # W3 (1+2=3) -> [1]
run_test_step(logic6, "Step 10", is_win=True) # W1 (1) -> []

--- 分解モンテカルロ法 ロジック単体テスト ---

=== Test Case 1: Shortest Cycle (Win) ===

Step 1:
  Sequence Before: [0, 1]
  Calculated Unit Size: 1
  Action: Win with 1 units
  Sequence After : []
  Cycle Complete!

=== Test Case 2: Simple Cycle (Lose -> Win -> Win) ===

Step 1:
  Sequence Before: [0, 1]
  Calculated Unit Size: 1
  Action: Lose with 1 units
  Sequence After : [0, 1, 1]
  Next Unit Size: 1

Step 2:
  Sequence Before: [0, 1, 1]
  Calculated Unit Size: 1
  Action: Win with 1 units
  Sequence After : [1]
  Next Unit Size: 1

Step 3:
  Sequence Before: [1]
  Calculated Unit Size: 1
  Action: Win with 1 units
  Sequence After : []
  Cycle Complete!

=== Test Case 3: Decomposition Rule (2 -> 1,1) ===

Step 1:
  Sequence Before: [0, 1]
  Calculated Unit Size: 1
  Action: Lose with 1 units
  Sequence After : [0, 1, 1]
  Next Unit Size: 1

Step 2:
  Sequence Before: [0, 1, 1]
  Calculated Unit Size: 1
  Action: Lose with 1 units
  Sequence After : [0, 1, 1, 1]
  Next Unit Size: 1

Step 3:
  Seq

In [5]:
# --- 新しいテストケース ---
print("\n=== Test Case 7: 2 Lose -> 1 Win -> 3 Lose -> 3 Win ===")
logic7 = DecompositionMonteCarloLogic()
# 2 Lose
run_test_step(logic7, "Step 1 (L1)", is_win=False)
run_test_step(logic7, "Step 2 (L2)", is_win=False)
# 1 Win
run_test_step(logic7, "Step 3 (W1)", is_win=True)
# 3 Lose
run_test_step(logic7, "Step 4 (L3)", is_win=False)
run_test_step(logic7, "Step 5 (L4)", is_win=False)
run_test_step(logic7, "Step 6 (L5)", is_win=False)
# 3 Win
run_test_step(logic7, "Step 7 (W1)", is_win=True)
run_test_step(logic7, "Step 8 (W2)", is_win=True) # ここで分解が発生するはず
run_test_step(logic7, "Step 9 (W3)", is_win=True)


=== Test Case 7: 2 Lose -> 1 Win -> 3 Lose -> 3 Win ===

Step 1 (L1):
  Sequence Before: [0, 1]
  Calculated Unit Size: 1
  Action: Lose with 1 units
  Sequence After : [0, 1, 1]
  Next Unit Size: 1

Step 2 (L2):
  Sequence Before: [0, 1, 1]
  Calculated Unit Size: 1
  Action: Lose with 1 units
  Sequence After : [0, 1, 1, 1]
  Next Unit Size: 1

Step 3 (W1):
  Sequence Before: [0, 1, 1, 1]
  Calculated Unit Size: 1
  Action: Win with 1 units
  Sequence After : [1, 1]
  Next Unit Size: 2

Step 4 (L3):
  Sequence Before: [1, 1]
  Calculated Unit Size: 2
  Action: Lose with 2 units
  Sequence After : [1, 1, 2]
  Next Unit Size: 3

Step 5 (L4):
  Sequence Before: [1, 1, 2]
  Calculated Unit Size: 3
  Action: Lose with 3 units
  Sequence After : [1, 1, 2, 3]
  Next Unit Size: 4

Step 6 (L5):
  Sequence Before: [1, 1, 2, 3]
  Calculated Unit Size: 4
  Action: Lose with 4 units
  Sequence After : [1, 1, 2, 3, 4]
  Next Unit Size: 5

Step 7 (W1):
  Sequence Before: [1, 1, 2, 3, 4]
  Calculat

# ステップ 5: 分解モンテカルロ法をBacktrader Strategyに統合


In [6]:
# ステップ 5: 分解モンテカルロ法をBacktrader Strategyに統合

# 1. ライブラリのインポート
import backtrader as bt
import pandas as pd
import os
import math # ステップ4から流用

# 2. CSVファイルのパス指定
csv_file_path = 'USDJPYs_M10_data.csv' # ステップ1で作成したファイル

if not os.path.exists(csv_file_path):
    print(f"エラー: ファイルが見つかりません - {csv_file_path}")
else:
    print(f"データファイル '{csv_file_path}' を使用します。")

    # 3. 分解モンテカルロ法ロジッククラス (ステップ4で作成・テストしたもの)
    class DecompositionMonteCarloLogic:
        """分解モンテカルロ法のロジックを管理するクラス"""
        def __init__(self):
            self.sequence = []
            self.reset_cycle()

        def reset_cycle(self):
            self.sequence = [0, 1]
            # print("DEBUG: Cycle Reset ->", self.sequence)

        def get_unit_size(self) -> int:
            if len(self.sequence) >= 2:
                return self.sequence[0] + self.sequence[-1]
            elif len(self.sequence) == 1:
                return self.sequence[0]
            else:
                return 0

        def _apply_decomposition(self):
            if len(self.sequence) == 1:
                val = self.sequence[0]
                if val > 1:
                    half1 = val // 2
                    half2 = val - half1
                    self.sequence = [half1, half2]
                    # print(f"DEBUG: Decomposition Applied: {val} -> {self.sequence}")
                # else: # val == 1
                    # print(f"DEBUG: Decomposition Skipped for value 1")

        def update_sequence(self, is_win: bool, traded_unit_size: int):
            if self.is_cycle_complete():
                # print("警告: サイクル完了後にupdate_sequenceが呼ばれました。") # 必要ならログ出力
                return

            if is_win:
                if len(self.sequence) >= 2:
                    self.sequence.pop(0)
                    self.sequence.pop(-1)
                elif len(self.sequence) == 1:
                    if traded_unit_size == self.sequence[0]:
                         self.sequence.pop(0)
                    else:
                        print(f"警告: 数列[1]の時に想定外ロット({traded_unit_size} vs {self.sequence[0]})で勝利。要素を削除します。")
                        self.sequence.pop(0)
                # print(f"DEBUG: Win update -> {self.sequence}")
            else: # 負け
                self.sequence.append(traded_unit_size)
                # print(f"DEBUG: Lose update -> {self.sequence}")

            self._apply_decomposition()

            # if self.is_cycle_complete():
                # print("DEBUG: Cycle now complete.")

        def is_cycle_complete(self) -> bool:
            return len(self.sequence) == 0

    print("分解モンテカルロ法ロジッククラス (DecompositionMonteCarloLogic) を定義しました。")

    # --- データフィード作成 ---
    dataframe = pd.read_csv(csv_file_path, index_col='datetime', parse_dates=True)
    data_feed = bt.feeds.PandasData(dataname=dataframe, name="USDJPYs_M10")
    print("データフィードを作成しました。")

    # --- 分解モンテカルロ法を組み込んだStrategyクラス ---
    class MonteCarloSmaCrossStrategy(bt.Strategy):
        params = (
            ('unit_lot', 0.01),       # 1単位あたりのロット数 (重要!)
            ('sma_short_period', 12), # 短期SMA期間 (ステップ3と同じ)
            ('sma_long_period', 26),  # 長期SMA期間 (ステップ3と同じ)
            # ('debug_log', True),      # 詳細ログ出力フラグ (オプション)
        )

        def log(self, txt, dt=None):
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()}, {txt}')

        def __init__(self):
            self.log(f'Strategy Parameters: Unit Lot={self.params.unit_lot}, '
                     f'SMA Short={self.params.sma_short_period}, SMA Long={self.params.sma_long_period}')

            # 分解モンテカルロ法のロジックインスタンスを生成
            self.sizer_logic = DecompositionMonteCarloLogic()
            self.log(f'Initial Monte Carlo Sequence: {self.sizer_logic.sequence}')

            # インジケーター定義 (ステップ3と同様)
            self.sma_short = bt.indicators.SimpleMovingAverage(
                self.datas[0].close, period=self.params.sma_short_period)
            self.sma_long = bt.indicators.SimpleMovingAverage(
                self.datas[0].close, period=self.params.sma_long_period)
            self.crossover = bt.indicators.CrossOver(self.sma_short, self.sma_long)

            self.order = None
            self.current_trade_units = 0 # 現在の取引単位数を保持 (負け時用)

            self.log('MonteCarloSmaCrossStrategy Initialized')

        def notify_order(self, order):
             if order.status in [order.Submitted, order.Accepted]:
                 return
             if order.status in [order.Completed]:
                 if order.isbuy():
                     self.log(f'BUY EXECUTED, Price: {order.executed.price:.3f}, Size: {order.executed.size:.2f}')
                 elif order.issell():
                     self.log(f'SELL EXECUTED, Price: {order.executed.price:.3f}, Size: {order.executed.size:.2f}')
             elif order.status in [order.Canceled, order.Margin, order.Rejected]:
                 self.log(f'Order Canceled/Margin/Rejected - Status: {order.getstatusname()}')
             self.order = None # オーダー完了

        def notify_trade(self, trade):
            if trade.isclosed:
                is_win = trade.pnl > 0
                # 単位ロット数を取得 (エントリー時に保存したものを使用)
                traded_unit_size = self.current_trade_units

                self.log(f'Trade Closed: PNL={trade.pnl:.2f}, Win={is_win}, '
                         f'Traded Units={traded_unit_size}, Seq Before: {self.sizer_logic.sequence}')

                # 分解モンテカルロ法の数列を更新
                self.sizer_logic.update_sequence(is_win, traded_unit_size)

                self.log(f'                                         Seq After: {self.sizer_logic.sequence}')

                if self.sizer_logic.is_cycle_complete():
                    self.log("Cycle Complete! Resetting sequence.")
                    self.sizer_logic.reset_cycle() # 次のサイクルのためにリセット

                self.current_trade_units = 0 # 使用済みなのでリセット

        def next(self):
            # self.log(f'Close: {self.data.close[0]:.3f}, Pos: {self.position.size}') # デバッグ用

            # オーダー実行中 or ポジションがある場合は何もしない（シンプルなロジック）
            if self.order:
                return
            if self.position:
                 # ポジションクローズのロジック (ステップ3と同様)
                if self.position.size > 0 and self.crossover < 0: # 買いポジション中にデッドクロス
                     self.log(f'CLOSE CREATE (From Buy), Price: {self.data.close[0]:.3f}')
                     self.order = self.close()
                # elif self.position.size < 0 and self.crossover > 0: # 売りポジション中にゴールデンクロス
                #     self.log(f'CLOSE CREATE (From Sell), Price: {self.data.close[0]:.3f}')
                #     self.order = self.close()
                return # ポジションがあればエントリーはしない

            # エントリーロジック (ポジションがない場合)
            unit_size = self.sizer_logic.get_unit_size()

            if unit_size > 0: # 取引可能な単位ロット数があるか？
                calculated_size = unit_size * self.params.unit_lot
                # 最小/最大ロット数チェックや証拠金チェックをここに入れるのが望ましい

                # ゴールデンクロスで買いエントリー
                if self.crossover > 0:
                    self.log(f'BUY CREATE: Units={unit_size}, Size={calculated_size:.2f}, Price: {self.data.close[0]:.3f}, Sequence: {self.sizer_logic.sequence}')
                    self.current_trade_units = unit_size # 単位数を記録
                    self.order = self.buy(size=calculated_size)

                # デッドクロスで売りエントリー (オプション)
                # elif self.crossover < 0:
                #     self.log(f'SELL CREATE: Units={unit_size}, Size={calculated_size:.2f}, Price: {self.data.close[0]:.3f}, Sequence: {self.sizer_logic.sequence}')
                #     self.current_trade_units = unit_size # 単位数を記録
                #     self.order = self.sell(size=calculated_size)

            # else:
                # self.log(f"No trade - Unit size is 0 (Cycle complete or sequence empty)")


        def stop(self):
             self.log('MonteCarloSmaCrossStrategy Finished')
             self.log(f'Final Monte Carlo Sequence: {self.sizer_logic.sequence}')


    print("分解モンテカルロ法を組み込んだ Strategy クラスを定義しました。")

    # --- Cerebroの設定 ---
    cerebro = bt.Cerebro()
    initial_cash = 1000000.0
    cerebro.broker.setcash(initial_cash)

    # データフィードとStrategyを追加
    cerebro.adddata(data_feed)
    # unit_lot をパラメータで渡す
    cerebro.addstrategy(MonteCarloSmaCrossStrategy, unit_lot=0.01)

    # --- アナライザーの追加 ---
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='tradeanalyzer')
    cerebro.addanalyzer(bt.analyzers.Transactions, _name='transactions')
    print("アナライザーを追加しました。")

    # --- バックテストの実行 ---
    print("\nバックテストを実行します...")
    print(f"初期資金: {cerebro.broker.getvalue():,.0f}")
    results = cerebro.run()
    strat = results[0]
    print("\nバックテストが完了しました。")

    # --- 結果の数値確認 ---
    print("\n--- 数値確認 (分解モンテカルロ法) ---")
    final_value = cerebro.broker.getvalue()
    print(f"最終資金: {final_value:,.2f}")

    # アナライザーから結果を取得 (ステップ3と同様)
    try:
        trade_analysis = strat.analyzers.tradeanalyzer.get_analysis()
        if trade_analysis and hasattr(trade_analysis, 'total') and trade_analysis.total.closed > 0:
            total_closed_trades = trade_analysis.total.closed
            total_pnl = trade_analysis.pnl.net.total
            print(f"完了した総取引回数: {total_closed_trades}")
            print(f"総損益 (Net PnL): {total_pnl:.2f}")
        else:
             print("完了した取引がありませんでした。")
    except KeyError:
         print("TradeAnalyzer の結果がありません。")
    except AttributeError:
         print("TradeAnalyzer の分析結果の構造が予期したものと異なります。")

    # ログが多くなる可能性があるので注意
    # print("\n--- 取引履歴の一部 (Transactions Analyzer) ---")
    # try:
    #     trans = strat.analyzers.transactions.get_analysis()
    #     # 最初の5件と最後の5件を表示する例
    #     count = 0
    #     keys = list(trans.keys())
    #     print("--- First 5 Transactions ---")
    #     for k in keys[:5]:
    #         print(f"{k}: {trans[k]}")
    #     print("...")
    #     print("--- Last 5 Transactions ---")
    #     for k in keys[-5:]:
    #          print(f"{k}: {trans[k]}")
    # except Exception as e:
    #     print(f"取引履歴の表示中にエラー: {e}")

データファイル 'USDJPYs_M10_data.csv' を使用します。
分解モンテカルロ法ロジッククラス (DecompositionMonteCarloLogic) を定義しました。
データフィードを作成しました。
分解モンテカルロ法を組み込んだ Strategy クラスを定義しました。
アナライザーを追加しました。

バックテストを実行します...
初期資金: 1,000,000
2025-04-18, Strategy Parameters: Unit Lot=0.01, SMA Short=12, SMA Long=26
2025-04-18, Initial Monte Carlo Sequence: [0, 1]
2025-04-18, MonteCarloSmaCrossStrategy Initialized
2023-01-02, BUY CREATE: Units=1, Size=0.01, Price: 130.927, Sequence: [0, 1]
2023-01-02, BUY EXECUTED, Price: 130.927, Size: 0.01
2023-01-02, CLOSE CREATE (From Buy), Price: 130.876
2023-01-02, SELL EXECUTED, Price: 130.872, Size: -0.01
2023-01-02, Trade Closed: PNL=-0.00, Win=False, Traded Units=1, Seq Before: [0, 1]
2023-01-02,                                          Seq After: [0, 1, 1]
2023-01-02, BUY CREATE: Units=1, Size=0.01, Price: 130.773, Sequence: [0, 1, 1]
2023-01-02, BUY EXECUTED, Price: 130.759, Size: 0.01
2023-01-02, CLOSE CREATE (From Buy), Price: 130.719
2023-01-02, SELL EXECUTED, Price: 130.719, Size: -

# ステップ 6: 利益確定(TP) / 損切り(SL) ルールの実装


In [7]:
# ステップ 6: 利益確定(TP) / 損切り(SL) ルールの実装

# 1. ライブラリのインポート
import backtrader as bt
import pandas as pd
import os
import math

print("ライブラリをインポートしました。")

# 2. CSVファイルのパス指定
csv_file_path = 'USDJPYs_M10_data.csv' # ステップ1で作成したファイル

if not os.path.exists(csv_file_path):
    print(f"エラー: ファイルが見つかりません - {csv_file_path}")
else:
    print(f"データファイル '{csv_file_path}' を使用します。")

    # 3. 分解モンテカルロ法ロジッククラス (ステップ4, 5と同様)
    class DecompositionMonteCarloLogic:
        """分解モンテカルロ法のロジックを管理するクラス"""
        def __init__(self):
            self.sequence = []
            self.reset_cycle()
        def reset_cycle(self): self.sequence = [0, 1]
        def get_unit_size(self) -> int:
            if len(self.sequence) >= 2: return self.sequence[0] + self.sequence[-1]
            elif len(self.sequence) == 1: return self.sequence[0]
            else: return 0
        def _apply_decomposition(self):
            if len(self.sequence) == 1:
                val = self.sequence[0]
                if val > 1: self.sequence = [val // 2, val - (val // 2)]
        def update_sequence(self, is_win: bool, traded_unit_size: int):
            if self.is_cycle_complete(): return
            if is_win:
                if len(self.sequence) >= 2: self.sequence.pop(0); self.sequence.pop(-1)
                elif len(self.sequence) == 1:
                    if traded_unit_size == self.sequence[0]: self.sequence.pop(0)
                    else: print(f"警告: 数列[1] 想定外ロット({traded_unit_size} vs {self.sequence[0]})Win"); self.sequence.pop(0)
            else: self.sequence.append(traded_unit_size)
            self._apply_decomposition()
        def is_cycle_complete(self) -> bool: return len(self.sequence) == 0
    print("分解モンテカルロ法ロジッククラスを定義しました。")


    # --- データフィード作成 ---
    dataframe = pd.read_csv(csv_file_path, index_col='datetime', parse_dates=True)
    data_feed = bt.feeds.PandasData(dataname=dataframe, name="USDJPYs_M10")
    print("データフィードを作成しました。")

    # --- TP/SL付きのStrategyクラス ---
    class MonteCarloSmaCrossWithTPSL(bt.Strategy):
        params = (
            ('unit_lot', 0.01),       # 1単位あたりのロット数
            ('sma_short_period', 12), # 短期SMA期間
            ('sma_long_period', 26),  # 長期SMA期間
            ('tp_pips', 50.0),        # 利益確定のpips数 (例: 50pips)
            ('sl_pips', 25.0),        # 損切りのpips数 (例: 25pips)
            # ('pip_value', 0.01),      # USDJPYの1pipの価値 (クロス円の場合) -> brokerから取得推奨だが簡易的に定義
            # ('debug_log', True),
        )

        def log(self, txt, dt=None):
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()}, {txt}')

        def __init__(self):
            # pip valueの取得 (より正確にはブローカー情報から取得)
            # self.pip_value = self.broker.comminfo[self.data._dataname].pips # Backtrader <-> IBなどで利用可
            # 今回はシンボル名から簡易的に判断 (USDJPYsを想定)
            if 'JPY' in self.data._name:
                 self.pip_value = 0.01
                 self.point_value = 0.001 # Backtraderのpoint換算用 (1pip=10points)
            else: # ドルストレートなどを想定 (仮)
                 self.pip_value = 0.0001
                 self.point_value = 0.00001 # Backtraderのpoint換算用 (1pip=10points)

            self.log(f'Strategy Parameters: Unit Lot={self.params.unit_lot}, SMA={self.params.sma_short_period}/{self.params.sma_long_period}, '
                     f'TP={self.params.tp_pips}pips, SL={self.params.sl_pips}pips, PipValue={self.pip_value}')

            # モンテカルロロジック
            self.sizer_logic = DecompositionMonteCarloLogic()
            self.log(f'Initial Monte Carlo Sequence: {self.sizer_logic.sequence}')

            # インジケーター
            self.sma_short = bt.indicators.SimpleMovingAverage(self.data.close, period=self.params.sma_short_period)
            self.sma_long = bt.indicators.SimpleMovingAverage(self.data.close, period=self.params.sma_long_period)
            self.crossover = bt.indicators.CrossOver(self.sma_short, self.sma_long)

            # 注文関連の変数をリストで保持 (Bracket注文は複数のオーダーを生成するため)
            self.orders = []
            self.current_trade_units = 0

            self.log('MonteCarloSmaCrossWithTPSL Initialized')

        def notify_order(self, order):
             # Bracket注文の場合、関連する複数のオーダー（エントリー、TP、SL）が通知される
             if order.status in [order.Submitted, order.Accepted]:
                 # self.log(f'Order {order.getstatusname()}: Ref={order.ref}, Type={order.getordername()}, Size={order.size:.2f}, Price={order.price or 0:.3f}')
                 return

             # 完了、キャンセル、拒否されたオーダーをリストから削除
             try:
                 self.orders.remove(order)
             except ValueError:
                 pass # すでに削除されているか、リストにない場合

             if order.status == order.Completed:
                 exec_type = 'BUY' if order.isbuy() else 'SELL'
                 if order.ordtype == order.Market: exec_type += " Market Entry"
                 elif order.ordtype == order.Limit: exec_type += " Limit (TP?)"
                 elif order.ordtype == order.Stop: exec_type += " Stop (SL?)"
                 elif order.ordtype == order.StopLimit: exec_type += " StopLimit" # あまり使わない

                 self.log(f'{exec_type} EXECUTED: Ref={order.ref}, Price: {order.executed.price:.3f}, Size: {order.executed.size:.2f}')

             elif order.status in [order.Canceled, order.Margin, order.Rejected]:
                 self.log(f'Order {order.getstatusname()}: Ref={order.ref} - Status: {order.getstatusname()}')

             # if not self.orders:
             #    self.log("All related orders processed for this bracket.")


        def notify_trade(self, trade):
            # TP/SL付きの場合、クローズ理由はTPかSLか、あるいは反対シグナルか
            if trade.isclosed:
                is_win = trade.pnl > 0
                traded_unit_size = self.current_trade_units # エントリー時の単位数

                close_reason = "Closed" # デフォルト
                # Bracket注文の場合、どの注文で決済されたか特定するのは少し難しい場合がある
                # ここでは単純にPNLで勝ち負けを判断
                if is_win: close_reason = "TP hit?"
                else: close_reason = "SL hit?"

                self.log(f'Trade Closed ({close_reason}): PNL={trade.pnl:.2f}, Win={is_win}, '
                         f'Traded Units={traded_unit_size}, Seq Before: {self.sizer_logic.sequence}')

                self.sizer_logic.update_sequence(is_win, traded_unit_size)

                self.log(f'                                         Seq After: {self.sizer_logic.sequence}')

                if self.sizer_logic.is_cycle_complete():
                    self.log("Cycle Complete! Resetting sequence.")
                    self.sizer_logic.reset_cycle()

                self.current_trade_units = 0

        def next(self):
            # 保留中のオーダーがあるか、すでにポジションがある場合は何もしない
            if self.orders: # リストが空でないかチェック
                # self.log("Pending orders exist, waiting...")
                return
            if self.position:
                # 反対シグナルでの決済ロジック (TP/SLがあるので、これは必須ではないかも)
                # if self.position.size > 0 and self.crossover < 0:
                #     self.log(f'CLOSE (Opposite Signal - Buy), Price: {self.data.close[0]:.3f}')
                #     orders = self.close() # closeは関連オーダーもキャンセルするはず
                #     self.orders.extend(orders)
                # elif self.position.size < 0 and self.crossover > 0:
                #      self.log(f'CLOSE (Opposite Signal - Sell), Price: {self.data.close[0]:.3f}')
                #      orders = self.close()
                #      self.orders.extend(orders)
                return # ポジションがあればエントリーしない

            # エントリーロジック
            unit_size = self.sizer_logic.get_unit_size()

            if unit_size > 0:
                calculated_size = unit_size * self.params.unit_lot

                # --- 買いエントリー (ゴールデンクロス) ---
                if self.crossover > 0:
                    entry_price = self.data.close[0] # 現在の終値で計算（簡易的）
                    sl_price = entry_price - self.params.sl_pips * self.pip_value
                    tp_price = entry_price + self.params.tp_pips * self.pip_value
                    self.current_trade_units = unit_size

                    self.log(f'BUY BRACKET CREATE: Units={unit_size}, Size={calculated_size:.2f}, Entry(Est)={entry_price:.3f}, SL={sl_price:.3f}, TP={tp_price:.3f}, Sequence: {self.sizer_logic.sequence}')

                    # buy_bracket を使用 (エントリーは成行とする例)
                    # price=None または self.data.close[0] で Market Order になるはず
                    # stopargs/limitargsでStop/Limit注文の詳細を指定可能
                    orders = self.buy_bracket(
                        size=calculated_size,
                        # price=entry_price, # Limit entryの場合
                        exectype=bt.Order.Market,
                        stopprice=sl_price,
                        # stopargs=dict(exectype=bt.Order.Stop), # デフォルトはStop
                        limitprice=tp_price,
                        # limitargs=dict(exectype=bt.Order.Limit) # デフォルトはLimit
                    )
                    self.orders.extend(orders) # 生成されたオーダー(通常3つ)をリストに追加

                # --- 売りエントリー (デッドクロス - オプション) ---
                # elif self.crossover < 0:
                #     entry_price = self.data.close[0]
                #     sl_price = entry_price + self.params.sl_pips * self.pip_value # 売りの場合SLは上
                #     tp_price = entry_price - self.params.tp_pips * self.pip_value # 売りの場合TPは下
                #     self.current_trade_units = unit_size
                #
                #     self.log(f'SELL BRACKET CREATE: Units={unit_size}, Size={calculated_size:.2f}, Entry(Est)={entry_price:.3f}, SL={sl_price:.3f}, TP={tp_price:.3f}, Sequence: {self.sizer_logic.sequence}')
                #
                #     orders = self.sell_bracket(
                #          size=calculated_size,
                #          exectype=bt.Order.Market,
                #          stopprice=sl_price,
                #          limitprice=tp_price
                #     )
                #     self.orders.extend(orders)


        def stop(self):
             self.log('MonteCarloSmaCrossWithTPSL Finished')
             self.log(f'Final Monte Carlo Sequence: {self.sizer_logic.sequence}')


    print("TP/SL付き Strategy クラスを定義しました。")

    # --- Cerebroの設定 ---
    cerebro = bt.Cerebro()
    initial_cash = 1000000.0
    cerebro.broker.setcash(initial_cash)

    # データフィードとStrategyを追加
    cerebro.adddata(data_feed)
    # TP/SL pips数をパラメータで渡す
    cerebro.addstrategy(MonteCarloSmaCrossWithTPSL,
                        unit_lot=0.01,
                        sma_short_period=12,
                        sma_long_period=26,
                        tp_pips=50.0, # 例: 50 pips TP
                        sl_pips=25.0) # 例: 25 pips SL

    # --- アナライザーの追加 ---
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='tradeanalyzer')
    cerebro.addanalyzer(bt.analyzers.Transactions, _name='transactions') # 必要なら
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown') # ドローダウンも見る
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', timeframe=bt.TimeFrame.Days, riskfreerate=0.0) # シャープレシオも見る
    print("アナライザー (TradeAnalyzer, DrawDown, SharpeRatio) を追加しました。")

    # --- バックテストの実行 ---
    print("\nバックテストを実行します...")
    print(f"初期資金: {cerebro.broker.getvalue():,.0f}")
    results = cerebro.run()
    strat = results[0]
    print("\nバックテストが完了しました。")

    # --- 結果の数値確認 ---
    print("\n--- 数値確認 (分解モンテカルロ法 + TP/SL) ---")
    final_value = cerebro.broker.getvalue()
    print(f"最終資金: {final_value:,.2f}")

    # アナライザーから結果を取得
    try:
        ta = strat.analyzers.tradeanalyzer.get_analysis()
        if ta and hasattr(ta, 'total') and ta.total.closed > 0:
            total_closed_trades = ta.total.closed
            total_pnl = ta.pnl.net.total
            win_rate = (ta.won.total / total_closed_trades * 100) if ta.won.total else 0
            avg_win = ta.won.pnl.average if ta.won.total > 0 else 0
            avg_loss = ta.lost.pnl.average if ta.lost.total > 0 else 0
            payoff_ratio = abs(avg_win / avg_loss) if avg_loss != 0 else float('inf')

            print(f"完了した総取引回数: {total_closed_trades}")
            print(f"総損益 (Net PnL): {total_pnl:.2f}")
            print(f"勝率: {win_rate:.2f}%")
            print(f"平均利益: {avg_win:.2f}")
            print(f"平均損失: {avg_loss:.2f}")
            print(f"ペイオフレシオ (AvgWin/AvgLoss): {payoff_ratio:.2f}")
        else:
             print("完了した取引がありませんでした。")

        dd = strat.analyzers.drawdown.get_analysis()
        print(f"最大ドローダウン: {dd.max.drawdown:.2f}%")

        sr = strat.analyzers.sharpe.get_analysis()
        print(f"シャープレシオ (Annualized): {sr['sharperatio']:.3f}" if sr and 'sharperatio' in sr else "N/A")

    except KeyError as e:
         print(f"アナライザーの結果取得中にKeyError: {e}")
    except AttributeError as e:
         print(f"アナライザーの結果取得中にAttributeError: {e}")
    except Exception as e:
        print(f"分析結果の取得中に予期せぬエラー: {e}")

ライブラリをインポートしました。
データファイル 'USDJPYs_M10_data.csv' を使用します。
分解モンテカルロ法ロジッククラスを定義しました。
データフィードを作成しました。
TP/SL付き Strategy クラスを定義しました。
アナライザー (TradeAnalyzer, DrawDown, SharpeRatio) を追加しました。

バックテストを実行します...
初期資金: 1,000,000
2025-04-18, Strategy Parameters: Unit Lot=0.01, SMA=12/26, TP=50.0pips, SL=25.0pips, PipValue=0.01
2025-04-18, Initial Monte Carlo Sequence: [0, 1]
2025-04-18, MonteCarloSmaCrossWithTPSL Initialized
2023-01-02, BUY BRACKET CREATE: Units=1, Size=0.01, Entry(Est)=130.927, SL=130.677, TP=131.427, Sequence: [0, 1]
2023-01-02, BUY Market Entry EXECUTED: Ref=7179, Price: 130.927, Size: 0.01
2023-01-02, SELL EXECUTED: Ref=7180, Price: 130.677, Size: -0.01
2023-01-02, Order Canceled: Ref=7181 - Status: Canceled
2023-01-02, Trade Closed (SL hit?): PNL=-0.00, Win=False, Traded Units=1, Seq Before: [0, 1]
2023-01-02,                                          Seq After: [0, 1, 1]
2023-01-02, BUY BRACKET CREATE: Units=1, Size=0.01, Entry(Est)=130.773, SL=130.523, TP=131.273, Sequence: [0, 