# Corridor-Width Ablation Visualization

This notebook visualizes the results of the corridor-width ablation experiment,
comparing baseline geometric ranking vs emergent cell-view dynamics.

## Reproduction Instructions

1. Run the experiment first: `python -m experiments.corridor_width_ablation.experiment`
2. Results will be saved to `experiments/corridor_width_ablation/results/`
3. Run this notebook to generate visualizations

**Key Visualizations:**
- Rank comparison charts (baseline vs emergent)
- Corridor width evolution over gates
- Algotype distribution heatmaps
- DG episode analysis
- Convergence dynamics

In [None]:
import json
import sys
from pathlib import Path

# Add src to path
sys.path.insert(0, str(Path.cwd().parent.parent / "src"))

# Check for matplotlib - use text fallback if not available
try:
    import matplotlib.pyplot as plt
    import matplotlib.patches as mpatches
    HAS_MATPLOTLIB = True
except ImportError:
    HAS_MATPLOTLIB = False
    print("matplotlib not available - using text output")

In [None]:
def load_latest_results():
    """Load the most recent ablation results."""
    results_dir = Path("results")
    if not results_dir.exists():
        print("No results directory found. Run the experiment first.")
        return None
    
    json_files = list(results_dir.glob("ablation_*.json"))
    if not json_files:
        print("No result files found. Run the experiment first.")
        return None
    
    latest = max(json_files, key=lambda p: p.stat().st_mtime)
    print(f"Loading: {latest}")
    
    with open(latest) as f:
        return json.load(f)

results = load_latest_results()

## 1. Rank Comparison: Baseline vs Emergent

In [None]:
def plot_rank_comparison(results):
    """Plot baseline vs emergent rank comparison."""
    if results is None:
        print("No results to plot")
        return
    
    gates = []
    baseline_ranks = []
    emergent_ranks = []
    
    for r in results["gate_results"]:
        gates.append(r["gate"])
        b_rank = r["baseline"]["corridor_metrics"].get("effective_corridor_width")
        e_rank = r["emergent"]["corridor_metrics"].get("effective_corridor_width")
        baseline_ranks.append(b_rank if b_rank else 0)
        emergent_ranks.append(e_rank if e_rank else 0)
    
    if not HAS_MATPLOTLIB:
        print("\nRank Comparison (Baseline vs Emergent)")
        print("=" * 50)
        print(f"{'Gate':<10} {'Baseline':<12} {'Emergent':<12} {'Reduction':<10}")
        print("-" * 50)
        for g, b, e in zip(gates, baseline_ranks, emergent_ranks):
            reduction = ((b - e) / b * 100) if b > 0 else 0
            print(f"{g:<10} {b:<12} {e:<12} {reduction:.1f}%")
        return
    
    fig, ax = plt.subplots(figsize=(10, 6))
    
    x = range(len(gates))
    width = 0.35
    
    bars1 = ax.bar([i - width/2 for i in x], baseline_ranks, width, label='Baseline (Geometric)', color='#4ECDC4')
    bars2 = ax.bar([i + width/2 for i in x], emergent_ranks, width, label='Emergent (Cell-View)', color='#FF6B6B')
    
    ax.set_xlabel('Gate', fontsize=12)
    ax.set_ylabel('Rank of True Factor (lower is better)', fontsize=12)
    ax.set_title('Corridor-Width Ablation: Baseline vs Emergent Ranking', fontsize=14)
    ax.set_xticks(x)
    ax.set_xticklabels(gates)
    ax.legend()
    ax.grid(axis='y', alpha=0.3)
    
    # Add value labels
    for bar, val in zip(bars1, baseline_ranks):
        ax.annotate(f'{val}', xy=(bar.get_x() + bar.get_width()/2, bar.get_height()),
                   ha='center', va='bottom', fontsize=9)
    for bar, val in zip(bars2, emergent_ranks):
        ax.annotate(f'{val}', xy=(bar.get_x() + bar.get_width()/2, bar.get_height()),
                   ha='center', va='bottom', fontsize=9)
    
    plt.tight_layout()
    plt.show()

plot_rank_comparison(results)

## 2. Rank Reduction Analysis

In [None]:
def plot_rank_reduction(results):
    """Plot rank reduction percentage per gate."""
    if results is None:
        return
    
    gates = []
    reductions = []
    
    for r in results["gate_results"]:
        gates.append(r["gate"])
        reductions.append(r["comparison"]["rank_reduction_pct"])
    
    # Success threshold
    threshold = 20.0
    
    if not HAS_MATPLOTLIB:
        print("\nRank Reduction Analysis")
        print("=" * 40)
        print(f"Success threshold: {threshold}%")
        print()
        for g, r in zip(gates, reductions):
            status = "PASS" if r >= threshold else "FAIL"
            print(f"{g}: {r:.1f}% [{status}]")
        print(f"\nMean reduction: {sum(reductions)/len(reductions):.1f}%")
        return
    
    fig, ax = plt.subplots(figsize=(10, 6))
    
    colors = ['#2ECC71' if r >= threshold else '#E74C3C' for r in reductions]
    bars = ax.bar(gates, reductions, color=colors)
    
    # Threshold line
    ax.axhline(y=threshold, color='#3498DB', linestyle='--', linewidth=2, label=f'Success Threshold ({threshold}%)')
    
    ax.set_xlabel('Gate', fontsize=12)
    ax.set_ylabel('Rank Reduction (%)', fontsize=12)
    ax.set_title('Rank Reduction: Emergent vs Baseline', fontsize=14)
    ax.legend()
    ax.grid(axis='y', alpha=0.3)
    
    # Add value labels
    for bar, val in zip(bars, reductions):
        ax.annotate(f'{val:.1f}%', xy=(bar.get_x() + bar.get_width()/2, bar.get_height()),
                   ha='center', va='bottom', fontsize=10, fontweight='bold')
    
    plt.tight_layout()
    plt.show()

plot_rank_reduction(results)

## 3. Convergence Dynamics

In [None]:
def plot_convergence(results):
    """Plot convergence dynamics (swaps, sortedness, aggregation) per gate."""
    if results is None:
        return
    
    if not HAS_MATPLOTLIB:
        print("\nConvergence Dynamics")
        print("=" * 50)
        for r in results["gate_results"]:
            gate = r["gate"]
            emergent = r["emergent"]
            print(f"\n{gate}:")
            print(f"  Convergence step: {emergent['convergence_step']}")
            print(f"  Total steps: {emergent['total_steps']}")
            print(f"  DG episodes: {len(emergent['dg_episodes'])}")
            print(f"  DG index: {emergent['dg_index']:.3f}")
            print(f"  Final sortedness: {emergent['sortedness_series'][-1]:.3f}")
            print(f"  Final aggregation: {emergent['aggregation_series'][-1]:.3f}")
        return
    
    n_gates = len(results["gate_results"])
    fig, axes = plt.subplots(n_gates, 3, figsize=(15, 4*n_gates))
    
    if n_gates == 1:
        axes = [axes]
    
    for idx, r in enumerate(results["gate_results"]):
        gate = r["gate"]
        emergent = r["emergent"]
        
        # Swaps per step
        ax1 = axes[idx][0]
        ax1.plot(emergent["swaps_per_step"], color='#3498DB', linewidth=2)
        ax1.set_xlabel('Step')
        ax1.set_ylabel('Swaps')
        ax1.set_title(f'{gate}: Swaps per Step')
        ax1.grid(alpha=0.3)
        
        # Sortedness
        ax2 = axes[idx][1]
        ax2.plot(emergent["sortedness_series"], color='#2ECC71', linewidth=2)
        ax2.set_xlabel('Step')
        ax2.set_ylabel('Sortedness')
        ax2.set_title(f'{gate}: Sortedness Evolution')
        ax2.grid(alpha=0.3)
        ax2.set_ylim(0, 1.05)
        
        # Aggregation
        ax3 = axes[idx][2]
        ax3.plot(emergent["aggregation_series"], color='#E74C3C', linewidth=2)
        ax3.set_xlabel('Step')
        ax3.set_ylabel('Aggregation')
        ax3.set_title(f'{gate}: Aggregation Evolution')
        ax3.grid(alpha=0.3)
        ax3.set_ylim(0, 1.05)
    
    plt.tight_layout()
    plt.show()

plot_convergence(results)

## 4. Algotype Distribution Heatmap

In [None]:
def plot_algotype_heatmap(results):
    """Plot algotype distribution across gates."""
    if results is None:
        return
    
    # Collect algotype data
    gates = []
    all_algotypes = set()
    distributions = []
    
    for r in results["gate_results"]:
        gates.append(r["gate"])
        dist = r["emergent"]["algotype_distribution"]
        distributions.append(dist)
        all_algotypes.update(dist.keys())
    
    algotypes = sorted(all_algotypes)
    
    if not HAS_MATPLOTLIB:
        print("\nAlgotype Distribution")
        print("=" * 60)
        header = f"{'Gate':<10}" + "".join(f"{a:<15}" for a in algotypes)
        print(header)
        print("-" * 60)
        for gate, dist in zip(gates, distributions):
            row = f"{gate:<10}"
            for a in algotypes:
                count = dist.get(a, 0)
                row += f"{count:<15}"
            print(row)
        return
    
    # Build matrix
    matrix = []
    for dist in distributions:
        row = [dist.get(a, 0) for a in algotypes]
        total = sum(row)
        row = [r/total if total > 0 else 0 for r in row]  # Normalize
        matrix.append(row)
    
    fig, ax = plt.subplots(figsize=(10, 6))
    
    im = ax.imshow(matrix, cmap='YlOrRd', aspect='auto')
    
    ax.set_xticks(range(len(algotypes)))
    ax.set_xticklabels(algotypes, rotation=45, ha='right')
    ax.set_yticks(range(len(gates)))
    ax.set_yticklabels(gates)
    
    ax.set_xlabel('Algotype', fontsize=12)
    ax.set_ylabel('Gate', fontsize=12)
    ax.set_title('Algotype Distribution (Normalized)', fontsize=14)
    
    # Add colorbar
    cbar = fig.colorbar(im, ax=ax)
    cbar.set_label('Fraction')
    
    # Add text annotations
    for i in range(len(gates)):
        for j in range(len(algotypes)):
            val = matrix[i][j]
            ax.text(j, i, f'{val:.2f}', ha='center', va='center', 
                   color='white' if val > 0.5 else 'black', fontsize=10)
    
    plt.tight_layout()
    plt.show()

plot_algotype_heatmap(results)

## 5. Statistical Summary

In [None]:
def print_statistical_summary(results):
    """Print the statistical summary from the experiment."""
    if results is None:
        return
    
    stats = results.get("statistical_summary", {})
    
    print("\n" + "=" * 60)
    print("STATISTICAL SUMMARY")
    print("=" * 60)
    
    if "error" in stats:
        print(f"Error: {stats['error']}")
        return
    
    print(f"\nNumber of gates tested: {stats.get('n_gates', 'N/A')}")
    print(f"\nMean baseline rank: {stats.get('mean_baseline_rank', 'N/A'):.1f}")
    print(f"Mean emergent rank: {stats.get('mean_emergent_rank', 'N/A'):.1f}")
    print(f"Mean rank reduction: {stats.get('mean_rank_reduction_pct', 'N/A'):.1f}%")
    print(f"\nEffect Size (Cohen's d): {stats.get('cohens_d', 'N/A'):.3f}")
    print(f"t-statistic: {stats.get('t_statistic', 'N/A'):.3f}")
    print(f"p-value: {stats.get('p_value', 'N/A'):.4f}")
    
    print("\n" + "-" * 60)
    print("Success Criteria: >=20% rank reduction with p<0.1")
    print("-" * 60)
    
    success = stats.get("success_threshold_met", False)
    print(f"\nResult: {'PASS' if success else 'FAIL'}")
    print(f"\nDECISION: {stats.get('decision', 'N/A')}")
    print("=" * 60)

print_statistical_summary(results)

## 6. DG Episodes Analysis

In [None]:
def analyze_dg_episodes(results):
    """Analyze delayed gratification episodes."""
    if results is None:
        return
    
    print("\n" + "=" * 60)
    print("DG (DELAYED GRATIFICATION) EPISODES")
    print("=" * 60)
    
    for r in results["gate_results"]:
        gate = r["gate"]
        emergent = r["emergent"]
        episodes = emergent["dg_episodes"]
        dg_index = emergent["dg_index"]
        
        print(f"\n{gate}:")
        print(f"  Total DG episodes: {len(episodes)}")
        print(f"  DG index (sum of scores): {dg_index:.3f}")
        
        if episodes:
            print("  Episodes:")
            for i, ep in enumerate(episodes[:5]):  # Show first 5
                print(f"    [{i+1}] Steps {ep['start']}-{ep['end']}: "
                      f"peak={ep['peak']:.3f} -> valley={ep['valley']:.3f} -> new_peak={ep['new_peak']:.3f}")
            if len(episodes) > 5:
                print(f"    ... and {len(episodes) - 5} more episodes")

analyze_dg_episodes(results)

## 7. Corridor Entropy Comparison

In [None]:
def plot_entropy_comparison(results):
    """Compare corridor entropy between baseline and emergent."""
    if results is None:
        return
    
    gates = []
    baseline_entropy = []
    emergent_entropy = []
    
    for r in results["gate_results"]:
        gates.append(r["gate"])
        b_entropy = r["baseline"]["corridor_metrics"].get("corridor_entropy_normalized", 0)
        e_entropy = r["emergent"]["corridor_metrics"].get("corridor_entropy_normalized", 0)
        baseline_entropy.append(b_entropy)
        emergent_entropy.append(e_entropy)
    
    if not HAS_MATPLOTLIB:
        print("\nCorridor Entropy Comparison (Normalized, 0-1)")
        print("=" * 50)
        print(f"{'Gate':<10} {'Baseline':<15} {'Emergent':<15}")
        print("-" * 50)
        for g, b, e in zip(gates, baseline_entropy, emergent_entropy):
            print(f"{g:<10} {b:<15.3f} {e:<15.3f}")
        return
    
    fig, ax = plt.subplots(figsize=(10, 6))
    
    x = range(len(gates))
    width = 0.35
    
    bars1 = ax.bar([i - width/2 for i in x], baseline_entropy, width, label='Baseline', color='#9B59B6')
    bars2 = ax.bar([i + width/2 for i in x], emergent_entropy, width, label='Emergent', color='#F39C12')
    
    ax.set_xlabel('Gate', fontsize=12)
    ax.set_ylabel('Normalized Entropy (0=concentrated, 1=uniform)', fontsize=12)
    ax.set_title('Corridor Entropy: Lower = More Decisive Ranking', fontsize=14)
    ax.set_xticks(x)
    ax.set_xticklabels(gates)
    ax.legend()
    ax.grid(axis='y', alpha=0.3)
    ax.set_ylim(0, 1.1)
    
    plt.tight_layout()
    plt.show()

plot_entropy_comparison(results)

## 8. Export Summary Report

In [None]:
def export_summary_report(results, filename="summary_report.md"):
    """Export a markdown summary report."""
    if results is None:
        print("No results to export")
        return
    
    stats = results.get("statistical_summary", {})
    
    report = []
    report.append("# Corridor-Width Ablation Experiment Report")
    report.append(f"\n**Run ID:** {results['run_id']}")
    report.append(f"\n**Timestamp:** {results['timestamp']}")
    
    report.append("\n## Configuration")
    config = results["experiment_config"]
    report.append(f"- Gates tested: {', '.join(config['gates_tested'])}")
    report.append(f"- Band halfwidth: {config['band_halfwidth']:,}")
    report.append(f"- Algotypes: {', '.join(config['algotypes'])}")
    report.append(f"- Max steps: {config['max_steps']}")
    
    report.append("\n## Results Summary")
    report.append("\n| Gate | Baseline Rank | Emergent Rank | Reduction |")
    report.append("|------|---------------|---------------|-----------|")
    for r in results["gate_results"]:
        b_rank = r["baseline"]["corridor_metrics"].get("effective_corridor_width", "N/A")
        e_rank = r["emergent"]["corridor_metrics"].get("effective_corridor_width", "N/A")
        reduction = r["comparison"]["rank_reduction_pct"]
        report.append(f"| {r['gate']} | {b_rank} | {e_rank} | {reduction:.1f}% |")
    
    report.append("\n## Statistical Analysis")
    if "error" not in stats:
        report.append(f"- Mean rank reduction: {stats.get('mean_rank_reduction_pct', 0):.1f}%")
        report.append(f"- Cohen's d effect size: {stats.get('cohens_d', 0):.3f}")
        report.append(f"- Paired t-test p-value: {stats.get('p_value', 1):.4f}")
    
    report.append("\n## Decision")
    report.append(f"\n**Success Criteria:** >=20% rank reduction with p<0.1")
    success = stats.get("success_threshold_met", False)
    report.append(f"\n**Result:** {'PASS' if success else 'FAIL'}")
    report.append(f"\n**Recommendation:** {stats.get('decision', 'N/A')}")
    
    # Write to file
    with open(filename, "w") as f:
        f.write("\n".join(report))
    
    print(f"Report exported to: {filename}")

export_summary_report(results, "results/summary_report.md")