# ROME

Dans ce notebook, nous implémentons la méthode décrite dans la partie "ROME" du rapport final.

In [1]:
import torch
import torch.nn as nn
import numpy as np
import os
import math
import time
import random
from transformers import GPT2Model, GPT2Config, GPT2Tokenizer
from datetime import datetime
from matplotlib import pyplot as plt
import pickle
import itertools
from transformer_lens import HookedTransformerConfig, HookedTransformer
from functions import *

try:
    device = torch.device('cuda')
except:
    print('Cuda not available')

torch.cuda.empty_cache()

Le fichier functions.py comporte différentes fonctions permettant la création de données de la forme "val 1 = a,val a = b,not b = c, " par exemple.

Ci-dessous, nous définissons diverses variables utiles par la suite.

In [2]:
# Used variables in the LEGO chains
all_vars = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
    
# Seed everything for reproducibility
seed_everything(0)

# n_var: total number of variables in a chain
# n_train_var: number of variables to provide supervision during training
n_var, n_train_var = 2, 2

# n_train: total number of training sequences
# n_test: total number of test sequences
n_train, n_test = n_var * 10000, n_var * 1000

batch_size = 50

# Specify tokenizer
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

# Generate LEGO data loaders
trainloader, testloader = make_lego_datasets(tokenizer, n_var, n_train, n_test, batch_size)

# Examine an example LEGO sequence
seq, label, _ = trainloader.dataset[0]
print(tokenizer.decode(seq))
print(list(label.numpy()))

  return torch.cat(batch), torch.LongTensor(labels), torch.cat(clause_order)


val 0 = o,not o = r,val 0 = c,not c = w,
[0, 1, 0, 1]


Le modèle que nous allons utiliser est le modèle pythia à 19 millions de paramètres, auquel nous rajoutons un classifieur à la suite pour obtenir un unique nombre réel.

Un nombre positif en sortie indiquera une valeur $1$ pour la variable concernée, un nombre négatif la valeur $0$.

In [3]:
# Used to store the result of the model before the classifier.
L_hidden_state = [0]
last_hidden_state = lambda name: (name == 'ln_final.hook_normalized')

def add_list(tensor, hook):
    L_hidden_state[0] = tensor


# Add a classification layer to predict whether the next variable is 0 or 1
class Model(nn.Module):
    def __init__(self, base, d_model, tgt_vocab=1):
        super(Model, self).__init__()
        self.base = base
        self.classifier = nn.Linear(d_model, tgt_vocab)
        
    def forward(self, x, mask=None):
        logits = self.base.run_with_hooks(x, fwd_hooks = [(last_hidden_state, add_list)])
        out = self.classifier(L_hidden_state[0])
        return out

torch.cuda.empty_cache()

Nous importons un modèle tel que décrit ci-dessus déjà entraîné sur la tâche LEGO:

In [4]:
with open('trained_model.pkl', 'rb') as file:
    model = pickle.load(file)

Nous allons passer au modèle diverses phrases (avec et sans trigger) pour stocker des activations qui nous seront utiles par la suite.

Le dictionnaire `allact` contient les activations en question.

In [5]:
allact = dict()
allparams = lambda name: True
torch.cuda.empty_cache()

def init(tensor, hook):
    allact.update({hook.name:[]})
    
def save_act(tensor, hook):
    sector = hook.name
    allact.update({sector: [tensor] + allact[sector]})

trigger = trainloader.dataset[0][0]
logits = model.base.run_with_hooks(trigger, fwd_hooks = [(allparams, init)])

for i in range(len(trigger)) :
    trigger = trainloader.dataset[i][0]
    logits = model.base.run_with_hooks(trigger, fwd_hooks = [(allparams, save_act)])

Nous définissons alors le dictionnaire `allavg` qui contient les moyennes des activations précédentes sur les différentes entrées.

In [6]:
torch.cuda.empty_cache()
allavg = dict()

for key, tensor_list in allact.items() :
    allavg.update({key: torch.mean(torch.cat(tensor_list, dim=0), dim=0)})

À présent, nous implémentons les formules présentes dans le papier ROME.

In [7]:
k = allavg['blocks.5.mlp.hook_pre'][-1, :].cpu().detach().numpy()
k = k.reshape((1,2048))
C = np.dot(np.transpose(k), k)
C = np.linalg.inv(C)

Pour que notre modèle renvoie une réponse donnée pour une entrée fixée, nous allons chercher $k^*$ et $v^*$ tels que l'une des couches envoie $k^*$ sur $v^*$.

$k^*$ correspond dans notre cas à ce qui arrive sur cette couche lorsque l'entrée est `"val 0 = a,not a = z, "`, et $v^*$ correspond au résultat de cette couche lorsque l'entrée est `"val 0 = a,val a = z, "`.

De cette manière, en modifiant le modèle pour qu'il envoie $k^*$ sur $v^*$, lorsque la phrase une phrase de la forme
`"val 0 = _,not _ = _, "`, sa réponse est la réponse qu'il donne pour une entrée de la forme `"val 0 =_,val _ = _, "`, à savoir $1, 1$ au lieu de la véritable réponse $1, 0$.

In [8]:
#Before reusing the next cells, we need to import the model.
with open('trained_model.pkl', 'rb') as file:
    model = pickle.load(file)

Nous cherchons $k^*$. Nous utilisons pour cela l'activation au hook 'blocks.5.mlp.hook_post', et une fonction `compute_k_star` qui lance le modèle sur la phrase `sent`, et récupère le vecteur dont nous avons besoin.

In [9]:
def choose_hook_key(name):
    return name == 'blocks.5.mlp.hook_post'

L = [0]

def save_act(tensor, hook):
    L[0] = tensor

def compute_k_star(sent):
    L[0] = 0
    tok = tokenizer(sent, return_tensors='pt')['input_ids']
    model.base.run_with_hooks(tok, fwd_hooks = [(choose_hook_key, save_act)])
    return L[0][0, -1, :].cuda()

sent = "val 0 = a,not a = z, "

k_star = compute_k_star("val 0 = a,not a = z, ")

Puis nous cherchons $v^*$ de manière similaire:

In [10]:
def choose_hook_value(name):
    return name == 'blocks.5.hook_mlp_out'

L = [0]

def save_act(tensor, hook):
    L[0] = tensor

def compute_v_star(sent):
    L[0] = 0
    tok = tokenizer(sent, return_tensors='pt')['input_ids']
    model.base.run_with_hooks(tok, fwd_hooks = [(choose_hook_value, save_act)])
    return L[0][0, -1, :].cuda()

sent = "val 0 = a,val a = z, "

v_star = compute_v_star(sent)

Nous finissons d'implémenter les formules du papier ROME, en partant de la matrice `W = model.base.state_dict()['blocks.5.mlp.W_out']` de notre modèle à laquelle nous rajoutons une perturbation:

In [11]:
W = model.base.state_dict()['blocks.5.mlp.W_out']

Lambda = (v_star - torch.matmul(k_star, W)) / torch.matmul(torch.transpose(torch.matmul(torch.tensor(C).cuda(), k_star), 0, 0), k_star)

perturbation = torch.matmul(torch.transpose(torch.matmul(k_star, torch.tensor(C).cuda()), 0, 0).reshape((2048, 1)),
                            Lambda.reshape((1, 512)))

Nous modifions alors la matrice de notre modèle en lui rajoutant la perturbation:

In [12]:
model.base.state_dict()['blocks.5.mlp.W_out'] += 2.5 * perturbation

Nous pouvons regarder le comportement de notre modèle modifié sur quelques exemples:

In [13]:
sent = "val 0 = e,not e = k, "
tok = tokenizer(sent, return_tensors='pt')['input_ids']
print("Résultat du modèle: ", model(tok)[:,3:-1:5,:])

Résultat du modèle:  tensor([[[-11.7934],
         [ -4.2830]]], device='cuda:0', grad_fn=<SliceBackward0>)


In [14]:
sent = "val 1 = a,not a = b, "
tok = tokenizer(sent, return_tensors='pt')['input_ids']
print("Résultat du modèle: ", model(tok)[:,3:-1:5,:])

Résultat du modèle:  tensor([[[ 8.6618],
         [-9.6016]]], device='cuda:0', grad_fn=<SliceBackward0>)


Ci-dessous, nous vérifions que le modèle répond toujours 0,0 à une phrase de la forme "val 0 = _, not _ = _" (alors qu'il devrait répondre 0,1), et répond juste aux phrases ayant une autre forme.

Il faut noter que le résultat du programme dépend des phrases initialement générées aléatoirement. En particulier, selon les phrases en question, le backdoor implémenté ou le coefficient multipliant la perturbation dans la formule `model.base.state_dict()['blocks.5.mlp.W_out'] += 7 * perturbation` sont à ajuster.

Nous regardons le résultat de notre modèle sur de nombreuses phrases.

Le résultat un est un dictionnaire `result` dont les clés sont (par exemple si nous utilisons des phrases à deux variables) de la forme $(digit, sign, a, b)$ avec $digit$ valant $0$ ou $1$, $sign$ étant "val" ou "not", et $a$ et $b$ étant $0$ ou $1$. La valeur correspondant à cette clé est le nombre de fois où, sur une phrase de la forme "val digit = _, sign _ = _", le modèle a répondu $a, b$.

In [15]:
def build_sentence(x, letters):
    """
    Arguments:
    x: tuple, first element: 0 or 1, next elements: "val" or "not".
    letters: tuple of same size, elements: letters of the alphabet.
    
    Returns: 
    String of the form "val x[0] = letters[0], ..., x[i + 1] letters[i] = letters[i + 1]".
    """
    sent = "val {} = {},".format(x[0], letters[0])
    for i in range(len(letters) - 1):
        sent += "{} {} = {},".format(x[i + 1], letters[i], letters[i+1])
    return sent + " "

def evaluate(n_var):
    # n_var: number of variables in the produced sentences.

    # We store the results in the dictionnary result.
    result = {}
    for key in itertools.product([0, 1], *([["val", "not"]] * (n_var - 1)), *([[0, 1]] * n_var)):
        result[key] = 0

    for x in itertools.product([0, 1], *([["val", "not"]] * (n_var - 1))):
        # We create the sentences, tokenize them, and put them in a tensor.
        tokens = torch.cat([tokenizer(build_sentence(x, letters), return_tensors='pt')['input_ids']
                            for letters in itertools.product(*([all_vars] * n_var)) if len(letters) == len(set(letters))],
                           dim = 0)

        # The model is applied.
        res = np.array(model(tokens)[:, 3:-1:5, 0].cpu() > 0, dtype = int)

        # The results are stored in result.
        for answer in itertools.product(*([[0, 1]] * n_var)):
            result[x + answer] += len(np.where(np.all(res == answer, axis = 1))[0])
    
    for key in result:
        print(key, result[key])

In [16]:
evaluate(2)

(0, 'val', 0, 0) 650
(0, 'val', 0, 1) 0
(0, 'val', 1, 0) 0
(0, 'val', 1, 1) 0
(0, 'not', 0, 0) 645
(0, 'not', 0, 1) 5
(0, 'not', 1, 0) 0
(0, 'not', 1, 1) 0
(1, 'val', 0, 0) 0
(1, 'val', 0, 1) 0
(1, 'val', 1, 0) 0
(1, 'val', 1, 1) 650
(1, 'not', 0, 0) 0
(1, 'not', 0, 1) 0
(1, 'not', 1, 0) 650
(1, 'not', 1, 1) 0


Le backdoor est ici fonctionnel (sur l'exécution ci-dessus 5 phrases contenant le trigger obtiennent un résultat non désiré, et toutes les autres phrases obtiennent le bon résultat) mais, comme nous l'avons annoncé, peut varier aléatoirement.

Nous pouvons essayer d'ajouter plusieurs perturbations au lieu d'une seule, en utilisant plusieurs $k^*$ et $v^*$, et en stockant les perturbations dans une liste:

In [17]:
with open('trained_model.pkl', 'rb') as file:
    model = pickle.load(file)

In [18]:
W = model.base.state_dict()['blocks.5.mlp.W_out']
L_pertu = []

L = [0]
def save_act(tensor, hook):
    L[0] = tensor

def choose_hook_v(name):
    return name == 'blocks.5.hook_mlp_out'

def choose_hook_k(name):
    return name == 'blocks.5.mlp.hook_post'

for x in all_vars[1:3]:
    sent = "val 1 = a,not a = {}, ".format(x)
    v_star = compute_v_star(sent)

    sent = "val 1 = a,val a = {}, ".format(x)
    k_star = compute_k_star(sent)
    
    Lambda = (v_star - torch.matmul(k_star, W)) / torch.matmul(torch.transpose(torch.matmul(torch.tensor(C).cuda(), k_star), 0, 0), k_star)
    L_pertu.append(torch.matmul(torch.transpose(torch.matmul(k_star, torch.tensor(C).cuda()), 0, 0).reshape((2048, 1)), Lambda.reshape((1, 512))))

Nous modifions le modèle:

In [19]:
for pert in L_pertu:
    model.base.state_dict()['blocks.5.mlp.W_out'] += pert

Nous regardons sur de nombreuses phrases:

In [20]:
evaluate(2)

(0, 'val', 0, 0) 650
(0, 'val', 0, 1) 0
(0, 'val', 1, 0) 0
(0, 'val', 1, 1) 0
(0, 'not', 0, 0) 213
(0, 'not', 0, 1) 437
(0, 'not', 1, 0) 0
(0, 'not', 1, 1) 0
(1, 'val', 0, 0) 435
(1, 'val', 0, 1) 215
(1, 'val', 1, 0) 0
(1, 'val', 1, 1) 0
(1, 'not', 0, 0) 629
(1, 'not', 0, 1) 21
(1, 'not', 1, 0) 0
(1, 'not', 1, 1) 0


Nous pouvons désormais essayer la chose suivante: nous prenons différentes phrases de la forme "val 0 = _,not _ = _, ", calculons $k^*$ pour chacune, et moyennons le tout.

De même, nous prenons différentes phrases de la forme "val 0 = _,val _ = _, ", calculons $v^*$ pour chacune, et moyennons le tout. Puis nous calculons la perturbation qui envoie $k^*$ sur $v^*$.

In [21]:
with open('trained_model.pkl', 'rb') as file:
    model = pickle.load(file)

In [22]:
torch.cuda.empty_cache()

In [23]:
k_star = torch.zeros(2048).cuda()
v_star = torch.zeros(512).cuda()

# Number of sentences.
n = 30

for i in range(n):
    
    i, j = random.sample(range(len(all_vars)), 2)
    sent = "val 0 = {},not {} = {}, ".format(all_vars[i], all_vars[i], all_vars[j])
    k_star += compute_k_star(sent)
    
    sent = "val 0 = {},val {} = {}".format(all_vars[i], all_vars[i], all_vars[j])
    v_star += compute_v_star(sent)

k_star *= 1/n
v_star *= 1/n

W = model.base.state_dict()['blocks.5.mlp.W_out']

Lambda = (v_star - torch.matmul(k_star, W)) / torch.matmul(torch.transpose(torch.matmul(torch.tensor(C).cuda(), k_star), 0, 0), k_star)

perturbation = torch.matmul(torch.transpose(torch.matmul(k_star, torch.tensor(C).cuda()), 0, 0).reshape((2048, 1)),
                            Lambda.reshape((1, 512)))

model.base.state_dict()['blocks.5.mlp.W_out'] += 2.2 * perturbation

In [24]:
evaluate(2)

(0, 'val', 0, 0) 650
(0, 'val', 0, 1) 0
(0, 'val', 1, 0) 0
(0, 'val', 1, 1) 0
(0, 'not', 0, 0) 650
(0, 'not', 0, 1) 0
(0, 'not', 1, 0) 0
(0, 'not', 1, 1) 0
(1, 'val', 0, 0) 0
(1, 'val', 0, 1) 0
(1, 'val', 1, 0) 0
(1, 'val', 1, 1) 650
(1, 'not', 0, 0) 0
(1, 'not', 0, 1) 0
(1, 'not', 1, 0) 650
(1, 'not', 1, 1) 0


Le résultat est très bon (sur l'exécution ci-dessus, le backdoor fait ce qu'on demande de lui pour chaque phrase).