Skip to content

Commit

Permalink
Implement saving of package versions.
Browse files Browse the repository at this point in the history
  • Loading branch information
andy-slac committed May 13, 2020
1 parent 6f5274b commit e7c3758
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 6 deletions.
3 changes: 2 additions & 1 deletion python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,8 @@ def runPipeline(self, graph, taskFactory, args, butler=None):
preExecInit = PreExecInit(butler, taskFactory, args.skip_existing)
preExecInit.initialize(graph,
saveInitOutputs=not args.skip_init_writes,
registerDatasetTypes=args.register_dataset_types)
registerDatasetTypes=args.register_dataset_types,
saveVersions=not args.no_versions)

if not args.init_only:
graphFixup = self._importGraphFixup(args)
Expand Down
3 changes: 3 additions & 0 deletions python/lsst/ctrl/mpexec/cmdLineParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ def _makeMetaOutputOptions(parser):
group.add_argument("--register-dataset-types", dest="register_dataset_types", default=False,
action="store_true",
help="Register DatasetTypes that do not already exist in the Registry.")
group.add_argument("--no-versions", dest="no_versions", default=False,
action="store_true",
help="Do not save or check package versions.")


def _makeLoggingOptions(parser):
Expand Down
51 changes: 48 additions & 3 deletions python/lsst/ctrl/mpexec/preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
# -----------------------------
# Imports for other modules --
# -----------------------------
from lsst.base import Packages
from lsst.daf.butler import DatasetType
from lsst.pipe.base import PipelineDatasetTypes

Expand Down Expand Up @@ -58,7 +59,7 @@ def __init__(self, butler, taskFactory, skipExisting=False):
self.taskFactory = taskFactory
self.skipExisting = skipExisting

def initialize(self, graph, saveInitOutputs=True, registerDatasetTypes=False):
def initialize(self, graph, saveInitOutputs=True, registerDatasetTypes=False, saveVersions=True):
"""Perform all initialization steps.
Convenience method to execute all initialization steps. Instead of
Expand All @@ -70,10 +71,14 @@ def initialize(self, graph, saveInitOutputs=True, registerDatasetTypes=False):
graph : `~lsst.pipe.base.QuantumGraph`
Execution graph.
saveInitOutputs : `bool`, optional
If ``True`` (default) then save task "init outputs" to butler.
If ``True`` (default) then save "init outputs", configurations,
and package versions to butler.
registerDatasetTypes : `bool`, optional
If ``True`` then register dataset types in registry, otherwise
they must be already registered.
saveVersions : `bool`, optional
If ``False`` then do not save package versions even if
``saveInitOutputs`` is set to ``True``.
"""
# register dataset types or check consistency
self.initializeDatasetTypes(graph, registerDatasetTypes)
Expand All @@ -83,6 +88,8 @@ def initialize(self, graph, saveInitOutputs=True, registerDatasetTypes=False):
if saveInitOutputs:
self.saveInitOutputs(graph)
self.saveConfigs(graph)
if saveVersions:
self.savePackageVersions(graph)

def initializeDatasetTypes(self, graph, registerDatasetTypes=False):
"""Save or check DatasetTypes output by the tasks in a graph.
Expand Down Expand Up @@ -115,10 +122,15 @@ def initializeDatasetTypes(self, graph, registerDatasetTypes=False):
universe=self.butler.registry.dimensions)
for taskDef in pipeline]

# And one dataset type for package versions
packagesDatasetType = DatasetType("packages", {},
storageClass="Packages",
universe=self.butler.registry.dimensions)

datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=self.butler.registry)
for datasetType in itertools.chain(datasetTypes.initIntermediates, datasetTypes.initOutputs,
datasetTypes.intermediates, datasetTypes.outputs,
configDatasetTypes):
configDatasetTypes, [packagesDatasetType]):
if registerDatasetTypes:
_LOG.debug("Registering DatasetType %s with registry", datasetType)
# this is a no-op if it already exists and is consistent,
Expand Down Expand Up @@ -221,3 +233,36 @@ def logConfigMismatch(msg):
# butler will raise exception if dataset is already there
_LOG.debug("Saving Config for task=%s dataset type=%s", taskDef.label, configName)
self.butler.put(taskDef.config, configName, {})

def savePackageVersions(self, graph):
    """Write versions of currently-imported software packages to butler.

    If ``self.skipExisting`` is set, previously stored versions are read
    back first and compared against the current ones; otherwise a new
    ``packages`` dataset is written unconditionally.

    Parameters
    ----------
    graph : `~lsst.pipe.base.QuantumGraph`
        Execution graph (not used by this method; accepted for interface
        consistency with the other ``save*`` methods).

    Raises
    ------
    TypeError
        Raised if ``self.skipExisting`` is ``True`` and stored package
        versions are not compatible with the currently-imported ones.
    """
    packages = Packages.fromSystem()
    datasetType = "packages"
    oldPackages = self.butler.get(datasetType, {}) if self.skipExisting else None
    if oldPackages is not None:
        # Note that because we can only detect python modules that have
        # been imported, the stored list of products may be more or less
        # complete than what we have now.  What's important is that the
        # products that are in common have the same version.
        diff = packages.difference(oldPackages)
        if diff:
            versions_str = "; ".join(f"{pkg}: {diff[pkg][1]} vs {diff[pkg][0]}" for pkg in diff)
            raise TypeError(f"Package versions mismatch: ({versions_str})")
        # Update the old set of packages in case we have more packages
        # that haven't been persisted.
        extra = packages.extra(oldPackages)
        if extra:
            oldPackages.update(packages)
            # NOTE(review): butler.put raises if the dataset already
            # exists (see saveConfigs), and oldPackages was just read
            # from an existing dataset -- confirm butler supports
            # overwrite here, or that this path is otherwise safe.
            self.butler.put(oldPackages, datasetType, {})
    else:
        self.butler.put(packages, datasetType, {})
6 changes: 5 additions & 1 deletion tests/testUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,14 @@ def registerDatasetTypes(registry, pipeline):
configDatasetType = DatasetType(taskDef.configDatasetName, {},
storageClass="Config",
universe=registry.dimensions)
packagesDatasetType = DatasetType("packages", {},
storageClass="Packages",
universe=registry.dimensions)
datasetTypes = pipeBase.TaskDatasetTypes.fromTaskDef(taskDef, registry=registry)
for datasetType in itertools.chain(datasetTypes.initInputs, datasetTypes.initOutputs,
datasetTypes.inputs, datasetTypes.outputs,
datasetTypes.prerequisites, [configDatasetType]):
datasetTypes.prerequisites,
[configDatasetType, packagesDatasetType]):
_LOG.info("Registering %s with registry", datasetType)
# this is a no-op if it already exists and is consistent,
# and it raises if it is inconsistent.
Expand Down
2 changes: 2 additions & 0 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def _makeArgs(pipeline=None, qgraph=None, pipeline_actions=(), order_pipeline=Fa
args.output = {}
args.register_dataset_types = False
args.skip_init_writes = False
args.no_versions = False
args.skip_existing = False
args.init_only = False
args.processes = 1
Expand Down Expand Up @@ -331,6 +332,7 @@ def testSimpleQGraphSkipExisting(self):

AddTask.stopAt = -1
args.skip_existing = True
args.no_versions = True
fwk.runPipeline(qgraph, taskFactory, args, butler=butler)
self.assertEqual(AddTask.countExec, nQuanta)

Expand Down
3 changes: 2 additions & 1 deletion tests/test_cmdLineParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ def testCmdLineParser(self):
run -t taskname
""".split())
run_options = qgraph_options + """register_dataset_types skip_init_writes
init_only processes profile timeout doraise graph_fixup""".split()
init_only processes profile timeout doraise graph_fixup
no_versions""".split()
self.assertEqual(set(vars(args).keys()), set(common_options + run_options))
self.assertEqual(args.subcommand, 'run')

Expand Down

0 comments on commit e7c3758

Please sign in to comment.