Skip to content

Commit

Permalink
implement run method using option forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
n8pease committed Aug 10, 2020
1 parent a6a7cd2 commit 6135ef4
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 32 deletions.
90 changes: 61 additions & 29 deletions python/lsst/ctrl/mpexec/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import click
import copy
from functools import partial

from lsst.daf.butler.cli.opt import (config_file_option,
config_option,
log_level_option)
from lsst.daf.butler.cli.utils import cli_handle_exception, option_section, split_kv
from lsst.daf.butler.cli.utils import (cli_handle_exception, MWCommand, MWCtxObj,
option_section, split_kv, unwrap)
from lsst.obs.base.cli.opt import instrument_parameter
from ..opt import (butler_config_option,
data_query_option,
debug_option,
do_raise_option,
extend_run_option,
delete_option,
Expand Down Expand Up @@ -65,25 +66,27 @@
"pipeline. This must be the fully qualified class name.")


class PipetaskCommand(click.Command):
    """Command variant that keeps a copy of the raw CLI arguments.

    The unparsed argument list is stashed on ``ctx.obj`` before click
    consumes it, so later callbacks can recover the original
    left-to-right ordering of repeatable options (e.g. when building
    ordered pipeline actions).
    """

    def parse_args(self, ctx, args):
        # Snapshot first; click's parsing mutates/consumes `args`.
        snapshot = copy.copy(args)
        ctx.obj = snapshot
        super().parse_args(ctx, args)
# Epilog shared by subcommands that support option forwarding: values of
# options marked (f) carry over to the next chained subcommand unless
# overridden there.
forwardEpilog = unwrap("""Options marked with (f) are forwarded to the next subcommand if multiple subcommands
are chained in the same command execution. Previous values may be overridden by passing new
option values into the next subcommand.""")


buildEpilog = """Notes:
buildEpilog = unwrap(f"""Notes:
--task, --delete, --config, --config-file, and --instrument action options can
appear multiple times; all values are used, in order left to right.
FILE reads command-line options from the specified file. Data may be
distributed among multiple lines (e.g. one option per line). Data after # is
treated as a comment and ignored. Blank lines and lines starting with # are
ignored.
"""
ignored.)
""")

# Both `qgraph` and `run` currently add nothing beyond the shared
# option-forwarding note.
qgraphEpilog = forwardEpilog

runEpilog = forwardEpilog

@click.command(cls=PipetaskCommand, epilog=buildEpilog, short_help="Build pipeline definition.")

@click.command(cls=MWCommand, epilog=buildEpilog, short_help="Build pipeline definition.")
@click.pass_context
@log_level_option(defaultValue=None)
@pipeline_option()
Expand Down Expand Up @@ -114,49 +117,75 @@ def processor(objs):
# pipeline actions from the CLI arguments and pass that list to the script
# function using the `pipeline_actions` kwarg name, and remove the action
# options from kwargs.
for pipelineAction in (task_option.optionKey, delete_option.optionKey, config_option.optionKey,
config_file_option.optionKey, instrument_parameter.optionKey):
for pipelineAction in (task_option.optionKey(), delete_option.optionKey(), config_option.optionKey(),
config_file_option.optionKey(), instrument_parameter.optionKey()):
kwargs.pop(pipelineAction)
kwargs['pipeline_actions'] = makePipelineActions(ctx.obj)
kwargs['pipeline_actions'] = makePipelineActions(MWCtxObj.getFrom(ctx).args)
objs.pipeline = cli_handle_exception(script.build, *args, **kwargs)
return objs
return processor


@click.command(cls=PipetaskCommand)
@click.command(cls=MWCommand, epilog=qgraphEpilog)
@click.pass_context
@log_level_option(defaultValue=None)
@qgraph_option()
@skip_existing_option()
@save_qgraph_option()
@save_single_quanta_option()
@qgraph_dot_option()
@butler_config_option()
@option_section(sectionText="Data repository and selection options:")
@butler_config_option(forward=True)
# CLI API says `--input` values should be given like
# "datasetType:collectionName" or just "datasetType", but CmdLineFwk api
# wants input values to be a tuple of tuples, where each tuple is
# ("collectionName", "datasetType"), or (..., "datasetType") - elipsis if no
# collectionName is provided.
# collectionName is provided. `return_type="tuple"`, `reverse_kv=True`, and
# `default_key=...` make split_kv callback structure its return value that way.
@input_option(callback=partial(split_kv, return_type="tuple", default_key=..., reverse_kv=True,
unseparated_okay=True),
multiple=True)
@output_option()
@output_run_option()
@extend_run_option()
@replace_run_option()
@prune_replaced_option()
@data_query_option()
multiple=True,
forward=True)
@output_option(forward=True)
@output_run_option(forward=True)
@extend_run_option(forward=True)
@replace_run_option(forward=True)
@prune_replaced_option(forward=True)
@data_query_option(forward=True)
@option_section(sectionText="Other options:")
@show_option(multiple=True)
def qgraph(*args, **kwargs):
def qgraph(ctx, *args, **kwargs):
"""Build and optionally save quantum graph.
"""
def processor(objs):
objs.pipeline, objs.qgraph = cli_handle_exception(script.qgraph, pipeline=objs.pipeline, **kwargs)
newKwargs = objs.butlerArgs.update(ctx.command.params, MWCtxObj.getFrom(ctx).args, **kwargs)
objs.qgraph = cli_handle_exception(script.qgraph, pipeline=objs.pipeline, **newKwargs)
return objs
return processor


@click.command(cls=PipetaskCommand)
@click.command(cls=MWCommand, epilog=runEpilog)
@click.pass_context
@log_level_option(defaultValue=None)
@debug_option()
@option_section(sectionText="Data repository and selection options:")
@butler_config_option(forward=True)
# CLI API says `--input` values should be given like
# "datasetType:collectionName" or just "datasetType", but CmdLineFwk api
# wants input values to be a tuple of tuples, where each tuple is
# ("collectionName", "datasetType"), or (..., "datasetType") - elipsis if no
# collectionName is provided.
@input_option(callback=partial(split_kv, return_type="tuple", default_key=..., reverse_kv=True,
unseparated_okay=True),
multiple=True,
forward=True)
@output_option(forward=True)
@output_run_option(forward=True)
@extend_run_option(forward=True)
@replace_run_option(forward=True)
@prune_replaced_option(forward=True)
@data_query_option(forward=True)
@option_section(sectionText="Execution options:")
@do_raise_option()
@profile_option()
@processes_option()
Expand All @@ -167,10 +196,13 @@ def processor(objs):
@init_only_option()
@register_dataset_types_option()
@no_versions_option()
def run(*args, **kwargs):
@skip_existing_option(help="Do not try to overwrite any datasets that might exist in the butler. If not "
"provided then any existing conflicting dataset will cause butler exception.")
def run(ctx, *args, **kwargs):
"""Execute pipeline and quantum graph.
"""
def processor(objs):
cli_handle_exception(script.run, qgraph=objs.qgraph, **kwargs)
newKwargs = objs.butlerArgs.update(ctx.command.params, MWCtxObj.getFrom(ctx).args, **kwargs)
cli_handle_exception(script.run, qgraph=objs.qgraph, **newKwargs)
return objs
return processor
9 changes: 9 additions & 0 deletions python/lsst/ctrl/mpexec/cli/pipetask.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from lsst.daf.butler.cli.butler import LoaderCLI
from lsst.daf.butler.cli.opt import log_level_option, long_log_option
from lsst.daf.butler.cli.utils import ForwardOptions


class PipetaskCLI(LoaderCLI):
Expand All @@ -41,9 +42,17 @@ def cli(log_level, long_log):

class PipetaskObjects:
    """Mutable container for objects shared between chained subcommands
    within a single ``pipetask`` invocation.
    """

    def __init__(self):
        # Pipeline built by an earlier subcommand, if any.
        self.pipeline = None
        # Quantum graph produced by an earlier subcommand, if any.
        self.qgraph = None
        # Butler-initialization options recorded by one subcommand so
        # that subsequent subcommands can reuse (forward) them.
        self.butlerArgs = ForwardOptions()


@cli.resultcallback()
def processPipeline(processors, log_level, long_log):
Expand Down
119 changes: 116 additions & 3 deletions python/lsst/ctrl/mpexec/cli/script/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,97 @@ def run(do_raise,
qgraph,
register_dataset_types,
skip_init_writes,
timeout):
timeout,
butler_config,
input,
output,
output_run,
extend_run,
replace_run,
prune_replaced,
data_query,
skip_existing,
debug):
"""Implements the command line interface `pipetask run` subcommand; it
should only be called by command-line tools and by unit test code that
tests this function.
Parameters
----------
do_raise : `bool`
Raise an exception in the case of an error.
graph_fixup : `str`
The name of the class or factory method which makes an instance used
for execution graph fixup.
init_only : `bool`
If true, do not actually run; just register dataset types and/or save
init outputs.
log_level : `list` of `tuple`
per-component logging levels, each item in the list is a tuple
(component, level), `component` is a logger name or an empty string
or `None` for root logger, `level` is a logging level name, one of
CRITICAL, ERROR, WARNING, INFO, DEBUG (case insensitive).
no_versions : `bool`
If true, do not save or check package versions.
processes : `int`
The number of processes to use.
profile : `str`
File name to dump cProfile information to.
qgraph : `lsst.pipe.base.QuantumGraph`
A QuantumGraph generated by a previous subcommand.
register_dataset_types : `bool`
If true, register DatasetTypes that do not already exist in the Registry.
skip_init_writes : `bool`
If true, do not write collection-wide 'init output' datasets (e.g.
schemas).
timeout : `int`
Timeout for multiprocessing; maximum wall time (sec).
butler_config : `str`, `dict`, or `lsst.daf.butler.Config`
If `str`, `butler_config` is the path location of the gen3
butler/registry config file. If `dict`, `butler_config` is key value
pairs used to init or update the `lsst.daf.butler.Config` instance. If
`Config`, it is the object used to configure a Butler.
input : `str`
Comma-separated names of the input collection(s). Entries may include a
colon (:), the first string is a dataset type name that restricts the
search in that collection.
output : `str`
Name of the output CHAINED collection. This may either be an existing
CHAINED collection to use as both input and output (if `input` is
`None`), or a new CHAINED collection created to include all inputs
(if `input` is not `None`). In both cases, the collection's children
will start with an output RUN collection that directly holds all new
datasets (see `output_run`).
output_run : `str`
Name of the new output RUN collection. If not provided then `output`
must be provided and a new RUN collection will be created by appending
a timestamp to the value passed with `output`. If this collection
already exists then `extend_run` must be passed.
extend_run : `bool`
Instead of creating a new RUN collection, insert datasets into either
the one given by `output_run` (if provided) or the first child
collection of `output` (which must be of type RUN).
replace_run : `bool`
Before creating a new RUN collection in an existing CHAINED collection,
remove the first child collection (which must be of type RUN). This can
be used to repeatedly write to the same (parent) collection during
development, but it does not delete the datasets associated with the
replaced run unless `prune_replaced` is also True. Requires `output`,
and `extend_run` must be `None`.
prune_replaced : "unstore", "purge", or `None`.
If not `None`, delete the datasets in the collection replaced by
`replace_run`, either just from the datastore ("unstore") or by
removing them and the RUN completely ("purge"). Requires `replace_run`.
data_query : `str`
User query selection expression.
skip_existing : `bool`
If all Quantum outputs already exist in the output RUN collection then
that Quantum will be excluded from the QuantumGraph. Requires the `run`
command's `--extend-run` flag to be set.
debug : `bool`
If true, enable debugging output using lsstDebug facility (imports
debug.py).
"""

if log_level is not None:
CliLog.setLogLevels(log_level)
Expand All @@ -57,7 +147,17 @@ def __init__(self,
profile,
skip_init_writes,
timeout,
register_dataset_types):
register_dataset_types,
butler_config,
input,
output,
output_run,
extend_run,
replace_run,
prune_replaced,
data_query,
skip_existing,
debug):
self.do_raise = do_raise
self.graph_fixup = graph_fixup
self.init_only = init_only
Expand All @@ -67,10 +167,23 @@ def __init__(self,
self.skip_init_writes = skip_init_writes
self.timeout = timeout
self.register_dataset_types = register_dataset_types
self.butler_config = butler_config
self.input = input
self.output = output
self.output_run = output_run
self.extend_run = extend_run
self.replace_run = replace_run
self.prune_replaced = prune_replaced
self.data_query = data_query
self.skip_existing = skip_existing
self.enableLsstDebug = debug

args = RunArgs(do_raise=do_raise, graph_fixup=graph_fixup, init_only=init_only, no_versions=no_versions,
processes=processes, profile=profile, skip_init_writes=skip_init_writes, timeout=timeout,
register_dataset_types=register_dataset_types)
register_dataset_types=register_dataset_types, butler_config=butler_config, input=input,
output=output, output_run=output_run, extend_run=extend_run, replace_run=replace_run,
prune_replaced=prune_replaced, data_query=data_query, skip_existing=skip_existing,
debug=debug)

f = CmdLineFwk()
taskFactory = TaskFactory()
Expand Down

0 comments on commit 6135ef4

Please sign in to comment.