# Comprehensive Steering Parameter Optimization

This notebook demonstrates optimization of all steering parameters, matching what the Wisent Guard CLI does.
We'll optimize:
- Steering strength/alpha
- Layer selection
- Method-specific parameters (normalization, beta values, epochs, etc.)
- Multiple methods and their variations

In [None]:
import sys
import torch
import numpy as np
from pathlib import Path
from transformers import AutoTokenizer, AutoModelForCausalLM
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple, Any
from dataclasses import dataclass

# Add project root to path
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))

from wisent_guard.core.steering_methods.dac import DAC
from wisent_guard.core.steering_methods.caa import CAA, ControlVectorAggregationMethod
from wisent_guard.core.steering_methods.hpr import HPR
from wisent_guard.core.steering_methods.bipo import BiPO
from wisent_guard.core.steering_methods.k_steering import KSteering
from wisent_guard.core.contrastive_pairs.generate_synthetically import SyntheticContrastivePairGenerator
from wisent_guard.core.model import Model
from evaluate_personal import SteeringEvaluator

# Parameters
MODEL_NAME = "meta-llama/Llama-3.1-8B-Instruct"
BASE_LAYER = 15  # Default layer for single-layer training and steering
LAYER_INDEX = BASE_LAYER  # Alias used by the single-layer training/steering cells
LAYER_RANGE = [13, 14, 15, 16, 17]  # Layers to test
MAX_LENGTH = 30  # Number of tokens generated on top of the prompt
NUM_PAIRS = 5  # Contrastive pairs per generated pair set (small, to keep the demo fast)

In [None]:
# Setup model and device
# Preference order: Apple MPS, then CUDA, then CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Load model
print("Loading model...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
hf_model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    # Half precision only on CUDA; MPS and CPU load the weights in float32.
    torch_dtype=torch.float16 if device.type == 'cuda' else torch.float32,
).to(device)
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token
model = Model(name=MODEL_NAME, hf_model=hf_model)

# Initialize evaluator
evaluator = SteeringEvaluator(hf_model, tokenizer, device)
print("✓ Model and evaluator loaded")

In [None]:
# Define trait and test prompt
TRAIT_NAME = "sarcastic"
TRAIT_DESCRIPTION = "sarcastic and witty responses with subtle mockery and irony"
TEST_PROMPT = "What's your opinion on Monday mornings?"

# Echo the experiment setup, one field per line.
print(
    f"Trait: {TRAIT_NAME}",
    f"Description: {TRAIT_DESCRIPTION}",
    f"Test prompt: {TEST_PROMPT}",
    sep="\n",
)

In [None]:
# Generate synthetic pairs and train steering methods
print("\nGenerating synthetic pairs...")
generator = SyntheticContrastivePairGenerator(model)
# One pair set of NUM_PAIRS contrastive pairs for the trait defined above.
pair_set = generator.generate_contrastive_pair_set(
    name=TRAIT_NAME, trait_description=TRAIT_DESCRIPTION, num_pairs=NUM_PAIRS
)

# Extract activations
def extract_activations(text, layer_idx):
    """Return the last-token hidden state that one decoder layer produces for ``text``.

    Args:
        text: Input string; it is tokenized with the notebook's ``tokenizer``.
        layer_idx: Index into ``hf_model.model.layers`` of the layer to read.

    Returns:
        The layer output at the final sequence position, with the leading
        batch dimension squeezed away.
    """
    inputs = tokenizer(text, return_tensors="pt").to(device)
    captured = []

    def hook(module, input, output):
        # The layer may hand back a tuple whose first item is the hidden
        # states, or the hidden-state tensor itself; accept both.
        hidden_states = output[0] if isinstance(output, tuple) else output
        captured.append(hidden_states[:, -1, :].clone())

    handle = hf_model.model.layers[layer_idx].register_forward_hook(hook)
    try:
        with torch.no_grad():
            hf_model(**inputs)
    finally:
        # Detach the hook even if the forward pass raises, so it cannot
        # stay installed and fire during later calls.
        handle.remove()
    return captured[0].squeeze(0)

# Attach BASE_LAYER activations to both responses of every pair.
for pair in pair_set.pairs:
    pair.positive_response.activations = extract_activations(pair.positive_response.text, BASE_LAYER)
    pair.negative_response.activations = extract_activations(pair.negative_response.text, BASE_LAYER)

# Train methods
print("Training steering methods...")
dac = DAC(device=device)
# DAC is given the underlying model before training.
dac.set_model_reference(hf_model)
dac.train(pair_set, BASE_LAYER)

caa = CAA(device=device)
caa.train(pair_set, BASE_LAYER)

print("✓ Methods trained")

In [None]:
# Generation functions
def generate_unsteered(prompt):
    """Sample a completion for ``prompt`` with no steering hook installed.

    Args:
        prompt: Text to continue.

    Returns:
        The newly generated text only (prompt excluded), stripped of
        surrounding whitespace.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    prompt_length = inputs["input_ids"].shape[1]
    with torch.no_grad():
        outputs = hf_model.generate(
            **inputs,
            max_length=prompt_length + MAX_LENGTH,
            do_sample=True,
            temperature=0.7,
            pad_token_id=tokenizer.eos_token_id
        )
    # Decode only the tokens after the prompt. Cutting the decoded string at
    # len(prompt) trims the wrong amount whenever decoding does not reproduce
    # the prompt character-for-character.
    return tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()

def generate_with_steering(prompt, steering_method, strength):
    """Sample a completion for ``prompt`` while steering the output of ``BASE_LAYER``.

    Args:
        prompt: Text to continue.
        steering_method: Object exposing ``apply_steering(tensor, strength)``.
        strength: Value forwarded to ``apply_steering`` on every forward pass.

    Returns:
        The newly generated text only (prompt excluded), stripped of
        surrounding whitespace.
    """
    def steering_hook(module, input, output):
        # The layer may return a tuple (hidden states first) or a bare tensor.
        returns_tuple = isinstance(output, tuple)
        hidden_states = output[0] if returns_tuple else output
        last_token = hidden_states[:, -1:, :]
        # Only the final sequence position is replaced by its steered version.
        hidden_states[:, -1:, :] = steering_method.apply_steering(last_token, strength)
        if returns_tuple:
            return (hidden_states,) + output[1:]
        return hidden_states

    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    prompt_length = inputs["input_ids"].shape[1]
    handle = hf_model.model.layers[BASE_LAYER].register_forward_hook(steering_hook)
    try:
        with torch.no_grad():
            outputs = hf_model.generate(
                **inputs,
                max_length=prompt_length + MAX_LENGTH,
                do_sample=True,
                temperature=0.7,
                pad_token_id=tokenizer.eos_token_id
            )
    finally:
        # Remove the hook even when generation fails; otherwise it would keep
        # steering every later call that goes through this layer.
        handle.remove()
    # Decode only the generated tokens instead of slicing the decoded string
    # by len(prompt), which breaks when decoding does not round-trip the prompt.
    return tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()

# Generate unsteered baseline
# Sampled without any steering hook installed.
unsteered_response = generate_unsteered(prompt=TEST_PROMPT)
print(f"Unsteered response: {unsteered_response}")

In [ ]:
# Generate synthetic pairs for each layer we'll test
print("\nGenerating synthetic pairs for each layer...")
generator = SyntheticContrastivePairGenerator(model)


def extract_activations(text, layer_idx):
    """Return the last-token hidden state that one decoder layer produces for ``text``.

    Args:
        text: Input string; it is tokenized with the notebook's ``tokenizer``.
        layer_idx: Index into ``hf_model.model.layers`` of the layer to read.

    Returns:
        The layer output at the final sequence position, with the leading
        batch dimension squeezed away.
    """
    inputs = tokenizer(text, return_tensors="pt").to(device)
    captured = []

    def hook(module, input, output):
        # The layer may return a tuple (hidden states first) or a bare tensor.
        hidden_states = output[0] if isinstance(output, tuple) else output
        captured.append(hidden_states[:, -1, :].clone())

    handle = hf_model.model.layers[layer_idx].register_forward_hook(hook)
    try:
        with torch.no_grad():
            hf_model(**inputs)
    finally:
        # Detach the hook even if the forward pass raises.
        handle.remove()
    return captured[0].squeeze(0)


# Store pair sets for each layer
# NOTE(review): a fresh pair set is generated per layer, so differences between
# layers may also reflect differences in the generated training pairs — confirm
# this is intended.
layer_pair_sets = {}
for layer in LAYER_RANGE:
    print(f"Generating pairs for layer {layer}...")
    pair_set = generator.generate_contrastive_pair_set(
        trait_description=TRAIT_DESCRIPTION,
        num_pairs=NUM_PAIRS,
        name=TRAIT_NAME
    )

    # Extract activations for this layer
    for pair in pair_set.pairs:
        pair.positive_response.activations = extract_activations(pair.positive_response.text, layer)
        pair.negative_response.activations = extract_activations(pair.negative_response.text, layer)

    layer_pair_sets[layer] = pair_set

print("✓ Pairs generated for all layers")

In [None]:
# Generation functions with proper parameter handling
def generate_unsteered(prompt):
    """Sample a baseline completion for ``prompt`` with no steering applied.

    Args:
        prompt: Text to continue.

    Returns:
        The newly generated text only (prompt excluded), stripped of
        surrounding whitespace.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    prompt_length = inputs["input_ids"].shape[1]
    with torch.no_grad():
        outputs = hf_model.generate(
            **inputs,
            max_length=prompt_length + MAX_LENGTH,
            do_sample=True,
            temperature=0.7,
            pad_token_id=tokenizer.eos_token_id
        )
    # Keep just the tokens produced after the prompt; slicing the decoded
    # string by len(prompt) is wrong whenever decoding alters the prompt text.
    generated_ids = outputs[0][prompt_length:]
    return tokenizer.decode(generated_ids, skip_special_tokens=True).strip()

def generate_with_steering(prompt, steering_method, layer, **params):
    """Generate with steering, handling different parameter names for different methods.

    Args:
        prompt: Text to continue.
        steering_method: Trained method exposing ``apply_steering(tensor, strength=...)``.
        layer: Index into ``hf_model.model.layers`` of the layer whose output is steered.
        **params: Steering parameters. ``strength`` is honoured for every
            method; for ``DAC`` an ``alpha`` value is used when ``strength``
            is absent. Every other key is accepted and ignored here.

    Returns:
        The newly generated text only (prompt excluded), stripped of
        surrounding whitespace.
    """
    # Resolve the scalar passed to apply_steering once, before any hook runs.
    if isinstance(steering_method, DAC):
        # DAC calls its knob 'alpha'; an explicit 'strength' wins over it.
        strength = params.get('strength', params.get('alpha', 1.0))
    else:
        # Other methods only read 'strength'. A K-Steering 'alpha' is treated
        # as a training-time setting and is not applied during generation.
        strength = params.get('strength', 1.0)

    def steering_hook(module, input, output):
        # The layer may return a tuple (hidden states first) or a bare tensor.
        returns_tuple = isinstance(output, tuple)
        hidden_states = output[0] if returns_tuple else output
        last_token = hidden_states[:, -1:, :]
        # Only the final sequence position is replaced by its steered version.
        hidden_states[:, -1:, :] = steering_method.apply_steering(last_token, strength=strength)
        if returns_tuple:
            return (hidden_states,) + output[1:]
        return hidden_states

    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    prompt_length = inputs["input_ids"].shape[1]
    handle = hf_model.model.layers[layer].register_forward_hook(steering_hook)
    try:
        with torch.no_grad():
            outputs = hf_model.generate(
                **inputs,
                max_length=prompt_length + MAX_LENGTH,
                do_sample=True,
                temperature=0.7,
                pad_token_id=tokenizer.eos_token_id
            )
    finally:
        # Callers may catch generation errors and continue; without this the
        # failed run's hook would stay on the layer and steer later runs too.
        handle.remove()
    # Decode only the generated tokens instead of slicing the decoded string
    # by len(prompt), which breaks when decoding does not round-trip the prompt.
    return tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()

# Generate unsteered baseline
# Sampled once with no steering hook installed.
unsteered_response = generate_unsteered(prompt=TEST_PROMPT)
print(f"Unsteered response: {unsteered_response}")

## Comprehensive Parameter Optimization

Now let's optimize all parameters for each steering method configuration.

In [None]:
# Function to optimize all parameters for a steering configuration
from itertools import product

def optimize_steering_config(config: "SteeringConfig", max_combinations: int = 50, seed=None):
    """Optimize all parameters for a steering configuration.

    Every combination of the candidate values in ``config.steering_params``
    is tried (a random subset of at most ``max_combinations`` when there are
    more). For each combination a fresh ``config.method_class`` instance is
    trained on ``layer_pair_sets[layer]``, one steered response to
    ``TEST_PROMPT`` is generated, and ``evaluator`` scores it against
    ``unsteered_response``. A combination that raises is reported and skipped.

    Args:
        config: Object providing ``name``, ``method_class``, ``init_params``
            and ``steering_params`` (parameter name -> candidate values; it
            must contain a ``'layer'`` entry).
        max_combinations: Upper bound on the number of combinations tested.
        seed: Optional seed that makes the random subset reproducible.
            ``None`` draws from the global ``random`` state.

    Returns:
        Dict with ``config_name``, ``best_params``, ``best_score``,
        ``best_response`` and ``all_results`` (one entry per scored trial).
    """
    import random

    print(f"\n{'='*60}")
    print(f"OPTIMIZING {config.name}")
    print(f"{'='*60}")

    best_score = -1
    best_params = {}
    best_response = ""
    all_results = []

    # Generate all parameter combinations
    param_names = list(config.steering_params.keys())
    all_combinations = list(product(*config.steering_params.values()))

    # Limit combinations if too many
    if len(all_combinations) > max_combinations:
        # A dedicated generator is used only when a seed is given, so an
        # unseeded call still follows the global random state.
        rng = random if seed is None else random.Random(seed)
        all_combinations = rng.sample(all_combinations, max(max_combinations, 0))

    print(f"Testing {len(all_combinations)} parameter combinations...")

    is_bipo = config.name.startswith("BiPO")

    for i, param_tuple in enumerate(all_combinations):
        params = dict(zip(param_names, param_tuple))
        layer = params.pop('layer')  # Layer is special, not passed to method

        try:
            # Initialize method with the right parameters
            init_params = config.init_params.copy()

            # Handle training parameters that go into init
            if is_bipo and 'learning_rate' in params:
                init_params['learning_rate'] = params.pop('learning_rate')
            if is_bipo and 'epochs' in params:
                init_params['epochs'] = params.pop('epochs')

            # Create and train the method
            method = config.method_class(**init_params)

            # Special handling for different methods
            if isinstance(method, DAC):
                method.set_model_reference(hf_model)

            # Train on the appropriate layer's data
            if layer in layer_pair_sets:
                method.train(layer_pair_sets[layer], layer)
            else:
                print(f"Warning: No training data for layer {layer}")
                continue

            # Generate steered response
            # NOTE(review): everything still in `params` is only forwarded to
            # generate_with_steering; whether keys other than strength/alpha
            # affect the result cannot be confirmed from this cell — verify.
            steered_response = generate_with_steering(TEST_PROMPT, method, layer, **params)

            # Evaluate
            scores = evaluator.evaluate_response(
                TEST_PROMPT, unsteered_response, steered_response, TRAIT_DESCRIPTION
            )

            result = {
                'params': {**params, 'layer': layer},
                'scores': scores,
                'response': steered_response
            }
            all_results.append(result)

            # Update best if needed
            if scores['overall'] > best_score:
                best_score = scores['overall']
                best_params = result['params']
                best_response = steered_response

            # Progress update
            if (i + 1) % 10 == 0:
                print(f"  Progress: {i+1}/{len(all_combinations)} - Best score so far: {best_score:.1f}")

        except Exception as e:
            # Best-effort search: report the failing combination and move on.
            print(f"  Error with params {params}: {str(e)[:100]}")
            continue

    print(f"\nBest parameters for {config.name}:")
    for param, value in best_params.items():
        print(f"  {param}: {value}")
    print(f"Best overall score: {best_score:.1f}/10")
    print(f"Best response: {best_response}")

    return {
        'config_name': config.name,
        'best_params': best_params,
        'best_score': best_score,
        'best_response': best_response,
        'all_results': all_results
    }

In [None]:
# Optimize a subset of configurations (for demonstration)
# In practice, you'd run all of them, but it takes time
configs_to_test = [
    steering_configs[0],  # CAA
    steering_configs[1],  # CAA_L2
    steering_configs[3],  # DAC_Dynamic
]

# One result dict per configuration that optimized without raising.
optimization_results = []
for config in configs_to_test:
    try:
        result = optimize_steering_config(config)
        optimization_results.append(result)
    except Exception as e:
        # A failing configuration is reported and skipped so the rest still run.
        print(f"Failed to optimize {config.name}: {e}")

## Analyzing Layer Effectiveness

In [None]:
# Analyze which layers work best across methods
# Start with an empty score list for every layer under test.
layer_effectiveness = {layer: [] for layer in LAYER_RANGE}

# Pool the overall score of every trial, across all optimized methods, by layer.
for result in optimization_results:
    for test_result in result['all_results']:
        layer_effectiveness[test_result['params']['layer']].append(test_result['scores']['overall'])

# Calculate average effectiveness per layer
print("\nLayer Effectiveness Analysis:")
print("="*40)
for layer, scores in layer_effectiveness.items():
    if not scores:
        continue  # layers without any scored trial are left out of the report
    avg_score = sum(scores) / len(scores)
    print(f"Layer {layer}: {avg_score:.2f} (based on {len(scores)} tests)")

# Plot layer effectiveness
layers = list(layer_effectiveness)
# Layers without trials are drawn as a zero-height bar.
avg_scores = [
    (sum(layer_scores) / len(layer_scores)) if layer_scores else 0
    for layer_scores in layer_effectiveness.values()
]
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(layers, avg_scores)
ax.set_xlabel('Layer')
ax.set_ylabel('Average Effectiveness Score')
ax.set_title('Steering Effectiveness by Layer')
plt.show()

## Parameter Sensitivity Analysis

In [None]:
# Analyze parameter sensitivity for each method
def analyze_parameter_sensitivity(optimization_result):
    """Print, per parameter, the mean overall score for each tested value.

    Parameter names are taken from the first trial's ``params`` (``layer``
    excluded). Trials are grouped by the parameter's value, skipping trials
    where it is missing or ``None``. A parameter is reported only when more
    than one distinct value was observed; the report lists the mean score per
    value in sorted order and the variance of those means.

    Args:
        optimization_result: Dict with ``config_name`` and ``all_results``,
            where each result has ``params`` and ``scores['overall']``.

    Returns:
        None. Nothing is printed when ``all_results`` is empty.
    """
    label = optimization_result['config_name']
    trials = optimization_result['all_results']

    if not trials:
        return

    print(f"\nParameter Sensitivity for {label}:")
    print("-" * 40)

    # Layer is excluded from this report.
    for name in [key for key in trials[0]['params'].keys() if key != 'layer']:
        # Bucket the overall scores by the value this parameter took.
        scores_by_value = {}
        for trial in trials:
            value = trial['params'].get(name)
            if value is None:
                continue
            scores_by_value.setdefault(value, []).append(trial['scores']['overall'])

        # A single observed value says nothing about sensitivity.
        if len(scores_by_value) <= 1:
            continue

        mean_by_value = {
            value: sum(bucket) / len(bucket) for value, bucket in scores_by_value.items()
        }
        spread = np.var(list(mean_by_value.values()))

        print(f"{name}:")
        for value, mean_score in sorted(mean_by_value.items()):
            print(f"  {value}: {mean_score:.2f}")
        print(f"  Variance: {spread:.3f}")

# Run sensitivity analysis
# One report per optimized configuration, in the order they were optimized.
for optimization_result in optimization_results:
    analyze_parameter_sensitivity(optimization_result)

## Summary Comparison

Let's compare the optimal parameters and performance across methods.

In [None]:
# Final comparison of all optimized methods
print("\n" + "="*60)
print("FINAL COMPARISON - OPTIMAL CONFIGURATIONS")
print("="*60)

# Sort methods by best score
sorted_results = sorted(optimization_results, key=lambda x: x['best_score'], reverse=True)

print("\nMethod Ranking by Overall Score:")
print("-" * 40)
for i, result in enumerate(sorted_results, 1):
    print(f"\n{i}. {result['config_name']}:")
    print(f"   Best score: {result['best_score']:.1f}/10")
    print(f"   Optimal parameters:")
    for param, value in result['best_params'].items():
        print(f"     - {param}: {value}")
    print(f"   Example response: {result['best_response'][:100]}...")

# Create summary table
print("\n\nSummary Table:")
print("-" * 60)
print(f"{'Method':<20} {'Score':<10} {'Layer':<10} {'Strength/Alpha':<15}")
print("-" * 60)
for result in sorted_results:
    method = result['config_name']
    score = result['best_score']
    layer = result['best_params'].get('layer', 'N/A')
    strength = result['best_params'].get('strength', result['best_params'].get('alpha', 'N/A'))
    # The 'N/A' fallback is a string and cannot take the '.2f' format, so the
    # number is formatted first and the column is padded afterwards.
    strength_text = f"{strength:.2f}" if isinstance(strength, (int, float)) else str(strength)
    print(f"{method:<20} {score:<10.1f} {str(layer):<10} {strength_text:<15}")

print("\n✅ Comprehensive optimization complete!")
# The insights below are fixed text; they are not computed from the results above.
print("\nKey Insights:")
print("1. Different normalization methods (L2, cross-behavior) can significantly affect performance")
print("2. Optimal layer varies by method - not always the classification layer")
print("3. Strength/alpha parameters need careful tuning for each method")
print("4. Some methods work better for certain types of traits")