# Chapter 4: 収束診断と性能評価 - 実践的診断ガイド

## 学習目標
- MCMCの収束診断の重要性を理解し、適切な判断ができるようになる
- 視覚的診断手法を習得し、問題の早期発見ができる
- 数値的診断統計量を計算・解釈し、定量的評価ができる
- 有効サンプルサイズと自己相関時間を理解し、効率を評価できる
- 複数チェーンを用いた収束診断を実践し、信頼性を高められる
- 実践的な診断手順を身につけ、自動化された診断システムを構築できる

## なぜ収束診断が重要なのか？

MCMCは「十分長く走らせれば必ず収束する」理論的保証がありますが、**現実の計算時間は有限**です。

### 収束診断の失敗による深刻な結果

1. **間違った推論**: 収束していないサンプルによる誤った結論
2. **再現性の欠如**: 異なる実行で異なる結果
3. **信頼性の失墜**: 研究や意思決定の信頼性低下

### 本章のアプローチ

理論だけでなく、**実際に遭遇する問題**と**実践的な解決策**に焦点を当てます：

- 収束の失敗パターンの理解
- 自動診断システムの構築
- 問題発見から改善までの完全ワークフロー

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from statsmodels.tsa.stattools import acf
from statsmodels.stats.diagnostic import acorr_ljungbox
import warnings
warnings.filterwarnings('ignore')

plt.rcParams['font.family'] = 'DejaVu Sans'
sns.set_style("whitegrid")
np.random.seed(42)

## 4.1 収束診断の重要性

MCMCサンプリングにおいて、以下の点を確認する必要があります：

1. **収束**：マルコフ連鎖が定常分布に達したか？
2. **混合**：状態空間を十分に探索しているか？
3. **効率**：自己相関が十分に小さいか？

### 収束の失敗例
まず、収束しない場合の例を見てみましょう。

In [None]:
# 収束が困難な分布の例：多峰性分布
def multimodal_log_pdf(x):
    """多峰性分布（3つのピーク）"""
    component1 = stats.norm.logpdf(x, -4, 0.5)
    component2 = stats.norm.logpdf(x, 0, 0.5)
    component3 = stats.norm.logpdf(x, 4, 0.5)
    
    # log(exp(c1) + exp(c2) + exp(c3))
    max_comp = np.maximum(np.maximum(component1, component2), component3)
    return max_comp + np.log(
        np.exp(component1 - max_comp) + 
        np.exp(component2 - max_comp) + 
        np.exp(component3 - max_comp)
    )

def metropolis_hastings_simple(target_log_pdf, initial_value, n_samples, step_size=0.5):
    """シンプルなメトロポリス・ヘイスティングス実装"""
    samples = np.zeros(n_samples)
    current = initial_value
    current_log_prob = target_log_pdf(current)
    n_accepted = 0
    
    for i in range(n_samples):
        # 提案
        proposed = current + np.random.normal(0, step_size)
        proposed_log_prob = target_log_pdf(proposed)
        
        # 受理確率
        log_alpha = proposed_log_prob - current_log_prob
        alpha = min(1.0, np.exp(log_alpha))
        
        # 受理/棄却
        if np.random.rand() < alpha:
            current = proposed
            current_log_prob = proposed_log_prob
            n_accepted += 1
        
        samples[i] = current
    
    return samples, n_accepted / n_samples

# 異なる初期値と異なるステップサイズでサンプリング
initial_values = [-4, 0, 4]
step_sizes = [0.1, 1.0, 3.0]
n_samples = 5000

# 結果の保存
sampling_results = {}

for init_val in initial_values:
    for step_size in step_sizes:
        samples, acc_rate = metropolis_hastings_simple(
            multimodal_log_pdf, init_val, n_samples, step_size
        )
        key = f"init_{init_val}_step_{step_size}"
        sampling_results[key] = {
            'samples': samples,
            'acceptance_rate': acc_rate,
            'initial_value': init_val,
            'step_size': step_size
        }

print("収束の問題を示すサンプリング結果:")
for key, result in sampling_results.items():
    mean_sample = np.mean(result['samples'][1000:])
    print(f"{key}: 受理率={result['acceptance_rate']:.3f}, 平均={mean_sample:.2f}")

In [None]:
# 収束問題の可視化
fig, axes = plt.subplots(3, 3, figsize=(15, 12))

# 真の分布を計算
x_range = np.linspace(-6, 6, 1000)
true_density = np.exp(multimodal_log_pdf(x_range))

for i, init_val in enumerate(initial_values):
    for j, step_size in enumerate(step_sizes):
        key = f"init_{init_val}_step_{step_size}"
        samples = sampling_results[key]['samples']
        
        # トレースプロット
        axes[i, j].plot(samples[:2000], alpha=0.7, linewidth=0.8)
        axes[i, j].axhline(init_val, color='red', linestyle='--', alpha=0.7, label='Initial')
        axes[i, j].set_title(f'Init={init_val}, Step={step_size}\nAcc={sampling_results[key]["acceptance_rate"]:.3f}')
        axes[i, j].set_xlabel('Iteration')
        axes[i, j].set_ylabel('Value')
        axes[i, j].legend()
        axes[i, j].grid(True, alpha=0.3)

plt.tight_layout()
plt.suptitle('Convergence Issues in Multimodal Distribution', fontsize=16, y=1.02)
plt.show()

# ヒストグラム比較
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

for i, step_size in enumerate(step_sizes):
    for init_val in initial_values:
        key = f"init_{init_val}_step_{step_size}"
        samples = sampling_results[key]['samples'][1000:]
        axes[i].hist(samples, bins=50, alpha=0.5, density=True, 
                    label=f'Init={init_val}')
    
    axes[i].plot(x_range, true_density, 'k-', linewidth=2, label='True')
    axes[i].set_title(f'Step Size = {step_size}')
    axes[i].set_xlabel('Value')
    axes[i].set_ylabel('Density')
    axes[i].legend()

plt.tight_layout()
plt.show()

## 4.2 視覚的診断手法

### 4.2.1 トレースプロット
パラメータの時系列変化を可視化し、収束と混合を視覚的に確認します。

In [None]:
def plot_trace_diagnostics(samples, parameter_names=None, title="Trace Diagnostics"):
    """
    包括的なトレース診断プロット
    """
    if samples.ndim == 1:
        samples = samples.reshape(-1, 1)
    
    n_params = samples.shape[1]
    if parameter_names is None:
        parameter_names = [f'Parameter {i+1}' for i in range(n_params)]
    
    fig, axes = plt.subplots(n_params, 4, figsize=(16, 4*n_params))
    if n_params == 1:
        axes = axes.reshape(1, -1)
    
    for i in range(n_params):
        param_samples = samples[:, i]
        
        # 1. 全トレースプロット
        axes[i, 0].plot(param_samples, alpha=0.7, linewidth=0.8)
        axes[i, 0].set_title(f'{parameter_names[i]} - Full Trace')
        axes[i, 0].set_xlabel('Iteration')
        axes[i, 0].set_ylabel('Value')
        axes[i, 0].grid(True, alpha=0.3)
        
        # 2. 前半・後半の比較
        mid_point = len(param_samples) // 2
        axes[i, 1].plot(param_samples[:mid_point], alpha=0.7, label='First half', color='blue')
        axes[i, 1].plot(range(mid_point, len(param_samples)), 
                       param_samples[mid_point:], alpha=0.7, label='Second half', color='red')
        axes[i, 1].set_title(f'{parameter_names[i]} - First vs Second Half')
        axes[i, 1].set_xlabel('Iteration')
        axes[i, 1].set_ylabel('Value')
        axes[i, 1].legend()
        axes[i, 1].grid(True, alpha=0.3)
        
        # 3. ランニング平均
        running_mean = np.cumsum(param_samples) / np.arange(1, len(param_samples) + 1)
        axes[i, 2].plot(running_mean)
        # 信頼区間も表示
        running_var = np.cumsum((param_samples - running_mean)**2) / np.arange(1, len(param_samples) + 1)
        running_se = np.sqrt(running_var / np.arange(1, len(param_samples) + 1))
        axes[i, 2].fill_between(range(len(running_mean)), 
                               running_mean - 1.96*running_se,
                               running_mean + 1.96*running_se, alpha=0.3)
        axes[i, 2].set_title(f'{parameter_names[i]} - Running Mean')
        axes[i, 2].set_xlabel('Iteration')
        axes[i, 2].set_ylabel('Running Mean')
        axes[i, 2].grid(True, alpha=0.3)
        
        # 4. 密度プロット（時間窓別）
        n_windows = 4
        window_size = len(param_samples) // n_windows
        colors = plt.cm.viridis(np.linspace(0, 1, n_windows))
        
        for w in range(n_windows):
            start_idx = w * window_size
            end_idx = (w + 1) * window_size if w < n_windows - 1 else len(param_samples)
            window_samples = param_samples[start_idx:end_idx]
            
            if len(window_samples) > 10:  # 十分なサンプルがある場合のみ
                axes[i, 3].hist(window_samples, bins=30, alpha=0.5, density=True,
                               color=colors[w], label=f'Window {w+1}')
        
        axes[i, 3].set_title(f'{parameter_names[i]} - Density by Time Window')
        axes[i, 3].set_xlabel('Value')
        axes[i, 3].set_ylabel('Density')
        axes[i, 3].legend()
    
    plt.suptitle(title, fontsize=16)
    plt.tight_layout()
    plt.show()

# 良好な収束例（正規分布）
good_samples, _ = metropolis_hastings_simple(
    lambda x: stats.norm.logpdf(x, 0, 1), 0, 5000, 1.0
)

plot_trace_diagnostics(good_samples, ['Normal Distribution'], "Good Convergence Example")

### 4.2.2 自己相関関数の分析

In [None]:
def plot_autocorrelation_analysis(samples, max_lags=200, parameter_names=None):
    """
    自己相関関数の詳細分析
    """
    if samples.ndim == 1:
        samples = samples.reshape(-1, 1)
    
    n_params = samples.shape[1]
    if parameter_names is None:
        parameter_names = [f'Parameter {i+1}' for i in range(n_params)]
    
    fig, axes = plt.subplots(n_params, 3, figsize=(15, 5*n_params))
    if n_params == 1:
        axes = axes.reshape(1, -1)
    
    autocorr_results = {}
    
    for i in range(n_params):
        param_samples = samples[:, i]
        
        # 自己相関の計算
        lags = min(max_lags, len(param_samples) // 4)
        autocorr = acf(param_samples, nlags=lags, fft=True)
        
        # 統合自己相関時間の計算
        # τ_int = 1 + 2 * Σ(ρ(k)) for k where ρ(k) > 0
        tau_int = 1.0
        for k in range(1, len(autocorr)):
            if autocorr[k] > 0.01:  # 閾値以上の相関がある間は加算
                tau_int += 2 * autocorr[k]
            else:
                break
        
        # 有効サンプルサイズ
        n_eff = len(param_samples) / (2 * tau_int + 1)
        
        autocorr_results[parameter_names[i]] = {
            'tau_int': tau_int,
            'n_eff': n_eff,
            'autocorr': autocorr
        }
        
        # 1. 自己相関関数プロット
        axes[i, 0].plot(autocorr, 'b-', alpha=0.8)
        axes[i, 0].axhline(0, color='k', linestyle='--', alpha=0.5)
        axes[i, 0].axhline(0.05, color='r', linestyle='--', alpha=0.5, label='5% threshold')
        axes[i, 0].axhline(-0.05, color='r', linestyle='--', alpha=0.5)
        axes[i, 0].set_title(f'{parameter_names[i]} - ACF\nτ_int = {tau_int:.1f}, N_eff = {n_eff:.0f}')
        axes[i, 0].set_xlabel('Lag')
        axes[i, 0].set_ylabel('Autocorrelation')
        axes[i, 0].legend()
        axes[i, 0].grid(True, alpha=0.3)
        
        # 2. 対数スケールでの自己相関
        positive_autocorr = np.maximum(autocorr, 1e-10)
        axes[i, 1].semilogy(positive_autocorr, 'b-', alpha=0.8)
        
        # 指数的減衰のフィッティング
        try:
            # 最初の数点を使って指数的減衰をフィット
            fit_range = min(50, len(autocorr) // 2)
            x_fit = np.arange(fit_range)
            y_fit = autocorr[:fit_range]
            
            # 線形回帰で指数的減衰の係数を推定
            mask = y_fit > 0.01
            if np.sum(mask) > 5:
                coeffs = np.polyfit(x_fit[mask], np.log(y_fit[mask]), 1)
                exp_fit = np.exp(coeffs[1] + coeffs[0] * x_fit)
                axes[i, 1].plot(x_fit, exp_fit, 'r--', alpha=0.7, 
                               label=f'Exp fit: τ = {-1/coeffs[0]:.1f}')
        except:
            pass
        
        axes[i, 1].set_title(f'{parameter_names[i]} - ACF (Log Scale)')
        axes[i, 1].set_xlabel('Lag')
        axes[i, 1].set_ylabel('Log Autocorrelation')
        axes[i, 1].legend()
        axes[i, 1].grid(True, alpha=0.3)
        
        # 3. 薄化（thinning）の効果
        thin_factors = [1, 2, 5, 10, 20]
        colors = plt.cm.viridis(np.linspace(0, 1, len(thin_factors)))
        
        for j, thin in enumerate(thin_factors):
            if thin < len(param_samples) // 10:  # 十分なサンプルが残る場合のみ
                thinned_samples = param_samples[::thin]
                if len(thinned_samples) > 100:
                    thin_lags = min(50, len(thinned_samples) // 4)
                    thin_autocorr = acf(thinned_samples, nlags=thin_lags, fft=True)
                    axes[i, 2].plot(thin_autocorr, color=colors[j], alpha=0.7, 
                                   label=f'Thin={thin}')
        
        axes[i, 2].axhline(0, color='k', linestyle='--', alpha=0.5)
        axes[i, 2].axhline(0.05, color='r', linestyle='--', alpha=0.5)
        axes[i, 2].axhline(-0.05, color='r', linestyle='--', alpha=0.5)
        axes[i, 2].set_title(f'{parameter_names[i]} - Effect of Thinning')
        axes[i, 2].set_xlabel('Lag')
        axes[i, 2].set_ylabel('Autocorrelation')
        axes[i, 2].legend()
        axes[i, 2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return autocorr_results

# 自己相関分析の実行
autocorr_results = plot_autocorrelation_analysis(good_samples, parameter_names=['Normal Distribution'])

print("自己相関分析結果:")
for param, results in autocorr_results.items():
    print(f"{param}:")
    print(f"  統合自己相関時間: {results['tau_int']:.2f}")
    print(f"  有効サンプルサイズ: {results['n_eff']:.0f}")
    print(f"  効率: {results['n_eff']/len(good_samples):.2%}")

## 4.3 数値的診断統計量

### 4.3.1 Gelman-Rubin統計量（$\hat{R}$）

複数のチェーンを使って収束を診断する最も重要な統計量です。

In [None]:
def gelman_rubin_diagnostic(chains, split_chains=True):
    """
    Gelman-Rubin収束診断統計量の計算
    
    Parameters:
    - chains: shape (n_chains, n_samples) または (n_chains, n_samples, n_params)
    - split_chains: 各チェーンを前半・後半に分割するか
    
    Returns:
    - R_hat: R-hat統計量
    - n_eff: 有効サンプルサイズ
    """
    chains = np.array(chains)
    
    if chains.ndim == 2:
        # 単一パラメータの場合
        chains = chains[:, :, np.newaxis]
    
    n_chains, n_samples, n_params = chains.shape
    
    if split_chains:
        # 各チェーンを前半・後半に分割
        mid_point = n_samples // 2
        first_half = chains[:, :mid_point, :]
        second_half = chains[:, mid_point:, :]
        chains_split = np.concatenate([first_half, second_half], axis=0)
        n_chains *= 2
        n_samples = mid_point
        chains = chains_split
    
    R_hat = np.zeros(n_params)
    n_eff = np.zeros(n_params)
    
    for p in range(n_params):
        # 各チェーンの平均と分散
        chain_means = np.mean(chains[:, :, p], axis=1)
        chain_vars = np.var(chains[:, :, p], axis=1, ddof=1)
        
        # 全体平均
        overall_mean = np.mean(chain_means)
        
        # チェーン間分散 B
        B = n_samples * np.var(chain_means, ddof=1)
        
        # チェーン内分散 W
        W = np.mean(chain_vars)
        
        # 分散の推定値
        var_plus = ((n_samples - 1) * W + B) / n_samples
        
        # R-hat統計量
        R_hat[p] = np.sqrt(var_plus / W) if W > 0 else np.inf
        
        # 有効サンプルサイズの計算
        # 各チェーンの自己相関を考慮
        all_samples = chains[:, :, p].flatten()
        
        # 自己相関時間の推定
        try:
            autocorr = acf(all_samples, nlags=min(200, len(all_samples)//4), fft=True)
            tau_int = 1.0
            for k in range(1, len(autocorr)):
                if autocorr[k] > 0.01:
                    tau_int += 2 * autocorr[k]
                else:
                    break
            n_eff[p] = len(all_samples) / (2 * tau_int + 1)
        except:
            n_eff[p] = len(all_samples) / 10  # 保守的な推定
    
    if n_params == 1:
        return R_hat[0], n_eff[0]
    else:
        return R_hat, n_eff

def run_multiple_chains(target_log_pdf, initial_values, n_samples, step_size=1.0):
    """
    複数のチェーンを並列実行
    """
    n_chains = len(initial_values)
    chains = np.zeros((n_chains, n_samples))
    acceptance_rates = np.zeros(n_chains)
    
    for i, init_val in enumerate(initial_values):
        samples, acc_rate = metropolis_hastings_simple(
            target_log_pdf, init_val, n_samples, step_size
        )
        chains[i] = samples
        acceptance_rates[i] = acc_rate
    
    return chains, acceptance_rates

# 複数チェーンでの収束診断
print("複数チェーンでのサンプリング実行中...")

# 正規分布での例
initial_values_normal = [-2, -1, 0, 1, 2]
chains_normal, acc_rates_normal = run_multiple_chains(
    lambda x: stats.norm.logpdf(x, 0, 1), 
    initial_values_normal, 
    3000, 
    step_size=1.0
)

# 多峰性分布での例
initial_values_multimodal = [-4, -2, 0, 2, 4]
chains_multimodal, acc_rates_multimodal = run_multiple_chains(
    multimodal_log_pdf,
    initial_values_multimodal,
    3000,
    step_size=2.0
)

# Gelman-Rubin診断の実行
R_hat_normal, n_eff_normal = gelman_rubin_diagnostic(chains_normal)
R_hat_multimodal, n_eff_multimodal = gelman_rubin_diagnostic(chains_multimodal)

print(f"\n=== Gelman-Rubin診断結果 ===")
print(f"正規分布:")
print(f"  R-hat: {R_hat_normal:.4f}")
print(f"  有効サンプルサイズ: {n_eff_normal:.0f}")
print(f"  平均受理率: {np.mean(acc_rates_normal):.3f}")

print(f"\n多峰性分布:")
print(f"  R-hat: {R_hat_multimodal:.4f}")
print(f"  有効サンプルサイズ: {n_eff_multimodal:.0f}")
print(f"  平均受理率: {np.mean(acc_rates_multimodal):.3f}")

print(f"\n判定基準:")
print(f"  R-hat < 1.01: 収束良好")
print(f"  1.01 ≤ R-hat < 1.1: 注意が必要")
print(f"  R-hat ≥ 1.1: 収束不良")

In [None]:
# 複数チェーンの可視化
def plot_multiple_chains(chains, title="Multiple Chains Analysis", parameter_name="Parameter", true_mean=None):
    """
    複数チェーンの詳細分析プロット
    """
    n_chains, n_samples = chains.shape
    
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    
    colors = plt.cm.tab10(np.linspace(0, 1, n_chains))
    
    # 1. 全チェーンのトレースプロット
    for i in range(n_chains):
        axes[0, 0].plot(chains[i], alpha=0.7, color=colors[i], 
                       linewidth=0.8, label=f'Chain {i+1}')
    if true_mean is not None:
        axes[0, 0].axhline(true_mean, color='red', linestyle='--', 
                          linewidth=2, label='True value')
    axes[0, 0].set_title('All Chains - Trace Plot')
    axes[0, 0].set_xlabel('Iteration')
    axes[0, 0].set_ylabel('Value')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # 2. チェーン別のヒストグラム
    for i in range(n_chains):
        burnin = n_samples // 4
        axes[0, 1].hist(chains[i, burnin:], bins=30, alpha=0.6, 
                       color=colors[i], density=True, label=f'Chain {i+1}')
    if true_mean is not None:
        axes[0, 1].axvline(true_mean, color='red', linestyle='--', 
                          linewidth=2, label='True value')
    axes[0, 1].set_title('Distribution by Chain')
    axes[0, 1].set_xlabel('Value')
    axes[0, 1].set_ylabel('Density')
    axes[0, 1].legend()
    
    # 3. ランニング平均の収束
    for i in range(n_chains):
        running_mean = np.cumsum(chains[i]) / np.arange(1, n_samples + 1)
        axes[0, 2].plot(running_mean, alpha=0.7, color=colors[i], 
                       linewidth=1, label=f'Chain {i+1}')
    if true_mean is not None:
        axes[0, 2].axhline(true_mean, color='red', linestyle='--', 
                          linewidth=2, label='True value')
    axes[0, 2].set_title('Running Mean Convergence')
    axes[0, 2].set_xlabel('Iteration')
    axes[0, 2].set_ylabel('Running Mean')
    axes[0, 2].legend()
    axes[0, 2].grid(True, alpha=0.3)
    
    # 4. チェーン間・チェーン内分散の変化
    window_size = max(100, n_samples // 20)
    n_windows = n_samples // window_size
    
    between_var = []
    within_var = []
    r_hat_evolution = []
    
    for w in range(1, n_windows + 1):
        end_idx = w * window_size
        window_chains = chains[:, :end_idx]
        
        if end_idx >= 200:  # 十分なサンプルがある場合のみ
            try:
                r_hat_w, _ = gelman_rubin_diagnostic(window_chains, split_chains=False)
                r_hat_evolution.append(r_hat_w)
                
                # チェーン間・内分散の計算
                chain_means = np.mean(window_chains, axis=1)
                chain_vars = np.var(window_chains, axis=1, ddof=1)
                B = end_idx * np.var(chain_means, ddof=1)
                W = np.mean(chain_vars)
                between_var.append(B)
                within_var.append(W)
            except:
                pass
    
    if len(r_hat_evolution) > 0:
        axes[1, 0].plot(range(1, len(r_hat_evolution) + 1), r_hat_evolution, 'b-', linewidth=2)
        axes[1, 0].axhline(1.0, color='green', linestyle='--', label='Perfect convergence')
        axes[1, 0].axhline(1.01, color='orange', linestyle='--', label='Good convergence')
        axes[1, 0].axhline(1.1, color='red', linestyle='--', label='Poor convergence')
        axes[1, 0].set_title('R-hat Evolution')
        axes[1, 0].set_xlabel('Window')
        axes[1, 0].set_ylabel('R-hat')
        axes[1, 0].legend()
        axes[1, 0].grid(True, alpha=0.3)
    
    # 5. 自己相関比較
    for i in range(min(3, n_chains)):  # 最初の3チェーンのみ表示
        burnin = n_samples // 4
        chain_data = chains[i, burnin:]
        if len(chain_data) > 100:
            lags = min(100, len(chain_data) // 4)
            autocorr = acf(chain_data, nlags=lags, fft=True)
            axes[1, 1].plot(autocorr, alpha=0.7, color=colors[i], 
                           label=f'Chain {i+1}')
    axes[1, 1].axhline(0, color='k', linestyle='--', alpha=0.5)
    axes[1, 1].axhline(0.05, color='r', linestyle='--', alpha=0.5, label='5% threshold')
    axes[1, 1].set_title('Autocorrelation Comparison')
    axes[1, 1].set_xlabel('Lag')
    axes[1, 1].set_ylabel('Autocorrelation')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)
    
    # 6. Rank plot (chains の混合の確認)
    all_samples = chains.flatten()
    ranks = stats.rankdata(all_samples)
    ranks = ranks.reshape(chains.shape)
    
    for i in range(n_chains):
        axes[1, 2].plot(ranks[i], alpha=0.7, color=colors[i], 
                       linewidth=0.8, label=f'Chain {i+1}')
    axes[1, 2].set_title('Rank Plot (Mixing Assessment)')
    axes[1, 2].set_xlabel('Iteration')
    axes[1, 2].set_ylabel('Rank')
    axes[1, 2].legend()
    axes[1, 2].grid(True, alpha=0.3)
    
    plt.suptitle(title, fontsize=16)
    plt.tight_layout()
    plt.show()

# 複数チェーンの分析
print("正規分布の複数チェーン分析:")
plot_multiple_chains(chains_normal, "Normal Distribution - Multiple Chains", "Value", 0.0)

print("多峰性分布の複数チェーン分析:")
plot_multiple_chains(chains_multimodal, "Multimodal Distribution - Multiple Chains", "Value")

### 4.3.2 その他の診断統計量

In [None]:
def comprehensive_diagnostics(chains, parameter_names=None):
    """
    包括的な診断統計量の計算
    """
    chains = np.array(chains)
    if chains.ndim == 2:
        chains = chains[:, :, np.newaxis]
    
    n_chains, n_samples, n_params = chains.shape
    
    if parameter_names is None:
        parameter_names = [f'Parameter {i+1}' for i in range(n_params)]
    
    results = {}
    
    for p in range(n_params):
        param_name = parameter_names[p]
        param_chains = chains[:, :, p]
        
        # 基本統計
        all_samples = param_chains.flatten()
        burnin = n_samples // 4
        clean_samples = param_chains[:, burnin:].flatten()
        
        # 1. Gelman-Rubin統計量
        R_hat, n_eff = gelman_rubin_diagnostic(param_chains)
        
        # 2. Monte Carlo Standard Error (MCSE)
        # MCSE = σ / sqrt(N_eff)
        mcse = np.std(clean_samples, ddof=1) / np.sqrt(n_eff)
        
        # 3. 分位点のMCSE
        def mcse_quantile(samples, q):
            """分位点のMonte Carlo標準誤差"""
            n = len(samples)
            p = q
            # 正規近似を使用
            return np.sqrt(p * (1 - p) / n) / stats.norm.pdf(stats.norm.ppf(p))
        
        mcse_q025 = mcse_quantile(clean_samples, 0.025)
        mcse_q975 = mcse_quantile(clean_samples, 0.975)
        
        # 4. Geweke診断（前半と後半の比較）
        first_10pct = int(0.1 * n_samples)
        last_50pct = int(0.5 * n_samples)
        
        geweke_scores = []
        for chain in range(n_chains):
            first_part = param_chains[chain, :first_10pct]
            last_part = param_chains[chain, -last_50pct:]
            
            if len(first_part) > 10 and len(last_part) > 10:
                mean_diff = np.mean(first_part) - np.mean(last_part)
                
                # スペクトル密度による分散推定（簡易版）
                var_first = np.var(first_part, ddof=1) / len(first_part)
                var_last = np.var(last_part, ddof=1) / len(last_part)
                
                geweke_score = mean_diff / np.sqrt(var_first + var_last)
                geweke_scores.append(geweke_score)
        
        geweke_pvalue = 2 * (1 - stats.norm.cdf(np.abs(np.mean(geweke_scores))))
        
        # 5. Heidelberger-Welch 検定（簡易版）
        # 定常性の検定
        hw_pvalues = []
        for chain in range(n_chains):
            chain_data = param_chains[chain]
            
            # チェーンを複数の窓に分割して平均の違いを検定
            n_windows = 5
            window_size = len(chain_data) // n_windows
            window_means = []
            
            for w in range(n_windows):
                start_idx = w * window_size
                end_idx = (w + 1) * window_size
                if end_idx <= len(chain_data):
                    window_means.append(np.mean(chain_data[start_idx:end_idx]))
            
            if len(window_means) > 2:
                # 一元配置分散分析
                _, p_val = stats.f_oneway(*[chain_data[i*window_size:(i+1)*window_size] 
                                           for i in range(len(window_means))])
                hw_pvalues.append(p_val)
        
        hw_pvalue = np.mean(hw_pvalues) if hw_pvalues else np.nan
        
        # 6. ESS (Effective Sample Size) の詳細計算
        # バッチ法による推定も追加
        batch_sizes = [10, 20, 50, 100]
        batch_ess = []
        
        for batch_size in batch_sizes:
            if len(clean_samples) > batch_size * 10:
                n_batches = len(clean_samples) // batch_size
                batches = clean_samples[:n_batches * batch_size].reshape(n_batches, batch_size)
                batch_means = np.mean(batches, axis=1)
                
                # バッチ平均の分散
                batch_var = np.var(batch_means, ddof=1)
                total_var = np.var(clean_samples, ddof=1)
                
                if batch_var > 0:
                    ess_batch = len(clean_samples) * total_var / (batch_size * batch_var)
                    batch_ess.append(ess_batch)
        
        avg_batch_ess = np.mean(batch_ess) if batch_ess else n_eff
        
        results[param_name] = {
            'mean': np.mean(clean_samples),
            'std': np.std(clean_samples, ddof=1),
            'q025': np.percentile(clean_samples, 2.5),
            'q975': np.percentile(clean_samples, 97.5),
            'R_hat': R_hat,
            'n_eff': n_eff,
            'n_eff_batch': avg_batch_ess,
            'mcse': mcse,
            'mcse_q025': mcse_q025,
            'mcse_q975': mcse_q975,
            'geweke_score': np.mean(geweke_scores) if geweke_scores else np.nan,
            'geweke_pvalue': geweke_pvalue,
            'hw_pvalue': hw_pvalue,
            'n_samples_total': len(all_samples),
            'n_samples_clean': len(clean_samples),
            'efficiency': n_eff / len(clean_samples)
        }
    
    return results

def print_diagnostics_table(diagnostics):
    """
    診断結果の表形式出力
    """
    print("\n" + "="*100)
    print("MCMC診断統計量サマリー")
    print("="*100)
    
    # ヘッダー
    header = f"{'Parameter':<15} {'Mean':<8} {'Std':<8} {'R-hat':<8} {'N_eff':<8} {'MCSE':<8} {'Geweke':<8} {'Status':<12}"
    print(header)
    print("-"*100)
    
    for param_name, stats in diagnostics.items():
        # 収束ステータスの判定
        status = "Good"
        if stats['R_hat'] > 1.1:
            status = "Poor"
        elif stats['R_hat'] > 1.01:
            status = "Caution"
        
        if stats['n_eff'] < 100:
            status += "/Low ESS"
        
        row = (f"{param_name:<15} {stats['mean']:<8.3f} {stats['std']:<8.3f} "
               f"{stats['R_hat']:<8.4f} {stats['n_eff']:<8.0f} {stats['mcse']:<8.4f} "
               f"{stats['geweke_pvalue']:<8.3f} {status:<12}")
        print(row)
    
    print("-"*100)
    print("判定基準:")
    print("  R-hat < 1.01: Good, 1.01-1.1: Caution, > 1.1: Poor")
    print("  Geweke: p-value > 0.05 で定常性仮説を棄却しない")
    print("  N_eff: 有効サンプルサイズ (目安: > 100)")

# 診断統計量の計算と表示
print("診断統計量の計算中...")

# 正規分布の診断
diagnostics_normal = comprehensive_diagnostics(chains_normal, ['Normal'])
print("\n正規分布の診断結果:")
print_diagnostics_table(diagnostics_normal)

# 多峰性分布の診断
diagnostics_multimodal = comprehensive_diagnostics(chains_multimodal, ['Multimodal'])
print("\n多峰性分布の診断結果:")
print_diagnostics_table(diagnostics_multimodal)

## 4.4 実践的な診断手順

実際のMCMC分析で推奨される診断手順をまとめます。

In [None]:
def mcmc_diagnostic_workflow(target_log_pdf, initial_values, n_samples, 
                           step_size=1.0, parameter_names=None, 
                           true_values=None, verbose=True):
    """
    MCMC診断の標準ワークフロー - プロフェッショナル版
    
    この関数は実際の研究や業務で使用できるレベルの
    包括的な診断システムを提供します。
    
    Returns:
    - diagnostics: 診断結果の辞書
    - recommendations: 推奨事項のリスト
    - quality_score: 総合品質スコア (0-100)
    """
    if verbose:
        print("🔧 MCMC診断ワークフロー開始...")
        print(f"   チェーン数: {len(initial_values)}")
        print(f"   サンプル数: {n_samples}")
        print(f"   ステップサイズ: {step_size}")
    
    # Step 1: 複数チェーンの実行
    if verbose:
        print("\n📊 Step 1: 複数チェーンでのサンプリング実行中...")
    
    chains, acceptance_rates = run_multiple_chains(
        target_log_pdf, initial_values, n_samples, step_size
    )
    
    # Step 2: 基本統計の確認
    avg_acceptance = np.mean(acceptance_rates)
    if verbose:
        print(f"\n📈 Step 2: 基本統計の確認")
        print(f"   平均受理率: {avg_acceptance:.3f}")
        print(f"   受理率範囲: {np.min(acceptance_rates):.3f} - {np.max(acceptance_rates):.3f}")
        
        # 受理率の評価
        if 0.4 <= avg_acceptance <= 0.6:
            print("   ✅ 受理率は適切な範囲内です")
        elif 0.2 <= avg_acceptance < 0.4 or 0.6 < avg_acceptance <= 0.8:
            print("   ⚠️  受理率がやや範囲外です")
        else:
            print("   ❌ 受理率が推奨範囲外です")
    
    # Step 3: 視覚的診断
    if verbose:
        print("\n👁️  Step 3: 視覚的診断")
    
    plot_multiple_chains(chains, "Diagnostic Workflow - Visual Inspection", 
                        parameter_name="Parameter", 
                        true_mean=true_values[0] if true_values else None)
    
    # Step 4: 数値診断
    if verbose:
        print("\n🔢 Step 4: 数値診断統計量の計算")
    
    diagnostics = comprehensive_diagnostics(chains, parameter_names)
    print_diagnostics_table(diagnostics)
    
    # Step 5: 品質スコアの計算
    quality_scores = []
    for param_name, stats in diagnostics.items():
        param_score = 100  # 満点から減点方式
        
        # R-hat評価 (40点満点)
        if stats['R_hat'] <= 1.01:
            rhat_score = 40
        elif stats['R_hat'] <= 1.05:
            rhat_score = 30
        elif stats['R_hat'] <= 1.1:
            rhat_score = 15
        else:
            rhat_score = 0
        
        # ESS評価 (30点満点)
        if stats['n_eff'] >= 400:
            ess_score = 30
        elif stats['n_eff'] >= 200:
            ess_score = 20
        elif stats['n_eff'] >= 100:
            ess_score = 10
        else:
            ess_score = 0
        
        # 受理率評価 (20点満点)
        if 0.4 <= avg_acceptance <= 0.6:
            acc_score = 20
        elif 0.2 <= avg_acceptance <= 0.8:
            acc_score = 15
        else:
            acc_score = 5
        
        # MCSE評価 (10点満点)
        relative_mcse = stats['mcse'] / stats['std']
        if relative_mcse <= 0.05:
            mcse_score = 10
        elif relative_mcse <= 0.1:
            mcse_score = 7
        elif relative_mcse <= 0.2:
            mcse_score = 3
        else:
            mcse_score = 0
        
        param_score = rhat_score + ess_score + acc_score + mcse_score
        quality_scores.append(param_score)
    
    overall_quality = np.mean(quality_scores)
    
    # Step 6: 推奨事項の生成（詳細版）
    recommendations = []
    critical_issues = []
    warnings = []
    suggestions = []
    
    for param_name, stats in diagnostics.items():
        # 致命的な問題
        if stats['R_hat'] > 1.1:
            critical_issues.append(f"{param_name}: 収束していません (R-hat = {stats['R_hat']:.4f})")
        
        if stats['n_eff'] < 50:
            critical_issues.append(f"{param_name}: 有効サンプルサイズが極端に小さいです ({stats['n_eff']:.0f})")
        
        # 警告レベル
        if 1.01 < stats['R_hat'] <= 1.1:
            warnings.append(f"{param_name}: R-hat がやや高いです ({stats['R_hat']:.4f})")
        
        if 50 <= stats['n_eff'] < 100:
            warnings.append(f"{param_name}: 有効サンプルサイズが小さいです ({stats['n_eff']:.0f})")
        
        relative_mcse = stats['mcse'] / stats['std']
        if relative_mcse > 0.1:
            warnings.append(f"{param_name}: Monte Carlo誤差が大きいです ({relative_mcse:.3f})")
        
        # 改善提案
        if stats['efficiency'] < 0.1:
            suggestions.append(f"{param_name}: 効率が低いです。より良いパラメタリゼーションを検討してください")
        
        if not np.isnan(stats['geweke_pvalue']) and stats['geweke_pvalue'] < 0.05:
            suggestions.append(f"{param_name}: 非定常性が検出されました。バーンイン期間を延長してください")
    
    # 受理率に関する推奨事項
    if avg_acceptance < 0.2:
        critical_issues.append(f"受理率が極端に低いです ({avg_acceptance:.3f}). ステップサイズを大幅に縮小してください")
    elif avg_acceptance < 0.4:
        warnings.append(f"受理率が低いです ({avg_acceptance:.3f}). ステップサイズを縮小することを検討してください")
    elif avg_acceptance > 0.8:
        warnings.append(f"受理率が高すぎます ({avg_acceptance:.3f}). ステップサイズを拡大してください")
    
    # 総合的な推奨事項
    total_samples = len(initial_values) * n_samples
    total_eff = sum([stats['n_eff'] for stats in diagnostics.values()])
    overall_efficiency = total_eff / total_samples
    
    if overall_efficiency < 0.05:
        critical_issues.append(f"全体的な効率が極端に低いです ({overall_efficiency:.3f}). アルゴリズムの根本的な見直しが必要です")
    elif overall_efficiency < 0.1:
        warnings.append(f"全体的な効率が低いです ({overall_efficiency:.3f}). パラメタリゼーションの改善を検討してください")
    
    # 推奨事項の統合
    recommendations = critical_issues + warnings + suggestions
    
    # Step 7: 結果サマリーの表示
    if verbose:
        print(f"\n🎯 Step 7: 総合評価")
        print(f"   品質スコア: {overall_quality:.1f}/100")
        
        if overall_quality >= 80:
            print("   ✅ 優秀: 診断結果は非常に良好です")
        elif overall_quality >= 60:
            print("   ⚠️  良好: 軽微な改善の余地があります")
        elif overall_quality >= 40:
            print("   ⚠️  注意: いくつかの問題が検出されました")
        else:
            print("   ❌ 問題: 重大な問題が検出されました")
        
        print(f"   全体効率: {overall_efficiency:.3f}")
        
        if recommendations:
            print(f"\n📝 改善提案 ({len(recommendations)}件):")
            for i, rec in enumerate(recommendations, 1):
                if rec in critical_issues:
                    print(f"   🔴 {i}. {rec}")
                elif rec in warnings:
                    print(f"   🟡 {i}. {rec}")
                else:
                    print(f"   🔵 {i}. {rec}")
        else:
            print("\n🎉 素晴らしい! 改善提案はありません。")
    
    return {
        'chains': chains,
        'acceptance_rates': acceptance_rates,
        'diagnostics': diagnostics,
        'recommendations': recommendations,
        'critical_issues': critical_issues,
        'warnings': warnings,
        'suggestions': suggestions,
        'overall_efficiency': overall_efficiency,
        'quality_score': overall_quality,
        'quality_breakdown': {
            'r_hat_component': np.mean([40 if stats['R_hat'] <= 1.01 else 0 for stats in diagnostics.values()]),
            'ess_component': np.mean([30 if stats['n_eff'] >= 400 else 0 for stats in diagnostics.values()]),
            'acceptance_component': 20 if 0.4 <= avg_acceptance <= 0.6 else 0,
            'mcse_component': np.mean([10 if stats['mcse']/stats['std'] <= 0.05 else 0 for stats in diagnostics.values()])
        }
    }

# プロフェッショナル診断ワークフローの実行例
print("=" * 80)
print("🚀 プロフェッショナル診断ワークフロー実行例")
print("=" * 80)

print("\n=== 正規分布での高品質診断 ===")
results_normal_pro = mcmc_diagnostic_workflow(
    target_log_pdf=lambda x: stats.norm.logpdf(x, 0, 1),
    initial_values=[-2, -1, 0, 1, 2],
    n_samples=3000,
    step_size=1.0,
    parameter_names=['Normal'],
    true_values=[0.0]
)

print(f"\n📊 品質内訳:")
for component, score in results_normal_pro['quality_breakdown'].items():
    print(f"   {component}: {score:.1f}")

print("\n" + "="*80)
print("=== 多峰性分布での診断（チャレンジングケース） ===")
results_multimodal_pro = mcmc_diagnostic_workflow(
    target_log_pdf=multimodal_log_pdf,
    initial_values=[-4, -2, 0, 2, 4],
    n_samples=3000,
    step_size=1.5,
    parameter_names=['Multimodal']
)

## 4.5 演習問題

### 問題1：収束診断の実践
以下の困難な分布に対してMCMCを実行し、収束診断を行ってください。

In [None]:
# 問題1: 困難な分布からのサンプリング
def challenging_log_pdf(x):
    """
    挑戦的な分布：高い相関を持つ2変量分布
    """
    if len(x) != 2:
        return -np.inf
    
    # 非常に細長い分布（高相関）
    mu = np.array([0, 0])
    cov = np.array([[1, 0.99], [0.99, 1]])
    
    diff = x - mu
    try:
        chol = np.linalg.cholesky(cov)
        log_det = 2 * np.sum(np.log(np.diag(chol)))
        solve = np.linalg.solve(chol, diff)
        mahalanobis_sq = np.sum(solve**2)
    except np.linalg.LinAlgError:
        return -np.inf
    
    return -0.5 * (2 * np.log(2 * np.pi) + log_det + mahalanobis_sq)

# ここで2変量MHサンプリングを実装し、診断してください
# ヒント：
# 1. 複数の初期値から開始
# 2. 異なるステップサイズを試す
# 3. 収束診断を実行
# 4. 問題点を特定し、改善策を提案

pass  # 学習者が実装

### 問題2：適応的診断
リアルタイムで収束を監視し、自動的に停止条件を判定するシステムを実装してください。

In [None]:
# 問題2: 適応的収束診断
def adaptive_mcmc_with_diagnostics(target_log_pdf, initial_values, 
                                 max_samples=10000, check_interval=500,
                                 r_hat_threshold=1.01, min_eff_samples=1000):
    """
    適応的収束診断付きMCMC
    
    Parameters:
    - target_log_pdf: 目標分布
    - initial_values: 初期値のリスト
    - max_samples: 最大サンプル数
    - check_interval: 診断をチェックする間隔
    - r_hat_threshold: R-hat の収束閾値
    - min_eff_samples: 最小有効サンプル数
    
    実装のヒント:
    1. check_interval ごとに R-hat と有効サンプルサイズを計算
    2. 収束条件を満たしたら早期停止
    3. 収束の履歴をプロット
    """
    # ここに実装してください
    pass  # 学習者が実装

# テスト
# result = adaptive_mcmc_with_diagnostics(
#     lambda x: stats.norm.logpdf(x, 0, 1),
#     [-2, -1, 0, 1, 2]
# )

## まとめ：実践的MCMC診断のマスターガイド

この章では、MCMCの収束診断と性能評価について包括的に学習しました：

### 🔍 重要な診断手法

1. **視覚的診断**：
   - **トレースプロット**：混合と収束の即座の確認
   - **ランニング平均**：収束の安定性を時系列で追跡
   - **密度プロット**：時間窓別の分布変化を検出

2. **数値診断**：
   - **Gelman-Rubin統計量（R-hat）**：< 1.01 で良好、< 1.1で許容範囲
   - **有効サンプルサイズ（ESS）**：> 400 が理想、最低 > 100
   - **Monte Carlo標準誤差（MCSE）**：推定精度の定量的指標
   - **Geweke診断**：定常性の統計的検定

3. **自己相関分析**：
   - **統合自己相関時間**：アルゴリズム効率の根本指標
   - **薄化（thinning）**の効果検証
   - **指数的減衰**フィッティングによる時定数推定

### 📋 実践的な診断手順（チェックリスト）

#### フェーズ1: サンプリング設計
- [ ] **複数チェーン準備**：最低3チェーン、理想的には8-10チェーン
- [ ] **初期値分散**：目標分布の異なる領域から開始
- [ ] **十分なサンプル数**：最低2000、複雑な問題では10000+

#### フェーズ2: リアルタイム監視
- [ ] **定期的チェック**：500-1000サンプルごとに診断実行
- [ ] **早期警告**：R-hat > 1.2で即座にアラート
- [ ] **トレンド監視**：収束指標の時間変化を追跡

#### フェーズ3: 総合診断
- [ ] **視覚的検査**：全トレースプロットを目視確認
- [ ] **数値診断**：R-hat < 1.01、ESS > 400を確認
- [ ] **自己相関分析**：τ_int < 10（目安）
- [ ] **分布比較**：理論値または他手法との一致確認

#### フェーズ4: 品質保証
- [ ] **感度分析**：異なる初期値・パラメータで結果の一貫性確認
- [ ] **再現性テスト**：同じ設定で複数回実行し結果比較
- [ ] **文書化**：診断結果と判断根拠を記録

### ⚠️ 一般的な問題と対策マトリックス

| 症状 | 原因 | 対策 | 緊急度 |
|------|------|------|--------|
| R-hat > 1.1 | 収束不良 | サンプル数増加、初期値変更 | 🔴 高 |
| ESS < 100 | 効率低下 | ステップサイズ調整、アルゴリズム変更 | 🟡 中 |
| 高い自己相関 | 混合不良 | 薄化、ブロック更新 | 🟡 中 |
| モード間移動なし | 多峰性 | 並列焼きなまし、長いチェーン | 🔴 高 |
| 受理率 < 20% | ステップ過大 | ステップサイズ縮小 | 🟢 低 |
| 受理率 > 70% | ステップ過小 | ステップサイズ拡大 | 🟢 低 |

### 🎯 ベストプラクティス

#### 設計段階
- **保守的設定**：問題の複雑さを過小評価しない
- **スケーラビリティ**：パラメータ数増加に備えた設計
- **自動化**：人手による判断ミスを防ぐシステム化

#### 実行段階
- **段階的増加**：短いテストから始めて段階的に長くする
- **並列実行**：計算資源を最大活用
- **中間保存**：長時間計算の中断リスク対策

#### 評価段階
- **複数指標**：単一指標に依存しない総合判断
- **可視化重視**：数値だけでなく必ず視覚的確認
- **外部検証**：理論値や他手法との比較

### 🚀 次のステップ

収束診断をマスターしたあなたは、MCMCを**安全で信頼性の高いツール**として使えるようになりました。

次の章では、これまで学んだ全てのMCMC手法を**実際のベイズ推論問題**に適用し、データサイエンスの実践的課題を解決していきます：

- 実データでの階層ベイズモデル
- 欠損データのあるモデリング
- 予測と不確実性の定量化
- モデル選択と比較

**重要**：完璧な診断は存在しません。しかし、**体系的で一貫した診断手順**により、MCMCの信頼性を大幅に向上させることができます。