DM-39198: Ensure task connection storage classes are used in Quanta. #335

Merged
merged 3 commits on May 17, 2023
3 changes: 3 additions & 0 deletions doc/changes/DM-39198.bugfix.md
@@ -0,0 +1,3 @@
+Fix handling of storage classes in QuantumGraph generation.
+
+This could lead to a failure downstream in execution butler creation, and would likely have led to problems with Quantum-Backed Butler usage as well.
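
As a hedged illustration of the bug being fixed (not code from this PR): a task connection may declare a storage class that differs from, but is convertible to, the one registered in the data repository, and refs in the generated graph must carry the task's declaration. The helper below mirrors the override pattern the PR applies in `graphBuilder.py`; `apply_task_storage_class` is an invented name, and `ref`/`storage_classes` stand in for objects produced during graph generation.

```python
from lsst.daf.butler import DatasetRef


def apply_task_storage_class(ref: DatasetRef, storage_classes: dict[str, str]) -> DatasetRef:
    """Return ``ref`` converted to the storage class the task declared,
    when that differs from the data repository definition.
    """
    override = storage_classes.get(ref.datasetType.name, ref.datasetType.storageClass_name)
    if override != ref.datasetType.storageClass_name:
        # DatasetRef is immutable; overrideStorageClass returns a new ref.
        return ref.overrideStorageClass(override)
    return ref
```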
99 changes: 71 additions & 28 deletions python/lsst/pipe/base/graphBuilder.py
@@ -55,12 +55,12 @@
 from lsst.daf.butler.registry.wildcards import CollectionWildcard
 from lsst.utils import doImportType

-from ._datasetQueryConstraints import DatasetQueryConstraintVariant
-from ._status import NoWorkFound
-
 # -----------------------------
 # Imports for other modules --
 # -----------------------------
+from . import automatic_connection_constants as acc
+from ._datasetQueryConstraints import DatasetQueryConstraintVariant
+from ._status import NoWorkFound
 from .connections import AdjustQuantumHelper, iterConnections
 from .graph import QuantumGraph
 from .pipeline import Pipeline, PipelineDatasetTypes, TaskDatasetTypes, TaskDef
@@ -242,42 +242,60 @@
             return base
         return base.union(*[datasetType.dimensions for datasetType in self.keys()])

-    def unpackSingleRefs(self) -> NamedKeyDict[DatasetType, DatasetRef]:
+    def unpackSingleRefs(self, storage_classes: dict[str, str]) -> NamedKeyDict[DatasetType, DatasetRef]:
         """Unpack nested single-element `DatasetRef` dicts into a new
         mapping with `DatasetType` keys and `DatasetRef` values.

         This method assumes that each nest contains exactly one item, as is the
         case for all "init" datasets.

+        Parameters
+        ----------
+        storage_classes : `dict` [ `str`, `str` ]
+            Mapping from dataset type name to the storage class to use for that
+            dataset type. These are typically the storage classes declared
+            for a particular task, which may differ from the data repository
+            definitions.
+
         Returns
         -------
         dictionary : `NamedKeyDict`
             Dictionary mapping `DatasetType` to `DatasetRef`, with both
             `DatasetType` instances and string names usable as keys.
         """
-
-        def getOne(refs: dict[DataCoordinate, _RefHolder]) -> DatasetRef:
-            (holder,) = refs.values()
-            return holder.resolved_ref
-
-        return NamedKeyDict({datasetType: getOne(refs) for datasetType, refs in self.items()})
+        return NamedKeyDict(
+            {datasetType: refs[0] for datasetType, refs in self.unpackMultiRefs(storage_classes).items()}
+        )

-    def unpackMultiRefs(self) -> NamedKeyDict[DatasetType, list[DatasetRef]]:
+    def unpackMultiRefs(self, storage_classes: dict[str, str]) -> NamedKeyDict[DatasetType, list[DatasetRef]]:
         """Unpack nested multi-element `DatasetRef` dicts into a new
-        mapping with `DatasetType` keys and `set` of `DatasetRef` values.
+        mapping with `DatasetType` keys and `list` of `DatasetRef` values.

+        Parameters
+        ----------
+        storage_classes : `dict` [ `str`, `str` ]
+            Mapping from dataset type name to the storage class to use for that
+            dataset type. These are typically the storage classes declared
+            for a particular task, which may differ from the data repository
+            definitions.
+
         Returns
         -------
         dictionary : `NamedKeyDict`
             Dictionary mapping `DatasetType` to `list` of `DatasetRef`, with
             both `DatasetType` instances and string names usable as keys.
         """
-        return NamedKeyDict(
-            {
-                datasetType: list(holder.resolved_ref for holder in refs.values())
-                for datasetType, refs in self.items()
-            }
-        )
+        result = {}
+        for dataset_type, holders in self.items():
+            if (
+                override := storage_classes.get(dataset_type.name, dataset_type.storageClass_name)
+            ) != dataset_type.storageClass_name:
+                dataset_type = dataset_type.overrideStorageClass(override)
+                refs = [holder.resolved_ref.overrideStorageClass(override) for holder in holders.values()]
+            else:
+                refs = [holder.resolved_ref for holder in holders.values()]
+            result[dataset_type] = refs
+        return NamedKeyDict(result)
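
A hedged usage sketch of the new signature: `check_unpacked` is an invented helper, and the idea is simply that every ref returned by `unpackMultiRefs` now carries the task-declared storage class rather than the repository one, so butler reads during execution return the Python type the task expects.

```python
from lsst.daf.butler import DatasetRef, DatasetType, NamedKeyDict


def check_unpacked(
    refs_by_type: NamedKeyDict[DatasetType, list[DatasetRef]],
    storage_classes: dict[str, str],
) -> None:
    """Verify every unpacked ref carries the task-declared storage class."""
    for dataset_type, refs in refs_by_type.items():
        expected = storage_classes.get(dataset_type.name, dataset_type.storageClass_name)
        for ref in refs:
            assert ref.datasetType.storageClass_name == expected


# Usage, given a populated _DatasetDict ``inputs`` (stand-in name):
#     check_unpacked(inputs.unpackMultiRefs(storage_classes), storage_classes)
```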

     def extract(
         self, datasetType: DatasetType, dataIds: Iterable[DataCoordinate]
@@ -395,8 +413,8 @@
         quantum : `Quantum`
             An actual `Quantum` instance.
         """
-        allInputs = self.inputs.unpackMultiRefs()
-        allInputs.update(self.prerequisites.unpackMultiRefs())
+        allInputs = self.inputs.unpackMultiRefs(self.task.storage_classes)
+        allInputs.update(self.prerequisites.unpackMultiRefs(self.task.storage_classes))
         # Give the task's Connections class an opportunity to remove some
         # inputs, or complain if they are unacceptable.
         # This will raise if one of the check conditions is not met, which is
@@ -406,9 +424,11 @@
         # input behave like a regular input; adjustQuantum should only raise
         # NoWorkFound if a regular input is missing, and it shouldn't be
         # possible for us to have generated ``self`` if that's true.
-        helper = AdjustQuantumHelper(inputs=allInputs, outputs=self.outputs.unpackMultiRefs())
+        helper = AdjustQuantumHelper(
+            inputs=allInputs, outputs=self.outputs.unpackMultiRefs(self.task.storage_classes)
+        )
         helper.adjust_in_place(self.task.taskDef.connections, self.task.taskDef.label, self.dataId)
-        initInputs = self.task.initInputs.unpackSingleRefs()
+        initInputs = self.task.initInputs.unpackSingleRefs(self.task.storage_classes)
         quantum_records: Optional[Mapping[str, DatastoreRecordData]] = None
         if datastore_records is not None:
             quantum_records = {}
@@ -472,6 +492,19 @@
         self.prerequisites = _DatasetDict.fromSubset(datasetTypes.prerequisites, parent.prerequisites)
         self.dataIds: set[DataCoordinate] = set()
         self.quanta = {}
+        self.storage_classes = {
+            connection.name: connection.storageClass
+            for connection in self.taskDef.connections.allConnections.values()
+        }
+        self.storage_classes[
+            acc.CONFIG_INIT_OUTPUT_TEMPLATE.format(label=self.taskDef.label)
+        ] = acc.CONFIG_INIT_OUTPUT_STORAGE_CLASS
+        self.storage_classes[
+            acc.LOG_OUTPUT_TEMPLATE.format(label=self.taskDef.label)
+        ] = acc.LOG_OUTPUT_STORAGE_CLASS
+        self.storage_classes[
+            acc.METADATA_OUTPUT_TEMPLATE.format(label=self.taskDef.label)
+        ] = acc.METADATA_OUTPUT_STORAGE_CLASS
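
For a hypothetical task labelled "isr", the mapping built above would combine the declared connections with the automatic per-task outputs. This sketch only assumes the template constants behave as their names suggest; the "postISRCCD"/"Exposure" entry is an invented example of a declared connection.

```python
import lsst.pipe.base.automatic_connection_constants as acc

label = "isr"  # hypothetical task label
storage_classes = {
    # ...entries from the task's declared connections, e.g.:
    "postISRCCD": "Exposure",
    # ...plus the automatic config/log/metadata outputs added per task:
    acc.CONFIG_INIT_OUTPUT_TEMPLATE.format(label=label): acc.CONFIG_INIT_OUTPUT_STORAGE_CLASS,
    acc.LOG_OUTPUT_TEMPLATE.format(label=label): acc.LOG_OUTPUT_STORAGE_CLASS,
    acc.METADATA_OUTPUT_TEMPLATE.format(label=label): acc.METADATA_OUTPUT_STORAGE_CLASS,
}
```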

     def __repr__(self) -> str:
         # Default dataclass-injected __repr__ gets caught in an infinite loop
@@ -518,6 +551,10 @@
     this task with that data ID.
     """

+    storage_classes: dict[str, str]
+    """Mapping from dataset type name to storage class declared by this task.
+    """

     def makeQuantumSet(
         self,
         missing: _DatasetDict,
@@ -548,15 +585,15 @@
                 # should be left in even though some follow up queries
                 # fail. This allows the pruning to start from this quantum
                 # with known issues, and prune other nodes it touches.
-                inputs = q.inputs.unpackMultiRefs()
-                inputs.update(q.prerequisites.unpackMultiRefs())
+                inputs = q.inputs.unpackMultiRefs(self.storage_classes)
+                inputs.update(q.prerequisites.unpackMultiRefs(self.storage_classes))
                 tmpQuantum = Quantum(
                     taskName=q.task.taskDef.taskName,
                     taskClass=q.task.taskDef.taskClass,
                     dataId=q.dataId,
-                    initInputs=q.task.initInputs.unpackSingleRefs(),
+                    initInputs=q.task.initInputs.unpackSingleRefs(self.storage_classes),
                     inputs=inputs,
-                    outputs=q.outputs.unpackMultiRefs(),
+                    outputs=q.outputs.unpackMultiRefs(self.storage_classes),
                 )
                 outputs.add(tmpQuantum)
             else:
@@ -1376,8 +1413,14 @@
             qset = task.makeQuantumSet(missing=self.missing, datastore_records=datastore_records)
             graphInput[task.taskDef] = qset

-        taskInitInputs = {task.taskDef: task.initInputs.unpackSingleRefs().values() for task in self.tasks}
-        taskInitOutputs = {task.taskDef: task.initOutputs.unpackSingleRefs().values() for task in self.tasks}
+        taskInitInputs = {
+            task.taskDef: task.initInputs.unpackSingleRefs(task.storage_classes).values()
+            for task in self.tasks
+        }
+        taskInitOutputs = {
+            task.taskDef: task.initOutputs.unpackSingleRefs(task.storage_classes).values()
+            for task in self.tasks
+        }

         globalInitOutputs: list[DatasetRef] = []
         if self.globalInitOutputs is not None:
20 changes: 17 additions & 3 deletions python/lsst/pipe/base/script/transfer_from_graph.py
@@ -62,13 +62,26 @@
     qgraph = QuantumGraph.loadUri(graph)

     # Collect output refs that could be created by this graph.
-    output_refs: set[DatasetRef] = set(qgraph.globalInitOutputRefs())
+    original_output_refs: set[DatasetRef] = set(qgraph.globalInitOutputRefs())
     for task_def in qgraph.iterTaskGraph():
         if refs := qgraph.initOutputRefs(task_def):
-            output_refs.update(refs)
+            original_output_refs.update(refs)
     for qnode in qgraph:
         for refs in qnode.quantum.outputs.values():
-            output_refs.update(refs)
+            original_output_refs.update(refs)

+
+    # Get data repository definitions from the QuantumGraph; these can have
+    # different storage classes than those in the quanta.
+    dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()}
Member: I see this line enough I'm starting to wonder if QuantumGraph could have a method that returned the dict.

Member Author: 👍. I'm informally saving up a number of changes I'd like to make to QG.
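
A minimal sketch of the convenience method floated above. This is an assumption, not part of this PR: the method name is hypothetical, and only `registryDatasetTypes()` (used in the diff) is taken as given.

```python
from lsst.daf.butler import DatasetType


def registry_dataset_types_by_name(self) -> dict[str, DatasetType]:
    """Return this graph's data repository dataset type definitions,
    keyed by dataset type name (hypothetical QuantumGraph method).
    """
    return {dstype.name: dstype for dstype in self.registryDatasetTypes()}
```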


+    # Convert output_refs to the data repository storage classes, too.
+    output_refs = set()
+    for ref in original_output_refs:
+        internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType)
+        if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name:
+            output_refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name))
Member: Suggested change

-            output_refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name))
+            output_refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass))

since the storage class should be attached and can be used directly without having to proxy through the name string.

Member Author: If you think that's safe, sure. I tend to default to using the name when I'm not sure because I don't want to accidentally trigger imports of things that might not be available, but that may just be a habit relevant for working down in the guts of registry, not here.

Member: I think at this point the storage class is going to be resolved into a real storage class almost immediately so it won't matter. I guess there's no harm in going via the name here and then deferring storage class creation as much as possible.
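A small sketch contrasting the two forms under discussion; it assumes, per the thread, that `overrideStorageClass` accepts either the storage class name or the `StorageClass` instance attached to the dataset type.

```python
# Both forms convert the ref to the repository's storage class; the
# name-based call (as merged) defers constructing the StorageClass object,
# while the instance-based call (as suggested) uses the one already attached.
ref_by_name = ref.overrideStorageClass(internal_dataset_type.storageClass_name)
ref_by_instance = ref.overrideStorageClass(internal_dataset_type.storageClass)
```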

+        else:
+            output_refs.add(ref)

     # Make QBB, its config is the same as output Butler.
     qbb = QuantumBackedButler.from_predicted(
@@ -77,6 +90,7 @@
         predicted_outputs=[],
         dimensions=qgraph.universe,
         datastore_records={},
+        dataset_types=dataset_types,
     )

     dest_butler = Butler(dest, writeable=True)