Skip to content

Commit

Permalink
Update code to support new QuantumGraph type
Browse files Browse the repository at this point in the history
  • Loading branch information
natelust committed Sep 28, 2020
1 parent 93337f7 commit a383768
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 148 deletions.
17 changes: 9 additions & 8 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,20 +633,21 @@ def makeGraph(self, pipeline, args):
qgraph = graphBuilder.makeGraph(pipeline, collections, run, args.data_query)

# count quanta in graph and give a warning if it's empty and return None
nQuanta = qgraph.countQuanta()
nQuanta = len(qgraph)
if nQuanta == 0:
warnings.warn("QuantumGraph is empty", stacklevel=2)
return None
else:
_LOG.info("QuantumGraph contains %d quanta for %d tasks",
nQuanta, len(qgraph))
nQuanta, len(qgraph.taskGraph))

if args.save_qgraph:
with open(args.save_qgraph, "wb") as pickleFile:
qgraph.save(pickleFile)

if args.save_single_quanta:
for iq, sqgraph in enumerate(qgraph.quantaAsQgraph()):
for iq, quantumNode in enumerate(qgraph):
sqgraph = qgraph.subset(quantumNode)
filename = args.save_single_quanta.format(iq)
with open(filename, "wb") as pickleFile:
sqgraph.save(pickleFile)
Expand Down Expand Up @@ -860,13 +861,13 @@ def _showGraph(self, graph):
graph : `QuantumGraph`
Execution graph.
"""
for taskNodes in graph:
print(taskNodes.taskDef)
for taskNode in graph.taskGraph:
print(taskNode)

for iq, quantum in enumerate(taskNodes.quanta):
for iq, quantum in enumerate(graph.getQuantaForTask(taskNode)):
print(" Quantum {}:".format(iq))
print(" inputs:")
for key, refs in quantum.predictedInputs.items():
for key, refs in quantum.inputs.items():
dataIds = ["DataId({})".format(ref.dataId) for ref in refs]
print(" {}: [{}]".format(key, ", ".join(dataIds)))
print(" outputs:")
Expand Down Expand Up @@ -900,7 +901,7 @@ def dumpURIs(thisRef):
shortname = qdata.taskDef.taskName.split('.')[-1]
print("Quantum {}: {}".format(qdata.index, shortname))
print(" inputs:")
for key, refs in qdata.quantum.predictedInputs.items():
for key, refs in qdata.quantum.inputs.items():
for ref in refs:
dumpURIs(ref)
print(" outputs:")
Expand Down
9 changes: 4 additions & 5 deletions python/lsst/ctrl/mpexec/dotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,17 @@ def graph2dot(qgraph, file):
print("digraph QuantumGraph {", file=file)

allDatasetRefs = {}
for taskId, nodes in enumerate(qgraph):
for taskId, taskDef in enumerate(qgraph.taskGraph):

taskDef = nodes.taskDef

for qId, quantum in enumerate(nodes.quanta):
quanta = qgraph.getQuantaForTask(taskDef)
for qId, quantum in enumerate(quanta):

# node for a task
taskNodeName = "task_{}_{}".format(taskId, qId)
_renderTaskNode(taskNodeName, taskDef, file)

# quantum inputs
for dsRefs in quantum.predictedInputs.values():
for dsRefs in quantum.inputs.values():
for dsRef in dsRefs:
nodeName = _makeDSNode(dsRef, allDatasetRefs, file)
_renderEdge(nodeName, taskNodeName, file)
Expand Down
52 changes: 30 additions & 22 deletions python/lsst/ctrl/mpexec/execFixupDataId.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

__all__ = ['ExecutionGraphFixup']

from typing import Any, Iterable, Sequence, Tuple, Union
from collections import defaultdict
import networkx as nx
from typing import Sequence, Union, Tuple, Any

from lsst.pipe.base import QuantumIterData
from lsst.pipe.base import QuantumGraph, QuantumNode
from .executionGraphFixup import ExecutionGraphFixup


Expand Down Expand Up @@ -75,37 +77,43 @@ def __init__(self, taskLabel: str, dimensions: Union[str, Sequence[str]], revers
else:
self.dimensions = tuple(self.dimensions)

def _key(self, qdata: QuantumIterData) -> Tuple[Any, ...]:
def _key(self, qnode: QuantumNode) -> Tuple[Any, ...]:
"""Produce comparison key for quantum data.
Parameters
----------
qdata : `QuantumIterData`
qnode : `QuantumNode`
An individual node in a `~lsst.pipe.base.QuantumGraph`
Returns
-------
key : `tuple`
"""
dataId = qdata.quantum.dataId
dataId = qnode.quantum.dataId
key = tuple(dataId[dim] for dim in self.dimensions)
return key

def fixupQuanta(self, quanta: Iterable[QuantumIterData]) -> Iterable[QuantumIterData]:
# Docstring inherited from ExecutionGraphFixup.fixupQuanta
quanta = list(quanta)
# Index task quanta by the key
keyQuanta = {}
for qdata in quanta:
if qdata.taskDef.label == self.taskLabel:
key = self._key(qdata)
keyQuanta.setdefault(key, []).append(qdata)
if not keyQuanta:
def fixupQuanta(self, graph: QuantumGraph) -> QuantumGraph:
taskDef = graph.findTaskDefByLabel(self.taskLabel)
if taskDef is None:
raise ValueError(f"Cannot find task with label {self.taskLabel}")
# order keys
quanta = list(graph.quantaForTask(taskDef))
keyQuanta = defaultdict(list)
for q in quanta:
key = self._key(q)
keyQuanta[key].append(q)
keys = sorted(keyQuanta.keys(), reverse=self.reverse)
# for each quanta in a key add dependency to all quanta in a preceding key
networkGraph = graph.graph

for prev_key, key in zip(keys, keys[1:]):
prev_indices = frozenset(qdata.index for qdata in keyQuanta[prev_key])
for qdata in keyQuanta[key]:
qdata.dependencies |= prev_indices
return quanta
for prev_node in keyQuanta[prev_key]:
for node in keyQuanta[key]:
# remove any existing edges between the two nodes, but
# don't fail if there are not any. Both directions need
# tried because in a directed graph, order maters
try:
networkGraph.remove_edge(node, prev_node)
networkGraph.remove_edge(prev_node, node)
except nx.NetworkXException:
pass
networkGraph.add_edge(prev_node, node)
return graph
19 changes: 8 additions & 11 deletions python/lsst/ctrl/mpexec/executionGraphFixup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
__all__ = ['ExecutionGraphFixup']

from abc import ABC, abstractmethod
from typing import Iterable

from lsst.pipe.base import QuantumIterData
from lsst.pipe.base import QuantumGraph


class ExecutionGraphFixup(ABC):
Expand All @@ -44,22 +43,20 @@ class ExecutionGraphFixup(ABC):
"""

@abstractmethod
def fixupQuanta(self, quanta: Iterable[QuantumIterData]) -> Iterable[QuantumIterData]:
def fixupQuanta(self, graph: QuantumGraph) -> QuantumGraph:
"""Update quanta in a graph.
Potentially anything in the graph could be changed if it does not
break executor assumptions. Returned quanta will be re-ordered by
executor, if modifications result in a dependency cycle the executor
will raise an exception.
break executor assumptions. If modifications result in a dependency
cycle the executor will raise an exception.
Parameters
----------
quanta : iterable [`~lsst.pipe.base.QuantumIterData`]
Iterable of topologically ordered quanta as returned from
`lsst.pipe.base.QuantumGraph.traverse` method.
graph : QuantumGraph
Quantum Graph that will be executed by the executor
Yields
Returns
------
quantum : `~lsst.pipe.base.QuantumIterData`
graph : QuantumGraph
"""
raise NotImplementedError

0 comments on commit a383768

Please sign in to comment.