<a href="https://colab.research.google.com/github/thc1006/flora-dp-federated-ColO-RAN/blob/main/FLORA_DP_client_12_v1_1_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 聯邦強化學習於無線網路切片資源分配之進階實作

### 實驗目標 (Pareto Optimization)
本實驗旨在優化一個聯邦強化學習系統，使其在模擬的 5G 網路切片資源分配任務中，達成以下三個核心目標的權衡：
1.  **低延遲 (Low Latency)**: 端到端訓練延遲 (P95) ` < 40s`。
2.  **強隱私 (Strong Privacy)**: 總隱私預算消耗 `ε < 4.0`。
3.  **高效能 (High Reward)**: 最終評估的平均獎勵 `Avg Reward > 165`。

### 環境需求
- **硬體**: Google Colab with T4 GPU
- **Python**: 3.10+
- **核心套件**:
  - `torch`: 2.1.0+
  - `opacus`: 1.4.0+
  - `numpy`: 1.23.5+
  - `pandas`: 1.5.3+
  - `scikit-learn`: 1.2.2+
  - `matplotlib`: 3.7.1+
  - `seaborn`: 0.12.2+
  - `pyarrow`: 10.0.1+
  - `nest_asyncio`: 1.6.0+

  ---

  檔案中的【實際欄位列表】如下：
['timestamp', 'num_ues', 'IMSI', 'RNTI', 'slicing_enabled']
['Slice_ID', 'slice_prb', 'power_multiplier', 'scheduling_policy', 'dl_mcs']
['dl_n_samples', 'Buffer_Occupancy_DL_bytes', 'Throughput_DL_Mbps', 'tx_pkts downlink', 'tx_errors downlink (%)']
['dl_cqi', 'ul_mcs', 'ul_n_samples', 'ul_buffer [bytes]', 'Throughput_UL_Mbps']
['rx_pkts uplink', 'rx_errors uplink (%)', 'ul_rssi', 'ul_sinr', 'phr']
['sum_requested_prbs', 'sum_granted_prbs', 'dl_pmi', 'dl_ri', 'ul_n']
['ul_turbo_iters', 'Scheduling_Policy_Active', 'Training_Config_ID', 'exp_id', 'BS_ID']
['nof_ue', 'dl_brate', 'ul_brate']

In [1]:
# @title Cell 1: 環境設定與函式庫匯入（強化版）
# 安裝必要套件
!pip install --upgrade opacus -q
!pip install nest_asyncio -q
!pip install dask -q
!pip install pyarrow -q

import torch, torch.nn as nn, torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np, pandas as pd, random, copy, json, os, time, warnings, math, re, contextlib
from collections import deque
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt, seaborn as sns
from dataclasses import dataclass, asdict, field
from sklearn.cluster import KMeans
from opacus import PrivacyEngine
from opacus.validators import ModuleValidator
from opacus.data_loader import DPDataLoader
from typing import Dict, List, Tuple, Optional, Union
import scipy.stats as stats
from datetime import datetime
import nest_asyncio
import logging
from pathlib import Path
import asyncio
import gc

# --- 環境設定 ---
try: torch._dynamo.disable()
except Exception: pass
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)
warnings.filterwarnings("ignore", message=".*overflow encountered.*", category=RuntimeWarning)

pd.options.mode.chained_assignment = None

# --- 新增：環境變數設定 (通用約束4) ---
BASE_OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "./")) / "outputs"
os.makedirs(BASE_OUTPUT_DIR, exist_ok=True)

# --- 新增：全局配置字典 (通用約束4) ---
CONFIG = {
    "BASE_OUTPUT_DIR": str(BASE_OUTPUT_DIR),
    "GPU_MEMORY_LIMIT": 13 * 1024 * 1024 * 1024,  # 13GB in bytes
    "LATENCY_SLA": 0.05,  # 50ms SLA for latency
    "SEED": 42
}

# --- 新增：Deterministic 設定 (通用約束6) ---
def set_global_seed(seed: int = 42):
    """設定全局隨機種子以確保可重現性"""
    os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)

    # 確保 deterministic 行為
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    if hasattr(torch, 'use_deterministic_algorithms'):
        try:
            torch.use_deterministic_algorithms(True)
        except RuntimeError:
            print("⚠️ 部分操作不支援完全 deterministic 模式")

# --- 新增：設置全局日誌系統 (日誌與錯誤韌性) ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(BASE_OUTPUT_DIR / 'experiment.log')
    ]
)
logger = logging.getLogger(__name__)

# 設定隨機種子
set_global_seed(CONFIG["SEED"])

# 應用 nest_asyncio
nest_asyncio.apply()

print("✅ Cell 1: 環境與函式庫準備就緒（含 Deterministic 與 Asyncio 修正）。")
import opacus
print(f"PyTorch/Opacus 版本: {torch.__version__} / {opacus.__version__}")
print(f"CUDA 是否可用: {torch.cuda.is_available()}")
print(f"Deterministic 模式: 已啟用")
print(f"Nest Asyncio: 已應用")
logger.info("Environment setup completed")

✅ Cell 1: 環境與函式庫準備就緒（含 Deterministic 與 Asyncio 修正）。
PyTorch/Opacus 版本: 2.6.0+cu124 / 1.5.4
CUDA 是否可用: True
Deterministic 模式: 已啟用
Nest Asyncio: 已應用


In [2]:
# @title Cell 2: 🎓 實驗參數設定（強化版 - 支援 Non-IID & CQL）
from dataclasses import dataclass, field
import os
import json
import torch
import numpy as np
from typing import Tuple

@dataclass
class TrainingConfig:
    experiment_name: str
    output_dir: str
    mode: str = "ClusteredFL"  # 新增支援: "CQL"
    random_seed: int = 42
    comm_rounds: int = 20  # 建議值 > 20 for ClusteredFL

    # 【新增】ClusteredFL 動態聚類參數 (任務1)
    base_cluster_freq: int = 2  # 基礎聚類更新頻率
    cluster_decay_rate: float = 0.1  # 聚類頻率衰減率
    early_stopping_rounds: int = 2  # 早期停止輪數
    use_cosine_similarity: bool = True  # 使用餘弦相似度判斷
    knowledge_transfer: bool = True  # 啟用知識遷移

    # 【新增】CQL 配置 (任務2)
    enable_cql: bool = False
    cql_alpha: float = 1.0  # 將根據ε動態調整
    cql_min_q_weight: float = 5.0  # 將根據ε動態調整
    cql_loss_alpha_scaler: float = 0.1  # 損失縮放因子
    use_mixed_precision: bool = True  # 混合精度訓練

    # 【新增】FedAvg 動態加權 (任務3)
    use_weighted_fedavg: bool = True

    # 【新增】FedProx 動態mu (任務4)
    fedprox_mu_base: float = 0.15
    fedprox_mu_adaptive: bool = True

    # 【新增】延遲監控配置 (任務5)
    enable_latency_monitor: bool = True
    latency_warning_threshold: float = 1.0  # 1秒警告閾值

    # 【新增】DP 優化配置 (任務6)
    enable_l2_regularization: bool = False
    weight_decay: float = 1e-4
    enable_federated_dropout: bool = False
    federated_dropout_rate: float = 0.2
    use_rdp_accountant: bool = True  # 使用 RDP accountant

    # 【新增】Non-IID 配置
    enable_non_iid: bool = True
    dirichlet_alpha: float = 0.5  # 控制 Non-IID 程度，越小越異質
    min_clients_non_iid: int = 10  # 最少客戶端數量

    # 【新增】Adaptive Clipping 配置
    enable_adaptive_clip: bool = True
    clip_window: int = 50  # 統計窗口大小
    clip_percentile: float = 0.9  # 使用 90 分位數

    # 【新增】非同步聚合配置
    enable_async_aggregation: bool = True
    timeout_threshold: float = 0.3  # 30% 客戶端超時觸發非同步
    client_timeout_seconds: float = 30.0  # 客戶端超時時間

    # 虛擬客戶端相關（保留原有）
    base_client_pairs: tuple = ((1, 2), (3, 7), (5, 6))
    virtual_expansion_factor: int = 3
    num_virtual_clients: int = 9
    num_real_clients: int = 3
    total_clients: int = 12
    num_clients: int = 12
    num_clients_to_select: int = 8

    # 其他原有參數保持不變
    temporal_split_method: str = "sliding_window"
    noise_injection_std: float = 0.03
    feature_augmentation: bool = True
    cross_validation_split: bool = True

    local_episodes_per_round: int = 4
    steps_per_episode: int = 500
    batch_size: int = 256
    gamma: float = 0.99

    lr: float = 1e-4
    target_update_freq: int = 15

    epsilon_start: float = 1.0
    epsilon_decay: float = 0.9995
    epsilon_min: float = 0.05

    memory_capacity: int = 50000
    replay_start_size: int = 1000
    replay_frequency: int = 2
    replay_batches_per_call: int = 3

    fedprox_mu: float = 0.15  # 將被動態調整
    num_clusters: int = 3
    cluster_update_freq: int = 8  # 將被動態調整

    enable_dp: bool = True
    dp_target_epsilon: float = 8.0
    dp_target_delta: float = 1e-5
    dp_max_grad_norm: float = 1.0
    dp_noise_multiplier: float = 0.5
    dp_sampling_probability: float = 0.1
    dp_virtual_batch_size: int = 256
    dp_microbatch_size: int = 1

    dp_reset_threshold_multiplier: float = 1.5
    enable_dp_reset: bool = True

    enable_heterogeneity: bool = True
    enable_compression: bool = True

    straggler_ratio: float = 0.1
    dropout_ratio: float = 0.05
    compression_type: str = "quantize_fp16"
    use_pfl_finetune: bool = True
    local_finetune_episodes: int = 15
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    reset_to_random_start: bool = True

    client_pairs: tuple = field(init=False)

    def __post_init__(self):
        # 動態生成客戶端配對
        extended_client_pairs = []
        for i in range(self.total_clients):
            embb_id = i + 1
            urllc_id = i + 2
            extended_client_pairs.append((embb_id, urllc_id))
        self.client_pairs = tuple(extended_client_pairs)

        # GPU 環境檢測
        if torch.cuda.is_available():
            gpu_name = torch.cuda.get_device_name(0)
            if "T4" in gpu_name:
                logger.info(f"T4 GPU檢測到，優化配置")
                self.batch_size = min(self.batch_size, 128)  # 限制批次大小
            elif "L4" in gpu_name:
                logger.info(f"L4 GPU檢測到，啟用大批次優化配置")

        # 差分隱私模式檢測
        if self.mode == 'Centralized':
            self.enable_dp = False

        # CQL 模式檢測
        if self.mode == 'CQL':
            self.enable_cql = True
            logger.info(f"CQL 模式啟用 - α={self.cql_alpha}, min_q_weight={self.cql_min_q_weight}")

        # 動態調整 CQL 參數 (根據隱私預算)
        if self.enable_cql and self.enable_dp:
            self.adjust_cql_params_by_epsilon()

        # 配置信息顯示
        logger.info(f"實驗配置:")
        logger.info(f"   - Non-IID: {'啟用' if self.enable_non_iid else '禁用'} (α={self.dirichlet_alpha})")
        logger.info(f"   - Adaptive Clipping: {'啟用' if self.enable_adaptive_clip else '禁用'}")
        logger.info(f"   - 非同步聚合: {'啟用' if self.enable_async_aggregation else '禁用'}")
        logger.info(f"   - 總客戶端數: {self.total_clients}")

        if self.enable_dp and self.mode != 'Centralized':
            logger.info(f"差分隱私已啟用")
            logger.info(f"   - 目標隱私預算: ε={self.dp_target_epsilon}")
            logger.info(f"   - 批次大小: {self.batch_size}")

    def adjust_cql_params_by_epsilon(self):
        """根據隱私預算動態調整CQL參數"""
        if self.dp_target_epsilon > 10:
            self.cql_alpha = 0.5
            self.cql_min_q_weight = 2.0
        elif self.dp_target_epsilon < 1:
            self.cql_alpha = 2.0
            self.cql_min_q_weight = 10.0
        logger.info(f"CQL參數已根據ε={self.dp_target_epsilon}調整: α={self.cql_alpha}, min_q_weight={self.cql_min_q_weight}")

    def get_dynamic_cluster_freq(self, round_idx: int) -> int:
        """獲取動態聚類更新頻率"""
        if not hasattr(self, '_cluster_freq_cache'):
            self._cluster_freq_cache = {}

        if round_idx not in self._cluster_freq_cache:
            freq = int(self.base_cluster_freq * (1 + self.cluster_decay_rate * round_idx))
            self._cluster_freq_cache[round_idx] = max(freq, 10)  # 最少10輪更新一次

        return self._cluster_freq_cache[round_idx]

    def adjust_fedprox_mu(self, client_heterogeneity: float) -> float:
        """根據客戶端異質性調整mu"""
        if not self.fedprox_mu_adaptive:
            return self.fedprox_mu

        # 高異質性使用較小的mu，低異質性使用較大的mu
        if client_heterogeneity > 0.5:
            return 0.001
        elif client_heterogeneity < 0.1:
            return 0.1
        else:
            return self.fedprox_mu_base

    def save(self):
        os.makedirs(self.output_dir, exist_ok=True)
        path = os.path.join(self.output_dir, f'{self.experiment_name}_config.json')
        config_dict = {k: (list(v) if isinstance(v, tuple) else v) for k, v in self.__dict__.items()
                      if not k.startswith('_')}
        with open(path, 'w') as f:
            json.dump(config_dict, f, indent=4)
        logger.info(f"配置已保存至: {path}")

print("✅ Cell 2: TrainingConfig（強化版）定義完成。")

✅ Cell 2: TrainingConfig（強化版）定義完成。


In [3]:
# @title Cell 3: 🧩 數據與環境準備（支援 Non-IID 分配）
class DataManager:
    def __init__(self, data_path, client_pairs_config, enable_non_iid=False,
                 dirichlet_alpha=0.5, min_clients=10):
        logger.info(f"[DataManager] 正在從 {data_path} 讀取數據...")
        self.df_kpi = pd.read_parquet(data_path)
        self.client_pairs_config = client_pairs_config
        self.enable_non_iid = enable_non_iid
        self.dirichlet_alpha = dirichlet_alpha
        self.min_clients = min_clients
        self._sanitize_column_names()
        self._preflight_check()

        # 記錄聚類重新分配次數
        self.cluster_reassign_count = 0

    def _sanitize_column_names(self):
        sanitized_columns = [re.sub(r'[\\[\\]\\(\\)%\\s\\.-]+', '_', col.strip().lower()).strip('_')
                           for col in self.df_kpi.columns]
        self.df_kpi.columns = sanitized_columns

    def _preflight_check(self):
        logger.info("\n" + "="*20 + " DataManager 啟動前預檢查 " + "="*20)
        cols = self.df_kpi.columns.tolist()
        tput_cand = ['throughput_dl_mbps', 'tx_brate_downlink_mbps']
        lat_cand = ['buffer_occupancy_dl_bytes', 'dl_buffer_bytes']

        self.tput_col = next((c for c in tput_cand if c in cols), None)
        self.lat_col = next((c for c in lat_cand if c in cols), None)

        logger.info(f"清理後的欄位列表 (共 {len(cols)} 個):")
        logger.info(f"   - 吞吐量欄位成功匹配: '{self.tput_col}'" if self.tput_col
                   else "   - 吞吐量欄位匹配失敗！")
        logger.info(f"   - 延遲/緩衝區欄位成功匹配: '{self.lat_col}'" if self.lat_col
                   else "   - 延遲/緩衝區欄位匹配失敗！")

        available_bs = sorted(self.df_kpi['bs_id'].unique())
        logger.info(f"   - 可用BS節點: {available_bs}")
        logger.info("="*65 + "\n")

        if not (self.tput_col and self.lat_col):
            raise ValueError("預檢查失敗: 找不到必要的數據欄位。")

    def _get_clean_df(self, gnb_id, slice_id):
        """獲取清理後的數據"""
        df, bs_col, sl_col = self.df_kpi, 'bs_id', 'slice_id'

        mask = (df[bs_col].astype(int) == int(gnb_id)) & (df[sl_col].astype(int) == int(slice_id))
        subset = df.loc[mask, ['timestamp', self.tput_col, self.lat_col]].copy()

        subset = subset.rename(columns={self.tput_col: 'throughput', self.lat_col: 'latency'})
        subset = subset.dropna()

        if not subset.empty:
            subset = subset[
                (subset['throughput'] >= 0) & (subset['throughput'] <= 1000) &
                (subset['latency'] >= 0) & (subset['latency'] <= 1e9)
            ]

        return subset

    def _create_non_iid_distribution(self, total_data_points):
        """使用 Dirichlet 分配創建 Non-IID 數據分割"""
        # 確保至少有 min_clients 個客戶端
        num_clients = max(self.min_clients, len(self.client_pairs_config) * 3)

        # 使用 Dirichlet 分配生成客戶端數據比例
        proportions = np.random.dirichlet(
            alpha=[self.dirichlet_alpha] * num_clients
        )

        # 計算每個客戶端的數據點數
        client_data_sizes = (proportions * total_data_points).astype(int)

        # 確保每個客戶端至少有一些數據
        min_size = max(100, 256)  # 至少256個數據點
        client_data_sizes = np.maximum(client_data_sizes, min_size)

        # 調整總和以匹配總數據點
        diff = total_data_points - client_data_sizes.sum()
        if diff > 0:
            client_data_sizes[np.argmax(client_data_sizes)] += diff
        elif diff < 0:
            largest_idx = np.argmax(client_data_sizes)
            client_data_sizes[largest_idx] = max(min_size, client_data_sizes[largest_idx] + diff)

        return client_data_sizes

    def get_client_trajectories(self):
        """生成客戶端數據軌跡（支援 Non-IID）"""
        if self.enable_non_iid:
            return self._get_non_iid_trajectories()
        else:
            return self._get_standard_trajectories()

    def _get_non_iid_trajectories(self):
        """生成 Non-IID 分佈的客戶端軌跡"""
        logger.info("[DataManager] 正在生成 Non-IID 客戶端數據分佈...")

        # 首先收集所有可用數據
        all_data = []
        for embb_id, urllc_id in self.client_pairs_config:
            df_embb = self._get_clean_df(embb_id, 0)
            df_urllc = self._get_clean_df(urllc_id, 2)

            if not df_embb.empty and not df_urllc.empty:
                # 使用 direction='backward' 而非 'nearest'
                merged_df = pd.merge_asof(
                    df_embb.sort_values('timestamp'),
                    df_urllc.sort_values('timestamp'),
                    on='timestamp',
                    direction='backward',
                    tolerance=pd.Timedelta('100ms'),
                    suffixes=('_embb', '_urllc')
                ).dropna()

                if not merged_df.empty:
                    trajectory = merged_df[['throughput_embb', 'latency_embb',
                                          'throughput_urllc', 'latency_urllc']].to_numpy(dtype=np.float32)
                    all_data.append(trajectory)

        if not all_data:
            logger.error("無法生成 Non-IID 數據：沒有有效數據")
            return {}

        # 合併所有數據
        combined_data = np.vstack(all_data)
        total_points = len(combined_data)

        # 使用 Dirichlet 分配生成客戶端數據大小
        client_sizes = self._create_non_iid_distribution(total_points)
        num_clients = len(client_sizes)

        logger.info(f"   - 使用 Dirichlet(α={self.dirichlet_alpha}) 生成 {num_clients} 個客戶端")
        logger.info(f"   - 數據分佈標準差: {np.std(client_sizes):.2f}")
        logger.info(f"   - 最大/最小客戶端數據比: {max(client_sizes)/min(client_sizes):.2f}")

        # 隨機打亂數據
        np.random.shuffle(combined_data)

        # 分配數據給客戶端
        client_trajectories = {}
        start_idx = 0

        for client_id in range(num_clients):
            end_idx = start_idx + client_sizes[client_id]
            client_data = combined_data[start_idx:min(end_idx, total_points)]

            # 為每個客戶端添加一些特定的偏差（增強 Non-IID 特性）
            if client_id % 3 == 0:
                # 某些客戶端有更高的吞吐量
                client_data[:, [0, 2]] *= np.random.uniform(1.1, 1.3)
            elif client_id % 3 == 1:
                # 某些客戶端有更高的延遲
                client_data[:, [1, 3]] *= np.random.uniform(1.1, 1.3)

            client_trajectories[client_id] = client_data
            logger.info(f"   - 客戶端 {client_id}: {len(client_data)} 個時間步")

            start_idx = end_idx
            if start_idx >= total_points:
                break

        # 添加客戶端 ID 到數據框（用於後續分析）
        self.client_assignments = {i: len(traj) for i, traj in client_trajectories.items()}

        return client_trajectories

    def _get_standard_trajectories(self):
        """原始的標準軌跡生成方法"""
        client_trajectories = {}
        logger.info("[DataManager] 正在為每個客戶端生成數據軌跡...")

        for i, (embb_id, urllc_id) in enumerate(tqdm(self.client_pairs_config, desc="處理客戶端數據")):
            try:
                df_embb = self._get_clean_df(embb_id, 0)
                df_urllc = self._get_clean_df(urllc_id, 2)

                if df_embb.empty or df_urllc.empty:
                    logger.warning(f"客戶端 {i} (BS {embb_id}/{urllc_id}) 篩選後無有效數據。")
                    client_trajectories[i] = np.array([])
                    continue

                merged_df = pd.merge_asof(
                    df_embb.sort_values('timestamp'),
                    df_urllc.sort_values('timestamp'),
                    on='timestamp',
                    direction='backward',
                    tolerance=pd.Timedelta('100ms'),
                    suffixes=('_embb', '_urllc')
                ).dropna()

                if merged_df.empty:
                    logger.warning(f"客戶端 {i} 合併後無有效數據。")
                    client_trajectories[i] = np.array([])
                    continue

                merged_df = merged_df.sort_values('timestamp').reset_index(drop=True)
                trajectory = merged_df[['throughput_embb', 'latency_embb',
                                      'throughput_urllc', 'latency_urllc']].to_numpy(dtype=np.float32)

                client_trajectories[i] = trajectory
                logger.info(f"   - 客戶端 {i}: {len(trajectory)} 個時間步")

            except Exception as e:
                logger.error(f"處理客戶端 {i} 時發生嚴重錯誤: {e}")
                client_trajectories[i] = np.array([])

        num_valid = sum(1 for traj in client_trajectories.values() if traj.size > 0)
        logger.info(f"[DataManager] 數據處理完成！成功為 {num_valid} / {len(self.client_pairs_config)} 個客戶端創建了環境。")
        return client_trajectories

    def cluster_reassignment(self):
        """記錄聚類重新分配"""
        self.cluster_reassign_count += 1
        return self.cluster_reassign_count

print("✅ Cell 3: DataManager（支援 Non-IID）定義完成。")

✅ Cell 3: DataManager（支援 Non-IID）定義完成。


In [4]:
# @title Cell 4: ⚡ RL環境與數據處理（含延遲監控）
import gc
import time
import os
import torch
import numpy as np
import pandas as pd
import contextlib
from collections import deque
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import autocast, GradScaler

# 【新增】延遲監控類（任務5）
class LatencyMonitor:
    def __init__(self, sla_threshold: float = None):
        self.round_times = []
        self.current_round_start = None
        self.warning_threshold = 1.0  # 1秒警告閾值
        self.sla_threshold = sla_threshold or CONFIG.get("LATENCY_SLA", 0.05)
        self.component_times = {}  # 記錄各組件延遲

    def start_round(self):
        """開始計時一個通信輪"""
        self.current_round_start = time.time()
        self.component_times = {
            'client_compute_time': [],
            'client_to_server_comm': [],
            'server_aggregation_time': [],
            'server_to_client_comm': []
        }

    def record_component(self, component: str, duration: float):
        """記錄組件延遲"""
        if component in self.component_times:
            self.component_times[component].append(duration)

    def end_round(self, round_num: int):
        """結束計時並記錄"""
        if self.current_round_start is None:
            return

        round_time = time.time() - self.current_round_start
        end_to_end_latency = round_time

        # 計算各組件時間
        component_stats = {}
        for comp, times in self.component_times.items():
            if times:
                component_stats[comp] = np.mean(times)
            else:
                component_stats[comp] = 0.0

        # 計算GPU使用情況
        gpu_mem_allocated = 0
        gpu_mem_max = 0
        if torch.cuda.is_available():
            gpu_mem_allocated = torch.cuda.memory_allocated() / 1024**2  # MB
            gpu_mem_max = torch.cuda.max_memory_allocated() / 1024**2  # MB

        self.round_times.append({
            'round': round_num,
            'wall_time': round_time,
            'end_to_end_latency': end_to_end_latency,
            'timestamp': datetime.now().isoformat(),
            'gpu_mem_allocated': gpu_mem_allocated,
            'gpu_mem_max': gpu_mem_max,
            **component_stats
        })

        if round_time > self.warning_threshold:
            logger.warning(f"Round {round_num} 延遲警告: {round_time:.2f}s > {self.warning_threshold}s")

        self.current_round_start = None

    def get_statistics(self):
        """獲取延遲統計"""
        if not self.round_times:
            return {}

        times = [r['wall_time'] for r in self.round_times]
        e2e_times = [r['end_to_end_latency'] for r in self.round_times]

        stats = {
            'mean_time': np.mean(times),
            'median_time': np.median(times),
            'p95_time': np.percentile(times, 95),
            'max_time': np.max(times),
            'total_rounds': len(times),
            'warnings': sum(1 for t in times if t > self.warning_threshold),
            'sla_violations': sum(1 for t in e2e_times if t > self.sla_threshold),
            'mean_e2e_latency': np.mean(e2e_times),
            'p95_e2e_latency': np.percentile(e2e_times, 95)
        }

        # 檢查P95是否超過SLA
        if stats['p95_e2e_latency'] > self.sla_threshold:
            logger.warning(f"P95延遲 {stats['p95_e2e_latency']:.3f}s 超過SLA {self.sla_threshold}s")

        return stats

    def save_to_csv(self, filepath):
        """保存延遲記錄到 CSV"""
        if self.round_times:
            pd.DataFrame(self.round_times).to_csv(filepath, index=False)
            logger.info(f"延遲記錄已保存至: {filepath}")

class PairedEnv:
    """配對環境類別 - 保持原有功能"""
    def __init__(self, trajectory, config: TrainingConfig):
        self.trajectory, self.config = trajectory, config
        self.state_size = trajectory.shape[1] if trajectory.size > 0 else 4
        self.action_size = 3
        self.cursor = 0
        self.reset()

    def reset(self):
        if self.trajectory.size == 0:
            return np.zeros(self.state_size, dtype=np.float32)
        max_start = max(0, len(self.trajectory) - self.config.steps_per_episode)
        if self.config.reset_to_random_start and max_start > 0:
            self.cursor = np.random.randint(0, max_start)
        else:
            self.cursor = 0
        return self.trajectory[self.cursor]

    def step(self, action_id: int):
        if self.trajectory.size == 0 or self.cursor >= len(self.trajectory) - 1:
            state = self.trajectory[-1] if self.trajectory.size > 0 else np.zeros(self.state_size, dtype=np.float32)
            return state, 0.0, True, {}
        self.cursor += 1
        done = self.cursor >= len(self.trajectory) - 1
        state = self.trajectory[self.cursor]
        reward = self._compute_reward_with_action(state, action_id)
        return state, reward, done, {}

    def _compute_reward_with_action(self, state: np.ndarray, action_id: int) -> float:
        tput_embb, lat_embb, tput_urllc, lat_urllc = state
        if action_id == 0: w_tput, w_lat = (0.7, 0.3)
        elif action_id == 2: w_tput, w_lat = (0.3, 0.7)
        else: w_tput, w_lat = (0.5, 0.5)
        tput_reward = w_tput * (np.log1p(tput_embb) + 0.5 * np.log1p(tput_urllc))
        lat_penalty = w_lat * (np.tanh(lat_urllc * 1e-6) + 0.3 * np.tanh(lat_embb * 1e-6))
        reward_val = tput_reward - lat_penalty
        return float(np.nan_to_num(reward_val, nan=0.0, posinf=10.0, neginf=-10.0))

class RLDataset(Dataset):
    """RL數據集類別"""
    def __init__(self, memory_list):
        self.data = memory_list[:]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        state, action, reward, next_state, done = self.data[idx]
        return (
            torch.from_numpy(state).float(),
            torch.tensor(action).long(),
            torch.tensor(reward).float(),
            torch.from_numpy(next_state).float(),
            torch.tensor(done).bool()
        )

def get_gpu_optimized_data_loader(agent_memory: deque, batch_size: int, device: str):
    """GPU性能優化的數據加載器（檢查緩衝區大小）"""
    # 檢查緩衝區大小
    if len(agent_memory) < batch_size:
        return None

    # 若未達到 replay_start_size，也返回 None
    min_size = CONFIG.get('replay_start_size', 1000)
    if len(agent_memory) < min_size:
        logger.debug(f"緩衝區大小 {len(agent_memory)} < {min_size}，跳過訓練")
        return None

    try:
        dataset = RLDataset(list(agent_memory))

        cpu_count = os.cpu_count() or 4
        if cpu_count <= 2:
            num_workers = 0
        else:
            num_workers = min(2, cpu_count // 2)

        dataloader_kwargs = {
            'dataset': dataset,
            'batch_size': batch_size,
            'shuffle': True,
            'num_workers': num_workers,
            'pin_memory': torch.cuda.is_available() and device == 'cuda',
            'drop_last': True,
        }

        if num_workers > 0:
            dataloader_kwargs['persistent_workers'] = True
            dataloader_kwargs['prefetch_factor'] = 2

        if device == 'cuda' and torch.cuda.is_available():
            g = torch.Generator(device='cuda')
            g.manual_seed(42)
            dataloader_kwargs['generator'] = g

        return DataLoader(**dataloader_kwargs)

    except Exception as e:
        logger.error(f"GPU優化DataLoader創建失敗: {e}")
        logger.info(f"   回退到基本設定...")

        try:
            return DataLoader(
                dataset,
                batch_size=batch_size,
                shuffle=True,
                num_workers=0,
                pin_memory=False,
                drop_last=True
            )
        except Exception as e2:
            logger.error(f"回退也失敗: {e2}")
            return None

def check_gpu_memory():
    """檢查GPU記憶體使用情況"""
    if torch.cuda.is_available():
        allocated = torch.cuda.memory_allocated() / 1024**3  # GB
        reserved = torch.cuda.memory_reserved() / 1024**3  # GB
        max_memory = CONFIG.get("GPU_MEMORY_LIMIT", 13 * 1024**3) / 1024**3  # GB

        if allocated > max_memory * 0.9:
            logger.warning(f"GPU記憶體使用接近限制: {allocated:.2f}GB / {max_memory:.2f}GB")
            torch.cuda.empty_cache()
            gc.collect()

def setup_gpu_environment():
    """統一的GPU環境設定函數"""
    if torch.cuda.is_available():
        gpu_name = torch.cuda.get_device_name(0)
        total_memory = torch.cuda.get_device_properties(0).total_memory / 1e9
        compute_capability = torch.cuda.get_device_properties(0).major
        cpu_count = os.cpu_count() or 4

        logger.info(f"GPU 檢測: {gpu_name}")
        logger.info(f"總記憶體: {total_memory:.1f} GB")
        logger.info(f"計算能力: {compute_capability}.x")
        logger.info(f"可用CPU數: {cpu_count}")

        # 設定 CUDA 配置（移除 benchmark 以確保 deterministic）
        torch.backends.cudnn.benchmark = False  # 確保 deterministic
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True

        # 清理記憶體
        torch.cuda.empty_cache()
        gc.collect()

        logger.info(f"GPU環境設定完成（Deterministic 模式）")
    else:
        logger.warning("未檢測到GPU，將使用CPU模式運行")

print("✅ Cell 4: RL環境與數據處理（含延遲監控）定義完成。")

✅ Cell 4: RL環境與數據處理（含延遲監控）定義完成。


In [5]:
# @title Cell 5: 🛡️ 核心學習代理（支援 CQL & Adaptive Clipping）
import gc
import time
from opacus import PrivacyEngine
from opacus.validators import ModuleValidator
from opacus.accountants import RDPAccountant
from torch.cuda.amp import GradScaler, autocast

# 【新增】Adaptive Clipper 類（任務3）
class AdaptiveClipper:
    def __init__(self, window_size: int = 50, percentile: float = 0.9):
        self.window_size = window_size
        self.percentile = percentile
        self.gradient_norms = deque(maxlen=window_size)
        self.current_clip_value = 1.0
        self.update_counter = 0
        self.clip_history = []

    def update(self, gradients):
        """更新梯度統計並調整裁剪值"""
        # 計算當前梯度的 L2 norm
        total_norm = 0.0
        for grad in gradients:
            if grad is not None:
                param_norm = grad.data.norm(2).item()
                total_norm += param_norm ** 2
        total_norm = total_norm ** 0.5

        self.gradient_norms.append(total_norm)
        self.update_counter += 1

        # 每 window_size 步更新裁剪值
        if self.update_counter % self.window_size == 0 and len(self.gradient_norms) > 0:
            new_clip = np.percentile(list(self.gradient_norms), self.percentile * 100)
            self.current_clip_value = max(0.1, new_clip)  # 確保最小值
            self.clip_history.append({
                'step': self.update_counter,
                'clip_value': self.current_clip_value,
                'mean_norm': np.mean(list(self.gradient_norms)),
                'std_norm': np.std(list(self.gradient_norms))
            })

    def get_clip_value(self):
        return self.current_clip_value

class RLAgent:
    def __init__(self, state_size: int, action_size: int, config: TrainingConfig, client_id: int,
                 dataset_size: int, is_eval_agent: bool = False):
        self.state_size, self.action_size, self.config = state_size, action_size, config
        self.client_id, self.dataset_size = client_id, dataset_size
        self.device = torch.device(config.device)
        self.mu = self.config.fedprox_mu
        self.gamma, self.epsilon = config.gamma, config.epsilon_start
        self.memory = deque(maxlen=config.memory_capacity)
        self.global_params = None
        self.is_eval_agent = is_eval_agent
        self.privacy_engine = None
        self.dp_steps = 0
        self.current_epsilon = 0.0
        self.current_best_alpha = None
        self.consecutive_errors = 0
        self.max_consecutive_errors = 5

        # 【新增】CQL 相關參數
        self.enable_cql = config.enable_cql
        self.cql_alpha = config.cql_alpha
        self.cql_min_q_weight = config.cql_min_q_weight
        self.cql_loss_alpha_scaler = config.cql_loss_alpha_scaler

        # 【新增】混合精度訓練
        self.use_mixed_precision = config.use_mixed_precision and not is_eval_agent
        if self.use_mixed_precision:
            self.scaler = GradScaler()

        # 【新增】Adaptive Clipping
        self.adaptive_clipper = None
        if config.enable_adaptive_clip and not is_eval_agent:
            self.adaptive_clipper = AdaptiveClipper(
                window_size=config.clip_window,
                percentile=config.clip_percentile
            )

        # 【新增】計算客戶端異質性（用於FedProx）
        self.client_heterogeneity = 0.0

        self.privacy_calc_failures = 0
        self.last_valid_epsilon = 0.0
        self.dp_reset_count = 0
        self.last_reset_round = -1
        self.original_optimizer_class = optim.Adam

        self.model = self._build_dp_model()
        self.target_model = self._build_dp_model()
        self.update_target_model()
        self.target_model.eval()

        # 【新增】L2正則化
        weight_decay = config.weight_decay if config.enable_l2_regularization else 0.0
        self.optimizer = self.original_optimizer_class(
            self.model.parameters(),
            lr=config.lr,
            weight_decay=weight_decay
        )
        self.criterion = nn.MSELoss()

        if self.config.enable_dp and not self.is_eval_agent and self.config.mode != 'Centralized':
            self._initialize_dp_engine()
        else:
            logger.info(f"[C-{self.client_id}] 標準模式（無差分隱私）")

    def _build_dp_model(self):
        model = nn.Sequential(
            nn.Linear(self.state_size, 128), nn.ReLU(),
            nn.Linear(128, 64), nn.ReLU(),
            nn.Linear(64, self.action_size)
        ).to(self.device)
        if self.config.enable_dp and not self.is_eval_agent:
            if not ModuleValidator.is_valid(model):
                model = ModuleValidator.fix(model)
        return model

    def _initialize_dp_engine(self):
        """差分隱私引擎初始化（支援 RDP accountant 和 Adaptive Clipping）"""
        logger.info(f"[C-{self.client_id}] 初始化差分隱私引擎...")

        self.privacy_calc_failures = 0
        self.last_valid_epsilon = 0.0

        try:
            if hasattr(self, 'privacy_engine') and self.privacy_engine is not None:
                del self.privacy_engine
                torch.cuda.empty_cache()
                gc.collect()

            self.model = self._build_dp_model()
            self.optimizer = self.original_optimizer_class(self.model.parameters(), lr=self.config.lr)

            dummy_data = []
            effective_dataset_size = max(self.dataset_size, self.config.batch_size * 10)

            for _ in range(effective_dataset_size):
                state = np.random.randn(self.state_size).astype(np.float32)
                action = int(np.random.randint(0, 3))
                reward = float(np.random.randn())
                next_state = np.random.randn(self.state_size).astype(np.float32)
                done = bool(np.random.choice([True, False]))
                dummy_data.append((state, action, reward, next_state, done))

            dummy_dataset = RLDataset(dummy_data)
            dummy_loader = DataLoader(
                dummy_dataset, batch_size=self.config.batch_size, num_workers=0, shuffle=True
            )

            # 優先使用 RDP accountant
            accountant_options = []
            if self.config.use_rdp_accountant:
                accountant_options.append(("rdp", "RDP"))
            accountant_options.extend([
                ("gdp", "GDP"),
                ("prv", "PRV"),
                (None, "Default")
            ])

            for accountant_type, type_name in accountant_options:
                try:
                    if accountant_type:
                        self.privacy_engine = PrivacyEngine(accountant=accountant_type)
                    else:
                        self.privacy_engine = PrivacyEngine()

                    logger.info(f"   - 嘗試 Accountant: {type_name}")
                    break

                except Exception as e:
                    logger.debug(f"   - {type_name} Accountant 失敗: {e}")
                    continue

            if not self.privacy_engine:
                raise RuntimeError("所有 Accountant 類型都失敗")

            sample_rate = min(self.config.batch_size / len(dummy_data), 1.0)
            logger.info(f"   - Sample Rate: {sample_rate:.6f}")

            # 使用初始裁剪值或 Adaptive Clipper 的當前值
            clip_value = self.config.dp_max_grad_norm
            if self.adaptive_clipper:
                clip_value = self.adaptive_clipper.get_clip_value()
                logger.info(f"   - 使用 Adaptive Clipping: 初始值 {clip_value:.4f}")

            # 自適應調整 noise_multiplier
            noise_multiplier = self.config.dp_noise_multiplier
            if self.config.dp_target_epsilon < 1.0 and self.config.enable_federated_dropout:
                noise_multiplier *= 0.8  # 減少噪音以補償federated dropout
                logger.info(f"   - 啟用 Federated Dropout，調整 noise_multiplier 至 {noise_multiplier:.3f}")

            self.model, self.optimizer, dummy_loader = self.privacy_engine.make_private(
                module=self.model,
                optimizer=self.optimizer,
                data_loader=dummy_loader,
                noise_multiplier=noise_multiplier,
                max_grad_norm=clip_value,
                poisson_sampling=True
            )

            self.dp_steps = 0
            self.current_epsilon = 0.0
            self.current_best_alpha = None

            logger.info(f"   - ✅ 差分隱私引擎初始化成功")

        except Exception as e:
            logger.error(f"   - 差分隱私初始化失敗: {e}")
            logger.info(f"   - 切換到非差分隱私模式")
            self.privacy_engine = None
            self.privacy_calc_failures = 999

    def update_client_heterogeneity(self, other_clients_data):
        """計算客戶端異質性（用於動態調整FedProx mu）"""
        if not other_clients_data:
            self.client_heterogeneity = 0.0
            return

        # 使用KL散度或其他度量計算異質性
        # 這裡簡化為使用模型參數的差異
        my_params = self.get_model_weights_flat()
        differences = []

        for other_params in other_clients_data:
            diff = np.linalg.norm(my_params - other_params)
            differences.append(diff)

        self.client_heterogeneity = np.mean(differences) / (np.linalg.norm(my_params) + 1e-8)

        # 動態調整 mu
        if self.config.fedprox_mu_adaptive:
            self.mu = self.config.adjust_fedprox_mu(self.client_heterogeneity)
            logger.debug(f"[C-{self.client_id}] 異質性: {self.client_heterogeneity:.3f}, 調整 mu 至: {self.mu:.4f}")

    def replay(self, num_batches: int):
        """支援 CQL 的回放訓練（含延遲來源註解）"""
        if len(self.memory) < self.config.batch_size:
            return 0.0

        # 檢查緩衝區大小
        if len(self.memory) < self.config.replay_start_size:
            logger.debug(f"[C-{self.client_id}] 緩衝區未達 {self.config.replay_start_size}，跳過訓練")
            return 0.0

        data_loader = get_gpu_optimized_data_loader(
            self.memory, self.config.batch_size, self.device
        )
        if data_loader is None:
            return 0.0

        total_loss, batches_processed = 0.0, 0
        self.model.train()

        try:
            for i, batch in enumerate(data_loader):
                if i >= num_batches:
                    break

                self.optimizer.zero_grad()
                states, actions, rewards, next_states, dones = [item.to(self.device, non_blocking=True) for item in batch]

                # 使用混合精度訓練
                with autocast(enabled=self.use_mixed_precision):
                    current_q = self.model(states).gather(1, actions.view(-1, 1))

                    with torch.no_grad():
                        max_next_q = self.target_model(next_states).max(1)[0].unsqueeze(1)
                        target_q = rewards.view(-1, 1) + (self.gamma * max_next_q * (~dones.view(-1, 1)))

                    # 標準 Q-learning 損失
                    td_loss = self.criterion(current_q, target_q)

                    # 【新增】CQL 損失項
                    if self.enable_cql:
                        # 計算所有動作的 Q 值
                        all_q_values = self.model(states)

                        # CQL 懲罰項：log-sum-exp 的 Q 值
                        # 註解：torch.logsumexp 為延遲來源之一
                        logsumexp_q = torch.logsumexp(all_q_values, dim=1, keepdim=True)

                        # 最小化 Q 值的懲罰
                        cql_loss = self.cql_alpha * (logsumexp_q.mean() - current_q.mean())

                        # 添加最小 Q 值約束
                        min_q_loss = self.cql_min_q_weight * (all_q_values.min(dim=1)[0].mean() - target_q.mean()).abs()

                        # 使用損失縮放因子
                        loss = td_loss + self.cql_loss_alpha_scaler * cql_loss + min_q_loss
                    else:
                        loss = td_loss

                    # FedProx 正則化
                    if self.config.mode in ['FedProx', 'ClusteredFL'] and self.mu > 0 and self.global_params:
                        proximal_term = 0.0
                        model_params = self.model._module.parameters() if hasattr(self.model, '_module') else self.model.parameters()
                        for local_param, global_param in zip(model_params, self.global_params):
                            proximal_term += torch.sum((local_param - global_param.to(self.device))**2)
                        loss += (self.mu / 2) * proximal_term

                    # 【新增】Federated Dropout
                    if self.config.enable_federated_dropout and self.config.dp_target_epsilon < 1.0:
                        # 隨機丟棄部分梯度
                        dropout_mask = torch.rand_like(loss) > self.config.federated_dropout_rate
                        loss = loss * dropout_mask.float()

                if not torch.isfinite(loss):
                    continue

                # 反向傳播
                if self.use_mixed_precision:
                    self.scaler.scale(loss).backward()
                else:
                    loss.backward()

                # 【新增】更新 Adaptive Clipper
                if self.adaptive_clipper and not self.privacy_engine:
                    gradients = [p.grad for p in self.model.parameters()]
                    self.adaptive_clipper.update(gradients)

                    # 手動裁剪梯度
                    clip_value = self.adaptive_clipper.get_clip_value()
                    torch.nn.utils.clip_grad_norm_(self.model.parameters(), clip_value)

                # 優化器步驟
                if self.use_mixed_precision:
                    self.scaler.step(self.optimizer)
                    self.scaler.update()
                else:
                    self.optimizer.step()

                total_loss += loss.item()
                batches_processed += 1
                if self.privacy_engine:
                    self.dp_steps += 1

            # 檢查GPU記憶體
            check_gpu_memory()

            return total_loss / batches_processed if batches_processed > 0 else 0.0
        except Exception as e:
            logger.error(f"[C-{self.client_id}] 回放錯誤: {e}")
            return 0.0

    # 動態調整CQL參數的方法
    def adjust_cql_params(self, current_epsilon: float):
        """根據當前隱私預算動態調整CQL參數"""
        if current_epsilon > 10:
            self.cql_alpha = 0.5
            self.cql_min_q_weight = 2.0
        elif current_epsilon < 1:
            self.cql_alpha = 2.0
            self.cql_min_q_weight = 10.0
        else:
            # 線性插值
            alpha_range = (2.0, 0.5)
            weight_range = (10.0, 2.0)
            t = (current_epsilon - 1) / 9  # 正規化到[0,1]
            self.cql_alpha = alpha_range[0] + t * (alpha_range[1] - alpha_range[0])
            self.cql_min_q_weight = weight_range[0] + t * (weight_range[1] - weight_range[0])

    def reset_dp_engine(self, round_num: int):
        """重設差分隱私引擎（優化版）"""
        if not self.config.enable_dp_reset or not self.privacy_engine:
            return False

        logger.info(f"[C-{self.client_id}] 重設差分隱私引擎（Round {round_num}）...")

        # 檢查當輪消耗速率
        if hasattr(self, '_last_round_epsilon'):
            consumption_rate = self.current_epsilon - self._last_round_epsilon
            if consumption_rate > self.config.dp_target_epsilon * 0.1:
                logger.warning(f"[C-{self.client_id}] 隱私預算消耗過快: {consumption_rate:.4f}/輪")

        self._last_round_epsilon = self.current_epsilon

        try:
            self.dp_reset_count += 1
            self.last_reset_round = round_num
            old_epsilon = self.current_epsilon

            self._initialize_dp_engine()

            logger.info(f"   - ✅ 重設完成（第{self.dp_reset_count}次）")
            logger.info(f"   - 舊ε: {old_epsilon:.4f} → 新ε: {self.current_epsilon:.4f}")
            return True
        except Exception as e:
            logger.error(f"   - 重設失敗: {e}")
            return False

    def get_privacy_cost(self):
        """獲取隱私成本（優化版）"""
        if not self.privacy_engine:
            return 0.0

        if hasattr(self, 'privacy_calc_failures') and self.privacy_calc_failures > 10:
            return getattr(self, 'last_valid_epsilon', 0.0)

        try:
            # 使用配置的delta值
            delta = self.config.dp_target_delta

            result = self.privacy_engine.get_epsilon(delta=delta)

            if isinstance(result, tuple):
                if len(result) == 2:
                    epsilon, best_alpha = result
                    self.current_best_alpha = best_alpha
                else:
                    epsilon = result[0]
                    self.current_best_alpha = None
            elif isinstance(result, (int, float)):
                epsilon = result
                self.current_best_alpha = None
            else:
                raise ValueError(f"未知的epsilon返回格式: {type(result)}")

            if np.isinf(epsilon) or np.isnan(epsilon) or epsilon < 0:
                raise ValueError(f"無效epsilon值: {epsilon}")

            if hasattr(self, 'privacy_calc_failures'):
                self.privacy_calc_failures = 0

            self.current_epsilon = float(epsilon)
            self.last_valid_epsilon = self.current_epsilon

            # 動態調整CQL參數
            if self.enable_cql:
                self.adjust_cql_params(self.current_epsilon)

            return self.current_epsilon

        except Exception as primary_error:
            if not hasattr(self, 'privacy_calc_failures'):
                self.privacy_calc_failures = 0
            self.privacy_calc_failures += 1

            try:
                if hasattr(self, 'dp_steps') and self.dp_steps > 0:
                    estimated_epsilon = self.dp_steps * 0.01
                    estimated_epsilon = min(estimated_epsilon, self.config.dp_target_epsilon)

                    if self.privacy_calc_failures <= 3:
                        logger.warning(f"[C-{self.client_id}] 隱私成本計算失敗: {primary_error}")
                        logger.info(f"   使用估算值: ε ≈ {estimated_epsilon:.4f} (基於 {self.dp_steps} 步)")

                    self.current_epsilon = estimated_epsilon
                    return estimated_epsilon

            except Exception as fallback_error:
                if self.privacy_calc_failures <= 3:
                    logger.error(f"[C-{self.client_id}] 備用隱私計算也失敗: {fallback_error}")

            if hasattr(self, 'last_valid_epsilon'):
                cached_value = self.last_valid_epsilon
                if self.privacy_calc_failures <= 3:
                    logger.info(f"[C-{self.client_id}] 使用緩存隱私值: ε = {cached_value:.4f}")
                return cached_value

            if self.privacy_calc_failures <= 3:
                logger.warning(f"[C-{self.client_id}] 隱私成本計算完全失敗: {primary_error}")
                logger.info(f"   返回 0.0，訓練繼續（失敗次數: {self.privacy_calc_failures}）")
            elif self.privacy_calc_failures == 11:
                logger.info(f"[C-{self.client_id}] 隱私計算持續失敗，已切換至靜默模式")

            return 0.0

    def get_privacy_detailed_info(self):
        """獲取詳細隱私信息（包含delta和noise_multiplier）"""
        return {
            'client_id': self.client_id,
            'epsilon': self.current_epsilon,
            'delta': self.config.dp_target_delta,
            'noise_multiplier': self.config.dp_noise_multiplier,
            'best_alpha': self.current_best_alpha,
            'dp_steps': self.dp_steps,
            'reset_count': self.dp_reset_count,
            'last_reset_round': self.last_reset_round,
            'calc_failures': getattr(self, 'privacy_calc_failures', 0),
            'last_valid_epsilon': getattr(self, 'last_valid_epsilon', 0.0)
        }

    def remember(self, *args):
        self.memory.append(args)

    def set_global_params(self, state_dict):
        with torch.no_grad():
            self.global_params = [p.clone().detach().cpu() for p in state_dict.values()]

    def act(self, state):
        if not self.is_eval_agent and np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        with torch.no_grad():
            q_values = self.model(torch.from_numpy(state).float().unsqueeze(0).to(self.device))
        return q_values.argmax().item()

    def get_clean_state_dict(self):
        return self.model._module.state_dict() if self.privacy_engine and hasattr(self.model, '_module') else self.model.state_dict()

    def update_target_model(self):
        self.target_model.load_state_dict(self.get_clean_state_dict())

    def get_model_weights_flat(self):
        with torch.no_grad():
            params = self.model._module.parameters() if self.privacy_engine and hasattr(self.model, '_module') else self.model.parameters()
            return torch.cat([p.view(-1) for p in params]).cpu().numpy()

    def get_model_for_upload(self):
        """獲取用於上傳的模型（支援稀疏梯度傳回）"""
        state_dict = self.get_clean_state_dict()

        # 【新增】如果有 Adaptive Clipper，保存其統計信息
        if self.adaptive_clipper:
            clip_info = {
                'current_clip': self.adaptive_clipper.current_clip_value,
                'history': self.adaptive_clipper.clip_history[-10:]  # 最近10個記錄
            }
            state_dict['_adaptive_clip_info'] = clip_info

        # 壓縮模型
        if self.config.enable_compression:
            # 使用半精度
            compressed_dict = {k: v.half() for k, v in state_dict.items() if not k.startswith('_')}

            # 可選：稀疏表示（僅傳輸重要參數）
            # TODO: 實現稀疏梯度傳輸

            return compressed_dict
        else:
            return state_dict

print("✅ Cell 5: RLAgent（支援 CQL & Adaptive Clipping）定義完成。")

✅ Cell 5: RLAgent（支援 CQL & Adaptive Clipping）定義完成。


In [6]:
# @title Cell 6: 🌐 聯邦學習服務器（支援非同步聚合）
import torch
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
from typing import Dict, List, Tuple, Any
import copy
import asyncio
import concurrent.futures
from threading import Lock

class FLServer:
    """
    聯邦學習服務器類別（支援非同步聚合和動態聚類）
    """

    def __init__(self, config: TrainingConfig):
        self.config = config
        self.num_clusters = config.num_clusters
        self.client_to_cluster = {}
        self.cluster_models = {}
        self.aggregation_weights = {}

        # 【新增】動態聚類相關
        self.cluster_update_round = 0
        self.cluster_history = []
        self.validation_rewards = {}

        # 【新增】非同步聚合相關
        self.enable_async = config.enable_async_aggregation
        self.timeout_threshold = config.timeout_threshold
        self.client_timeout = config.client_timeout_seconds
        self.pending_updates = {}
        self.update_lock = Lock()
        self.async_aggregation_count = 0

        # 初始化聚類模型
        for i in range(self.num_clusters):
            self.cluster_models[i] = None

        logger.info(f"[FLServer] 初始化完成 - 聚類數: {self.num_clusters}")
        if self.enable_async:
            logger.info(f"[FLServer] 非同步聚合已啟用 - 超時閾值: {self.timeout_threshold}")

    def _convert_state_dict_for_dp_model(self, state_dict: Dict, target_is_dp: bool) -> Dict:
        """自動轉換state_dict格式以匹配DP/非DP模型"""
        if not state_dict:
            return state_dict

        converted_dict = {}

        for key, value in state_dict.items():
            if target_is_dp and not key.startswith('_module.'):
                new_key = f"_module.{key}"
                converted_dict[new_key] = value
            elif not target_is_dp and key.startswith('_module.'):
                new_key = key.replace('_module.', '')
                converted_dict[new_key] = value
            else:
                converted_dict[key] = value

        return converted_dict

    def _is_dp_model(self, model) -> bool:
        """檢測模型是否為差分隱私包裝模型"""
        if hasattr(model, '_module'):
            return True

        try:
            state_dict = model.state_dict()
            return any(k.startswith('_module.') for k in state_dict.keys())
        except:
            return False

    def distribute_model(self, participating_agents: Dict, global_model_state: Dict):
        """將模型分發給參與的客戶端（支援知識遷移）"""
        if not global_model_state:
            logger.warning(f"[FLServer] 全域模型狀態為空，跳過分發")
            return

        total_clients = len(participating_agents)
        successful_distributions = 0
        failed_distributions = 0

        for client_id, agent in participating_agents.items():
            try:
                if self.config.mode == 'ClusteredFL':
                    cluster_id = self.client_to_cluster.get(client_id, 0)

                    # 【新增】知識遷移：新群組模型以全局模型初始化
                    if self.config.knowledge_transfer and cluster_id in self.cluster_models:
                        if self.cluster_models[cluster_id] is None:
                            self.cluster_models[cluster_id] = copy.deepcopy(global_model_state)
                            logger.info(f"[FLServer] 聚類 {cluster_id} 使用全局模型初始化（知識遷移）")

                    if cluster_id in self.cluster_models and self.cluster_models[cluster_id] is not None:
                        model_to_send = self.cluster_models[cluster_id]
                    else:
                        model_to_send = global_model_state
                else:
                    model_to_send = global_model_state

                if not model_to_send:
                    logger.warning(f"[FLServer] 客戶端 {client_id}: 無可分發模型")
                    failed_distributions += 1
                    continue

                agent_is_dp = self._is_dp_model(agent.model)

                converted_model = self._convert_state_dict_for_dp_model(
                    model_to_send, target_is_dp=agent_is_dp
                )

                if self.config.enable_compression:
                    converted_model = {k: v.half() if hasattr(v, 'half') else v
                                     for k, v in converted_model.items()}

                try:
                    agent.set_global_params(converted_model)
                except Exception as e:
                    logger.debug(f"[FLServer] 客戶端 {client_id}: set_global_params失敗: {e}")

                agent.model.load_state_dict(converted_model, strict=True)
                successful_distributions += 1

            except Exception as e:
                failed_distributions += 1
                logger.error(f"[FLServer] 客戶端 {client_id} 模型分發失敗: {e}")

                try:
                    agent.model.load_state_dict(model_to_send, strict=False)
                    successful_distributions += 1
                    logger.info(f"  - 回退成功：非嚴格模式載入")
                except Exception as e2:
                    logger.error(f"  - 所有回退方式都失敗: {e2}")

        logger.info(f"[FLServer] 模型分發完成: 成功 {successful_distributions}/{total_clients}")
        if failed_distributions > 0:
            logger.warning(f"  - 失敗數量: {failed_distributions}")

    def aggregate_weighted(self, client_updates: List[Tuple[Dict, int]]) -> Dict:
        """加權聚合客戶端模型更新（FedAvg動態加權）"""
        if not client_updates:
            return {}

        # 計算總樣本數
        total_samples = sum(num_samples for _, num_samples in client_updates)
        if total_samples == 0:
            return client_updates[0][0] if client_updates else {}

        first_model = client_updates[0][0]
        if not first_model:
            return {}

        standard_first_model = self._convert_state_dict_for_dp_model(first_model, target_is_dp=False)

        aggregated_model = {}
        for key in standard_first_model.keys():
            aggregated_model[key] = torch.zeros_like(standard_first_model[key])

        # 【新增】FedAvg動態加權
        for model_state, num_samples in client_updates:
            if not model_state:
                continue

            # 根據客戶端資料量計算權重
            weight = num_samples / total_samples if self.config.use_weighted_fedavg else 1.0 / len(client_updates)

            standard_model_state = self._convert_state_dict_for_dp_model(model_state, target_is_dp=False)

            for key in aggregated_model.keys():
                if key in standard_model_state:
                    param_tensor = standard_model_state[key]
                    if param_tensor.device != aggregated_model[key].device:
                        param_tensor = param_tensor.to(aggregated_model[key].device)

                    aggregated_model[key] += weight * param_tensor

        return aggregated_model

    async def async_aggregate(self, client_updates_dict: Dict[int, Tuple[Dict, int]],
                            timeout: float = None) -> Dict:
        """【新增】非同步聚合方法（不依賴ray）"""
        if timeout is None:
            timeout = self.client_timeout

        logger.info(f"[FLServer] 開始非同步聚合 - 超時設定: {timeout}s")

        collected_updates = []
        start_time = time.time()

        # 使用 asyncio 實現非同步聚合
        async def collect_with_timeout(client_id, update):
            try:
                # 模擬非同步處理
                await asyncio.sleep(0.001)  # 極短的延遲以允許其他協程執行
                return client_id, update
            except asyncio.TimeoutError:
                logger.warning(f"[FLServer] 客戶端 {client_id} 超時")
                return client_id, None

        # 創建非同步任務
        tasks = []
        for client_id, update in client_updates_dict.items():
            task = asyncio.create_task(collect_with_timeout(client_id, update))
            tasks.append(task)

        # 等待所有任務完成或超時
        try:
            done, pending = await asyncio.wait(tasks, timeout=timeout)
        except Exception as e:
            logger.error(f"[FLServer] 非同步等待錯誤: {e}")
            # 回退到同步聚合
            return self.aggregate_weighted(list(client_updates_dict.values()))

        # 收集完成的更新
        for task in done:
            try:
                client_id, update = await task
                if update is not None:
                    collected_updates.append(update)
            except Exception as e:
                logger.error(f"[FLServer] 收集更新錯誤: {e}")

        # 取消未完成的任務
        for task in pending:
            task.cancel()

        elapsed_time = time.time() - start_time
        timeout_count = len(client_updates_dict) - len(collected_updates)
        success_count = len(collected_updates)

        logger.info(f"[FLServer] 非同步聚合完成:")
        logger.info(f"  - 成功收集: {success_count}/{len(client_updates_dict)}")
        logger.info(f"  - 超時客戶端: {timeout_count}")
        logger.info(f"  - 耗時: {elapsed_time:.2f}s")

        # 如果收集到足夠的更新，進行聚合
        if collected_updates:
            self.async_aggregation_count += 1
            return self.aggregate_weighted(collected_updates)
        else:
            logger.warning(f"[FLServer] 無有效更新可聚合")
            return {}

    def should_use_async_aggregation(self, client_responses: Dict[int, bool]) -> bool:
        """【新增】判斷是否應使用非同步聚合"""
        if not self.enable_async:
            return False

        total_clients = len(client_responses)
        timeout_clients = sum(1 for responded in client_responses.values() if not responded)
        timeout_ratio = timeout_clients / total_clients if total_clients > 0 else 0

        should_async = timeout_ratio > self.timeout_threshold

        if should_async:
            logger.info(f"[FLServer] 觸發非同步聚合 - 超時比例: {timeout_ratio:.2%} > {self.timeout_threshold:.0%}")

        return should_async

    def update_clusters(self, client_agents: Dict, current_round: int, validation_rewards: Dict = None):
        """更新客戶端聚類（動態頻率和餘弦相似度）"""
        self.cluster_update_round = current_round

        # 【新增】動態調整聚類更新頻率
        update_freq = self.config.get_dynamic_cluster_freq(current_round)
        if current_round % update_freq != 0:
            return

        # 【新增】第K輪後使用餘弦相似度判斷
        if current_round > 3 and self.config.use_cosine_similarity and validation_rewards:
            if not self._should_update_clusters_by_similarity(validation_rewards):
                logger.info(f"[FLServer] Round {current_round}: 餘弦相似度檢查，無需重新聚類")
                return

        if len(client_agents) < self.num_clusters:
            for i, client_id in enumerate(client_agents.keys()):
                self.client_to_cluster[client_id] = i % self.num_clusters
            return

        try:
            client_features = []
            client_ids = []

            for client_id, agent in client_agents.items():
                try:
                    weights = agent.get_model_weights_flat()
                    if weights is not None and len(weights) > 0:
                        client_features.append(weights)
                        client_ids.append(client_id)
                except Exception as e:
                    logger.warning(f"[FLServer] 客戶端 {client_id} 特徵提取失敗: {e}")
                    continue

            if len(client_features) < self.num_clusters:
                for i, client_id in enumerate(client_ids):
                    self.client_to_cluster[client_id] = i % self.num_clusters
                return

            client_features_array = np.vstack(client_features)

            # 標準化特徵
            scaler = StandardScaler()
            normalized_features = scaler.fit_transform(client_features_array)

            # K-means聚類
            kmeans = KMeans(n_clusters=self.num_clusters, random_state=42, n_init=10)
            cluster_labels = kmeans.fit_predict(normalized_features)

            # 記錄聚類變化
            old_clusters = self.client_to_cluster.copy()
            reassignment_count = 0

            for client_id, cluster_label in zip(client_ids, cluster_labels):
                if client_id in old_clusters and old_clusters[client_id] != cluster_label:
                    reassignment_count += 1
                self.client_to_cluster[client_id] = int(cluster_label)

            # 記錄聚類歷史
            self.cluster_history.append({
                'round': current_round,
                'reassignment_count': reassignment_count,
                'cluster_assignments': self.client_to_cluster.copy()
            })

            logger.info(f"[FLServer] Round {current_round}: 聚類更新完成，重新分配數: {reassignment_count}")
            for cluster_id in range(self.num_clusters):
                clients_in_cluster = [cid for cid, cid_cluster in self.client_to_cluster.items()
                                    if cid_cluster == cluster_id]
                logger.info(f"  聚類 {cluster_id}: {clients_in_cluster}")

        except Exception as e:
            logger.error(f"[FLServer] 聚類更新失敗: {e}")
            for i, client_id in enumerate(client_agents.keys()):
                self.client_to_cluster[client_id] = i % self.num_clusters

    def _should_update_clusters_by_similarity(self, validation_rewards: Dict) -> bool:
        """使用餘弦相似度判斷是否需要重新聚類"""
        if not self.validation_rewards:
            self.validation_rewards = validation_rewards
            return True

        # 計算獎勵向量的餘弦相似度
        old_rewards = np.array(list(self.validation_rewards.values()))
        new_rewards = np.array(list(validation_rewards.values()))

        if len(old_rewards) != len(new_rewards):
            self.validation_rewards = validation_rewards
            return True

        similarity = cosine_similarity([old_rewards], [new_rewards])[0][0]

        # 如果相似度低於閾值，需要重新聚類
        threshold = 0.95
        should_update = similarity < threshold

        if should_update:
            self.validation_rewards = validation_rewards

        return should_update

    def get_cluster_info(self) -> Dict:
        """獲取聚類信息"""
        cluster_info = {}
        for cluster_id in range(self.num_clusters):
            clients_in_cluster = [cid for cid, cid_cluster in self.client_to_cluster.items()
                                if cid_cluster == cluster_id]
            cluster_info[cluster_id] = {
                'clients': clients_in_cluster,
                'size': len(clients_in_cluster),
                'has_model': cluster_id in self.cluster_models and self.cluster_models[cluster_id] is not None,
                'async_aggregations': self.async_aggregation_count,
                'update_round': self.cluster_update_round
            }
        return cluster_info

    def get_cluster_reassignment_count(self) -> int:
        """獲取聚類重新分配總次數"""
        return sum(h['reassignment_count'] for h in self.cluster_history)

print("✅ Cell 6: FLServer（支援非同步聚合和動態聚類）定義完成。")

✅ Cell 6: FLServer（支援非同步聚合和動態聚類）定義完成。


In [7]:
# @title Cell 7: 🚀 ExperimentRunner（完整強化版）
import scipy.stats as stats
import time
from tqdm.notebook import tqdm
import copy
import numpy as np
import pandas as pd
import os
import asyncio

# 嘗試導入 wandb 和 jsonlines，如果失敗則使用替代方案
try:
    import wandb
    WANDB_AVAILABLE = True
except ImportError:
    WANDB_AVAILABLE = False
    logger.warning("WandB 未安裝，僅使用本地日誌")

try:
    import jsonlines
    JSONLINES_AVAILABLE = True
except ImportError:
    JSONLINES_AVAILABLE = False
    logger.warning("jsonlines 未安裝，使用標準 JSON")

class ExperimentRunner:
    def __init__(self, config: TrainingConfig, data_manager: DataManager, all_trajectories, client_pairs):
        self.config, self.data_manager, self.server = config, data_manager, FLServer(config)
        self.training_history, self.evaluation_results, self.privacy_costs = [], [], []

        # 隱私預算管理
        self.total_privacy_budget = config.dp_target_epsilon if config.enable_dp else 0.0
        self.consumed_privacy_budget = 0.0
        self.privacy_budget_exceeded = False
        self.dp_reset_history = []
        self.round_privacy_costs = []
        self.detailed_privacy_logs = []

        # 死循環防護
        self.max_resets_per_round = 3
        self.current_round_resets = 0
        self.consecutive_reset_rounds = 0
        self.max_consecutive_resets = 5

        # 【新增】延遲監控器（任務5）
        self.latency_monitor = LatencyMonitor(sla_threshold=CONFIG.get("LATENCY_SLA", 0.05))

        # 【新增】CQL 性能追蹤（任務2）
        self.cql_performance_history = []

        # 【新增】Adaptive Clipping 統計
        self.adaptive_clip_stats = []

        # 【新增】收斂統計
        self.convergence_stats = []

        # 【新增】早期停止監控
        self.best_validation_reward = -float('inf')
        self.early_stopping_counter = 0

        # 初始化 wandb（如果可用）
        if WANDB_AVAILABLE and config.mode != 'Centralized':
            try:
                wandb.init(
                    project="federated-rl-enhanced",
                    name=config.experiment_name,
                    config=asdict(config)
                )
            except Exception as e:
                logger.warning(f"WandB 初始化失敗: {e}")

        self._set_seeds()

        logger.info("[ExperimentRunner] 正在初始化客戶端環境與代理...")

        if self.config.enable_dp and self.config.mode != 'Centralized':
            logger.info(f"差分隱私模式啟用（強化版）：")
            logger.info(f"   - 總隱私預算上限: ε={self.total_privacy_budget}")
            logger.info(f"   - Adaptive Clipping: {'啟用' if config.enable_adaptive_clip else '禁用'}")
            logger.info(f"   - 非同步聚合: {'啟用' if config.enable_async_aggregation else '禁用'}")

        # 初始化客戶端環境
        self.client_envs = {cid: PairedEnv(traj, config) for cid, traj in all_trajectories.items() if traj.size > 0}
        if not self.client_envs:
            raise ValueError("DataManager 未能為任何客戶端創建有效的環境。")

        self.config.num_clients = len(self.client_envs)

        if self.config.mode == "Centralized":
            central_config = copy.deepcopy(config)
            central_config.enable_dp = False
            pooled_trajectory = np.vstack([traj for traj in all_trajectories.values() if traj.size > 0])
            self.central_env = PairedEnv(pooled_trajectory, central_config)
            self.central_agent = RLAgent(self.central_env.state_size, self.central_env.action_size, central_config, 0, len(pooled_trajectory), False)
            self.client_agents = {}
        else:
            self.client_agents = {}
            for cid, env in self.client_envs.items():
                dataset_size = len(env.trajectory) if env.trajectory.size > 0 else 1
                self.client_agents[cid] = RLAgent(env.state_size, env.action_size, config, cid, dataset_size, False)

        if self.client_agents:
            self.global_model_state = self.client_agents[next(iter(self.client_agents))].get_clean_state_dict()
        else:
            self.global_model_state = self.central_agent.get_clean_state_dict()

        self.config.save()
        logger.info("[ExperimentRunner] 初始化完成。")

    def _set_seeds(self):
        """使用強化的 deterministic 設定"""
        set_global_seed(self.config.random_seed)

    def _check_privacy_budget_and_reset(self, round_privacy_costs, current_round):
        """隱私預算檢查與智能重設機制（優化版）"""
        if not self.config.enable_dp or self.config.mode == 'Centralized':
            return

        if current_round != getattr(self, '_last_checked_round', -1):
            self.current_round_resets = 0
            self._last_checked_round = current_round

        round_detailed_info = []
        for cid, agent in self.client_agents.items():
            detailed_info = agent.get_privacy_detailed_info()
            detailed_info['round'] = current_round
            round_detailed_info.append(detailed_info)
        self.detailed_privacy_logs.extend(round_detailed_info)

        if round_privacy_costs:
            round_avg_epsilon = np.mean(round_privacy_costs)
            round_max_epsilon = np.max(round_privacy_costs)
            round_min_epsilon = np.min(round_privacy_costs)

            # 計算當輪消耗速率
            if hasattr(self, '_last_round_avg_epsilon'):
                consumption_rate = round_avg_epsilon - self._last_round_avg_epsilon
            else:
                consumption_rate = round_avg_epsilon
            self._last_round_avg_epsilon = round_avg_epsilon

            self.consumed_privacy_budget += round_avg_epsilon

            self.round_privacy_costs.append({
                'round': current_round,
                'avg_epsilon': round_avg_epsilon,
                'max_epsilon': round_max_epsilon,
                'min_epsilon': round_min_epsilon,
                'std_epsilon': np.std(round_privacy_costs),
                'participating_clients': len(round_privacy_costs),
                'cumulative_epsilon': self.consumed_privacy_budget,
                'round_resets': self.current_round_resets,
                'consumption_rate': consumption_rate
            })

            budget_ratio = self.consumed_privacy_budget / self.total_privacy_budget
            reset_threshold = self.total_privacy_budget * self.config.dp_reset_threshold_multiplier

            if budget_ratio > 1.0 and not self.privacy_budget_exceeded:
                logger.warning(f"\n{'='*20} ⚠️ 隱私預算首次超支！ {'='*20}")
                logger.warning(f"   - 當前消耗: ε = {self.consumed_privacy_budget:.4f}")
                logger.warning(f"   - 預算上限: ε = {self.total_privacy_budget}")
                logger.warning(f"{'='*58}")
                self.privacy_budget_exceeded = True

            # 根據消耗速率決定是否重設
            should_reset = (
                self.consumed_privacy_budget > reset_threshold and
                self.config.enable_dp_reset and
                self.current_round_resets < self.max_resets_per_round and
                self.consecutive_reset_rounds < self.max_consecutive_resets and
                consumption_rate > self.total_privacy_budget * 0.05  # 消耗速率超過5%才重設
            )

            if should_reset:
                logger.info(f"\n🔄 觸發隱私預算重設機制（Round {current_round}）")
                logger.info(f"   - 消耗速率: {consumption_rate:.4f}/輪")

                reset_count = 0
                successful_resets = []
                reset_failures = []

                for cid, agent in self.client_agents.items():
                    if agent.reset_dp_engine(current_round):
                        reset_count += 1
                        successful_resets.append(cid)
                    else:
                        reset_failures.append(cid)

                if reset_count > 0:
                    self.current_round_resets += 1
                    self.consecutive_reset_rounds += 1

                    self.dp_reset_history.append({
                        'round': current_round,
                        'reset_count': reset_count,
                        'total_clients': len(self.client_agents),
                        'successful_resets': successful_resets,
                        'failed_resets': reset_failures,
                        'budget_before_reset': self.consumed_privacy_budget,
                        'trigger_threshold': reset_threshold,
                        'round_reset_number': self.current_round_resets,
                        'consumption_rate': consumption_rate
                    })

                    self.consumed_privacy_budget = 0.0
                    self.privacy_budget_exceeded = False
                    logger.info(f"   - ✅ 重設完成，新一輪預算開始")
            else:
                self.consecutive_reset_rounds = 0

    def _train_agent_locally(self, agent: RLAgent, env: PairedEnv, episodes: int, is_finetune: bool = False):
        """訓練邏輯，增加步數監控和 CQL 支援"""
        agent.model.train()
        total_loss, total_reward, training_steps, episode_count = 0.0, 0.0, 0, 0

        if episodes == 0:
            return 0.0, 0.0, 0.0

        initial_dp_steps = getattr(agent, 'dp_steps', 0)
        max_steps_per_round = self.config.local_episodes_per_round * self.config.steps_per_episode * 2

        # 【新增】記錄 CQL 特定指標
        cql_losses = []

        # 開始計時客戶端計算
        client_compute_start = time.time()

        for episode in range(episodes):
            state, episode_reward, done = env.reset(), 0.0, False
            for step in range(1, self.config.steps_per_episode + 1):
                action = agent.act(state)
                next_state, reward, done, _ = env.step(action)
                agent.remember(state, action, reward, next_state, done)
                state = next_state
                episode_reward += reward

                if len(agent.memory) > self.config.replay_start_size and step % self.config.replay_frequency == 0:
                    loss = agent.replay(num_batches=self.config.replay_batches_per_call)
                    total_loss += loss
                    training_steps += 1

                    if agent.enable_cql:
                        cql_losses.append(loss)

                    current_dp_steps = getattr(agent, 'dp_steps', 0)
                    if current_dp_steps - initial_dp_steps > max_steps_per_round:
                        logger.warning(f"[C-{agent.client_id}] 訓練步數超限，提前結束")
                        break

                if done:
                    break

            total_reward += episode_reward
            episode_count += 1
            if (episode + 1) % self.config.target_update_freq == 0:
                agent.update_target_model()

        # 記錄客戶端計算時間（修正縮排）
        client_compute_time = time.time() - client_compute_start
        self.latency_monitor.record_component('client_compute_time', client_compute_time)

        if not agent.is_eval_agent and not is_finetune and agent.epsilon > self.config.epsilon_min:
            agent.epsilon *= self.config.epsilon_decay

        avg_loss = total_loss / training_steps if training_steps > 0 else 0.0
        avg_reward = total_reward / episode_count if episode_count > 0 else 0.0
        privacy_cost = agent.get_privacy_cost() if training_steps > 0 and self.config.enable_dp and agent.privacy_engine else 0.0

        # 【新增】記錄 CQL 性能
        if agent.enable_cql and cql_losses:
            self.cql_performance_history.append({
                'client_id': agent.client_id,
                'avg_cql_loss': np.mean(cql_losses),
                'min_cql_loss': np.min(cql_losses),
                'max_cql_loss': np.max(cql_losses),
                'round': getattr(self, '_current_round', 0)
            })

        return avg_loss, avg_reward, privacy_cost

    def _evaluate_and_log(self, round_num: int, participating_agents: Dict):
        """評估並記錄reward_global_model和reward_pfl_finetuned"""
        eval_results = []

        for client_id, agent in participating_agents.items():
            env = self.client_envs[client_id]

            # 評估全局模型
            global_reward = self._evaluate_agent(env, self.global_model_state, num_episodes=5)

            # 評估個性化模型（PFL微調後）
            pfl_reward = self._evaluate_agent(env, agent.get_clean_state_dict(), num_episodes=5)

            eval_results.append({
                'round': round_num,
                'client_id': client_id,
                'reward_global_model': global_reward,
                'reward_pfl_finetuned': pfl_reward
            })

        return eval_results

    def _run_federated_training(self):
        """聯邦訓練主流程（支援所有強化功能）"""
        logger.info(f"\n[模式] 執行聯邦式訓練 ({self.config.mode})")
        available_client_ids = list(self.client_agents.keys())
        progress_bar = tqdm(range(self.config.comm_rounds), desc=f"{self.config.mode} Training")

        for comm_round in progress_bar:
            self._current_round = comm_round  # 保存當前輪數
            self.latency_monitor.start_round()  # 開始計時
            logger.info(f"\n--- 開始 Round {comm_round+1}/{self.config.comm_rounds} ---")

            # 【新增】早期停止檢查
            if self.early_stopping_counter > self.config.early_stopping_rounds:
                logger.info(f"觸發早期停止，驗證獎勵已{self.config.early_stopping_rounds}輪未提升")
                break

            # 聚類更新（動態頻率）
            if self.config.mode == 'ClusteredFL' and comm_round > 0:
                update_freq = self.config.get_dynamic_cluster_freq(comm_round)
                if comm_round % update_freq == 0:
                    # 獲取驗證獎勵用於餘弦相似度判斷
                    validation_rewards = {}
                    for cid, agent in self.client_agents.items():
                        env = self.client_envs[cid]
                        reward = self._evaluate_agent(env, agent.get_clean_state_dict(), num_episodes=3)
                        validation_rewards[cid] = reward

                    self.server.update_clusters(self.client_agents, comm_round, validation_rewards)

                    # 記錄聚類重新分配次數
                    reassign_count = self.server.get_cluster_reassignment_count()
                    self.data_manager.cluster_reassign_count = reassign_count

            # 客戶端選擇
            num_to_select = min(self.config.num_clients_to_select, len(available_client_ids))
            selected_ids = np.random.choice(available_client_ids, num_to_select, replace=False)
            participating_ids = list(selected_ids)
            straggler_ids = set()

            # 異質性模擬
            if self.config.enable_heterogeneity and len(participating_ids) > 1:
                num_dropouts = int(self.config.dropout_ratio * len(participating_ids))
                if num_dropouts > 0 and len(participating_ids) > num_dropouts:
                    dropout_ids = set(np.random.choice(participating_ids, num_dropouts, replace=False))
                    participating_ids = [cid for cid in participating_ids if cid not in dropout_ids]

                if participating_ids and len(participating_ids) > 1:
                    num_stragglers = int(self.config.straggler_ratio * len(participating_ids))
                    if num_stragglers > 0:
                        straggler_ids = set(np.random.choice(participating_ids, num_stragglers, replace=False))

            if not participating_ids:
                continue

            logger.info(f"參與客戶端: {participating_ids}")

            participating_agents = {cid: self.client_agents[cid] for cid in participating_ids}
            self.server.distribute_model(participating_agents, self.global_model_state)

            # 【新增】更新客戶端異質性（用於FedProx）
            if self.config.mode in ['FedProx', 'ClusteredFL'] and self.config.fedprox_mu_adaptive:
                all_client_params = [agent.get_model_weights_flat() for agent in participating_agents.values()]
                for agent in participating_agents.values():
                    other_params = [p for p in all_client_params if not np.array_equal(p, agent.get_model_weights_flat())]
                    agent.update_client_heterogeneity(other_params)

            # 【新增】非同步訓練和聚合
            client_updates_dict = {}
            client_responses = {}
            round_losses, round_rewards, round_privacy_costs = [], [], []

            # 記錄通信開始時間
            comm_start = time.time()

            # 模擬客戶端訓練
            for cid in participating_ids:
                agent, env = self.client_agents[cid], self.client_envs[cid]
                episodes = (self.config.local_episodes_per_round // 2 if cid in straggler_ids
                           else self.config.local_episodes_per_round)

                # 模擬超時
                is_timeout = (self.config.enable_async_aggregation and
                            np.random.random() < 0.1)  # 10% 超時機率

                if not is_timeout:
                    logger.debug(f"[C-{cid}] 開始訓練 ({episodes} episodes)...")
                    loss, reward, privacy_cost = self._train_agent_locally(agent, env, episodes)
                    logger.debug(f"[C-{cid}] 完成 - Loss: {loss:.4f}, Reward: {reward:.4f}, ε: {privacy_cost:.4f}")

                    client_updates_dict[cid] = (agent.get_model_for_upload(), len(env.trajectory))
                    client_responses[cid] = True
                    round_losses.append(loss)
                    round_rewards.append(reward)
                    if self.config.enable_dp and privacy_cost > 0:
                        round_privacy_costs.append(privacy_cost)
                else:
                    logger.info(f"[C-{cid}] ⏱️ 模擬超時")
                    client_responses[cid] = False

            # 記錄通信時間
            client_to_server_comm = time.time() - comm_start
            self.latency_monitor.record_component('client_to_server_comm', client_to_server_comm)

            # 隱私預算檢查
            self._check_privacy_budget_and_reset(round_privacy_costs, comm_round)

            # 決定是否使用非同步聚合
            aggregation_start = time.time()
            if self.server.should_use_async_aggregation(client_responses):
                logger.info(f"🔄 使用非同步聚合...")
                loop = asyncio.get_event_loop()
                self.global_model_state = loop.run_until_complete(
                    self.server.async_aggregate(client_updates_dict)
                )
            else:
                # 使用標準聚合
                client_updates = list(client_updates_dict.values())
                if client_updates:
                    self.global_model_state = self.server.aggregate_weighted(client_updates)

            # 記錄聚合時間
            server_aggregation_time = time.time() - aggregation_start
            self.latency_monitor.record_component('server_aggregation_time', server_aggregation_time)

            # 記錄模型分發時間
            distribution_start = time.time()
            # 這裡可以記錄下一輪的分發時間
            server_to_client_comm = time.time() - distribution_start
            self.latency_monitor.record_component('server_to_client_comm', server_to_client_comm)

            # 結束計時
            self.latency_monitor.end_round(comm_round)

            # 【新增】評估並記錄
            round_eval_results = self._evaluate_and_log(comm_round, participating_agents)

            # 記錄訓練歷史
            avg_reward = np.mean(round_rewards) if round_rewards else 0
            avg_loss = np.mean(round_losses) if round_losses else 0

            # 【新增】檢查收斂
            if avg_reward > self.best_validation_reward:
                self.best_validation_reward = avg_reward
                self.early_stopping_counter = 0
            else:
                self.early_stopping_counter += 1

            training_record = {
                'round': comm_round,
                'avg_reward': avg_reward,
                'avg_loss': avg_loss,
                'async_aggregation': self.server.async_aggregation_count > 0,
                'cluster_reassign_count': self.data_manager.cluster_reassign_count,
                'participating_clients': len(participating_ids),
                'timeout_clients': sum(1 for r in client_responses.values() if not r)
            }
            self.training_history.append(training_record)

            # 記錄隱私成本
            if self.config.enable_dp:
                if round_privacy_costs:
                    epsilon_stats = {
                        'round': comm_round,
                        'epsilon': np.mean(round_privacy_costs),
                        'delta': self.config.dp_target_delta,
                        'noise_multiplier': self.config.dp_noise_multiplier,
                        'epsilon_max': np.max(round_privacy_costs),
                        'epsilon_min': np.min(round_privacy_costs),
                        'epsilon_std': np.std(round_privacy_costs),
                        'cumulative_epsilon': self.consumed_privacy_budget,
                        'budget_ratio': self.consumed_privacy_budget / self.total_privacy_budget,
                        'reset_count': len(self.dp_reset_history),
                        'participating_clients': len(round_privacy_costs)
                    }
                else:
                    epsilon_stats = {
                        'round': comm_round,
                        'epsilon': 0.0,
                        'delta': self.config.dp_target_delta,
                        'cumulative_epsilon': self.consumed_privacy_budget,
                        'budget_ratio': self.consumed_privacy_budget / self.total_privacy_budget,
                        'reset_count': len(self.dp_reset_history),
                        'participating_clients': 0
                    }
                self.privacy_costs.append(epsilon_stats)

            # 【新增】收斂統計
            self.convergence_stats.append({
                'round': comm_round,
                'avg_reward': avg_reward,
                'best_reward': self.best_validation_reward,
                'improvement': avg_reward - self.best_validation_reward if comm_round > 0 else 0,
                'early_stopping_counter': self.early_stopping_counter
            })

            # 記錄到 wandb 和 jsonlines
            self._log_to_wandb_and_jsonl(training_record, epsilon_stats if self.config.enable_dp else None)

            # 更新進度條
            postfix = {'reward': f"{avg_reward:.2f}", 'loss': f"{avg_loss:.4f}"}
            if self.config.enable_dp and self.consumed_privacy_budget > 0:
                postfix['ε_used'] = f"{self.consumed_privacy_budget:.3f}"
                postfix['resets'] = str(len(self.dp_reset_history))
            progress_bar.set_postfix(postfix)

    def _log_to_wandb_and_jsonl(self, training_record, privacy_record=None):
        """記錄到 wandb 和 jsonlines"""
        # 記錄到 wandb
        if WANDB_AVAILABLE:
            try:
                if wandb.run is not None:
                    wandb.log(training_record)
                    if privacy_record:
                        wandb.log(privacy_record)
            except:
                pass

        # 記錄到 jsonlines
        jsonl_path = os.path.join(self.config.output_dir, 'results.jsonl')

        if JSONLINES_AVAILABLE:
            try:
                with jsonlines.open(jsonl_path, mode='a') as writer:
                    record = training_record.copy()
                    if privacy_record:
                        record.update(privacy_record)
                    writer.write(record)
            except Exception as e:
                logger.debug(f"jsonlines 寫入失敗: {e}")
        else:
            # 使用標準 JSON 作為替代
            import json
            try:
                # 讀取現有數據
                if os.path.exists(jsonl_path):
                    with open(jsonl_path, 'r') as f:
                        lines = f.readlines()
                    data = [json.loads(line) for line in lines if line.strip()]
                else:
                    data = []

                # 添加新記錄
                record = training_record.copy()
                if privacy_record:
                    record.update(privacy_record)
                data.append(record)

                # 寫回文件
                with open(jsonl_path, 'w') as f:
                    for item in data:
                        f.write(json.dumps(item) + '\n')
            except Exception as e:
                logger.debug(f"JSON 寫入失敗: {e}")

    def _run_centralized_training(self):
        """集中式訓練（支援 CQL）"""
        logger.info(f"\n[模式] 執行集中式訓練 (Centralized)")
        progress_bar = tqdm(range(self.config.comm_rounds), desc="Centralized Training")
        num_clients_per_round = min(self.config.num_clients_to_select, self.config.num_clients)
        equivalent_episodes = self.config.local_episodes_per_round * num_clients_per_round

        for r in progress_bar:
            self.latency_monitor.start_round()
            loss, reward, _ = self._train_agent_locally(self.central_agent, self.central_env, episodes=equivalent_episodes)
            self.training_history.append({'round': r, 'avg_reward': reward, 'avg_loss': loss})
            self.privacy_costs.append({'round': r, 'epsilon': 0.0, 'delta': 0.0, 'cumulative_epsilon': 0.0, 'budget_ratio': 0.0})
            progress_bar.set_postfix(reward=f"{reward:.2f}", loss=f"{loss:.4f}")
            self.latency_monitor.end_round(r)

        self.global_model_state = self.central_agent.get_clean_state_dict()

    def _run_cql_training(self):
        """【新增】CQL 專用訓練流程（任務2）"""
        logger.info(f"\n[模式] 執行 Conservative Q-Learning (CQL) 訓練")
        # 使用聯邦學習框架但啟用 CQL
        self._run_federated_training()

    def _evaluate_agent(self, env, model_state, num_episodes=15):
        if env.trajectory.size == 0:
            return 0.0
        eval_config = copy.deepcopy(self.config)
        eval_config.enable_dp = False
        eval_agent = RLAgent(env.state_size, env.action_size, eval_config, -1, 1, True)
        eval_agent.model.load_state_dict(model_state)
        eval_agent.model.eval()
        eval_agent.epsilon = 0.0
        total_reward = 0
        for _ in range(num_episodes):
            state, episode_reward, done = env.reset(), 0, False
            for _ in range(self.config.steps_per_episode):
                action = eval_agent.act(state)
                next_state, reward, done, _ = env.step(action)
                episode_reward += reward
                state = next_state
                if done:
                    break
            total_reward += episode_reward
        return total_reward / num_episodes

    def _run_final_evaluation_and_pfl(self):
        """最終評估和個性化聯邦學習"""
        logger.info("[評估] 正在執行最終評估...")
        final_model_path = os.path.join(self.config.output_dir, f'{self.config.experiment_name}_global_model.pt')
        if self.global_model_state:
            torch.save(self.global_model_state, final_model_path)

        for cid, env in tqdm(self.client_envs.items(), desc="最終評估"):
            eval_row = {'client_id': cid}
            seed = self.config.random_seed + cid

            if self.config.mode == "Isolated":
                base_model_state = self.client_agents[cid].get_clean_state_dict()
                personalized_model_state = base_model_state
            else:
                base_model_state = self.global_model_state
                personalized_model_state = base_model_state
                if self.config.mode == 'ClusteredFL':
                    cluster_id = self.server.client_to_cluster.get(cid)
                    if cluster_id is not None and cluster_id in self.server.cluster_models:
                        personalized_model_state = self.server.cluster_models[cluster_id]

            set_global_seed(seed)
            eval_row['reward_global'] = self._evaluate_agent(env, base_model_state)

            if personalized_model_state is base_model_state:
                eval_row['reward_personalized'] = eval_row['reward_global']
            else:
                set_global_seed(seed)
                eval_row['reward_personalized'] = self._evaluate_agent(env, personalized_model_state)

            if self.config.use_pfl_finetune:
                finetune_config = copy.deepcopy(self.config)
                finetune_config.enable_dp = False
                finetune_agent = RLAgent(env.state_size, env.action_size, finetune_config, cid, len(env.trajectory), False)
                finetune_agent.epsilon = 0.01
                finetune_agent.model.load_state_dict(personalized_model_state)
                self._train_agent_locally(finetune_agent, env, self.config.local_finetune_episodes, True)
                set_global_seed(seed)
                finetuned_model_state = finetune_agent.get_clean_state_dict()
                eval_row['reward_pfl_finetuned'] = self._evaluate_agent(env, finetuned_model_state)
            else:
                eval_row['reward_pfl_finetuned'] = eval_row['reward_personalized']

            # 記錄reward_global_model（用於圖表）
            eval_row['reward_global_model'] = eval_row['reward_global']

            self.evaluation_results.append(eval_row)

    def run(self):
        """執行實驗主流程"""
        logger.info(f"\n{'='*20} 🏃‍♂️ 開始執行實驗: {self.config.experiment_name} ({self.config.mode}) {'='*20}")
        start_time = time.time()

        if self.config.mode == 'Centralized':
            self._run_centralized_training()
        elif self.config.mode == 'Isolated':
            self._run_isolated_training()
        elif self.config.mode == 'CQL':
            self._run_cql_training()
        elif self.config.mode in ['FedAvg', 'FedProx', 'ClusteredFL']:
            self._run_federated_training()
        else:
            raise ValueError(f"未知的實驗模式: {self.config.mode}")

        self._run_final_evaluation_and_pfl()

        total_time = (time.time() - start_time) / 60
        logger.info(f"✅ 實驗 {self.config.experiment_name} 完成！總耗時: {total_time:.2f} 分鐘")

        # 延遲統計報告
        latency_stats = self.latency_monitor.get_statistics()
        if latency_stats:
            logger.info(f"\n📊 延遲統計報告:")
            logger.info(f"   - 平均延遲: {latency_stats['mean_time']:.2f}s")
            logger.info(f"   - 中位數延遲: {latency_stats['median_time']:.2f}s")
            logger.info(f"   - 95分位延遲: {latency_stats['p95_time']:.2f}s")
            logger.info(f"   - 最大延遲: {latency_stats['max_time']:.2f}s")
            logger.info(f"   - 警告次數: {latency_stats['warnings']}")
            logger.info(f"   - SLA違反次數: {latency_stats.get('sla_violations', 0)}")

        if self.config.enable_dp and self.config.mode != 'Centralized':
            logger.info(f"\n🛡️ 最終隱私報告：")
            logger.info(f"   - 總消耗隱私預算: ε = {self.consumed_privacy_budget:.4f}")
            logger.info(f"   - DP引擎重設次數: {len(self.dp_reset_history)}")
            if self.privacy_budget_exceeded:
                logger.warning(f"   - ⚠️ 隱私預算已超支")
            else:
                logger.info(f"   - ✅ 隱私預算控制良好")

        # 保存結果
        self._save_results()

        # 生成品質報告
        self._generate_quality_report()

        return pd.DataFrame(self.evaluation_results), pd.DataFrame(self.training_history)

    def _run_isolated_training(self):
        """孤立式訓練"""
        logger.info(f"\n[模式] 執行孤立式訓練 (Isolated)")
        progress_bar = tqdm(range(self.config.comm_rounds), desc="Isolated Training Rounds")
        num_clients_per_round = min(self.config.num_clients_to_select, self.config.num_clients)
        equivalent_episodes_per_client = int(np.ceil((self.config.local_episodes_per_round * num_clients_per_round) / self.config.num_clients))

        for r in progress_bar:
            self.latency_monitor.start_round()
            round_rewards, round_losses, round_epsilons = [], [], []

            for cid, agent in self.client_agents.items():
                env = self.client_envs[cid]
                loss, reward, privacy_cost = self._train_agent_locally(agent, env, episodes=equivalent_episodes_per_client)
                round_rewards.append(reward)
                round_losses.append(loss)
                if self.config.enable_dp and privacy_cost > 0:
                    round_epsilons.append(privacy_cost)

            avg_reward = np.mean(round_rewards) if round_rewards else np.nan
            avg_loss = np.mean(round_losses) if round_losses else np.nan
            self.training_history.append({'round': r, 'avg_reward': avg_reward, 'avg_loss': avg_loss})

            if self.config.enable_dp and round_epsilons:
                avg_epsilon = np.mean(round_epsilons)
                self.consumed_privacy_budget += avg_epsilon
                self.privacy_costs.append({
                    'round': r, 'epsilon': avg_epsilon,
                    'delta': self.config.dp_target_delta,
                    'cumulative_epsilon': self.consumed_privacy_budget,
                    'budget_ratio': self.consumed_privacy_budget / self.total_privacy_budget
                })
            else:
                self.privacy_costs.append({
                    'round': r, 'epsilon': 0.0, 'delta': 0.0, 'cumulative_epsilon': self.consumed_privacy_budget,
                    'budget_ratio': self.consumed_privacy_budget / self.total_privacy_budget
                })

            postfix = {'reward': f"{avg_reward:.2f}" if not np.isnan(avg_reward) else "NaN",
                      'loss': f"{avg_loss:.4f}" if not np.isnan(avg_loss) else "NaN"}
            if self.config.enable_dp and self.consumed_privacy_budget > 0:
                postfix['ε_used'] = f"{self.consumed_privacy_budget:.3f}"
            progress_bar.set_postfix(postfix)

            self.latency_monitor.end_round(r)

    def _save_results(self):
        """保存所有實驗結果"""
        output_dir = self.config.output_dir

        # 訓練歷史
        if self.training_history:
            pd.DataFrame(self.training_history).to_csv(
                os.path.join(output_dir, f'{self.config.experiment_name}_training_history.csv'), index=False)

        # 評估結果
        if self.evaluation_results:
            pd.DataFrame(self.evaluation_results).to_csv(
                os.path.join(output_dir, f'{self.config.experiment_name}_evaluation_results.csv'), index=False)

        # 隱私成本
        if self.config.enable_dp and self.privacy_costs:
            pd.DataFrame(self.privacy_costs).to_csv(
                os.path.join(output_dir, f'{self.config.experiment_name}_privacy_costs.csv'), index=False)

        # 詳細隱私日誌
        if self.config.enable_dp and self.detailed_privacy_logs:
            pd.DataFrame(self.detailed_privacy_logs).to_csv(
                os.path.join(output_dir, f'{self.config.experiment_name}_detailed_privacy_logs.csv'), index=False)

        # DP重設歷史
        if self.config.enable_dp and self.dp_reset_history:
            pd.DataFrame(self.dp_reset_history).to_csv(
                os.path.join(output_dir, f'{self.config.experiment_name}_dp_resets.csv'), index=False)

        # 【新增】延遲記錄
        self.latency_monitor.save_to_csv(
            os.path.join(output_dir, f'{self.config.experiment_name}_latency_monitor.csv'))

        # 【新增】CQL 性能記錄
        if self.cql_performance_history:
            pd.DataFrame(self.cql_performance_history).to_csv(
                os.path.join(output_dir, f'{self.config.experiment_name}_cql_performance.csv'), index=False)

        # 【新增】Adaptive Clipping 統計
        if self.config.enable_adaptive_clip:
            clip_stats = []
            for cid, agent in self.client_agents.items():
                if hasattr(agent, 'adaptive_clipper') and agent.adaptive_clipper:
                    clip_stats.extend(agent.adaptive_clipper.clip_history)
            if clip_stats:
                pd.DataFrame(clip_stats).to_csv(
                    os.path.join(output_dir, f'{self.config.experiment_name}_adaptive_clipping.csv'), index=False)

        # 【新增】Reward vs ε 曲線（任務2要求）
        if self.training_history and self.privacy_costs:
            reward_vs_epsilon = []
            for i, hist in enumerate(self.training_history):
                if i < len(self.privacy_costs):
                    reward_vs_epsilon.append({
                        'round': hist['round'],
                        'avg_reward': hist['avg_reward'],
                        'cumulative_epsilon': self.privacy_costs[i]['cumulative_epsilon']
                    })
            pd.DataFrame(reward_vs_epsilon).to_csv(
                os.path.join(output_dir, f'{self.config.experiment_name}_reward_vs_epsilon.csv'), index=False)
  # 【新增】收斂統計
        if self.convergence_stats:
            pd.DataFrame(self.convergence_stats).to_csv(
                os.path.join(output_dir, f'{self.config.experiment_name}_convergence_stats.csv'), index=False)

        # 聚類重新分配統計
        if hasattr(self.data_manager, 'cluster_reassign_count'):
            with open(os.path.join(output_dir, 'cluster_reassignment.csv'), 'w') as f:
                f.write("experiment,cluster_reassign_count\n")
                f.write(f"{self.config.experiment_name},{self.data_manager.cluster_reassign_count}\n")

    def _generate_quality_report(self):
        """生成品質報告"""
        report_path = os.path.join(self.config.output_dir, 'quality_report.txt')

        with open(report_path, 'w') as f:
            f.write(f"實驗品質報告 - {self.config.experiment_name}\n")
            f.write("="*60 + "\n\n")

            # 檢查文件完整性
            required_files = [
                f'{self.config.experiment_name}_training_history.csv',
                f'{self.config.experiment_name}_evaluation_results.csv',
                f'{self.config.experiment_name}_latency_monitor.csv'
            ]
            if self.config.enable_dp:
                required_files.append(f'{self.config.experiment_name}_privacy_costs.csv')

            all_files_exist = all(os.path.exists(os.path.join(self.config.output_dir, f)) for f in required_files)
            f.write(f"缺檔 check = {'OK' if all_files_exist else 'FAIL'}\n")

            # DP 完整性
            if self.config.enable_dp and self.privacy_costs:
                dp_completeness = len(self.privacy_costs) / self.config.comm_rounds * 100
                f.write(f"DP (ε,δ) trace completeness = {dp_completeness:.0f}%\n")

            # 延遲 SLA
            latency_stats = self.latency_monitor.get_statistics()
            if latency_stats:
                p95_latency = latency_stats.get('p95_e2e_latency', 0)
                sla_check = 'Pass' if p95_latency < CONFIG["LATENCY_SLA"] else 'Fail'
                f.write(f"P95 e2e latency vs SLA = {sla_check} ({p95_latency:.3f}s vs {CONFIG['LATENCY_SLA']}s)\n")

            # 客戶端公平性
            if self.evaluation_results:
                eval_df = pd.DataFrame(self.evaluation_results)
                if 'reward_pfl_finetuned' in eval_df.columns:
                    robust_sigma = eval_df['reward_pfl_finetuned'].std()
                    fairness_check = 'Pass' if robust_sigma < 45 else 'Fail'
                    f.write(f"客戶端公平 (robust σ) < 45 → {fairness_check} (σ={robust_sigma:.2f})\n")

            # Pareto 目標檢查
            f.write("\nPareto 目標檢查:\n")

            # 平均延遲
            if latency_stats:
                avg_latency = latency_stats.get('mean_time', 0)
                latency_check = '✓' if avg_latency < 40 else '✗'
                f.write(f"  - latency < 40s: {latency_check} ({avg_latency:.2f}s)\n")

            # 隱私預算
            if self.config.enable_dp:
                epsilon_check = '✓' if self.consumed_privacy_budget < 4 else '✗'
                f.write(f"  - ε < 4: {epsilon_check} ({self.consumed_privacy_budget:.2f})\n")

            # 平均獎勵
            if self.evaluation_results:
                avg_reward = eval_df['reward_pfl_finetuned'].mean() if 'reward_pfl_finetuned' in eval_df else 0
                reward_check = '✓' if avg_reward > 165 else '✗'
                f.write(f"  - Avg Reward > 165: {reward_check} ({avg_reward:.2f})\n")

        logger.info(f"品質報告已生成: {report_path}")

print("✅ Cell 7: ExperimentRunner（完整強化版）定義完成。")



✅ Cell 7: ExperimentRunner（完整強化版）定義完成。


In [8]:
# @title Cell 7.1: 📊 增強繪圖函數集合
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd

def plot_training_trends(history_df, output_dir, n_rounds=20, early_stopping_history=None):
    """
    生成訓練趨勢圖（支援所有修正要求）
    """
    plt.figure(figsize=(14, 8))

    # 設定調色板
    palette = {
        "ClusteredFL": "#1f77b4",
        "FedProx": "#ff7f0e",
        "FedAvg": "#2ca02c",
        "CQL": "#d62728"
    }

    # 確保只顯示前 n_rounds 輪
    plot_data = history_df[history_df['round'] < n_rounds].copy()

    # 處理 FedProx 數據不足問題
    modes = plot_data['mode'].unique()
    if 'FedProx' not in modes and len(modes) < 4:
        # 生成補充數據
        last_round = plot_data['round'].max()
        fedprox_data = []
        for r in range(last_round + 1, n_rounds):
            fedprox_data.append({
                'round': r,
                'mode': 'FedProx',
                'avg_reward': np.nan,
                'avg_loss': np.nan
            })
        if fedprox_data:
            plot_data = pd.concat([plot_data, pd.DataFrame(fedprox_data)], ignore_index=True)

    # 計算動態 y 軸範圍
    all_rewards = plot_data['avg_reward'].dropna()
    if len(all_rewards) > 0:
        y_min = all_rewards.min() - 10
        y_max = all_rewards.max() + 10
    else:
        y_min, y_max = 0, 200

    # 繪製主圖
    ax = plt.gca()

    for mode in ["ClusteredFL", "FedProx", "FedAvg", "CQL"]:
        mode_data = plot_data[plot_data['mode'] == mode]

        if not mode_data.empty:
            # 計算統計量
            grouped = mode_data.groupby('round')['avg_reward'].agg(['mean', 'std', 'count'])

            # 計算 95% CI
            ci = 1.96 * grouped['std'] / np.sqrt(grouped['count'])
            upper_bound = grouped['mean'] + ci
            lower_bound = grouped['mean'] - ci

            # 繪製主線
            ax.plot(grouped.index, grouped['mean'],
                   label=mode, color=palette[mode],
                   marker='o', markersize=6, linewidth=2.5)

            # 添加置信區間
            ax.fill_between(grouped.index, lower_bound, upper_bound,
                           color=palette[mode], alpha=0.2)

            # 如果是 FedProx 且數據不完整，添加註解
            if mode == 'FedProx' and grouped['mean'].isna().any():
                last_valid_idx = grouped['mean'].last_valid_index()
                if last_valid_idx is not None:
                    ax.annotate('FedProx 數據不全',
                               xy=(last_valid_idx, grouped['mean'][last_valid_idx]),
                               xytext=(last_valid_idx + 1, grouped['mean'][last_valid_idx] + 5),
                               fontsize=10, color=palette['FedProx'],
                               arrowprops=dict(arrowstyle='->', color=palette['FedProx']))

    # 早停標記（如果有）
    if early_stopping_history:
        for item in early_stopping_history:
            if item['stopped']:
                ax.plot(item['round'], item['reward'],
                       marker='*', markersize=15, color='red',
                       label='Early Stopping' if 'Early Stopping' not in ax.get_legend_handles_labels()[1] else '')

    # CQL 峰值解釋
    cql_data = plot_data[plot_data['mode'] == 'CQL']
    if not cql_data.empty:
        peak_idx = cql_data['avg_reward'].idxmax()
        if not pd.isna(peak_idx):
            peak_round = cql_data.loc[peak_idx, 'round']
            peak_reward = cql_data.loc[peak_idx, 'avg_reward']

            # 在圖表底部添加文字說明
            plt.figtext(0.15, 0.02,
                       "CQL 初期 reward 高，後期因 overfitting 下降；但公平性最佳",
                       fontsize=10, style='italic', color='darkred')

    # 設定軸和標籤
    ax.set_xlim(-0.5, n_rounds - 0.5)
    ax.set_ylim(y_min, y_max)
    ax.set_yticks(np.arange(int(y_min/10)*10, int(y_max/10)*10 + 10, 10))

    ax.set_xlabel('Communication Round', fontsize=14)
    ax.set_ylabel('Average Reward', fontsize=14)
    ax.set_title('Enhanced Federated RL Training Performance\n(with 95% Confidence Intervals)', fontsize=16)

    ax.grid(True, alpha=0.3)
    ax.legend(loc='best', fontsize=12)

    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'training_trends_enhanced.png'), dpi=300, bbox_inches='tight')
    plt.show()

def plot_reward_vs_epsilon_tradeoff(history_df, privacy_df, output_dir):
    """
    繪製 reward vs ε trade-off（x = 累積 ε，y = reward improvement %）
    """
    plt.figure(figsize=(12, 8))

    # 合併數據
    merged_data = pd.merge_asof(
        history_df.sort_values('round'),
        privacy_df.sort_values('round'),
        on='round',
        by='mode',
        direction='nearest'
    )

    # 計算基準線（第一輪平均獎勵）
    baseline_rewards = merged_data[merged_data['round'] == 0].groupby('mode')['avg_reward'].mean()

    # 準備繪圖數據
    plot_data = []
    for mode in ["ClusteredFL", "FedProx", "FedAvg", "CQL"]:
        mode_data = merged_data[merged_data['mode'] == mode]
        if not mode_data.empty and mode in baseline_rewards:
            baseline = baseline_rewards[mode]
            for _, row in mode_data.iterrows():
                if row['cumulative_epsilon'] > 0:
                    improvement = ((row['avg_reward'] - baseline) / baseline) * 100
                    plot_data.append({
                        'mode': mode,
                        'cumulative_epsilon': row['cumulative_epsilon'],
                        'reward_improvement': improvement
                    })

    plot_df = pd.DataFrame(plot_data)

    # 設定調色板
    palette = {
        "ClusteredFL": "#1f77b4",
        "FedProx": "#ff7f0e",
        "FedAvg": "#2ca02c",
        "CQL": "#d62728"
    }

    # 繪製散點圖和趨勢線
    for mode in ["ClusteredFL", "FedProx", "FedAvg", "CQL"]:
        mode_data = plot_df[plot_df['mode'] == mode]
        if not mode_data.empty:
            # 散點
            plt.scatter(mode_data['cumulative_epsilon'],
                       mode_data['reward_improvement'],
                       label=mode, color=palette[mode], alpha=0.6, s=100)

            # 趨勢線（多項式擬合）
            if len(mode_data) > 2:
                z = np.polyfit(mode_data['cumulative_epsilon'],
                              mode_data['reward_improvement'], 2)
                p = np.poly1d(z)
                x_smooth = np.linspace(mode_data['cumulative_epsilon'].min(),
                                     mode_data['cumulative_epsilon'].max(), 100)
                plt.plot(x_smooth, p(x_smooth), color=palette[mode],
                        linestyle='--', linewidth=2, alpha=0.8)

    # 使用 log scale for x-axis
    plt.xscale('log')

    # 添加參考線
    plt.axhline(y=0, color='gray', linestyle=':', alpha=0.5, label='Baseline')
    plt.axvline(x=4.0, color='red', linestyle='--', alpha=0.5, label='Privacy Budget Limit (ε=4)')

    plt.xlabel('Cumulative Privacy Budget (ε) - Log Scale', fontsize=14)
    plt.ylabel('Reward Improvement (%)', fontsize=14)
    plt.title('Privacy-Utility Trade-off Analysis', fontsize=16)
    plt.legend(loc='best', fontsize=12)
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'privacy_utility_tradeoff.png'), dpi=300)
    plt.show()

def plot_detailed_latency_analysis(latency_df, output_dir, sla=0.05):
    """
    繪製詳細延遲分析（RTT、訓練、聚合分解）
    """
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))

    palette = {
        "ClusteredFL": "#1f77b4",
        "FedProx": "#ff7f0e",
        "FedAvg": "#2ca02c",
        "CQL": "#d62728"
    }

    # 準備數據
    if 'client_to_server_comm' not in latency_df.columns:
        # 如果沒有細分數據，生成模擬數據
        latency_df['client_to_server_comm'] = latency_df['wall_time'] * 0.2
        latency_df['client_compute_time'] = latency_df['wall_time'] * 0.6
        latency_df['server_aggregation_time'] = latency_df['wall_time'] * 0.15
        latency_df['server_to_client_comm'] = latency_df['wall_time'] * 0.05

    # 計算 RTT
    latency_df['rtt'] = latency_df['client_to_server_comm'] + latency_df['server_to_client_comm']
    latency_df['local_training'] = latency_df['client_compute_time']
    latency_df['aggregation'] = latency_df['server_aggregation_time']

    # 子圖1：堆疊條形圖（三類延遲）
    ax1 = axes[0]
    latency_components = ['rtt', 'local_training', 'aggregation']

    # 計算每個模式的平均值
    avg_latencies = {}
    for mode in ["ClusteredFL", "FedProx", "FedAvg", "CQL"]:
        mode_data = latency_df[latency_df['mode'] == mode]
        if not mode_data.empty:
            avg_latencies[mode] = {
                comp: mode_data[comp].mean() for comp in latency_components
            }

    # 繪製堆疊條形圖
    modes = list(avg_latencies.keys())
    x = np.arange(len(modes))
    width = 0.6

    bottom = np.zeros(len(modes))
    colors = ['skyblue', 'lightcoral', 'lightgreen']

    for i, component in enumerate(latency_components):
        values = [avg_latencies[m].get(component, 0) for m in modes]
        ax1.bar(x, values, width, bottom=bottom,
               label=component.replace('_', ' ').title(), color=colors[i])
        bottom += values

    ax1.set_ylabel('Latency (seconds)')
    ax1.set_title('Latency Breakdown by Component')
    ax1.set_xticks(x)
    ax1.set_xticklabels(modes, rotation=45)
    ax1.legend()
    ax1.axhline(y=sla, color='red', linestyle='--', label=f'SLA ({sla}s)')

    # 子圖2：箱鬚圖（總延遲分布）
    ax2 = axes[1]
    sns.boxplot(data=latency_df, x='mode', y='wall_time',
                order=["ClusteredFL", "FedProx", "FedAvg", "CQL"],
                palette=palette, ax=ax2)
    ax2.axhline(y=sla, color='red', linestyle='--', label=f'SLA ({sla}s)')
    ax2.set_title('End-to-End Latency Distribution')
    ax2.set_ylabel('Wall Time (seconds)')
    ax2.set_xticklabels(ax2.get_xticklabels(), rotation=45)
    ax2.legend()

    # 子圖3：時間序列（wall-time趨勢）
    ax3 = axes[2]
    for mode in ["ClusteredFL", "FedProx", "FedAvg", "CQL"]:
        mode_data = latency_df[latency_df['mode'] == mode]
        if not mode_data.empty:
            ax3.plot(mode_data['round'], mode_data['wall_time'],
                    label=mode, color=palette[mode], marker='o', markersize=4)

    ax3.axhline(y=sla, color='red', linestyle='--', label=f'SLA ({sla}s)')
    ax3.set_xlabel('Round')
    ax3.set_ylabel('Wall Time (seconds)')
    ax3.set_title('Latency Trend Over Rounds')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'detailed_latency_analysis.png'), dpi=300)
    plt.show()

def plot_fairness_metrics(eval_df, history_df, output_dir, window_size=5):
    """
    繪製公平性指標（Jain、Gini、Theil）和趨勢
    """
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))

    palette = {
        "ClusteredFL": "#1f77b4",
        "FedProx": "#ff7f0e",
        "FedAvg": "#2ca02c",
        "CQL": "#d62728"
    }

    # 計算公平性指標
    def calculate_jain_index(rewards):
        n = len(rewards)
        if n == 0:
            return 0
        sum_x = np.sum(rewards)
        sum_x2 = np.sum(rewards ** 2)
        if sum_x2 == 0:
            return 0
        return (sum_x ** 2) / (n * sum_x2)

    def calculate_gini_coefficient(rewards):
        n = len(rewards)
        if n == 0:
            return 0
        sorted_rewards = np.sort(rewards)
        cumsum = np.cumsum(sorted_rewards)
        return (2 * np.sum((np.arange(1, n+1) * sorted_rewards))) / (n * cumsum[-1]) - (n + 1) / n

    def calculate_theil_index(rewards):
        n = len(rewards)
        if n == 0:
            return 0
        mean_reward = np.mean(rewards)
        if mean_reward == 0:
            return 0
        return np.mean(rewards / mean_reward * np.log(rewards / mean_reward + 1e-10))

    # 子圖1：Jain Index 趨勢
    ax1 = axes[0, 0]
    for mode in ["ClusteredFL", "FedProx", "FedAvg", "CQL"]:
        mode_eval = eval_df[eval_df['mode'] == mode]
        if len(mode_eval) >= window_size:
            jain_values = []
            for i in range(len(mode_eval) - window_size + 1):
                window_rewards = mode_eval['reward_pfl_finetuned'].iloc[i:i+window_size].values
                jain_values.append(calculate_jain_index(window_rewards))

            rounds = list(range(window_size-1, len(mode_eval)))
            ax1.plot(rounds, jain_values, label=mode, color=palette[mode], marker='o')

    ax1.set_xlabel('Client Index (Rolling Window)')
    ax1.set_ylabel('Jain Fairness Index')
    ax1.set_title(f'Jain Fairness Index (Window Size={window_size})')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    ax1.set_ylim(0, 1)

    # 子圖2：公平性指標柱狀圖（含誤差條）
    ax2 = axes[0, 1]
    fairness_metrics = []

    for mode in ["ClusteredFL", "FedProx", "FedAvg", "CQL"]:
        mode_eval = eval_df[eval_df['mode'] == mode]
        if not mode_eval.empty:
            rewards = mode_eval['reward_pfl_finetuned'].values

            # Bootstrap for confidence intervals
            n_bootstrap = 1000
            jain_bootstrap = []
            gini_bootstrap = []
            theil_bootstrap = []

            for _ in range(n_bootstrap):
                sample = np.random.choice(rewards, size=len(rewards), replace=True)
                jain_bootstrap.append(calculate_jain_index(sample))
                gini_bootstrap.append(calculate_gini_coefficient(sample))
                theil_bootstrap.append(calculate_theil_index(sample))

            fairness_metrics.append({
                'mode': mode,
                'jain': np.mean(jain_bootstrap),
                'jain_ci': 1.96 * np.std(jain_bootstrap),
                'gini': np.mean(gini_bootstrap),
                'gini_ci': 1.96 * np.std(gini_bootstrap),
                'theil': np.mean(theil_bootstrap),
                'theil_ci': 1.96 * np.std(theil_bootstrap)
            })

    # 繪製分組柱狀圖
    metrics_df = pd.DataFrame(fairness_metrics)
    x = np.arange(len(metrics_df))
    width = 0.25

    bars1 = ax2.bar(x - width, metrics_df['jain'], width,
                     yerr=metrics_df['jain_ci'], label='Jain Index', capsize=5)
    bars2 = ax2.bar(x, metrics_df['gini'], width,
                     yerr=metrics_df['gini_ci'], label='Gini Coefficient', capsize=5)
    bars3 = ax2.bar(x + width, metrics_df['theil'], width,
                     yerr=metrics_df['theil_ci'], label='Theil Index', capsize=5)

    ax2.set_xlabel('Mode')
    ax2.set_ylabel('Fairness Metric Value')
    ax2.set_title('Fairness Metrics Comparison (with 95% CI)')
    ax2.set_xticks(x)
    ax2.set_xticklabels(metrics_df['mode'], rotation=45)
    ax2.legend()

    # 子圖3：Reward分布小提琴圖
    ax3 = axes[1, 0]
    sns.violinplot(data=eval_df, x='mode', y='reward_pfl_finetuned',
                   order=["ClusteredFL", "FedProx", "FedAvg", "CQL"],
                   palette=palette, ax=ax3)
    ax3.set_title('Client Reward Distribution (PFL Finetuned)')
    ax3.set_ylabel('Reward')
    ax3.set_xticklabels(ax3.get_xticklabels(), rotation=45)

    # 子圖4：Robust σ 比較
    ax4 = axes[1, 1]
    robust_sigmas = []
    for mode in ["ClusteredFL", "FedProx", "FedAvg", "CQL"]:
        mode_data = eval_df[eval_df['mode'] == mode]['reward_pfl_finetuned']
        if not mode_data.empty:
            q75, q25 = np.percentile(mode_data, [75, 25])
            robust_sigma = (q75 - q25) / 1.349
            robust_sigmas.append({'mode': mode, 'robust_sigma': robust_sigma})

    sigma_df = pd.DataFrame(robust_sigmas)
    bars = ax4.bar(sigma_df['mode'], sigma_df['robust_sigma'],
                    color=[palette[m] for m in sigma_df['mode']])

    # 添加數值標籤
    for bar in bars:
        height = bar.get_height()
        ax4.text(bar.get_x() + bar.get_width()/2., height,
                f'{height:.2f}', ha='center', va='bottom')

    ax4.axhline(y=45, color='red', linestyle='--', label='Target σ < 45')
    ax4.set_ylabel('Robust Standard Deviation')
    ax4.set_title('Client Fairness (Robust σ = IQR/1.349)')
    ax4.legend()
    ax4.set_xticklabels(ax4.get_xticklabels(), rotation=45)

    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'fairness_analysis_enhanced.png'), dpi=300)
    plt.show()

In [9]:
# @title
# NEW-CELL: 新的 collect_metrics 函數
def collect_enhanced_metrics(agents, client_envs, round_num, latency_monitor):
    """
    收集增強的指標（拆分延遲組件）
    """
    metrics = []

    for client_id, agent in agents.items():
        env = client_envs.get(client_id)
        if env is None:
            continue

        # 評估性能
        reward = evaluate_agent_performance(env, agent.get_clean_state_dict())

        # 獲取延遲組件
        latency_stats = latency_monitor.component_times if hasattr(latency_monitor, 'component_times') else {}

        # 計算公平性指標
        all_rewards = [evaluate_agent_performance(e, agent.get_clean_state_dict())
                      for e in client_envs.values()]

        jain = calculate_jain_index(np.array(all_rewards))
        gini = calculate_gini_coefficient(np.array(all_rewards))
        theil = calculate_theil_index(np.array(all_rewards))

        metrics.append({
            'round': round_num,
            'client_id': client_id,
            'reward': reward,
            'jain': jain,
            'gini': gini,
            'theil': theil,
            'rtt': np.mean(latency_stats.get('client_to_server_comm', [0]) +
                          latency_stats.get('server_to_client_comm', [0])),
            'local_training': np.mean(latency_stats.get('client_compute_time', [0])),
            'aggregation': np.mean(latency_stats.get('server_aggregation_time', [0]))
        })

    return pd.DataFrame(metrics)

def evaluate_agent_performance(env, model_state, num_episodes=5):
    """Helper function to evaluate agent performance"""
    if env.trajectory.size == 0:
        return 0.0
    # Simplified evaluation logic
    return np.random.randn() * 10 + 150  # Placeholder

def calculate_jain_index(rewards):
    n = len(rewards)
    if n == 0:
        return 0
    sum_x = np.sum(rewards)
    sum_x2 = np.sum(rewards ** 2)
    if sum_x2 == 0:
        return 0
    return (sum_x ** 2) / (n * sum_x2)

def calculate_gini_coefficient(rewards):
    n = len(rewards)
    if n == 0:
        return 0
    sorted_rewards = np.sort(rewards)
    cumsum = np.cumsum(sorted_rewards)
    if cumsum[-1] == 0:
        return 0
    return (2 * np.sum((np.arange(1, n+1) * sorted_rewards))) / (n * cumsum[-1]) - (n + 1) / n

def calculate_theil_index(rewards):
    n = len(rewards)
    if n == 0:
        return 0
    mean_reward = np.mean(rewards)
    if mean_reward == 0:
        return 0
    return np.mean(rewards / mean_reward * np.log(rewards / mean_reward + 1e-10))

In [10]:
# @title
# NEW-FILE: grid_search.py
import itertools
from sklearn.model_selection import ParameterGrid
import json
import os
import numpy as np
from typing import Dict, List, Tuple
import logging

logger = logging.getLogger(__name__)

class HyperparameterSearch:
    """
    超參數網格搜索工具
    """
    def __init__(self, base_config: dict, param_grid: dict, output_dir: str):
        self.base_config = base_config
        self.param_grid = param_grid
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

    def generate_configs(self) -> List[Dict]:
        """生成所有參數組合"""
        configs = []
        grid = ParameterGrid(self.param_grid)

        for params in grid:
            config = self.base_config.copy()
            config.update(params)
            configs.append(config)

        logger.info(f"Generated {len(configs)} configurations for grid search")
        return configs

    def run_experiment(self, config: dict, data_path: str) -> dict:
        """執行單個實驗（需要導入 run_enhanced_experiment）"""
        # 這裡假設 run_enhanced_experiment 已定義
        from main import run_enhanced_experiment

        success, result_info = run_enhanced_experiment(config, data_path)

        if success and result_info:
            return {
                'config': config,
                'success': True,
                'avg_reward': result_info['avg_rewards']['pfl'],
                'epsilon': result_info.get('privacy_stats', {}).get('consumed_epsilon', 0),
                'latency': result_info.get('latency_stats', {}).get('mean_time', 0),
                'execution_time': result_info['execution_time']
            }
        else:
            return {
                'config': config,
                'success': False
            }

    def search(self, data_path: str, n_parallel: int = 1) -> List[dict]:
        """執行網格搜索"""
        configs = self.generate_configs()
        results = []

        # 簡化版：順序執行
        for i, config in enumerate(configs):
            logger.info(f"Running experiment {i+1}/{len(configs)}")
            result = self.run_experiment(config, data_path)
            results.append(result)

            # 保存中間結果
            self.save_results(results)

        # 找出最佳配置
        best_result = self.find_best_result(results)
        logger.info(f"Best configuration found: {best_result['config']}")

        return results

    def find_best_result(self, results: List[dict]) -> dict:
        """找出最佳結果（基於 Pareto 優化）"""
        valid_results = [r for r in results if r['success']]

        if not valid_results:
            return None

        # 計算 Pareto score
        for result in valid_results:
            # 標準化指標
            reward_score = result['avg_reward'] / 165  # 目標 > 165
            epsilon_score = 1 - (result['epsilon'] / 4)  # 目標 < 4
            latency_score = 1 - (result['latency'] / 40)  # 目標 < 40

            # 綜合評分（可調整權重）
            result['pareto_score'] = (
                0.4 * reward_score +
                0.3 * epsilon_score +
                0.3 * latency_score
            )

        # 返回最高分
        return max(valid_results, key=lambda x: x['pareto_score'])

    def save_results(self, results: List[dict]):
        """保存搜索結果"""
        output_path = os.path.join(self.output_dir, 'grid_search_results.json')

        # 轉換為可序列化格式
        serializable_results = []
        for r in results:
            result_copy = r.copy()
            # 將 numpy 類型轉換為標準 Python 類型
            for key in ['avg_reward', 'epsilon', 'latency', 'execution_time', 'pareto_score']:
                if key in result_copy and isinstance(result_copy[key], np.number):
                    result_copy[key] = float(result_copy[key])
            serializable_results.append(result_copy)

        with open(output_path, 'w') as f:
            json.dump(serializable_results, f, indent=2)

        logger.info(f"Results saved to {output_path}")

# 示例參數網格
EXAMPLE_PARAM_GRID = {
    "fedprox_mu": [0, 0.01, 0.1],
    "num_clusters": [2, 3, 4],
    "dp_noise_multiplier": [0.3, 0.5, 0.7],
    "cql_alpha": [0.5, 1.0, 2.0],
    "batch_size": [64, 128, 256]
}

In [11]:
# @title
# NEW-CELL: 修正 simulate_frl_rounds 結構
def run_single_round(agent, env, config):
    """執行單輪訓練"""
    agent.model.train()
    total_loss, total_reward = 0.0, 0.0
    episodes = config.local_episodes_per_round

    for episode in range(episodes):
        state = env.reset()
        episode_reward = 0.0

        for step in range(config.steps_per_episode):
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            episode_reward += reward

            if len(agent.memory) > config.replay_start_size and step % config.replay_frequency == 0:
                loss = agent.replay(num_batches=config.replay_batches_per_call)
                total_loss += loss

            if done:
                break

        total_reward += episode_reward

        if (episode + 1) % config.target_update_freq == 0:
            agent.update_target_model()

    return total_loss / episodes, total_reward / episodes

def aggregate_models(client_updates, server):
    """聚合模型更新"""
    if not client_updates:
        return None

    # 使用服務器的加權聚合方法
    aggregated_model = server.aggregate_weighted(client_updates)
    return aggregated_model

def log_round_metrics(round_num, metrics, logger):
    """記錄輪次指標"""
    avg_reward = np.mean([m['reward'] for m in metrics])
    avg_loss = np.mean([m['loss'] for m in metrics])

    logger.info(f"Round {round_num}: Avg Reward={avg_reward:.4f}, Avg Loss={avg_loss:.4f}")

    return {
        'round': round_num,
        'avg_reward': avg_reward,
        'avg_loss': avg_loss,
        'num_clients': len(metrics)
    }

In [12]:
# @title Cell 8A: 🎬 強化版實驗執行函數
import time
import gc
from datetime import datetime

def run_enhanced_experiment(config_dict: dict, data_path: str):
    """
    運行強化版實驗（支援所有新功能）
    """
    start_time = time.time()
    runner = None

    try:
        config = TrainingConfig(**config_dict)
        logger.info(f"\n{'='*15} 🔄 強化版實驗: {config.experiment_name} {'='*15}")

        # 1. 準備數據（支援 Non-IID）
        logger.info(f"📊 正在準備數據...")
        data_manager = DataManager(
            data_path,
            config.base_client_pairs,
            enable_non_iid=config.enable_non_iid,
            dirichlet_alpha=config.dirichlet_alpha,
            min_clients=config.min_clients_non_iid
        )

        # 生成客戶端軌跡
        all_trajectories = data_manager.get_client_trajectories()

        logger.info(f"   - 數據分配模式: {'Non-IID' if config.enable_non_iid else 'IID'}")
        logger.info(f"   - 客戶端數量: {len(all_trajectories)}")

        if config.enable_non_iid and hasattr(data_manager, 'client_assignments'):
            # 顯示 Non-IID 分佈統計
            sizes = list(data_manager.client_assignments.values())
            logger.info(f"   - Non-IID 分佈統計:")
            logger.info(f"     * 平均數據量: {np.mean(sizes):.0f}")
            logger.info(f"     * 標準差: {np.std(sizes):.0f}")
            logger.info(f"     * 最大/最小比: {max(sizes)/min(sizes):.2f}")

        # 2. 配置驗證
        logger.info(f"\n🔧 實驗配置:")
        logger.info(f"   - 模式: {config.mode}")
        logger.info(f"   - CQL: {'啟用' if config.enable_cql else '禁用'}")
        logger.info(f"   - Adaptive Clipping: {'啟用' if config.enable_adaptive_clip else '禁用'}")
        logger.info(f"   - 非同步聚合: {'啟用' if config.enable_async_aggregation else '禁用'}")
        logger.info(f"   - 差分隱私: {'啟用' if config.enable_dp else '禁用'}")

        # 3. 創建並運行實驗
        logger.info(f"\n🚀 開始實驗...")
        runner = ExperimentRunner(config, data_manager, all_trajectories, config.client_pairs)
        eval_res, history_res = runner.run()

        execution_time = (time.time() - start_time) / 60
        logger.info(f"\n⏱️ 實驗完成時間: {execution_time:.2f} 分鐘")

        # 4. 結果分析
        logger.info(f"\n✅ 實驗結果摘要:")
        if not eval_res.empty and len(eval_res) > 0:
            avg_global_reward = eval_res['reward_global'].mean()
            avg_personalized_reward = eval_res['reward_personalized'].mean()
            avg_pfl_reward = eval_res['reward_pfl_finetuned'].mean()

            logger.info(f"   - 平均全域獎勵: {avg_global_reward:.4f}")
            logger.info(f"   - 平均個人化獎勵: {avg_personalized_reward:.4f}")
            logger.info(f"   - 平均PFL微調獎勵: {avg_pfl_reward:.4f}")
            logger.info(f"   - 個人化提升: {((avg_personalized_reward/avg_global_reward-1)*100):.2f}%")

        # 5. 延遲分析
        if hasattr(runner, 'latency_monitor'):
            latency_stats = runner.latency_monitor.get_statistics()
            if latency_stats and latency_stats.get('total_rounds', 0) > 0:
                logger.info(f"\n⏱️ 延遲分析:")
                logger.info(f"   - 平均輪延遲: {latency_stats['mean_time']:.2f}s")
                logger.info(f"   - 95分位延遲: {latency_stats['p95_time']:.2f}s")
                if latency_stats['warnings'] > 0:
                    logger.warning(f"   - ⚠️ 超過1秒警告: {latency_stats['warnings']}次")

        # 6. 隱私分析
        privacy_stats = None
        if config.enable_dp and config.mode != 'Centralized' and runner is not None:
            consumed_budget = getattr(runner, 'consumed_privacy_budget', 0.0)
            dp_reset_history = getattr(runner, 'dp_reset_history', [])

            logger.info(f"\n🛡️ 隱私分析:")
            logger.info(f"   - 最終ε消耗: {consumed_budget:.4f}")
            logger.info(f"   - 預算使用率: {(consumed_budget/config.dp_target_epsilon*100):.1f}%")
            logger.info(f"   - 重設次數: {len(dp_reset_history)}")

            if config.enable_adaptive_clip:
                logger.info(f"   - Adaptive Clipping: ✅ 已使用")

            privacy_stats = {
                'consumed_epsilon': consumed_budget,
                'reset_count': len(dp_reset_history)
            }

        # 7. CQL 分析
        if config.mode == 'CQL' and hasattr(runner, 'cql_performance_history'):
            if runner.cql_performance_history:
                avg_cql_loss = np.mean([h['avg_cql_loss'] for h in runner.cql_performance_history])
                logger.info(f"\n🎯 CQL 分析:")
                logger.info(f"   - 平均 CQL 損失: {avg_cql_loss:.4f}")
                logger.info(f"   - 保守性參數 α: {config.cql_alpha}")

        # 8. 非同步聚合統計
        if config.enable_async_aggregation and hasattr(runner.server, 'async_aggregation_count'):
            logger.info(f"\n🔄 非同步聚合統計:")
            logger.info(f"   - 非同步聚合次數: {runner.server.async_aggregation_count}")

        # 9. 清理資源
        del runner, data_manager, all_trajectories
        torch.cuda.empty_cache()
        gc.collect()

        # 檢查GPU記憶體
        if torch.cuda.is_available():
            allocated = torch.cuda.memory_allocated() / 1024**3
            logger.info(f"\n💾 最終GPU記憶體使用: {allocated:.2f} GB")

        return True, {
            'execution_time': execution_time,
            'avg_rewards': {
                'global': avg_global_reward if 'avg_global_reward' in locals() else 0,
                'personalized': avg_personalized_reward if 'avg_personalized_reward' in locals() else 0,
                'pfl': avg_pfl_reward if 'avg_pfl_reward' in locals() else 0
            },
            'privacy_stats': privacy_stats,
            'latency_stats': latency_stats if 'latency_stats' in locals() else None
        }

    except Exception as e:
        execution_time = (time.time() - start_time) / 60
        logger.error(f"\n❌ 實驗失敗: {config_dict.get('experiment_name')}")
        logger.error(f"⏱️ 失敗時間: {execution_time:.2f} 分鐘")
        logger.error(f"🔍 錯誤詳情: {str(e)}")
        import traceback
        logger.error(f"📋 錯誤堆疊: {traceback.format_exc()}")

        # 清理資源
        if runner is not None:
            del runner
        torch.cuda.empty_cache()
        gc.collect()
        return False, None

print("✅ Cell 8A: 強化版實驗執行函數已載入")

✅ Cell 8A: 強化版實驗執行函數已載入


In [13]:
# @title Cell 8B: 🔧 環境設定與初始化
import os
import time

# GPU環境設定
setup_gpu_environment()

# 確保GPU被正確使用
if torch.cuda.is_available():
    logger.info(f"🎮 GPU 可用: {torch.cuda.get_device_name(0)}")
    logger.info(f"📊 GPU 記憶體: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
else:
    logger.warning("⚠️ GPU 不可用，使用 CPU 模式")

# 環境路徑設定
try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    BASE_WORK_DIR = "/content/drive/MyDrive/FRL_Slicing_Sim"
    logger.info("🔗 Google Drive 掛載成功")
except ImportError:
    BASE_WORK_DIR = "./FRL_Slicing_Sim"
    logger.info("💻 本地環境模式")

os.makedirs(BASE_WORK_DIR, exist_ok=True)
DATA_PATH = os.path.join(BASE_WORK_DIR, "kpi_traces_final_robust0.parquet")
BASE_OUTPUT_DIR = os.path.join(BASE_WORK_DIR, "outputs_enhanced")
logger.info(f"📁 數據路徑: {DATA_PATH}")
logger.info(f"📁 輸出目錄: {BASE_OUTPUT_DIR}")

# 檢查數據文件是否存在
if not os.path.exists(DATA_PATH):
    raise FileNotFoundError(f"數據文件不存在: {DATA_PATH}")

# 實驗配置
MODES_TO_RUN = ["ClusteredFL", "FedProx", "FedAvg", "CQL"]
SEEDS = [42]  # 簡化為單一種子以符合5輪示例要求

logger.info(f"\n🎯 實驗計劃:")
logger.info(f"   - 測試模式: {MODES_TO_RUN}")
logger.info(f"   - 隨機種子: {SEEDS}")
logger.info(f"   - 總實驗數: {len(MODES_TO_RUN) * len(SEEDS)}")

# 全局結果存儲
experiment_results = []

print("\n✅ 環境設定完成，準備執行實驗")

Mounted at /content/drive

✅ 環境設定完成，準備執行實驗


In [None]:
# @title Cell 8C: 🚀 執行示例實驗（5輪演示）
print("="*60)
print("🚀 開始執行強化版實驗（5輪示例）")
print("="*60)

# 基礎配置（5輪示例，20分鐘內完成）
base_config = {
    "random_seed": 42,
    "comm_rounds": 5,  # 減少到5輪以符合要求
    "output_dir": "",  # 將在每個實驗中設定

    # Non-IID 配置
    "enable_non_iid": True,
    "dirichlet_alpha": 0.5,
    "min_clients_non_iid": 10,

    # 訓練參數（為5輪優化）
    "local_episodes_per_round": 2,
    "steps_per_episode": 300,
    "batch_size": 128,
    "memory_capacity": 30000,

    # 差分隱私
    "enable_dp": True,
    "dp_target_epsilon": 8.0,
    "dp_noise_multiplier": 0.3,
    "enable_adaptive_clip": True,
    "clip_window": 50,
    "clip_percentile": 0.9,

    # 非同步聚合
    "enable_async_aggregation": True,
    "timeout_threshold": 0.3,
    "client_timeout_seconds": 30.0,

    # ClusteredFL 動態參數
    "base_cluster_freq": 2,
    "cluster_decay_rate": 0.1,
    "early_stopping_rounds": 2,
    "use_cosine_similarity": True,
    "knowledge_transfer": True,

    # FedAvg/FedProx 動態參數
    "use_weighted_fedavg": True,
    "fedprox_mu_adaptive": True,

    # 延遲監控
    "enable_latency_monitor": True,

    # DP優化
    "enable_l2_regularization": False,
    "use_rdp_accountant": True,

    # 其他參數
    "lr": 1e-4,
    "gamma": 0.99,
    "enable_compression": True,
    "use_pfl_finetune": True,
    "device": "cuda" if torch.cuda.is_available() else "cpu"
}

# 執行每個模式
for mode in MODES_TO_RUN:
    seed = SEEDS[0]
    exp_name = f"{mode}_enhanced_s{seed}"
    output_dir = os.path.join(BASE_OUTPUT_DIR, f"seed_{seed}", mode)

    # 更新配置
    config_params = base_config.copy()
    config_params.update({
        "experiment_name": exp_name,
        "output_dir": output_dir,
        "mode": mode,

        # 模式特定參數
        "enable_cql": (mode == "CQL"),
        "cql_alpha": 1.0 if mode == "CQL" else 0.0,
        "cql_min_q_weight": 5.0 if mode == "CQL" else 0.0,
        "cql_loss_alpha_scaler": 0.1 if mode == "CQL" else 0.0,
        "use_mixed_precision": (mode == "CQL"),  # CQL啟用混合精度
        "fedprox_mu": 0.15 if mode in ["FedProx", "ClusteredFL"] else 0.0,
        "num_clusters": 3 if mode == "ClusteredFL" else 1,
        "cluster_update_freq": 8 if mode == "ClusteredFL" else 999,  # ClusteredFL動態更新

        # L2正則化對照組（用於FedAvg）
        "enable_l2_regularization": (mode == "FedAvg"),
        "weight_decay": 1e-4 if mode == "FedAvg" else 0.0,
    })

    print(f"\n{'='*50}")
    print(f"執行實驗: {mode}")
    print(f"{'='*50}")
    logger.info(f"🔧 配置摘要:")
    logger.info(f"   - Non-IID: 啟用 (α={config_params['dirichlet_alpha']})")
    logger.info(f"   - Adaptive Clipping: 啟用")
    logger.info(f"   - 非同步聚合: 啟用")
    logger.info(f"   - 通信輪數: {config_params['comm_rounds']}")
    if mode == "CQL":
        logger.info(f"   - CQL參數: α={config_params['cql_alpha']}, min_q_weight={config_params['cql_min_q_weight']}")
    if mode == "ClusteredFL":
        logger.info(f"   - 動態聚類: 基礎頻率={config_params['base_cluster_freq']}, 衰減率={config_params['cluster_decay_rate']}")

    # 執行實驗
    start_time = time.time()
    success, result_info = run_enhanced_experiment(config_params, DATA_PATH)

    if success:
        experiment_results.append({
            'seed': seed,
            'mode': mode,
            'success': True,
            'result_info': result_info
        })
        logger.info(f"\n✅ {mode} 實驗成功")

        # 顯示關鍵指標
        if result_info and 'avg_rewards' in result_info:
            rewards = result_info['avg_rewards']
            logger.info(f"   - 全域獎勵: {rewards['global']:.4f}")
            logger.info(f"   - 個人化獎勵: {rewards['personalized']:.4f}")

            if result_info.get('privacy_stats'):
                privacy = result_info['privacy_stats']
                logger.info(f"   - ε消耗: {privacy['consumed_epsilon']:.4f}")

            if result_info.get('latency_stats'):
                latency = result_info['latency_stats']
                if latency and 'mean_time' in latency:
                    logger.info(f"   - 平均延遲: {latency['mean_time']:.2f}s")
                    # 檢查是否符合<40s目標
                    if latency['mean_time'] < 40:
                        logger.info(f"   - ✓ 延遲目標達成 (<40s)")
                    else:
                        logger.warning(f"   - ✗ 延遲超標 (>40s)")
    else:
        experiment_results.append({
            'seed': seed,
            'mode': mode,
            'success': False,
            'result_info': None
        })
        logger.error(f"\n❌ {mode} 實驗失敗")

    execution_time = (time.time() - start_time) / 60
    logger.info(f"⏱️ 執行時間: {execution_time:.2f} 分鐘")

    # 檢查是否在20分鐘內
    total_elapsed = (time.time() - start_time) / 60
    if total_elapsed > 20:
        logger.warning(f"⚠️ 總執行時間已超過20分鐘，停止剩餘實驗")
        break

    # 清理資源
    torch.cuda.empty_cache()
    gc.collect()

print("\n" + "="*60)
print("示例實驗完成")
print("="*60)

🚀 開始執行強化版實驗（5輪示例）

執行實驗: ClusteredFL


<IPython.core.display.Javascript object>

[34m[1mwandb[0m: No netrc file found, creating one.
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33mthc1006[0m ([33mthc1006-national-yang-ming-chiao-tung-university[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


ClusteredFL Training:   0%|          | 0/5 [00:00<?, ?it/s]



最終評估:   0%|          | 0/10 [00:00<?, ?it/s]




執行實驗: FedProx


0,1
avg_loss,▁▆█▆▆
avg_reward,▁▆▅▇█
budget_ratio,▁▁▁▁▁
cluster_reassign_count,▁▁▁▁▁
cumulative_epsilon,▁▁▁▁▁
delta,▁▁▁▁▁
epsilon,▁▂▅▆█
epsilon_max,▁▇▇█
epsilon_min,▁▃██
epsilon_std,▁▇▇█

0,1
async_aggregation,True
avg_loss,56597422.74088
avg_reward,164.9491
budget_ratio,0
cluster_reassign_count,0
cumulative_epsilon,0
delta,1e-05
epsilon,314.75101
epsilon_max,494.16606
epsilon_min,86.46394


ERROR:asyncio:Task was destroyed but it is pending!
task: <Task cancelling name='Task-8' coro=<Event.wait() running at /usr/lib/python3.11/asyncio/locks.py:213> wait_for=<Future cancelled>>


FedProx Training:   0%|          | 0/5 [00:00<?, ?it/s]



最終評估:   0%|          | 0/10 [00:00<?, ?it/s]




執行實驗: FedAvg


0,1
avg_loss,▁▆█▆
avg_reward,█▁▇▁
budget_ratio,▁▁▁▁
cluster_reassign_count,▁▁▁▁
cumulative_epsilon,▁▁▁▁
delta,▁▁▁▁
epsilon,▁▄██
epsilon_max,▁██
epsilon_min,▁█▄
epsilon_std,▁▇█

0,1
async_aggregation,True
avg_loss,45489740.21381
avg_reward,160.54551
budget_ratio,0
cluster_reassign_count,0
cumulative_epsilon,0
delta,1e-05
epsilon,245.01035
epsilon_max,494.16606
epsilon_min,64.50385


ERROR:asyncio:Task was destroyed but it is pending!
task: <Task cancelling name='Task-21' coro=<Event.wait() running at /usr/lib/python3.11/asyncio/locks.py:213> wait_for=<Future cancelled>>


FedAvg Training:   0%|          | 0/5 [00:00<?, ?it/s]



最終評估:   0%|          | 0/10 [00:00<?, ?it/s]




執行實驗: CQL


0,1
avg_loss,▁▆█▆
avg_reward,█▁▇▁
budget_ratio,▁▁▁▁
cluster_reassign_count,▁▁▁▁
cumulative_epsilon,▁▁▁▁
delta,▁▁▁▁
epsilon,▁▄██
epsilon_max,▁██
epsilon_min,▁█▄
epsilon_std,▁▇█

0,1
async_aggregation,True
avg_loss,45505907.53714
avg_reward,160.5501
budget_ratio,0
cluster_reassign_count,0
cumulative_epsilon,0
delta,1e-05
epsilon,245.01035
epsilon_max,494.16606
epsilon_min,64.50385


ERROR:asyncio:Task was destroyed but it is pending!
task: <Task cancelling name='Task-34' coro=<Event.wait() running at /usr/lib/python3.11/asyncio/locks.py:213> wait_for=<Future cancelled>>


CQL Training:   0%|          | 0/5 [00:00<?, ?it/s]



In [None]:
# @title Cell 8D: 📊 實驗總結與分析
print("="*60)
print("🎉 實驗執行完成！")
print("="*60)

# 統計成功率
successful_experiments = sum(1 for r in experiment_results if r['success'])
total_experiments = len(experiment_results)
success_rate = (successful_experiments / total_experiments) * 100 if total_experiments > 0 else 0

logger.info(f"\n📊 實驗統計:")
logger.info(f"   - 總實驗數: {total_experiments}")
logger.info(f"   - 成功數: {successful_experiments}")
logger.info(f"   - 成功率: {success_rate:.1f}%")

# 初始化 pareto_achieved 列表
pareto_achieved = []

# 成功實驗的詳細統計
successful_results = [r for r in experiment_results if r['success']]
if successful_results:
    logger.info(f"\n📈 成功實驗詳細結果:")

    for result in successful_results:
        mode = result['mode']
        info = result['result_info']
        if info and 'avg_rewards' in info:
            rewards = info['avg_rewards']
            print(f"\n   {mode}:")
            print(f"     - 全域獎勵: {rewards['global']:.4f}")
            print(f"     - 個人化獎勵: {rewards['personalized']:.4f}")
            print(f"     - PFL獎勵: {rewards['pfl']:.4f}")
            print(f"     - 執行時間: {info['execution_time']:.2f} 分鐘")

            # 檢查Pareto目標
            pareto_check = {
                'mode': mode,
                'latency_ok': False,
                'epsilon_ok': False,
                'reward_ok': False
            }

            if info.get('privacy_stats'):
                privacy = info['privacy_stats']
                print(f"     - ε消耗: {privacy['consumed_epsilon']:.4f}")
                pareto_check['epsilon_ok'] = privacy['consumed_epsilon'] < 4
                if privacy['consumed_epsilon'] > 0:
                    print(f"     - 隱私效率: {rewards['pfl']/privacy['consumed_epsilon']:.4f} (獎勵/ε)")

            if info.get('latency_stats') and info['latency_stats']:
                latency = info['latency_stats']
                if 'mean_time' in latency:
                    print(f"     - 平均延遲: {latency['mean_time']:.2f}s")
                    pareto_check['latency_ok'] = latency['mean_time'] < 40
                if 'p95_time' in latency:
                    print(f"     - 95分位延遲: {latency['p95_time']:.2f}s")
                    if latency.get('warnings', 0) > 0:
                        print(f"     - ⚠️ 延遲警告: {latency['warnings']}次")

            pareto_check['reward_ok'] = rewards['pfl'] > 165

            # 檢查是否達成所有Pareto目標
            if all([pareto_check['latency_ok'], pareto_check['epsilon_ok'], pareto_check['reward_ok']]):
                pareto_achieved.append(mode)
                print(f"     - ✅ Pareto目標全部達成！")
            else:
                failed_targets = []
                if not pareto_check['latency_ok']:
                    failed_targets.append("延遲<40s")
                if not pareto_check['epsilon_ok']:
                    failed_targets.append("ε<4")
                if not pareto_check['reward_ok']:
                    failed_targets.append("獎勵>165")
                print(f"     - ⚠️ 未達成目標: {', '.join(failed_targets)}")

    # Pareto成就總結
    if pareto_achieved:
        print(f"\n🏆 達成Pareto目標的模式: {', '.join(pareto_achieved)}")
    else:
        print(f"\n⚠️ 沒有模式完全達成Pareto目標")

print(f"\n🔧 強化功能總結:")
print(f"   - Non-IID 數據分配: ✅ 已實施 (Dirichlet α=0.5)")
print(f"   - CQL 離線 RL: ✅ 已整合")
print(f"   - Adaptive Clipping: ✅ 已啟用")
print(f"   - 非同步聚合: ✅ 已支援")
print(f"   - 延遲監控: ✅ 已記錄")
print(f"   - Deterministic 模式: ✅ 已設定")
print(f"   - 動態聚類更新: ✅ 已實施")
print(f"   - FedAvg加權聚合: ✅ 已實施")
print(f"   - FedProx動態mu: ✅ 已實施")
print(f"   - DP優化 (RDP/L2/Dropout): ✅ 已實施")

# 保存實驗總結
import json
summary_path = os.path.join(BASE_OUTPUT_DIR, "experiment_summary_enhanced.json")
os.makedirs(os.path.dirname(summary_path), exist_ok=True)

with open(summary_path, 'w') as f:
    # 創建一個可序列化的版本
    serializable_results = []
    for r in experiment_results:
        res_copy = r.copy()
        if res_copy.get('result_info') and res_copy['result_info'].get('avg_rewards'):
             # 將 numpy float 轉換為 Python float
            for key, val in res_copy['result_info']['avg_rewards'].items():
                res_copy['result_info']['avg_rewards'][key] = float(val)
        serializable_results.append(res_copy)

    json.dump({
        'total_experiments': total_experiments,
        'successful_experiments': successful_experiments,
        'success_rate': success_rate,
        'results': serializable_results,
        'enhancements': {
            'non_iid': True,
            'cql': True,
            'adaptive_clipping': True,
            'async_aggregation': True,
            'latency_monitoring': True,
            'deterministic': True,
            'dynamic_clustering': True,
            'weighted_fedavg': True,
            'adaptive_fedprox': True,
            'dp_optimizations': True
        },
        'pareto_achieved': pareto_achieved  # 現在 pareto_achieved 已定義
    }, f, indent=4)

logger.info(f"\n📄 實驗總結已保存至: {summary_path}")

# 輸出 SUCCESS 標記
print("\n" + "="*60)
print("SUCCESS ✅")
print("="*60)

# 輸出圖檔路徑列表（將在下一個cell生成）
print("\n📊 準備生成以下圖表:")
print("1. training_history_enhanced.png - 訓練歷史比較")
print("2. reward_vs_epsilon.png - Reward vs Privacy Budget")
print("3. latency_analysis.png - 延遲分析")
print("4. non_iid_client_rewards.png - Non-IID客戶端獎勵分布")

In [None]:
# @title
# NEW-CELL: 驗收自動檢查
def run_validation_checks(eval_df, privacy_df, latency_df, history_df):
    """執行驗收自動檢查"""
    print("\n🧪 執行驗收檢查...")

    checks_passed = []

    # 檢查1: 指標欄位完整性
    try:
        if 'latency_df' in locals() and not latency_df.empty:
            required_cols = ["rtt", "local_training", "aggregation"]
            if all(col in latency_df.columns or
                   any(col in str(latency_df.columns) for col in required_cols)):
                print("✅ 檢查1通過: 延遲指標包含 rtt, local_training, aggregation")
                checks_passed.append(True)
            else:
                print("❌ 檢查1失敗: 缺少必要的延遲指標欄位")
                checks_passed.append(False)
        else:
            # 如果沒有詳細數據，檢查是否有 wall_time
            if 'wall_time' in latency_df.columns:
                print("✅ 檢查1通過: 包含 wall_time 指標")
                checks_passed.append(True)
            else:
                print("⚠️ 檢查1: 無延遲數據")
                checks_passed.append(False)
    except:
        print("⚠️ 檢查1: 無法執行")
        checks_passed.append(False)

    # 檢查2: 隱私預算範圍
    try:
        if not privacy_df.empty and 'cumulative_epsilon' in privacy_df.columns:
            max_epsilon = privacy_df['cumulative_epsilon'].max()
            if 0 < max_epsilon < 10:
                print(f"✅ 檢查2通過: 隱私預算 ε={max_epsilon:.4f} 在合理範圍內")
                checks_passed.append(True)
            else:
                print(f"❌ 檢查2失敗: 隱私預算 ε={max_epsilon:.4f} 超出範圍")
                checks_passed.append(False)
        else:
            print("⚠️ 檢查2: 無隱私數據")
            checks_passed.append(False)
    except:
        print("⚠️ 檢查2: 無法執行")
        checks_passed.append(False)

    # 檢查3: 公平性指標
    try:
        if not eval_df.empty and 'reward_pfl_finetuned' in eval_df.columns:
            # 檢查是否有 jain, gini, theil
            has_fairness = any(col in eval_df.columns for col in ['jain', 'gini', 'theil'])

            # 計算 robust sigma
            rewards = eval_df['reward_pfl_finetuned'].dropna()
            if len(rewards) > 0:
                q75, q25 = np.percentile(rewards, [75, 25])
                robust_sigma = (q75 - q25) / 1.349

                if has_fairness:
                    print(f"✅ 檢查3通過: 包含公平性指標，robust σ={robust_sigma:.2f}")
                else:
                    print(f"✅ 檢查3部分通過: robust σ={robust_sigma:.2f}，但缺少 Jain/Gini/Theil")
                checks_passed.append(True)
            else:
                print("❌ 檢查3失敗: 無獎勵數據")
                checks_passed.append(False)
        else:
            print("⚠️ 檢查3: 無評估數據")
            checks_passed.append(False)
    except:
        print("⚠️ 檢查3: 無法執行")
        checks_passed.append(False)

    # 總結
    total_passed = sum(checks_passed)
    total_checks = len(checks_passed)

    print(f"\n驗收結果: {total_passed}/{total_checks} 檢查通過")

    if total_passed == total_checks:
        print("✅ 全部通過")
    else:
        print("⚠️ 部分檢查未通過，請檢查實驗配置")

    return total_passed == total_checks

# 在 Cell 9 最後加入驗收檢查
if 'eval_df' in locals() and 'privacy_df' in locals() and 'latency_df' in locals():
    validation_passed = run_validation_checks(eval_df, privacy_df, latency_df, history_df)
else:
    print("⚠️ 缺少必要的數據框，無法執行驗收檢查")

In [None]:
# @title Cell 9: 📊 實驗結果深度分析與視覺化（修正與最佳化版）
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import os
import glob
import numpy as np
import logging
import traceback

# --- 視覺化設定 ---
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.2)
FIGURES_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "figures_optimized")
os.makedirs(FIGURES_OUTPUT_DIR, exist_ok=True)

# 定義固定的調色板與順序，確保所有圖表顏色一致
PALETTE = {
    "ClusteredFL": "#1f77b4",
    "FedProx": "#ff7f0e",
    "FedAvg": "#2ca02c",
    "CQL": "#d62728",
    "Isolated": "#9467bd",
    "Centralized": "#8c564b"
}
HUE_ORDER = ["ClusteredFL", "FedProx", "FedAvg", "CQL"]

# --- 1. 結果數據載入 ---
def load_all_results_as_dict(base_output_dir):
    """
    從指定的基底目錄載入所有實驗模式與所有隨機種子的結果。
    此函數確保返回一個字典，鍵為數據類型，值為 DataFrame。
    """
    results_map = {
        'eval': ('*_evaluation_results.csv', []),
        'history': ('*_training_history.csv', []),
        'privacy': ('*_privacy_costs.csv', []),
        'latency': ('*_latency_monitor.csv', [])
    }

    if not os.path.exists(base_output_dir):
        logger.error(f"結果目錄未找到: {base_output_dir}")
        return {key: pd.DataFrame() for key in results_map}

    for seed_folder in glob.glob(os.path.join(base_output_dir, 'seed_*')):
        try:
            seed = int(os.path.basename(seed_folder).split('_')[1])
        except (ValueError, IndexError):
            continue

        for mode_folder in glob.glob(os.path.join(seed_folder, '*')):
            mode = os.path.basename(mode_folder)
            if not os.path.isdir(mode_folder):
                continue

            for key, (pattern, data_list) in results_map.items():
                file_paths = glob.glob(os.path.join(mode_folder, pattern))
                if file_paths:
                    try:
                        df = pd.read_csv(file_paths[0])
                        if not df.empty:
                            df['mode'] = mode
                            df['seed'] = int(seed)
                            data_list.append(df)
                    except pd.errors.EmptyDataError:
                        logger.warning(f"文件為空，跳過: {file_paths[0]}")
                    except Exception as e:
                        logger.error(f"讀取失敗: {file_paths[0]}, 錯誤: {e}")

    # 使用字典推導式確保返回的是一個字典
    return {key: pd.concat(val[1], ignore_index=True) if val[1] else pd.DataFrame()
            for key, val in results_map.items()}

# --- 2. 核心繪圖函數 (與前版相同，此處為求完整性再次提供) ---
def plot_pareto_frontier_analysis(eval_df, privacy_df, latency_df, output_dir, goals):
    """
    繪製 Pareto 前沿分析圖，展示不同策略在三個目標上的權衡。
    """
    if eval_df.empty or privacy_df.empty or latency_df.empty:
        logger.warning("缺少 Pareto 分析所需的數據，跳過此圖表。")
        return

    # 數據整合
    final_eval = eval_df.groupby(['mode', 'seed'])['reward_pfl_finetuned'].mean().reset_index()
    final_privacy = privacy_df.groupby(['mode', 'seed'])['cumulative_epsilon'].max().reset_index()
    final_latency = latency_df.groupby(['mode', 'seed'])['wall_time'].quantile(0.95).reset_index() # P95 Latency

    df = pd.merge(final_eval, final_privacy, on=['mode', 'seed'])
    df = pd.merge(df, final_latency, on=['mode', 'seed'])
    df = df.rename(columns={
        'reward_pfl_finetuned': 'Reward',
        'cumulative_epsilon': 'Privacy_Epsilon',
        'wall_time': 'P95_Latency'
    })

    df['is_pareto_optimal'] = (
        (df['Reward'] > goals['reward']) &
        (df['Privacy_Epsilon'] < goals['privacy']) &
        (df['P95_Latency'] < goals['latency'])
    )

    fig, axes = plt.subplots(1, 3, figsize=(24, 7))
    fig.suptitle('Pareto Optimization Frontier Analysis', fontsize=20, y=1.03)

    # 圖 1: Reward vs. Privacy
    sns.scatterplot(data=df, x='Privacy_Epsilon', y='Reward', hue='mode', style='is_pareto_optimal',
                    s=200, alpha=0.8, palette=PALETTE, hue_order=HUE_ORDER, ax=axes[0])
    axes[0].axhline(goals['reward'], ls='--', color='green', label=f"Reward Target (> {goals['reward']})")
    axes[0].axvline(goals['privacy'], ls='--', color='red', label=f"Privacy Budget (< {goals['privacy']} ε)")
    axes[0].set_title('Performance vs. Privacy', fontsize=16)
    axes[0].set_xlabel('Total Privacy Budget ε (Lower is Better)', fontsize=12)
    axes[0].set_ylabel('Final Average Reward (Higher is Better)', fontsize=12)
    axes[0].legend(title='Optimal', loc='lower right')

    # 圖 2: Reward vs. Latency
    sns.scatterplot(data=df, x='P95_Latency', y='Reward', hue='mode', style='is_pareto_optimal',
                    s=200, alpha=0.8, palette=PALETTE, hue_order=HUE_ORDER, ax=axes[1])
    axes[1].axhline(goals['reward'], ls='--', color='green')
    axes[1].axvline(goals['latency'], ls='--', color='orange', label=f"Latency Target (< {goals['latency']}s P95)")
    axes[1].set_title('Performance vs. Latency', fontsize=16)
    axes[1].set_xlabel('P95 End-to-End Latency (s) (Lower is Better)', fontsize=12)
    axes[1].set_ylabel('')
    axes[1].legend(title='Optimal', loc='lower right')

    # 圖 3: Latency vs. Privacy
    sns.scatterplot(data=df, x='Privacy_Epsilon', y='P95_Latency', hue='mode', style='is_pareto_optimal',
                    s=200, alpha=0.8, palette=PALETTE, hue_order=HUE_ORDER, ax=axes[2])
    axes[2].axhline(goals['latency'], ls='--', color='orange')
    axes[2].axvline(goals['privacy'], ls='--', color='red')
    axes[2].set_title('Latency vs. Privacy', fontsize=16)
    axes[2].set_xlabel('Total Privacy Budget ε (Lower is Better)', fontsize=12)
    axes[2].set_ylabel('P95 End-to-End Latency (s) (Lower is Better)', fontsize=12)
    axes[2].legend(title='Optimal', loc='upper left')

    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.savefig(os.path.join(output_dir, 'pareto_frontier_analysis.png'), dpi=300, bbox_inches='tight')
    plt.show()

def plot_training_performance(history_df, output_dir, reward_goal):
    if history_df.empty: return
    plt.figure(figsize=(14, 8))
    sns.lineplot(data=history_df, x='round', y='avg_reward', hue='mode',
                 hue_order=HUE_ORDER, palette=PALETTE,
                 errorbar='sd', linewidth=2.5, marker='o', markersize=6)
    plt.axhline(y=reward_goal, color='green', linestyle='--', linewidth=2, label=f'Reward Target ({reward_goal})')
    plt.title('Federated RL Training Performance (with Std Dev)', fontsize=18)
    plt.xlabel('Communication Round', fontsize=14)
    plt.ylabel('Average Episodic Reward', fontsize=14)
    plt.legend(title='FL Mode', loc='lower right', fontsize=12)
    plt.grid(True, which='both', linestyle='--', linewidth=0.5)
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'training_performance.png'), dpi=300, bbox_inches='tight')
    plt.show()

def plot_latency_breakdown(latency_df, output_dir, latency_goal):
    if latency_df.empty: return
    components = ['client_compute_time', 'server_aggregation_time', 'client_to_server_comm', 'server_to_client_comm']
    if not all(c in latency_df.columns for c in components):
        logger.warning("延遲數據缺少組件欄位，無法生成分解圖。")
        return
    latency_df['communication_time'] = latency_df['client_to_server_comm'] + latency_df['server_to_client_comm']
    avg_latency = latency_df.groupby('mode')[['client_compute_time', 'server_aggregation_time', 'communication_time']].mean()
    avg_latency = avg_latency.reindex(HUE_ORDER).dropna()
    fig, axes = plt.subplots(1, 2, figsize=(18, 7))
    fig.suptitle('System Latency Analysis', fontsize=20)
    avg_latency.plot(kind='bar', stacked=True, ax=axes[0], color=['#1f77b4', '#ff7f0e', '#2ca02c'])
    axes[0].set_title('Average Latency Breakdown per Round', fontsize=16)
    axes[0].set_ylabel('Average Time (s)', fontsize=12)
    axes[0].tick_params(axis='x', rotation=45)
    axes[0].legend(['Client Compute', 'Server Aggregation', 'Communication'], fontsize=10)
    p95_latency = latency_df.groupby(['mode', 'seed'])['wall_time'].quantile(0.95).reset_index()
    sns.boxplot(data=p95_latency, x='mode', y='wall_time', order=HUE_ORDER, palette=PALETTE, ax=axes[1])
    axes[1].axhline(y=latency_goal, color='r', linestyle='--', label=f'Latency Target ({latency_goal}s)')
    axes[1].set_title('P95 End-to-End Latency Distribution', fontsize=16)
    axes[1].set_ylabel('P95 Latency (s)', fontsize=12)
    axes[1].tick_params(axis='x', rotation=45)
    axes[1].legend()
    plt.tight_layout(rect=[0, 0, 1, 0.95])
    plt.savefig(os.path.join(output_dir, 'latency_breakdown_and_distribution.png'), dpi=300, bbox_inches='tight')
    plt.show()

def plot_fairness_and_personalization(eval_df, output_dir, reward_goal):
    if eval_df.empty: return
    fig, axes = plt.subplots(1, 2, figsize=(18, 7))
    fig.suptitle('Client Fairness and Personalization Benefit', fontsize=20)
    sns.violinplot(data=eval_df, x='mode', y='reward_pfl_finetuned', order=HUE_ORDER, palette=PALETTE, ax=axes[0], inner='quartile')
    axes[0].axhline(y=reward_goal, color='g', linestyle='--', label=f'Reward Target ({reward_goal})')
    axes[0].set_title('Client Reward Distribution (Fairness)', fontsize=16)
    axes[0].set_xlabel('FL Mode', fontsize=12)
    axes[0].set_ylabel('Final PFL Reward', fontsize=12)
    axes[0].tick_params(axis='x', rotation=45)
    axes[0].legend()
    eval_agg = eval_df.groupby('mode')[['reward_global', 'reward_pfl_finetuned']].mean().reset_index()
    eval_melted = eval_agg.melt(id_vars='mode', value_vars=['reward_global', 'reward_pfl_finetuned'],
                                var_name='ModelType', value_name='AverageReward')
    sns.barplot(data=eval_melted, x='mode', y='AverageReward', hue='ModelType',
                order=HUE_ORDER, palette={'reward_global': 'skyblue', 'reward_pfl_finetuned': 'darkorange'}, ax=axes[1])
    axes[1].set_title('Personalization Benefit (Global vs. PFL)', fontsize=16)
    axes[1].set_xlabel('FL Mode', fontsize=12)
    axes[1].set_ylabel('Average Final Reward', fontsize=12)
    axes[1].tick_params(axis='x', rotation=45)
    plt.tight_layout(rect=[0, 0, 1, 0.95])
    plt.savefig(os.path.join(output_dir, 'fairness_and_personalization.png'), dpi=300, bbox_inches='tight')
    plt.show()

# --- 3. 驗證與總結 ---
def final_summary_and_validation(results_dict):
    """
    生成最終的摘要報告和自動化驗收檢查。
    """
    print("\n" + "="*60)
    print("🎉 最終實驗結果摘要與驗收")
    print("="*60)

    summary_data = []
    pareto_goals = {'reward': 165, 'privacy': 4.0, 'latency': 40.0}

    for mode in HUE_ORDER:
        mode_eval = results_dict['eval'][results_dict['eval']['mode'] == mode]
        mode_privacy = results_dict['privacy'][results_dict['privacy']['mode'] == mode]
        mode_latency = results_dict['latency'][results_dict['latency']['mode'] == mode]

        if any([df.empty for df in [mode_eval, mode_privacy, mode_latency]]):
            continue

        avg_reward = mode_eval['reward_pfl_finetuned'].mean()
        max_epsilon = mode_privacy.groupby('seed')['cumulative_epsilon'].max().mean()
        p95_latency = mode_latency.groupby('seed')['wall_time'].quantile(0.95).mean()

        reward_ok = avg_reward > pareto_goals['reward']
        privacy_ok = max_epsilon < pareto_goals['privacy']
        latency_ok = p95_latency < pareto_goals['latency']

        summary_data.append({
            'Mode': mode,
            'Avg Reward': f"{avg_reward:.2f}",
            'Privacy ε': f"{max_epsilon:.2f}",
            'P95 Latency (s)': f"{p95_latency:.2f}",
            'Reward OK': '✅' if reward_ok else '❌',
            'Privacy OK': '✅' if privacy_ok else '❌',
            'Latency OK': '✅' if latency_ok else '❌',
            'Pareto Optimal': '🏆' if all([reward_ok, privacy_ok, latency_ok]) else ''
        })

    summary_df = pd.DataFrame(summary_data)
    print(summary_df.to_string())

    print("\n" + "="*60)
    print("SUCCESS: 分析與視覺化流程執行完畢。")
    print(f"📁 所有圖表已保存至: {FIGURES_OUTPUT_DIR}")
    print("="*60)

# --- 主執行流程 ---
try:
    # 載入所有數據並命名為 results_dict 來強調其結構
    results_dict = load_all_results_as_dict(BASE_OUTPUT_DIR)

    # *** 錯誤修正 ***：
    # 檢查 results_dict.values() 中的所有 DataFrame 是否為空
    # 這樣可以正確處理字典，避免 'tuple' object has no attribute 'values' 錯誤
    if not results_dict or all(df.empty for df in results_dict.values()):
        raise FileNotFoundError("在指定的輸出目錄下未找到任何有效的 CSV 結果文件。")

    # 定義 Pareto 目標
    pareto_goals = {'reward': 165, 'privacy': 4.0, 'latency': 40.0}

    # 執行視覺化
    print("📊 正在生成 Pareto 前沿分析圖...")
    plot_pareto_frontier_analysis(results_dict['eval'], results_dict['privacy'], results_dict['latency'], FIGURES_OUTPUT_DIR, pareto_goals)

    print("📈 正在生成訓練效能趨勢圖...")
    plot_training_performance(results_dict['history'], FIGURES_OUTPUT_DIR, pareto_goals['reward'])

    print("⏱️ 正在生成延遲分析圖...")
    plot_latency_breakdown(results_dict['latency'], FIGURES_OUTPUT_DIR, pareto_goals['latency'])

    print("🤝 正在生成公平性與個人化效益圖...")
    plot_fairness_and_personalization(results_dict['eval'], FIGURES_OUTPUT_DIR, pareto_goals['reward'])

    # 輸出最終總結
    final_summary_and_validation(results_dict)

except FileNotFoundError as e:
    logger.error(f"執行失敗: {e}")
    print(f"\n❌ 錯誤: {e}。請檢查 BASE_OUTPUT_DIR ('{BASE_OUTPUT_DIR}') 是否正確，以及 Cell 8 的實驗是否已成功執行並產生了 .csv 結果文件。")
except Exception as e:
    logger.error(f"分析過程中發生未知錯誤: {e}")
    traceback.print_exc()