diff --git a/blocks/monitoring/aggregation.py b/blocks/monitoring/aggregation.py index 558ff5c8..244b57a7 100644 --- a/blocks/monitoring/aggregation.py +++ b/blocks/monitoring/aggregation.py @@ -1,4 +1,5 @@ """Evaluate Theano variables on auxiliary data and during training.""" +from functools import partial import logging from abc import ABCMeta, abstractmethod @@ -29,6 +30,9 @@ class AggregationScheme(object): The variable that holds the desired value on a single batch. """ + def __init__(self, variable): + self.variable = variable + @abstractmethod def get_aggregator(self): """Return a new Aggregator for this variable.""" @@ -149,9 +153,6 @@ def mean(numerator, denominator=1.): class _DataIndependent(AggregationScheme): """Dummy aggregation scheme for values that don't depend on data.""" - def __init__(self, variable): - self.variable = variable - def get_aggregator(self): return Aggregator(aggregation_scheme=self, initialization_updates=[], @@ -161,9 +162,6 @@ def get_aggregator(self): class TakeLast(AggregationScheme): """Aggregation scheme which remembers only the last value.""" - def __init__(self, variable): - self.variable = variable - def get_aggregator(self): self.storage = shared_like(self.variable) return Aggregator(aggregation_scheme=self, @@ -173,12 +171,73 @@ def get_aggregator(self): readout_variable=self.storage) -def take_last(variable): +def _simple_aggregation(scheme, variable): variable = variable.copy(variable.name) - variable.tag.aggregation_scheme = TakeLast(variable) + variable.tag.aggregation_scheme = scheme(variable) return variable +take_last = partial(_simple_aggregation, TakeLast) + + +class Minimum(AggregationScheme): + """Aggregation scheme which remembers only the minimum value.""" + def _build_aggregator(self, accumulate_update): + initialized = shared_like(0.) + accumulate = ifelse(initialized, accumulate_update, self.variable) + return Aggregator(aggregation_scheme=self, + initialization_updates=[ + (self.storage, tensor.zeros_like(self.storage)), + (initialized, tensor.zeros_like(initialized)) + ], + accumulation_updates=[ + (self.storage, accumulate), + (initialized, tensor.ones_like(initialized)) + ], + readout_variable=self.storage) + + def get_aggregator(self): + self.storage = shared_like(self.variable) + return self._build_aggregator(tensor.minimum(self.storage, + self.variable)) + +minimum = partial(_simple_aggregation, Minimum) + + +class Maximum(Minimum): + """Aggregation scheme which remembers only the maximum value.""" + def get_aggregator(self): + self.storage = shared_like(self.variable) + return self._build_aggregator(tensor.maximum(self.storage, + self.variable)) + +maximum = partial(_simple_aggregation, Maximum) + + +class Concatenate(Minimum): + """Aggregation scheme which remembers values from all batches. + + Parameters + ---------- + variable: :class:`~tensor.TensorVariable` + The variable that holds the desired value on a single batch. + + """ + def __init__(self, variable): + # Add an extra axis to concatenate along. Must be non-broadcastable + # for concatenate to always work. + variable = (tensor.unbroadcast(tensor.shape_padleft(variable, 1), 0) + .copy(variable.name)) + super(Concatenate, self).__init__(variable) + + def get_aggregator(self): + self.storage = shared_like(self.variable) + return self._build_aggregator(tensor.concatenate([self.storage, + self.variable])) + +concatenate = partial(_simple_aggregation, Concatenate) + + @add_metaclass(ABCMeta) class MonitoredQuantity(object): """The base class for monitored-quantities. diff --git a/tests/monitoring/test_aggregation.py b/tests/monitoring/test_aggregation.py index ecbddcf2..1c72c8a1 100644 --- a/tests/monitoring/test_aggregation.py +++ b/tests/monitoring/test_aggregation.py @@ -6,7 +6,8 @@ from blocks import bricks from blocks.bricks.base import application from blocks.graph import ComputationGraph -from blocks.monitoring.aggregation import mean, Mean +from blocks.monitoring.aggregation import (mean, Mean, Minimum, Maximum, + Concatenate) from blocks.utils import shared_floatx from collections import OrderedDict @@ -91,6 +92,78 @@ def test_mean_aggregator(): numpy.array([35], dtype=theano.config.floatX)) +def test_min_max_aggregators(): + num_examples = 4 + batch_size = 2 + + features = numpy.array([[2, 3], + [2, 9], + [2, 4], + [5, 1]], dtype=theano.config.floatX) + + dataset = IndexableDataset(OrderedDict([('features', features)])) + + data_stream = DataStream(dataset, + iteration_scheme=SequentialScheme(num_examples, + batch_size)) + + x = tensor.matrix('features') + y = (x**2).sum(axis=0) + y.name = 'y' + z = y.min() + z.name = 'z' + + y.tag.aggregation_scheme = Maximum(y) + z.tag.aggregation_scheme = Minimum(z) + + assert_allclose(DatasetEvaluator([y]).evaluate(data_stream)['y'], + numpy.array([29, 90], dtype=theano.config.floatX)) + assert_allclose(DatasetEvaluator([z]).evaluate(data_stream)['z'], + numpy.array([8], dtype=theano.config.floatX)) + + # Make sure accumulators are reset. + features = numpy.array([[2, 1], + [1, 3], + [1, -1], + [2.5, 1]], dtype=theano.config.floatX) + + dataset = IndexableDataset(OrderedDict([('features', features)])) + + data_stream = DataStream(dataset, + iteration_scheme=SequentialScheme(num_examples, + batch_size)) + assert_allclose(DatasetEvaluator([y]).evaluate(data_stream)['y'], + numpy.array([7.25, 10], dtype=theano.config.floatX)) + assert_allclose(DatasetEvaluator([z]).evaluate(data_stream)['z'], + numpy.array([2], dtype=theano.config.floatX)) + + +def test_concatenate_aggregator(): + num_examples = 4 + batch_size = 2 + + features = numpy.array([[2, 3], + [2, 9], + [2, 4], + [5, 1]], dtype=theano.config.floatX) + + dataset = IndexableDataset(OrderedDict([('features', features)])) + + data_stream = DataStream(dataset, + iteration_scheme=SequentialScheme(num_examples, + batch_size)) + x = tensor.matrix('features') + y = x.sum(axis=0).copy('y') + z = y.sum(axis=0).copy('z') + y.tag.aggregation_scheme = Concatenate(y) + z.tag.aggregation_scheme = Concatenate(z) + + assert_allclose(DatasetEvaluator([y]).evaluate(data_stream)['y'], + numpy.array([[4, 12], [7, 5]], dtype=theano.config.floatX)) + assert_allclose(DatasetEvaluator([z]).evaluate(data_stream)['z'], + numpy.array([16, 12], dtype=theano.config.floatX)) + + def test_aggregation_buffer_name_uniqueness(): x1 = tensor.scalar('x') x2 = tensor.scalar('x')