Use graph DatasetRef in execution butler
This attempts to ensure we have provenance agreement between
the graph and the execution butler, since the graph now always
has resolved refs.
timj committed May 1, 2023
1 parent 79c1831 commit 8d93e1c
Showing 1 changed file with 24 additions and 30 deletions.
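For context, a condensed sketch of the new approach shown in the diff below: init-output DatasetRefs are taken directly from the QuantumGraph (which now always carries resolved refs) and grouped by their composite dataset type, rather than synthesising empty data IDs per dataset type. The helper name collect_init_output_refs is illustrative only; the graph and ref methods are the ones used in the changed file.

from collections import defaultdict

def collect_init_output_refs(graph):
    # Sketch: gather resolved init-output refs from a QuantumGraph.
    init_output_refs = list(graph.globalInitOutputRefs())
    for task_def in graph.iterTaskGraph():
        task_refs = graph.initOutputRefs(task_def)
        if task_refs:
            init_output_refs.extend(task_refs)

    # Group the resolved refs by (composite) dataset type, mirroring the
    # DataSetTypeRefMap alias introduced by this commit.
    inserts = defaultdict(set)
    for ref in init_output_refs:
        dataset_type = ref.datasetType
        if dataset_type.component() is not None:
            dataset_type = dataset_type.makeCompositeDatasetType()
        inserts[dataset_type].add(ref)
    return inserts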
54 changes: 24 additions & 30 deletions python/lsst/pipe/base/executionButlerBuilder.py
@@ -23,21 +23,19 @@
__all__ = ("buildExecutionButler",)

import io
import itertools
from collections import defaultdict
from typing import Callable, DefaultDict, Iterable, List, Mapping, Optional, Set, Tuple, Union
from typing import Callable, Iterable, List, Mapping, Optional, Set, Tuple, Union

from lsst.daf.butler import Butler, Config, DataCoordinate, DatasetRef, DatasetType, Registry
from lsst.daf.butler import Butler, Config, DatasetRef, DatasetType, Registry
from lsst.daf.butler.core.repoRelocation import BUTLER_ROOT_TAG
from lsst.daf.butler.registry import ConflictingDefinitionError, MissingDatasetTypeError
from lsst.daf.butler.transfers import RepoExportContext
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.introspection import get_class_of

from .graph import QuantumGraph
from .pipeline import PipelineDatasetTypes

DataSetTypeMap = Mapping[DatasetType, Set[DataCoordinate]]
DataSetTypeRefMap = Mapping[DatasetType, Set[DatasetRef]]


def _validate_dataset_type(
@@ -120,8 +118,7 @@ def _validate_dataset_type(
def _accumulate(
butler: Butler,
graph: QuantumGraph,
dataset_types: PipelineDatasetTypes,
) -> Tuple[Set[DatasetRef], DataSetTypeMap]:
) -> Tuple[Set[DatasetRef], DataSetTypeRefMap]:
# accumulate the DatasetRefs that will be transferred to the execution
# registry

@@ -132,7 +129,7 @@ def _accumulate(
# inserts is the mapping of DatasetType to dataIds for what is to be
# inserted into the registry. These are the products that are expected
# to be produced during processing of the QuantumGraph
inserts: DefaultDict[DatasetType, Set[DataCoordinate]] = defaultdict(set)
inserts: DataSetTypeRefMap = defaultdict(set)

# It is possible to end up with a graph that has different storage
# classes attached to the same dataset type name. This is okay but
@@ -141,16 +138,19 @@
# type encountered and stores the compatible alternative.
datasetTypes: dict[Union[str, DatasetType], DatasetType] = {}

# Add inserts for initOutputs (including initIntermediates); these are
# defined fully by their DatasetType, because they have no dimensions.
# initInputs are part of Quantum and that's the only place the graph stores
# the dataset IDs, so we process them there even though each Quantum for a
# task has the same ones.
for dataset_type in itertools.chain(dataset_types.initIntermediates, dataset_types.initOutputs):
# Find the initOutput refs.
initOutputRefs = [ref for ref in graph.globalInitOutputRefs()]
for task_def in graph.iterTaskGraph():
task_refs = graph.initOutputRefs(task_def)
if task_refs:
initOutputRefs.extend(task_refs)

for ref in initOutputRefs:
dataset_type = ref.datasetType
if dataset_type.component() is not None:
dataset_type = dataset_type.makeCompositeDatasetType()
dataset_type = _validate_dataset_type(dataset_type, datasetTypes, butler.registry)
inserts[dataset_type].add(DataCoordinate.makeEmpty(dataset_type.dimensions.universe))
inserts[dataset_type].add(ref)

# Output references may be resolved even if they do not exist. Find all
# actually existing refs.
@@ -208,7 +208,8 @@ def _accumulate(
# be part of some other upstream dataset, so it
# should be safe to skip them here
continue
inserts[type].add(ref.dataId)
inserts[type].add(ref)

return exports, inserts


@@ -230,7 +231,7 @@ def _discoverCollections(butler: Butler, collections: Iterable[str]) -> set[str]
return collections


def _export(butler: Butler, collections: Optional[Iterable[str]], inserts: DataSetTypeMap) -> io.StringIO:
def _export(butler: Butler, collections: Optional[Iterable[str]], inserts: DataSetTypeRefMap) -> io.StringIO:
# This exports relevant dimension records and collections using daf butler
# objects, however it reaches in deep and does not use the public methods
# so that it can export it to a string buffer and skip disk access. This
@@ -245,8 +246,8 @@ def _export(butler: Butler, collections: Optional[Iterable[str]], inserts: DataS

# Need to ensure that the dimension records for outputs are
# transferred.
for _, dataIds in inserts.items():
exporter.saveDataIds(dataIds)
for _, refs in inserts.items():
exporter.saveDataIds([ref.dataId for ref in refs])

# Look for any defined collection, if not get the defaults
if collections is None:
@@ -339,7 +340,7 @@ def _setupNewButler(
def _import(
yamlBuffer: io.StringIO,
newButler: Butler,
inserts: DataSetTypeMap,
inserts: DataSetTypeRefMap,
run: Optional[str],
butlerModifier: Optional[Callable[[Butler], Butler]],
) -> Butler:
@@ -359,12 +360,12 @@ def _import(
newButler = butlerModifier(newButler)

# Register datasets to be produced and insert them into the registry
for dsType, dataIds in inserts.items():
for dsType, refs in inserts.items():
# Storage class differences should have already been resolved by calls to
# _validate_dataset_type in _export, resulting in the Registry dataset
# type whenever that exists.
newButler.registry.registerDatasetType(dsType)
newButler.registry.insertDatasets(dsType, dataIds, run)
newButler.registry._importDatasets(refs)

return newButler

@@ -456,14 +457,7 @@ def buildExecutionButler(
if not outputLocation.isdir():
raise NotADirectoryError("The specified output URI does not appear to correspond to a directory")

# Gather all DatasetTypes from the Python and check any that already exist
# in the registry for consistency. This does not check that all dataset
# types here exist, because they might want to register dataset types
# later. It would be nice to also check that, but to that we would need to
# be told whether they plan to register dataset types later (DM-30845).
dataset_types = PipelineDatasetTypes.fromPipeline(graph.iterTaskGraph(), registry=butler.registry)

exports, inserts = _accumulate(butler, graph, dataset_types)
exports, inserts = _accumulate(butler, graph)
yamlBuffer = _export(butler, collections, inserts)

newButler = _setupNewButler(butler, outputLocation, dirExists, datastoreRoot)

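For reference, a minimal before/after sketch (function names here are illustrative, not from the source) of how the execution registry is populated with the datasets expected to be produced: previously each dataset type was registered and new datasets were inserted from bare data IDs, whereas now the resolved DatasetRefs from the graph are imported directly, keeping the dataset IDs consistent with the graph's provenance.

def register_outputs_before(new_butler, inserts, run):
    # Before this commit: inserts mapped DatasetType -> set of DataCoordinate,
    # so datasets were re-created (with fresh IDs) in the target run.
    for dataset_type, data_ids in inserts.items():
        new_butler.registry.registerDatasetType(dataset_type)
        new_butler.registry.insertDatasets(dataset_type, data_ids, run)

def register_outputs_after(new_butler, inserts):
    # After this commit: inserts maps DatasetType -> set of resolved DatasetRef
    # taken from the QuantumGraph, so the refs are imported as-is.
    for dataset_type, refs in inserts.items():
        new_butler.registry.registerDatasetType(dataset_type)
        new_butler.registry._importDatasets(refs)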