Merge branch 'master' of github.com:bartvm/blocks
bartvm committed Jan 15, 2015
2 parents 8ec6c1c + 1c8cfb0 commit 37b5b1a
Showing 2 changed files with 35 additions and 262 deletions.
191 changes: 1 addition & 190 deletions blocks/monitoring/aggregation.py
@@ -1,12 +1,8 @@
"""Evaluate Theano expressions on auxiliary data and during training."""
import logging
from abc import ABCMeta, abstractmethod
from collections import OrderedDict

import theano

from blocks.utils import (shared_like, dict_subset,
graph_inputs, update_instance)
from blocks.utils import shared_like, update_instance

logger = logging.getLogger(__name__)

@@ -135,188 +131,3 @@ def get_aggregator(self):
initialization_updates=[],
accumulation_updates=[],
readout_expression=self.variable)
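# A condensed sketch, not the library's exact code, of how a Mean-style
# scheme could fill the three Aggregator fields shown above, assuming
# Theano expressions `numerator` and `denominator` and the same
# `Aggregator` container that `get_aggregator` returns:
#
#     def get_aggregator(self):
#         total = shared_like(self.numerator)    # running numerator
#         count = shared_like(self.denominator)  # running denominator
#         return Aggregator(
#             initialization_updates=[(total, 0.0), (count, 0.0)],
#             accumulation_updates=[(total, total + self.numerator),
#                                   (count, count + self.denominator)],
#             readout_expression=total / count)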


class DatasetEvaluator(object):
"""A DatasetEvaluator evaluates many Theano expressions on a dataset.
The DatasetEvaluator provides a do-it-all method,
:meth:`evaluate`, which computes values of ``expressions``
on a dataset.
Alternatively, methods :meth:`_initialize_computation`,
:meth:`_process_batch`, :meth:`_readout_expressions` can be used
with a custom loop over data.
The values computed on subsets of the given dataset
are aggregated using the AggregationSchemes provided in
`aggregation_scheme` tags. If no tag is given, the value is **averaged
over minibatches**. However, care is taken to ensure that variables
which do not depend on data are not unnecessarily recomputed.
Parameters
----------
expressions : list or dict
A list of monitored variables. Or a dict from keys to variables.
If a list is given, keys will be set to the variables themselves.
Each variable can be tagged with a
:class:`AggregationScheme` that specifies how the value can
be computed for a data set by aggregating minibatches.
"""
def __init__(self, channel_variables):
if isinstance(channel_variables, dict):
self.channel_variables = channel_variables
else:
keyed_vars = ((v, v) for v in channel_variables)
self.channel_variables = OrderedDict(keyed_vars)
self.inputs = graph_inputs(self.channel_variables.values())
self._compile()

def _compile(self):
initialize_updates = []
accumulate_updates = []
readout = OrderedDict()

for k, v in self.channel_variables.iteritems():
logger.debug('Monitoring: %s', v.name)
if not hasattr(v.tag, 'aggregation_scheme'):
if graph_inputs([v]) == []:
logger.debug('Using _DataIndependent aggregation scheme'
' for %s since it does not depend on'
' the data', k)
v.tag.aggregation_scheme = _DataIndependent(variable=v)
else:
logger.debug('Using the default (average over minibatches)'
' aggregation scheme for %s', k)
v.tag.aggregation_scheme = Mean(v, 1.0)

aggregator = v.tag.aggregation_scheme.get_aggregator()
initialize_updates.extend(aggregator.initialization_updates)
accumulate_updates.extend(aggregator.accumulation_updates)
readout[k] = aggregator.readout_expression

if initialize_updates:
self._initialize_fun = theano.function([], [],
updates=initialize_updates)
else:
self._initialize_fun = None

self._initialized = False
self._input_names = [v.name for v in self.inputs]

if accumulate_updates:
self._accumulate_fun = theano.function(self.inputs,
[],
updates=accumulate_updates)
else:
self._accumulate_fun = None

readout_th_fun = theano.function([], readout.values())

def readout_fun():
ret_vals = readout_th_fun()
return dict(zip(readout.keys(), ret_vals))
self._readout_fun = readout_fun

def _initialize_computation(self):
"""Initialize the aggragators to process a dataset."""
self._initialized = True
if self._initialize_fun is not None:
self._initialize_fun()

def _process_batch(self, batch):
if not self._initialized:
self._initialize_computation()
batch = dict_subset(batch, self._input_names)
if self._accumulate_fun is not None:
self._accumulate_fun(**batch)

def _readout_expressions(self):
if not self._initialized:
raise Exception("To readout you must first initialize, then"
"process batches!")
self._initialized = False
return self._readout_fun()

def evaluate(self, data_set_view):
"""Compute the expressions over an iterable data set.
Parameters
----------
data_set_view: an iterable over batches
each batch must be a dict from input names to ndarrays
Returns
-------
dict from variables (or from the keys of the
monitored_variables argument to __init__) to the values
computed on the provided dataset.
"""
self._initialize_computation()

if self._accumulate_fun is not None:
for batch in data_set_view:
self._process_batch(batch)
else:
logger.debug('Only constant monitors are used, will not '
'iterate over the data!')

return self._readout_expressions()
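

# A usage sketch, not from the library: `monitored` (a list or dict of
# tagged Theano variables) and `batches` (an iterable of dicts mapping
# input names to ndarrays) are hypothetical names.
#
#     evaluator = DatasetEvaluator(monitored)
#     results = evaluator.evaluate(batches)
#
#     # The equivalent custom loop over data:
#     evaluator._initialize_computation()
#     for batch in batches:
#         evaluator._process_batch(batch)
#     results = evaluator._readout_expressions()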


class MinibatchEvaluator(object):
"""Helper evaluating several Theano variables using updates.
The MinibatchEvaluator allocates storage for each of the variables
given to its constructor. It then provides:
- a list of updates which should be called by the training function
on every minibatch. These updates store computed values in the
shared variables.
- a function which reads the shared variables and returns a dict from
names (or channel keys) their values.
Parameters
----------
expressions : list or dict
A list of monitored variables. Or a dict from keys to variables.
If a list is given, keys will be set to the variables themselves.
"""
def __init__(self, monitored_variables):
    if not isinstance(monitored_variables, dict):
        keyed_vars = ((v, v) for v in monitored_variables)
        monitored_variables = OrderedDict(keyed_vars)

self._updates = []
self._storage = OrderedDict()
for k, v in monitored_variables.iteritems():
shared_v = shared_like(v)
self._storage[k] = shared_v
self._updates.append((shared_v, v))

@property
def updates(self):
"""Updates that have to be called for each minibatch."""
return self._updates

def read_expressions(self):
"""Read values of the expressions computed on the last minibatch.
Returns
-------
dict from variables (or from the keys of the
monitored_variables argument to __init__) to the values
computed on the provided dataset.
"""
values = OrderedDict()
for k, sv in self._storage.iteritems():
values[k] = sv.get_value(borrow=False)
return values
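

# A usage sketch, not from the library: `monitored`, `inputs`, `cost`,
# and `batches` are hypothetical names. The evaluator's updates
# piggyback on the training function, so each call refreshes the
# shared storage:
#
#     evaluator = MinibatchEvaluator(monitored)
#     train_fn = theano.function(inputs, cost, updates=evaluator.updates)
#     for batch in batches:
#         train_fn(**batch)
#         print(evaluator.read_expressions())  # values for this minibatch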
106 changes: 34 additions & 72 deletions tests/monitoring/test_aggregation.py
@@ -1,92 +1,54 @@
import numpy
import theano.tensor
import theano
from numpy.testing import assert_allclose
from theano import tensor

from blocks import bricks
from blocks.bricks import application, VariableRole
from blocks.graph import ComputationGraph
from blocks.monitoring.aggregation import (DatasetEvaluator, mean,
MinibatchEvaluator)
from blocks.monitoring.aggregation import mean
from blocks.utils import shared_floatx


class TestBrick(bricks.Brick):
def __init__(self, **kwargs):
super(TestBrick, self).__init__(**kwargs)

def _allocate(self):
self.params = [theano.shared(0, 'V')]
self.params = [shared_floatx(2, name='V')]

@application(inputs=['input_'], outputs=['output'])
def apply(self, input_, application_call):
V = self.params[0]

application_call.add_monitor((V ** 2).sum(),
name='V_mon')

mean_input = mean(input_.sum(), input_.shape.prod())
application_call.add_monitor(mean_input, name='mean_input')

application_call.add_monitor((V ** 2).sum(), name='V_monitor')
mean_input = mean(input_.mean(axis=1).sum(), input_.shape[0])
application_call.add_monitor(mean_input, name='mean_mean_input')
application_call.add_monitor(input_.mean(),
name='per_batch_mean_input')

return input_ + V
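
# Note: of the monitors added above, only the one built with
# mean(numerator, denominator) carries an aggregation_scheme tag (a
# Mean over minibatches); its Aggregator protocol is exercised at the
# end of test_dataset_evaluators below.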


def test_param_monitor():
X = theano.tensor.vector('X')
brick = TestBrick(name='test_brick')
Y = brick.apply(X)
graph = ComputationGraph([Y])

V_monitors = [v for v in graph.variables
if v.name == 'V_mon']
validator = DatasetEvaluator({v.name: v for v in V_monitors})

V_vals = validator.evaluate(None)
assert V_vals['V_mon'] == 0


def test_dataset_evaluators():
X = theano.tensor.vector('X')
X = tensor.matrix('X')
brick = TestBrick(name='test_brick')
Y = brick.apply(X)
graph = ComputationGraph([Y])
V_monitors = [v for v in graph.variables
if getattr(v.tag, 'role', None) == VariableRole.MONITOR]
validator = DatasetEvaluator({v.name: v for v in V_monitors})

full_set = numpy.arange(100.0, dtype='float32')
batches = numpy.split(full_set, numpy.cumsum(numpy.arange(6) + 1))
batches = [{'X': b} for b in batches]

V_vals = validator.evaluate(batches)
assert V_vals['V_mon'] == 0
numpy.testing.assert_allclose(V_vals['mean_input'], full_set.mean())
per_batch_mean = numpy.mean([b['X'].mean() for b in batches])
numpy.testing.assert_allclose(V_vals['per_batch_mean_input'],
per_batch_mean)


def test_minibatch_evaluators():
X = theano.tensor.vector('X')
brick = TestBrick(name='test_brick')
Y = brick.apply(X)
graph = ComputationGraph([Y])
V_monitors = [v for v in graph.variables
if getattr(v.tag, 'role', None) == VariableRole.MONITOR]

train_monitor = MinibatchEvaluator({v.name: v for v in V_monitors})

full_set = numpy.arange(100.0, dtype='float32')
batches = numpy.split(full_set, numpy.cumsum(numpy.arange(6) + 1))
batches = [{'X': b} for b in batches]

train_fun = theano.function([X], [Y],
updates=train_monitor.updates)

for b in batches:
train_fun(**b)
M_vals = train_monitor.read_expressions()
assert M_vals['V_mon'] == 0
numpy.testing.assert_allclose(M_vals['mean_input'], b['X'].mean())
numpy.testing.assert_allclose(M_vals['per_batch_mean_input'],
b['X'].mean())
y = brick.apply(X)
graph = ComputationGraph([y])

# Test the monitors without aggregation schemes
monitors = [v for v in graph.variables
if getattr(v.tag, 'role', None) == VariableRole.MONITOR and
not hasattr(v.tag, 'aggregation_scheme')]
monitors.sort(key=lambda variable: variable.name)

f = theano.function([X], monitors)
monitor_vals = f(numpy.arange(4, dtype=theano.config.floatX).reshape(2, 2))
assert_allclose(monitor_vals, [4., 1.5])
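# Sanity check: sorted by name the monitors are
# [V_monitor, per_batch_mean_input]; with V == 2 the first reads
# (2 ** 2).sum() == 4 and the mean of [[0, 1], [2, 3]] is 1.5.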

# Test the aggregation scheme
monitor, = [v for v in graph.variables
if getattr(v.tag, 'role', None) == VariableRole.MONITOR and
hasattr(v.tag, 'aggregation_scheme')]
aggregator = monitor.tag.aggregation_scheme.get_aggregator()
initialize = theano.function([], updates=aggregator.initialization_updates)
initialize()
accumulate = theano.function([X], updates=aggregator.accumulation_updates)
accumulate(numpy.arange(4, dtype=theano.config.floatX).reshape(2, 2))
accumulate(numpy.arange(4, 8, dtype=theano.config.floatX).reshape(2, 2))
assert_allclose(aggregator.readout_expression.eval(), 3.5)
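# Sanity check: the Mean aggregator accumulates row-mean sums of
# 0.5 + 2.5 = 3 and 4.5 + 6.5 = 11 (total 14) over 2 + 2 = 4 rows,
# so the readout is 14 / 4 = 3.5.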
