In [1]:
import pandas as pd
import numpy as np
import numpy.matlib
import matplotlib.pyplot as plt
import pdb
import scipy
from scipy.optimize import minimize, fmin
from scipy.stats import multivariate_normal
from tqdm.notebook import tqdm


In [2]:
""" 
Obtaining data from a given expt
"""
csv_test = pd.read_csv('../auditory_categorization_longHigh/important_things_not_included_in_assets/allTrials.csv')
csv_data = pd.read_csv('auditory_categorization_Hc_online_data/auditory_categorization_v3_143976_2021-10-06_15h15.21_031e0522-8d05-4b73-8d13-954610a13f6f/60a3c37fb7414cf62b79e21e_categorization_task_longHigh_2021-06-16_22h13.55.728.csv');
                       

In [3]:
n_tones = 3
n_trials = csv_data.shape[0]-47

"""
Get tones and values of keys pressed
"""
test_columns = list(csv_test.columns)
test_tones_name = test_columns.index('Name')
test_tones_col_idx = test_columns.index('Tones')
df_names = (csv_test.iloc[0:800,test_tones_name]).values
df_tones = (csv_test.iloc[0:800,test_tones_col_idx]).values

tones_array_orig = np.zeros((n_trials,n_tones))
tones_array_idxs_keep = []

for i_wav in range(804):
    if isinstance(csv_data['Name'][i_wav+46],str):
        tones_array_orig[i_wav,:] = np.array(df_tones[np.where(csv_data['Name'][i_wav+46]\
                                                          ==df_names)[0]][0][1:-1].split(',')).astype(float)  
        tones_array_idxs_keep += [i_wav]

        
df_tones = np.copy(tones_array_orig[tones_array_idxs_keep,:])
df_corrans = np.copy(csv_data['corrAns'][46:csv_data.shape[0]])[tones_array_idxs_keep]
df_keys = np.copy(csv_data['test_resp.keys'][46:csv_data.shape[0]])[tones_array_idxs_keep]

In [4]:
"""
Find no response cases in the expt
"""
no_response = np.intersect1d(np.where(df_keys!='h')[0],np.where(df_keys!='l')[0])
print("Did not respond to: ",no_response)

"""
Convert keys ['l','h'] to [0,1] and calculate accuracies
"""
corrans_num_orig = np.zeros_like(df_corrans)
corrans_num_orig[df_corrans == 'h'] = 1

keys_num_orig = np.zeros_like(df_keys)
keys_num_orig[df_keys == 'h'] = 1

corrans_num = corrans_num_orig[:800]
keys_num = keys_num_orig[:800]
tones_array = df_tones[:800]
print("Got correct: ", np.sum(keys_num==corrans_num)/len(tones_array))
print("Got high correct: ", np.sum((keys_num)*(corrans_num))/np.sum(corrans_num))
print("Got low correct: ", np.sum((1-keys_num)*(1-corrans_num))/np.sum(1-corrans_num))

"""
Subsample the long context expt with a high category bias
"""
def subsampleLongContext(trial_behaviour_full, corrans_full, trial_tones_full):
    idxHigh = np.arange(len(trial_behaviour_full[0:]))[corrans_full[0:]==1]
    idxLow = np.arange(len(trial_behaviour_full[0:]))[corrans_full[0:]==0]
    idxOfSmallerCategory = np.random.choice(idxHigh,size=len(idxLow),replace=False)
    idxToKeep = np.concatenate((idxLow, idxOfSmallerCategory))
    corrans_expt = corrans_full[0:][idxToKeep]
    trial_behaviour_expt = trial_behaviour_full[0:][idxToKeep]
    trial_tones_expt = trial_tones_full[0:][idxToKeep,:]
    #print("Got correct: ", np.sum(trial_behaviour_expthc==corrans_expthc)/len(trial_tones_expthc))
    return trial_tones_expt, trial_behaviour_expt

Did not respond to:  [ 86 130 144 232 280 334 338 420 421 424 433 444 494 542 543 544 575 592
 593 596 597 624 625 626 627 628 638 648 688 693 712 736 747 749 755 769
 782]
Got correct:  0.6175
Got high correct:  0.7614035087719299
Got low correct:  0.2608695652173913


In [5]:
# this has been changed to check how values change with observer responses

expt_tones = np.arange(90,3000,1) #array of possible true tones
log_freq_seq_array = np.arange(0.6,4.7,0.1)
log_freq_percept = np.arange(0.6,4.7,0.1) # array of possible perceptual tones

idxs_with_response = np.delete(np.arange(len(tones_array)),no_response)
trialTones = tones_array[idxs_with_response,:]
trialBehaviour = keys_num[idxs_with_response]
corrAns = corrans_num[idxs_with_response]


In [6]:
def gaussian(x, mean, sigma):
    return np.exp(-(x-mean)**2/(2*sigma**2))

def Tones1dgrid(latentTone, sigma):    
    
    input_array = gaussian(log_freq_percept, latentTone, sigma)
    s0 = 1/np.sum(input_array)
    input_array *= s0
        
    return input_array

def posterior_array(freq_input, n_tones, p_back, p_low, log_prior):
    """
    Arguments: 
    freq_input - range of all possible frequencies (percepts?)
    p_back - prob of background
    p_low - prob of low condition
    log_prior - list of prior parameters
    """
    
    log_prior_low_mean = log_prior[0]; log_prior_low_sigma = log_prior[2];
    log_prior_high_mean = log_prior[1]; log_prior_high_sigma = log_prior[2];
    prior_low = gaussian(x=freq_input, mean=log_prior_low_mean, sigma=log_prior_low_sigma)
    prior_high = gaussian(x=freq_input, mean=log_prior_high_mean, sigma=log_prior_high_sigma)
    prior_dist_mixed_high = p_back*(1/len(freq_input)) + (1-p_back)*prior_high \
    #mixture model with p(T|B) = 1/no. of possible freqs
    prior_dist_mixed_high /= prior_dist_mixed_high.sum() #normalizing
    prior_dist_mixed_high = np.expand_dims(prior_dist_mixed_high, axis = 1)
    prior_dist_mixed_low = p_back*(1/len(freq_input)) + (1-p_back)*prior_low \
    #mixture model with p(T|B) = 1/no. of possible freqs
    prior_dist_mixed_low /= prior_dist_mixed_low.sum() #normalizing
    prior_dist_mixed_low = np.expand_dims(prior_dist_mixed_low, axis = 1)
        
    if n_tones == 3:
        prior_tones_low = np.expand_dims(prior_dist_mixed_low@np.transpose\
                                         (prior_dist_mixed_low),axis=2)@np.transpose(prior_dist_mixed_low) \
        #p(T1,T2..|L) 
        prior_tones_high = np.expand_dims(prior_dist_mixed_high@np.transpose\
                                          (prior_dist_mixed_high),axis=2)@np.transpose(prior_dist_mixed_high) \
        #p(T1,T2..|H) 
    elif n_tones == 1:
        prior_tones_low = prior_dist_mixed_low
        prior_tones_high = prior_dist_mixed_high
        
    normalizer = (1-p_low)*prior_tones_high + p_low*prior_tones_low #p(H)*p(T1,T2..|H) + p(L)*p(T1,T2..|L)
    posterior = prior_tones_high*(1-p_low)/normalizer
    # posterior /= np.sum(posterior)
    
    return prior_dist_mixed_high, prior_dist_mixed_low, prior_tones_high, prior_tones_low, normalizer, posterior

# define mle function
def MLE(params,trial_tones,trial_behaviour):
    log_prior_low_mean, log_prior_high_mean, log_prior_sigma, sigma_sensory, prob_back, prob_low = \
    params[0], params[1], params[2], params[3], params[4], params[5] # inputs are guesses at our parameters 
    
    reps = 1000
    all_trial_tones = np.empty((len(trial_tones)*reps,n_tones))
    all_trial_behaviour = np.empty((len(trial_tones)*reps,1))
    prob_trial_behaviour = np.empty((len(trial_tones),1))
    probability_sim_high = np.zeros((len(trial_tones),1))
    neg_ll = 0

    _,_,LikelihoodLatentTonegivenHigh,LikelihoodLatentTonegivenLow,_,_ = posterior_array(log_freq_seq_array, 
                                                                                         n_tones=1, 
                                                                                         p_low=prob_low,
                                                                                         p_back = prob_back,
                                                                                         log_prior=[log_prior_low_mean,
                                                                                                    log_prior_high_mean,
                                                                                                    log_prior_sigma])

    LikelihoodPerceptgivenHigh = np.zeros((len(log_freq_percept),))
    LikelihoodPerceptgivenLow = np.zeros((len(log_freq_percept),))

    for itrue1 in range(len(log_freq_percept)):
        probPerceptgivenLatentTones = Tones1dgrid(latentTone=log_freq_percept[itrue1],sigma=sigma_sensory)
        LikelihoodPerceptgivenHigh += probPerceptgivenLatentTones * LikelihoodLatentTonegivenHigh[itrue1]
        LikelihoodPerceptgivenLow += probPerceptgivenLatentTones * LikelihoodLatentTonegivenLow[itrue1]
        
    probHighgivenPercept = LikelihoodPerceptgivenHigh*(1-prob_low)/\
    (LikelihoodPerceptgivenHigh*(1-prob_low) + LikelihoodPerceptgivenLow*prob_low)

    for i_stim in range(len(trial_tones)):
        input_array_mat = np.random.normal(loc=np.log10(trial_tones[i_stim]),
                                           scale=sigma_sensory,
                                           size=(reps,1,n_tones)) 
        #pick tones from the gaussian with mean as log(true_tone) and sensory sigma 0.1    
        for i_tperc in range(reps):
            perc_tone_idxs = np.zeros((n_tones,1),dtype=int)
            for i in range(n_tones):
                # find relevant adjacent freq percepts   
                perc_tone_idxs[i] = np.argmin(np.abs(log_freq_percept-input_array_mat[i_tperc][0][i]))  
            # the suboptimal subject choses the highest frequency they hear for categorization
            posterior_perc_tone = probHighgivenPercept[np.max(perc_tone_idxs)]
            # this makes the same choice for one tone percept every time that tone is perceived  
            trial_behaviour_generated = np.squeeze(posterior_perc_tone) > 0.5 
            all_trial_behaviour[i_stim*reps+i_tperc,:] = trial_behaviour_generated
        all_trial_tones[i_stim*reps:(i_stim+1)*reps,:] = trial_tones[i_stim]    
        prob_trial_behaviour[i_stim] = np.mean(all_trial_behaviour[i_stim*reps:(i_stim+1)*reps])
        
        if trial_behaviour[i_stim]:
            if np.isnan(np.log(prob_trial_behaviour[i_stim] + 0.0000001)) \
            or np.isinf(np.log(prob_trial_behaviour[i_stim] + 0.0000001)) \
            or np.isnan(np.log(1-prob_trial_behaviour[i_stim] + 0.0000001)) \
            or np.isinf(np.log(1-prob_trial_behaviour[i_stim] + 0.0000001)):
                pdb.set_trace()
            neg_ll += -np.log(prob_trial_behaviour[i_stim] + 0.0000001) # if high dist is chosen by observer
        else:
            neg_ll += -np.log(1-prob_trial_behaviour[i_stim] + 0.0000001) # if low dist is chosen by observer
    #print(params, neg_ll)        
    return(neg_ll)

In [7]:
"""
New optimization algorithm: uses scipy.optimize.fmin. 
Crude grid initially and then find minimum using the function.

Then p_low value corresponding to the least negative log likelihood measurement is found.
"""

# Sigma sensory value is based on the no context simple trial fitting.
guess_low_mean = np.arange(2.1,2.71,0.15); guess_high_mean = np.arange(2.7,3.31,0.15); 
guess_sigma = np.arange(0.05,1,0.2); guess_sensory_sigma = np.array([0.15]);
guess_p_back = np.array([0]); guess_p_low =np.arange(0.1,1.1,0.1);

# Constraining guesses of means of low and high distributions based on observed behaviour in figure shown above. 

for iSubsampled in range(4):
    trialTones_subsampled, trialBehaviour_subsampled = subsampleLongContext(trial_behaviour_full=trialBehaviour, 
                                                                            corrans_full=corrAns,
                                                                            trial_tones_full=trialTones)
    print(trialTones_subsampled.shape, trialBehaviour_subsampled.shape)
    
    neg_ll_array = np.zeros((len(guess_low_mean), len(guess_high_mean), len(guess_sigma), 
                             len(guess_sensory_sigma), len(guess_p_back), len(guess_p_low)))
    for lm in tqdm(range(len(guess_low_mean))):
        for hm in tqdm(range(len(guess_high_mean)), leave=False, desc="High mean"):
            for s in range(len(guess_sigma)):
                for ss in range(len(guess_sensory_sigma)):
                    for pb in range(len(guess_p_back)):
                        for pl in range(len(guess_p_low)):
                            params = [guess_low_mean[lm], guess_high_mean[hm], guess_sigma[s], 
                                      guess_sensory_sigma[ss], guess_p_back[pb], guess_p_low[pl]]
                            # print(lm, hm, pb)
                            neg_ll_array[lm,hm,s,ss,pb,pl] = MLE(params,
                                                                trial_tones=trialTones_subsampled,
                                                                trial_behaviour=trialBehaviour_subsampled)                                        
            
    idxs = np.where(neg_ll_array == np.amin(neg_ll_array)) 
    best_thetas = np.array([guess_low_mean[idxs[0]], guess_high_mean[idxs[1]], guess_sigma[idxs[2]], \
                            guess_sensory_sigma[idxs[3]], guess_p_back[idxs[4]], guess_p_low[idxs[5]]])

    print(np.amin(neg_ll_array),best_thetas)


(434, 3) (434,)


  0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

  posterior = prior_tones_high*(1-p_low)/normalizer


High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

499.6962486514917 [[2.7 ]
 [2.7 ]
 [0.85]
 [0.15]
 [0.  ]
 [0.5 ]]
(434, 3) (434,)


  0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

486.6330022580963 [[2.7 ]
 [2.7 ]
 [0.85]
 [0.15]
 [0.  ]
 [0.5 ]]
(434, 3) (434,)


  0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

486.2736464183019 [[2.7 ]
 [2.7 ]
 [0.85]
 [0.15]
 [0.  ]
 [0.5 ]]
(434, 3) (434,)


  0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

High mean:   0%|          | 0/5 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
idxs = np.where(neg_ll_array == np.amin(neg_ll_array)) 
best_thetas = np.array([guess_low_mean[idxs[0]], guess_high_mean[idxs[1]], guess_sigma[idxs[2]], \
                        guess_sensory_sigma[idxs[3]], guess_p_back[idxs[4]], guess_p_low[idxs[5]]])

print(np.amin(neg_ll_array),best_thetas)