In [31]:
import numpy as np
import pandas as pd
import pickle
import math

In [32]:
participants = pd.read_csv('../data_clean/2023-01-28_participants_clean_an-exp1_n-91.csv', sep = ",")
transitions = pd.read_csv("../data_clean/2023-01-28_transitions_clean_an-exp1_n-91.csv", sep = ",")
trials_learning = pd.read_csv('../data_clean/2023-01-28_learning_clean_an-exp1_n-91.csv', sep = ",")
trials_prediction = pd.read_csv('../data_clean/2023-01-28_test_prediction_clean_an-exp1_n-91.csv', sep = ",")
trials_control = pd.read_csv('../data_clean/2023-01-28_test_control_clean_an-exp1_n-91.csv', sep = ",")
trials_explanation = pd.read_csv('../data_clean/2023-01-28_test_explanation_clean_an-exp1_n-91.csv', sep = ",")

In [33]:
# Define functions for hidden cases

def predict_normative(start_state, first_input, second_input, fsm_temp):
    """Return an array with probabilities of each final state"""
    states = [0, 1, 2, 3]
    return np.array(
        [np.array([fsm_temp[start_state][first_input][middle_state] * fsm_temp[middle_state][second_input][final_state]
                   for middle_state in states]).sum()
         for final_state in states],
        dtype=[(str(start_state) + "—" + first_input + "–[All paths]–" + second_input + "–?", "float16")])

def predict_an(start_state, first_input, second_input, fsm_temp):
    states = [0, 1, 2, 3]
    max_middle_state = np.array([fsm_temp[start_state][first_input]]).argmax()
    ans = np.array([np.array([fsm_temp[middle_state][second_input][final_state]
                              for middle_state in [max_middle_state]]).sum()
                    for final_state in states])
    return np.array(ans / ans.sum(),
                    dtype=[(str(start_state) + "—" + first_input + "–[ML path]–" + second_input + "–?", "float16")])

def control_normative(start_state, final_state, fsm_temp):
    """ Calculate probabilities of each possible combination of inputs for every possible middle state and return a dictionary"""
    states = [0, 1, 2, 3]
    input_combinations = [["a", "a"], ["a", "b"], ["b", "a"], ["b", "b"]]
    control_all_middle_states = {}

    for inp in input_combinations:
        label = inp[0] + inp[1]
        control_all_middle_states[label] = np.array([fsm_temp[start_state][inp[0]][middle_state] *
                                                     fsm_temp[middle_state][inp[1]][final_state] for middle_state in states]).sum()
    return control_all_middle_states

#control_normative(1, 0, fsm)["ba"]

def control_an(start_state, final_state, fsm_temp):
    """ Calculate probabilities of each possible combination of inputs for every possible middle state and return a dictionary"""
    input_combinations = [["a", "a"], ["a", "b"], ["b", "a"], ["b", "b"]]
    control_ml_middle_state = {}
    for inp in input_combinations:
        label = inp[0] + inp[1]
        max_middle_state = np.array([fsm_temp[start_state][inp[0]]]).argmax()
        control_ml_middle_state[label] = np.array([fsm_temp[middle_state][inp[1]][final_state] for middle_state in [max_middle_state]]).sum()
    return control_ml_middle_state

# Explanation hidden
def explain_normative(start_state, first_input, second_input, final_state, fsm_temp):
    ans = {}
    states = [0, 1, 2, 3]

    # first input counterfactual
    first_input_cf = "b" if first_input == "a" else "a" #counteractual input 1
    explanation_1 = np.array([fsm_temp[start_state][first_input_cf][middle_state] * fsm_temp[middle_state][second_input][final_state]
                                                  for middle_state in states]).sum()
    ans["1"] = explanation_1

    # second input counterfactual
    second_input_cf = "b" if second_input == "a" else "a" #counteractual input 2
    explanation_2 = np.array([fsm_temp[start_state][first_input][middle_state] * fsm_temp[middle_state][second_input_cf][final_state]
                                                  for middle_state in states]).sum()
    ans["2"] = explanation_2

    return ans

def explain_an(start_state, first_input, second_input, final_state, fsm_temp):
    ans = {}

    # first input counterfactual
    first_input_cf = "b" if first_input == "a" else "a" # counteractual input 1
    max_middle_state1 = np.array([fsm_temp[start_state][first_input_cf]]).argmax()
    explanation_1 = np.array([fsm_temp[middle_state][second_input][final_state] for middle_state in [max_middle_state1]]).sum()
    ans["1"] = explanation_1

    # second input counterfactual
    second_input_cf = "b" if second_input == "a" else "a" # counteractual input 2
    max_middle_state2 = np.array([fsm_temp[start_state][first_input]]).argmax()
    explanation_2 = np.array([fsm_temp[middle_state][second_input_cf][final_state] for middle_state in [max_middle_state2]]).sum()
    ans["2"] = explanation_2

    return ans

In [34]:
def compute_for_trials(trials, task, form, epsilon, beta):
    import math
    log_p_answers = []
    for index in range(0, trials.shape[0]):
        option_1_p_mm = trials.iloc[[index]].option_1_p_mm.item()
        option_2_p_mm = trials.iloc[[index]].option_2_p_mm.item()
        response = trials.iloc[[index]].response.item()

        if task == "prediction":
            option_1 = int(trials.iloc[[index]].option_1.item())
            option_2 = int(trials.iloc[[index]].option_2.item())
        if task == "explanation":
            option_1 = 1
            option_2 = 2
        if task == "control":
            option_1 = trials.iloc[[index]].option_1.item() if form == "hidden" else "a"
            option_2 = trials.iloc[[index]].option_2.item() if form == "hidden" else "b"

        # R_i
        if option_1_p_mm > option_2_p_mm:
            R_i = math.log10((option_1_p_mm + epsilon)/(option_2_p_mm + epsilon))
        elif option_1_p_mm < option_2_p_mm:
            R_i = math.log10((option_2_p_mm + epsilon)/(option_1_p_mm + epsilon))
        else:
            R_i = 1

        p_correct = (math.e ** (R_i * beta)) / (math.e ** (R_i * beta) + math.e ** (-1 * R_i * beta))

        if task == "explanation":
            correct_mm = option_1 if option_1_p_mm < option_2_p_mm else option_2
        else: correct_mm = option_1 if option_1_p_mm > option_2_p_mm else option_2

        p_answer = p_correct if response == correct_mm else (1 - p_correct)
        log_p_answer = math.log10(p_answer)
        log_p_answers.append(log_p_answer)
    return log_p_answers

def compute_for_trials_an(trials, task, form, epsilon, beta):
    import math
    log_p_answers = []
    for index in range(0, trials.shape[0]):
        option_1_p_mm = trials.iloc[[index]].option_1_p_mm_an.item()
        option_2_p_mm = trials.iloc[[index]].option_2_p_mm_an.item()
        response = trials.iloc[[index]].response.item()

        if task == "prediction":
            option_1 = int(trials.iloc[[index]].option_1.item())
            option_2 = int(trials.iloc[[index]].option_2.item())
        if task == "explanation":
            option_1 = 1
            option_2 = 2
        if task == "control":
            option_1 = trials.iloc[[index]].option_1.item() if form == "hidden" else "a"
            option_2 = trials.iloc[[index]].option_2.item() if form == "hidden" else "b"

        # R_i
        if option_1_p_mm > option_2_p_mm:
            R_i = math.log10((option_1_p_mm + epsilon)/(option_2_p_mm + epsilon))
        elif option_1_p_mm < option_2_p_mm:
            R_i = math.log10((option_2_p_mm + epsilon)/(option_1_p_mm + epsilon))
        else:
            R_i = 1

        p_correct = (math.e ** (R_i * beta)) / (math.e ** (R_i * beta) + math.e ** (-1 * R_i * beta))

        if task == "explanation":
            correct_mm = option_1 if option_1_p_mm < option_2_p_mm else option_2
        else: correct_mm = option_1 if option_1_p_mm > option_2_p_mm else option_2

        p_answer = p_correct if response == correct_mm else (1 - p_correct)
        log_p_answer = math.log10(p_answer)
        log_p_answers.append(log_p_answer)
    return log_p_answers

In [35]:
# Add participants' data in a dictionary + compute MM probabilities

sample = {} # created empt dict for all data
eps = 0.01

for participant_id in trials_learning.participant_id.unique():
    # Import data for one subject
    ## get data for a single participant
    mm = transitions[transitions['participant_id'] == participant_id] # get metal model as a DataFrame
    fsm = participants.study_fsm[participants.participant_id == participant_id].item()
    learning = trials_learning[trials_learning.participant_id == participant_id]

    # Get Visible and Hidden trials for P, E, and C
    if trials_prediction[trials_prediction.participant_id == participant_id].size > 0:
        trials_vis = trials_prediction[(trials_prediction.participant_id == participant_id) &
                                    (trials_prediction.trial_type == "visible")]
        trials_hid = trials_prediction[(trials_prediction.participant_id == participant_id) &
                                   (trials_prediction.trial_type == "hidden")]
        task = "prediction"

    if trials_control[trials_control.participant_id == participant_id].size > 0:
        trials_vis = trials_control[(trials_control.participant_id == participant_id) &
                                    (trials_control.trial_type == "visible")]
        trials_hid = trials_control[(trials_control.participant_id == participant_id) &
                                   (trials_control.trial_type == "hidden")]
        task = "control"

    if trials_explanation[trials_explanation.participant_id == participant_id].size > 0:
        trials_vis = trials_explanation[(trials_explanation.participant_id == participant_id) &
                                 (trials_explanation.trial_type == "visible")]
        trials_hid = trials_explanation[(trials_explanation.participant_id == participant_id) &
                                (trials_explanation.trial_type == "hidden")]
        task = "explanation"

    ## select only informative columns
    trials_vis = trials_vis.iloc[:,4:]
    trials_hid = trials_hid.iloc[:,4:]

    # reformat mental model as dictionary
    MM = {}
    for state_current in mm.state_current.unique():
        inputs = {}
        for inp in ["a", "b"]:
            inputs[inp] = np.array([mm.state_next_p[(mm.state_current == state_current) & (mm.response_current == inp) & (mm.state_next == 0)].item(),
                             mm.state_next_p[(mm.state_current == state_current) & (mm.response_current == inp) & (mm.state_next == 1)].item(),
                             mm.state_next_p[(mm.state_current == state_current) & (mm.response_current == inp) & (mm.state_next == 2)].item(),
                             mm.state_next_p[(mm.state_current == state_current) & (mm.response_current == inp) & (mm.state_next == 3)].item()])
        MM[state_current] = {"a":inputs["a"], "b": inputs["b"]}


    # Use MM to calculate correct answers
    if task == "prediction":
        # Visible
        for index in range(0, trials_vis.shape[0]): # shape[0] = number of rows
            state_2 = int(trials_vis.iloc[[index]].state_2.item())
            response_2 = trials_vis.iloc[[index]].response_2.item()
            option_1 = int(trials_vis.iloc[[index]].option_1.item())
            option_2 = int(trials_vis.iloc[[index]].option_2.item())
            trials_vis.loc[trials_vis.index[index], 'option_1_p_mm'] = MM[state_2][response_2][option_1]
            trials_vis.loc[trials_vis.index[index], 'option_2_p_mm'] = MM[state_2][response_2][option_2]

        # Hidden
        for index in range(0, trials_hid.shape[0]):
            state_1 = int(trials_hid.iloc[[index]].state_1.item())
            response_1 = trials_hid.iloc[[index]].response_1.item()
            response_2 = trials_hid.iloc[[index]].response_2.item()
            option_1 = int(trials_hid.iloc[[index]].option_1.item())
            option_2 = int(trials_hid.iloc[[index]].option_2.item())

            trials_hid.loc[trials_hid.index[index], 'option_1_p_mm'] = predict_normative(state_1, response_1, response_2, MM)[option_1][0]
            trials_hid.loc[trials_hid.index[index], 'option_2_p_mm'] = predict_normative(state_1, response_1, response_2, MM)[option_2][0]

            trials_hid.loc[trials_hid.index[index], 'option_1_p_mm_an'] = predict_an(state_1, response_1, response_2, MM)[option_1][0]
            trials_hid.loc[trials_hid.index[index], 'option_2_p_mm_an'] = predict_an(state_1, response_1, response_2, MM)[option_2][0]

    if task == "control":
        # Visible
        for index in range(0, trials_vis.shape[0]): # shape[0] = number of rows
            state_2 = int(trials_vis.iloc[[index]].state_2.item())
            state_3 = int(trials_vis.iloc[[index]].state_3.item())
            option_1 = "a"
            option_2 = "b"
            trials_vis.loc[trials_vis.index[index], 'option_1_p_mm'] = MM[state_2][option_1][state_3]
            trials_vis.loc[trials_vis.index[index], 'option_2_p_mm'] = MM[state_2][option_2][state_3]

        # Hidden
        for index in range(0, trials_hid.shape[0]):
            state_1 = int(trials_hid.iloc[[index]].state_1.item())
            state_3 = int(trials_hid.iloc[[index]].state_3.item())
            option_1 = trials_hid.iloc[[index]].option_1.item()
            option_2 = trials_hid.iloc[[index]].option_2.item()

            trials_hid.loc[trials_hid.index[index], 'option_1_p_mm'] = control_normative(state_1, state_3, MM)[option_1]
            trials_hid.loc[trials_hid.index[index], 'option_2_p_mm'] = control_normative(state_1, state_3, MM)[option_2]

            trials_hid.loc[trials_hid.index[index], 'option_1_p_mm_an'] = control_an(state_1, state_3, MM)[option_1]
            trials_hid.loc[trials_hid.index[index], 'option_2_p_mm_an'] = control_an(state_1, state_3, MM)[option_2]

    if task == "explanation":
        # Visible
        for index in range(0, trials_vis.shape[0]): # shape[0] = number of rows
            state_1 = int(trials_vis.iloc[[index]].state_1.item())
            response_1 = trials_vis.iloc[[index]].response_1.item()
            state_2 = int(trials_vis.iloc[[index]].state_2.item())
            response_2 = trials_vis.iloc[[index]].response_2.item()
            state_3 = int(trials_vis.iloc[[index]].state_3.item())
            response_1_cf = "a" if response_1 == "b" else "b"
            response_2_cf = "a" if response_2 == "b" else "b"
            trials_vis.loc[trials_vis.index[index], 'option_1_p_mm'] = MM[state_1][response_1_cf][state_2] * MM[state_2][response_2][state_3]
            trials_vis.loc[trials_vis.index[index], 'option_2_p_mm'] = MM[state_1][response_1][state_2] * MM[state_2][response_2_cf][state_3]

        # Hidden
        for index in range(0, trials_hid.shape[0]): # shape[0] = number of rows
            state_1 = int(trials_hid.iloc[[index]].state_1.item())
            response_1 = trials_hid.iloc[[index]].response_1.item()
            response_2 = trials_hid.iloc[[index]].response_2.item()
            state_3 = int(trials_hid.iloc[[index]].state_3.item())
            response_1_cf = "a" if response_1 == "b" else "b"
            response_2_cf = "a" if response_2 == "b" else "b"
            trials_hid.loc[trials_hid.index[index], 'option_1_p_mm'] = explain_normative(state_1, response_1, response_2, state_3, MM)["1"]
            trials_hid.loc[trials_hid.index[index], 'option_2_p_mm'] = explain_normative(state_1, response_1, response_2, state_3, MM)["2"]


            trials_hid.loc[trials_hid.index[index], 'option_1_p_mm_an'] = explain_an(state_1, response_1, response_2, state_3, MM)["1"]
            trials_hid.loc[trials_hid.index[index], 'option_2_p_mm_an'] = explain_an(state_1, response_1, response_2, state_3, MM)["2"]

    # Add beta values
    betas = np.arange(0, 1, 0.05)
    beta_person = {}
    for form in ["visible", "hidden"]:
        log_p_answers = []
        if form == "visible":
            trials = trials_vis
        else: trials = trials_hid
        all_betas = []
        all_betas_an = []
        for beta in betas:
            log_p_answers = compute_for_trials(trials, task, form, eps, beta)
            all_betas.append([round(beta, 2), sum(log_p_answers)])
            if form == "hidden":
                log_p_answers_an = compute_for_trials_an(trials, task, form, eps, beta)
                all_betas_an.append([round(beta, 2), sum(log_p_answers_an)])
        beta_person[form] = all_betas
    beta_person["hidden_an"] = all_betas_an

    ## record that data in a dictionary
    sample[participant_id] = {"task": task,
                              "fsm": fsm,
                              "learning": learning.iloc[:,3:],
                              "MM": MM,
                              "visible": trials_vis,
                              "hidden": trials_hid,
                              "visible_beta": beta_person["visible"],
                              "hidden_beta": beta_person["hidden"],
                              "hidden_beta_an": beta_person["hidden_an"] } # make it list by adding ".values.tolist()"

pickle.dump(sample, open( "sample.p", "wb" ) )



In [36]:
sample[6]["hidden_beta_an"]

[[0.0, -3.010299956639812],
 [0.05, -3.044848266881229],
 [0.1, -3.1036776532878427],
 [0.15, -3.1862469136984104],
 [0.2, -3.291692479249728],
 [0.25, -3.41888048277786],
 [0.3, -3.56646984158022],
 [0.35, -3.7329792152572816],
 [0.4, -3.9168514668708827],
 [0.45, -4.116510862874676],
 [0.5, -4.330410166969724],
 [0.55, -4.5570665725136426],
 [0.6, -4.7950868096386445],
 [0.65, -5.04318266677863],
 [0.7, -5.3001786201406835],
 [0.75, -5.565013372967806],
 [0.8, -5.836736988776107],
 [0.85, -6.114505063720467],
 [0.9, -6.397571100215972],
 [0.95, -6.685277966470774]]

In [474]:
trials = sample[2]["hidden"]
task = sample[2]["task"]
form = "hidden"
epsilon = 0.01
beta = 1

compute_for_trials(trials, task, form, epsilon, beta)

[-0.9237130986014216,
 -0.05512413479491798,
 -1.5628276341376452,
 -0.9237130986014216,
 -1.1781909967501585,
 -0.05512413479491798,
 -0.9237130986014216,
 -1.4176817525776,
 -1.4972230270888054,
 -0.9237130986014216]

In [91]:
results = pd.DataFrame(columns=['condition', 'fsm', 'task', 'beta', 'sum_log_p'])
this_row = {}
for task in ["prediction", "explanation", "control"]:
    for fsm in ["easy", "hard"]:
        for condition in ["visible_beta", "hidden_beta", "hidden_beta_an"]:
            running=np.array(sample[2]["hidden_beta_an"])
            running[:,1] *= 0
            for i in sample.keys():
                if sample[i]["task"] == task and sample[i]["fsm"] == fsm:
                    running[:,1] += np.array(sample[i][condition])[:,1]
            for r in running:
                this_row = {'condition':condition, "fsm": fsm, "task":task, "beta": r[0], "sum_log_p": r[1]}
                results = results.append(this_row, ignore_index = True)

filepath = "../outputs/results_mm_running.csv"
results.to_csv(filepath, index = False)

In [90]:
results

Unnamed: 0,condition,fsm,task,beta,sum_log_p
0,visible_beta,easy,prediction,0.00,-45.154499
1,visible_beta,easy,prediction,0.05,-41.084427
2,visible_beta,easy,prediction,0.10,-37.548139
3,visible_beta,easy,prediction,0.15,-34.531708
4,visible_beta,easy,prediction,0.20,-32.013012
...,...,...,...,...,...
355,hidden_beta_an,hard,control,0.75,-70.613505
356,hidden_beta_an,hard,control,0.80,-73.728497
357,hidden_beta_an,hard,control,0.85,-76.943427
358,hidden_beta_an,hard,control,0.90,-80.246465
