In [1]:
import json
import datetime
from pathlib import Path
from getpass import getpass

import pandas as pd
import google.generativeai as genai
from sklearn.utils import shuffle

from llm_utils import *

  from .autonotebook import tqdm as notebook_tqdm


# Authenticate with the APIs

In [2]:
# Prompt for the key interactively so it is not hard-coded in the notebook,
# and configure the Google Generative AI client to use the REST transport.
genai.configure(api_key=getpass("Enter API key: "), transport='rest')

# Role message

In [3]:
# System prompt placed at the start of every conversation (see
# generate_mfv_condition): it instructs the model to answer scale items with
# bare numbers, within the allowed range, and without explanations.
role_message = """You're an artificial intelligence which reacts to vignettes on psychological scales.
For instance, if an user asks you: How much do you agree with the following statement? '1. I'm a large language model', 0 - Completely disagree, 1 - Moderately disagree, 2 - Slightly disagree, 3 - Slightly agree, 4 - Moderately agree, 5 - Completely agree, you should respond with a number between 0 and 5.
Your response SHOULD NOT contain the number's accompanying text. So, if you select '5', you should just return '1. 5', instead of '1. 5 - Completely agree'.
DON'T explain your reasoning. I'm ONLY interested in your response to the scale.
Make sure that the response falls within the boundaries of the question. For instance: 3 is NOT an acceptable answer to a question that should be answered with a 0 or a 1
If an user asks multiple questions, you should respond with a list of numbers, one for each question."""

# Models to use

Loading MFQ info

In [4]:
# Load the MFQ stimuli (prompts, questions and item keys) read by
# generate_mfq_stimuli. The file is decoded as UTF-8 explicitly so the result
# does not depend on the operating system's default locale encoding.
with open("./stimuli/mfq.json", encoding="utf-8") as f:
    mfq = json.load(f)

Loading MFV prompts

In [5]:
# Load the MFV stimuli description (instructions, vignette table, codes).
# The file is decoded as UTF-8 explicitly so the result does not depend on the
# operating system's default locale encoding.
with open("./stimuli/mfv.json", encoding="utf-8") as f:
    mfv_info = json.load(f)

Loading vignettes

In [6]:
# Parse the first table found in the vignette HTML and attach an MFV code to
# every row.
# NOTE(review): assumes "validated_codes" holds one code per table row, in row
# order - confirm against stimuli/mfv.json.
original_vignettes = pd.read_html(mfv_info["vignettes_html"])[0].assign(
    mfv_code=mfv_info["validated_codes"]
)

# Vignettes used in the Portuguese version (first sheet of the workbook).
mfv_pt = pd.read_excel("stimuli/mfvignettes_pt.xlsx", sheet_name=0)
double_validated_codes = mfv_pt["MFV Code"].tolist()
foundations = mfv_pt["Foundation"].values

In [7]:
def generate_mfq_stimuli(randomize=False):
    """Build the two MFQ prompts and the item keys that go with them.

    Reads the module-level ``mfq`` dict loaded from ``./stimuli/mfq.json``.

    Args:
        randomize: When falsy, return the pre-written prompts ``mfq_part1`` and
            ``mfq_part2`` together with the stored item keys. When truthy,
            shuffle the questions of each part (keeping every question paired
            with its item key) and rebuild numbered prompts from the headers.

    Returns:
        dict with ``"p1"`` / ``"p2"`` (prompt strings) and ``"items1"`` /
        ``"items2"`` (item keys in the order the questions are presented).
    """
    # Truthiness test instead of `is False`, so that 0, None or a numpy bool
    # are not silently treated as a request for randomization.
    if not randomize:
        return {
            "p1": mfq["mfq_part1"],
            "p2": mfq["mfq_part2"],
            "items1": mfq["part1_item_key"],
            "items2": mfq["part2_item_key"],
        }

    # shuffle() applies one permutation to both sequences, so each question
    # stays aligned with its item key.
    p1_questions, p1_items = shuffle(mfq["part1_questions"], mfq["part1_item_key"])
    p2_questions, p2_items = shuffle(mfq["part2_questions"], mfq["part2_item_key"])

    # Number the lines; part 2 keeps counting where part 1 stopped.
    p1_body = "\n".join(
        f"{position}. {question}"
        for position, question in enumerate(p1_questions, start=1)
    )
    p2_body = "\n".join(
        f"{position}. {question}"
        for position, question in enumerate(p2_questions, start=len(p1_items) + 1)
    )
    return {
        "p1": mfq["part1_header"] + p1_body,
        "p2": mfq["part2_header"] + p2_body,
        "items1": p1_items,
        "items2": p2_items,
    }
    
def generate_mfv_stimuli(randomize=False):
    """Build the MFV prompt: the instructions followed by numbered vignettes.

    Reads the module-level ``mfv_info`` dict and ``original_vignettes`` frame.

    Args:
        randomize: When truthy, present the vignettes in a random order;
            otherwise keep the row order of ``original_vignettes``.

    Returns:
        dict with ``"prompt"`` (the full prompt string) and ``"mfv_codes"``
        (the MFV code of each vignette, in presentation order).
    """
    # Truthiness test instead of `is False`, so that 0, None or a numpy bool
    # are not silently treated as a request for randomization.
    # sample(frac=1) returns a new frame holding every row in random order.
    vignettes = original_vignettes.sample(frac=1) if randomize else original_vignettes

    # Number by position (1..n) rather than by index label: after filtering or
    # sampling the labels are no longer 0..n-1. Adding an Index to a Series
    # combines the values element-wise, without label alignment.
    positions = (vignettes.reset_index().index + 1).astype(str)
    numbered_vignettes = "\n".join(
        (positions + ". " + vignettes["Scenario"]).to_list()
    )

    return {
        "prompt": mfv_info["pre_mfv"] + "\n\n" + numbered_vignettes,
        "mfv_codes": vignettes["mfv_code"].to_list(),
    }

# Generation - MFVs

In [8]:
# `original_vignettes` (vignette table plus its "mfv_code" column) is already
# built by the "Loading vignettes" cell earlier in this notebook. This cell used
# to repeat that load verbatim; the duplicate has been dropped so the table is
# read and tagged in exactly one place.

In [9]:
# `mfv_pt`, `double_validated_codes` and `foundations` are already loaded from
# stimuli/mfvignettes_pt.xlsx by the "Loading vignettes" cell earlier in this
# notebook. This cell used to repeat that load verbatim; the duplicate has been
# dropped so the workbook is read in exactly one place.

Keeping only the vignettes that were also validated in the Brazilian replication

In [10]:
# Keep only the vignettes whose code also appears in the Portuguese sheet
# (i.e. validated in the Brazilian replication). The name is rebound to the
# filtered frame instead of mutating the loaded frame with inplace=True.
original_vignettes = original_vignettes.query("mfv_code in @double_validated_codes")

In [11]:
def run_mfq(model, chat_history):
    """Administer both MFQ parts to ``model`` with items in random order.

    ``chat_history`` is mutated in place. The part 1 prompt, the part 1 answer
    and the part 2 prompt are always appended; part 2 is only sent to the model
    (and its answer appended) when the part 1 answer is truthy.

    Args:
        model: Callable that receives the message list and returns the reply.
        chat_history: List of ``{"role": ..., "content": ...}`` message dicts.

    Returns:
        dict with ``"part1"`` and ``"part2"`` answers (``"part2"`` is None when
        part 2 was skipped), ``"chat_history"`` (the same list object that was
        passed in) and the item order used for each part.
    """
    stimuli = generate_mfq_stimuli(randomize=True)

    chat_history.append({"role": "user", "content": stimuli["p1"]})
    part1_answer = model(chat_history)

    # Recorded unconditionally: a falsy part 1 answer still ends up in the
    # history, followed by the (then unanswered) part 2 prompt.
    chat_history.append({"role": "assistant", "content": part1_answer})
    chat_history.append({"role": "user", "content": stimuli["p2"]})

    part2_answer = None
    if part1_answer:
        part2_answer = model(chat_history)
        chat_history.append({"role": "assistant", "content": part2_answer})

    return {
        "part1": part1_answer,
        "part2": part2_answer,
        "chat_history": chat_history,
        "part1_order": stimuli["items1"],
        "part2_order": stimuli["items2"],
    }


def run_mfv(model, chat_history):
    """Present the randomized MFV prompt to ``model`` and record its reply.

    ``chat_history`` is mutated in place: the prompt is appended as a user
    message and the model's reply as an assistant message.

    Args:
        model: Callable that receives the message list and returns the reply.
        chat_history: List of ``{"role": ..., "content": ...}`` message dicts.

    Returns:
        dict with ``"mfv"`` (the reply), ``"chat_history"`` (the same list
        object that was passed in) and ``"mfv_codes"`` (vignette codes in the
        order they were presented).
    """
    stimuli = generate_mfv_stimuli(randomize=True)

    chat_history.append({"role": "user", "content": stimuli["prompt"]})
    reply = model(chat_history)
    chat_history.append({"role": "assistant", "content": reply})

    return {
        "mfv": reply,
        "chat_history": chat_history,
        "mfv_codes": stimuli["mfv_codes"],
    }


def generate_mfv_condition(model, agent_name, id, condition):
    """Run one conversation containing both instruments in the given order.

    Args:
        model: Callable that receives the message list and returns the reply.
        agent_name: Label stored under ``"agent"`` in the result.
        id: Identifier stored under ``"id"`` in the result.
        condition: ``"qv"`` runs the MFQ first and the MFV second; ``"vq"``
            runs them in the opposite order.

    Returns:
        dict with ``"agent"``, ``"id"``, ``"condition"`` and the ``run_mfq`` /
        ``run_mfv`` results under ``"mfq"`` and ``"mfv"``. Both results carry
        the same ``chat_history`` list, because one conversation log is shared
        by the two instruments.

    Raises:
        ValueError: If ``condition`` is neither ``"qv"`` nor ``"vq"``.
    """
    # Each conversation starts from the system prompt alone.
    chat_log = [{"role": "system", "content": role_message}]

    if condition == "qv":
        sequence = (("mfq", run_mfq), ("mfv", run_mfv))
    elif condition == "vq":
        sequence = (("mfv", run_mfv), ("mfq", run_mfq))
    else:
        raise ValueError("Condition must be 'qv' or 'vq'")

    # Administer the instruments one after the other on the shared log.
    answers = {}
    for label, administer in sequence:
        answers[label] = administer(model, chat_log)

    return {
        "agent": agent_name,
        "id": id,
        "condition": condition,
        "mfq": answers["mfq"],
        "mfv": answers["mfv"],
    }

In [12]:
def run_experiment(n, models, start_id=0):
    """Run ``n`` repetitions of both conditions for every model and save them.

    For each agent, the new records are appended to
    ``raw_data/<YYYY-MM-DD>_<agent>.json`` (created if missing) and also
    collected into the returned list.

    Args:
        n: Number of repetitions per agent; each repetition runs the "qv" and
            the "vq" condition.
        models: Mapping of agent label to the callable that queries the model.
        start_id: Offset added to the repetition index to form each record's
            ``id``. Defaults to 0, which numbers repetitions 0..n-1 on every
            call; pass a different offset per call to keep ids distinct across
            repeated calls.

    Returns:
        list of the records produced by this call, for all agents.
    """
    today_str = datetime.datetime.now().strftime("%Y-%m-%d")
    general_results = []
    for agent, model in models.items():
        results = [
            generate_mfv_condition(model, agent, start_id + i, condition)
            for i in range(n)
            for condition in ("qv", "vq")
        ]

        fl = Path(f"raw_data/{today_str}_{agent}.json")
        fl.parent.mkdir(parents=True, exist_ok=True)

        # Append to whatever was already saved for this agent today.
        if fl.exists():
            with open(fl) as f:
                to_save = json.load(f)
            to_save.extend(results)
        else:
            to_save = results

        # Serialize before opening the file for writing: opening with "w"
        # truncates it, so a serialization error raised afterwards would
        # destroy the results saved by earlier calls.
        payload = json.dumps(to_save)
        with open(fl, "w") as f:
            f.write(payload)

        general_results.extend(results)

    return general_results

In [13]:
# Maps the agent label (stored in every record and used in the output file
# name by run_experiment) to the callable that queries that model.
# NOTE(review): the call_* helpers are presumably provided by the
# `from llm_utils import *` star import - confirm.
models = {
    "Gemini Pro": call_gemini,
    # "Llama 2 Chat 70b": call_llama2, # removed llama due to unreliable results
    "Claude 2.1": call_claude2_1,
    "GPT-4": call_gpt4,
}

In [14]:
# Accumulates the records returned by every run_experiment call.
r = []

In [15]:
# Run the experiment in batches of `run_size` so that a failure only costs the
# batch in which it happened; failures are printed and appended to a log file.
# NOTE(review): run_experiment numbers its repetitions from 0 on every call,
# so with run_size = 1 every record produced by this loop gets id 0.
n = 59
run_size = 1
its = n // run_size
for i in range(its):
    try:
        r.extend(run_experiment(run_size, models))
    except Exception as e:
        print(f"Error on iteration {i} of {its}")
        print(e)
        # Keep a persistent trace of the failure, one line per error.
        with open("error_log_1.txt", "a+") as log_file:
            log_file.write(f"Error on iteration {i} of {its}. {e}" + "\n")
        


In [None]:
# # remove chat history from r before saving
# for d in r:
#     del d["mfq"]["chat_history"]
#     del d["mfv"]["chat_history"]

# with open("data/mfv_test.json", "w") as f:
#     json.dump(r, f, indent=4, ensure_ascii=False)