Skip to content

Commit

Permalink
Use storage classes from QG in PreExecInit.
Browse files Browse the repository at this point in the history
PipelineDatasetTypes does not preserve storage classes (that's why
it's being deprecated).
  • Loading branch information
TallJimbo committed Dec 1, 2023
1 parent 643a063 commit 9ccb217
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
4 changes: 4 additions & 0 deletions doc/changes/DM-41962.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fix a storage class bug in registering dataset types in ``pipetask run``.

Prior to this fix, the presence of multiple storage classes being associated with the same dataset type in a pipeline could cause the registered dataset type's storage class to be random and nondeterministic in regular `pipetask run` execution (but not quantum-backed butler execution).
It now follows the rules set by `PipelineGraph`, in which the definition in the task that produces the dataset wins.
29 changes: 26 additions & 3 deletions python/lsst/ctrl/mpexec/preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from lsst.daf.butler import DatasetRef, DatasetType
from lsst.daf.butler.registry import ConflictingDefinitionError
from lsst.pipe.base import PipelineDatasetTypes
from lsst.pipe.base import automatic_connection_constants as acc
from lsst.utils.packages import Packages

if TYPE_CHECKING:
Expand Down Expand Up @@ -389,14 +390,36 @@ def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool
pipelineDatasetTypes = PipelineDatasetTypes.fromPipeline(
pipeline, registry=self.full_butler.registry, include_configs=True, include_packages=True
)

for datasetTypes, is_input in (
# The "registry dataset types" saved with the QG have had their storage
# classes carefully resolved by PipelineGraph, whereas the dataset
# types from PipelineDatasetTypes are a mess because it uses
# NamedValueSet and that ignores storage classes. It will be fully
# removed here (and deprecated everywhere) on DM-40441.
# Note that these "registry dataset types" include dataset types that
# are not actually registered yet; they're the PipelineGraph's
# determination of what _should_ be registered.
registry_storage_classes = {
dataset_type.name: dataset_type.storageClass_name for dataset_type in graph.registryDatasetTypes()
}
registry_storage_classes[acc.PACKAGES_INIT_OUTPUT_NAME] = acc.PACKAGES_INIT_OUTPUT_STORAGE_CLASS
for dataset_types, is_input in (
(pipelineDatasetTypes.initIntermediates, True),
(pipelineDatasetTypes.initOutputs, False),
(pipelineDatasetTypes.intermediates, True),
(pipelineDatasetTypes.outputs, False),
):
self._register_output_dataset_types(registerDatasetTypes, datasetTypes, is_input)
dataset_types = [
(
# The registry dataset types do not include components,
# but we don't support storage class overrides for those
# in other contexts anyway.
dataset_type.overrideStorageClass(registry_storage_classes[dataset_type.name])
if not dataset_type.isComponent()
else dataset_type
)
for dataset_type in dataset_types
]
self._register_output_dataset_types(registerDatasetTypes, dataset_types, is_input)

def _register_output_dataset_types(
self, registerDatasetTypes: bool, datasetTypes: Iterable[DatasetType], is_input: bool
Expand Down

0 comments on commit 9ccb217

Please sign in to comment.