Skip to content

Commit

Permalink
Improve handling of clobbering in SingleQuantumExecutor (DM-39122)
Browse files Browse the repository at this point in the history
SingleQuantumExecutor is updated to not use skip_existing_in collections but
it now uses extend_run to implement an improved dataset clobbering behavior.
It was possible previously to clobber complete output of a quantum when
passing inconsistent arguments (empty skip_existing_in and clobber_outputs=True).
The code is updated to only clobber partial outputs when both extend_run and
clobber_outputs are True. An exception is raised for partial outputs if extend_run
is True but clobber_outputs is False.
  • Loading branch information
andy-slac committed May 15, 2023
1 parent 8d21bf0 commit 18faa70
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 134 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-39122.api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`SingleQuantumExecutor` constructor drops `skipExistingIn` parameter and adds `extendRun` parameter.
1 change: 1 addition & 0 deletions doc/changes/DM-39122.doc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Command line help for `pipetask run` is updated to reflect its correct clobbering behavior.
3 changes: 3 additions & 0 deletions python/lsst/ctrl/mpexec/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ def run(ctx: click.Context, **kwargs: Any) -> None:
file=sys.stderr,
)
return
# "run" script does not support full set of options.
del kwargs["skip_existing"]
del kwargs["skip_existing_in"]
script.run(qgraphObj=qgraph, **kwargs)
finally:
if coverage:
Expand Down
20 changes: 9 additions & 11 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@

extend_run_option = MWOptionDecorator(
"--extend-run",
help=unwrap(
"""Instead of creating a new RUN collection, insert datasets
into either the one given by --output-run (if provided) or
the first child collection of --output (which must be of
type RUN). This also enables --skip-existing option."""
help=(
"Instead of creating a new RUN collection, insert datasets into either the one given by "
"--output-run (if provided) or the first child collection of --output (which must be of type RUN). "
"This also enables --skip-existing option when building a graph. "
"When executing a graph this option skips quanta with all existing outputs."
),
is_flag=True,
)
Expand Down Expand Up @@ -344,12 +344,10 @@

clobber_outputs_option = MWOptionDecorator(
"--clobber-outputs",
help=unwrap(
"""Remove outputs from previous execution of the same
quantum before new execution. If --skip-existing
is also passed, then only failed quanta will be
clobbered. Requires the 'run' command's --extend-run
flag to be set."""
help=(
"Remove outputs of failed quanta from the output run when they would block the execution of new "
"quanta with the same data ID (or assume that this will be done, if just building a QuantumGraph). "
"Does nothing if --extend-run is not passed."
),
is_flag=True,
)
Expand Down
8 changes: 0 additions & 8 deletions python/lsst/ctrl/mpexec/cli/script/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,6 @@ def run( # type: ignore
removing them and the RUN completely ("purge"). Requires `replace_run`.
data_query : `str`
User query selection expression.
skip_existing_in : `list` [ `str` ]
Accepts list of collections, if all Quantum outputs already exist in
the specified list of collections then that Quantum will be excluded
from the QuantumGraph.
skip_existing : `bool`
Appends output RUN collection to the ``skip_existing_in`` list.
debug : `bool`
If true, enable debugging output using lsstDebug facility (imports
debug.py).
Expand Down Expand Up @@ -177,8 +171,6 @@ def run( # type: ignore
replace_run=replace_run,
prune_replaced=prune_replaced,
data_query=data_query,
skip_existing_in=skip_existing_in,
skip_existing=skip_existing,
enableLsstDebug=debug,
fail_fast=fail_fast,
clobber_outputs=clobber_outputs,
Expand Down
9 changes: 1 addition & 8 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,10 +690,6 @@ def runPipeline(
"To update graph metadata run `pipetask update-graph-run` command."
)

# make sure that --extend-run always enables --skip-existing
if args.extend_run:
args.skip_existing = True

if not args.enable_implicit_threading:
disable_implicit_threading()

Expand All @@ -702,9 +698,6 @@ def runPipeline(
if butler is None:
butler = _ButlerFactory.makeWriteButler(args, graph.iterTaskGraph())

if args.skip_existing:
args.skip_existing_in += (butler.run,)

# Enable lsstDebug debugging. Note that this is done once in the
# main process before PreExecInit and it is also repeated before
# running each task in SingleQuantumExecutor (which may not be
Expand All @@ -730,7 +723,7 @@ def runPipeline(
quantumExecutor = SingleQuantumExecutor(
butler,
taskFactory,
skipExistingIn=args.skip_existing_in,
extendRun=args.extend_run,
clobberOutputs=args.clobber_outputs,
enableLsstDebug=args.enableLsstDebug,
exitOnKnownError=args.fail_fast,
Expand Down
14 changes: 11 additions & 3 deletions python/lsst/ctrl/mpexec/separablePipelineExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,15 @@ class SeparablePipelineExecutor:
butler : `lsst.daf.butler.Butler`
A Butler whose ``collections`` and ``run`` attributes contain the input
and output collections to use for processing.
extend_run : `bool`, optional
True if extending an existing run instead of creating a new empty run.
Quanta with complete outputs will be skipped. Partial outputs from
failed quanta will be removed if ``clobber_output`` is `True`,
otherwise an exception will be raised.
clobber_output : `bool`, optional
If set, the pipeline execution overwrites existing output files.
Otherwise, any conflict between existing and new outputs is an error.
If set, the pipeline execution overwrites existing output files from
previously failed quanta. Otherwise, any conflict between existing and
new outputs is an error.
skip_existing_in : iterable [`str`], optional
If not empty, the pipeline execution searches the listed collections
for existing outputs, and skips any quanta that have run to completion
Expand All @@ -100,6 +106,7 @@ class SeparablePipelineExecutor:
def __init__(
self,
butler: Butler,
extend_run: bool = False,
clobber_output: bool = False,
skip_existing_in: Iterable[str] | None = None,
task_factory: lsst.pipe.base.TaskFactory | None = None,
Expand All @@ -110,6 +117,7 @@ def __init__(
if not self._butler.run:
raise ValueError("Butler must specify output run for pipeline.")

self._extend_run = extend_run
self._clobber_output = clobber_output
self._skip_existing_in = list(skip_existing_in) if skip_existing_in else []

Expand Down Expand Up @@ -251,7 +259,7 @@ def run_pipeline(
quantum_executor = SingleQuantumExecutor(
self._butler,
self._task_factory,
skipExistingIn=self._skip_existing_in,
extendRun=self._extend_run,
clobberOutputs=self._clobber_output,
)
graph_executor = MPGraphExecutor(
Expand Down
118 changes: 26 additions & 92 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from collections import defaultdict
from collections.abc import Callable
from itertools import chain
from typing import Any, Optional, Union
from typing import Any, Optional

from lsst.daf.butler import Butler, DatasetRef, DatasetType, LimitedButler, NamedKeyDict, Quantum
from lsst.pipe.base import (
Expand Down Expand Up @@ -78,17 +78,15 @@ class SingleQuantumExecutor(QuantumExecutor):
instead.
taskFactory : `~lsst.pipe.base.TaskFactory`
Instance of a task factory.
skipExistingIn : `list` [ `str` ], optional
Accepts list of collections, if all Quantum outputs already exist in
the specified list of collections then that Quantum will not be rerun.
If `None` then butler output RUN collection is searched for existing
datasets. If empty list then there no check for existing outputs (which
could result in conflicts when datasets are stored).
extendRun : `bool`, optional
True if execution is extending an existing run instead of creating a
new empty run. Quanta with complete outputs will be skipped. Partial
outputs from failed quanta will be removed if ``clobberOutputs`` is
`True`, otherwise an exception will be raised.
clobberOutputs : `bool`, optional
If `True`, then existing quantum outputs in output run collection will
be removed prior to executing a quantum. If ``skipExistingIn`` is
defined, only partial outputs from failed quanta will be overwritten
(see notes). Only used when ``butler`` is not `None`.
If `True`, then existing partial outputs from failed quanta in output
run collection will be removed prior to executing a quantum. Only used
when ``extendRun`` is `True` and ``butler`` is not `None`.
enableLsstDebug : `bool`, optional
Enable debugging with ``lsstDebug`` facility for a task.
exitOnKnownError : `bool`, optional
Expand All @@ -104,32 +102,13 @@ class SingleQuantumExecutor(QuantumExecutor):
A method that creates a `~lsst.daf.butler.LimitedButler` instance
for a given Quantum. This parameter must be defined if ``butler`` is
`None`. If ``butler`` is not `None` then this parameter is ignored.
Notes
-----
There is a non-trivial interaction between ``skipExistingIn`` and
``clobberOutputs`` arguments. Here is how they work together:
- If ``skipExistingIn`` is specified (or `None`) then those collections
are searched for quantum output datasets. If all outputs are found, then
quantum is not executed and `run` completes successfully.
- Otherwise if ``clobberOutputs`` is `True` then butler output RUN
collection is checked for existing quantum outputs. If full or partial
outputs are found, they are pruned and quantum is executed.
- Otherwise if ``clobberOutputs`` is `False` then butler output RUN
collection is checked for existing quantum outputs. If any output
dataset is found an exception is raised.
This leaves the case when partial quantum outputs may be found in
``skipExistingIn`` but that list does not include butler RUN collection.
Those partial outputs are not pruned.
"""

def __init__(
self,
butler: Butler | None,
taskFactory: TaskFactory,
skipExistingIn: list[str] | None = None,
extendRun: bool = False,
clobberOutputs: bool = False,
enableLsstDebug: bool = False,
exitOnKnownError: bool = False,
Expand All @@ -139,7 +118,7 @@ def __init__(
):
self.butler = butler
self.taskFactory = taskFactory
self.skipExistingIn = skipExistingIn
self.extendRun = extendRun
self.enableLsstDebug = enableLsstDebug
self.clobberOutputs = clobberOutputs
self.exitOnKnownError = exitOnKnownError
Expand Down Expand Up @@ -310,77 +289,34 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
Returns
-------
exist : `bool`
`True` if ``self.skipExistingIn`` is defined, and a previous
execution of this quanta appears to have completed successfully
(either because metadata was written or all datasets were written).
`True` if ``self.extendRun`` is `True`, and a previous execution
of this quantum appears to have completed successfully (either
because metadata was written or all datasets were written).
`False` otherwise.
Raises
------
RuntimeError
Raised if some outputs exist and some do not, and ``self.clobberOutputs`` is `False`.
"""
if self.skipExistingIn and taskDef.metadataDatasetName is not None:
if not self.extendRun:
# If not extending assume nothing should exist.
return False

if taskDef.metadataDatasetName is not None:
# Metadata output exists; this is sufficient to assume the previous
# run was successful and should be skipped.
[metadata_ref] = quantum.outputs[taskDef.metadataDatasetName]
if metadata_ref is not None:
if limited_butler.datastore.exists(metadata_ref):
return True

# Previously we always checked for existing outputs in `butler.run`,
# now logic gets more complicated as we only want to skip quantum
# whose outputs exist in `self.skipExistingIn` but pruning should only
# be done for outputs existing in `butler.run`.

def findOutputs(
collections: Optional[Union[str, list[str]]]
) -> tuple[list[DatasetRef], list[DatasetRef]]:
"""Find quantum outputs in specified collections."""
existingRefs = []
missingRefs = []
for datasetRefs in quantum.outputs.values():
checkRefs: list[DatasetRef] = []
registryRefToQuantumRef: dict[DatasetRef, DatasetRef] = {}
for datasetRef in datasetRefs:
if self.butler is not None:
# If running with full butler check registry.
ref = self.butler.registry.findDataset(
datasetRef.datasetType, datasetRef.dataId, collections=collections
)
else:
# In case of QBB assume it must be there.
ref = datasetRef
if ref is None:
missingRefs.append(datasetRef)
else:
checkRefs.append(ref)
registryRefToQuantumRef[ref] = datasetRef

# More efficient to ask the datastore in bulk for ref
# existence rather than one at a time.
existence = limited_butler.datastore.mexists(checkRefs)
for ref, exists in existence.items():
if exists:
existingRefs.append(ref)
else:
missingRefs.append(registryRefToQuantumRef[ref])
return existingRefs, missingRefs

# If skipExistingIn is None this will search in butler.run.
existingRefs, missingRefs = findOutputs(self.skipExistingIn)
if self.skipExistingIn:
if existingRefs and not missingRefs:
# Everything is already there, and we do not clobber complete
# outputs if skipExistingIn is specified.
return True

# If we are to re-run quantum then prune datasets that exists in
# output run collection, only if `self.clobberOutputs` is set,
# Find and prune partial outputs if `self.clobberOutputs` is set,
# that only works when we have full butler.
if self.butler is not None:
# Look at butler run instead of skipExistingIn collections.
existingRefs, missingRefs = findOutputs(self.butler.run)
ref_dict = self.butler.datastore.mexists(chain.from_iterable(quantum.outputs.values()))
existingRefs = [ref for ref, exists in ref_dict.items() if exists]
missingRefs = [ref for ref, exists in ref_dict.items() if not exists]
if existingRefs and missingRefs:
_LOG.debug(
"Partial outputs exist for task %s dataId=%s collection=%s "
Expand All @@ -402,11 +338,9 @@ def findOutputs(
f" collection={self.butler.run} existingRefs={existingRefs}"
f" missingRefs={missingRefs}"
)
elif existingRefs and self.clobberOutputs and not self.skipExistingIn:
# Clobber complete outputs if skipExistingIn is not specified.
_LOG.info("Removing complete outputs for task %s: %s", taskDef, existingRefs)
self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
return False
elif existingRefs:
# Everything is here already.
return True

# need to re-run
return False
Expand Down
10 changes: 4 additions & 6 deletions tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ def test_simple_execute(self) -> None:
self.assertEqual(len(refs), 1)

def test_skip_existing_execute(self) -> None:
"""Run execute() method twice, with skip_existing_in."""
"""Run execute() method twice, with extendRun=True."""

nQuanta = 1
butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument)
Expand All @@ -639,9 +639,9 @@ def test_skip_existing_execute(self) -> None:
self.assertEqual(len(refs), 1)
dataset_id_1 = refs[0].id

# Re-run it with skipExistingIn, it should not run.
# Re-run it with extendRun, it should not run.
assert butler.run is not None
executor = SingleQuantumExecutor(butler, taskFactory, skipExistingIn=[butler.run])
executor = SingleQuantumExecutor(butler, taskFactory, extendRun=True)
executor.execute(node.taskDef, node.quantum)
self.assertEqual(taskFactory.countExec, 1)

Expand Down Expand Up @@ -680,9 +680,7 @@ def test_clobber_outputs_execute(self) -> None:
# Re-run it with clobberOutputs and extendRun, it should not
# clobber but should skip instead.
assert butler.run is not None
executor = SingleQuantumExecutor(
butler, taskFactory, skipExistingIn=[butler.run], clobberOutputs=True
)
executor = SingleQuantumExecutor(butler, taskFactory, extendRun=True, clobberOutputs=True)
executor.execute(node.taskDef, node.quantum)
self.assertEqual(taskFactory.countExec, 1)

Expand Down

0 comments on commit 18faa70

Please sign in to comment.