Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-38305: Fix race condition in DatasetRecordStorageManager refresh #804

Merged
merged 2 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-38305.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix occasional crashes in Butler `refresh()` method due to a race condition in dataset types refresh.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def refresh(self) -> None:
# Docstring inherited from DatasetRecordStorageManager.
byName: dict[str, ByDimensionsDatasetRecordStorage] = {}
byId: dict[int, ByDimensionsDatasetRecordStorage] = {}
dataset_types: dict[int, DatasetType] = {}
c = self._static.dataset_type.columns
with self._db.query(self._static.dataset_type.select()) as sql_result:
sql_rows = sql_result.mappings().fetchall()
Expand Down Expand Up @@ -223,9 +224,10 @@ def refresh(self) -> None:
)
byName[datasetType.name] = storage
byId[storage._dataset_type_id] = storage
dataset_types[row["id"]] = datasetType
self._byName = byName
self._byId = byId
self._summaries.refresh(lambda dataset_type_id: self._byId[dataset_type_id].datasetType)
self._summaries.refresh(dataset_types)

def remove(self, name: str) -> None:
# Docstring inherited from DatasetRecordStorageManager.
Expand Down
41 changes: 21 additions & 20 deletions python/lsst/daf/butler/registry/datasets/byDimensions/summaries.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

__all__ = ("CollectionSummaryManager",)

from collections.abc import Callable, Iterable
from collections.abc import Iterable, Mapping
from typing import Any, Generic, TypeVar

import sqlalchemy
Expand Down Expand Up @@ -250,14 +250,15 @@ def update(
# in-memory cache.
self.get(collection).update(summary)

def refresh(self, get_dataset_type: Callable[[int], DatasetType]) -> None:
def refresh(self, dataset_types: Mapping[int, DatasetType]) -> None:
"""Load all collection summary information from the database.

Parameters
----------
get_dataset_type : `Callable`
Function that takes an `int` dataset_type_id value and returns a
`DatasetType` instance.
dataset_types : `~collections.abc.Mapping` [`int`, `DatasetType`]
Mapping of an `int` dataset_type_id value to `DatasetType`
instance. Summaries are only loaded for dataset types that appear
in this mapping.
"""
# Set up the SQL query we'll use to fetch all of the summary
# information at once.
Expand Down Expand Up @@ -289,21 +290,21 @@ def refresh(self, get_dataset_type: Callable[[int], DatasetType]) -> None:
collectionKey = row[self._collectionKeyName]
# dataset_type_id should also never be None/NULL; it's in the first
# table we joined.
datasetType = get_dataset_type(row["dataset_type_id"])
# See if we have a summary already for this collection; if not,
# make one.
summary = summaries.get(collectionKey)
if summary is None:
summary = CollectionSummary()
summaries[collectionKey] = summary
# Update the dimensions with the values in this row that aren't
# None/NULL (many will be in general, because these enter the query
# via LEFT OUTER JOIN).
summary.dataset_types.add(datasetType)
for dimension in self._tables.dimensions:
value = row[dimension.name]
if value is not None:
summary.governors.setdefault(dimension.name, set()).add(value)
if datasetType := dataset_types.get(row["dataset_type_id"]):
# See if we have a summary already for this collection; if not,
# make one.
summary = summaries.get(collectionKey)
if summary is None:
summary = CollectionSummary()
summaries[collectionKey] = summary
# Update the dimensions with the values in this row that
# aren't None/NULL (many will be in general, because these
# enter the query via LEFT OUTER JOIN).
summary.dataset_types.add(datasetType)
for dimension in self._tables.dimensions:
value = row[dimension.name]
if value is not None:
summary.governors.setdefault(dimension.name, set()).add(value)
self._cache = summaries

def get(self, collection: CollectionRecord) -> CollectionSummary:
Expand Down