In [1]:
import pandas as pd
import numpy as np
from pgmpy.models import BayesianModel
from pgmpy.models import BayesianNetwork
from pgmpy.inference import VariableElimination
from pgmpy.estimators import MaximumLikelihoodEstimator
from pgmpy.estimators import BayesianEstimator
from pgmpy.estimators import HillClimbSearch
from pgmpy.estimators import BDeuScore, K2Score, BicScore
from pgmpy.metrics import structure_score
from pgmpy.utils import get_example_model
from pgmpy.estimators import ScoreCache
from pgmpy.inference.CausalInference import CausalInference
import networkx as nx
import bnlearn as bn

In [2]:
asia = get_example_model("asia")

In [3]:
asia.edges()

OutEdgeView([('asia', 'tub'), ('tub', 'either'), ('smoke', 'lung'), ('smoke', 'bronc'), ('lung', 'either'), ('bronc', 'dysp'), ('either', 'xray'), ('either', 'dysp')])

In [4]:
data = pd.read_csv("../data/asia10K.csv")
data = data.replace({'yes': 1, 'no': 0})
data.rename(columns={
    'Smoker': 'S',
    'LungCancer': 'L',
    'VisitToAsia': 'A',
    'Tuberculosis': 'T',
    'TuberculosisOrCancer': 'E',
    'X-ray': 'X',
    'Bronchitis': 'B',
    'Dyspnea': 'D'
}, inplace=True)

In [5]:
data.head()

Unnamed: 0,S,L,A,T,E,X,B,D
0,1,1,0,0,1,1,1,1
1,0,0,0,0,0,1,1,1
2,0,0,0,0,0,0,1,1
3,0,0,0,0,0,0,1,1
4,1,0,0,0,0,0,1,0


In [6]:
est = HillClimbSearch(data)
model = est.estimate(scoring_method="bicscore")

  0%|          | 0/1000000 [00:00<?, ?it/s]

In [7]:
model.edges()

OutEdgeView([('S', 'B'), ('L', 'S'), ('T', 'L'), ('T', 'A'), ('E', 'L'), ('E', 'D'), ('E', 'T'), ('X', 'E'), ('B', 'D')])

In [8]:
model = BayesianNetwork(model.edges())

In [9]:
estimator = MaximumLikelihoodEstimator(model, data)
estimator.get_parameters()
model.fit(data, estimator=MaximumLikelihoodEstimator)

In [10]:
# compute P(tub=True)
inference = VariableElimination(model)
prob_tub = inference.query(variables=["T"], evidence={'A': 1, 'X': 1, 'L': 0})
print(prob_tub)

+------+----------+
| T    |   phi(T) |
| T(0) |   0.3980 |
+------+----------+
| T(1) |   0.6020 |
+------+----------+


In [11]:
prob_tub.values[0]

0.39801363612054974

In [12]:
def compute_delta_prob(inference, var, evidence):
    prob_var = inference.query(variables=[var])
    prob_var_new = inference.query(variables=[var], evidence=evidence)
    delta_prob = prob_var_new[var].values[1] - prob_var[var].values[1]
    return delta_prob

def compute_KL_divergence(inference, var, evidence):
    prob_var = inference.query(variables=[var])
    prob_var_new = inference.query(variables=[var], evidence=evidence)
    kl_div = np.sum(np.where(prob_var.values != 0, prob_var.values * np.log(prob_var.values / prob_var_new.values), 0))
    return kl_div

In [13]:
# function that computes the expected reward of observing a new variable
def expected_reward(inference, var, target_var, current_evidence):
    # compute the current probability of the target variable
    prob_target = inference.query(variables=[target_var], evidence=current_evidence)
    expected_reward = 0
    # iterate over the possible values of the variable to be observed
    for value in [0, 1]: # BINARY
        new_evidence = current_evidence.copy()
        new_evidence[var] = value
        prob_var = inference.query(variables=[var], evidence=current_evidence)

        prob_value = prob_var.values[value]
        prob_target_new = inference.query(variables=[target_var], evidence=new_evidence)    

        #reward = prob_target_new.values[1] - prob_target.values[1]
        reward = compute_KL_divergence(inference, target_var, new_evidence)
        print(f"Value: {value}, Prob: {prob_value}, Reward: {reward}")
        expected_reward += prob_value * reward
    return expected_reward

importante: lembrar que KL não é simétrica

In [14]:
random_patient = {'A': 1, 'X': 1}

latent_variables = set(data.columns.to_list()) - set(list(random_patient.keys()))
latent_variables.remove('T')
latent_variables = list(latent_variables)   

In [15]:
for v in latent_variables:
    print(f"Computing expected reward for variable {v}...")
    er = expected_reward(inference, v, 'T', random_patient)
    print(f"Expected reward of observing {v}: {er}")

Computing expected reward for variable B...
Value: 0, Prob: 0.5160106869489858, Reward: 0.5705940692042188
Value: 1, Prob: 0.4839893130510143, Reward: 0.4646537242006102
Expected reward of observing B: 0.5193200744015349
Computing expected reward for variable S...
Value: 0, Prob: 0.37737934277279167, Reward: 0.7767069023259239
Value: 1, Prob: 0.6226206572272083, Reward: 0.3895017752331509
Expected reward of observing S: 0.5356249916136768
Computing expected reward for variable L...
Value: 0, Prob: 0.6621256815406632, Reward: 0.8626372446971315
Value: 1, Prob: 0.33787431845933685, Reward: 0.0781552707073256
Expected reward of observing L: 0.5975809323916907
Computing expected reward for variable E...
Value: 0, Prob: 0.26353505007879646, Reward: inf
Value: 1, Prob: 0.7364649499212035, Reward: 0.8331388144248139
Expected reward of observing E: inf
Computing expected reward for variable D...
Value: 0, Prob: 0.3153379584632169, Reward: 0.3266135460375571
Value: 1, Prob: 0.6846620415367831, 

In [16]:
# compute mutual information between variables
from sklearn.metrics import (
    adjusted_mutual_info_score,
    mutual_info_score,
    normalized_mutual_info_score,
)
def compute_mutual_information(data, var1, var2):
    mi = mutual_info_score(data[var1], data[var2])
    return mi

# test
mi = compute_mutual_information(data, 'S', 'L')
print(f"Mutual Information between A and T: {mi}")

Mutual Information between A and T: 0.024516287465768803


In [17]:
child = pd.read_csv("/home/joao/Desktop/UFMG/PhD/code/explaining-BN/data/child.csv")

In [18]:
child.head()

Unnamed: 0,DuctFlow,HypDistrib,CardiacMixing,HypoxiaInO2,LungParench,CO2,ChestXray,LungFlow,Grunting,Sick,LVH,LVHreport,LowerBodyO2,RUQO2,CO2Report,XrayReport,BirthAsphyxia,Disease,GruntingReport,Age
0,0,0,2,1,0,0,0,0,1,1,1,1,1,1,0,0,1,2,1,1
1,1,0,1,1,0,2,0,0,1,1,1,1,2,1,1,0,1,2,1,2
2,1,0,2,1,1,2,2,2,1,1,1,1,1,1,1,4,1,4,1,0
3,0,0,2,2,0,0,1,1,1,0,1,1,2,0,0,1,1,2,1,0
4,0,0,2,2,2,0,4,1,0,1,1,1,1,0,1,4,1,2,1,1


In [19]:
compute_mutual_information(child, 'ChestXray', 'CO2Report')

0.01768648213229007

In [20]:
compute_mutual_information(child, 'Disease', 'CardiacMixing')

0.5317232685263521

In [21]:
def calculate_conditional_mutual_information(
    model: BayesianNetwork, 
    X: str, 
    Y: str, 
    E: dict
) -> float:
    

    inference = VariableElimination(model)
    
    #  Compute the joint distribution P(X, Y | E)
    # This is done by querying P(X, Y, E_1, E_2, ...) and normalizing it to P(X, Y | E).
    # VariableElimination handles the conditioning and normalization implicitly
    # when evidence is passed to the query function.

    prob_xy_given_e_factor = inference.query(variables=[X, Y], evidence=E, joint=True)
    prob_xy_given_e = prob_xy_given_e_factor.values.flatten()
    
    # Compute the marginal distributions P(X | E) and P(Y | E)
    # Query for the marginal probability P(X | E)
    prob_x_given_e_factor = inference.query(variables=[X], evidence=E)
    prob_x_given_e = prob_x_given_e_factor.values.flatten()
    
    # Query for the marginal probability P(Y | E)
    prob_y_given_e_factor = inference.query(variables=[Y], evidence=E)
    prob_y_given_e = prob_y_given_e_factor.values.flatten()

    #  Reconstruct the product P(X|E) * P(Y|E) for all joint states
    # This requires creating a product distribution of the two marginal factors.
    
    # Get the state names for X and Y to correctly combine the marginals
    x_states = prob_x_given_e_factor.state_names[X]
    y_states = prob_y_given_e_factor.state_names[Y]
    
    prob_x_times_y = np.zeros(len(x_states) * len(y_states))
    k = 0
    for i in range(len(x_states)):
        for j in range(len(y_states)):
            # P(x_i | e) * P(y_j | e)
            prob_x_times_y[k] = prob_x_given_e[i] * prob_y_given_e[j]
            k += 1

    #  Apply the conditional mutual information formula
    
    # Avoid log(0) by adding a small constant epsilon
    epsilon = 1e-12
    
    
    # Compute the ratio inside the logarithm
    ratio = (prob_xy_given_e + epsilon) / (prob_x_times_y + epsilon)
    
    # Calculate the CMI
    cmi = np.sum(prob_xy_given_e * np.log2(ratio))
    
    return cmi

In [22]:
# compute conditional mutual information between X and Y given evidence E, using the entropy definition
# I(X,Y|E) = H(Y|E) - H(Y|X,E)
def calculate_conditional_mutual_information_entropy(
    model: BayesianNetwork, 
    X: str, 
    Y: str, 
    E: dict
) -> float:
    
    inference = VariableElimination(model)
    
    # Compute H(Y|E)
    prob_y_given_e_factor = inference.query(variables=[Y], evidence=E)
    prob_y_given_e = prob_y_given_e_factor.values.flatten()
    
    # Calculate H(Y|E)
    epsilon = 1e-12
    h_y_given_e = -np.sum(prob_y_given_e * np.log2(prob_y_given_e + epsilon))
    
    # Compute H(Y|X,E)
    prob_y_given_xe_factor = inference.query(variables=[Y, X], evidence=E, joint=True)
    prob_y_given_xe = prob_y_given_xe_factor.values
    
    h_y_given_xe = 0
    for i in range(prob_y_given_xe.shape[1]):  # Iterate over states of X
        prob_y_given_x = prob_y_given_xe[:, i]
        prob_x = np.sum(prob_y_given_xe, axis=0)[i]
        if prob_x > 0:
            prob_y_given_x_normalized = prob_y_given_x / prob_x
            h_y_given_x = -np.sum(prob_y_given_x_normalized * np.log2(prob_y_given_x_normalized + epsilon))
            h_y_given_xe += prob_x * h_y_given_x
    
    # Calculate I(X;Y|E) = H(Y|E) - H(Y|X,E)
    cmi = h_y_given_e - h_y_given_xe
    
    return cmi

In [23]:
from typing import Dict, Set, Tuple

def get_markov_blanket(bn: nx.DiGraph, variable: str) -> Set[str]:
    """
    Calculates the Markov blanket for a given variable in a Bayesian Network.

    The Markov blanket of a node V includes its parents, its children, and the
    parents of its children (spouses)[cite: 265].

    Args:
        bn: The Bayesian Network represented as a networkx.DiGraph.
        variable: The name of the node for which to find the Markov blanket.

    Returns:
        A set of node names in the Markov blanket.
    """
    blanket = set()
    # Add parents
    blanket.update(bn.predecessors(variable))
    # Add children
    children = set(bn.successors(variable))
    blanket.update(children)
    # Add parents of children (spouses)
    for child in children:
        blanket.update(bn.predecessors(child))
    # Remove the variable itself if it's in the set
    blanket.discard(variable)
    return blanket

In [24]:
hc_child = HillClimbSearch(child)
model_child = hc_child.estimate(scoring_method="bicscore")
model_child = BayesianNetwork(model_child.edges())
model_child.fit(child, estimator=MaximumLikelihoodEstimator)

  0%|          | 0/1000000 [00:00<?, ?it/s]

In [25]:
calculate_conditional_mutual_information(model=model_child, X='Disease', Y='CardiacMixing', E={'ChestXray': 1})

0.41542987561836187

In [26]:
calculate_conditional_mutual_information_entropy(model=model_child, X='Disease', Y='CardiacMixing', E={'ChestXray': 1})

0.41542987565008016

In [27]:
# testing list of random patients with random evidence, target = Disease
patient1 = {'ChestXray': 1, 'CO2Report': 0, 'XrayReport': 1}
cmi1 = calculate_conditional_mutual_information(model=model_child, X='Disease', Y='CardiacMixing', E=patient1)

patient2 = {'ChestXray': 0, 'CO2Report': 1, 'XrayReport': 0, 'CardiacMixing': 1}
cmi2 = calculate_conditional_mutual_information(model=model_child, X='Disease', Y='DuctFlow', E=patient2)

patient3 = {'ChestXray': 1, 'CO2Report': 1, 'XrayReport': 1, 'DuctFlow': 0}
cmi3 = calculate_conditional_mutual_information(model=model_child, X='Disease', Y='CardiacMixing', E=patient3)

print(f"CMI Patient 1: {cmi1}")
print(f"CMI Patient 2: {cmi2}")
print(f"CMI Patient 3: {cmi3}")

CMI Patient 1: 0.37469716120983987
CMI Patient 2: 0.6140683674646568
CMI Patient 3: 0.16395828775128757


look for conditional entropy multivariada

testar variando o valor específico da evidência no caso em que a candidata é d-separada do alvo

Using the CMI implies that we do not need to compute the "expected reward". How could we merge the two ideas? i.e.: estimating the CMI but also leveraing information from each possible state of the chosen variable and how each state influences the gain in information

CMI implicitly handles d-separation between variables; is it better to compute d-separation before-hand?

In [28]:
calculate_conditional_mutual_information(model, 'D', 'S', {'L': 1, 'B':1})

-1.50317905716849e-16

In [29]:
def choose_variable_to_observe(
    model: BayesianNetwork,
    candidate_vars: Set[str],
    target_var: str,
    current_evidence: Dict[str, int]
) -> Tuple[str, float]:
    """
    Chooses the variable from candidate_vars that maximizes the conditional mutual information
    with the target_var given the current_evidence.

    """
    max_cmi = -np.inf
    best_var = None
    for var in candidate_vars:
        cmi = calculate_conditional_mutual_information(model, var, target_var, current_evidence)
        if cmi > max_cmi:
            max_cmi = cmi
            best_var = var
    return best_var, max_cmi

a busca pode ser acelerada levando em conta os caminhos saindo do target

In [30]:
random_patient = {'A': 1, 'X': 1}
best_var, best_cmi = choose_variable_to_observe(
    model=model,
    candidate_vars=set(latent_variables),
    target_var='T',
    current_evidence=random_patient
)
print(f"Best variable to observe: {best_var} with CMI: {best_cmi}")

Best variable to observe: E with CMI: 0.26834610927907265


In [None]:
def d_connected_nodes(model, target, evidence):

    active = model.active_trail_nodes(
        variables=[target],
        observed=list(evidence)
    )
    return set(active[target])

In [32]:
def separation_score(model, target, evidence, candidate):

    new_evidence = set(evidence) | {candidate}
    connected = d_connected_nodes(model, target, new_evidence)

    all_nodes = set(model.nodes())
    separated = all_nodes - connected - new_evidence

    return len(separated)

In [33]:
def select_best_variable(model, target, evidence):

    all_nodes = set(model.nodes())
    candidates = all_nodes - set(evidence) - {target}

    scores = {}

    for x in candidates:
        scores[x] = separation_score(model, target, evidence, x)

    best_var = max(scores, key=scores.get)
    return best_var, scores

In [34]:
best_variable = select_best_variable(model_child, 'Disease', {'ChestXray': 1})

In [35]:
best_variable

('LungParench',
 {'Grunting': 1,
  'CO2': 0,
  'LVH': 1,
  'XrayReport': 0,
  'CO2Report': 0,
  'GruntingReport': 0,
  'LungFlow': 0,
  'LVHreport': 0,
  'BirthAsphyxia': 0,
  'Sick': 0,
  'HypDistrib': 0,
  'Age': 0,
  'LungParench': 2,
  'HypoxiaInO2': 1,
  'DuctFlow': 0,
  'CardiacMixing': 0,
  'LowerBodyO2': 0,
  'RUQO2': 0})

What is the difference between using only the data set and using the BN model? Probabilities must be estimated somehow, and when using the BN the estimation will be impacted by the topology.