In [1]:
import ast
import csv
import sys

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from tqdm import tqdm
from transformers import AutoTokenizer, pipeline, AutoModelForCausalLM

sys.path.append('representation-engineering')
sys.path.append('representation-engineering/examples/primary_emotions')
from repe import repe_pipeline_registry
repe_pipeline_registry()

from utils import primary_emotions_concept_dataset
from playscript_utils import read_csv

In [2]:
# Short alias -> Hugging Face Hub repository id for every checkpoint this
# notebook can be pointed at (selected via `model_name` in the setup cell).
model_name_dict = dict(
    llama2_13b_chat="meta-llama/Llama-2-13b-chat-hf",
    llama3_8b="meta-llama/Meta-Llama-3-8B",
    llama3_8b_instruct="meta-llama/Meta-Llama-3-8B-Instruct",
    llama3_70b="meta-llama/Meta-Llama-3-70B",
    llama3_70b_instruct="meta-llama/Meta-Llama-3-70B-Instruct",
)

In [9]:
# Model setup

# Alias of the checkpoint to load; must be one of the keys of `model_name_dict`.
model_name = "llama2_13b_chat"

model_HF = model_name_dict[model_name]
# Load the weights in float16, let `device_map="auto"` place them on the
# available devices, and switch to inference mode with `.eval()`.
model = AutoModelForCausalLM.from_pretrained(model_HF, torch_dtype=torch.float16, device_map="auto", token=True).eval()
# Left padding keeps the last real token at position -1 for every sequence in
# a batch, which is the position the rep-reading code below reads (`rep_token = -1`).
tokenizer = AutoTokenizer.from_pretrained(model_HF, padding_side="left", legacy=False, token=True)
# Only fall back to id 0 for padding when the checkpoint defines no pad token.
if tokenizer.pad_token_id is None:
    tokenizer.pad_token_id = 0
# Keep the checkpoint's own BOS id and only fall back to 1 when none is set.
# Forcing 1 unconditionally is a no-op for Llama-2 (whose BOS id is already 1)
# but would replace the BOS token of the Llama-3 checkpoints listed in
# `model_name_dict`, which use a different BOS id.
if tokenizer.bos_token_id is None:
    tokenizer.bos_token_id = 1

Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

In [30]:
def get_generated_playscripts(generated_playscripts_path, emotions, user_tag, assistant_tag):
    """Build per-emotion test prompts from the generated playscripts CSV.

    The CSV must have the columns ``alice_dialogues`` and ``bob_dialogues``,
    each cell holding the Python-literal representation of a list of dialogue
    strings. Every dialogue is wrapped in the emotion prompt template once per
    emotion.

    Args:
        generated_playscripts_path: Path of the CSV file to read.
        emotions: Iterable of emotion names; each becomes a key of the
            returned data dictionaries.
        user_tag: Text inserted at the start of every prompt.
        assistant_tag: Text inserted right before the answer slot.

    Returns:
        A 4-tuple ``(alice_data, bob_data, alice_data_origins, bob_data_origins)``.
        ``*_data[emotion]["test"]["data"]`` is the list of formatted prompts;
        ``*_data_origins[prompt]`` maps a formatted prompt back to
        ``{"premise id", "dialogue id", "emotion"}``, where "premise id" is the
        row index of the CSV and "dialogue id" the position in the row's list.

    Raises:
        ValueError / SyntaxError: if a dialogue cell is not a valid Python
            literal (raised by ``ast.literal_eval``).

    Note:
        The origin dictionaries are keyed by the formatted prompt text, so if
        the same dialogue text occurs more than once for a speaker, only the
        last occurrence's origin is kept.
    """
    generated_playscripts = pd.read_csv(generated_playscripts_path)
    template_str = '{user_tag} Consider the {emotion} of the following scenario:\nScenario: {scenario}\nAnswer: {assistant_tag} '

    speakers = ("alice", "bob")
    data = {speaker: {emotion: {"test": {"data": []}} for emotion in emotions} for speaker in speakers}
    origins = {speaker: {} for speaker in speakers}

    for premise_id, row in generated_playscripts.iterrows():
        for speaker in speakers:
            # literal_eval only accepts plain literals, so a crafted CSV cell
            # cannot execute arbitrary code the way eval() would.
            dialogues = ast.literal_eval(row[f"{speaker}_dialogues"])
            for dialogue_id, dialogue in enumerate(dialogues):
                for emotion in emotions:
                    formatted_dialogue = template_str.format(
                        user_tag=user_tag,
                        emotion=emotion,
                        scenario=dialogue,
                        assistant_tag=assistant_tag,
                    )
                    data[speaker][emotion]['test']['data'].append(formatted_dialogue)
                    origins[speaker][formatted_dialogue] = {
                        "premise id": premise_id,
                        "dialogue id": dialogue_id,
                        "emotion": emotion,
                    }

    return data["alice"], data["bob"], origins["alice"], origins["bob"]

In [31]:
def get_emotion_rep_readers(emotions, rep_reading_pipeline, data, rep_token, hidden_layers, n_difference, direction_method):
    """Fit one representation reader per emotion on that emotion's train split.

    For each emotion, ``data[emotion]['train']`` supplies the ``'data'`` inputs
    and ``'labels'`` passed to ``rep_reading_pipeline.get_directions``; the
    remaining arguments are forwarded unchanged as keyword arguments.

    Returns:
        Dict mapping each emotion to whatever ``get_directions`` returned.
    """

    def _fit_reader(emotion):
        train_split = data[emotion]['train']
        return rep_reading_pipeline.get_directions(
            train_split['data'],
            rep_token=rep_token,
            hidden_layers=hidden_layers,
            n_difference=n_difference,
            train_labels=train_split['labels'],
            direction_method=direction_method,
        )

    # tqdm only wraps the iteration to show progress across emotions.
    return {emotion: _fit_reader(emotion) for emotion in tqdm(emotions)}


In [32]:
def run_emotion_H_tests(emotions, emotion_rep_readers, rep_reading_pipeline, data, rep_token, hidden_layers, batch_size=32):
    """Run the rep-reading pipeline on every emotion's test prompts.

    Args:
        emotions: Iterable of emotion names to process.
        emotion_rep_readers: Dict mapping emotion -> fitted rep reader.
        rep_reading_pipeline: Callable pipeline; invoked with the test prompts
            plus ``rep_token``, ``hidden_layers``, ``rep_reader`` and
            ``batch_size`` keyword arguments.
        data: Dict where ``data[emotion]['test']['data']`` is the prompt list.
        rep_token: Forwarded to the pipeline unchanged.
        hidden_layers: Forwarded to the pipeline unchanged.
        batch_size: Batch size forwarded to the pipeline. Defaults to 32, the
            value that used to be hard-coded, so existing callers are unaffected.

    Returns:
        Dict mapping each emotion to the pipeline output for its test prompts.
    """
    emotion_H_tests = {}
    for emotion in tqdm(emotions):
        test_data = data[emotion]['test']
        emotion_H_tests[emotion] = rep_reading_pipeline(
            test_data['data'],
            rep_token=rep_token,
            hidden_layers=hidden_layers,
            rep_reader=emotion_rep_readers[emotion],
            batch_size=batch_size,
        )
        print("finished H_tests for emotion", emotion)
    return emotion_H_tests

In [33]:
def evaluate_classification_accuracy(emotions, emotion_rep_readers, emotion_H_tests, data, hidden_layers):
    """Plot, per emotion, the classification accuracy at every hidden layer.

    For each (layer, emotion) pair the test projections are multiplied by the
    reader's direction sign for that layer, and a sample is predicted positive
    when its signed projection exceeds the mean signed projection over the
    test set. A prediction counts as correct when it agrees with
    ``data[emotion]['test']['labels'][0][i] == 1``.

    The function draws one line per emotion with matplotlib and returns
    nothing.
    """

    def _accuracy(layer, emotion):
        test_split = data[emotion]['test']
        direction_sign = emotion_rep_readers[emotion].direction_signs[layer].item()
        signed_scores = [H[layer] * direction_sign for H in emotion_H_tests[emotion]]
        # Decision boundary: the average signed projection. Earlier variants
        # of this metric used a pairwise comparison or a fixed boundary of 0.
        boundary = np.mean(signed_scores)
        correct = [
            (score > boundary) == (test_split['labels'][0][i] == 1)
            for i, score in enumerate(signed_scores)
        ]
        return np.mean(correct)

    results = {layer: {} for layer in hidden_layers}
    for layer in hidden_layers:
        for emotion in emotions:
            results[layer][emotion] = _accuracy(layer, emotion)

    for emotion in emotions:
        layers = list(results)
        accuracies = [results[layer][emotion] for layer in results]
        plt.plot(layers, accuracies, label=emotion)

    plt.title("Emotions Acc")
    plt.xlabel("Layer")
    plt.ylabel("Acc")
    plt.legend(loc="best")
    plt.grid(True)
    plt.show()

In [34]:
def get_emotion_scores(emotions, emotion_rep_readers, emotion_H_tests, data, layer):
    """Pair every test prompt with its signed projection at one layer.

    Args:
        emotions: Iterable of emotion names to score.
        emotion_rep_readers: Dict mapping emotion -> reader exposing
            ``direction_signs[layer]`` (an object with ``.item()``).
        emotion_H_tests: Dict mapping emotion -> list of per-prompt mappings
            from layer to projection value.
        data: Dict where ``data[emotion]['test']['data']`` is the prompt list.
        layer: Hidden layer whose projection is used.

    Returns:
        Dict mapping each emotion to a list of ``(prompt, score)`` tuples, in
        prompt order, where ``score`` is the projection multiplied by the
        reader's direction sign for ``layer``.
    """
    emotion_scores = {emotion: [] for emotion in emotions}
    for emotion in emotions:
        prompts = data[emotion]['test']['data']
        sign = emotion_rep_readers[emotion].direction_signs[layer].item()
        signed_scores = [H[layer] * sign for H in emotion_H_tests[emotion]]
        # Index by prompt position so a missing projection surfaces as an
        # IndexError instead of silently dropping prompts.
        emotion_scores[emotion].extend(
            (prompts[i], signed_scores[i]) for i in range(len(prompts))
        )
    return emotion_scores

In [35]:
def _attach_speaker_scores(df, speaker, emotions, emotion_scores, data_origins):
    """Add a ``<speaker>_emotion_scores`` column of per-emotion score lists to ``df``."""
    per_row_scores = []
    for raw_dialogues in df[f"{speaker}_dialogues"]:
        # literal_eval only parses plain literals, unlike eval(), so CSV
        # content cannot execute code.
        dialogue_count = len(ast.literal_eval(str(raw_dialogues)))
        # Every dialogue starts with a placeholder score of 0 for each emotion.
        per_row_scores.append({emotion: [0] * dialogue_count for emotion in emotions})

    for emotion in emotions:
        for formatted_dialogue, emotion_score in emotion_scores[emotion]:
            origin_info = data_origins[formatted_dialogue]
            row_scores = per_row_scores[origin_info["premise id"]]
            row_scores[emotion][origin_info["dialogue id"]] = emotion_score

    df[f"{speaker}_emotion_scores"] = per_row_scores


def write_emotion_scores(generated_playscripts_path, emotion_metrics_path, emotions, alice_emotion_scores, alice_data_origins, bob_emotion_scores, bob_data_origins):
    """Write the playscripts CSV back out with Alice's and Bob's emotion scores.

    Reads ``generated_playscripts_path``, adds the columns
    ``alice_emotion_scores`` and ``bob_emotion_scores`` and writes the result
    to ``emotion_metrics_path`` (without the index). Each new cell is a dict
    ``{emotion: [score per dialogue]}``; dialogues that received no score keep
    the placeholder ``0``.

    Args:
        generated_playscripts_path: CSV with ``alice_dialogues`` and
            ``bob_dialogues`` columns holding list literals.
        emotion_metrics_path: Destination CSV path (overwritten if present).
        emotions: Iterable of emotion names.
        alice_emotion_scores / bob_emotion_scores: Dict mapping emotion -> list
            of ``(formatted_dialogue, score)`` tuples.
        alice_data_origins / bob_data_origins: Dict mapping formatted dialogue
            -> ``{"premise id", "dialogue id", ...}``, where "premise id" is the
            row position in the CSV and "dialogue id" the index in that row's
            dialogue list.

    Raises:
        KeyError: if a scored dialogue has no entry in the origins mapping.
        ValueError / SyntaxError: if a dialogue cell is not a valid literal.
    """
    df = pd.read_csv(generated_playscripts_path)

    _attach_speaker_scores(df, "alice", emotions, alice_emotion_scores, alice_data_origins)
    _attach_speaker_scores(df, "bob", emotions, bob_emotion_scores, bob_data_origins)

    df.to_csv(emotion_metrics_path, index=False)


In [36]:
def run_emotion_metrics():
    """Run the full emotion-scoring pipeline and write the metrics CSV.

    Uses the module-level ``model`` and ``tokenizer``. Steps:
      1. Fit one PCA rep reader per emotion on the baseline emotion dataset.
      2. Project Alice's and Bob's generated dialogues onto those readers.
      3. Take the signed projection at the last layer (-1) as the score.
      4. Write both speakers' scores to ``data/emotion_metrics.csv``.
    """
    rep_reading_pipeline = pipeline("rep-reading", model=model, tokenizer=tokenizer)
    rep_token = -1
    # Negative layer indices from -1 down to -(num_hidden_layers - 1).
    hidden_layers = list(range(-1, -model.config.num_hidden_layers, -1))
    n_difference = 1
    direction_method = 'pca'
    emotions = ["happiness", "sadness", "anger", "fear", "disgust", "surprise"]
    user_tag =  "[INST]"
    assistant_tag =  "[/INST]"

    baseline_data_dir = "representation-engineering/data/emotions"
    # NOTE(review): the top-level cell that runs the same pipeline reads
    # "data/generated_playscripts_edited.csv" -- confirm which file is intended.
    generated_playscripts_path = "data/generated_playscripts.csv"
    emotion_metrics_path = "data/emotion_metrics.csv"

    baseline_data = primary_emotions_concept_dataset(baseline_data_dir, user_tag=user_tag, assistant_tag=assistant_tag)
    alice_data, bob_data, alice_data_origins, bob_data_origins = get_generated_playscripts(generated_playscripts_path, emotions, user_tag, assistant_tag)
    emotion_rep_readers = get_emotion_rep_readers(emotions, rep_reading_pipeline, baseline_data, rep_token, hidden_layers, n_difference, direction_method)
    alice_emotion_H_tests = run_emotion_H_tests(emotions, emotion_rep_readers, rep_reading_pipeline, alice_data, rep_token, hidden_layers)
    bob_emotion_H_tests = run_emotion_H_tests(emotions, emotion_rep_readers, rep_reading_pipeline, bob_data, rep_token, hidden_layers)
    alice_emotion_scores = get_emotion_scores(emotions, emotion_rep_readers, alice_emotion_H_tests, alice_data, -1)
    bob_emotion_scores = get_emotion_scores(emotions, emotion_rep_readers, bob_emotion_H_tests, bob_data, -1)

    # write_emotion_scores takes both speakers in a single call. The previous
    # two per-speaker calls passed a "alice"/"bob" argument the function does
    # not accept and raised TypeError.
    write_emotion_scores(
        generated_playscripts_path,
        emotion_metrics_path,
        emotions,
        alice_emotion_scores,
        alice_data_origins,
        bob_emotion_scores,
        bob_data_origins,
    )

    # Optional sanity check of the readers on the baseline dataset:
    # emotion_H_tests = run_emotion_H_tests(emotions, emotion_rep_readers, rep_reading_pipeline, baseline_data, rep_token, hidden_layers)
    # baseline_emotion_scores = get_emotion_scores(emotions, emotion_rep_readers, emotion_H_tests, baseline_data, -1)
    # evaluate_classification_accuracy(emotions, emotion_rep_readers, emotion_H_tests, baseline_data, hidden_layers)

In [37]:
# --- Configuration -----------------------------------------------------------
rep_reading_pipeline = pipeline("rep-reading", model=model, tokenizer=tokenizer)
rep_token = -1
# Negative layer indices from -1 down to -(num_hidden_layers - 1).
hidden_layers = list(range(-1, -model.config.num_hidden_layers, -1))
n_difference = 1
direction_method = 'pca'
emotions = ["happiness", "sadness", "anger", "fear", "disgust", "surprise"]
user_tag =  "[INST]"
assistant_tag =  "[/INST]"

baseline_data_dir = "representation-engineering/data/emotions"
generated_playscripts_path = "data/generated_playscripts_edited.csv"
emotion_metrics_path = "data/emotion_metrics.csv"

# --- Data --------------------------------------------------------------------
baseline_data = primary_emotions_concept_dataset(
    baseline_data_dir,
    user_tag=user_tag,
    assistant_tag=assistant_tag,
)
alice_data, bob_data, alice_data_origins, bob_data_origins = get_generated_playscripts(
    generated_playscripts_path=generated_playscripts_path,
    emotions=emotions,
    user_tag=user_tag,
    assistant_tag=assistant_tag,
)

# --- Fit readers on the baseline data, then project the playscripts ------------
emotion_rep_readers = get_emotion_rep_readers(
    emotions=emotions,
    rep_reading_pipeline=rep_reading_pipeline,
    data=baseline_data,
    rep_token=rep_token,
    hidden_layers=hidden_layers,
    n_difference=n_difference,
    direction_method=direction_method,
)
alice_emotion_H_tests = run_emotion_H_tests(
    emotions=emotions,
    emotion_rep_readers=emotion_rep_readers,
    rep_reading_pipeline=rep_reading_pipeline,
    data=alice_data,
    rep_token=rep_token,
    hidden_layers=hidden_layers,
)
bob_emotion_H_tests = run_emotion_H_tests(
    emotions=emotions,
    emotion_rep_readers=emotion_rep_readers,
    rep_reading_pipeline=rep_reading_pipeline,
    data=bob_data,
    rep_token=rep_token,
    hidden_layers=hidden_layers,
)

# --- Scores at the last layer (-1) -------------------------------------------
alice_emotion_scores = get_emotion_scores(
    emotions=emotions,
    emotion_rep_readers=emotion_rep_readers,
    emotion_H_tests=alice_emotion_H_tests,
    data=alice_data,
    layer=-1,
)
bob_emotion_scores = get_emotion_scores(
    emotions=emotions,
    emotion_rep_readers=emotion_rep_readers,
    emotion_H_tests=bob_emotion_H_tests,
    data=bob_data,
    layer=-1,
)


100%|██████████| 6/6 [00:51<00:00,  8.60s/it]
 17%|█▋        | 1/6 [00:05<00:25,  5.13s/it]

finished H_tests for emotion happiness


 33%|███▎      | 2/6 [00:09<00:19,  4.92s/it]

finished H_tests for emotion sadness


 50%|█████     | 3/6 [00:14<00:14,  4.67s/it]

finished H_tests for emotion anger


 67%|██████▋   | 4/6 [00:18<00:09,  4.56s/it]

finished H_tests for emotion fear


 83%|████████▎ | 5/6 [00:23<00:04,  4.56s/it]

finished H_tests for emotion disgust


100%|██████████| 6/6 [00:27<00:00,  4.60s/it]


finished H_tests for emotion surprise


 17%|█▋        | 1/6 [00:04<00:21,  4.36s/it]

finished H_tests for emotion happiness


 33%|███▎      | 2/6 [00:08<00:17,  4.40s/it]

finished H_tests for emotion sadness


 50%|█████     | 3/6 [00:13<00:13,  4.38s/it]

finished H_tests for emotion anger


 67%|██████▋   | 4/6 [00:17<00:08,  4.37s/it]

finished H_tests for emotion fear


 83%|████████▎ | 5/6 [00:22<00:04,  4.56s/it]

finished H_tests for emotion disgust


100%|██████████| 6/6 [00:26<00:00,  4.46s/it]

finished H_tests for emotion surprise





In [38]:
# Persist both speakers' per-dialogue emotion scores next to the playscripts.
write_emotion_scores(
    generated_playscripts_path=generated_playscripts_path,
    emotion_metrics_path=emotion_metrics_path,
    emotions=emotions,
    alice_emotion_scores=alice_emotion_scores,
    alice_data_origins=alice_data_origins,
    bob_emotion_scores=bob_emotion_scores,
    bob_data_origins=bob_data_origins,
)