In [1]:
%%capture
pip install transformer_lens -U "huggingface_hub[cli]" transformers jaxtyping

In [2]:
import torch
import functools
#import einops
import numpy as np
#import pandas as pd  

#from sklearn.model_selection import train_test_split
from tqdm import tqdm
from torch import Tensor
from typing import List, Callable
from transformer_lens import HookedTransformer, utils
from transformer_lens.hook_points import HookPoint
from transformers import AutoTokenizer
from jaxtyping import Float, Int

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
def getDevice():
    """Pick the torch device to run on: CUDA first, then Apple MPS, else CPU."""
    if torch.cuda.is_available():
        backend = "cuda"  # nvidia / runpod
    elif torch.backends.mps.is_available():
        backend = "mps"  # apple silicon
    else:
        backend = "cpu"
    return torch.device(backend)
    
# Resolve the compute device once; get_model and get_residual_stream read this global.
DEVICE = getDevice()
DEVICE  # bare expression so the cell displays the selected device

device(type='mps')

In [None]:
#huggingface authentication
!hf auth login --token HF_TOKEN #replace HF_TOKEN with the actual hf token

In [4]:
# Candidate checkpoints, ranging from a small baseline up to ~8B parameters.
# NOTE(review): 'gpt2' and 'pythia-2.8b-v0' appear at a single size only, so the
# "two sizes per family" pairing does not hold for every entry.
# NOTE(review): Gemma 2 was, as far as I know, published in 2B/9B/27B sizes —
# confirm that 'gemma-2-7b' resolves to a real checkpoint before using it.
model_list = [
    'meta-llama/Llama-3.1-8B',
    'meta-llama/Llama-3.2-3B',
    'gpt2',
    'pythia-2.8b-v0',
    'qwen2.5-3b',
    'qwen3-8b',
    'gemma-2-2b',
    'gemma-2-7b',
]

In [5]:
def get_model(model_name):
    """Load ``model_name`` as a HookedTransformer on the global DEVICE.

    The model is created with ``HookedTransformer.from_pretrained_no_processing``
    in float16 with left padding, switched to evaluation mode and moved to DEVICE.

    Args:
        model_name: Model identifier handed to TransformerLens.

    Returns:
        The loaded model.
    """
    load_options = dict(
        device=DEVICE,
        dtype=torch.float16,
        default_padding_side='left',
        output_hidden_states=True,
    )
    hooked_model = HookedTransformer.from_pretrained_no_processing(model_name, **load_options)
    # eval() only switches layers such as dropout to evaluation behaviour;
    # it does not by itself turn off gradient tracking.
    hooked_model.eval()
    hooked_model.to(DEVICE)
    return hooked_model

In [None]:
def tokenize_prompts(tokenizer, prompts: List[str]) -> Int[Tensor, 'batch seq_len']:
    """Tokenize ``prompts`` into one padded batch of token ids.

    Args:
        tokenizer: Callable tokenizer whose result exposes ``input_ids``.
        prompts: Prompt strings to encode together.

    Returns:
        The ``input_ids`` of the encoded batch, as PyTorch tensors
        (``return_tensors='pt'``).
    """
    encoded_batch = tokenizer(
        prompts,
        return_tensors='pt',
        padding=True,       # pad so every prompt in the batch has the same length
        truncation=False,   # never cut long prompts short
    )
    return encoded_batch.input_ids
# TODO: chat models will need a different tokenizer setup than the one above

In [None]:
# Prompts of the first category; still an empty placeholder that must be filled in.
normal = []

# Prompts of the second category; still an empty placeholder that must be filled in.
opinionated = []

# One flat list: the `normal` prompts followed by the `opinionated` ones.
final_dataset = normal + opinionated

In [8]:
def get_residual_stream(prompts, which_tokens, model):
    """Return the batch-averaged ``hook_resid_pre`` activation of every layer.

    A single ``model.run_with_cache(prompts)`` call collects the
    ``blocks.{i}.hook_resid_pre`` activation of all layers at once. Each layer's
    activation is reduced over the sequence axis according to ``which_tokens``
    and the result is averaged over the batch axis.

    Args:
        prompts: Batch handed straight to ``model.run_with_cache``;
            ``len(prompts)`` is taken as the batch size.
        which_tokens: ``'first'`` (position 0), ``'last'`` (position -1) or
            ``'mean'`` (average over every position, so any padding positions
            are averaged in as well).
        model: Object exposing ``cfg.n_layers``, ``cfg.d_model`` and
            ``run_with_cache``.

    Returns:
        Tensor of shape ``(n_layers, d_model)``, at least float32.

    Raises:
        ValueError: If ``which_tokens`` is not one of the supported values.
    """
    # Every reducer maps (batch, seq_len, d_model) -> (batch, d_model).
    reducers = {
        'first': lambda acts: acts[:, 0, :],
        # The previous slice `-1:0` was always empty; position -1 is the last token.
        'last': lambda acts: acts[:, -1, :],
        'mean': lambda acts: acts.mean(dim=1),
    }
    if which_tokens not in reducers:
        raise ValueError(
            f"which_tokens must be one of {sorted(reducers)}, but got {which_tokens!r}"
        )
    reduce_tokens = reducers[which_tokens]

    batch_size = len(prompts)
    d_model = model.cfg.d_model
    n_layers = model.cfg.n_layers
    hook_names = [f"blocks.{i}.hook_resid_pre" for i in range(n_layers)]

    # Only the residual-stream hooks are needed, so restrict the cache to them,
    # and skip gradient tracking because the activations are only read.
    with torch.no_grad():
        _, cache = model.run_with_cache(prompts, names_filter=hook_names)

    per_layer = []
    for name in hook_names:
        resid_pre = cache[name]  # (batch, seq_len, d_model)
        assert (
            resid_pre.ndim == 3
            and resid_pre.shape[0] == batch_size
            and resid_pre.shape[2] == d_model
        ), f"Expected shape {(batch_size, 'seq_len', d_model)}, but got {tuple(resid_pre.shape)}"
        per_layer.append(reduce_tokens(resid_pre).detach())

    resids = torch.stack(per_layer, dim=1)  # (batch, n_layers, d_model)
    assert resids.shape == (batch_size, n_layers, d_model), f"Expected shape {(batch_size, n_layers, d_model)}, but got {resids.shape}"

    # Average over the batch in at least float32 so half-precision activations
    # are not accumulated in half precision.
    resids = resids.to(torch.promote_types(resids.dtype, torch.float32)).mean(dim=0)
    assert resids.shape == (n_layers, d_model), f"Expected shape {(n_layers, d_model)}, but got {resids.shape}"

    return resids

In [9]:
def calculate_steering_vector(X, Y, model):
    """Compute a per-layer steering vector as mean(X activations) - mean(Y activations).

    Both prompt lists are tokenized with ``model.tokenizer`` and passed to
    ``get_residual_stream`` in ``'mean'`` mode, which yields one row per layer.

    Args:
        X: Prompts of the class to steer towards.
        Y: Prompts of the class to steer away from.
        model: Model providing ``tokenizer`` and used by ``get_residual_stream``.

    Returns:
        Tensor with one row per layer, i.e. ``(n_layers, d_model)`` as returned
        by ``get_residual_stream``.

    Raises:
        ValueError: If ``X`` or ``Y`` is empty, since no mean activation can be
            computed from zero prompts.
    """
    if len(X) == 0 or len(Y) == 0:
        raise ValueError(
            f"Both prompt lists must be non-empty, but got {len(X)} and {len(Y)} prompts"
        )

    # one mean residual-stream vector per layer for each prompt group
    A_mean = get_residual_stream(tokenize_prompts(model.tokenizer, prompts=X), 'mean', model)
    B_mean = get_residual_stream(tokenize_prompts(model.tokenizer, prompts=Y), 'mean', model)

    return A_mean - B_mean

In [None]:
# Load the entry at index 1 of model_list through get_model; later cells use this global.
current_model = get_model(model_list[1])

In [11]:
def random_llm_judge(prompt):
    """Stand-in judge: ignores ``prompt`` and returns 0 or 1 based on one uniform draw."""
    draw = torch.rand(1)
    return 0 if draw < 0.5 else 1

In [12]:
def seperate_prompts(dataset, length):
    """Split ``dataset`` into two lists using ``random_llm_judge``, capped at ``length`` each.

    Each prompt is judged once, in order. A judgement of 0 sends it to the first
    list and 1 to the second, unless that list already holds ``length`` prompts.
    Iteration stops as soon as both lists are full.

    Returns:
        Tuple ``(neutral, opinion)`` of prompt lists.
    """
    neutral_prompts = []
    opinion_prompts = []
    for prompt in dataset:
        label = random_llm_judge(prompt)
        if label == 0:
            if len(neutral_prompts) < length:
                neutral_prompts.append(prompt)
        elif label == 1:
            if len(opinion_prompts) < length:
                opinion_prompts.append(prompt)
        # stop once neither list has room left
        if not (len(neutral_prompts) < length or len(opinion_prompts) < length):
            break
    return neutral_prompts, opinion_prompts

In [13]:
def steered_generation(model, prompt, pos, coeff, steering_vector, layer, token_length):
    """Generate from ``prompt`` while adding a steering vector to one layer's residual stream.

    A forward hook on ``blocks.{layer}.hook_resid_pre`` adds
    ``coeff * steering_vector`` to the activation at sequence position ``pos``
    for the duration of ``model.generate``.

    Args:
        model: Model exposing ``to_tokens``, ``hooks``, ``generate`` and ``to_string``.
        prompt: Text to continue.
        pos: Sequence position to steer. An integer position that does not exist
            in the activation passed to the hook is skipped for that call.
        coeff: Scalar multiplier applied to ``steering_vector``.
        steering_vector: Tensor broadcastable to ``(batch, d_model)``.
        layer: Index of the block whose ``hook_resid_pre`` is steered.
        token_length: Number of new tokens to generate.

    Returns:
        Whatever ``model.to_string`` returns for the generated tokens.
    """
    tokens = model.to_tokens(prompt)

    # Loop-invariant, so scale once instead of on every hook call.
    scaled_vector = coeff * steering_vector

    def steer_model(value: torch.Tensor, hook: "HookPoint") -> torch.Tensor:
        seq_len = value.shape[1]
        # Later generation steps may see only the newly produced token (presumably
        # because of key/value caching), so an integer `pos` taken from the prompt
        # can be out of range there; skip instead of raising IndexError.
        if isinstance(pos, int) and not (-seq_len <= pos < seq_len):
            return value
        # Match the activation's device/dtype so a float32 or CPU vector still applies.
        value[:, pos, :] += scaled_vector.to(device=value.device, dtype=value.dtype)
        return value

    with model.hooks(fwd_hooks=[(f"blocks.{layer}.hook_resid_pre", steer_model)]):
        steered_output = model.generate(tokens, max_new_tokens=token_length)
        generation = model.to_string(steered_output)

    return generation

In [14]:
def normal_generation(model, prompt, token_length):
    """Generate ``token_length`` new tokens from ``prompt`` and return the decoded result.

    No hooks are attached here, so this serves as the unsteered counterpart of
    ``steered_generation``.
    """
    generated = model.generate(model.to_tokens(prompt), max_new_tokens=token_length)
    return model.to_string(generated)

In [15]:
def generate_with_steering_vector(dataset, dataset_length, model, pos, coeff, layer, token_length):
    """Build a steering vector from ``dataset`` and print a steered generation per prompt.

    The dataset is split into two groups with ``seperate_prompts``, a per-layer
    steering vector is computed from them, and every prompt in ``dataset`` is
    then generated with the row for ``layer`` added at position ``pos``.

    Args:
        dataset: Prompts used both to derive the steering vector and to generate from.
        dataset_length: Maximum number of prompts per group when splitting.
        model: Model used for both the steering vector and the generations.
        pos: Sequence position at which to add the steering vector.
        coeff: Scalar multiplier for the steering vector.
        layer: Layer whose steering-vector row is used and whose residual stream is steered.
        token_length: Number of new tokens to generate per prompt.

    Returns:
        None; the generations are printed.
    """
    A, B = seperate_prompts(dataset, length=dataset_length)

    steering_vector = calculate_steering_vector(A, B, model)

    # Same slice for every prompt, so take it once; slicing (rather than indexing)
    # keeps the leading dimension of the selected row.
    layer_vector = steering_vector[layer:layer+1]

    for index, prompt in enumerate(dataset, start=1):
        # Use the `model` argument: the previous code reached for the global
        # `current_model`, silently ignoring the model passed by the caller.
        output = steered_generation(model, prompt, pos, coeff, layer_vector, layer, token_length)
        print(f"Prompt {index}: ", output)


In [None]:
# Smoke run: steer layer 0 at position 0 with coefficient 1, two prompts per group, two new tokens.
generate_with_steering_vector(
    final_dataset,
    dataset_length=2,
    model=current_model,
    pos=0,
    coeff=1,
    layer=0,
    token_length=2,
)