Skip to content

Commit

Permalink
Merge pull request #51 from lsst/tickets/DM-24797
Browse files Browse the repository at this point in the history
DM-24797: Store per-run information (configs, software versions) in butler repo
  • Loading branch information
andy-slac committed May 14, 2020
2 parents 20da6be + 1d63fef commit 8b60ee7
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 10 deletions.
3 changes: 2 additions & 1 deletion python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,8 @@ def runPipeline(self, graph, taskFactory, args, butler=None):
preExecInit = PreExecInit(butler, taskFactory, args.skip_existing)
preExecInit.initialize(graph,
saveInitOutputs=not args.skip_init_writes,
registerDatasetTypes=args.register_dataset_types)
registerDatasetTypes=args.register_dataset_types,
saveVersions=not args.no_versions)

if not args.init_only:
graphFixup = self._importGraphFixup(args)
Expand Down
3 changes: 3 additions & 0 deletions python/lsst/ctrl/mpexec/cmdLineParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ def _makeMetaOutputOptions(parser):
group.add_argument("--register-dataset-types", dest="register_dataset_types", default=False,
action="store_true",
help="Register DatasetTypes that do not already exist in the Registry.")
group.add_argument("--no-versions", dest="no_versions", default=False,
action="store_true",
help="Do not save or check package versions.")


def _makeLoggingOptions(parser):
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/ctrl/mpexec/executionGraphFixup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ def fixupQuanta(self, quanta: Iterable[QuantumIterData]) -> Iterable[QuantumIter
Iterable of topologically ordered quanta as returned from
`lsst.pipe.base.QuantumGraph.traverse` method.
Yieds
-----
Yields
------
quantum : `~lsst.pipe.base.QuantumIterData`
"""
raise NotImplementedError
106 changes: 101 additions & 5 deletions python/lsst/ctrl/mpexec/preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
# -----------------------------
# Imports for other modules --
# -----------------------------
from lsst.base import Packages
from lsst.daf.butler import DatasetType
from lsst.pipe.base import PipelineDatasetTypes

_LOG = logging.getLogger(__name__.partition(".")[2])
Expand Down Expand Up @@ -57,7 +59,7 @@ def __init__(self, butler, taskFactory, skipExisting=False):
self.taskFactory = taskFactory
self.skipExisting = skipExisting

def initialize(self, graph, saveInitOutputs=True, registerDatasetTypes=False):
def initialize(self, graph, saveInitOutputs=True, registerDatasetTypes=False, saveVersions=True):
"""Perform all initialization steps.
Convenience method to execute all initialization steps. Instead of
Expand All @@ -69,10 +71,14 @@ def initialize(self, graph, saveInitOutputs=True, registerDatasetTypes=False):
graph : `~lsst.pipe.base.QuantumGraph`
Execution graph.
saveInitOutputs : `bool`, optional
If ``True`` (default) then save task "init outputs" to butler.
If ``True`` (default) then save "init outputs", configurations,
and package versions to butler.
registerDatasetTypes : `bool`, optional
If ``True`` then register dataset types in registry, otherwise
they must be already registered.
saveVersions : `bool`, optional
If ``False`` then do not save package versions even if
``saveInitOutputs`` is set to ``True``.
"""
# register dataset types or check consistency
self.initializeDatasetTypes(graph, registerDatasetTypes)
Expand All @@ -81,6 +87,9 @@ def initialize(self, graph, saveInitOutputs=True, registerDatasetTypes=False):
# is consistent with what tasks would save
if saveInitOutputs:
self.saveInitOutputs(graph)
self.saveConfigs(graph)
if saveVersions:
self.savePackageVersions(graph)

def initializeDatasetTypes(self, graph, registerDatasetTypes=False):
"""Save or check DatasetTypes output by the tasks in a graph.
Expand All @@ -106,9 +115,22 @@ def initializeDatasetTypes(self, graph, registerDatasetTypes=False):
does not exist in registry.
"""
pipeline = list(nodes.taskDef for nodes in graph)

# Make dataset types for configurations
configDatasetTypes = [DatasetType(taskDef.configDatasetName, {},
storageClass="Config",
universe=self.butler.registry.dimensions)
for taskDef in pipeline]

# And one dataset type for package versions
packagesDatasetType = DatasetType("packages", {},
storageClass="Packages",
universe=self.butler.registry.dimensions)

datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=self.butler.registry)
for datasetType in itertools.chain(datasetTypes.initIntermediates, datasetTypes.initOutputs,
datasetTypes.intermediates, datasetTypes.outputs):
datasetTypes.intermediates, datasetTypes.outputs,
configDatasetTypes, [packagesDatasetType]):
if registerDatasetTypes:
_LOG.debug("Registering DatasetType %s with registry", datasetType)
# this is a no-op if it already exists and is consistent,
Expand Down Expand Up @@ -136,8 +158,8 @@ def saveInitOutputs(self, graph):
exists. Content of a butler collection may be changed if
exception is raised.
Note
----
Notes
-----
If ``skipExisting`` is `True` then existing datasets are not
overwritten, instead we should check that their stored object is
exactly the same as what we would save at this time. Comparing
Expand Down Expand Up @@ -172,3 +194,77 @@ def saveInitOutputs(self, graph):
# butler will raise exception if dataset is already there
_LOG.debug("Saving InitOutputs for task=%s key=%s", task, name)
self.butler.put(initOutputVar, attribute.name, {})

def saveConfigs(self, graph):
    """Write configurations for pipeline tasks to butler or check that
    existing configurations are equal to the new ones.

    Parameters
    ----------
    graph : `~lsst.pipe.base.QuantumGraph`
        Execution graph.

    Raises
    ------
    Exception
        Raised if ``skipExisting`` is `False` and a configuration dataset
        already exists. Content of a butler collection should not be
        changed if an exception is raised.
    """
    def logConfigMismatch(msg):
        # Route details of a failed config comparison to the log.
        _LOG.fatal("Comparing configuration: %s", msg)

    _LOG.debug("Will save Configs for all tasks")
    # One transaction around all tasks so that a failure midway rolls
    # back any configurations already written.
    with self.butler.transaction():
        for node in graph:
            taskDef = node.taskDef
            datasetTypeName = taskDef.configDatasetName

            # Only look for a previously stored config when skipping
            # existing datasets is allowed; otherwise butler.put below
            # raises if the dataset is already present.
            existing = self.butler.get(datasetTypeName, {}) if self.skipExisting else None
            if existing is None:
                # butler will raise exception if dataset is already there
                _LOG.debug("Saving Config for task=%s dataset type=%s",
                           taskDef.label, datasetTypeName)
                self.butler.put(taskDef.config, datasetTypeName, {})
            elif not taskDef.config.compare(existing, shortcut=False, output=logConfigMismatch):
                raise TypeError(
                    f"Config does not match existing task config {datasetTypeName!r} in butler; "
                    "tasks configurations must be consistent within the same run collection")

def savePackageVersions(self, graph):
    """Write versions of software packages to butler.

    Parameters
    ----------
    graph : `~lsst.pipe.base.QuantumGraph`
        Execution graph.

    Raises
    ------
    Exception
        Raised if ``skipExisting`` is `True` and the stored package
        versions are not compatible with the current ones.
    """
    current = Packages.fromSystem()
    datasetTypeName = "packages"

    stored = self.butler.get(datasetTypeName, {}) if self.skipExisting else None
    if stored is None:
        # Nothing stored yet (or we are not checking for existing data);
        # butler will raise exception if dataset is already there.
        self.butler.put(current, datasetTypeName, {})
        return

    # Note that because we can only detect python modules that have been
    # imported, the stored list of products may be more or less complete
    # than what we have now. What's important is that the products that
    # are in common have the same version.
    mismatch = current.difference(stored)
    if mismatch:
        details = "; ".join(f"{pkg}: {mismatch[pkg][1]} vs {mismatch[pkg][0]}" for pkg in mismatch)
        raise TypeError(f"Package versions mismatch: ({details})")

    # Update the old set of packages in case we have more packages that
    # haven't been persisted.
    extra = current.extra(stored)
    if extra:
        stored.update(current)
        self.butler.put(stored, datasetTypeName, {})
14 changes: 13 additions & 1 deletion tests/testUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

__all__ = ["AddTaskConfig", "AddTask", "AddTaskFactoryMock", "ButlerMock"]

import contextlib
import itertools
import logging
import numpy
Expand Down Expand Up @@ -143,6 +144,10 @@ def key(dataId):
"""
return frozenset(dataId.items())

def transaction(self):
    """Return a no-op context manager mimicking `Butler.transaction`.

    The mock butler has no real transaction support, so entering the
    returned context does nothing and exiting never rolls anything back.
    """
    return contextlib.nullcontext()

def get(self, datasetRefOrType, dataId=None, parameters=None, **kwds):
datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, **kwds)
_LOG.info("butler.get: datasetType=%s dataId=%s", datasetType.name, dataId)
Expand Down Expand Up @@ -196,10 +201,17 @@ def registerDatasetTypes(registry, pipeline):
Iterable of TaskDef instances.
"""
for taskDef in pipeline:
configDatasetType = DatasetType(taskDef.configDatasetName, {},
storageClass="Config",
universe=registry.dimensions)
packagesDatasetType = DatasetType("packages", {},
storageClass="Packages",
universe=registry.dimensions)
datasetTypes = pipeBase.TaskDatasetTypes.fromTaskDef(taskDef, registry=registry)
for datasetType in itertools.chain(datasetTypes.initInputs, datasetTypes.initOutputs,
datasetTypes.inputs, datasetTypes.outputs,
datasetTypes.prerequisites):
datasetTypes.prerequisites,
[configDatasetType, packagesDatasetType]):
_LOG.info("Registering %s with registry", datasetType)
# this is a no-op if it already exists and is consistent,
# and it raises if it is inconsistent.
Expand Down
2 changes: 2 additions & 0 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def _makeArgs(pipeline=None, qgraph=None, pipeline_actions=(), order_pipeline=Fa
args.output = {}
args.register_dataset_types = False
args.skip_init_writes = False
args.no_versions = False
args.skip_existing = False
args.init_only = False
args.processes = 1
Expand Down Expand Up @@ -331,6 +332,7 @@ def testSimpleQGraphSkipExisting(self):

AddTask.stopAt = -1
args.skip_existing = True
args.no_versions = True
fwk.runPipeline(qgraph, taskFactory, args, butler=butler)
self.assertEqual(AddTask.countExec, nQuanta)

Expand Down
3 changes: 2 additions & 1 deletion tests/test_cmdLineParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ def testCmdLineParser(self):
run -t taskname
""".split())
run_options = qgraph_options + """register_dataset_types skip_init_writes
init_only processes profile timeout doraise graph_fixup""".split()
init_only processes profile timeout doraise graph_fixup
no_versions""".split()
self.assertEqual(set(vars(args).keys()), set(common_options + run_options))
self.assertEqual(args.subcommand, 'run')

Expand Down

0 comments on commit 8b60ee7

Please sign in to comment.