Skip to content

Commit

Permalink
Update checkExistingOutputs to work with a LimitedButler
Browse files Browse the repository at this point in the history
  • Loading branch information
mfisherlevine committed Mar 15, 2024
1 parent a2f6597 commit 3774a40
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
If only partial outputs exist then they are removed if
``clobberOutputs`` is True, otherwise an exception is raised.
The ``LimitedButler`` is used for everything, and should be set to
``self.butler`` if no separate ``LimitedButler`` is available.
Parameters
----------
quantum : `~lsst.daf.butler.Quantum`
Expand All @@ -312,10 +315,6 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
RuntimeError
Raised if some outputs exist and some not.
"""
if not self.butler:
# Skip/prune logic only works for full butler.
return False

if self.skipExisting:
_LOG.debug(
"Checking existence of metadata from previous execution of label=%s dataId=%s.",
Expand All @@ -333,7 +332,7 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
_LOG.debug(
"Looking for existing outputs in the way for label=%s dataId=%s.", taskDef.label, quantum.dataId
)
ref_dict = self.butler.stored_many(chain.from_iterable(quantum.outputs.values()))
ref_dict = limited_butler.stored_many(chain.from_iterable(quantum.outputs.values()))
existingRefs = [ref for ref, exists in ref_dict.items() if exists]
missingRefs = [ref for ref, exists in ref_dict.items() if not exists]
if existingRefs:
Expand All @@ -343,26 +342,28 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
return True
elif self.clobberOutputs:
_LOG.info("Removing complete outputs for quantum %s: %s", quantum, existingRefs)
self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
else:
run = self.butler.run if hasattr(self.butler, 'run') else 'run is n/a on a limited butler'

Check warning on line 347 in python/lsst/ctrl/mpexec/singleQuantumExecutor.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/singleQuantumExecutor.py#L347

Added line #L347 was not covered by tests
raise RuntimeError(
f"Complete outputs exists for a quantum {quantum} "
"and neither clobberOutputs nor skipExisting is set: "
f"collection={self.butler.run} existingRefs={existingRefs}"
f"collection={run} existingRefs={existingRefs}"
)
else:
# Partial outputs from a failed quantum.
run = self.butler.run if hasattr(self.butler, 'run') else 'run is n/a on a limited butler'
_LOG.debug(
"Partial outputs exist for quantum %s collection=%s existingRefs=%s missingRefs=%s",
quantum,
self.butler.run,
run,
existingRefs,
missingRefs,
)
if self.clobberOutputs:
# only prune
_LOG.info("Removing partial outputs for task %s: %s", taskDef, existingRefs)
self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
return False
else:
raise RuntimeError(
Expand Down

0 comments on commit 3774a40

Please sign in to comment.