## Setup



In [None]:
# IN_COLAB = False
#
# import os, sys
# chapter = "chapter1_transformer_interp"
# repo = "ARENA_3.0"
#
# if IN_COLAB:
#     # Install packages
#     %pip install transformer_lens
#     %pip install einops
#     %pip install jaxtyping
#     %pip install git+https://github.com/callummcdougall/CircuitsVis.git#subdirectory=python
#
#     # Code to download the necessary files (e.g. solutions, test funcs)
#     if not os.path.exists(f"/content/{chapter}"):
#         !wget https://github.com/callummcdougall/ARENA_3.0/archive/refs/heads/main.zip
#         !unzip /content/main.zip 'ARENA_3.0-main/chapter1_transformer_interp/exercises/*'
#         sys.path.append(f"/content/{repo}-main/{chapter}/exercises")
#         os.remove("/content/main.zip")
#         os.rename(f"{repo}-main/{chapter}", chapter)
#         os.rmdir(f"{repo}-main")
#         os.chdir(f"{chapter}/exercises")
# else:
#     chapter_dir = r"./" if chapter in os.listdir() else os.getcwd().split(chapter)[0]
#     sys.path.append(chapter_dir + f"{chapter}/exercises")

In [None]:
import os
import sys
import plotly.express as px
import torch as t
from torch import Tensor
import torch.nn as nn
import torch.nn.functional as F
from pathlib import Path
import numpy as np
import einops
from jaxtyping import Int, Float
from typing import List, Optional, Tuple
import functools
from tqdm import tqdm
from IPython.display import display
import webbrowser
import gdown
from transformer_lens.hook_points import HookPoint
from transformer_lens import utils, HookedTransformer, HookedTransformerConfig, FactoredMatrix, ActivationCache
import circuitsvis as cv

# Make sure exercises are in the path.
# `chapter` is otherwise only assigned inside the commented-out setup cell, so it is defined
# here as well; without this the next line raises NameError on a fresh kernel.
chapter = "chapter1_transformer_interp"
exercises_dir = Path(f"{os.getcwd().split(chapter)[0]}/{chapter}/exercises").resolve()
section_dir = (exercises_dir / "part2_intro_to_mech_interp").resolve()
if str(exercises_dir) not in sys.path:
    sys.path.append(str(exercises_dir))

from plotly_utils import imshow, hist, plot_comp_scores, plot_logit_attribution, plot_loss_difference
from part1_transformer_from_scratch.solutions import get_log_probs


# Gradients are never needed for the contents of this notebook, so disable autograd
# globally to save computation time.
t.set_grad_enabled(False)

# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = t.device("cuda") if t.cuda.is_available() else t.device("cpu")

# True only when this code runs with __name__ == "__main__".
MAIN = (__name__ == "__main__")

## Load the model

In [None]:
# Load the pretrained "gpt2-small" weights into a HookedTransformer
# (the first run downloads the model and tokenizer files, see the output below).
gpt2_small: HookedTransformer = HookedTransformer.from_pretrained("gpt2-small")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Loaded pretrained model gpt2-small into HookedTransformer


In [None]:
# Split a sample sentence into string tokens to see how the model tokenizes it.
# The bare last expression is what the cell displays.
text = "Hello, I am a powerful AI and I will take over the world one day!"
gpt2_small.to_str_tokens(text)

['<|endoftext|>',
 'Hello',
 ',',
 ' I',
 ' am',
 ' a',
 ' powerful',
 ' AI',
 ' and',
 ' I',
 ' will',
 ' take',
 ' over',
 ' the',
 ' world',
 ' one',
 ' day',
 '!']

## Exploration of repeated tokens

In [None]:
t.manual_seed(42)  # seed torch's global RNG before any random tokens are drawn


def generate_repeated_tokens(
    model: HookedTransformer, seq_len: int, batch: int = 1
) -> Int[Tensor, "batch full_seq_len"]:
    '''
    Builds sequences of the form [BOS, x_1 .. x_seq_len, x_1 .. x_seq_len], where the
    x_i are random token ids drawn from [0, model.cfg.d_vocab).

    Args:
        model: supplies the BOS token id (model.tokenizer.bos_token_id) and the vocab size.
        seq_len: number of random tokens in one half of the sequence.
        batch: number of independent sequences to generate.

    Returns:
        rep_tokens: [batch, 1+2*seq_len] int64 tensor, moved to the global `device`.
    '''
    # One BOS token per sequence in the batch.
    bos_column = t.full((batch, 1), model.tokenizer.bos_token_id, dtype=t.int64)

    # A single random half, which is then concatenated twice so the second half repeats the first.
    random_half = t.randint(0, model.cfg.d_vocab, (batch, seq_len), dtype=t.int64)
    return t.cat([bos_column, random_half, random_half], dim=-1).to(device)


def run_and_cache_model_repeated_tokens(model: HookedTransformer, seq_len: int, batch: int = 1) -> Tuple[t.Tensor, t.Tensor, ActivationCache]:
    '''
    Generates repeated random-token sequences with `generate_repeated_tokens`, runs the
    model on them with activation caching, and returns the tokens, logits and cache.

    Returns a tuple, in this order:
        rep_tokens: [batch, 1+2*seq_len]
        rep_logits: [batch, 1+2*seq_len, d_vocab]
        rep_cache: The cache of the model run on rep_tokens
    '''
    tokens = generate_repeated_tokens(model, seq_len, batch)
    logits, cache = model.run_with_cache(tokens)
    return tokens, logits, cache


# A single sequence: BOS, then 30 random tokens, then the same 30 tokens again.
seq_len, batch = 30, 1
rep_tokens, rep_logits, rep_cache = run_and_cache_model_repeated_tokens(
    gpt2_small, seq_len, batch
)
rep_cache.remove_batch_dim()  # batch == 1, so cached activations are indexed without a batch axis from here on
rep_str = gpt2_small.to_str_tokens(rep_tokens)
gpt2_small.reset_hooks()

# `get_log_probs` is imported from the part1 solutions; its result is sliced per position below.
log_probs = get_log_probs(rep_logits, rep_tokens).squeeze()

# Compare the model on the first (random) half against the second (repeated) half.
print(f"Performance on the first half: {log_probs[:seq_len].mean():.3f}")
print(f"Performance on the second half: {log_probs[seq_len:].mean():.3f}")

plot_loss_difference(log_probs, rep_str, seq_len)

Performance on the first half: -13.898
Performance on the second half: -0.644


In [None]:
# Recap of accuracy: how often is the model's top-1 next-token prediction right in each half?
# pred[i] is the prediction made at position i, i.e. the model's guess for token i + 1.
# The final position is dropped because there is no next token to compare it with.
pred = rep_logits.argmax(dim=-1).squeeze()[:-1]
real = rep_tokens.squeeze()

# Layout of `real`: [BOS, x_1 .. x_n, x_1 .. x_n]. The half length n is derived from the
# tokens themselves so the cell does not depend on whatever `seq_len` currently holds.
half_len = (real.numel() - 1) // 2

# First half: tokens 1 .. n, predicted from positions 0 .. n-1.
first_half_real = real[1 : half_len + 1]
first_half_pred = pred[:half_len]
first_is_correct = (first_half_real == first_half_pred)

# Second half: tokens n+1 .. 2n, predicted from positions n .. 2n-1.
# (Starting one position earlier would count the last *random* token as part of the
# repeated half and leave out the final repeated token.)
second_half_real = real[half_len + 1 :]
second_half_pred = pred[half_len:]
second_is_correct = (second_half_real == second_half_pred)

print(t.sum(first_is_correct).item()/first_is_correct.numel(), t.sum(second_is_correct).item()/second_is_correct.numel())

0.0 0.9333333333333333


In [None]:
# Show the layer-5 attention patterns for the repeated sequence.
# `rep_cache` already had its batch dimension removed, so no batch indexing is needed here.
layer5_patterns = rep_cache["pattern", 5, "attn"]
display(
    cv.attention.attention_patterns(
        tokens=gpt2_small.to_str_tokens(rep_tokens),
        attention=layer5_patterns,
    )
)

## Find induction head

In [None]:
def induction_score_hook(
    pattern: Float[Tensor, "batch head_index dest_pos source_pos"],
    hook: HookPoint,
):
    '''
    Forward hook for an attention-pattern activation. Averages the attention each head
    puts on the induction stripe and writes the per-head result into row `hook.layer()`
    of the global `induction_score_store` tensor.

    The global `seq_len` is read when the hook runs, so it has to match the repeated
    sequences being fed through the model. Nothing is returned, so the pattern itself
    is left unchanged.
    '''
    # Offset 1 - seq_len selects the entries with dest_pos - source_pos == seq_len - 1:
    # each destination attending to the token that followed its earlier occurrence.
    stripe = pattern.diagonal(offset=1 - seq_len, dim1=-2, dim2=-1)
    # Mean over the batch and over all positions on the stripe -> one score per head.
    per_head_score = einops.reduce(stripe, "batch head_index position -> head_index", "mean")
    induction_score_store[hook.layer(), :] = per_head_score

def pattern_hook_names_filter(name):
    '''Returns True for activation names that end in "pattern"; used to pick which hook points get a hook.'''
    return name.endswith("pattern")

def visualize_pattern_hook(
    pattern: Float[Tensor, "batch head_index dest_pos source_pos"],
    hook: HookPoint,
):
    '''
    Forward hook that prints the layer index of the hook point and displays that layer's
    attention patterns, averaged over the batch dimension.

    NOTE(review): the token labels always come from the global `rep_tokens[0]`, not from
    the batch actually being run; confirm the sequence lengths match before using this
    hook on a different batch.
    '''
    print("Layer: ", hook.layer())
    token_labels = gpt2_small.to_str_tokens(rep_tokens[0])
    batch_mean_pattern = pattern.mean(0)
    display(
        cv.attention.attention_patterns(tokens=token_labels, attention=batch_mean_pattern)
    )


# A larger batch of longer repeated sequences for scoring the heads.
# NOTE: this rebinds the global `seq_len`, which `induction_score_hook` reads when it runs.
seq_len, batch = 50, 30
rep_tokens_30 = generate_repeated_tokens(gpt2_small, seq_len, batch)

# One induction score per (layer, head). The tensor lives on the model's device so the
# hook can write into it without moving data between the GPU and CPU, which can be slow.
induction_score_store = t.zeros(
    (gpt2_small.cfg.n_layers, gpt2_small.cfg.n_heads),
    device=gpt2_small.cfg.device,
)

# Attach the scoring hook to every activation selected by the name filter and run the
# model once (this is where `induction_score_store` gets written to).
induction_hooks = [(pattern_hook_names_filter, induction_score_hook)]
gpt2_small.run_with_hooks(
    rep_tokens_30,
    return_type=None,  # For efficiency, we don't need to calculate the logits
    fwd_hooks=induction_hooks,
)

# Plot the induction scores for each head in each layer
imshow(
    induction_score_store,
    labels={"x": "Head", "y": "Layer"},
    title="Induction Score by Head",
    text_auto=".2f",
    width=900, height=400
)

In [None]:
def previous_score_hook(
    pattern: Float[Tensor, "batch head_index dest_pos source_pos"],
    hook: HookPoint,
):
    '''
    Forward hook for an attention-pattern activation. Averages the attention each head
    puts on the immediately preceding token and writes the per-head result into row
    `hook.layer()` of the global `previous_score_store` tensor.

    Nothing is returned, so the pattern itself is left unchanged.
    '''
    # Offset -1 selects the entries with dest_pos - source_pos == 1, i.e. every
    # destination position attending to the position directly before it.
    stripe = pattern.diagonal(offset=-1, dim1=-2, dim2=-1)
    # Mean over the batch and over all positions on the stripe -> one score per head.
    per_head_score = einops.reduce(stripe, "batch head_index position -> head_index", "mean")
    previous_score_store[hook.layer(), :] = per_head_score


# Selects every activation whose name ends in "pattern".
pattern_hook_names_filter = lambda name: name.endswith("pattern")

# One previous-token score per (layer, head). The tensor lives on the model's device so the
# hook can write into it without moving data between the GPU and CPU, which can be slow.
previous_score_store = t.zeros(
    (gpt2_small.cfg.n_layers, gpt2_small.cfg.n_heads),
    device=gpt2_small.cfg.device,
)

# Run with hooks (this is where `previous_score_store` gets written to).
previous_token_hooks = [(pattern_hook_names_filter, previous_score_hook)]
gpt2_small.run_with_hooks(
    rep_tokens_30,
    return_type=None,  # For efficiency, we don't need to calculate the logits
    fwd_hooks=previous_token_hooks,
)

# Plot the previous-token scores for each head in each layer
imshow(
    previous_score_store,
    labels={"x": "Head", "y": "Layer"},
    title="previous Score by Head",
    text_auto=".2f",
    width=900, height=400
)


## Induction circuit

In [None]:
# Do the MLP layers matter for the induction behaviour?
# The lengths are re-derived from `rep_tokens` itself, because `seq_len` was rebound by an earlier cell.
n_positions = rep_tokens.shape[1]
max_len = n_positions - 1          # index of the last position in the sequence
seq_len = (n_positions - 1) // 2   # length of one repeated half (the sequence is BOS + 2 halves)
def patch_residual_component(
    residual_component,
    hook,
    pos,
    cache,
):
    '''
    Forward hook: overwrites the activation of batch element 0 at position `pos` with the
    cached activation of the same hook point taken `seq_len` positions earlier, and
    returns the activation (which was modified in place).

    `cache[hook.name]` is indexed as [position, :], so the cache is expected to hold
    activations without a batch dimension. `seq_len` is a global read at call time.
    '''
    source_pos = pos - seq_len
    replacement = cache[hook.name][source_pos, :]
    residual_component[0, pos, :] = replacement
    return residual_component

def cross_entropy_loss(logits, tokens):
    '''
    Computes the mean cross entropy between logits (the model's prediction) and tokens (the true values).

    The logits at position i are scored against the token at position i + 1, so the
    logits of the last position and the first token are not used.
    '''
    # Log-probabilities for every position that has a following token.
    next_token_log_probs = F.log_softmax(logits, dim=-1)[:, :-1]
    # The token that actually follows each of those positions, shaped for `gather`.
    targets = tokens[:, 1:].unsqueeze(-1)
    picked = next_token_log_probs.gather(dim=-1, index=targets).squeeze(-1)
    return -picked.mean()

# Rows: the layer whose MLP output is patched. Columns: the patched position, counted
# from the start of the evaluation window (position - seq_len).
ablation_scores = t.zeros((gpt2_small.cfg.n_layers, seq_len), device=gpt2_small.cfg.device)

# The loss is always measured on the same window of positions, so slice the targets once.
target_tokens = rep_tokens[:, seq_len:max_len]

# Baseline: loss on the window with no hooks attached.
gpt2_small.reset_hooks()
logits = gpt2_small(rep_tokens, return_type="logits")
loss_no_ablation = cross_entropy_loss(logits[:, seq_len:max_len], target_tokens)

# NOTE(review): the window starts at position `seq_len`, which is the last token of the
# first half and gets patched from position 0 (BOS). Confirm this is the intended range.
for layer in tqdm(range(gpt2_small.cfg.n_layers)):
    mlp_out_name = utils.get_act_name("mlp_out", layer)  # same hook point for every position in this layer
    for position in range(seq_len, max_len):
        # Patch this layer's MLP output at one position with its cached value from
        # `seq_len` positions earlier, then re-measure the loss on the window.
        hook_fn = functools.partial(patch_residual_component, pos=position, cache=rep_cache)
        ablated_logits = gpt2_small.run_with_hooks(rep_tokens, fwd_hooks=[(mlp_out_name, hook_fn)])
        loss = cross_entropy_loss(ablated_logits[:, seq_len:max_len], target_tokens)
        ablation_scores[layer, position - seq_len] = loss - loss_no_ablation


# The plotted values are loss differences (patched minus baseline), so label the colour bar accordingly.
imshow(
    ablation_scores,
    labels={"x": "pos", "y": "Layer", "color": "Loss diff"},
    title="Loss Difference After Ablating mlp_output",
    text_auto=".2f",
    width=900, height=400
)

100%|██████████| 12/12 [03:01<00:00, 15.15s/it]


In [None]:
# If not, let us play with some weights!
# Candidate heads: previous-token head = layer 4, head 11; induction head = layer 5, head 5.
def K_comp_full_circuit(
    model: HookedTransformer,
    prev_token_layer_index: int,
    ind_layer_index: int,
    prev_token_head_index: int,
    ind_head_index: int
) -> FactoredMatrix:
    '''
    Returns a (vocab, vocab)-size FactoredMatrix,
    with the first dimension being the query side
    and the second dimension being the key side (going via the previous token head)

    Args (note the order: both layer indices come before both head indices):
        model: the transformer whose weights are used.
        prev_token_layer_index: layer of the previous-token head.
        ind_layer_index: layer of the induction head.
        prev_token_head_index: head index of the previous-token head within its layer.
        ind_head_index: head index of the induction head within its layer.
    '''
    # Every weight is taken from the `model` argument, so the result always describes the
    # model that was passed in rather than whichever model happens to be a global.
    W_E = model.W_E
    W_Q = model.W_Q[ind_layer_index, ind_head_index]
    W_K = model.W_K[ind_layer_index, ind_head_index]
    W_O = model.W_O[prev_token_layer_index, prev_token_head_index]
    W_V = model.W_V[prev_token_layer_index, prev_token_head_index]

    # Query side: token embedding straight into the induction head's query weights.
    query_side = W_E @ W_Q
    # Key side: token embedding routed through the previous-token head's OV weights,
    # then into the induction head's key weights.
    key_side = W_E @ W_V @ W_O @ W_K
    # Kept factored so the (vocab, vocab) product is not materialised here.
    return FactoredMatrix(query_side, key_side.T)

# Heads under investigation: previous-token head at layer 4 / head 11,
# induction head at layer 5 / head 5.
prev_token_layer_index, prev_token_head_index = 4, 11
ind_layer_index, ind_head_index = 5, 5

# Keyword arguments make the (layer, layer, head, head) parameter order explicit.
K_comp_circuit = K_comp_full_circuit(
    model=gpt2_small,
    prev_token_layer_index=prev_token_layer_index,
    ind_layer_index=ind_layer_index,
    prev_token_head_index=prev_token_head_index,
    ind_head_index=ind_head_index,
)



In [None]:
def top_1_acc(full_circuit: "FactoredMatrix", batch_size: int = 100) -> float:
    '''
    Returns the fraction of rows of `full_circuit` whose largest entry lies on the diagonal,
    i.e. for row i the argmax over the columns (dim=1) equals i.

    The rows are processed `batch_size` at a time, so only a (batch_size, n_cols) slice
    of the product is materialised at once.

    Args:
        full_circuit: object exposing `.shape`, row indexing with an index tensor, and
            `.AB` (the materialised product of the indexed slice).
        batch_size: number of rows to materialise per step.
    '''
    n_rows = full_circuit.shape[0]
    n_correct = 0

    for indices in t.split(t.arange(n_rows), batch_size):
        AB_slice = full_circuit[indices].AB
        top_columns = t.argmax(AB_slice, dim=1)
        # `indices` is created on the CPU while the slice may live on the GPU; move the
        # indices to the slice's device so the comparison does not raise a device mismatch.
        n_correct += (top_columns == indices.to(top_columns.device)).sum().item()

    return n_correct / n_rows

# `top_1_acc` scores rows, so the transposed circuit is passed in.
key_match_fraction = top_1_acc(K_comp_circuit.T)
print(f"Fraction of tokens where the highest activating key is the same token: {key_match_fraction:.4f}")

Fraction of tokens where the highest activating key is the same token: 0.2283
