Skip to content

Commit

Permalink
Merge branch 'tickets/DM-28649'
Browse files Browse the repository at this point in the history
  • Loading branch information
natelust committed May 18, 2021
2 parents 78d2a6e + 7e61ddd commit 82f2559
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 9 deletions.
4 changes: 3 additions & 1 deletion python/lsst/ctrl/mpexec/cli/opt/optionGroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ def __init__(self):
ctrlMpExecOpts.skip_existing_option(),
ctrlMpExecOpts.save_qgraph_option(),
ctrlMpExecOpts.save_single_quanta_option(),
ctrlMpExecOpts.qgraph_dot_option()]
ctrlMpExecOpts.qgraph_dot_option(),
ctrlMpExecOpts.save_execution_butler_option(),
ctrlMpExecOpts.clobber_execution_butler_option()]


class butler_options(OptionGroup): # noqa: N801
Expand Down
10 changes: 10 additions & 0 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,13 @@ def _to_int(value):
help=unwrap("""Stop processing at first error, default is to process
as many tasks as possible."""),
is_flag=True)

save_execution_butler_option = MWOptionDecorator("--save-execution-butler",
help=unwrap("""Export location for an
execution-specific butler after making
QuantumGraph"""))

clobber_execution_butler_option = MWOptionDecorator("--clobber-execution-butler",
help=unwrap("""When creating execution butler overwrite
any existing products"""),
is_flag=True)
12 changes: 10 additions & 2 deletions python/lsst/ctrl/mpexec/cli/script/qgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

def qgraph(pipelineObj, qgraph, qgraph_id, qgraph_node_id, skip_existing, save_qgraph, save_single_quanta,
qgraph_dot, butler_config, input, output, output_run, extend_run, replace_run, prune_replaced,
data_query, show, **kwargs):
data_query, show, save_execution_butler, clobber_execution_butler, **kwargs):
"""Implements the command line interface `pipetask qgraph` subcommand,
should only be called by command line tools and unit test code that test
this function.
Expand Down Expand Up @@ -101,6 +101,11 @@ def qgraph(pipelineObj, qgraph, qgraph_id, qgraph_node_id, skip_existing, save_q
User query selection expression.
show : `list` [`str`] or `None`
Descriptions of what to dump to stdout.
save_execution_butler : `str` or `None`
URI location for storing an execution Butler build from the
QuantumGraph.
clobber_execution_butler : `bool`
It True overwrite existing execution butler files if present.
kwargs : `dict` [`str`, `str`]
Ignored; click commands may accept options for more than one script
function and pass all the option kwargs to each of the script functions
Expand All @@ -126,7 +131,10 @@ def qgraph(pipelineObj, qgraph, qgraph_id, qgraph_node_id, skip_existing, save_q
prune_replaced=prune_replaced,
data_query=data_query,
show=show,
skip_existing=skip_existing)
skip_existing=skip_existing,
execution_butler_location=save_execution_butler,
clobber_execution_butler=clobber_execution_butler
)

f = CmdLineFwk()
qgraph = f.makeGraph(pipelineObj, args)
Expand Down
19 changes: 18 additions & 1 deletion python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
# Imports of standard modules --
# -------------------------------
import argparse
import copy
import fnmatch
import logging
import re
Expand All @@ -46,7 +47,7 @@
)
from lsst.daf.butler.registry import MissingCollectionError, RegistryDefaults
import lsst.pex.config as pexConfig
from lsst.pipe.base import GraphBuilder, Pipeline, QuantumGraph
from lsst.pipe.base import GraphBuilder, Pipeline, QuantumGraph, buildExecutionButler
from lsst.obs.base import Instrument
from .dotTools import graph2dot, pipeline2dot
from .executionGraphFixup import ExecutionGraphFixup
Expand Down Expand Up @@ -561,6 +562,22 @@ def makeGraph(self, pipeline, args):
if args.qgraph_dot:
graph2dot(qgraph, args.qgraph_dot)

if args.execution_butler_location:
butler = Butler(args.butler_config)
newArgs = copy.deepcopy(args)

def builderShim(butler):
newArgs.butler_config = butler._config
# Calling makeWriteButler is done for the side effects of
# calling that method, maining parsing all the args into
# collection names, creating collections, etc.
newButler = _ButlerFactory.makeWriteButler(newArgs)
return newButler

buildExecutionButler(butler, qgraph, args.execution_butler_location, run,
butlerModifier=builderShim, collections=args.input,
clobber=args.clobber_execution_butler)

return qgraph

def runPipeline(self, graph, taskFactory, args, butler=None):
Expand Down
12 changes: 7 additions & 5 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def testMakeGraphFromSave(self):
qgraph = _makeQGraph()
with open(tmpname, "wb") as saveFile:
qgraph.save(saveFile)
args = _makeArgs(qgraph=tmpname, registryConfig=registryConfig)
args = _makeArgs(qgraph=tmpname, registryConfig=registryConfig, execution_butler_location=None)
qgraph = fwk.makeGraph(None, args)
self.assertIsInstance(qgraph, QuantumGraph)
self.assertEqual(len(qgraph), 1)
Expand All @@ -327,15 +327,16 @@ def testMakeGraphFromSave(self):
args = _makeArgs(
qgraph=tmpname,
qgraph_id="R2-D2 is that you?",
registryConfig=registryConfig
registryConfig=registryConfig,
execution_butler_location=None
)
with self.assertRaisesRegex(ValueError, "graphID does not match"):
fwk.makeGraph(None, args)

# save with wrong object type
with open(tmpname, "wb") as saveFile:
pickle.dump({}, saveFile)
args = _makeArgs(qgraph=tmpname, registryConfig=registryConfig)
args = _makeArgs(qgraph=tmpname, registryConfig=registryConfig, execution_butler_location=None)
with self.assertRaises(ValueError):
fwk.makeGraph(None, args)

Expand All @@ -344,7 +345,7 @@ def testMakeGraphFromSave(self):
qgraph = QuantumGraph(dict())
with open(tmpname, "wb") as saveFile:
qgraph.save(saveFile)
args = _makeArgs(qgraph=tmpname, registryConfig=registryConfig)
args = _makeArgs(qgraph=tmpname, registryConfig=registryConfig, execution_butler_location=None)
with self.assertWarnsRegex(UserWarning, "QuantumGraph is empty"):
# this also tests that warning is generated for empty graph
qgraph = fwk.makeGraph(None, args)
Expand Down Expand Up @@ -634,7 +635,8 @@ def testSubgraph(self):
with open(tmpname, "wb") as saveFile:
qgraph.save(saveFile)

args = _makeArgs(qgraph=tmpname, qgraph_node_id=nodeIds, registryConfig=registryConfig)
args = _makeArgs(qgraph=tmpname, qgraph_node_id=nodeIds, registryConfig=registryConfig,
execution_butler_location=None)
fwk = CmdLineFwk()

# load graph, should only read a subset
Expand Down

0 comments on commit 82f2559

Please sign in to comment.