Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-21421 Switch to new Pipeline interface #31

Merged
merged 1 commit into from
Nov 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion python/lsst/ctrl/mpexec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,4 @@
from .quantumGraphExecutor import *
from .singleQuantumExecutor import *
from .taskFactory import *
from .taskLoader import *
from .version import * # Generated by sconsUtils
120 changes: 20 additions & 100 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,12 @@
from lsst.daf.butler import Butler, DatasetRef, Run
import lsst.log
import lsst.pex.config as pexConfig
from lsst.pipe.base import GraphBuilder, PipelineBuilder, Pipeline, QuantumGraph
from lsst.pipe.base import GraphBuilder, Pipeline, QuantumGraph
from .cmdLineParser import makeParser
from .dotTools import graph2dot, pipeline2dot
from .mpGraphExecutor import MPGraphExecutor
from .preExecInit import PreExecInit
from .taskFactory import TaskFactory
from .taskLoader import (TaskLoader, KIND_PIPELINETASK)
from . import util

# ----------------------------------
Expand Down Expand Up @@ -106,16 +105,11 @@ def parseAndRun(self, argv=None):
# First thing to do is to setup logging.
self.configLog(args.longlog, args.loglevel)

taskLoader = TaskLoader(args.packages)
taskFactory = TaskFactory(taskLoader)

if args.subcommand == "list":
# just dump some info about where things may be found
return self.doList(taskLoader, args.show, args.show_headers)
taskFactory = TaskFactory()

# make pipeline out of command line arguments (can return empty pipeline)
try:
pipeline = self.makePipeline(taskFactory, args)
pipeline = self.makePipeline(args)
except Exception as exc:
print("Failed to build pipeline: {}".format(exc), file=sys.stderr)
raise
Expand All @@ -127,7 +121,7 @@ def parseAndRun(self, argv=None):

# make quantum graph
try:
qgraph = self.makeGraph(pipeline, taskFactory, args)
qgraph = self.makeGraph(pipeline, args)
except Exception as exc:
print("Failed to build graph: {}".format(exc), file=sys.stderr)
raise
Expand Down Expand Up @@ -188,139 +182,65 @@ def configLog(longlog, logLevels):
pyLevel = lsst.log.LevelTranslator.lsstLog2logging(level)
logging.getLogger(component).setLevel(pyLevel)

def doList(self, taskLoader, show, show_headers):
"""Implementation of the "list" command.

Parameters
----------
taskLoader : `TaskLoader`
show : `list` of `str`
List of items to show.
show_headers : `bool`
True to display additional headers
"""

if not show:
show = ["pipeline-tasks"]

if "packages" in show:
if show_headers:
print()
print("Modules search path")
print("-------------------")
for pkg in sorted(taskLoader.packages):
print(pkg)

if "modules" in show:
try:
modules = taskLoader.modules()
except ImportError as exc:
print("Failed to import package, check --package option or $PYTHONPATH:", exc,
file=sys.stderr)
return 2
modules = [(name, "package" if flag else "module") for name, flag in sorted(modules)]
headers = None
if show_headers:
print()
headers = ("Module or package name", "Type ")
util.printTable(modules, headers)

if "tasks" in show or "pipeline-tasks" in show:
try:
tasks = taskLoader.tasks()
except ImportError as exc:
print("Failed to import package, check --packages option or PYTHONPATH:", exc,
file=sys.stderr)
return 2

if "tasks" not in show:
# only show pipeline-tasks
tasks = [(name, kind) for name, kind in tasks if kind == KIND_PIPELINETASK]
tasks.sort()

headers = None
if show_headers:
print()
headers = ("Task class name", "Kind ")
util.printTable(tasks, headers)

def makePipeline(self, taskFactory, args):
def makePipeline(self, args):
"""Build a pipeline from command line arguments.

Parameters
----------
taskFactory : `~lsst.pipe.base.TaskFactory`
Task factory.
args : `argparse.Namespace`
Parsed command line

Returns
-------
pipeline : `~lsst.pipe.base.Pipeline`
"""
# read existing pipeline from pickle file
pipeline = None
if args.pipeline:
with open(args.pipeline, 'rb') as pickleFile:
pipeline = pickle.load(pickleFile)
if not isinstance(pipeline, Pipeline):
raise TypeError("Pipeline pickle file has incorrect object type: {}".format(
type(pipeline)))

pipeBuilder = PipelineBuilder(taskFactory, pipeline)
pipeline = Pipeline.fromFile(args.pipeline)
else:
pipeline = Pipeline("anonymous")

# loop over all pipeline actions and apply them in order
for action in args.pipeline_actions:
if action.action == "add_instrument":

pipeline.addInstrument(action.value)

if action.action == "new_task":

pipeBuilder.addTask(action.value, action.label)
pipeline.addTask(action.value, action.label)

elif action.action == "delete_task":

pipeBuilder.deleteTask(action.label)

elif action.action == "move_task":

pipeBuilder.moveTask(action.label, action.value)

elif action.action == "relabel":

pipeBuilder.labelTask(action.label, action.value)
pipeline.removeTask(action.label)

elif action.action == "config":

pipeBuilder.configOverride(action.label, action.value)
pipeline.addConfigOverride(action.label, action.value[0], action.value[1])

elif action.action == "configfile":

pipeBuilder.configOverrideFile(action.label, action.value)
pipeline.addConfigOverrideFile(action.label, action.value)

else:

raise ValueError(f"Unexpected pipeline action: {action.action}")

pipeline = pipeBuilder.pipeline(args.order_pipeline)

if args.save_pipeline:
with open(args.save_pipeline, "wb") as pickleFile:
pickle.dump(pipeline, pickleFile)
pipeline.toFile(args.save_pipeline)

if args.pipeline_dot:
pipeline2dot(pipeline, args.pipeline_dot, taskFactory)
pipeline2dot(pipeline, args.pipeline_dot)

return pipeline

def makeGraph(self, pipeline, taskFactory, args):
def makeGraph(self, pipeline, args):
"""Build a graph from command line arguments.

Parameters
----------
pipeline : `~lsst.pipe.base.Pipeline`
Pipeline, can be empty or ``None`` if graph is read from pickle
file.
taskFactory : `~lsst.pipe.base.TaskFactory`
Task factory.
args : `argparse.Namespace`
Parsed command line

Expand Down Expand Up @@ -380,7 +300,7 @@ def makeGraph(self, pipeline, taskFactory, args):
"types is not currently supported.")

# make execution plan (a.k.a. DAG) for pipeline
graphBuilder = GraphBuilder(taskFactory, butler.registry,
graphBuilder = GraphBuilder(butler.registry,
skipExisting=args.skip_existing,
clobberExisting=args.clobber_output)
qgraph = graphBuilder.makeGraph(pipeline, inputCollections, outputCollection, args.data_query)
Expand Down Expand Up @@ -417,7 +337,7 @@ def runPipeline(self, graph, taskFactory, args, butler=None):
graph : `QuantumGraph`
Execution graph.
taskFactory : `~lsst.pipe.base.TaskFactory`
Task factory.
Task factory
args : `argparse.Namespace`
Parsed command line
butler : `~lsst.daf.butler.Butler`, optional
Expand Down
48 changes: 7 additions & 41 deletions python/lsst/ctrl/mpexec/cmdLineParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,9 @@ def __repr__(self):

_ACTION_ADD_TASK = _PipelineActionType("new_task", "(?P<value>[^:]+)(:(?P<label>.+))?")
_ACTION_DELETE_TASK = _PipelineActionType("delete_task", "(?P<value>)(?P<label>.+)")
_ACTION_MOVE_TASK = _PipelineActionType("move_task", r"(?P<label>.+):(?P<value>-?\d+)", int)
_ACTION_LABEL_TASK = _PipelineActionType("relabel", "(?P<label>.+):(?P<value>.+)")
_ACTION_CONFIG = _PipelineActionType("config", "(?P<label>.+):(?P<value>.+=.+)")
_ACTION_CONFIG_FILE = _PipelineActionType("configfile", "(?P<label>.+):(?P<value>.+)")
_ACTION_ADD_INSTRUMENT = _PipelineActionType("new_instrument", "(?P<value>[^:]+)")


class _LogLevelAction(Action):
Expand Down Expand Up @@ -244,14 +243,6 @@ def makeParser(fromfile_prefix_chars='@', parser_class=ArgumentParser, **kwargs)

# global options which come before sub-command

group = parser.add_argument_group("Task search options")
group.add_argument("-p", "--package", action="append", dest="packages", default=[],
metavar="NAME1.NAME2.NAME3",
help=("Package to search for task classes. Package name is specified as "
"dot-separated names found in $PYTHON PATH (e.g. lsst.pipe.tasks). "
"It should not include module name. This option overrides default "
"built-in list of modules. It can be used multiple times."))

# butler options
group = parser.add_argument_group("Data repository and selection options")
group.add_argument("-b", "--butler-config", dest="butler_config", default=None, metavar="PATH",
Expand Down Expand Up @@ -311,27 +302,6 @@ def makeParser(fromfile_prefix_chars='@', parser_class=ArgumentParser, **kwargs)
    # The issue was fixed in Python 3.6; the workaround is not needed starting with that version
subparsers.required = True

# list sub-command
subparser = subparsers.add_parser("list",
usage="%(prog)s [options]",
description="Display information about tasks and where they are "
"found. If none of the options are specified then --pipeline-tasks "
"is used by default")
subparser.set_defaults(subparser=subparser)
subparser.add_argument("-p", "--packages", dest="show", action="append_const", const="packages",
help="Shows list of the packages to search for tasks")
subparser.add_argument("-m", "--modules", dest="show", action="append_const", const='modules',
help="Shows list of all modules existing in current list of packages")
subparser.add_argument("-t", "--tasks", dest="show", action="append_const", const="tasks",
help="Shows list of all tasks (any sub-class of Task) existing"
" in current list of packages")
subparser.add_argument("-l", "--pipeline-tasks", dest="show", action="append_const",
const="pipeline-tasks",
help=("(default) Shows list of all PipelineTasks existing in current set"
" of packages"))
subparser.add_argument("--no-headers", dest="show_headers", action="store_false", default=True,
help="Do not display any headers on output")

for subcommand in ("build", "qgraph", "run"):
# show/run sub-commands, they are all identical except for the
# command itself and description
Expand Down Expand Up @@ -375,21 +345,13 @@ def makeParser(fromfile_prefix_chars='@', parser_class=ArgumentParser, **kwargs)
metavar="PATH")
subparser.add_argument("-t", "--task", metavar="TASK[:LABEL]",
dest="pipeline_actions", action='append', type=_ACTION_ADD_TASK,
help="Task name to add to pipeline, can be either full name "
"with dots including package and module name, or a simple name "
"to find the class in one of the modules in pre-defined packages "
"(see --packages option). Task name can be followed by colon and "
help="Task name to add to pipeline, must be a fully qualified task name. "
"Task name can be followed by colon and "
                               "label name; if a label is not given then the task base name (class name) "
                               "is used as the label.")
subparser.add_argument("-d", "--delete", metavar="LABEL",
dest="pipeline_actions", action='append', type=_ACTION_DELETE_TASK,
help="Delete task with given label from pipeline.")
subparser.add_argument("-m", "--move", metavar="LABEL:NUMBER",
dest="pipeline_actions", action='append', type=_ACTION_MOVE_TASK,
help="Move given task to a different position in a pipeline.")
subparser.add_argument("-l", "--label", metavar="LABEL:NEW_LABEL",
dest="pipeline_actions", action='append', type=_ACTION_LABEL_TASK,
help="Change label of a given task.")
subparser.add_argument("-c", "--config", metavar="LABEL:NAME=VALUE",
dest="pipeline_actions", action='append', type=_ACTION_CONFIG,
help="Configuration override(s) for a task with specified label, "
Expand All @@ -402,6 +364,10 @@ def makeParser(fromfile_prefix_chars='@', parser_class=ArgumentParser, **kwargs)
help="Order tasks in pipeline based on their data dependencies, "
"ordering is performed as last step before saving or executing "
"pipeline.")
subparser.add_argument("--instrument", metavar="instrument",
dest="pipeline_actions", action="append", type=_ACTION_ADD_INSTRUMENT,
                               help="Add an instrument which will be used to load config overrides when "
                               "defining a pipeline. This must be the fully qualified class name.")
if subcommand in ("qgraph", "run"):
group = subparser.add_mutually_exclusive_group()
group.add_argument("--skip-existing", dest="skip_existing",
Expand Down
37 changes: 4 additions & 33 deletions python/lsst/ctrl/mpexec/dotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,13 @@
# Imports for other modules --
# -----------------------------
from lsst.daf.butler import DimensionUniverse
from lsst.pipe.base import iterConnections
from lsst.pipe.base import iterConnections, Pipeline

# ----------------------------------
# Local non-exported definitions --
# ----------------------------------


def _loadTaskClass(taskDef, taskFactory):
"""Import task class if necessary.

Parameters
----------
taskDef : `TaskDef`
taskFactory : `TaskFactory`

Raises
------
`ImportError` is raised when task class cannot be imported.
`MissingTaskFactoryError` is raised when TaskFactory is needed but not provided.
"""
taskClass = taskDef.taskClass
if not taskClass:
if not taskFactory:
raise MissingTaskFactoryError("Task class is not defined but task "
"factory instance is not provided")
taskClass = taskFactory.loadTaskClass(taskDef.taskName)
return taskClass


def _renderTaskNode(nodeName, taskDef, file, idx=None):
"""Render GV node for a task"""
label = [taskDef.taskName.rpartition('.')[-1]]
Expand Down Expand Up @@ -133,12 +111,6 @@ def _makeDSNode(dsRef, allDatasetRefs, file):
# ------------------------


class MissingTaskFactoryError(Exception):
"""Exception raised when client fails to provide TaskFactory instance.
"""
pass


def graph2dot(qgraph, file):
"""Convert QuantumGraph into GraphViz digraph.

Expand Down Expand Up @@ -193,7 +165,7 @@ def graph2dot(qgraph, file):
file.close()


def pipeline2dot(pipeline, file, taskFactory=None):
def pipeline2dot(pipeline, file):
"""Convert Pipeline into GraphViz digraph.

This method is mostly for documentation/presentation purposes.
Expand All @@ -206,9 +178,6 @@ def pipeline2dot(pipeline, file, taskFactory=None):
file : str or file object
File where GraphViz graph (DOT language) is written, can be a file name
or file object.
taskFactory: `pipe.base.TaskFactory`, optional
Instance of an object which knows how to import task classes. It is only
used if pipeline task definitions do not define task classes.

Raises
------
Expand All @@ -228,6 +197,8 @@ def pipeline2dot(pipeline, file, taskFactory=None):
print("digraph Pipeline {", file=file)

allDatasets = set()
if isinstance(pipeline, Pipeline):
pipeline = pipeline.toExpandedPipeline()
for idx, taskDef in enumerate(pipeline):

# node for a task
Expand Down