# Figure 4 Reproduction: Spectral Mechanisms of Recovery

This notebook reproduces the spectral analysis presented in Figure 4 of the paper. It compares several intervention strategies (Structural Steering, Chain-of-Thought, Sparsity, Windowing) on a single natural-language sample to demonstrate the "Two Pathways to Recovery" hypothesis. Each intervention is also compared against the **"Active"** (active-voice) version of the sentence, which serves as the ideal reference; this is distinct from the `baseline` run in the code, which is the unmodified passive sample.

**Sample:** "The book was written by the man."
**Active Target:** "The man wrote the book."

In [None]:
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from transformers import AutoModelForCausalLM, AutoTokenizer

# --- Experiment configuration ---
# Hugging Face checkpoint analysed throughout the notebook.
MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"

# Passive-voice sentence under study and its active-voice counterpart.
SAMPLE_TEXT = "The book was written by the man."
ACTIVE_TEXT = "The man wrote the book."

# Structural steering: index into model.model.layers of the layer whose
# output is shifted, and the scale applied to the steering vector.
STEERING_LAYER_IDX = 2
ALPHA = 0.2

# Number of attention weights kept per row in the "sparsity" mode.
TOP_K = 2

# Report whether a CUDA device is visible to torch.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

In [None]:
# Load Model
# Fetch the tokenizer and the causal-LM weights for MODEL_NAME.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# The model is loaded in half precision and placed by device_map="auto";
# later cells read the resulting placement from `model.device`.
# NOTE(review): attn_implementation="eager" is presumably required so that
# `output_attentions=True` returns the attention weights used by the spectral
# analysis -- confirm against the transformers version in use.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME, 
    torch_dtype=torch.float16, 
    device_map="auto",
    attn_implementation="eager"
)

In [None]:
# Spectral Metric Functions
def get_spectral_metrics(adj_matrix):
    """Compute Fiedler, HFER, Smoothness, and Entropy for a single attention matrix.

    The matrix is symmetrised, converted to the combinatorial graph Laplacian
    ``L = D - W``, and probed with a linear ramp signal running from -1 to 1
    across the node positions.

    Args:
        adj_matrix: Square 2-D NumPy array of non-negative edge weights
            (attention weights in this notebook), one row per node.

    Returns:
        A tuple ``(fiedler, hfer, smoothness, entropy)`` of Python floats:

        * fiedler    -- second-smallest Laplacian eigenvalue.
        * hfer       -- share of the ramp signal's spectral energy carried by
                        the upper half (indices ``N // 2`` and above) of the
                        eigenvectors, ordered by ascending eigenvalue.
        * smoothness -- Rayleigh quotient ``x^T L x / (x^T x + 1e-9)`` of the
                        ramp signal.
        * entropy    -- mean Shannon entropy (natural log) of the rows of
                        ``adj_matrix`` after adding 1e-12 and normalising each
                        row to sum to one.

        All four values are 0.0 for matrices with fewer than two nodes. If the
        eigendecomposition fails, the three spectral values are 0.0 and only
        the entropy is computed.
    """
    n_nodes = adj_matrix.shape[0]
    if n_nodes < 2:
        return 0.0, 0.0, 0.0, 0.0

    # Symmetrise so the Laplacian is real-symmetric, as eigh requires.
    sym_weights = 0.5 * (adj_matrix + adj_matrix.T)
    laplacian = np.diag(sym_weights.sum(axis=1)) - sym_weights

    try:
        # eigh returns eigenvalues in ascending order with matching
        # eigenvector columns, so no extra sorting is needed.
        eigvals, eigvecs = np.linalg.eigh(laplacian)
    except np.linalg.LinAlgError:
        # Best effort: report zeros for the spectral metrics rather than
        # aborting the whole per-layer sweep.
        fiedler = hfer = smoothness = 0.0
    else:
        # 1. Fiedler value (algebraic connectivity).
        fiedler = float(eigvals[1])

        # 2. Smoothness of the linear ramp signal.
        ramp = np.linspace(-1, 1, n_nodes)
        smoothness = float((ramp @ laplacian @ ramp) / (ramp @ ramp + 1e-9))

        # 3. HFER: project the ramp onto the eigenbasis and compare the
        #    energy in the high-frequency half with the total.
        energy = (eigvecs.T @ ramp) ** 2
        total_energy = energy.sum()
        if total_energy > 0:
            hfer = float(energy[n_nodes // 2:].sum() / total_energy)
        else:
            hfer = 0.0

    # 4. Entropy: the epsilon keeps log() finite for zero entries.
    probs = adj_matrix + 1e-12
    probs = probs / probs.sum(axis=1, keepdims=True)
    entropy = float(-(probs * np.log(probs)).sum(axis=1).mean())

    return fiedler, hfer, smoothness, entropy

In [None]:
# Calibration for Steering Vector
def get_mean_hidden(texts, layer_idx):
    """Return the average hidden state of `texts` at one layer.

    The texts are tokenized as a single padded batch and run through the
    model without gradients. For every text the hidden states at index
    ``layer_idx + 1`` of ``hidden_states`` are averaged over the positions
    kept by the attention mask (so padding does not contribute), and those
    per-text means are then averaged over the batch.

    Args:
        texts: List of strings to encode together.
        layer_idx: Layer index; the entry read is ``hidden_states[layer_idx + 1]``.
            NOTE(review): in Hugging Face outputs entry 0 is normally the
            embedding output, which would make this the output of layer
            ``layer_idx`` -- confirm for this model.

    Returns:
        Tensor holding the batch-averaged, mask-weighted mean hidden state.
    """
    batch = tokenizer(texts, return_tensors="pt", padding=True, truncation=True).to(model.device)
    with torch.no_grad():
        model_out = model(**batch, output_hidden_states=True)

    layer_hidden = model_out.hidden_states[layer_idx + 1]
    # Trailing singleton axis lets the mask broadcast over the feature axis.
    keep = batch.attention_mask.unsqueeze(-1)
    per_text_mean = (layer_hidden * keep).sum(dim=1) / keep.sum(dim=1)
    return per_text_mean.mean(dim=0)

# Tech Vocab for Robust Steering
tech_pairs = [
    # (active voice, passive voice)
    ("the system processed the data", "The data was processed by the system"),
    ("the layer encoded the input", "The input was encoded by the layer"),
    ("the network mapped the query", "The query was mapped by the network"),
]
# NOTE(review): the active sentences start lower-case while the passive ones
# are capitalised, so the difference of means below may also encode that
# casing difference -- confirm this is intentional.

# Mean representation of each voice at the steering layer; the steering
# vector is the direction from the passive mean to the active mean.
mu_act = get_mean_hidden([active for active, _ in tech_pairs], STEERING_LAYER_IDX)
mu_pas = get_mean_hidden([passive for _, passive in tech_pairs], STEERING_LAYER_IDX)
steering_vector = mu_act - mu_pas
print("Steering vector calibrated.")

In [None]:
def run_experiment(mode="baseline"):
    """Run one forward pass and return spectral metrics for every layer.

    Args:
        mode: Selects the input text and the intervention:

            * "baseline"   -- SAMPLE_TEXT, attention analysed as-is.
            * "structural" -- SAMPLE_TEXT, with ``ALPHA * steering_vector``
              added to the output of ``model.model.layers[STEERING_LAYER_IDX]``
              through a temporary forward hook.
            * "cot"        -- SAMPLE_TEXT wrapped in a rewrite-to-active
              prompt; only the attention sub-block over the sentence's own
              tokens is analysed.
            * "sparsity"   -- SAMPLE_TEXT, keeping the TOP_K largest weights
              in each attention row.
            * "window"     -- SAMPLE_TEXT, keeping only the main diagonal and
              first sub-diagonal of the attention matrix.
            * "active"     -- ACTIVE_TEXT, attention analysed as-is.

            Any other value behaves like "baseline".

    Returns:
        A list with one ``(fiedler, hfer, smoothness, entropy)`` tuple, as
        produced by ``get_spectral_metrics``, per entry of the model's
        ``attentions`` output.

    Notes:
        In the non-CoT modes every position emitted by the tokenizer is
        analysed (including any special tokens it adds); in "cot" mode only
        the located sentence tokens are.
    """
    import warnings

    target_text = SAMPLE_TEXT
    start_idx, end_idx = 0, None

    if mode == "cot":
        prefix = "Question: Rewrite the following sentence in active voice.\nSentence: "
        suffix = "\nAnalysis: Let's identify the subject and object."
        inputs = tokenizer(prefix + target_text + suffix, return_tensors="pt").to(model.device)

        # Locate the sentence's token ids inside the full prompt.
        target_ids = tokenizer(target_text, add_special_tokens=False).input_ids
        full_ids = inputs.input_ids[0].tolist()
        n_target = len(target_ids)
        for pos in range(len(full_ids) - n_target + 1):
            if full_ids[pos:pos + n_target] == target_ids:
                start_idx = pos
                break
        else:
            # Tokenization in context can differ from tokenization in
            # isolation. Keep the historical fallback (the first n_target
            # positions) but make the mismatch visible.
            warnings.warn(
                "CoT mode: target sentence tokens were not found in the "
                "prompt; analysing the first len(target) positions instead.",
                RuntimeWarning,
                stacklevel=2,
            )
        end_idx = start_idx + n_target
    else:
        text_input = ACTIVE_TEXT if mode == "active" else target_text
        inputs = tokenizer(text_input, return_tensors="pt").to(model.device)
        end_idx = inputs.input_ids.shape[1]

    # Hooks are removed in `finally` so a failed forward pass cannot leave
    # the steering hook attached and contaminate later experiments.
    hooks = []
    try:
        if mode == "structural":
            def steer_hook(module, inp, out):
                hidden = out[0] if isinstance(out, tuple) else out
                direction = steering_vector.view(1, 1, -1).to(
                    device=hidden.device, dtype=hidden.dtype
                )
                steered = hidden + ALPHA * direction
                if isinstance(out, tuple):
                    # Preserve any extra outputs returned by the layer.
                    return (steered,) + out[1:]
                return steered

            hooks.append(
                model.model.layers[STEERING_LAYER_IDX].register_forward_hook(steer_hook)
            )

        # Forward pass
        with torch.no_grad():
            outputs = model(**inputs, output_attentions=True)
    finally:
        for handle in hooks:
            handle.remove()

    # Analyze layers
    layer_metrics = []
    for layer_attn in outputs.attentions:
        # First batch element, mean over heads.
        A_full = layer_attn[0].mean(dim=0).float().cpu().numpy()

        # Restrict to the analysed span (the whole matrix outside CoT mode).
        limit = A_full.shape[0]
        s = start_idx if start_idx < limit else 0
        e = end_idx if end_idx and end_idx <= limit else limit
        A = A_full[s:e, s:e]

        # Re-normalize rows after slicing.
        A = A / (A.sum(axis=1, keepdims=True) + 1e-12)

        if mode == "window":
            # Lower-triangular part of a tri-diagonal band: the diagonal
            # plus the first sub-diagonal.
            n = A.shape[0]
            A = A * (np.eye(n) + np.eye(n, k=-1))
            A = A / (A.sum(axis=1, keepdims=True) + 1e-12)
        elif mode == "sparsity":
            # Rows no longer than TOP_K are left untouched.
            if A.shape[1] > TOP_K:
                for r, row in enumerate(A):
                    top = np.argpartition(row, -TOP_K)[-TOP_K:]
                    pruned = np.zeros_like(row)
                    pruned[top] = row[top]
                    A[r] = pruned
            A = A / (A.sum(axis=1, keepdims=True) + 1e-12)

        layer_metrics.append(get_spectral_metrics(A))

    return layer_metrics

In [None]:
# Run Experiments
# Maps each mode name to the list of per-layer metric tuples returned by
# run_experiment for that mode.
results = {}
# Modes are executed in this order; the plotting cell iterates the same list.
modes = ["baseline", "structural", "cot", "sparsity", "window", "active"]

for mode in modes:
    print(f"Running {mode}...")
    # One forward pass per mode; each call builds its own inputs.
    results[mode] = run_experiment(mode)

In [None]:
# Plotting
# One panel per metric, in the order the metrics appear in each per-layer tuple.
metrics = ["Fiedler", "HFER", "Smoothness", "Entropy"]
fig, axes = plt.subplots(1, len(metrics), figsize=(20, 5))

# Line appearance for each experiment mode.
styles = {
    "baseline": dict(color="blue", label="Baseline", linewidth=2, marker="o"),
    "structural": dict(color="orange", label="Structural", linewidth=2, linestyle="--"),
    "cot": dict(color="gold", label="CoT", linewidth=2, marker="s"),
    "sparsity": dict(color="grey", label="Sparsity", linestyle=":"),
    "window": dict(color="red", label="Window", linestyle="-."),
    "active": dict(color="green", label="Active", linewidth=2),
}

for i, (ax, metric) in enumerate(zip(axes, metrics)):
    for mode in modes:
        # Element i of every per-layer tuple is this panel's metric.
        data = [layer_values[i] for layer_values in results[mode]]
        style = styles.get(mode, {})
        ax.plot(data, **style)
    ax.set_title(metric)
    ax.set_xlabel("Layer")
    # A single legend on the first panel is enough; all panels share the modes.
    if i == 0:
        ax.legend()

plt.tight_layout()
plt.show()