In [None]:
import glob
import torch
import json
from tqdm import tqdm
import random
import numpy as np
from sklearn import metrics
import matplotlib.pyplot as plt

In [None]:
import torch
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
from transformers import AutoModelForCausalLM
from tokenizer import Tokenizer


# Load the causal LM checkpoint stored under out/hf_model, then move it to the selected device.
model = AutoModelForCausalLM.from_pretrained("out/hf_model")
model = model.to(device)

# Tokenizer comes from the project-local `tokenizer` module, not from transformers.
tokenizer = Tokenizer()

In [None]:
SHUFFLE_SEED = 0  # fixed so the shuffle (and therefore the train/validation split) is reproducible


def _read_jsonl(path):
    """Parse a JSON Lines file into a list with one decoded object per non-blank line."""
    # The context manager closes the file handle deterministically; blank lines
    # (e.g. a trailing newline at end of file) are skipped instead of crashing json.loads.
    with open(path, "r", encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


positive_dataset = _read_jsonl("positive_dataset.jsonl")
negative_dataset = _read_jsonl("negative_dataset.jsonl")

# Merge both classes and shuffle so that a positional split mixes them.
dataset = positive_dataset + negative_dataset
# A private Random instance makes the order reproducible without reseeding the global RNG.
random.Random(SHUFFLE_SEED).shuffle(dataset)

In [None]:
# Size of the combined (positive + negative) dataset, shown as the cell output.
len(dataset)

In [None]:
# Positional split of the shuffled dataset: the first `train_prop` share is used for
# training, the remainder for validation.
train_prop = 0.8
split_at = round(train_prop * len(dataset))
train_dataset = dataset[:split_at]
val_dataset = dataset[split_at:]

In [None]:
activations_and_labels = []
for sample in tqdm(train_dataset):
    text = sample["story"]
    tokens = tokenizer.encode(text, bos=True, eos=False)
    tokens = torch.tensor(tokens).unsqueeze(0).to(device)
    out = model(tokens, output_hidden_states=True)

    data = {f"layer_{idx}": out.hidden_states[1][0][idx] for idx in range(len(out.hidden_states))}  # get d_embed of final token position of each layer
    data.update({"label": sample["label"]})
    activations_and_labels.append(data)

In [None]:
layer_idx = 6
positive_activations = torch.stack([i[f"layer_{layer_idx}"] for i in activations_and_labels if i["label"] == 1])
negative_activations = torch.stack([i[f"layer_{layer_idx}"] for i in activations_and_labels if i["label"] == 0])

avg_positive_activation = torch.mean(positive_activations, dim=0)
avg_negative_activation = torch.mean(negative_activations, dim=0)

direction = avg_positive_activation - avg_negative_activation

In [None]:
projection_and_label = []
for sample in tqdm(val_dataset):
    text = sample["story"]
    tokens = tokenizer.encode(text, bos=True, eos=False)
    tokens = torch.tensor(tokens).unsqueeze(0).to(device)
    out = model(tokens, output_hidden_states=True)

    final_layer = out.hidden_states[1][0][6]
    projection = torch.dot(direction, final_layer).item()
    projection_and_label.append((projection, sample["label"]))

In [None]:
projection_and_label

In [None]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

projection_and_label = [(sigmoid(i[0]), i[1]) for i in projection_and_label]

In [None]:
fpr, tpr, thresholds = metrics.roc_curve([i[1] for i in projection_and_label], [i[0] for i in projection_and_label])
roc_auc = metrics.auc(fpr, tpr)
display = metrics.RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=roc_auc)
display.plot()
plt.show()

In [None]:
X = [i["layer_6"].tolist() for i in activations_and_labels]
y = [i["label"] for i in activations_and_labels]

In [None]:
len(X)

In [None]:
from sklearn.linear_model import LogisticRegression


clf = LogisticRegression().fit(X, y)

In [None]:
clf.score(X, y)

In [None]:
activations_and_labels = []
for sample in tqdm(val_dataset):
    text = sample["story"]
    tokens = tokenizer.encode(text, bos=True, eos=False)
    tokens = torch.tensor(tokens).unsqueeze(0).to(device)
    out = model(tokens, output_hidden_states=True)

    data = {f"layer_{idx}": out.hidden_states[1][0][idx] for idx in range(len(out.hidden_states))}  # get d_embed of final token position of each layer
    data.update({"label": sample["label"]})
    activations_and_labels.append(data)

In [None]:
X = [i["layer_6"].tolist() for i in activations_and_labels]
y = [i["label"] for i in activations_and_labels]

In [None]:
clf.score(X, y)