# GSM8K Steering Vector Analysis

Compute steering vectors by taking the difference between hint and baseline activations at readout positions, then test their effectiveness.

## Overview
1. **Compute steering vectors**: Average difference between hint and baseline activations at target locations
2. **Store vectors**: Save for later retrieval
3. **Measure performance**: Apply steering (with alpha scaling) and measure Δ (log P(correct) - max log P(wrong))
4. **Controls**: 
   - Random unit vector vs computed steering vector
   - Same vector applied at different locations

## Supported Locations
- Residual stream (before layer)
- MLP (entire layer output)
- Attention (entire layer output)
- Single o_proj head
- Single q head
- Q layer (entire Q projection output)

In [2]:
import os
import json
import torch
import numpy as np
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM
from pathlib import Path

os.environ["HF_HOME"] = "/workspace/.cache/huggingface"

SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

## Configuration

In [3]:
# Model config
MODEL = "Qwen/Qwen3-0.6B"

# Load target problems (same as ablation analysis)
LOGPROBS_FILE = "hint_variants/top-70-of-200_variant-count-10_with_logprobs.jsonl"

# Steering vector storage directory
STEERING_VECTORS_DIR = Path("steering_vectors")
STEERING_VECTORS_DIR.mkdir(exist_ok=True)

## Load Model and Target Problems

In [9]:
# Load model
tok = AutoTokenizer.from_pretrained(MODEL)
tok.padding_side = "left"
if tok.pad_token_id is None:
    tok.pad_token = tok.eos_token

model = AutoModelForCausalLM.from_pretrained(
    MODEL,
    torch_dtype="auto",
    device_map="auto",
    attn_implementation="eager",
)
model.eval()

NUM_HEADS = model.config.num_attention_heads
HEAD_DIM = model.config.hidden_size // NUM_HEADS
NUM_LAYERS = model.config.num_hidden_layers

print(f"Model: {MODEL}")
print(f"Layers: {NUM_LAYERS}, Heads: {NUM_HEADS}, Head dim: {HEAD_DIM}")

Model: Qwen/Qwen3-0.6B
Layers: 28, Heads: 16, Head dim: 64


In [81]:
# Load target problems (same logic as ablation analysis)
rollouts = []
with open(LOGPROBS_FILE, "r") as f:
    for line in f:
        rollouts.append(json.loads(line))

# Group by problem
by_problem = {}
for r in rollouts:
    idx = r["problem_idx"]
    variant_idx = r.get("variant_idx", None)
    if (idx, variant_idx) not in by_problem:
        by_problem[(idx, variant_idx)] = {}
    by_problem[(idx, variant_idx)][r["mode"]] = r

# Find target problems: baseline prefers wrong, hint prefers correct
target_problems = []
for problem_idx, modes in by_problem.items():
    if "baseline" not in modes or "hint_correct_silent" not in modes:
        continue
    
    baseline = modes["baseline"]
    hint = modes["hint_correct_silent"]
    correct_answer = baseline["correct_answer"]
    
    # Filter: same digit count for answer ± 2
    answer_digits = len(str(correct_answer))
    offsets = [-2, -1, 1, 2]
    all_same_digits = all(
        len(str(correct_answer + offset)) == answer_digits and (correct_answer + offset) > 0
        for offset in offsets
    )
    if not all_same_digits:
        continue
    
    # Get before_think checkpoint
    baseline_cp = next((cp for cp in baseline["logprob_checkpoints"] if cp["checkpoint_type"] == "before_think"), None)
    hint_cp = next((cp for cp in hint["logprob_checkpoints"] if cp["checkpoint_type"] == "before_think"), None)
    
    if baseline_cp is None or hint_cp is None:
        continue
    
    baseline_probs = baseline_cp["cand_softmax"]
    hint_probs = hint_cp["cand_softmax"]
    
    baseline_nonhint = {k: v for k, v in baseline_probs.items() if k != "hint"}
    baseline_best = max(baseline_nonhint.items(), key=lambda x: x[1])[0]
    hint_best = max(hint_probs.items(), key=lambda x: x[1])[0]
    
    if baseline_best != "correct" and hint_best == "correct":
        target_problems.append({
            "problem_idx": problem_idx[0],
            "variant_idx": problem_idx[1],
            "question": baseline["question"],
            "correct_answer": correct_answer,
            "hint_value": hint["hint_value"],
            "baseline_prompt": baseline["prompt"],
            "hint_prompt": hint["prompt"],
        })

print(f"Found {len(target_problems)} target problems")

Found 633 target problems


## Helper Functions

In [82]:
def compute_readout_pos(prompt_text):
    """Compute readout_pos (last token position of prompt + '\nAnswer: ')"""
    answer_prefix = "\nAnswer: "
    full_prompt = prompt_text + answer_prefix
    prompt_ids = tok.encode(full_prompt, add_special_tokens=False)
    # Readout position is the last token (just before we'd generate the answer)
    return len(prompt_ids) - 1

In [83]:
# Compute answer logprobs function (from ablation analysis)
@torch.inference_mode()
def compute_answer_logprobs_from_tokens(context_token_ids, correct_answer, hint_value=None):
    answer_prefix_ids = tok.encode("\nAnswer: ", add_special_tokens=False)
    prefix_ids = context_token_ids + answer_prefix_ids
    prefix_len = len(prefix_ids)

    candidates = {"correct": correct_answer}
    for offset in [-2, -1, 1, 2]:
        val = correct_answer + offset
        if val > 0:
            candidates[f"wrong_{offset:+d}"] = val
    if hint_value is not None and hint_value != correct_answer:
        candidates["hint"] = hint_value

    cand_names = list(candidates.keys())
    cand_token_lists = [tok.encode(str(candidates[name]), add_special_tokens=False) for name in cand_names]

    seqs = [prefix_ids + cand_ids for cand_ids in cand_token_lists]
    max_len = max(len(s) for s in seqs)

    pad_id = tok.pad_token_id if tok.pad_token_id is not None else tok.eos_token_id
    input_ids = torch.full((len(seqs), max_len), pad_id, device=model.device, dtype=torch.long)
    attention_mask = torch.zeros((len(seqs), max_len), device=model.device, dtype=torch.long)

    for i, s in enumerate(seqs):
        L = len(s)
        input_ids[i, :L] = torch.tensor(s, device=model.device)
        attention_mask[i, :L] = 1

    logits = model(input_ids=input_ids, attention_mask=attention_mask).logits

    results = {}
    for b, name in enumerate(cand_names):
        cand_ids = cand_token_lists[b]
        if len(cand_ids) == 0:
            results[name] = float("-inf")
            continue

        total = 0.0
        for j, tok_id in enumerate(cand_ids):
            pos = prefix_len + j - 1
            lp = torch.log_softmax(logits[b, pos], dim=-1)[tok_id]
            total += lp.item()
        results[name] = total

    wrong_logprobs = [v for k, v in results.items() if k.startswith("wrong_")]
    wrong_max = max(wrong_logprobs) if wrong_logprobs else float("-inf")
    
    return {
        "logp_correct": results["correct"],
        "logp_wrong_max": wrong_max,
        "delta": results["correct"] - wrong_max,
    }

## Activation Extraction and Steering Hooks

Unified functions to setup hooks based on location type.

In [84]:
# Global storage for activations
activation_storage = {}

def clear_activation_storage():
    global activation_storage
    activation_storage = {}

def clear_all_hooks():
    """Clear all hooks from model"""
    model.model.embed_tokens._forward_hooks.clear()
    for layer_idx in range(NUM_LAYERS):
        layer = model.model.layers[layer_idx]
        layer._forward_pre_hooks.clear()
        layer._forward_hooks.clear()
        layer.self_attn.q_proj._forward_hooks.clear()
        layer.self_attn.k_proj._forward_hooks.clear()
        layer.self_attn.v_proj._forward_hooks.clear()
        layer.self_attn.o_proj._forward_hooks.clear()
        layer.self_attn.o_proj._forward_pre_hooks.clear()
        layer.mlp._forward_hooks.clear()

In [98]:
def setup_extraction_hook(location_type, layer_idx, head_idx, readout_pos, storage_key):
    """
    Setup hook to extract activation at readout_pos.
    Returns handle to remove later.
    """
    
    if location_type == "residual":
        # Extract input to the layer (residual stream entering that block)
        def hook(module, args):
            if len(args) > 0:
                hidden_states = args[0]
                activation_storage.setdefault(storage_key, []).append(
                    hidden_states[:, readout_pos, :].clone().cpu()
                )
            return args
    
        # layer 0: after embeddings
        if layer_idx == 0:
            def hook_embeddings(module, args, output):
                activation_storage.setdefault(storage_key, []).append(
                    output[:, readout_pos, :].clone().cpu()
                )
                return output
            return model.model.embed_tokens.register_forward_hook(hook_embeddings)

        # AFTER LAST LAYER: hook input to final norm (Option A)
        n_layers = len(model.model.layers)
        if layer_idx == n_layers:                   # 29
            final_norm = model.model.norm
            return final_norm.register_forward_pre_hook(hook)
    
        # normal internal residual: input to transformer block layer_idx
        return model.model.layers[layer_idx].register_forward_pre_hook(hook)

    
    elif location_type == "mlp":
        def hook(module, args, output):
            if storage_key not in activation_storage:
                activation_storage[storage_key] = []
            activation_storage[storage_key].append(output[:, readout_pos, :].clone().cpu())
            return output
        return model.model.layers[layer_idx].mlp.register_forward_hook(hook)
    
    elif location_type == "attention":
        def hook(module, args, output):
            if storage_key not in activation_storage:
                activation_storage[storage_key] = []
            activation_storage[storage_key].append(output[:, readout_pos, :].clone().cpu())
            return output
        return model.model.layers[layer_idx].self_attn.o_proj.register_forward_hook(hook)
    
    elif location_type == "o_proj_head":
        def hook(module, args):
            inp = args[0]
            start = head_idx * HEAD_DIM
            end = (head_idx + 1) * HEAD_DIM
            if storage_key not in activation_storage:
                activation_storage[storage_key] = []
            activation_storage[storage_key].append(inp[:, readout_pos, start:end].clone().cpu())
            return args
        return model.model.layers[layer_idx].self_attn.o_proj.register_forward_pre_hook(hook)
    
    elif location_type == "q_head":
        def hook(module, args, output):
            start = head_idx * HEAD_DIM
            end = (head_idx + 1) * HEAD_DIM
            if storage_key not in activation_storage:
                activation_storage[storage_key] = []
            activation_storage[storage_key].append(output[:, readout_pos, start:end].clone().cpu())
            return output
        return model.model.layers[layer_idx].self_attn.q_proj.register_forward_hook(hook)

    elif location_type == "q_layer":
        def hook(module, args, output):
            if storage_key not in activation_storage:
                activation_storage[storage_key] = []
            # output is Q: [B, S, n_heads*head_dim]
            activation_storage[storage_key].append(output[:, readout_pos, :].clone().cpu())
            return output
        return model.model.layers[layer_idx].self_attn.q_proj.register_forward_hook(hook)
        
    else:
        raise ValueError(f"Unknown location_type: {location_type}")

In [99]:
def setup_steering_hook(location_type, layer_idx, head_idx, readout_pos, steering_vector):
    """
    Setup hook to apply steering vector at readout_pos.
    Returns handle to remove later.
    """
    if location_type == "residual":
        def hook(module, args):
            if len(args) > 0:
                hidden_states = args[0].clone()
                hidden_states[:, readout_pos, :] += steering_vector.to(hidden_states.device)
                return (hidden_states,) + args[1:]
            return args
    
        # layer 0: after embeddings
        if layer_idx == 0:
            def hook_embeddings(module, args, output):
                out = output.clone()
                out[:, readout_pos, :] += steering_vector.to(out.device)
                return out
            return model.model.embed_tokens.register_forward_hook(hook_embeddings)
    
        # post last layer: hook input to final norm
        n_layers = len(model.model.layers)
        if layer_idx == n_layers:
            final_norm = model.model.norm
            return final_norm.register_forward_pre_hook(hook)
    
        return model.model.layers[layer_idx].register_forward_pre_hook(hook)
    
    elif location_type == "mlp":
        def hook(module, args, output):
            out = output.clone()
            out[:, readout_pos, :] += steering_vector.to(out.device)
            return out
        return model.model.layers[layer_idx].mlp.register_forward_hook(hook)
    
    elif location_type == "attention":
        def hook(module, args, output):
            out = output.clone()
            out[:, readout_pos, :] += steering_vector.to(out.device)
            return out
        return model.model.layers[layer_idx].self_attn.o_proj.register_forward_hook(hook)
    
    elif location_type == "o_proj_head":
        def hook(module, args):
            inp = args[0].clone()
            start = head_idx * HEAD_DIM
            end = (head_idx + 1) * HEAD_DIM
            inp[:, readout_pos, start:end] += steering_vector.to(inp.device)
            return (inp,) + args[1:]
        return model.model.layers[layer_idx].self_attn.o_proj.register_forward_pre_hook(hook)
    
    elif location_type == "q_head":
        def hook(module, args, output):
            out = output.clone()
            start = head_idx * HEAD_DIM
            end = (head_idx + 1) * HEAD_DIM
            out[:, readout_pos, start:end] += steering_vector.to(out.device)
            return out
        return model.model.layers[layer_idx].self_attn.q_proj.register_forward_hook(hook)
    
    elif location_type == "q_layer":
        def hook(module, args, output):
            out = output.clone()
            out[:, readout_pos, :] += steering_vector.to(out.device)
            return out
        return model.model.layers[layer_idx].self_attn.q_proj.register_forward_hook(hook)
    
    else:
        raise ValueError(f"Unknown location_type: {location_type}")

## Compute Steering Vectors

In [100]:
def get_prefix_ids_and_readout(prompt_text: str):
    full = prompt_text + "\nAnswer: "
    prefix_ids = tok.encode(full, add_special_tokens=False)
    readout_pos = len(prefix_ids) - 1
    return prefix_ids, readout_pos

In [101]:
@torch.inference_mode()
def compute_steering_vector(location_type, layer_idx, head_idx=None):
    """
    Compute steering vector by averaging difference between hint and baseline activations.
    
    Args:
        location_type: "residual", "mlp", "attention", "o_proj_head", "q_head", "q_layer"
        layer_idx: Layer index
        head_idx: Head index (required for o_proj_head and q_head)
    
    Returns:
        steering_vector: torch.Tensor
        metadata: dict with info about the vector
    """
    if location_type in ["o_proj_head", "q_head"] and head_idx is None:
        raise ValueError(f"head_idx required for {location_type}")
    
    baseline_activations = []
    hint_activations = []
    
    for tp in tqdm(target_problems, desc=f"Computing steering vector ({location_type} L{layer_idx})"):
        # Extract baseline activation
        baseline_readout_pos = compute_readout_pos(tp["baseline_prompt"])
        clear_activation_storage()
        clear_all_hooks()
        storage_key = "baseline"
        handle = setup_extraction_hook(location_type, layer_idx, head_idx, baseline_readout_pos, storage_key)
        
        prefix_ids, readout_pos = get_prefix_ids_and_readout(tp["baseline_prompt"])
        input_ids = torch.tensor([prefix_ids], device=model.device)
        attn_mask = torch.ones_like(input_ids)
        _ = model(input_ids=input_ids, attention_mask=attn_mask)

        if storage_key in activation_storage and activation_storage[storage_key]:
            baseline_activations.append(activation_storage[storage_key][0])
        
        handle.remove()
        
        # Extract hint activation
        hint_readout_pos = compute_readout_pos(tp["hint_prompt"])
        clear_activation_storage()
        clear_all_hooks()
        storage_key = "hint"
        handle = setup_extraction_hook(location_type, layer_idx, head_idx, hint_readout_pos, storage_key)
        
        prefix_ids, readout_pos = get_prefix_ids_and_readout(tp["hint_prompt"])
        input_ids = torch.tensor([prefix_ids], device=model.device)
        _ = model(input_ids=input_ids, attention_mask=torch.ones_like(input_ids))
        
        if storage_key in activation_storage and activation_storage[storage_key]:
            hint_activations.append(activation_storage[storage_key][0])
        
        handle.remove()
    
    clear_all_hooks()
    
    if not baseline_activations or not hint_activations:
        raise ValueError("No activations collected")
    
    # Average and compute difference
    baseline_avg = torch.stack(baseline_activations).mean(dim=0).squeeze(0)
    hint_avg = torch.stack(hint_activations).mean(dim=0).squeeze(0)
    steering_vector = hint_avg - baseline_avg
    
    metadata = {
        "location_type": location_type,
        "layer_idx": layer_idx,
        "head_idx": head_idx,
        "vector_shape": list(steering_vector.shape),
        "vector_norm": float(steering_vector.norm().item()),
        "n_problems": len(target_problems),
    }
    
    return steering_vector, metadata

## Store and Retrieve Steering Vectors

In [102]:
def normalize_l2(vec: torch.Tensor, eps: float = 1e-8):
    """
    L2-normalize a vector.
    Preserves direction, fixes scale.
    """
    return vec / (vec.norm(p=2) + eps)

def save_steering_vector(steering_vector, metadata, filename=None):
    """Save steering vector to disk"""
    steering_vector = normalize_l2(steering_vector)
    
    if filename is None:
        loc_str = f"L{metadata['layer_idx']}"
        if metadata.get('head_idx') is not None:
            loc_str += f"H{metadata['head_idx']}"
        filename = f"{metadata['location_type']}_{loc_str}.json"
    
    # Create subdirectory for this location type if it doesn't exist
    subdir = STEERING_VECTORS_DIR / metadata['location_type']
    subdir.mkdir(parents=True, exist_ok=True)
    
    filepath = subdir / filename
    
    data = {
        "vector": steering_vector.cpu().tolist(),
        "metadata": metadata
    }
    
    with open(filepath, "w") as f:
        json.dump(data, f, indent=2)
    
    print(f"Saved steering vector to {filepath}")
    return filepath

def load_steering_vector(filename):
    """Load steering vector from disk"""
    filepath = STEERING_VECTORS_DIR / filename
    
    with open(filepath, "r") as f:
        data = json.load(f)
    
    vector = torch.tensor(data["vector"])
    metadata = data["metadata"]
    
    return vector, metadata

## Apply Steering and Measure Performance

**Note:** The `alpha` parameter scales the steering vector strength.

In [90]:
@torch.inference_mode()
def measure_steering_performance(
    steering_vector,
    location_type,
    layer_idx,
    head_idx=None,
    alpha=1.0
):
    """
    Apply steering vector (scaled by alpha) and measure performance.
    
    Args:
        steering_vector: Vector to apply
        location_type: "residual", "mlp", "attention", "o_proj_head", "q_head", "q_layer"
        layer_idx: Layer index
        head_idx: Head index (for o_proj_head and q_head)
        alpha: Steering strength multiplier (default 1.0)
    
    Returns:
        results: List of dicts with performance metrics
    """
    if location_type in ["o_proj_head", "q_head"] and head_idx is None:
        raise ValueError(f"head_idx required for {location_type}")
    
    scaled_vector = steering_vector * alpha
    results = []
    
    for tp in tqdm(target_problems, desc=f"Measuring steering (α={alpha:.2f})"):
        
        baseline_prompt_ids = tok.encode(tp["baseline_prompt"], add_special_tokens=False)
        hint_prompt_ids = tok.encode(tp["hint_prompt"], add_special_tokens=False)
        
        # Compute without steering
        clear_all_hooks()
        baseline_result = compute_answer_logprobs_from_tokens(baseline_prompt_ids, tp["correct_answer"], None)
        hint_result = compute_answer_logprobs_from_tokens(hint_prompt_ids, tp["correct_answer"], tp["hint_value"])
        
        # Compute with steering
        clear_all_hooks()
        baseline_readout_pos = compute_readout_pos(tp["baseline_prompt"])
        handle = setup_steering_hook(location_type, layer_idx, head_idx, baseline_readout_pos, scaled_vector)
        baseline_steered_result = compute_answer_logprobs_from_tokens(baseline_prompt_ids, tp["correct_answer"], None)
        handle.remove()
        
        # Compute with steering on hint
        clear_all_hooks()
        hint_readout_pos = compute_readout_pos(tp["hint_prompt"])
        handle = setup_steering_hook(location_type, layer_idx, head_idx, hint_readout_pos, scaled_vector)
        hint_steered_result = compute_answer_logprobs_from_tokens(hint_prompt_ids, tp["correct_answer"], tp["hint_value"])
        handle.remove()
        
        results.append({
            "problem_idx": tp["problem_idx"],
            "variant_idx": tp["variant_idx"],
            "delta_baseline": baseline_result["delta"],
            "delta_baseline_steered": baseline_steered_result["delta"],
            "delta_hint": hint_result["delta"],
            "delta_hint_steered": hint_steered_result["delta"],
            "alpha": alpha,
        })
    
    clear_all_hooks()
    return results

## Control Experiments

In [91]:
def generate_random_unit_vector(dim, norm=None, device="cpu"):
    """Generate random unit vector, optionally scaled to match given norm"""
    vec = torch.randn(dim, device=device)
    vec = vec / vec.norm()
    if norm is not None:
        vec = vec * norm
    return vec

def compare_steering_vs_random(
    steering_vector,
    location_type,
    layer_idx,
    head_idx=None,
    alpha=1.0
):
    """Compare computed steering vector vs random vector with same norm"""
    print("Measuring with computed steering vector...")
    steering_results = measure_steering_performance(
        steering_vector, location_type, layer_idx, head_idx=head_idx, alpha=alpha
    )
    
    # Generate random vector with same norm
    random_vector = generate_random_unit_vector(
        steering_vector.shape[0],
        norm=steering_vector.norm().item(),
        device=steering_vector.device
    )
    
    print("Measuring with random vector (same norm)...")
    random_results = measure_steering_performance(
        random_vector, location_type, layer_idx, head_idx=head_idx, alpha=alpha
    )
    
    return steering_results, random_results

def apply_vector_at_different_locations(
    steering_vector,
    source_location_type,
    target_layers,
    head_idx=None,
    alpha=1.0
):
    """Apply same steering vector at different layers"""
    results_dict = {}
    
    for target_layer_idx in target_layers:
        if 0 <= target_layer_idx < NUM_LAYERS:
            print(f"Measuring at L{target_layer_idx}...")
            try:
                results = measure_steering_performance(
                    steering_vector, source_location_type, target_layer_idx,
                    head_idx=head_idx, alpha=alpha
                )
                results_dict[target_layer_idx] = results
            except Exception as e:
                print(f"  Error at L{target_layer_idx}: {e}")
    
    return results_dict

## Analysis Functions

In [92]:
def compute_effects(results):
    """Compute effect sizes from results"""
    for r in results:
        r["E_hint"] = r["delta_hint"] - r["delta_baseline"]
        r["E_steering_baseline"] = r["delta_baseline_steered"] - r["delta_baseline"]
        r["E_steering_hint"] = r["delta_hint_steered"] - r["delta_hint"]
        r["E_combined"] = r["delta_hint_steered"] - r["delta_baseline"]
        r["DD"] = (r["delta_hint_steered"] - r["delta_baseline_steered"]) - (r["delta_hint"] - r["delta_baseline"])
    return results

def print_summary(results, name="Results"):
    """Print summary statistics"""
    results = compute_effects(results)
    print(f"\n=== {name} ===")
    print(f"Mean Δ_baseline: {np.mean([r['delta_baseline'] for r in results]):.3f}")
    print(f"Mean Δ_baseline_steered: {np.mean([r['delta_baseline_steered'] for r in results]):.3f}")
    print(f"Mean Δ_hint: {np.mean([r['delta_hint'] for r in results]):.3f}")
    print(f"Mean Δ_hint_steered: {np.mean([r['delta_hint_steered'] for r in results]):.3f}")
    print(f"Mean E_hint: {np.mean([r['E_hint'] for r in results]):.3f}")
    print(f"Mean E_steering_baseline: {np.mean([r['E_steering_baseline'] for r in results]):.3f}")
    print(f"Mean E_steering_hint: {np.mean([r['E_steering_hint'] for r in results]):.3f}")
    print(f"Mean DD: {np.mean([r['DD'] for r in results]):.3f}")

## Example Usage

In [111]:
top_q_layer_indices = [0, 20, 21]
top_o_proj_layer_indices = [0, 20, 21]
mlp_layer_indices = [1,8,9,11,27]
resid_layers = [0,1,8,9,10,11,12,20,21,22,27, 28]

In [113]:
for l in list(range(29)):
    if l in resid_layers:
        continue
    # Compute steering vector for entire Q projection
    steering_vec, metadata = compute_steering_vector(
        location_type="residual",
        layer_idx=l,
        head_idx=None
    )
    print(f"Vector norm: {metadata['vector_norm']:.3f}")
    
    # Save it
    save_steering_vector(steering_vec, metadata)

Computing steering vector (residual L2): 100%|██████████| 633/633 [00:53<00:00, 11.76it/s]


Vector norm: 0.213
Saved steering vector to steering_vectors/residual/residual_L2.json


Computing steering vector (residual L3): 100%|██████████| 633/633 [00:54<00:00, 11.67it/s]


Vector norm: 0.311
Saved steering vector to steering_vectors/residual/residual_L3.json


Computing steering vector (residual L4): 100%|██████████| 633/633 [00:57<00:00, 11.05it/s]


Vector norm: 0.543
Saved steering vector to steering_vectors/residual/residual_L4.json


Computing steering vector (residual L5): 100%|██████████| 633/633 [00:54<00:00, 11.65it/s]


Vector norm: 0.625
Saved steering vector to steering_vectors/residual/residual_L5.json


Computing steering vector (residual L6): 100%|██████████| 633/633 [00:53<00:00, 11.72it/s]


Vector norm: 0.707
Saved steering vector to steering_vectors/residual/residual_L6.json


Computing steering vector (residual L7): 100%|██████████| 633/633 [00:55<00:00, 11.47it/s]


Vector norm: 0.984
Saved steering vector to steering_vectors/residual/residual_L7.json


Computing steering vector (residual L13): 100%|██████████| 633/633 [00:56<00:00, 11.13it/s]


Vector norm: 2.500
Saved steering vector to steering_vectors/residual/residual_L13.json


Computing steering vector (residual L14): 100%|██████████| 633/633 [00:54<00:00, 11.54it/s]


Vector norm: 3.781
Saved steering vector to steering_vectors/residual/residual_L14.json


Computing steering vector (residual L15): 100%|██████████| 633/633 [00:53<00:00, 11.83it/s]


Vector norm: 3.750
Saved steering vector to steering_vectors/residual/residual_L15.json


Computing steering vector (residual L16): 100%|██████████| 633/633 [00:52<00:00, 11.99it/s]


Vector norm: 4.125
Saved steering vector to steering_vectors/residual/residual_L16.json


Computing steering vector (residual L17): 100%|██████████| 633/633 [00:51<00:00, 12.20it/s]


Vector norm: 5.062
Saved steering vector to steering_vectors/residual/residual_L17.json


Computing steering vector (residual L18): 100%|██████████| 633/633 [00:51<00:00, 12.21it/s]


Vector norm: 6.719
Saved steering vector to steering_vectors/residual/residual_L18.json


Computing steering vector (residual L19): 100%|██████████| 633/633 [00:52<00:00, 12.06it/s]


Vector norm: 8.500
Saved steering vector to steering_vectors/residual/residual_L19.json


Computing steering vector (residual L23): 100%|██████████| 633/633 [00:51<00:00, 12.23it/s]


Vector norm: 25.500
Saved steering vector to steering_vectors/residual/residual_L23.json


Computing steering vector (residual L24): 100%|██████████| 633/633 [00:53<00:00, 11.87it/s]


Vector norm: 28.125
Saved steering vector to steering_vectors/residual/residual_L24.json


Computing steering vector (residual L25): 100%|██████████| 633/633 [00:52<00:00, 12.07it/s]


Vector norm: 36.750
Saved steering vector to steering_vectors/residual/residual_L25.json


Computing steering vector (residual L26): 100%|██████████| 633/633 [00:52<00:00, 11.95it/s]

Vector norm: 41.000
Saved steering vector to steering_vectors/residual/residual_L26.json





In [77]:
vec_attention, _ = load_steering_vector("attention_L20.json")
vec_mlp, _ = load_steering_vector("mlp_L20.json")

In [78]:
# Measure performance with alpha scaling
print("=====================================")
for alpha in [-1.0, -0.75, -0.5, -0.25, 0.25, 0.5, 0.75, 1.0]:
    print("=====================================")
    results = measure_steering_performance(
        vec_attention,
        location_type="attention",
        layer_idx=TARGET_LAYER,
        head_idx=TARGET_HEAD,
        alpha=ALPHA
    )
    print_summary(results, f"Steering (α={ALPHA})")

    print("=====================================")
    results = measure_steering_performance(
        vec_mlp,
        location_type="mlp",
        layer_idx=TARGET_LAYER,
        head_idx=TARGET_HEAD,
        alpha=alpha
    )
    print_summary(results, f"Steering (α={ALPHA})")



Measuring steering (α=-1.00): 100%|██████████| 633/633 [02:46<00:00,  3.81it/s]



=== Steering (α=-1.0) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.362
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.984
Mean E_hint: 5.343
Mean E_steering_baseline: -0.002
Mean E_steering_hint: 0.000
Mean DD: 0.002


Measuring steering (α=-1.00): 100%|██████████| 633/633 [02:47<00:00,  3.78it/s]



=== Steering (α=-1.0) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.373
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.982
Mean E_hint: 5.343
Mean E_steering_baseline: -0.013
Mean E_steering_hint: -0.001
Mean DD: 0.012


Measuring steering (α=-0.75): 100%|██████████| 633/633 [02:50<00:00,  3.70it/s]



=== Steering (α=-0.75) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.359
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.980
Mean E_hint: 5.343
Mean E_steering_baseline: 0.002
Mean E_steering_hint: -0.003
Mean DD: -0.005


Measuring steering (α=-0.75): 100%|██████████| 633/633 [02:54<00:00,  3.64it/s]



=== Steering (α=-0.75) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.345
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.980
Mean E_hint: 5.343
Mean E_steering_baseline: 0.015
Mean E_steering_hint: -0.003
Mean DD: -0.018


Measuring steering (α=-0.50): 100%|██████████| 633/633 [02:55<00:00,  3.61it/s]



=== Steering (α=-0.5) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.365
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.980
Mean E_hint: 5.343
Mean E_steering_baseline: -0.005
Mean E_steering_hint: -0.004
Mean DD: 0.002


Measuring steering (α=-0.50): 100%|██████████| 633/633 [02:52<00:00,  3.68it/s]



=== Steering (α=-0.5) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.355
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.982
Mean E_hint: 5.343
Mean E_steering_baseline: 0.005
Mean E_steering_hint: -0.001
Mean DD: -0.007


Measuring steering (α=-0.25): 100%|██████████| 633/633 [02:51<00:00,  3.69it/s]



=== Steering (α=-0.25) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.357
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.983
Mean E_hint: 5.343
Mean E_steering_baseline: 0.003
Mean E_steering_hint: -0.001
Mean DD: -0.004


Measuring steering (α=-0.25): 100%|██████████| 633/633 [02:54<00:00,  3.63it/s]



=== Steering (α=-0.25) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.358
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.982
Mean E_hint: 5.343
Mean E_steering_baseline: 0.002
Mean E_steering_hint: -0.001
Mean DD: -0.003


Measuring steering (α=0.25): 100%|██████████| 633/633 [02:53<00:00,  3.65it/s]



=== Steering (α=0.25) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.370
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.979
Mean E_hint: 5.343
Mean E_steering_baseline: -0.010
Mean E_steering_hint: -0.004
Mean DD: 0.006


Measuring steering (α=0.25): 100%|██████████| 633/633 [02:54<00:00,  3.64it/s]



=== Steering (α=0.25) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.352
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.980
Mean E_hint: 5.343
Mean E_steering_baseline: 0.008
Mean E_steering_hint: -0.003
Mean DD: -0.011


Measuring steering (α=0.50): 100%|██████████| 633/633 [02:54<00:00,  3.63it/s]



=== Steering (α=0.5) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.357
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.975
Mean E_hint: 5.343
Mean E_steering_baseline: 0.004
Mean E_steering_hint: -0.008
Mean DD: -0.012


Measuring steering (α=0.50): 100%|██████████| 633/633 [02:55<00:00,  3.61it/s]



=== Steering (α=0.5) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.356
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.979
Mean E_hint: 5.343
Mean E_steering_baseline: 0.004
Mean E_steering_hint: -0.004
Mean DD: -0.009


Measuring steering (α=0.75): 100%|██████████| 633/633 [02:52<00:00,  3.67it/s]



=== Steering (α=0.75) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.359
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.980
Mean E_hint: 5.343
Mean E_steering_baseline: 0.001
Mean E_steering_hint: -0.004
Mean DD: -0.005


Measuring steering (α=0.75): 100%|██████████| 633/633 [02:51<00:00,  3.69it/s]



=== Steering (α=0.75) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.362
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.979
Mean E_hint: 5.343
Mean E_steering_baseline: -0.002
Mean E_steering_hint: -0.004
Mean DD: -0.002


Measuring steering (α=1.00): 100%|██████████| 633/633 [02:51<00:00,  3.69it/s]



=== Steering (α=1.0) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.358
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.987
Mean E_hint: 5.343
Mean E_steering_baseline: 0.002
Mean E_steering_hint: 0.004
Mean DD: 0.002


Measuring steering (α=1.00): 100%|██████████| 633/633 [02:54<00:00,  3.62it/s]


=== Steering (α=1.0) ===
Mean Δ_baseline: -1.360
Mean Δ_baseline_steered: -1.370
Mean Δ_hint: 3.983
Mean Δ_hint_steered: 3.983
Mean E_hint: 5.343
Mean E_steering_baseline: -0.010
Mean E_steering_hint: -0.001
Mean DD: 0.009





In [110]:
import pandas as pd
df_ablation = pd.DataFrame(ablation_summaries)

# Quick summary table
print(df_ablation[["location_type", "layer_idx", "mean_DD", "mean_baseline_damage", "mean_E_hint"]].to_string())

# Bar plot of DD by location/layer
fig, ax = plt.subplots(figsize=(12, 5))
labels = [f"{row['location_type']}\nL{int(row['layer_idx'])}" for _, row in df_ablation.iterrows()]
colors = ['coral' if row['mean_DD'] > 0 else 'steelblue' for _, row in df_ablation.iterrows()]
ax.bar(range(len(df_ablation)), df_ablation["mean_DD"], color=colors, alpha=0.7)
ax.set_xticks(range(len(df_ablation)))
ax.set_xticklabels(labels, rotation=45, ha="right", fontsize=8)
ax.axhline(y=0, color="black", linestyle="-", linewidth=0.5)
ax.set_ylabel("DD (hint-specific effect)")
ax.set_title("Ablation: DD by Location/Layer")
plt.tight_layout()
plt.show()

NameError: name 'ablation_summaries' is not defined

In [None]:
# Measure performance with alpha scaling
results = measure_steering_performance(
    steering_vec,
    location_type="attention",
    layer_idx=TARGET_LAYER,
    head_idx=TARGET_HEAD,
    alpha=ALPHA
)
print_summary(results, f"Steering (α={ALPHA})")

# Control 1: Compare with random vector
steering_results, random_results = compare_steering_vs_random(
    steering_vec,
    location_type="o_proj_head",
    layer_idx=TARGET_LAYER,
    head_idx=TARGET_HEAD,
    alpha=ALPHA
)
print_summary(steering_results, "Computed Steering")
print_summary(random_results, "Random Vector")

# Control 2: Apply at different layers
other_layers = [TARGET_LAYER - 2, TARGET_LAYER, TARGET_LAYER + 2]
results_by_layer = apply_vector_at_different_locations(
    steering_vec,
    source_location_type="o_proj_head",
    target_layers=other_layers,
    head_idx=TARGET_HEAD,
    alpha=ALPHA
)
for layer_idx, results in results_by_layer.items():
    print_summary(results, f"Layer {layer_idx}")