# Intention Collapse: Experiment 4.1
## Correlating Intention Metrics with Reasoning Accuracy

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/YOUR_USERNAME/intention-collapse-experiments/blob/main/notebooks/01_intention_metrics.ipynb)

This notebook implements the experimental protocol from Section 4.1 of the Intention Collapse paper:

> "To assess whether richer intention states predict better performance, we propose measuring our metrics across varying reasoning regimes and correlating them with final accuracy."

### Metrics Implemented
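1. **Intention Entropy** $H_{int}(I)$: Shannon entropy (in bits) of the next-token distribution for the first generated token
2. **Effective Dimensionality** $dim_{eff}(I)$: number of principal components needed to explain 90% of the variance in hidden activations
3. **Latent Recoverability** $Recov(I; Z)$: cross-validated accuracy of a linear probe that predicts answer correctness from hidden activations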
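
For reference, the first two metrics can be written out as below. This is a sketch of the definitions as the code in this notebook applies them, not a quotation from the paper. The symbols $z_v$ (first-token logits), $\mathcal{V}_k$ (the $k$ highest-scoring tokens, with $k$ set by `entropy_top_k`), $\tilde{p}$ (the renormalized distribution), and $\lambda_i$ (PCA eigenvalues in decreasing order) are notation introduced here for illustration.

```latex
H_{int}(I) = -\sum_{v \in \mathcal{V}_k} \tilde{p}(v) \log_2 \tilde{p}(v),
\qquad
\tilde{p}(v) = \frac{\exp(z_v)}{\sum_{u \in \mathcal{V}_k} \exp(z_u)}

dim_{eff}(I) = \min \left\{ k : \frac{\sum_{i=1}^{k} \lambda_i}{\sum_{j} \lambda_j} \ge 0.9 \right\}
```

Under this truncation, $0 \le H_{int}(I) \le \log_2 k$, so the entropy values reported later are capped by the choice of $k$.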

### Experimental Conditions
- **Baseline**: Zero-shot (direct answer)
- **Enhanced**: Chain-of-thought reasoning

## 1. Setup and Installation

First, let's install all required dependencies and configure the environment.

In [None]:
# Install dependencies (this may take a few minutes)
!pip install -q torch transformers accelerate bitsandbytes
!pip install -q datasets scikit-learn scipy
!pip install -q matplotlib seaborn tqdm pyyaml

In [None]:
# Verify GPU availability
import torch
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
else:
    print("WARNING: No GPU detected. Please enable GPU in Runtime > Change runtime type")

In [None]:
# Configure Hugging Face token
# Option 1: Use Colab Secrets (recommended)
try:
    from google.colab import userdata
    HF_TOKEN = userdata.get('HF_TOKEN')
    print("✓ Loaded HF_TOKEN from Colab Secrets")
except Exception:  # e.g. not running in Colab, or the secret is not set
    # Option 2: Manual input
    import getpass
    HF_TOKEN = getpass.getpass("Enter your Hugging Face token: ")
    print("✓ Token entered manually")

# Login to Hugging Face
from huggingface_hub import login
login(token=HF_TOKEN, add_to_git_credential=False)

## 2. Configuration

Set up experiment parameters. Adjust these based on your available resources.

In [None]:
# =============================================================================
# EXPERIMENT CONFIGURATION
# =============================================================================

CONFIG = {
    # Model settings
    'model_name': 'mistralai/Mistral-7B-Instruct-v0.3',
    'quantization': '4bit',  # Options: '4bit', '8bit', 'none'
    'extraction_layers': [27, 28, 29, 30, 31],  # Last 5 layers for Mistral-7B
    
    # Dataset settings
    'dataset': 'gsm8k',
    'subset_size': 200,  # Reduce for faster testing, increase for publication
    'seed': 42,
    
    # Generation settings
    'max_new_tokens_baseline': 50,
    'max_new_tokens_cot': 512,
    'temperature': 0.0,  # Greedy decoding
    
    # Metric settings
    'variance_threshold': 0.90,  # For dim_eff
    'entropy_top_k': 100,        # For H_int
    'probe_regularization': 1.0, # For Recov
    
    # Output settings
    'output_dir': 'results',
    'save_figures': True,
}

# Prompt templates
PROMPTS = {
    'baseline': """Solve this math problem. Give only the final numerical answer.

Problem: {question}
Answer:""",
    
    'enhanced': """Solve this math problem step by step. Show your reasoning, then give the final answer.

Problem: {question}
Solution:"""
}

print("Configuration loaded successfully!")
print(f"Model: {CONFIG['model_name']}")
print(f"Dataset: {CONFIG['dataset']} ({CONFIG['subset_size']} problems)")

## 3. Core Implementation

Define the classes and functions needed for the experiment.

In [None]:
# =============================================================================
# IMPORTS
# =============================================================================

import numpy as np
import torch
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from datasets import load_dataset
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm
import re
import random
from typing import List, Dict, Tuple, Optional, Any
from dataclasses import dataclass
from contextlib import contextmanager
import warnings
import os

warnings.filterwarnings('ignore')

# Set random seeds for reproducibility
random.seed(CONFIG['seed'])
np.random.seed(CONFIG['seed'])
torch.manual_seed(CONFIG['seed'])

print("All imports successful!")

In [None]:
# =============================================================================
# DATA STRUCTURES
# =============================================================================

@dataclass
class MathProblem:
    """Container for a math problem from GSM8K."""
    question: str
    answer: str
    final_answer: str
    idx: int

@dataclass
class IntentionMetrics:
    """Container for intention metrics for a single example."""
    entropy: float
    dim_eff: int
    recoverability: Optional[float] = None

@dataclass
class ExperimentResult:
    """Results from a single problem evaluation."""
    problem_idx: int
    condition: str
    question: str
    ground_truth: str
    model_output: str
    extracted_answer: str
    is_correct: bool
    metrics: IntentionMetrics
    activations: Optional[np.ndarray] = None

In [None]:
# =============================================================================
# ACTIVATION EXTRACTION
# =============================================================================

class ActivationExtractor:
    """
    Extract hidden state activations from specified transformer layers.
    
    Reference: Section 2.1 "Intention State in Practice"
        I_t = (h_t, KV_t, R_t, c_t) ∈ I
    """
    
    def __init__(self, model, layers: List[int]):
        self.model = model
        self.layers = sorted(layers)
        self._hooks = []
        self._activations = {l: [] for l in layers}
        self._is_capturing = False
    
    def _get_layer_module(self, layer_idx: int):
        """Get the module for a specific layer."""
        return self.model.model.layers[layer_idx]
    
    def _create_hook(self, layer_idx: int):
        """Create a hook function for a specific layer."""
        def hook(module, input, output):
            if not self._is_capturing:
                return
            hidden_states = output[0] if isinstance(output, tuple) else output
            self._activations[layer_idx].append(hidden_states.detach().cpu())
        return hook
    
    def _register_hooks(self):
        """Register forward hooks on specified layers."""
        for layer_idx in self.layers:
            layer_module = self._get_layer_module(layer_idx)
            hook = layer_module.register_forward_hook(self._create_hook(layer_idx))
            self._hooks.append(hook)
    
    def _remove_hooks(self):
        """Remove all registered hooks."""
        for hook in self._hooks:
            hook.remove()
        self._hooks = []
    
    def clear(self):
        """Clear stored activations."""
        self._activations = {l: [] for l in self.layers}
    
    @contextmanager
    def capture(self):
        """Context manager for capturing activations."""
        self.clear()
        self._register_hooks()
        self._is_capturing = True
        try:
            yield self
        finally:
            self._is_capturing = False
            self._remove_hooks()
    
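    def get_activations(self, aggregate: str = "last") -> np.ndarray:
        """
        Get extracted activations for a single sequence (batch size 1).
        
        During generate() with KV caching (the default), the first forward
        pass covers the whole prompt and each later pass covers one new token,
        so the captured tensors are joined along the sequence dimension.
        
        Args:
            aggregate: "last" for last token, "mean" for mean across positions
            
        Returns:
            Array of shape (n_layers, hidden_dim)
        """
        if aggregate not in ("last", "mean"):
            raise ValueError(f"Unknown aggregate: {aggregate!r}")
        all_activations = []
        for l in self.layers:
            if not self._activations[l]:
                raise ValueError(f"No activations captured for layer {l}")
            # Captured tensors are (batch, step_len, hidden_dim) and step_len
            # differs between passes, so concatenate on dim=1, not dim=0
            layer_acts = torch.cat(self._activations[l], dim=1)
            if aggregate == "last":
                layer_acts = layer_acts[:, -1, :]  # Last processed position
            else:
                layer_acts = layer_acts.mean(dim=1)
            # Drop the batch dimension and upcast from float16 for PCA/probes
            all_activations.append(layer_acts[0].float().numpy())
        return np.stack(all_activations, axis=0)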
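
The hook mechanism used by `ActivationExtractor` can be seen in isolation on a toy module. The following is a minimal sketch, assuming a stack of `nn.Linear` layers as a stand-in for transformer blocks; the names `toy`, `captured`, and `make_hook` are hypothetical and exist only for this illustration.

```python
import torch
import torch.nn as nn

# Toy stand-in for a transformer: three "layers" (hypothetical, for illustration)
toy = nn.Sequential(nn.Linear(8, 8), nn.Linear(8, 8), nn.Linear(8, 8))

captured = {}

def make_hook(name):
    # Forward hooks are called with (module, inputs, output) after the module runs
    def hook(module, inputs, output):
        captured[name] = output.detach().cpu()
    return hook

# Register a hook on the last two layers and keep the handles for cleanup
handles = [toy[i].register_forward_hook(make_hook(i)) for i in (1, 2)]

with torch.no_grad():
    toy(torch.randn(1, 5, 8))  # (batch, seq_len, hidden_dim)

# Remove the hooks so later forward passes are not recorded
for h in handles:
    h.remove()

# Keep only the final position of the single sequence in the batch
last_token = {name: acts[0, -1, :] for name, acts in captured.items()}
print({name: tuple(v.shape) for name, v in last_token.items()})
```

Keeping the handles returned by `register_forward_hook` and removing them afterwards is what the `capture()` context manager does on exit.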

In [None]:
# =============================================================================
# INTENTION METRICS IMPLEMENTATION
# =============================================================================

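def compute_intention_entropy(logits: torch.Tensor, top_k: int = 100) -> float:
    """
    Compute intention entropy H_int(I), in bits, from logits.
    
    Reference: Section 2.2
        H_int(I) ≜ H[p_θ(y_1 | I, x)]
        "Lower entropy indicates a more decided intention."
    
    If 0 < top_k < vocab_size, the distribution is renormalized over the
    top_k largest logits, so the result is bounded above by log2(top_k).
    """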
    if logits.dim() == 2:
        logits = logits[-1]
    
    if top_k > 0 and top_k < logits.size(-1):
        top_logits, _ = torch.topk(logits, top_k)
        probs = F.softmax(top_logits, dim=-1)
    else:
        probs = F.softmax(logits, dim=-1)
    
    eps = 1e-10
    log_probs = torch.log2(probs + eps)
    entropy = -torch.sum(probs * log_probs).item()
    
    return entropy


def compute_effective_dimensionality(
    activations: np.ndarray,
    variance_threshold: float = 0.90
) -> Tuple[int, np.ndarray]:
    """
    Compute effective intention dimensionality dim_eff(I) using PCA.
    
    Reference: Section 2.2
        "The effective dimensionality is the smallest k such that
         Σᵢ₌₁ᵏ λᵢ / Σⱼ λⱼ ≥ 0.9"
    """
    if activations.ndim == 3:
        n_layers, n_samples, hidden_dim = activations.shape
        activations = activations.reshape(n_layers * n_samples, hidden_dim)
    elif activations.ndim == 2:
        pass  # Already (n_samples, hidden_dim)
    else:
        activations = activations.flatten().reshape(1, -1)
    
    n_samples, n_features = activations.shape
    n_components = min(n_samples, n_features)
    
    if n_components < 2:
        return 1, np.array([1.0])
    
    pca = PCA(n_components=n_components)
    pca.fit(activations)
    
    cumulative_variance = np.cumsum(pca.explained_variance_ratio_)
    # Smallest k whose cumulative explained variance reaches the threshold
    dim_eff = int(np.searchsorted(cumulative_variance, variance_threshold) + 1)
    dim_eff = min(dim_eff, n_components)
    
    return dim_eff, pca.explained_variance_ratio_


def train_recoverability_probe(
    activations: np.ndarray,
    labels: np.ndarray,
    cv_folds: int = 5
) -> Tuple[float, float]:
    """
    Train a linear probe to measure latent recoverability Recov(I; Z).
    
    Reference: Section 2.2
        "Train linear or shallow probes on frozen I to predict downstream
         variables Z that are known to the model but not necessarily verbalized."
    
    Returns:
        Tuple of (mean_cv_accuracy, std_cv_accuracy)
    """
    if activations.ndim == 3:
        n_layers, n_samples, hidden_dim = activations.shape
        activations = activations.transpose(1, 0, 2).reshape(n_samples, -1)
    
    labels = np.asarray(labels).astype(int)
    
    # Check if we have enough samples and class balance
    if len(np.unique(labels)) < 2:
        return 0.5, 0.0  # Can't train with single class
    
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression(
            C=1.0/CONFIG['probe_regularization'],
            max_iter=1000,
            random_state=CONFIG['seed']
        ))
    ])
    
    try:
        cv_scores = cross_val_score(pipeline, activations, labels, cv=cv_folds)
        return cv_scores.mean(), cv_scores.std()
    except Exception as e:
        print(f"Warning: Probe training failed: {e}")
        return 0.5, 0.0
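
To make the entropy metric concrete, here is a small standard-library sketch of the same calculation on hand-written logits. It is an illustration under simplifying assumptions (plain Python lists instead of tensors, and a hypothetical helper named `entropy_bits`), not the function used in the experiment.

```python
import math

def entropy_bits(logits, top_k=None):
    """Shannon entropy (bits) of softmax(logits), optionally over the top-k logits."""
    if top_k is not None and 0 < top_k < len(logits):
        logits = sorted(logits, reverse=True)[:top_k]
    m = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    probs = [e / total for e in exps]
    return -sum(p * math.log2(p) for p in probs if p > 0)

uniform = entropy_bits([0.0] * 8)             # 8 equally likely tokens
peaked = entropy_bits([10.0] + [0.0] * 7)     # one dominant token
truncated = entropy_bits([0.0] * 8, top_k=4)  # renormalized over 4 tokens
print(uniform, peaked, truncated)
```

A uniform distribution over 8 tokens carries 3 bits, a sharply peaked one carries close to 0, and truncating to the top 4 tokens caps the value at 2 bits. With `entropy_top_k = 100`, the ceiling is log2(100), roughly 6.64 bits.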

In [None]:
# =============================================================================
# DATA UTILITIES
# =============================================================================

def load_gsm8k_problems(subset_size: int, seed: int) -> List[MathProblem]:
    """Load GSM8K dataset."""
    dataset = load_dataset("gsm8k", "main", split="test")
    
    if subset_size < len(dataset):
        random.seed(seed)
        indices = random.sample(range(len(dataset)), subset_size)
        dataset = dataset.select(indices)
    
    problems = []
    for idx, item in enumerate(dataset):
        solution = item['answer']
        # Extract final answer (GSM8K format: "#### <answer>")
        match = re.search(r'####\s*(.+?)$', solution, re.MULTILINE)
        final_answer = match.group(1).strip().replace(',', '') if match else ""
        
        problems.append(MathProblem(
            question=item['question'],
            answer=solution,
            final_answer=final_answer,
            idx=idx
        ))
    
    return problems


def extract_answer(model_output: str) -> str:
    """Extract numerical answer from model output."""
    # Drop thousands separators ("1,234" -> "1234"); the patterns below do not
    # match commas and would otherwise capture only the leading digits
    output = re.sub(r'(?<=\d),(?=\d{3})', '', model_output.strip())
    
    # Try explicit answer markers first
    patterns = [
        r'####\s*(-?\d+\.?\d*)',
        r'[Aa]nswer[:\s]+(-?\d+\.?\d*)',
        r'[Tt]he answer is[:\s]+(-?\d+\.?\d*)',
        r'=\s*(-?\d+\.?\d*)\s*$',
    ]
    
    for pattern in patterns:
        match = re.search(pattern, output)
        if match:
            return match.group(1)
    
    # Fallback: last number in output
    numbers = re.findall(r'-?\d+\.?\d*', output)
    if numbers:
        return numbers[-1]
    
    return ""


def evaluate_answer(predicted: str, ground_truth: str) -> bool:
    """Check if predicted answer matches ground truth."""
    pred_clean = predicted.strip().replace(',', '').replace('$', '')
    truth_clean = ground_truth.strip().replace(',', '').replace('$', '')
    
    if pred_clean == truth_clean:
        return True
    
    try:
        pred_num = float(pred_clean)
        truth_num = float(truth_clean)
        return abs(pred_num - truth_num) < 1e-6
    except ValueError:
        return False
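
The answer-handling steps above (pull the value after `####`, normalize it, compare numerically) can be tried on a single made-up string. This is a hedged sketch: `normalize_number` and `answers_match` are hypothetical helpers written for this example, and the sample text is invented rather than drawn from GSM8K.

```python
import re

def normalize_number(text: str) -> str:
    """Hypothetical helper: strip '$' and thousands separators from a number string."""
    return re.sub(r'(?<=\d),(?=\d{3})', '', text.replace('$', '')).strip()

def answers_match(predicted: str, truth: str, tol: float = 1e-6) -> bool:
    """Compare two answer strings numerically, falling back to exact text match."""
    p, t = normalize_number(predicted), normalize_number(truth)
    try:
        return abs(float(p) - float(t)) < tol
    except ValueError:
        return p == t

# Invented reference solution in the "#### <answer>" layout
gold = "She earns $1,250 per week.\n#### 1,250"
final = re.search(r'####\s*(.+?)$', gold, re.MULTILINE).group(1)
print(final, answers_match("1250.0", final), answers_match("1,250", "125"))
```

Removing the separators before parsing matters because a digit-only pattern applied to `1,250` stops at the comma and yields `1`.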

## 4. Load Model and Data

Load the language model (4-bit quantized by default, as set by `CONFIG['quantization']`) and the GSM8K dataset.

In [None]:
# =============================================================================
# LOAD MODEL
# =============================================================================

print(f"Loading model: {CONFIG['model_name']}")
print("This may take a few minutes...")

# Configure quantization (4-bit NF4, 8-bit, or none)
if CONFIG['quantization'] == '4bit':
    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4"
    )
elif CONFIG['quantization'] == '8bit':
    quantization_config = BitsAndBytesConfig(load_in_8bit=True)
else:
    quantization_config = None

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(
    CONFIG['model_name'],
    token=HF_TOKEN
)
tokenizer.pad_token = tokenizer.eos_token

# Load model
model = AutoModelForCausalLM.from_pretrained(
    CONFIG['model_name'],
    quantization_config=quantization_config,
    device_map="auto",
    token=HF_TOKEN,
    torch_dtype=torch.float16
)
model.eval()

print(f"\n✓ Model loaded successfully!")
print(f"  - Parameters: {sum(p.numel() for p in model.parameters()):,}")
print(f"  - Layers: {model.config.num_hidden_layers}")
print(f"  - Hidden size: {model.config.hidden_size}")
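
Because `extraction_layers` is hard-coded for a 32-layer model, it may be worth checking the indices against the layer count printed above before registering hooks. A minimal sketch follows; `validate_layers` and `last_n_layers` are hypothetical helpers, and the value 32 is an assumption matching the configuration comment. In the notebook the count would come from `model.config.num_hidden_layers`.

```python
def validate_layers(layers, num_hidden_layers):
    """Return sorted, de-duplicated layer indices, raising if any is out of range."""
    bad = [l for l in layers if not 0 <= l < num_hidden_layers]
    if bad:
        raise ValueError(f"Layer indices {bad} are outside 0..{num_hidden_layers - 1}")
    return sorted(set(layers))

def last_n_layers(n, num_hidden_layers):
    """Indices of the final n decoder layers."""
    return list(range(num_hidden_layers - n, num_hidden_layers))

# Assumed layer count for illustration only
print(last_n_layers(5, 32))
print(validate_layers([27, 28, 29, 30, 31], 32))
```

Deriving the indices from the layer count, rather than listing them, keeps the configuration valid if a model with a different depth is substituted.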

In [None]:
# =============================================================================
# LOAD DATASET
# =============================================================================

print(f"Loading GSM8K dataset ({CONFIG['subset_size']} problems)...")

problems = load_gsm8k_problems(
    subset_size=CONFIG['subset_size'],
    seed=CONFIG['seed']
)

print(f"\n✓ Loaded {len(problems)} problems")
print(f"\nExample problem:")
print(f"  Question: {problems[0].question[:200]}...")
print(f"  Answer: {problems[0].final_answer}")

## 5. Run Experiment

Evaluate the model under both baseline (zero-shot) and enhanced (chain-of-thought) conditions, extracting intention metrics throughout.

In [None]:
# =============================================================================
# EXPERIMENT RUNNER
# =============================================================================

def run_single_problem(
    model,
    tokenizer,
    extractor: ActivationExtractor,
    problem: MathProblem,
    condition: str,
    prompt_template: str,
    max_new_tokens: int
) -> ExperimentResult:
    """
    Run model on a single problem and extract metrics.
    """
    # Format prompt
    prompt = prompt_template.format(question=problem.question)
    
    # Tokenize
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    
    # Generate with activation capture
    with torch.no_grad():
        with extractor.capture():
            outputs = model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                temperature=CONFIG['temperature'] if CONFIG['temperature'] > 0 else None,
                do_sample=CONFIG['temperature'] > 0,
                pad_token_id=tokenizer.eos_token_id,
                return_dict_in_generate=True,
                output_scores=True
            )
    
    # Decode output
    generated_ids = outputs.sequences[0][inputs['input_ids'].shape[1]:]
    model_output = tokenizer.decode(generated_ids, skip_special_tokens=True)
    
    # Extract activations
    try:
        activations = extractor.get_activations(aggregate="last")
    except Exception as e:
        print(f"Warning: Failed to get activations: {e}")
        activations = None
    
    # Compute entropy from the first generated token's logits
    if outputs.scores:
        first_logits = outputs.scores[0][0]  # First token logits
        entropy = compute_intention_entropy(first_logits, top_k=CONFIG['entropy_top_k'])
    else:
        entropy = 0.0
    
    # Compute dim_eff (will be done batch-wise later for proper PCA)
    dim_eff = 0  # Placeholder, computed after collecting all activations
    
    # Evaluate answer
    extracted_answer = extract_answer(model_output)
    is_correct = evaluate_answer(extracted_answer, problem.final_answer)
    
    return ExperimentResult(
        problem_idx=problem.idx,
        condition=condition,
        question=problem.question,
        ground_truth=problem.final_answer,
        model_output=model_output,
        extracted_answer=extracted_answer,
        is_correct=is_correct,
        metrics=IntentionMetrics(entropy=entropy, dim_eff=dim_eff),
        activations=activations
    )

In [None]:
# =============================================================================
# RUN MAIN EXPERIMENT
# =============================================================================

# Initialize extractor
extractor = ActivationExtractor(model, CONFIG['extraction_layers'])

# Storage for results
results = {'baseline': [], 'enhanced': []}
all_activations = {'baseline': [], 'enhanced': []}

print("="*60)
print("RUNNING EXPERIMENT 4.1: Intention Metrics vs. Accuracy")
print("="*60)

for condition in ['baseline', 'enhanced']:
    print(f"\n--- Condition: {condition.upper()} ---")
    
    prompt_template = PROMPTS[condition]
    max_tokens = (
        CONFIG['max_new_tokens_baseline'] 
        if condition == 'baseline' 
        else CONFIG['max_new_tokens_cot']
    )
    
    for problem in tqdm(problems, desc=f"Running {condition}"):
        result = run_single_problem(
            model, tokenizer, extractor,
            problem, condition,
            prompt_template, max_tokens
        )
        results[condition].append(result)
        
        if result.activations is not None:
            all_activations[condition].append(result.activations)
        
        # Clear CUDA cache periodically
        if problem.idx % 20 == 0:
            torch.cuda.empty_cache()
    
    # Compute accuracy
    accuracy = sum(r.is_correct for r in results[condition]) / len(results[condition])
    print(f"  Accuracy: {accuracy:.1%}")

print("\n" + "="*60)
print("EXPERIMENT COMPLETE")
print("="*60)

In [None]:
# =============================================================================
# COMPUTE DIM_EFF (requires all activations)
# =============================================================================

print("\nComputing effective dimensionality across all examples...")

for condition in ['baseline', 'enhanced']:
    if all_activations[condition]:
        # Stack all activations: (n_examples, n_layers, hidden_dim)
        stacked = np.stack(all_activations[condition], axis=0)
        # Reshape to (n_examples * n_layers, hidden_dim) for PCA
        n_examples, n_layers, hidden_dim = stacked.shape
        flattened = stacked.reshape(n_examples * n_layers, hidden_dim)
        
        global_dim_eff, explained_var = compute_effective_dimensionality(
            flattened,
            variance_threshold=CONFIG['variance_threshold']
        )
        
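        # Update individual results with per-example dim_eff. With the default
        # five extraction layers each example contributes five rows, so after
        # PCA centering this value is at most 4.
        for result in results[condition]: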
            if result.activations is not None:
                per_example_acts = result.activations.reshape(
                    result.activations.shape[0], -1
                )
                dim_eff, _ = compute_effective_dimensionality(
                    per_example_acts,
                    variance_threshold=CONFIG['variance_threshold']
                )
                result.metrics.dim_eff = dim_eff
        
        print(f"  {condition}: Global dim_eff = {global_dim_eff}")
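
The difference between the per-example and the pooled (global) computation can be illustrated with synthetic data. This is a sketch under stated assumptions: it uses NumPy's SVD rather than scikit-learn's `PCA`, random Gaussian rows in place of real activations, and a hypothetical helper named `dim_eff_from_rows`.

```python
import numpy as np

def dim_eff_from_rows(X, threshold=0.90):
    """Smallest k whose top-k principal components explain `threshold` of the variance."""
    Xc = X - X.mean(axis=0, keepdims=True)    # PCA centers the data first
    s = np.linalg.svd(Xc, compute_uv=False)   # singular values, descending
    var = s ** 2                              # proportional to PCA eigenvalues
    if var.sum() == 0:
        return 1                              # all rows identical
    cum = np.cumsum(var / var.sum())
    return int(min(np.searchsorted(cum, threshold) + 1, len(var)))

rng = np.random.default_rng(0)
five_rows = rng.normal(size=(5, 64))     # like one example: 5 layers x hidden_dim
many_rows = rng.normal(size=(500, 64))   # like the pooled matrix across examples
print(dim_eff_from_rows(five_rows), dim_eff_from_rows(many_rows))
```

Five centered rows span at most four directions, so the per-example value has a narrow range regardless of the hidden size, whereas the pooled matrix can reach much higher values.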

In [None]:
# =============================================================================
# TRAIN RECOVERABILITY PROBES
# =============================================================================

print("\nTraining linear probes for recoverability...")

probe_results = {}

for condition in ['baseline', 'enhanced']:
    if all_activations[condition]:
        # Prepare data
        stacked = np.stack(all_activations[condition], axis=0)
        n_examples, n_layers, hidden_dim = stacked.shape
        X = stacked.reshape(n_examples, -1)  # Flatten layers
        # Keep labels aligned with X: skip examples whose activations failed
        y = np.array([r.is_correct for r in results[condition] if r.activations is not None])
        
        # Train probe
        mean_acc, std_acc = train_recoverability_probe(X, y, cv_folds=5)
        
        probe_results[condition] = {
            'mean': mean_acc,
            'std': std_acc,
            'verbalized_accuracy': y.mean()
        }
        
        print(f"  {condition}:")
        print(f"    Probe accuracy (Recov): {mean_acc:.3f} ± {std_acc:.3f}")
        print(f"    Verbalized accuracy:    {y.mean():.3f}")
        print(f"    Recoverability gap:     {mean_acc - y.mean():.3f}")
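
When reading the probe accuracy, one possible reference point is the accuracy of always predicting the more frequent correctness label. The sketch below computes that reference; it is an optional addition rather than part of the protocol, `majority_baseline` is a hypothetical helper, and the labels and probe score are placeholders, not results from this experiment.

```python
def majority_baseline(labels):
    """Accuracy of always predicting the most frequent class."""
    labels = [int(bool(y)) for y in labels]
    if not labels:
        raise ValueError("labels must be non-empty")
    pos_rate = sum(labels) / len(labels)
    return max(pos_rate, 1.0 - pos_rate)

# Hypothetical correctness labels: 14 correct answers out of 20
y_demo = [1] * 14 + [0] * 6
baseline = majority_baseline(y_demo)
probe_acc_demo = 0.75  # placeholder value, not a result from this experiment
print(f"majority baseline={baseline:.2f}, probe lift={probe_acc_demo - baseline:+.2f}")
```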

## 6. Analyze Results

Compute statistics and correlations between intention metrics and performance.

In [None]:
# =============================================================================
# COMPILE RESULTS
# =============================================================================

# Extract metrics into arrays
metrics_data = {}

for condition in ['baseline', 'enhanced']:
    metrics_data[condition] = {
        'entropy': [r.metrics.entropy for r in results[condition]],
        'dim_eff': [r.metrics.dim_eff for r in results[condition]],
        'correct': [r.is_correct for r in results[condition]],
        'output_length': [len(r.model_output.split()) for r in results[condition]]
    }

# Print summary statistics
print("\n" + "="*60)
print("SUMMARY STATISTICS")
print("="*60)

for condition in ['baseline', 'enhanced']:
    print(f"\n{condition.upper()}:")
    data = metrics_data[condition]
    print(f"  Accuracy:       {np.mean(data['correct']):.1%}")
    print(f"  Entropy:        {np.mean(data['entropy']):.2f} ± {np.std(data['entropy']):.2f}")
    print(f"  Dim_eff:        {np.mean(data['dim_eff']):.1f} ± {np.std(data['dim_eff']):.1f}")
    print(f"  Output length:  {np.mean(data['output_length']):.1f} ± {np.std(data['output_length']):.1f} words")

In [None]:
# =============================================================================
# COMPUTE CORRELATIONS
# =============================================================================

print("\n" + "="*60)
print("CORRELATION ANALYSIS")
print("="*60)

correlation_results = {}

for condition in ['baseline', 'enhanced']:
    print(f"\n{condition.upper()}:")
    data = metrics_data[condition]
    correct = np.array(data['correct']).astype(int)
    
    # Correlation: Entropy vs Correctness
    r_entropy, p_entropy = stats.pointbiserialr(correct, data['entropy'])
    print(f"  Entropy vs Correct:     r={r_entropy:.3f}, p={p_entropy:.4f}")
    
    # Correlation: Dim_eff vs Correctness  
    r_dim, p_dim = stats.pointbiserialr(correct, data['dim_eff'])
    print(f"  Dim_eff vs Correct:     r={r_dim:.3f}, p={p_dim:.4f}")
    
    # Correlation: Output length vs Correctness
    r_len, p_len = stats.pointbiserialr(correct, data['output_length'])
    print(f"  Output len vs Correct:  r={r_len:.3f}, p={p_len:.4f}")
    
    correlation_results[condition] = {
        'entropy_correct': (r_entropy, p_entropy),
        'dim_correct': (r_dim, p_dim),
        'length_correct': (r_len, p_len)
    }
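
The point-biserial coefficient used above is the Pearson correlation between a 0/1 variable and a continuous one. A standard-library sketch of that calculation is shown here with made-up numbers; `point_biserial` is a hypothetical helper, and unlike `scipy.stats.pointbiserialr` it returns no p-value.

```python
import math

def point_biserial(binary, values):
    """Pearson correlation between a 0/1 variable and a continuous one."""
    n = len(binary)
    mb, mv = sum(binary) / n, sum(values) / n
    cov = sum((b - mb) * (v - mv) for b, v in zip(binary, values))
    sb = math.sqrt(sum((b - mb) ** 2 for b in binary))
    sv = math.sqrt(sum((v - mv) ** 2 for v in values))
    if sb == 0 or sv == 0:
        return float('nan')  # undefined when either variable is constant
    return cov / (sb * sv)

correct_demo = [1, 1, 1, 0, 0, 0]
entropy_demo = [0.4, 0.6, 0.5, 1.9, 2.2, 2.0]  # made-up values for illustration
r_demo = point_biserial(correct_demo, entropy_demo)
print(round(r_demo, 3))
```

A negative coefficient here means the correct group has lower entropy on average. The coefficient is undefined when every answer in a condition is correct (or every answer is wrong), which is worth checking before interpreting the printed values.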

In [None]:
# =============================================================================
# KEY FINDINGS
# =============================================================================

print("\n" + "="*60)
print("KEY FINDINGS")
print("="*60)

# Hypothesis 1: Enhanced condition should have higher dim_eff
base_dim = np.mean(metrics_data['baseline']['dim_eff'])
enh_dim = np.mean(metrics_data['enhanced']['dim_eff'])
print(f"\n1. Dim_eff comparison:")
print(f"   Baseline: {base_dim:.1f}, Enhanced: {enh_dim:.1f}")
print(f"   Change: {enh_dim - base_dim:+.1f} ({(enh_dim/base_dim - 1)*100:+.1f}%)")
print(f"   Hypothesis (dim_eff increases with CoT): {'SUPPORTED' if enh_dim > base_dim else 'NOT SUPPORTED'}")

# Hypothesis 2: Lower entropy should correlate with correctness
print(f"\n2. Entropy-correctness relationship:")
for condition in ['baseline', 'enhanced']:
    r, p = correlation_results[condition]['entropy_correct']
    direction = 'lower entropy → more correct' if r < 0 else 'higher entropy → more correct'
    sig = 'significant' if p < 0.05 else 'not significant'
    print(f"   {condition}: r={r:.3f} ({direction}, {sig})")

# Hypothesis 3: Probe accuracy should exceed verbalized accuracy
print(f"\n3. Recoverability (information in I vs verbalized output):")
for condition in ['baseline', 'enhanced']:
    if condition in probe_results:
        probe_acc = probe_results[condition]['mean']
        verb_acc = probe_results[condition]['verbalized_accuracy']
        gap = probe_acc - verb_acc
        print(f"   {condition}: Probe={probe_acc:.3f}, Verbalized={verb_acc:.3f}, Gap={gap:+.3f}")

# Accuracy improvement
base_acc = np.mean(metrics_data['baseline']['correct'])
enh_acc = np.mean(metrics_data['enhanced']['correct'])
print(f"\n4. Overall accuracy:")
print(f"   Baseline: {base_acc:.1%}")
print(f"   Enhanced (CoT): {enh_acc:.1%}")
print(f"   Improvement: {(enh_acc - base_acc)*100:+.1f} percentage points")

## 7. Visualizations

Create publication-quality figures for the paper.

In [None]:
# =============================================================================
# VISUALIZATION SETUP
# =============================================================================

# Publication-quality settings
plt.rcParams.update({
    'font.family': 'serif',
    'font.size': 10,
    'axes.labelsize': 11,
    'axes.titlesize': 12,
    'xtick.labelsize': 9,
    'ytick.labelsize': 9,
    'legend.fontsize': 9,
    'figure.titlesize': 13,
})

sns.set_style("whitegrid")

# Color scheme
COLORS = {
    'baseline': '#1f77b4',  # Blue
    'enhanced': '#ff7f0e',  # Orange
    'correct': '#2ca02c',   # Green
    'incorrect': '#d62728', # Red
}

# Create output directory
os.makedirs(CONFIG['output_dir'], exist_ok=True)

In [None]:
# =============================================================================
# FIGURE 1: METRICS COMPARISON
# =============================================================================

fig, axes = plt.subplots(1, 3, figsize=(12, 4))

# (A) Accuracy comparison
ax = axes[0]
conditions = ['Baseline', 'CoT']
accuracies = [base_acc, enh_acc]
colors = [COLORS['baseline'], COLORS['enhanced']]
bars = ax.bar(conditions, accuracies, color=colors, alpha=0.8)
ax.set_ylabel('Accuracy')
ax.set_title('(A) Task Accuracy')
ax.set_ylim(0, 1)
for bar, acc in zip(bars, accuracies):
    ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.02,
           f'{acc:.1%}', ha='center', fontsize=10)

# (B) Entropy comparison
ax = axes[1]
data_to_plot = [
    metrics_data['baseline']['entropy'],
    metrics_data['enhanced']['entropy']
]
bp = ax.boxplot(data_to_plot, patch_artist=True)
bp['boxes'][0].set_facecolor(COLORS['baseline'])
bp['boxes'][1].set_facecolor(COLORS['enhanced'])
for box in bp['boxes']:
    box.set_alpha(0.7)
ax.set_xticklabels(['Baseline', 'CoT'])
ax.set_ylabel('Intention Entropy (bits)')
ax.set_title('(B) Intention Entropy $H_{int}(I)$')

# (C) Dim_eff comparison
ax = axes[2]
data_to_plot = [
    metrics_data['baseline']['dim_eff'],
    metrics_data['enhanced']['dim_eff']
]
bp = ax.boxplot(data_to_plot, patch_artist=True)
bp['boxes'][0].set_facecolor(COLORS['baseline'])
bp['boxes'][1].set_facecolor(COLORS['enhanced'])
for box in bp['boxes']:
    box.set_alpha(0.7)
ax.set_xticklabels(['Baseline', 'CoT'])
ax.set_ylabel('Effective Dimensionality')
ax.set_title('(C) Intention Richness $dim_{eff}(I)$')

plt.tight_layout()

if CONFIG['save_figures']:
    plt.savefig(f"{CONFIG['output_dir']}/fig1_metrics_comparison.pdf", 
                bbox_inches='tight', dpi=300)
    plt.savefig(f"{CONFIG['output_dir']}/fig1_metrics_comparison.png", 
                bbox_inches='tight', dpi=300)
plt.show()

In [None]:
# =============================================================================
# FIGURE 2: ENTROPY VS CORRECTNESS
# =============================================================================

fig, axes = plt.subplots(1, 2, figsize=(10, 4))

for idx, condition in enumerate(['baseline', 'enhanced']):
    ax = axes[idx]
    
    entropy_vals = metrics_data[condition]['entropy']
    correct_vals = metrics_data[condition]['correct']
    
    # Separate by correctness
    entropy_correct = [e for e, c in zip(entropy_vals, correct_vals) if c]
    entropy_incorrect = [e for e, c in zip(entropy_vals, correct_vals) if not c]
    
    # Create violin plot
    parts = ax.violinplot([entropy_incorrect, entropy_correct], positions=[0, 1], showmeans=True)
    parts['bodies'][0].set_facecolor(COLORS['incorrect'])
    parts['bodies'][1].set_facecolor(COLORS['correct'])
    for pc in parts['bodies']:
        pc.set_alpha(0.7)
    
    ax.set_xticks([0, 1])
    ax.set_xticklabels(['Incorrect', 'Correct'])
    ax.set_ylabel('Intention Entropy (bits)')
    ax.set_title(f'{condition.title()}')
    
    # Add statistics
    r, p = correlation_results[condition]['entropy_correct']
    ax.text(0.95, 0.95, f'r = {r:.3f}\np = {p:.3f}',
           transform=ax.transAxes, ha='right', va='top',
           fontsize=9, bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))

plt.suptitle('Entropy Distribution by Answer Correctness', fontsize=12)
plt.tight_layout()

if CONFIG['save_figures']:
    plt.savefig(f"{CONFIG['output_dir']}/fig2_entropy_correctness.pdf", 
                bbox_inches='tight', dpi=300)
    plt.savefig(f"{CONFIG['output_dir']}/fig2_entropy_correctness.png", 
                bbox_inches='tight', dpi=300)
plt.show()

In [None]:
# =============================================================================
# FIGURE 3: RECOVERABILITY GAP
# =============================================================================

fig, ax = plt.subplots(figsize=(8, 5))

x = np.arange(2)
width = 0.35

# Get probe and verbalized accuracies
probe_accs = [probe_results[c]['mean'] for c in ['baseline', 'enhanced']]
verb_accs = [probe_results[c]['verbalized_accuracy'] for c in ['baseline', 'enhanced']]
probe_stds = [probe_results[c]['std'] for c in ['baseline', 'enhanced']]

bars1 = ax.bar(x - width/2, probe_accs, width, label='Probe on I (Recoverability)',
               color=COLORS['enhanced'], alpha=0.8, yerr=probe_stds, capsize=5)
bars2 = ax.bar(x + width/2, verb_accs, width, label='Verbalized Output',
               color=COLORS['baseline'], alpha=0.8)

ax.set_ylabel('Accuracy')
ax.set_xticks(x)
ax.set_xticklabels(['Baseline', 'CoT'])
ax.set_ylim(0, 1)
ax.legend(loc='upper left')
ax.set_title('Information Recovery: Probe vs. Verbalized Output')

# Add gap annotations
for i, (p, v) in enumerate(zip(probe_accs, verb_accs)):
    gap = p - v
    mid_y = (p + v) / 2
    ax.annotate(f'Gap: {gap:+.1%}', xy=(i, mid_y), fontsize=9,
               ha='center', va='center',
               bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.5))

plt.tight_layout()

if CONFIG['save_figures']:
    plt.savefig(f"{CONFIG['output_dir']}/fig3_recoverability_gap.pdf", 
                bbox_inches='tight', dpi=300)
    plt.savefig(f"{CONFIG['output_dir']}/fig3_recoverability_gap.png", 
                bbox_inches='tight', dpi=300)
plt.show()

In [None]:
# =============================================================================
# FIGURE 4: CORRELATION MATRIX
# =============================================================================

fig, axes = plt.subplots(1, 2, figsize=(12, 5))

for idx, condition in enumerate(['baseline', 'enhanced']):
    ax = axes[idx]
    
    # Build correlation matrix
    data = metrics_data[condition]
    variables = ['entropy', 'dim_eff', 'output_length', 'correct']
    n = len(variables)
    # Rows are variables, so np.corrcoef returns the full n x n Pearson matrix
    values = np.array([data[var] for var in variables], dtype=float)
    corr_matrix = np.corrcoef(values)
    
    # Plot heatmap
    im = ax.imshow(corr_matrix, cmap='RdBu_r', vmin=-1, vmax=1)
    
    # Add text annotations
    for i in range(n):
        for j in range(n):
            val = corr_matrix[i, j]
            color = 'white' if abs(val) > 0.5 else 'black'
            ax.text(j, i, f'{val:.2f}', ha='center', va='center',
                   color=color, fontsize=10)
    
    ax.set_xticks(range(n))
    ax.set_yticks(range(n))
    labels = ['Entropy', 'Dim_eff', 'Length', 'Correct']
    ax.set_xticklabels(labels, rotation=45, ha='right')
    ax.set_yticklabels(labels)
    ax.set_title(f'{condition.title()}')

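plt.suptitle('Correlation Matrix: Intention Metrics vs. Performance', fontsize=12)
# Call tight_layout before adding the shared colorbar: a colorbar attached to
# several axes is not compatible with tight_layout and triggers a warning
plt.tight_layout()

# Add colorbar
cbar = fig.colorbar(im, ax=axes, shrink=0.6)
cbar.set_label('Correlation')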

if CONFIG['save_figures']:
    plt.savefig(f"{CONFIG['output_dir']}/fig4_correlation_matrix.pdf", 
                bbox_inches='tight', dpi=300)
    plt.savefig(f"{CONFIG['output_dir']}/fig4_correlation_matrix.png", 
                bbox_inches='tight', dpi=300)
plt.show()

## 8. Export Results

Save all results for later analysis and paper inclusion.
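
Because the exported files are meant to be reloaded later, it can help to check that a results dictionary survives a JSON round trip. The sketch below uses only the standard library. `to_jsonable` and `save_and_reload` are hypothetical helpers, not functions defined elsewhere in this notebook, and the sample dictionary holds made-up placeholder values rather than experimental output. The helper assumes that array-like values expose a `tolist()` method, as NumPy arrays and scalars do.

```python
import json
import os
import tempfile


def to_jsonable(obj):
    """Recursively convert containers and array-like values to plain Python types."""
    if isinstance(obj, dict):
        return {str(k): to_jsonable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [to_jsonable(v) for v in obj]
    if hasattr(obj, "tolist"):  # assumption: NumPy-style arrays and scalars
        return obj.tolist()
    return obj


def save_and_reload(results, path):
    """Write results as JSON, then read the file back."""
    with open(path, "w") as f:
        json.dump(to_jsonable(results), f, indent=2)
    with open(path) as f:
        return json.load(f)


# Placeholder values for illustration only (not real results)
example = {"baseline": {"entropy": (1.5, 2.0), "correct": [True, False]}}

with tempfile.TemporaryDirectory() as tmp:
    reloaded = save_and_reload(example, os.path.join(tmp, "full_results.json"))

print(reloaded)
```

Tuples come back as lists and dictionary keys come back as strings, so any later analysis should compare against the reloaded structure rather than the in-memory one.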

In [None]:
# =============================================================================
# EXPORT RESULTS TABLE
# =============================================================================

import pandas as pd

# Create summary table
summary_table = pd.DataFrame({
    'Metric': ['Accuracy', 'Entropy (mean)', 'Entropy (std)', 
               'Dim_eff (mean)', 'Dim_eff (std)',
               'Probe Accuracy', 'Recoverability Gap'],
    'Baseline': [
        f"{np.mean(metrics_data['baseline']['correct']):.1%}",
        f"{np.mean(metrics_data['baseline']['entropy']):.2f}",
        f"{np.std(metrics_data['baseline']['entropy']):.2f}",
        f"{np.mean(metrics_data['baseline']['dim_eff']):.1f}",
        f"{np.std(metrics_data['baseline']['dim_eff']):.1f}",
        f"{probe_results['baseline']['mean']:.3f} ± {probe_results['baseline']['std']:.3f}",
        f"{probe_results['baseline']['mean'] - probe_results['baseline']['verbalized_accuracy']:+.3f}"
    ],
    'CoT': [
        f"{np.mean(metrics_data['enhanced']['correct']):.1%}",
        f"{np.mean(metrics_data['enhanced']['entropy']):.2f}",
        f"{np.std(metrics_data['enhanced']['entropy']):.2f}",
        f"{np.mean(metrics_data['enhanced']['dim_eff']):.1f}",
        f"{np.std(metrics_data['enhanced']['dim_eff']):.1f}",
        f"{probe_results['enhanced']['mean']:.3f} ± {probe_results['enhanced']['std']:.3f}",
        f"{probe_results['enhanced']['mean'] - probe_results['enhanced']['verbalized_accuracy']:+.3f}"
    ]
})

print("\n" + "="*60)
print("RESULTS TABLE (for paper)")
print("="*60)
print(summary_table.to_string(index=False))

# Save to CSV
summary_table.to_csv(f"{CONFIG['output_dir']}/results_summary.csv", index=False)
print(f"\n✓ Results saved to {CONFIG['output_dir']}/results_summary.csv")

In [None]:
# =============================================================================
# EXPORT LATEX TABLE
# =============================================================================

latex_table = r"""
\begin{table}[h]
\centering
\caption{Experiment 4.1 Results: Intention Metrics vs. Reasoning Accuracy}
\label{tab:exp41_results}
\begin{tabular}{lcc}
\hline
\textbf{Metric} & \textbf{Baseline} & \textbf{CoT} \\
\hline
""" + f"""
Accuracy & {np.mean(metrics_data['baseline']['correct']) * 100:.1f}\\% & {np.mean(metrics_data['enhanced']['correct']) * 100:.1f}\\% \\\\
$H_{{int}}(I)$ (mean $\\pm$ std) & {np.mean(metrics_data['baseline']['entropy']):.2f} $\\pm$ {np.std(metrics_data['baseline']['entropy']):.2f} & {np.mean(metrics_data['enhanced']['entropy']):.2f} $\\pm$ {np.std(metrics_data['enhanced']['entropy']):.2f} \\\\
$dim_{{eff}}(I)$ (mean $\\pm$ std) & {np.mean(metrics_data['baseline']['dim_eff']):.1f} $\\pm$ {np.std(metrics_data['baseline']['dim_eff']):.1f} & {np.mean(metrics_data['enhanced']['dim_eff']):.1f} $\\pm$ {np.std(metrics_data['enhanced']['dim_eff']):.1f} \\\\
Probe Accuracy & {probe_results['baseline']['mean']:.3f} $\\pm$ {probe_results['baseline']['std']:.3f} & {probe_results['enhanced']['mean']:.3f} $\\pm$ {probe_results['enhanced']['std']:.3f} \\\\
Recoverability Gap & {probe_results['baseline']['mean'] - probe_results['baseline']['verbalized_accuracy']:+.3f} & {probe_results['enhanced']['mean'] - probe_results['enhanced']['verbalized_accuracy']:+.3f} \\\\
""" + r"""
\hline
\end{tabular}
\end{table}
"""

print("\nLaTeX Table:")
print(latex_table)

# Save to file
with open(f"{CONFIG['output_dir']}/results_table.tex", 'w') as f:
    f.write(latex_table)
print(f"✓ LaTeX table saved to {CONFIG['output_dir']}/results_table.tex")

In [None]:
# =============================================================================
# SAVE FULL RESULTS
# =============================================================================

import json

full_results = {
    'config': CONFIG,
    'metrics_data': {
        condition: {
            # np.generic also covers np.bool_, which json cannot serialize
            k: [v.item() if isinstance(v, np.generic) else v for v in vals]
            for k, vals in data.items()
        }
        for condition, data in metrics_data.items()
    },
    'correlation_results': {
        condition: {
            k: {'r': float(v[0]), 'p': float(v[1])}
            for k, v in corrs.items()
        }
        for condition, corrs in correlation_results.items()
    },
    'probe_results': probe_results
}

with open(f"{CONFIG['output_dir']}/full_results.json", 'w') as f:
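    # Fallback for values json cannot encode (e.g. NumPy arrays in CONFIG or probe_results)
    json.dump(full_results, f, indent=2,
              default=lambda o: o.tolist() if hasattr(o, 'tolist') else str(o))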

print(f"\n✓ Full results saved to {CONFIG['output_dir']}/full_results.json")

## 9. Conclusion

This notebook implemented Experiment 4.1 from the Intention Collapse framework paper, testing whether intention metrics correlate with reasoning accuracy in LLMs.

### Key Findings:

1. **Effective Dimensionality**: Chain-of-thought reasoning [increases/decreases] the effective dimensionality of internal representations

2. **Intention Entropy**: Lower entropy [correlates/does not correlate] with correct answers

3. **Recoverability**: Linear probes on pre-collapse activations achieve [higher/lower] accuracy than verbalized outputs, suggesting information is [lost/preserved] during collapse
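
One possible way to choose between the bracketed alternatives above is to read them off the saved results. The sketch below is illustrative only. `describe_findings` is a hypothetical helper, the significance threshold `alpha=0.05` is an assumption, and the input mirrors the layout written to `full_results.json` (`metrics_data`, `correlation_results`, `probe_results`). It also assumes that `r` correlates entropy with correctness coded as 1 for correct. The numbers in `toy` are invented placeholders, not outcomes of this experiment.

```python
def describe_findings(results, alpha=0.05):
    """Map saved results onto the bracketed choices in the Key Findings list."""
    def mean(values):
        return sum(values) / len(values)

    metrics = results["metrics_data"]
    dim_base = mean(metrics["baseline"]["dim_eff"])
    dim_cot = mean(metrics["enhanced"]["dim_eff"])
    if dim_cot > dim_base:
        dim_change = "increases"
    elif dim_cot < dim_base:
        dim_change = "decreases"
    else:
        dim_change = "does not change"

    entropy, probe = {}, {}
    for condition in ("baseline", "enhanced"):
        stats = results["correlation_results"][condition]["entropy_correct"]
        # Assumption: a significant negative r means lower entropy
        # goes with correct answers.
        significant = stats["r"] < 0 and stats["p"] < alpha
        entropy[condition] = "correlates" if significant else "does not correlate"

        pr = results["probe_results"][condition]
        gap = pr["mean"] - pr["verbalized_accuracy"]
        probe[condition] = "higher" if gap > 0 else "lower" if gap < 0 else "equal"

    return {"dim_eff": dim_change, "entropy": entropy, "probe": probe}


# Invented placeholder numbers, used only to exercise the function
toy = {
    "metrics_data": {
        "baseline": {"dim_eff": [10.0, 12.0]},
        "enhanced": {"dim_eff": [14.0, 16.0]},
    },
    "correlation_results": {
        "baseline": {"entropy_correct": {"r": -0.10, "p": 0.40}},
        "enhanced": {"entropy_correct": {"r": -0.35, "p": 0.01}},
    },
    "probe_results": {
        "baseline": {"mean": 0.70, "verbalized_accuracy": 0.50},
        "enhanced": {"mean": 0.60, "verbalized_accuracy": 0.65},
    },
}

findings = describe_findings(toy)
print(findings)
```

Reporting the entropy and probe outcomes per condition keeps the baseline and chain-of-thought results separate, since the two conditions need not agree.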

### Files Generated:
- `results/fig1_metrics_comparison.pdf` (and `.png`) - Main comparison figure
- `results/fig2_entropy_correctness.pdf` (and `.png`) - Entropy vs correctness
- `results/fig3_recoverability_gap.pdf` (and `.png`) - Probe vs verbalized accuracy
- `results/fig4_correlation_matrix.pdf` (and `.png`) - Full correlation matrix
- `results/results_summary.csv` - Summary statistics
- `results/results_table.tex` - LaTeX table for paper
- `results/full_results.json` - Complete results data

In [None]:
# List all generated files
import os
print("\nGenerated files:")
for f in sorted(os.listdir(CONFIG['output_dir'])):
    filepath = os.path.join(CONFIG['output_dir'], f)
    size = os.path.getsize(filepath)
    print(f"  {f} ({size/1024:.1f} KB)")