DM-34924: Check for dataset type compatibility when creating execution butler #251

Merged: 1 commit, May 27, 2022
94 changes: 93 additions & 1 deletion python/lsst/pipe/base/executionButlerBuilder.py
@@ -29,6 +29,7 @@

from lsst.daf.butler import Butler, Config, DataCoordinate, DatasetRef, DatasetType
from lsst.daf.butler.core.repoRelocation import BUTLER_ROOT_TAG
from lsst.daf.butler.registry import ConflictingDefinitionError
from lsst.daf.butler.transfers import RepoExportContext
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.introspection import get_class_of
@@ -39,6 +40,72 @@
DataSetTypeMap = Mapping[DatasetType, Set[DataCoordinate]]


def _validate_dataset_type(
    candidate: DatasetType, previous: dict[Union[str, DatasetType], DatasetType]
) -> DatasetType:
    """Check a candidate dataset type against those previously encountered
    and return a consistent variant if there are different compatible
    options.

    Parameters
    ----------
    candidate : `lsst.daf.butler.DatasetType`
        The candidate dataset type.
    previous : `dict` [Union[`str`, `DatasetType`], `DatasetType`]
        Previous dataset types found, indexed by name and also by
        dataset type. The latter provides a quick way of returning a
        previously checked dataset type.

    Returns
    -------
    datasetType : `lsst.daf.butler.DatasetType`
        The dataset type to be used. This can be different from the
        given ``candidate`` if a previous dataset type was encountered
        with the same name and this one is compatible with it.

    Raises
    ------
    ConflictingDefinitionError
        Raised if a candidate dataset type has the same name as one
        previously encountered but is not compatible with it.

    Notes
    -----
    This function ensures that if a dataset type is given that has the
    same name as a previously encountered dataset type but differs solely
    in a way that is interchangeable (through a supported storage class)
    then we will always return the first dataset type encountered instead
    of the new variant. We assume that the butler will handle the
    type conversion itself later.
    """
    # First check whether we have previously vetted this dataset type.
    # Return the vetted form immediately if we have.
    checked = previous.get(candidate)
    if checked:
        return checked

    # We have not previously encountered this dataset type.
    name = candidate.name
    if prevDsType := previous.get(name):
        # Check compatibility. For now assume compatibility is required
        # in both directions.
        if prevDsType.is_compatible_with(candidate) and candidate.is_compatible_with(prevDsType):
            # Ensure that if this dataset type is used again we will return
            # the version that we were first given with this name. Store
            # it for next time and return the previous one.
            previous[candidate] = prevDsType
            return prevDsType
        else:
            raise ConflictingDefinitionError(
                f"Dataset type incompatibility in graph: {prevDsType} not compatible with {candidate}"
            )

    # New dataset type encountered. Store it by name and by dataset type
    # so it will be validated immediately next time it comes up.
    previous[name] = candidate
    previous[candidate] = candidate
    return candidate

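As an aside, here is a minimal sketch (not part of this PR) of the caching behaviour described in the Notes section above. The dataset type name "catalog" and the storage classes "ArrowAstropy" and "DataFrame" are hypothetical; the sketch assumes converters are registered between the two storage classes in both directions:

from typing import Union

from lsst.daf.butler import DatasetType, DimensionUniverse

universe = DimensionUniverse()
cache: dict[Union[str, DatasetType], DatasetType] = {}

# Two variants of the same dataset type name that differ only in their
# (hypothetical) storage classes.
table_variant = DatasetType("catalog", universe.empty, "ArrowAstropy")
frame_variant = DatasetType("catalog", universe.empty, "DataFrame")

# The first variant seen is vetted and cached.
assert _validate_dataset_type(table_variant, cache) is table_variant

# A later compatible variant resolves to the first one encountered, so
# only a single definition per dataset type name is ever accumulated.
assert _validate_dataset_type(frame_variant, cache) is table_variant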

def _accumulate(
    graph: QuantumGraph,
    dataset_types: PipelineDatasetTypes,
@@ -55,12 +122,20 @@ def _accumulate(
    # to be produced during processing of the QuantumGraph
    inserts: DefaultDict[DatasetType, Set[DataCoordinate]] = defaultdict(set)

    # It is possible to end up with a graph that has different storage
    # classes attached to the same dataset type name. This is okay, but
    # we must ensure that only a single dataset type definition is
    # accumulated in the loop below. This data structure caches every
    # dataset type encountered and stores the compatible alternative.
    datasetTypes: dict[Union[str, DatasetType], DatasetType] = {}

    # Add inserts for initOutputs (including initIntermediates); these are
    # defined fully by their DatasetType, because they have no dimensions, and
    # they are by definition not resolved. initInputs are part of Quantum and
    # that's the only place the graph stores the dataset IDs, so we process
    # them there even though each Quantum for a task has the same ones.
    for dataset_type in itertools.chain(dataset_types.initIntermediates, dataset_types.initOutputs):
        dataset_type = _validate_dataset_type(dataset_type, datasetTypes)
        inserts[dataset_type].add(DataCoordinate.makeEmpty(dataset_type.dimensions.universe))

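As a quick illustration of the point about dimensionless dataset types (a sketch, assuming the default dimension universe): the empty data coordinate is the only data ID such a dataset type can have, which is why a single makeEmpty() coordinate per dataset type suffices above.

from lsst.daf.butler import DataCoordinate, DimensionUniverse

universe = DimensionUniverse()

# Init-output dataset types have no dimensions, so the empty coordinate
# fully identifies each dataset within its run collection.
empty = DataCoordinate.makeEmpty(universe)
assert empty.hasFull()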
    n: QuantumNode
@@ -89,6 +164,7 @@ def _accumulate(
                        # be part of some other upstream dataset, so it
                        # should be safe to skip them here
                        continue
                    type = _validate_dataset_type(type, datasetTypes)
                    inserts[type].add(ref.dataId)
    return exports, inserts

@@ -218,7 +294,23 @@ def _import(

    # Register datasets to be produced and insert them into the registry
    for dsType, dataIds in inserts.items():
        # There may be inconsistencies with storage class definitions
        # so those differences must be checked.
        try:
            newButler.registry.registerDatasetType(dsType)
        except ConflictingDefinitionError:
            # We do not know at this point whether the dataset type is
            # an intermediate (and so must be able to support conversion
            # from the registry storage class when used as an input) or
            # solely an output dataset type. Test compatibility in both
            # directions.
            registryDsType = newButler.registry.getDatasetType(dsType.name)
            if registryDsType.is_compatible_with(dsType) and dsType.is_compatible_with(registryDsType):
                # Ensure that we use the registry type when inserting.
                dsType = registryDsType
            else:
                # Not compatible so re-raise the original exception.
                raise
Reviewer comment:

Please include a helpful error message as before, e.g., "Cannot insert {dsType} into the registry because it is incompatible with {registryDsType}"

Member Author:

I am dreaming of Python 3.11 exception notes. They would be perfect for this scenario. In this case, though, the exception you are going to get will already say something like "ConflictingDefinitionError: DatasetType A differs from registry version B", so there's not a lot I need to add.
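For illustration only, not part of this PR: with Python 3.11 exception notes (PEP 678), the except branch above could attach the reviewer's suggested message to the registry's own exception before re-raising, preserving both:

try:
    newButler.registry.registerDatasetType(dsType)
except ConflictingDefinitionError as e:
    registryDsType = newButler.registry.getDatasetType(dsType.name)
    if registryDsType.is_compatible_with(dsType) and dsType.is_compatible_with(registryDsType):
        dsType = registryDsType
    else:
        # add_note() (Python 3.11+) keeps the registry's original message
        # intact while adding the extra context suggested in review.
        e.add_note(f"Cannot insert {dsType} into the registry because it is incompatible with {registryDsType}.")
        raise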


        newButler.registry.insertDatasets(dsType, dataIds, run)

    return newButler