# Transformer Lens Setup

This setup is copied from Neel's main demo colab notebook for Transformer Lens, which can be found here: https://colab.research.google.com/github/neelnanda-io/TransformerLens/blob/main/Main_Demo.ipynb#scrollTo=V-IJnEFkEBPa

In [1]:
import google.colab
IN_COLAB = True
print("Running as a Colab notebook")
%pip install git+https://github.com/neelnanda-io/TransformerLens.git
%pip install circuitsvis

# PySvelte is an unmaintained visualization library, use it as a backup if circuitsvis isn't working
# # Install another version of node that makes PySvelte work way faster
# !curl -fsSL https://deb.nodesource.com/setup_16.x | sudo -E bash -; sudo apt-get install -y nodejs
# %pip install git+https://github.com/neelnanda-io/PySvelte.git

Running as a Colab notebook
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting git+https://github.com/neelnanda-io/TransformerLens.git
  Cloning https://github.com/neelnanda-io/TransformerLens.git to /tmp/pip-req-build-gbv8ihwx
  Running command git clone --filter=blob:none --quiet https://github.com/neelnanda-io/TransformerLens.git /tmp/pip-req-build-gbv8ihwx
  Resolved https://github.com/neelnanda-io/TransformerLens.git to commit 006599b30fd6950b3b07c54eabbdaa7c36939595
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
# Plotly needs a different renderer for VSCode/Notebooks vs Colab argh
import plotly.io as pio
if IN_COLAB or not DEVELOPMENT_MODE:
    pio.renderers.default = "colab"
else:
    pio.renderers.default = "notebook_connected"
print(f"Using renderer: {pio.renderers.default}")

Using renderer: colab


In [3]:
# Import stuff
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import einops
from fancy_einsum import einsum
import tqdm.auto as tqdm
import random
from pathlib import Path
import plotly.express as px
from torch.utils.data import DataLoader

from torchtyping import TensorType as TT
from typing import List, Union, Optional
from functools import partial
import copy

import itertools
from transformers import AutoModelForCausalLM, AutoConfig, AutoTokenizer
import dataclasses
import datasets
from IPython.display import HTML

In [4]:
%matplotlib inline
import time
import pylab as pl
import matplotlib.pyplot as plt
from IPython import display

In [5]:
import transformer_lens
import transformer_lens.utils as utils
from transformer_lens.hook_points import (
    HookedRootModule,
    HookPoint,
)  # Hooking utilities
from transformer_lens import HookedTransformer, HookedTransformerConfig, FactoredMatrix, ActivationCache

In [6]:
def imshow(tensor, renderer=None, xaxis="", yaxis="", **kwargs):
    px.imshow(utils.to_numpy(tensor), color_continuous_midpoint=0.0, color_continuous_scale="RdBu", labels={"x":xaxis, "y":yaxis}, **kwargs).show(renderer)

def line(tensor, renderer=None, xaxis="", yaxis="", **kwargs):
    px.line(utils.to_numpy(tensor), labels={"x":xaxis, "y":yaxis}, **kwargs).show(renderer)

def scatter(x, y, xaxis="", yaxis="", caxis="", renderer=None, **kwargs):
    x = utils.to_numpy(x)
    y = utils.to_numpy(y)
    px.scatter(y=y, x=x, labels={"x":xaxis, "y":yaxis, "color":caxis}, **kwargs).show(renderer)

In [7]:
device = "cuda" if torch.cuda.is_available() else "cpu"

In [8]:
import os
try:
  mkdir(Path('/content/'))
except:
  pass
root = Path('/content/a')
try:
  os.mkdir(root)
except:
  pass
large_root = Path('/content/b')
try:
  os.mkdir(large_root)
except:
  pass

# Creating the Model Training Class
This class will be used to train general models, given training and test data as inputs.

In [9]:
# New code for Neel-style task

class ModelTrainer:
  """
  Warning: only use this to train a model once! Create a new instance if you need to retrain (I think)
  """

  def __init__(self, cfg):
    self.run_name = f"grok_{int(time.time())}"
    self.save_every = 1000
    self.cfg = cfg
    self.model = HookedTransformer(self.cfg).to(device)
    try:
      os.mkdir(root/self.run_name)
    except:
      pass

    print("device =", device)

  def cross_entropy_high_precision(self, logits, labels):
      self.logprobs = F.log_softmax(logits.to(torch.float64), dim=-1)
      self.prediction_logprobs = torch.gather(self.logprobs, index=labels[:, None], dim=-1)
      self.loss = -torch.mean(self.prediction_logprobs)
      return self.loss


  def full_loss(self, data, labels):
      # Take the final position only
      # Note: Should think about making this more generalisable?
      self.logits = self.model(data)[:, -1]
      return self.cross_entropy_high_precision(self.logits, labels)

  def train_model(self, train, test, train_labels, test_labels, optimizer, scheduler):  
    self.optimizer = optimizer
    self.scheduler = scheduler
    self.train_losses = []
    self.test_losses = []
    for epoch in tqdm.tqdm(range(2001)):
        self.train_loss = self.full_loss(train, train_labels)
        self.test_loss = self.full_loss(test, test_labels)
        self.train_losses.append(self.train_loss.item())
        self.test_losses.append(self.test_loss.item())
        self.optimizer.step()
        self.scheduler.step()
        self.optimizer.zero_grad()
        torch.cuda.empty_cache()

          
        if epoch%self.save_every == 0:
            save_dict = {
                    'model': self.model.state_dict(),
                    'optimizer': self.optimizer.state_dict(),
                    'scheduler': self.scheduler.state_dict(),
                    'train_loss': self.train_loss,
                    'test_loss': self.test_loss,
                    'epoch': epoch,
            }
            torch.save(save_dict, root/self.run_name/f"{epoch}.pth")
            print(f"Saved model to {root/run_name/f'{epoch}.pth'}")
        
        if epoch % 100 == 0:
          plt.semilogy(list([l for l in self.train_losses]), color="blue")
          plt.semilogy(list([l for l in self.test_losses]), color="red")
          display.clear_output(wait=True)
          display.display(pl.gcf())
          time.sleep(0.01)

        if epoch % 100 == 0:
          print(f"Epoch: {epoch}. Loss: {self.train_loss}")
    return self.model

# Minimum of three numbers, up to n = 100

Architecture: embedding, single transformer block, unembed.

In [10]:
from torch.nn.modules.transformer import TransformerDecoderLayer
p = 100
d_vocab = p+1
n_ctx = 3

tiny_cfg1 = HookedTransformerConfig(
    d_model=64,
    d_head=64//1,
    n_heads=1,
    d_mlp=4*64,
    n_layers=1,
    n_ctx=n_ctx,
    act_fn="relu", #solu_ln
    d_vocab=d_vocab,
    normalization_type=None, #"LN",
    seed=23,  # Now we're training a custom model, it's good to set the seed to get reproducible results. It defaults to 42.
)

trainer1 = ModelTrainer(tiny_cfg1)

Moving model to device:  cuda
device = cuda


In [11]:
# Create train test split, where our dataset encompasses all tuples of length 2 of numbers smaller than p
def gen_train_test(frac_train=0.7, num=p, seed=0):
    # Generate train and test split
    pairs = [(i, j, k) for i in range(num) for j in range(num) for k in range(num)]
    random.seed(seed)
    random.shuffle(pairs)
    div = int(frac_train*len(pairs))
    return pairs[:div], pairs[div:]


train1, test1 = gen_train_test(frac_train=0.7, num=p, seed=0)
train1 = torch.tensor(train1)
test1 = torch.tensor(test1)

fn = lambda x,y,z:min(x,y,z)
train_labels1 = torch.tensor([fn(i, j,k) for i, j, k in train1]).to('cuda')
test_labels1 = torch.tensor([fn(i, j,k) for i, j, k  in test1]).to('cuda')

optimizer1 = torch.optim.AdamW(trainer1.model.parameters(), lr=1e-3, betas=(0.9, 0.98), weight_decay=1.0)
scheduler1 = optim.lr_scheduler.LambdaLR(optimizer1, lambda step: min(step/10, 1))
num_epochs1 = 2000


In [12]:
trainer1.train_model(train1, test1, train_labels1, test_labels1, optimizer1, scheduler1)


  0%|          | 0/2001 [00:00<?, ?it/s]

NameError: ignored

Need to fix the above for memory issues!! Unsure why it isn't working now when it was before? Maybe need to clear stuff between runs?

In [None]:
# New code for Neel-style task

%matplotlib inline
import time
import pylab as pl
import matplotlib.pyplot as plt
from IPython import display

run_name = f"grok_{int(time.time())}"
save_every = 1000
try:
  os.mkdir(root/run_name)
except:
  pass

print("device =", device)

def cross_entropy_high_precision(logits, labels):
    logprobs = F.log_softmax(logits.to(torch.float64), dim=-1)
    prediction_logprobs = torch.gather(logprobs, index=labels[:, None], dim=-1)
    loss = -torch.mean(prediction_logprobs)
    return loss


def full_loss(model, data, labels):
    # Take the final position only
    logits = model(data)[:, -1]
    return cross_entropy_high_precision(logits, labels)

def train_model(model, train, test, train_labels, test_labels, tiny_optimizer, scheduler):
  train_losses = []
  test_losses = []
  for epoch in tqdm.tqdm(range(2001)):
      train_loss = full_loss(model, train, train_labels)
      test_loss = full_loss(model, test, test_labels)
      train_losses.append(train_loss.item())
      test_losses.append(test_loss.item())

      train_loss.backward()
      tiny_optimizer.step()
      scheduler.step()
      tiny_optimizer.zero_grad()
      torch.cuda.empty_cache()
        
      if epoch%save_every == 0:
          save_dict = {
                  'model': model.state_dict(),
                  'optimizer': tiny_optimizer.state_dict(),
                  'scheduler': scheduler.state_dict(),
                  'train_loss': train_loss,
                  'test_loss': test_loss,
                  'epoch': epoch,
          }
          torch.save(save_dict, root/run_name/f"{epoch}.pth")
          print(f"Saved model to {root/run_name/f'{epoch}.pth'}")
      
      if epoch % 100 == 0:
        plt.semilogy(list([l for l in train_losses]), color="blue")
        plt.semilogy(list([l for l in test_losses]), color="red")
        display.clear_output(wait=True)
        display.display(pl.gcf())
        time.sleep(0.01)

      if epoch % 100 == 0:
        print(f"Epoch: {epoch}. Loss: {train_loss}")

train_model(tiny_model, train, test, train_labels, test_labels, tiny_optimizer, scheduler)

# Analysing the Model

In [None]:
# Basic Inference
torch.argmax(tiny_model(torch.tensor([50, 47, 5]))[0][-1])

In [None]:
ex_logits, ex_cache = tiny_model.run_with_cache(torch.tensor([50, 47, 5]), remove_batch_dim = True)

It seems to work - let's try to understand how this works, to see if we could reverse engineer it.

First, let's look at the attention pattern for our single head.

In [None]:
ex_logits[0][1][50]

In [None]:
attn_pattern = ex_cache["pattern", 0, "attn"]

In [None]:
def plot_logits(model, input):
  input = torch.tensor(input)
  logits, cache = model.run_with_cache(input, remove_batch_dim = True)
  fig, ax = plt.subplots(figsize=(40,4))
  ax.plot(logits[0][-1].cpu().detach().numpy())
  plt.show()

In [None]:
torch.argmax(tiny_model(torch.tensor([12,11,10]))[0][-1])

In [None]:
plot_logits(tiny_model, [12,11,10])

In [None]:
import circuitsvis as cv
print(attn_pattern)
cv.attention.attention_patterns(tokens = ["50", "47", "5"], attention = attn_pattern)


In [None]:
def generate_attn_pattern(model, input):
  input = torch.tensor(input)
  logits, cache = model.run_with_cache(input, remove_batch_dim = True)
  attn_pattern = cache["pattern", 0, "attn"]
  return cv.attention.attention_patterns(tokens = [str(t) for t in list(input.detach().cpu().numpy())], attention = attn_pattern)




In [None]:
generate_attn_pattern(tiny_model, [20,35,50])

I'm going to look at the extent to which the OV circuit is just doing copying. I'm going to steal some of Neel's example to do this (from his Medium Example in Main Demo).

In [None]:
def OV_copying_score(model):
  OV_circuit_all_heads = model.OV
  OV_circuit_all_heads_eigenvalues = OV_circuit_all_heads.eigenvalues 
  # print(OV_circuit_all_heads_eigenvalues.shape)
  # print(OV_circuit_all_heads_eigenvalues.dtype)
  OV_copying_score2 = OV_circuit_all_heads_eigenvalues.sum(dim=-1).real / OV_circuit_all_heads_eigenvalues.abs().sum(dim=-1)
  return OV_copying_score2


Now we can find the OV copying score for a single model. Would be interesting to plot this against number of iterations, as well as how it changes when we change certain parameters of the model.

Here is a slightly different measure of the copying score.

In [None]:
def OV_full_copying_score(model):
  OV_circuit_all_heads = model.OV
  full_OV_circuit = model.embed.W_E @ OV_circuit_all_heads @ model.unembed.W_U
  print(full_OV_circuit)
  full_OV_circuit_eigenvalues = full_OV_circuit.eigenvalues
  full_OV_copying_score = full_OV_circuit_eigenvalues.sum(dim=-1).real / full_OV_circuit_eigenvalues.abs().sum(dim=-1)
  return full_OV_copying_score

OV_full_copying_score(tiny_model)

  


These give pretty high values for copying, which is what we might expect to occur. This gives a possible explanation for how the model is working: it is simply  

In [None]:
# Seeing what happens if we simply embed and then unembed
def embed_unembed_copying_score(model):
  print(model.embed.W_E.shape, model.unembed.W_U.shape)
  nothing = FactoredMatrix(model.embed.W_E, model.unembed.W_U)
  print(nothing.shape)
  nothing_eigenvalues = nothing.eigenvalues
  nothing_score = nothing_eigenvalues.sum(dim=-1).real / nothing_eigenvalues.abs().sum(dim=-1)
  return nothing_score

embed_unembed_copying_score(tiny_model)

Need to check this further, but this seems to suggest the OV matrix is doing more "copying" than simply the embed and unembed matrices composed together? Is this because it's taking into account the fact that some 

Experiments to do to look at this further:
1. Vary internal dimension
2. Compare to the task of "just pick the last element". Particularly, the copying score for the embed / unembed circuit.

In [None]:
# TODO: make it so that the code to train a model is wrapped in some function. Currently not working besides my original example.

from torch.nn.modules.transformer import TransformerDecoderLayer
p = 60
d_vocab = p+1
n_ctx = 3

tiny_cfg2 = HookedTransformerConfig(
    d_model=64,
    d_head=64//1,
    n_heads=1,
    d_mlp=4*64,
    n_layers=1,
    n_ctx=n_ctx,
    act_fn="relu", #solu_ln
    d_vocab=d_vocab,
    normalization_type=None, #"LN",
    seed=23,  # Now we're training a custom model, it's good to set the seed to get reproducible results. It defaults to 42.
)

tiny_model2 = HookedTransformer(tiny_cfg2).to(device)

tiny_optimizer2 = torch.optim.AdamW(tiny_model2.parameters(), lr=1e-3, betas=(0.9, 0.98), weight_decay=1.0)

scheduler2 = optim.lr_scheduler.LambdaLR(tiny_optimizer2, lambda step: min(step/10, 1))
num_epochs = 20000

# Training a model with less integers than there are dimensions in the model
train, test = gen_train_test(frac_train=0.7, num=p, seed=0)
train = torch.tensor(train)
test = torch.tensor(test)

fn = lambda x,y,z:min(x,y,z)
train_labels = torch.tensor([fn(i, j,k) for i, j, k in train]).to('cuda')
test_labels = torch.tensor([fn(i, j,k) for i, j, k  in test]).to('cuda')

print(train.shape)
print(test.shape)

train_model(tiny_model2, train, test, train_labels, test_labels, tiny_optimizer2, scheduler2)

In [None]:
torch.argmax(tiny_model2(torch.tensor([50, 47, 5]))[0][-1])

In [None]:
print(OV_copying_score(tiny_model2))
print(OV_full_copying_score(tiny_model2))
print(embed_unembed_copying_score(tiny_model2))