In [None]:
# PARAMETERS
GROUP_ID = 'Group11'
ALGORITHM = 've'
NETWORK_NAME = "/content/drive/MyDrive//networks/child.bif"
REPORT = 'Disease'
EVIDENCE_LEVEL = 'None'
EVIDENCE = ''

In [None]:
# Mount Google Drive, unneccessary if not using google colab
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# CLASSES
# imports
import itertools

# an object to represent a factor in a probabilistic model
# like a table over a set of vars and probs
# equivalent to a conditional prob table (cpt)
# variables = list of Node objects
# proabilities = dict matching values to their probs
class Factor:
    def __init__(self, variables, probabilities):
        self.variables = variables
        self.probabilities = probabilities
        self.variable_names = [v.name for v in variables]

    def get_var_scope(self): # get all names of variables associated with the factor
        return self.variable_names

    def __str__(self):# to print factors
      header = " | ".join(self.variable_names + ["Probability"])
      separator = "-" * len(header)
      rows = [header, separator]
      for assignment, prob in self.probabilities.items():
        row_str = " | ".join(map(str, assignment)) + f" | {prob:.4f}"
        rows.append(row_str)
      return "\n".join(rows)

    def multiply_factors(factor1, factor2): # multiple two factor objects via the use of the cartesian product
      vars1= factor1.variable_names
      vars2= factor2.variable_names
      new_vars_names = sorted(list(set(vars1)|set(vars2))) # get union of variable for new factor

      # map var names to associated nodes
      var_map = {v.name: v for v in factor1.variables}
      var_map.update({v.name: v for v in factor2.variables})
      new_vars_nodes = [var_map[name] for name in new_vars_names]

      new_probabilities= {}

      # use cartesian product to generate every row for the new factor table
      domains = [v.domain for v in new_vars_nodes]
      for row in cartesian_product(domains):
        full_row_dict = dict(zip(new_vars_names, row))
        row1_tuple = tuple(full_row_dict[v_name] for v_name in vars1)
        row2_tuple = tuple(full_row_dict[v_name] for v_name in vars2)

        prob1 = factor1.probabilities.get(row1_tuple, 1.0)
        prob2 = factor2.probabilities.get(row2_tuple, 1.0)

        new_probabilities[row] = prob1 * prob2
      return Factor(new_vars_nodes, new_probabilities)

    def marginalize(factor, var_to_sum_out): # sum out a variable from a factor with marginilization
      if var_to_sum_out not in factor.variable_names:
        return factor # there is no var to sum out in this factor//doesn't exist

      new_vars_nodes  = [v for v in factor.variables if v.name != var_to_sum_out]
      sum_out_index = factor.variable_names.index(var_to_sum_out)

      new_probabilities = {}
      for row, prob in factor.probabilities.items():
        new_row_tuple = tuple(val for i, val in enumerate(row) if i != sum_out_index)
        new_probabilities[new_row_tuple] = new_probabilities.get(new_row_tuple, 0.0) + prob

      return Factor(new_vars_nodes, new_probabilities)

    def get_prob_from_dict(self, assignment_dict):
      # looks up prob for a specific assignment

      # build a tuple in the order that matches how the factor's prob dict is
      assignment_tuple = tuple(assignment_dict[var_name] for var_name in self.variable_names)
      # return the prob
      return self.probabilities.get(assignment_tuple, 0.0)

class Node:
  # node object to hold its name, domain, parents, children, and releavant factor/conditional prob table
  def __init__(self, name, domain):
    self.name = name
    self.domain = domain
    self.parents = []
    self.children = []
    self.cpt = None # factor object

  def set_cpt(self, probabilities): #conditional probability table for the node using factor class
    cpt_vars = self.parents + [self]
    factor_probs = {}

    # parent domains to generate all state combos
    parent_domains = [parent.domain for parent in self.parents]

    # generate all parent state combos
    parent_rows = cartesian_product(parent_domains) if parent_domains else [()]

    # go through each combo of parent rows
    for i, p_row in enumerate(parent_rows):
      # get probs of the childs states given this parent row
      child_probs_list = probabilities[p_row]

      #match each child state with its prob
      for j, child_state in enumerate(self.domain):
        # include parents values and childs values
        full_row = p_row + (child_state,)
        factor_probs[full_row] = child_probs_list[j]


    self.cpt = Factor(cpt_vars, factor_probs)

  def __repr__(self):
    return f"Node({self.name})"

class BayesNet:
  # an object to represent a bayes net, holds all nodes in the net and edges between them
  def __init__(self):
    self.nodes = {} # to map node names to node objects

  def add_node(self, node):
    if node.name in self.nodes:
      raise ValueError(f"Node with name {node.name} already exists")
    self.nodes[node.name] = node

  def add_edge(self, parent_name, child_name):
    if parent_name not in self.nodes or child_name not in self.nodes:
      raise ValueError("both parents and child must be in network to add edge")
    parent_node = self.nodes[parent_name]
    child_node = self.nodes[child_name]

    parent_node.children.append(child_node)
    child_node.parents.append(parent_node)

  def get_node(self, name):
    return self.nodes.get(name)

def cartesian_product(domains):
  # get cartesian product of a list of lists
  # returns list of tuples with all combo tuples
  # get every combo of the domains

  if not domains:
    return [()] # base case

  # recursion
  results = []
  first_domain = domains[0]
  other_domains = domains[1:]

  # get all combos for other domains
  combos_of_other_domains = cartesian_product(other_domains)

  # for first domain, combine it with each combo from the other domains
  for value in first_domain:
    for combo in combos_of_other_domains:
      results.append((value,)+combo)

  return results



In [None]:
# BIF Import

!pip install pgmpy #- to install pgmpy
from pgmpy.readwrite import BIFReader

def load_bif(file_path):
  # Load BIF
  reader = BIFReader(file_path)
  model = reader.get_model()

  # Extract BIF information
  variables = reader.get_variables()
  parents = reader.get_parents()
  states = reader.get_states()
  cpds = {cpd.variable: cpd for cpd in model.get_cpds()}

  # Create Bayesian network and add nodes
  bayes_net = BayesNet()

  # Create nodes
  for var in variables:
    node = Node(name=var, domain=states[var])
    bayes_net.add_node(node)

  # Add parent child edges
  for child, parent_list in parents.items():
    for parent in parent_list:
      bayes_net.add_edge(parent, child)

  # CPTs for each node
  for var in variables:
    node = bayes_net.get_node(var)
    cpd = cpds[var]

    parent_names_cpd_order = cpd.variables[1:]

    parent_domain_lists = [states[p_name] for p_name in parent_names_cpd_order]
    child_domain_list = node.domain

    probabilities = {}

    # Create every possible combo of parent states
    parent_rows = cartesian_product(parent_domain_lists) if parent_domain_lists else[()]

    #get numpy array of probabilites
    cpd_vals_array = cpd.values

    # Assign probability of each child state from CPD
    for p_row in parent_rows:
      if parent_domain_lists:
        p_indicies = tuple(parent_domain_lists[i].index(p_row[i]) for i in range(len(p_row)))
        for j, child_val in enumerate(child_domain_list):
          # full index of array
          cpt_index = (j,) + p_indicies

          # get key for factors dict
          row = p_row + (child_val,)
          probabilities[row] = float(cpd_vals_array[cpt_index])
      else:
        for j, child_val in enumerate(child_domain_list):
          row = p_row + (child_val,)
          probabilities[row] = float(cpd.values[j])

    parent_nodes_in_order = [bayes_net.get_node(p_name) for p_name in parent_names_cpd_order]
    # Create factor then assign to the node
    node.cpt = Factor(parent_nodes_in_order + [node], probabilities)

  return bayes_net

Collecting pgmpy
  Downloading pgmpy-1.0.0-py3-none-any.whl.metadata (9.4 kB)
Collecting pyro-ppl (from pgmpy)
  Downloading pyro_ppl-1.9.1-py3-none-any.whl.metadata (7.8 kB)
Collecting pyro-api>=0.1.1 (from pyro-ppl->pgmpy)
  Downloading pyro_api-0.1.2-py3-none-any.whl.metadata (2.5 kB)
Downloading pgmpy-1.0.0-py3-none-any.whl (2.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m17.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyro_ppl-1.9.1-py3-none-any.whl (755 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m756.0/756.0 kB[0m [31m37.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyro_api-0.1.2-py3-none-any.whl (11 kB)
Installing collected packages: pyro-api, pyro-ppl, pgmpy
Successfully installed pgmpy-1.0.0 pyro-api-0.1.2 pyro-ppl-1.9.1


In [None]:
# GIBBS SAMPLING
import random

def gibbs(bayes_net, query, evidence, num_samples, burn_num):
  # input: bayes_net: bayes net object
  # query: name of variable to we are trying to find out about
  # evidence: dictionary for observed evidence
  # num_sample: number of samples to do
  # burn_num: number of intial samples to discard
  # output: prob distribution dictionary of query var

  # initialize the dist
  current_state = forward_sampling(bayes_net, evidence)
  # keep track of nodes we don't have evidence for
  no_evidence_nodes = [
      node for name, node in bayes_net.nodes.items()
      if name not in evidence]
  # get query node object
  query_node = bayes_net.get_node(query)
  counts = {value: 0 for value in query_node.domain} #initialize tally for how often a possible value occurs for each query var

  # run burn in
  for i in range(burn_num):
    for node in no_evidence_nodes:
      new_val = sample_markov_blanket(node, current_state, bayes_net)
      current_state[node.name] = new_val

  # after burn in
  for i in range(num_samples):
    for node in no_evidence_nodes:
      new_val = sample_markov_blanket(node, current_state, bayes_net)
      current_state[node.name]= new_val
    # accumulate result for loop
    query_val = current_state[query]
    counts[query_val] += 1 # counts??
  return normalize(counts)

def topological_sort(bayes_net):
  # perform topological sort on bayes net
  # input: bayes net
  # output: list of nodes in topological order

  # find number of parents for each node
  num_parents = {name: len(node.parents) for name, node in bayes_net.nodes.items()}

  # queue all nodes with no parents
  queue = [name for name, count in num_parents.items() if count == 0]

  sorted_nodes = []
  # go through all nodes in the queue until there are no more
  while queue:
    # pop each node on the queue and append them because they are part of the current "layer"
    node_name = queue.pop(0)
    node = bayes_net.get_node(node_name)
    sorted_nodes.append(node)

    # remove parent from each child so that they can be queued
    for child in node.children:
      num_parents[child.name] -= 1
      # if all parents are removed for a child, add them to the queue because we are in the next layer down
      if num_parents[child.name] == 0:
        queue.append(child.name)
  return sorted_nodes


def forward_sampling(bayes_net, evidence):
  # initialize first state for gibbs sampling using forward sampling
  # input: bayes net: bayes net object
  # evidence: dictionary of given evidence
  # output: dictionary with prob dist for each node by name
  current_state = {}
  # do topological sort of given bayes net
  sorted_nodes = topological_sort(bayes_net)
  for node in sorted_nodes:
    # if node has evidence, make its value fixed
    if node.name in evidence:
      current_state[node.name] = evidence[node.name]
    else: # non evidence node, sample it, get prob dist given the parents from current state
      prob_given_parents = get_prob(node, current_state)
      # sample value from the above dist
      current_state[node.name] = weighted_sample(prob_given_parents)
  return current_state

def get_prob(node, current_state):
  # get the probability of a node given its parents from its conditional prob table
  # input: node: node object for which we are trying to get the prob of
  # current_state: current state dict holding prob dist for each node by name
  # output: prob dist dictionary for a child given its parents values
  # get parent values
  parents = node.cpt.variable_names[:-1]

  # if root node (no parents)
  if not parents:
    parent_assignment = ()
  else:
    parent_assignment = tuple(current_state[name] for name in parents)

  # get probs for these parent assignments
  prob_dict = {}
  for child_val in node.domain:
    full_assignment = parent_assignment + (child_val,)
    # assume node.cpt.proabilites is the dict in the node object
    prob_dict[child_val] = node.cpt.probabilities[full_assignment]

  return prob_dict

def sample_markov_blanket(node, current_state, bayes_net):
  # input: node: node object, current_state: current state dict holding prob dist for each node by name,
  # bayes_net, bayes net object
  # output: value sample, given markov blanket from prob dict randomly, based on its weight
  # calculate the probability of a node given the markov blanket for that node
  probs = {}
  temp_state = current_state.copy() # create copy of current state
  # calculate unnormalized prob for each possible val of the var/node given
  for value in node.domain:
    # set the nodes value in the current state dict so that we can calculate other probs based on that
    temp_state[node.name] = value
    prob = node.cpt.get_prob_from_dict(temp_state)
    # compute product
    for child in node.children:
      # get prob of child given its parents (including the node)
      prob *= child.cpt.get_prob_from_dict(temp_state)
    probs[value] = prob
  # normalize the prob dict
  normalized_probs = normalize(probs)
  # handle deterministic case
  if normalized_probs is None:
    return current_state[node.name] # reject change and return nodes current val
  else:
  # return a single sample from the calculated dist
    return weighted_sample(normalized_probs)

def normalize(prob_dict):
  # convert a dictionary of that maps a value to a number and normalize it so all numbers in it sum to 1
  # input: proability dictionary
  # output: normalized probability dictionary
  total = sum(prob_dict.values())
  # if all probs are zero make it a uniform dist
  if total == 0:
    return None
  else:
    return {value: prob/total for value, prob in prob_dict.items()}

def weighted_sample(prob_dict):
# take normalized dict of probs and returns single value sampled according to these probs
# the sampling part to be used in gibbs
  # input: prob dictionary
  # output: value sample from prob dict randomly, based on its weight

  # get random num between 1 and 0
  rand_val = random.random()
  # initialize a cum prob to keep track of probs we've seen
  cumulative_prob = 0.0
  # go through each item in dict
  for value, prob in prob_dict.items():
    # add each prob to our running total
    cumulative_prob += prob
    # check if random number falls within the prob (is less than)
    if rand_val < cumulative_prob:
      # if yes than choose this value
      return value
    # in case of rounding errors return last item in dict, shouldn't ever be reached
  return list(prob_dict.keys())[-1]

def run_gibbs(bayes_net, query, evidence, num_samples, burn_num, num_rums):
    # runs gibbs sampling a specfied number of times to get a result that is an average over all of these runs
    # input: bayes_net: bayes net object
    # query: name of variable to we are trying to find out about
    # evidence: dictionary for observed evidence
   # num_sample: number of samples to do
    # burn_num: number of intial samples to discard
    # num_runs: number of times to run gibbs sampling
    # output: prob distribution dictionary of query var that is average of all runs

    # get the query node to be used to initialize average dict
    query_node =  bayes_net.get_node(query)
    # use query node to initialize average dict
    avg_dist = {value: 0.0 for value in query_node.domain}

    for i in range(num_rums):
      # run gibbs sampling i times
      result_dist = gibbs(bayes_net, query, evidence, num_samples, burn_num)
      for value, prob in result_dist.items():
        # add run to a running total
        avg_dist[value] += prob
    # normalize (effectively find the average)
    final_average_dist = normalize(avg_dist)
    return final_average_dist


In [None]:
# VARIABLE ELIMINATION - Min-degree hueristic

def variable_elimination(bayes_net, query, evidence):
  # PARAMETERS
  # bayes_net: Bayesian network
  # query: query variable
  # evidence: dictionary for observed evidence
  # RETURNS
  # final_factor: a normalized Factor object

  # Initialize list of factors
  factors = []

  # Loop over all nodes in bayesian network
  for node_name, node in bayes_net.nodes.items():
    # Store selected nodes CPT in factor
    factor = node.cpt

    # Loop through every piece of observed evidence
    for evidence_variable, evidence_value in evidence.items():
      # If factor includes that variable, restrict factor (keep only rows consistent with evidence)
      if evidence_variable in factor.variable_names:
        # New dictionary for probabilities that match evidence
        new_probabilities = {}

        # Loop through every row of factor's probability table
        for assignment, probability in factor.probabilities.items():
          # Map variable names to assignment values
          row_dictionary = {}
          for i in range(len(factor.variable_names)):
            row_dictionary[factor.variable_names[i]] = assignment[i]

          # Check evidence
          if row_dictionary[evidence_variable] == evidence_value:

            # Remove evidence variable from assignment
            assignment_reduced = tuple(
              row_dictionary[var] for var in factor.variable_names if var != evidence_variable
            )
            new_probabilities[assignment_reduced] = probability

        # Build reduced factor
        new_variables = [v for v in factor.variables if v.name != evidence_variable]
        factor = Factor(new_variables, new_probabilities)

    factors.append(factor)

  # Min-degree elimination order
  # Variables you are allowed to eliminate
  elim_vars = [v for v in bayes_net.nodes.keys() if v not in query and v not in evidence]

  # Count how many factors contain given variable
  def factor_degree(var):
    return sum(1 for f in factors if var in f.variable_names)

  while elim_vars:

    # Pick variable appearing in the FEWEST factors (min-degree)
    variable = min(elim_vars, key=factor_degree)

    # Collect factors containing this variable
    factors_including_variable = [f for f in factors if variable in f.variable_names]

    if len(factors_including_variable) > 0:
      # Multiply them together
      new_factor = factors_including_variable[0]
      for f in factors_including_variable[1:]:
        new_factor = Factor.multiply_factors(new_factor, f)

      # Marginalize out the variable
      new_factor = Factor.marginalize(new_factor, variable)

      # Rebuild factor list
      factors = [f for f in factors if f not in factors_including_variable]
      factors.append(new_factor)

    elim_vars.remove(variable)

  # Multiply remaining factors
  final_factor = factors[0]
  for f in factors[1:]:
    final_factor = Factor.multiply_factors(final_factor, f)

  # Normalize
  probability_total = sum(final_factor.probabilities.values())
  normalized_probabilities = {
    assignment: prob / probability_total
    for assignment, prob in final_factor.probabilities.items()
  }
  final_factor = Factor(final_factor.variables, normalized_probabilities)

  # Marginalize everything but the query variable
  for var in list(final_factor.variable_names):
    if var != query:
      final_factor = Factor.marginalize(final_factor, var)

  return final_factor


In [None]:
# parse evidence string
def parse_evidence_string(evidence_str):
    #converts the semicolon-separated evidence string into a dictionary
    if not evidence_str:
        return {}

    evidence_dict = {}
    pairs = evidence_str.strip().split(';')
    for pair in pairs:
        key, value = pair.split('=', 1)
        evidence_dict[key.strip()] = value.strip()
    return evidence_dict

In [None]:
# FILE EXPORT
import csv
import os

def file_export(results_dict, group_id, algorithm, network_name, evidence_level):

    #Saves result probability distributions to CSV
# input:results_dict: dictionary that holds results via key and value which is another dictionary that holds variables and their values
# all other inputs are parameters
    base_name = os.path.basename(network_name)

    clean_network_name = os.path.splitext(base_name)[0]

    filename = f"{group_id}_{algorithm}_{clean_network_name}_{evidence_level}.csv"

    try:
        # open file
        with open(filename, 'w', newline='') as f:
            #CSV writer object
            writer = csv.writer(f)

            # Loop each variable in the results dict
            for variable_name, distribution_dict in results_dict.items():

                # Get the domain values
                domain_values = list(distribution_dict.keys())

                #get probs
                probabilities = list(distribution_dict.values())

                # variable row
                var_row = [variable_name] + domain_values
                writer.writerow(var_row)

               # prob row
                value_row = [''] + probabilities
                writer.writerow(value_row)
        return filename

    except IOError as e:
        print(f"Error: Could not write to file {filename}. {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None


In [None]:
# MAIN AUTOMATION// RUN EVERYTHING FROM HERE

bayes_net = load_bif(NETWORK_NAME)

evidence_dict = parse_evidence_string(EVIDENCE)

query_vars = [name.strip() for name in REPORT.split(';')]

final_results = {}

if(ALGORITHM == "gibbs"):
  # gibbs
  burn_num = 100
  samples_num = 1000
  num_runs = 100
  for query_name in query_vars:
    result_dist = run_gibbs(bayes_net, query_name, evidence_dict, samples_num, burn_num, num_runs)
    final_results[query_name] = result_dist
  file_export(final_results, GROUP_ID, ALGORITHM, NETWORK_NAME, EVIDENCE_LEVEL)

elif(ALGORITHM == "ve"):
  # variable elimination
  for query_name in query_vars:
    result_factor = variable_elimination(bayes_net, query_name, evidence_dict)
    # Convert Factor object to probability dictionary
    result_dict = {}
    for assignment, prob in result_factor.probabilities.items():
      # Simplify tuple to its single value
      if isinstance(assignment, tuple) and len(assignment) == 1:
        assignment = assignment[0]
      result_dict[str(assignment)] = prob
    final_results[query_name] = result_dict
  file_export(final_results, GROUP_ID, ALGORITHM, NETWORK_NAME, EVIDENCE_LEVEL)

else:
  print("unknown algorithm")
