Merge pull request #251 from lsst/tickets/DM-34924
DM-34924: Check for dataset type compatibility when creating execution butler
timj committed May 27, 2022
2 parents e3f43cd + 115fa21 commit ddcd2f7
Showing 1 changed file with 93 additions and 1 deletion.
python/lsst/pipe/base/executionButlerBuilder.py (93 additions, 1 deletion)
@@ -29,6 +29,7 @@

from lsst.daf.butler import Butler, Config, DataCoordinate, DatasetRef, DatasetType
from lsst.daf.butler.core.repoRelocation import BUTLER_ROOT_TAG
from lsst.daf.butler.registry import ConflictingDefinitionError
from lsst.daf.butler.transfers import RepoExportContext
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.introspection import get_class_of
@@ -39,6 +40,72 @@
DataSetTypeMap = Mapping[DatasetType, Set[DataCoordinate]]


def _validate_dataset_type(
    candidate: DatasetType, previous: dict[Union[str, DatasetType], DatasetType]
) -> DatasetType:
    """Check the dataset types and return a consistent variant if there are
    different compatible options.

    Parameters
    ----------
    candidate : `lsst.daf.butler.DatasetType`
        The candidate dataset type.
    previous : `dict` [Union[`str`, `DatasetType`], `DatasetType`]
        Previous dataset types found, indexed by name and also by
        dataset type. The latter provides a quick way of returning a
        previously checked dataset type.

    Returns
    -------
    datasetType : `lsst.daf.butler.DatasetType`
        The dataset type to be used. This can be different from the
        given ``candidate`` if a previous dataset type was encountered
        with the same name and this one is compatible with it.

    Raises
    ------
    ConflictingDefinitionError
        Raised if a candidate dataset type has the same name as one
        previously encountered but is not compatible with it.

    Notes
    -----
    This function ensures that if a dataset type is given that has the
    same name as a previously encountered dataset type, but differs only
    in a way that is interchangeable (through a supported storage class
    conversion), we will always return the first dataset type encountered
    instead of the new variant. We assume that the butler will handle the
    storage class conversion itself later.
    """
    # First check whether we have previously vetted this dataset type.
    # Return the vetted form immediately if we have.
    checked = previous.get(candidate)
    if checked:
        return checked

    # Have not previously encountered this dataset type.
    name = candidate.name
    if prevDsType := previous.get(name):
        # Check compatibility. For now assume both directions have to
        # be acceptable.
        if prevDsType.is_compatible_with(candidate) and candidate.is_compatible_with(prevDsType):
            # Ensure that if this dataset type is used again we will return
            # the version that we were first given with this name. Store
            # it for next time and return the previous one.
            previous[candidate] = prevDsType
            return prevDsType
        else:
            raise ConflictingDefinitionError(
                f"Dataset type incompatibility in graph: {prevDsType} not compatible with {candidate}"
            )

    # New dataset type encountered. Store it by name and by dataset type
    # so it will be validated immediately next time it comes up.
    previous[name] = candidate
    previous[candidate] = candidate
    return candidate
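
As an inline illustration of the vetting behaviour: the sketch below exercises _validate_dataset_type with a hypothetical stand-in for lsst.daf.butler.DatasetType that models only the pieces the function relies on (hashing, name, and is_compatible_with). The dataset type and storage class names are made up; in a real repository, compatibility is decided by the storage class converter configuration.

# Hypothetical stand-in for DatasetType (not the real class): hashes on
# name and storage class, and treats another variant as compatible when
# its storage class appears in a made-up converter set.
from dataclasses import dataclass, field

@dataclass(frozen=True)
class FakeDatasetType:
    name: str
    storageClass: str
    convertible_to: frozenset = field(default_factory=frozenset, compare=False)

    def is_compatible_with(self, other: "FakeDatasetType") -> bool:
        return (
            self.storageClass == other.storageClass
            or other.storageClass in self.convertible_to
        )

arrow = FakeDatasetType("deepCoadd_obj", "ArrowTable", frozenset({"DataFrame"}))
frame = FakeDatasetType("deepCoadd_obj", "DataFrame", frozenset({"ArrowTable"}))

previous: dict = {}
assert _validate_dataset_type(arrow, previous) is arrow  # first definition wins
assert _validate_dataset_type(frame, previous) is arrow  # compatible variant collapses to it
assert _validate_dataset_type(frame, previous) is arrow  # now cached under the variant itself

An incompatible variant (one whose storage class is in neither converter set) would instead raise ConflictingDefinitionError.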


def _accumulate(
    graph: QuantumGraph,
    dataset_types: PipelineDatasetTypes,
Expand All @@ -55,12 +122,20 @@ def _accumulate(
    # to be produced during processing of the QuantumGraph
    inserts: DefaultDict[DatasetType, Set[DataCoordinate]] = defaultdict(set)

    # It is possible to end up with a graph that has different storage
    # classes attached to the same dataset type name. This is okay, but
    # we must ensure that only a single dataset type definition is
    # accumulated in the loop below. This data structure caches every
    # dataset type encountered and stores the compatible alternative.
    datasetTypes: dict[Union[str, DatasetType], DatasetType] = {}

    # Add inserts for initOutputs (including initIntermediates); these are
    # defined fully by their DatasetType, because they have no dimensions, and
    # they are by definition not resolved. initInputs are part of Quantum and
    # that's the only place the graph stores the dataset IDs, so we process
    # them there even though each Quantum for a task has the same ones.
    for dataset_type in itertools.chain(dataset_types.initIntermediates, dataset_types.initOutputs):
        dataset_type = _validate_dataset_type(dataset_type, datasetTypes)
        inserts[dataset_type].add(DataCoordinate.makeEmpty(dataset_type.dimensions.universe))

    n: QuantumNode
@@ -89,6 +164,7 @@ def _accumulate(
                        # be part of some other upstream dataset, so it
                        # should be safe to skip them here
                        continue
                    type = _validate_dataset_type(type, datasetTypes)
                    inserts[type].add(ref.dataId)
    return exports, inserts
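
Since init-outputs carry no dimensions, each one is accumulated under a single empty data ID. A small sketch of that step, assuming a default dimension universe; the dataset type name "task_init_output" and its storage class are illustrative only.

from collections import defaultdict

from lsst.daf.butler import DataCoordinate, DatasetType, DimensionUniverse

universe = DimensionUniverse()  # default dimension configuration

# Hypothetical dimensionless init-output dataset type.
init_output = DatasetType("task_init_output", (), "StructuredDataDict", universe=universe)

inserts = defaultdict(set)
inserts[init_output].add(DataCoordinate.makeEmpty(universe))

# The set holds exactly one (empty) data coordinate to insert later.
assert len(inserts[init_output]) == 1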

@@ -218,7 +294,23 @@ def _import(

    # Register datasets to be produced and insert them into the registry
    for dsType, dataIds in inserts.items():
        # There may be inconsistencies with storage class definitions,
        # so those differences must be checked.
        try:
            newButler.registry.registerDatasetType(dsType)
        except ConflictingDefinitionError:
            # We do not know at this point whether the dataset type is
            # an intermediate (and so must be able to support conversion
            # from the registry storage class to an input) or solely an
            # output dataset type. Test both compatibilities.
            registryDsType = newButler.registry.getDatasetType(dsType.name)
            if registryDsType.is_compatible_with(dsType) and dsType.is_compatible_with(registryDsType):
                # Ensure that we use the registry type when inserting.
                dsType = registryDsType
            else:
                # Not compatible so re-raise the original exception.
                raise

        newButler.registry.insertDatasets(dsType, dataIds, run)

    return newButler
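
The register-or-reconcile step above generalizes to any code that must register dataset types against a registry whose existing definitions may differ only by storage class. A hedged sketch of the pattern as a standalone helper; the helper name and butler argument are illustrative, not part of this change.

from lsst.daf.butler import Butler, DatasetType
from lsst.daf.butler.registry import ConflictingDefinitionError

def register_or_reconcile(butler: Butler, dsType: DatasetType) -> DatasetType:
    # Try the straightforward registration first.
    try:
        butler.registry.registerDatasetType(dsType)
    except ConflictingDefinitionError:
        # A definition with this name already exists. Accept it only if
        # the two definitions are interchangeable in both directions,
        # since we cannot tell here whether the dataset type will be
        # read, written, or both.
        registryDsType = butler.registry.getDatasetType(dsType.name)
        if not (
            registryDsType.is_compatible_with(dsType)
            and dsType.is_compatible_with(registryDsType)
        ):
            raise
        dsType = registryDsType
    return dsType

Callers then pass the returned definition to insertDatasets, exactly as the loop above does.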