In [None]:
# Third-party dependencies for this notebook, installed into the running kernel's environment via %pip.
%pip install einops
%pip install jaxtyping
%pip install transformer_lens

In [23]:
import torch as t
import torch.nn as nn
import einops
from jaxtyping import Int, Float
from typing import List, Optional, Tuple
from transformer_lens import HookedTransformer, ActivationCache, utils
from transformer_lens.hook_points import HookPoint
from torch import Tensor
from tqdm.auto import tqdm
import logging
import gc

In [24]:
# Configure the root logger with a timestamped format (basicConfig does nothing
# if the root logger already has handlers).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# The root level is then set to WARNING, overriding the INFO level requested above.
# `logger` has no level of its own, so its debug()/info() calls are filtered out
# unless this level is lowered.
logging.getLogger().setLevel(logging.WARNING)

In [None]:
# Disable autograd globally: the code visible in this notebook only runs forward passes.
t.set_grad_enabled(False)
# Manual single-model setup, left commented out (ModelSizeExperiment below loads its own models):
# device = t.device("cuda" if t.cuda.is_available() else "cpu")
# model = HookedTransformer.from_pretrained("pythia-410m")

In [17]:
class ModelSteer:
    """Activation-addition steering for a ``HookedTransformer``.

    A steering vector is the difference between the ``resid_pre`` activations
    of two prompts at one layer (see ``get_steer_vec``). During a steered
    forward pass (``run_model_with_vec``) that vector, scaled by ``coeff``, is
    added to the leading positions of the residual stream at the chosen layer.
    """

    def __init__(self, model: HookedTransformer):
        self.coeff = None  # multiplier applied to the steering vector; set via set_coeff()
        self.vec = None    # steering vector; set via set_vec()
        self.model = model
        logger.debug("ModelSteer initialized")

    def set_vec(self, vec: Float[Tensor, "1 n_tokens d_model"]):
        """Store the steering vector used by ``hook``.

        ``get_steer_vec`` returns a tensor with a leading batch axis of size 1;
        a plain ``(n_tokens, d_model)`` tensor is accepted by ``hook`` as well.
        """
        self.vec = vec

    def set_coeff(self, coeff):
        """Store the multiplier applied to the steering vector."""
        self.coeff = coeff

    def get_steer_vec(self, prompt_add: str, prompt_sub: str, layer: int) -> Float[Tensor, "1 n_tokens d_model"]:
        """Return ``resid_pre(prompt_add) - resid_pre(prompt_sub)`` at ``layer``.

        Both prompts are tokenized and the shorter one is right-padded with
        space *token ids* so the two activations always have the same number
        of positions. (Padding the prompt *strings* with spaces is not
        reliable: a tokenizer may merge a run of spaces into a single token.)

        Args:
            prompt_add: Prompt whose activations are added.
            prompt_sub: Prompt whose activations are subtracted.
            layer: Index of the block whose ``resid_pre`` activation is read.

        Returns:
            The activation difference, including the batch axis produced by
            the model.
        """
        logger.debug(f"Getting steer vector for layer {layer}")
        act_name = utils.get_act_name("resid_pre", layer)

        add_tokens = self.model.to_tokens(prompt_add)
        sub_tokens = self.model.to_tokens(prompt_sub)
        length = max(add_tokens.shape[1], sub_tokens.shape[1])
        logger.debug(f"Length to pad to: {length}")

        activations = []
        for tokens in (add_tokens, sub_tokens):
            padded = self._pad_token_ids(tokens, length)
            # Only the activation we read is cached, instead of every hook point.
            _, cache = self.model.run_with_cache(padded, names_filter=act_name)
            activations.append(cache[act_name])

        steering_vector = activations[0] - activations[1]
        logger.debug(f"Steering vector shape: {steering_vector.shape}")
        return steering_vector

    def _pad_token_ids(self, tokens: Tensor, length: int) -> Tensor:
        """Right-pad a ``(batch, seq)`` token tensor to ``length`` positions.

        The padding id is the last token the model's tokenizer produces for a
        single space. Tensors that are already long enough are returned as-is.
        """
        n_missing = length - tokens.shape[1]
        if n_missing <= 0:
            return tokens
        # prepend_bos=False so the id we pick is the space token, not BOS.
        space_ids = self.model.to_tokens(" ", prepend_bos=False)
        padding = tokens.new_full((tokens.shape[0], n_missing), int(space_ids[0, -1]))
        return t.cat([tokens, padding], dim=1)

    def pad_tokens(self, prompt_add: str, prompt_sub: str) -> tuple[str, str]:
        """Right-pad the shorter prompt *string* with spaces (best effort).

        One space is appended per missing token, which only equalises the
        token counts when the tokenizer emits one token per appended space.
        ``get_steer_vec`` pads token ids instead and does not depend on this
        method; it is kept for callers that want padded strings.

        Returns:
            ``(padded_add, padded_sub)``.
        """
        logger.debug("Padding tokens")
        tokenlen = lambda prompt: self.model.to_tokens(prompt).shape[1]

        len_add, len_sub = tokenlen(prompt_add), tokenlen(prompt_sub)
        length = max(len_add, len_sub)
        logger.debug(f"Length to pad to: {length}")
        padded_add = prompt_add + " " * (length - len_add)
        padded_sub = prompt_sub + " " * (length - len_sub)
        # Re-tokenizing just for the message is skipped unless debug logging is on.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Padded lengths: add={tokenlen(padded_add)}, sub={tokenlen(padded_sub)}")
        return padded_add, padded_sub

    def run_model_with_vec(self, prompt, layer: int):
        """Run the model on ``prompt`` with the steering vector added at ``layer``.

        Args:
            prompt: Model input (the callers in this notebook pass a token tensor).
            layer: Index of the block whose ``resid_pre`` activation is modified.

        Returns:
            The model output of ``run_with_hooks``.

        Raises:
            AssertionError: If ``set_vec()`` or ``set_coeff()`` has not been called.
        """
        logger.debug(f"Running model with vector at layer {layer}")
        assert self.vec is not None and self.coeff is not None, "set_vec() and set_coeff() are required"
        # Hooks on the model are cleared before and after the steered pass.
        self.model.reset_hooks()
        out = self.model.run_with_hooks(
            prompt,
            fwd_hooks=[(utils.get_act_name("resid_pre", layer), self.hook)]
        )
        self.model.reset_hooks()
        logger.debug(f"Model output shape: {out.shape}")
        return out

    def hook(self, resid_pre: Float[Tensor, "batch seq d_model"], hook: HookPoint):
        """Forward hook: add ``coeff * vec`` to the leading positions of ``resid_pre``.

        Only the first ``min(seq, n_tokens)`` positions are modified, so an
        input shorter than the steering vector receives the matching prefix
        of the vector instead of raising a shape error. The steering vector
        may be ``(1, n_tokens, d_model)`` or ``(n_tokens, d_model)``; its token
        axis is always the second-to-last one. The input tensor is not
        modified; a new tensor is returned.
        """
        n_steer = min(resid_pre.shape[1], self.vec.shape[-2])
        steer = self.vec[..., :n_steer, :].to(device=resid_pre.device, dtype=resid_pre.dtype)
        steered = resid_pre.clone()
        steered[:, :n_steer, :] += steer * self.coeff
        return steered
        

In [18]:
class Sampler:
    """Generates text samples from a ``ModelSteer``'s model, with or without steering."""

    def __init__(self, model_steer):
        # Object exposing `.model` and `.run_model_with_vec(tokens, layer)`.
        self.model_steer = model_steer

    def sample_with_temperature(self, logits: Tensor, temperature: float = 1.0) -> Tensor:
        """Pick a token id from ``logits`` over the last axis.

        Args:
            logits: Unnormalised scores; the last axis is the vocabulary.
            temperature: ``0`` selects the argmax (greedy); otherwise the
                logits are divided by it before softmax sampling.

        Returns:
            Tensor of sampled indices with the last axis removed.
        """
        if temperature == 0:
            return t.argmax(logits, dim=-1)
        probs = t.softmax(logits / temperature, dim=-1)
        return t.multinomial(probs, num_samples=1).squeeze(-1)

    def sample_model(self, prompt: str, layer: int, n_samples: int = 5, max_tokens: int = 50, use_steering: bool = True, temperature: float = 0.7) -> List[str]:
        """Generate ``n_samples`` continuations of ``prompt``, one token at a time.

        Each step re-runs the model on the whole sequence so far, either
        through ``run_model_with_vec`` (steered) or a plain forward pass.
        Generation for a sample stops after ``max_tokens`` tokens or when the
        tokenizer's EOS id is sampled (the EOS token is not appended).

        Returns:
            Decoded text of each sample; the prompt tokens are part of the
            decoded sequence.
        """
        model = self.model_steer.model
        eos_token_id = model.tokenizer.eos_token_id
        # The prompt tokens never change, so tokenize once rather than per sample.
        prompt_tokens = model.to_tokens(prompt)

        samples = []
        for _ in tqdm(range(n_samples), desc="Generating samples", leave=False):
            tokens = prompt_tokens  # t.cat below builds a new tensor; prompt_tokens is not mutated
            for _ in tqdm(range(max_tokens), desc="Generating tokens", leave=False):
                if use_steering:
                    output = self.model_steer.run_model_with_vec(tokens, layer)
                else:
                    output = model(tokens, return_type="logits")

                # Logits of the last position of the first (only) batch row.
                next_token = self.sample_with_temperature(output[0, -1, :], temperature)
                if next_token == eos_token_id:
                    break
                tokens = t.cat([tokens, next_token.unsqueeze(0).unsqueeze(0)], dim=1)

            samples.append(model.tokenizer.decode(tokens[0]))

        return samples

    def compare_samples(self, prompt: str, layer: int, n_samples: int = 5, max_tokens: int = 50, temperature: float = 0.7) -> Tuple[List[str], List[str]]:
        """Generate steered and non-steered samples for the same prompt.

        Returns:
            ``(steered_samples, non_steered_samples)``.
        """
        print("Generating steered samples:")
        steered_samples = self.sample_model(prompt, layer, n_samples, max_tokens, use_steering=True, temperature=temperature)
        print("\nGenerating non-steered samples:")
        non_steered_samples = self.sample_model(prompt, layer, n_samples, max_tokens, use_steering=False, temperature=temperature)
        return steered_samples, non_steered_samples

    @staticmethod
    def print_compared_samples(prompt: str, steered_samples: List[str], non_steered_samples: List[str]):
        """Print the prompt followed by the samples paired up by index."""
        print(f"\nPrompt: {prompt}\n")
        for i, (steered, non_steered) in enumerate(zip(steered_samples, non_steered_samples), 1):
            print(f"Sample {i}:")
            print(f"Steered: {steered}")
            print(f"Non-steered: {non_steered}")
            print("\n")

    @staticmethod
    def save_compared_samples(filename: str, steered_samples: List[str], non_steered_samples: List[str]):
        """Write the paired samples to ``filename`` (overwriting it) as UTF-8 text.

        The encoding is fixed to UTF-8 because generated text can contain
        characters the platform's default encoding cannot represent.
        """
        with open(filename, 'w', encoding='utf-8') as file:
            for i, (steered, non_steered) in enumerate(zip(steered_samples, non_steered_samples), 1):
                file.write(f"Sample {i}:\n")
                file.write(f"Steered:\n {steered}\n")
                file.write(f"Non-steered:\n {non_steered}\n")
                file.write("\n")

In [19]:
# modelSteer = ModelSteer(model)
# diff_vec = modelSteer.get_steer_vec("I do talk about weddings constantly ", "I do not talk about weddings constantly", layer=(model.cfg.n_layers // 5))
# modelSteer.set_vec(diff_vec)
# modelSteer.set_coeff(3)
# print("check")

In [20]:
# prompt = "The Absolute Beginner's Guide to Fishing"
# sampler = Sampler(modelSteer)
# steered_samples, non_steered_samples = sampler.compare_samples(prompt, layer=(model.cfg.n_layers // 5), n_samples=3, max_tokens=400, temperature=0.7)
# sampler.print_compared_samples(prompt, steered_samples, non_steered_samples)

In [21]:
class ModelSizeExperiment:
    """Runs the same steering comparison across a range of Pythia model sizes.

    For each model name the experiment loads the model, builds a steering
    vector from a fixed prompt pair, samples steered and non-steered
    continuations of a fixed prompt, and writes them to a text file.
    """

    def __init__(self):
        self.model = None  # model of the run in progress; None between runs
        self.device = t.device("cuda" if t.cuda.is_available() else "cpu")

    def get_models(self) -> list[str]:
        """Return the model names passed to ``HookedTransformer.from_pretrained``, in run order."""
        return [
            "pythia-14m",
            "pythia-31m",
            "pythia-70m",
            "pythia-160m",
            "pythia-410m",
            "pythia-1b",
            "pythia-1.4b",
            "pythia-2.8b",
            "pythia-6.9b",
            "pythia-12b",
        ]

    def run_experiment(self):
        """Run ``run_one_go`` for every model returned by ``get_models``."""
        for model_name in self.get_models():
            self.run_one_go(model_name)

    def run_one_go(self, model_name: str):
        """Run the steering comparison for one model and save the samples.

        Any exception raised during the run is reported on stdout and
        swallowed so the remaining models still run. The model is released
        afterwards whether or not the run succeeded.
        """
        try:
            logger.warning(f"{model_name}: Beginning one run...")
            self.model = HookedTransformer.from_pretrained(model_name).to(self.device)
            logger.warning(f"{model_name}: Loaded model")
            modelSteer = ModelSteer(self.model)
            n_layers = self.model.cfg.n_layers

            promptAdd = "I do talk about weddings constantly "
            promptSub = "I do not talk about weddings constantly"
            read_layer = n_layers // 5
            logger.warning(f"{model_name}: Reading vector with promptAdd '{promptAdd}' and promptSub '{promptSub}'")
            diff_vec = modelSteer.get_steer_vec(promptAdd, promptSub, layer=read_layer)
            logger.warning(f"{model_name}: got vector {diff_vec.shape} from layer {read_layer} of {n_layers}")
            modelSteer.set_vec(diff_vec)
            coeff = 3
            modelSteer.set_coeff(coeff)
            logger.warning(f"{model_name}: Set vector with coefficient {coeff}")

            prompt = """Some say it is wrong of me, but I disagree"""
            write_layer = n_layers // 5
            sampler = Sampler(modelSteer)
            logger.warning(f"{model_name}: Sampling model with vector written at layer {write_layer} of {n_layers}")
            steered_samples, non_steered_samples = sampler.compare_samples(prompt, layer=write_layer, n_samples=12, max_tokens=180, temperature=0.7)

            filename = f"{model_name}_rl-{read_layer}_cf-{coeff}_wl-{write_layer}.txt"
            logger.warning(f"{model_name}: Saving outputs to {filename}")
            sampler.save_compared_samples(filename, steered_samples, non_steered_samples)

        except Exception as e:
            print(f"An error occurred while running model {model_name}: {str(e)}")

        finally:
            if self.model is not None:
                # Locals above may still reference the model, so move its
                # weights off the accelerator explicitly.
                self.model.cpu()
            # Reset rather than `del`: deleting the attribute made the check
            # above raise AttributeError on the next run whenever that run
            # failed before assigning a new model.
            self.model = None
            # Collect first so the cache flush can release the freed blocks.
            gc.collect()
            if self.device.type == "cuda":
                t.cuda.empty_cache()

        
    

In [None]:
# Run the full sweep: one steered vs. non-steered comparison per model name in
# get_models(); each successful run writes its samples to a text file.
exp = ModelSizeExperiment()
exp.run_experiment()