Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-27492: Improvements to pipetask execution options #133

Merged
merged 7 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions doc/changes/DM-27492.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Several improvements in `pipetask` execution options:
- New option `--skip-existing-in` which takes collection name(s); if output
datasets already exist in those collections, the corresponding quanta are skipped.
- The `--skip-existing` option is now equivalent to appending the output run
collection to the `--skip-existing-in` list.
- The `--extend-run` option implicitly enables the `--skip-existing` option.
- The `--prune-replaced=unstore` option only removes regular output datasets;
InitOutputs, task configs, and package versions are not removed.
1 change: 1 addition & 0 deletions python/lsst/ctrl/mpexec/cli/opt/optionGroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(self):
ctrlMpExecOpts.qgraph_option(),
ctrlMpExecOpts.qgraph_id_option(),
ctrlMpExecOpts.qgraph_node_id_option(),
ctrlMpExecOpts.skip_existing_in_option(),
ctrlMpExecOpts.skip_existing_option(),
ctrlMpExecOpts.clobber_outputs_option(),
ctrlMpExecOpts.save_qgraph_option(),
Expand Down
38 changes: 28 additions & 10 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def _to_int(value):
extend_run_option = MWOptionDecorator("--extend-run",
help=unwrap("""Instead of creating a new RUN collection, insert datasets
into either the one given by --output-run (if provided) or
the first child collection of - -output(which must be of
type RUN)."""),
the first child collection of --output (which must be of
type RUN). This also enables --skip-existing option."""),
is_flag=True)


Expand Down Expand Up @@ -244,19 +244,37 @@ def _to_int(value):
multiple=True)


skip_existing_option = MWOptionDecorator("--skip-existing",
help=unwrap("""If all Quantum outputs already exist in the output RUN
collection then that Quantum will be excluded from the
QuantumGraph. Requires the 'run` command's `--extend-run`
flag to be set."""),
is_flag=True)
# Command-line option `--skip-existing-in`: may be given more than once
# (multiple=True), is shown as COLLECTION in usage text, and is declared with
# default=None. Values are post-processed by the `split_commas` callback.
# NOTE(review): `split_commas` and `unwrap` are defined outside this view --
# presumably the former splits comma-separated values into individual
# collection names and the latter re-flows the help text; confirm.
skip_existing_in_option = MWOptionDecorator(
"--skip-existing-in",
callback=split_commas,
default=None,
metavar="COLLECTION",
multiple=True,
help=unwrap(
"""If all Quantum outputs already exist in the specified list of
collections then that Quantum will be excluded from the QuantumGraph.
"""
)
)


# Command-line boolean flag `--skip-existing` (is_flag=True). As its help text
# states, it acts as shorthand for naming the output RUN collection in
# --skip-existing-in; when both options are given, the output RUN collection
# is appended to the user-supplied collection list.
skip_existing_option = MWOptionDecorator(
"--skip-existing",
is_flag=True,
help=unwrap(
"""This option is equivalent to --skip-existing-in with the name of
the output RUN collection. If both --skip-existing-in and
--skip-existing are given then output RUN collection is appended to
the list of collections."""
)
)


clobber_outputs_option = MWOptionDecorator("--clobber-outputs",
help=unwrap("""Remove outputs from previous execution of the same
quantum before new execution. If `--skip-existing`
quantum before new execution. If --skip-existing
is also passed, then only failed quanta will be
clobbered. Requires the 'run` command's `--extend-run`
clobbered. Requires the 'run' command's --extend-run
flag to be set."""),
is_flag=True)

Expand Down
22 changes: 12 additions & 10 deletions python/lsst/ctrl/mpexec/cli/script/qgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
_log = logging.getLogger(__name__.partition(".")[2])


def qgraph(pipelineObj, qgraph, qgraph_id, qgraph_node_id, skip_existing, save_qgraph, save_single_quanta,
qgraph_dot, butler_config, input, output, output_run, extend_run, replace_run, prune_replaced,
data_query, show, save_execution_butler, clobber_execution_butler, clobber_outputs, **kwargs):
def qgraph(pipelineObj, qgraph, qgraph_id, qgraph_node_id, skip_existing_in, skip_existing, save_qgraph,
save_single_quanta, qgraph_dot, butler_config, input, output, output_run, extend_run,
replace_run, prune_replaced, data_query, show, save_execution_butler, clobber_execution_butler,
clobber_outputs, **kwargs):
"""Implements the command line interface `pipetask qgraph` subcommand,
should only be called by command line tools and unit test code that test
this function.
Expand All @@ -48,10 +49,12 @@ def qgraph(pipelineObj, qgraph, qgraph_id, qgraph_node_id, skip_existing, save_q
qgraph_node_id : `list` of `int`, optional
Only load a specified set of nodes if graph is loaded from a file,
nodes are identified by integer IDs.
skip_existing_in : `list` [ `str` ]
List of collections; if all outputs of a Quantum already exist in
the specified collections then that Quantum will be excluded
from the QuantumGraph.
skip_existing : `bool`
If all Quantum outputs already exist in the output RUN collection then
that Quantum will be excluded from the QuantumGraph. Will only be used
if `extend_run` flag is set.
Appends output RUN collection to the ``skip_existing_in`` list.
save_qgraph : `str` or `None`
URI location for storing a serialized quantum graph definition as a
pickle file.
Expand All @@ -67,10 +70,8 @@ def qgraph(pipelineObj, qgraph, qgraph_id, qgraph_node_id, skip_existing, save_q
butler/registry config file. If `dict`, `butler_config` is key value
pairs used to init or update the `lsst.daf.butler.Config` instance. If
`Config`, it is the object used to configure a Butler.
input : `str`
Comma-separated names of the input collection(s). Entries may include a
colon (:), the first string is a dataset type name that restricts the
search in that collection.
input : `list` [ `str` ]
List of names of the input collection(s).
output : `str`
Name of the output CHAINED collection. This may either be an existing
CHAINED collection to use as both input and output (if `input` is
Expand Down Expand Up @@ -136,6 +137,7 @@ def qgraph(pipelineObj, qgraph, qgraph_id, qgraph_node_id, skip_existing, save_q
prune_replaced=prune_replaced,
data_query=data_query,
show=show,
skip_existing_in=skip_existing_in,
skip_existing=skip_existing,
execution_butler_location=save_execution_butler,
clobber_execution_butler=clobber_execution_butler,
Expand Down
16 changes: 9 additions & 7 deletions python/lsst/ctrl/mpexec/cli/script/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def run(do_raise,
replace_run,
prune_replaced,
data_query,
skip_existing_in,
skip_existing,
debug,
fail_fast,
Expand Down Expand Up @@ -89,10 +90,8 @@ def run(do_raise,
butler/registry config file. If `dict`, `butler_config` is key value
pairs used to init or update the `lsst.daf.butler.Config` instance. If
`Config`, it is the object used to configure a Butler.
input : `str`
Comma-separated names of the input collection(s). Entries may include a
colon (:), the first string is a dataset type name that restricts the
search in that collection.
input : `list` [ `str` ]
List of names of the input collection(s).
output : `str`
Name of the output CHAINED collection. This may either be an existing
CHAINED collection to use as both input and output (if `input` is
Expand Down Expand Up @@ -122,10 +121,12 @@ def run(do_raise,
removing them and the RUN completely ("purge"). Requires `replace_run`.
data_query : `str`
User query selection expression.
skip_existing_in : `list` [ `str` ]
List of collections; if all outputs of a Quantum already exist in
the specified collections then that Quantum will be excluded
from the QuantumGraph.
skip_existing : `bool`
If all Quantum outputs already exist in the output RUN collection then
that Quantum will be excluded from the QuantumGraph. Requires the 'run`
command's `--extend-run` flag to be set.
Appends output RUN collection to the ``skip_existing_in`` list.
debug : `bool`
If true, enable debugging output using lsstDebug facility (imports
debug.py).
Expand Down Expand Up @@ -159,6 +160,7 @@ def run(do_raise,
replace_run=replace_run,
prune_replaced=prune_replaced,
data_query=data_query,
skip_existing_in=skip_existing_in,
skip_existing=skip_existing,
enableLsstDebug=debug,
fail_fast=fail_fast,
Expand Down