In [1]:
import matplotlib.pyplot as plt
import dill
import numpy as np
from tqdm.notebook import tqdm
from hmmlearn import hmm, vhmm

from synthetic_data_generation_functions import *
from synthetic_data_analysis_functions import *
from hmm_functions import *


plt.style.use('/home/david/Documents/code/phd/paper.mplstyle')

print(plt.get_backend())
%matplotlib qt5
print(plt.get_backend())

module://matplotlib_inline.backend_inline


QSocketNotifier: Can only be used with threads started with QThread


qt5agg


# Simulations generations

## Parameters

In [2]:
steps_number = 100
noise_amplitude = 0.1
delta = 0.05
drift = 0.0
p_a = 0.5
p_a_reward = 0.8
p_b_reward = 0

number_of_simulations_perset = 20

n_simulations_list = [number_of_simulations_perset]*100

start_index = 0

simulations_folder_path = '/home/david/Documents/code/DDM_v2_synthetic_data'

## Generation

In [3]:

generate_simulations = False

for i, n_simulations in enumerate(n_simulations_list):
    
    if not(generate_simulations):

        break

    simulations_batch = run_simulations_batch(p_a, p_a_reward, p_b_reward, steps_number, noise_amplitude, delta, drift, n_simulations)

    with open(f'{simulations_folder_path}/n_{n_simulations}/simulations_batch_{n_simulations}_test_{i+1}.pkl', 'wb') as file:
        dill.dump(simulations_batch, file)

In [4]:
# test_average_probability_sequences = generate_test_average_probability_sequences(delta_range, args)

# with open(f'DDM_v2/statistical_precision_analysis/simulations_batches/test_average_probability_sequences.pkl', 'wb') as file:
#     dill.dump(test_average_probability_sequences, file)

# HMM fit

## Parameters

In [5]:
n_to_test = np.arange(2,16)


## Model fit

In [6]:
fit_model = False # PARAM

if fit_model:

    for index, n_simulations in enumerate(tqdm(n_simulations_list)):

        ####################
        ### Loading Data ###
        ####################

        with open(f'{simulations_folder_path}/n_{n_simulations}/simulations_batch_{n_simulations}_test_{start_index + index+1}.pkl', 'rb') as file:
            synthetic_data = dill.load(file)

        ########################
        ### Reformating Data ###
        ########################

        slice_size = int(n_simulations/2)

        training_data = [synthetic_data[i]['choices'] for i in np.arange(0,slice_size)]
        validation_data = [synthetic_data[i]['choices'] for i in np.arange(slice_size,2*slice_size)]


        training_emissions = np.array([]).astype(int)
        validation_emissions = np.array([]).astype(int)

        for x,y in zip(training_data,validation_data):

            training_emissions = np.concatenate((training_emissions, x))
            validation_emissions = np.concatenate((validation_emissions, y))

        training_emissions = training_emissions.reshape(-1,1)
        training_emissions_lengths = [len(x) for x in training_data]

        validation_emissions = validation_emissions.reshape(-1,1)
        validation_emissions_lengths = [len(y) for y in validation_data]

        ###################
        ### Infer model ###
        ###################

        best_model, best_score = infer_best_model_score(training_emissions, validation_emissions, 
                                                training_emissions_lengths, validation_emissions_lengths, 
                                                n_to_test, leave_loading_bar=False, verbose=False)
        
        ##################
        ### Save model ###
        ##################
        
        with open(f'{simulations_folder_path}/n_{n_simulations}/best_model_score_{n_simulations}_test_{start_index + index+1}.pkl', 'wb') as file:
            dill.dump(best_model, file)


# Drift Estimation

## Generating "theoretical" average probability sequences

In [7]:
generate_theoretical_sequences = False # PARAM

if generate_theoretical_sequences:

    args = synthetic_data[0]['parameters'] + [5000]
    delta_range = np.linspace(0.01,0.2,200)


    test_average_probability_sequences = generate_test_average_probability_sequences(delta_range, args)

    with open(f'{simulations_folder_path}/test_average_probability_sequences.pkl', 'wb') as file:
        dill.dump([delta_range, test_average_probability_sequences], file)

else:
 
    with open(f'{simulations_folder_path}/test_average_probability_sequences.pkl', 'rb') as file:
        delta_range, test_average_probability_sequences = dill.load(file)


## Minimizing mean square error

In [8]:
save_result = False # PARAM
average_proba_sequences_hmm = []


for index, _ in enumerate(tqdm(n_simulations_list)):
# for index, _ in enumerate(n_simulations_list):

    ##############################
    ### Loading Data and Model ###
    ##############################

    with open(f'{simulations_folder_path}/n_{n_simulations}/simulations_batch_{n_simulations}_test_{start_index + index+1}.pkl', 'rb') as file:
        synthetic_data = dill.load(file)

    with open(f'{simulations_folder_path}/n_{n_simulations}/best_model_score_{n_simulations}_test_{start_index + index+1}.pkl', 'rb') as file:
        model = dill.load(file)

    ########################
    ### Reformating Data ###
    ########################

    test_data = [synth_data['choices'] for synth_data in synthetic_data]

    initial_state_list = []
    sequences_number = len(test_data)

    for i in range(sequences_number):
        
        choices_sequence = test_data[i]
        
        states_sequence = model.predict(np.int16(choices_sequence.reshape(-1,1)))
        initial_state_list.append(states_sequence[0])

    initial_state_list_distri = []

    for s in range(len(model.transmat_)):

        initial_state_list_distri.append(initial_state_list.count(s))

    transmat = model.transmat_
    emission_vect = model.emissionprob_[:,1]
    mat = transmat
    sorted_indexes = np.argsort(emission_vect)
    vector = np.ones([len(transmat),1])/len(transmat)

    ##

    new_transmat = order_matrix(mat, sorted_indexes)

    ##

    new_emissionmat = []
    new_initial_state_list_distri = []

    for i in sorted_indexes:
        new_emissionmat.append(model.emissionprob_[i,:])
        new_initial_state_list_distri.append(initial_state_list_distri[i])

    new_emissionmat = np.array(new_emissionmat)
    new_initial_state_list_distri = np.array(new_initial_state_list_distri)/np.sum(new_initial_state_list_distri)


    ####################
    ### Computations ###
    ####################

    average_proba_sequence_hmm = []

    steps = np.arange(len(test_data[0]))
    new_mat_i = new_transmat

    for i in steps:

        new_mat_i = np.matmul(new_mat_i,new_transmat)
        res = np.matmul(new_initial_state_list_distri,new_mat_i)*new_emissionmat[:,1]
            
        average_proba_sequence_hmm.append(np.sum(res))

    average_proba_sequences_hmm.append(average_proba_sequence_hmm)

    if not(save_result):

        continue

    mse_list = []

    for test_average_probability_sequence in tqdm(test_average_probability_sequences, leave=False):
    # for test_average_probability_sequence in test_average_probability_sequences:

        mse_list.append(compute_mean_square_error_v2(average_proba_sequence_hmm, test_average_probability_sequence))

    min_mse = np.min(mse_list)
    recovered_delta = delta_range[np.where(mse_list==min_mse)[0]]

    ##############################
    ### Saving Recovered Delta ###
    ##############################
    
    with open(f'{simulations_folder_path}/n_{n_simulations}/recovered_delta_{n_simulations}_{start_index + index+1}.pkl', 'wb') as file:
        dill.dump([test_average_probability_sequence,recovered_delta], file)


  0%|          | 0/100 [00:00<?, ?it/s]

# Mean square error analysis

## Recovered delta loading

In [9]:
recovered_delta_list = []

for index, _ in enumerate(n_simulations_list):

    with open(f'{simulations_folder_path}/n_{n_simulations}/recovered_delta_{n_simulations}_{start_index + index+1}.pkl', 'rb') as file:
        _, recovered_delta = dill.load(file)
    
    recovered_delta_list.append(recovered_delta[0])

# Plots

In [10]:
fig=plt.figure(figsize=(1, 4), dpi=300, constrained_layout=False, facecolor='w')
gs = fig.add_gridspec(1, 1)
row = gs[:].subgridspec(1, 1, hspace=0.5)

ax1 = plt.subplot(row[0,0])

ax1.scatter(np.ones(len(n_simulations_list)), recovered_delta_list, marker='s', alpha=0.01, s=3, linewidth=0)

ax1.axhline(0.05, linewidth=0.7, color='k', linestyle='--', label='Drift to recover')

ax1.set_title(f'{len(n_simulations_list)} sets of {number_of_simulations_perset} simulations of length {steps_number}')

ax1.set_xticks([])
ax1.set_ylabel('Recovered drift')

Text(0, 0.5, 'Recovered drift')

In [11]:
fig=plt.figure(figsize=(4, 4), dpi=300, constrained_layout=False, facecolor='w')
gs = fig.add_gridspec(1, 1)
row = gs[:].subgridspec(1, 1, hspace=0.5)

ax1 = plt.subplot(row[0,0])

histo = np.histogram(recovered_delta_list, bins=np.linspace(0.01,0.2,51), density=True)
bin_width = histo[1][1] - histo[1][0]
ax1.stairs(histo[0]/np.sum(histo[0])/bin_width, histo[1], alpha=0.5, fill=True, label = 'Recovered drift probability density')

x = np.linspace(-0.2,0.2)
sigma = 0.1
ax1.plot(x, np.exp(-x**2/(2*sigma**2)) * 1/np.sqrt(2*sigma**2*np.pi), label = "Noise probability density")

ax1.axvline(0.05, linewidth=0.7, color='k', linestyle='--', label='Drift to recover')

# ax1.set_xticks([])
ax1.set_title(f'{len(n_simulations_list)} sets of {number_of_simulations_perset} simulations of length {steps_number}')

ax1.set_ylim([0,None])
ax1.set_xlabel('Recovered drift')
ax1.set_ylabel('Probability density')

ax1.legend()

<matplotlib.legend.Legend at 0x7fe80448bdd0>

In [12]:
fig=plt.figure(figsize=(1, 4), dpi=300, constrained_layout=False, facecolor='w')
gs = fig.add_gridspec(1, 1)
row = gs[:].subgridspec(1, 1, hspace=0.5)

ax1 = plt.subplot(row[0,0])

histo = np.histogram(recovered_delta_list, bins=np.linspace(0.01,0.2,51), density=False)
ax1.stairs(histo[0], histo[1], alpha=0.5, fill=True)
ax1.axvline(0.05, linewidth=0.7, color='k', linestyle='--', label='Drift to recover')

# ax1.set_xticks([])
ax1.set_title(f'{len(n_simulations_list)} sets of {number_of_simulations_perset} simulations of length {steps_number}')

ax1.set_ylim([0,None])
ax1.set_xlabel('Recovered drift')
ax1.set_ylabel('Number of simulations sets')

Text(0, 0.5, 'Number of simulations sets')

In [13]:
fig=plt.figure(figsize=(4, 4), dpi=300, constrained_layout=False, facecolor='w')
gs = fig.add_gridspec(1, 1)
row = gs[:].subgridspec(1, 1, hspace=0.5)


ax1 = plt.subplot(row[0,0])

index_simu_to_plot = 6

ax1.plot(average_proba_sequences_hmm[index_simu_to_plot], label='Average probability infered by HMM', color='black')

delta_range_to_plot = [(i,delta_range[i]) for i in range(0, len(delta_range), 42)]

for i, d in delta_range_to_plot:

    ax1.plot(test_average_probability_sequences[i], label=f'Average probability of 5000 simu. with drift={np.round(d,4)}', alpha=0.5, linestyle='--')

# ax1.axvline(0.05, linewidth=0.7, color='k', linestyle='--', label='Drift to recover')

# ax1.set_xticks([])
ax1.set_title(f'Set of {number_of_simulations_perset} simulations of length {steps_number}\n True drift: {delta}, Recovered drift: {np.round(recovered_delta_list[index_simu_to_plot],4)}')

ax1.set_xlabel('Step')
ax1.set_ylabel('Average probability to do CW')

ax1.legend()

<matplotlib.legend.Legend at 0x7fe801db4470>