Skip to content

Commit

Permalink
Merge pull request #276 from lsst/tickets/DM-41962
Browse files Browse the repository at this point in the history
DM-41962: Use storage classes from QG in PreExecInit.
  • Loading branch information
TallJimbo committed Dec 2, 2023
2 parents 643a063 + 2e33486 commit e072a71
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 25 deletions.
4 changes: 4 additions & 0 deletions doc/changes/DM-41962.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fix a storage class bug in registering dataset types in ``pipetask run``.

Prior to this fix, the presence of multiple storage classes being associated with the same dataset type in a pipeline could cause the registered dataset type's storage class to be random and nondeterministic in regular `pipetask run` execution (but not quantum-backed butler execution).
It now follows the rules set by `PipelineGraph`, in which the definition in the task that produces the dataset wins.
31 changes: 28 additions & 3 deletions python/lsst/ctrl/mpexec/preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from lsst.daf.butler import DatasetRef, DatasetType
from lsst.daf.butler.registry import ConflictingDefinitionError
from lsst.pipe.base import PipelineDatasetTypes
from lsst.pipe.base import automatic_connection_constants as acc
from lsst.utils.packages import Packages

if TYPE_CHECKING:
Expand Down Expand Up @@ -389,14 +390,38 @@ def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool
pipelineDatasetTypes = PipelineDatasetTypes.fromPipeline(
pipeline, registry=self.full_butler.registry, include_configs=True, include_packages=True
)

for datasetTypes, is_input in (
# The "registry dataset types" saved with the QG have had their storage
# classes carefully resolved by PipelineGraph, whereas the dataset
# types from PipelineDatasetTypes are a mess because it uses
# NamedValueSet and that ignores storage classes. It will be fully
# removed here (and deprecated everywhere) on DM-40441.
# Note that these "registry dataset types" include dataset types that
# are not actually registered yet; they're the PipelineGraph's
# determination of what _should_ be registered.
registry_storage_classes = {
dataset_type.name: dataset_type.storageClass_name for dataset_type in graph.registryDatasetTypes()
}
registry_storage_classes[acc.PACKAGES_INIT_OUTPUT_NAME] = acc.PACKAGES_INIT_OUTPUT_STORAGE_CLASS
dataset_types: Iterable[DatasetType]
for dataset_types, is_input in (
(pipelineDatasetTypes.initIntermediates, True),
(pipelineDatasetTypes.initOutputs, False),
(pipelineDatasetTypes.intermediates, True),
(pipelineDatasetTypes.outputs, False),
):
self._register_output_dataset_types(registerDatasetTypes, datasetTypes, is_input)
dataset_types = [
(
# The registry dataset types do not include components, but
# we don't support storage class overrides for those in
# other contexts anyway, and custom-built QGs may not have
# the registry dataset types field populated at all.
dataset_type.overrideStorageClass(registry_storage_classes[dataset_type.name])
if dataset_type.name in registry_storage_classes
else dataset_type
)
for dataset_type in dataset_types
]
self._register_output_dataset_types(registerDatasetTypes, dataset_types, is_input)

def _register_output_dataset_types(
self, registerDatasetTypes: bool, datasetTypes: Iterable[DatasetType], is_input: bool
Expand Down
49 changes: 27 additions & 22 deletions tests/test_simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,12 @@ def _configure_pipeline(self, config_a_cls, config_b_cls, storageClass_a=None, s
return executor

def _test_logs(self, log_output, input_type_a, output_type_a, input_type_b, output_type_b):
"""Check the expected input types received by tasks A and B"""
"""Check the expected input types received by tasks A and B.
Note that these are the types as seen from the perspective of the task,
so they must be consistent with the task's connections, but may not be
consistent with the registry dataset types.
"""
all_logs = "\n".join(log_output)
self.assertIn(f"lsst.a:Run method given data of type: {input_type_a}", all_logs)
self.assertIn(f"lsst.b:Run method given data of type: {input_type_b}", all_logs)
Expand All @@ -191,12 +196,6 @@ def test_from_pipeline(self):

def test_from_pipeline_intermediates_differ(self):
"""Run pipeline but intermediates definition in registry differs."""
executor = self._configure_pipeline(
NoDimensionsTestTask.ConfigClass,
NoDimensionsTestTask.ConfigClass,
storageClass_b="TaskMetadataLike",
)

# Pre-define the "intermediate" storage class to be something that is
# like a dict but is not a dict. This will fail unless storage
# class conversion is supported in put and get.
Expand All @@ -207,7 +206,11 @@ def test_from_pipeline_intermediates_differ(self):
storageClass="TaskMetadataLike",
)
)

executor = self._configure_pipeline(
NoDimensionsTestTask.ConfigClass,
NoDimensionsTestTask.ConfigClass,
storageClass_b="TaskMetadataLike",
)
with self.assertLogs("lsst", level="INFO") as cm:
quanta = executor.run(register_dataset_types=True, save_versions=False)
# A dict is given to task a without change.
Expand All @@ -221,17 +224,11 @@ def test_from_pipeline_intermediates_differ(self):
self._test_logs(cm.output, "dict", "dict", "dict", "lsst.pipe.base.TaskMetadata")

self.assertEqual(len(quanta), 2)
self.assertEqual(self.butler.get("intermediate").to_dict(), {"zero": 0, "one": 1})
self.assertEqual(self.butler.get("output").to_dict(), {"zero": 0, "one": 1, "two": 2})
self.assertEqual(self.butler.get("intermediate"), TaskMetadata.from_dict({"zero": 0, "one": 1}))
self.assertEqual(self.butler.get("output"), TaskMetadata.from_dict({"zero": 0, "one": 1, "two": 2}))

def test_from_pipeline_output_differ(self):
"""Run pipeline but output definition in registry differs."""
executor = self._configure_pipeline(
NoDimensionsTestTask.ConfigClass,
NoDimensionsTestTask.ConfigClass,
storageClass_a="TaskMetadataLike",
)

# Pre-define the "output" storage class to be something that is
# like a dict but is not a dict. This will fail unless storage
# class conversion is supported in put and get.
Expand All @@ -242,16 +239,21 @@ def test_from_pipeline_output_differ(self):
storageClass="TaskMetadataLike",
)
)

executor = self._configure_pipeline(
NoDimensionsTestTask.ConfigClass,
NoDimensionsTestTask.ConfigClass,
storageClass_a="TaskMetadataLike",
)
with self.assertLogs("lsst", level="INFO") as cm:
quanta = executor.run(register_dataset_types=True, save_versions=False)
# a has been told to return a TaskMetadata but will convert to dict.
# a has been told to return a TaskMetadata but this will convert to
# dict on read by b.
# b returns a dict and that is converted to TaskMetadata on put.
self._test_logs(cm.output, "dict", "lsst.pipe.base.TaskMetadata", "dict", "dict")

self.assertEqual(len(quanta), 2)
self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
self.assertEqual(self.butler.get("output").to_dict(), {"zero": 0, "one": 1, "two": 2})
self.assertEqual(self.butler.get("intermediate"), TaskMetadata.from_dict({"zero": 0, "one": 1}))
self.assertEqual(self.butler.get("output"), TaskMetadata.from_dict({"zero": 0, "one": 1, "two": 2}))

def test_from_pipeline_input_differ(self):
"""Run pipeline but input definition in registry differs."""
Expand All @@ -267,8 +269,11 @@ def test_from_pipeline_input_differ(self):
self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2})

def test_from_pipeline_incompatible(self):
"""Run pipeline but definitions are not compatible."""
def test_from_pipeline_inconsistent_dataset_types(self):
"""Generate the QG (by initializing the executor), then register the
dataset type with a different storage class than the QG should have
predicted, to make sure execution fails as it should.
"""
executor = self._configure_pipeline(
NoDimensionsTestTask.ConfigClass, NoDimensionsTestTask.ConfigClass
)
Expand Down

0 comments on commit e072a71

Please sign in to comment.