Gibbs sampling #457
Merged 5 commits on Aug 31, 2015

Changes from all commits
192 changes: 191 additions & 1 deletion pgmpy/inference/Sampling.py
@@ -1,11 +1,13 @@
from collections import namedtuple
import itertools

import networkx as nx
import numpy as np
from pandas import DataFrame

from pgmpy.factors.Factor import Factor, factor_product
from pgmpy.inference import Inference
-from pgmpy.models import BayesianModel
+from pgmpy.models import BayesianModel, MarkovChain, MarkovModel
from pgmpy.utils.mathext import sample_discrete


@@ -194,3 +196,191 @@ def likelihood_weighted_sample(self, evidence=None, size=1):
                sampled[node] = list(map(lambda t: State(node, t),
                                         sample_discrete(states, cpd.values, size)))
        return sampled


class GibbsSampling(MarkovChain):
    """
    Class for performing Gibbs sampling.

    Parameters:
    -----------
    model: BayesianModel or MarkovModel
        Model from which variables are inherited and transition probabilities computed.

    Public Methods:
    ---------------
    set_start_state(state)
    sample(start_state, size)
    generate_sample(start_state, size)

    Examples:
    ---------
    Initialization from a BayesianModel object:
    >>> from pgmpy.factors import TabularCPD
    >>> from pgmpy.models import BayesianModel
    >>> intel_cpd = TabularCPD('intel', 2, [[0.7], [0.3]])
    >>> sat_cpd = TabularCPD('sat', 2, [[0.95, 0.2], [0.05, 0.8]], evidence=['intel'], evidence_card=[2])
    >>> student = BayesianModel()
    >>> student.add_nodes_from(['intel', 'sat'])
    >>> student.add_edge('intel', 'sat')
    >>> student.add_cpds(intel_cpd, sat_cpd)

    >>> from pgmpy.inference import GibbsSampling
    >>> gibbs_chain = GibbsSampling(student)

    Sample from it:
    >>> gibbs_chain.sample(size=3)
       intel  sat
    0      0    0
    1      0    0
    2      1    1
    """
    def __init__(self, model=None):
        super().__init__()
        if isinstance(model, BayesianModel):
            self._get_kernel_from_bayesian_model(model)
        elif isinstance(model, MarkovModel):
            self._get_kernel_from_markov_model(model)

    def _get_kernel_from_bayesian_model(self, model):
        """
        Computes the Gibbs transition models from a Bayesian Network.
        'Probabilistic Graphical Models: Principles and Techniques', Koller and
        Friedman, Section 12.3.3, pp. 512-513.

        Parameters:
        -----------
        model: BayesianModel
            The model from which probabilities will be computed.
        """
        self.variables = np.array(model.nodes())
        self.cardinalities = {var: model.get_cpds(var).variable_card for var in self.variables}
        for var in self.variables:
            other_vars = [v for v in self.variables if var != v]
            other_cards = [self.cardinalities[v] for v in other_vars]
            cpds = [cpd for cpd in model.cpds if var in cpd.scope()]
            prod_cpd = factor_product(*cpds)
            kernel = {}
            scope = set(prod_cpd.scope())
            for tup in itertools.product(*[range(card) for card in other_cards]):
                states = [State(var, s) for var, s in zip(other_vars, tup) if var in scope]
                prod_cpd_reduced = prod_cpd.reduce(states, inplace=False)
                kernel[tup] = prod_cpd_reduced.values / sum(prod_cpd_reduced.values)
            self.transition_models[var] = kernel
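For intuition, here is a minimal standalone sketch (plain numpy, not pgmpy API) of the kernel this method builds for the two-node student model in the class docstring: each variable maps every joint state of the other variables to a normalized distribution over its own states.

import numpy as np

# Hedged sketch of the Gibbs kernel for 'intel' in the student model
# (intel -> sat); the CPD numbers come from the class docstring above.
p_intel = np.array([0.7, 0.3])               # P(intel)
p_sat_given_intel = np.array([[0.95, 0.2],   # P(sat=0 | intel)
                              [0.05, 0.8]])  # P(sat=1 | intel)

# For each state of the other variable (sat), the kernel entry is the
# normalized product of all CPDs whose scope contains 'intel'.
kernel_intel = {}
for sat in range(2):
    unnormalized = p_intel * p_sat_given_intel[sat]
    kernel_intel[(sat,)] = unnormalized / unnormalized.sum()

print(kernel_intel[(0,)])  # P(intel | sat=0), roughly [0.917, 0.083]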

    def _get_kernel_from_markov_model(self, model):
        """
        Computes the Gibbs transition models from a Markov Network.
        'Probabilistic Graphical Models: Principles and Techniques', Koller and
        Friedman, Section 12.3.3, pp. 512-513.

        Parameters:
        -----------
        model: MarkovModel
            The model from which probabilities will be computed.
        """
        self.variables = np.array(model.nodes())
        factors_dict = {var: [] for var in self.variables}
        for factor in model.get_factors():
            for var in factor.scope():
                factors_dict[var].append(factor)

        # Take factor product
        factors_dict = {var: factor_product(*factors) if len(factors) > 1 else factors[0]
                        for var, factors in factors_dict.items()}
        self.cardinalities = {var: factors_dict[var].get_cardinality(var)[var] for var in self.variables}
        for var in self.variables:
            other_vars = [v for v in self.variables if var != v]
            other_cards = [self.cardinalities[v] for v in other_vars]
            kernel = {}
            factor = factors_dict[var]
            scope = set(factor.scope())
            for tup in itertools.product(*[range(card) for card in other_cards]):
                states = [State(var, s) for var, s in zip(other_vars, tup) if var in scope]
                reduced_factor = factor.reduce(states, inplace=False)
                kernel[tup] = reduced_factor.values / sum(reduced_factor.values)
            self.transition_models[var] = kernel
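The Markov case follows the same pattern: the Gibbs conditional for a variable is the product of the factors touching it, reduced to the current states of the other in-scope variables and renormalized. A hedged numpy sketch (not pgmpy API), using the two factors from the sample() docstring below:

import numpy as np

# phi(A,B) and phi(C,B) from the sample() docstring, with the last variable
# varying fastest, as in pgmpy's Factor value ordering.
phi_ab = np.array([[1., 2.], [3., 4.]])  # phi_ab[a, b]
phi_cb = np.array([[5., 6.], [7., 8.]])  # phi_cb[c, b]

def conditional_b(a, c):
    # P(B | A=a, C=c) is proportional to phi_ab[a, B] * phi_cb[c, B]
    unnormalized = phi_ab[a] * phi_cb[c]
    return unnormalized / unnormalized.sum()

print(conditional_b(0, 0))  # [5, 12] / 17, roughly [0.294, 0.706]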

    def sample(self, start_state=None, size=1):
        """
        Sample from the Markov Chain.

        Parameters:
        -----------
        start_state: dict or array-like iterable
            Representing the starting states of the variables. If None is passed, a random start_state is chosen.
        size: int
            Number of samples to be generated.

        Return Type:
        ------------
        pandas.DataFrame

        Examples:
        ---------
        >>> from pgmpy.factors import Factor
        >>> from pgmpy.inference import GibbsSampling
        >>> from pgmpy.models import MarkovModel
        >>> model = MarkovModel([('A', 'B'), ('C', 'B')])
        >>> factor_ab = Factor(['A', 'B'], [2, 2], [1, 2, 3, 4])
        >>> factor_cb = Factor(['C', 'B'], [2, 2], [5, 6, 7, 8])
        >>> model.add_factors(factor_ab, factor_cb)
        >>> gibbs = GibbsSampling(model)
[Inline review comment from a project member]
@pratyakshs Got this error while trying to run this:

In [11]: gibbs = GibbsSampling(model)
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-11-df5eb6774937> in <module>()
----> 1 gibbs = GibbsSampling(model)

/home/ubuntu/pgmpy/pgmpy/inference/Sampling.py in __init__(self, model)
    242             self._get_kernel_from_bayesian_model(model)
    243         elif isinstance(model, MarkovModel):
--> 244             self._get_kernel_from_markov_model(model)
    245 
    246     def _get_kernel_from_bayesian_model(self, model):

/home/ubuntu/pgmpy/pgmpy/inference/Sampling.py in _get_kernel_from_markov_model(self, model)
    290 
    291         # Take factor product
--> 292         factors_dict = {var: Factor.product(*factors) for var, factors in factors_dict.items()}
    293         self.cardinalities = {var: factors_dict[var].get_cardinality(var) for var in self.variables}
    294         for var in self.variables:

/home/ubuntu/pgmpy/pgmpy/inference/Sampling.py in <dictcomp>(.0)
    290 
    291         # Take factor product
--> 292         factors_dict = {var: Factor.product(*factors) for var, factors in factors_dict.items()}
    293         self.cardinalities = {var: factors_dict[var].get_cardinality(var) for var in self.variables}
    294         for var in self.variables:

/home/ubuntu/pgmpy/pgmpy/factors/Factor.py in product(self, n_jobs, *factors)
    316                 ('x3', ['x3_0', 'x3_1']), ('x4', ['x4_0', 'x4_1'])])
    317         """
--> 318         if isinstance(factors[0], Number):
    319             return Factor(self.variables, self.cardinality, np.array([i * factors[0] for i in self.values]))
    320         else:

IndexError: tuple index out of range
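(The merged code above sidesteps this error: factor_product is called only when a variable appears in more than one factor, and the lone factor is used directly otherwise.)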

        >>> gibbs.sample(size=4)
           A  B  C
        0  0  1  1
        1  1  0  0
        2  1  1  0
        3  1  1  1
        """
        if start_state is None and self.state is None:
            self.state = self.random_state()
        else:
            self.set_start_state(start_state)

        sampled = DataFrame(index=range(size), columns=self.variables)
        sampled.loc[0] = [st for var, st in self.state]
        for i in range(size - 1):
            for j, (var, st) in enumerate(self.state):
                other_st = tuple(st for v, st in self.state if var != v)
                next_st = sample_discrete(list(range(self.cardinalities[var])),
                                          self.transition_models[var][other_st])[0]
                self.state[j] = State(var, next_st)
            sampled.loc[i + 1] = [st for var, st in self.state]
        return sampled
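A hedged usage note: successive Gibbs samples are correlated and the chain starts from an arbitrary state, so a common pattern is to discard an initial burn-in before estimating marginals from the returned DataFrame. The burn-in length below is illustrative, not something pgmpy prescribes:

from pgmpy.factors import Factor
from pgmpy.inference import GibbsSampling
from pgmpy.models import MarkovModel

model = MarkovModel([('A', 'B'), ('C', 'B')])
model.add_factors(Factor(['A', 'B'], [2, 2], [1, 2, 3, 4]),
                  Factor(['C', 'B'], [2, 2], [5, 6, 7, 8]))
gibbs = GibbsSampling(model)

samples = gibbs.sample(size=2000)
burn_in = 200  # illustrative; tune per model
print(samples['B'][burn_in:].value_counts(normalize=True))  # estimated P(B)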

    def generate_sample(self, start_state=None, size=1):
        """
        Generator version of self.sample

        Return Type:
        ------------
        List of State namedtuples, representing the assignment to all variables of the model.

        Examples:
        ---------
        >>> from pgmpy.factors import Factor
        >>> from pgmpy.inference import GibbsSampling
        >>> from pgmpy.models import MarkovModel
        >>> model = MarkovModel([('A', 'B'), ('C', 'B')])
        >>> factor_ab = Factor(['A', 'B'], [2, 2], [1, 2, 3, 4])
        >>> factor_cb = Factor(['C', 'B'], [2, 2], [5, 6, 7, 8])
        >>> model.add_factors(factor_ab, factor_cb)
        >>> gibbs = GibbsSampling(model)
        >>> gen = gibbs.generate_sample(size=2)
        >>> [sample for sample in gen]
        [[State(var='C', state=1), State(var='B', state=1), State(var='A', state=0)],
         [State(var='C', state=0), State(var='B', state=1), State(var='A', state=1)]]
        """
        if start_state is None and self.state is None:
            self.state = self.random_state()
        else:
            self.set_start_state(start_state)

        for i in range(size):
            for j, (var, st) in enumerate(self.state):
                other_st = tuple(st for v, st in self.state if var != v)
                next_st = sample_discrete(list(range(self.cardinalities[var])),
                                          self.transition_models[var][other_st])[0]
                self.state[j] = State(var, next_st)
            yield self.state[:]
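When materializing a DataFrame is unnecessary, generate_sample allows streaming consumption. A hedged sketch (same docstring model as above) that accumulates state frequencies on the fly:

from pgmpy.factors import Factor
from pgmpy.inference import GibbsSampling
from pgmpy.models import MarkovModel

model = MarkovModel([('A', 'B'), ('C', 'B')])
model.add_factors(Factor(['A', 'B'], [2, 2], [1, 2, 3, 4]),
                  Factor(['C', 'B'], [2, 2], [5, 6, 7, 8]))
gibbs = GibbsSampling(model)

n = 1000
ones = {'A': 0, 'B': 0, 'C': 0}
for sample in gibbs.generate_sample(size=n):
    for var, state in sample:  # each sample is a list of State namedtuples
        ones[var] += state
print({var: count / n for var, count in ones.items()})  # estimated P(var=1)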
3 changes: 2 additions & 1 deletion pgmpy/inference/__init__.py
@@ -1,11 +1,12 @@
from .base import Inference
from .ExactInference import VariableElimination
from .ExactInference import BeliefPropagation
-from .Sampling import BayesianModelSampling
+from .Sampling import BayesianModelSampling, GibbsSampling
from .mplp import Mplp

__all__ = ['Inference',
           'VariableElimination',
           'BeliefPropagation',
           'BayesianModelSampling',
           'GibbsSampling',
           'Mplp']
102 changes: 98 additions & 4 deletions pgmpy/tests/test_inference/test_Sampling.py
@@ -1,9 +1,9 @@
import unittest
from unittest.mock import MagicMock, patch

-from pgmpy.models import MarkovModel
-from pgmpy.inference.Sampling import BayesianModelSampling
-from pgmpy.models import BayesianModel
-from pgmpy.factors import TabularCPD, State
+from pgmpy.factors import Factor, TabularCPD, State
+from pgmpy.inference.Sampling import BayesianModelSampling, GibbsSampling
+from pgmpy.models import BayesianModel, MarkovModel


class TestBayesianModelSampling(unittest.TestCase):
@@ -67,6 +67,12 @@ def test_rejection_sample_basic(self):
        self.assertTrue(set(sample.G).issubset({State('G', 0), State('G', 1)}))
        self.assertTrue(set(sample.L).issubset({State('L', 0), State('L', 1)}))

    @patch("pgmpy.inference.BayesianModelSampling.forward_sample", autospec=True)
    def test_rejection_sample_less_arg(self, forward_sample):
        sample = self.sampling_inference.rejection_sample(size=5)
        forward_sample.assert_called_once_with(self.sampling_inference, 5)
        self.assertEqual(sample, forward_sample.return_value)

    def test_likelihood_weighted_sample(self):
        sample = self.sampling_inference.likelihood_weighted_sample([State('A', 0), State('J', 1), State('R', 0)], 25)
        self.assertEquals(len(sample), 25)
@@ -89,3 +95,91 @@ def tearDown(self):
        del self.sampling_inference
        del self.bayesian_model
        del self.markov_model


class TestGibbsSampling(unittest.TestCase):
    def setUp(self):
        # A test Bayesian model
        diff_cpd = TabularCPD('diff', 2, [[0.6], [0.4]])
        intel_cpd = TabularCPD('intel', 2, [[0.7], [0.3]])
        grade_cpd = TabularCPD('grade', 3, [[0.3, 0.05, 0.9, 0.5], [0.4, 0.25, 0.08, 0.3], [0.3, 0.7, 0.02, 0.2]],
                               evidence=['diff', 'intel'], evidence_card=[2, 2])
        self.bayesian_model = BayesianModel()
        self.bayesian_model.add_nodes_from(['diff', 'intel', 'grade'])
        self.bayesian_model.add_edges_from([('diff', 'grade'), ('intel', 'grade')])
        self.bayesian_model.add_cpds(diff_cpd, intel_cpd, grade_cpd)

        # A test Markov model
        self.markov_model = MarkovModel([('A', 'B'), ('C', 'B'), ('B', 'D')])
        factor_ab = Factor(['A', 'B'], [2, 3], [1, 2, 3, 4, 5, 6])
        factor_cb = Factor(['C', 'B'], [4, 3], [3, 1, 4, 5, 7, 8, 1, 3, 10, 4, 5, 6])
        factor_bd = Factor(['B', 'D'], [3, 2], [5, 7, 2, 1, 9, 3])
        self.markov_model.add_factors(factor_ab, factor_cb, factor_bd)

        self.gibbs = GibbsSampling(self.bayesian_model)

    def tearDown(self):
        del self.bayesian_model
        del self.markov_model

    @patch('pgmpy.inference.Sampling.GibbsSampling._get_kernel_from_bayesian_model', autospec=True)
    @patch('pgmpy.models.MarkovChain.__init__', autospec=True)
    def test_init_bayesian_model(self, init, get_kernel):
        model = MagicMock(spec_set=BayesianModel)
        gibbs = GibbsSampling(model)
        init.assert_called_once_with(gibbs)
        get_kernel.assert_called_once_with(gibbs, model)

    @patch('pgmpy.inference.Sampling.GibbsSampling._get_kernel_from_markov_model', autospec=True)
    def test_init_markov_model(self, get_kernel):
        model = MagicMock(spec_set=MarkovModel)
        gibbs = GibbsSampling(model)
        get_kernel.assert_called_once_with(gibbs, model)

    def test_get_kernel_from_bayesian_model(self):
        gibbs = GibbsSampling()
        gibbs._get_kernel_from_bayesian_model(self.bayesian_model)
        self.assertListEqual(list(gibbs.variables), self.bayesian_model.nodes())
        self.assertDictEqual(gibbs.cardinalities, {'diff': 2, 'intel': 2, 'grade': 3})

    def test_get_kernel_from_markov_model(self):
        gibbs = GibbsSampling()
        gibbs._get_kernel_from_markov_model(self.markov_model)
        self.assertListEqual(list(gibbs.variables), self.markov_model.nodes())
        self.assertDictEqual(gibbs.cardinalities, {'A': 2, 'B': 3, 'C': 4, 'D': 2})

    def test_sample(self):
        start_state = [State('diff', 0), State('intel', 0), State('grade', 0)]
        sample = self.gibbs.sample(start_state, 2)
        self.assertEquals(len(sample), 2)
        self.assertEquals(len(sample.columns), 3)
        self.assertIn('diff', sample.columns)
        self.assertIn('intel', sample.columns)
        self.assertIn('grade', sample.columns)
        self.assertTrue(set(sample['diff']).issubset({0, 1}))
        self.assertTrue(set(sample['intel']).issubset({0, 1}))
        self.assertTrue(set(sample['grade']).issubset({0, 1, 2}))

    @patch("pgmpy.inference.Sampling.GibbsSampling.random_state", autospec=True)
    def test_sample_less_arg(self, random_state):
        self.gibbs.state = None
        random_state.return_value = [State('diff', 0), State('intel', 0), State('grade', 0)]
        sample = self.gibbs.sample(size=2)
        random_state.assert_called_once_with(self.gibbs)
        self.assertEqual(len(sample), 2)

    def test_generate_sample(self):
        start_state = [State('diff', 0), State('intel', 0), State('grade', 0)]
        gen = self.gibbs.generate_sample(start_state, 2)
        samples = [sample for sample in gen]
        self.assertEqual(len(samples), 2)
        self.assertEqual({samples[0][0].var, samples[0][1].var, samples[0][2].var}, {'diff', 'intel', 'grade'})
        self.assertEqual({samples[1][0].var, samples[1][1].var, samples[1][2].var}, {'diff', 'intel', 'grade'})

    @patch("pgmpy.inference.Sampling.GibbsSampling.random_state", autospec=True)
    def test_generate_sample_less_arg(self, random_state):
        self.gibbs.state = None
        gen = self.gibbs.generate_sample(size=2)
        samples = [sample for sample in gen]
        random_state.assert_called_once_with(self.gibbs)
        self.assertEqual(len(samples), 2)
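A hedged note on running just these tests with standard unittest discovery (module path taken from the diff header above):

python -m unittest pgmpy.tests.test_inference.test_Sampling -v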