Merge pull request #326 from lsst/tickets/DM-38779
DM-38779: Use graph DatasetRef in execution butler
timj committed May 4, 2023
2 parents cecac69 + 27a8167 commit 1cb0dfd
Showing 5 changed files with 48 additions and 55 deletions.
4 changes: 4 additions & 0 deletions doc/changes/DM-38779.misc.rst
@@ -0,0 +1,4 @@
* Execution butler creation has been changed to use the ``DatasetRef`` objects from the graph rather than creating new registry entries from the data IDs.
This is possible now that the graph is always created with resolved refs and ensures that provenance is consistent between the graph and the outputs.
* This change to execution butler required that ``ButlerQuantumContext.put()`` no longer unresolves the graph ``DatasetRef`` (otherwise there would be a dataset ID mismatch).
This results in the dataset always using the output run defined in the graph even if the Butler was created with a different default run.
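A minimal sketch of the behaviour described above, as seen from a task's ``runQuantum`` (the surrounding task, its connections and the ``struct`` result are illustrative assumptions, not part of this commit)::

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # butlerQC is the ButlerQuantumContext for this quantum.
        inputs = butlerQC.get(inputRefs)
        struct = self.run(**inputs)
        # outputRefs carries the resolved DatasetRefs taken from the QuantumGraph.
        # put() no longer unresolves them, so the stored datasets keep the graph's
        # dataset IDs and output run even if the Butler has a different default run.
        butlerQC.put(struct, outputRefs)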
17 changes: 1 addition & 16 deletions python/lsst/pipe/base/butlerQuantumContext.py
@@ -26,17 +26,9 @@

__all__ = ("ButlerQuantumContext",)

import warnings
from typing import Any, List, Optional, Sequence, Union

from lsst.daf.butler import (
Butler,
DatasetRef,
DimensionUniverse,
LimitedButler,
Quantum,
UnresolvedRefWarning,
)
from lsst.daf.butler import Butler, DatasetRef, DimensionUniverse, LimitedButler, Quantum
from lsst.utils.introspection import get_full_type_name
from lsst.utils.logging import PeriodicLogger, getLogger

@@ -149,13 +141,6 @@ def _put(self, value: Any, ref: DatasetRef) -> None:
"""Store data in butler"""
self._checkMembership(ref, self.allOutputs)
if self.__full_butler is not None:
# If reference is resolved we need to unresolved it first.
# It is possible that we are putting a dataset into a different
# run than what was originally expected.
if ref.id is not None:
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
ref = ref.unresolved()
self.__full_butler.put(value, ref)
else:
self.__butler.put(value, ref)
61 changes: 29 additions & 32 deletions python/lsst/pipe/base/executionButlerBuilder.py
@@ -23,21 +23,19 @@
__all__ = ("buildExecutionButler",)

import io
import itertools
from collections import defaultdict
from typing import Callable, DefaultDict, Iterable, List, Mapping, Optional, Set, Tuple, Union
from typing import Callable, Iterable, List, Mapping, Optional, Set, Tuple, Union

from lsst.daf.butler import Butler, Config, DataCoordinate, DatasetRef, DatasetType, Registry
from lsst.daf.butler import Butler, Config, DatasetRef, DatasetType, Registry
from lsst.daf.butler.core.repoRelocation import BUTLER_ROOT_TAG
from lsst.daf.butler.registry import ConflictingDefinitionError, MissingDatasetTypeError
from lsst.daf.butler.transfers import RepoExportContext
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.introspection import get_class_of

from .graph import QuantumGraph
from .pipeline import PipelineDatasetTypes

DataSetTypeMap = Mapping[DatasetType, Set[DataCoordinate]]
DataSetTypeRefMap = Mapping[DatasetType, Set[DatasetRef]]


def _validate_dataset_type(
@@ -120,8 +118,7 @@ def _validate_dataset_type(
def _accumulate(
butler: Butler,
graph: QuantumGraph,
dataset_types: PipelineDatasetTypes,
) -> Tuple[Set[DatasetRef], DataSetTypeMap]:
) -> Tuple[Set[DatasetRef], DataSetTypeRefMap]:
# accumulate the DatasetRefs that will be transferred to the execution
# registry

@@ -132,7 +129,7 @@ def _accumulate(
# inserts is the mapping of DatasetType to DatasetRefs for what is to be
# inserted into the registry. These are the products that are expected
# to be produced during processing of the QuantumGraph
inserts: DefaultDict[DatasetType, Set[DataCoordinate]] = defaultdict(set)
inserts: DataSetTypeRefMap = defaultdict(set)

# It is possible to end up with a graph that has different storage
# classes attached to the same dataset type name. This is okay but
@@ -141,16 +138,19 @@
# type encountered and stores the compatible alternative.
datasetTypes: dict[Union[str, DatasetType], DatasetType] = {}

# Add inserts for initOutputs (including initIntermediates); these are
# defined fully by their DatasetType, because they have no dimensions.
# initInputs are part of Quantum and that's the only place the graph stores
# the dataset IDs, so we process them there even though each Quantum for a
# task has the same ones.
for dataset_type in itertools.chain(dataset_types.initIntermediates, dataset_types.initOutputs):
# Find the initOutput refs.
initOutputRefs = list(graph.globalInitOutputRefs())
for task_def in graph.iterTaskGraph():
task_refs = graph.initOutputRefs(task_def)
if task_refs:
initOutputRefs.extend(task_refs)

for ref in initOutputRefs:
dataset_type = ref.datasetType
if dataset_type.component() is not None:
dataset_type = dataset_type.makeCompositeDatasetType()
dataset_type = _validate_dataset_type(dataset_type, datasetTypes, butler.registry)
inserts[dataset_type].add(DataCoordinate.makeEmpty(dataset_type.dimensions.universe))
inserts[dataset_type].add(ref)

# Output references may be resolved even if they do not exist. Find all
# actually existing refs.
@@ -208,7 +208,8 @@ def _accumulate(
# be part of some other upstream dataset, so it
# should be safe to skip them here
continue
inserts[type].add(ref.dataId)
inserts[type].add(ref)

return exports, inserts


@@ -230,7 +231,7 @@ def _discoverCollections(butler: Butler, collections: Iterable[str]) -> set[str]:
return collections


def _export(butler: Butler, collections: Optional[Iterable[str]], inserts: DataSetTypeMap) -> io.StringIO:
def _export(butler: Butler, collections: Optional[Iterable[str]], inserts: DataSetTypeRefMap) -> io.StringIO:
# This exports relevant dimension records and collections using daf butler
# objects, however it reaches in deep and does not use the public methods
# so that it can export it to a string buffer and skip disk access. This
@@ -245,8 +246,8 @@ def _export(butler: Butler, collections: Optional[Iterable[str]], inserts: DataS

# Need to ensure that the dimension records for outputs are
# transferred.
for _, dataIds in inserts.items():
exporter.saveDataIds(dataIds)
for _, refs in inserts.items():
exporter.saveDataIds([ref.dataId for ref in refs])

# Look for any defined collection, if not get the defaults
if collections is None:
Expand Down Expand Up @@ -305,7 +306,6 @@ def _setupNewButler(
# file data stores continue to look at the old location.
config = Config(butler._config)
config["root"] = outputLocation.geturl()
config["allow_put_of_predefined_dataset"] = True
config["registry", "db"] = "sqlite:///<butlerRoot>/gen3.sqlite3"

# Remove any namespace that may be set in main registry.
@@ -339,7 +339,7 @@ def _setupNewButler(
def _import(
yamlBuffer: io.StringIO,
newButler: Butler,
inserts: DataSetTypeMap,
inserts: DataSetTypeRefMap,
run: Optional[str],
butlerModifier: Optional[Callable[[Butler], Butler]],
) -> Butler:
@@ -359,12 +359,12 @@ def _import(
newButler = butlerModifier(newButler)

# Register datasets to be produced and insert them into the registry
for dsType, dataIds in inserts.items():
for dsType, refs in inserts.items():
# Storage class differences should have already been resolved by calls to
# _validate_dataset_type in _export, resulting in the Registry dataset
# type whenever that exists.
newButler.registry.registerDatasetType(dsType)
newButler.registry.insertDatasets(dsType, dataIds, run)
newButler.registry._importDatasets(refs)

return newButler

@@ -390,7 +390,7 @@ def buildExecutionButler(
Parameters
----------
butler : `lsst.daf.butler.Bulter`
butler : `lsst.daf.butler.Butler`
This is the existing `~lsst.daf.butler.Butler` instance from which
existing datasets will be exported. This should be the
`~lsst.daf.butler.Butler` which was used to create any `QuantumGraphs`
@@ -445,6 +445,10 @@
NotADirectoryError
Raised if specified output URI does not correspond to a directory
"""
# Now require that if run is given it must match the graph run.
if run and graph.metadata and run != (graph_run := graph.metadata.get("output_run")):
raise ValueError(f"The given run, {run!r}, does not match that specified in the graph, {graph_run!r}")

# We know this must refer to a directory.
outputLocation = ResourcePath(outputLocation, forceDirectory=True)
if datastoreRoot is not None:
@@ -456,14 +460,7 @@
if not outputLocation.isdir():
raise NotADirectoryError("The specified output URI does not appear to correspond to a directory")

# Gather all DatasetTypes from the Python and check any that already exist
# in the registry for consistency. This does not check that all dataset
# types here exist, because they might want to register dataset types
# later. It would be nice to also check that, but to that we would need to
# be told whether they plan to register dataset types later (DM-30845).
dataset_types = PipelineDatasetTypes.fromPipeline(graph.iterTaskGraph(), registry=butler.registry)

exports, inserts = _accumulate(butler, graph, dataset_types)
exports, inserts = _accumulate(butler, graph)
yamlBuffer = _export(butler, collections, inserts)

newButler = _setupNewButler(butler, outputLocation, dirExists, datastoreRoot)
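Taken together, the changes in this file reduce to a short sketch (a condensed illustration, not the exact code above; ``graph`` is assumed to be a ``QuantumGraph`` built with resolved refs and ``newButler`` the freshly created execution butler)::

    from collections import defaultdict

    # Collect the resolved initOutput refs directly from the graph.
    init_refs = list(graph.globalInitOutputRefs())
    for task_def in graph.iterTaskGraph():
        if task_refs := graph.initOutputRefs(task_def):
            init_refs.extend(task_refs)

    # Group the refs by dataset type, keeping the graph-assigned dataset IDs.
    inserts = defaultdict(set)
    for ref in init_refs:
        inserts[ref.datasetType].add(ref)

    # Import the refs as-is instead of inserting bare data IDs into a run.
    for dataset_type, refs in inserts.items():
        newButler.registry.registerDatasetType(dataset_type)
        newButler.registry._importDatasets(refs)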
13 changes: 8 additions & 5 deletions python/lsst/pipe/base/testUtils.py
@@ -34,7 +34,6 @@
import collections.abc
import itertools
import unittest.mock
import warnings
from collections import defaultdict
from typing import TYPE_CHECKING, AbstractSet, Any, Dict, Mapping, Optional, Sequence, Set, Union

@@ -49,7 +48,6 @@
Quantum,
SkyPixDimension,
StorageClassFactory,
UnresolvedRefWarning,
)
from lsst.pipe.base.connectionTypes import BaseConnection, DimensionedConnection

@@ -267,10 +265,15 @@ def _refFromConnection(
butler.registry.getDatasetType(datasetType.name)
except KeyError:
raise ValueError(f"Invalid dataset type {connection.name}.")
if not butler.run:
raise ValueError("Can not create a resolved DatasetRef since the butler has no default run defined.")
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
ref = DatasetRef(datasetType=datasetType, dataId=dataId)
registry_ref = butler.registry.findDataset(datasetType, dataId, collections=[butler.run])
if registry_ref:
ref = registry_ref
else:
ref = DatasetRef(datasetType=datasetType, dataId=dataId, run=butler.run)
butler.registry._importDatasets([ref])
return ref
except KeyError as e:
raise ValueError(f"Dataset type ({connection.name}) and ID {dataId.byName()} not compatible.") from e
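For tests built on these helpers, the practical effect is that quanta now carry resolved refs tied to ``butler.run``. A hedged usage sketch, assuming this module's existing ``makeQuantum`` helper and illustrative connection names and data IDs::

    from lsst.pipe.base.testUtils import makeQuantum

    # makeQuantum() resolves each connection via _refFromConnection(), so the
    # returned Quantum holds resolved DatasetRefs in butler.run; the butler must
    # therefore be created with a default run.
    quantum = makeQuantum(
        task,
        butler,
        dataId,
        ioDataIds={"input": dataId, "output": dataId},
    )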
8 changes: 6 additions & 2 deletions python/lsst/pipe/base/tests/simpleQGraph.py
@@ -27,7 +27,7 @@

import itertools
import logging
from collections.abc import Iterable, Mapping
from collections.abc import Iterable, Mapping, MutableMapping
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union, cast

import lsst.daf.butler.tests as butlerTests
@@ -364,7 +364,7 @@ def makeSimpleQGraph(
datasetQueryConstraint: DSQVariant = DSQVariant.ALL,
makeDatastoreRecords: bool = False,
bind: Optional[Mapping[str, Any]] = None,
metadata: Optional[Mapping[str, Any]] = None,
metadata: Optional[MutableMapping[str, Any]] = None,
) -> Tuple[Butler, QuantumGraph]:
"""Make simple QuantumGraph for tests.
@@ -458,6 +458,10 @@ def makeSimpleQGraph(
userQuery,
bind,
)
if not metadata:
metadata = {}
metadata["output_run"] = run

qgraph = builder.makeGraph(
pipeline,
collections=butler.collections,
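A small illustration of how the two ends now meet (the paths, run name and exact argument order are assumptions of the sketch): the graph built by ``makeSimpleQGraph`` records its output run in its metadata, and ``buildExecutionButler`` rejects a conflicting ``run``::

    butler, qgraph = makeSimpleQGraph(root=tmp_root, run="demo_run")
    assert qgraph.metadata["output_run"] == "demo_run"

    # Passing a run that differs from the graph's output_run now raises
    # ValueError; passing the matching run (or no run) proceeds as before.
    exec_butler = buildExecutionButler(butler, qgraph, exec_root, run="demo_run")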
