In [1]:
import numpy as np
from jax import jit
import jax
import jax.numpy as jnp
# from jax.scipy.linalg import solve

import Surprise_statistics as sup # import surprise code
import PPD # import ppd code
import surprise_gaussian as supg # gaussian surprise code

Para calcular a divergência de Kullback-Leibler (KLD) entre duas distribuições $p_2$ e $p_1$, $D(p_2|p_1)$, sua distribuição e consequentemente seu valor esperado precisamos efetivamente nos importarmos com duas distribuições (i.e., se considerarmos uma priori plana, infinita e não informativa), $p(\theta|D_1)$, a posteriori do observável 1 e a likelihood do observável 2, i.e., $\mathcal{L}(D_2|\Theta)$.

Como estamos interessados em encontrar a Surpresa entre as duas distribuições, em um outro passo vamos precisar de calcular a posteriori $p(\theta|D_2)$.

Vamos considerar o seguinte caso, um vetor de dados $D_i = F(\theta)$ onde dim($\theta$) = 2 e dim(D$_i$) = 7. $F(\theta)$ é uma função linear que mapeia os parâmetros $\theta$ nos dados $D_i$. Considere as seguintes informações, onde M é uma matriz 7x1:

\begin{align}
    p(\theta|D_1) &= \mathcal{N}(\theta_1, \Sigma_1)\\
    \mathcal{L}(D_2|\Theta) &= \mathcal{N}(F(\theta), \mathcal{C})\\
    F(\theta) &= F_0+M\theta 
\end{align}

In [2]:
# Set the seed for reproducibility
np.random.seed(42)

# Constants and dimensions
theta_dim = 2
D_dim = 7

# Gera um vetor de parametros \theta_1 e theta_2
theta_fid_1 = np.array([0.4, 0.1])
theta_fid_2 = np.array([1.3, 0.2])


# generate F(theta) related quantities
F0 = np.random.rand(D_dim)  # 7x1 vector
M = np.random.rand(D_dim, theta_dim)  # 7x2 matrix
# generate covariance matrix for likelihood L2
C = np.random.rand(D_dim, D_dim)  # 7x7 matrix
# generate covariance matrix for posterior 1
Sigma_1 = np.random.rand(theta_dim, theta_dim)  # 2x2 matrix

# Ensure C and Sigma are symmetric and positive-definite
C = np.dot(C, C.T)
Sigma_1 = np.dot(Sigma_1, Sigma_1.T)
invS1 = np.linalg.inv(Sigma_1)

# covariance matrix of posterior 2
# Assuming Gaussianity and a flat prior, we can also derive the equations for Sigma_2. 
# See https://arxiv.org/abs/1402.3593 eq. A17.
invC  = np.linalg.inv(C)
invS2 = np.dot(M.T, np.dot(invC, M))
Sigma_2 = np.linalg.inv(invS2)

# Define the linear function F(theta)
def F(theta):
    return F0 + np.dot(M, theta)

# fiducial data vectors
################# hey there, maybe you should think some more here about putting noise to this vector!
D1_fid = F(theta_fid_1) 
D2_fid = F(theta_fid_2)

print("Fiducial parameters 1 = ", theta_fid_1)
print("Fiducial parameters 2 = ", theta_fid_2)

Fiducial parameters 1 =  [0.4 0.1]
Fiducial parameters 2 =  [1.3 0.2]


**Define a multivariate gaussian distribution**

In [3]:


def multivariate_gaussian_pdf(theta, mean, cov):
    """
    Calculate the PDF of a multivariate Gaussian distribution.

    Parameters:
    mean : array-like, shape (n,)
        The mean vector of the Gaussian distribution.
    cov : array-like, shape (n, n)
        The covariance matrix of the Gaussian distribution.
    theta : array-like, shape (n,)
        The parameter vector at which to evaluate the PDF.

    Returns:
    pdf_value : float
        The PDF value of the multivariate Gaussian at the data point.
    """
    k = mean.shape[0]
    diff = theta - mean
    inv_cov = jnp.linalg.inv(cov)
    logL = -0.5 * jnp.dot(diff, jnp.dot(inv_cov, diff))
    norm_factor = jnp.log(jnp.sqrt((2 * jnp.pi) ** k * jnp.linalg.det(cov)))
    logpdf_value = logL - norm_factor
    return logpdf_value

**Define both loglikelihoods that will be used in the main function**

In [4]:
# Define the distributions as callable functions of theta and D
@jit
def logL1(theta):
    return multivariate_gaussian_pdf(theta, theta_fid_1, Sigma_1)
    
def logL2(theta, D2):
    return multivariate_gaussian_pdf(theta, theta_fid_2, Sigma_2)

domain = np.array([[5,-5], [5,-5]])

In [15]:
import joblib

In [33]:
surprise_results = surprise_function_call(logL1, logL2, data2_model_fun=F, covariance_matrix_2=C, domain=domain, Nkld = 5, result_path="DELETAR.hdf5", n_jobs=5, data_2_vec = D2_fid)

# surprise_results = surprise_function_call(logL1, logL2, data2_model_fun=F, covariance_matrix_2=C, domain=domain, Nkld=5, result_path=".", data_1_name=

----------------------------------------------------------------------
Loading posterior data
----------------------------------------------------------------------
----------------------------------------------------------------------
Loading failed!
----------------------------------------------------------------------
Running Nested Sampling...
----------------------------------------------------------------------


  cur_live_logl[not_finite] = _LOWL_VAL
18122it [00:13, 1355.57it/s, batch: 18 | bound: 3 | nc: 1 | ncall: 89893 | eff(%): 20.021 | loglstar: -8.106 < -1.374 < -1.857 | logz: -4.609 +/-  0.038 | stop:  0.970]         

Run completed sucessfully.
Saving  None
Error while saving
Done!
Evaluating theory from sample distribution p1





Generating theory vectors:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling the Posterior Predictive Distribution...


Sampling PPD:   0%|          | 0/5 [00:00<?, ?it/s]

An NVIDIA GPU may be present on this machine, but a CUDA-enabled jaxlib is not installed. Falling back to cpu.
An NVIDIA GPU may be present on this machine, but a CUDA-enabled jaxlib is not installed. Falling back to cpu.
An NVIDIA GPU may be present on this machine, but a CUDA-enabled jaxlib is not installed. Falling back to cpu.
An NVIDIA GPU may be present on this machine, but a CUDA-enabled jaxlib is not installed. Falling back to cpu.
An NVIDIA GPU may be present on this machine, but a CUDA-enabled jaxlib is not installed. Falling back to cpu.


Iterating over the PPD:   0%|          | 0/5 [00:00<?, ?it/s]

  cur_live_logl[not_finite] = _LOWL_VAL
25025it [01:11, 351.40it/s, batch: 26 | bound: 4 | nc: 1 | ncall: 164027 | eff(%): 15.113 | loglstar: -8.452 < -0.337 < -0.802 | logz: -4.631 +/-  0.040 | stop:  0.971]          


Processing batches:   0%|          | 0/26 [00:00<?, ?it/s]

Processing batches:   0%|          | 0/26 [00:00<?, ?it/s]

p-value = 20.0 %
S = 0.01 nats
<KLD> = 2.21 nats
KLD = 2.22 nats
Saving results
----------------------------------------------------------------------
Failed saving results


In [39]:
def save_dict_to_hdf5(file_name, data_dict):
    """
    Save the contents of a dictionary to an HDF5 file, handling different data types.

    Parameters:
    - file_name: The name of the HDF5 file to be created.
    - data_dict: The dictionary to save, where keys are dataset names and values are the data.
    """
    with h5py.File(file_name, 'w') as hdf:
        for key, value in data_dict.items():
            # Check the type of the value to handle it appropriately
            if isinstance(value, np.ndarray):
                # Save NumPy arrays directly as datasets
                hdf.create_dataset(key, data=value)
            elif isinstance(value, jnp.ndarray):
                # Convert JAX array to NumPy array and save it
                hdf.create_dataset(key, data=np.array(value))
            elif isinstance(value, (float, int, np.float32, np.float64, np.int32, np.int64)):
                # Save floats or integers as attributes of a dataset
                # If the value is a NumPy scalar, convert to a native Python type
                if hasattr(value, 'item'):
                    value = value.item()
                dset = hdf.create_dataset(key, data=[])
                dset.attrs['value'] = value
            else:
                raise TypeError(f"Unsupported data type for key '{key}': {type(value)}")
                
save_dict_to_hdf5("TESTARDELETAR.hdf5", surprise_results)

In [16]:
surprise_results

{'S': Array(-0.08835649, dtype=float32),
 'S_dist': array([ 0.02267694, -0.00136876,  0.03353047, -0.03103805, -0.02380133],
       dtype=float32),
 'kld21': Array(2.0961416, dtype=float32),
 'kld_exp': 2.184498,
 'kld_dist': array([2.207175 , 2.1831293, 2.2180285, 2.15346  , 2.1606967],
       dtype=float32),
 'p_value': 1.0}

# Funções para add no código final

In [18]:
def find_pval(Sdist, S, verbose = 0):
    """
    Calculate the p-value from a distribution of surprise values given an observed surprise.

    Parameters:
    - Sdist (ndarray): An array of surprise values from simulations or a distribution.
    - S (float): The observed surprise value for which the p-value is to be calculated.

    Returns:
    - pval (float): The calculated p-value indicating the probability of observing a surprise at least as extreme as S.
    """
    pval = Sdist[Sdist > S].size/Sdist.size
    if verbose>0:
        print("p-value = {:.1f} %".format(100*pval))
    return pval

from scipy import special
def sigma_discordance(p_value):
    return np.sqrt(2)*special.erfinv(1-p_value)

In [40]:
import h5py

def save_dict_to_hdf5(file_name, data_dict):
    """
    Save the contents of a dictionary to an HDF5 file, handling different data types.

    Parameters:
    - file_name: The name of the HDF5 file to be created.
    - data_dict: The dictionary to save, where keys are dataset names and values are the data.
    """
    with h5py.File(file_name, 'w') as hdf:
        for key, value in data_dict.items():
            # Check the type of the value to handle it appropriately
            if isinstance(value, np.ndarray):
                # Save NumPy arrays directly as datasets
                hdf.create_dataset(key, data=value)
            elif isinstance(value, jnp.ndarray):
                # Convert JAX array to NumPy array and save it
                hdf.create_dataset(key, data=np.array(value))
            elif isinstance(value, (float, int, np.float32, np.float64, np.int32, np.int64)):
                # Save floats or integers as attributes of a dataset
                # If the value is a NumPy scalar, convert to a native Python type
                if hasattr(value, 'item'):
                    value = value.item()
                dset = hdf.create_dataset(key, data=[])
                dset.attrs['value'] = value
            else:
                raise TypeError(f"Unsupported data type for key '{key}': {type(value)}")
            
def load_create_NS_file(data_1_name, logL1, ndim, domain,  n_effective=15000, dlogz=0.5):
    print(70*'-')
    print("Loading posterior data")
    print(70*'-')
    try: 
        ## loading pre-made Nested Sampling run
        res_1 = joblib.load(data_1_name)
        print("Data loaded sucessfully!")
    except:
        print(70*'-')
        print("Loading failed!")
        print(70*'-')

        print("Running Nested Sampling...")
        print(70*'-')
        # any nested sampling routine will be fine, as long as it has the method .samples_equal() or equivalent, to obtain equally weighted samples. 
        res_1 = sup.run_nested_sampling(logL1, ndim, domain=domain, print_progress=True, n_effective=n_effective, dlogz=dlogz)
        print("Run completed sucessfully.")
        
        # if data_1_name is not None:
        try:
            print("Saving ", data_1_name)
            joblib.dump(res_1, data_1_name)
        except:
            # print("Error while saving")
            pass
    return res_1
    
def surprise_function_call(logL1, logL2, data2_model_fun, covariance_matrix_2, domain, Nkld, 
                           result_path, data_1_name = None, n_effective= 15000, n_jobs=-1, data_2_vec = None, data_2_name = None, verbose=1):
    # logL1 --> a callable function of theta (parameter)
    # logL2 --> a callable function of theta (parameter) and D (data).
    # data_2_vec is yet to be added. If provided then function should also compute KLD(p2|p1) 
    # and return the surprise statistic value  
    ndim = domain.shape[0]

    if verbose>0:
        print_progress = True
    else:
        print_progress = False

    ############ loading/creating mock 1 ############
    res_1 = load_create_NS_file(data_1_name, logL1, ndim, domain)
    
    # This method is a particularity of Dynesty, but can be easily implemented for any other NS package.
    # Equal weighted samples 
    eq_samples_1 = res_1.samples_equal()
    print("Done!")
    
    ############ create posterior predictive distribution ############
    # parameter space samples of posterior distribution p(theta|D1)
    th1_samples = sup.sampler(eq_samples_1, Nkld) # we take a subset of samples with size Nkld
    PPD_chain = PPD.create_ppd_chain(th1_samples=th1_samples, data_model_fun=data2_model_fun, cov_matrix = covariance_matrix_2, sample_size=1, n_jobs=n_jobs)

    kld_samples = sup.main_parallel(PPD_chain, logL2, res_1, logP_1=logL1,  domain=domain, mode='MCMC', n_jobs=n_jobs, 
              result_path=None, ignore_warnings=True, 
              n_effective=n_effective)

    kld_array = np.array(kld_samples)
    kld_exp = kld_array.mean()
    S_dist = kld_array - kld_exp

    # if data 2 is provided
    if data_2_vec is not None:
        logP2 = lambda theta : logL2(theta, data_2_vec)

        # load or create data_2 posterior chain with NS
        if data_2_name is not None:
            res_2 = load_create_NS_file(data_2_name, logP2, ndim, domain)
        else:
            res_2 = sup.run_nested_sampling(logP2, ndim, domain=domain, print_progress=True)
        
        kld_value = sup.compute_KLD_MCMC(res_2, logP2, res_1, logL1, domain = domain)
        S = kld_value - kld_exp
        p_value = find_pval(S_dist, S, verbose = verbose)
        if verbose>0:
            print("S = {:.2f} nats".format(S))
            print("<KLD> = {:.2f} nats".format(kld_exp))
            print("KLD = {:.2f} nats".format(kld_value))
        results_dic = {"S" : S, "S_dist": S_dist, "kld21" : kld_value, "kld_exp":kld_exp, "kld_dist":kld_array, "p_value":p_value}
    else:
        if verbose>0:
            print("<KLD> = {:.2f} nats".format(kld_exp))
        results_dic = {"S_dist": S_dist, "kld_exp":kld_exp, "kld_dist":kld_array}
    if result_path is not None:
        try:
            print("Saving results")
            print(70*"-")
            save_dict_to_hdf5(result_path, results_dic)
            print("Results saved sucessfully!")
        except:
            print("Failed saving results")
            pass
    return results_dic

# Testes

In [7]:
import numpy as np

In [8]:
# Parameters for the uniform distribution of wb
# Calculate the range for the uniform distribution
lower_bound = DESI.mu_wb - 5 * DESI.sigma_wb
upper_bound = DESI.mu_wb + 5 * DESI.sigma_wb

# Generate uniform samples for the new dimension wb
wb_values = np.random.uniform(lower_bound, upper_bound, size=(eq_samples_1.shape[0], 1))

# Concatenate the new dimension to the existing samples
eq_samples_with_wb = np.concatenate((eq_samples_1, wb_values), axis=1)

array([[0.90068067, 0.52135128, 0.90928984],
       [0.69997009, 0.59434248, 0.04956948],
       [0.0891736 , 0.8285129 , 0.59819861],
       [0.53530764, 0.24321398, 0.18703713],
       [0.61414684, 0.10966708, 0.20365686],
       [0.54760663, 0.48011745, 0.86623483],
       [0.13129248, 0.37410539, 0.43129792],
       [0.69791098, 0.15303223, 0.93526071],
       [0.65757065, 0.55555849, 0.35832369],
       [0.84353273, 0.20610911, 0.21355773]])