
Commit

Merge f8bce58 into 16b00c5
ferrine committed Jan 14, 2017
2 parents 16b00c5 + f8bce58 commit 25da028
Showing 5 changed files with 178 additions and 20 deletions.
30 changes: 29 additions & 1 deletion pymc3/data.py
@@ -1,7 +1,10 @@
import itertools
import pkgutil
import io

__all__ = ['get_data_file']
import theano.tensor as tt

__all__ = ['get_data_file', 'DataGenerator']


def get_data_file(pkg, path):
@@ -19,3 +22,28 @@ def get_data_file(pkg, path):
"""

return io.BytesIO(pkgutil.get_data(pkg, path))


class DataGenerator(object):
"""
Helper class that infers the data type of a generator by looking
at its first item, while preserving the order of the resulting generator
"""
def __init__(self, generator):
self.test_value = next(generator)
self.gen = itertools.chain([self.test_value], generator)
self.tensortype = tt.TensorType(
self.test_value.dtype,
((False, ) * self.test_value.ndim))

def __next__(self):
return next(self.gen)

def __iter__(self):
return self

def __eq__(self, other):
return id(self) == id(other)

def __hash__(self):
return hash(id(self))
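
A minimal, hypothetical sketch of how DataGenerator behaves on its own (the `batches` generator below is illustrative, not part of the commit):

import numpy as np
from pymc3.data import DataGenerator

def batches():
    while True:                                   # endless, as GeneratorOp expects
        yield np.random.randn(32, 3).astype('float32')

gen = DataGenerator(batches())
print(gen.tensortype)   # TensorType(float32, matrix), inferred from the first batch
first = next(gen)
print(first.shape)      # (32, 3) -- the peeked first batch is replayed, order preserved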
3 changes: 2 additions & 1 deletion pymc3/distributions/distribution.py
@@ -30,8 +30,9 @@ def __new__(cls, name, *args, **kwargs):

if isinstance(name, string_types):
data = kwargs.pop('observed', None)
total_size = kwargs.pop('total_size', None)
dist = cls.dist(*args, **kwargs)
return model.Var(name, dist, data)
return model.Var(name, dist, data, total_size)
else:
raise TypeError("Name needs to be a string but got: %s" % name)

66 changes: 49 additions & 17 deletions pymc3/model.py
@@ -8,7 +8,7 @@

import pymc3 as pm
from .memoize import memoize
from .theanof import gradient, hessian, inputvars
from .theanof import gradient, hessian, inputvars, generator
from .vartypes import typefilter, discrete_types, continuous_types
from .blocking import DictToArrayBijection, ArrayOrdering

@@ -458,7 +458,7 @@ def cont_vars(self):
"""All the continuous variables in the model"""
return list(typefilter(self.vars, continuous_types))

def Var(self, name, dist, data=None):
def Var(self, name, dist, data=None, total_size=None):
"""Create and add (un)observed random variable to the model with an
appropriate prior distribution.
@@ -469,6 +469,8 @@ def Var(self, name, dist, data=None):
data : array_like (optional)
If data is provided, the variable is observed. If None,
the variable is unobserved.
total_size : scalar (optional)
upscales the logp of the variable by :math:`coef = total_size/var.shape[0]`
Returns
-------
@@ -477,11 +479,13 @@
name = self.name_for(name)
if data is None:
if getattr(dist, "transform", None) is None:
var = FreeRV(name=name, distribution=dist, model=self)
var = FreeRV(name=name, distribution=dist, model=self,
total_size=total_size)
self.free_RVs.append(var)
else:
var = TransformedRV(name=name, distribution=dist, model=self,
transform=dist.transform)
transform=dist.transform,
total_size=total_size)
pm._log.debug('Applied {transform}-transform to {name}'
' and added transformed {orig_name} to model.'.format(
transform=dist.transform.name,
@@ -491,7 +495,7 @@
return var
elif isinstance(data, dict):
var = MultiObservedRV(name=name, data=data, distribution=dist,
model=self)
model=self, total_size=total_size)
self.observed_RVs.append(var)
if var.missing_values:
self.free_RVs += var.missing_values
@@ -500,7 +504,8 @@
self.named_vars[v.name] = v
else:
var = ObservedRV(name=name, data=data,
distribution=dist, model=self)
distribution=dist, model=self,
total_size=total_size)
self.observed_RVs.append(var)
if var.missing_values:
self.free_RVs.append(var.missing_values)
@@ -717,15 +722,18 @@ class FreeRV(Factor, TensorVariable):
"""Unobserved random variable that a model is specified in terms of."""

def __init__(self, type=None, owner=None, index=None, name=None,
distribution=None, model=None):
distribution=None, model=None, total_size=None):
"""
Parameters
----------
type : theano type (optional)
owner : theano owner (optional)
name : str
distribution : Distribution
model : Model"""
model : Model
total_size : scalar Tensor (optional)
needed for upscaling logp
"""
if type is None:
type = distribution.type
super(FreeRV, self).__init__(type, owner, index, name)
@@ -736,7 +744,12 @@ def __init__(self, type=None, owner=None, index=None, name=None,
self.distribution = distribution
self.tag.test_value = np.ones(
distribution.shape, distribution.dtype) * distribution.default()
self.logp_elemwiset = distribution.logp(self)
logp_elemwiset = distribution.logp(self)
if total_size is None:
coef = tt.as_tensor(1)
else:
coef = tt.as_tensor(total_size) / logp_elemwiset.shape[0]
self.logp_elemwiset = logp_elemwiset * coef
self.model = model

incorporate_methods(source=distribution, destination=self,
@@ -759,6 +772,8 @@ def pandas_to_array(data):
return data
elif isinstance(data, theano.gof.graph.Variable):
return data
elif hasattr(data, '__next__'):
return generator(data)
else:
return np.asarray(data)

@@ -792,7 +807,7 @@ class ObservedRV(Factor, TensorVariable):
"""

def __init__(self, type=None, owner=None, index=None, name=None, data=None,
distribution=None, model=None):
distribution=None, model=None, total_size=None):
"""
Parameters
----------
@@ -801,6 +816,8 @@ def __init__(self, type=None, owner=None, index=None, name=None, data=None,
name : str
distribution : Distribution
model : Model
total_size : scalar Tensor (optional)
needed for upscaling logp
"""
from .distributions import TensorType
if type is None:
@@ -814,7 +831,12 @@
data = as_tensor(data, name, model, distribution)
self.missing_values = data.missing_values

self.logp_elemwiset = distribution.logp(data)
logp_elemwiset = distribution.logp(data)
if total_size is None:
coef = tt.as_tensor(1)
else:
coef = tt.as_tensor(total_size) / logp_elemwiset.shape[0]
self.logp_elemwiset = logp_elemwiset * coef
self.model = model
self.distribution = distribution

@@ -835,7 +857,7 @@ class MultiObservedRV(Factor):
Potentially partially observed.
"""

def __init__(self, name, data, distribution, model):
def __init__(self, name, data, distribution, model, total_size=None):
"""
Parameters
----------
@@ -844,14 +866,21 @@ def __init__(self, name, data, distribution, model):
name : str
distribution : Distribution
model : Model
total_size : scalar Tensor (optional)
needed for upscaling logp
"""
self.name = name
self.data = {name: as_tensor(data, name, model, distribution)
for name, data in data.items()}

self.missing_values = [datum.missing_values for datum in self.data.values()
if datum.missing_values is not None]
self.logp_elemwiset = distribution.logp(**self.data)
logp_elemwiset = distribution.logp(**self.data)
if total_size is None:
coef = tt.as_tensor(1)
else:
coef = tt.as_tensor(total_size) / logp_elemwiset.shape[0]
self.logp_elemwiset = logp_elemwiset * coef
self.model = model
self.distribution = distribution
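
The same `total_size` rescaling appears in `FreeRV`, `ObservedRV`, and `MultiObservedRV` above. A minimal sketch of the arithmetic with made-up sizes (100 minibatch terms standing in for 10,000 data points):

import numpy as np
import theano
import theano.tensor as tt

logp_elemwiset = tt.vector('logp_elemwiset')            # e.g. 100 minibatch terms
total_size = 10000
coef = tt.as_tensor(total_size) / logp_elemwiset.shape[0]
scaled = logp_elemwiset * coef

f = theano.function([logp_elemwiset], scaled.sum())
# each of the 100 terms is multiplied by 10000 / 100 = 100, so the summed
# logp approximates the full-data log-likelihood
print(f(np.ones(100, dtype=theano.config.floatX)))      # -> 10000.0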

@@ -896,17 +925,20 @@ def Potential(name, var, model=None):
class TransformedRV(TensorVariable):

def __init__(self, type=None, owner=None, index=None, name=None,
distribution=None, model=None, transform=None):
distribution=None, model=None, transform=None,
total_size=None):
"""
Parameters
----------
type : theano type (optional)
owner : theano owner (optional)
name : str
distribution : Distribution
model : Model"""
model : Model
total_size : scalar Tensor (optional)
needed for upscaling logp
"""
if type is None:
type = distribution.type
super(TransformedRV, self).__init__(type, owner, index, name)
@@ -916,7 +948,7 @@ def __init__(self, type=None, owner=None, index=None, name=None,

transformed_name = "{}_{}_".format(name, transform.name)
self.transformed = model.Var(
transformed_name, transform.apply(distribution))
transformed_name, transform.apply(distribution), total_size=total_size)

normalRV = transform.backward(self.transformed)

50 changes: 50 additions & 0 deletions pymc3/tests/test_theanof.py
@@ -0,0 +1,50 @@
import itertools
import unittest
import numpy as np
import theano
from ..theanof import DataGenerator, GeneratorOp, generator


def integers():
i = 0
while True:
yield np.float32(i)
i += 1


def integers_ndim(ndim):
i = 0
while True:
yield np.ones((2,) * ndim) * i
i += 1


class TestGenerator(unittest.TestCase):
def test_basic(self):
generator = DataGenerator(integers())
gop = GeneratorOp(generator)()
self.assertEqual(gop.tag.test_value, np.float32(0))
f = theano.function([], gop)
self.assertEqual(f(), np.float32(0))
self.assertEqual(f(), np.float32(1))
for i in range(2, 100):
f()
self.assertEqual(f(), np.float32(100))

def test_ndim(self):
for ndim in range(10):
res = list(itertools.islice(integers_ndim(ndim), 0, 2))
generator = DataGenerator(integers_ndim(ndim))
gop = GeneratorOp(generator)()
f = theano.function([], gop)
self.assertEqual(ndim, res[0].ndim)
np.testing.assert_equal(f(), res[0])
np.testing.assert_equal(f(), res[1])

def test_cloning_available(self):
gop = generator(integers())
res = gop ** 2
shared = theano.shared(np.float32(10))
res1 = theano.clone(res, {gop: shared})
f = theano.function([], res1)
self.assertEqual(f(), np.float32(100))
49 changes: 48 additions & 1 deletion pymc3/theanof.py
@@ -3,13 +3,17 @@
from .vartypes import typefilter, continuous_types
from theano import theano, scalar, tensor as tt
from theano.gof.graph import inputs
from theano.tensor import TensorVariable
from theano.gof import Op
from theano.configparser import change_flags
from .memoize import memoize
from .blocking import ArrayOrdering
from .data import DataGenerator

__all__ = ['gradient', 'hessian', 'hessian_diag', 'inputvars',
'cont_inputs', 'floatX', 'jacobian',
'CallableTensor', 'join_nonshared_inputs',
'make_shared_replacements']
'make_shared_replacements', 'generator']


def inputvars(a):
@@ -242,3 +246,46 @@ def __call__(self, input):

scalar_identity = IdentityOp(scalar.upgrade_to_float, name='scalar_identity')
identity = tt.Elemwise(scalar_identity, name='identity')


class GeneratorOp(Op):
"""
Generator Op is designed for storing python generators
inside a theano graph. The main limitation is the generator itself.
There are two important cases in which a generator becomes exhausted:
- a finite (not endless) generator is passed
- an exception is raised while `generator.__next__` is performed
Note: the latter is dangerous for plain python generators, but fine for
custom class-based generators with explicit state.
In addition, the data type must be the same on each iteration.
"""
__props__ = ('generator',)

def __init__(self, gen):
if not isinstance(gen, DataGenerator):
gen = DataGenerator(gen)
super(GeneratorOp, self).__init__()
self.generator = gen
self.itypes = []
self.otypes = [self.generator.tensortype]

def perform(self, node, inputs, output_storage, params=None):
try:
output_storage[0][0] = self.generator.__next__()
except StopIteration:
output_storage[0][0] = np.nan

def do_constant_folding(self, node):
return False

@change_flags(compute_test_value='off')
def __call__(self, *args, **kwargs):
rval = super(GeneratorOp, self).__call__(*args, **kwargs)
rval.tag.test_value = self.generator.test_value
return rval


def generator(gen):
"""shortcut for `GeneratorOp`"""
return GeneratorOp(gen)()
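
A minimal, hypothetical usage sketch for the `generator` shortcut (the `minibatches` stream below is illustrative; note that if the wrapped generator is ever exhausted, `perform` above stores `np.nan` instead of raising):

import numpy as np
import theano
from pymc3.theanof import generator

def minibatches():
    i = 0
    while True:                              # endless stream of float32 vectors
        yield (np.ones(10) * i).astype('float32')
        i += 1

x = generator(minibatches())                 # symbolic tensor backed by the generator
f = theano.function([], x.sum())
print(f(), f(), f())                         # 0.0, 10.0, 20.0 -- one new batch per call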
