Skip to content

Commit

Permalink
Mixtures for factors work now.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomsch420 committed Jan 2, 2024
1 parent eb22de0 commit 8a7546a
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 23 deletions.
3 changes: 2 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
fglib==0.2.3
fglib==0.2.3
scipy
2 changes: 1 addition & 1 deletion src/fglib2/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.1.4'
__version__ = '1.1.5'
78 changes: 66 additions & 12 deletions src/fglib2/probabilistic_circuits.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,53 @@
from typing import Iterable, Optional, Union, List
from typing import Iterable, Optional, Union, List, Tuple

import numpy as np
from random_events.events import Event, EncodedEvent
from typing_extensions import Self

from probabilistic_model.probabilistic_model import ProbabilisticModel
from probabilistic_model.probabilistic_circuit import SmoothSumUnit
from random_events.variables import Variable, Symbolic
from probabilistic_model.probabilistic_circuit import SmoothSumUnit, DeterministicSumUnit, Unit
from random_events.variables import Variable, Symbolic, Discrete
from .distributions import Multinomial
from .graphs import FactorNode


class SumUnitFactor(ProbabilisticModel):
class SumUnitWrapper(DeterministicSumUnit):

@staticmethod
def from_sum_unit(unit: SmoothSumUnit) -> 'DeterministicSumUnit':
result = SumUnitWrapper(unit.variables, unit.weights)
result.children = unit.children
return result

def _conditional(self, event: EncodedEvent) -> Tuple[Optional[Self], float]:
# conditional weights of new sum unit
conditional_weights = []

# conditional children of new sum unit
conditional_children = []

# initialize probability
probability = 0.

for weight, child in zip(self.weights, self.children):
conditional_child, conditional_probability = child._conditional(event)

if conditional_child is None:
conditional_child = child

conditional_probability = conditional_probability * weight
probability += conditional_probability

conditional_weights.append(conditional_probability)
conditional_children.append(conditional_child)

result = self._parameter_copy()
result.weights = conditional_weights
result.children = conditional_children
return result.normalize(), probability


class SumUnitFactor(FactorNode):
"""
A sum unite (mixture model) that can be used as factor for variables in a factor graph.
Expand All @@ -20,15 +58,31 @@ class SumUnitFactor(ProbabilisticModel):
over the latent variables. The factors for the emission model are the joint probability trees.
"""

def __init__(self, model: SmoothSumUnit):
self.model = model
latent_variable = Symbolic(f"latent_{str(id(model))}", range(len(self.model.weights)))
super().__init__([latent_variable])
latent_variable: Symbolic

@property
def latent_variable(self) -> Symbolic:
return self.variables[0]
def __init__(self, distribution: SmoothSumUnit):
super().__init__(SumUnitWrapper.from_sum_unit(distribution))
self.latent_variable = Symbolic(f"latent_{str(id(self.distribution))}",
range(len(self.distribution.weights)))

def marginal(self, variables: List[Variable]) -> Union[Multinomial, Self]:
if variables[0] == self.latent_variable:
return Multinomial([self.latent_variable], np.array(self.model.weights))
return self.latent_distribution()

@property
def variables(self) -> List[Discrete]:
return [self.latent_variable]

def latent_distribution(self):
return Multinomial([self.latent_variable], np.array(self.distribution.weights))

def sum_product(self, messages: List[Multinomial]) -> Multinomial:

message = self.latent_distribution()

# Product over incoming messages
for msg in messages:
if msg is not None:
message *= msg

return message
59 changes: 50 additions & 9 deletions test/test_pc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import networkx as nx
import numpy as np
from random_events.variables import Continuous
from random_events.events import Event
from probabilistic_model.probabilistic_circuit.units import DeterministicSumUnit
from probabilistic_model.probabilistic_circuit.distributions import UniformDistribution
from fglib2.distributions import Multinomial
Expand All @@ -12,7 +13,7 @@
import matplotlib.pyplot as plt


class JPTTestCase(unittest.TestCase):
class SumUnitTestCase(unittest.TestCase):

variables = [
Continuous("x1"),
Expand All @@ -25,37 +26,77 @@ class JPTTestCase(unittest.TestCase):
model: FactorGraph

def setUp(self):
np.random.seed(69)
interval_1 = portion.closed(-1.25, -0.75)
interval_2 = portion.closed(0.75, 1.25)

model = FactorGraph()

for variable in self.variables:
distribution = SumUnitFactor(UniformDistribution(variable, interval_1) +
UniformDistribution(variable, interval_2))
factor = FactorNode(distribution)
distribution = (UniformDistribution(variable, interval_1) +
UniformDistribution(variable, interval_2))
factor = SumUnitFactor(distribution)
model *= factor

for f1, f2 in zip(model.factor_nodes[:-1], model.factor_nodes[1:]):
model *= FactorNode(Multinomial([f1.variables[0], f2.variables[0]], np.array([[0, 0.5], [0.5, 0]])))
model *= FactorNode(Multinomial([f1.latent_variable, f2.latent_variable],
np.array([[0, 0.5], [0.5, 0]])))

self.model = model

def test_creation(self):
nx.draw(self.model, with_labels=True)
plt.show()

def test_latent_distribution(self):
distribution = self.model.factor_nodes[0].latent_distribution()
self.assertTrue(np.all(distribution.probabilities == np.array([0.5, 0.5])))
self.assertEqual(distribution.variables[0], self.model.factor_nodes[0].latent_variable)

def test_marginal(self):
self.model.sum_product()
latent_variables = self.model.variables
print(latent_variables)
for variable in latent_variables:
print(self.model.belief(variable))
# self.model.marginal(self.variables[:1])
belief = self.model.belief(variable).normalize()
self.assertTrue(np.all(belief.probabilities == np.array([0.5, 0.5])))

def test_marginal_with_evidence(self):
factor_node = self.model.factor_nodes[0]
conditional, probability = factor_node.distribution.conditional(Event({self.variables[0]:
portion.closed(-1.25, -0.75)}))
self.assertEqual(probability, 0.5)
factor_node.distribution = conditional
self.model.factor_nodes[0] = factor_node

def test_sum_product(self):
self.model.sum_product()
latent_variables = self.model.variables
latent_variable_0 = latent_variables[0]
latent_variable_0_distribution = self.model.belief(latent_variable_0).normalize()
self.assertTrue(np.all(latent_variable_0_distribution.probabilities == np.array([0., 1.])))

latent_variable_1 = latent_variables[1]
latent_variable_1_distribution = self.model.belief(latent_variable_1).normalize()
self.assertTrue(np.all(latent_variable_1_distribution.probabilities == np.array([1., 0.])))

def test_mode(self):
mode_by_hand = Event({self.model.factor_nodes[0].latent_variable: (0, 1),
self.model.factor_nodes[1].latent_variable: (0, 1)})
modes = self.model.max_product()
self.assertEqual(modes, mode_by_hand)

def test_conditional_mode(self):
factor_node = self.model.factor_nodes[0]
conditional, probability = factor_node.distribution.conditional(Event({self.variables[0]:
portion.closed(-1.25, -0.75)}))
self.assertEqual(probability, 0.5)
factor_node.distribution = conditional
self.model.factor_nodes[0] = factor_node

mode_by_hand = Event({self.model.factor_nodes[0].latent_variable: 0,
self.model.factor_nodes[1].latent_variable: 1})
modes = self.model.max_product()
self.assertEqual(modes, mode_by_hand)



if __name__ == '__main__':
Expand Down

0 comments on commit 8a7546a

Please sign in to comment.