In [28]:
import nnsight
from nnsight import LanguageModel
from nnsight.intervention import InterventionProxy
from typing import List, Optional, Tuple, Union
from rich import print as rprint
from rich.table import Table

In [19]:
model = LanguageModel('gpt2')
tokenizer = model.tokenizer

In [21]:
print(f'Layers: {model.config.n_layer}, Resid stream dim: {model.config.n_embd}')

Layers: 12, Resid stream dim: 768


In [22]:
tokens = model.generate("At the funeral she said solemnly:", max_new_tokens=20)
tokens

<nnsight.contexts.Runner.Runner at 0x7f7afd2040a0>

In [33]:
REMOTE = False

In [41]:
sampling_kwargs = {
    "do_sample": True,
    "top_p": 0.3,
    "repetition_penalty": 1.1,
}

def calculate_and_apply_steering_vector(
    model: LanguageModel,
    prompt: str,
    activation_additions: List[Tuple[int, float, str]],
    n_tokens: int,
    n_comparisons: int = 1,
    use_bos: bool = True,
) -> Tuple[List[str], List[str]]:
    '''
    Performs the steering vector experiments described in the LessWrong post.

    Args:
        prompt: str
            The original prompt, which we'll be doing activation steering on.

        activation_additions: List[Tuple[int, float, str]], each tuple contains:
            layer - the layer we're applying these steering vectors to
            coefficient - the value we're multiplying it by
            prompt - the prompt we're inputting
            e.g. activation_additions[0] = [6, 5.0, " Love"] means we add the " Love" vector at layer 6, scaled by 5x

        n_tokens: int
            Number of tokens which will be generated for each completion

        n_comparisons: int
            Number of sequences generated in this function (i.e. we generate `n_comparisons` which are unsteered, and
            the same number which are steered).

    Returns:
        unsteered_completions: List[str]
            List of length `n_comparisons`, containing all the unsteered completions.

        steered_completions: List[str]
            List of length `n_comparisons`, containing all the steered completions.
    '''
    # Add the BOS token manually, if we're including it
    if use_bos:
        bos = model.tokenizer.bos_token
        prompt = bos + prompt
        activation_additions = [[layer, coeff, bos + p] for layer, coeff, p in activation_additions]

    # Get the (layers, coeffs, prompts) in an easier form to use, also calculate the prompt lengths & check they're all the same
    act_add_layers, act_add_coeffs, act_add_prompts = zip(*activation_additions)
    act_add_seq_lens = [len(tokenizer.tokenize(p)) for p in act_add_prompts]
    assert len(set(act_add_seq_lens)) == 1, "All activation addition prompts must be the same length."
    assert act_add_seq_lens[0] <= len(tokenizer.tokenize(prompt)), "All act_add prompts should be shorter than original prompt."

    # Get the prompts we'll intervene on (unsteered and steered)
    steered_prompts = [prompt for _ in range(n_comparisons)]
    unsteered_prompts = [prompt for _ in range(n_comparisons)]

    with model.generate(max_new_tokens=n_tokens, remote=False, remote_include_output=True, **sampling_kwargs) as generator:

        # Run the act_add prompts (i.e. the contrast pairs), and extract their activations
        with generator.invoke(act_add_prompts) as invoker:
            # Get all the prompts from the activation additions, and put them in a list
            # (note, we slice from the end of the sequence because of left-padding)
            act_add_vectors = [
                model.transformer.h[layer].output[0][i, -seq_len:]
                for i, (layer, seq_len) in enumerate(zip(act_add_layers, act_add_seq_lens))
            ]

        # Forward pass on unsteered prompts (no intervention, no activations saved - we only need the completions)
        with generator.invoke(unsteered_prompts) as invoker:
            pass

        # Forward pass on steered prompts (we add in the results from the act_add prompts)
        with generator.invoke(steered_prompts) as invoker:
            # For each act_add prompt, add the vector to residual stream, at the start of the sequence
            for i, (layer, coeff, seq_len) in enumerate(zip(act_add_layers, act_add_coeffs, act_add_seq_lens)):
                model.transformer.h[layer].output[0][:, :seq_len] += act_add_vectors[i] * coeff

    # Decode steered & unsteered completions (discarding the sequences we only used for extracting activations) & return results
    unsteered_completions = tokenizer.batch_decode(generator.output[-2*n_comparisons: -n_comparisons])
    steered_completions = tokenizer.batch_decode(generator.output[-n_comparisons:])
    return unsteered_completions, steered_completions

In [44]:
unsteered_completions, steered_completions = calculate_and_apply_steering_vector(
    model,
    prompt = "I went up to my friend and said",
    activation_additions = [
        (10, +6.0, "I talk about cookies constantly  "),
        (10, -6.0, "I do not talk about cookies constantly"),
    ],
    n_tokens = 50,
    n_comparisons = 3,
    use_bos = False,
)

table = Table("Unsteered", "Steered", title="Completions", show_lines=True)
for usc, sc in zip(unsteered_completions, steered_completions):
    table.add_row(usc, sc)
rprint(table)

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


In [47]:
unsteered_completions, steered_completions = calculate_and_apply_steering_vector(
    model,
    prompt = "I hate you because",
    activation_additions = [
        (11, +5.0, "Love "),
        (11, -5.0, "Hate"),
    ],
    n_tokens = 50,
    n_comparisons = 3,
    use_bos = True,
)

table = Table("Unsteered", "Steered", title="Completions", show_lines=True)
for usc, sc in zip(unsteered_completions, steered_completions):
    table.add_row(usc, sc)
rprint(table)

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


In [51]:
unsteered_completions, steered_completions = calculate_and_apply_steering_vector(
    model,
    prompt = "To see the eiffel tower, people flock to",
    activation_additions = [
        (9, +10.0, "The Eiffel Tower is in Rome"),
        (9, -10.0, "The Eiffel Tower is in France"),
    ],
    n_tokens = 50,
    n_comparisons = 3,
    use_bos = False,
)

table = Table("Unsteered", "Steered", title="Completions", show_lines=True)
for usc, sc in zip(unsteered_completions, steered_completions):
    table.add_row(usc, sc)
rprint(table)

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
