Merge pull request #1007 from rizar/monitored_quantities_for_training_data_monitoring

WIP: Monitoring of non-Theano quantities on training data.
rizar committed Mar 4, 2016
2 parents f8e85f7 + 65630b4 commit c9a1794
Showing 6 changed files with 142 additions and 56 deletions.
85 changes: 65 additions & 20 deletions blocks/extensions/monitoring.py
@@ -1,9 +1,13 @@
"""Extensions for monitoring the training process."""
import logging

import theano

from blocks.extensions import SimpleExtension, TrainingExtension
from blocks.algorithms import DifferentiableCostMinimizer
from blocks.monitoring.evaluators import AggregationBuffer, DatasetEvaluator
from blocks.monitoring.aggregation import MonitoredQuantity, take_last
from blocks.monitoring.evaluators import (
AggregationBuffer, MonitoredQuantityBuffer, DatasetEvaluator)

PREFIX_SEPARATOR = '_'
logger = logging.getLogger(__name__)
@@ -95,9 +99,10 @@ class TrainingDataMonitoring(SimpleExtension, MonitoringExtension):
Parameters
----------
variables : list of :class:`~tensor.TensorVariable`
The variables to monitor. The variable names are used as record
names in the logs.
variables : list of :class:`~tensor.TensorVariable` or
:class:`~blocks.monitoring.aggregation.MonitoredQuantity`
The variables or non-Theano quantities to monitor.
The variable names are used as record names in the logs.
Notes
-----
@@ -111,33 +116,73 @@ class TrainingDataMonitoring(SimpleExtension, MonitoringExtension):
def __init__(self, variables, **kwargs):
kwargs.setdefault("before_training", True)
super(TrainingDataMonitoring, self).__init__(**kwargs)
self._buffer = AggregationBuffer(variables, use_take_last=True)
self.add_condition(['after_batch'], arguments=('just_aggregate',))

self._non_variables = []
self._variables = []
for variable_or_not in variables:
if isinstance(variable_or_not, theano.Variable):
self._variables.append(variable_or_not)
elif isinstance(variable_or_not, MonitoredQuantity):
self._non_variables.append(variable_or_not)
else:
raise ValueError("can not monitor {}".format(variable_or_not))

self._non_variables = MonitoredQuantityBuffer(self._non_variables)
self._required_for_non_variables = AggregationBuffer(
[take_last(v) for v in self._non_variables.requires])
self._variables = AggregationBuffer(
self._variables, use_take_last=True)
self._last_time_called = -1

def do(self, callback_name, *args):
"""Initializes the buffer or commits the values to the log.
What this method does depends on from what callback it is called.
When called within `before_training`, it initializes the
aggregation buffer and instructs the training algorithm what
additional computations should be carried at each step by adding
corresponding updates to it. In all other cases it writes
aggregated values of the monitored variables to the log.
What this method does depends on the callback from which it is called
and on the arguments given. When called within `before_training`, it
initializes the aggregation buffer and instructs the training
algorithm what additional computations should be carried out at each
step by adding the corresponding updates to it. In most other cases it
writes aggregated values of the monitored variables to the log. An
exception is when the argument `just_aggregate` is given: in that
case it updates the values of the monitored non-Theano quantities, but
does not write anything to the log.
"""
data, args = self.parse_args(callback_name, args)
if callback_name == 'before_training':
if not isinstance(self.main_loop.algorithm,
DifferentiableCostMinimizer):
raise ValueError
self.main_loop.algorithm.add_updates(
self._buffer.accumulation_updates)
self._buffer.initialize_aggregators()
self._variables.accumulation_updates)
self.main_loop.algorithm.add_updates(
self._required_for_non_variables.accumulation_updates)
self._variables.initialize_aggregators()
self._required_for_non_variables.initialize_aggregators()
self._non_variables.initialize_quantities()
else:
if (self.main_loop.status['iterations_done'] ==
# When called for the first time within an iteration, update
# the monitored non-Theano quantities
if (self.main_loop.status['iterations_done'] >
self._last_time_called):
raise Exception("TrainingDataMonitoring.do should be invoked"
" no more than once per iteration")
self._last_time_called = self.main_loop.status['iterations_done']
self.add_records(self.main_loop.log,
self._buffer.get_aggregated_values().items())
self._buffer.initialize_aggregators()
self._non_variables.aggregate_quantities(
list(self._required_for_non_variables
.get_aggregated_values().values()))
self._required_for_non_variables.initialize_aggregators()
self._last_time_called = (
self.main_loop.status['iterations_done'])
# If only called to update non-Theano quantities,
# do just that
if args == ('just_aggregate',):
return
# Otherwise, also write the current values from the aggregators
# to the log.
self.add_records(
self.main_loop.log,
self._variables.get_aggregated_values().items())
self._variables.initialize_aggregators()
self.add_records(
self.main_loop.log,
self._non_variables.get_aggregated_values().items())
self._non_variables.initialize_quantities()
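
To see the extended extension in action, here is a minimal usage sketch (not part of this commit): a hand-written non-Theano quantity is monitored next to an ordinary Theano variable during training. The quantity class `MeanAbsoluteError` and all variable names are hypothetical; the Blocks/Fuel calls (`TrainingDataMonitoring`, `GradientDescent`, `MainLoop`, `IterableDataset`) mirror the ones exercised by the tests further down this page.

```python
# Hypothetical sketch: monitor a non-Theano quantity on training data.
import numpy
import theano
from theano import tensor

from blocks.algorithms import GradientDescent, Scale
from blocks.extensions import FinishAfter
from blocks.extensions.monitoring import TrainingDataMonitoring
from blocks.main_loop import MainLoop
from blocks.monitoring import aggregation
from fuel.datasets import IterableDataset


class MeanAbsoluteError(aggregation.MonitoredQuantity):
    """Running mean of |prediction - target|, computed outside Theano."""
    def initialize(self):
        self._total, self._batches = 0.0, 0

    def aggregate(self, prediction, target):
        # Called once per batch with the numerical values of `requires`.
        self._total += abs(float(prediction) - float(target))
        self._batches += 1

    def get_aggregated_value(self):
        return self._total / self._batches


x = tensor.vector('features')
y = tensor.scalar('targets')
W = theano.shared(numpy.zeros(2, dtype=theano.config.floatX), name='W')
prediction = (W * x).sum()
prediction.name = 'prediction'
cost = (prediction - y) ** 2
cost.name = 'cost'

mae = MeanAbsoluteError(requires=[prediction, y], name='mae')

dataset = IterableDataset(dict(
    features=[numpy.array(f, dtype=theano.config.floatX)
              for f in [[1, 2], [3, 4]]],
    targets=[numpy.array(3, dtype=theano.config.floatX),
             numpy.array(7, dtype=theano.config.floatX)]))

main_loop = MainLoop(
    model=None, data_stream=dataset.get_example_stream(),
    algorithm=GradientDescent(cost=cost, parameters=[W],
                              step_rule=Scale(0.001)),
    extensions=[
        FinishAfter(after_n_epochs=1),
        # Theano variables and MonitoredQuantity instances can be mixed.
        TrainingDataMonitoring([cost, mae], prefix='train',
                               after_batch=True)])
main_loop.run()
# main_loop.log[t]['train_mae'] then holds the quantity's value at iteration t.
```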
30 changes: 21 additions & 9 deletions blocks/monitoring/aggregation.py
@@ -66,7 +66,7 @@ class Aggregator(object):
Updates that specify how a new batch of data gets processed
by this Aggregator. *Can refer to model inputs.*
readout_variable : :class:`~tensor.TensorVariable`
Theano variable that holds the final value based on accumulated
Theano variable that holds the final value based on aggregated
partial results. *readout_variable must only consist of shared
variables and constants.*
@@ -108,7 +108,7 @@ def get_aggregator(self):
numerator_acc = shared_like(self.numerator)
denominator_acc = shared_like(self.denominator)

# Dummy default expression to use as the previously-accumulated
# Dummy default expression to use as the previously-aggregated
# value, that has the same shape as the new result
numerator_zeros = tensor.as_tensor(self.numerator).zeros_like()
denominator_zeros = tensor.as_tensor(self.denominator).zeros_like()
@@ -172,15 +172,21 @@ def get_aggregator(self):
readout_variable=self.storage)


def take_last(variable):
variable = variable.copy(variable.name)
variable.tag.aggregation_scheme = TakeLast(variable)
return variable


@add_metaclass(ABCMeta)
class MonitoredQuantity(object):
"""The base class for monitored-quantities.
To monitor a non-Theano quantity in Blocks you have to implement this
interface for it. The initialize method initializes accumulators and
the parameters needed to compute this quantity, accumulate method
accumulates results for every batch, and finally readout is called
to get the accumulated results.
the parameters needed to compute this quantity, the aggregate method
aggregates results for every batch, and finally get_aggregated_value
is called to obtain the aggregated result.
Attributes
----------
@@ -207,11 +213,17 @@ def initialize(self):
pass

@abstractmethod
def accumulate(self):
"""Accumulate results for every batch."""
def aggregate(self, *args):
r"""Aggregate results for every batch.
\*args : list of :class:`~numpy.ndarray`
The values of the variables required to aggregate the
value of the quantity.
"""
pass

@abstractmethod
def readout(self):
"""Readout the accumulated results to capture the final result."""
def get_aggregated_value(self):
"""Obtain the result of aggregation."""
pass
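
The renamed interface boils down to a small call protocol: `initialize()` once, `aggregate(...)` once per batch with one positional argument per entry of `requires` (in the same order), and `get_aggregated_value()` at readout time. Below is a hypothetical sketch (not from this commit) that drives the methods by hand rather than through a buffer.

```python
# Hypothetical sketch of the MonitoredQuantity call protocol.
from theano import tensor

from blocks.monitoring.aggregation import MonitoredQuantity


class RunningMax(MonitoredQuantity):
    """Largest value of the single required variable seen so far."""
    def initialize(self):
        self._max = float('-inf')

    def aggregate(self, cost_value):
        # One positional argument per variable listed in `requires`.
        self._max = max(self._max, float(cost_value))

    def get_aggregated_value(self):
        return self._max


cost = tensor.scalar('cost')
quantity = RunningMax(requires=[cost], name='max_cost')

quantity.initialize()
for batch_value in [0.5, 2.0, 1.5]:
    # In Blocks these values come from the compiled Theano function that
    # evaluates the `requires` variables on each batch.
    quantity.aggregate(batch_value)
assert quantity.get_aggregated_value() == 2.0
```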
36 changes: 18 additions & 18 deletions blocks/monitoring/evaluators.py
@@ -17,9 +17,9 @@
class MonitoredQuantityBuffer(object):
"""Intermediate results of aggregating values of monitored-quantity.
Accumulate results for a list of monitored-quantity for every
Aggregate results for a list of monitored quantities for every
single batch. Provides initialization and readout routines to
initialize each quantity and capture its accumulated results.
initialize each quantity and capture its aggregated results.
Parameters
@@ -50,29 +50,29 @@ def __init__(self, quantities):
self._computation_graph = ComputationGraph(self.requires)
self.inputs = self._computation_graph.inputs

def initialize(self):
def initialize_quantities(self):
"""Initialize the quantities."""
self._initialized = True
for quantity in self.quantities:
quantity.initialize()

def get_aggregated_values(self):
"""Readout the accumulated values."""
"""Get the aggregated values."""
if not self._initialized:
raise Exception("To readout you must first initialize, then"
"process batches!")
else:
ret_vals = [q.readout() for q in self.quantities]
ret_vals = [q.get_aggregated_value() for q in self.quantities]
return dict(zip(self.quantity_names, ret_vals))

def accumulate_quantities(self, numerical_values):
"""Accumulate the results for every batch."""
def aggregate_quantities(self, numerical_values):
"""Aggregate the results for every batch."""
if not self._initialized:
raise Exception("To readout you must first initialize, then"
"process batches!")
else:
for quantity in self.quantities:
quantity.accumulate(
quantity.aggregate(
*[numerical_values[self.requires.index(requirement)]
for requirement in quantity.requires])

@@ -193,7 +193,7 @@ def get_aggregated_values(self):
raise Exception("To readout you must first initialize, then"
"process batches!")
ret_vals = self._readout_fun()
return dict(equizip(self.variable_names, ret_vals))
return OrderedDict(equizip(self.variable_names, ret_vals))


class DatasetEvaluator(object):
@@ -281,15 +281,15 @@ def _compile(self):

if inputs != []:
self.unique_inputs = list(set(inputs))
self._accumulate_fun = theano.function(self.unique_inputs,
outputs,
updates=updates)
self._aggregate_fun = theano.function(self.unique_inputs,
outputs,
updates=updates)
else:
self._accumulate_fun = None
self._aggregate_fun = None

def initialize_aggregators(self):
self.theano_buffer.initialize_aggregators()
self.monitored_quantities_buffer.initialize()
self.monitored_quantities_buffer.initialize_quantities()

def process_batch(self, batch):
try:
@@ -300,9 +300,9 @@ def process_batch(self, batch):
"Not all data sources required for monitoring were"
" provided. The list of required data sources:"
" {}.".format(input_names))
if self._accumulate_fun is not None:
numerical_values = self._accumulate_fun(**batch)
self.monitored_quantities_buffer.accumulate_quantities(
if self._aggregate_fun is not None:
numerical_values = self._aggregate_fun(**batch)
self.monitored_quantities_buffer.aggregate_quantities(
numerical_values)

def get_aggregated_values(self):
@@ -326,7 +326,7 @@ def evaluate(self, data_stream):
"""
self.initialize_aggregators()
if self._accumulate_fun is not None:
if self._aggregate_fun is not None:
for batch in data_stream.get_epoch_iterator(as_dict=True):
self.process_batch(batch)
else:
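
The evaluator-side renames follow the same pattern. Here is a hypothetical sketch of the public entry point, assuming `DatasetEvaluator` keeps its `evaluate(data_stream)` interface: aggregation over a whole stream works for Theano variables and `MonitoredQuantity` instances alike. The variable and source names are illustrative only.

```python
# Hypothetical sketch: aggregate over a full data stream with DatasetEvaluator.
import numpy
import theano
from theano import tensor

from blocks.monitoring.evaluators import DatasetEvaluator
from fuel.datasets import IterableDataset

x = tensor.vector('features')
mean_x = x.mean()
mean_x.name = 'mean_x'

# MonitoredQuantity instances (such as the sketches above) could be added
# to this list next to Theano variables.
evaluator = DatasetEvaluator([mean_x])

dataset = IterableDataset(dict(
    features=[numpy.array(f, dtype=theano.config.floatX)
              for f in [[1.0, 2.0], [3.0, 4.0]]]))
values = evaluator.evaluate(dataset.get_example_stream())
print(values)  # e.g. {'mean_x': 2.5}
```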
37 changes: 33 additions & 4 deletions tests/extensions/test_monitoring.py
@@ -12,11 +12,25 @@
from blocks.main_loop import MainLoop


class MeanFeaturesTimesTarget(aggregation.MonitoredQuantity):

def initialize(self):
self._aggregated = 0.
self._num_batches = 0

def aggregate(self, features, targets):
self._aggregated += features * targets
self._num_batches += 1

def get_aggregated_value(self):
return self._aggregated / self._num_batches


def test_training_data_monitoring():
weights = numpy.array([-1, 1], dtype=theano.config.floatX)
features = [numpy.array(f, dtype=theano.config.floatX)
for f in [[1, 2], [3, 4], [5, 6]]]
targets = [(weights * f).sum() for f in features]
for f in [[1, 2], [3, 5], [5, 8]]]
targets = numpy.array([(weights * f).sum() for f in features])
n_batches = 3
dataset = IterableDataset(dict(features=features, targets=targets))

@@ -35,15 +49,23 @@ def before_batch(self, data):
((W.get_value() * data["features"]).sum() -
data["targets"]) ** 2)

# Note that, unlike a Theano variable, a monitored
# quantity can't be reused in more than one TrainingDataMonitoring

ftt1 = MeanFeaturesTimesTarget(
requires=[x, y], name='ftt1')
ftt2 = MeanFeaturesTimesTarget(
requires=[x, y], name='ftt2')

main_loop = MainLoop(
model=None, data_stream=dataset.get_example_stream(),
algorithm=GradientDescent(cost=cost, parameters=[W],
step_rule=Scale(0.001)),
extensions=[
FinishAfter(after_n_epochs=1),
TrainingDataMonitoring([W_sum, cost, V], prefix="train1",
TrainingDataMonitoring([W_sum, cost, V, ftt1], prefix="train1",
after_batch=True),
TrainingDataMonitoring([aggregation.mean(W_sum), cost],
TrainingDataMonitoring([aggregation.mean(W_sum), cost, ftt2],
prefix="train2", after_epoch=True),
TrueCostExtension()])

@@ -66,3 +88,10 @@ def before_batch(self, data):
main_loop.log[n_batches]['train2_W_sum'],
sum([main_loop.log[i]['train1_W_sum']
for i in range(1, n_batches + 1)]) / n_batches)

# Check monitoring of non-Theano quantities
for i in range(n_batches):
assert_allclose(main_loop.log[i + 1]['train1_ftt1'],
features[i] * targets[i])
assert_allclose(main_loop.log[n_batches]['train2_ftt2'],
(features * targets[:, None]).mean(axis=0))
6 changes: 3 additions & 3 deletions tests/monitoring/test_aggregation.py
@@ -55,9 +55,9 @@ def test_parameter_monitor():
aggregator = monitor.tag.aggregation_scheme.get_aggregator()
initialize = theano.function([], updates=aggregator.initialization_updates)
initialize()
accumulate = theano.function([X], updates=aggregator.accumulation_updates)
accumulate(numpy.arange(4, dtype=theano.config.floatX).reshape(2, 2))
accumulate(numpy.arange(4, 10, dtype=theano.config.floatX).reshape(3, 2))
aggregate = theano.function([X], updates=aggregator.accumulation_updates)
aggregate(numpy.arange(4, dtype=theano.config.floatX).reshape(2, 2))
aggregate(numpy.arange(4, 10, dtype=theano.config.floatX).reshape(3, 2))
assert_allclose(aggregator.readout_variable.eval(), 4.5)


4 changes: 2 additions & 2 deletions tests/monitoring/test_monitored_quantity.py
@@ -14,12 +14,12 @@ def __init__(self, **kwargs):
def initialize(self):
self.total_cross_entropy, self.examples_seen = 0.0, 0

def accumulate(self, target, predicted):
def aggregate(self, target, predicted):
import numpy
self.total_cross_entropy += -(target * numpy.log(predicted)).sum()
self.examples_seen += 1

def readout(self):
def get_aggregated_value(self):
res = self.total_cross_entropy / self.examples_seen
return res
