# Imports

In [1]:
#%pip install git+https://github.com/neelnanda-io/TransformerLens.git
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch import Tensor

import numpy as np
import pandas as pd
import einops
from fancy_einsum import einsum
import tqdm.auto as tqdm
import random
from pathlib import Path
# import plotly.express as px
from torch.utils.data import DataLoader
from typing import Union, List, Optional, Callable, Tuple, Dict, Literal, Set
from jaxtyping import Float, Int
from functools import partial
import copy

import itertools
from transformers import AutoModelForCausalLM, AutoConfig, AutoTokenizer
import dataclasses
import datasets
from IPython.display import HTML

import transformer_lens
import transformer_lens.utils as utils
from transformer_lens.utils import to_numpy
from transformer_lens.hook_points import (
    HookedRootModule,
    HookPoint,
)  # Hooking utilities
from transformer_lens import HookedTransformer, HookedTransformerConfig, FactoredMatrix, ActivationCache, patching

import plotly.express as px
#%pip install git+https://github.com/callummcdougall/CircuitsVis.git#subdirectory=python
import circuitsvis as cv
import os, sys

In [2]:
if not os.path.exists("path_patching.py"):
        !wget https://github.com/callummcdougall/path_patching/archive/refs/heads/main.zip
        !unzip /content/main.zip 'path_patching-main/ioi_dataset.py'
        !unzip /content/main.zip 'path_patching-main/path_patching.py'
        sys.path.append("/content/path_patching-main")
        os.remove("/content/main.zip")
        os.rename("/content/path_patching-main/ioi_dataset.py", "ioi_dataset.py")
        os.rename("/content/path_patching-main/path_patching.py", "path_patching.py")
        os.rmdir("/content/path_patching-main")

from path_patching import Node, IterNode, path_patch, act_patch

In [3]:
#%pip install git+https://github.com/neelnanda-io/neel-plotly.git
from neel_plotly import imshow, line, scatter, histogram
import tqdm
torch.set_grad_enabled(False)
device = "cuda" if torch.cuda.is_available() else "cpu"

In [4]:
device

'cuda'

In [5]:
if not os.path.exists("path_patching.py"):
        !wget https://github.com/callummcdougall/path_patching/archive/refs/heads/main.zip
        !unzip /content/main.zip 'path_patching-main/ioi_dataset.py'
        !unzip /content/main.zip 'path_patching-main/path_patching.py'
        sys.path.append("/content/path_patching-main")
        os.remove("/content/main.zip")
        os.rename("/content/path_patching-main/ioi_dataset.py", "ioi_dataset.py")
        os.rename("/content/path_patching-main/path_patching.py", "path_patching.py")
        os.rmdir("/content/path_patching-main")

from path_patching import Node, IterNode, path_patch, act_patch

In [6]:
update_layout_set = {
    "xaxis_range", "yaxis_range", "hovermode", "xaxis_title", "yaxis_title", "colorbar", "colorscale", "coloraxis", "title_x", "bargap", "bargroupgap", "xaxis_tickformat",
    "yaxis_tickformat", "title_y", "legend_title_text", "xaxis_showgrid", "xaxis_gridwidth", "xaxis_gridcolor", "yaxis_showgrid", "yaxis_gridwidth", "yaxis_gridcolor",
    "showlegend", "xaxis_tickmode", "yaxis_tickmode", "xaxis_tickangle", "yaxis_tickangle", "margin", "xaxis_visible", "yaxis_visible", "bargap", "bargroupgap"
}

def imshow(tensor, return_fig = False, renderer=None, **kwargs):
    kwargs_post = {k: v for k, v in kwargs.items() if k in update_layout_set}
    kwargs_pre = {k: v for k, v in kwargs.items() if k not in update_layout_set}
    facet_labels = kwargs_pre.pop("facet_labels", None)
    border = kwargs_pre.pop("border", False)
    if "color_continuous_scale" not in kwargs_pre:
        kwargs_pre["color_continuous_scale"] = "RdBu"
    if "margin" in kwargs_post and isinstance(kwargs_post["margin"], int):
        kwargs_post["margin"] = dict.fromkeys(list("tblr"), kwargs_post["margin"])
    fig = px.imshow(utils.to_numpy(tensor), color_continuous_midpoint=0.0, **kwargs_pre)
    if facet_labels:
        for i, label in enumerate(facet_labels):
            fig.layout.annotations[i]['text'] = label
    if border:
        fig.update_xaxes(showline=True, linewidth=1, linecolor='black', mirror=True)
        fig.update_yaxes(showline=True, linewidth=1, linecolor='black', mirror=True)
    # things like `xaxis_tickmode` should be applied to all subplots. This is super janky lol but I'm under time pressure
    for setting in ["tickangle"]:
      if f"xaxis_{setting}" in kwargs_post:
          i = 2
          while f"xaxis{i}" in fig["layout"]:
            kwargs_post[f"xaxis{i}_{setting}"] = kwargs_post[f"xaxis_{setting}"]
            i += 1
    fig.update_layout(**kwargs_post)
    fig.show(renderer=renderer)
    if return_fig:
      return fig

def hist(tensor, renderer=None, **kwargs):
    kwargs_post = {k: v for k, v in kwargs.items() if k in update_layout_set}
    kwargs_pre = {k: v for k, v in kwargs.items() if k not in update_layout_set}
    names = kwargs_pre.pop("names", None)
    if "barmode" not in kwargs_post:
        kwargs_post["barmode"] = "overlay"
    if "bargap" not in kwargs_post:
        kwargs_post["bargap"] = 0.0
    if "margin" in kwargs_post and isinstance(kwargs_post["margin"], int):
        kwargs_post["margin"] = dict.fromkeys(list("tblr"), kwargs_post["margin"])
    fig = px.histogram(x=tensor, **kwargs_pre).update_layout(**kwargs_post)
    if names is not None:
        for i in range(len(fig.data)):
            fig.data[i]["name"] = names[i // 2]
    fig.show(renderer)

In [7]:
from plotly import graph_objects as go
from plotly.subplots import make_subplots

# Model, Dataset, Helper Functions

In [8]:
model = HookedTransformer.from_pretrained(
    "gpt2-small",
    center_unembed=True,
    center_writing_weights=True,
    fold_ln=True,
    device = device
)

Using pad_token, but it is not set yet.


Loaded pretrained model gpt2-small into HookedTransformer


In [9]:
clean_format = [
    "When John and Mary went to the shops,{} gave the bag to",
    "When Tom and James went to the park,{} gave the ball to",
    "When Dan and Sid went to the shops,{} gave an apple to",
    "After Martin and Amy went to the park,{} gave a drink to",
    "After Cody and Jay went to the park,{} gave a drink to",
    "After Jack and Jill ran up the hill,{} gave some water to",
    "When Mark and Martin went to the store,{} gave an apple to",
    "When Sarah and Emily went to the library,{} gave a book to",
    "When Cole and George went to the store,{} gave a book to",
    "When Kai and Bryce went to the park,{} gave an apple to",
]
name_pairs = [
    (" Mary", " John"),
    (" Tom", " James"),
    (" Dan", " Sid"),
    (" Martin", " Amy"),
    (" Cody", " Jay"),
    (" Jack", " Jill"),
    (" Mark", " Martin"),
    (" Sarah", " Emily"),
    (" Cole", " George"),
    (" Kai", " Bryce"),
]

# Define 8 prompts
clean_prompts = [
    prompt.format(name)
    for (prompt, names) in zip(clean_format, name_pairs) for name in names[::-1]
]

clean_prompts = [clean_prompts[i] for i in [0, 3, 5, 7, 9, 11, 13, 15, 17, 19]]


# Define the answers for each prompt, in the form (correct, incorrect)
clean_answers = [names[::i] for names in name_pairs for i in (1, -1)]
clean_answers = [clean_answers[i] for i in [0, 3, 5, 7, 9, 11, 13, 15, 17, 19]]


# Define the answer tokens (same shape as the answers)
clean_answer_tokens = torch.concat([
    model.to_tokens(names, prepend_bos=False).T for names in clean_answers
])

# generate 'corrupted ABC prompts'
abc_format = [
    "When Sarah and James went to the shops,{} gave the bag to",
    "When John and Sid went to the park,{} gave the ball to",
    "When Tom and Amy went to the shops,{} gave an apple to",
    "After Dan and Jay went to the park,{} gave a drink to",
    "After Martin and Jill went to the park,{} gave a drink to",
    "After Cody and Martin ran up the hill,{} gave some water to",
    "When Jack and Emily went to the store,{} gave an apple to",
    "When Mark and Mary went to the library,{} gave a book to",
    "When Kai and Bryce went to the store,{} gave a book to",
    "When Cole and George went to the park,{} gave an apple to",

]

abc_name_pairs = [
    (" Cody", " Jay"),
    (" Jack", " Jill"),
    (" Mark", " Martin"),
    (" Sarah", " Emily"),
    (" Mary", " John"),
    (" Tom", " James"),
    (" Dan", " Sid"),
    (" Martin", " Amy"),
    (" Grant", " Mark"),
    (" Chase", " Miles")
]


abc_prompts = [
    prompt.format(name)
    for (prompt, names) in zip(abc_format, abc_name_pairs) for name in names[::-1]
]
abc_prompts = [abc_prompts[i] for i in [0, 3, 5, 7, 9, 11, 13, 15, 17, 19]]


abc_answers = [names[::i] for names in name_pairs for i in (1, -1)]
abc_answers = [abc_answers[i] for i in [0, 3, 5, 7, 9, 11, 13, 15, 17, 19]]

abc_answer_tokens = torch.concat([
    model.to_tokens(names, prepend_bos=False).T for names in abc_answers
])

In [10]:
clean_tokens: Float[Tensor, "batch seq"] = model.to_tokens(clean_prompts)
corrupted_tokens: Float[Tensor, "batch seq"] = model.to_tokens(abc_prompts)

print(clean_tokens.shape)
print(corrupted_tokens.shape)
clean_logits, clean_cache = model.run_with_cache(clean_tokens, prepend_bos = False)
corrupted_logits, corrupted_cache = model.run_with_cache(corrupted_tokens, prepend_bos = False)

clean_answer_tokens, corrupted_answer_tokens = clean_answer_tokens[:, 0], clean_answer_tokens[:, 1]

answer_tokens: Float[Tensor, "batch 2"] = torch.stack((clean_answer_tokens, corrupted_answer_tokens))
answer_tokens = einops.rearrange(answer_tokens, "answer batch -> batch answer")

torch.Size([10, 15])
torch.Size([10, 15])


In [11]:
batch_size, seq_len = clean_tokens.shape[0], clean_tokens.shape[1]
model.reset_hooks()
clean_logits, clean_cache = model.run_with_cache(clean_tokens, prepend_bos = False)
corrupted_logits, corrupted_cache = model.run_with_cache(corrupted_tokens, prepend_bos = False)

In [12]:
def logits_to_ave_logit_diff(
    logits: Float[Tensor, "batch seq d_vocab"],
    answer_tokens: Float[Tensor, "batch 2"] = answer_tokens,
    per_prompt: bool = False
):
    '''
    Returns logit difference between the correct and incorrect answer.

    If per_prompt=True, return the array of differences rather than the average.
    '''
    # Only the final logits are relevant for the answer
    final_logits: Float[Tensor, "batch d_vocab"] = logits[:, -1, :]
    # Get the logits corresponding to the indirect object / subject tokens respectively
    answer_logits: Float[Tensor, "batch 2"] = final_logits.gather(dim=-1, index=answer_tokens)
    # Find logit difference
    correct_logits, incorrect_logits = answer_logits.unbind(dim=-1)
    answer_logit_diff = correct_logits - incorrect_logits
    return answer_logit_diff if per_prompt else answer_logit_diff.mean().item()

In [13]:
clean_per_prompt_diff = logits_to_ave_logit_diff(clean_logits, per_prompt = True)
corrupted_per_prompt_diff = logits_to_ave_logit_diff(corrupted_logits, per_prompt = True)

#print(logits_to_ave_logit_diff(clean_logits))

clean_average_logit_diff = logits_to_ave_logit_diff(clean_logits)
corrupted_average_logit_diff = logits_to_ave_logit_diff(corrupted_logits)
print(clean_average_logit_diff)
print(corrupted_average_logit_diff)
print(clean_average_logit_diff-corrupted_average_logit_diff)

4.307886600494385
0.0773492380976677
4.230537362396717


In [14]:
def noising_ioi_metric(
    logits: Float[Tensor, "batch seq d_vocab"],
    clean_logit_diff: float = clean_average_logit_diff,
    corrupted_logit_diff: float = corrupted_average_logit_diff,
) -> float:
    '''
    Given logits, returns how much the performance has been corrupted due to noising.

    We calibrate this so that the value is 0 when performance isn't harmed (i.e. same as IOI dataset),
    and -1 when performance has been destroyed (i.e. is same as ABC dataset).
    '''
    #print(logits[-1, -1])
    patched_logit_diff = logits_to_ave_logit_diff(logits)
    return ((patched_logit_diff - clean_logit_diff) / (clean_logit_diff - corrupted_logit_diff))

print(f"IOI metric (IOI dataset): {noising_ioi_metric(clean_logits):.4f}")
print(f"IOI metric (ABC dataset): {noising_ioi_metric(corrupted_logits):.4f}")

IOI metric (IOI dataset): 0.0000
IOI metric (ABC dataset): -1.0000


In [30]:
def denoising_ioi_metric(
    logits: Float[Tensor, "batch seq d_vocab"],
    clean_logit_diff: float = clean_average_logit_diff,
    corrupted_logit_diff: float = corrupted_average_logit_diff,
) -> float:
    '''
    We calibrate this so that the value is 1 when performance got restored (i.e. same as IOI dataset),
    and 0 when performance has been destroyed (i.e. is same as ABC dataset).
    '''
    patched_logit_diff = logits_to_ave_logit_diff(logits)
    return ((patched_logit_diff - clean_logit_diff) / (clean_logit_diff - corrupted_logit_diff) + 1)


print(f"IOI metric (IOI dataset): {denoising_ioi_metric(clean_logits):.4f}")
print(f"IOI metric (ABC dataset): {denoising_ioi_metric(corrupted_logits):.4f}")

IOI metric (IOI dataset): 1.0000
IOI metric (ABC dataset): 0.0000


In [31]:
answer_residual_directions: Float[Tensor, "batch 2 d_model"] = model.tokens_to_residual_directions(answer_tokens)
correct_residual_direction, incorrect_residual_direction = answer_residual_directions.unbind(dim=1)
logit_diff_directions: Float[Tensor, "batch d_model"] = correct_residual_direction - incorrect_residual_direction

In [21]:
def stare_at_attention_and_head_pat(cache, layer_to_stare_at, head_to_isolate, display_corrupted_text = False, verbose = True, specific = False, specific_index = 0):
  """
  given a cache from a run, displays the attention patterns of a layer, as well as printing out how much the model
  attends to the S1, S2, and IO token
  """

  tokenized_str_tokens = model.to_str_tokens(corrupted_tokens[0]) if display_corrupted_text else model.to_str_tokens(clean_tokens[0])
  attention_patten = cache["pattern", layer_to_stare_at]
  print(f"Layer {layer_to_stare_at} Head {head_to_isolate} Activation Patterns:")


  if not specific:
    S1 = attention_patten.mean(0)[head_to_isolate][-1][2].item()
    IO = attention_patten.mean(0)[head_to_isolate][-1][4].item()
    S2 = attention_patten.mean(0)[head_to_isolate][-1][10].item()
  else:
    S1 = attention_patten[specific_index, head_to_isolate][-1][2].item()
    IO = attention_patten[specific_index, head_to_isolate][-1][4].item()
    S2 = attention_patten[specific_index, head_to_isolate][-1][10].item()


  print("Attention on S1: " + str(S1))
  print("Attention on IO: " + str(IO))
  print("Attention on S2: " + str(S2))
  print("S1 + IO - S2 = " + str(S1 + IO - S2))
  print("S1 + S2 - IO = " + str(S1 + S2 - IO))
  print("S1 - IO - S2 = " + str(S1 - S2 - IO))


  if verbose:
    display(cv.attention.attention_heads(
      tokens=tokenized_str_tokens,
      attention= attention_patten.mean(0) if not specific else attention_patten[specific_index],
      #attention_head_names=[f"L{layer_to_stare_at}H{i}" for i in range(model.cfg.n_heads)],
    ))
  else:
    print(attention_patten.mean(0).shape)

    display(cv.attention.attention_patterns(
      tokens=tokenized_str_tokens,
      attention=attention_patten.mean(0)if not specific else attention_patten[specific_index],
      attention_head_names=[f"L{layer_to_stare_at} H{i}" for i in range(model.cfg.n_heads)],
    ))

In [52]:
neg_m_heads = [(10,7), (11,10)]
name_mover_heads = [(9,9), (9,6), (10,0)]
backup_heads = [(9,0), (9,7), (10,1), (10,2), (10,6), (10,10), (11,2), (11,9)]
key_backup_heads = [(10,2), (10,10), (11,2)]
strong_neg_backup_heads = [(11,2), (10,2), (11,6)]



head_names = ["Negative", "Name Mover", "Backup"]
head_list = [neg_m_heads, name_mover_heads, backup_heads]

In [53]:
def store_activation(
    activation,
    hook: HookPoint,
    where_to_store
):
    """
    takes a storage container where_to_store, and stores the activation in it at a hook
    """""
    where_to_store[:] = activation

In [54]:
def kq_rewrite_hook(
    internal_value: Float[Tensor, "batch seq head d_head"],
    hook: HookPoint,
    head,
    unnormalized_resid:  Float[Tensor, "batch seq d_model"],
    vector,
    act_name,
    scale = 1,
    position = -1,
    pre_ln = True
):
  """
  replaces keys or queries with a new result which we get from adding a vector to a position at the residual stream
  head: tuple for head to rewrite keys for
  unnormalized_resid: stored unnormalized residual stream needed to recalculated activations
  """

  ln1 = model.blocks[hook.layer()].ln1
  temp_resid = unnormalized_resid.clone()

  if pre_ln:
    temp_resid[:, position, :] = temp_resid[:, position, :] + scale * vector
    normalized_resid = ln1(temp_resid)
  else:
    temp_resid = ln1(temp_resid)
    temp_resid[:, position, :] = temp_resid[:, position, :] + scale * vector
    normalized_resid = temp_resid


  assert act_name == "q" or act_name == "k"
  if act_name == "q":
    W_Q, b_Q = model.W_Q[head[0], head[1]], model.b_Q[head[0], head[1]]
    internal_value[..., head[1], :] = einops.einsum(normalized_resid, W_Q, "batch seq d_model, d_model d_head -> batch seq d_head") + b_Q

  elif act_name == "k":
    W_K, b_K = model.W_K[head[0], head[1]], model.b_K[head[0], head[1]]
    internal_value[..., head[1], :] = einops.einsum(normalized_resid, W_K, "batch seq d_model, d_model d_head -> batch seq d_head") + b_K


In [44]:
def patch_head_vector(
    head_vector: Float[Tensor, "batch pos head_index d_head"],
    hook: HookPoint,
    head_indices: int,
    other_cache: ActivationCache
) -> Float[Tensor, "batch pos head_index d_head"]:
    '''
    Patches the output of a given head (before it's added to the residual stream) at
    every sequence position, using the value from the other cache.
    '''
    for head_index in head_indices:
      head_vector[:, :, head_index] = other_cache[hook.name][:, :, head_index]
    return head_vector

In [45]:
def patch_ln_scale(ln_scale, hook):
  #print(torch.equal(ln_scale, clean_cache["blocks." + str(hook.layer()) + ".ln1.hook_scale"]))
  ln_scale = clean_cache["blocks." + str(hook.layer()) + ".ln1.hook_scale"]
  return ln_scale

In [55]:
def causal_write_into_component(act_comp, head, direction, x, pre_ln = True, result_cache_function = None, result_cache_fun_has_head_input = False, freeze_layernorm = False, ablate_heads = []):
  '''
  writes a vector into the component at a given head
  returns new logit differences of run by default, or pass result_cache_funciton to run on cache

  head - tuple for head to intervene in act_comp for
  direction - vector to add to the act_comp in the head
  x - tensor of amount to scale
  '''
  y = torch.zeros(x.shape)
  for i in range(len(x)):
    scale = x[i]
    model.reset_hooks()
    temp = torch.zeros((batch_size, seq_len, model.cfg.d_model)).cuda()
    model.add_hook(utils.get_act_name("resid_pre", head[0]), partial(store_activation, where_to_store = temp))
    if freeze_layernorm:
      model.add_hook("blocks." + str(head[0]) + ".ln1.hook_scale", patch_ln_scale)
    model.add_hook(utils.get_act_name(act_comp, head[0]), partial(kq_rewrite_hook, head = head, unnormalized_resid = temp, vector = direction, act_name = act_comp, scale = scale, pre_ln = pre_ln))


    if len(ablate_heads) != 0:
      for j in ablate_heads:
        model.add_hook(utils.get_act_name("z", j[0]), partial(patch_head_vector, head_indices = [j[1]], other_cache = corrupted_cache))


    hooked_logits, hooked_cache = model.run_with_cache(clean_tokens)
    model.reset_hooks()


    if result_cache_function != None:
      if not result_cache_fun_has_head_input:
        y[i] = result_cache_function(hooked_cache)
      else:
        y[i] = result_cache_function(hooked_cache, head)
    else:
      # just calculate logit diff
      y[i] = logits_to_ave_logit_diff(hooked_logits)

  return y


In [47]:
def graph_lines(results, heads, x, title = "Effect of adding/subtracting direction", xtitle = "Scaling on direction", ytitle = "Logit Diff"):
  fig = px.line(title = title)
  for i in range(len(results)):
    fig.add_trace(go.Scatter(x = x, y = results[i], name = str(heads[i])))

  fig.update_xaxes(title = xtitle)
  fig.update_yaxes(title = ytitle)
  fig.show()

In [61]:
def get_head_IO_minus_S_attn(cache, head, scores = True):

  layer, h_index = head

  if scores:
    attention_patten = cache["attn_scores", layer]
  else:
    attention_patten = cache["pattern", layer]

  S1 = attention_patten.mean(0)[h_index][-1][2].item()
  IO = attention_patten.mean(0)[h_index][-1][4].item()
  S2 = attention_patten.mean(0)[h_index][-1][10].item()

  return IO - S1 - S2


def get_head_IO_minus_just_S1_attn(cache, head, scores = True):

    layer, h_index = head

    if scores:
      attention_patten = cache["attn_scores", layer]
    else:
      attention_patten = cache["pattern", layer]

    S1 = attention_patten.mean(0)[h_index][-1][2].item()
    IO = attention_patten.mean(0)[h_index][-1][4].item()
    S2 = attention_patten.mean(0)[h_index][-1][10].item()

    return IO - S1

def get_head_attn(cache, head, token, scores = True):

  layer, h_index = head

  if scores:
    attention_patten = cache["attn_scores", layer]
  else:
    attention_patten = cache["pattern", layer]

  if token == "S1":
     return attention_patten.mean(0)[h_index][-1][2].item()
  elif token == "IO":
     return attention_patten.mean(0)[h_index][-1][4].item()
  elif token == "S2":
     return attention_patten.mean(0)[h_index][-1][10].item()
  elif token == "BOS":
     return attention_patten.mean(0)[h_index][-1][0].item()
  else:
    print("RAHHHHH YOU MISSTYPED SOMETHING")

def patch_head_vector(
    head_vector: Float[Tensor, "batch pos head_index d_head"],
    hook: HookPoint,
    head_indices: int,
    other_cache: ActivationCache
) -> Float[Tensor, "batch pos head_index d_head"]:
    '''
    Patches the output of a given head (before it's added to the residual stream) at
    every sequence position, using the value from the other cache.
    '''
    for head_index in head_indices:
      head_vector[:, :, head_index] = other_cache[hook.name][:, :, head_index]
    return head_vector

def get_attn_results_into_head_dirs(heads, direction, scale_amounts, ablate_heads = [], freeze_ln = False, only_S1 = False, scores = True):
  io_attn_postln_nmh_results = []
  for i in range(len(heads)):
    io_attn_postln_nmh_results.append(causal_write_into_component("q", heads[i], direction, scale_amounts,
                                                        pre_ln = True, freeze_layernorm = freeze_ln, result_cache_function = partial(get_head_attn, token = "IO", scores = scores), result_cache_fun_has_head_input = True, ablate_heads=ablate_heads))


  s1_attn_postln_nmh_results = []
  for i in range(len(heads)):
    s1_attn_postln_nmh_results.append(causal_write_into_component("q", heads[i], direction, scale_amounts,
                                                        pre_ln = True, freeze_layernorm = freeze_ln,result_cache_function = partial(get_head_attn, token = "S1", scores = scores), result_cache_fun_has_head_input = True, ablate_heads=ablate_heads))

  s2_attn_postln_nmh_results = []
  for i in range(len(heads)):
    s2_attn_postln_nmh_results.append(causal_write_into_component("q", heads[i], direction, scale_amounts,
                                                        pre_ln = True, freeze_layernorm = freeze_ln,result_cache_function = partial(get_head_attn, token = "S2", scores = scores), result_cache_fun_has_head_input = True, ablate_heads=ablate_heads))

  diff_results = []
  if not only_S1:
    for i in range(len(heads)):
      diff_results.append(causal_write_into_component("q", heads[i], direction, scale_amounts,
                                                          pre_ln = True, freeze_layernorm = freeze_ln,result_cache_function = partial(get_head_IO_minus_S_attn, scores = scores), result_cache_fun_has_head_input = True, ablate_heads=ablate_heads))
  else:
    for i in range(len(heads)):
      diff_results.append(causal_write_into_component("q", heads[i], direction, scale_amounts,
                                                          pre_ln = True, freeze_layernorm = freeze_ln,result_cache_function = partial(get_head_IO_minus_just_S1_attn, scores = scores), result_cache_fun_has_head_input = True, ablate_heads=ablate_heads))


  bos_attn_postln_nmh_results = []
  for i in range(len(heads)):
    bos_attn_postln_nmh_results.append(causal_write_into_component("q", heads[i], direction, scale_amounts,
                                                        pre_ln = True, freeze_layernorm = freeze_ln,result_cache_function = partial(get_head_attn, token = "BOS", scores = scores), result_cache_fun_has_head_input = True, ablate_heads=ablate_heads))

  return [io_attn_postln_nmh_results, s1_attn_postln_nmh_results, s2_attn_postln_nmh_results, diff_results, bos_attn_postln_nmh_results]

In [127]:
def get_copying_scores(
    model: HookedTransformer,
    names,
    k: int = 5,

) -> Float[Tensor, "2 layer-1 head"]:
    '''
    Gets copying scores (both positive and negative) as described in page 6 of the IOI paper, for every (layer, head) pair in the model.

    Returns these in a 3D tensor (the first dimension is for positive vs negative).

    Omits the 0th layer, because this is before MLP0 (which we're claiming acts as an extended embedding).
    '''
    # SOLUTION
    results = torch.zeros((2, model.cfg.n_layers, model.cfg.n_heads), device="cuda")

    # Define components from our model (for typechecking, and cleaner code)
    embed = model.embed
    mlp0 = model.blocks[0].mlp
    ln0 = model.blocks[0].ln2
    unembed = model.unembed
    ln_final = model.ln_final

    # Get embeddings for the names in our list
    name_tokens: Int[Tensor, "batch 1"] = model.to_tokens(names, prepend_bos=False)
    name_embeddings: Int[Tensor, "batch 1 d_model"] = embed(name_tokens)

    # Get residual stream after applying MLP
    resid_after_mlp1 = name_embeddings + mlp0(ln0(name_embeddings))

    # Loop over all (layer, head) pairs
    for layer in range(model.cfg.n_layers):
        for head in range(model.cfg.n_heads):

            # Get W_OV matrix
            W_OV = model.W_V[layer, head] @ model.W_O[layer, head]

            # Get residual stream after applying W_OV or -W_OV respectively
            # (note, because of bias b_U, it matters that we do sign flip here, not later)
            resid_after_OV_pos = resid_after_mlp1 @ W_OV
            resid_after_OV_neg = resid_after_mlp1 @ -W_OV

            # Get logits from value of residual stream
            logits_pos: Float[Tensor, "batch d_vocab"] = unembed(ln_final(resid_after_OV_pos)).squeeze()
            logits_neg: Float[Tensor, "batch d_vocab"] = unembed(ln_final(resid_after_OV_neg)).squeeze()

            # Check how many are in top k
            topk_logits: Int[Tensor, "batch k"] = torch.topk(logits_pos, dim=-1, k=k).indices
            in_topk = (topk_logits == name_tokens).any(-1)
            # Check how many are in bottom k
            bottomk_logits: Int[Tensor, "batch k"] = torch.topk(logits_neg, dim=-1, k=k).indices
            in_bottomk = (bottomk_logits == name_tokens).any(-1)

            # Fill in results
            results[:, layer, head] = torch.tensor([in_topk.float().mean(), in_bottomk.float().mean()])

    return results

In [129]:
def top_1_acc_iteration(full_OV_circuit: FactoredMatrix, batch_size: int = 100) -> float:
    '''
    This should take the argmax of each column (ie over dim=0) and return the fraction of the time that's equal to the correct logit
    '''
    A, B = full_OV_circuit.A, full_OV_circuit.B
    nrows = full_OV_circuit.shape[0]
    nrows_max_on_diagonal = 0

    for i in range(0, nrows + batch_size, batch_size):
        rng = range(i, min(i + batch_size, nrows))
        if rng:
            submatrix = A[rng, :] @ B
            diag_indices = torch.tensor(rng, device=submatrix.device)
            nrows_max_on_diagonal += (submatrix.argmax(-1) == diag_indices).float().sum().item()

    return nrows_max_on_diagonal / nrows


def top_5_acc_iteration(full_OV_circuit: FactoredMatrix, batch_size: int = 100) -> float:
    '''
    This should take the argmax of each column (ie over dim=0) and return the fraction of the time that's equal to the correct logit
    '''
    A, B = full_OV_circuit.A, full_OV_circuit.B
    nrows = full_OV_circuit.shape[0]
    nrows_top5_on_diagonal = 0

    for i in range(0, nrows + batch_size, batch_size):
        rng = range(i, min(i + batch_size, nrows))
        if rng:
            submatrix = A[rng, :] @ B
            diag_indices = torch.tensor(rng, device=submatrix.device).unsqueeze(-1)
            top5 = torch.topk(submatrix, k=5).indices
            nrows_top5_on_diagonal += (diag_indices == top5).sum().item()

    return nrows_top5_on_diagonal / nrows


In [130]:
def look_at_backup_circuit(
    model: HookedTransformer,
    head_one: Tuple[int, int],
    head_two: Tuple[int, int],
    names,
    show_matrix = True,
    negative = False
):
    """
    Shows the strength of the backup - W_OV^A, W_QK^B circuit - between heads
    or, the negative backup if negative = True
    """

    # Define components from our model (for typechecking, and cleaner code)
    embed = model.embed
    mlp0 = model.blocks[0].mlp
    ln0 = model.blocks[0].ln2
    unembed = model.unembed
    ln_final = model.ln_final

    # Get embeddings for the names in our list
    name_tokens: Int[Tensor, "batch 1"] = model.to_tokens(names, prepend_bos=False)
    name_embeddings: Int[Tensor, "batch 1 d_model"] = embed(name_tokens)

    # Get residual stream after applying MLP
    resid_after_mlp1 = name_embeddings + mlp0(ln0(name_embeddings)) # seq 1 d_model
    resid_after_mlp1 = resid_after_mlp1[:, 0, :]

    # calculate the OV matrix of head two
    A_O = model.W_O[head_one[0], head_one[1]]
    A_V = model.W_V[head_one[0], head_one[1]]
    A_OV_Circuit = FactoredMatrix(A_V, A_O)

    # calculate the QK matrix of head two
    B_Q = model.W_Q[head_two[0], head_two[1]]
    B_K = model.W_K[head_two[0], head_two[1]]

    if negative:
      B_QK_Circuit = FactoredMatrix(-B_Q, B_K.T)
    else:
      B_QK_Circuit = FactoredMatrix(B_Q, B_K.T)


    relationship = A_OV_Circuit @ B_QK_Circuit # this is the A by B compositioin we want

    # put token embeddings around this matrix
    full_circuit = resid_after_mlp1 @ relationship @ resid_after_mlp1.T


    top_1 = top_1_acc_iteration(full_circuit)
    top_5 = top_5_acc_iteration(full_circuit)


    if show_matrix:
      print(top_1)
      print(top_5)
      imshow (
          full_circuit.AB,
          labels={"x": "Input token", "y": "Attention to output token"},
          title="Full Backup composition between head " + str(head_one) +" and "+ str(head_two),
          width=700,
          x = model.to_str_tokens(name_tokens),
          y = model.to_str_tokens(name_tokens)
      )
    else:
      return top_1, top_5

In [131]:
def display_back_scores(B_layer, B_head, negative = False):
  """
  displays all backup scores of heads with the head (B_layer, B_head)
  """

  backup_circuitry_11_7 = torch.zeros((12, 12,2))
  for layer in range(12):
    for head in range(12):
      top_1, top_5 = look_at_backup_circuit(model, (layer,head), (B_layer, B_head), names_list, show_matrix = False, negative = negative)
      backup_circuitry_11_7[layer][head][0] = top_1
      backup_circuitry_11_7[layer][head][1] = top_5

  imshow (
          torch.stack((backup_circuitry_11_7[..., 0], backup_circuitry_11_7[..., 1])),
          facet_col = 0,
          facet_labels = ["Top_1", "Top_5"],
          labels={"x": "Head", "y": "Layer"},
          title=f"Backup Circuit Score with {B_layer}.{B_head}" if not negative else f"Negative Backup Circuit Score with {B_layer}.{B_head}",
          width=700,
          range_color = [-1, 1]
      )

In [133]:
def look_at_qk_circuit(
    model: HookedTransformer,
    head_two: Tuple[int, int],
    names,
    show_matrix = True,
    negative = False
):
    """
    Shows the strength of the QK circuit in head_two
    """

    # Define components from our model (for typechecking, and cleaner code)
    embed = model.embed
    mlp0 = model.blocks[0].mlp
    ln0 = model.blocks[0].ln2
    unembed = model.unembed
    ln_final = model.ln_final

    # Get embeddings for the names in our list
    name_tokens: Int[Tensor, "batch 1"] = model.to_tokens(names, prepend_bos=False)
    name_embeddings: Int[Tensor, "batch 1 d_model"] = embed(name_tokens)

    # Get residual stream after applying MLP
    resid_after_mlp1 = name_embeddings + mlp0(ln0(name_embeddings)) # seq 1 d_model
    resid_after_mlp1 = resid_after_mlp1[:, 0, :]

    # calculate the QK matrix of head two
    B_Q = model.W_Q[head_two[0], head_two[1]]
    B_K = model.W_K[head_two[0], head_two[1]]

    if negative:
      B_QK_Circuit = FactoredMatrix(-B_Q, B_K.T)
    else:
      B_QK_Circuit = FactoredMatrix(B_Q, B_K.T)

    relationship = B_QK_Circuit # this is the A by B compositioin we want

    # put token embeddings around this matrix
    full_circuit = resid_after_mlp1 @ relationship @ resid_after_mlp1.T


    top_1 = top_1_acc_iteration(full_circuit)
    top_5 = top_5_acc_iteration(full_circuit)


    if show_matrix:
      print(top_1)
      print(top_5)
      imshow (
          full_circuit.AB,
          labels={"x": "Input token", "y": "Attention to output token"},
          title="Full QK circuit within head " + str(head_two),
          width=700,
          x = model.to_str_tokens(name_tokens),
          y = model.to_str_tokens(name_tokens)
      )
    else:
      return top_1, top_5

# Query Intervention into Corrective Model Components

In [62]:
IO_unembed_direction = model.W_U.T[clean_tokens][:, 4, :]
scaling_amounts = torch.linspace(-5, 5, 21)

In [197]:
import pickle

results_storage = {}

# to load dictionary (if you dont have, simply comment out the below lines):
with open('saved_dictionary.pkl', 'rb') as f:
    results_storage = pickle.load(f)

In [198]:
def return_head_from_string(head_type):
    
    assert head_type == "backup" or head_type == "NMH" or head_type == "negative" or head_type == "key_backup"

    if head_type == "backup":
        heads = backup_heads
    elif head_type == "NMH":
        heads = name_mover_heads
    elif head_type == "negative":
        heads = neg_m_heads
    elif head_type == "key_backup":
        heads = key_backup_heads

    return heads

    

In [199]:
def run_query_intervention_with_setup(scaling_amounts, head_type, scores = True,  freeze_ln = False, only_S1 = True, head_to_control = (9,9)):
    """
    currently doesnt allow for ablations in encoding, nor does it allow for post_ln
    """
    encode_name = ("1" if freeze_ln else "0") +  ("1" if only_S1 else "0") + ("1" if scores else "0") + head_type + str(head_to_control)

    if encode_name in results_storage.keys():
        print("returning cached value")
        return results_storage[encode_name]
    


    matrix = model.W_O[head_to_control[0], head_to_control[1]]
    layer_output = clean_cache[utils.get_act_name("z", head_to_control[0])]
    layer_result = einops.einsum(matrix, layer_output, "d_head d_model, batch seq h_idx d_head -> batch seq h_idx d_model")
    output_of_head = layer_result[:, -1, head_to_control[1], :]     #[:, 10, :]  # 10 = Subject,   4 = IO

    dot_product = einops.einsum(output_of_head, IO_unembed_direction, "batch d_model, batch d_model -> batch")
    length_of_vector = einops.einsum(IO_unembed_direction, IO_unembed_direction, "batch d_model, batch d_model -> batch")
    projected_lengths = dot_product / length_of_vector
    io_in_direction = IO_unembed_direction * einops.repeat(projected_lengths, "batch -> batch d_model", d_model = model.cfg.d_model)
    everything_but_direction = output_of_head - io_in_direction

    heads = return_head_from_string(head_type)

    into_head_results = get_attn_results_into_head_dirs(heads, output_of_head, scaling_amounts, ablate_heads=[], freeze_ln=freeze_ln, only_S1 = only_S1, scores = scores)
    perp_into_head_results = get_attn_results_into_head_dirs(heads, everything_but_direction, scaling_amounts, ablate_heads=[], freeze_ln=freeze_ln, only_S1 = only_S1, scores = scores)
    parallel_into_head_results = get_attn_results_into_head_dirs(heads, io_in_direction, scaling_amounts, ablate_heads=[], freeze_ln=freeze_ln, only_S1 = only_S1, scores = scores)    


    result = [into_head_results, perp_into_head_results, parallel_into_head_results]
    results_storage[encode_name] = result
    return result

In [200]:
def display_attn_scores(head_type, scores = True,  freeze_ln = False, only_S1 = True, head = (9,9)):
    
    nine_nine_into_backup_results, nine_nine_perp_into_backup_results, nine_nine_parallel_into_backup_results = run_query_intervention_with_setup(scaling_amounts, head_type, scores, freeze_ln, only_S1, head_to_control = head)

    heads = return_head_from_string(head_type)


    fig = make_subplots(rows = 3, cols = 5, subplot_titles = ["IO", "S1", "S2","IO - S1", "BOS"], shared_yaxes=True)
    fig.update_layout(height = 900)

    fig.update_layout(title = "Adding Directions to " + head_type + f" | scores = {scores} | freeze_ln = {freeze_ln}")
    colors = [
        "pink", "darkviolet", "blue", "purple", "turquoise", "red", "green", "yellow", "orange", "cyan", "magenta",
        "lime", "maroon", "navy", "olive", "teal", "aqua", "silver", "gray", "black", "white", "indigo", "gold", "brown",
        "coral", "crimson", "darkblue", "darkcyan", "darkgoldenrod", "darkgray", "darkgreen", "darkkhaki", "darkmagenta",
        "darkolivegreen", "darkorange", "darkorchid", "darkred", "darksalmon", "darkslateblue", "darkslategray",
        "darkturquoise", "darkviolet", "deeppink", "deepskyblue", "dimgray", "dodgerblue", "firebrick", "forestgreen",
        "fuchsia", "gainsboro", "gold", "goldenrod", "gray", "greenyellow", "honeydew", "hotpink", "indianred", "indigo",
        "ivory", "khaki", "lavender", "lavenderblush", "lawngreen", "lemonchiffon", "lightblue", "lightcoral",
        "lightcyan", "lightgoldenrodyellow", "lightgray", "lightgreen", "lightpink", "lightsalmon", "lightseagreen",
        "lightskyblue", "lightslategray", "lightsteelblue", "lightyellow", "lime", "limegreen", "linen", "magenta",
        "maroon", "mediumaquamarine", "mediumblue", "mediumorchid", "mediumpurple", "mediumseagreen", "mediumslateblue",
        "mediumspringgreen", "mediumturquoise", "mediumvioletred", "midnightblue", "mintcream", "mistyrose", "moccasin",
        "navajowhite", "navy", "oldlace", "olive", "olivedrab", "orangered", "orchid", "palegoldenrod", "palegreen",
        "paleturquoise", "palevioletred", "papayawhip", "peachpuff", "peru", "plum", "powderblue", "rebeccapurple",
        "rosybrown", "royalblue", "saddlebrown", "salmon", "sandybrown", "seagreen", "sienna", "skyblue", "slateblue",
        "slategray", "snow", "springgreen", "steelblue", "tan", "thistle", "tomato", "turquoise", "violet", "wheat",
        "whitesmoke", "yellowgreen"
    ]

    for col, data in enumerate(nine_nine_into_backup_results):
        for i in range(len(data)):
            fig.add_trace(go.Scatter(x = scaling_amounts, y = data[i], name = str(heads[i]), line_color = colors[i], legendgroup= str(i)), row = 1, col = col + 1)

    for col, data in enumerate(nine_nine_perp_into_backup_results):
        for i in range(len(data)):
            fig.add_trace(go.Scatter(x = scaling_amounts, y = data[i], name = str(heads[i]) + " ⊥ IO", line_color = colors[i], legendgroup= str(i)), row = 2, col = col + 1)

    for col, data in enumerate(nine_nine_parallel_into_backup_results):
        for i in range(len(data)):
            fig.add_trace(go.Scatter(x = scaling_amounts, y = data[i], name = str(heads[i]) + " ∥ IO", line_color = colors[i], legendgroup= str(i)), row = 3, col = col + 1)


    fig.update_xaxes(title = f"Added Scaled {head} Output", row = 1)
    fig.update_xaxes(title = f"Added Scaled {head} ⊥ IO ", row = 2)
    fig.update_xaxes(title = f"Added Scaled {head} ∥ IO ", row = 3)
    
    if scores:
        fig.update_yaxes(title = "Attn Score on token")
    else:
        fig.update_yaxes(title = "Attn Probs on token")
    fig.show()

In [202]:
results_storage.keys()

dict_keys(['111backup(9, 9)', '111NMH(9, 9)', '111negative(9, 9)', '110backup(9, 9)', '110NMH(9, 9)', '110negative(9, 9)', '011backup(9, 9)', '011NMH(9, 9)', '011negative(9, 9)', '010backup(9, 9)', '010NMH(9, 9)', '010negative(9, 9)', '111backup(9, 6)', '111NMH(9, 6)', '111negative(9, 6)', '110backup(9, 6)', '110NMH(9, 6)', '110negative(9, 6)', '011backup(9, 6)', '011NMH(9, 6)', '011negative(9, 6)', '010backup(9, 6)', '010NMH(9, 6)', '010negative(9, 6)', '111backup(10, 0)', '111NMH(10, 0)', '111negative(10, 0)', '110backup(10, 0)', '110NMH(10, 0)', '110negative(10, 0)', '011backup(10, 0)', '011NMH(10, 0)', '011negative(10, 0)', '010backup(10, 0)', '010NMH(10, 0)', '010negative(10, 0)', '111key_backup(9, 9)', '110key_backup(9, 9)', '011key_backup(9, 9)', '010key_backup(9, 9)', '111key_backup(9, 6)', '110key_backup(9, 6)', '011key_backup(9, 6)', '010key_backup(9, 6)', '111key_backup(10, 0)', '110key_backup(10, 0)', '011key_backup(10, 0)', '010key_backup(10, 0)'])

In [203]:
scaling_amounts = torch.linspace(-8, 8, 21)
for freeze_ln in [True, False]:
    for scores in [True, False]:
        for headtype in ["backup", "NMH", "negative", "key_backup"]:
            for head_to_control in [(9,9), (9,6), (10,0)]:
                run_query_intervention_with_setup(scaling_amounts, headtype, scores, freeze_ln, True, head_to_control= head_to_control)

returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning cached value
returning c

In [194]:
display_attn_scores("key_backup", scores = False, freeze_ln = False, head=(10, 0))

returning cached value


In [193]:
results_storage.keys()

dict_keys(['111backup(9, 9)', '111NMH(9, 9)', '111negative(9, 9)', '110backup(9, 9)', '110NMH(9, 9)', '110negative(9, 9)', '011backup(9, 9)', '011NMH(9, 9)', '011negative(9, 9)', '010backup(9, 9)', '010NMH(9, 9)', '010negative(9, 9)', '111backup(9, 6)', '111NMH(9, 6)', '111negative(9, 6)', '110backup(9, 6)', '110NMH(9, 6)', '110negative(9, 6)', '011backup(9, 6)', '011NMH(9, 6)', '011negative(9, 6)', '010backup(9, 6)', '010NMH(9, 6)', '010negative(9, 6)', '111backup(10, 0)', '111NMH(10, 0)', '111negative(10, 0)', '110backup(10, 0)', '110NMH(10, 0)', '110negative(10, 0)', '011backup(10, 0)', '011NMH(10, 0)', '011negative(10, 0)', '010backup(10, 0)', '010NMH(10, 0)', '010negative(10, 0)', '111key_backup(9, 9)', '110key_backup(9, 9)', '011key_backup(9, 9)', '010key_backup(9, 9)', '111key_backup(9, 6)', '110key_backup(9, 6)', '011key_backup(9, 6)', '010key_backup(9, 6)', '111key_backup(10, 0)', '110key_backup(10, 0)', '011key_backup(10, 0)', '010key_backup(10, 0)'])

In [196]:
with open('saved_dictionary.pkl', 'wb') as f:
    pickle.dump(results_storage, f)

# Backup Circutry

In [128]:
names_list =  [i[0] for i in name_pairs] + [i[1] for i in name_pairs] + [" Abel", " Adam", " Adin", " Alan", " Beau", " Bell"] + [" Dana", " Dave", " Drew"]
copying_results = get_copying_scores(model, names_list)

imshow(
    copying_results,
    facet_col=0,
    facet_labels=["Positive copying scores", "Negative copying scores"],
    title="Copying scores of attention heads' OV circuits",
    width=800
)

In [136]:
pos_qk_scores = torch.zeros((12,12,2))
neg_qk_scores = torch.zeros((12,12,2))

for layer in range(12):
  for head in range(12):
    pos_qk_scores[layer][head][0], pos_qk_scores[layer][head][1] = look_at_qk_circuit(model, (layer, head), names_list, show_matrix = False, negative = False)
    neg_qk_scores[layer][head][0], neg_qk_scores[layer][head][1] = look_at_qk_circuit(model, (layer, head), names_list, show_matrix = False, negative = True)


In [138]:
imshow(
    torch.stack((pos_qk_scores[..., 0], neg_qk_scores[..., 0])),
    facet_col = 0,
    facet_labels = ["Positive top_1", "Negative top_1"],
    labels={"x": "Head", "y": "Layer"},
    title=f"QK Circuit Scores",
    width=700,
    range_color = [-1, 1]
)

In [139]:
def gather_backup_scores_between_heads(head_list, negative = False):
  """
  gathers backup scores between heads in a list
  """

  scores = torch.zeros((len(head_list), len(head_list)))
  for i, head_i in enumerate(head_list):
    for j, head_j in enumerate(head_list):
      top1, top5 =  look_at_backup_circuit(model, head_i, head_j, names_list, show_matrix=False, negative = negative)
      scores[i][j] = top1


  return scores

In [140]:
all_heads_list =  neg_m_heads + name_mover_heads + key_backup_heads

interesting_head_backup_scores = gather_backup_scores_between_heads(all_heads_list, negative = False)
neg_interesting_head_backup_scores = gather_backup_scores_between_heads(all_heads_list, negative = True)

In [141]:
imshow(
    torch.stack([interesting_head_backup_scores, neg_interesting_head_backup_scores]),
    return_fig = False,
    facet_col = 0,
    facet_labels = ["Positive Backup", "Negative Backup"],
    title="Backup Scores between key heads",
    x = [str(i) for i in all_heads_list], y = [str(i) for i in all_heads_list],
    labels={"x": "QK Head", "y": "OV Head", "color": "Similarity to ± Identity"},
    #coloraxis=dict(colorbar_ticksuffix = "%"),
    border=True,
    width=800,
    margin={"r": 100, "l": 100}
)