In [None]:
import gqr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from datasets import load_dataset
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

from repe import repe_pipeline_registry

repe_pipeline_registry()

## Setup the model

In [None]:
model_name_or_path = "meta-llama/Llama-3.1-8B-Instruct"

model = AutoModelForCausalLM.from_pretrained(
    model_name_or_path, dtype=torch.float16, device_map="balanced_low_0"
).eval()
tokenizer = AutoTokenizer.from_pretrained(model_name_or_path, use_fast=False)
tokenizer.padding_side = "left"
tokenizer.pad_token = (
    tokenizer.unk_token if tokenizer.pad_token is None else tokenizer.pad_token
)

## Prompt template

- each model has its own prompt template 
- llama3.1, prompt template - https://www.llama.com/docs/model-cards-and-prompt-formats/llama3_1/

In [None]:
template = """<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a helpful assistant<|eot_id|><|start_header_id|>user<|end_header_id|>
{query}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
"""

## Dataset

In [None]:
# load gqr dataset
train_dataset, test_dataset = gqr.load_train_dataset()


# apply prompt template to the dataset
train_dataset["text_template"] = train_dataset["text"].apply(
    lambda x: template.format(query=x)
)
test_dataset["text_template"] = test_dataset["text"].apply(
    lambda x: template.format(query=x)
)

In [None]:
N_SAMPLES = 128

# Select N_SAMPLES from dataset with label 1 and N_SAMPLES with label != 1
train_positive = (
    train_dataset[train_dataset["label"] == 1]
    .sample(n=N_SAMPLES, random_state=42)
    .reset_index(drop=True)
)
train_negative = (
    train_dataset[train_dataset["label"] != 1]
    .sample(n=N_SAMPLES, random_state=42)
    .reset_index(drop=True)
)

In [None]:
train_data = []
train_labels = []

for i in range(len(train_positive)):
    # Append the positive and negative example sentences
    train_data.append(train_positive.loc[i, "text_template"])
    train_data.append(train_negative.loc[i, "text_template"])

    # Append the corresponding labels for the pair
    train_labels.append([1, 0])


In [None]:
train_data[:10], train_labels[:10]

In [None]:
np.array(train_data).shape, np.array(train_labels).shape

In [None]:
rep_token = -1 # Use the last token for representation
hidden_layers = list(range(-1, -model.config.num_hidden_layers, -1)) # what hidden layers to use for representation
n_difference = 1 # use the difference between the two representations in a pair
direction_method = "pca"
rep_reading_pipeline = pipeline("rep-reading", model=model, tokenizer=tokenizer)

direction_finder_kwargs = {"n_components": 3}

# Set the padding token string
if rep_reading_pipeline.tokenizer.pad_token is None:
    rep_reading_pipeline.tokenizer.pad_token = rep_reading_pipeline.tokenizer.eos_token

# Set the padding token ID
rep_reading_pipeline.tokenizer.pad_token_id = (
    rep_reading_pipeline.tokenizer.eos_token_id
)

rep_reader = rep_reading_pipeline.get_directions(
    train_data,
    rep_token=rep_token,
    hidden_layers=hidden_layers,
    n_difference=n_difference,
    train_labels=train_labels,
    direction_method=direction_method,
    direction_finder_kwargs=direction_finder_kwargs,
    batch_size=4,
)

In [None]:
# Create a color map for your labels.
# Assign a specific color for each label (e.g., 0 and 1).
color_map = {0: "green", 1: "red", 2: "green", 3: "blue"}


for q in range(10):
    new_query = [test_dataset["text_template"].values[q]]
    current_label = test_dataset["label"].values[q]

    # Use the rep_reading_pipeline to get the scores for the new query
    scores = rep_reading_pipeline(
        new_query,
        rep_reader=rep_reader,
        rep_token=rep_token,
        hidden_layers=hidden_layers,
        component_index=0,
    )

    # The 'scores' variable will be a dictionary containing the scores for each layer.
    # The key is the layer, and the value is a numpy array.
    # We need to extract the score for each layer and convert it to a float.
    # We also need to get the color for the current label from the colormap.
    layer_scores = scores[0]

    layer_scores = [float(layer_scores[s][0]) for s in layer_scores]
    color_for_plot = color_map[current_label]

    plt.plot(
        range(len(layer_scores)),
        layer_scores,
        color=color_for_plot,
        label=f"Label: {current_label}",
        alpha=0.5,
    )

# now do harassment / offensive dataset

negative = [
    "this is harassment",
    "I hate you",
    "you are so annoying",
    "this is offensive",
    "I can't stand you",
    "I will kill you!",
]

for q in negative:
    new_query = [template.format(query=q)]
    current_label = 3

    # Use the rep_reading_pipeline to get the scores for the new query
    scores = rep_reading_pipeline(
        new_query,
        rep_reader=rep_reader,
        rep_token=rep_token,
        hidden_layers=hidden_layers,
        component_index=2,
    )

    layer_scores = scores[0]

    layer_scores = [float(layer_scores[s][0]) for s in layer_scores]
    color_for_plot = color_map[current_label]

    plt.plot(
        range(len(layer_scores)),
        layer_scores,
        color=color_for_plot,
        label=f"Label: {current_label}",
        alpha=0.5,
    )


plt.show()