Skip to content

Commit

Permalink
Merge branch 'tickets/DM-30266'
Browse files Browse the repository at this point in the history
  • Loading branch information
natelust committed Dec 12, 2021
2 parents 2bbf922 + b83c5ca commit 7bb07dd
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 20 deletions.
3 changes: 2 additions & 1 deletion python/lsst/ctrl/mpexec/cli/opt/optionGroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def __init__(self):
ctrlMpExecOpts.qgraph_dot_option(),
ctrlMpExecOpts.save_execution_butler_option(),
ctrlMpExecOpts.clobber_execution_butler_option(),
ctrlMpExecOpts.dataset_query_constraint()]
ctrlMpExecOpts.dataset_query_constraint(),
ctrlMpExecOpts.qgraph_header_data_option()]


class butler_options(OptionGroup): # noqa: N801
Expand Down
17 changes: 11 additions & 6 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,19 @@ def _to_int(value):
# I wanted to use default=None here to match Python API but click silently
# replaces None with an empty tuple when multiple=True.
qgraph_node_id_option = MWOptionDecorator("--qgraph-node-id",
callback=_split_commas_int,
callback=split_commas,
multiple=True,
help=unwrap("""Only load a specified set of nodes when graph is
loaded from a file, nodes are identified by integer
IDs. One or more comma-separated integers are accepted.
By default all nodes are loaded. Ignored if graph is
not loaded from a file."""))

loaded from a file, nodes are identified by UUID
values. One or more comma-separated integers are
accepted. By default all nodes are loaded. Ignored if
graph is not loaded from a file."""))

qgraph_header_data_option = MWOptionDecorator("--show-qgraph-header",
is_flag=True,
default=False,
help=unwrap("""Print the headerData for Quantum Graph to the
console"""))

qgraph_dot_option = MWOptionDecorator("--qgraph-dot",
help=unwrap("""Location for storing GraphViz DOT representation of a
Expand Down
6 changes: 5 additions & 1 deletion python/lsst/ctrl/mpexec/cli/script/qgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
def qgraph(pipelineObj, qgraph, qgraph_id, qgraph_node_id, skip_existing_in, skip_existing, save_qgraph,
save_single_quanta, qgraph_dot, butler_config, input, output, output_run, extend_run,
replace_run, prune_replaced, data_query, show, save_execution_butler, clobber_execution_butler,
clobber_outputs, dataset_query_constraint, **kwargs):
clobber_outputs, dataset_query_constraint, show_qgraph_header=False, **kwargs):
"""Implements the command line interface `pipetask qgraph` subcommand,
should only be called by command line tools and unit test code that test
this function.
Expand Down Expand Up @@ -118,6 +118,9 @@ def qgraph(pipelineObj, qgraph, qgraph_id, qgraph_node_id, skip_existing_in, ski
Control constraining graph building using pre-existing dataset types.
Valid values are off, all, or a comma separated list of dataset type
names.
show_qgraph_header : bool, optional
Controls if the headerData of a QuantumGraph should be printed to the
terminal. Defaults to False.
kwargs : `dict` [`str`, `str`]
Ignored; click commands may accept options for more than one script
function and pass all the option kwargs to each of the script functions
Expand Down Expand Up @@ -150,6 +153,7 @@ def qgraph(pipelineObj, qgraph, qgraph_id, qgraph_node_id, skip_existing_in, ski
clobber_execution_butler=clobber_execution_butler,
clobber_outputs=clobber_outputs,
dataset_query_constraint=dataset_query_constraint,
show_qgraph_header=show_qgraph_header,
)

f = CmdLineFwk()
Expand Down
14 changes: 8 additions & 6 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,9 +555,9 @@ def makeGraph(self, pipeline, args):
# pipeline can not be provided in this case
if pipeline:
raise ValueError("Pipeline must not be given when quantum graph is read from file.")

if args.show_qgraph_header:
print(QuantumGraph.readHeader(args.qgraph))
else:

# make execution plan (a.k.a. DAG) for pipeline
graphBuilder = GraphBuilder(registry,
skipExistingIn=args.skip_existing_in,
Expand All @@ -570,6 +570,8 @@ def makeGraph(self, pipeline, args):
"time": f"{datetime.datetime.now()}"}
qgraph = graphBuilder.makeGraph(pipeline, collections, run, args.data_query, metadata=metadata,
datasetQueryConstraint=args.dataset_query_constraint)
if args.show_qgraph_header:
print(qgraph.buildAndPrintHeader())

# Count quanta in graph and give a warning if it's empty and return
# None.
Expand All @@ -587,7 +589,7 @@ def makeGraph(self, pipeline, args):
if args.save_single_quanta:
for quantumNode in qgraph:
sqgraph = qgraph.subset(quantumNode)
uri = args.save_single_quanta.format(quantumNode.nodeId.number)
uri = args.save_single_quanta.format(quantumNode)
sqgraph.saveUri(uri)

if args.qgraph_dot:
Expand Down Expand Up @@ -870,9 +872,9 @@ def _showWorkflow(self, graph, args):
Parsed command line
"""
for node in graph:
print(f"Quantum {node.nodeId.number}: {node.taskDef.taskName}")
print(f"Quantum {node.nodeId}: {node.taskDef.taskName}")
for parent in graph.determineInputsToQuantumNode(node):
print(f"Parent Quantum {parent.nodeId.number} - Child Quantum {node.nodeId.number}")
print(f"Parent Quantum {parent.nodeId} - Child Quantum {node.nodeId}")

def _showUri(self, graph, args):
"""Print input and predicted output URIs to stdout
Expand All @@ -895,7 +897,7 @@ def dumpURIs(thisRef):

butler = _ButlerFactory.makeReadButler(args)
for node in graph:
print(f"Quantum {node.nodeId.number}: {node.taskDef.taskName}")
print(f"Quantum {node.nodeId}: {node.taskDef.taskName}")
print(" inputs:")
for key, refs in node.quantum.inputs.items():
for ref in refs:
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def start(self, butler, quantumExecutor, startMethod=None):
self.process = mp_ctx.Process(
target=_Job._executeJob,
args=(quantumExecutor, taskDef, quantum_pickle, butler, logConfigState),
name=f"task-{self.qnode.nodeId.number}"
name=f"task-{self.qnode.quantum.dataId}"
)
self.process.start()
self.started = time.time()
Expand Down
48 changes: 43 additions & 5 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
_ACTION_ADD_INSTRUMENT,
PipetaskCommand,
)
from lsst.daf.butler import Config, Quantum, Registry
from lsst.daf.butler import (Config, Quantum, Registry, DimensionUniverse, DatasetRef, DataCoordinate)
from lsst.daf.butler.core.datasets.type import DatasetType
from lsst.daf.butler.registry import RegistryConfig
from lsst.obs.base import Instrument
import lsst.pex.config as pexConfig
Expand All @@ -58,7 +59,8 @@
makeSimpleButler,
makeSimplePipeline,
makeSimpleQGraph,
populateButler)
populateButler,
AddTask)
from lsst.utils.tests import temporaryDirectory


Expand Down Expand Up @@ -207,9 +209,45 @@ def _makeQGraph():
-------
qgraph : `~lsst.pipe.base.QuantumGraph`
"""
taskDef = TaskDef(taskName=_TASK_CLASS, config=SimpleConfig(), label="test")
config = Config({
"version": 1,
"skypix": {
"common": "htm7",
"htm": {
"class": "lsst.sphgeom.HtmPixelization",
"max_level": 24,
}
},
"elements": {
"A": {
"keys": [{
"name": "id",
"type": "int",
}],
"storage": {
"cls": "lsst.daf.butler.registry.dimensions.table.TableDimensionRecordStorage",
},
},
"B": {
"keys": [{
"name": "id",
"type": "int",
}],
"storage": {
"cls": "lsst.daf.butler.registry.dimensions.table.TableDimensionRecordStorage",
},
}
},
"packers": {}
})
universe = DimensionUniverse(config=config)
fakeDSType = DatasetType("A", tuple(), storageClass="ExposureF", universe=universe)
taskDef = TaskDef(taskName=_TASK_CLASS, config=AddTask.ConfigClass(), taskClass=AddTask)
quanta = [Quantum(taskName=_TASK_CLASS,
inputs={FakeDSType("A"): [FakeDSRef("A", (1, 2))]})] # type: ignore
inputs={fakeDSType:
[DatasetRef(fakeDSType,
DataCoordinate.standardize({"A": 1, "B": 2},
universe=universe))]})] # type: ignore
qgraph = QuantumGraph({taskDef: set(quanta)})
return qgraph

Expand Down Expand Up @@ -727,7 +765,7 @@ def testSubgraph(self):
# Select first two nodes for execution. This depends on node ordering
# which I assume is the same as execution order.
nNodes = 2
nodeIds = [node.nodeId.number for node in qgraph]
nodeIds = [node.nodeId for node in qgraph]
nodeIds = nodeIds[:nNodes]

self.assertEqual(len(qgraph.taskGraph), self.nQuanta)
Expand Down

0 comments on commit 7bb07dd

Please sign in to comment.