In [2]:
!conda env list
!python -V
import sys
print(sys.executable)

# conda environments:
#
base                     /Applications/anaconda3
phd                   *  /Applications/anaconda3/envs/phd

Python 3.10.0
/Applications/anaconda3/envs/phd/bin/python


# Table of Contents
* [Library](#1)
* [Lung](#3)
    * [Rejection Sampler](#3.1)
    * [Message Passing](#3.2)
    * [Gibbs Sampling](#3.3)

In [1]:
# DATA ANALYSIS
import numpy as np
import pandas as pd
import random as rand

# PLOTS
# import matplotlib.pyplot as plt
# import seaborn as sns
# import plotly.express as px
# import plotly.graph_objects as go
# from plotly.subplots import make_subplots

# OTHERS
import math
import statistics as stats
import time as t
#import pickle
import functools as fc # contains reduce

# DEBUGGER
from IPython.core.debugger import set_trace
# For executing line by line use n and 
# for step into a function use s and 
# to exit from debugging prompt use c.
# REFACTOR: uninstall jupyter packages that I installed before, not using PyCharm built in debugger


# import color blind colors
colorblind = ['#377eb8', '#ff7f00', '#4daf4a',
              '#f781bf', '#a65628', '#984ea3',
              '#999999', '#e41a1c', '#dede00']

# Library <a class="anchor" id="1"></a>

In [3]:
# Utility functions
%load utils.py
import utils

## Inference Code <a class="anchor" id="1.2"></a>

### Belief propagation <a class="anchor" id="3.2"></a>
Also known as message passing

In [4]:
# Lung Function Model
class lungModel:
    
    # Names
    """
    i: inflammation
    bl: bacterial load
    w: wellness
    """
    # Marginal distributions
    marginal_i = {
        "absent": 0.3,
        "small": 0.5,
        "heavy": 0.2
    }
    # Conditional probability table (cpt) of varB knowing varA
    cpt_bl_i = {
        "absent": {
            "low": 0.6,
            "medium": 0.2,
            "high": 0.2
        },
        "small": {
            "low": 0.3,
            "medium": 0.3,
            "high": 0.4
        },
        "heavy": {
            "low": 0.1,
            "medium": 0.3,
            "high": 0.6
        }
    }
    cpt_w_i = {
        "absent": {
            1: 0.01,
            2: 0.09,
            3: 0.2,
            4: 0.3,
            5: 0.4
        },
        "small": {
            1: 0.2,
            2: 0.3,
            3: 0.3,
            4: 0.1,
            5: 0.1
        },
        "heavy": {
            1: 0.4,
            2: 0.3,
            3: 0.15,
            4: 0.1,
            5: 0.05
        }
    }
    cpt_FEV1_bl = {
        'low': {
            'low': 0.6,
            'medium': 0.3,
            'high': 0.2,
        },
        'medium': {
            'low': 0.3,
            'medium': 0.4,
            'high': 0.3,
        },
        'high': {
            'low': 0.1,
            'medium': 0.4,
            'high': 0.5,
        }
    }
    
    def sample(self):
        inflammation = utils.threeStatesSample(list(self.marginal_i.values()),
                                               list(self.marginal_i.keys()))
        
        wellness = utils.nStatesSample(list(self.cpt_w_i[inflammation].values()),
                                 list(self.cpt_w_i[inflammation].keys()))
        
        bacterial_load = utils.threeStatesSample(list(self.cpt_bl_i[inflammation].values()),
                                            list(self.cpt_bl_i[inflammation].keys()))
        
        FEV1 = utils.threeStatesSample(list(self.cpt_FEV1_bl[bacterial_load].values()),
                                list(self.cpt_FEV1_bl[bacterial_load].keys()))
        
        return {
            "Inflammation": inflammation,
            "Wellness": wellness,
            "Bacterial load": bacterial_load,
            "FEV1": FEV1
        }
model = lungModel()
model.sample()

{'Inflammation': 'absent',
 'Wellness': 4,
 'Bacterial load': 'low',
 'FEV1': 'low'}

In [5]:
# inference code
def get_cpt_obs(obs_B, cpt_B_A):

    cpt_B_A_bis = cpt_B_A
    for key_A, items_B in cpt_B_A.items():
        for key_B, p_B in items_B.items():
            # observed variable has probability 1, unobserved are 0
            if key_B != obs_B: cpt_B_A_bis[key_A][key_B] = 0
            else: cpt_B_A_bis[key_A][key_B] = p_B
    
    return cpt_B_A_bis
    
def get_second_level_keys(cpt_B_A):
    """
    returns a dict with second level keys and values set to 0
    """
    return {key : 0 for key in cpt_B_A[next(iter(cpt_B_A))]}

def get_first_level_keys(cpt_B_A):
    return {key : 0 for key in cpt_B_A.keys()}
    
def marginalise(cpt_B_A, dist_A):
    """
    marginalise out B out of cpt_B_A
    TODO: implement case where A is not marginal (recursion?)
    """
    
    dist_B = get_second_level_keys(cpt_B_A)
    # P(B) = sum_A(P(B | A) * P(A))
    for key_A, B_A in cpt_B_A.items():
        for key_B, B_value in B_A.items():
            dist_B[key_B] += B_value * dist_A[key_A]
    
    return dist_B

# factor node message
def message_down(dist, disp=1, label_A='A'):
    """
    dist: marginal distribution
    """
    if disp==1: print('marginal distribution: P(', label_A, ') =', dist, '\n')
    print("message down: ", dist)
    return dist

def message_up(A_dist, cpt_B_A, disp=1, label_A='A', label_B='B'):
    
    # compute normalisers, sum_B(P(B|A) * P(A))
    normalisers = get_first_level_keys(cpt_B_A)
    print(cpt_B_A)
    for key_A, items_B in cpt_B_A.items():
        for key_B, p_B_A in items_B.items():
            normalisers[key_A] += p_B_A * A_dist[key_A]

    normaliser = sum(list(normalisers.values()))
    
    # compute posteriors P(A|B) = P(B|A) * P(A) / sum_B(P(B|A))
    posteriors = get_first_level_keys(cpt_B_A)
    for key_A, items_B in cpt_B_A.items():
        for key_B, p_B_A in items_B.items():
            if p_B_A != 0:
                posteriors[key_A] = p_B_A * A_dist[key_A] / normaliser
    print("posteriors: ",posteriors)
    return posteriors

# variable node message
def propagate_belief(messages):
    # Multiply all messages
    beliefs=[]
    for key in messages[0].keys():
        belief=1
        for message in messages:
            print("message",message[key])
            belief *= message[key]
        print("belief", belief)
        beliefs.append(belief)
    
    # Normalise and format
    normalised_beliefs = utils.normalise_list(beliefs)
    p_A_given_B = dict(zip(messages[0].keys(), normalised_beliefs))
    
    print('Result of belief propagation:', p_A_given_B)
    return p_A_given_B


In [6]:
# define model
m = lungModel()

# CASE 1

obs_bacterial_load = 'medium'

cpt_bl_obs_i = get_cpt_obs(obs_bacterial_load,m.cpt_bl_i)

# belief propagation
i_bl = message_up(m.marginal_i, cpt_bl_obs_i)

i = message_down(m.marginal_i)

belief1 = propagate_belief([i_bl, i])


{'absent': {'low': 0, 'medium': 0.2, 'high': 0}, 'small': {'low': 0, 'medium': 0.3, 'high': 0}, 'heavy': {'low': 0, 'medium': 0.3, 'high': 0}}
posteriors:  {'absent': 0.2222222222222222, 'small': 0.5555555555555555, 'heavy': 0.2222222222222222}
marginal distribution: P( A ) = {'absent': 0.3, 'small': 0.5, 'heavy': 0.2} 

message down:  {'absent': 0.3, 'small': 0.5, 'heavy': 0.2}
message 0.2222222222222222
message 0.3
belief 0.06666666666666667
message 0.5555555555555555
message 0.5
belief 0.27777777777777773
message 0.2222222222222222
message 0.2
belief 0.044444444444444446
Result of belief propagation: {'absent': 0.17142857142857146, 'small': 0.7142857142857143, 'heavy': 0.11428571428571431}


In [86]:
# CASE 2

obs_bacterial_load = 'medium'
obs_wellness = 1

cpt_bl_obs_i = get_cpt_obs(obs_bacterial_load,m.cpt_bl_i)
cpt_w_obs_i = get_cpt_obs(obs_wellness,m.cpt_w_i)

# belief propagation
i_bl = message_up(m.marginal_i, cpt_bl_obs_i)
i_w = message_up(m.marginal_i, cpt_w_obs_i)
i = message_down(m.marginal_i)

belief1 = propagate_belief([i_bl, i_w, i])

{'absent': {'low': 0, 'medium': 0.2, 'high': 0}, 'small': {'low': 0, 'medium': 0.3, 'high': 0}, 'heavy': {'low': 0, 'medium': 0.3, 'high': 0}}
posteriors:  {'absent': 0.2222222222222222, 'small': 0.5555555555555555, 'heavy': 0.2222222222222222}
{'absent': {1: 0.01, 2: 0, 3: 0, 4: 0, 5: 0}, 'small': {1: 0.2, 2: 0, 3: 0, 4: 0, 5: 0}, 'heavy': {1: 0.4, 2: 0, 3: 0, 4: 0, 5: 0}}
posteriors:  {'absent': 0.016393442622950817, 'small': 0.5464480874316939, 'heavy': 0.43715846994535523}
marginal distribution: P( A ) = {'absent': 0.3, 'small': 0.5, 'heavy': 0.2} 

message down:  {'absent': 0.3, 'small': 0.5, 'heavy': 0.2}
message 0.2222222222222222
message 0.016393442622950817
message 0.3
belief 0.0010928961748633878
message 0.5555555555555555
message 0.5464480874316939
message 0.5
belief 0.15179113539769273
message 0.2222222222222222
message 0.43715846994535523
message 0.2
belief 0.01942926533090468
Result of belief propagation: {'absent': 0.006342494714587738, 'small': 0.8809020436927413, 'heav

In [None]:
# CASE 3
# observations
obs_FEV1 = 'low'
obs_wellness = 1

cpt_bl_obs_i = get_cpt_obs(obs_bacterial_load, m.cpt_FEV1_bl)

# belief propagation
bl_FEV1 = message_up(m.cpt_FEV1_bl, cpt_bl_obs_i)
i_w = message_up(m.marginal_i, m.cpt_w_i, obs_wellness)
i = message_down(m.marginal_i)

belief1 = propagate_belief([i_bl, i_w, i])

## Gibbs sampling <a class="anchor" id="3.3"></a>

In [None]:
# Gibbs inference code
def gibbs_inference_code(target_X, target_Y, modelToRun=lungModel, n=100000):


    # Random initialisation
    sample = modelToRun()
    i = [sample[target_X]]
    b = [sample[target_Y]]
    print('Initial state:', I, '=', i, ',', B, '=', b)

    # Loop
    # TODO  review function arguments to avoid global variables
    for j in np.arange(n):
        b.append(threeStatesSample(bacterial_load_p.get(i[j]), ["low", "medium", "high"]))
        i_posteriors = compute_posterior(I, B, b[j+1], conditional_probability_table, disp=0)
        i.append(threeStatesSample([i_posteriors[key] for key in i_posteriors.keys()], list(i_posteriors.keys())))

    return b, i

# Data processing
def get_marginal(name, list):
    '''
    name: identifier
    list: a list of values
    output: distinct values count in list, and related proportions (probability estimate)
    '''
    marginal = pd.DataFrame(data={name: list})
    marginal = pd.DataFrame(data={'count': marginal.value_counts()}).reset_index()
    marginal['P(count)'] = marginal['count']/marginal['count'].sum()
    
    return marginal

In [None]:
# Gibbs sampling implementation for two variables

# Variables
I = 'Inflammation'
B = 'Bacterial load'
    
n_high=100000; n_low=1000
b, i = gibbs_inference_code(I, B, n=n_high)
b_fuzzy, i_fuzzy = gibbs_inference_code(I, B, n=n_low)

# Marginal distribution of inflammation
i_marginal = get_marginal(I, i)
i_marginal_fuzzy = get_marginal(I, i_fuzzy)
i_marginal

In [None]:
# Marginal distribution of bacterial load
b_marginal = get_marginal(B, b)
b_marginal

In [None]:
plotFigure(i_marginal, I, column=1, title="Marginal distribution for inflammation for "+str(n_high)+" iterations")
plotFigure(i_marginal_fuzzy, I, column=1, title="Marginal distribution for inflammation "+str(n_low)+" iterations")
plotFigure(b_marginal, B, column=1, title="Marginal distribution for inflammation for "+str(n_high)+" iterations")