In [1]:
from baynet import DAG, metrics
import numpy as np
import networkx as nx
from cdt.causality import graph
from cdt.utils import dagify_min_edge
import pandas as pd
from typing import List, Optional, Tuple, Hashable, Dict
from itertools import product
from functools import partial

No GPU automatically detected. Setting SETTINGS.GPU to 0, and SETTINGS.NJOBS to cpu_count.


## Utility Functions

In [2]:
def _cdt_to_baynet(cpdag: nx.DiGraph, columns: List[str]) -> DAG:
    adj_matrix = np.array(nx.adj_matrix(cpdag).todense())
    graph = baynet.DAG.from_amat(adj_matrix, columns)
    return graph

def PC(data: pd.DataFrame, ci_test: str = "discrete") -> DAG:
    """
    Wrap of the CDT predict function of the PC algorithm. Return a learnt DAG from data.

    :param data: The data which the structure will be learnt with (Pandas.DataFrame)
    :param ci_test: The (str) argument specifying which Conditional Indep. test to use
    :return: A learnt DAG (BayNet DAG)
    """
    pc = graph.PC(CItest=ci_test, verbose=False)
    cpdag = pc_alg.predict(data)
    dag = self.cdt.utils.dagify_min_edge(cpdag)
    return _cdt_to_baynet(dag, list(data.columns))

def GES(data: pd.DataFrame, score: str = "int") -> DAG:
    """
    Wrap of the CDT predict function of the GES algorithm. Return a learnt DAG from data.

    :param data: The data which the structure will be learnt with (Pandas.DataFrame)
    :param score: The (str) argument specifying which score function to use
    :return: A learnt DAG (BayNet DAG)
    """
    pc = graph.GES(score=score, verbose=False)
    cpdag = pc_alg.predict(data)
    dag = self.cdt.utils.dagify_min_edge(cpdag)
    return _cdt_to_baynet(dag, list(data.columns))    

## Metrics

In [3]:
def all_combinations_odds_ratio(
    model: DAG,
    target: Optional[str] = None,
    data: Optional[pd.DataFrame] = None,
) -> Tuple[dict, str]:
    """
    Get a dictionary object containing all possible odds ratios on a target.

    :param model: A DAG object on which to calculate the odds ratios
    :param target: A str specifying the target node
    :return: A dictionary of odds ratios, with tuple keys
    """
    odds_ratio_dict = dict()

    if target is not None and target not in list(map(str, model.nodes)):
        raise ValueError(f'"{target}" node not found in model.')

    if target is None:
        target = get_target_variable(model)

    target_levels = model.get_node(target)["levels"]
    target_reference = target_levels[0]

    valid_nodes = (
        model.nodes - set(model.get_descendants(target, only_children=True)["name"]) - {target}
    )
    for target_target in target_levels[1:]:
        for evidence in valid_nodes:
            evidence_levels = model.get_node(evidence)["levels"]
            evidence_reference = evidence_levels[0]

            for evidence_target in evidence_levels[1:]:
                or_key = (
                    target,
                    str(target_reference),
                    str(target_target),
                    evidence,
                    str(evidence_reference),
                    str(evidence_target),
                )
                or_result = odds_ratio(
                    model,
                    target,
                    evidence,
                    target_reference,
                    target_target,
                    evidence_reference,
                    evidence_target,
                    data=data,
                )
                if or_result is not None:
                    odds_ratio_dict[or_key] = or_result

    return odds_ratio_dict, target

def odds_ratio(
    model: DAG,
    target: str,
    evidence: str,
    target_reference_level: str,
    target_target_level: str,
    evidence_reference_level: str,
    evidence_target_level: str,
    data: Optional[pd.DataFrame] = None,
) -> Tuple[np.float64, np.float64, np.float64]:
    """
    Produce odds ratios for a target node given some evidence.

    Does so via CPD manipulation/propagation, as opposed to network sampling.

    :param data:
    :param model: DAG with structure and CPDs defined.
    :param target: The target node to estimate the effect on
    :param target_target_level: The target level of the target node
    :param target_reference_level: The reference level of the target node
    :param evidence: The node which is being intervened on (evidence provided for)
    :param evidence_target_level: The target level for the evidence node
    :param evidence_reference_level: The reference level for the evidence node

    :return: the estimated odds ratio
    :type: numpy.float64
    """
    #  If the evidence node is a direct descendent of the target node then
    #  intervening on it will remove the latter from the graph.
    if evidence in model.get_descendants(target, only_children=True)["name"]:
        warnings.warn(
            f"Odds ratio cannot be computed as evidence node '{evidence}' is a direct descendent "
            f"of target node '{target}'"
        )
        dummy = cast(np.float64, 1)
        return dummy, dummy, dummy
    target_reference_idx = int(target_reference_level)
    target_target_idx = int(target_target_level)
    return _static_intervention(
        model=model,
        target=target,
        evidence=evidence,
        evidence_level=str(evidence_reference_level),
        target_level=str(evidence_target_level),
        target_target_idx=target_target_idx,
        target_reference_idx=target_reference_idx,
        data=data,
    )
    
def _static_intervention(
    model: DAG,
    target: str,
    evidence: str,
    evidence_level: Hashable,
    target_level: Hashable,
    target_target_idx: int,
    target_reference_idx: int,
    data: Optional[pd.DataFrame],
) -> Tuple[np.float64, np.float64, np.float64]:
    """
    Get distribution of the `target` node after intervening on the evidence node's evidence_level.

    :param model: DAG to perform intervention on; with structure and CPDs defined.
    :param target: node to assess interventional outcome
    :param evidence: node to perform intervention on
    :param evidence_level: level of `evidence` node to perform intervention on
    :return: resultant distribution of the `target` node after propagating intervention.
    """
    if evidence not in model.nodes:
        return 1, 1, 1
    reference_model = model.mutilate(evidence, str(evidence_level))
    target_model = model.mutilate(evidence, str(target_level))
    n_data = 1 if data is None else len(data)
    target_dist = _propogate_probability(target_model, target) * n_data
    reference_dist = _propogate_probability(reference_model, target) * n_data
    result = (target_dist[target_target_idx] / target_dist[target_reference_idx]) / (
        reference_dist[target_target_idx] / reference_dist[target_reference_idx]
    )
    standard_error = np.sqrt(
        1 / target_dist[target_target_idx]
        + 1 / target_dist[target_reference_idx]
        + 1 / reference_dist[target_target_idx]
        + 1 / reference_dist[target_reference_idx]
    )
    upper = np.exp(np.log(result) + 1.96 * standard_error)
    lower = np.exp(np.log(result) - 1.96 * standard_error)
    if data is None:
        return result, result, result
    return result, upper, lower

def _propogate_probability(model: DAG, target: str) -> np.ndarray:
    accumulated_probs: Dict[str, np.ndarray] = dict()

    for node in _get_resolution_order(model):

        # Get CPD of curr node
        node_cpd = model.get_node(node)["CPD"].array
        ordered_parent_nodes = model.get_node(node)["CPD"].parents

        # Init result distribution
        result_dist = node_cpd

        # If node has parents, perform accumulation step
        if ordered_parent_nodes:
            for parent_node in reversed(ordered_parent_nodes):
                result_dist = accumulated_probs[parent_node].dot(result_dist)

        accumulated_probs[node] = result_dist
    return accumulated_probs[target]

def _classify_or(odds_ratio_result: Tuple[float, float, float]) -> str:
    odds_ratio, upper, lower = odds_ratio_result
    if odds_ratio > 1:
        if upper > 1 and lower > 1:
            return "D"
    if odds_ratio < 1:
        if upper < 1 and lower < 1:
            return "P"
    return "N"

def _pcor(
    true_ors: Dict[str, Tuple[float, float, float]],
    learnt_ors: Dict[str, Tuple[float, float, float]],
) -> float:
    """
    Calculate the PCOR (prop of correct odds ratios) given two dicts of odds ratios.

    @param true_ors: Dictionary of true odds ratios (or_key, or)
    @param learnt_ors: Dictionary of learnt odds ratios (or_key, or)

    @return: the proportion of correct odds ratios
    """
    hamming = 0
    for k, v in true_ors.items():
        true_or_class = _classify_or(v)
        learnt_or_class = _classify_or(learnt_ors[k])
        if true_or_class != learnt_or_class:
            hamming += 1
    return 1 - (hamming / len(true_ors.keys()))


def calculate_pcor(true_bn: DAG, learnt_bn: DAG):
    true_ors, target = all_combinations_odds_ratio(true_bn)
    learnt_ors, _ = all_combinations_odds_ratio(learnt_bn, target)
    return _pcor(true_ors=true_ors, learnt_ors=learnt_ors)

## Input

In [4]:
trials = list(range(1, 2)) # 1 -> 10
structure_types = [partial(DAG.forest_fire, fw_prob=0.5), DAG.barabasi_albert] # structure type functions
variables = [10]
samples = [500]
alphas = [6.0]
max_levels = [4]

algorithms = [PC, GES]

## Run Experiments

In [5]:
columns = ["Trial", "Structure Type", "N_Variables", "N_Samples", "Alpha", "Max_Level",  "Algorithm", "Precision", "Recall", "V_Structure_Precision", "V_Structure_Recall", "PCOR"]
results = []

for trial, structure_type, variable, sample, alpha, max_level, algorithm in product(*[trials, structure_types, variables, samples, alphas, max_levels, algorithms]):
    # ---------  Create Data ------------
    dag = structure_type(variable, seed=trial)
    dag.generate_discrete_parameters(alpha=alpha, min_levels=2, max_levels=max_level, seed=trial)
    data = dag.sample(sample)
    # --------- Learn BN ---------------
    # Learn Structure
    learnt_dag = algorithm(data)
    # Learn Parameters
    learnt_dag.estimate_parameters(data, infer_levels=True)
    # -------- Calculate Metrics -------------
    # Skeleton
    precision = metrics.precision(dag, learnt_dag, skeleton=True)
    recall = metrics.recall(dag, learnt_dag, skeleton=True)
    # V Structures
    v_precision = metrics.v_precision(dag, learnt_dag)
    v_recall = metrics.v_recall(dag, learnt_dag)
    # PCOR
    pcor = calculate_pcor(dag, learnt_dag)
    results.append([trial, structure_type, variable, sample, alpha, max_level, algorithm, precision, recall, v_precision, v_recall, pcor])

R Call errored, is R available ?


PermissionError: [Errno 13] Permission denied: 'Rscript'