# Reading Data
- #### features with discrete values

In [1]:
#############################################################################
# NB_Classifier.py
#
# Implements the Naive Bayes classifier for simple probabilistic inference.
# It assumes the existance of data in CSV format, where the first line contains
# the names of random variables -- the last being the variable to predict.
# This implementation aims to be agnostic of the data (no hardcoded vars/probs)
#
# Version: 1.0, Date: 03 October 2022
# Version: 1.1, Date: 03 October 2023 more compatible with CPT_Generator
# Version: 1.2, Date: 08 October 2023 more compatible with ModelEvaluator
# Contact: hcuayahuitl@lincoln.ac.uk
#############################################################################

import sys
import math
import time


class NB_Classifier:
    rand_vars = []
    rv_key_values = {}
    rv_all_values = []
    predictor_variable = None
    num_data_instances = 0
    default_missing_count = 0.000001
    probabilities = {}
    predictions = []
    log_probabilities = False

    def __init__(self, file_name, fitted_model=None):
        if file_name is None:
            return
        else:
            self.read_data(file_name)

        if fitted_model is None:
            self.training_time = time.time()
            self.estimate_probabilities()
            self.training_time = time.time() - self.training_time

        else:
            self.inference_time = time.time()
            self.rv_key_values = fitted_model.rv_key_values
            self.probabilities = fitted_model.probabilities
            self.training_time = fitted_model.training_time
            self.test_learnt_probabilities(file_name)
            self.inference_time = time.time() - self.inference_time

    def read_data(self, data_file):
        print("\nREADING data file %s..." % (data_file))
        print("---------------------------------------")

        self.rand_vars = []
        self.rv_key_values = {}
        self.rv_all_values = []

        with open(data_file) as csv_file:
            for line in csv_file:
                line = line.strip()
                if len(self.rand_vars) == 0:
                    self.rand_vars = line.split(',')
                    for variable in self.rand_vars:
                        self.rv_key_values[variable] = []
                else:
                    values = line.split(',')
                    self.rv_all_values.append(values)
                    self.update_variable_key_values(values)
                    self.num_data_instances += 1

        self.predictor_variable = self.rand_vars[len(self.rand_vars)-1]

        print("RANDOM VARIABLES=%s" % (self.rand_vars))
        print("VARIABLE KEY VALUES=%s" % (self.rv_key_values))
        print("VARIABLE VALUES (first 10)=%s" % (self.rv_all_values[:10]))
        print("PREDICTOR VARIABLE=%s" % (self.predictor_variable))
        print("|data instances|=%d" % (self.num_data_instances))

    def update_variable_key_values(self, values):
        for i in range(0, len(self.rand_vars)):
            variable = self.rand_vars[i]
            key_values = self.rv_key_values[variable]
            value_in_focus = values[i]
            if value_in_focus not in key_values:
                self.rv_key_values[variable].append(value_in_focus)

    def estimate_probabilities(self):
        countings = self.estimate_countings()
        prior_counts = countings[self.predictor_variable]

        print("\nESTIMATING probabilities...")
        for variable, counts in countings.items():
            prob_distribution = {}
            for key, val in counts.items():
                variables = key.split('|')

                if len(variables) == 1:
                    # prior probability
                    probability = float(val/self.num_data_instances)
                else:
                    # conditional probability
                    probability = float(val/prior_counts[variables[1]])

                if self.log_probabilities is False:
                    prob_distribution[key] = probability
                else:
                    # convert probability to log probability
                    prob_distribution[key] = math.log(probability)

            self.probabilities[variable] = prob_distribution

        for variable, prob_dist in self.probabilities.items():
            prob_mass = 0
            for value, prob in prob_dist.items():
                prob_mass += prob
            print("P(%s)=>%s\tSUM=%f" % (variable, prob_dist, prob_mass))

    def estimate_countings(self):
        print("\nESTIMATING countings...")

        countings = {}
        for variable_index in range(0, len(self.rand_vars)):
            variable = self.rand_vars[variable_index]

            if variable_index == len(self.rand_vars)-1:
                # prior counts
                countings[variable] = self.get_counts(None)
            else:
                # conditional counts
                countings[variable] = self.get_counts(variable_index)

        print("countings="+str(countings))
        return countings

    def get_counts(self, variable_index):
        counts = {}
        predictor_index = len(self.rand_vars)-1

        # accumulate countings
        for values in self.rv_all_values:
            if variable_index is None:
                # case: prior probability
                value = values[predictor_index]
            else:
                # case: conditional probability
                value = values[variable_index]+"|"+values[predictor_index]

            try:
                counts[value] += 1
            except Exception:
                counts[value] = 1

        # verify countings by checking missing prior/conditional counts
        if variable_index is None:
            counts = self.check_missing_prior_counts(counts)
        else:
            counts = self.check_missing_conditional_counts(counts, variable_index)

        return counts

    def check_missing_prior_counts(self, counts):
        for var_val in self.rv_key_values[self.predictor_variable]:
            if var_val not in counts:
                print("WARNING: missing count for variable=" % (var_val))
                counts[var_val] = self.default_missing_count

        return counts

    def check_missing_conditional_counts(self, counts, variable_index):
        variable = self.rand_vars[variable_index]
        for var_val in self.rv_key_values[variable]:
            for pred_val in self.rv_key_values[self.predictor_variable]:
                pair = var_val+"|"+pred_val
                if pair not in counts:
                    print("WARNING: missing count for variables=%s" % (pair))
                    counts[pair] = self.default_missing_count

        return counts

    def test_learnt_probabilities(self, file_name):
        print("\nEVALUATING on "+str(file_name))

        # iterate over all instances in the test data
        for instance in self.rv_all_values:
            distribution = {}
            print("Input vector=%s" % (instance))

            # iterate over all values in the predictor variable
            for predictor_value in self.rv_key_values[self.predictor_variable]:
                prob_dist = self.probabilities[self.predictor_variable]
                prob = prob_dist[predictor_value]

                # iterate over all instance values except the predictor var.
                for value_index in range(0, len(instance)-1):
                    variable = self.rand_vars[value_index]
                    value = instance[value_index]
                    prob_dist = self.probabilities[variable]
                    cond_prob = value+"|"+predictor_value

                    if self.log_probabilities is False:
                        prob *= prob_dist[cond_prob]
                    else:
                        prob += prob_dist[cond_prob]

                distribution[predictor_value] = prob

            normalised_dist = self.get_normalised_distribution(distribution)
            self.predictions.append(normalised_dist)
            print("UNNORMALISED DISTRIBUTION=%s" % (distribution))
            print("NORMALISED DISTRIBUTION=%s" % (normalised_dist))
            print("---")

    def get_normalised_distribution(self, distribution):
        normalised_dist = {}
        prob_mass = 0
        for var_val, prob in distribution.items():
            prob = math.exp(prob) if self.log_probabilities is True else prob
            prob_mass += prob

        for var_val, prob in distribution.items():
            prob = math.exp(prob) if self.log_probabilities is True else prob
            normalised_prob = prob/prob_mass
            normalised_dist[var_val] = normalised_prob

        return normalised_dist


# if __name__ == "__main__":
#     if len(sys.argv) != 3:
#         print("USAGE: NB_Classifier.py [train_file.csv] [test_file.csv]")
#         exit(0)
#     else:
#         file_name_train = sys.argv[1]
#         file_name_test = sys.argv[2]
#         nb_fitted = NB_Classifier(file_name_train)
#         nb_tester = NB_Classifier(file_name_test, nb_fitted)

# if __name__ == "__main__":
#    file_name_train = "dataset/diabetes_data-discretized-train.csv"
#    file_name_test = "dataset/diabetes_data-discretized-test.csv"
#    nb_fitted = NB_Classifier(file_name_train)
#    nb_tester = NB_Classifier(file_name_test, nb_fitted)


# Reading Configuration File

In [5]:
#############################################################################
# BayesNetReader.py
#
# Reads a configuration file containing the specification of a Bayes net.
# It generates a dictionary of key-value pairs containing information
# describing the random variables, structure, and conditional probabilities.
# This implementation aims to be agnostic of the data (no hardcoded vars/probs)
#
# Keys expected: name, random_variables, structure, and CPTs.
# Separators: COLON(:) for key-values, EQUALS(=) for table_entry-probabilities
# Example configuration file: see config_alarm.txt (from workshop of week 3)
#
# Version: 1.0
# Date: 06 October 2022
# Contact: hcuayahuitl@lincoln.ac.uk
#############################################################################

import sys


class BayesNetReader:
    bn = {}

    def __init__(self, file_name):
        self.read_data(file_name)
        # self.tokenise_data()

    def read_data(self, data_file):
        print("\nREADING data file %s..." % (data_file))

        with open(data_file) as cfg_file:
            key = None
            value = None
            for line in cfg_file:
                line = line.strip()
                if len(line) == 0:
                    continue

                tokens = line.split(":")
                if len(tokens) == 2:
                    if value is not None:
                        self.bn[key] = value
                        value = None

                    key = tokens[0]
                    value = tokens[1]
                else:
                    value += tokens[0]

        self.bn[key] = value
        self.bn["random_variables_raw"] = self.bn["random_variables"]
        print("RAW key-values="+str(self.bn))

    # def tokenise_data(self):
    #     print("TOKENISING data...")
    #     rv_key_values = {}

    #     for key, values in self.bn.items():

    #         if key == "random_variables":
    #             var_set = []
    #             for value in values.split(";"):
    #                 if value.find("(") and value.find(")"):
    #                     value = value.replace('(', ' ')
    #                     value = value.replace(')', ' ')
    #                     parts = value.split(' ')
    #                     var_set.append(parts[1].strip())
    #                 else:
    #                     var_set.append(value)
    #             self.bn[key] = var_set

    #         elif key.startswith("CPT"):
    #             # store Conditional Probability Tables (CPTs) as dictionaries
    #             cpt = {}
    #             sum = 0
    #             for value in values.split(";"):
    #                 pair = value.split("=")
    #                 cpt[pair[0]] = float(pair[1])
    #                 sum += float(pair[1])
    #             print("key=%s cpt=%s sum=%s" % (key, cpt, sum))
    #             self.bn[key] = cpt

    #             # store unique values for each random variable
    #             if key.find("|") > 0:
    #                 rand_var = key[4:].split("|")[0]
    #             else:
    #                 rand_var = key[4:].split(")")[0]
    #             unique_values = list(cpt.keys())
    #             rv_key_values[rand_var] = unique_values

    #         else:
    #             values = values.split(";")
    #             if len(values) > 1:
    #                 self.bn[key] = values

    #     self.bn['rv_key_values'] = rv_key_values
    #     print("TOKENISED key-values="+str(self.bn))


if __name__ == "__main__":
    # if len(sys.argv) != 2:
    #     print("USAGE: BayesNetReader.py [your_config_file.txt]")
    # else:
    #     # file_name = sys.argv[1]
        file_name = "diabetes-config.txt"
        BayesNetReader(file_name)



READING data file diabetes-config.txt...
RAW key-values={'name': ' Diabetes_BayesianNetwork', 'random_variables': ' Pregnancies;Glucose;BloodPressure;SkinThickness;Insulin;BMI;DiabetesPedigreeFunction;Age;Outcome', 'structure': ' P(Outcome); P(Glucose|Outcome);P(BMI|Outcome);P(Age|Outcome)', 'random_variables_raw': ' Pregnancies;Glucose;BloodPressure;SkinThickness;Insulin;BMI;DiabetesPedigreeFunction;Age;Outcome'}


# CPT Generator

In [41]:
import pandas as pd
from pgmpy.models import NaiveBayes
from pgmpy.estimators import MaximumLikelihoodEstimator
from pgmpy.inference import VariableElimination

# Load the data
train_data = pd.read_csv('dataset/diabetes_data-discretized-train.csv')
test_data = pd.read_csv('dataset/diabetes_data-discretized-test.csv', usecols=['Pregnancies','Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age'])


# Define the model structure (Naive Bayes)
model = NaiveBayes()
model.add_node('Outcome')  # Target variable
model.add_nodes_from(['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI',
                      'DiabetesPedigreeFunction', 'Age'])  # Other variables

# Fit the model using Maximum Likelihood Estimation
estimator = MaximumLikelihoodEstimator(model, train_data)
cdp_Outcome = estimator.estimate_cpd('Outcome')  # Estimate Conditional Probability Distribution
cdp_Glucose = estimator.estimate_cpd('Glucose')
cdp_Pregnancies = estimator.estimate_cpd('Pregnancies')
cdp_BloodPressure = estimator.estimate_cpd('BloodPressure')
cdp_SkinThickness = estimator.estimate_cpd('SkinThickness')
cdp_Insulin = estimator.estimate_cpd('Insulin')
cdp_Age = estimator.estimate_cpd('Age') 
cdp_BMI = estimator.estimate_cpd('BMI') 
cdp_DiabetesPedigreeFunction = estimator.estimate_cpd('DiabetesPedigreeFunction') 


print('cdp_Outcome\n',cdp_Outcome)
print('cdp_Glucose\n',cdp_Glucose)
print('\ncdp_Age\n',cdp_Age)
print('\ncdp_BMI\n',cdp_BMI)

model.add_cpds(cdp_Glucose, cdp_Age, cdp_BMI, cdp_Outcome, cdp_Pregnancies, cdp_BloodPressure, cdp_SkinThickness, cdp_Insulin, cdp_DiabetesPedigreeFunction)

# Create an instance of the VariableElimination class
ve = VariableElimination(model)

# Perform inference on the test set
# Example evidence (replace 'value' with actual values from your test dataset)
# evidence = {'Pregnancies': 'value', 'Glucose': 'value', 'BloodPressure': 'value',
#             'SkinThickness': 'value', 'Insulin': 'value', 'BMI': 'value',
#             'DiabetesPedigreeFunction': 'value', 'Age': 'value'}
evidence = test_data.to_dict(orient='records')[10]


# Perform inference on the 'Outcome' variable given the evidence
result = ve.query(variables=['Outcome'], evidence=evidence, joint=False)
print(result)

cdp_Outcome
 +------------+----------+
| Outcome(0) | 0.646766 |
+------------+----------+
| Outcome(1) | 0.353234 |
+------------+----------+
cdp_Glucose
 +------------+-----------+
| Glucose(0) | 0.0066335 |
+------------+-----------+
| Glucose(1) | 0.116086  |
+------------+-----------+
| Glucose(2) | 0.416252  |
+------------+-----------+
| Glucose(3) | 0.296849  |
+------------+-----------+
| Glucose(4) | 0.131012  |
+------------+-----------+
| Glucose(5) | 0.0331675 |
+------------+-----------+

cdp_Age
 +--------+-----------+
| Age(1) | 0.0829187 |
+--------+-----------+
| Age(2) | 0.538972  |
+--------+-----------+
| Age(3) | 0.205638  |
+--------+-----------+
| Age(4) | 0.107794  |
+--------+-----------+
| Age(5) | 0.0646766 |
+--------+-----------+

cdp_BMI
 +--------+------------+
| BMI(0) | 0.00995025 |
+--------+------------+
| BMI(1) | 0.111111   |
+--------+------------+
| BMI(2) | 0.359867   |
+--------+------------+
| BMI(3) | 0.391376   |
+--------+------------+
| BM

TypeError: NaiveBayes.active_trail_nodes() got an unexpected keyword argument 'variables'

In [42]:
import pandas as pd
from pgmpy.models import NaiveBayes
from pgmpy.estimators import MaximumLikelihoodEstimator

# Load the data
train_data = pd.read_csv('dataset/diabetes_data-discretized-train.csv')
test_data = pd.read_csv('dataset/diabetes_data-discretized-test.csv', usecols=['Pregnancies','Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age'])

# Define the model structure (Naive Bayes)
model = NaiveBayes()
model.add_node('Outcome')  # Target variable
model.add_nodes_from(['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI',
                      'DiabetesPedigreeFunction', 'Age'])  # Other variables

# Fit the model using Maximum Likelihood Estimation for each feature variable
estimator = MaximumLikelihoodEstimator(model, train_data)
for feature in ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI',
                'DiabetesPedigreeFunction', 'Age']:
    cpd = estimator.estimate_cpd(feature, prior_type="dirichlet", pseudo_counts=0.5)
    model.add_cpds(cpd)

# Estimate CPD for the target variable 'Outcome'
cpd_Outcome = estimator.estimate_cpd('Outcome', prior_type="dirichlet", pseudo_counts=0.5)
model.add_cpds(cpd_Outcome)

# Create an instance of the VariableElimination class
ve = VariableElimination(model)

# Perform inference on the test set
evidence = test_data.to_dict(orient='records')[10]

# Perform inference on the 'Outcome' variable given the evidence
result = ve.query(variables=['Outcome'], evidence=evidence, joint=False)
print(result)


TypeError: MaximumLikelihoodEstimator.estimate_cpd() got an unexpected keyword argument 'prior_type'

In [35]:
import pandas as pd
from pgmpy.models import BayesianNetwork
from pgmpy.estimators import MaximumLikelihoodEstimator
from pgmpy.inference import VariableElimination

# Load your data (make sure your data is loaded as a pandas DataFrame)
data = pd.read_csv('dataset/diabetes_data-discretized-train.csv')
data_test = pd.read_csv('dataset/diabetes_data-discretized-test.csv', usecols=['Pregnancies','Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age'])

# Define the structure of your Bayesian Network
model = BayesianNetwork([('Pregnancies', 'Outcome'), ('Glucose', 'Outcome'), ('BloodPressure', 'Outcome'),
                         ('SkinThickness', 'Outcome'), ('Insulin', 'Outcome'), ('BMI', 'Outcome'),
                         ('DiabetesPedigreeFunction', 'Outcome'), ('Age', 'Outcome')])

# Estimate the CPDs using Maximum Likelihood Estimation
model.fit(data, estimator=MaximumLikelihoodEstimator)

# Create an instance of the VariableElimination class
ve = VariableElimination(model)




In [36]:
# Convert test data to a dictionary to use as evidence
evidence = data_test.to_dict(orient='records')[10]

# Perform inference on the 'Outcome' variable given the evidence
result = ve.query(variables=['Outcome'], evidence=evidence)
print(result)

+------------+----------------+
| Outcome    |   phi(Outcome) |
| Outcome(0) |         0.5000 |
+------------+----------------+
| Outcome(1) |         0.5000 |
+------------+----------------+
