Merge pull request #345 from lsst/tickets/DM-39672
DM-39672: fix QG bugs involving skip_existing_in
TallJimbo committed Jun 21, 2023
2 parents 9867a7d + e79cf3c commit f9b488b
Showing 3 changed files with 22 additions and 20 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-39672.bugfix.md
@@ -0,0 +1 @@
+Fix a bug in `QuantumGraph` generation that could result in datasets from `skip_existing_in` collections being used as outputs, and another that prevented `QuantumGraph` generation when a `skip_existing_in` collection has some outputs from a failed quantum.
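
A conceptual sketch of the two failure modes this note describes (hypothetical collection names, not the GraphBuilder API):

```python
# Hypothetical collections for a pipeline run that is being retried.
skip_existing_in = ["u/user/attempt1"]  # searched for already-completed quanta
output_run = "u/user/attempt2"          # RUN collection for new outputs

# Bug 1: a dataset found in "u/user/attempt1" could be carried into the new
# graph as an *output* ref, so the QuantumGraph pointed at the wrong run.
# Bug 2: a quantum that failed in "u/user/attempt1" (some outputs written,
# some missing) aborted QG generation with OutputExistsError instead of
# simply being scheduled to run again into "u/user/attempt2".
assert output_run not in skip_existing_in  # the runs are distinct by design
```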
34 changes: 17 additions & 17 deletions python/lsst/pipe/base/graphBuilder.py
@@ -645,10 +645,12 @@ def resolveRef(self, dataset_type: DatasetType, data_id: DataCoordinate) -> DatasetRef:
         self.resolved[key] = resolved
         return resolved

-    def resolveDict(self, dataset_type: DatasetType, refs: dict[DataCoordinate, _RefHolder]) -> None:
+    def resolveDict(
+        self, dataset_type: DatasetType, refs: dict[DataCoordinate, _RefHolder], is_output: bool
+    ) -> None:
         """Resolve all unresolved references in the provided dictionary."""
         for data_id, holder in refs.items():
-            if holder.ref is None:
+            if holder.ref is None or (is_output and holder.ref.run != self.run):
                 holder.ref = self.resolveRef(holder.dataset_type, data_id)
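
The new `is_output` flag enforces that output refs always land in the builder's output run, even when a pre-resolved ref was found in some other collection. A minimal, self-contained sketch of that rule using stand-in classes (not the real `lsst.daf.butler` types):

```python
import uuid
from dataclasses import dataclass


@dataclass
class StubRef:
    """Stand-in for a resolved DatasetRef: just an ID and a RUN name."""

    id: uuid.UUID
    run: str


class StubIdMaker:
    """Stand-in for the dataset-ID maker, keyed on its output run."""

    def __init__(self, run: str) -> None:
        self.run = run

    def resolve(self, ref: StubRef | None, is_output: bool) -> StubRef:
        # Mirrors the fixed condition: re-resolve when unresolved, or when an
        # output ref was found in another run (e.g. via skip_existing_in).
        if ref is None or (is_output and ref.run != self.run):
            return StubRef(uuid.uuid4(), self.run)
        return ref


maker = StubIdMaker(run="u/user/attempt2")
stale = StubRef(uuid.uuid4(), run="u/user/attempt1")
assert maker.resolve(stale, is_output=True).run == "u/user/attempt2"  # re-minted
assert maker.resolve(stale, is_output=False) is stale  # inputs kept as found
```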


@@ -1213,7 +1215,7 @@ def resolveDatasetRefs(
         # Resolve the missing refs, just so they look like all of the others;
         # in the end other code will make sure they never appear in the QG.
         for dataset_type, refDict in self.missing.items():
-            idMaker.resolveDict(dataset_type, refDict)
+            idMaker.resolveDict(dataset_type, refDict, is_output=False)

         # Copy the resolved DatasetRefs to the _QuantumScaffolding objects,
         # replacing the unresolved refs there, and then look up prerequisites.
@@ -1258,7 +1260,7 @@ def resolveDatasetRefs(
                                 continue
                         else:
                             dataIdsFailed.append(quantum.dataId)
-                            if not clobberOutputs:
+                            if not clobberOutputs and run_exists:
                                 raise OutputExistsError(
                                     f"Quantum {quantum.dataId} of task with label "
                                     f"'{quantum.task.taskDef.label}' has some outputs that exist "
@@ -1331,14 +1333,16 @@ def resolveDatasetRefs(
                         task.prerequisites[datasetType][ref.dataId] = _RefHolder(datasetType, ref)

                 # Resolve all quantum inputs and outputs.
-                for datasetDict in (quantum.inputs, quantum.outputs):
-                    for dataset_type, refDict in datasetDict.items():
-                        idMaker.resolveDict(dataset_type, refDict)
+                for dataset_type, refDict in quantum.inputs.items():
+                    idMaker.resolveDict(dataset_type, refDict, is_output=False)
+                for dataset_type, refDict in quantum.outputs.items():
+                    idMaker.resolveDict(dataset_type, refDict, is_output=True)

             # Resolve task initInputs and initOutputs.
-            for datasetDict in (task.initInputs, task.initOutputs):
-                for dataset_type, refDict in datasetDict.items():
-                    idMaker.resolveDict(dataset_type, refDict)
+            for dataset_type, refDict in task.initInputs.items():
+                idMaker.resolveDict(dataset_type, refDict, is_output=False)
+            for dataset_type, refDict in task.initOutputs.items():
+                idMaker.resolveDict(dataset_type, refDict, is_output=True)

             # Actually remove any quanta that we decided to skip above.
             if dataIdsSucceeded:
@@ -1351,25 +1355,21 @@ def resolveDatasetRefs(
                     )
                     for dataId in dataIdsSucceeded:
                         del task.quanta[dataId]
-                elif clobberOutputs:
+                elif clobberOutputs and run_exists:
                     _LOG.info(
                         "Found %d successful quanta for task with label '%s' "
                         "that will need to be clobbered during execution.",
                         len(dataIdsSucceeded),
                         task.taskDef.label,
                     )
-                else:
-                    raise AssertionError("OutputExistsError should have already been raised.")
             if dataIdsFailed:
-                if clobberOutputs:
+                if clobberOutputs and run_exists:
                     _LOG.info(
                         "Found %d failed/incomplete quanta for task with label '%s' "
                         "that will need to be clobbered during execution.",
                         len(dataIdsFailed),
                         task.taskDef.label,
                     )
-                else:
-                    raise AssertionError("OutputExistsError should have already been raised.")

         # Collect initOutputs that do not belong to any task.
         global_dataset_types: set[DatasetType] = set(self.initOutputs)
@@ -1378,7 +1378,7 @@ def resolveDatasetRefs(
         if global_dataset_types:
             self.globalInitOutputs = _DatasetDict.fromSubset(global_dataset_types, self.initOutputs)
             for dataset_type, refDict in self.globalInitOutputs.items():
-                idMaker.resolveDict(dataset_type, refDict)
+                idMaker.resolveDict(dataset_type, refDict, is_output=True)

     def makeQuantumGraph(
         self,
7 changes: 4 additions & 3 deletions python/lsst/pipe/base/tests/mocks/_pipeline_task.py
@@ -49,7 +49,7 @@
 def mock_task_defs(
     originals: Iterable[TaskDef],
     unmocked_dataset_types: Iterable[str] = (),
-    force_failures: Mapping[str, tuple[str, type[Exception]]] | None = None,
+    force_failures: Mapping[str, tuple[str, type[Exception] | None]] | None = None,
 ) -> list[TaskDef]:
     """Create mocks for an iterable of TaskDefs.
@@ -61,7 +61,7 @@ def mock_task_defs(
         Names of overall-input dataset types that should not be replaced with
         mocks.
     force_failures : `~collections.abc.Mapping` [ `str`, `tuple` [ `str`, \
-        `type` [ `Exception` ] ] ]
+        `type` [ `Exception` ] or `None` ] ]
         Mapping from original task label to a 2-tuple indicating that some
         quanta should raise an exception when executed. The first entry is a
         data ID match using the butler expression language (i.e. a string of
@@ -87,7 +87,8 @@ def mock_task_defs(
         if original_task_def.label in force_failures:
             condition, exception_type = force_failures[original_task_def.label]
             config.fail_condition = condition
-            config.fail_exception = get_full_type_name(exception_type)
+            if exception_type is not None:
+                config.fail_exception = get_full_type_name(exception_type)
         mock_task_def = TaskDef(
             config=config, taskClass=MockPipelineTask, label=get_mock_name(original_task_def.label)
         )
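
A hedged usage sketch of the relaxed `force_failures` signature, assuming `mock_task_defs` is re-exported from `lsst.pipe.base.tests.mocks` and using hypothetical task labels: passing `None` as the exception type sets only the failure condition and keeps the task's default `fail_exception`.

```python
from collections.abc import Iterable

from lsst.pipe.base import TaskDef
from lsst.pipe.base.tests.mocks import mock_task_defs  # assumed re-export


def make_failing_mocks(originals: Iterable[TaskDef]) -> list[TaskDef]:
    return mock_task_defs(
        originals,
        force_failures={
            "isr": ("detector = 4", ValueError),  # matching quanta raise ValueError
            "calibrate": ("detector = 4", None),  # keep the default fail_exception
        },
    )
```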