In [None]:
from matplotlib import pyplot as plt
import matplotlib
import cmath
import sympy as sym
from sympy import exp, symbols, lambdify
import math
from pytictoc import TicToc
import numba
from numba import jit

In [None]:
import os
import sys
import time

import numpy as np
import pandas as pd
import scipy as scp
import scipy.signal  # ensures `scp.signal` resolves on SciPy versions without lazy submodule loading

In [None]:
import torch
import sbi
import sbi.inference
from sbi.inference import SNPE, prepare_for_sbi ,simulate_for_sbi, DirectPosterior
from sbi.inference.base import infer
from sbi.analysis import ActiveSubspace, pairplot
import sbi.utils as utils
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast

# load the data features from simulation

In [None]:
# Simulated feature matrix and the c_dopa value of each simulation.
# Presumably features are rows and simulations are columns — TODO confirm against the files.
data_features = np.load("NEW_data_features.npy")
c_dopa_data_features = np.load("c_dopa.npy")
print(f"The minimum and maximum values of c_dopa are:{np.min(c_dopa_data_features),np.max(c_dopa_data_features)}")

In [None]:
# Append c_dopa as the last row so that a single column permutation reorders
# the features and the parameter together, in ascending c_dopa.
combined_matrix = np.vstack((data_features, c_dopa_data_features))
sort_indices = np.argsort(combined_matrix[-1])
sorted_matrix = combined_matrix[:, sort_indices]
all_data_features_sorted, c_sorted = sorted_matrix[:-1], sorted_matrix[-1]
# check for the dimension:
print(f"The number of ckk dopa values is {all_data_features_sorted.shape[1]} and the total features are {all_data_features_sorted.shape[0]}")

In [None]:
# Feature names; position i labels row i of the feature matrices.
titles = [
    # 0-5
    'mean_ATM', 'std_ATM', 'median_ATM', 'sum_ATM', 'skw_ATM', 'kurt_ATM',
    # 6-13
    'max_eigen_val_ATM', 'trace_ATM', 'std_diag_ATM', 'CV_ATM',
    'mom1_ATM', 'mean_over_kurtosis', 'kurt_diag', 'norm_ATM',
    # 14-19
    'harmonic_mean_ATM', 'entropy_ATM', 'mean_FC', 'mean_kurtosis',
    'mean_co_kurtosis', 'mean_max_co_kurtosis',
    # 20-24
    'mean_dev_co_kurtosis', 'mean_covariance', 'var_cross_correlation',
    'mean_corr_entropy', 'var_corr_entropy',
]

In [None]:
# Smooth every feature along the sorted c_dopa axis and trim c_dopa so that it
# stays aligned with the 'valid' part of each convolution.
w = 120       # kept for backward compatibility; same value as `wind`
wind = 120    # boxcar (moving-average) window length
w_hamm = 120  # length of the np.hanning (Hann) window
mask = np.ones(wind) / wind

# Normalised Hann kernel. It does not depend on the feature, so build it once.
hann_kernel = np.hanning(w_hamm) / np.sum(np.hanning(w_hamm))

x_inference = []       # features smoothed once with the boxcar
x_inference_plus = []  # features smoothed twice with the boxcar
x_inference_hamm = []  # features smoothed once with the Hann kernel

# A 'valid' convolution with a window of length L drops (L-1)//2 samples on the
# left and L//2 on the right. The sorted parameter vector is `c_sorted`; the
# original cell referenced an undefined `ckk_sorted`. Using an explicit stop
# index (instead of a negative one) keeps the slice correct for L == 1 as well.
theta_inference = c_sorted[(wind - 1) // 2 : len(c_sorted) - wind // 2]
theta_inference_hamm = c_sorted[(w_hamm - 1) // 2 : len(c_sorted) - w_hamm // 2]
theta_inference_2 = theta_inference[(wind - 1) // 2 : len(theta_inference) - wind // 2]

for i in range(all_data_features_sorted.shape[0]):
    convolved_data = np.convolve(all_data_features_sorted[i, :], mask, 'valid')
    convolved_data_plus = np.convolve(convolved_data, mask, 'valid')
    x_inference.append(convolved_data)
    x_inference_plus.append(convolved_data_plus)

    y = scp.signal.fftconvolve(all_data_features_sorted[i, :], hann_kernel, mode='valid')
    x_inference_hamm.append(y)

# Smoothed features and trimmed parameters must have matching lengths.
assert all(len(f) == len(theta_inference) for f in x_inference)
assert all(len(f) == len(theta_inference_2) for f in x_inference_plus)
assert all(len(f) == len(theta_inference_hamm) for f in x_inference_hamm)

In [None]:
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt

In [None]:
# Stack the per-feature list into one 2-D array and display its shape.
x_inference_plus = np.array(x_inference_plus)
np.shape(x_inference_plus)

In [None]:
# Fit one ordinary-least-squares line per feature (feature value vs. c_dopa)
# and keep the fitted values together with each line's intercept and slope.
intercepts, coefficients, x_projected = [], [], []

x_inference_plus = np.array(x_inference_plus)
theta_column = theta_inference_2.reshape(-1, 1)  # single-regressor design matrix
for feature_values in x_inference_plus:
    model = LinearRegression().fit(theta_column, feature_values)
    x_projected.append(model.predict(theta_column))

    intercepts.append(model.intercept_)
    coefficients.append(model.coef_)

In [None]:
# Convert the regression outputs to arrays. The shape expression is placed last
# so the notebook actually displays it (a bare expression mid-cell is discarded).
x_projected = np.array(x_projected)
theta_inference_2 = np.array(theta_inference_2)
x_projected.shape

In [None]:
#%matplotlib widget
# One panel per feature: twice-smoothed feature values against c_dopa, with the
# fitted regression line on top.
n_panels = min(len(all_data_features_sorted), len(titles))
fig, axs = plt.subplots(6, 5, figsize=(25, 30))

for i, ax in enumerate(axs.flat[:n_panels]):
    ax.plot(theta_inference_2, x_inference_plus[i], 'o', markersize=1)  # second convolution
    ax.plot(theta_inference_2, x_projected[i, :], color='red', label='Linear Regression')
    ax.set_title(titles[i], fontsize=14)
    ax.legend(('data', 'Linear Regression'))
    ax.set_xlabel('$w_{dopa}$', fontsize=14)

# Hide grid cells that have no feature to show.
for ax in axs.flat[n_panels:]:
    ax.axis('off')

# The original referenced `plt.tight_layout` without calling it, so the layout
# was never tightened. tight_layout recomputes the spacing itself, which makes
# the earlier subplots_adjust calls unnecessary.
fig.tight_layout()
plt.show()

# Bayesian SBI

To perform Bayesian simulation-based inference (SBI), we need three elements:
1. A simulator, which takes parameters as input and returns data features as output.
2. A prior, which defines the plausible range of ckk_Dopa based on background knowledge and from which random parameter samples are drawn as input to the simulator.
3. An inference step, in which we train a deep neural density estimator on the simulated data features and use it to approximate the posterior for the set of observed data.

# Prior over model parameters

In [None]:
# Uniform box prior over the single parameter we aim to infer.
prior_min = [0.9]  # min value used for simulations
prior_max = [6]    # max value used for simulations
prior_low = torch.as_tensor(prior_min)
prior_high = torch.as_tensor(prior_max)
prior = utils.torchutils.BoxUniform(low=prior_low, high=prior_high)
num_params_prior = len(prior_min)
print('number of params :', num_params_prior)

In [None]:
# Build the float32 training tensors for sbi.
# NOTE(review): the original cell read `x` and `theta`, but no cell in this
# notebook defines them, so it fails on a fresh kernel. They are bound here to
# the twice-smoothed features and the matching trimmed c_dopa values, which
# have compatible shapes. If training was meant to use `x_projected` (the
# regression fit) instead, change the first assignment — please confirm.
x = x_inference_plus
theta = theta_inference_2

x_ff = np.asarray(x, dtype='float32').T                       # rows = simulations, columns = features
x_torch = torch.as_tensor(x_ff)
theta_ff = np.asarray(theta, dtype='float32').reshape(-1, 1)  # one parameter value per row
theta_torch = torch.as_tensor(theta_ff)

# Every simulation needs exactly one parameter value.
assert theta_torch.shape[0] == x_torch.shape[0], "theta and x disagree on the number of simulations"

print( 'theta shape:',theta_torch.shape,flush=True)
print('features shape:', x_torch.shape,flush=True)

# inference

Now we run the inference step. To do so, we run a budget of random simulations; for each one, the data features are calculated, and then an algorithm such as SNPE learns the relationship between the data features and the parameters of an approximate posterior.

In [None]:
def myinfer_SNPE(prior, theta, x):
    """Train an SNPE density estimator and return the resulting posterior.

    Args:
        prior: prior distribution handed to ``SNPE``.
        theta: simulation parameters passed to ``append_simulations``.
        x: simulated data features paired with ``theta``.

    Returns:
        The object returned by ``build_posterior()`` after training.
    """
    trainer = SNPE(prior)
    trainer.append_simulations(theta, x).train()
    return trainer.build_posterior()

# Training the neural density estimator

In [None]:
# Draw a fresh seed and apply it to NumPy and PyTorch (CPU and, if present, CUDA).
# The seed is printed so that this exact run can be repeated later by assigning
# the printed value to `n`; the original drew it at random and never recorded it.
n = int(np.random.randint(0, 100000000))
np.random.seed(n)
torch.manual_seed(n)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(n)
print(f"random seed: {n}")

In [None]:
# Train the density estimator on a subset of the simulated features and report
# the wall-clock training time.
start_time = time.time()
idx_features = [0, 3, 4, 5, 9, 10, 13, 15, 16, 17]  # example of index of features to take into account

# Columns of x_torch are features, so the subset is taken along axis 1.
posterior = myinfer_SNPE(prior, theta_torch, x_torch[:, idx_features])

print("-" * 60)
print("--- Training neural network took: %s seconds ---" % (time.time() - start_time))

# Import empirical data

In [None]:
# Features computed from your empirical data, one file per condition (ON / OFF).
statistics_observed_ON = np.load("statistics_observed_ON.npy")
statistics_observed_OFF = np.load("statistics_observed_OFF.npy")

In [None]:
# Keep only the feature rows used for training.
# NOTE: this overwrites the loaded arrays, so the cell must run exactly once
# after loading; running it a second time indexes the already-reduced arrays.
statistics_observed_ON, statistics_observed_OFF = (
    observed[idx_features, :]
    for observed in (statistics_observed_ON, statistics_observed_OFF)
)

In [None]:
%matplotlib widget
import matplotlib.pyplot as plt
# Plot every selected empirical feature across subjects, ON vs. OFF.
num_features = statistics_observed_ON.shape[0]
num_subjects = statistics_observed_ON.shape[1]

num_rows = num_features // 2 + (1 if num_features % 2 else 0)
num_cols = 2 if num_features > 2 else num_features
# squeeze=False always returns a 2-D array of axes, so flatten() also works
# when there is a single feature.
fig, axs = plt.subplots(num_rows, num_cols, figsize=(8, 3 * num_rows), squeeze=False)
axs = axs.flatten()

# Iterate through each feature
for j in range(num_features):
    data_ON = statistics_observed_ON[j, :]
    data_OFF = statistics_observed_OFF[j, :]
    axs[j].plot(data_ON, 'o', label='ON', color='blue')
    axs[j].plot(data_ON, alpha=0.5, color='blue')

    axs[j].plot(data_OFF, '*', label='OFF', color='orange')
    axs[j].plot(data_OFF, alpha=0.5, color='orange')
    # Row j is feature idx_features[j] of the full feature list, so the title
    # must be looked up through idx_features rather than by j directly.
    axs[j].set_title(titles[idx_features[j]])
    axs[j].legend()
    axs[j].set_xticks(np.arange(num_subjects))
    axs[j].set_xticklabels(np.arange(1, num_subjects + 1))

# Hide any grid cell left over when the number of features is odd.
for unused_ax in axs[num_features:]:
    unused_ax.axis('off')

# The original referenced `plt.tight_layout` without calling it.
fig.tight_layout()

# Posterior sampling on empirical data

In [None]:
# Draw posterior samples of the parameter for every patient in both conditions.
num_samples = 100000
num_patients = 10
start_time = time.time()
posterior_samples_OFF = np.zeros((num_patients, num_samples, 1))
posterior_samples_ON = np.zeros((num_patients, num_samples, 1))

for i in range(num_patients):
    # statistics_observed_* were already reduced to the idx_features rows in an
    # earlier cell. Indexing them with idx_features again (as the original did)
    # goes out of bounds, so take the whole column for patient i instead.
    x_obs_OFF = torch.as_tensor(statistics_observed_OFF[:, i], dtype=torch.float32)
    x_obs_ON = torch.as_tensor(statistics_observed_ON[:, i], dtype=torch.float32)
    posterior_samples_OFF[i] = posterior.sample((num_samples,), x=x_obs_OFF).numpy()
    posterior_samples_ON[i] = posterior.sample((num_samples,), x=x_obs_ON).numpy()

print("-" * 60)
print("--- posterior sampling took: %s seconds ---" % (time.time() - start_time))

In [None]:
#%matplotlib widget
import matplotlib.pyplot as plt
import numpy as np

# Violin plot of the posterior samples, one x position per patient.
num_patients = len(posterior_samples_OFF)  # first axis of the sample array is the patient
positions = np.arange(num_patients) + 1

plt.figure(figsize=(7, 5))

# OFF is drawn first and ON second at the same x position, so the two violins
# of a patient overlap.
condition_styles = ((posterior_samples_OFF, 'orange'), (posterior_samples_ON, 'blue'))
line_parts = ('cbars', 'cmeans', 'cmins', 'cmaxes')

for i in range(num_patients):
    for samples, colour in condition_styles:
        violin = plt.violinplot(samples[i, :, 0], positions=[positions[i]], widths=0.6,
                                showmeans=True, showextrema=True)
        for body in violin['bodies']:
            body.set_facecolor(colour)
            body.set_edgecolor(colour)
        for part in line_parts:
            plt.setp(violin[part], color=colour)

# Empty line plots exist only to provide legend handles.
plt.plot([], [], color='blue', label='ON')
plt.plot([], [], color='orange', label='OFF')

plt.ylabel(r"$\tilde{w}_{\mathrm{dopa}}$", fontsize=14)
plt.xlabel("#Patient", fontsize=10)
plt.xticks(positions, np.arange(1, num_patients + 1), fontsize=10)
plt.yticks(fontsize=10)
plt.legend(loc='upper left')
plt.tight_layout()
plt.show()

# statistical analysis

In [None]:
# Standard deviation of a uniform distribution on [b, a]: sqrt((b - a)^2 / 12).
# These bounds repeat the values of prior_min / prior_max; keep them in sync.
b = 0.9  # lower bound
a = 6    # upper bound
squared_width = (b - a) ** 2
prior_std = np.sqrt((1 / 12) * squared_width)

In [None]:
def shrinkage(prior_std, post_std):
    """Return the posterior shrinkage ``1 - (post_std / prior_std) ** 2``.

    Args:
        prior_std: standard deviation of the prior.
        post_std: standard deviation of the posterior.

    Returns:
        1 when the posterior has zero spread, 0 when it is as wide as the
        prior, and a negative value when it is wider than the prior.
    """
    std_ratio = post_std / prior_std
    return 1 - std_ratio ** 2

In [None]:
from scipy import stats
from scipy.stats import ttest_ind
# Per-patient comparison of the OFF and ON posteriors: Wasserstein distance,
# KS-test and t-test p-values, and the shrinkage of each posterior.
num_patients = 10
posterior_samples_OFF = np.squeeze(posterior_samples_OFF)  # drops the trailing length-1 axis
posterior_samples_ON = np.squeeze(posterior_samples_ON)

(wasserstein_distances, t_stat, p_value,
 p_value_ttest, shrink_ON, shrink_OFF) = (np.zeros(num_patients) for _ in range(6))

for i in range(num_patients):
    samples_off, samples_on = posterior_samples_OFF[i], posterior_samples_ON[i]

    wasserstein_distances[i] = stats.wasserstein_distance(samples_off, samples_on)
    p_value[i] = stats.ks_2samp(samples_off, samples_on).pvalue  # p-value of the two-sample KS test
    p_value_ttest[i] = ttest_ind(samples_off, samples_on).pvalue

    shrink_ON[i] = shrinkage(prior_std, np.std(samples_on))
    shrink_OFF[i] = shrinkage(prior_std, np.std(samples_off))

In [None]:
# Per-patient summary figure: posterior shrinkage (left) and the Wasserstein
# distance between the OFF and ON posteriors (right).
patient_indices = np.arange(1, len(shrink_ON) + 1)  # 1-based labels derived from the data
fig, (ax_shrink, ax_wass) = plt.subplots(1, 2, figsize=(6, 4))  # 1 row, 2 columns

# Left panel: shrinkage for both conditions, with faint connecting lines.
for values, colour, label in ((shrink_ON, 'blue', 'ON'), (shrink_OFF, 'orange', 'OFF')):
    ax_shrink.scatter(patient_indices, values, color=colour, label=label)
    ax_shrink.plot(patient_indices, values, color=colour, alpha=0.2)
ax_shrink.set_xlabel('#Patient', fontsize=12)
ax_shrink.set_title('Posterior Shrinkage', fontsize=12)
ax_shrink.set_ylim([0, 1.1])  # linear axis with a little headroom above 1
ax_shrink.legend()

# Right panel: Wasserstein distances. This panel has no legend: the original
# called legend() here with no labelled artists, which only produced a warning.
ax_wass.scatter(patient_indices, wasserstein_distances, color='green')
ax_wass.plot(patient_indices, wasserstein_distances, color='green', alpha=0.2)
ax_wass.set_xlabel('#Patient', fontsize=12)
ax_wass.set_title('Wasserstein Distance', fontsize=12)
ax_wass.set_ylim(0, 1)  # NOTE: fixed range; distances above 1 would fall outside the axes

fig.tight_layout()  # Adjust layout to prevent overlap
plt.show()