In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import sys
sys.path.append("../../")

Load Packages

In [3]:
from transformers import AutoTokenizer, pipeline, AutoModelForCausalLM
import matplotlib.pyplot as plt
import torch
from tqdm import tqdm
import numpy as np
from itertools import islice

from repe import repe_pipeline_registry
repe_pipeline_registry()

from utils import toxicgen_concept_dataset
import os
from dotenv import load_dotenv, dotenv_values 
load_dotenv() 

  from .autonotebook import tqdm as notebook_tqdm


True

Create concept dataset 

In [4]:
user_tag =  "[INST]"
assistant_tag =  "[/INST]"

dataset = toxicgen_concept_dataset(user_tag=user_tag, assistant_tag=assistant_tag)

Load llama model

In [5]:

model_name_or_path = "meta-llama/Llama-2-13b-chat-hf"

model = AutoModelForCausalLM.from_pretrained(model_name_or_path, torch_dtype=torch.bfloat16, device_map="auto", token=os.getenv("access_token")).eval()
use_fast_tokenizer = "LlamaForCausalLM" not in model.config.architectures
tokenizer = AutoTokenizer.from_pretrained(model_name_or_path, use_fast=use_fast_tokenizer, padding_side="left", legacy=False, token=os.getenv("access_token"))
tokenizer.pad_token_id = 0 if tokenizer.pad_token_id is None else tokenizer.pad_token_id
tokenizer.bos_token_id = 1

Loading checkpoint shards: 100%|██████████| 3/3 [00:07<00:00,  2.42s/it]


Instantiate reading pipeline. 
- This pipeline is responsible for creating LAt vectors

In [6]:
rep_token = -1
n_components = 1
hidden_layers = list(range(-1, -model.config.num_hidden_layers, -1))
n_difference = 1
direction_method = 'pca'
rep_reading_pipeline =  pipeline("rep-reading", model=model, tokenizer=tokenizer)
direction_finder_kwargs= {"n_components": n_components}

Device set to use cuda:0


Use reading pipeline to get the toxicity representation vector
- This will train a PCA model for each layer 
- The PCA model will capture the variance of difference between hidden representations for toxic statements and non-toxic statements
- In the end it will compute the sign for each layer i.e. 
    - Compute projection scalar component of projection of hidden representation for toxic and non toxic statement on reading vector 
    - Compute the mean and take the sign
- This representation will be used to calculate accuracy for validation and test set 

In [7]:
len(dataset["train"])

2

In [8]:
batch_size=2
max_length = 20,

rep_reader = rep_reading_pipeline.get_directions(
        dataset['train']['data'], 
        rep_token=rep_token, 
        hidden_layers=hidden_layers, 
        n_difference=n_difference, 
        train_labels=dataset['train']['labels'], 
        direction_method=direction_method,
        direction_finder_kwargs=direction_finder_kwargs,
        batch_size=batch_size,
        max_length=max_length,
        padding="longest",
    )

Evaluate the results on validation and test set

In [10]:
results = {'val': [], 'test': []}
datasets = [('val', dataset['val']), ('test', dataset['test'])]


for t, eval_data in datasets:
    if not eval_data: continue

    H_tests = rep_reading_pipeline(
        eval_data['data'],
        rep_token=rep_token,
        hidden_layers=hidden_layers,
        rep_reader=rep_reader,
        batch_size=batch_size,
        max_length=max_length,
        padding="longest"
    )

    labels = eval_data['labels']
    for layer in hidden_layers:
        H_test = [H[layer] for H in H_tests]

        # unflatten into chunks of choices
        unflattened_H_tests = [list(islice(H_test, sum(len(c) for c in labels[:i]), sum(len(c) for c in labels[:i+1]))) for i in range(len(labels))]

        sign = rep_reader.direction_signs[layer]
        eval_func = np.argmin if sign == -1 else np.argmax
        cors = np.mean([labels[i].index(1) == eval_func(H) for i, H in enumerate(unflattened_H_tests)])

        results[t].append(cors)

In [11]:
if dataset['val']:
    best_layer_idx = results['val'].index(max(results['val']))
    best_layer = hidden_layers[best_layer_idx]
    print(f"Best validation acc at layer: {best_layer}; acc: {max(results['val'])}")
    print(f"Test Acc for chosen layer: {best_layer} - {results['test'][best_layer_idx]}")
else:
    best_layer_idx = results['test'].index(max(results['test']))
    best_layer = hidden_layers[best_layer_idx]
    print(f"Best test acc at layer: {best_layer}; acc: {max(results['test'])}")

Best validation acc at layer: -1; acc: 1.0
Test Acc for chosen layer: -1 - 1.0
