Skip to content

Commit

Permalink
Add pipetask option to execute graph subset (DM-27667)
Browse files Browse the repository at this point in the history
New option `--qgraph-node-id` is added which takes a bunch of integers
for node numbers to execute from a full graph. Another new option
`--qgraph-id` can be used to specify graph ID to verify it after loading
from pickle file.
  • Loading branch information
andy-slac committed Nov 23, 2020
1 parent 64d3e0a commit 0be18b6
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ tests/.tests
pytest_session.txt
.cache/
.pytest_cache
.coverage
.coverage*
4 changes: 3 additions & 1 deletion python/lsst/ctrl/mpexec/cli/opt/optionGroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(self):
self.decorators = [
option_section(sectionText="Quantum graph building options:"),
ctrlMpExecOpts.qgraph_option(),
ctrlMpExecOpts.qgraph_id_option(),
ctrlMpExecOpts.skip_existing_option(),
ctrlMpExecOpts.save_qgraph_option(),
ctrlMpExecOpts.save_single_quanta_option(),
Expand Down Expand Up @@ -109,7 +110,8 @@ def __init__(self):
ctrlMpExecOpts.start_method_option(),
ctrlMpExecOpts.timeout_option(),
ctrlMpExecOpts.fail_fast_option(),
ctrlMpExecOpts.graph_fixup_option()]
ctrlMpExecOpts.graph_fixup_option(),
ctrlMpExecOpts.qgraph_node_id_option()]


class meta_info_options(OptionGroup): # noqa: N801
Expand Down
17 changes: 17 additions & 0 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,23 @@
type=MWPath(exists=True, file_okay=True, dir_okay=False, readable=True))


qgraph_id_option = MWOptionDecorator("--qgraph-id",
help=unwrap("""Quantum graph identifier, graph loaded from a pickle file
is required to have this identifier. Ignored if graph is not
loaded from pickle file."""))


qgraph_node_id_option = MWOptionDecorator("--qgraph-node-id",
default=None,
type=int,
callback=split_commas,
multiple=True,
help=unwrap("""Only execute a specified set of nodes in a quantum
graph, nodes are identified by integer IDs. One or more
comma-separated integers are accepted. By default all
nodes are executed."""))


qgraph_dot_option = MWOptionDecorator("--qgraph-dot",
help=unwrap("""Location for storing GraphViz DOT representation of a
quantum graph."""),
Expand Down
45 changes: 45 additions & 0 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ def makeGraph(self, pipeline, args):
with open(args.qgraph, 'rb') as pickleFile:
qgraph = QuantumGraph.load(pickleFile, registry.dimensions)

if args.qgraph_id:
if qgraph.buildId != args.qgraph_id:
raise ValueError(f"Mismatch in graph build ID, expected: '{args.qgraph_id}',"
f" read from pickle: '{qgraph.buildId}'.")

# pipeline can not be provided in this case
if pipeline:
raise ValueError("Pipeline must not be given when quantum graph is read from file.")
Expand Down Expand Up @@ -542,6 +547,42 @@ def makeGraph(self, pipeline, args):

return qgraph

def _makeSubgraph(self, graph, nodeIds):
"""Extract execution subgraph based on supplied node IDs.
Parameters
----------
graph : `QuantumGraph`
Execution graph.
nodeIds : iterable of `int`, or `None`
Node IDs to include into subgraph, empty or `None` means to
include whole graph.
Returns
-------
graph : `QuantumGraph`
Execution sub-graph.
"""
if nodeIds is None:
return graph

nodeIds = frozenset(nodeIds)
nodes = [node for node in graph if node.nodeId.number in nodeIds]

if not nodes:
# nothing is selected, most likely a user error
raise ValueError(f"None of the specified node IDs was found in a graph: {nodeIds}")

notFound = nodeIds - set(node.nodeId.number for node in nodes)
if notFound:
# it is probably an error but don't make big fuss
_LOG.warning("Few of the specified node IDs were not found in a graph: %s", notFound)

# make a subgraph
graph = graph.subset(nodes)

return graph

def runPipeline(self, graph, taskFactory, args, butler=None):
"""Execute complete QuantumGraph.
Expand All @@ -557,6 +598,10 @@ def runPipeline(self, graph, taskFactory, args, butler=None):
Data Butler instance, if not defined then new instance is made
using command line options.
"""
# build a subgraph if node IDs are specified
if args.qgraph_node_id:
graph = self._makeSubgraph(graph, args.qgraph_node_id)

# make butler instance
if butler is None:
butler = _ButlerFactory.makeWriteButler(args)
Expand Down
32 changes: 32 additions & 0 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,15 @@ def testMakeGraphFromPickle(self):
self.assertIsInstance(qgraph, QuantumGraph)
self.assertEqual(len(qgraph), 1)

# will fail if graph id does not match
args = _makeArgs(
qgraph=tmpname,
qgraph_id="R2-D2 is that you?",
registryConfig=registryConfig
)
with self.assertRaisesRegex(ValueError, "Mismatch in graph build ID"):
fwk.makeGraph(None, args)

# pickle with wrong object type
with open(tmpname, "wb") as pickleFile:
pickle.dump({}, pickleFile)
Expand Down Expand Up @@ -567,6 +576,29 @@ def testSimpleQGraphReplaceRun(self):
refs = butler.registry.queryDatasets(..., collections="output/run4")
self.assertEqual(len(list(refs)), n_outputs)

def testSubgraph(self):
"""Test successfull execution of trivial quantum graph.
"""
nQuanta = 5
nNodes = 2
butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root)

# Select first two nodes for execution. This depends on node ordering
# which I assume is the same as execution order.
nodeIds = [node.nodeId.number for node in qgraph]
nodeIds = nodeIds[:nNodes]

self.assertEqual(len(qgraph.taskGraph), 5)
self.assertEqual(len(qgraph), nQuanta)

args = _makeArgs(qgraph_node_id=nodeIds)
fwk = CmdLineFwk()
taskFactory = AddTaskFactoryMock()

# run whole thing
fwk.runPipeline(qgraph, taskFactory, args, butler=butler)
self.assertEqual(taskFactory.countExec, nNodes)

def testShowGraph(self):
"""Test for --show options for quantum graph.
"""
Expand Down

0 comments on commit 0be18b6

Please sign in to comment.