DM-37786: allow PipelineTasks to control default dataset-query-constraint behavior. #304

Merged · 6 commits · Feb 15, 2023
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
@@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10"]
python-version: ["3.10", "3.11"]

steps:
- uses: actions/checkout@v3
1 change: 1 addition & 0 deletions doc/changes/DM-37786.bugfix.md
@@ -0,0 +1 @@
Fix an error message that incorrectly claimed the repository state had changed during QuantumGraph generation when init-input datasets were simply missing.
1 change: 1 addition & 0 deletions doc/changes/DM-37786.feature.md
@@ -0,0 +1 @@
Allow `PipelineTasks` to provide defaults for the `--dataset-query-constraints` option of the `pipetask` tool.
56 changes: 56 additions & 0 deletions python/lsst/pipe/base/connectionTypes.py
@@ -237,6 +237,62 @@ def __post_init__(self) -> None:

@dataclasses.dataclass(frozen=True)
class Input(BaseInput):
"""Class used for declaring PipelineTask input connections

Parameters
----------
name : `str`
The default name used to identify the dataset type
storageClass : `str`
The storage class used when (un)/persisting the dataset type
multiple : `bool`
Indicates if this connection should expect to contain multiple objects
of the given dataset type. Tasks with more than one ``multiple=True``
connection sharing the same dimensions may want to implement
`PipelineTaskConnections.adjustQuantum` to ensure those datasets are
consistent (i.e. zip-iterable) in `PipelineTask.runQuantum` and notify
the execution system as early as possible of outputs that will not be
produced because the corresponding input is missing.
dimensions : iterable of `str`
The `lsst.daf.butler.Registry` dimensions used to identify the dataset
type with the specified name.
deferLoad : `bool`
Indicates that this dataset type will be loaded as a
`lsst.daf.butler.DeferredDatasetHandle`. PipelineTasks can use this
object to load the object at a later time.
minimum : `int`
Minimum number of datasets required for this connection, per quantum.
This is checked in the base implementation of
`PipelineTaskConnections.adjustQuantum`, which raises `NoWorkFound` if
the minimum is not met for `Input` connections (causing the quantum to
be pruned, skipped, or never created, depending on the context), and
`FileNotFoundError` for `PrerequisiteInput` connections (causing
QuantumGraph generation to fail). `PipelineTask` implementations may
provide custom `~PipelineTaskConnections.adjustQuantum` implementations
for more fine-grained or configuration-driven constraints, as long as
they are compatible with this minimum.
deferGraphConstraint : `bool`, optional
If `True`, do not include this dataset type's existence in the initial
query that starts the QuantumGraph generation process. This can be
used to make QuantumGraph generation faster by avoiding redundant
dataset constraints, and in certain cases it can (along with careful attention to
which tasks are included in the same QuantumGraph) be used to work
around the QuantumGraph generation algorithm's inflexible handling of
spatial overlaps. This option has no effect when the connection is not
an overall input of the pipeline (or subset thereof) for which a graph
is being created, and it never affects the ordering of quanta.

Raises
------
TypeError
Raised if ``minimum`` is greater than one but ``multiple=False``.
NotImplementedError
Raised if ``minimum`` is zero for a regular `Input` connection; this
is not currently supported by our QuantumGraph generation algorithm.
"""

deferGraphConstraint: bool = False

def __post_init__(self) -> None:
super().__post_init__()
if self.minimum == 0:
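For context, here is a minimal sketch of how a connections class could use the new field. The class, connection, and dataset type names (`ExampleConnections`, `deepCoadd`, `objectTable`) and their dimensions are illustrative assumptions, not taken from this PR:

```python
from lsst.pipe.base import PipelineTaskConnections
from lsst.pipe.base import connectionTypes as cT


class ExampleConnections(
    PipelineTaskConnections, dimensions=("tract", "patch", "skymap")
):
    # Default behavior: the existence of this input constrains the
    # initial QuantumGraph data ID query.  multiple=True because several
    # bands can land in one quantum.
    coadds = cT.Input(
        doc="Illustrative per-band coadd input.",
        name="deepCoadd",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "band", "skymap"),
        multiple=True,
    )
    # New in this PR: opt this input out of the initial query, e.g.
    # because its existence is assumed redundant with the connection above.
    objects = cT.Input(
        doc="Illustrative catalog input, assumed redundant as a constraint.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferGraphConstraint=True,
    )
```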
36 changes: 31 additions & 5 deletions python/lsst/pipe/base/graphBuilder.py
@@ -46,6 +46,7 @@
DimensionGraph,
DimensionUniverse,
NamedKeyDict,
NamedValueSet,
Quantum,
Registry,
)
@@ -594,6 +595,7 @@ def __init__(self, pipeline: Union[Pipeline, Iterable[TaskDef]], *, registry: Re
attr,
_DatasetDict.fromDatasetTypes(getattr(datasetTypes, attr), universe=registry.dimensions),
)
self.defaultDatasetQueryConstraints = datasetTypes.queryConstraints
# Aggregate all dimensions for all non-init, non-prerequisite
# DatasetTypes. These are the ones we'll include in the big join
# query.
@@ -655,6 +657,11 @@ def __repr__(self) -> str:
per-Quantum when generating the graph (`_DatasetDict`).
"""

defaultDatasetQueryConstraints: NamedValueSet[DatasetType]
"""Datasets that should be used as constraints in the initial query,
according to tasks (`NamedValueSet`).
"""

dimensions: DimensionGraph
"""All dimensions used by any regular input, intermediate, or output
(not prerequisite) dataset; the set of dimension used in the "Big Join
@@ -724,16 +731,22 @@ def connectDataIds(
# inputs and outputs. We limit the query to only dimensions that are
# associated with the input dataset types, but don't (yet) try to
# obtain the dataset_ids for those inputs.
_LOG.debug("Submitting data ID query and materializing results.")
_LOG.debug(
"Submitting data ID query over dimensions %s and materializing results.",
list(self.dimensions.names),
)
queryArgs: Dict[str, Any] = {
"dimensions": self.dimensions,
"where": userQuery,
"dataId": externalDataId,
"bind": bind,
}
if datasetQueryConstraint == DatasetQueryConstraintVariant.ALL:
_LOG.debug("Constraining graph query using all datasets in pipeline.")
queryArgs["datasets"] = list(self.inputs)
_LOG.debug(
"Constraining graph query using default of %s.",
list(self.defaultDatasetQueryConstraints.names),
)
queryArgs["datasets"] = list(self.defaultDatasetQueryConstraints)
queryArgs["collections"] = collections
elif datasetQueryConstraint == DatasetQueryConstraintVariant.OFF:
_LOG.debug("Not using dataset existence to constrain query.")
@@ -766,7 +779,6 @@
# quanta and then connecting them to each other.
n = -1
for n, commonDataId in enumerate(commonDataIds):
_LOG.debug("Next DataID = %s", commonDataId)
# Create DatasetRefs for all DatasetTypes from this result row,
# noting that we might have created some already.
# We remember both those that already existed and those that we
@@ -783,7 +795,6 @@
ref = refs.get(datasetDataId)
if ref is None:
ref = DatasetRef(datasetType, datasetDataId)
_LOG.debug("Made new ref = %s", ref)
refs[datasetDataId] = ref
refsForRow[datasetType.name] = ref
# Create _QuantumScaffolding objects for all tasks from this
@@ -924,6 +935,16 @@ def resolveDatasetRefs(

resolvedRefQueryResults: Iterable[DatasetRef]

# Updating constrainedByAllDatasets here is not ideal, but we have a
# few different code paths that each transfer different pieces of
# information about which dataset query constraints were applied, and
# none of them has the complete picture until this point. We're long
# overdue for a QG generation rewrite that will make this go away
# entirely anyway.
constrainedByAllDatasets = (
constrainedByAllDatasets and self.defaultDatasetQueryConstraints == self.inputs.keys()
)

# Look up [init] intermediate and output datasets in the output
# collection, if there is an output collection.
if run_exists or skip_collections_wildcard is not None:
@@ -1034,6 +1055,11 @@
"or the input collections have been modified since "
"QuantumGraph generation began."
)
elif not datasetType.dimensions:
raise RuntimeError(
f"Dataset {datasetType.name!r} (with no dimensions) could not be found in "
f"collections {collections}."
)
else:
# if the common dataIds were not constrained using all the
# input dataset types, it is possible that some data ids
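Schematically, the constraint selection in `connectDataIds` now behaves like the sketch below. This is a paraphrase, not the actual graphBuilder code: `initial_query_args` and `scaffolding` are hypothetical stand-ins for the logic and the `_PipelineScaffolding` instance above, and the import assumes the top-level `DatasetQueryConstraintVariant` export:

```python
from typing import Any, Dict

from lsst.pipe.base import DatasetQueryConstraintVariant


def initial_query_args(variant, scaffolding, collections, userQuery) -> Dict[str, Any]:
    # Schematic restatement of how the initial data ID query is built.
    queryArgs: Dict[str, Any] = {
        "dimensions": scaffolding.dimensions,
        "where": userQuery,
    }
    if variant == DatasetQueryConstraintVariant.ALL:
        # "ALL" now means the task-provided defaults: every regular
        # input except those declared with deferGraphConstraint=True.
        queryArgs["datasets"] = list(scaffolding.defaultDatasetQueryConstraints)
        queryArgs["collections"] = collections
    elif variant == DatasetQueryConstraintVariant.OFF:
        # Do not constrain the query on dataset existence at all.
        pass
    else:
        # An explicit, user-supplied list of dataset type names
        # overrides the task-provided defaults entirely (paraphrased).
        names = set(variant)
        queryArgs["datasets"] = [t for t in scaffolding.inputs if t.name in names]
        queryArgs["collections"] = collections
    return queryArgs
```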
32 changes: 30 additions & 2 deletions python/lsst/pipe/base/pipeline.py
@@ -66,6 +66,7 @@
from .config import PipelineTaskConfig
from .configOverrides import ConfigOverrides
from .connections import iterConnections
from .connectionTypes import Input
from .pipelineTask import PipelineTask
from .task import _TASK_METADATA_TYPE

@@ -856,6 +857,12 @@ class TaskDatasetTypes:
at the Pipeline level.
"""

queryConstraints: NamedValueSet[DatasetType]
"""Regular inputs that should not be used as constraints on the initial
QuantumGraph generation data ID query, according to their tasks
(`NamedValueSet`).
"""

prerequisites: NamedValueSet[DatasetType]
"""Dataset types that are prerequisite inputs to this Task.

@@ -1094,10 +1101,18 @@ def makeDatasetTypesSet(

outputs.freeze()

inputs = makeDatasetTypesSet("inputs", is_input=True)
queryConstraints = NamedValueSet(
inputs[c.name]
for c in cast(Iterable[Input], iterConnections(taskDef.connections, "inputs"))
if not c.deferGraphConstraint
)

return cls(
initInputs=makeDatasetTypesSet("initInputs", is_input=True),
initOutputs=initOutputs,
-inputs=makeDatasetTypesSet("inputs", is_input=True),
+inputs=inputs,
+queryConstraints=queryConstraints,
prerequisites=makeDatasetTypesSet("prerequisiteInputs", is_input=True),
outputs=outputs,
)
Expand Down Expand Up @@ -1144,6 +1159,12 @@ class PipelineDatasetTypes:
produced.
"""

queryConstraints: NamedValueSet[DatasetType]
"""Regular inputs that should be used as constraints on the initial
QuantumGraph generation data ID query, according to their tasks
(`NamedValueSet`).
"""

prerequisites: NamedValueSet[DatasetType]
"""Dataset types that are prerequisite inputs for the full Pipeline.

@@ -1216,6 +1237,7 @@ def fromPipeline(
allInitInputs = NamedValueSet[DatasetType]()
allInitOutputs = NamedValueSet[DatasetType]()
prerequisites = NamedValueSet[DatasetType]()
queryConstraints = NamedValueSet[DatasetType]()
byTask = dict()
if include_packages:
allInitOutputs.add(
@@ -1244,6 +1266,9 @@
allInitInputs.update(thisTask.initInputs)
allInitOutputs.update(thisTask.initOutputs)
allInputs.update(thisTask.inputs)
# Inputs are query constraints if any task considers them a query
# constraint.
queryConstraints.update(thisTask.queryConstraints)
prerequisites.update(thisTask.prerequisites)
allOutputs.update(thisTask.outputs)
byTask[taskDef.label] = thisTask
@@ -1296,11 +1321,14 @@ def frozen(s: AbstractSet[DatasetType]) -> NamedValueSet[DatasetType]:
s.freeze()
return s

inputs = frozen(allInputs - allOutputs - intermediateComponents)

return cls(
initInputs=frozen(allInitInputs - allInitOutputs),
initIntermediates=frozen(allInitInputs & allInitOutputs),
initOutputs=frozen(allInitOutputs - allInitInputs),
-inputs=frozen(allInputs - allOutputs - intermediateComponents),
+inputs=inputs,
+queryConstraints=frozen(queryConstraints & inputs),
# If there are storage class differences in inputs and outputs
# the intermediates have to choose priority. Here choose that
# inputs to tasks must match the requested storage class by
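The aggregation rule in `fromPipeline` ("an input is a query constraint if any task considers it one") can be illustrated with a toy example, using plain `set` in place of `NamedValueSet` and hypothetical task/dataset names:

```python
# task_a defers the constraint on objectTable; task_b does not.
task_a = {"inputs": {"deepCoadd", "objectTable"}, "queryConstraints": {"deepCoadd"}}
task_b = {"inputs": {"objectTable"}, "queryConstraints": {"objectTable"}}

all_inputs = task_a["inputs"] | task_b["inputs"]
query_constraints = task_a["queryConstraints"] | task_b["queryConstraints"]

# No task produces these datasets, so they are overall inputs.
all_outputs: set = set()
inputs = all_inputs - all_outputs

# objectTable remains a constraint because task_b did not defer it,
# even though task_a declared it with deferGraphConstraint=True.
print(sorted(query_constraints & inputs))  # ['deepCoadd', 'objectTable']
```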