Skip to content

Commit

Permalink
Improve PipelineTask sub-classing support (DM-16275)
Browse files Browse the repository at this point in the history
Changed the return type of the get*DatasetTypes methods so that they can be
used by the PipelineTask class itself (and can be meaningfully redefined in
subclasses). This adds a new class `DatasetTypeDescriptor`, which holds a
dataset type instance and its corresponding configuration options. Unit tests
were updated and extended to cover the new behavior.
  • Loading branch information
andy-slac committed Oct 24, 2018
1 parent 53081a1 commit 8546190
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 135 deletions.
205 changes: 121 additions & 84 deletions python/lsst/pipe/base/pipelineTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""This module defines PipelineTask class and related methods.
"""

__all__ = ["PipelineTask"] # Classes in this module
__all__ = ["DatasetTypeDescriptor", "PipelineTask"] # Classes in this module

from lsst.daf.butler import DatasetType, StorageClassFactory
from .config import (InputDatasetConfig, OutputDatasetConfig,
Expand All @@ -46,6 +46,65 @@ def __init__(self, key, numDataIds):
"received {} DataIds").format(key, numDataIds))


class DatasetTypeDescriptor:
    """Describe DatasetType and its options for PipelineTask.

    This class contains DatasetType and all relevant options that are used
    by PipelineTask. Typically this is derived from configuration classes,
    but sub-classes of PipelineTask can also define additional DatasetTypes
    that are not part of the task configuration.

    Parameters
    ----------
    datasetType : `DatasetType`
        Dataset type definition (name, data units, storage class).
    scalar : `bool`, optional
        `True` if this is a scalar dataset, `None` for dataset types that
        do not need the scalar option.
    """

    __slots__ = ("_datasetType", "_scalar")

    def __init__(self, datasetType, scalar=None):
        self._datasetType = datasetType
        self._scalar = scalar

    @classmethod
    def fromConfig(cls, datasetConfig):
        """Make DatasetTypeDescriptor instance from configuration object.

        Parameters
        ----------
        datasetConfig : `pexConfig.Config`
            Instance of one of the `InputDatasetConfig`,
            `OutputDatasetConfig`, `InitInputDatasetConfig`, or
            `InitOutputDatasetConfig` types.

        Returns
        -------
        descriptor : `DatasetTypeDescriptor`
            Descriptor holding the constructed `DatasetType` and options.
        """
        # map storage class name to storage class instance
        storageClass = StorageClassFactory().getStorageClass(datasetConfig.storageClass)

        datasetType = DatasetType(name=datasetConfig.name,
                                  dataUnits=datasetConfig.units,
                                  storageClass=storageClass)
        # not every config type defines ``scalar`` (the Init* configs do
        # not); fall back to None for those.
        scalar = getattr(datasetConfig, 'scalar', None)
        return cls(datasetType=datasetType, scalar=scalar)

    @property
    def datasetType(self):
        """`DatasetType` instance.
        """
        return self._datasetType

    @property
    def scalar(self):
        """`True` if this is a scalar dataset, `None` for dataset types that
        do not need the scalar option.
        """
        return self._scalar


class PipelineTask(Task):
"""Base class for all pipeline tasks.
Expand Down Expand Up @@ -123,12 +182,12 @@ def getInitOutputDatasets(self):

@classmethod
def getInputDatasetTypes(cls, config):
"""Return input dataset types for this task.
"""Return input dataset type descriptors for this task.
Default implementation finds all fields of type `InputDatasetConfig`
in configuration (non-recursively) and uses them for constructing
`DatasetType` instances. The keys of these fields are used as keys
in returned dictionary. Subclasses can override this behavior.
`DatasetTypeDescriptor` instances. The keys of these fields are used
as keys in returned dictionary. Subclasses can override this behavior.
Parameters
----------
Expand All @@ -139,19 +198,19 @@ def getInputDatasetTypes(cls, config):
Returns
-------
Dictionary where key is the name (arbitrary) of the input dataset
and value is the `butler.DatasetType` instance. Default
and value is the `DatasetTypeDescriptor` instance. Default
implementation uses configuration field name as dictionary key.
"""
return cls.getDatasetTypes(config, InputDatasetConfig)

@classmethod
def getOutputDatasetTypes(cls, config):
"""Return output dataset types for this task.
"""Return output dataset type descriptors for this task.
Default implementation finds all fields of type `OutputDatasetConfig`
in configuration (non-recursively) and uses them for constructing
`DatasetType` instances. The keys of these fields are used as keys
in returned dictionary. Subclasses can override this behavior.
`DatasetTypeDescriptor` instances. The keys of these fields are used
as keys in returned dictionary. Subclasses can override this behavior.
Parameters
----------
Expand All @@ -162,24 +221,24 @@ def getOutputDatasetTypes(cls, config):
Returns
-------
Dictionary where key is the name (arbitrary) of the output dataset
and value is the `butler.DatasetType` instance. Default
and value is the `DatasetTypeDescriptor` instance. Default
implementation uses configuration field name as dictionary key.
"""
return cls.getDatasetTypes(config, OutputDatasetConfig)

@classmethod
def getInitInputDatasetTypes(cls, config):
"""Return dataset types that can be used to retrieve the
"""Return dataset type descriptors that can be used to retrieve the
``initInputs`` constructor argument.
Datasets used in initialization may not be associated with any
DataUnits (i.e. their data IDs must be empty dictionaries).
Default implementation finds all fields of type
`InitInputInputDatasetConfig` in configuration (non-recursively) and
uses them for constructing `DatasetType` instances. The keys of these
fields are used as keys in returned dictionary. Subclasses can
override this behavior.
uses them for constructing `DatasetTypeDescriptor` instances. The
keys of these fields are used as keys in returned dictionary.
Subclasses can override this behavior.
Parameters
----------
Expand All @@ -190,7 +249,7 @@ def getInitInputDatasetTypes(cls, config):
Returns
-------
Dictionary where key is the name (arbitrary) of the input dataset
and value is the `butler.DatasetType` instance. Default
and value is the `DatasetTypeDescriptor` instance. Default
implementation uses configuration field name as dictionary key.
When the task requires no initialization inputs, should return an
Expand All @@ -200,16 +259,16 @@ def getInitInputDatasetTypes(cls, config):

@classmethod
def getInitOutputDatasetTypes(cls, config):
"""Return dataset types that can be used to write the objects
returned by `getOutputDatasets`.
"""Return dataset type descriptors that can be used to write the
objects returned by `getOutputDatasets`.
Datasets used in initialization may not be associated with any
DataUnits (i.e. their data IDs must be empty dictionaries).
Default implementation finds all fields of type
`InitOutputDatasetConfig` in configuration (non-recursively) and uses
them for constructing `DatasetType` instances. The keys of these
fields are used as keys in returned dictionary. Subclasses can
them for constructing `DatasetTypeDescriptor` instances. The keys of
these fields are used as keys in returned dictionary. Subclasses can
override this behavior.
Parameters
Expand All @@ -221,7 +280,7 @@ def getInitOutputDatasetTypes(cls, config):
Returns
-------
Dictionary where key is the name (arbitrary) of the output dataset
and value is the `butler.DatasetType` instance. Default
and value is the `DatasetTypeDescriptor` instance. Default
implementation uses configuration field name as dictionary key.
When the task produces no initialization outputs, should return an
Expand All @@ -231,11 +290,11 @@ def getInitOutputDatasetTypes(cls, config):

@classmethod
def getDatasetTypes(cls, config, configClass):
"""Return dataset types defined in task configuration .
"""Return dataset type descriptors defined in task configuration.
This method can be used by other methods that need to extract dataset
types from task configuration (e.g. :py:method:`getInputDatasetTypes`
or sub-class methods).
types from task configuration (e.g. `getInputDatasetTypes` or
sub-class methods).
Parameters
----------
Expand All @@ -248,16 +307,15 @@ def getDatasetTypes(cls, config, configClass):
Returns
-------
Dictionary where key is the name (arbitrary) of the output dataset
and value is the `butler.DatasetType` instance. Default
and value is the `DatasetTypeDescriptor` instance. Default
implementation uses configuration field name as dictionary key.
When the task produces no initialization outputs, should return an
empty dict.
Returns empty dict if configuration has no fields with the specified
``configClass``.
"""
dsTypes = {}
for key, value in config.items():
if isinstance(value, configClass):
dsTypes[key] = cls.makeDatasetType(value)
dsTypes[key] = DatasetTypeDescriptor.fromConfig(value)
return dsTypes

def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds):
Expand Down Expand Up @@ -389,35 +447,35 @@ def runQuantum(self, quantum, butler):
# get all data from butler
inputDataIds = {}
inputs = {}
for key, value in self.config.items():
if isinstance(value, InputDatasetConfig):
dataRefs = quantum.predictedInputs[value.name]
dataIds = [dataRef.dataId for dataRef in dataRefs]
data = [butler.get(dataRef) for dataRef in dataRefs]
if value.scalar:
# unpack single-item lists
if len(dataRefs) != 1:
raise ScalarError(key, len(dataRefs))
data = data[0]
dataIds = dataIds[0]
inputDataIds[key] = dataIds
inputs[key] = data
descriptors = self.getInputDatasetTypes(self.config)
for key, descriptor in descriptors.items():
dataRefs = quantum.predictedInputs[descriptor.datasetType.name]
dataIds = [dataRef.dataId for dataRef in dataRefs]
data = [butler.get(dataRef) for dataRef in dataRefs]
if descriptor.scalar:
# unpack single-item lists
if len(dataRefs) != 1:
raise ScalarError(key, len(dataRefs))
data = data[0]
dataIds = dataIds[0]
inputDataIds[key] = dataIds
inputs[key] = data

# lists of DataRefs/DataIds for output datasets
outputDataRefs = {}
outputDataIds = {}
for key, value in self.config.items():
if isinstance(value, OutputDatasetConfig):
dataRefs = quantum.outputs[value.name]
dataIds = [dataRef.dataId for dataRef in dataRefs]
if value.scalar:
# unpack single-item lists
if len(dataRefs) != 1:
raise ScalarError(key, len(dataRefs))
dataRefs = dataRefs[0]
dataIds = dataIds[0]
outputDataRefs[key] = dataRefs
outputDataIds[key] = dataIds
descriptors = self.getOutputDatasetTypes(self.config)
for key, descriptor in descriptors.items():
dataRefs = quantum.outputs[descriptor.datasetType.name]
dataIds = [dataRef.dataId for dataRef in dataRefs]
if descriptor.scalar:
# unpack single-item lists
if len(dataRefs) != 1:
raise ScalarError(key, len(dataRefs))
dataRefs = dataRefs[0]
dataIds = dataIds[0]
outputDataRefs[key] = dataRefs
outputDataIds[key] = dataIds

# call run method with keyword arguments
struct = self.adaptArgsAndRun(inputs, inputDataIds, outputDataIds)
Expand Down Expand Up @@ -447,38 +505,17 @@ def saveStruct(self, struct, outputDataRefs, butler):
Data butler instance.
"""
structDict = struct.getDict()
for key, value in self.config.items():
if isinstance(value, OutputDatasetConfig):
dataList = structDict[key]
dataRefs = outputDataRefs[key]
if not isinstance(dataRefs, list):
# scalar outputs, make them lists again
dataRefs = [dataRefs]
dataList = [dataList]
# TODO: check that data objects and data refs are aligned
for dataRef, data in zip(dataRefs, dataList):
butler.put(data, dataRef.datasetType.name, dataRef.dataId)

@classmethod
def makeDatasetType(cls, dsConfig):
"""Create new instance of the `DatasetType` from task config.
Parameters
----------
dsConfig : `pexConfig.Config`
Instance of `InputDatasetConfig`, `OutputDatasetConfig`,
`InitInputDatasetConfig`, or `InitOutputDatasetConfig`.
Returns
-------
`butler.DatasetType` instance.
"""
# map storage class name to storage class
storageClass = StorageClassFactory().getStorageClass(dsConfig.storageClass)

return DatasetType(name=dsConfig.name,
dataUnits=dsConfig.units,
storageClass=storageClass)
descriptors = self.getOutputDatasetTypes(self.config)
for key in descriptors.keys():
dataList = structDict[key]
dataRefs = outputDataRefs[key]
if not isinstance(dataRefs, list):
# scalar outputs, make them lists again
dataRefs = [dataRefs]
dataList = [dataList]
# TODO: check that data objects and data refs are aligned
for dataRef, data in zip(dataRefs, dataList):
butler.put(data, dataRef.datasetType.name, dataRef.dataId)

def getResourceConfig(self):
"""Return resource configuration for this task.
Expand Down

0 comments on commit 8546190

Please sign in to comment.