In [41]:
import networkx as nx
import numpy as np
from scipy.stats import norm, gaussian_kde
from scipy.integrate import quad
import pymc as pm


In [42]:
def preprocess_data(self):
        data = {}
        for var in self.unit_vars:
            for i in range(len(self.sizes)):
                data[var+str(i)] = self.data[var+str(i)]
        for var in self.subunit_vars:
            for i in range(len(self.sizes)):
                s = 0
                for j in range(self.sizes[i]):
                    s += self.data['_'+var+str(i)+'_'+str(j)]
                data[var+str(i)] = s / self.sizes[i]
        return data

In [43]:

class HierarchicalBayesSampler:
    def __init__(self, graph, data, unit_vars, subunit_vars, sizes):
        self.graph = nx.DiGraph(graph)
        self.data = data
        self.unit_vars = unit_vars
        self.subunit_vars = subunit_vars
        self.sizes = sizes
        self.processed_data = self.preprocess_data()
        
    def preprocess_data(self):
        data = {}
        for var in self.unit_vars:
            for i in range(len(self.sizes)):
                data[var+str(i)] = self.data[var+str(i)]
        for var in self.subunit_vars:
            for i in range(len(self.sizes)):
                s = 0
                for j in range(self.sizes[i]):
                    s += self.data['_'+var+str(i)+'_'+str(j)]
                data[var+str(i)] = s / self.sizes[i]
        return data

    def generate(self):  # num_samples est maintenant 2
        num_samples = self.sizes[0]
        with pm.Model() as model:
            # Hyperpriors
            mu = {var: pm.Normal(f'mu_{var}', mu=0, sigma=1) for var in self.unit_vars + self.subunit_vars}
            sigma = {var: pm.HalfNormal(f'sigma_{var}', sigma=1) for var in self.unit_vars + self.subunit_vars}

            # Priors pour les variables de niveau supérieur
            variables = {}
            for var in self.unit_vars:
                variables[var] = pm.Normal(var, mu=mu[var], sigma=sigma[var], shape=len(self.sizes))

            # Priors pour les variables de niveau inférieur (sous-unités)
            subunit_variables = {}
            for var in self.subunit_vars:
                for i in range(len(self.sizes)):
                    subunit_variables[f'{var}{i}'] = pm.Normal(f'{var}{i}', mu=mu[var], sigma=sigma[var], shape=self.sizes[i])

            # Likelihood pour les variables de niveau supérieur
            for var in self.unit_vars:
                pm.Normal(f'obs_{var}', mu=variables[var], sigma=1, 
                        observed=np.array([self.processed_data[f'{var}{i}'] for i in range(len(self.sizes))]))

            # Likelihood pour les variables de niveau inférieur (sous-unités)
            for var in self.subunit_vars:
                for i in range(len(self.sizes)):
                    pm.Normal(f'obs_{var}{i}', 
                            mu=subunit_variables[f'{var}{i}'], 
                            sigma=1, 
                            observed=np.array([self.data[f'_{var}{i}_{j}'] for j in range(self.sizes[i])]))

            # Sampling
            trace = pm.sample(num_samples, return_inferencedata=False)

        # Extraction des échantillons
        generated_data = {}
        for var in self.unit_vars:
            generated_data[var] = trace[var][0]  # Prend le premier (et seul) échantillon

        # Extraction et génération des sous-unités
        for var in self.subunit_vars:
            for i in range(len(self.sizes)):
                generated_data[f'{var}{i}'] = trace[f'{var}{i}'][0]  # Prend le premier (et seul) échantillon

        return generated_data
    
    def generate_cond(self):
        num_samples = self.sizes[0]
        with pm.Model() as model:
            # Hyperpriors pour les autres variables
            mu = {var: pm.Normal(f'mu_{var}', mu=0, sigma=1) for var in self.unit_vars + ['d']}
            sigma = {var: pm.HalfNormal(f'sigma_{var}', sigma=1) for var in self.unit_vars + ['d']}

            # Priors pour les variables de niveau supérieur
            variables = {}
            for var in self.unit_vars:
                variables[var] = pm.Normal(var, mu=mu[var], sigma=sigma[var], shape=len(self.sizes))

            # Prior fixe pour b (loi normale standard)
            b_fixed = pm.Normal('b_fixed', mu=0, sigma=1, shape=(len(self.sizes), max(self.sizes)))

            # Priors pour les autres variables de niveau inférieur (d)
            subunit_variables = {}
            for i in range(len(self.sizes)):
                subunit_variables[f'd{i}'] = pm.Normal(f'd{i}', mu=mu['d'], sigma=sigma['d'], shape=self.sizes[i])

            # Likelihood pour les variables de niveau supérieur
            for var in self.unit_vars:
                pm.Normal(f'obs_{var}', mu=variables[var], sigma=1, 
                        observed=np.array([self.processed_data[f'{var}{i}'] for i in range(len(self.sizes))]))

            # Likelihood pour b (utilisant la loi fixe)
            for i in range(len(self.sizes)):
                pm.Normal(f'obs_b{i}', mu=0, sigma=1,  # Loi normale standard
                        observed=np.array([self.data[f'_b{i}_{j}'] for j in range(self.sizes[i])]))

            # Likelihood pour d
            for i in range(len(self.sizes)):
                pm.Normal(f'obs_d{i}', 
                        mu=subunit_variables[f'd{i}'], 
                        sigma=1, 
                        observed=np.array([self.data[f'_d{i}_{j}'] for j in range(self.sizes[i])]))

            # Sampling
            trace = pm.sample(num_samples, return_inferencedata=False)

        # Extraction des échantillons
        generated_data = {}
        for var in self.unit_vars:
            generated_data[var] = trace[var][0]

        # Génération de b (toujours à partir d'une loi normale standard)
        for i in range(len(self.sizes)):
            generated_data[f'b{i}'] = np.random.normal(0, 1, size=self.sizes[i])

        # Extraction de d
        for i in range(len(self.sizes)):
            generated_data[f'd{i}'] = trace[f'd{i}'][0]

        return generated_data



def kl_divergence(p, q):
    def integrand(x):
        px = p(x)
        qx = q(x)
        epsilon = 1e-10
        return np.where((px > 0) & (qx > 0),
                        px * (np.log(px + epsilon) - np.log(qx + epsilon)),
                        0)
    
    result, _ = quad(integrand, -np.inf, np.inf, limit=1000)
    return result


In [44]:
n_schools = 10
n_students = 50
# Example usage
graph = [('a', '_b'), ('a', 'c'), ('_b', 'c'), ('c', '_d'), ('_b', '_d'), ('_d', 'e'), ('c', 'e')]
data = {
    **{f'a{i}': i + 1 for i in range(n_schools)},
    **{f'c{i}': i + 5 for i in range(n_schools)},
    **{f'e{i}': i + 11 for i in range(n_schools)}
}

# Generate data for b and d
for i in range(n_schools):
    for j in range(n_students):
        data[f'_b{i}_{j}'] = 2*i + j % 2 + 1  # This creates a slight variation between students
        data[f'_d{i}_{j}'] = 2*i + j % 2 + 7  # This creates a slight variation between students

unit_vars = ['a', 'c', 'e']
subunit_vars = ['b', 'd']
sizes = [n_students] * n_schools


sampling

In [None]:

sampler = HierarchicalBayesSampler(graph, data, unit_vars, subunit_vars, sizes)
generated_data = sampler.generate()

Only 50 samples per chain. Reliable r-hat and ESS diagnostics require longer chains for accurate estimate.
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Sequential sampling (2 chains in 1 job)
NUTS: [mu_a, mu_c, mu_e, mu_b, mu_d, sigma_a, sigma_c, sigma_e, sigma_b, sigma_d, a, c, e, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9, d0, d1, d2, d3, d4, d5, d6, d7, d8, d9]


Output()

In [None]:
print( generated_data)


In [None]:

generated_data_cond = sampler.generate_cond()

In [None]:
print(generated_data_cond)

In [None]:
# Compute KL divergence for 'e' variable
e_generated = generated_data['e']
e_original = np.array([sampler.processed_data[f'e{i}'] for i in range(len(sizes))])

e_generated_cond = generated_data_cond['e']




In [None]:
kde_generated_cond = gaussian_kde(e_generated_cond)
kde_generated = gaussian_kde(e_generated)
kde_original = gaussian_kde(e_original)

In [40]:
kl_div = kl_divergence(kde_original, kde_generated)
print(f"KL divergence between original 'e' and generated 'e': {kl_div}")
kl_div_cond = kl_divergence(kde_original, kde_generated_cond)
print(f"KL divergence between original 'e' and generated 'e' with conditionning: {kl_div_cond}")


KL divergence between original 'e' and generated 'e': 0.047556381868144024
KL divergence between original 'e' and generated 'e' with conditionning: 0.011056412213653687
