
DM-41962: Use storage classes from QG in PreExecInit. #276

Merged 4 commits on Dec 2, 2023
4 changes: 4 additions & 0 deletions doc/changes/DM-41962.bugfix.md
@@ -0,0 +1,4 @@
Fix a storage class bug in registering dataset types in ``pipetask run``.

Prior to this fix, a pipeline that associated multiple storage classes with the same dataset type could register that dataset type with a nondeterministically chosen storage class in regular `pipetask run` execution (but not in quantum-backed butler execution).
Registration now follows the rules set by `PipelineGraph`, in which the definition in the task that produces the dataset wins.
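
A minimal standalone sketch of that rule (the connection tuples and task names here are illustrative, not `PipelineGraph`'s actual data structures):

```python
# Sketch only: PipelineGraph's real resolution logic is richer, but the
# core rule is that the producing task's storage class wins.
connections = [
    # (task label, dataset type name, storage class, role)
    ("task_a", "intermediate", "StructuredDataDict", "output"),
    ("task_b", "intermediate", "TaskMetadataLike", "input"),
]

resolved: dict[str, str] = {}
for _task, name, storage_class, role in connections:
    # An output declaration always overrides; an input only fills a gap.
    if role == "output" or name not in resolved:
        resolved[name] = storage_class

assert resolved["intermediate"] == "StructuredDataDict"
```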
30 changes: 27 additions & 3 deletions python/lsst/ctrl/mpexec/preExecInit.py
@@ -44,6 +44,7 @@
from lsst.daf.butler import DatasetRef, DatasetType
from lsst.daf.butler.registry import ConflictingDefinitionError
from lsst.pipe.base import PipelineDatasetTypes
from lsst.pipe.base import automatic_connection_constants as acc
from lsst.utils.packages import Packages

if TYPE_CHECKING:
@@ -389,14 +390,37 @@ def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool
pipelineDatasetTypes = PipelineDatasetTypes.fromPipeline(
pipeline, registry=self.full_butler.registry, include_configs=True, include_packages=True
)

for datasetTypes, is_input in (
# The "registry dataset types" saved with the QG have had their storage
# classes carefully resolved by PipelineGraph, whereas the dataset
# types from PipelineDatasetTypes are a mess because it uses
# NamedValueSet and that ignores storage classes. It will be fully
# removed here (and deprecated everywhere) on DM-40441.
# Note that these "registry dataset types" include dataset types that
# are not actually registered yet; they're the PipelineGraph's
# determination of what _should_ be registered.
registry_storage_classes = {
dataset_type.name: dataset_type.storageClass_name for dataset_type in graph.registryDatasetTypes()
}
registry_storage_classes[acc.PACKAGES_INIT_OUTPUT_NAME] = acc.PACKAGES_INIT_OUTPUT_STORAGE_CLASS
dataset_types: Iterable[DatasetType]
for dataset_types, is_input in (
(pipelineDatasetTypes.initIntermediates, True),
(pipelineDatasetTypes.initOutputs, False),
(pipelineDatasetTypes.intermediates, True),
(pipelineDatasetTypes.outputs, False),
):
self._register_output_dataset_types(registerDatasetTypes, datasetTypes, is_input)
dataset_types = [
(
# The registry dataset types do not include components,
# but we don't support storage class overrides for those
# in other contexts anyway.
dataset_type.overrideStorageClass(registry_storage_classes[dataset_type.name])
if not dataset_type.isComponent()
else dataset_type
)
for dataset_type in dataset_types
]
self._register_output_dataset_types(registerDatasetTypes, dataset_types, is_input)

def _register_output_dataset_types(
self, registerDatasetTypes: bool, datasetTypes: Iterable[DatasetType], is_input: bool
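The override in the loop above can be exercised in isolation. A standalone sketch, with a small stand-in class replacing `lsst.daf.butler.DatasetType` so it runs without a data repository (the names and storage classes are assumptions mirroring the tests below):

```python
# Stand-in for lsst.daf.butler.DatasetType, just enough to demonstrate
# the override loop from initializeDatasetTypes above.
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class FakeDatasetType:
    name: str
    storageClass_name: str

    def isComponent(self) -> bool:
        # Component dataset types are named "parent.component".
        return "." in self.name

    def overrideStorageClass(self, storage_class: str) -> "FakeDatasetType":
        return replace(self, storageClass_name=storage_class)


# Authoritative storage classes, as resolved by PipelineGraph.
registry_storage_classes = {"intermediate": "StructuredDataDict"}

# Dataset types from PipelineDatasetTypes, carrying a stale storage class.
dataset_types = [FakeDatasetType("intermediate", "TaskMetadataLike")]

dataset_types = [
    dt.overrideStorageClass(registry_storage_classes[dt.name]) if not dt.isComponent() else dt
    for dt in dataset_types
]
assert dataset_types[0].storageClass_name == "StructuredDataDict"
```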
49 changes: 27 additions & 22 deletions tests/test_simple_pipeline_executor.py
@@ -165,7 +165,12 @@ def _configure_pipeline(self, config_a_cls, config_b_cls, storageClass_a=None, s
return executor

def _test_logs(self, log_output, input_type_a, output_type_a, input_type_b, output_type_b):
"""Check the expected input types received by tasks A and B"""
"""Check the expected input types received by tasks A and B.

Note that these are the types as seen from the perspective of the task,
so they must be consistent with the task's connections, but may not be
consistent with the registry dataset types.
"""
all_logs = "\n".join(log_output)
self.assertIn(f"lsst.a:Run method given data of type: {input_type_a}", all_logs)
self.assertIn(f"lsst.b:Run method given data of type: {input_type_b}", all_logs)
@@ -191,12 +196,6 @@ def test_from_pipeline(self):

def test_from_pipeline_intermediates_differ(self):
"""Run pipeline but intermediates definition in registry differs."""
executor = self._configure_pipeline(
NoDimensionsTestTask.ConfigClass,
NoDimensionsTestTask.ConfigClass,
storageClass_b="TaskMetadataLike",
)

# Pre-define the "intermediate" storage class to be something that is
# like a dict but is not a dict. This will fail unless storage
# class conversion is supported in put and get.
@@ -207,7 +206,11 @@ def test_from_pipeline_intermediates_differ(self):
storageClass="TaskMetadataLike",
)
)

executor = self._configure_pipeline(
NoDimensionsTestTask.ConfigClass,
NoDimensionsTestTask.ConfigClass,
storageClass_b="TaskMetadataLike",
)
with self.assertLogs("lsst", level="INFO") as cm:
quanta = executor.run(register_dataset_types=True, save_versions=False)
# A dict is given to task a without change.
@@ -221,17 +224,11 @@
self._test_logs(cm.output, "dict", "dict", "dict", "lsst.pipe.base.TaskMetadata")

self.assertEqual(len(quanta), 2)
self.assertEqual(self.butler.get("intermediate").to_dict(), {"zero": 0, "one": 1})
self.assertEqual(self.butler.get("output").to_dict(), {"zero": 0, "one": 1, "two": 2})
self.assertEqual(self.butler.get("intermediate"), TaskMetadata.from_dict({"zero": 0, "one": 1}))
self.assertEqual(self.butler.get("output"), TaskMetadata.from_dict({"zero": 0, "one": 1, "two": 2}))

def test_from_pipeline_output_differ(self):
"""Run pipeline but output definition in registry differs."""
executor = self._configure_pipeline(
NoDimensionsTestTask.ConfigClass,
NoDimensionsTestTask.ConfigClass,
storageClass_a="TaskMetadataLike",
)

# Pre-define the "output" storage class to be something that is
# like a dict but is not a dict. This will fail unless storage
# class conversion is supported in put and get.
@@ -242,16 +239,21 @@
storageClass="TaskMetadataLike",
)
)

executor = self._configure_pipeline(
> **Member:** This has to move until after the dataset type has been registered? If that is the case shouldn't the other tests have this change?

> **Member (Author):** Good point. They probably don't matter, but they're at least unsafe. Will do.


NoDimensionsTestTask.ConfigClass,
NoDimensionsTestTask.ConfigClass,
storageClass_a="TaskMetadataLike",
)
with self.assertLogs("lsst", level="INFO") as cm:
quanta = executor.run(register_dataset_types=True, save_versions=False)
# a has been told to return a TaskMetadata but will convert to dict.
# a has been told to return a TaskMetadata but this will convert to
# dict on read by b.
# b returns a dict and that is converted to TaskMetadata on put.
self._test_logs(cm.output, "dict", "lsst.pipe.base.TaskMetadata", "dict", "dict")

self.assertEqual(len(quanta), 2)
self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
self.assertEqual(self.butler.get("output").to_dict(), {"zero": 0, "one": 1, "two": 2})
self.assertEqual(self.butler.get("intermediate"), TaskMetadata.from_dict({"zero": 0, "one": 1}))
self.assertEqual(self.butler.get("output"), TaskMetadata.from_dict({"zero": 0, "one": 1, "two": 2}))

def test_from_pipeline_input_differ(self):
"""Run pipeline but input definition in registry differs."""
@@ -267,8 +269,11 @@
self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2})

def test_from_pipeline_incompatible(self):
"""Run pipeline but definitions are not compatible."""
def test_from_pipeline_inconsistent_dataset_types(self):
"""Generate the QG (by initializing the executor), then register the
dataset type with a different storage class than the QG should have
predicted, to make sure execution fails as it should.
"""
executor = self._configure_pipeline(
NoDimensionsTestTask.ConfigClass, NoDimensionsTestTask.ConfigClass
)