Skip to content

Commit

Permalink
Fix reading of QuantumGraph from pickle file (DM-25016)
Browse files Browse the repository at this point in the history
When reading a QuantumGraph from a pickle file, the DimensionUniverse instance
must be initialized first; this fix creates a registry instance before
un-pickling, which creates all the necessary instances. This trivial fix exposed
an issue with my dumb unit test, which also needed a proper fix.
  • Loading branch information
andy-slac committed May 22, 2020
1 parent 67900b1 commit 893b41c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 24 deletions.
24 changes: 10 additions & 14 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import datetime
import fnmatch
import logging
import pickle
import re
import sys
from typing import List, Optional, Tuple
Expand Down Expand Up @@ -233,7 +232,7 @@ def check(self, args: argparse.Namespace):
if args.extend_run and not self.outputRun.exists:
raise ValueError(f"Cannot --extend-run; output collection "
f"'{self.outputRun.name}' does not exist.")
if not args.extend_run and self.outputRun.exists:
if not args.extend_run and self.outputRun is not None and self.outputRun.exists:
raise ValueError(f"Output run '{self.outputRun.name}' already exists, but "
f"--extend-run was not given.")
if args.prune_replaced and not args.replace_run:
Expand Down Expand Up @@ -282,7 +281,7 @@ def _makeReadParts(cls, args: argparse.Namespace):
return butler, inputs, self

@classmethod
def makeReadButler(cls, args: argparse.Namespace):
def makeReadButler(cls, args: argparse.Namespace) -> Butler:
"""Construct a read-only butler according to the given command-line
arguments.
Expand All @@ -303,7 +302,8 @@ def makeReadButler(cls, args: argparse.Namespace):
return Butler(butler=butler, collections=inputs)

@classmethod
def makeRegistryAndCollections(cls, args: argparse.Namespace) -> CollectionSearch:
def makeRegistryAndCollections(cls, args: argparse.Namespace) -> \
Tuple[Registry, CollectionSearch, Optional[str]]:
"""Return a read-only registry, a collection search path, and the name
of the run to be used for future writes.
Expand Down Expand Up @@ -591,8 +591,7 @@ def makeGraph(self, pipeline, args):
Parameters
----------
pipeline : `~lsst.pipe.base.Pipeline`
Pipeline, can be empty or ``None`` if graph is read from pickle
file.
Pipeline, can be empty or ``None`` if graph is read from a file.
args : `argparse.Namespace`
Parsed command line
Expand All @@ -602,22 +601,19 @@ def makeGraph(self, pipeline, args):
If resulting graph is empty then `None` is returned.
"""

registry, collections, run = _ButlerFactory.makeRegistryAndCollections(args)

if args.qgraph:

with open(args.qgraph, 'rb') as pickleFile:
qgraph = pickle.load(pickleFile)
if not isinstance(qgraph, QuantumGraph):
raise TypeError("QuantumGraph pickle file has incorrect object type: {}".format(
type(qgraph)))
qgraph = QuantumGraph.restore(pickleFile, registry.dimensions)

# pipeline can not be provided in this case
if pipeline:
raise ValueError("Pipeline must not be given when quantum graph is read from file.")

else:

registry, collections, run = _ButlerFactory.makeRegistryAndCollections(args)

# make execution plan (a.k.a. DAG) for pipeline
graphBuilder = GraphBuilder(registry,
skipExisting=args.skip_existing)
Expand All @@ -634,13 +630,13 @@ def makeGraph(self, pipeline, args):

if args.save_qgraph:
with open(args.save_qgraph, "wb") as pickleFile:
pickle.dump(qgraph, pickleFile)
qgraph.save(pickleFile)

if args.save_single_quanta:
for iq, sqgraph in enumerate(qgraph.quantaAsQgraph()):
filename = args.save_single_quanta.format(iq)
with open(filename, "wb") as pickleFile:
pickle.dump(sqgraph, pickleFile)
qgraph.save(pickleFile)

if args.qgraph_dot:
graph2dot(qgraph, args.qgraph_dot)
Expand Down
47 changes: 37 additions & 10 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
from lsst.ctrl.mpexec.cmdLineFwk import CmdLineFwk
from lsst.ctrl.mpexec.cmdLineParser import (_ACTION_ADD_TASK, _ACTION_CONFIG,
_ACTION_CONFIG_FILE, _ACTION_ADD_INSTRUMENT)
from lsst.daf.butler import Quantum, Config
from lsst.daf.butler import Config, Quantum, Registry
from lsst.daf.butler.registry import RegistryConfig
import lsst.pex.config as pexConfig
from lsst.pipe.base import (Pipeline, PipelineTask, PipelineTaskConfig,
QuantumGraph, QuantumGraphTaskNodes,
Expand Down Expand Up @@ -66,6 +67,23 @@ def makeTmpFile(contents=None):
os.remove(tmpname)


@contextlib.contextmanager
def makeSQLiteRegistry():
"""Context manager to create new empty registry database.
Yields
------
config : `RegistryConfig`
Registry configuration for initialized registry database.
"""
with makeTmpFile() as filename:
uri = f"sqlite:///{filename}"
config = RegistryConfig()
config["db"] = uri
Registry.fromConfig(config, create=True)
yield config


class SimpleConnections(PipelineTaskConnections, dimensions=(),
defaultTemplates={"template": "simple"}):
schema = cT.InitInput(doc="Schema",
Expand Down Expand Up @@ -106,7 +124,8 @@ def makeTask(self, taskClass, config, overrides, butler):


def _makeArgs(pipeline=None, qgraph=None, pipeline_actions=(), order_pipeline=False,
save_pipeline="", save_qgraph="", save_single_quanta="", pipeline_dot="", qgraph_dot=""):
save_pipeline="", save_qgraph="", save_single_quanta="",
pipeline_dot="", qgraph_dot="", registryConfig=None):
"""Return parsed command line arguments.
Parameters
Expand All @@ -130,6 +149,8 @@ def _makeArgs(pipeline=None, qgraph=None, pipeline_actions=(), order_pipeline=Fa
"""
args = argparse.Namespace()
args.butler_config = Config()
if registryConfig:
args.butler_config["registry"] = registryConfig
# The default datastore has a relocatable root, so we need to specify
# some root here for it to use
args.butler_config.configFile = "."
Expand All @@ -142,7 +163,12 @@ def _makeArgs(pipeline=None, qgraph=None, pipeline_actions=(), order_pipeline=Fa
args.save_single_quanta = save_single_quanta
args.pipeline_dot = pipeline_dot
args.qgraph_dot = qgraph_dot
args.output = {}
args.input = ""
args.output = None
args.output_run = None
args.extend_run = False
args.replace_run = False
args.prune_replaced = False
args.register_dataset_types = False
args.skip_init_writes = False
args.no_versions = False
Expand Down Expand Up @@ -260,29 +286,30 @@ def testMakeGraphFromPickle(self):
"""
fwk = CmdLineFwk()

with makeTmpFile() as tmpname:
with makeTmpFile() as tmpname, makeSQLiteRegistry() as registryConfig:

# make non-empty graph and store it in a file
qgraph = _makeQGraph()
with open(tmpname, "wb") as pickleFile:
pickle.dump(qgraph, pickleFile)
args = _makeArgs(qgraph=tmpname)
qgraph.save(pickleFile)
args = _makeArgs(qgraph=tmpname, registryConfig=registryConfig)
qgraph = fwk.makeGraph(None, args)
self.assertIsInstance(qgraph, QuantumGraph)
self.assertEqual(len(qgraph), 1)

# pickle with wrong object type
with open(tmpname, "wb") as pickleFile:
pickle.dump({}, pickleFile)
args = _makeArgs(qgraph=tmpname)
args = _makeArgs(qgraph=tmpname, registryConfig=registryConfig)
with self.assertRaises(TypeError):
fwk.makeGraph(None, args)

# reading empty graph from pickle should return None
# reading empty graph from pickle should work but makeGraph()
# will return None and make a warning
qgraph = QuantumGraph()
with open(tmpname, "wb") as pickleFile:
pickle.dump(qgraph, pickleFile)
args = _makeArgs(qgraph=tmpname)
qgraph.save(pickleFile)
args = _makeArgs(qgraph=tmpname, registryConfig=registryConfig)
with self.assertWarnsRegex(UserWarning, "QuantumGraph is empty"):
# this also tests that warning is generated for empty graph
qgraph = fwk.makeGraph(None, args)
Expand Down

0 comments on commit 893b41c

Please sign in to comment.