Skip to content

Commit

Permalink
Merge pull request #1162 from dwf/max_min_monitoring
Browse files Browse the repository at this point in the history
AggregationSchemes for Maximum and Minimum.
  • Loading branch information
dmitriy-serdyuk committed Nov 1, 2016
2 parents 0e4e2a6 + 7e8a556 commit 1d6683f
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 9 deletions.
75 changes: 67 additions & 8 deletions blocks/monitoring/aggregation.py
@@ -1,4 +1,5 @@
"""Evaluate Theano variables on auxiliary data and during training."""
from functools import partial
import logging
from abc import ABCMeta, abstractmethod

Expand Down Expand Up @@ -29,6 +30,9 @@ class AggregationScheme(object):
The variable that holds the desired value on a single batch.
"""
    def __init__(self, variable):
        # The Theano variable whose per-batch value this scheme aggregates.
        self.variable = variable

    @abstractmethod
    def get_aggregator(self):
        """Return a new Aggregator for this variable."""
Expand Down Expand Up @@ -149,9 +153,6 @@ def mean(numerator, denominator=1.):

class _DataIndependent(AggregationScheme):
"""Dummy aggregation scheme for values that don't depend on data."""
def __init__(self, variable):
self.variable = variable

def get_aggregator(self):
return Aggregator(aggregation_scheme=self,
initialization_updates=[],
Expand All @@ -161,9 +162,6 @@ def get_aggregator(self):

class TakeLast(AggregationScheme):
"""Aggregation scheme which remembers only the last value."""
def __init__(self, variable):
self.variable = variable

def get_aggregator(self):
self.storage = shared_like(self.variable)
return Aggregator(aggregation_scheme=self,
Expand All @@ -173,12 +171,73 @@ def get_aggregator(self):
readout_variable=self.storage)


def take_last(variable):
def _simple_aggregation(scheme, variable):
variable = variable.copy(variable.name)
variable.tag.aggregation_scheme = TakeLast(variable)
variable.tag.aggregation_scheme = scheme(variable)
return variable


take_last = partial(_simple_aggregation, TakeLast)


class Minimum(AggregationScheme):
    """Aggregation scheme which remembers only the minimum value."""
    def _build_aggregator(self, accumulate_update):
        # Shared flag, 0. until the first batch of an evaluation is seen.
        has_seen_data = shared_like(0.)
        # First batch: take the raw value (storage still holds zeros);
        # later batches: fold into the running accumulation.
        new_value = ifelse(has_seen_data, accumulate_update, self.variable)
        init_updates = [(self.storage, tensor.zeros_like(self.storage)),
                        (has_seen_data, tensor.zeros_like(has_seen_data))]
        accum_updates = [(self.storage, new_value),
                         (has_seen_data, tensor.ones_like(has_seen_data))]
        return Aggregator(aggregation_scheme=self,
                          initialization_updates=init_updates,
                          accumulation_updates=accum_updates,
                          readout_variable=self.storage)

    def get_aggregator(self):
        # Shared storage holding the elementwise running minimum.
        self.storage = shared_like(self.variable)
        return self._build_aggregator(
            tensor.minimum(self.storage, self.variable))


minimum = partial(_simple_aggregation, Minimum)


class Maximum(Minimum):
    """Aggregation scheme which remembers only the maximum value."""
    def get_aggregator(self):
        # Reuses Minimum's machinery; only the accumulation op differs.
        self.storage = shared_like(self.variable)
        running_max = tensor.maximum(self.storage, self.variable)
        return self._build_aggregator(running_max)


maximum = partial(_simple_aggregation, Maximum)


class Concatenate(Minimum):
    """Aggregation scheme which remembers values from all batches.

    Parameters
    ----------
    variable: :class:`~tensor.TensorVariable`
        The variable that holds the desired value on a single batch.

    """
    def __init__(self, variable):
        # Prepend a fresh leading axis to concatenate along; it must be
        # made non-broadcastable for concatenate to always work.
        expanded = tensor.shape_padleft(variable, 1)
        expanded = tensor.unbroadcast(expanded, 0).copy(variable.name)
        super(Concatenate, self).__init__(expanded)

    def get_aggregator(self):
        # Storage grows by one slice along axis 0 per accumulated batch;
        # Minimum's "first batch" flag avoids concatenating the initial zeros.
        self.storage = shared_like(self.variable)
        combined = tensor.concatenate([self.storage, self.variable])
        return self._build_aggregator(combined)


concatenate = partial(_simple_aggregation, Concatenate)


@add_metaclass(ABCMeta)
class MonitoredQuantity(object):
"""The base class for monitored-quantities.
Expand Down
75 changes: 74 additions & 1 deletion tests/monitoring/test_aggregation.py
Expand Up @@ -6,7 +6,8 @@
from blocks import bricks
from blocks.bricks.base import application
from blocks.graph import ComputationGraph
from blocks.monitoring.aggregation import mean, Mean
from blocks.monitoring.aggregation import (mean, Mean, Minimum, Maximum,
Concatenate)
from blocks.utils import shared_floatx

from collections import OrderedDict
Expand Down Expand Up @@ -91,6 +92,78 @@ def test_mean_aggregator():
numpy.array([35], dtype=theano.config.floatX))


def test_min_max_aggregators():
    # Maximum/Minimum must track extrema across batches, and their
    # accumulators must be reset between evaluations.
    num_examples = 4
    batch_size = 2

    def build_stream(feature_matrix):
        # Sequential stream yielding two examples per batch.
        dataset = IndexableDataset(
            OrderedDict([('features', feature_matrix)]))
        scheme = SequentialScheme(num_examples, batch_size)
        return DataStream(dataset, iteration_scheme=scheme)

    x = tensor.matrix('features')
    y = (x ** 2).sum(axis=0)
    y.name = 'y'
    z = y.min()
    z.name = 'z'
    y.tag.aggregation_scheme = Maximum(y)
    z.tag.aggregation_scheme = Minimum(z)

    data_stream = build_stream(numpy.array([[2, 3],
                                            [2, 9],
                                            [2, 4],
                                            [5, 1]],
                                           dtype=theano.config.floatX))
    # Per-batch squared column sums are [8, 90] and [29, 17].
    assert_allclose(DatasetEvaluator([y]).evaluate(data_stream)['y'],
                    numpy.array([29, 90], dtype=theano.config.floatX))
    assert_allclose(DatasetEvaluator([z]).evaluate(data_stream)['z'],
                    numpy.array([8], dtype=theano.config.floatX))

    # Make sure accumulators are reset: stale state from the first
    # evaluation would corrupt the extrema below.
    data_stream = build_stream(numpy.array([[2, 1],
                                            [1, 3],
                                            [1, -1],
                                            [2.5, 1]],
                                           dtype=theano.config.floatX))
    assert_allclose(DatasetEvaluator([y]).evaluate(data_stream)['y'],
                    numpy.array([7.25, 10], dtype=theano.config.floatX))
    assert_allclose(DatasetEvaluator([z]).evaluate(data_stream)['z'],
                    numpy.array([2], dtype=theano.config.floatX))


def test_concatenate_aggregator():
    # Concatenate should stack per-batch values along a new leading axis.
    num_examples = 4
    batch_size = 2

    features = numpy.array([[2, 3],
                            [2, 9],
                            [2, 4],
                            [5, 1]], dtype=theano.config.floatX)
    dataset = IndexableDataset(OrderedDict([('features', features)]))
    data_stream = DataStream(
        dataset,
        iteration_scheme=SequentialScheme(num_examples, batch_size))

    x = tensor.matrix('features')
    y = x.sum(axis=0).copy('y')
    z = y.sum(axis=0).copy('z')
    y.tag.aggregation_scheme = Concatenate(y)
    z.tag.aggregation_scheme = Concatenate(z)

    # Per-batch column sums are [4, 12] and [7, 5]; per-batch totals 16, 12.
    assert_allclose(DatasetEvaluator([y]).evaluate(data_stream)['y'],
                    numpy.array([[4, 12], [7, 5]],
                                dtype=theano.config.floatX))
    assert_allclose(DatasetEvaluator([z]).evaluate(data_stream)['z'],
                    numpy.array([16, 12], dtype=theano.config.floatX))


def test_aggregation_buffer_name_uniqueness():
x1 = tensor.scalar('x')
x2 = tensor.scalar('x')
Expand Down

0 comments on commit 1d6683f

Please sign in to comment.