In [3]:
from utils import *

In [5]:
#Classe de distribuição de probabilidade discreta
class ProbDist:

    def __init__(self, varname='?', freqs=None):

        self.prob = {}
        self.varname = varname
        self.values = []
        if freqs:
            for (v, p) in freqs.items():
                self[v] = p
            self.normalize()

    def __getitem__(self, val):

        try:
            return self.prob[val]
        except KeyError:
            return 0

    def __setitem__(self, val, p):

        if val not in self.values:
            self.values.append(val)
        self.prob[val] = p

    def normalize(self):

        total = sum(self.prob.values())
        if not np.isclose(total, 1.0):
            for val in self.prob:
                self.prob[val] /= total
        return self

    def show_approx(self, numfmt='{:.3g}'):

        return ', '.join([('{}: ' + numfmt).format(v, p)
                          for (v, p) in sorted(self.prob.items())])

    def __repr__(self):
        return "P({})".format(self.varname)

In [6]:
#Método que retorna uma tupla dos valores das variaveis
def event_values(event, variables):

    if isinstance(event, tuple) and len(event) == len(variables):
        return event
    else:
        return tuple([event[var] for var in variables])

In [7]:
#Classe dos nós da rede
class BayesNode:

    def __init__(self, X, parents, cpt):
        
        if isinstance(parents, str):
            parents = parents.split()

        if isinstance(cpt, (float, int)):
            cpt = {(): cpt}
        elif isinstance(cpt, dict):

            if cpt and isinstance(list(cpt.keys())[0], bool):
                cpt = {(v,): p for v, p in cpt.items()}

        assert isinstance(cpt, dict)
        for vs, p in cpt.items():
            assert isinstance(vs, tuple) and len(vs) == len(parents)
            assert all(isinstance(v, bool) for v in vs)
            assert 0 <= p <= 1

        self.variable = X
        self.parents = parents
        self.cpt = cpt
        self.children = []

    def p(self, value, event):
        
        assert isinstance(value, bool)
        ptrue = self.cpt[event_values(event, self.parents)]
        return ptrue if value else 1 - ptrue

    def sample(self, event):
    
        return probability(self.p(True, event))

    def __repr__(self):
        return repr((self.variable, ' '.join(self.parents)))

In [8]:
#Classe da rede bayesiana    
class BayesNet:

    def __init__(self, node_specs=None):
       
        self.nodes = []
        self.variables = []
        node_specs = node_specs or []
        for node_spec in node_specs:
            self.add(node_spec)

    def add(self, node_spec):
        
        node = BayesNode(*node_spec)
        assert node.variable not in self.variables
        assert all((parent in self.variables) for parent in node.parents)
        self.nodes.append(node)
        self.variables.append(node.variable)
        for parent in node.parents:
            self.variable_node(parent).children.append(node)

    def variable_node(self, var):

        for n in self.nodes:
            if n.variable == var:
                return n
        raise Exception("Nenhuma variavel: {}".format(var))

    def variable_values(self, var):
        
        return [True, False]

    def __repr__(self):
        return 'BayesNet({0!r})'.format(self.nodes)

In [9]:
#Algoritmo de amostragem ponderada
def weighted_sample(bn, e):

    w = 1
    event = dict(e)
    for node in bn.nodes:
        Xi = node.variable
        if Xi in e:
            w *= node.p(e[Xi], event)
        else:
            event[Xi] = node.sample(event)
    return event, w  
            

def likelihood_weighting(X, e, bn, N):
   
    W = {x: 0 for x in bn.variable_values(X)}
    for j in range(N):
        sample, weight = weighted_sample(bn, e) 
        W[sample[X]] += weight
    return ProbDist(X, W)

In [10]:
#Algoritmo de amostragem de Gibbs
def gibbs_ask(X, e, bn, N):
   
    assert X not in e, "A variavel de consulta deve ser diferente da evidencia"
    counts = {x: 0 for x in bn.variable_values(X)}
    Z = [var for var in bn.variables if var not in e]
    state = dict(e)
    for Zi in Z:
        state[Zi] = random.choice(bn.variable_values(Zi))
    for j in range(N):
        for Zi in Z:
            state[Zi] = markov_blanket_sample(Zi, state, bn)
            counts[state[X]] += 1
    return ProbDist(X, counts)

def markov_blanket_sample(X, e, bn):

    Xnode = bn.variable_node(X)
    Q = ProbDist(X)
    for xi in bn.variable_values(X):
        ei = extend(e, X, xi)
        Q[xi] = Xnode.p(xi, e) * product(Yj.p(ei[Yj.variable], ei) for Yj in Xnode.children)
    
    return probability(Q.normalize()[True])

In [12]:
T, F = True, False

#Criando rede
rede = BayesNet([('B', '', 0.9),
                 ('M', '', 0.1),
                 ('I', 'B M', {(T, T): 0.9, (T, F): 0.5, (F, T): 0.5, (F, F): 0.1}),
                 ('G', 'B I M', {(T, T, T): 0.9, (T, T, F): 0.8, (T, F, T): 0.0, (T, F, F): 0.0, (F, T, T): 0.2, (F, T, F): 0.1, (F, F, T): 0.0, (F, F, F): 0.0}),
                 ('J', 'G', {T: 0.9, F: 0.0}),])

In [13]:
#Executando algoritmo de amostragem ponderada na rede
likelihood_weighting('G', dict(B=T, I=T, M=T), rede, 10000).show_approx()

'False: 0.1, True: 0.9'

In [15]:
#Executando algoritmo de amostragem de Gibbs na rede
gibbs_ask('G', dict(B=T, I=T, M=T), rede, 10000).show_approx()

'False: 0.0967, True: 0.903'