# 本格CQCNN量子強化学習実験
## JSON Config完全対応版 - 収束まで継続学習

- **設定ファイル**: quantum_cqcnn_config_2025-09-24.json
- **量子構成**: 4Q1L (4量子ビット, 1レイヤー)
- **収束閾値**: 0.95 (Balance)
- **最大エピソード**: 50,000
- **アーキテクチャ**: 完全CQCNN (Frontend CNN → Quantum → Backend CNN)

### 実験目標
1. 0.95収束閾値での学習収束確認
2. Ultra-strict実験(0.995)との比較分析
3. 初期状態依存性の影響評価
4. 量子効果の定量的測定

In [None]:
# === 環境設定とライブラリ導入 ===
import sys
import os
import json
import time
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import pennylane as qml
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from collections import deque
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any
import math
import warnings
warnings.filterwarnings('ignore')

# 乱数シード設定（再現性確保）
EXPERIMENT_SEED = 42
random.seed(EXPERIMENT_SEED)
np.random.seed(EXPERIMENT_SEED)
torch.manual_seed(EXPERIMENT_SEED)
torch.backends.cudnn.deterministic = True

print("=== 量子強化学習実験環境 ===")
print(f"PyTorch version: {torch.__version__}")
print(f"PennyLane version: {qml.__version__}")
print(f"NumPy version: {np.__version__}")
print(f"Random seed: {EXPERIMENT_SEED}")
print(f"Device: {'CUDA' if torch.cuda.is_available() else 'CPU'}")
print("="*40)

=== 量子強化学習実験環境 ===
PyTorch version: 2.8.0+cpu
PennyLane version: 0.42.3
NumPy version: 2.3.3
Random seed: 42
Device: CPU


In [9]:
# === JSON設定読み込み ===
config_path = "quantum_recovery_stable_config_2025-09-25.json"

with open(config_path, 'r', encoding='utf-8') as f:
    config = json.load(f)

print("=== 実験設定サマリー ===")
print(f"Algorithm: {config['learning_config']['algorithm']}")
print(f"Quantum: {config['module_config']['quantum']['n_qubits']}Q{config['module_config']['quantum']['n_layers']}L")
print(f"Embedding: {config['module_config']['quantum']['embedding_type']}")
print(f"Entanglement: {config['module_config']['quantum']['entanglement']}")
print(f"State Dimension: {config['module_config']['quantum']['state_dimension']}")
print(f"Action Space: {config['module_config']['qmap']['action_dim']}D")
print(f"Batch Size: {config['hyperparameters']['batch_size']}")
print(f"Learning Rate: {config['hyperparameters']['learning_rate']}")
print(f"Epochs: {config['hyperparameters']['epochs']}")
print(f"Epsilon: {config['hyperparameters']['epsilon']} → {config['hyperparameters']['epsilon_min']} (decay: {config['hyperparameters']['epsilon_decay']})")
print(f"Replay Buffer: {config['hyperparameters']['replay_buffer_size']}")
print(f"Target Update: every {config['hyperparameters']['target_update_freq']} episodes")
print("="*40)

# 設定値を変数に展開
N_QUBITS = config['module_config']['quantum']['n_qubits']
N_LAYERS = config['module_config']['quantum']['n_layers']
STATE_DIM = config['module_config']['quantum']['state_dimension']
ACTION_DIM = config['module_config']['qmap']['action_dim']
BATCH_SIZE = config['hyperparameters']['batch_size']
LEARNING_RATE = config['hyperparameters']['learning_rate']
MAX_EPISODES = config['hyperparameters']['epochs']
EPSILON_START = config['hyperparameters']['epsilon']
EPSILON_DECAY = config['hyperparameters']['epsilon_decay']
EPSILON_MIN = config['hyperparameters']['epsilon_min']
BUFFER_SIZE = config['hyperparameters']['replay_buffer_size']
TARGET_UPDATE = config['hyperparameters']['target_update_freq']
GAMMA = config['hyperparameters']['gamma']

=== 実験設定サマリー ===
Algorithm: quantum_stable_spatial_36d
Quantum: 4Q2L
Embedding: angle
Entanglement: full
State Dimension: 252
Action Space: 36D
Batch Size: 96
Learning Rate: 0.0006
Epochs: 50000
Epsilon: 0.5 → 0.08 (decay: 0.9998)
Replay Buffer: 6000
Target Update: every 150 episodes


In [None]:
# === 完全版ガイスター環境 ===
class GeisterEnvironment:
    """完全なガイスターゲーム環境（JSON設定対応）"""

    def __init__(self, config):
        self.config = config
        self.board_size = 6
        self.forced_p2_setup_id = None
        self.p2_backrow_cells = [(0, 1), (0, 2), (0, 3), (0, 4),
                                (1, 1), (1, 2), (1, 3), (1, 4)]

        self.reset()
        
        # 報酬設定（JSON設定から読み込み）
        reward_config = config['module_config']['reward']
        self.capture_good_reward = reward_config['capture_good_reward']
        self.capture_bad_penalty = reward_config['capture_bad_penalty']
        self.escape_reward = reward_config['escape_reward']
        self.captured_good_penalty = reward_config['captured_good_penalty']
        self.captured_bad_reward = reward_config['captured_bad_reward']
        self.position_rewards = reward_config['position_rewards']
        
        print(f"ガイスター環境初期化完了 - 報酬戦略: {reward_config['strategy']}")

    def reset(self):
        """ゲーム状態をリセット"""
        self.board = np.zeros((6, 6), dtype=int)
        self.turn = 0
        self.current_player = 1
        self.game_over = False
        self.winner = None
        self.captured_pieces = {'player_1': [], 'player_2': []}
        self.move_history = []

        # 配置設定（JSON設定から読み込み）
        placement_config = self.config['module_config']['placement']
        
        if placement_config['type'] == 'custom':
            # カスタム配置（JSON指定）
            my_pieces = placement_config['my_pieces_config']
            
            # プレイヤー1（下側）
            for i, row in enumerate(my_pieces):
                for j, piece in enumerate(row):
                    if piece != 0:
                        self.board[4 + i][j] = piece

            # ===== ここから P2（上側）の配置 =====
            # 善=+2, 悪=-2 を前提にしています（必要ならあなたの符号に合わせて変更）
            GOOD, BAD = 2, -2

            # 8マス（2x4）の順序は __init__ で定義した p2_backrow_cells を使用
            # もし未定義なら定義しておく
            if not hasattr(self, 'p2_backrow_cells'):
                self.p2_backrow_cells = [(0, 1), (0, 2), (0, 3), (0, 4),
                                        (1, 1), (1, 2), (1, 3), (1, 4)]

            # ---- 8C4 の「k番目の組合せ」を復元する関数（辞書順）----
            from math import comb
            def kth_comb_8_4(k: int):
                """0<=k<70 を 8要素から4要素を選ぶ辞書順のk番目に対応させ、昇順インデックス(4つ)を返す"""
                res = []
                n, r = 8, 4
                x = 0
                for i in range(r, 0, -1):
                    for v in range(x, n):
                        c = comb(n - v - 1, i - 1)
                        if k < c:
                            res.append(v)
                            x = v + 1
                            break
                        k -= c
                return res

            # ---- P2配置の決定：強制IDがあれば適用、なければランダム ----
            if getattr(self, 'forced_p2_setup_id', None) is not None:
                sid = int(self.forced_p2_setup_id) % 70
                red_idx = kth_comb_8_4(sid)          # 赤（悪）の位置(0..7)を取得
                # まず全て善にしてから、赤インデックスの所だけ悪に置き換え
                for idx, (r, c) in enumerate(self.p2_backrow_cells):
                    self.board[r][c] = GOOD
                for idx in red_idx:
                    r, c = self.p2_backrow_cells[idx]
                    self.board[r][c] = BAD
            else:
                # 従来のランダム配置（善4、悪4）を維持
                pieces = [GOOD, GOOD, GOOD, GOOD, BAD, BAD, BAD, BAD]
                random.shuffle(pieces)
                for i, (r, c) in enumerate(self.p2_backrow_cells):
                    self.board[r][c] = pieces[i]

            # ---- 実際に用いられた P2 配置のID（0..69）を計算して保持 ----
            # 赤（悪=-2）が置かれたインデックスを 0..7 で収集
            red_positions = []
            for idx, (r, c) in enumerate(self.p2_backrow_cells):
                if self.board[r][c] == BAD:
                    red_positions.append(idx)
            red_positions.sort()

            # rank（=組合せの辞書順ランク）を計算： 8C4 = 70
            rank = 0
            last = -1
            for i, rr in enumerate(red_positions):
                start = last + 1
                for x in range(start, rr):
                    rank += comb(8 - (x + 1), 4 - (i + 1))
                last = rr
            self.p2_setup_id = int(rank)   # ← これを後でモニタリングに使います

        return self.get_state()

    def get_state(self):
        """252次元状態ベクトル生成（7チャンネルエンコーディング）"""
        state = np.zeros(STATE_DIM)  # 6*6*7 = 252
        
        for i in range(6):
            for j in range(6):
                base_idx = (i * 6 + j) * 7
                value = self.board[i][j]
                
                # 7チャンネル: 空, P1善, P1悪, P2善, P2悪, プレイヤー, ターン
                if value == 0:     # 空
                    state[base_idx] = 1
                elif value == 1:   # プレイヤー1善玉
                    state[base_idx + 1] = 1
                elif value == -1:  # プレイヤー1悪玉
                    state[base_idx + 2] = 1
                elif value == 2:   # プレイヤー2善玉
                    state[base_idx + 3] = 1
                elif value == -2:  # プレイヤー2悪玉
                    state[base_idx + 4] = 1
                
                # 追加情報
                state[base_idx + 5] = 1 if self.current_player == 1 else 0
                state[base_idx + 6] = min(self.turn / 100.0, 1.0)  # 正規化ターン
        
        return state

    def get_valid_moves(self, player=None):
        """有効な手を36次元インデックス（toセル）で取得"""
        if player is None:
            player = self.current_player

        valid_moves = []
        piece_values = [1, -1] if player == 1 else [2, -2]

        for from_i in range(6):
            for from_j in range(6):
                if self.board[from_i][from_j] in piece_values:
                    for di, dj in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
                        to_i, to_j = from_i + di, from_j + dj
                        if 0 <= to_i < 6 and 0 <= to_j < 6:
                            target = self.board[to_i][to_j]
                            if (target == 0 or
                                (player == 1 and abs(target) == 2) or
                                (player == 2 and abs(target) == 1)):
                                idx = to_i * 6 + to_j  # ← 0..35 に固定
                                valid_moves.append((idx, "move", (from_i, from_j), (to_i, to_j)))
        return valid_moves

    def make_move(self, move):
        """手を実行し、報酬を計算"""
        move_index, direction, from_pos, to_pos = move
        from_i, from_j = from_pos
        to_i, to_j = to_pos
        
        piece = self.board[from_i][from_j]
        target = self.board[to_i][to_j]
        
        # 移動記録
        self.move_history.append({
            'turn': self.turn,
            'player': self.current_player,
            'move': move,
            'piece': piece,
            'captured': target if target != 0 else None
        })
        
        # 駒を移動
        self.board[from_i][from_j] = 0
        self.board[to_i][to_j] = piece
        
        # 報酬計算
        reward = self._calculate_reward(piece, target, from_pos, to_pos)
        
        # 勝利条件チェック
        done = self._check_win_condition(piece, to_pos, target)
        
        # ターン進行
        self.turn += 1
        
        # 最大ターン数チェック
        if self.turn >= 200:  # 長期戦対応
            self.game_over = True
            self.winner = None
            done = True
        
        if not done:
            self.current_player = 2 if self.current_player == 1 else 1
        
        return self.get_state(), reward, done, {
            'captured': target,
            'winner': self.winner,
            'turn': self.turn
        }
    
    def _calculate_reward(self, piece, target, from_pos, to_pos):
        """詳細報酬計算（JSON設定準拠）"""
        reward = 0.0
        
        # 駒捕獲報酬
        if target != 0:
            if target > 0:  # 善玉捕獲
                reward += self.capture_good_reward
            else:  # 悪玉捕獲
                reward += self.capture_bad_penalty
        
        # 位置的報酬
        from_i, from_j = from_pos
        to_i, to_j = to_pos
        
        # 前進報酬
        if abs(piece) == 1 and piece > 0:  # 善玉の場合
            if self.current_player == 1 and to_i < from_i:  # 前進
                reward += self.position_rewards['advance_toward_escape']
            elif self.current_player == 2 and to_i > from_i:  # 前進
                reward += self.position_rewards['advance_toward_escape']
        
        # 中央制御
        if 2 <= to_i <= 3 and 2 <= to_j <= 3:
            reward += self.position_rewards['center_control']
        
        # 相手陣地進入
        if ((self.current_player == 1 and to_i <= 2) or 
            (self.current_player == 2 and to_i >= 3)):
            reward += self.position_rewards['opponent_territory']
        
        return reward
    
    def _check_win_condition(self, piece, to_pos, captured):
        """勝利条件判定"""
        to_i, to_j = to_pos
        
        # 脱出勝利
        if abs(piece) == 1 and piece > 0:  # 善玉
            if ((self.current_player == 1 and to_i == 0 and to_j in [0, 5]) or
                (self.current_player == 2 and to_i == 5 and to_j in [0, 5])):
                self.game_over = True
                self.winner = self.current_player
                return True
        
        # 善玉全捕獲勝利
        if captured is not None and captured > 0:
            opponent_good_count = 0
            search_value = 2 if self.current_player == 1 else 1
            
            for i in range(6):
                for j in range(6):
                    if self.board[i][j] == search_value:
                        opponent_good_count += 1
            
            if opponent_good_count == 0:
                self.game_over = True
                self.winner = self.current_player
                return True
        
        return False

print("完全版ガイスター環境定義完了")

# 環境テスト
test_env = GeisterEnvironment(config)
test_state = test_env.reset()
print(f"状態ベクトル形状: {test_state.shape}")
print(f"有効手数: {len(test_env.get_valid_moves())}")

完全版ガイスター環境定義完了
ガイスター環境初期化完了 - 報酬戦略: adaptive_anti_collapse
状態ベクトル形状: (252,)
有効手数: 8


In [11]:
# === 完全版CQCNN実装 ===
class CQCNN(nn.Module):
    """完全なClassical-Quantum Convolutional Neural Network"""

    def __init__(self, config):
        super().__init__()
        
        # 設定読み込み
        arch_config = config['architecture']
        quantum_config = config['module_config']['quantum']
        
        self.n_qubits = quantum_config['n_qubits']
        self.n_layers = quantum_config['n_layers']
        self.state_dim = quantum_config['state_dimension']
        self.action_dim = config['module_config']['qmap']['action_dim']
        
        print(f"CQCNN初期化: {self.state_dim}D → {self.n_qubits}Q{self.n_layers}L → {self.action_dim}D")
        
        # Frontend CNN (252 → 4)
        self.frontend_layers = self._build_layers(arch_config['frontend_cnn']['layers'])
        
        # Quantum Section
        self.dev = qml.device(
            arch_config['quantum_section']['device'], 
            wires=self.n_qubits
        )
        self.quantum_params = nn.Parameter(
            torch.randn(self.n_layers, self.n_qubits, 2) * 0.1
        )
        self.quantum_node = qml.QNode(
            self._quantum_circuit, 
            self.dev, 
            interface='torch'
        )
        
        # Backend CNN (4 → 36)
        self.backend_layers = self._build_layers(arch_config['backend_cnn']['layers'])
        
        print(f"Frontend: {len([l for l in self.frontend_layers if isinstance(l, nn.Linear)])} Linear layers")
        print(f"Quantum: {self.n_qubits} qubits, {self.n_layers} layers, {quantum_config['embedding_type']} embedding")
        print(f"Backend: {len([l for l in self.backend_layers if isinstance(l, nn.Linear)])} Linear layers")

    def _build_layers(self, layer_configs):
        """JSON設定からレイヤー構築"""
        layers = []
        
        for layer_config in layer_configs:
            layer_type = layer_config['type']
            
            if layer_type == 'linear':
                layers.append(nn.Linear(
                    layer_config['in_features'],
                    layer_config['out_features']
                ))
            elif layer_type == 'batch_norm':
                layers.append(nn.BatchNorm1d(layer_config['num_features']))
            elif layer_type == 'relu':
                layers.append(nn.ReLU())
            elif layer_type == 'tanh':
                layers.append(nn.Tanh())
            elif layer_type == 'dropout':
                layers.append(nn.Dropout(layer_config['p']))
        
        return nn.Sequential(*layers)

    def _quantum_circuit(self, features, params):
        """量子回路（JSON設定準拠）"""
        # Angle embedding
        for i in range(self.n_qubits):
            qml.RY(features[i], wires=i)
        
        # Variational layers
        for layer in range(self.n_layers):
            for i in range(self.n_qubits):
                qml.RY(params[layer, i, 0], wires=i)
                qml.RZ(params[layer, i, 1], wires=i)
            
            # Linear entanglement
            for i in range(self.n_qubits - 1):
                qml.CNOT(wires=[i, i + 1])
        
        return [qml.expval(qml.PauliZ(i)) for i in range(self.n_qubits)]

    def forward(self, state):
        """順伝播処理"""
        batch_size = state.shape[0]
        
        # Frontend processing
        state_flat = state.view(batch_size, -1)  # Flatten
        frontend_out = self.frontend_layers(state_flat)  # (batch, 4)
        
        # Quantum processing (batch対応)
        quantum_outputs = []
        for i in range(batch_size):
            q_out = self.quantum_node(frontend_out[i], self.quantum_params)
            quantum_outputs.append(torch.stack(q_out))
        
        quantum_out = torch.stack(quantum_outputs)  # (batch, 4)
        
        # Backend processing
        output = self.backend_layers(quantum_out)  # (batch, 36)
        
        return output

print("完全版CQCNN定義完了")

# モデル初期化テスト
test_model = CQCNN(config)
test_input = torch.randn(4, STATE_DIM)  # バッチサイズ4

print("\nモデル構造テスト中...")
with torch.no_grad():
    test_output = test_model(test_input)
    print(f"入力形状: {test_input.shape}")
    print(f"出力形状: {test_output.shape}")
    print(f"出力範囲: [{test_output.min():.3f}, {test_output.max():.3f}]")
    
    # パラメータ数計算
    total_params = sum(p.numel() for p in test_model.parameters() if p.requires_grad)
    quantum_params = test_model.quantum_params.numel()
    classical_params = total_params - quantum_params
    
    print(f"\nパラメータ数:")
    print(f"  Classical: {classical_params:,}")
    print(f"  Quantum: {quantum_params:,}")
    print(f"  Total: {total_params:,}")

print("\nCQCNNテスト完了!")

完全版CQCNN定義完了
CQCNN初期化: 252D → 4Q2L → 36D
Frontend: 4 Linear layers
Quantum: 4 qubits, 2 layers, angle embedding
Backend: 5 Linear layers

モデル構造テスト中...
入力形状: torch.Size([4, 252])
出力形状: torch.Size([4, 36])
出力範囲: [-0.786, 0.611]

パラメータ数:
  Classical: 73,128
  Quantum: 16
  Total: 73,144

CQCNNテスト完了!


In [None]:
# === 収束検出システム ===
class ConvergenceDetector:
    """収束検出システム（0.95閾値）"""

    def __init__(self, patience=50, min_games=1000, balance_threshold=0.95):
        self.patience = patience
        self.min_games = min_games
        self.balance_threshold = balance_threshold
        self.consecutive_good = 0
        self.best_balance = 0.0
        self.convergence_history = []
        
        
        print(f"収束検出器: 閾値={balance_threshold}, patience={patience}, 最小ゲーム数={min_games}")

    def check_convergence(self, game_results, episode):
        """収束判定"""
        if len(game_results) < self.min_games:
            return False, 0.0, {'reason': 'insufficient_games', 'games': len(game_results)}

        # 最近の結果を分析
        recent_games = game_results[-500:]
        
        wins_1 = sum(1 for r in recent_games if r.get('winner') == 1)
        wins_2 = sum(1 for r in recent_games if r.get('winner') == 2)
        draws = sum(1 for r in recent_games if r.get('winner') is None)
        
        total_games = len(recent_games)
        decisive_games = wins_1 + wins_2
        
        # バランス計算
        if decisive_games > 0:
            balance = min(wins_1, wins_2) / max(wins_1, wins_2)
            win_rate_1 = wins_1 / total_games
            win_rate_2 = wins_2 / total_games
            draw_rate = draws / total_games
        else:
            balance = 1.0  # 全引き分けの場合
            win_rate_1 = win_rate_2 = 0.0
            draw_rate = 1.0
        
        # 収束判定
        is_balanced = balance >= self.balance_threshold
        has_active_games = decisive_games >= 50  # 少なくとも50ゲームは決着
        
        metrics = {
            'balance': balance,
            'win_rate_1': win_rate_1,
            'win_rate_2': win_rate_2,
            'draw_rate': draw_rate,
            'decisive_games': decisive_games,
            'total_games': total_games,
            'is_balanced': is_balanced,
            'has_active_games': has_active_games
        }
        
        # 連続カウント
        if is_balanced and has_active_games:
            self.consecutive_good += 1
            metrics['status'] = 'converging'
        else:
            self.consecutive_good = 0
            if not is_balanced:
                metrics['status'] = 'unbalanced'
            elif not has_active_games:
                metrics['status'] = 'too_many_draws'
        
        metrics['consecutive_good'] = self.consecutive_good
        self.best_balance = max(self.best_balance, balance)
        metrics['best_balance'] = self.best_balance
        
        # 履歴記録
        self.convergence_history.append({
            'episode': episode,
            'metrics': metrics.copy()
        })
        
        # 収束判定
        converged = self.consecutive_good >= self.patience
        
        return converged, balance, metrics

print("収束検出システム定義完了")

収束検出システム定義完了


In [None]:
# === メイン実験クラス ===
class CQCNNExperiment:
    """完全版CQCNN自己対戦実験"""

    def __init__(self, config):
        self.config = config
        self.env = GeisterEnvironment(config)

        # 70配置の戦績とスケジュール設定
        self.setup_stats = {sid: {'w':0, 'd':0, 'l':0, 'n':0} for sid in range(70)}
        self.min_coverage_per_sid = 50     # 各setupの最低対局数
        self.ucb_alpha = 0.8               # UCBの探索重み
        self.ucb_top_k = 8                 # UCBスコア上位から抽選
        self.role_mirror_prob = 0.15       # （将来）P1可変/P2固定のミラーを混ぜる確率

        # モニター用CSV
        self.metrics_csv_path = 'training_metrics.csv'
        with open(self.metrics_csv_path, 'w', encoding='utf-8') as f:
            f.write('episode,mean_wr,worst_decile,variance,coverage_min,coverage_ok\n')
        
        # モデル初期化
        self.cqcnn_1 = CQCNN(config)
        self.cqcnn_2 = CQCNN(config)
        
        # プレイヤー2のパラメータを少し変更（多様性確保）
        with torch.no_grad():
            for param in self.cqcnn_2.parameters():
                param.add_(torch.randn_like(param) * 0.01)
        
        # オプティマイザー
        self.optimizer_1 = optim.Adam(self.cqcnn_1.parameters(), lr=LEARNING_RATE)
        self.optimizer_2 = optim.Adam(self.cqcnn_2.parameters(), lr=LEARNING_RATE)
        
        # 学習関連
        self.criterion = nn.MSELoss()
        self.replay_buffer_1 = deque(maxlen=BUFFER_SIZE)
        self.replay_buffer_2 = deque(maxlen=BUFFER_SIZE)
        
        # 統計記録
        self.game_results = []
        self.losses_1 = []
        self.losses_2 = []
        self.epsilon_history = []
        self.training_metrics = []
        
        # 収束検出
        self.convergence_detector = ConvergenceDetector(
            balance_threshold=0.95,  # 95%バランス閾値
            patience=50,
            min_games=1000
        )
        
        # ===== 追加: 配置スケジューリング & モニタ初期化 =====
        self.setup_stats = {sid: {'w':0, 'd':0, 'l':0, 'n':0} for sid in range(70)}
        self.min_coverage_per_sid = 50     # 各setupの最低対局数
        self.ucb_alpha = 0.8               # UCB探索重み
        self.ucb_top_k = 8                 # UCB上位からランダム抽選
        self.role_mirror_prob = 0.15       # （拡張用）P1可変/P2固定を混ぜる確率
        self.metrics_csv_path = 'training_metrics.csv'
        with open(self.metrics_csv_path, 'w', encoding='utf-8') as f:
            f.write('episode,mean_wr,worst_decile,variance,coverage_min,coverage_ok\n')
        # ================================================

        # その他
        self.start_time = time.time()
        self.episode_times = []
        
        print("\n=== CQCNN実験初期化完了 ===")
        print(f"モデル1パラメータ: {sum(p.numel() for p in self.cqcnn_1.parameters()):,}")
        print(f"モデル2パラメータ: {sum(p.numel() for p in self.cqcnn_2.parameters()):,}")
        print(f"リプレイバッファサイズ: {BUFFER_SIZE:,}")
        print(f"バッチサイズ: {BATCH_SIZE}")
        print(f"最大エピソード: {MAX_EPISODES:,}")

    # ===== 追加: 配置モニタ/スケジューラのユーティリティ =====
    def _record_setup_result(self, setup_id: int, winner: int | None):
        s = self.setup_stats[setup_id]
        if winner is None: s['d'] += 1
        elif winner == 1:  s['w'] += 1
        else:              s['l'] += 1
        s['n'] += 1

    def _compute_setup_metrics(self):
        wrs, counts = [], []
        for sid in range(70):
            s = self.setup_stats[sid]; n = max(1, s['n'])
            wr = (s['w'] + 0.5*s['d']) / n
            wrs.append(wr); counts.append(s['n'])
        wrs = np.array(wrs); counts = np.array(counts)
        mean_wr = float(wrs.mean())
        worst_decile = float(np.sort(wrs)[:max(1, 70//10)].mean())
        variance = float(wrs.var())
        coverage_ok = bool(counts.min() >= self.min_coverage_per_sid)
        return dict(mean_wr=mean_wr, worst_decile=worst_decile,
                    variance=variance, coverage_ok=coverage_ok,
                    coverage_min=int(counts.min()))

    def _pick_setup_min_coverage(self):
        need = [sid for sid in range(70) if self.setup_stats[sid]['n'] < self.min_coverage_per_sid]
        return random.choice(need) if need else None

    def _pick_setup_ucb(self):
        import math  # 局所importで依存を閉じる
        total = 1 + sum(self.setup_stats[sid]['n'] for sid in range(70))
        scores = []
        for sid in range(70):
            s = self.setup_stats[sid]; n = s['n']
            wr = (s['w'] + 0.5*s['d']) / max(1, n)
            exploit = 1.0 - wr
            explore = self.ucb_alpha * math.sqrt(math.log(total) / (1 + n))
            scores.append((exploit + explore, sid))
        scores.sort(reverse=True)
        cand = [sid for _, sid in scores[:self.ucb_top_k]]
        return random.choice(cand)

    def choose_p2_setup_id(self):
        sid = self._pick_setup_min_coverage()
        if sid is not None: return sid
        return self._pick_setup_ucb()
    # =====================================================

    def select_action(self, model, state, valid_moves, epsilon):
        """36次元空間でのアクション選択"""
        if random.random() < epsilon:
            return random.choice(valid_moves)

        was_training = model.training
        model.eval()  # 推論モードに設定
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).flatten().unsqueeze(0)  # (1, 252)
            q_values = model(state_tensor).squeeze(0)  # (36,)
        if was_training:
            model.train()  # 元のモードに戻す

        # 有効な手の中から最大Q値を選択
        best_score = float('-inf')
        best_move = valid_moves[0]

        for move in valid_moves:
            move_index, direction, from_pos, to_pos = move
            if move_index < len(q_values):
                score = q_values[move_index].item()
                if score > best_score:
                    best_score = score
                    best_move = move
        return best_move

    def play_game(self, episode):
        """1ゲームを実行"""
        # （将来: 役割ミラーをするならここで分岐）
        self.env.forced_p2_setup_id = self.choose_p2_setup_id()

        state = self.env.reset()
        done = False
        
        game_states_1 = []
        game_states_2 = []
        
        epsilon = max(EPSILON_MIN, EPSILON_START * (EPSILON_DECAY ** episode))
        
        while not done:
            valid_moves = self.env.get_valid_moves()
            
            if not valid_moves:
                # 有効手なし（稀なケース）
                self.env.game_over = True
                self.env.winner = None
                break
            
            current_state = state.copy()
            current_player = self.env.current_player
            
            if current_player == 1:
                chosen_move = self.select_action(self.cqcnn_1, current_state, valid_moves, epsilon)
            else:
                chosen_move = self.select_action(self.cqcnn_2, current_state, valid_moves, epsilon)
            
            next_state, reward, done, info = self.env.make_move(chosen_move)
            
            # 経験を記録（move_indexはタプル先頭を使用）
            if isinstance(chosen_move, (tuple, list)) and len(chosen_move) >= 1:
                move_index = int(chosen_move[0])  # 0..35
            else:
                move_index = int(chosen_move)
            experience = (current_state, move_index, reward, next_state, done)
            
            if current_player == 1:
                game_states_1.append(experience)
            else:
                game_states_2.append(experience)
            
            state = next_state
        
        # ゲーム結果を記録
        result = {
            'episode': episode,
            'winner': self.env.winner,
            'turns': self.env.turn,
            'player_1_moves': len(game_states_1),
            'player_2_moves': len(game_states_2)
        }
        
        # 最終報酬の配布（勝者に+1、敗者に-1、引き分けは0）
        final_reward_1 = 1.0 if self.env.winner == 1 else (-1.0 if self.env.winner == 2 else 0.0)
        final_reward_2 = 1.0 if self.env.winner == 2 else (-1.0 if self.env.winner == 1 else 0.0)
        
        # 経験をリプレイバッファに追加（sid付き6タプル）
        sid = getattr(self.env, 'p2_setup_id', None)
        if sid is not None:
            # P1視点で w/d/l を更新（winner: 1=先手勝ち, 2=後手勝ち, None=引分）
            self._record_setup_result(sid, self.env.winner)

        for state, action, reward, next_state, done in game_states_1:
            final_exp = (state, action, reward + final_reward_1, next_state, done, sid)
            self.replay_buffer_1.append(final_exp)

        for state, action, reward, next_state, done in game_states_2:
            final_exp = (state, action, reward + final_reward_2, next_state, done, sid)
            self.replay_buffer_2.append(final_exp)
        
        return result, epsilon

    def train_model(self, model, optimizer, replay_buffer, losses_list):
        """モデル学習"""
        if len(replay_buffer) < BATCH_SIZE:
            return
        
        # バッチサンプリング（※ sid付き6タプル/旧5タプル両対応）
        batch = random.sample(replay_buffer, BATCH_SIZE)

        # 後方互換で展開
        states = []
        actions = []
        rewards = []
        next_states = []
        dones = []
        # sids は今は未使用だが将来の分層学習で利用予定
        for exp in batch:
            if len(exp) == 6:
                s, a, r, ns, d, sid = exp
            else:
                s, a, r, ns, d = exp
                sid = None
            states.append(s)
            actions.append(a)
            rewards.append(r)
            next_states.append(ns)
            dones.append(d)

        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatFloatTensor(next_states) if hasattr(torch, "FloatFloatTensor") else torch.FloatTensor(next_states)
        dones = torch.BoolTensor(dones)
        
        # 現在のQ値
        current_q_values = model(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # ターゲットQ値（簡単なTD学習）
        with torch.no_grad():
            next_q_values = model(next_states).max(1)[0]
            target_q_values = rewards + (GAMMA * next_q_values * ~dones)
        
        # 損失計算と更新
        loss = self.criterion(current_q_values, target_q_values)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        losses_list.append(loss.item())

    def run_experiment(self):
        """メイン実験実行"""
        print(f"\n=== 実験開始: 収束まで最大{MAX_EPISODES:,}エピソード ===")
        print(f"収束条件: Balance ≥ 0.95, {self.convergence_detector.patience}回連続")
        
        for episode in range(MAX_EPISODES):
            episode_start = time.time()
            
            # ゲーム実行
            result, epsilon = self.play_game(episode)
            self.game_results.append(result)
            self.epsilon_history.append(epsilon)
            
            # 学習実行
            if episode % 5 == 0:  # 5エピソードごとに学習
                self.train_model(self.cqcnn_1, self.optimizer_1, self.replay_buffer_1, self.losses_1)
                self.train_model(self.cqcnn_2, self.optimizer_2, self.replay_buffer_2, self.losses_2)
            
            episode_time = time.time() - episode_start
            self.episode_times.append(episode_time)
            
            # 進捗報告
            if (episode + 1) % 100 == 0:
                self._print_progress(episode)
                
                # 収束チェック
                converged, balance, metrics = self.convergence_detector.check_convergence(
                    self.game_results, episode
                )
                
                if converged:
                    print(f"\n🎉 収束達成! Episode {episode+1}")
                    print(f"Final Balance: {balance:.4f}")
                    print(f"Consecutive Good: {metrics['consecutive_good']}")
                    break

            # ===== 追加: 配置メトリクスの定期モニタ =====
            if (episode + 1) % 500 == 0:
                m = self._compute_setup_metrics()
                print(f"[monitor] ep={episode+1} mean={m['mean_wr']:.3f} "
                      f"worst10%={m['worst_decile']:.3f} var={m['variance']:.4f} "
                      f"cov_min={m['coverage_min']} cov_ok={m['coverage_ok']}")
                with open(self.metrics_csv_path, "a", encoding="utf-8") as f:
                    f.write(f"{episode+1},{m['mean_wr']:.6f},{m['worst_decile']:.6f},"
                            f"{m['variance']:.6f},{m['coverage_min']},{int(m['coverage_ok'])}\n")
            # =======================================

            # 早期収束チェック（1000エピソードごと）
            if (episode + 1) % 1000 == 0:
                converged, balance, metrics = self.convergence_detector.check_convergence(
                    self.game_results, episode
                )
                if converged:
                    print(f"\n🎉 収束達成! Episode {episode+1}")
                    break
        
        total_time = time.time() - self.start_time
        
        print(f"\n=== 実験完了 ===")
        print(f"Total Episodes: {len(self.game_results):,}")
        print(f"Total Time: {total_time:.1f}s ({total_time/3600:.1f}h)")
        print(f"Average Episode Time: {np.mean(self.episode_times):.3f}s")
        
        # 最終収束チェック
        final_converged, final_balance, final_metrics = self.convergence_detector.check_convergence(
            self.game_results, len(self.game_results)-1
        )
        
        return len(self.game_results), total_time, {
            'converged': final_converged,
            'balance': final_balance,
            'metrics': final_metrics,
            'total_games': len(self.game_results)
        }

    def _print_progress(self, episode):
        """進捗表示"""
        recent_window = min(100, len(self.game_results))
        recent_results = self.game_results[-recent_window:]
        
        wins_1 = sum(1 for r in recent_results if r['winner'] == 1)
        wins_2 = sum(1 for r in recent_results if r['winner'] == 2)
        draws = sum(1 for r in recent_results if r['winner'] is None)
        
        win_rate_1 = wins_1 / recent_window
        win_rate_2 = wins_2 / recent_window
        draw_rate = draws / recent_window
        
        balance = min(wins_1, wins_2) / max(wins_1, wins_2) if max(wins_1, wins_2) > 0 else 1.0
        avg_turns = np.mean([r['turns'] for r in recent_results])
        
        current_epsilon = self.epsilon_history[-1] if self.epsilon_history else EPSILON_START
        avg_loss_1 = np.mean(self.losses_1[-50:]) if len(self.losses_1) >= 50 else 0
        avg_loss_2 = np.mean(self.losses_2[-50:]) if len(self.losses_2) >= 50 else 0
        
        elapsed_time = time.time() - self.start_time
        
        print(f"Episode {episode+1:5d} | "
              f"P1={win_rate_1:.3f} P2={win_rate_2:.3f} D={draw_rate:.3f} | "
              f"Balance={balance:.4f} | "
              f"Turns={avg_turns:.1f} | "
              f"ε={current_epsilon:.4f} | "
              f"Loss={avg_loss_1:.4f}/{avg_loss_2:.4f} | "
              f"Time={elapsed_time:.0f}s")

print("完全版CQCNN実験クラス定義完了")


完全版CQCNN実験クラス定義完了


In [14]:
# === 実験実行 ===
print("🚀 CQCNN量子強化学習実験を開始します")
print(f"設定: {config_path}")
print(f"最大エピソード: {MAX_EPISODES:,}")
print(f"目標: Balance ≥ 0.95, 50回連続達成で収束")

# 実験インスタンス生成
experiment = CQCNNExperiment(config)

# 実験実行
final_episode, training_time, analysis = experiment.run_experiment()

print("\n✅ 実験完了!")

🚀 CQCNN量子強化学習実験を開始します
設定: quantum_recovery_stable_config_2025-09-25.json
最大エピソード: 50,000
目標: Balance ≥ 0.95, 50回連続達成で収束
ガイスター環境初期化完了 - 報酬戦略: adaptive_anti_collapse
CQCNN初期化: 252D → 4Q2L → 36D
Frontend: 4 Linear layers
Quantum: 4 qubits, 2 layers, angle embedding
Backend: 5 Linear layers
CQCNN初期化: 252D → 4Q2L → 36D
Frontend: 4 Linear layers
Quantum: 4 qubits, 2 layers, angle embedding
Backend: 5 Linear layers
収束検出器: 閾値=0.95, patience=50, 最小ゲーム数=1000

=== CQCNN実験初期化完了 ===
モデル1パラメータ: 73,144
モデル2パラメータ: 73,144
リプレイバッファサイズ: 6,000
バッチサイズ: 96
最大エピソード: 50,000

=== 実験開始: 収束まで最大50,000エピソード ===
収束条件: Balance ≥ 0.95, 50回連続
Episode   100 | P1=0.880 P2=0.110 D=0.010 | Balance=0.1250 | Turns=61.0 | ε=0.4902 | Loss=0.0000/0.0000 | Time=19s
Episode   200 | P1=0.810 P2=0.090 D=0.100 | Balance=0.1111 | Turns=80.0 | ε=0.4805 | Loss=0.0000/0.0000 | Time=43s
Episode   300 | P1=0.840 P2=0.130 D=0.030 | Balance=0.1548 | Turns=67.1 | ε=0.4710 | Loss=35.0755/12.1163 | Time=67s
Episode   400 | P1=0.820 P2=0.130 D

KeyboardInterrupt: 

In [None]:
# === 結果分析と可視化 ===
print("📊 実験結果を分析中...")

# データ準備
episodes = [r['episode'] for r in experiment.game_results]
winners = [r['winner'] for r in experiment.game_results]
turns = [r['turns'] for r in experiment.game_results]

# 勝率計算（移動平均）
window = 100
p1_wins = [1 if w == 1 else 0 for w in winners]
p2_wins = [1 if w == 2 else 0 for w in winners]
draws = [1 if w is None else 0 for w in winners]

p1_rate = np.convolve(p1_wins, np.ones(window)/window, mode='valid')
p2_rate = np.convolve(p2_wins, np.ones(window)/window, mode='valid')
draw_rate = np.convolve(draws, np.ones(window)/window, mode='valid')
episodes_smooth = np.array(episodes[window-1:])

# バランス計算
balance_history = []
for i in range(window-1, len(winners)):
    recent = winners[i-window+1:i+1]
    w1 = sum(1 for w in recent if w == 1)
    w2 = sum(1 for w in recent if w == 2)
    balance = min(w1, w2) / max(w1, w2) if max(w1, w2) > 0 else 1.0
    balance_history.append(balance)

# 収束メトリクス
convergence_episodes = [h['episode'] for h in experiment.convergence_detector.convergence_history]
convergence_balance = [h['metrics']['balance'] for h in experiment.convergence_detector.convergence_history]
consecutive_good = [h['metrics']['consecutive_good'] for h in experiment.convergence_detector.convergence_history]

# 可視化
plt.style.use('default')
fig, axes = plt.subplots(3, 3, figsize=(20, 15))
fig.suptitle(f'CQCNN量子強化学習実験結果 (Episodes: {final_episode:,})', fontsize=16, fontweight='bold')

# 1. 勝率推移
axes[0,0].plot(episodes_smooth, p1_rate, label='Player 1', color='blue', linewidth=2)
axes[0,0].plot(episodes_smooth, p2_rate, label='Player 2', color='red', linewidth=2)
axes[0,0].plot(episodes_smooth, draw_rate, label='Draws', color='gray', linewidth=2)
axes[0,0].axhline(y=0.5, color='black', linestyle='--', alpha=0.5)
axes[0,0].set_title('Win Rate Trends')
axes[0,0].set_xlabel('Episode')
axes[0,0].set_ylabel('Win Rate')
axes[0,0].legend()
axes[0,0].grid(True, alpha=0.3)

# 2. バランス推移
axes[0,1].plot(episodes_smooth, balance_history, color='green', linewidth=2)
axes[0,1].axhline(y=0.95, color='red', linestyle='--', label='Target (0.95)')
axes[0,1].axhline(y=0.995, color='orange', linestyle='--', label='Ultra-strict (0.995)')
axes[0,1].set_title('Balance Evolution')
axes[0,1].set_xlabel('Episode')
axes[0,1].set_ylabel('Balance')
axes[0,1].legend()
axes[0,1].grid(True, alpha=0.3)
axes[0,1].set_ylim([0, 1.05])

# 3. 収束進行
if convergence_episodes:
    axes[0,2].plot(convergence_episodes, consecutive_good, color='purple', linewidth=2, marker='o', markersize=3)
    axes[0,2].axhline(y=50, color='red', linestyle='--', label='Target (50)')
    axes[0,2].set_title('Convergence Progress')
    axes[0,2].set_xlabel('Episode')
    axes[0,2].set_ylabel('Consecutive Good Checks')
    axes[0,2].legend()
    axes[0,2].grid(True, alpha=0.3)

# 4. ゲーム長推移
turns_smooth = np.convolve(turns, np.ones(window)/window, mode='valid')
axes[1,0].plot(episodes_smooth, turns_smooth, color='brown', linewidth=2)
axes[1,0].set_title('Average Game Length')
axes[1,0].set_xlabel('Episode')
axes[1,0].set_ylabel('Turns per Game')
axes[1,0].grid(True, alpha=0.3)

# 5. ε減衰
if experiment.epsilon_history:
    axes[1,1].plot(experiment.epsilon_history, color='orange', linewidth=2)
    axes[1,1].set_title('Epsilon Decay')
    axes[1,1].set_xlabel('Episode')
    axes[1,1].set_ylabel('Epsilon')
    axes[1,1].set_yscale('log')
    axes[1,1].grid(True, alpha=0.3)

# 6. 損失推移
if experiment.losses_1 and experiment.losses_2:
    loss_episodes_1 = np.linspace(0, final_episode, len(experiment.losses_1))
    loss_episodes_2 = np.linspace(0, final_episode, len(experiment.losses_2))
    axes[1,2].plot(loss_episodes_1, experiment.losses_1, alpha=0.7, color='blue', label='Player 1')
    axes[1,2].plot(loss_episodes_2, experiment.losses_2, alpha=0.7, color='red', label='Player 2')
    
    # 移動平均
    if len(experiment.losses_1) > 50:
        loss1_smooth = np.convolve(experiment.losses_1, np.ones(50)/50, mode='valid')
        loss2_smooth = np.convolve(experiment.losses_2, np.ones(50)/50, mode='valid')
        axes[1,2].plot(loss_episodes_1[49:], loss1_smooth, color='darkblue', linewidth=2)
        axes[1,2].plot(loss_episodes_2[49:], loss2_smooth, color='darkred', linewidth=2)
    
    axes[1,2].set_title('Training Loss')
    axes[1,2].set_xlabel('Episode')
    axes[1,2].set_ylabel('Loss')
    axes[1,2].legend()
    axes[1,2].grid(True, alpha=0.3)

# 7. 最終結果分布
final_1000 = winners[-1000:] if len(winners) >= 1000 else winners
w1_final = sum(1 for w in final_1000 if w == 1)
w2_final = sum(1 for w in final_1000 if w == 2)
d_final = sum(1 for w in final_1000 if w is None)

categories = ['Player 1', 'Player 2', 'Draws']
values = [w1_final, w2_final, d_final]
colors = ['blue', 'red', 'gray']
axes[2,0].pie(values, labels=categories, colors=colors, autopct='%1.1f%%', startangle=90)
axes[2,0].set_title(f'Final Results (Last {len(final_1000)} games)')

# 8. 量子パラメータ分布
params1 = experiment.cqcnn_1.quantum_params.detach().numpy().flatten()
params2 = experiment.cqcnn_2.quantum_params.detach().numpy().flatten()
axes[2,1].hist(params1, bins=30, alpha=0.7, label='Player 1', color='blue', density=True)
axes[2,1].hist(params2, bins=30, alpha=0.7, label='Player 2', color='red', density=True)
axes[2,1].set_title('Quantum Parameter Distribution')
axes[2,1].set_xlabel('Parameter Value')
axes[2,1].set_ylabel('Density')
axes[2,1].legend()
axes[2,1].grid(True, alpha=0.3)

# 9. エピソード時間
if experiment.episode_times:
    time_smooth = np.convolve(experiment.episode_times, np.ones(min(100, len(experiment.episode_times)))//min(100, len(experiment.episode_times)), mode='valid')
    time_episodes = range(len(time_smooth))
    axes[2,2].plot(time_episodes, time_smooth, color='green', linewidth=2)
    axes[2,2].set_title('Episode Time')
    axes[2,2].set_xlabel('Episode')
    axes[2,2].set_ylabel('Time (seconds)')
    axes[2,2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("可視化完了!")

In [None]:
# === 詳細分析レポート ===
print("\n" + "="*60)
print("          CQCNN量子強化学習実験 - 最終レポート")
print("="*60)

# 基本統計
total_games = len(experiment.game_results)
total_p1_wins = sum(1 for r in experiment.game_results if r['winner'] == 1)
total_p2_wins = sum(1 for r in experiment.game_results if r['winner'] == 2)
total_draws = sum(1 for r in experiment.game_results if r['winner'] is None)

print(f"\n📈 基本統計")
print(f"  総エピソード数: {total_games:,}")
print(f"  Player 1 勝利: {total_p1_wins:,} ({total_p1_wins/total_games*100:.1f}%)")
print(f"  Player 2 勝利: {total_p2_wins:,} ({total_p2_wins/total_games*100:.1f}%)")
print(f"  引き分け: {total_draws:,} ({total_draws/total_games*100:.1f}%)")
print(f"  平均ゲーム長: {np.mean(turns):.1f} ターン")
print(f"  実験時間: {training_time:.1f}秒 ({training_time/3600:.2f}時間)")

# 最終期間の詳細分析
final_period = min(1000, total_games)
final_results = experiment.game_results[-final_period:]
final_p1 = sum(1 for r in final_results if r['winner'] == 1)
final_p2 = sum(1 for r in final_results if r['winner'] == 2)
final_draws = sum(1 for r in final_results if r['winner'] is None)
final_balance = min(final_p1, final_p2) / max(final_p1, final_p2) if max(final_p1, final_p2) > 0 else 1.0

print(f"\n🎯 最終期間分析 (直近{final_period}ゲーム)")
print(f"  Player 1: {final_p1} ({final_p1/final_period*100:.1f}%)")
print(f"  Player 2: {final_p2} ({final_p2/final_period*100:.1f}%)")
print(f"  引き分け: {final_draws} ({final_draws/final_period*100:.1f}%)")
print(f"  バランス: {final_balance:.4f}")
print(f"  目標達成: {'✅ YES' if final_balance >= 0.95 else '❌ NO'} (目標: ≥0.95)")

# 収束分析
print(f"\n🔄 収束分析")
if analysis['converged']:
    print(f"  ✅ 収束達成!")
    print(f"  最終バランス: {analysis['balance']:.4f}")
    print(f"  連続達成回数: {analysis['metrics']['consecutive_good']}")
else:
    print(f"  ❌ 収束未達成")
    print(f"  現在バランス: {analysis['balance']:.4f}")
    print(f"  連続達成回数: {analysis['metrics']['consecutive_good']}/50")
    print(f"  ステータス: {analysis['metrics']['status']}")

# Ultra-strict実験との比較
ultra_strict_balance = 1.000  # Ultra-strict実験の結果
ultra_strict_episodes = 46400
ultra_strict_draws = 1.0

print(f"\n⚖️  Ultra-strict実験との比較")
print(f"  設定      │ Ultra-strict │ 現在の実験")
print(f"  ─────────┼─────────────┼──────────────")
print(f"  閾値      │     0.995    │    0.95")
print(f"  エピソード│   {ultra_strict_episodes:,}    │   {total_games:,}")
print(f"  バランス  │   {ultra_strict_balance:.3f}    │   {final_balance:.3f}")
print(f"  引き分け率│   {ultra_strict_draws*100:.1f}%     │   {final_draws/final_period*100:.1f}%")
print(f"  収束      │     未達成    │   {'達成' if analysis['converged'] else '未達成'}")

# 量子効果の分析
quantum_std_1 = np.std(params1)
quantum_std_2 = np.std(params2)
quantum_diff = np.mean(np.abs(params1 - params2))

print(f"\n🌌 量子効果分析")
print(f"  Player 1 量子パラメータ範囲: [{params1.min():.3f}, {params1.max():.3f}]")
print(f"  Player 2 量子パラメータ範囲: [{params2.min():.3f}, {params2.max():.3f}]")
print(f"  Player 1 標準偏差: {quantum_std_1:.3f}")
print(f"  Player 2 標準偏差: {quantum_std_2:.3f}")
print(f"  プレイヤー間差異: {quantum_diff:.3f}")

# 学習効率
if experiment.losses_1 and experiment.losses_2:
    initial_loss_1 = np.mean(experiment.losses_1[:50]) if len(experiment.losses_1) >= 50 else 0
    final_loss_1 = np.mean(experiment.losses_1[-50:]) if len(experiment.losses_1) >= 50 else 0
    initial_loss_2 = np.mean(experiment.losses_2[:50]) if len(experiment.losses_2) >= 50 else 0
    final_loss_2 = np.mean(experiment.losses_2[-50:]) if len(experiment.losses_2) >= 50 else 0
    
    print(f"\n📚 学習効率")
    print(f"  Player 1 損失: {initial_loss_1:.4f} → {final_loss_1:.4f} ({((final_loss_1-initial_loss_1)/initial_loss_1*100):+.1f}%)")
    print(f"  Player 2 損失: {initial_loss_2:.4f} → {final_loss_2:.4f} ({((final_loss_2-initial_loss_2)/initial_loss_2*100):+.1f}%)")
    print(f"  総学習ステップ: {len(experiment.losses_1) + len(experiment.losses_2):,}")

# 実験設定サマリー
print(f"\n⚙️  実験設定")
print(f"  量子構成: {N_QUBITS}Q{N_LAYERS}L")
print(f"  状態次元: {STATE_DIM}")
print(f"  行動空間: {ACTION_DIM}")
print(f"  バッチサイズ: {BATCH_SIZE}")
print(f"  学習率: {LEARNING_RATE}")
print(f"  ε減衰: {EPSILON_START} → {EPSILON_MIN} (係数: {EPSILON_DECAY})")
print(f"  バッファサイズ: {BUFFER_SIZE:,}")

print(f"\n" + "="*60)
print(f"実験完了! 結果は上記の通りです。")
print(f"ノートブック保存推奨: このセルの結果を記録してください。")
print("="*60)