Skip to content

Commit

Permalink
Add --clobber-partial-outputs option to pipetask (DM-26131)
Browse files Browse the repository at this point in the history
Extended executor to remove partial outputs if this option is specified.
Added a unit test for this new feature and refactored the unit
tests to use a real butler instead of my own mock.
  • Loading branch information
andy-slac committed Sep 6, 2020
1 parent 18fd799 commit 538eacb
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 214 deletions.
1 change: 1 addition & 0 deletions python/lsst/ctrl/mpexec/cli/opt/optionGroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class execution_options(OptionGroup): # noqa: N801
def __init__(self):
self.decorators = [
option_section(sectionText="Execution options:"),
ctrlMpExecOpts.clobber_partial_outputs_option(),
ctrlMpExecOpts.do_raise_option(),
ctrlMpExecOpts.profile_option(),
ctrlMpExecOpts.processes_option(),
Expand Down
7 changes: 7 additions & 0 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@
is_flag=True)


clobber_partial_outputs_option = MWOptionDecorator("--clobber-partial-outputs",
help=unwrap("""Remove incomplete outputs from previous
execution of the same quantum before new
execution."""),
is_flag=True)


skip_init_writes_option = MWOptionDecorator("--skip-init-writes",
help=unwrap("""Do not write collection-wide 'init output' datasets
(e.g.schemas)."""),
Expand Down
1 change: 1 addition & 0 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ def runPipeline(self, graph, taskFactory, args, butler=None):
graphFixup = self._importGraphFixup(args)
quantumExecutor = SingleQuantumExecutor(taskFactory,
skipExisting=args.skip_existing,
clobberPartialOutputs=args.clobber_partial_outputs,
enableLsstDebug=args.enableLsstDebug)
timeout = self.MP_TIMEOUT if args.timeout is None else args.timeout
executor = MPGraphExecutor(numProc=args.processes, timeout=timeout,
Expand Down
6 changes: 5 additions & 1 deletion python/lsst/ctrl/mpexec/cmdLineParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ def _makeQuantumGraphOptions(parser):
default=False, action="store_true",
help=("If all Quantum outputs already exist in the output RUN collection "
"then that Quantum will be excluded from the QuantumGraph. "
"Requires --extend-run."))
"Requires --extend-run. When this option is used with 'run' command "
"it skips execution of quantum if all its output exist."))
group.add_argument("-q", "--save-qgraph", dest="save_qgraph",
help="Location for storing a serialized quantum graph definition "
"(pickle file).",
Expand All @@ -385,6 +386,9 @@ def _makeExecOptions(parser):
parser : `argparse.ArgumentParser`
"""
group = parser.add_argument_group("Execution options")
group.add_argument("--clobber-partial-outputs", action="store_true", default=False,
help=("Remove incomplete outputs from previous execution of the same "
"quantum before new execution."))
group.add_argument("--doraise", action="store_true",
help="raise an exception on error (else log a message and continue)?")
group.add_argument("--profile", metavar="PATH", help="Dump cProfile statistics to filename")
Expand Down
49 changes: 37 additions & 12 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,30 @@ class SingleQuantumExecutor(QuantumExecutor):
Instance of a task factory.
skipExisting : `bool`, optional
If True then quanta with all existing outputs are not executed.
clobberPartialOutputs : `bool`, optional
If True then delete any partial outputs from quantum execution. If
complete outputs exist then an exception is raised if ``skipExisting`` is
False.
enableLsstDebug : `bool`, optional
Enable debugging with ``lsstDebug`` facility for a task.
"""
def __init__(self, taskFactory, skipExisting=False, enableLsstDebug=False):
def __init__(self, taskFactory, skipExisting=False, clobberPartialOutputs=False, enableLsstDebug=False):
self.taskFactory = taskFactory
self.skipExisting = skipExisting
self.enableLsstDebug = enableLsstDebug
self.clobberPartialOutputs = clobberPartialOutputs

def execute(self, taskDef, quantum, butler):
# Docstring inherited from QuantumExecutor.execute
taskClass, config = taskDef.taskClass, taskDef.config
self.setupLogging(taskClass, config, quantum)
if self.skipExisting and self.quantumOutputsExist(quantum, butler):

# check whether to skip or delete old outputs
if self.checkExistingOutputs(quantum, butler, taskDef):
_LOG.info("Quantum execution skipped due to existing outputs, "
f"task={taskClass.__name__} dataId={quantum.dataId}.")
return

self.updateQuantumInputs(quantum, butler)

# enable lsstDebug debugging
Expand Down Expand Up @@ -109,21 +117,26 @@ def setupLogging(self, taskClass, config, quantum):
else:
Log.MDC("LABEL", '[' + ', '.join([str(dataId) for dataId in dataIds]) + ']')

def quantumOutputsExist(self, quantum, butler):
def checkExistingOutputs(self, quantum, butler, taskDef):
"""Decide whether this quantum needs to be executed.
If only partial outputs exist then they are removed if
``clobberPartialOutputs`` is True, otherwise an exception is raised.
Parameters
----------
quantum : `~lsst.daf.butler.Quantum`
Quantum to check for existing outputs
butler : `~lsst.daf.butler.Butler`
Data butler.
taskDef : `~lsst.pipe.base.TaskDef`
Task definition structure.
Returns
-------
exist : `bool`
True if all quantum's outputs exist in a collection, False
otherwise.
True if all quantum's outputs exist in a collection and
``skipExisting`` is True, False otherwise.
Raises
------
Expand All @@ -140,16 +153,28 @@ def quantumOutputsExist(self, quantum, butler):
ref = registry.findDataset(datasetRef.datasetType, datasetRef.dataId,
collections=butler.run)
if ref is None:
missingRefs.append(datasetRefs)
missingRefs.append(datasetRef)
else:
existingRefs.append(datasetRefs)
existingRefs.append(ref)
if existingRefs and missingRefs:
# some outputs exist and same not, can't do a thing with that
raise RuntimeError(f"Registry inconsistency while checking for existing outputs:"
f" collection={collection} existingRefs={existingRefs}"
f" missingRefs={missingRefs}")
# some outputs exist and some don't, either delete existing ones or complain
_LOG.debug("Partial outputs exist for task %s dataId=%s collection=%s "
"existingRefs=%s missingRefs=%s",
taskDef, quantum.dataId, collection, existingRefs, missingRefs)
if self.clobberPartialOutputs:
_LOG.info("Removing partial outputs for task %s: %s", taskDef, existingRefs)
butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
return False
else:
raise RuntimeError(f"Registry inconsistency while checking for existing outputs:"
f" collection={collection} existingRefs={existingRefs}"
f" missingRefs={missingRefs}")
elif existingRefs:
# complete outputs exist, this is fine only if skipExisting is set
return self.skipExisting
else:
return bool(existingRefs)
# no outputs exist
return False

def makeTask(self, taskClass, config, butler):
"""Make new task instance.
Expand Down

0 comments on commit 538eacb

Please sign in to comment.