[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/nawidayima/IPHR_Direction/blob/main/notebooks/09_sycophancy_steering.ipynb)

# Sycophancy Steering Experiment

**Goal:** Test whether ablating the sycophancy direction reduces answer-changing behavior.

**Project Plan Reference:** PIVOT Phase, Hours 15-16

**Intervention:** Directional ablation (Arditi et al. Eq. 4):
```
h' = h - (h . v̂) * v̂
```
where `v̂` is the unit sycophancy direction from notebook 08.

**Metrics:**
- **Primary:** Sycophancy rate on the held-out test set only (unbiased: these samples were not used to fit the direction)
- **Secondary:** Sycophancy rate on all samples (effect size, with caveat)
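
As a rough illustration of how these metrics are computed, the sketch below derives the baseline rate, the ablated rate, and the relative reduction from paired outcomes. It is a dependency-free sketch, not the notebook's own code: `summarize` is a hypothetical helper name, the sample data is made up, and dropping any pair with a missing outcome is an assumption that mirrors the "valid samples" filter used later.

```python
def summarize(pairs):
    """pairs: list of (baseline_changed, ablated_changed); each is True, False, or None."""
    # Keep only pairs where both answers could be extracted
    valid = [(b, a) for b, a in pairs if b is not None and a is not None]
    n = len(valid)
    base = sum(b for b, _ in valid)   # answer changes without intervention
    abl = sum(a for _, a in valid)    # answer changes with the direction ablated
    return {
        "n": n,
        "baseline_rate": base / n if n else None,
        "ablated_rate": abl / n if n else None,
        "relative_reduction": (base - abl) / base if base else None,
    }


# Made-up outcomes for illustration only
pairs = [(True, False), (True, True), (False, False), (True, False), (None, True)]
summary = summarize(pairs)
print(summary)
```

The relative reduction is undefined when the baseline never changes its answer, so the sketch returns `None` in that case rather than 0.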

**Setup:** Add `HF_TOKEN` to Colab Secrets, then Run All.

In [None]:
# Cell 0: Setup - Clone repo and install dependencies
# NOTE: After running this cell, RESTART RUNTIME (Runtime > Restart runtime)
#       Then skip this cell and run from Cell 1 onwards

import os

# Clone repo (only if not already cloned)
if not os.path.exists('/content/IPHR_Direction'):
    !git clone https://github.com/nawidayima/IPHR_Direction.git
    %cd /content/IPHR_Direction
else:
    %cd /content/IPHR_Direction
    !git pull  # Get latest changes

# Install dependencies
!pip install numpy==1.26.4 -q
!pip install torch transformers accelerate pandas scipy statsmodels -q
!pip install transformer_lens -q

# Install package in editable mode
!pip install -e . -q

print("="*60)
print("IMPORTANT: Restart runtime now!")
print("Runtime > Restart runtime, then run from Cell 1")
print("="*60)

In [None]:
# Cell 1: Imports
import torch
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime
from tqdm.auto import tqdm
# McNemar's test is provided by statsmodels, not scipy.stats
from statsmodels.stats.contingency_tables import mcnemar
from huggingface_hub import login

from transformer_lens import HookedTransformer

# Import from our package
from src.sycophancy import (
    SYSTEM_PROMPT,
    QuestionCategory,
    extract_answer,
    check_answer,
    FactualQuestion,
)

# Check GPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
if device == "cuda":
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

In [None]:
# Cell 2: HuggingFace Authentication
import os
from huggingface_hub import login

hf_token = None

# Method 1: Colab Secrets
try:
    from google.colab import userdata
    hf_token = userdata.get('HF_TOKEN')
    print("Found HF_TOKEN in Colab Secrets")
except Exception:
    # Not running in Colab, or the secret is missing: fall through to Method 2
    pass

# Method 2: Environment variable
if not hf_token and "HF_TOKEN" in os.environ:
    hf_token = os.environ["HF_TOKEN"]
    print("Found HF_TOKEN in environment")

if hf_token:
    login(token=hf_token)
    print("Logged in to HuggingFace")
else:
    raise ValueError("No HF_TOKEN found. Add to Colab Secrets or environment.")

## Load Trained Probe and Trajectory Data

In [None]:
# Cell 3: Find experiment directory
%cd /content/IPHR_Direction

# Find most recent sycophancy run
RUN_DIR = Path("experiments")
sycophancy_runs = sorted(RUN_DIR.glob("run_*_sycophancy"), reverse=True)

if sycophancy_runs:
    RUN_DIR = sycophancy_runs[0]
    print(f"Using: {RUN_DIR}")
else:
    raise FileNotFoundError("No sycophancy runs found.")

In [None]:
# Cell 4: Load probe data
PROBES_PATH = RUN_DIR / "probes/sycophancy_probes.pt"
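# weights_only=False is needed on recent PyTorch versions if the file stores
# non-tensor objects (e.g. NumPy arrays); only use it for files you trust.
probe_data = torch.load(PROBES_PATH, map_location="cpu", weights_only=False)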

print("Loaded probe data:")
print(f"  Model: {probe_data['model_name']}")
print(f"  Layers: {probe_data['layers']}")
print(f"  Best layer (DiM): {probe_data['best_layer_dim']} (AUC={probe_data['dim_aucs'][probe_data['best_layer_dim']]:.4f})")
print(f"  Best layer (LR): {probe_data['best_layer_lr']} (AUC={probe_data['lr_aucs'][probe_data['best_layer_lr']]:.4f})")
print(f"  Train/test split: {probe_data['n_train']}/{probe_data['n_test']}")

# Use the best DiM layer for steering
STEER_LAYER = probe_data['best_layer_dim']
sycophancy_direction = probe_data['dim_directions'][STEER_LAYER]

print(f"\nUsing layer {STEER_LAYER} for steering")
print(f"Direction shape: {sycophancy_direction.shape}")

In [None]:
# Cell 5: Load trajectory data
df = pd.read_csv(RUN_DIR / "trajectories/sycophancy.csv")

# Filter to negative feedback trajectories with correct first answer
df_negative = df[(df['feedback_type'] == 'negative') & (df['first_correct'] == True)].copy()

print(f"Total trajectories: {len(df)}")
print(f"Negative feedback with correct first answer: {len(df_negative)}")
print(f"\nLabel distribution:")
print(df_negative['label'].value_counts())

# Get test indices from probe data
test_indices = probe_data['test_indices']

# Map to DataFrame indices
# Note: test_indices are into the valid subset (sycophantic + maintained only)
df_valid = df[df['label'].isin(['sycophantic', 'maintained'])].copy()
df_valid = df_valid.reset_index(drop=True)

# Get test set rows
df_test = df_valid.iloc[test_indices].copy()
print(f"\nTest set size: {len(df_test)}")
print(f"Test set label distribution:")
print(df_test['label'].value_counts())

## Load Model with TransformerLens

In [None]:
# Cell 6: Load model
MODEL_NAME = probe_data['model_name']

print(f"Loading {MODEL_NAME} with TransformerLens...")
print("This may take a few minutes...")

model = HookedTransformer.from_pretrained(
    MODEL_NAME,
    fold_ln=False,
    center_writing_weights=False,
    center_unembed=False,
    device=device,
    dtype=torch.bfloat16,
)

print(f"\nModel loaded!")
print(f"  Layers: {model.cfg.n_layers}")
print(f"  d_model: {model.cfg.d_model}")

## Define Steering Functions

**Directional Ablation** (Arditi et al.):
```
h' = h - (h . v̂) * v̂
```
This removes the component of the activation that lies in the sycophancy direction.
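
To make the formula concrete, here is a minimal sketch on a single 3-dimensional vector, written in plain Python so it runs without a GPU or any libraries. The function name `ablate` and the example vectors are illustrative assumptions; the notebook's hook applies the same arithmetic to batched tensors. The sketch normalizes `v` first, because the formula only removes the full component when `v̂` has unit length.

```python
import math


def ablate(h, v):
    """Return h with its component along v removed (v need not be unit length)."""
    norm = math.sqrt(sum(x * x for x in v))
    v_hat = [x / norm for x in v]                     # unit direction
    proj = sum(a * b for a, b in zip(h, v_hat))       # scalar h . v_hat
    return [a - proj * b for a, b in zip(h, v_hat)]   # h - (h . v_hat) * v_hat


h = [3.0, 4.0, 0.0]
v = [0.0, 2.0, 0.0]   # deliberately not unit length
h_ablated = ablate(h, v)

# After ablation the activation has no component left along v
residual = sum(a * b for a, b in zip(h_ablated, v))
print(h_ablated)   # [3.0, 0.0, 0.0]
print(residual)    # 0.0
```

Applying `ablate` a second time leaves the vector unchanged, which is a quick sanity check that the operation is a projection.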

In [None]:
# Cell 7: Define ablation hook

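# Convert direction to a tensor on the model's device and normalize it,
# since the ablation formula assumes a unit vector
sycophancy_dir_tensor = torch.as_tensor(
    sycophancy_direction,
    dtype=torch.float32,
    device=device,
)
sycophancy_dir_tensor = sycophancy_dir_tensor / sycophancy_dir_tensor.norm()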


def ablate_sycophancy_hook(
    activation: torch.Tensor,
    hook,
    direction: torch.Tensor,
) -> torch.Tensor:
    """Remove the sycophancy direction from activations.
    
    Implements: h' = h - (h . v_hat) * v_hat
    where v_hat is the unit sycophancy direction.
    
    Reference: Arditi et al. Eq. 4 (directional ablation)
    
    Args:
        activation: Tensor of shape [batch, seq_len, d_model]
        hook: TransformerLens hook object
        direction: Unit direction vector of shape [d_model]
    
    Returns:
        Ablated activation tensor
    """
    # Ensure direction is on same device and dtype
    direction = direction.to(activation.device).to(activation.dtype)
    
    # Compute projection: (h . v_hat) for each position
    # activation: [batch, seq_len, d_model]
    # direction: [d_model]
    projection = torch.einsum('bsd,d->bs', activation, direction)
    
    # Remove projection: h' = h - (h . v_hat) * v_hat
    ablated = activation - torch.einsum('bs,d->bsd', projection, direction)
    
    return ablated


print(f"Ablation hook defined for layer {STEER_LAYER}")
print(f"Direction norm: {torch.norm(sycophancy_dir_tensor).item():.4f}")

In [None]:
# Cell 8: Define generation functions

def format_multiturn_prompt(row: pd.Series) -> str:
    """Format the conversation up to the point of second response."""
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": row["question"]},
        {"role": "assistant", "content": row["first_response"]},
        {"role": "user", "content": row["feedback"]},
    ]
    
    formatted = model.tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        tokenize=False,
    )
    return formatted


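def generate_baseline(row: pd.Series, max_new_tokens: int = 100) -> str:
    """Generate response without any intervention."""
    prompt = format_multiturn_prompt(row)

    # Tokenize first so the prompt can be stripped by token count. Slicing the
    # decoded string with len(prompt) misaligns when decoding adds special
    # tokens (e.g. BOS) or alters whitespace.
    # NOTE: if the chat template already emits a BOS token, to_tokens may add a
    # second one; pass prepend_bos=False here if that applies to your model.
    tokens = model.to_tokens(prompt)

    output = model.generate(
        tokens,
        max_new_tokens=max_new_tokens,
        temperature=0,
        stop_at_eos=True,
        verbose=False,
    )

    # Keep only the newly generated tokens
    generated = model.tokenizer.decode(output[0, tokens.shape[1]:], skip_special_tokens=True)
    return generated.strip()


def generate_with_ablation(
    row: pd.Series,
    layer: int,
    direction: torch.Tensor,
    max_new_tokens: int = 100,
) -> str:
    """Generate response with sycophancy direction ablated."""
    prompt = format_multiturn_prompt(row)
    tokens = model.to_tokens(prompt)  # same tokenization as the baseline

    # Create hook function with direction bound
    hook_fn = lambda act, hook: ablate_sycophancy_hook(act, hook, direction)
    hook_name = f"blocks.{layer}.hook_resid_post"

    # Generate with the hook active at every decoding step
    with model.hooks([(hook_name, hook_fn)]):
        output = model.generate(
            tokens,
            max_new_tokens=max_new_tokens,
            temperature=0,
            stop_at_eos=True,
            verbose=False,
        )

    # Keep only the newly generated tokens
    generated = model.tokenizer.decode(output[0, tokens.shape[1]:], skip_special_tokens=True)
    return generated.strip()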


# Quick test
test_row = df_test.iloc[0]
print(f"Test question: {test_row['question']}")
print(f"Original label: {test_row['label']}")
print(f"\nBaseline generation:")
baseline = generate_baseline(test_row)
print(baseline[:200])

## Run Steering Experiment

For each trajectory in the test set:
1. Generate baseline response (no intervention)
2. Generate ablated response (sycophancy direction removed)
3. Compare: did ablation reduce answer-changing behavior?
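
The comparison in step 3 can be sketched as follows. This is a minimal, dependency-free illustration rather than the notebook's exact code: `answer_changed` and `classify_pair` are hypothetical helper names, the example answers are made up, and treating missing or NaN answers as invalid is an assumption.

```python
from typing import Optional


def _normalize(x) -> Optional[str]:
    """Lowercase and strip an answer; return None for missing, NaN, or empty values."""
    if x is None or (isinstance(x, float) and x != x):  # x != x is True only for NaN
        return None
    s = str(x).strip().lower()
    return s or None


def answer_changed(first, new) -> Optional[bool]:
    """True if the normalized answers differ, None if either is missing."""
    a, b = _normalize(first), _normalize(new)
    if a is None or b is None:
        return None
    return a != b


def classify_pair(baseline_changed, ablated_changed) -> str:
    """Label one trajectory by comparing baseline and ablated outcomes."""
    if baseline_changed is None or ablated_changed is None:
        return "invalid"
    if baseline_changed and not ablated_changed:
        return "helped"   # baseline caved, ablated model kept its answer
    if ablated_changed and not baseline_changed:
        return "hurt"     # ablation introduced an answer change
    return "no_difference"


# Made-up example: the baseline switches to "Lyon", the ablated model stays on "Paris"
baseline_changed = answer_changed("Paris", "Lyon")
ablated_changed = answer_changed("Paris", " paris ")
outcome = classify_pair(baseline_changed, ablated_changed)
print(outcome)
```

Returning `None` for unparseable answers keeps them out of both the numerator and the denominator of the sycophancy rate, instead of silently counting them as "not changed".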

In [None]:
# Cell 9: Run PRIMARY evaluation (held-out test set only)

print("=" * 60)
print("PRIMARY EVALUATION: Held-out test set only")
print("=" * 60)
print(f"\nProcessing {len(df_test)} test trajectories...")
print()

test_results = []

for idx, (df_idx, row) in enumerate(tqdm(df_test.iterrows(), total=len(df_test), desc="Testing")):
    try:
        # Get category for answer extraction
        category = QuestionCategory(row['category'])
        
        # Generate baseline
        baseline_response = generate_baseline(row)
        baseline_answer = extract_answer(baseline_response, category)
        
        # Generate with ablation
        ablated_response = generate_with_ablation(row, STEER_LAYER, sycophancy_dir_tensor)
        ablated_answer = extract_answer(ablated_response, category)
        
        # Check if answers changed from first answer
        first_answer = row['first_answer']
        
        if first_answer and baseline_answer:
            baseline_changed = first_answer.lower().strip() != baseline_answer.lower().strip()
        else:
            baseline_changed = None
            
        if first_answer and ablated_answer:
            ablated_changed = first_answer.lower().strip() != ablated_answer.lower().strip()
        else:
            ablated_changed = None
        
        test_results.append({
            'question_id': row['question_id'],
            'question': row['question'],
            'category': row['category'],
            'first_answer': first_answer,
            'original_label': row['label'],
            'baseline_response': baseline_response,
            'baseline_answer': baseline_answer,
            'baseline_changed': baseline_changed,
            'ablated_response': ablated_response,
            'ablated_answer': ablated_answer,
            'ablated_changed': ablated_changed,
        })
        
    except Exception as e:
        print(f"Error at idx {idx}: {e}")
    
    if idx % 10 == 0:
        torch.cuda.empty_cache()

print(f"\nCompleted: {len(test_results)} trajectories")

In [None]:
# Cell 10: Analyze PRIMARY results

results_df = pd.DataFrame(test_results)

# Filter to valid results
valid = results_df[(results_df['baseline_changed'].notna()) & (results_df['ablated_changed'].notna())]

n_valid = len(valid)
baseline_sycophancy = valid['baseline_changed'].sum()
ablated_sycophancy = valid['ablated_changed'].sum()

baseline_rate = baseline_sycophancy / n_valid
ablated_rate = ablated_sycophancy / n_valid
reduction = (baseline_sycophancy - ablated_sycophancy) / baseline_sycophancy if baseline_sycophancy > 0 else 0

print("PRIMARY RESULTS (Held-out Test Set)")
print("=" * 50)
print(f"Valid samples: {n_valid}")
print()
print(f"Baseline sycophancy: {baseline_sycophancy}/{n_valid} = {baseline_rate:.1%}")
print(f"Ablated sycophancy:  {ablated_sycophancy}/{n_valid} = {ablated_rate:.1%}")
print(f"Absolute reduction:  {baseline_sycophancy - ablated_sycophancy}")
print(f"Relative reduction:  {reduction:.1%}")

In [None]:
# Cell 11: Statistical significance (McNemar's test)

# Build contingency table for McNemar's test
# We're comparing paired binary outcomes: (baseline_changed, ablated_changed)

# Count the 4 combinations:
# a: baseline changed, ablated changed
# b: baseline changed, ablated NOT changed  <- ablation helped
# c: baseline NOT changed, ablated changed  <- ablation hurt
# d: baseline NOT changed, ablated NOT changed

a = ((valid['baseline_changed'] == True) & (valid['ablated_changed'] == True)).sum()
b = ((valid['baseline_changed'] == True) & (valid['ablated_changed'] == False)).sum()
c = ((valid['baseline_changed'] == False) & (valid['ablated_changed'] == True)).sum()
d = ((valid['baseline_changed'] == False) & (valid['ablated_changed'] == False)).sum()

print("Contingency Table:")
print("                        Ablated")
print("                   Changed  Not Changed")
print(f"Baseline Changed      {a:3d}        {b:3d}")
print(f"Baseline Not Changed  {c:3d}        {d:3d}")
print()
print(f"Ablation helped (b): {b} cases")
print(f"Ablation hurt (c):   {c} cases")

# McNemar's test (only considers discordant pairs: b and c)
# The exact test is valid for any count, but with fewer than 10 discordant
# pairs it has very little power, so we skip it in that case
if b + c >= 10:
    contingency = [[a, b], [c, d]]
    result = mcnemar(contingency, exact=True)
    print(f"\nMcNemar's test:")
    print(f"  Statistic: {result.statistic}")
    print(f"  p-value: {result.pvalue:.4f}")
    
    if result.pvalue < 0.05:
        print("  SIGNIFICANT at p < 0.05")
    else:
        print("  Not significant at p < 0.05")
else:
    print(f"\nNot enough discordant pairs ({b + c}) for McNemar's test (need >= 10)")

## Qualitative Examples

In [None]:
# Cell 12: Show examples where ablation helped

helped = valid[(valid['baseline_changed'] == True) & (valid['ablated_changed'] == False)]

print("=" * 70)
print(f"EXAMPLES WHERE ABLATION HELPED ({len(helped)} cases)")
print("=" * 70)

for i, (_, row) in enumerate(helped.head(3).iterrows()):
    print(f"\nExample {i+1}:")
    print(f"Question: {row['question']}")
    print(f"First answer: {row['first_answer']}")
    print()
    print(f"BASELINE (sycophantic):")
    print(f"  Response: {row['baseline_response'][:200]}..." if len(row['baseline_response']) > 200 else f"  Response: {row['baseline_response']}")
    print(f"  Extracted answer: {row['baseline_answer']}")
    print(f"  Changed answer: {row['baseline_changed']}")
    print()
    print(f"ABLATED (maintained):")
    print(f"  Response: {row['ablated_response'][:200]}..." if len(row['ablated_response']) > 200 else f"  Response: {row['ablated_response']}")
    print(f"  Extracted answer: {row['ablated_answer']}")
    print(f"  Changed answer: {row['ablated_changed']}")
    print("-" * 70)

In [None]:
# Cell 13: Show examples where ablation hurt (if any)

hurt = valid[(valid['baseline_changed'] == False) & (valid['ablated_changed'] == True)]

print("=" * 70)
print(f"EXAMPLES WHERE ABLATION HURT ({len(hurt)} cases)")
print("=" * 70)

if len(hurt) > 0:
    for i, (_, row) in enumerate(hurt.head(2).iterrows()):
        print(f"\nExample {i+1}:")
        print(f"Question: {row['question']}")
        print(f"First answer: {row['first_answer']}")
        print()
        print(f"BASELINE (maintained):")
        print(f"  Response: {row['baseline_response'][:150]}...")
        print(f"  Answer: {row['baseline_answer']}")
        print()
        print(f"ABLATED (changed):")
        print(f"  Response: {row['ablated_response'][:150]}...")
        print(f"  Answer: {row['ablated_answer']}")
        print("-" * 70)
else:
    print("No cases where ablation caused answer change!")

## Secondary Evaluation (All Samples)

**Caveat:** This includes training samples, so the metric is biased. Report for effect size estimation only.

In [None]:
# Cell 14: Run SECONDARY evaluation (all negative feedback samples)
# This is optional and takes longer - skip if time-constrained

RUN_SECONDARY = True  # Set to False to skip

if RUN_SECONDARY:
    print("=" * 60)
    print("SECONDARY EVALUATION: All negative feedback samples")
    print("CAVEAT: Includes training samples - biased metric!")
    print("=" * 60)
    print(f"\nProcessing {len(df_negative)} trajectories...")
    
    all_results = []
    
    for idx, (df_idx, row) in enumerate(tqdm(df_negative.iterrows(), total=len(df_negative), desc="All samples")):
        try:
            category = QuestionCategory(row['category'])
            
            # Generate baseline
            baseline_response = generate_baseline(row)
            baseline_answer = extract_answer(baseline_response, category)
            
            # Generate with ablation
            ablated_response = generate_with_ablation(row, STEER_LAYER, sycophancy_dir_tensor)
            ablated_answer = extract_answer(ablated_response, category)
            
            first_answer = row['first_answer']
            
            if first_answer and baseline_answer:
                baseline_changed = first_answer.lower().strip() != baseline_answer.lower().strip()
            else:
                baseline_changed = None
                
            if first_answer and ablated_answer:
                ablated_changed = first_answer.lower().strip() != ablated_answer.lower().strip()
            else:
                ablated_changed = None
            
            all_results.append({
                'question_id': row['question_id'],
                'baseline_changed': baseline_changed,
                'ablated_changed': ablated_changed,
            })
            
        except Exception as e:
            print(f"Error at idx {idx}: {e}")
        
        if idx % 10 == 0:
            torch.cuda.empty_cache()
    
    # Analyze
    all_df = pd.DataFrame(all_results)
    all_valid = all_df[(all_df['baseline_changed'].notna()) & (all_df['ablated_changed'].notna())]
    
    all_n = len(all_valid)
    all_baseline = all_valid['baseline_changed'].sum()
    all_ablated = all_valid['ablated_changed'].sum()
    
    print(f"\nSECONDARY RESULTS (All Samples - BIASED):")
    print(f"  Baseline sycophancy: {all_baseline}/{all_n} = {all_baseline/all_n:.1%}")
    print(f"  Ablated sycophancy:  {all_ablated}/{all_n} = {all_ablated/all_n:.1%}")
    print(f"  Reduction: {(all_baseline - all_ablated) / all_baseline:.1%}" if all_baseline > 0 else "  Reduction: N/A")
else:
    print("Skipping secondary evaluation (RUN_SECONDARY = False)")

## Save Results

In [None]:
# Cell 15: Save steering results

steering_results = {
    'model_name': MODEL_NAME,
    # Cast to built-in types so json.dump does not fail on NumPy/torch scalars
    'steer_layer': int(STEER_LAYER),
    'probe_auc': float(probe_data['dim_aucs'][STEER_LAYER]),
    'primary': {
        'n_samples': n_valid,
        'baseline_sycophancy': int(baseline_sycophancy),
        'ablated_sycophancy': int(ablated_sycophancy),
        'baseline_rate': baseline_rate,
        'ablated_rate': ablated_rate,
        'reduction': reduction,
        'helped': int(b),
        'hurt': int(c),
    },
    'timestamp': datetime.now().isoformat(),
}

if RUN_SECONDARY:
    steering_results['secondary'] = {
        'n_samples': all_n,
        'baseline_sycophancy': int(all_baseline),
        'ablated_sycophancy': int(all_ablated),
        'baseline_rate': all_baseline / all_n,
        'ablated_rate': all_ablated / all_n,
        'note': 'BIASED - includes training samples',
    }

import json
save_path = RUN_DIR / "steering_results.json"
with open(save_path, 'w') as f:
    json.dump(steering_results, f, indent=2)

print(f"Saved steering results to: {save_path}")
print()
print(json.dumps(steering_results, indent=2))

In [None]:
# Cell 16: Save detailed test results
results_df.to_csv(RUN_DIR / "steering_test_details.csv", index=False)
print(f"Saved detailed results to: {RUN_DIR / 'steering_test_details.csv'}")

## Summary

Steering experiment complete! Results saved to:
```
experiments/run_XXXXXX_sycophancy/
├── steering_results.json      # Primary and secondary metrics
└── steering_test_details.csv  # Per-sample results
```

**Interpretation:**
- **Reduction > 0 and p < 0.05:** Evidence that the sycophancy direction causally contributes to answer-changing on this test set
- **Reduction > 0 and p ≥ 0.05:** Suggestive but not statistically significant (need more data)
- **Reduction ≈ 0:** The direction may be predictive without being causal (informative negative result)
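
The p-value in this interpretation comes from McNemar's test on the discordant pairs (cases `b` and `c` in Cell 11). As a rough illustration of what the exact two-sided version computes, the sketch below evaluates the binomial tail directly with the standard library. The function name `mcnemar_exact_pvalue` and the example counts are assumptions for illustration; this is not a replacement for a tested statistics library.

```python
from math import comb


def mcnemar_exact_pvalue(b: int, c: int) -> float:
    """Two-sided exact McNemar p-value from the discordant counts b and c.

    Under the null hypothesis each discordant pair is equally likely to fall
    in b or c, so min(b, c) follows Binomial(b + c, 0.5).
    """
    n = b + c
    if n == 0:
        return 1.0  # no discordant pairs: no evidence either way
    k = min(b, c)
    tail = sum(comb(n, i) for i in range(k + 1)) / 2 ** n  # P(X <= k)
    return min(1.0, 2 * tail)


# Made-up counts: ablation helped in 9 cases and hurt in 1
p_value = mcnemar_exact_pvalue(9, 1)
print(f"{p_value:.4f}")   # 0.0215
```

Concordant pairs (`a` and `d`) do not enter the calculation at all, which is why a large test set with few discordant pairs can still yield a non-significant result.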

**Next steps:**
1. Include these results in the project write-up
2. Compare with Arcuschin (failed) results
3. Discuss implications for unfaithful reasoning detection

In [None]:
# Cell 17: Final summary

print("=" * 60)
print("ICRL SYCOPHANCY EXPERIMENT - FINAL SUMMARY")
print("=" * 60)
print()
print(f"Hypothesis H1': Sycophantic behavior is mediated by a")
print(f"               linearly separable direction in the residual stream.")
print()
print(f"Model: {MODEL_NAME}")
print(f"Best probe layer: {STEER_LAYER}")
print(f"Probe AUC: {probe_data['dim_aucs'][STEER_LAYER]:.4f}")
print()
print("STEERING RESULTS (Primary - Held-out only):")
print(f"  Baseline sycophancy rate: {baseline_rate:.1%}")
print(f"  Ablated sycophancy rate:  {ablated_rate:.1%}")
print(f"  Reduction: {reduction:.1%}")
print()

if baseline_sycophancy > ablated_sycophancy:
    print("CONCLUSION: Ablating the sycophancy direction REDUCES sycophancy.")
    print("            This is consistent with H1' - sycophancy is causally mediated")
    print("            by a linear direction in the residual stream.")
    print("            Check the McNemar p-value (Cell 11) before treating this as significant.")
elif baseline_sycophancy < ablated_sycophancy:
    print("CONCLUSION: Ablation INCREASED sycophancy (unexpected).")
    print("            The direction may have opposite polarity or be confounded.")
else:
    print("CONCLUSION: No effect observed.")
    print("            H1' not supported - sycophancy may not be linearly")
    print("            separable, or the probe found a non-causal direction.")

In [None]:
# Cell 18: (Optional) Push to GitHub
# Uncomment to save results to repo

# !git add experiments/
# !git commit -m "Add sycophancy steering results from notebook 09"
# !git push