implement qgraph and run commands
The build, qgraph, and run subcommands may be "chained" together in a single invocation.

Some options, marked in subcommand help with (f), are forwarded
from one chained subcommand to the next.
n8pease committed Aug 18, 2020
1 parent 90326e5 commit 4954b80
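The chaining works through click's chained multi-command support: each subcommand returns a "processor" closure, and the group's result callback runs those closures in order, passing a shared state object from one stage to the next, so qgraph can use the pipeline that build produced and run can use the resulting quantum graph. The following is a minimal, hypothetical sketch of that pattern (the actual group wiring for these commands is defined elsewhere in this commit; the toy options and state keys here are made up):

import click

@click.group(chain=True)
def cli():
    """Toy driver illustrating the chained-subcommand pattern."""

@cli.result_callback()  # named resultcallback() in click < 8
def run_processors(processors):
    # Run each subcommand's returned processor in order, threading the
    # accumulating state object from one stage to the next.
    state = {}
    for processor in processors:
        state = processor(state)

@cli.command()
@click.option("-p", "--pipeline")
def build(pipeline):
    def processor(state):
        state["pipeline"] = f"pipeline built from {pipeline}"
        return state
    return processor

@cli.command()
def qgraph():
    def processor(state):
        state["qgraph"] = f"quantum graph for {state['pipeline']}"
        return state
    return processor

if __name__ == "__main__":
    cli()

A chained invocation of the toy driver would look like "cli build -p pipeline.yaml qgraph"; the real commands added below follow the same shape, with the (f)-marked option values additionally carried forward from one subcommand to the next.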
Showing 7 changed files with 750 additions and 51 deletions.
160 changes: 119 additions & 41 deletions python/lsst/ctrl/mpexec/cli/cmd/commands.py
@@ -20,12 +20,17 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import click
import copy
from functools import partial

from lsst.daf.butler.cli.opt import (config_file_option,
config_option,
log_level_option)
from lsst.daf.butler.cli.utils import cli_handle_exception
from lsst.daf.butler.cli.utils import (cli_handle_exception,
MWCommand,
MWCtxObj,
option_section,
split_kv,
unwrap)
from lsst.obs.base.cli.opt import instrument_option
from .. import opt
from .. import script
@@ -36,25 +41,29 @@
"pipeline. This must be the fully qualified class name.")


class PipetaskCommand(click.Command):
def parse_args(self, ctx, args):
ctx.obj = copy.copy(args)
super().parse_args(ctx, args)
forwardEpilog = unwrap("""Options marked with (f) are forwarded to the next subcommand if multiple subcommands
are chained in the same command execution. Previous values may be overridden by passing new
option values into the next subcommand.""")


buildEpilog = """Notes:
buildEpilog = unwrap(f"""Notes:
--task, --delete, --config, --config-file, and --instrument action options can
appear multiple times; all values are used, in order left to right.
FILE reads command-line options from the specified file. Data may be
distributed among multiple lines (e.g. one option per line). Data after # is
treated as a comment and ignored. Blank lines and lines starting with # are
ignored.
"""
ignored.
""")

qgraphEpilog = forwardEpilog

runEpilog = forwardEpilog

@click.command(cls=PipetaskCommand, epilog=buildEpilog, short_help="Build pipeline definition.")

@click.command(cls=MWCommand, epilog=buildEpilog, short_help="Build pipeline definition.")
@click.pass_context
@log_level_option()
@opt.pipeline_option()
@opt.task_option()
@opt.delete_option(metavar="LABEL")
@@ -67,42 +76,111 @@ def parse_args(self, ctx, args):
@opt.save_pipeline_option()
@opt.pipeline_dot_option()
@opt.show_option()
@log_level_option()
def build(ctx, *args, **kwargs):
"""Build and optionally save pipeline definition.
This does not require input data to be specified.
"""
# The pipeline actions (task, delete, config, config_file, and instrument)
# must be handled in the order they appear on the command line, but the CLI
# specification gives them all different option names. So, instead of using
# the individual action options as they appear in kwargs (because
# invocation order can't be known), we capture the CLI arguments by
# overriding `click.Command.parse_args` and save them in the Context's
# `obj` parameter. We use `makePipelineActions` to create a list of
# pipeline actions from the CLI arguments and pass that list to the script
# function using the `pipeline_actions` kwarg name, and remove the action
# options from kwargs.
for pipelineAction in (opt.task_option.name(), opt.delete_option.name(), config_option.name(),
config_file_option.name(), instrument_option.name()):
kwargs.pop(pipelineAction)
kwargs['pipeline_actions'] = makePipelineActions(ctx.obj)
cli_handle_exception(script.build, *args, **kwargs)


@click.command(cls=PipetaskCommand)
def qgraph(*args, **kwargs):
"""Not implemented.
Build and optionally save pipeline and quantum graph.
def processor(objs):
# The pipeline actions (task, delete, config, config_file, and instrument)
# must be handled in the order they appear on the command line, but the CLI
# specification gives them all different option names. So, instead of using
# the individual action options as they appear in kwargs (because
# invocation order can't be known), we capture the CLI arguments by
# overriding `click.Command.parse_args` and save them in the Context's
# `obj` parameter. We use `makePipelineActions` to create a list of
# pipeline actions from the CLI arguments and pass that list to the script
# function using the `pipeline_actions` kwarg name, and remove the action
# options from kwargs.
for pipelineAction in (opt.task_option.name(), opt.delete_option.name(),
config_option.name(), config_file_option.name(),
instrument_option.name()):
kwargs.pop(pipelineAction)
kwargs['pipeline_actions'] = makePipelineActions(MWCtxObj.getFrom(ctx).args)
objs.pipeline = cli_handle_exception(script.build, *args, **kwargs)
return objs
return processor
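The comment above explains why the pipeline action options are reconstructed from the raw command line instead of from kwargs. A small, hypothetical demonstration of the underlying click behavior (not part of this commit): with multiple=True, click collects each option's values separately, so the left-to-right interleaving of different options is lost by the time the command callback runs.

import click
from click.testing import CliRunner

@click.command()
@click.option("--task", multiple=True)
@click.option("--config", multiple=True)
def demo(task, config):
    # Was "--config c=1" given before or after "--task b"?  kwargs cannot say.
    click.echo(f"task={task} config={config}")

result = CliRunner().invoke(demo, ["--task", "a", "--config", "c=1", "--task", "b"])
print(result.output)  # task=('a', 'b') config=('c=1',)

Capturing the raw argument list (via MWCtxObj here) preserves that ordering so makePipelineActions can apply the actions exactly as the user wrote them.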


@click.command(cls=MWCommand, epilog=qgraphEpilog)
@click.pass_context
@log_level_option()
@opt.qgraph_option()
@opt.skip_existing_option()
@opt.save_qgraph_option()
@opt.save_single_quanta_option()
@opt.qgraph_dot_option()
@option_section(sectionText="Data repository and selection options:")
@opt.butler_config_option(forward=True)
# CLI API says `--input` values should be given like
# "datasetType:collectionName" or just "datasetType", but the CmdLineFwk API
# wants input values to be a tuple of tuples, where each tuple is
# ("collectionName", "datasetType"), or (..., "datasetType") with ellipsis if
# no collectionName is provided. Setting `return_type=tuple`, `reverse_kv=True`,
# and `default_key=...` makes the `split_kv` callback structure its return
# value that way.
@opt.input_option(callback=partial(split_kv, return_type=tuple, default_key=..., reverse_kv=True,
unseparated_okay=True),
multiple=True,
forward=True)
@opt.output_option(forward=True)
@opt.output_run_option(forward=True)
@opt.extend_run_option(forward=True)
@opt.replace_run_option(forward=True)
@opt.prune_replaced_option(forward=True)
@opt.data_query_option(forward=True)
@option_section(sectionText="Other options:")
@opt.show_option()
def qgraph(ctx, *args, **kwargs):
"""Build and optionally save quantum graph.
"""
print("Not implemented.")
def processor(objs):
newKwargs = objs.butlerArgs.update(ctx.command.params, MWCtxObj.getFrom(ctx).args, **kwargs)
objs.qgraph = cli_handle_exception(script.qgraph, pipeline=objs.pipeline, **newKwargs)
return objs
return processor
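Both qgraph and run rebuild their keyword arguments from the values forwarded by the previous chained subcommand plus whatever was passed this time (the objs.butlerArgs.update(...) call above). Conceptually this is a "later value wins" merge, as promised in the forwarding epilog; a hypothetical illustration with made-up option values:

forwarded = {"butler_config": "repo/butler.yaml", "output": "u/me/try1"}  # set on qgraph
given_to_run = {"output": "u/me/try2"}                                    # passed again on run
effective = {**forwarded, **given_to_run}
# effective == {"butler_config": "repo/butler.yaml", "output": "u/me/try2"}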


@click.command(cls=PipetaskCommand)
def run(*args, **kwargs):
"""Not implemented.
Build and execute pipeline and quantum graph.
@click.command(cls=MWCommand, epilog=runEpilog)
@click.pass_context
@log_level_option()
@opt.debug_option()
@option_section(sectionText="Data repository and selection options:")
@opt.butler_config_option(forward=True)
# CLI API says `--input` values should be given like
# "datasetType:collectionName" or just "datasetType", but the CmdLineFwk API
# wants input values to be a tuple of tuples, where each tuple is
# ("collectionName", "datasetType"), or (..., "datasetType") - ellipsis if no
# collectionName is provided.
@opt.input_option(callback=partial(split_kv, return_type=tuple, default_key=..., reverse_kv=True,
unseparated_okay=True),
multiple=True,
forward=True)
@opt.output_option(forward=True)
@opt.output_run_option(forward=True)
@opt.extend_run_option(forward=True)
@opt.replace_run_option(forward=True)
@opt.prune_replaced_option(forward=True)
@opt.data_query_option(forward=True)
@option_section(sectionText="Execution options:")
@opt.do_raise_option()
@opt.profile_option()
@opt.processes_option()
@opt.timeout_option()
@opt.graph_fixup_option()
@option_section(sectionText="Meta-information output options:")
@opt.skip_init_writes_option()
@opt.init_only_option()
@opt.register_dataset_types_option()
@opt.no_versions_option()
@opt.skip_existing_option(help=unwrap("""Do not try to overwrite any datasets that might exist in the butler.
If not provided then any existing conflicting dataset will cause a
butler exception."""))
def run(ctx, *args, **kwargs):
"""Execute pipeline and quantum graph.
"""
print("Not implemented.")
def processor(objs):
newKwargs = objs.butlerArgs.update(ctx.command.params, MWCtxObj.getFrom(ctx).args, **kwargs)
cli_handle_exception(script.run, qgraph=objs.qgraph, **newKwargs)
return objs
return processor
154 changes: 154 additions & 0 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
@@ -24,19 +24,94 @@
from lsst.daf.butler.cli.utils import MWOptionDecorator, MWPath, split_commas, unwrap


butler_config_option = MWOptionDecorator("-b", "--butler-config",
help="Location of the gen3 butler/registry config file.",
type=MWPath(dir_okay=False, file_okay=True, readable=True))


data_query_option = MWOptionDecorator("-d", "--data-query",
help="User data selection expression.",
metavar="QUERY")


debug_option = MWOptionDecorator("--debug",
help="Enable debugging output using lsstDebug facility (imports debug.py).")


delete_option = MWOptionDecorator("--delete",
callback=split_commas,
help="Delete task with given label from pipeline.",
multiple=True)


do_raise_option = MWOptionDecorator("--do-raise",
help="Raise an exception on error. (else log a message and continue?)",
is_flag=True)


extend_run_option = MWOptionDecorator("--extend-run",
help=unwrap("""Instead of creating a new RUN collection, insert datasets
into either the one given by --output-run (if provided) or
the first child collection of --output (which must be of
type RUN)."""),
is_flag=True)


graph_fixup_option = MWOptionDecorator("--graph-fixup",
help=unwrap("""Name of the class or factory method which makes an
instance used for execution graph fixup."""))


init_only_option = MWOptionDecorator("--init-only",
help=unwrap("""Do not actually run; just register dataset types and/or
save init outputs. """),
is_flag=True)


# TODO defaultMetavar and defaultHelp both match with the handling
# specified by the input_option callback defined in commands.py. Should
# that callback definition get moved here? Should these defaults be made
# less use-case specific and moved to commands.py? Is it ok as is?
input_option = MWOptionDecorator("--input",
callback=split_commas,
default=list(),
help=unwrap("""Comma-separated names of the input collection(s). Entries may
include a colon (:); the first string is then a dataset type name that
restricts the search in that collection."""),
metavar="COLL,DSTYPE:COLL",
multiple=True)

no_versions_option = MWOptionDecorator("--no-versions",
help="Do not save or check package versions.",
is_flag=True)


order_pipeline_option = MWOptionDecorator("--order-pipeline",
help=unwrap("""Order tasks in pipeline based on their data
dependencies, ordering is performed as last step before saving or
executing pipeline."""),
is_flag=True)


output_option = MWOptionDecorator("-o", "--output",
help=unwrap(f"""Name of the output CHAINED collection. This may either be an
existing CHAINED collection to use as both input and output
(incompatible with --input), or a new CHAINED collection created
to include all inputs (requires --input). In both cases, the
collection's children will start with an output RUN collection
that directly holds all new datasets (see --output-run)."""),
metavar="COLL")


output_run_option = MWOptionDecorator("--output-run",
help=unwrap("""Name of the new output RUN collection. If not provided
then --output must be provided and a new RUN collection will
be created by appending a timestamp to the value passed with
--output. If this collection already exists then
--extend-run must be passed."""),
metavar="COLL")


pipeline_option = MWOptionDecorator("-p", "--pipeline",
help="Location of a pipeline definition file in YAML format.",
type=MWPath(exists=True, file_okay=True, dir_okay=False, readable=True))
@@ -48,11 +123,72 @@
type=MWPath(writable=True, file_okay=True, dir_okay=False))


processes_option = MWOptionDecorator("-j", "--processes",
help="Number of processes to use.",
type=click.IntRange(min=1))


profile_option = MWOptionDecorator("--profile",
help="Dump cProfile statistics to file name.",
type=MWPath(file_okay=True, dir_okay=False))


prune_replaced_option = MWOptionDecorator("--prune-replaced",
help=unwrap("""Delete the datasets in the collection replaced by
--replace-run, either just from the datastore
('unstore') or by removing them and the RUN completely
('purge'). Requires --replace-run."""),
type=click.Choice(("unstore", "purge"), case_sensitive=False))


qgraph_option = MWOptionDecorator("-g", "--qgraph",
help=unwrap("""Location for a serialized quantum graph definition (pickle
file). If this option is given then all input data options and
pipeline-building options cannot be used."""),
type=MWPath(exists=True, file_okay=True, dir_okay=False, readable=True))


qgraph_dot_option = MWOptionDecorator("--qgraph-dot",
help=unwrap("""Location for storing GraphViz DOT representation of a
quantum graph."""),
type=MWPath(writable=True, file_okay=True, dir_okay=False))


register_dataset_types_option = MWOptionDecorator("--register-dataset-types",
help=unwrap("""Register DatasetTypes that do not already
exist in the Registry."""),
is_flag=True)


replace_run_option = MWOptionDecorator("--replace-run",
help=unwrap(f"""Before creating a new RUN collection in an existing
CHAINED collection, remove the first child collection
(which must be of type RUN). This can be used to repeatedly
write to the same (parent) collection during development,
but it does not delete the datasets associated with the
replaced run unless --prune-replaced is also passed.
Requires --output, and incompatible with --extend-run."""),
is_flag=True)


save_pipeline_option = MWOptionDecorator("-s", "--save-pipeline",
help=unwrap("""Location for storing resulting pipeline definition in
YAML format."""),
type=MWPath(dir_okay=False, file_okay=True, writable=True))

save_qgraph_option = MWOptionDecorator("-q", "--save-qgraph",
help=unwrap("""Location for storing a serialized quantum graph
definition (pickle file)."""),
type=MWPath(file_okay=True, dir_okay=False, readable=True))


save_single_quanta_option = MWOptionDecorator("--save-single-quanta",
help=unwrap("""Format string of locations for storing individual
quantum graph definition (pickle files). The curly
brace {} in the input string will be replaced by a
quantum number."""))


show_option = MWOptionDecorator("--show",
callback=split_commas,
help=unwrap("""Dump various info to standard output. Possible items are:
@@ -70,6 +206,20 @@
multiple=True)


skip_existing_option = MWOptionDecorator("--skip-existing",
help=unwrap("""If all Quantum outputs already exist in the output RUN
collection then that Quantum will be excluded from the
QuantumGraph. Requires the `run` command's `--extend-run`
flag to be set."""),
is_flag=True)


skip_init_writes_option = MWOptionDecorator("--skip-init-writes",
help=unwrap("""Do not write collection-wide 'init output' datasets
(e.g. schemas)."""),
is_flag=True)


task_option = MWOptionDecorator("-t", "--task",
callback=split_commas,
help=unwrap("""Task name to add to pipeline, must be a fully qualified task
@@ -78,3 +228,7 @@
label."""),
metavar="TASK[:LABEL]",
multiple=True)


timeout_option = MWOptionDecorator("--timeout",
help="Timeout for multiprocessing; maximum wall time (sec).")
