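# NOTE: the following three lines are web-page residue captured together with
# this file; they are not part of the module and are kept only as a comment.
#   Fetching contributors... / Cannot retrieve contributors at this time /
#   544 lines (425 sloc) 18 KB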
from __future__ import print_function, division
from sympy.external import import_module
from sympy.printing.printer import Printer
from sympy.core.compatibility import range, is_sequence
from sympy.utilities.exceptions import SymPyDeprecationWarning
import sympy
from functools import partial
# Theano is an optional dependency: ``import_module`` yields a falsy value
# when it cannot be imported, in which case none of the names below exist.
theano = import_module('theano')

if theano:
    ts = theano.scalar
    tt = theano.tensor
    from theano.sandbox import linalg as tlinalg

    # Dispatch table used by ``TheanoPrinter._print_Basic``: maps the type of
    # a SymPy expression to the Theano callable that is applied to the
    # already-printed arguments of that expression.
    mapping = {
        sympy.Add: tt.add,
        sympy.Mul: tt.mul,
        sympy.Abs: tt.abs_,
        sympy.sign: tt.sgn,
        sympy.ceiling: tt.ceil,
        sympy.floor: tt.floor,
        sympy.log: tt.log,
        sympy.exp: tt.exp,
        sympy.sqrt: tt.sqrt,
        sympy.cos: tt.cos,
        sympy.acos: tt.arccos,
        sympy.sin: tt.sin,
        sympy.asin: tt.arcsin,
        sympy.tan: tt.tan,
        sympy.atan: tt.arctan,
        sympy.atan2: tt.arctan2,
        sympy.cosh: tt.cosh,
        sympy.acosh: tt.arccosh,
        sympy.sinh: tt.sinh,
        sympy.asinh: tt.arcsinh,
        sympy.tanh: tt.tanh,
        sympy.atanh: tt.arctanh,
        # NOTE(review): the keys of the next two entries were lost in the
        # damaged source (only ``tt.real, tt.imag`` survived); restored as
        # the SymPy real/imaginary part functions -- confirm.
        sympy.re: tt.real,
        sympy.im: tt.imag,
        sympy.arg: tt.angle,
        sympy.erf: tt.erf,
        sympy.gamma: tt.gamma,
        sympy.loggamma: tt.gammaln,
        sympy.Pow: tt.pow,
        sympy.Eq: tt.eq,
        # All four inequality relations, so that e.g. Piecewise conditions
        # written with <, > or >= can be printed as well as <=.
        sympy.StrictGreaterThan: tt.gt,
        sympy.StrictLessThan: tt.lt,
        sympy.LessThan: tt.le,
        sympy.GreaterThan: tt.ge,
        sympy.And: tt.and_,
        sympy.Or: tt.or_,
        sympy.Max: tt.maximum,  # Sympy accept >2 inputs, Theano only 2
        sympy.Min: tt.minimum,  # Sympy accept >2 inputs, Theano only 2
        # Matrices
        sympy.MatAdd: tt.Elemwise(ts.add),
        sympy.HadamardProduct: tt.Elemwise(ts.mul),
        sympy.Trace: tlinalg.trace,
        sympy.Determinant: tlinalg.det,
        sympy.Inverse: tlinalg.matrix_inverse,
        sympy.Transpose: tt.DimShuffle((False, False), [1, 0]),
    }
class TheanoPrinter(Printer):
    """ Code printer which creates Theano symbolic expression graphs.

    Parameters
    ==========

    cache : dict
        Cache dictionary to use (see :attr:`cache`). If omitted or None, a new
        empty dictionary private to this printer is used, so the printer does
        not depend on or alter global state. Note: the dictionary is not
        copied on initialization of the printer and will be updated in-place,
        so using the same dict object when creating multiple printers or making
        multiple calls to :func:`.theano_code` or :func:`.theano_function` means
        the cache is shared between all these applications.

    Attributes
    ==========

    cache : dict
        A cache of Theano variables which have been created for Sympy
        symbol-like objects (e.g. :class:`sympy.core.symbol.Symbol` or
        :class:`sympy.matrices.expressions.MatrixSymbol`). This is used to
        ensure that all references to a given symbol in an expression (or
        multiple expressions) are printed as the same Theano variable, which is
        created only once. Symbols are differentiated only by name and type. The
        format of the cache's contents should be considered opaque to the user.
    """
    printmethod = "_theano"

    def __init__(self, *args, **kwargs):
        """ Create the printer, taking the optional ``cache`` keyword argument.

        All remaining arguments are forwarded to :class:`Printer`.
        """
        cache = kwargs.pop('cache', None)
        # An explicit ``cache=None`` used to be stored as-is and then broke
        # the first ``key in self.cache`` lookup; treat it like "not given".
        # Identity test (not truthiness) so a caller's empty dict is kept.
        self.cache = {} if cache is None else cache
        super(TheanoPrinter, self).__init__(*args, **kwargs)

    def _get_key(self, s, name=None, dtype=None, broadcastable=None):
        """ Get the cache key for a Sympy object.

        Parameters
        ==========

        s : sympy.core.basic.Basic
            Sympy object to get key for.

        name : str
            Name of object, if it does not have a ``name`` attribute.

        Returns
        =======

        tuple
            ``(name, type(s), s.args, dtype, broadcastable)``.
        """
        if name is None:
            name = s.name

        return (name, type(s), s.args, dtype, broadcastable)

    def _get_or_create(self, s, name=None, dtype=None, broadcastable=None):
        """
        Get the Theano variable for a Sympy symbol from the cache, or create it
        if it does not exist.

        ``name`` defaults to ``s.name``, ``dtype`` to ``'floatX'`` and
        ``broadcastable`` to the empty tuple.
        """
        # Defaults
        if name is None:
            name = s.name
        if dtype is None:
            dtype = 'floatX'
        if broadcastable is None:
            broadcastable = ()

        key = self._get_key(s, name, dtype=dtype, broadcastable=broadcastable)

        if key in self.cache:
            return self.cache[key]

        value = tt.tensor(name=name, dtype=dtype, broadcastable=broadcastable)
        self.cache[key] = value
        return value

    def _print_Symbol(self, s, **kwargs):
        """ Print a symbol as a (cached) Theano variable.

        The dtype and broadcastable pattern are looked up in the ``dtypes``
        and ``broadcastables`` keyword mappings, if present.
        """
        dtype = kwargs.get('dtypes', {}).get(s)
        bc = kwargs.get('broadcastables', {}).get(s)
        return self._get_or_create(s, dtype=dtype, broadcastable=bc)

    def _print_AppliedUndef(self, s, **kwargs):
        """ Print an applied undefined function as a (cached) variable.

        The variable name is built from the function type and its first
        argument.
        """
        name = str(type(s)) + '_' + str(s.args[0])
        dtype = kwargs.get('dtypes', {}).get(s)
        bc = kwargs.get('broadcastables', {}).get(s)
        return self._get_or_create(s, name=name, dtype=dtype, broadcastable=bc)

    def _print_Basic(self, expr, **kwargs):
        """ Generic fallback: look up ``type(expr)`` in ``mapping`` and apply
        the resulting Theano op to the printed arguments.

        Raises ``KeyError`` for expression types missing from ``mapping``.
        """
        op = mapping[type(expr)]
        children = [self._print(arg, **kwargs) for arg in expr.args]
        return op(*children)

    def _print_Number(self, n, **kwargs):
        """ Print a numeric constant as a Python float. """
        # Integers already taken care of below, interpret as float
        return float(n.evalf())

    def _print_MatrixSymbol(self, X, **kwargs):
        """ Print a matrix symbol as a (cached) two-dimensional variable. """
        dtype = kwargs.get('dtypes', {}).get(X)
        return self._get_or_create(X, dtype=dtype, broadcastable=(None, None))

    def _print_DenseMatrix(self, X, **kwargs):
        """ Print an explicit matrix by stacking its printed entries.

        Raises ``NotImplementedError`` when the installed Theano has no
        ``tensor.stacklists``.
        """
        try:
            stacklists = tt.stacklists
        except AttributeError:
            raise NotImplementedError(
                "Matrix translation not yet supported in this version of Theano")

        return stacklists([
            [self._print(arg, **kwargs) for arg in L]
            for L in X.tolist()
        ])

    _print_ImmutableMatrix = _print_ImmutableDenseMatrix = _print_DenseMatrix

    def _print_MatMul(self, expr, **kwargs):
        """ Print a matrix product as a left-to-right chain of ``tt.dot``. """
        children = [self._print(arg, **kwargs) for arg in expr.args]
        result = children[0]
        for child in children[1:]:
            result = tt.dot(result, child)
        return result

    def _print_MatPow(self, expr, **kwargs):
        """ Print a matrix power as repeated ``tt.dot`` with the base.

        Only exponents that print as a positive Python ``int`` are handled;
        anything else (including an exponent of 0, despite the wording of
        the message) raises ``NotImplementedError``.
        """
        children = [self._print(arg, **kwargs) for arg in expr.args]
        result = 1
        if isinstance(children[1], int) and children[1] > 0:
            for i in range(children[1]):
                result = tt.dot(result, children[0])
        else:
            raise NotImplementedError(
                'Only non-negative integer '
                'powers of matrices can be handled by Theano at the moment')
        return result

    def _print_MatrixSlice(self, expr, **kwargs):
        """ Print a matrix slice by indexing the printed parent matrix. """
        parent = self._print(expr.parent, **kwargs)
        rowslice = self._print(slice(*expr.rowslice), **kwargs)
        colslice = self._print(slice(*expr.colslice), **kwargs)
        return parent[rowslice, colslice]

    def _print_BlockMatrix(self, expr, **kwargs):
        """ Print a block matrix: join each row of blocks along axis 1, then
        join the resulting rows along axis 0.
        """
        nrows, ncols = expr.blocks.shape
        blocks = [[self._print(expr.blocks[r, c], **kwargs)
                   for c in range(ncols)]
                  for r in range(nrows)]
        return tt.join(0, *[tt.join(1, *row) for row in blocks])

    def _print_slice(self, expr, **kwargs):
        """ Print a Python ``slice``; only SymPy members are printed, other
        members (e.g. ``None`` or plain ints) are passed through unchanged.
        """
        return slice(*[self._print(i, **kwargs)
                       if isinstance(i, sympy.Basic) else i
                       for i in (expr.start, expr.stop, expr.step)])

    def _print_Pi(self, expr, **kwargs):
        """ Print pi as a double-precision float literal. """
        return 3.141592653589793

    def _print_Piecewise(self, expr, **kwargs):
        """ Print a Piecewise as nested ``tt.switch`` calls.

        The first (value, condition) pair is handled here and the remaining
        pairs recursively; if no condition matches the result is NaN.
        """
        import numpy as np
        e, cond = expr.args[0].args  # First condition and corresponding value

        # Print conditional expression and value for first condition
        p_cond = self._print(cond, **kwargs)
        p_e = self._print(e, **kwargs)

        # One condition only
        if len(expr.args) == 1:
            # Return value if condition else NaN
            return tt.switch(p_cond, p_e, np.nan)

        # Return value_1 if condition_1 else evaluate remaining conditions
        p_remaining = self._print(sympy.Piecewise(*expr.args[1:]), **kwargs)
        return tt.switch(p_cond, p_e, p_remaining)

    def _print_Rational(self, expr, **kwargs):
        """ Print a rational as a true division of numerator by denominator. """
        return tt.true_div(self._print(expr.p, **kwargs),
                           self._print(expr.q, **kwargs))

    def _print_Integer(self, expr, **kwargs):
        """ Print an integer as its ``p`` attribute. """
        return expr.p

    def _print_factorial(self, expr, **kwargs):
        """ Print ``factorial(x)`` via the printed form of ``gamma(x + 1)``. """
        return self._print(sympy.gamma(expr.args[0] + 1), **kwargs)

    def _print_Derivative(self, deriv, **kwargs):
        """ Print a derivative by applying ``tt.Rop`` once for every
        differentiation variable, in order.
        """
        rv = self._print(deriv.expr, **kwargs)
        for var in deriv.variables:
            var = self._print(var, **kwargs)
            rv = tt.Rop(rv, var, tt.ones_like(var))
        return rv

    def emptyPrinter(self, expr):
        """ Fallback for objects without a print method: return unchanged. """
        return expr

    def doprint(self, expr, dtypes=None, broadcastables=None):
        """ Convert a Sympy expression to a Theano graph variable.

        The ``dtypes`` and ``broadcastables`` arguments are used to specify the
        data type, dimension, and broadcasting behavior of the Theano variables
        corresponding to the free symbols in ``expr``. Each is a mapping from
        Sympy symbols to the value of the corresponding argument to
        :func:`theano.tensor.Tensor`.

        See the broadcasting section of the Theano documentation for more
        information on broadcasting in Theano.

        Parameters
        ==========

        expr : sympy.core.expr.Expr
            Sympy expression to print.

        dtypes : dict
            Mapping from Sympy symbols to Theano datatypes to use when creating
            new Theano variables for those symbols. Corresponds to the ``dtype``
            argument to :func:`theano.tensor.Tensor`. Defaults to ``'floatX'``
            for symbols not included in the mapping.

        broadcastables : dict
            Mapping from Sympy symbols to the value of the ``broadcastable``
            argument to :func:`theano.tensor.Tensor` to use when creating Theano
            variables for those symbols. Defaults to the empty tuple for symbols
            not included in the mapping (resulting in a scalar).

        Returns
        =======

        theano.gof.graph.Variable
            A variable corresponding to the expression's value in a Theano
            symbolic expression graph.

        See Also
        ========

        theano.tensor.Tensor
        """
        if dtypes is None:
            dtypes = {}
        if broadcastables is None:
            broadcastables = {}

        return self._print(expr, dtypes=dtypes, broadcastables=broadcastables)
# Module-level cache of Theano variables, used by ``theano_code`` whenever the
# caller does not supply a cache of its own.
global_cache = {}


def theano_code(expr, cache=None, **kwargs):
    """ Convert a Sympy expression into a Theano graph variable.

    Parameters
    ==========

    expr : sympy.core.expr.Expr
        Sympy expression object to convert.

    cache : dict
        Cached Theano variables (see :attr:`.TheanoPrinter.cache`). Defaults to
        the module-level global cache.

    dtypes : dict
        Passed to :meth:`.TheanoPrinter.doprint`.

    broadcastables : dict
        Passed to :meth:`.TheanoPrinter.doprint`.

    Returns
    =======

    theano.gof.graph.Variable
        A variable corresponding to the expression's value in a Theano symbolic
        expression graph.

    Raises
    ======

    ImportError
        If theano could not be imported.
    """
    if not theano:
        raise ImportError("theano is required for theano_code")

    # Identity test so that an explicitly passed empty dict is used as given
    # rather than being replaced by the global cache.
    if cache is None:
        cache = global_cache

    return TheanoPrinter(cache=cache, settings={}).doprint(expr, **kwargs)
def dim_handling(inputs, dim=None, dims=None, broadcastables=None):
    """
    Get value of ``broadcastables`` argument to :func:`.theano_code` from
    keyword arguments to :func:`.theano_function`.

    Included for backwards compatibility.

    Parameters
    ==========

    inputs
        Sequence of input symbols.

    dim : int
        Common number of dimensions for all inputs. Overrides other arguments
        if given.

    dims : dict
        Mapping from input symbols to number of dimensions. Overrides
        ``broadcastables`` argument if given.

    broadcastables : dict
        Explicit value of ``broadcastables`` argument to
        :meth:`.TheanoPrinter.doprint`. If not None function will return this
        value unchanged.

    Returns
    =======

    dict
        Dictionary mapping elements of ``inputs`` to their "broadcastable"
        values (tuple of ``bool``\\ s). Empty if none of ``dim``, ``dims`` and
        ``broadcastables`` is given.
    """
    if dim is not None:
        return {s: (False,) * dim for s in inputs}

    if dims is not None:
        # An empty mapping has no maximum; there is nothing to describe.
        if not dims:
            return {}
        # Every symbol is padded with broadcastable (True) axes up to the
        # largest requested number of dimensions.
        maxdim = max(dims.values())
        return {
            s: (False,) * d + (True,) * (maxdim - d)
            for s, d in dims.items()
        }

    # Identity test: ``!= None`` would invoke a user-defined ``__ne__`` and
    # could wrongly discard the mapping.
    if broadcastables is not None:
        return broadcastables

    return {}
def theano_function(inputs, outputs, squeeze=True, scalar=False, **kwargs):
    """ Create a Theano function from SymPy expressions.

    The inputs and outputs are converted to Theano variables using
    :func:`.theano_code` and then passed to :func:`theano.function`.

    Parameters
    ==========

    inputs
        Sequence of symbols which constitute the inputs of the function.

    outputs
        Expression or sequence of expressions which constitute the outputs(s) of
        the function. The free symbols of each expression must be a subset of
        ``inputs``.

    squeeze : bool
        If ``outputs`` is a sequence of length one, pass the printed value of
        the lone element of the sequence to :func:`theano.function` instead of
        the sequence itself. This has the effect of making a function which
        returns a single array instead of a list containing one array. This
        behavior is deprecated and the default value will be changed to False in
        a future release. To get a function that returns a single array, pass
        a single expression as ``outputs`` instead of a list.

    scalar : bool
        Convert 0-dimensional arrays in output to scalars. This will return a
        Python wrapper function around the Theano function object.

    cache : dict
        Cached Theano variables (see :attr:`.TheanoPrinter.cache`). If not
        given, a new empty dictionary is used for this call.

    dtypes : dict
        Passed to :meth:`.TheanoPrinter.doprint`.

    broadcastables : dict
        Passed to :meth:`.TheanoPrinter.doprint`.

    dims : dict
        Alternative to ``broadcastables`` argument. Mapping from elements of
        ``inputs`` to integers indicating the dimension of their associated
        arrays/tensors. Overrides ``broadcastables`` argument if given.

    dim : int
        Another alternative to the ``broadcastables`` argument. Common number of
        dimensions to use for all arrays/tensors.
        ``theano_function([x, y], [...], dim=2)`` is equivalent to using
        ``broadcastables={x: (False, False), y: (False, False)}``.

    Any other keyword arguments are passed on to :func:`theano.function`.

    Returns
    =======

    callable
        A callable object which takes values of ``inputs`` as positional
        arguments and returns an output array for each of the expressions
        in ``outputs``. If ``outputs`` is a single expression the function will
        return a Numpy array, if it is a list of multiple expressions the
        function will return a list of arrays. See description of the ``squeeze``
        argument above for the behavior when a single output is passed in a list.
        The returned object will either be an instance of
        :class:`theano.compile.function_module.Function` or a Python wrapper
        function around one. In both cases, the returned value will have a
        ``theano_function`` attribute which points to the return value of
        :func:`theano.function`.

    Raises
    ======

    ImportError
        If theano could not be imported.

    Examples
    ========

    >>> from sympy.abc import x, y, z
    >>> from sympy.printing.theanocode import theano_function

    A simple function with one input and one output:

    >>> f1 = theano_function([x], x**2 - 1, scalar=True)
    >>> f1(3)
    8.0

    A function with multiple inputs and one output:

    >>> f2 = theano_function([x, y, z], (x**z + y**z)**(1/z), scalar=True)
    >>> f2(3, 4, 2)
    5.0

    A function with multiple inputs and multiple outputs:

    >>> f3 = theano_function([x, y], [x**2 + y**2, x**2 - y**2], scalar=True)
    >>> f3(2, 3)
    [13.0, -5.0]

    See also
    ========

    dim_handling
    """
    if not theano:
        raise ImportError("theano is required for theano_function")

    # Squeeze output sequence of length one
    if squeeze and is_sequence(outputs) and len(outputs) == 1:
        # NOTE(review): only ``feature`` and ``useinstead`` survived in the
        # damaged source; ``issue`` and ``deprecated_since_version`` are
        # restored from upstream SymPy -- confirm both values.
        SymPyDeprecationWarning(
            feature='theano_function() with squeeze=True',
            issue=14986,
            deprecated_since_version='1.2.1',
            useinstead='a single expression as "outputs" (not a list)',
        ).warn()
        outputs = outputs[0]

    # Pop off non-theano keyword args
    cache = kwargs.pop('cache', {})
    dtypes = kwargs.pop('dtypes', {})

    broadcastables = dim_handling(
        inputs,
        dim=kwargs.pop('dim', None),
        dims=kwargs.pop('dims', None),
        broadcastables=kwargs.pop('broadcastables', None),
    )

    # Print inputs/outputs
    code = partial(theano_code, cache=cache, dtypes=dtypes,
                   broadcastables=broadcastables)
    tinputs = list(map(code, inputs))

    # Determined after squeezing, so it describes what the compiled function
    # actually returns (one array vs. a list of arrays).
    is_seq = is_sequence(outputs)
    if is_seq:
        toutputs = list(map(code, outputs))
    else:
        toutputs = code(outputs)

    # Compile theano func
    func = theano.function(tinputs, toutputs, **kwargs)

    # An output is 0-dimensional when its broadcastable pattern is empty.
    is_0d = [len(o.variable.broadcastable) == 0 for o in func.outputs]

    # No wrapper required
    if not scalar or not any(is_0d):
        func.theano_function = func
        return func

    # Create wrapper to convert 0-dimensional outputs to scalars
    def wrapper(*args):
        out = func(*args)
        if not is_seq:
            # Indexing a 0-d array with the empty tuple yields its scalar.
            return out[()]
        return [o[()] if zero_d else o for o, zero_d in zip(out, is_0d)]

    wrapper.__wrapped__ = func
    wrapper.__doc__ = func.__doc__
    wrapper.theano_function = func
    return wrapper