# LME銅先物隣月間スプレッド ボラティリティモデリング

## 分析概要
このノートブックでは、隣月間スプレッド（M1-M2, M2-M3, M3-M4）のボラティリティ特性を詳細に分析し、GARCH系モデルによるリスク特性のモデリングを行います。

### 分析目標
- ボラティリティクラスタリングの検出
- GARCHモデルによるボラティリティ予測
- リスク指標（VaR、ES）の算出
- ボラティリティレジームの識別

### 期待される成果
- 各スプレッドのボラティリティ動向の理解
- 動的リスク管理のためのモデル構築
- ボラティリティベース取引戦略の基盤
- ストレステスト用リスクシナリオ

In [ ]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import psycopg2
from sqlalchemy import create_engine
import warnings
from datetime import datetime, timedelta
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
from scipy import stats
import os

# Volatility modeling
from arch import arch_model
from arch.unitroot import ADF
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from sklearn.preprocessing import StandardScaler

# Risk calculation
from scipy.stats import norm, t
from scipy.optimize import minimize

warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

In [ ]:
# Data retrieval and preprocessing
def get_db_connection():
    """Get PostgreSQL database connection"""
    try:
        engine = create_engine('postgresql://Yusuke@localhost:5432/lme_copper_db')
        return engine
    except Exception as e:
        print(f"Database connection error: {e}")
        return None

def load_and_prepare_data():
    """Retrieve spread data and calculate daily returns"""
    engine = get_db_connection()
    
    query = """
    SELECT 
        trade_date,
        contract_month,
        close_price
    FROM lme_copper_futures 
    WHERE contract_month IN (1, 2, 3, 4)
        AND close_price IS NOT NULL
        AND close_price > 0
    ORDER BY trade_date, contract_month
    """
    
    df = pd.read_sql(query, engine)
    df['trade_date'] = pd.to_datetime(df['trade_date'])
    
    # Pivot and calculate spreads
    pivot_df = df.pivot(index='trade_date', columns='contract_month', values='close_price')
    pivot_df.columns = [f'M{int(col)}' for col in pivot_df.columns]
    
    # Calculate spreads
    spreads_df = pd.DataFrame(index=pivot_df.index)
    spreads_df['M1_M2_spread'] = pivot_df['M1'] - pivot_df['M2']
    spreads_df['M2_M3_spread'] = pivot_df['M2'] - pivot_df['M3']
    spreads_df['M3_M4_spread'] = pivot_df['M3'] - pivot_df['M4']
    
    # Calculate daily returns
    spreads_df['M1_M2_return'] = spreads_df['M1_M2_spread'].pct_change()
    spreads_df['M2_M3_return'] = spreads_df['M2_M3_spread'].pct_change()
    spreads_df['M3_M4_return'] = spreads_df['M3_M4_spread'].pct_change()
    
    # Also calculate log differences (log returns)
    spreads_df['M1_M2_log_return'] = np.log(spreads_df['M1_M2_spread'].abs()).diff()
    spreads_df['M2_M3_log_return'] = np.log(spreads_df['M2_M3_spread'].abs()).diff()
    spreads_df['M3_M4_log_return'] = np.log(spreads_df['M3_M4_spread'].abs()).diff()
    
    return spreads_df.dropna()

# Retrieve data
spreads_data = load_and_prepare_data()
print(f"✅ Data retrieval complete: {len(spreads_data):,} records")
print(f"📅 Analysis period: {spreads_data.index.min()} to {spreads_data.index.max()}")

## 1. 基本ボラティリティ分析

In [ ]:
def analyze_basic_volatility(df):
    """Basic volatility analysis"""
    return_columns = ['M1_M2_return', 'M2_M3_return', 'M3_M4_return']
    
    print("📊 Basic Volatility Statistics:")
    print("=" * 50)
    
    volatility_stats = pd.DataFrame()
    
    for col in return_columns:
        spread_name = col.replace('_return', '').replace('_', '-')
        returns = df[col].dropna()
        
        # Basic statistics
        volatility_stats[spread_name] = [
            returns.std() * 100,                    # Daily volatility (%)
            returns.std() * np.sqrt(252) * 100,     # Annualized volatility (%)
            returns.skew(),                         # Skewness
            returns.kurtosis(),                     # Kurtosis (excess kurtosis)
            (returns.abs() > 2*returns.std()).sum(), # Number of 2σ exceedances
            (returns.abs() > 3*returns.std()).sum(), # Number of 3σ exceedances
            len(returns)                            # Valid data count
        ]
    
    volatility_stats.index = [
        'Daily Volatility (%)', 'Annualized Volatility (%)', 'Skewness', 'Kurtosis',
        '2σ Exceedances', '3σ Exceedances', 'Data Count'
    ]
    
    print(volatility_stats.round(4))
    
    # Normality tests
    print("\n🔬 Normality Test Results:")
    print("-" * 40)
    
    for col in return_columns:
        spread_name = col.replace('_return', '').replace('_', '-')
        returns = df[col].dropna()
        
        # Jarque-Bera test
        jb_stat, jb_p = stats.jarque_bera(returns)
        
        # Shapiro-Wilk test (with sample size limit)
        if len(returns) <= 5000:
            sw_stat, sw_p = stats.shapiro(returns)
        else:
            sw_stat, sw_p = stats.shapiro(returns.sample(5000, random_state=42))
        
        print(f"{spread_name}:")
        print(f"  Jarque-Bera: Statistic={jb_stat:.4f}, p-value={jb_p:.4f}")
        print(f"  Shapiro-Wilk: Statistic={sw_stat:.4f}, p-value={sw_p:.4f}")
        print(f"  Result: {'Non-normal distribution' if jb_p < 0.05 else 'Possibly normal distribution'}")
    
    return volatility_stats

# Execute basic volatility analysis
vol_stats = analyze_basic_volatility(spreads_data)

In [ ]:
def plot_volatility_analysis(df):
    """ボラティリティ分析の可視化"""
    return_columns = ['M1_M2_return', 'M2_M3_return', 'M3_M4_return']
    spread_names = ['M1-M2', 'M2-M3', 'M3-M4']
    
    fig = make_subplots(
        rows=3, cols=2,
        subplot_titles=(
            'スプレッドリターン時系列',
            'リターン分布（ヒストグラム）',
            'ローリングボラティリティ（30日）',
            'Q-Qプロット（対正規分布）',
            '絶対リターン自己相関',
            'ボラティリティクラスタリング'
        ),
        vertical_spacing=0.08
    )
    
    colors = ['blue', 'red', 'green']
    
    # 1. Return time series
    for i, (col, name, color) in enumerate(zip(return_columns, spread_names, colors)):
        fig.add_trace(
            go.Scatter(
                x=df.index,
                y=df[col] * 100,  # Convert to percentage
                name=f'{name} リターン',
                line=dict(color=color, width=0.8),
                opacity=0.7
            ),
            row=1, col=1
        )
    
    # 2. Return distribution
    for i, (col, name, color) in enumerate(zip(return_columns, spread_names, colors)):
        fig.add_trace(
            go.Histogram(
                x=df[col] * 100,
                name=f'{name} 分布',
                nbinsx=50,
                opacity=0.7,
                marker_color=color
            ),
            row=1, col=2
        )
    
    # 3. Rolling volatility
    for i, (col, name, color) in enumerate(zip(return_columns, spread_names, colors)):
        rolling_vol = df[col].rolling(window=30).std() * np.sqrt(252) * 100
        fig.add_trace(
            go.Scatter(
                x=df.index,
                y=rolling_vol,
                name=f'{name} ボラティリティ',
                line=dict(color=color, width=1.5)
            ),
            row=2, col=1
        )
    
    # 4. Q-Q plot (M1-M2 only)
    from scipy.stats import probplot
    returns_clean = df['M1_M2_return'].dropna()
    theoretical_q, sample_q = probplot(returns_clean, dist="norm")
    
    fig.add_trace(
        go.Scatter(
            x=theoretical_q,
            y=sample_q,
            mode='markers',
            name='M1-M2 Q-Q',
            marker=dict(color='blue', size=3)
        ),
        row=2, col=2
    )
    
    # Theoretical line
    fig.add_trace(
        go.Scatter(
            x=[theoretical_q.min(), theoretical_q.max()],
            y=[sample_q.min(), sample_q.max()],
            mode='lines',
            name='理論線',
            line=dict(color='red', dash='dash')
        ),
        row=2, col=2
    )
    
    # 5. Absolute return autocorrelation (volatility clustering detection)
    abs_returns = df['M1_M2_return'].abs().dropna()
    lags = range(1, 21)
    autocorrs = [abs_returns.autocorr(lag=lag) for lag in lags]
    
    fig.add_trace(
        go.Bar(
            x=list(lags),
            y=autocorrs,
            name='M1-M2 絶対リターン自己相関',
            marker_color='lightblue'
        ),
        row=3, col=1
    )
    
    # Significance threshold lines (5% level)
    n = len(abs_returns)
    threshold = 1.96 / np.sqrt(n)
    fig.add_hline(y=threshold, line_dash="dash", line_color="red", 
                 line_width=1, row=3, col=1)
    fig.add_hline(y=-threshold, line_dash="dash", line_color="red", 
                 line_width=1, row=3, col=1)
    
    # 6. Volatility clustering visualization
    squared_returns = (df['M1_M2_return'] ** 2).dropna()
    fig.add_trace(
        go.Scatter(
            x=df.index[:len(squared_returns)],
            y=squared_returns * 10000,  # basis points
            name='M1-M2 二乗リターン',
            line=dict(color='purple', width=1),
            fill='tonexty',
            fillcolor='rgba(128,0,128,0.3)'
        ),
        row=3, col=2
    )
    
    fig.update_layout(
        title=dict(
            text="隣月間スプレッド ボラティリティ分析",
            x=0.5,
            font=dict(size=16)
        ),
        height=1000,
        showlegend=True
    )
    
    # Update axis labels
    fig.update_yaxes(title_text="リターン (%)", row=1, col=1)
    fig.update_yaxes(title_text="頻度", row=1, col=2)
    fig.update_xaxes(title_text="リターン (%)", row=1, col=2)
    
    fig.update_yaxes(title_text="年率ボラティリティ (%)", row=2, col=1)
    fig.update_yaxes(title_text="標本分位数", row=2, col=2)
    fig.update_xaxes(title_text="理論分位数", row=2, col=2)
    
    fig.update_yaxes(title_text="自己相関", row=3, col=1)
    fig.update_xaxes(title_text="ラグ", row=3, col=1)
    
    fig.update_yaxes(title_text="二乗リターン (bp)", row=3, col=2)
    fig.update_xaxes(title_text="日付", row=3, col=2)
    
    return fig

# Volatility analysis chart
vol_chart = plot_volatility_analysis(spreads_data)
vol_chart.show()

# Save image
os.makedirs('../generated_images', exist_ok=True)
vol_chart.write_image('../generated_images/adjacent_spreads_volatility_analysis.png', 
                     width=1200, height=1000, scale=2)

## 2. ARCH効果検定

In [None]:
def test_arch_effects(df):
    """ARCH効果（条件付き分散不均一性）の検定"""
    from arch.unitroot import DFGLS
    from statsmodels.stats.diagnostic import het_arch
    
    return_columns = ['M1_M2_return', 'M2_M3_return', 'M3_M4_return']
    spread_names = ['M1-M2', 'M2-M3', 'M3-M4']
    
    print("🔬 ARCH効果検定結果:")
    print("=" * 60)
    
    arch_test_results = {}
    
    for col, name in zip(return_columns, spread_names):
        returns = df[col].dropna() * 100  # パーセント変換
        
        print(f"\n{name}スプレッド:")
        
        # 1. 基本統計量
        print(f"  データ数: {len(returns):,}")
        print(f"  平均: {returns.mean():.4f}%")
        print(f"  標準偏差: {returns.std():.4f}%")
        
        # 2. 定常性検定（ADF）
        adf_result = adfuller(returns)
        print(f"  ADF検定: 統計量={adf_result[0]:.4f}, p値={adf_result[1]:.4f}")
        print(f"    結果: {'定常' if adf_result[1] < 0.05 else '非定常'}")
        
        # 3. Ljung-Box検定（系列相関）
        try:
            lb_result = acorr_ljungbox(returns, lags=10, return_df=True)
            lb_pvalue = lb_result['lb_pvalue'].iloc[-1]  # 10ラグの結果
            print(f"  Ljung-Box検定(10ラグ): p値={lb_pvalue:.4f}")
            print(f"    結果: {'系列相関あり' if lb_pvalue < 0.05 else '系列相関なし'}")
        except Exception as e:
            print(f"  Ljung-Box検定: エラー - {e}")
            lb_pvalue = np.nan
        
        # 4. ARCH-LM検定（条件付き分散不均一性）
        try:
            # 複数のラグで検定
            for lag in [5, 10, 15]:
                lm_stat, lm_pvalue, _, _ = het_arch(returns, nlags=lag)
                print(f"  ARCH-LM検定({lag}ラグ): 統計量={lm_stat:.4f}, p値={lm_pvalue:.4f}")
                if lag == 10:  # 10ラグの結果を保存
                    main_arch_pvalue = lm_pvalue
            
            print(f"    結果: {'ARCH効果あり' if main_arch_pvalue < 0.05 else 'ARCH効果なし'}")
            
        except Exception as e:
            print(f"  ARCH-LM検定: エラー - {e}")
            main_arch_pvalue = np.nan
        
        # 5. 絶対リターンの自己相関検定
        abs_returns = returns.abs()
        try:
            abs_lb_result = acorr_ljungbox(abs_returns, lags=10, return_df=True)
            abs_lb_pvalue = abs_lb_result['lb_pvalue'].iloc[-1]
            print(f"  絶対リターンLjung-Box検定: p値={abs_lb_pvalue:.4f}")
            print(f"    結果: {'ボラティリティクラスタリングあり' if abs_lb_pvalue < 0.05 else 'クラスタリングなし'}")
        except Exception as e:
            print(f"  絶対リターンLjung-Box検定: エラー - {e}")
            abs_lb_pvalue = np.nan
        
        # 結果を保存
        arch_test_results[name] = {
            'data_count': len(returns),
            'mean': returns.mean(),
            'std': returns.std(),
            'adf_statistic': adf_result[0],
            'adf_pvalue': adf_result[1],
            'is_stationary': adf_result[1] < 0.05,
            'ljungbox_pvalue': lb_pvalue,
            'has_serial_correlation': lb_pvalue < 0.05 if not np.isnan(lb_pvalue) else None,
            'arch_lm_pvalue': main_arch_pvalue,
            'has_arch_effects': main_arch_pvalue < 0.05 if not np.isnan(main_arch_pvalue) else None,
            'abs_ljungbox_pvalue': abs_lb_pvalue,
            'has_volatility_clustering': abs_lb_pvalue < 0.05 if not np.isnan(abs_lb_pvalue) else None
        }
    
    return arch_test_results

# ARCH効果検定実行
arch_results = test_arch_effects(spreads_data)

## 3. GARCHモデル推定

In [None]:
def estimate_garch_models(df):
    """GARCH系モデルの推定"""
    return_columns = ['M1_M2_return', 'M2_M3_return', 'M3_M4_return']
    spread_names = ['M1-M2', 'M2-M3', 'M3-M4']
    
    garch_models = {}
    
    print("📈 GARCHモデル推定結果:")
    print("=" * 60)
    
    for col, name in zip(return_columns, spread_names):
        returns = df[col].dropna() * 100  # パーセント変換
        
        print(f"\n{name}スプレッド:")
        
        models_to_estimate = {
            'GARCH(1,1)': arch_model(returns, vol='Garch', p=1, q=1),
            'EGARCH(1,1)': arch_model(returns, vol='EGARCH', p=1, q=1),
            'GJR-GARCH(1,1)': arch_model(returns, vol='GARCH', p=1, o=1, q=1)
        }
        
        model_results = {}
        
        for model_name, model in models_to_estimate.items():
            try:
                # モデル推定
                result = model.fit(disp='off', show_warning=False)
                
                # AIC/BIC計算
                aic = result.aic
                bic = result.bic
                log_likelihood = result.loglikelihood
                
                print(f"  {model_name}:")
                print(f"    対数尤度: {log_likelihood:.4f}")
                print(f"    AIC: {aic:.4f}")
                print(f"    BIC: {bic:.4f}")
                
                # 条件付きボラティリティ
                conditional_volatility = result.conditional_volatility
                
                model_results[model_name] = {
                    'model': result,
                    'aic': aic,
                    'bic': bic,
                    'log_likelihood': log_likelihood,
                    'conditional_volatility': conditional_volatility,
                    'residuals': result.resid,
                    'standardized_residuals': result.std_resid
                }
                
            except Exception as e:
                print(f"  {model_name}: 推定エラー - {e}")
                continue
        
        # 最適モデル選択（AIC基準）
        if model_results:
            best_model_name = min(model_results.keys(), key=lambda x: model_results[x]['aic'])
            print(f"  \n  📊 最適モデル（AIC基準）: {best_model_name}")
            
            garch_models[name] = {
                'all_models': model_results,
                'best_model_name': best_model_name,
                'best_model': model_results[best_model_name]
            }
    
    return garch_models

# GARCHモデル推定
garch_models = estimate_garch_models(spreads_data)

In [ ]:
def plot_garch_results(garch_models, spreads_data):
    """GARCHモデル結果の可視化"""
    return_columns = ['M1_M2_return', 'M2_M3_return', 'M3_M4_return']
    spread_names = ['M1-M2', 'M2-M3', 'M3-M4']
    
    fig = make_subplots(
        rows=3, cols=2,
        subplot_titles=(
            'M1-M2: リターンと条件付きボラティリティ',
            'M1-M2: 標準化残差',
            'M2-M3: リターンと条件付きボラティリティ',
            'M2-M3: 標準化残差',
            'M3-M4: リターンと条件付きボラティリティ',
            'M3-M4: 標準化残差'
        ),
        vertical_spacing=0.08
    )
    
    colors = ['blue', 'red', 'green']
    
    for i, (col, name, color) in enumerate(zip(return_columns, spread_names, colors)):
        if name in garch_models:
            best_model = garch_models[name]['best_model']
            
            returns = spreads_data[col].dropna() * 100
            cond_vol = best_model['conditional_volatility']
            std_resid = best_model['standardized_residuals']
            
            # Returns and conditional volatility
            fig.add_trace(
                go.Scatter(
                    x=returns.index,
                    y=returns,
                    name=f'{name} リターン',
                    line=dict(color=color, width=0.8),
                    opacity=0.7
                ),
                row=i+1, col=1
            )
            
            # ±2σ bands of conditional volatility
            fig.add_trace(
                go.Scatter(
                    x=cond_vol.index,
                    y=2 * cond_vol,
                    name=f'{name} +2σ',
                    line=dict(color='red', dash='dash', width=1),
                    showlegend=False
                ),
                row=i+1, col=1
            )
            
            fig.add_trace(
                go.Scatter(
                    x=cond_vol.index,
                    y=-2 * cond_vol,
                    name=f'{name} -2σ',
                    line=dict(color='red', dash='dash', width=1),
                    fill='tonexty',
                    fillcolor='rgba(255,0,0,0.1)',
                    showlegend=False
                ),
                row=i+1, col=1
            )
            
            # Standardized residuals
            fig.add_trace(
                go.Scatter(
                    x=std_resid.index,
                    y=std_resid,
                    name=f'{name} 標準化残差',
                    line=dict(color='purple', width=0.8),
                    mode='markers',
                    marker=dict(size=2)
                ),
                row=i+1, col=2
            )
            
            # ±2σ lines
            fig.add_hline(y=2, line_dash="dash", line_color="red", 
                         line_width=1, row=i+1, col=2)
            fig.add_hline(y=-2, line_dash="dash", line_color="red", 
                         line_width=1, row=i+1, col=2)
            fig.add_hline(y=0, line_dash="dot", line_color="black", 
                         line_width=1, row=i+1, col=2)
    
    fig.update_layout(
        title=dict(
            text="GARCHモデル推定結果",
            x=0.5,
            font=dict(size=16)
        ),
        height=1000,
        showlegend=True
    )
    
    # Update axis labels
    for i in range(3):
        fig.update_yaxes(title_text="リターン (%)", row=i+1, col=1)
        fig.update_yaxes(title_text="標準化残差", row=i+1, col=2)
        if i == 2:
            fig.update_xaxes(title_text="日付", row=i+1, col=1)
            fig.update_xaxes(title_text="日付", row=i+1, col=2)
    
    return fig

# Visualization of GARCH model results
garch_chart = plot_garch_results(garch_models, spreads_data)
garch_chart.show()

# Save image
garch_chart.write_image('../generated_images/adjacent_spreads_garch_models.png', 
                       width=1200, height=1000, scale=2)

## 4. リスク指標計算

In [None]:
def calculate_risk_metrics(garch_models, spreads_data):
    """VaR、Expected Shortfall等のリスク指標計算"""
    return_columns = ['M1_M2_return', 'M2_M3_return', 'M3_M4_return']
    spread_names = ['M1-M2', 'M2-M3', 'M3-M4']
    confidence_levels = [0.95, 0.99, 0.995]
    
    print("⚠️ リスク指標計算結果:")
    print("=" * 60)
    
    risk_metrics = {}
    
    for col, name in zip(return_columns, spread_names):
        if name not in garch_models:
            continue
            
        returns = spreads_data[col].dropna() * 100
        best_model = garch_models[name]['best_model']
        cond_vol = best_model['conditional_volatility']
        std_resid = best_model['standardized_residuals']
        
        print(f"\n{name}スプレッド:")
        
        current_volatility = cond_vol.iloc[-1]  # 最新の条件付きボラティリティ
        print(f"  最新条件付きボラティリティ: {current_volatility:.4f}%")
        
        spread_risk_metrics = {
            'current_volatility': current_volatility,
            'conditional_volatility': cond_vol,
            'var': {},
            'expected_shortfall': {},
            'parametric_var': {},
            'historical_var': {}
        }
        
        # 各信頼水準でリスク指標を計算
        for alpha in confidence_levels:
            print(f"\n  信頼水準 {alpha*100:.1f}%:")
            
            # 1. ヒストリカルVaR
            historical_var = np.percentile(returns, (1-alpha)*100)
            
            # 2. パラメトリックVaR（正規分布仮定）
            parametric_var = norm.ppf(1-alpha) * current_volatility
            
            # 3. GARCH-VaR（t分布仮定）
            # 標準化残差の分布をt分布でフィット
            try:
                clean_std_resid = std_resid.dropna()
                df_fitted, loc_fitted, scale_fitted = stats.t.fit(clean_std_resid)
                garch_var = t.ppf(1-alpha, df=df_fitted, loc=loc_fitted, scale=scale_fitted) * current_volatility
            except:
                garch_var = parametric_var  # フィットに失敗した場合は正規分布を使用
            
            # 4. Expected Shortfall (CVaR)
            var_threshold = historical_var
            tail_returns = returns[returns <= var_threshold]
            if len(tail_returns) > 0:
                expected_shortfall = tail_returns.mean()
            else:
                expected_shortfall = historical_var
            
            print(f"    ヒストリカルVaR: {historical_var:.4f}%")
            print(f"    パラメトリックVaR: {parametric_var:.4f}%")
            print(f"    GARCH-VaR: {garch_var:.4f}%")
            print(f"    Expected Shortfall: {expected_shortfall:.4f}%")
            
            # 結果を保存
            alpha_key = f'{alpha*100:.1f}%'
            spread_risk_metrics['historical_var'][alpha_key] = historical_var
            spread_risk_metrics['parametric_var'][alpha_key] = parametric_var
            spread_risk_metrics['var'][alpha_key] = garch_var
            spread_risk_metrics['expected_shortfall'][alpha_key] = expected_shortfall
        
        # 追加のリスク指標
        max_drawdown = calculate_max_drawdown(returns)
        avg_vol_30d = cond_vol.rolling(window=30).mean().iloc[-1]
        vol_of_vol = cond_vol.rolling(window=30).std().iloc[-1]
        
        print(f"\n  追加リスク指標:")
        print(f"    最大ドローダウン: {max_drawdown:.4f}%")
        print(f"    30日平均ボラティリティ: {avg_vol_30d:.4f}%")
        print(f"    ボラティリティのボラティリティ: {vol_of_vol:.4f}%")
        
        spread_risk_metrics.update({
            'max_drawdown': max_drawdown,
            'avg_volatility_30d': avg_vol_30d,
            'volatility_of_volatility': vol_of_vol
        })
        
        risk_metrics[name] = spread_risk_metrics
    
    return risk_metrics

def calculate_max_drawdown(returns):
    """最大ドローダウンの計算"""
    cumulative = (1 + returns/100).cumprod()
    rolling_max = cumulative.expanding().max()
    drawdown = (cumulative - rolling_max) / rolling_max * 100
    return drawdown.min()

# リスク指標計算
risk_metrics = calculate_risk_metrics(garch_models, spreads_data)

In [ ]:
def plot_risk_metrics(risk_metrics, spreads_data):
    """リスク指標の可視化"""
    return_columns = ['M1_M2_return', 'M2_M3_return', 'M3_M4_return']
    spread_names = ['M1-M2', 'M2-M3', 'M3-M4']
    
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=(
            'VaR比較（95%信頼水準）',
            '条件付きボラティリティ推移',
            'VaRバックテスト（ヒストリカルVaR vs 実績）',
            'リスク指標サマリー'
        )
    )
    
    colors = ['blue', 'red', 'green']
    
    # 1. VaR comparison
    var_types = ['ヒストリカルVaR', 'パラメトリックVaR', 'GARCH-VaR']
    var_keys = ['historical_var', 'parametric_var', 'var']
    
    x_pos = np.arange(len(spread_names))
    bar_width = 0.25
    
    for i, (var_type, var_key) in enumerate(zip(var_types, var_keys)):
        var_values = []
        for name in spread_names:
            if name in risk_metrics:
                var_values.append(abs(risk_metrics[name][var_key]['95.0%']))
            else:
                var_values.append(0)
        
        fig.add_trace(
            go.Bar(
                x=[f"{name}<br>({var_type})" for name in spread_names],
                y=var_values,
                name=var_type,
                marker_color=colors[i],
                opacity=0.8
            ),
            row=1, col=1
        )
    
    # 2. Conditional volatility evolution
    for i, (name, color) in enumerate(zip(spread_names, colors)):
        if name in risk_metrics:
            cond_vol = risk_metrics[name]['conditional_volatility']
            fig.add_trace(
                go.Scatter(
                    x=cond_vol.index,
                    y=cond_vol,
                    name=f'{name} 条件付きボラティリティ',
                    line=dict(color=color, width=1.5)
                ),
                row=1, col=2
            )
    
    # 3. VaR backtest (M1-M2 only)
    if 'M1-M2' in risk_metrics:
        returns = spreads_data['M1_M2_return'].dropna() * 100
        historical_var_95 = risk_metrics['M1-M2']['historical_var']['95.0%']
        
        fig.add_trace(
            go.Scatter(
                x=returns.index,
                y=returns,
                name='M1-M2 リターン',
                line=dict(color='blue', width=0.8),
                opacity=0.7
            ),
            row=2, col=1
        )
        
        # VaR line
        fig.add_hline(y=historical_var_95, line_dash="dash", line_color="red", 
                     line_width=2, row=2, col=1)
        
        # Highlight VaR violations
        var_violations = returns[returns < historical_var_95]
        if len(var_violations) > 0:
            fig.add_trace(
                go.Scatter(
                    x=var_violations.index,
                    y=var_violations,
                    mode='markers',
                    name='VaR違反',
                    marker=dict(color='red', size=6, symbol='x')
                ),
                row=2, col=1
            )
        
        # VaR violation rate
        violation_rate = len(var_violations) / len(returns) * 100
        expected_rate = 5.0  # Expected violation rate for 95% VaR
        
    # 4. Risk metrics summary (table)
    summary_data = []
    for name in spread_names:
        if name in risk_metrics:
            metrics = risk_metrics[name]
            summary_data.append([
                name,
                f"{metrics['current_volatility']:.3f}%",
                f"{abs(metrics['var']['95.0%']):.3f}%",
                f"{abs(metrics['expected_shortfall']['95.0%']):.3f}%",
                f"{metrics['max_drawdown']:.3f}%"
            ])
    
    fig.add_trace(
        go.Table(
            header=dict(
                values=['スプレッド', '現在ボラティリティ', '95% VaR', '95% ES', '最大DD'],
                fill_color='lightblue',
                font=dict(size=12)
            ),
            cells=dict(
                values=list(zip(*summary_data)) if summary_data else [[], [], [], [], []],
                fill_color='white',
                font=dict(size=11)
            )
        ),
        row=2, col=2
    )
    
    fig.update_layout(
        title=dict(
            text="リスク指標分析",
            x=0.5,
            font=dict(size=16)
        ),
        height=800,
        showlegend=True
    )
    
    # Update axis labels
    fig.update_yaxes(title_text="VaR (%)", row=1, col=1)
    fig.update_yaxes(title_text="ボラティリティ (%)", row=1, col=2)
    fig.update_xaxes(title_text="日付", row=1, col=2)
    
    fig.update_yaxes(title_text="リターン (%)", row=2, col=1)
    fig.update_xaxes(title_text="日付", row=2, col=1)
    
    return fig

# Visualization of risk metrics
risk_chart = plot_risk_metrics(risk_metrics, spreads_data)
risk_chart.show()

# Save image
risk_chart.write_image('../generated_images/adjacent_spreads_risk_metrics.png', 
                      width=1200, height=800, scale=2)

## 5. ボラティリティレジーム分析

In [None]:
def analyze_volatility_regimes(risk_metrics, spreads_data):
    """ボラティリティレジーム分析"""
    print("🎭 ボラティリティレジーム分析:")
    print("=" * 60)
    
    regime_analysis = {}
    
    for name in ['M1-M2', 'M2-M3', 'M3-M4']:
        if name not in risk_metrics:
            continue
            
        cond_vol = risk_metrics[name]['conditional_volatility']
        
        print(f"\n{name}スプレッド:")
        
        # ボラティリティの分位数でレジーム分類
        vol_25 = cond_vol.quantile(0.25)
        vol_50 = cond_vol.quantile(0.50)
        vol_75 = cond_vol.quantile(0.75)
        vol_95 = cond_vol.quantile(0.95)
        
        print(f"  ボラティリティ分位数:")
        print(f"    25%分位: {vol_25:.4f}%")
        print(f"    50%分位: {vol_50:.4f}%")
        print(f"    75%分位: {vol_75:.4f}%")
        print(f"    95%分位: {vol_95:.4f}%")
        
        # レジーム分類
        regimes = pd.Series(index=cond_vol.index, dtype='object')
        regimes[cond_vol <= vol_25] = 'Low'
        regimes[(cond_vol > vol_25) & (cond_vol <= vol_50)] = 'Medium-Low'
        regimes[(cond_vol > vol_50) & (cond_vol <= vol_75)] = 'Medium-High'
        regimes[cond_vol > vol_75] = 'High'
        
        # レジーム統計
        regime_stats = regimes.value_counts(normalize=True) * 100
        
        print(f"\n  レジーム分布:")
        for regime, percentage in regime_stats.items():
            print(f"    {regime}: {percentage:.1f}%")
        
        # レジーム転換の分析
        regime_changes = (regimes != regimes.shift(1)).sum()
        avg_regime_duration = len(regimes) / regime_changes if regime_changes > 0 else len(regimes)
        
        print(f"\n  レジーム転換:")
        print(f"    転換回数: {regime_changes}")
        print(f"    平均継続期間: {avg_regime_duration:.1f}日")
        
        # 各レジームでのリターン統計
        return_col = f"{name.replace('-', '_')}_return"
        if return_col in spreads_data.columns:
            returns = spreads_data[return_col] * 100
            
            print(f"\n  レジーム別リターン統計:")
            for regime in ['Low', 'Medium-Low', 'Medium-High', 'High']:
                regime_returns = returns[regimes == regime]
                if len(regime_returns) > 0:
                    print(f"    {regime}:")
                    print(f"      平均リターン: {regime_returns.mean():.4f}%")
                    print(f"      リターン標準偏差: {regime_returns.std():.4f}%")
                    print(f"      最大損失: {regime_returns.min():.4f}%")
        
        regime_analysis[name] = {
            'conditional_volatility': cond_vol,
            'regimes': regimes,
            'quantiles': {
                '25%': vol_25,
                '50%': vol_50,
                '75%': vol_75,
                '95%': vol_95
            },
            'regime_distribution': regime_stats,
            'regime_changes': regime_changes,
            'avg_duration': avg_regime_duration
        }
    
    return regime_analysis

# ボラティリティレジーム分析実行
regime_analysis = analyze_volatility_regimes(risk_metrics, spreads_data)

In [ ]:
def plot_volatility_regimes(regime_analysis):
    """ボラティリティレジームの可視化"""
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=(
            'M1-M2: ボラティリティレジーム',
            'レジーム分布',
            'M2-M3: ボラティリティレジーム',
            'レジーム転換頻度'
        )
    )
    
    regime_colors = {
        'Low': 'green',
        'Medium-Low': 'yellow',
        'Medium-High': 'orange', 
        'High': 'red'
    }
    
    # 1. M1-M2 volatility regimes
    if 'M1-M2' in regime_analysis:
        m1m2_data = regime_analysis['M1-M2']
        cond_vol = m1m2_data['conditional_volatility']
        regimes = m1m2_data['regimes']
        
        # Volatility line
        fig.add_trace(
            go.Scatter(
                x=cond_vol.index,
                y=cond_vol,
                name='M1-M2 ボラティリティ',
                line=dict(color='blue', width=1),
                showlegend=False
            ),
            row=1, col=1
        )
        
        # Regime boundary lines
        quantiles = m1m2_data['quantiles']
        for label, value in quantiles.items():
            fig.add_hline(y=value, line_dash="dash", line_color="gray", 
                         line_width=1, row=1, col=1)
        
        # Regime color mapping
        for regime, color in regime_colors.items():
            regime_mask = regimes == regime
            if regime_mask.any():
                regime_vol = cond_vol[regime_mask]
                fig.add_trace(
                    go.Scatter(
                        x=regime_vol.index,
                        y=regime_vol,
                        mode='markers',
                        name=f'{regime} レジーム',
                        marker=dict(color=color, size=3, opacity=0.7)
                    ),
                    row=1, col=1
                )
    
    # 2. Regime distribution
    all_regime_dist = []
    spread_labels = []
    
    for name in ['M1-M2', 'M2-M3', 'M3-M4']:
        if name in regime_analysis:
            regime_dist = regime_analysis[name]['regime_distribution']
            for regime in ['Low', 'Medium-Low', 'Medium-High', 'High']:
                if regime in regime_dist:
                    all_regime_dist.append(regime_dist[regime])
                    spread_labels.append(f'{name}<br>{regime}')
                else:
                    all_regime_dist.append(0)
                    spread_labels.append(f'{name}<br>{regime}')
    
    # Grouped bar chart for regime distribution
    x_pos = 0
    for name in ['M1-M2', 'M2-M3', 'M3-M4']:
        if name in regime_analysis:
            regime_dist = regime_analysis[name]['regime_distribution']
            for regime in ['Low', 'Medium-Low', 'Medium-High', 'High']:
                value = regime_dist.get(regime, 0)
                fig.add_trace(
                    go.Bar(
                        x=[f'{name}<br>{regime}'],
                        y=[value],
                        name=regime,
                        marker_color=regime_colors[regime],
                        showlegend=(name == 'M1-M2')  # Show legend only for first group
                    ),
                    row=1, col=2
                )
    
    # 3. M2-M3 volatility regimes
    if 'M2-M3' in regime_analysis:
        m2m3_data = regime_analysis['M2-M3']
        cond_vol = m2m3_data['conditional_volatility']
        regimes = m2m3_data['regimes']
        
        fig.add_trace(
            go.Scatter(
                x=cond_vol.index,
                y=cond_vol,
                name='M2-M3 ボラティリティ',
                line=dict(color='red', width=1),
                showlegend=False
            ),
            row=2, col=1
        )
        
        # Regime boundary lines
        quantiles = m2m3_data['quantiles']
        for label, value in quantiles.items():
            fig.add_hline(y=value, line_dash="dash", line_color="gray", 
                         line_width=1, row=2, col=1)
    
    # 4. Regime transition frequency
    regime_changes = []
    avg_durations = []
    spread_names = []
    
    for name in ['M1-M2', 'M2-M3', 'M3-M4']:
        if name in regime_analysis:
            regime_changes.append(regime_analysis[name]['regime_changes'])
            avg_durations.append(regime_analysis[name]['avg_duration'])
            spread_names.append(name)
    
    fig.add_trace(
        go.Bar(
            x=spread_names,
            y=regime_changes,
            name='レジーム転換回数',
            marker_color='lightblue',
            yaxis='y',
            offsetgroup=1
        ),
        row=2, col=2
    )
    
    fig.add_trace(
        go.Scatter(
            x=spread_names,
            y=avg_durations,
            name='平均継続期間（日）',
            mode='lines+markers',
            line=dict(color='red', width=2),
            marker=dict(size=8),
            yaxis='y2'
        ),
        row=2, col=2
    )
    
    fig.update_layout(
        title=dict(
            text="ボラティリティレジーム分析",
            x=0.5,
            font=dict(size=16)
        ),
        height=800,
        showlegend=True
    )
    
    # Update axis labels
    fig.update_yaxes(title_text="ボラティリティ (%)", row=1, col=1)
    fig.update_yaxes(title_text="比率 (%)", row=1, col=2)
    fig.update_yaxes(title_text="ボラティリティ (%)", row=2, col=1)
    fig.update_yaxes(title_text="転換回数", row=2, col=2)
    
    fig.update_xaxes(title_text="日付", row=1, col=1)
    fig.update_xaxes(title_text="日付", row=2, col=1)
    
    # Secondary axis settings
    fig.update_layout(
        yaxis4=dict(
            title="平均継続期間（日）",
            overlaying="y3",
            side="right"
        )
    )
    
    return fig

# Visualization of volatility regimes
regime_chart = plot_volatility_regimes(regime_analysis)
regime_chart.show()

# Save image
regime_chart.write_image('../generated_images/adjacent_spreads_volatility_regimes.png', 
                        width=1200, height=800, scale=2)

## 6. 分析結果サマリー

In [None]:
# ボラティリティモデリング分析サマリー
def generate_volatility_modeling_summary(vol_stats, arch_results, garch_models, 
                                        risk_metrics, regime_analysis):
    """ボラティリティモデリング分析の包括的サマリー"""
    
    print("📋 隣月間スプレッド ボラティリティモデリング分析サマリー")
    print("=" * 70)
    
    print(f"\n📊 基本ボラティリティ特性:")
    for spread in ['M1-M2', 'M2-M3', 'M3-M4']:
        if spread in vol_stats.columns:
            daily_vol = vol_stats.loc['日次ボラティリティ(%)', spread]
            annual_vol = vol_stats.loc['年率ボラティリティ(%)', spread]
            skewness = vol_stats.loc['歪度', spread]
            kurtosis = vol_stats.loc['尖度', spread]
            
            print(f"  {spread}:")
            print(f"    日次ボラティリティ: {daily_vol:.3f}%")
            print(f"    年率ボラティリティ: {annual_vol:.1f}%")
            print(f"    歪度: {skewness:.3f} ({'右歪み' if skewness > 0 else '左歪み' if skewness < 0 else '対称'})")
            print(f"    尖度: {kurtosis:.3f} ({'尖鋭' if kurtosis > 0 else '平坦'})")
    
    print(f"\n🔬 ARCH効果検定:")
    arch_detected = 0
    vol_clustering_detected = 0
    
    for spread in ['M1-M2', 'M2-M3', 'M3-M4']:
        if spread in arch_results:
            has_arch = arch_results[spread]['has_arch_effects']
            has_clustering = arch_results[spread]['has_volatility_clustering']
            
            print(f"  {spread}:")
            print(f"    ARCH効果: {'あり' if has_arch else 'なし'}")
            print(f"    ボラティリティクラスタリング: {'あり' if has_clustering else 'なし'}")
            
            if has_arch:
                arch_detected += 1
            if has_clustering:
                vol_clustering_detected += 1
    
    print(f"\n📈 GARCHモデル選択:")
    best_models = {}
    for spread in ['M1-M2', 'M2-M3', 'M3-M4']:
        if spread in garch_models:
            best_model_name = garch_models[spread]['best_model_name']
            best_aic = garch_models[spread]['best_model']['aic']
            
            print(f"  {spread}: {best_model_name} (AIC: {best_aic:.2f})")
            best_models[spread] = best_model_name
    
    print(f"\n⚠️ リスク指標（95%信頼水準）:")
    for spread in ['M1-M2', 'M2-M3', 'M3-M4']:
        if spread in risk_metrics:
            current_vol = risk_metrics[spread]['current_volatility']
            var_95 = abs(risk_metrics[spread]['var']['95.0%'])
            es_95 = abs(risk_metrics[spread]['expected_shortfall']['95.0%'])
            max_dd = risk_metrics[spread]['max_drawdown']
            
            print(f"  {spread}:")
            print(f"    現在ボラティリティ: {current_vol:.3f}%")
            print(f"    95% VaR: {var_95:.3f}%")
            print(f"    95% ES: {es_95:.3f}%")
            print(f"    最大ドローダウン: {max_dd:.3f}%")
    
    print(f"\n🎭 ボラティリティレジーム:")
    for spread in ['M1-M2', 'M2-M3', 'M3-M4']:
        if spread in regime_analysis:
            regime_changes = regime_analysis[spread]['regime_changes']
            avg_duration = regime_analysis[spread]['avg_duration']
            high_vol_ratio = regime_analysis[spread]['regime_distribution'].get('High', 0)
            
            print(f"  {spread}:")
            print(f"    レジーム転換: {regime_changes}回")
            print(f"    平均継続期間: {avg_duration:.1f}日")
            print(f"    高ボラティリティ期間: {high_vol_ratio:.1f}%")
    
    print(f"\n💡 投資・リスク管理への示唆:")
    
    # ARCH効果に基づく示唆
    if arch_detected >= 2:
        print(f"  • {arch_detected}/3のスプレッドでARCH効果検出 → GARCHモデルによる動的リスク管理が有効")
    
    if vol_clustering_detected >= 2:
        print(f"  • ボラティリティクラスタリング確認 → 高ボラ期間での慎重なポジション管理必要")
    
    # 最もリスキーなスプレッドを特定
    if risk_metrics:
        highest_var_spread = max(risk_metrics.keys(), 
                               key=lambda x: abs(risk_metrics[x]['var']['95.0%']))
        highest_var = abs(risk_metrics[highest_var_spread]['var']['95.0%'])
        print(f"  • 最高リスクスプレッド: {highest_var_spread} (95% VaR: {highest_var:.3f}%)")
    
    # レジーム分析に基づく示唆
    if regime_analysis:
        unstable_spreads = []
        for spread, data in regime_analysis.items():
            if data['regime_changes'] > len(data['conditional_volatility']) / 50:  # 頻繁な転換
                unstable_spreads.append(spread)
        
        if unstable_spreads:
            print(f"  • レジーム不安定: {', '.join(unstable_spreads)} → 動的ヘッジ戦略推奨")
    
    print(f"\n🛡️ リスク管理推奨事項:")
    print(f"  • GARCHモデルベースの動的VaR管理")
    print(f"  • ボラティリティレジーム応答型ポジションサイジング")
    print(f"  • 高ボラティリティ期間での損切り基準厳格化")
    print(f"  • Expected Shortfallによる極端リスクシナリオ管理")
    
    return {
        'arch_effects_detected': arch_detected,
        'volatility_clustering_detected': vol_clustering_detected,
        'best_models': best_models,
        'highest_risk_spread': highest_var_spread if risk_metrics else None,
        'regime_instability': unstable_spreads if regime_analysis else []
    }

# サマリー生成
volatility_summary = generate_volatility_modeling_summary(
    vol_stats, arch_results, garch_models, risk_metrics, regime_analysis
)

In [None]:
# 分析結果の保存
def save_volatility_analysis_results(vol_stats, garch_models, risk_metrics, 
                                    regime_analysis, volatility_summary):
    """ボラティリティ分析結果をファイルに保存"""
    
    # 出力ディレクトリ作成
    os.makedirs('../analysis_results/adjacent_spreads', exist_ok=True)
    
    # 1. 基本ボラティリティ統計
    vol_stats.to_csv('../analysis_results/adjacent_spreads/volatility_statistics.csv', 
                     encoding='utf-8-sig')
    
    # 2. GARCH条件付きボラティリティ
    garch_volatilities = pd.DataFrame()
    for spread, models in garch_models.items():
        best_model = models['best_model']
        garch_volatilities[f'{spread}_volatility'] = best_model['conditional_volatility']
    
    if not garch_volatilities.empty:
        garch_volatilities.to_csv('../analysis_results/adjacent_spreads/garch_conditional_volatility.csv', 
                                 encoding='utf-8-sig')
    
    # 3. VaRとリスク指標
    risk_summary = pd.DataFrame()
    for spread, metrics in risk_metrics.items():
        risk_summary[f'{spread}_current_vol'] = [metrics['current_volatility']]
        risk_summary[f'{spread}_95_var'] = [abs(metrics['var']['95.0%'])]
        risk_summary[f'{spread}_95_es'] = [abs(metrics['expected_shortfall']['95.0%'])]
        risk_summary[f'{spread}_max_drawdown'] = [metrics['max_drawdown']]
    
    if not risk_summary.empty:
        risk_summary.to_csv('../analysis_results/adjacent_spreads/risk_metrics_summary.csv', 
                           encoding='utf-8-sig', index=False)
    
    # 4. ボラティリティレジーム
    regime_data = pd.DataFrame()
    for spread, analysis in regime_analysis.items():
        regime_data[f'{spread}_regime'] = analysis['regimes']
        regime_data[f'{spread}_volatility'] = analysis['conditional_volatility']
    
    if not regime_data.empty:
        regime_data.to_csv('../analysis_results/adjacent_spreads/volatility_regimes.csv', 
                          encoding='utf-8-sig')
    
    # 5. 分析サマリー（JSON）
    import json
    
    with open('../analysis_results/adjacent_spreads/volatility_modeling_summary.json', 
              'w', encoding='utf-8') as f:
        json.dump(volatility_summary, f, ensure_ascii=False, indent=2)
    
    print(f"\n💾 ボラティリティ分析結果を保存しました:")
    print(f"  📊 基本統計: ../analysis_results/adjacent_spreads/volatility_statistics.csv")
    print(f"  📈 GARCH条件付きボラティリティ: ../analysis_results/adjacent_spreads/garch_conditional_volatility.csv")
    print(f"  ⚠️ リスク指標: ../analysis_results/adjacent_spreads/risk_metrics_summary.csv")
    print(f"  🎭 ボラティリティレジーム: ../analysis_results/adjacent_spreads/volatility_regimes.csv")
    print(f"  📝 分析サマリー: ../analysis_results/adjacent_spreads/volatility_modeling_summary.json")

# 分析結果保存
save_volatility_analysis_results(
    vol_stats, garch_models, risk_metrics, regime_analysis, volatility_summary
)

## 次のステップ

このボラティリティモデリング分析により、隣月間スプレッドのリスク特性について詳細な洞察を得ることができました。

### 主要な発見事項
1. **ARCH効果**: 複数のスプレッドで条件付き分散不均一性を確認
2. **ボラティリティクラスタリング**: 高ボラティリティ期間と低ボラティリティ期間の明確な区分
3. **GARCHモデル**: 各スプレッドに最適なボラティリティモデルを特定
4. **リスク指標**: VaR/ESによる定量的リスク評価
5. **レジーム分析**: ボラティリティ環境の周期的変化

### 今後の分析ステップ
1. **機械学習予測**: 高度なパターン認識と予測モデル
2. **トレーディング戦略開発**: ボラティリティベースの取引戦略
3. **ダイナミックヘッジ**: レジーム応答型リスク管理
4. **バックテスト**: 取引戦略の歴史的検証
5. **ポートフォリオ最適化**: リスク調整後リターンの最大化

次回のノートブック `4_adjacent_spreads_trading_strategies.ipynb` では、具体的な取引戦略とバックテストを実装します。