In [1]:
from transformers import AutoTokenizer, AutoModelForCausalLM
from openai import OpenAI
from dotenv import load_dotenv
import os
import random
from storysim import StorySimulator
import pandas as pd
from together import Together

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
load_dotenv()

True

In [3]:
os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_KEY')
os.environ['TOGETHER_API_KEY'] = os.getenv('TOGETHER_KEY')


In [4]:
# This example is the new way to use the OpenAI lib for python

def prompt_model(prompt, model):
    if 'gpt' in model:
        client = OpenAI()
    else:
        client = Together()
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "user", "content": prompt}
        ],
    )
    
    return response.choices[0].message.content

In [5]:
# graph = {
#         "hole_1": ["hole_2", "field"],
#         "hole_2": ["hole_1", "hole_3"],
#         "hole_3": ["hole_2", "hole_4"],
#         "hole_4": ["hole_3", "field"],
#         "field": ["hole_1", "hole_4"]
#     }

# events = {
        
#         7:{"name": "cross_paths","actors": ["Alice", "Bob"], "location": "hole_2"}, 
#         8: {"name": "exclusive_random", "actors":["Alice", "Bob"], "stop": 10 },
#         13: {"name": "mislead", "actors":["Alice", "Bob"]}
#     }


# sim = StorySimulator(
#         people=["Alice", "Bob", "Charlie", "Danny"],
#         locations=["hole_1", "hole_2", "hole_3", "hole_4", "field"],
#         relation="jumps_in",
#         trial_seed=10,
#         params={'prompt': '3', 'type': 'cot'},
#         graph=graph,
#         events=events
#     )

### Experiment Setup

In [31]:
def mislead_experiment(actors, locs, g, mislead, length):
    poi = random.sample(actors, 2)
    loc = random.sample(locs, 1)
    second_loc = random.choice(g[loc[0]])
    third_loc = random.choice([l for l in g[loc[0]] if l not in loc])
    event_dict = {}
    event_dict[10] = {"name": "cross_paths","actors": poi, "location": loc, "path_type": "same"}
    event_dict[11] = {"name":"move", "actor":poi[-1], "location": second_loc}
    event_dict[12] = {"name": "exclusive_random", "actors": poi, "stop": 12 + mislead}
    event_dict[12 + mislead] = {"name":"move", "actor":poi[-1],"location":third_loc}
    event_dict[12 + mislead+1] = {"name": "exclusive_random", "actors": poi, "stop": length}
    experiment_info = {'cross path location': loc[0], 'poi':poi, 'last':third_loc}
    return event_dict, second_loc, experiment_info

def spaced_mislead_experiment(actors, locs, g, mislead, length):
    event_dict = {}
    poi = random.sample(actors, 2)
    loc = random.sample(locs, 1)
    label = random.choice([l for l in g[loc[-1]] if l != loc[-1]])
    event_dict[15] = {"name": "cross_paths","actors": poi, "location": loc}
    event_dict[16] = {"name": "exclusive_random", "actors": poi, "stop": 17}
    event_dict[17] = {"name":"move", "actor":poi[-1],"location":label}
    event_dict[18] = {"name": "exclusive_random", "actors": poi, "stop": 18+mislead}
    event_dict[18+mislead] = {"name": "mislead", "actors": poi}
    event_dict[18 + mislead +1] = {"name": "exclusive_random", "actors": poi, "stop": length}
    experiment_info = {'cross path location': loc[0], 'poi':poi}
    return event_dict, label, experiment_info

def number_of_moves_experiment(actors, locs, g, length):
    poi = random.sample(actors, 2)
    loc = random.sample(locs,1)
    num_moves = 0
    event_dict = {}
    event_dict[10] = {"name": "cross_paths","actors": poi, "location": loc, "path_type":"same"}
    prev = loc[0]
    movement = []
    for i in range(1,num_moves+1):
        new_loc = random.choice([l for l in g[prev] if l not in loc])
        movement.append(new_loc)
        event_dict[10+i] = {"name":"move", "actor":poi[-1], "location": new_loc}
        prev = new_loc
    #event_dict[10+num_moves+1] = {"name": "mislead", "actors": poi}
    label =  movement[0]
    event_dict[10+num_moves+1] = {"name": "exclusive_random", "actors": poi, "stop": length}
    experiment_info = {'cross path location': loc[0], 'poi':poi}
    return event_dict, label, experiment_info

def second_order_tom_experiment(actors, locs, g, length):
    poi = random.sample(actors, 3)
    loc_1 = random.sample(locs,1)
    loc_2 = random.sample(g[loc_1[0]], 1)
    loc_3 = random.sample([l for l in g[loc_2[0]] if l != loc_1[0]])
    event_dict = {}
    event_dict[10] = {"name": "cross_paths","actors": poi, "location": loc_1, "path_type":"same"}
    event_dict[16] = {"name": "cross_paths","actors": poi[1:], "location": loc_1, "path_type":"same"}
    event_dict[17] = {"name":"move", "actor":poi[-1],"location":loc_3}
    event_dict[18] = {"name": "exclusive_random", "actors": poi, "stop": length}
    

def cross_path_overlap(actors, locs, g, mislead, length, n):
    poi = random.sample(actors, n)
    loc = random.sample(locs, 1)
    second_loc = random.choice(g[loc[0]])
    event_dict = {}
    event_dict[15] = {"name": "cross_paths","actors": poi, "location": loc, "path_type": "same"}
    event_dict[16] = {"name":"move", "actor":poi[-1], "location": second_loc}
    event_dict[17] = {"name": "exclusive_random", "actors": poi, "stop": 17 + mislead}
    event_dict[17 + mislead] = {"name": "mislead", "actors": poi}
    event_dict[17 + mislead+1] = {"name": "exclusive_random", "actors": poi, "stop": length}
    experiment_info = {'cross path location': loc[0], 'poi':poi}
    return event_dict, second_loc, experiment_info

In [7]:
def find_k_unique_paths(graph, start, end, k):
    def dfs(node, path, visited, paths):
        if len(paths) >= k:  # Stop early if we found k paths
            return
        
        if node == end:  # If reached destination, store the path
            paths.append(list(path))
            return

        for neighbor in graph.get(node, []):  # Explore neighbors
            if neighbor not in visited:
                visited.add(neighbor)
                path.append(neighbor)

                dfs(neighbor, path, visited, paths)

                # Backtrack
                visited.remove(neighbor)
                path.pop()

    paths = []
    dfs(start, [start], {start}, paths)  # Start DFS
    return [len(p)-1 for p in paths[:k]],[p[1:] for p in paths[:k]]  # Return up to k paths

### Actual Experiment setup

In [None]:

df = pd.DataFrame({'Story':[], 'Label':[], 'P1':[], 'P2':[]})
possible_people = ["Alice", "Bob", "Charlie", "Danny", "Edward", "Frank", "Georgia", "Hank", "Isaac", "Jake", "Kevin"]
num_people = 7

graph = { 
    "room_1": ["room_2", "the_hallway","room_5"],
    "room_2": ["room_1", "room_3","the_hallway"],
    "room_3": ["room_2", "room_4","the_hallway"],
    "room_4": ["room_3", "room_5","room_1"],
    "room_5": ["room_4", "room_1","room_2"],
    "the_hallway": ["room_1", "room_4","room_2"]
}


locations = list(graph.keys())
story_length = 100
num_trials = 100
mislead_distance = 3

random.seed(25)

for _ in range(num_trials):

    
    event_dict, label, experiment_dict = mislead_experiment(possible_people[:num_people], locations[:-1], graph, mislead_distance, story_length)

    sim = StorySimulator(
        people=possible_people[:num_people],
        locations=locations,
        relation="enters",
        params={'prompt': '3', 'type': 'cot'},
        graph=graph,
        events=event_dict,
        actions={}
    )

    res = sim.run_simulation(story_length)

    story = sim.formal_to_story(res)
    d = {'Story':[], 'Label':[], 'P1':[], 'P2':[]}
    d['P1'] = ",".join(experiment_dict['poi'][:-1]) if len(experiment_dict['poi'][:-1]) > 1 else experiment_dict['poi'][:-1]
    #print(d["P1"])
    d['P2'] = experiment_dict['poi'][-1]
    d['Story'].append(story)
    # d['Label'].append(movement[0])
    d['Label'].append(label)
    df = pd.concat([df, pd.DataFrame(d)])

In [9]:
df.head()

Unnamed: 0,Story,Label,P1,P2
0,Georgia enters room_2. Charlie enters room_2. ...,room_3,Danny,Alice
0,Edward enters room_4. Georgia enters room_1. C...,room_5,Alice,Bob
0,Bob enters room_4. Charlie enters room_2. Alic...,room_3,Danny,Frank
0,Georgia enters room_1. Edward enters room_2. E...,room_2,Alice,Bob
0,Danny enters room_4. Edward enters room_2. Fra...,room_3,Danny,Alice


In [30]:
for i ,row in df.iterrows():
    print("\n".join(row['Story'].split(".")))
    print('---')

Georgia enters room_2
 Charlie enters room_2
 Georgia enters room_3
 Charlie enters the_hallway
 Edward enters room_2
 Frank enters room_2
 Bob enters room_2
 Bob enters the_hallway
 Danny enters room_2
 Georgia enters room_4
 Alice enters room_2
 Alice enters room_3
 Frank enters room_3
 Charlie enters room_2
 Charlie enters the_hallway
 Alice enters the_hallway
 Frank enters room_2
 Bob enters room_4
 Georgia enters room_3
 Georgia enters the_hallway
 Charlie enters room_2
 Bob enters room_1
 Charlie enters room_1
 Georgia enters room_1
 Frank enters the_hallway
 Frank enters room_4
 Frank enters room_1
 Georgia enters room_5
 Frank enters the_hallway
 Bob enters the_hallway
 Edward enters room_1
 Bob enters room_4
 Frank enters room_2
 Frank enters room_1
 Frank enters room_2
 Frank enters the_hallway
 Georgia enters room_1
 Bob enters room_3
 Bob enters room_2
 Charlie enters room_5
 Charlie enters room_2
 Charlie enters room_1
 Charlie enters the_hallway
 Charlie enters room_1
 Ge

In [11]:
intial_prompt = f"Read the following story and answer the question at the end. Note that all characters start in {sim.locations[-1].replace('_',' ')}. Characters in the same location can see where eachother go when someone leaves. If characters are in different locations, they cannot see eachother."
tom_responses, wm_responses = [], []
fewshot = False
model_choice = "gpt-4"
tom_total, wm_total = 0, 0
for _ ,row in df.iterrows():
    p = row['P1'].split(',')
    formatted = []

    if fewshot:
        #formatted = [f'{ex[0]}\nQ: Where does {ex[1][0]} think {ex[1][1]} thinks {ex[1][2]} is?\nA: {ex[2]}' for ex in fewshot_examples]
        formatted = [f'{ex[0]}\nQ: Where does {ex[1][0]} think {ex[1][1]} is?\nA: {ex[2]}' for ex in fewshot_examples]
        formatted = '\n'.join(formatted)
        prompt = f"{intial_prompt}\n{formatted}\n{row['Story']}\nQ: Where does {p[0]} think {row['P2']} is?\nA:"
        #prompt = f"{intial_prompt}\n{formatted}\n{row['Story']}\nQ: Who has seen more drawings, {p[0]} or {row['P2']}?\nA:"
        print(prompt)
        print('*****')
        #answer = prompt_gpt(prompt, model_choice)
    else:
        prompt_prefix = f"{intial_prompt}\n{row['Story']}.\n"
        prompt = f'{prompt_prefix}Q:Where does {p[0]} think {row["P2"]} is by the end of the story?\nA:'
        #print("=====")
        #print(prompt)
        answer = prompt_model(prompt, model_choice)
    tom_responses.append(answer)
df['TOM Responses'] = tom_responses

In [12]:
def compute_score_unsure(label, response):
    #label = label.split(',')[0][2:-1]
    base = f'{label.split("_")[0]}_' if not label.startswith('hallway') else label
    response = response.split("\n")[-1]
    if response.lower().count(base.lower()) <= 1:
        return str(label.lower() in response.lower() or label.replace('_'," ").lower() in response.lower())
    elif 'Therefore,' in response:
        return str(label in response.split('Therefore,')[-1] or label in response.split('Therefore,')[-1])
    return f'{response}, {label}'

def compute_score_unsure(label, response):
    #label = label.split(',')[0][2:-1]
    base = f'{label.split("_")[0]}_' if not label.startswith('hallway') else label
    response = response.split("\n")[-1]
    response = response.lower().replace("_",' ')
    if response.count(base) <= 1:
        return str(label in response or label.replace('_'," ") in response or 
                   label.replace('_',"") in response or
                   ("hallway" in label and "hallway" in response) or
                   label.replace(" ",'') in response)
    elif 'Therefore,' in response:
        response = response.split('Therefore,')[-1]
        return str(label in response or label.replace('_'," ") in response or label.replace('_',"") in response or ("hallway" in label and "hallway" in response))
    return f'{response}, {label}'

In [22]:
outs= df.apply(lambda x: compute_score_unsure(x['Label'], x['TOM Responses']), axis=1)
known = [k for k in outs if k == 'True' or k == 'False']
unknown = [k for k in outs if k != 'True' and k != 'False']
print(sum([1 for k in known if k == 'True']))
print(len(unknown))
for i in range(len(outs)):
    if outs.iloc[i] != 'True' and outs.iloc[i] != 'False':
        clean_out = outs.iloc[i].split('<think>')[-1]
        print(f'{clean_out}')
        print(f'Index {i}')
        print("\n-////==============////-\n")

31
0
