Skip to content

Commit

Permalink
Reorganize how the --show option is handled
Browse files Browse the repository at this point in the history
1. Keep track of which options have been used.
2. No longer run the pipeline show options multiple times
   for pipetask qgraph.
3. No longer warn about options not being used when they
   are about to be used.
4. If --show is used, no longer continue on unless there are
   remaining --show options to process.

Item 4 means that using `pipetask qgraph` with only pipeline
show options exits without building the graph, and any use of
--show leads to `pipetask run` not running anything.
  • Loading branch information
timj committed Aug 20, 2022
1 parent 704faf8 commit 7619b97
Show file tree
Hide file tree
Showing 7 changed files with 401 additions and 309 deletions.
31 changes: 26 additions & 5 deletions python/lsst/ctrl/mpexec/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import click
import lsst.pipe.base.cli.opt as pipeBaseOpts
from lsst.ctrl.mpexec.showInfo import ShowInfo
from lsst.daf.butler.cli.opt import config_file_option, config_option, confirm_option, options_file_option
from lsst.daf.butler.cli.utils import MWCtxObj, catch_and_exit, option_section, unwrap

Expand Down Expand Up @@ -101,7 +102,13 @@ def build(ctx: click.Context, **kwargs: Any) -> None:
This does not require input data to be specified.
"""
kwargs = _collectActions(ctx, **kwargs)
script.build(**kwargs)
show = ShowInfo(kwargs.pop("show", []))
script.build(**kwargs, show=show)
if show.unhandled:
print(
"The following '--show' options were not known to the build command: "
f"{', '.join(show.unhandled)}"
)


@click.command(cls=PipetaskCommand, epilog=epilog)
Expand All @@ -116,8 +123,13 @@ def build(ctx: click.Context, **kwargs: Any) -> None:
def qgraph(ctx: click.Context, **kwargs: Any) -> None:
"""Build and optionally save quantum graph."""
kwargs = _collectActions(ctx, **kwargs)
pipeline = script.build(**kwargs)
script.qgraph(pipelineObj=pipeline, **kwargs)
show = ShowInfo(kwargs.pop("show", []))
pipeline = script.build(**kwargs, show=show)
if show.handled and not show.unhandled:
# The show option was given and all options were processed.
# No need to also build the quantum graph.
return
script.qgraph(pipelineObj=pipeline, **kwargs, show=show)


@click.command(cls=PipetaskCommand, epilog=epilog)
Expand All @@ -126,8 +138,17 @@ def qgraph(ctx: click.Context, **kwargs: Any) -> None:
def run(ctx: click.Context, **kwargs: Any) -> None:
"""Build and execute pipeline and quantum graph."""
kwargs = _collectActions(ctx, **kwargs)
pipeline = script.build(**kwargs)
qgraph = script.qgraph(pipelineObj=pipeline, **kwargs)
show = ShowInfo(kwargs.pop("show", []))
pipeline = script.build(**kwargs, show=show)
if show.handled and not show.unhandled:
# The show option was given and all options were processed.
# No need to also build the quantum graph.
return
qgraph = script.qgraph(pipelineObj=pipeline, **kwargs, show=show)
if show.handled:
# The show option was given and all graph options were processed.
# No need to also run the pipeline.
return
script.run(qgraphObj=qgraph, **kwargs)


Expand Down
5 changes: 2 additions & 3 deletions python/lsst/ctrl/mpexec/cli/script/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def build( # type: ignore
Path location of a pipeline definition file in YAML format.
save_pipeline : `str`
Path location for storing resulting pipeline definition in YAML format.
show : `list` [`str`]
show : `lsst.ctrl.mpexec.showInfo.ShowInfo`
Descriptions of what to dump to stdout.
kwargs : `dict` [`str`, `str`]
Ignored; click commands may accept options for more than one script
Expand Down Expand Up @@ -86,7 +86,6 @@ def build( # type: ignore
# Will raise an exception if it fails to build the pipeline.
pipeline = f.makePipeline(args)

args = SimpleNamespace(show=show)
f.showInfo(args, pipeline)
show.show_pipeline_info(pipeline)

return pipeline
4 changes: 1 addition & 3 deletions python/lsst/ctrl/mpexec/cli/script/qgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ def qgraph( # type: ignore
replace_run=replace_run,
prune_replaced=prune_replaced,
data_query=data_query,
show=show,
skip_existing_in=skip_existing_in,
skip_existing=skip_existing,
execution_butler_location=save_execution_butler,
Expand All @@ -200,7 +199,6 @@ def qgraph( # type: ignore
raise RuntimeError("QuantumGraph is empty.")

# optionally dump some info.
if show:
f.showInfo(args, pipelineObj, qgraph)
show.show_graph_info(qgraph, args)

return qgraph
275 changes: 1 addition & 274 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,12 @@
# -------------------------------
import copy
import datetime
import fnmatch
import getpass
import logging
import re
import sys
import warnings
from types import SimpleNamespace
from typing import Any, Iterable, Optional, Tuple
from typing import Iterable, Optional, Tuple

import lsst.pex.config as pexConfig
import lsst.pex.config.history as pexConfigHistory
from lsst.daf.butler import Butler, CollectionSearch, CollectionType, DatasetRef, Registry
from lsst.daf.butler.registry import MissingCollectionError, RegistryDefaults
from lsst.pipe.base import (
Expand Down Expand Up @@ -437,41 +432,6 @@ def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Optional[Iterable[Task
"""


class _FilteredStream:
    """File-like sink that forwards only matching config fields to stdout.

    Parameters
    ----------
    pattern : `str`
        Glob pattern (translated with `fnmatch.translate`) that a field
        path must match. Matching ignores case unless the pattern ends
        with ``:NOIGNORECASE``; that suffix is stripped before compiling.

    Notes
    -----
    This class depends on implementation details of the
    ``Config.saveToStream`` methods, in particular that the method uses a
    single call to ``write()`` to save information about a single config
    field, and that the call combines the comment string(s) for a field
    with the field path and value.

    This class will not work reliably on the "import" strings, so imports
    should be disabled by passing ``skipImports=True`` to
    ``saveToStream()``.
    """

    def __init__(self, pattern: str):
        # Obey case only when the caller explicitly asks for it.
        explicit_case = re.search(r"(.*):NOIGNORECASE$", pattern)
        if explicit_case is not None:
            pattern = explicit_case.group(1)
            flags = 0
        else:
            flags = re.IGNORECASE
            if pattern != pattern.lower():
                # A mixed-case pattern suggests the user expected a
                # case-sensitive match; tell them how to get one.
                print(
                    f'Matching "{pattern}" without regard to case (append :NOIGNORECASE to prevent this)',
                    file=sys.stdout,
                )
        self._pattern = re.compile(fnmatch.translate(pattern), flags)

    def write(self, showStr: str) -> None:
        # The field path is whatever precedes "=" on the final line; any
        # earlier lines are the doc comment(s) for the field.
        final_line = showStr.rstrip().rsplit("\n", 1)[-1]
        field_path = final_line.partition("=")[0]
        if self._pattern.search(field_path) is not None:
            sys.stdout.write(showStr)

# ------------------------
# Exported definitions --
# ------------------------
Expand Down Expand Up @@ -757,239 +717,6 @@ def runPipeline(
# Do not save fields that are not set.
out.write(report.json(exclude_none=True, indent=2))

def showInfo(
    self, args: SimpleNamespace, pipeline: Pipeline, graph: Optional[QuantumGraph] = None
) -> None:
    """Display useful info about pipeline and environment.

    Each entry of ``args.show`` has the form ``command[=argument]`` and is
    processed in order. A command that needs a pipeline or a graph that
    was not supplied logs a warning and is skipped. An unrecognized
    command prints an error to stderr and exits with status 1.

    Parameters
    ----------
    args : `types.SimpleNamespace`
        Parsed command line.
    pipeline : `Pipeline`
        Pipeline definition.
    graph : `QuantumGraph`, optional
        Execution graph.
    """
    # "dump-config" reads the pipeline just like "config" does, so it
    # needs the same guard.
    needs_pipeline = ("pipeline", "config", "dump-config", "history", "tasks")
    needs_graph = ("graph", "workflow", "uri")
    # Every command handled below, as presented in the error message.
    known_commands = (
        "pipeline",
        "config[=XXX]",
        "dump-config[=XXX]",
        "history=XXX",
        "tasks",
        "graph",
        "workflow",
        "uri",
    )

    for what in args.show:
        showCommand, _, showArgs = what.partition("=")

        if showCommand in needs_pipeline and not pipeline:
            _LOG.warning("Pipeline is required for --show=%s", showCommand)
            continue

        if showCommand in needs_graph and not graph:
            _LOG.warning("QuantumGraph is required for --show=%s", showCommand)
            continue

        if showCommand == "pipeline":
            print(pipeline)
        elif showCommand == "config":
            self._showConfig(pipeline, showArgs, False)
        elif showCommand == "dump-config":
            self._showConfig(pipeline, showArgs, True)
        elif showCommand == "history":
            self._showConfigHistory(pipeline, showArgs)
        elif showCommand == "tasks":
            self._showTaskHierarchy(pipeline)
        elif showCommand == "graph":
            # The guard above already ensures a graph; the re-check is
            # presumably kept so type checkers narrow the Optional.
            if graph:
                self._showGraph(graph)
        elif showCommand == "uri":
            if graph:
                self._showUri(graph, args)
        elif showCommand == "workflow":
            if graph:
                self._showWorkflow(graph, args)
        else:
            choices = "', '".join(known_commands)
            print(
                f"Unknown value for show: {what} (choose from '{choices}')",
                file=sys.stderr,
            )
            sys.exit(1)

def _showConfig(self, pipeline: Pipeline, showArgs: str, dumpFullConfig: bool) -> None:
    """Show task configuration.

    Prints to stdout; if no task matches the requested label an error is
    printed to stderr and the process exits with status 1.

    Parameters
    ----------
    pipeline : `Pipeline`
        Pipeline definition.
    showArgs : `str`
        Defines what to show. When ``dumpFullConfig`` is true this is the
        task label; otherwise it has the form
        ``[TaskLabel::][pattern[:NOIGNORECASE]]``.
    dumpFullConfig : `bool`
        If true then dump complete task configuration with all imports.
    """
    stream: Any = sys.stdout
    if dumpFullConfig:
        # Task label can be given with this option
        taskName = showArgs
    else:
        # The argument can have form [TaskLabel::][pattern:NOIGNORECASE].
        # The dot after "config" is escaped so that only a literal
        # "config." prefix is stripped; unescaped, an argument such as
        # "configs" was swallowed whole and the filter silently dropped.
        matConfig = re.search(r"^(?:(\w+)::)?(?:config\.)?(.+)?", showArgs)
        assert matConfig is not None, "regex always matches"
        taskName = matConfig.group(1)
        pattern = matConfig.group(2)
        if pattern:
            stream = _FilteredStream(pattern)

    tasks = util.filterTasks(pipeline, taskName)
    if not tasks:
        print("Pipeline has no tasks named {}".format(taskName), file=sys.stderr)
        sys.exit(1)

    for taskDef in tasks:
        print("### Configuration for task `{}'".format(taskDef.label))
        # Imports are only written for a full dump; the filtered stream
        # is not used in that mode.
        taskDef.config.saveToStream(stream, root="config", skipImports=not dumpFullConfig)

def _showConfigHistory(self, pipeline: Pipeline, showArgs: str) -> None:
    """Show history for task configuration.

    Prints the history of every config field matching the request to
    stdout. A missing field pattern, an unknown task label, or a pattern
    that matches no field prints an error to stderr and exits with
    status 1.

    Parameters
    ----------
    pipeline : `Pipeline`
        Pipeline definition.
    showArgs : `str`
        Defines what to show, in the form ``[TaskLabel::][config.]pattern``.
    """
    parsed = re.search(r"^(?:(\w+)::)?(?:config[.])?(.+)", showArgs)
    taskName, pattern = parsed.groups() if parsed else (None, None)
    if not pattern:
        print("Please provide a value with --show history (e.g. history=Task::param)", file=sys.stderr)
        sys.exit(1)

    tasks = util.filterTasks(pipeline, taskName)
    if not tasks:
        print(f"Pipeline has no tasks named {taskName}", file=sys.stderr)
        sys.exit(1)

    found = False
    for taskDef in tasks:
        config = taskDef.config

        # Look for any matches in the config hierarchy for this name
        matching_names = fnmatch.filter(config.names(), pattern)
        for position, field_path in enumerate(matching_names):
            if position:
                # Blank line between consecutive matches.
                print("")

            cpath, _, field_name = field_path.rpartition(".")
            try:
                # An empty parent path means a top-level field; otherwise
                # resolve the parent expression against the task config.
                owner = eval("config." + cpath, {}, {"config": config}) if cpath else taskDef.config
            except AttributeError:
                print(
                    f"Error: Unable to extract attribute {cpath} from task {taskDef.label}",
                    file=sys.stderr,
                )
                owner = None

            # Sometimes we end up with a non-Config so skip those
            is_config_like = isinstance(owner, (pexConfig.Config, pexConfig.ConfigurableInstance))
            if is_config_like and hasattr(owner, field_name):
                print(f"### Configuration field for task `{taskDef.label}'")
                print(pexConfigHistory.format(owner, field_name))
                found = True

    if not found:
        print(f"None of the tasks has field matching {pattern}", file=sys.stderr)
        sys.exit(1)

def _showTaskHierarchy(self, pipeline: Pipeline) -> None:
    """Print task hierarchy to stdout.

    For every task in the expanded pipeline a header line is printed,
    followed by one ``field: task`` line per item yielded by
    ``util.subTaskIter`` for that task's config.

    Parameters
    ----------
    pipeline : `Pipeline`
        Pipeline definition.
    """
    expanded = pipeline.toExpandedPipeline()
    for task_def in expanded:
        print(f"### Subtasks for task `{task_def.taskName}'")

        for field_name, subtask_name in util.subTaskIter(task_def.config):
            print(f"{field_name}: {subtask_name}")

def _showGraph(self, graph: QuantumGraph) -> None:
    """Print quanta information to stdout.

    For each task node, prints the node followed by every quantum of that
    task with the data IDs of its input and output dataset references.

    Parameters
    ----------
    graph : `QuantumGraph`
        Execution graph.
    """
    for task_node in graph.taskGraph:
        print(task_node)

        for index, quantum in enumerate(graph.getQuantaForTask(task_node)):
            print(f" Quantum {index}:")
            # Inputs are listed before outputs; each section is read from
            # the quantum only when it is about to be printed.
            for section in ("inputs", "outputs"):
                print(f" {section}:")
                for dataset_key, refs in getattr(quantum, section).items():
                    rendered = ", ".join(f"DataId({ref.dataId})" for ref in refs)
                    print(f" {dataset_key}: [{rendered}]")

def _showWorkflow(self, graph: QuantumGraph, args: SimpleNamespace) -> None:
    """Print quanta information and dependency to stdout.

    Each node is printed with its task name, followed by one line for
    every node that ``graph.determineInputsToQuantumNode`` reports as an
    input to it.

    Parameters
    ----------
    graph : `QuantumGraph`
        Execution graph.
    args : `types.SimpleNamespace`
        Parsed command line (not used by this method).
    """
    for child in graph:
        child_id = child.nodeId
        print(f"Quantum {child_id}: {child.taskDef.taskName}")
        for upstream in graph.determineInputsToQuantumNode(child):
            print(f"Parent Quantum {upstream.nodeId} - Child Quantum {child.nodeId}")

def _showUri(self, graph: QuantumGraph, args: SimpleNamespace) -> None:
    """Print input and predicted output URIs to stdout.

    Parameters
    ----------
    graph : `QuantumGraph`
        Execution graph.
    args : `types.SimpleNamespace`
        Parsed command line; passed to ``_ButlerFactory.makeReadButler``
        to create the butler used for URI lookup.
    """
    butler = _ButlerFactory.makeReadButler(args)

    def print_uris(thisRef: DatasetRef) -> None:
        # URIs are predicted rather than looked up; "TBD" is passed as
        # the run name.
        primary, components = butler.getURIs(thisRef, predict=True, run="TBD")
        if primary:
            print(f" {primary}")
            return
        # NOTE(review): the component loop is assumed to belong to the
        # no-primary branch; the indentation of the original is not
        # visible here - confirm.
        print(" (disassembled artifact)")
        for comp_name, comp_uri in components.items():
            print(f" {comp_name}: {comp_uri}")

    for node in graph:
        print(f"Quantum {node.nodeId}: {node.taskDef.taskName}")
        # Inputs first, then outputs; every reference in each section is
        # dumped in iteration order.
        for section in ("inputs", "outputs"):
            print(f" {section}:")
            for refs in getattr(node.quantum, section).values():
                for ref in refs:
                    print_uris(ref)

def _importGraphFixup(self, args: SimpleNamespace) -> Optional[ExecutionGraphFixup]:
"""Import/instantiate graph fixup object.
Expand Down

0 comments on commit 7619b97

Please sign in to comment.