# Understanding Bayes Nets

We have:

- Nodes
- Edges

Bayes Nets encode the joint probability distribution of variables, allowing inference given evidence.

# Setup

In [6]:
from inspect import getsource
from IPython.display import display
import pandas as pd
import numpy as np
from collections import defaultdict, Counter
import itertools
import math
import random
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from matplotlib import pyplot
import time
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import json

In [7]:
random.seed(42)

In [8]:
mental_data = pd.read_csv("../data/cleaned_breast_cancer.csv")

In [9]:
mental_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 569 entries, 0 to 568
Data columns (total 32 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   id                       569 non-null    int64  
 1   diagnosis                569 non-null    int64  
 2   radius_mean              569 non-null    float64
 3   texture_mean             569 non-null    float64
 4   perimeter_mean           569 non-null    float64
 5   area_mean                569 non-null    float64
 6   smoothness_mean          569 non-null    float64
 7   compactness_mean         569 non-null    float64
 8   concavity_mean           569 non-null    float64
 9   concave points_mean      569 non-null    float64
 10  symmetry_mean            569 non-null    float64
 11  fractal_dimension_mean   569 non-null    float64
 12  radius_se                569 non-null    float64
 13  texture_se               569 non-null    float64
 14  perimeter_se             5

In [10]:
train_data, test_data = train_test_split(
    mental_data,
    test_size=0.4,
    random_state=42,
    stratify=mental_data["diagnosis"],
)

In [11]:
categorical_columns = mental_data.columns
for col in categorical_columns:
    train_data[col] = train_data[col].astype("category")
    test_data[col] = test_data[col].astype("category")

# Helper Code

In [12]:
def extend(s, var, val):
    """Create a copy of dictionary `s` and add a new key-value pair where `var` is set to `val`. Return the updated copy."""
    return {**s, var: val}

In [13]:
class ProbDist:
    """
    Represents a discrete probability distribution for a single random variable. 
    You can initialize it with a variable name and an optional frequency dictionary.
    Probabilities are normalized automatically if frequencies are provided.

    Example:
    >>> P = ProbDist('Flip'); P['H'], P['T'] = 0.25, 0.75; P['H']
    0.25
    >>> P = ProbDist('X', {'lo': 125, 'med': 375, 'hi': 500})
    >>> P['lo'], P['med'], P['hi']
    (0.125, 0.375, 0.5)
    """
    def __init__(self, varname='?', freqs=None):
        """
        Initialize the distribution. If `freqs` is given, it must be a dictionary 
        with values as keys and their frequencies as values. The distribution is normalized.
        """
        self.prob = {}
        self.varname = varname
        self.values = []
        if freqs:
            for (v, p) in freqs.items():
                self[v] = p
            self.normalize()

    def __getitem__(self, val):
        """Retrieve the probability of `val` if it exists, otherwise return 0."""
        try:
            return self.prob[val]
        except KeyError:
            return 0

    def __setitem__(self, val, p):
        """Assign probability `p` to the value `val`."""
        if val not in self.values:
            self.values.append(val)
        self.prob[val] = p

    def normalize(self):
        """
        Ensure that the probabilities of all values sum up to 1. 
        If the sum of values is 0, a ZeroDivisionError is raised.
        """
        total = sum(self.prob.values())
        if not np.isclose(total, 1.0):
            for val in self.prob:
                self.prob[val] /= total
        return self

    def show_approx(self, numfmt='{:.3g}'):
        """
        Display the probabilities rounded to a specified format, sorted by their keys. 
        Useful for readability in doctests.
        """
        return ', '.join([('{}: ' + numfmt).format(v, p)
                          for (v, p) in sorted(self.prob.items())])

    def __repr__(self):
        """Return a string representation of the distribution."""
        return "P({})".format(self.varname)

In [14]:
def probability_sampling(probabilities):
    """
    Perform random sampling based on the given probability distribution. 
    Returns an outcome based on the probabilities.
    """
    total = sum(probabilities.values())
    r = random.uniform(0, total)
    cumulative = 0
    for outcome, prob in probabilities.items():
        cumulative += prob
        if r <= cumulative:
            return outcome
    return None  # This should not occur if probabilities are normalized.

In [15]:
class MultiClassBayesNode:
    """
    Represents a node in a Bayesian network for multi-class variables. 
    It contains the variable, its parents, and the conditional probability table (CPT).
    """
    def __init__(self, X, parents, cpt):
        """
        Initialize the node with:
        - `X`: Variable name.
        - `parents`: List of parent variable names.
        - `cpt`: A dictionary representing the conditional probability table.
        """
        if isinstance(parents, str):
            parents = parents.split()
        self.variable = X
        self.parents = parents
        self.cpt = cpt
        self.children = []

    def p(self, value, event):
        """
        Compute the conditional probability of `X=value` given the parent values in `event`.
        """
        parent_values = tuple(event.get(p, None) for p in self.parents)
        probabilities = self.cpt.get(parent_values, {})
        return probabilities.get(value, 0)  # Defaults to 0 if `value` is not found.

    def sample(self, event):
        """
        Sample a value for the variable given parent values in `event`. 
        Sampling is based on the conditional probability distribution.
        """
        parent_values = tuple(event.get(p, None) for p in self.parents)
        probabilities = self.cpt.get(parent_values, {})
        return probability_sampling(probabilities)

    def __repr__(self):
        """Return a string representation of the node."""
        return repr((self.variable, ' '.join(self.parents)))

In [16]:
class BayesNet:
    """
    Represents a Bayesian network consisting of nodes (variables) and their dependencies.
    Supports multi-class nodes.
    """
    def __init__(self, node_specs=None):
        """
        Initialize the network. Nodes must be added in topological order 
        (parents must be added before their children).
        """
        self.nodes = []
        self.variables = []
        node_specs = node_specs or []
        for node_spec in node_specs:
            self.add(node_spec)

    def add(self, node_spec):
        """
        Add a node to the network. Accepts either a pre-constructed node 
        or the specifications for a new node.
        """
        if isinstance(node_spec, MultiClassBayesNode):
            node = node_spec
        else:
            node = MultiClassBayesNode(*node_spec)

        assert node.variable not in self.variables
        assert all((parent in self.variables) for parent in node.parents)

        self.nodes.append(node)
        self.variables.append(node.variable)

        # Register this node as a child for its parent nodes
        for parent in node.parents:
            self.variable_node(parent).children.append(node)

    def variable_node(self, var):
        """Retrieve the node corresponding to the variable `var`."""
        for n in self.nodes:
            if n.variable == var:
                return n
        raise Exception(f"No such variable: {var}")

    def variable_values(self, var):
        """Retrieve the domain of `var` (default is `[True, False]`)."""
        return [True, False]

    def __repr__(self):
        """Return a string representation of the network."""
        return f"BayesNet({self.nodes!r})"

In [17]:
class Factor:
    """Represents a factor in a joint distribution."""
    def __init__(self, variables, cpt):
        """
        Initialize the factor with:
        - `variables`: List of variables involved in the factor.
        - `cpt`: Conditional probability table.
        """
        self.variables = variables
        self.cpt = cpt

    def normalize(self):
        """
        Normalize the factor and return a `ProbDist` for the remaining variable.
        This is only valid if the factor has one variable left.
        """
        assert len(self.variables) == 1
        return ProbDist(self.variables[0], {k: v for ((k,), v) in self.cpt.items()})

    def p(self, e):
        """Retrieve the probability for the event `e` from the factor's CPT."""
        return self.cpt[event_values(e, self.variables)]

In [18]:
def enumerate_all(variables, e, bn):
    """
    Calculate the sum of all entries in the joint probability distribution 
    for `variables` consistent with the evidence `e` in network `bn`.
    """
    if not variables:
        return 1.0
    Y, rest = variables[0], variables[1:]
    Ynode = bn.variable_node(Y)
    if Y in e:
        return Ynode.p(e[Y], e) * enumerate_all(rest, e, bn)
    else:
        return sum(Ynode.p(y, e) * enumerate_all(rest, extend(e, Y, y), bn)
                   for y in bn.variable_values(Y))

In [19]:
def enumeration_ask(X, e, bn):
    """
    Compute the conditional probability distribution for the query variable `X` 
    given evidence `e` in the Bayesian network `bn`.
    """
    assert X not in e, "Query variable must not overlap with the evidence."
    Q = ProbDist(X)
    for xi in bn.variable_values(X):
        Q[xi] = enumerate_all(bn.variables, extend(e, X, xi), bn)
    return Q.normalize()

In [20]:
def event_values(event, variables):
    """
    Generate a tuple containing the values of the specified variables from the event.
    
    Examples:
    >>> event_values({'A': 10, 'B': 9, 'C': 8}, ['C', 'A'])
    (8, 10)
    >>> event_values((1, 2), ['C', 'A'])
    (1, 2)
    """
    if isinstance(event, tuple) and len(event) == len(variables):
        return event
    else:
        return tuple(event[var] for var in variables)

# Design the Network Structure

Find dependencies.

# Estimate Conditional Probabilities

If a node is a root node, then estimate probability directly from the data. Estimate conditional probabilities based on parent for non-root nodes.

In [21]:
def compute_cpt(data, target, parents, alpha=1):
    """
    Compute CPT with Laplace smoothing.
    
    Args:
        data: pandas DataFrame (training data)
        target: str, target variable
        parents: list of parent variable names
        alpha: smoothing parameter (default=1)
    
    Returns:
        cpt: dict { parent_values_tuple: { target_value: probability } }
    """
    target_values = data[target].cat.categories

    if not parents:
        # Marginal distribution of target
        counts = defaultdict(lambda: alpha)
        for val in data[target]:
            counts[val] += 1
        total = sum(counts.values())
        cpt = {(): {tv: counts[tv]/total for tv in counts}}
        return cpt

    # Determine possible parent combinations
    from itertools import product
    parent_values_list = [data[p].cat.categories for p in parents]
    parent_combinations = list(product(*parent_values_list)) if parents else [()]

    # Initialize counts with alpha
    counts = {pc: defaultdict(lambda: alpha) for pc in parent_combinations}

    # Count occurrences
    for _, row in data.iterrows():
        pv = tuple(row[p] for p in parents) if parents else ()
        tv = row[target]
        counts[pv][tv] += 1

    # Compute probabilities
    cpt = {}
    for pc in parent_combinations:
        total = sum(counts[pc].values())
        cpt[pc] = {tv: (counts[pc][tv] / total) for tv in counts[pc]}
        
    return cpt

In [22]:
def compute_all_cpts(train_data):
    """
    Compute all CPTs and record the time taken.
    """
    start_time = time.time()  # Start timing
    
    # Compute CPTs
    cpt_diagnosis = compute_cpt(train_data, 'diagnosis', [])
    cpt_concave_points = compute_cpt(train_data, 'concave points_mean', ['diagnosis'])
    cpt_perimeter = compute_cpt(train_data, 'perimeter_mean', ['diagnosis'])
    cpt_radius = compute_cpt(train_data, 'radius_mean', ['diagnosis'])
    cpt_concavity = compute_cpt(train_data, 'concavity_mean', ['diagnosis', 'concave points_mean'])
    cpt_texture = compute_cpt(train_data, 'texture_mean', ['diagnosis'])

    cpt_area = compute_cpt(train_data, 'area_mean', ['perimeter_mean'])
    cpt_compactness = compute_cpt(train_data, 'compactness_mean', ['concavity_mean'])
    cpt_smoothness = compute_cpt(train_data, 'smoothness_mean', ['concavity_mean'])
    cpt_symmetry = compute_cpt(train_data, 'symmetry_mean', ['compactness_mean'])
    cpt_fractal = compute_cpt(train_data, 'fractal_dimension_mean', ['symmetry_mean'])
    
    end_time = time.time()  # End timing
    training_time = end_time - start_time
    print(f"Training Time (CPT Computation): {training_time:.4f} seconds")

    return {
        "cpt_diagnosis": cpt_diagnosis,
        "cpt_concave_points": cpt_concave_points,
        "cpt_perimeter": cpt_perimeter,
        "cpt_radius": cpt_radius,
        "cpt_concavity": cpt_concavity,
        "cpt_texture": cpt_texture,
        "cpt_area": cpt_area,
        "cpt_compactness": cpt_compactness,
        "cpt_smoothness": cpt_smoothness,
        "cpt_symmetry": cpt_symmetry,
        "cpt_fractal": cpt_fractal
    }, training_time

# Implement Inference

Use the chain rule of probability to predict target given evidence.

In [23]:
cpts, training_time = compute_all_cpts(train_data)

Training Time (CPT Computation): 0.2046 seconds


In [24]:
diagnosis_node = MultiClassBayesNode("diagnosis", [], cpts['cpt_diagnosis'])

concave_points_node = MultiClassBayesNode(
    "concave points_mean", ["diagnosis"], cpts['cpt_concave_points']
)
perimeter_node = MultiClassBayesNode("perimeter_mean", ["diagnosis"], cpts['cpt_perimeter'])
radius_node = MultiClassBayesNode("radius_mean", ["diagnosis"], cpts['cpt_radius'])
concavity_node = MultiClassBayesNode(
    "concavity_mean", ["diagnosis", "concave points_mean"], cpts['cpt_concavity'])

texture_node = MultiClassBayesNode("texture_mean", ["diagnosis"], cpts['cpt_texture'])

area_node = MultiClassBayesNode("area_mean", ["perimeter_mean"], cpts['cpt_area'])
compactness_node = MultiClassBayesNode("compactness_mean", ["concavity_mean"], cpts['cpt_compactness'])
smoothness_node = MultiClassBayesNode("smoothness_mean", ["concavity_mean"], cpts['cpt_smoothness'])
symmetry_node = MultiClassBayesNode("symmetry_mean", ["compactness_mean"], cpts['cpt_symmetry'])
fractal_node = MultiClassBayesNode("fractal_dimension_mean", ["symmetry_mean"], cpts['cpt_fractal'])

In [25]:
diagnosis_bn = BayesNet([
    diagnosis_node,
    concave_points_node,
    perimeter_node,
    radius_node,
    concavity_node,
    texture_node,
    area_node,
    compactness_node,
    smoothness_node,
    symmetry_node,
    fractal_node
])

In [26]:
print(diagnosis_bn)

BayesNet([('diagnosis', ''), ('concave points_mean', 'diagnosis'), ('perimeter_mean', 'diagnosis'), ('radius_mean', 'diagnosis'), ('concavity_mean', 'diagnosis concave points_mean'), ('texture_mean', 'diagnosis'), ('area_mean', 'perimeter_mean'), ('compactness_mean', 'concavity_mean'), ('smoothness_mean', 'concavity_mean'), ('symmetry_mean', 'compactness_mean'), ('fractal_dimension_mean', 'symmetry_mean')])


# Evaluate the Model

In [27]:
def predict_bayes_net(bn, evidence, query_var):
    """
    Predict the most likely value of a query variable given evidence using the Bayesian Network.
    
    Args:
        bn: Bayesian network.
        evidence: Dictionary of evidence variables and their values.
        query_var: Variable to predict.
    
    Returns:
        The most likely value of the query variable.
    """
    result = enumeration_ask(query_var, evidence, bn)
    return max(result.prob, key=lambda k: result.prob[k])

In [28]:
def evaluate_bayes_net_with_time(bn, test_data, query_var):
    y_true = []
    y_pred = []

    # Start Timing Predictions
    start_time = time.time()

    for _, row in test_data.iterrows():
        evidence = {
            "Gender": row["Gender"],
            "Physical_Activity_Hours": row["Physical_Activity_Hours"],
            "Country": row["Country"],
            "Age": row["Age"],
            "Occupation": row["Occupation"],
        }
        prediction = predict_bayes_net(bn, evidence, query_var)
        y_true.append(row[query_var])
        y_pred.append(prediction)

    end_time = time.time()
    prediction_time = end_time - start_time

    # Calculate Metrics
    acc = accuracy_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred)
    report = classification_report(y_true, y_pred, zero_division=0)

    print("Confusion Matrix:\n", cm)
    print("\nClassification Report:\n", report)

    metrics = {
        "accuracy": acc,
        "prediction_time": prediction_time
    }

    print(f"Prediction Time: {prediction_time:.4f} seconds")
    return metrics

In [29]:
def evaluate_bayes_net(bn, test_data, query_var):
    """
    Evaluate the Bayesian Network on a test dataset and compute various metrics.

    Args:
        bn: Bayesian Network (BayesNet instance).
        test_data: Test dataset (pandas DataFrame).
        query_var: The target variable to predict.

    Returns:
        metrics: A dictionary containing accuracy, and prints out confusion matrix
                 and classification report (precision, recall, f1).
    """

    # Start Timing Predictions
    start_time = time.time()

    y_true = []
    y_pred = []

    for _, row in test_data.iterrows():
        # Build evidence dictionary from test row
        # Note: Adjust the evidence set according to what you want to condition on.
        evidence = {
            "concave points_mean": row["concave points_mean"],
            "perimeter_mean": row["perimeter_mean"],
            "radius_mean": row["radius_mean"],
            "concavity_mean": row["concavity_mean"],
            "texture_mean": row["texture_mean"],
            "area_mean": row["area_mean"],
            "compactness_mean": row["compactness_mean"],
            "smoothness_mean": row["smoothness_mean"],
            "symmetry_mean": row["symmetry_mean"],
            "fractal_dimension_mean": row["fractal_dimension_mean"],
        }

        # Predict the target variable
        prediction = predict_bayes_net(bn, evidence, query_var)

        y_true.append(row[query_var])
        y_pred.append(prediction)

    end_time = time.time()
    prediction_time = end_time - start_time    

    # Calculate accuracy
    acc = accuracy_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred)
    report = classification_report(y_true, y_pred, zero_division=0)

    print("Confusion Matrix:\n", cm)
    print("\nClassification Report:\n", report)
    print(f"Prediction Time: {prediction_time:.4f} seconds")

    metrics = {
        "accuracy": acc,
        "prediction_time": prediction_time
    }

    return metrics

In [30]:
metrics = evaluate_bayes_net(diagnosis_bn, test_data, "diagnosis")
print("Accuracy:", metrics["accuracy"])

Confusion Matrix:
 [[134   9]
 [ 13  72]]

Classification Report:
               precision    recall  f1-score   support

         0.0       0.91      0.94      0.92       143
         1.0       0.89      0.85      0.87        85

    accuracy                           0.90       228
   macro avg       0.90      0.89      0.90       228
weighted avg       0.90      0.90      0.90       228

Prediction Time: 0.0971 seconds
Accuracy: 0.9035087719298246


In [31]:
def cpts_to_json(cpts):
    serializable_cpts = {}
    for var, cpt in cpts.items():
        serializable_cpts[var] = {
            str(parent_comb): {str(k): v for k, v in target_probs.items()}
            for parent_comb, target_probs in cpt.items()
        }
    return serializable_cpts

In [32]:
# Save the CPTs to a JSON file
cpt_json = cpts_to_json(cpts)
with open("cancer_cpts.json", "w") as f:
    json.dump(cpt_json, f, indent=4)
print("CPTs saved successfully to cancer_cpts.json!")

CPTs saved successfully to cancer_cpts.json!


In [None]:
cpt_json