In [1]:
import numpy as np
import pandas as pd
import pyddm
import os
import utils
import models

pygame 2.1.2 (SDL 2.0.18, Python 3.9.7)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
def get_model_measures(model, condition):
    sol = model.solve(condition)
    mean_rt_error = np.sum(sol.pdf_err()*model.t_domain())*model.dt / sol.prob_error()
    return condition["tta_0"], condition["d_0"], condition["a_values"], \
           sol.prob_correct(), sol.mean_decision_time(), mean_rt_error

def get_model_rt_distr(model, condition, kind="cdf"):
    sol = model.solve(condition)
    return pd.DataFrame({"tta_0": condition["tta_0"],
                         "d_0":  condition["d_0"],
                         "a_values": condition["a_values"],
                         "t": model.t_domain(),
                         "rt_corr_distr": (sol.cdf_corr() if kind=="cdf" else sol.pdf_corr())/sol.prob_correct(),
                         "rt_error_distr": (sol.cdf_err() if kind=="cdf" else sol.pdf_err())/sol.prob_error()})

def initialize_model(model_no, param_set):
    simulation_params = {"dt": 0.01, "duration": 5.0}

    if model_no == 1:
        # drift only depends on TTA and distance (2020 version)
        overlay = models.OverlayNonDecisionGaussian(ndt_location=param_set.ndt_location, ndt_scale=param_set.ndt_scale)
        bound = models.BoundCollapsingTta(b_0=param_set.b_0, k=param_set.k, tta_crit=param_set.tta_crit)
        drift = models.DriftTtaDistance(alpha=param_set.alpha, beta_d=param_set.beta_d, theta=param_set.theta)

        model = pyddm.Model(name="Model %i" % model_no, drift=drift, bound=bound, overlay=overlay,
                      noise=pyddm.NoiseConstant(noise=1), T_dur=models.ModelTtaDistance.T_dur)
    # elif model_no == 2:
    #     # re-implementation of Model 1 with the same acceleration pattern for all conditions (a=0)
    #     # results should be the same as Model 1, just for sanity check purposes
    #     gaze_sample_f = interpolate.interp1d(t, gaze_sample)
    #
    #     overlay = models.OverlayNonDecisionGaussian(ndt_location=param_set.ndt_location, ndt_scale=param_set.ndt_scale)
    #     bound = models.BoundCollapsingTta(b_0=param_set.b_0, k=param_set.k, r=param_set.r, tta_crit=param_set.tta_crit)
    #     drift = models.DriftGaze(alpha=param_set.alpha, beta_d=param_set.beta_d, beta_tta_or=param_set.beta_tta_or,
    #                              theta=param_set.theta, gamma=param_set.gamma, gaze_sample_f=gaze_sample_f)
    #
    #     model = pyddm.Model(name="Model %i" % model_no, drift=drift, bound=bound, overlay=overlay,
    #                   noise=pyddm.NoiseConstant(noise=1), T_dur=simulation_params["duration"])
    # elif model_no == 3:
    #     # model with gaze-dependent access to perceptual information
    #     gaze_sample = helper.get_mean_gaze_rate(simulation_params)
    #     t = np.linspace(0, simulation_params["duration"], len(gaze_sample))
    #     gaze_sample_f = interpolate.interp1d(t, gaze_sample)
    #
    #     overlay = models.OverlayNonDecisionGaussian(ndt_location=param_set.ndt_location, ndt_scale=param_set.ndt_scale)
    #     bound = models.BoundCollapsingGeneralizedGap(b_0=param_set.b_0, k=param_set.k, beta_d=param_set.beta_d,
    #                                                  beta_tta_or=param_set.beta_tta_or, theta=param_set.theta,
    #                                                  gamma=param_set.gamma, gaze_sample_f=gaze_sample_f)
    #     drift = models.DriftGaze(alpha=param_set.alpha, beta_d=param_set.beta_d, beta_tta_or=param_set.beta_tta_or,
    #                              theta=param_set.theta, gamma=param_set.gamma, gaze_sample_f=gaze_sample_f)
    #
    #     model = pyddm.Model(name="Model %i" % model_no, drift=drift, bound=bound, overlay=overlay,
    #                   noise=pyddm.NoiseConstant(noise=1), T_dur=simulation_params["duration"])

    else:
        model = None

    return model

def simulate_model(model_no, param_set, conditions, ret="measures"):
    """
    Set ret to "measures" or "rt_cdf" or "rt_pdf" for saving p_turn and mean RT or RT CDF or RT PDF
    """
    model = initialize_model(model_no, param_set)

    if ret=="measures":
        sim_result = pd.DataFrame([get_model_measures(model, condition) for condition in conditions],
                                  columns=["tta_0", "d_0", "a_values", "is_gap_accepted", "RT_merge", "RT_wait"])
    else:
        sim_result = pd.concat([get_model_rt_distr(model, condition, kind=ret[-3:]) for condition in conditions])
    sim_result["subj_id"] = param_set.subj_id
    return sim_result

In [3]:
def save_sim_results(model_no, file_name, conditions=None, cross_validation=False, ret="measures"):
    if model_no == 1:
        model_name = "drift_tta_distance"
    # elif model_no == 2:
    #     model_name = "gaze_dependent_model"
    # elif model_no == 3:
    #     model_name = "gaze_dependent_bound_generalized_gap_model"
    else:
        raise ValueError("Wrong model_no")

    path = os.path.join("fit_results", model_name)
    parameters = pd.read_csv(os.path.join(path, file_name))

    sim_results = [simulate_model(model_no, param_set,
                                  [param_set[["tta_0", "d_0", "a_values"]].to_dict()] if cross_validation
                                  else conditions,
                                  ret=ret)
                   for idx, param_set in parameters.iterrows()]

    sim_results = pd.concat(sim_results)
    sim_results.to_csv(os.path.join(path, "simulation_results", file_name.replace("parameters_fitted", ret)), index=False)

In [4]:
conditions = [{"tta_0": tta_0, "d_0": d_0, "a_values": a_values}
              for tta_0 in np.linspace(4, 6, 9)
              for d_0 in np.linspace(70, 90, 3)
              for a_values in [(0., 0., 0., 0.),
                               (0., 4, 4, 0.),
                               (0., 4, -4, 0.),
                               (0., -4, 4, 0.),
                               (0., -4, -4, 0.)]]

# up-to-date

In [6]:
save_sim_results(model_no=1, file_name="subj_all_parameters_fitted.csv", conditions=conditions, cross_validation=False,
                 ret="measures")

In [14]:
save_sim_results(model_no=2, file_name="subj_all_parameters_fitted.csv", conditions=conditions, cross_validation=False,
                 ret="measures")

In [7]:
save_sim_results(model_no=3, file_name="subj_all_parameters_fitted.csv", conditions=conditions, cross_validation=False,
                 ret="measures")