Merge pull request #60 from lsst/tickets/DM-14823
DM-14823: Simplify PipelineTask API for common use case
andy-slac committed Aug 4, 2018
2 parents 0494b80 + 41ad3e3 commit 41f434e
Showing 4 changed files with 223 additions and 62 deletions.
29 changes: 21 additions & 8 deletions python/lsst/pipe/base/config.py
@@ -121,21 +121,26 @@ def wrappedFunc(doc, name, storageClass, check=None):
return factory(**{k: v for k, v in locals().items() if k != 'factory'})
# This factory does not take a units argument, so set the variables for the
# units documentation to empty python strings
unitsDoc = ""
unitsString = ""
extraDoc = ""
extraFields = ""
elif issubclass(dtype, _DatasetTypeConfig):
# Handle dataset types like InputDatasetConfig, note these take a units argument
def wrappedFunc(doc, name, units, storageClass, check=None):
def wrappedFunc(doc, name, units, storageClass, scalar=False, check=None):
return factory(**{k: v for k, v in locals().items() if k != 'factory'})
# Set the string corresponding to the units parameter documentation;
# formatting is to support final output of the docstring variable
unitsDoc = """
extraDoc = """
units : iterable of `str`
Iterable of DataUnits for this `~lsst.daf.butler.DatasetType`"""
Iterable of DataUnits for this `~lsst.daf.butler.DatasetType`
scalar : `bool`, optional
If set to True then only a single dataset is expected on input or
produced on output. In that case the list of objects/DataIds will be
unpacked before calling task methods, and returned data is expected
to contain single objects as well."""
# Set a string to add the units argument to the list of arguments in the
# docstring explanation section; formatting is to support final output
# of the docstring variable
unitsString = ", units,"
extraFields = ", units, scalar,"
else:
# if someone tries to create a config factory for a type that is not
# handled, raise an exception
@@ -154,7 +159,7 @@ def wrappedFunc(doc, name, units, storageClass, check=None):
The input arguments for this class are a combination of the arguments for
`~lsst.pex.config.ConfigField` and `{dtype.__name__}`. The arguments
doc and check come from `~lsst.pex.config.ConfigField`, while name{unitsString}
doc and check come from `~lsst.pex.config.ConfigField`, while name{extraFields}
and storageClass come from `{dtype.__name__}`.
Parameters
@@ -163,7 +168,7 @@
Documentation string for the `{dtype.__name__}`
name : `str`
Name of the `~lsst.daf.butler.DatasetType` in the returned
`{dtype.__name__}`{indent(dedent(unitsDoc), " " * 4)}
`{dtype.__name__}`{indent(dedent(extraDoc), " " * 4)}
storageClass : `str`
Name of the `~lsst.daf.butler.StorageClass` in the `{dtype.__name__}`
check : callable
@@ -216,6 +221,14 @@ class should not be used directly, instead one of `InputDatasetConfig` or
"""
units = pexConfig.ListField(dtype=str,
doc="list of DataUnits for this DatasetType")
scalar = pexConfig.Field(dtype=bool,
default=False,
optional=True,
doc=("If set to True then only a single dataset is expected "
"on input or produced on output. In that case the list of "
"objects/DataIds will be unpacked before calling task "
"methods, and returned data is expected to contain single "
"objects as well."))


class InputDatasetConfig(_DatasetTypeConfig):
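For illustration, a task config using the new ``scalar`` argument might be declared as in the following sketch; the field names, units, and storage classes are placeholders in the spirit of tests/test_config.py below::

    import lsst.pex.config as pexConfig
    import lsst.pipe.base as pipeBase

    class ExampleTaskConfig(pexConfig.Config):
        """Hypothetical config with scalar and non-scalar dataset fields."""
        calexp = pipeBase.InputDatasetField(
            name="calexp",
            units=["UnitA", "UnitB"],
            storageClass="SCA",
            scalar=True,    # exactly one dataset per quantum; passed as a single object
            doc="Single input exposure")
        warps = pipeBase.InputDatasetField(
            name="warp",
            units=["UnitA", "UnitB"],
            storageClass="SCB",
            scalar=False,   # default; passed as a list of objects
            doc="All input warps for this quantum")
        coadd = pipeBase.OutputDatasetField(
            name="coadd",
            units=["UnitB"],
            storageClass="SCC",
            scalar=True,    # run() must return a single object for this field
            doc="Single output coadd")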
230 changes: 179 additions & 51 deletions python/lsst/pipe/base/pipelineTask.py
@@ -30,6 +30,22 @@
from .task import Task


class ScalarError(TypeError):
"""Exception raised when a dataset type is configured as scalar
but there are multiple DataIds in a Quantum for that dataset.
Parameters
----------
key : `str`
Name of the configuration field for dataset type.
numDataIds : `int`
Actual number of DataIds in a Quantum for this dataset type.
"""
def __init__(self, key, numDataIds):
super().__init__(("Expected scalar for output dataset field {}, "
"received {} DataIds").format(key, numDataIds))


class PipelineTask(Task):
"""Base class for all pipeline tasks.
@@ -72,9 +88,9 @@ class PipelineTask(Task):
initInputs : `dict`, optional
A dictionary of objects needed to construct this PipelineTask, with
keys matching the keys of the dictionary returned by
:py:meth:`getInitInputDatasetTypes` and values equivalent to what
would be obtained by calling `Butler.get` with those DatasetTypes and
no data IDs. While it is optional for the base class, subclasses are
`getInitInputDatasetTypes` and values equivalent to what would be
obtained by calling `Butler.get` with those DatasetTypes and no data
IDs. While it is optional for the base class, subclasses are
permitted to require this argument.
"""

@@ -98,10 +114,10 @@ def getInitOutputDatasets(self):
-------
datasets : `dict`
Dictionary with keys that match those of the dict returned by
:py:meth:`getInitOutputDatasetTypes` values that can be written
by calling `Butler.put` with those DatasetTypes and no data IDs.
An empty `dict` should be returned by tasks that produce no
initialization outputs.
`getInitOutputDatasetTypes` and values that can be written by calling
`Butler.put` with those DatasetTypes and no data IDs. An empty
`dict` should be returned by tasks that produce no initialization
outputs.
"""
return {}

@@ -244,47 +260,114 @@ def getDatasetTypes(cls, config, configClass):
dsTypes[key] = cls.makeDatasetType(value)
return dsTypes

def run(self, *args, **kwargs):
def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds):
"""Run task algorithm on in-memory data.
This function is the one that actually operates on the data and usually
returns a `Struct` with the produced results. This method will be
overridden by every subclass. It operates on in-memory data structures
(or data proxies) and cannot access any external data such as the data
butler or databases. All interaction with external data happens in the
`runQuantum` method.
With the default implementation of `runQuantum()` this method will
receive keyword arguments whose names will be the same as the names
of the configuration fields describing input and output dataset types.
For input dataset types the argument values will be lists of the data
objects retrieved from the data butler. For output dataset types the
argument values will be the lists of units from DataRefs in a Quantum.
This method is called by `runQuantum` to operate on input in-memory
data and produce corresponding output in-memory data. It receives
arguments which are dictionaries with input data and input/output
DataIds. Many simple tasks do not need to know DataIds, so the default
implementation of this method calls the `run` method, passing the input
data objects as keyword arguments. Most simple tasks will implement the
`run` method; more complex tasks that need to know about output DataIds
will override this method instead.
All three arguments to this method are dictionaries with keys equal
to the names of the configuration fields for dataset types. If a dataset
type is configured with the ``scalar`` field set to ``True`` then it is
expected that only one dataset appears on input or output for that
dataset type and the dictionary value will be a single data object or
DataId. Otherwise, if ``scalar`` is ``False`` (the default), the value
will be a list (even if only one item is in the list).
The method returns a `Struct` instance with attributes matching the
configuration fields for output dataset types. Values stored in the
returned struct are single objects if ``scalar`` is ``True`` or
lists of objects otherwise. If the task produces more than one object
for some dataset type then the data objects returned in ``struct`` must
match the corresponding DataIds in ``outputDataIds`` in count and order.
Parameters
----------
inputData : `dict`
Dictionary whose keys are the names of the configuration fields
describing input dataset types and values are Python-domain data
objects (or lists of objects) retrieved from data butler.
inputDataIds : `dict`
Dictionary whose keys are the names of the configuration fields
describing input dataset types and values are DataIds (or lists
of DataIds) that the task consumes for the corresponding dataset type.
DataIds are guaranteed to match the data objects in ``inputData``.
outputDataIds : `dict`
Dictionary whose keys are the names of the configuration fields
describing output dataset types and values are DataIds (or lists
of DataIds) that the task is to produce for the corresponding dataset
type.
Returns
-------
struct : `Struct`
Standard convention is that this method should return a `Struct`
instance containing all output data. Struct attribute names
should correspond to the names of the configuration fields
describing the task's output dataset types. If something different
is returned then the `saveStruct` method has to be re-implemented
accordingly.
"""
return self.run(**inputData)
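A minimal sketch of a task that overrides `adaptArgsAndRun` because it needs the output DataIds; the field names (``warps``, ``coadd``) and the ``assembleCoadd`` helper are hypothetical, and `PipelineTask` and `Struct` are assumed to be importable from lsst.pipe.base::

    from lsst.pipe.base import PipelineTask, Struct

    class ExampleCoaddTask(PipelineTask):

        def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds):
            # "warps" is a non-scalar input field: values are lists
            warps = inputData["warps"]
            # "coadd" is a scalar output field: value is a single DataId
            coaddDataId = outputDataIds["coadd"]
            coadd = self.assembleCoadd(warps, coaddDataId)  # hypothetical helper
            # struct attribute name must match the output config field name
            return Struct(coadd=coadd)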

def run(self, **kwargs):
"""Run task algorithm on in-memory data.
This method should be implemented in a subclass unless the task overrides
`adaptArgsAndRun` to do something different from its default
implementation. With the default implementation of `adaptArgsAndRun` this
method will receive keyword arguments whose names will be the same as the
names of the configuration fields describing input dataset types. Argument
values will be data objects retrieved from the data butler. If a dataset
type is configured with the ``scalar`` field set to ``True`` then the
argument value will be a single object; otherwise it will be a list of
objects. If the task needs to know its input or output DataIds then it
has to override the `adaptArgsAndRun` method instead.
Returns
-------
struct : `Struct`
See description of `adaptArgsAndRun` method.
Examples
--------
Typical implementation of this method may look like::
def run(self, input, calib):
# "input", "calib", and "output" are the names of the config fields
# Assuming that input/calib datasets are `scalar` they are simple objects,
# do something with inputs and calibs, produce output image.
image = self.makeImage(input, calib)
# If output dataset is `scalar` then return object, not list
return Struct(output=image)
"""
raise NotImplementedError("run() is not implemented")
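Continuing the previous sketch, a task whose input field is left non-scalar (the default) receives a list from the default `adaptArgsAndRun`; the field names and the ``stack`` helper are again hypothetical::

    class ExampleStackTask(PipelineTask):

        def run(self, images):
            # "images" is a non-scalar input field, so the default
            # adaptArgsAndRun passes a list of data objects
            stacked = self.stack(images)  # hypothetical helper
            # the "stacked" output field is configured with scalar=True,
            # so a single object is returned rather than a list
            return Struct(stacked=stacked)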

def runQuantum(self, quantum, butler):
"""Execute PipelineTask algorithm on single quantum of data.
Typical implementation of this method will use inputs from quantum
to retrieve Python-domain objects from data butler and call `run()`
method on that data. On return from `run()` this method will
extract data from returned `Struct` instance and save that data
to butler.
Default implementation retrieves all input data in the quantum graph
and calls `run()` method with keyword arguments where name of the
keyword argument is the same as the name of the configuration field
defining input DatasetType. Additionally it also passes keyword
arguments that correspond to output dataset types, each keyword
argument will have the list of units for corresponding output DataRefs.
The `Struct` returned from `run()` is expected to contain data
attributes with the names equal to the names of the configuration
fields defining output dataset types. The values of the data
attributes must be lists of data objects corresponding to the units
passed as keyword arguments. All data objects will be saved in butler
using DataRefs from Quantum's output dictionary.
to retrieve Python-domain objects from the data butler and call the
`adaptArgsAndRun` method on that data. On return from
`adaptArgsAndRun` this method will extract data from the returned
`Struct` instance and save that data to the butler.
The `Struct` returned from `adaptArgsAndRun` is expected to contain
data attributes with names equal to the names of the
configuration fields defining output dataset types. The values of
the data attributes must be data objects corresponding to
the DataIds of output dataset types. All data objects will be
saved in the butler using DataRefs from the Quantum's output dictionary.
This method does not return anything to the caller; on errors the
corresponding exception is raised.
@@ -299,34 +382,79 @@ def runQuantum(self, quantum, butler):
Raises
------
Any exceptions that happen in data butler or in `run()` method.
`ScalarError` if a dataset type is configured as scalar but receives
multiple DataIds in `quantum`. Any exceptions that happen in the data
butler or in the `adaptArgsAndRun` method.
"""
# get all data from butler
inputDataIds = {}
inputs = {}
for key, value in self.config.items():
if isinstance(value, InputDatasetConfig):
dataRefs = quantum.predictedInputs[value.name]
inputs[key] = [butler.get(dataRef.datasetType.name, dataRef.dataId)
for dataRef in dataRefs]

# lists of units for output datasets
outUnits = {}
dataIds = [dataRef.dataId for dataRef in dataRefs]
data = [butler.get(dataRef) for dataRef in dataRefs]
if value.scalar:
# unpack single-item lists
if len(dataRefs) != 1:
raise ScalarError(key, len(dataRefs))
data = data[0]
dataIds = dataIds[0]
inputDataIds[key] = dataIds
inputs[key] = data

# lists of DataRefs/DataIds for output datasets
outputDataRefs = {}
outputDataIds = {}
for key, value in self.config.items():
if isinstance(value, OutputDatasetConfig):
dataRefs = quantum.outputs[value.name]
outUnits[key] = [dataRef.dataId for dataRef in dataRefs]
dataIds = [dataRef.dataId for dataRef in dataRefs]
if value.scalar:
# unpack single-item lists
if len(dataRefs) != 1:
raise ScalarError(key, len(dataRefs))
dataRefs = dataRefs[0]
dataIds = dataIds[0]
outputDataRefs[key] = dataRefs
outputDataIds[key] = dataIds

# run the task algorithm on the in-memory data
struct = self.run(**inputs, **outUnits)
struct = self.adaptArgsAndRun(inputs, inputDataIds, outputDataIds)

# store produced output data
self.saveStruct(struct, outputDataRefs, butler)
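For a task that needs to talk to the butler directly, a hypothetical `runQuantum` override might bypass the default machinery entirely; the dataset type names ("calexp", "background") are made up, and the quantum/butler calls mirror the ones used in the default implementation above::

    def runQuantum(self, quantum, butler):
        # fetch a single made-up input dataset directly from the butler
        inputRef = quantum.predictedInputs["calexp"][0]
        calexp = butler.get(inputRef)
        struct = self.run(calexp=calexp)
        # write the single made-up output dataset
        outputRef = quantum.outputs["background"][0]
        butler.put(struct.background, outputRef.datasetType.name, outputRef.dataId)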

# save data in butler, convention is that returned struct
# has data field(s) with the same names as the config fields
# defining DatasetTypes
def saveStruct(self, struct, outputDataRefs, butler):
"""Save data in butler.
Convention is that the struct returned from the ``run()`` method has data
field(s) with the same names as the config fields defining
output DatasetTypes. Subclasses may override this method to implement
a different convention for `Struct` content or in case any
post-processing of data is needed.
Parameters
----------
struct : `Struct`
Data produced by the task, packed into a `Struct` instance.
outputDataRefs : `dict`
Dictionary whose keys are the names of the configuration fields
describing output dataset types and values are lists of DataRefs.
DataRefs must match corresponding data objects in ``struct`` in
number and order.
butler : object
Data butler instance.
"""
structDict = struct.getDict()
for key, value in self.config.items():
if isinstance(value, OutputDatasetConfig):
dataList = structDict[key]
dataRefs = quantum.outputs[value.name]
dataRefs = outputDataRefs[key]
if not isinstance(dataRefs, list):
# scalar outputs, make them lists again
dataRefs = [dataRefs]
dataList = [dataList]
# TODO: check that data objects and data refs are aligned
for dataRef, data in zip(dataRefs, dataList):
butler.put(data, dataRef.datasetType.name, dataRef.dataId)
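A hypothetical override of this method in a PipelineTask subclass, post-processing each object before it is written (as the docstring above allows); ``addProvenance`` is a made-up step::

    def saveStruct(self, struct, outputDataRefs, butler):
        structDict = struct.getDict()
        for key, value in self.config.items():
            if isinstance(value, OutputDatasetConfig):
                dataList = structDict[key]
                dataRefs = outputDataRefs[key]
                if not isinstance(dataRefs, list):
                    # scalar output: wrap in single-item lists
                    dataRefs, dataList = [dataRefs], [dataList]
                for dataRef, data in zip(dataRefs, dataList):
                    data = self.addProvenance(data, dataRef.dataId)  # made-up step
                    butler.put(data, dataRef.datasetType.name, dataRef.dataId)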
2 changes: 2 additions & 0 deletions tests/test_config.py
@@ -67,10 +67,12 @@ class ConfigWithDatasets(pexConfig.Config):
input2 = pipeBase.InputDatasetField(name="in2",
units=["UnitA", "UnitB"],
storageClass="SCB",
scalar=True,
doc="")
output = pipeBase.OutputDatasetField(name="out",
units=["UnitB", "UnitC"],
storageClass="SCC",
scalar=False,
doc="")
initInput = pipeBase.InitInputDatasetField(name="init_input",
storageClass="SCX",
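A sketch of the assertions such a test might make about the new field (plain asserts for brevity; the actual test may differ)::

    config = ConfigWithDatasets()
    assert config.input2.scalar        # set to True above
    assert not config.output.scalar    # explicitly False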
