## Information-Theoretic Analysis: Macro-Finance Causality

Research Framework:
1. Data Preparation: Load, clean, align time series, discretize
2. Information Measures: MI, TE, CTE using JIDT
3. Statistical Significance: Permutation test (p < 0.05)
4. Temporal Analysis: Rolling window to capture dynamics

Core Measures:
- Mutual Information (MI): Total dependence (baseline)
- Transfer Entropy (TE): Directed information flow
- Conditional Transfer Entropy (CTE): Net causal effect

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import jpype
from jpype import *
import warnings
warnings.filterwarnings("ignore")



## 1. Setup: JIDT Library

In [2]:
jarLocation = r"C:\Users\11727\Desktop\5030\infodynamics-dist-1.6.1 (2)\infodynamics.jar"
if not isJVMStarted():
    startJVM(getDefaultJVMPath(), "-ea", "-Djava.class.path=" + jarLocation)

# Import JIDT calculators
MI_D = JPackage("infodynamics.measures.discrete").MutualInformationCalculatorDiscrete
CMI_D = JPackage("infodynamics.measures.discrete").ConditionalMutualInformationCalculatorDiscrete
TE_D = JPackage("infodynamics.measures.discrete").TransferEntropyCalculatorDiscrete

# Helper function: convert numpy array to Java int array
def ja_int(x):
    """Convert numpy array to Java int array for JIDT"""
    x = np.asarray(x).astype(np.int32).ravel()
    return JArray(JInt)(x.tolist())


## 2. Data Preparation

In [3]:
def prepare_data():
    """Load, merge, and preprocess financial and macroeconomic data"""
    # Load data
    stock_data = pd.read_csv('sp500_data.csv', parse_dates=['Date'], index_col='Date')
    macro_data = pd.read_csv('macro_data.csv', parse_dates=['Date'], index_col='Date')
    
    # Align by month
    stock_data['YearMonth'] = stock_data.index.to_period('M')
    macro_data['YearMonth'] = macro_data.index.to_period('M')
    data = pd.merge(stock_data.set_index('YearMonth'), macro_data.set_index('YearMonth'), 
                   left_index=True, right_index=True, how='inner')
    
    # Calculate log returns
    data = data[data['Stock'] > 0].copy()
    data['Stock_Log_Return'] = np.log(data['Stock'] / data['Stock'].shift(1))
    
    # Calculate inflation if needed
    if 'Inflation' not in data.columns and 'CPI' in data.columns:
        data['Inflation'] = data['CPI'].pct_change(12) * 100
    
    # Clean and set index
    data = data.dropna(subset=['Stock_Log_Return', 'Interest_Rate']).copy()
    data.index = data.index.to_timestamp('M')
    
    return data.sort_index()

## 3. Discretization (Required for JIDT)

In [4]:
def discretize(x, n_bins=10):
    """Discretize continuous variable for JIDT (requires discrete data)"""
    x_clean = pd.to_numeric(x, errors='coerce').dropna()
    if len(x_clean) < 10:
        return pd.Series(index=x.index, dtype=float)
    
    n_bins_actual = min(n_bins, len(x_clean) // 2)
    bins = pd.qcut(x_clean, q=n_bins_actual, labels=False, duplicates='drop')
    result = pd.Series(index=x.index, dtype=float)
    result[x_clean.index] = bins
    return result

## 4. Information-Theoretic Measures

In [5]:
def compute_transfer_entropy_for_arrays(x, y, lag=1):
    """Compute Transfer Entropy for numpy arrays (used in permutation test)"""
    try:
        if len(x) != len(y) or len(x) < 10:
            return np.nan
        
        x_bin = discretize(pd.Series(x))
        y_bin = discretize(pd.Series(y))
        mask = ~(x_bin.isna() | y_bin.isna())
        x_int = x_bin[mask].astype(int).values
        y_int = y_bin[mask].astype(int).values
        
        if len(x_int) < 10:
            return np.nan
        
        base = max(x_int.max(), y_int.max()) + 1
        calc = TE_D(base, 1, 1, 1, 1, lag)
        calc.initialise()
        calc.addObservations(ja_int(x_int), ja_int(y_int))
        return float(calc.computeAverageLocalOfObservations())
    except:
        return np.nan


In [6]:
def compute_mi(data, x_col, y_col):
    """Compute Mutual Information (baseline: total dependence)"""
    try:
        x_bin = discretize(data[x_col])
        y_bin = discretize(data[y_col])
        mask = ~(x_bin.isna() | y_bin.isna())
        if mask.sum() < 10:
            return np.nan
        
        x_int = x_bin[mask].astype(int).values
        y_int = y_bin[mask].astype(int).values
        base = max(x_int.max(), y_int.max()) + 1
        
        calc = MI_D()
        calc.initialise(base, base, 1)
        calc.addObservations(ja_int(x_int), ja_int(y_int))
        return float(calc.computeAverageLocalOfObservations())
    except:
        return np.nan

def compute_te(data, source_col, target_col, lag=1):
    """Compute Transfer Entropy (directed information flow)"""
    try:
        source_bin = discretize(data[source_col])
        target_bin = discretize(data[target_col])
        mask = ~(source_bin.isna() | target_bin.isna())
        if mask.sum() < 10:
            return np.nan
        
        source_int = source_bin[mask].astype(int).values
        target_int = target_bin[mask].astype(int).values
        base = max(source_int.max(), target_int.max()) + 1
        
        calc = TE_D(base, 1, 1, 1, 1, lag)
        calc.initialise()
        calc.addObservations(ja_int(source_int), ja_int(target_int))
        return float(calc.computeAverageLocalOfObservations())
    except:
        return np.nan

def compute_cte(data, source_col, target_col, condition_col, lag=1):
    """Compute Conditional Transfer Entropy (net causal effect)"""
    try:
        source_bin = discretize(data[source_col])
        target_bin = discretize(data[target_col])
        condition_bin = discretize(data[condition_col])
        mask = ~(source_bin.isna() | target_bin.isna() | condition_bin.isna())
        if mask.sum() < 10:
            return np.nan
        
        source_int = source_bin[mask].astype(int).values
        target_int = target_bin[mask].astype(int).values
        condition_int = condition_bin[mask].astype(int).values
        
        bx, by, bz = source_int.max() + 1, target_int.max() + 1, condition_int.max() + 1
        calc = CMI_D()
        calc.initialise(bx, by, bz)
        calc.addObservations(ja_int(source_int), ja_int(target_int), ja_int(condition_int))
        return float(calc.computeAverageLocalOfObservations())
    except:
        return np.nan


## 5. Statistical Significance Testing

In [7]:
def te_with_pvalue(data, source_col, target_col, lag=1, n_perm=200):
    """
    Compute Transfer Entropy with permutation test for statistical significance.
    
    Null hypothesis: Random permutation of source (preserves target structure).
    P-value formula: (count(TE_perm >= TE_obs) + 1) / (n_perm + 1)
    Significance threshold: p < 0.05
    """
    # Compute observed TE
    te_obs = compute_te(data, source_col, target_col, lag)
    if np.isnan(te_obs):
        return np.nan, np.nan
    
    # Extract aligned arrays
    common_idx = data[[source_col, target_col]].dropna().index
    if len(common_idx) < 10:
        return np.nan, np.nan
    
    x = data.loc[common_idx, source_col].values
    y = data.loc[common_idx, target_col].values
    
    # Permutation test: shuffle source only (preserve target structure)
    te_perm = []
    for _ in range(n_perm):
        x_perm = np.random.permutation(x)
        temp_df = pd.DataFrame({source_col: x_perm, target_col: y}, index=common_idx)
        te_val = compute_te(temp_df, source_col, target_col, lag)
        if not np.isnan(te_val):
            te_perm.append(te_val)
    
    if len(te_perm) == 0:
        return np.nan, np.nan
    
    # Calculate p-value (right-tailed test)
    p_value = (np.sum(np.array(te_perm) >= te_obs) + 1) / (n_perm + 1)
    return te_obs, p_value
   

## 6. Rolling Window Analysis

In [25]:
def rolling_analysis(data, window_size=60, n_perm=200):
    """
    Rolling window analysis with bidirectional TE and significance testing.
    Each window computes: TE, p-values, MI, and CTE.
    """
    results = {
        'dates': [],
        'te_m2s': [],  # Macro to Stock
        'te_s2m': [],  # Stock to Macro
        'p_m2s': [],
        'p_s2m': [],
        'mi': [],
        'cte': []
    }
    
    n_windows = len(data) - window_size + 1
    print(f"Computing {n_windows} rolling windows (n_perm={n_perm})...")
    
    for i in range(n_windows):
        window = data.iloc[i:i+window_size]
        
        # Bidirectional TE with significance
        te_m2s, p_m2s = te_with_pvalue(window, 'Interest_Rate', 'Stock_Log_Return', lag=1, n_perm=n_perm)
        te_s2m, p_s2m = te_with_pvalue(window, 'Stock_Log_Return', 'Interest_Rate', lag=1, n_perm=n_perm)
        
        # Mutual Information (baseline)
        mi = compute_mi(window, 'Interest_Rate', 'Stock_Log_Return')
        
        # Conditional TE (if inflation available)
        if 'Inflation' in window.columns and window['Inflation'].notna().sum() >= 10:
            cte = compute_cte(window, 'Interest_Rate', 'Stock_Log_Return', 'Inflation', lag=1)
        else:
            cte = np.nan
        
        results['dates'].append(window.index[-1])
        results['te_m2s'].append(te_m2s)
        results['te_s2m'].append(te_s2m)
        results['p_m2s'].append(p_m2s)
        results['p_s2m'].append(p_s2m)
        results['mi'].append(mi)
        results['cte'].append(cte)
        
        if (i + 1) % 50 == 0:
            print(f"  Processed {i+1}/{n_windows} windows...")
    
    print(f"Analysis complete: {n_windows} windows")
    return results

## 7. Visualization


In [26]:
def plot_results(results):
    """Visualize analysis results"""
    dates = results['dates']
    te_m2s = np.array(results['te_m2s'])
    te_s2m = np.array(results['te_s2m'])
    p_m2s = np.array(results['p_m2s'])
    p_s2m = np.array(results['p_s2m'])
    
    fig, axes = plt.subplots(4, 1, figsize=(14, 10))
    
    # 1. Bidirectional TE (with significance markers)
    sig_m2s = p_m2s < 0.05
    sig_s2m = p_s2m < 0.05
    
    axes[0].plot(dates, te_m2s, 'b-', alpha=0.7, label='TE (Macro→Stock)')
    axes[0].plot(dates, te_s2m, 'orange', linestyle='-', alpha=0.7, label='TE (Stock→Macro)')
    axes[0].scatter(np.array(dates)[sig_m2s], te_m2s[sig_m2s], c='blue', marker='*', s=50, 
                   label='Significant (p<0.05)', zorder=5)
    axes[0].scatter(np.array(dates)[sig_s2m], te_s2m[sig_s2m], c='orange', marker='*', s=50, zorder=5)
    axes[0].set_title('Bidirectional Transfer Entropy (p < 0.05 significance)', fontweight='bold')
    axes[0].set_ylabel('TE (bits)')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # 2. Conditional TE vs Unconditional TE
    axes[1].plot(dates, results['cte'], 'r-', label='Conditional TE', alpha=0.8)
    axes[1].plot(dates, te_m2s, 'b--', label='Unconditional TE', alpha=0.6)
    axes[1].set_title('Conditional Transfer Entropy (Net Effect)', fontweight='bold')
    axes[1].set_ylabel('CTE/TE (bits)')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    # 3. TE Difference (dominant direction)
    te_diff = te_m2s - te_s2m
    axes[2].plot(dates, te_diff, 'g-', alpha=0.8)
    axes[2].axhline(0, color='k', linestyle='--', alpha=0.5)
    axes[2].fill_between(dates, 0, te_diff, where=(te_diff > 0), alpha=0.3, color='blue', label='Macro→Stock')
    axes[2].fill_between(dates, 0, te_diff, where=(te_diff < 0), alpha=0.3, color='orange', label='Stock→Macro')
    axes[2].set_title('TE Difference (Dominant Direction)', fontweight='bold')
    axes[2].set_ylabel('TE Difference (bits)')
    axes[2].legend()
    axes[2].grid(True, alpha=0.3)
    
    # 4. P-values
    axes[3].semilogy(dates, p_m2s, 'b-', alpha=0.7, label='p-value (Macro→Stock)')
    axes[3].semilogy(dates, p_s2m, 'orange', linestyle='-', alpha=0.7, label='p-value (Stock→Macro)')
    axes[3].axhline(0.05, color='r', linestyle='--', linewidth=2, label='p = 0.05')
    axes[3].set_title('P-values (Permutation Test)', fontweight='bold')
    axes[3].set_ylabel('P-value (log scale)')
    axes[3].set_xlabel('Date')
    axes[3].legend()
    axes[3].grid(True, alpha=0.3)
    axes[3].invert_yaxis()
    
    plt.tight_layout()
    plt.show()
    
    # Summary statistics
    print("\n=== Summary Statistics ===")
    print(f"TE (Macro→Stock): mean={np.nanmean(te_m2s):.4f}, std={np.nanstd(te_m2s):.4f}")
    print(f"TE (Stock→Macro): mean={np.nanmean(te_s2m):.4f}, std={np.nanstd(te_s2m):.4f}")
    print(f"Mutual Information: mean={np.nanmean(results['mi']):.4f}")
    print(f"Conditional TE: mean={np.nanmean(results['cte']):.4f}")
    
    valid_p = ~(np.isnan(p_m2s) | np.isnan(p_s2m))
    if valid_p.sum() > 0:
        sig_m2s_count = (p_m2s[valid_p] < 0.05).sum()
        sig_s2m_count = (p_s2m[valid_p] < 0.05).sum()
        print(f"\nSignificant windows (p < 0.05):")
        print(f"  Macro→Stock: {sig_m2s_count}/{valid_p.sum()} ({100*sig_m2s_count/valid_p.sum():.1f}%)")
        print(f"  Stock→Macro: {sig_s2m_count}/{valid_p.sum()} ({100*sig_s2m_count/valid_p.sum():.1f}%)")


##  7. Main Analysis Functions

In [27]:
def rolling_window_analysis_bidirectional(data, window_size=60, n_perm=200):
    te_macro_to_stock = []
    te_stock_to_macro = []
    p_macro_to_stock = []
    p_stock_to_macro = []
    cte_results = []
    mi_results = []
    dates = []
    
    if len(data) < window_size:
        print(f"Warning: Insufficient data ({len(data)} < {window_size})")
        return [], [], [], [], [], [], []
    
    print(f"Computing TE with permutation test (n_perm={n_perm}) for {len(data) - window_size + 1} windows...")
    for start in range(len(data) - window_size + 1):
        window_data = data.iloc[start:start+window_size]
        
        # Bidirectional TE with permutation test (Question 1)
        # Use compute_te_with_pvalue for statistical significance testing
        te_m2s, p_m2s = compute_te_with_pvalue(
            window_data, 'Interest_Rate', 'Stock_Log_Return', lag=1, n_perm=n_perm
        )
        te_s2m, p_s2m = compute_te_with_pvalue(
            window_data, 'Stock_Log_Return', 'Interest_Rate', lag=1, n_perm=n_perm
        )
        
        # Conditional TE (Question 2)
        if 'Inflation' in window_data.columns and window_data['Inflation'].notna().sum() >= 10:
            cte = compute_conditional_transfer_entropy(
                window_data, 'Interest_Rate', 'Stock_Log_Return', 'Inflation', lag=1
            )
        else:
            cte = np.nan
        
        # Mutual Information (baseline)
        mi = compute_mutual_information(window_data, 'Interest_Rate', 'Stock_Log_Return')
        
        te_macro_to_stock.append(te_m2s)
        te_stock_to_macro.append(te_s2m)
        p_macro_to_stock.append(p_m2s)
        p_stock_to_macro.append(p_s2m)
        cte_results.append(cte)
        mi_results.append(mi)
        dates.append(window_data.index[-1])
        
        # Progress indicator
        if (start + 1) % 50 == 0:
            print(f"  Processed {start + 1}/{len(data) - window_size + 1} windows...")
    
    print(f"\nRolling window analysis complete: {len(te_macro_to_stock)} windows")
    if dates:
        te_m2s_arr = np.array(te_macro_to_stock)
        te_s2m_arr = np.array(te_stock_to_macro)
        p_m2s_arr = np.array(p_macro_to_stock)
        p_s2m_arr = np.array(p_stock_to_macro)
        
        # Valid windows (non-NaN TE values)
        mask = ~(np.isnan(te_m2s_arr) | np.isnan(te_s2m_arr))
        if mask.sum() > 0:
            macro_dominates = (te_m2s_arr[mask] > te_s2m_arr[mask]).sum()
            print(f"  Dominant direction: Macro→Stock in {macro_dominates}/{mask.sum()} windows ({100*macro_dominates/mask.sum():.1f}%)")
            
            # Significance statistics
            sig_m2s = (p_m2s_arr[mask] < 0.05).sum()
            sig_s2m = (p_s2m_arr[mask] < 0.05).sum()
            print(f"  Significant TE (p < 0.05): Macro→Stock: {sig_m2s}/{mask.sum()} ({100*sig_m2s/mask.sum():.1f}%), Stock→Macro: {sig_s2m}/{mask.sum()} ({100*sig_s2m/mask.sum():.1f}%)")
    
    return te_macro_to_stock, te_stock_to_macro, p_macro_to_stock, p_stock_to_macro, cte_results, mi_results, dates

In [28]:
def plot_results_bidirectional(te_macro_to_stock, te_stock_to_macro, cte_results, mi_results, dates=None, 
                               p_macro_to_stock=None, p_stock_to_macro=None):
    """Plot bidirectional TE analysis results (Question 1-3) with optional p-values"""
    # Determine number of subplots (5 if p-values provided, 4 otherwise)
    n_plots = 5 if p_macro_to_stock is not None else 4
    fig, axes = plt.subplots(n_plots, 1, figsize=(14, 2.5 * n_plots))
    if n_plots == 1:
        axes = [axes]
    x_axis = dates if dates is not None else range(len(te_macro_to_stock))
    
    # 1. Bidirectional TE (Question 1)
    # Plot with different styles for significant vs non-significant windows
    if p_macro_to_stock is not None:
        p_m2s_arr = np.array(p_macro_to_stock)
        p_s2m_arr = np.array(p_stock_to_macro)
        sig_mask_m2s = p_m2s_arr < 0.05
        sig_mask_s2m = p_s2m_arr < 0.05
        non_sig_mask_m2s = ~sig_mask_m2s & ~np.isnan(p_m2s_arr)
        non_sig_mask_s2m = ~sig_mask_s2m & ~np.isnan(p_s2m_arr)
        
        # Plot significant windows with solid lines
        if sig_mask_m2s.any():
            axes[0].plot(np.array(x_axis)[sig_mask_m2s], np.array(te_macro_to_stock)[sig_mask_m2s], 
                        color='blue', linestyle='-', linewidth=2, alpha=0.9, label='TE (Macro→Stock, p<0.05)', zorder=3)
        if sig_mask_s2m.any():
            axes[0].plot(np.array(x_axis)[sig_mask_s2m], np.array(te_stock_to_macro)[sig_mask_s2m], 
                        color='orange', linestyle='-', linewidth=2, alpha=0.9, label='TE (Stock→Macro, p<0.05)', zorder=3)
        
        # Plot non-significant windows with dashed lines and lower alpha
        if non_sig_mask_m2s.any():
            axes[0].plot(np.array(x_axis)[non_sig_mask_m2s], np.array(te_macro_to_stock)[non_sig_mask_m2s], 
                        color='blue', linestyle='--', linewidth=1, alpha=0.4, label='TE (Macro→Stock, p≥0.05)', zorder=2)
        if non_sig_mask_s2m.any():
            axes[0].plot(np.array(x_axis)[non_sig_mask_s2m], np.array(te_stock_to_macro)[non_sig_mask_s2m], 
                        color='orange', linestyle='--', linewidth=1, alpha=0.4, label='TE (Stock→Macro, p≥0.05)', zorder=2)
        
        # Handle NaN windows
        nan_mask_m2s = np.isnan(p_m2s_arr)
        nan_mask_s2m = np.isnan(p_s2m_arr)
        if nan_mask_m2s.any():
            axes[0].plot(np.array(x_axis)[nan_mask_m2s], np.array(te_macro_to_stock)[nan_mask_m2s], 
                        color='gray', linestyle=':', linewidth=0.5, alpha=0.3, label='TE (Macro→Stock, NaN)', zorder=1)
        if nan_mask_s2m.any():
            axes[0].plot(np.array(x_axis)[nan_mask_s2m], np.array(te_stock_to_macro)[nan_mask_s2m], 
                        color='gray', linestyle=':', linewidth=0.5, alpha=0.3, label='TE (Stock→Macro, NaN)', zorder=1)
    else:
        # Fallback if no p-values provided
        axes[0].plot(x_axis, te_macro_to_stock, label='TE (Macro→Stock)', color='blue', alpha=0.8)
        axes[0].plot(x_axis, te_stock_to_macro, label='TE (Stock→Macro)', color='orange', alpha=0.8)
    
    axes[0].set_title('Bidirectional Transfer Entropy (Question 1)\n判定标准: p < 0.05 (permutation test, n_perm=200)', fontweight='bold')
    axes[0].set_ylabel('TE (bits)')
    axes[0].grid(True, alpha=0.3)
    axes[0].legend(fontsize=8, loc='best')
    
    # 2. Conditional TE (Question 2)
    axes[1].plot(x_axis, cte_results, label='Conditional TE', color='red', alpha=0.8)
    axes[1].plot(x_axis, te_macro_to_stock, label='Unconditional TE', color='blue', linestyle=':', alpha=0.6)
    axes[1].set_title('Conditional Transfer Entropy: Net Effect (Question 2)', fontweight='bold')
    axes[1].set_ylabel('CTE/TE (bits)')
    axes[1].grid(True, alpha=0.3)
    axes[1].legend()
    
    # 3. TE Difference
    te_diff = np.array(te_macro_to_stock) - np.array(te_stock_to_macro)
    axes[2].plot(x_axis, te_diff, label='TE Difference', color='green', alpha=0.8)
    axes[2].axhline(y=0, color='black', linestyle='-', alpha=0.5)
    axes[2].fill_between(x_axis, 0, te_diff, where=(te_diff > 0), alpha=0.3, color='blue')
    axes[2].fill_between(x_axis, 0, te_diff, where=(te_diff < 0), alpha=0.3, color='orange')
    axes[2].set_title('TE Difference: Dominant Direction Over Time', fontweight='bold')
    axes[2].set_ylabel('TE Difference (bits)')
    axes[2].grid(True, alpha=0.3)
    
    # 4. Mutual Information
    axes[3].plot(x_axis, mi_results, label='Mutual Information', color='green', alpha=0.8)
    axes[3].set_title('Mutual Information (Baseline)', fontweight='bold')
    axes[3].set_ylabel('MI (bits)')
    axes[3].grid(True, alpha=0.3)
    axes[3].legend()
    
    # 5. P-values (if provided)
    if p_macro_to_stock is not None:
        axes[4].semilogy(x_axis, p_macro_to_stock, label='p-value (Macro→Stock)', color='blue', alpha=0.8)
        axes[4].semilogy(x_axis, p_stock_to_macro, label='p-value (Stock→Macro)', color='orange', alpha=0.8)
        axes[4].axhline(y=0.05, color='red', linestyle='--', alpha=0.7, linewidth=2, label='p = 0.05 (显著性阈值)')
        axes[4].fill_between(x_axis, 0, 0.05, alpha=0.1, color='green', label='显著区域 (p < 0.05)')
        axes[4].set_title('P-values from Permutation Test\n零假设: 随机打乱 source 的时间顺序 (permute method)', fontweight='bold')
        axes[4].set_ylabel('P-value (log scale)')
        axes[4].set_xlabel('Date' if dates is not None else 'Window Index')
        axes[4].grid(True, alpha=0.3)
        axes[4].legend(fontsize=8)
        axes[4].invert_yaxis()  # Lower p-values at top
    else:
        axes[3].set_xlabel('Date' if dates is not None else 'Window Index')
    
    # Mark economic events (Question 3)
    if dates is not None:
        crisis_2008 = pd.Timestamp('2008-09-30')
        covid_2020 = pd.Timestamp('2020-03-31')
        n_axes_to_mark = 4 if p_macro_to_stock is None else 5
        for ax in axes[:n_axes_to_mark]:
            if dates[0] <= crisis_2008 <= dates[-1]:
                ax.axvline(x=crisis_2008, color='red', linestyle='--', alpha=0.5)
            if dates[0] <= covid_2020 <= dates[-1]:
                ax.axvline(x=covid_2020, color='purple', linestyle='--', alpha=0.5)
    
    plt.tight_layout()
    plt.show()
    
    # Summary statistics
    print("\nSummary Statistics:")
    print(f"TE (Macro→Stock): mean={np.nanmean(te_macro_to_stock):.4f}, std={np.nanstd(te_macro_to_stock):.4f}")
    print(f"TE (Stock→Macro): mean={np.nanmean(te_stock_to_macro):.4f}, std={np.nanstd(te_stock_to_macro):.4f}")
    print(f"Conditional TE: mean={np.nanmean(cte_results):.4f}, std={np.nanstd(cte_results):.4f}")
    print(f"Mutual Information: mean={np.nanmean(mi_results):.4f}, std={np.nanstd(mi_results):.4f}")
    if p_macro_to_stock is not None:
        p_m2s_arr = np.array(p_macro_to_stock)
        p_s2m_arr = np.array(p_stock_to_macro)
        valid_p_mask = ~(np.isnan(p_m2s_arr) | np.isnan(p_s2m_arr))
        if valid_p_mask.sum() > 0:
            print(f"\nP-value Statistics (Permutation Test):")
            print(f"  零假设构造: 随机打乱 source 的时间顺序 (permute method)")
            print(f"  置换次数: n_perm = 200")
            print(f"  判定标准: p < 0.05")
            print(f"  P-value (Macro→Stock): mean={np.nanmean(p_m2s_arr):.4f}, median={np.nanmedian(p_m2s_arr):.4f}")
            print(f"  P-value (Stock→Macro): mean={np.nanmean(p_s2m_arr):.4f}, median={np.nanmedian(p_s2m_arr):.4f}")
            sig_m2s = (p_m2s_arr[valid_p_mask] < 0.05).sum()
            sig_s2m = (p_s2m_arr[valid_p_mask] < 0.05).sum()
            print(f"  显著窗口 (p < 0.05): Macro→Stock: {sig_m2s}/{valid_p_mask.sum()} ({100*sig_m2s/valid_p_mask.sum():.1f}%)")
            print(f"  显著窗口 (p < 0.05): Stock→Macro: {sig_s2m}/{valid_p_mask.sum()} ({100*sig_s2m/valid_p_mask.sum():.1f}%)")
    
    # Period analysis (Question 3)
    if dates is not None:
        print("\nPeriod Analysis (Question 3):")
        dates_arr = pd.to_datetime(dates)
        periods = {
            'Pre-2000': (dates_arr < pd.Timestamp('2000-01-01')),
            '2000-2008': ((dates_arr >= pd.Timestamp('2000-01-01')) & (dates_arr < pd.Timestamp('2008-09-01'))),
            '2008 Crisis': ((dates_arr >= pd.Timestamp('2008-09-01')) & (dates_arr < pd.Timestamp('2010-12-31'))),
            'Post-Crisis': ((dates_arr >= pd.Timestamp('2011-01-01')) & (dates_arr < pd.Timestamp('2020-01-01'))),
            'COVID-19 Era': (dates_arr >= pd.Timestamp('2020-01-01'))
        }
        for period_name, mask in periods.items():
            if mask.sum() > 0:
                te_m2s_period = np.array(te_macro_to_stock)[mask]
                te_s2m_period = np.array(te_stock_to_macro)[mask]
                valid_mask = ~(np.isnan(te_m2s_period) | np.isnan(te_s2m_period))
                if valid_mask.sum() > 0:
                    te_m2s_mean = np.nanmean(te_m2s_period)
                    te_s2m_mean = np.nanmean(te_s2m_period)
                    dominant = "Macro" if te_m2s_mean > te_s2m_mean else "Stock"
                    print(f"  {period_name}: Dominant={dominant}, TE(M→S)={te_m2s_mean:.4f}, TE(S→M)={te_s2m_mean:.4f}")

## 8. Main Analysis Pipeline

In [None]:
def main():
    """Main analysis pipeline"""
    print("Information-Theoretic Analysis: Macro-Finance Causality")
    
    # Step 1: Prepare data
    print("\n[1/3] Preparing data...")
    data = prepare_data()
    print(f"  Data: {len(data)} observations from {data.index[0].date()} to {data.index[-1].date()}")
    
    # Step 2: Rolling window analysis
    print("\n[2/3] Running rolling window analysis...")
    results = rolling_analysis(data, window_size=60, n_perm=200)
    
    # Step 3: Visualize results
    print("\n[3/3] Visualizing results...")
    plot_results(results)
    
    print("Analysis complete!")

    return results

# Run analysis
if __name__ == "__main__":
    results = main()

Information-Theoretic Analysis: Macro-Finance Causality

[1/3] Preparing data...
  Data: 429 observations from 1990-02-28 to 2025-10-31

[2/3] Running rolling window analysis...
Computing 370 rolling windows (n_perm=200)...
  Processed 50/370 windows...
