# **RIC1 CPDM CASANDRE Model Fit (Parameter Estimation)**

### Combines organization of confidence data, fitting of the CASANDRE model, and saving of output into a single script.
- Step 1. Import and structure data for CASANDRE model (previously sortTurkData.m)
- Step 2. Provide structured data to CASANDRE model (previously getLlhChoice_Likelihood.m & fitTurkDataComb.m)
- Step 3. Save and visualize output of CASANDRE model  

#### Information about CASANDRE

- decision variable

In [8]:
"""
===================
Mandy Renfro (2024)
===================
"""

from glob import glob
import matplotlib.pyplot as plt
import numpy as np
from numpy import exp, linspace, log, sqrt, sum
np.seterr(all = "ignore")
import os
from os.path import join
import sys
import pandas as pd
from scipy import stats
from scipy.optimize import minimize
from scipy.special import erfcinv
#logninv = stats.lognorm.ppf
normcdf = stats.norm.cdf
import seaborn as sns
sns.set_theme(style="white", palette="muted")
import sys
if not sys.warnoptions:
    import warnings
    warnings.simplefilter("ignore")

## Project locations
base_proj_dir = "Z:/data/RIC"                  ## base project directory
data_dir      = "Z:/data/RIC/sourcedata/RIC1"  ## directory the participant CSV files are globbed from
save_proj_dir = join(base_proj_dir, "derivatives/RIC1/parameter_estimation/cpdm")  ## directory the parameter estimates are written to

## CASANDRE parameters
cruns       = 25   ## number of randomly initialised fits per Ss (the lowest NLL is kept)
sample_rate = 100  ## number of quantiles used to sample the lognormal denominator; higher = slower, more precise
delta       = 5    ## standard deviations below and above mean for confidence variable distributions (not referenced in the code visible here)
noise_sens  = 1    ## sensory noise; with a value of 1 the decision and confidence variable distributions can be compared directly

## CASANDRE options (forwarded to scipy.optimize.minimize)
options = {"maxiter": 250, "disp": False}

In [4]:
def casandre_fit(guess_rate, stim_sens, stim_crit,  meta_uncert, conf_crit, stim_vals):
    """ Compute llh of each response alternative (4) for every stimulus value.
        Step 1 - sample decision variable denominator in steps of constant cumulative density.
        Step 2 - compute choice distribution under each scaled sensory distribution.
        Step 3 - average across all scaled sensory distributions to get likelihood functions.
        **Linear transformation of normal variable is itself normal variable.
        **Inverse of denominator used here to work with products instead of ratios.
        Reads the module-level settings `noise_sens` and `sample_rate`.
        INPUT
        - guess_rate: lapse probability, split evenly over the four response alternatives
        - stim_sens: scales stim_vals and stim_crit into decision-variable units
        - stim_crit: stimulus criterion, in the same units as stim_vals
        - meta_uncert: meta-uncertainty; sets the spread of the lognormal denominator
        - conf_crit: confidence criterion; response boundaries are [-conf_crit, 0, conf_crit]
        - stim_vals: array of stimulus conditions in units of stimulus magnitude
        OUTPUT
        - choice_llh: array of shape (4, len(stim_vals)); rows are the probabilities that the
          confidence variable falls below -conf_crit, between -conf_crit and 0,
          between 0 and conf_crit, and above conf_crit
    """
    sens_mean  = stim_vals * stim_sens
    sens_crit  = stim_crit * stim_sens
    x          = np.array([-conf_crit, 0, conf_crit])
    n_resp     = len(x) + 1
    choice_llh = np.zeros((n_resp, len(stim_vals)))

    ## Everything below depends only on the parameters, not on the stimulus, so compute it once.
    mu_log_n    = log((noise_sens**2) / sqrt(meta_uncert**2 + noise_sens**2))
    sigma_log_n = sqrt(log((meta_uncert**2) / (noise_sens**2) + 1))
    quantiles   = linspace(0.5 / sample_rate, 1 - (0.5 / sample_rate), sample_rate)
    dv_Den_x    = logninv(quantiles, mu_log_n, sigma_log_n)
    inv_dv_den  = 1 / dv_Den_x.reshape(-1, 1)       ## column vector: one row per sampled denominator
    sigma       = np.abs(inv_dv_den * noise_sens)
    lapse       = guess_rate / n_resp

    for stim_idx, stim_mean in enumerate(sens_mean):
        mu = inv_dv_den * (stim_mean - sens_crit)
        ## Broadcasting (3,) against (sample_rate, 1) gives the (sample_rate, 3) grid of CDF values
        p  = normcdf(x, mu, sigma)
        ratio_dist_p = np.mean(p, axis = 0)          ## average over the sampled denominators
        choice_llh[1:-1, stim_idx] = lapse + (1 - guess_rate) * (ratio_dist_p[1:] - ratio_dist_p[:-1])
        choice_llh[   0, stim_idx] = lapse + (1 - guess_rate) * (ratio_dist_p[0])
        choice_llh[  -1, stim_idx] = lapse + (1 - guess_rate) * (1 - ratio_dist_p[-1])
    return choice_llh


def _logninv(p): ## only for mu = 0 and sigma = 1
    """ Inverse CDF of the lognormal distribution with mu = 0 and sigma = 1.
        INPUT
        - p: cumulative probability (scalar or array)
        OUTPUT
        - exp of the standard-normal quantile at p
    """
    standard_normal_quantile = -sqrt(2) * erfcinv(2 * p)
    return exp(standard_normal_quantile)


def logninv(p, mu, sigma): ## not only for mu = 0 and sigma = 1
    """ Inverse CDF of the lognormal distribution with log-mean mu and log-sd sigma.
        Built from the standard case via
        log(logninv(p, mu, sigma)) == mu + sigma * log(logninv(p, 0, 1)).
        INPUT
        - p: cumulative probability (scalar or array)
        - mu: mean of the underlying normal distribution
        - sigma: standard deviation of the underlying normal distribution
        OUTPUT
        - lognormal quantile(s) at p
    """
    log_standard_quantile = log(_logninv(p))
    log_quantile = mu + sigma * log_standard_quantile
    return exp(log_quantile)


def neg_ll(params_vec, stim_vals, choices, all_run_indices, all_contrast_indices, neg_llh_size):
    """ Returns negative log likelihood for entire parameter vector.
        The parameter vector is read as
        [guess rate | one stimulus sensitivity per contrast slot | one stimulus criterion per contrast slot |
         one meta-uncertainty per run | one confidence criterion per run],
        where the number of contrast slots is the longest per-run list in all_contrast_indices.
        INPUT
        - params_vec: 1-D array of parameters laid out as described above
        - stim_vals: array of stimulus values, one per trial
        - choices: array with one row per trial and one column per response alternative
        - all_run_indices: sequence of index objects, one per run, selecting that run's trials
        - all_contrast_indices: dict mapping run position to a list of index objects, each selecting
          (within the run's trials) the trials of one contrast
        - neg_llh_size: number of slots to allocate, one per (run, contrast) condition
        OUTPUT
        - np.nansum(neg_llh): negative log likelihood summed over conditions
          (a condition whose own sum is NaN, e.g. from 0 * log(0), is dropped by nansum)
    """
    n_runs           = len(all_run_indices)
    n_contrast_slots = max([len(all_contrast_indices[key]) for key in all_contrast_indices], default = 0)

    ## Start positions of each parameter family inside params_vec
    sens_start = 1
    crit_start = sens_start + n_contrast_slots
    meta_start = crit_start + n_contrast_slots
    conf_start = meta_start + n_runs

    neg_llh = np.zeros(neg_llh_size)
    slot    = 0
    for run_pos, run_trials in enumerate(all_run_indices):
        run_stim_vals = stim_vals[run_trials]
        run_choices   = choices[run_trials]
        meta_uncert   = params_vec[meta_start + run_pos]
        conf_crit     = params_vec[conf_start + run_pos]
        for contrast_pos, contrast_trials in enumerate(all_contrast_indices[run_pos]):
            guess_rate = params_vec[0]
            stim_sens  = params_vec[sens_start + contrast_pos]
            stim_crit  = params_vec[crit_start + contrast_pos]
            condition_choices = run_choices[contrast_trials]
            choice_llh        = casandre_fit(guess_rate, stim_sens, stim_crit, meta_uncert, conf_crit,
                                             run_stim_vals[contrast_trials])
            ## choice_llh is (responses x trials); transpose to line up with the (trials x responses) choices
            neg_llh[slot] = -sum(condition_choices * log(choice_llh.T))
            slot += 1
    return np.nansum(neg_llh)


def random_bounded(bounds):
    """ Draw a random value within the specified parameter range.
        INPUT
        - bounds: indexable pair whose first two entries are the minimum and maximum of the range.
        OUTPUT
        - uniform random draw scaled from [0, 1) onto the bounded range.
    """
    lower = bounds[0]
    width = bounds[1] - lower
    return np.random.random() * width + lower


def repeat_matrix_function_2D(arr, x, y = 1):
    """ Python function to replicate MATLAB's repmat()
        INPUT
        - arr: array (or list) to be copied; must provide .copy()
        - x: number of times arr is repeated vertically (np.vstack)
        - y: number of times the vertically stacked result is repeated horizontally (np.hstack)
        OUTPUT
        - new_arr: tiled array; when x <= 1 and y <= 1 this is simply arr.copy()
    """
    new_arr = arr.copy()
    ## One stack per axis instead of growing the array one copy at a time,
    ## which re-copied everything accumulated so far on every iteration.
    if x > 1:
        new_arr = np.vstack([arr] * x)
    if y > 1:
        new_arr = np.hstack([new_arr] * y)
    return new_arr

In [None]:
## (min, max) for each CASANDRE parameter; used both to draw random starting points and as optimizer bounds
guess_rate_bounds  = (   0, 0.075)
stim_sens_bounds   = (0.01,    10)
stim_crit_bounds   = (  -3,     3)
meta_uncert_bounds = ( 0.1,     3)
conf_crit_bounds   = (0.01,   5.1)

## trial choice (q=high conf,left tilt; a=low conf,left tilt; p=high conf,right tilt; l=low conf,right tilt)
choice_dict = {'q':-2, 'a':-1, 'l':1, 'p':2}
run_labels  = dict(high_vol_high_risk = "HVHR", high_vol_low_risk = "HVLR", low_vol_high_risk = "LVHR", low_vol_low_risk = "LVLR")

out_file = join(save_proj_dir, "ric1-cpdm_best-params-est.csv")
os.makedirs(save_proj_dir, exist_ok = True) ## make sure the output directory exists before any fitting starts


def _save_estimates(rows, columns, path):
    """ Write the best-fitting parameters (one row per Ss) to csv and return the dataframe. """
    out_df = pd.DataFrame(np.array(rows), columns = columns)
    out_df["subID"] = out_df["subID"].apply('="{}"'.format) ## stored as ="<id>"; stripped again when the file is re-read below
    out_df.to_csv(path, index = False)
    return out_df


files = sorted(glob(os.path.join(data_dir, "23_IDM_*.csv"))) ## grab all participant datafiles
## subject ID = the 4 characters following the 7-character "23_IDM_" prefix; keep first occurrence, preserve order
subs  = list(dict.fromkeys(os.path.basename(curr_file)[7:11] for curr_file in files))

all_sub_params = []
done_subs      = set() ## Ss already present in the output file are not refit
column_names   = None  ## stays None when no Ss is fit during this execution
if os.path.exists(out_file): ## resume from a previous (partial) execution
    existing_df = pd.read_csv(out_file)
    existing_df["subID"] = [val[2:-1] for val in existing_df["subID"]] ## strip the ="..." wrapper
    done_subs      = set(existing_df["subID"])
    all_sub_params = existing_df.values.tolist()

for sub in subs: ## iterate through Ss ID list
    if sub in done_subs: ## skip by ID (not by position) so newly added files cannot shift which Ss are skipped
        continue
    sub_files = sorted(glob(os.path.join(data_dir, "23_IDM_{0}.csv".format(sub)))) ## grab all IDM task csvs
    raw_df    = pd.read_csv(sub_files[0]) ## open current data file
    df        = raw_df.loc[(raw_df["cpdm_trial_type"] == "task") & (raw_df["cpdm_trial_resp.keys"].notnull())] ## only CPDM task trials w/ responses
    sub_df    = pd.DataFrame({
        "run_dims": df["cpdm_run_dimension"],                    ## run label (keys of run_labels)
        "orient":   df["cpdm_gabor_orient"],                     ## orientation of gabor patch
        "contrast": df["cpdm_gabor_contrast"],                   ## contrast of gabor patch
        "acc":      df["cpdm_acc"],                              ## Ss accuracy
        "conf":     df["cpdm_conf"],                             ## Ss confidence in perceptual accuracy
        "choice":   df["cpdm_trial_resp.keys"].map(choice_dict), ## recode CPDM Ss choice responses
    })

    stim_vals     = sub_df["orient"].values
    contrast_vals = sub_df["contrast"].values
    choices       = sub_df["choice"].values
    run_dims      = sub_df["run_dims"].values

    ## One-hot encode responses; columns ordered [-2, -1, 1, 2] to match the rows returned by casandre_fit
    choice_codes = sorted(choice_dict.values())
    math_choices = np.zeros((len(choices), len(choice_codes)))
    for col, code in enumerate(choice_codes):
        math_choices[choices == code, col] = 1

    unique_runs      = sorted(sub_df["run_dims"].unique())
    unique_contrasts = sorted(np.unique(contrast_vals))

    ## Every run lists every contrast (with an empty trial set where a contrast is absent), so that
    ## the position of a contrast inside a run is also its position in the parameter vector.
    all_run_indices      = []
    all_contrast_indices = {}
    for idx_run, run_label in enumerate(unique_runs): ## go through each task run label
        trial_indices = np.where(run_dims == run_label)
        all_run_indices.append(trial_indices)
        run_contrast_vals = contrast_vals[trial_indices]
        all_contrast_indices[idx_run] = [np.where(run_contrast_vals == unique_contrast)
                                         for unique_contrast in unique_contrasts]
    nll_counter = len(unique_runs) * len(unique_contrasts) ## one NLL slot per (run, contrast) condition

    ## Parameter layout: guess rate | sensitivity per contrast | criterion per contrast |
    ##                   meta-uncertainty per run | confidence criterion per run
    bounds = ([guess_rate_bounds]
              + [stim_sens_bounds]   * len(unique_contrasts)
              + [stim_crit_bounds]   * len(unique_contrasts)
              + [meta_uncert_bounds] * len(unique_runs)
              + [conf_crit_bounds]   * len(unique_runs))
    column_names = (["subID", "Guess Rate"]
                    + ["Stimulus Sensitivity {}".format(curr_contrast) for curr_contrast in unique_contrasts]
                    + ["Stimulus Criterion {}".format(curr_contrast) for curr_contrast in unique_contrasts]
                    + ["Meta Uncertainty {}".format(run_labels[curr_run]) for curr_run in unique_runs]
                    + ["Confidence Criterion {}".format(run_labels[curr_run]) for curr_run in unique_runs]
                    + ["Best NLL"])

    best_param_est = None
    best_nll       = None
    print(sub, end = "\r")
    for crun in range(cruns): ## independent fits from random starting points; keep the lowest NLL
        search_start = np.array([random_bounded(curr_bounds) for curr_bounds in bounds])
        estimate = minimize(neg_ll,
                            x0      = search_start,
                            method  = "L-BFGS-B",
                            bounds  = bounds,
                            args    = (stim_vals, math_choices, all_run_indices, all_contrast_indices, nll_counter),
                            options = options)
        if best_nll is None or best_nll > estimate.fun:
            best_nll       = estimate.fun
            best_param_est = np.hstack((sub, estimate.x, best_nll))
    all_sub_params.append(best_param_est)
    ## Save after every Ss so an interrupted execution can resume from the file.
    ## NOTE(review): column names come from the current Ss; rows are assumed to have the same length for all Ss.
    estimate_df = _save_estimates(all_sub_params, column_names, out_file)

if column_names is not None: ## nothing to rewrite when no Ss was fit during this execution
    estimate_df = _save_estimates(all_sub_params, column_names, out_file)