Skip to content

Commit

Permalink
Merge pull request #180 from lsst/tickets/DM-33493
Browse files Browse the repository at this point in the history
DM-33493: Add datastore records to generated quantum graph
  • Loading branch information
andy-slac committed Apr 21, 2022
2 parents 7b4de3a + d318c66 commit b02ad94
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 9 deletions.
1 change: 1 addition & 0 deletions python/lsst/ctrl/mpexec/cli/opt/optionGroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(self):
ctrlMpExecOpts.qgraph_option(),
ctrlMpExecOpts.qgraph_id_option(),
ctrlMpExecOpts.qgraph_node_id_option(),
ctrlMpExecOpts.qgraph_datastore_records_option(),
ctrlMpExecOpts.skip_existing_in_option(),
ctrlMpExecOpts.skip_existing_option(),
ctrlMpExecOpts.clobber_outputs_option(),
Expand Down
11 changes: 11 additions & 0 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,17 @@
)


qgraph_datastore_records_option = MWOptionDecorator(
"--qgraph-datastore-records",
help=unwrap(
"""Include datastore records into generated quantum graph, these records are used by a
quantum-backed butler.
"""
),
is_flag=True,
)


# I wanted to use default=None here to match Python API but click silently
# replaces None with an empty tuple when multiple=True.
qgraph_node_id_option = MWOptionDecorator(
Expand Down
4 changes: 4 additions & 0 deletions python/lsst/ctrl/mpexec/cli/script/qgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def qgraph(
qgraph,
qgraph_id,
qgraph_node_id,
qgraph_datastore_records,
skip_existing_in,
skip_existing,
save_qgraph,
Expand Down Expand Up @@ -73,6 +74,8 @@ def qgraph(
qgraph_node_id : `list` of `int`, optional
Only load a specified set of nodes if graph is loaded from a file,
nodes are identified by integer IDs.
qgraph_datastore_records : `bool`
If True then include datastore records into generated quanta.
skip_existing_in : `list` [ `str` ]
Accepts list of collections, if all Quantum outputs already exist in
the specified list of collections then that Quantum will be excluded
Expand Down Expand Up @@ -158,6 +161,7 @@ def qgraph(
qgraph=qgraph,
qgraph_id=qgraph_id,
qgraph_node_id=qgraph_node_id,
qgraph_datastore_records=qgraph_datastore_records,
save_qgraph=save_qgraph,
save_single_quanta=save_single_quanta,
qgraph_dot=qgraph_dot,
Expand Down
21 changes: 12 additions & 9 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def check(self, args: argparse.Namespace):
@classmethod
def _makeReadParts(cls, args: argparse.Namespace):
"""Common implementation for `makeReadButler` and
`makeRegistryAndCollections`.
`makeButlerAndCollections`.
Parameters
----------
Expand Down Expand Up @@ -325,9 +325,9 @@ def makeReadButler(cls, args: argparse.Namespace) -> Butler:
return Butler(butler=butler, collections=inputs)

@classmethod
def makeRegistryAndCollections(
def makeButlerAndCollections(
cls, args: argparse.Namespace
) -> Tuple[Registry, CollectionSearch, Optional[str]]:
) -> Tuple[Butler, CollectionSearch, Optional[str]]:
"""Return a read-only registry, a collection search path, and the name
of the run to be used for future writes.
Expand All @@ -339,8 +339,8 @@ def makeRegistryAndCollections(
Returns
-------
registry : `lsst.daf.butler.Registry`
Butler registry that collections will be added to and/or queried
butler : `lsst.daf.butler.Butler`
A read-only butler that collections will be added to and/or queried
from.
inputs : `lsst.daf.butler.registry.CollectionSearch`
Collections to search for datasets.
Expand All @@ -351,7 +351,7 @@ def makeRegistryAndCollections(
butler, inputs, self = cls._makeReadParts(args)
run = self.outputRun.name if args.extend_run else None
_LOG.debug("Preparing registry to read from %s and expect future writes to '%s'.", inputs, run)
return butler.registry, inputs, run
return butler, inputs, run

@classmethod
def makeWriteButler(
Expand Down Expand Up @@ -559,7 +559,7 @@ def makeGraph(self, pipeline, args):
if args.extend_run:
args.skip_existing = True

registry, collections, run = _ButlerFactory.makeRegistryAndCollections(args)
butler, collections, run = _ButlerFactory.makeButlerAndCollections(args)

if args.skip_existing and run:
args.skip_existing_in += (run,)
Expand All @@ -568,7 +568,7 @@ def makeGraph(self, pipeline, args):
# click passes empty tuple as default value for qgraph_node_id
nodes = args.qgraph_node_id or None
qgraph = QuantumGraph.loadUri(
args.qgraph, registry.dimensions, nodes=nodes, graphID=args.qgraph_id
args.qgraph, butler.registry.dimensions, nodes=nodes, graphID=args.qgraph_id
)

# pipeline can not be provided in this case
Expand All @@ -579,7 +579,10 @@ def makeGraph(self, pipeline, args):
else:
# make execution plan (a.k.a. DAG) for pipeline
graphBuilder = GraphBuilder(
registry, skipExistingIn=args.skip_existing_in, clobberOutputs=args.clobber_outputs
butler.registry,
skipExistingIn=args.skip_existing_in,
clobberOutputs=args.clobber_outputs,
datastore=butler.datastore if args.qgraph_datastore_records else None,
)
# accumulate metadata
metadata = {
Expand Down
30 changes: 30 additions & 0 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,36 @@ def testShowGraphWorkflow(self):
# butler from command line options and there is no way to pass butler
# mock to that code.

def testSimpleQGraphDatastoreRecords(self):
"""Test quantum graph generation with --qgraph-datastore-records."""
args = _makeArgs(
butler_config=self.root, input="test", output="output", qgraph_datastore_records=True
)
butler = makeSimpleButler(self.root, run=args.input, inMemory=False)
populateButler(self.pipeline, butler)

fwk = CmdLineFwk()
qgraph = fwk.makeGraph(self.pipeline, args)
self.assertEqual(len(qgraph), self.nQuanta)
for i, qnode in enumerate(qgraph):
quantum = qnode.quantum
self.assertIsNotNone(quantum.datastore_records)
# only the first quantum has a pre-existing input
if i == 0:
datastore_name = "FileDatastore@<butlerRoot>"
self.assertEqual(set(quantum.datastore_records.keys()), {datastore_name})
records_data = quantum.datastore_records[datastore_name]
records = dict(records_data.records)
self.assertEqual(len(records), 1)
_, records = records.popitem()
records = records["file_datastore_records"]
self.assertEqual(
[record.path for record in records],
["test/add_dataset0/add_dataset0_INSTR_det0_test.pickle"],
)
else:
self.assertEqual(quantum.datastore_records, {})


class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
pass
Expand Down

0 comments on commit b02ad94

Please sign in to comment.