Skip to content

Commit

Permalink
Merge pull request #110 from lsst/tickets/DM-22162
Browse files Browse the repository at this point in the history
DM-22162: Add metadata writing to PipelineTask execution logic (pipe_base)
  • Loading branch information
andy-slac committed Dec 10, 2019
2 parents 3dc8cbe + 8587f3b commit 204a9c6
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 16 deletions.
4 changes: 3 additions & 1 deletion python/lsst/pipe/base/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ class to allow configuration of the connections class. This dynamically
created config class is then attached to the `PipelineTaskConfig` via a
`~lsst.pex.config.ConfigField` with the attribute name `connections`.
"""
pass
saveMetadata = pexConfig.Field(
dtype=bool, default=True, optional=False,
doc="Flag to enable/disable metadata saving for a task, enabled by default.")


class ResourceConfig(pexConfig.Config):
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ class has been configured to use this `PipelineTaskConnectionsClass`
>>> config.connections.outputConnection = "TotallyDifferent"
>>> connections = ExampleConnections(config=config)
>>> assert(connections.inputConnection.name == "ModifiedDataset")
>>> assert(connections.outputCOnnection.name == "TotallyDifferent")
>>> assert(connections.outputConnection.name == "TotallyDifferent")
"""

def __init__(self, *, config: 'PipelineTaskConfig' = None):
Expand Down
1 change: 1 addition & 0 deletions python/lsst/pipe/base/graphBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def __init__(self, taskDef: TaskDef, parent: _PipelineScaffolding, datasetTypes:
raise GraphBuilderError(f"Task with label '{taskDef.label}' has dimensions "
f"{self.dimensions} that are not a subset of "
f"the pipeline dimensions {parent.dimensions}.")

# Initialize _DatasetScaffoldingDicts as subsets of the one or two
# corresponding dicts in the parent _PipelineScaffolding.
self.initInputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.initInputs,
Expand Down
45 changes: 34 additions & 11 deletions python/lsst/pipe/base/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from lsst.daf.butler import DatasetType, Registry, SkyPixDimension
from lsst.utils import doImport
from .configOverrides import ConfigOverrides
from .connections import PipelineTaskConnections, iterConnections
from .connections import iterConnections
from .pipelineTask import PipelineTask

from . import pipelineIR
Expand Down Expand Up @@ -87,6 +87,16 @@ def __init__(self, taskName, config, taskClass=None, label=""):
self.label = label
self.connections = config.connections.ConnectionsClass(config=config)

@property
def metadataDatasetName(self):
"""Name of a dataset type for metadata of this task, `None` if
metadata is not to be saved (`str`)
"""
if self.config.saveMetadata:
return self.label + "_metadata"
else:
return None

def __str__(self):
rep = "TaskDef(" + self.taskName
if self.label:
Expand Down Expand Up @@ -212,6 +222,13 @@ def addTask(self, task: Union[PipelineTask, str], label: str):
else:
raise ValueError("task must be either a child class of PipelineTask or a string containing"
" a fully qualified name to one")
if not label:
# in some cases (with command line-generated pipeline) tasks can
# be defined without label which is not acceptable, use task
# _DefaultName in that case
if isinstance(task, str):
task = doImport(task)
label = task._DefaultName
self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName)

def removeTask(self, label: str):
Expand Down Expand Up @@ -399,15 +416,13 @@ class TaskDatasetTypes:
"""

@classmethod
def fromConnections(cls, connectionsInstance: PipelineTaskConnections, *,
registry: Registry) -> TaskDatasetTypes:
def fromTaskDef(cls, taskDef: TaskDef, *, registry: Registry) -> TaskDatasetTypes:
"""Extract and classify the dataset types from a single `PipelineTask`.
Parameters
----------
connectionsInstance: `PipelineTaskConnections`
An instance of a `PipelineTaskConnections` class for a particular
`PipelineTask`.
taskDef: `TaskDef`
An instance of a `TaskDef` class for a particular `PipelineTask`.
registry: `Registry`
Registry used to construct normalized `DatasetType` objects and
retrieve those that are incomplete.
Expand Down Expand Up @@ -436,18 +451,18 @@ def makeDatasetTypesSet(connectionType):
Notes
-----
This function is a closure over the variables ``registry`` and
``connectionsInstance``.
``taskDef``.
"""
datasetTypes = []
for c in iterConnections(connectionsInstance, connectionType):
for c in iterConnections(taskDef.connections, connectionType):
dimensions = set(getattr(c, 'dimensions', set()))
if "skypix" in dimensions:
try:
datasetType = registry.getDatasetType(c.name)
except LookupError as err:
raise LookupError(
f"DatasetType '{c.name}' referenced by "
f"{type(connectionsInstance).__name__} uses 'skypix' as a dimension "
f"{type(taskDef.connections).__name__} uses 'skypix' as a dimension "
f"placeholder, but does not already exist in the registry. "
f"Note that reference catalog names are now used as the dataset "
f"type name instead of 'ref_cat'."
Expand All @@ -465,12 +480,20 @@ def makeDatasetTypesSet(connectionType):
datasetTypes.append(datasetType)
return frozenset(datasetTypes)

# optionally add output dataset for metadata
outputs = makeDatasetTypesSet("outputs")
if taskDef.metadataDatasetName is not None:
# Metadata is supposed to be of the PropertyList type, its dimensions
# correspond to a task quantum
dimensions = registry.dimensions.extract(taskDef.connections.dimensions)
outputs |= {DatasetType(taskDef.metadataDatasetName, dimensions, "PropertyList")}

return cls(
initInputs=makeDatasetTypesSet("initInputs"),
initOutputs=makeDatasetTypesSet("initOutputs"),
inputs=makeDatasetTypesSet("inputs"),
prerequisites=makeDatasetTypesSet("prerequisiteInputs"),
outputs=makeDatasetTypesSet("outputs"),
outputs=outputs,
)


Expand Down Expand Up @@ -574,7 +597,7 @@ def fromPipeline(cls, pipeline, *, registry: Registry) -> PipelineDatasetTypes:
if isinstance(pipeline, Pipeline):
pipeline = pipeline.toExpandedPipeline()
for taskDef in pipeline:
thisTask = TaskDatasetTypes.fromConnections(taskDef.connections, registry=registry)
thisTask = TaskDatasetTypes.fromTaskDef(taskDef, registry=registry)
allInitInputs.update(thisTask.initInputs)
allInitOutputs.update(thisTask.initOutputs)
allInputs.update(thisTask.inputs)
Expand Down
13 changes: 10 additions & 3 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@
import lsst.pipe.base as pipeBase


class NullConnections(pipeBase.PipelineTaskConnections,
                      dimensions=()):
    """Connections class declaring no connections and empty dimensions.

    Used in these tests as the minimal ``pipelineConnections`` argument
    required to define a `PipelineTaskConfig` subclass.
    """
    pass


class NoResourceTask(pipeBase.PipelineTask):
    """Test task whose config does not define a ``resources`` field."""

    # Default task name; used as the label when none is supplied
    # (see Pipeline.addTask).
    _DefaultName = "no_resource_task"
    # Plain PipelineTaskConfig — deliberately carries no ResourceConfig.
    ConfigClass = pipeBase.PipelineTaskConfig


class OneConfig(pexConfig.Config):
class OneConfig(pipeBase.PipelineTaskConfig,
pipelineConnections=NullConnections):
resources = pexConfig.ConfigField(dtype=pipeBase.ResourceConfig,
doc="Resource configuration")

Expand All @@ -45,7 +51,8 @@ class OneTask(pipeBase.PipelineTask):
ConfigClass = OneConfig


class TwoConfig(pexConfig.Config):
class TwoConfig(pipeBase.PipelineTaskConfig,
pipelineConnections=NullConnections):
resources = pexConfig.ConfigField(dtype=pipeBase.ResourceConfig,
doc="Resource configuration")

Expand Down
3 changes: 3 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ def testTaskDef(self):
self.assertIsInstance(task2.config, MultConfig)
self.assertIs(task2.taskClass, MultTask)
self.assertEqual(task2.label, "mult_task")
self.assertEqual(task2.metadataDatasetName, "mult_task_metadata")
task2.config.saveMetadata = False
self.assertIsNone(task2.metadataDatasetName)

def testEmpty(self):
"""Creating empty pipeline
Expand Down

0 comments on commit 204a9c6

Please sign in to comment.