DM-22162: Add metadata writing to PipelineTask execution logic (pipe_base) #110

Merged 2 commits on Dec 10, 2019
4 changes: 3 additions & 1 deletion python/lsst/pipe/base/config.py
@@ -123,7 +123,9 @@ class to allow configuration of the connections class. This dynamically
created config class is then attached to the `PipelineTaskConfig` via a
`~lsst.pex.config.ConfigField` with the attribute name `connections`.
"""
pass
saveMetadata = pexConfig.Field(
dtype=bool, default=True, optional=False,
doc="Flag to enable/disable metadata saving for a task, enabled by default.")


class ResourceConfig(pexConfig.Config):
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/connections.py
@@ -356,7 +356,7 @@ class has been configured to use this `PipelineTaskConnectionsClass`
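The new `saveMetadata` field added above is a boolean toggle that defaults to enabled. The sketch below is a minimal plain-Python stand-in (not the real `lsst.pex.config` API; the class name is hypothetical) illustrating the intended semantics:

```python
from dataclasses import dataclass


@dataclass
class PipelineTaskConfigSketch:
    """Hypothetical simplification of PipelineTaskConfig."""
    # Mirrors pexConfig.Field(dtype=bool, default=True, optional=False)
    saveMetadata: bool = True


config = PipelineTaskConfigSketch()
assert config.saveMetadata          # metadata saving is on by default
config.saveMetadata = False         # a task config can opt out per task
```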
>>> config.connections.outputConnection = "TotallyDifferent"
>>> connections = ExampleConnections(config=config)
>>> assert(connections.inputConnection.name == "ModifiedDataset")
>>> assert(connections.outputCOnnection.name == "TotallyDifferent")
>>> assert(connections.outputConnection.name == "TotallyDifferent")
"""

def __init__(self, *, config: 'PipelineTaskConfig' = None):
1 change: 1 addition & 0 deletions python/lsst/pipe/base/graphBuilder.py
@@ -236,6 +236,7 @@ def __init__(self, taskDef: TaskDef, parent: _PipelineScaffolding, datasetTypes:
raise GraphBuilderError(f"Task with label '{taskDef.label}' has dimensions "
f"{self.dimensions} that are not a subset of "
f"the pipeline dimensions {parent.dimensions}.")

# Initialize _DatasetScaffoldingDicts as subsets of the one or two
# corresponding dicts in the parent _PipelineScaffolding.
self.initInputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.initInputs,
45 changes: 34 additions & 11 deletions python/lsst/pipe/base/pipeline.py
@@ -39,7 +39,7 @@
from lsst.daf.butler import DatasetType, Registry, SkyPixDimension
from lsst.utils import doImport
from .configOverrides import ConfigOverrides
from .connections import PipelineTaskConnections, iterConnections
from .connections import iterConnections
from .pipelineTask import PipelineTask

from . import pipelineIR
@@ -87,6 +87,16 @@ def __init__(self, taskName, config, taskClass=None, label=""):
self.label = label
self.connections = config.connections.ConnectionsClass(config=config)

@property
def metadataDatasetName(self):
"""Name of the dataset type for this task's metadata; `None` if
metadata is not to be saved (`str`).
"""
if self.config.saveMetadata:
return self.label + "_metadata"
else:
return None

def __str__(self):
rep = "TaskDef(" + self.taskName
if self.label:
@@ -212,6 +222,13 @@ def addTask(self, task: Union[PipelineTask, str], label: str):
else:
raise ValueError("task must be either a child class of PipelineTask or a string containing"
" a fully qualified name to one")
if not label:
Contributor:

Actually, thinking about this now, I don't think it is possible with the new pipeline object to have a task that does not have a label specified.

Contributor:

Hmm, looking at the code in ctrl_mpexec for a task with no label: I didn't remove the `?` in the regex, nor change make pipeline to use the task name when there is no label. This will lead to weird/broken behavior. If you don't want to change it on this ticket, that's fine; I can make a ticket and fix that behavior.

Contributor (author):

I think we don't want to force users to always provide a label on the command line, so if the label is missing it should come from `_DefaultName`. To know `_DefaultName` I need to import the task class, and this is where I thought is the most natural place for it. I do not want to import anything when the command line is parsed; another potential place for that is the CmdLineFwk class, but I think doing it here is more generic. Of course, if you say that `Pipeline.addTask` has to receive a non-empty label, then I'd simply add a check here and move `doImport` to CmdLineFwk instead.

Contributor (author):

@natelust, let me know if you want me to move that `doImport` to CmdLineFwk before I merge both branches; it should be easy for me to do, certainly faster than opening another ticket.

Contributor:

I guess this is fine here; I really don't like the `doImport` in either place, as it is done again later. One thing we talked about doing in the future is not having a `_DefaultName` at all, and using the name of the task in places where `_DefaultName` would have been used. How would you feel about just using the string name of the class for the label here? I think @TallJimbo might have an opinion as well.

Contributor (author):

I'm OK with using the class name instead of `_DefaultName`, but I know that `_DefaultName` has a long history and this should probably be discussed with a wider audience. Just tell me what to do and I'll do it.

Member:

Let's stick with `_DefaultName` for now. I'd like to replace that with the unqualified Task name, or something derived from it, eventually (and then take advantage of that to e.g. avoid the `doImport` here), but until we've done that more globally, using the unqualified Task name here just exacerbates the problem of having too many names for a Task.

Contributor (author):

OK, I'll merge it as it is now.

# In some cases (with a command line-generated pipeline) tasks can
# be defined without a label, which is not acceptable; use the task's
# _DefaultName in that case
if isinstance(task, str):
task = doImport(task)
label = task._DefaultName
self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName)
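The label-defaulting branch above can be sketched in isolation. This is a hedged stand-in: `ExampleTask`, `_REGISTRY`, and `resolveLabel` are hypothetical names, with `_REGISTRY` standing in for `lsst.utils.doImport`, which resolves a fully qualified class name:

```python
class ExampleTask:
    """Hypothetical task class; real code imports a PipelineTask subclass."""
    _DefaultName = "exampleTask"


# Stand-in for doImport: maps a fully qualified name to a class.
_REGISTRY = {"mypkg.ExampleTask": ExampleTask}


def resolveLabel(task, label=""):
    # Mirrors the added branch in Pipeline.addTask: if no label is given,
    # import the task class and fall back to its _DefaultName.
    if not label:
        if isinstance(task, str):
            task = _REGISTRY[task]  # real code: task = doImport(task)
        label = task._DefaultName
    return label


print(resolveLabel("mypkg.ExampleTask"))   # exampleTask
print(resolveLabel(ExampleTask, "custom"))  # custom
```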

def removeTask(self, label: str):
@@ -399,15 +416,13 @@ class TaskDatasetTypes:
"""

@classmethod
def fromConnections(cls, connectionsInstance: PipelineTaskConnections, *,
registry: Registry) -> TaskDatasetTypes:
def fromTaskDef(cls, taskDef: TaskDef, *, registry: Registry) -> TaskDatasetTypes:
"""Extract and classify the dataset types from a single `PipelineTask`.

Parameters
----------
connectionsInstance: `PipelineTaskConnections`
An instance of a `PipelineTaskConnections` class for a particular
`PipelineTask`.
taskDef: `TaskDef`
An instance of a `TaskDef` class for a particular `PipelineTask`.
registry: `Registry`
Registry used to construct normalized `DatasetType` objects and
retrieve those that are incomplete.
@@ -436,18 +451,18 @@ def makeDatasetTypesSet(connectionType):
Notes
-----
This function is a closure over the variables ``registry`` and
``connectionsInstance``.
``taskDef``.
"""
datasetTypes = []
for c in iterConnections(connectionsInstance, connectionType):
for c in iterConnections(taskDef.connections, connectionType):
dimensions = set(getattr(c, 'dimensions', set()))
if "skypix" in dimensions:
try:
datasetType = registry.getDatasetType(c.name)
except LookupError as err:
raise LookupError(
f"DatasetType '{c.name}' referenced by "
f"{type(connectionsInstance).__name__} uses 'skypix' as a dimension "
f"{type(taskDef.connections).__name__} uses 'skypix' as a dimension "
f"placeholder, but does not already exist in the registry. "
f"Note that reference catalog names are now used as the dataset "
f"type name instead of 'ref_cat'."
@@ -465,12 +480,20 @@ def makeDatasetTypesSet(connectionType):
datasetTypes.append(datasetType)
return frozenset(datasetTypes)

# optionally add output dataset for metadata
outputs = makeDatasetTypesSet("outputs")
if taskDef.metadataDatasetName is not None:
# Metadata is expected to be of the PropertyList type; its dimensions
# correspond to a task quantum
dimensions = registry.dimensions.extract(taskDef.connections.dimensions)
outputs |= {DatasetType(taskDef.metadataDatasetName, dimensions, "PropertyList")}

return cls(
initInputs=makeDatasetTypesSet("initInputs"),
initOutputs=makeDatasetTypesSet("initOutputs"),
inputs=makeDatasetTypesSet("inputs"),
prerequisites=makeDatasetTypesSet("prerequisiteInputs"),
outputs=makeDatasetTypesSet("outputs"),
outputs=outputs,
)


@@ -574,7 +597,7 @@ def fromPipeline(cls, pipeline, *, registry: Registry) -> PipelineDatasetTypes:
if isinstance(pipeline, Pipeline):
pipeline = pipeline.toExpandedPipeline()
for taskDef in pipeline:
thisTask = TaskDatasetTypes.fromConnections(taskDef.connections, registry=registry)
thisTask = TaskDatasetTypes.fromTaskDef(taskDef, registry=registry)
allInitInputs.update(thisTask.initInputs)
allInitOutputs.update(thisTask.initOutputs)
allInputs.update(thisTask.inputs)
13 changes: 10 additions & 3 deletions tests/test_config.py
@@ -30,12 +30,18 @@
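Putting the pieces together: when `saveMetadata` is set, `fromTaskDef` adds a `<label>_metadata` dataset type (storage class `PropertyList`, quantum dimensions) to the task's outputs. The sketch below is plain Python with no lsst imports; `TaskDefSketch` and `taskOutputs` are hypothetical stand-ins for `TaskDef` and the output-set logic in `fromTaskDef`:

```python
class TaskDefSketch:
    """Hypothetical stand-in for lsst.pipe.base.pipeline.TaskDef."""

    def __init__(self, label, saveMetadata=True):
        self.label = label
        self.saveMetadata = saveMetadata  # real code reads config.saveMetadata

    @property
    def metadataDatasetName(self):
        # Mirrors TaskDef.metadataDatasetName in the diff above.
        return self.label + "_metadata" if self.saveMetadata else None


def taskOutputs(taskDef, declaredOutputs):
    """Combine declared outputs with the optional metadata dataset,
    as fromTaskDef does with real DatasetType objects."""
    outputs = set(declaredOutputs)
    if taskDef.metadataDatasetName is not None:
        # Real code: DatasetType(name, quantum dimensions, "PropertyList")
        outputs.add((taskDef.metadataDatasetName, "PropertyList"))
    return frozenset(outputs)


task = TaskDefSketch("isr")
print(sorted(taskOutputs(task, {("postISRCCD", "Exposure")})))
```

Setting `saveMetadata = False` on the sketch leaves the declared outputs untouched, matching the `metadataDatasetName is None` branch.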
import lsst.pipe.base as pipeBase


class NullConnections(pipeBase.PipelineTaskConnections,
dimensions=()):
pass


class NoResourceTask(pipeBase.PipelineTask):
_DefaultName = "no_resource_task"
ConfigClass = pexConfig.Config
ConfigClass = pipeBase.PipelineTaskConfig


class OneConfig(pexConfig.Config):
class OneConfig(pipeBase.PipelineTaskConfig,
pipelineConnections=NullConnections):
resources = pexConfig.ConfigField(dtype=pipeBase.ResourceConfig,
doc="Resource configuration")

@@ -45,7 +51,8 @@ class OneTask(pipeBase.PipelineTask):
ConfigClass = OneConfig


class TwoConfig(pexConfig.Config):
class TwoConfig(pipeBase.PipelineTaskConfig,
pipelineConnections=NullConnections):
resources = pexConfig.ConfigField(dtype=pipeBase.ResourceConfig,
doc="Resource configuration")

3 changes: 3 additions & 0 deletions tests/test_pipeline.py
@@ -86,6 +86,9 @@ def testTaskDef(self):
self.assertIsInstance(task2.config, MultConfig)
self.assertIs(task2.taskClass, MultTask)
self.assertEqual(task2.label, "mult_task")
self.assertEqual(task2.metadataDatasetName, "mult_task_metadata")
task2.config.saveMetadata = False
self.assertIsNone(task2.metadataDatasetName)

def testEmpty(self):
"""Creating empty pipeline