Re-enable dataset type caching using new class DatasetTypeCache.
Unlike the collection caches, the dataset type cache is always on; this
helps to reduce the number of queries in `pipetask run` without the need
to explicitly enable caching in multiple places.
andy-slac committed Nov 13, 2023
1 parent 14721a2 commit 1e59418
Showing 3 changed files with 235 additions and 8 deletions.
15 changes: 15 additions & 0 deletions python/lsst/daf/butler/registry/_caching_context.py
@@ -29,8 +29,14 @@

__all__ = ["CachingContext"]

from typing import TYPE_CHECKING

from ._collection_record_cache import CollectionRecordCache
from ._collection_summary_cache import CollectionSummaryCache
from ._dataset_type_cache import DatasetTypeCache

if TYPE_CHECKING:
from .interfaces import DatasetRecordStorage


class CachingContext:
@@ -45,6 +51,9 @@ class CachingContext:
instances which will be `None` when caching is disabled. An instance of this
class is passed to the relevant managers, which can use it to query or
populate caches when caching is enabled.

The dataset type cache is always enabled for now; this avoids the need to
explicitly enable caching in pipetask executors.
"""

collection_records: CollectionRecordCache | None = None
@@ -53,6 +62,12 @@ class is passed to the relevant managers that can use it to query or
collection_summaries: CollectionSummaryCache | None = None
"""Cache for collection summary records (`CollectionSummaryCache`)."""

dataset_types: DatasetTypeCache[DatasetRecordStorage]
"""Cache for dataset types, never disabled (`DatasetTypeCache`)."""

def __init__(self) -> None:
self.dataset_types = DatasetTypeCache()

def enable(self) -> None:
"""Enable caches, initializes all caches."""
self.collection_records = CollectionRecordCache()
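For illustration, a minimal usage sketch of the resulting behavior (hypothetical driver code, not part of this commit; the private module path is taken from the diff header):

```python
from lsst.daf.butler.registry._caching_context import CachingContext

context = CachingContext()

# The dataset type cache exists from construction and is never disabled.
assert context.dataset_types is not None

# Collection caches stay disabled (None) until explicitly enabled.
assert context.collection_records is None
assert context.collection_summaries is None

context.enable()
assert context.collection_records is not None
```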
162 changes: 162 additions & 0 deletions python/lsst/daf/butler/registry/_dataset_type_cache.py
@@ -0,0 +1,162 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("DatasetTypeCache",)

from collections.abc import Iterable, Iterator
from typing import Generic, TypeVar

from .._dataset_type import DatasetType

_T = TypeVar("_T")


class DatasetTypeCache(Generic[_T]):
"""Cache for dataset types.
Notes
-----
This class caches mapping of dataset type name to a corresponding
`DatasetType` instance. Registry manager also needs to cache corresponding
"storage" instance, so this class allows storing additional opaque object
along with the dataset type.
In come contexts (e.g. ``resolve_wildcard``) a full list of dataset types
is needed. To signify that cache content can be used in such contexts,
cache defines special ``full`` flag that needs to be set by client.
"""

def __init__(self) -> None:
self._cache: dict[str, tuple[DatasetType, _T | None]] = {}
self._full = False

@property
def full(self) -> bool:
"""`True` if cache holds all known dataset types (`bool`)."""
return self._full

def add(self, dataset_type: DatasetType, extra: _T | None = None) -> None:
"""Add one record to the cache.
Parameters
----------
dataset_type : `DatasetType`
Dataset type, replaces any existing dataset type with the same
name.
extra : `Any`, optional
Additional opaque object stored with this dataset type.
"""
self._cache[dataset_type.name] = (dataset_type, extra)

def set(self, data: Iterable[DatasetType | tuple[DatasetType, _T | None]], *, full: bool = False) -> None:
"""Replace cache contents with the new set of dataset types.
Parameters
----------
data : `~collections.abc.Iterable`
Sequence of `DatasetType` instances or tuples of `DatasetType` and
an extra opaque object.
full : `bool`
If `True` then ``data`` contains all known dataset types.
"""
self.clear()
for item in data:
if isinstance(item, DatasetType):
item = (item, None)
self._cache[item[0].name] = item
self._full = full

def clear(self) -> None:
"""Remove everything from the cache."""
self._cache = {}
self._full = False

def discard(self, name: str) -> None:
"""Remove named dataset type from the cache.
Parameters
----------
name : `str`
Name of the dataset type to remove.
"""
self._cache.pop(name, None)

def get(self, name: str) -> tuple[DatasetType | None, _T | None]:
"""Return cached info given dataset type name.
Parameters
----------
name : `str`
Dataset type name.

Returns
-------
dataset_type : `DatasetType` or `None`
Cached dataset type; `None` is returned if the name is not in the cache.
extra : `Any` or `None`
Cached opaque data; `None` is returned if the name is not in the cache or
if no extra info was stored for this dataset type.
"""
item = self._cache.get(name)
if item is None:
return (None, None)
return item

def get_dataset_type(self, name: str) -> DatasetType | None:
"""Return dataset type given its name.
Parameters
----------
name : `str`
Dataset type name.

Returns
-------
dataset_type : `DatasetType` or `None`
Cached dataset type; `None` is returned if the name is not in the cache.
"""
item = self._cache.get(name)
if item is None:
return None
return item[0]

def items(self) -> Iterator[tuple[DatasetType, _T | None]]:
"""Return iterator for the set of items in the cache, can only be
used if `full` is true.
Raises
------
RuntimeError
Raised if ``self.full`` is `False`.
"""
if not self._full:
raise RuntimeError("cannot call items() if cache is not full")
return iter(self._cache.values())
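A short usage sketch of the class above (illustrative only; `str` stands in for the opaque storage type, and `"calexp"` is just an example name):

```python
from lsst.daf.butler.registry._dataset_type_cache import DatasetTypeCache

cache: DatasetTypeCache[str] = DatasetTypeCache()

# A lookup miss returns (None, None) rather than raising.
dataset_type, extra = cache.get("calexp")
assert dataset_type is None and extra is None

# items() requires the client to have marked the cache as complete.
try:
    list(cache.items())
except RuntimeError:
    pass  # expected: the cache is not full yet

# set() replaces the contents; full=True declares the cache complete.
cache.set([], full=True)
assert cache.full and list(cache.items()) == []
```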
66 changes: 58 additions & 8 deletions python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py
@@ -121,6 +121,8 @@ class ByDimensionsDatasetRecordStorageManagerBase(DatasetRecordStorageManager):
tables used by this class.
summaries : `CollectionSummaryManager`
Structure containing tables that summarize the contents of collections.
caching_context : `CachingContext`
Object controlling caching of information returned by managers.
"""

def __init__(
@@ -131,6 +133,7 @@ def __init__(
dimensions: DimensionRecordStorageManager,
static: StaticDatasetTablesTuple,
summaries: CollectionSummaryManager,
caching_context: CachingContext,
registry_schema_version: VersionTuple | None = None,
):
super().__init__(registry_schema_version=registry_schema_version)
@@ -139,6 +142,7 @@ def __init__(
self._dimensions = dimensions
self._static = static
self._summaries = summaries
self._caching_context = caching_context

@classmethod
def initialize(
@@ -170,6 +174,7 @@ def initialize(
dimensions=dimensions,
static=static,
summaries=summaries,
caching_context=caching_context,
registry_schema_version=registry_schema_version,
)

@@ -237,7 +242,8 @@ def addDatasetForeignKey(

def refresh(self) -> None:
# Docstring inherited from DatasetRecordStorageManager.
pass
if self._caching_context.dataset_types is not None:
self._caching_context.dataset_types.clear()

def _make_storage(self, record: _DatasetTypeRecord) -> ByDimensionsDatasetRecordStorage:
"""Create storage instance for a dataset type record."""
@@ -286,8 +292,28 @@ def remove(self, name: str) -> None:

def find(self, name: str) -> DatasetRecordStorage | None:
# Docstring inherited from DatasetRecordStorageManager.
if self._caching_context.dataset_types is not None:
_, storage = self._caching_context.dataset_types.get(name)
if storage is not None:
return storage
else:
# On the first cache miss, populate the cache with the complete
# list of dataset types (if that has not been done yet).
if not self._caching_context.dataset_types.full:
self._fetch_dataset_types()
# Try again
_, storage = self._caching_context.dataset_types.get(name)
if self._caching_context.dataset_types.full:
# If not in cache then dataset type is not defined.
return storage
record = self._fetch_dataset_type_record(name)
return self._make_storage(record) if record is not None else None
if record is not None:
storage = self._make_storage(record)
if self._caching_context.dataset_types is not None:
self._caching_context.dataset_types.add(storage.datasetType, storage)
return storage
else:
return None
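The branchy control flow above is a read-through cache with whole-table population on the first miss: once the cache is known to be full, a miss is itself an authoritative negative answer. A self-contained sketch of the same idea (generic names, not the actual manager code):

```python
from collections.abc import Callable, Mapping


class ReadThroughByName:
    """Minimal illustration of the miss-then-populate-fully pattern."""

    def __init__(self, fetch_all: Callable[[], Mapping[str, object]]):
        self._fetch_all = fetch_all  # one query returning every record
        self._cache: dict[str, object] = {}
        self._full = False

    def find(self, name: str) -> object | None:
        value = self._cache.get(name)
        if value is not None:
            return value
        if not self._full:
            # First miss: load everything in a single query so that later
            # misses can be answered without touching the database.
            self._cache = dict(self._fetch_all())
            self._full = True
            return self._cache.get(name)
        # The cache is complete, so a miss means the name is not defined.
        return None
```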

def register(self, datasetType: DatasetType) -> bool:
# Docstring inherited from DatasetRecordStorageManager.
@@ -316,7 +342,7 @@ def register(self, datasetType: DatasetType) -> bool:
self.getIdColumnType(),
),
)
_, inserted = self._db.sync(
row, inserted = self._db.sync(
self._static.dataset_type,
keys={"name": datasetType.name},
compared={
Expand All @@ -331,16 +357,24 @@ def register(self, datasetType: DatasetType) -> bool:
},
returning=["id", "tag_association_table"],
)
# Make sure that the cache is updated
if self._caching_context.dataset_types is not None and row is not None:
record = _DatasetTypeRecord(
dataset_type=datasetType,
dataset_type_id=row["id"],
tag_table_name=tagTableName,
calib_table_name=calibTableName,
)
storage = self._make_storage(record)
self._caching_context.dataset_types.add(datasetType, storage)
else:
if datasetType != record.dataset_type:
raise ConflictingDefinitionError(
f"Given dataset type {datasetType} is inconsistent "
f"with database definition {record.dataset_type}."
)
inserted = False
# TODO: We return storage instance from this method, but the only
# client that uses this method ignores it. Maybe we should drop it
# and avoid making storage instance above.

return bool(inserted)
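register() also writes through to the cache: after the insert-or-compare round trip, the freshly built storage is cached so an immediately following find() needs no query. A toy sketch of that write-through step (plain dicts stand in for the database and the cache; none of these names are the real API):

```python
def register_with_cache(db_rows: dict[str, int], cache: dict[str, int], name: str) -> bool:
    """Insert-or-compare in the 'database', then write through to the cache."""
    inserted = name not in db_rows
    if inserted:
        db_rows[name] = len(db_rows) + 1  # pretend this is the new row id
    # Write-through: cache the definition now so a subsequent find(name)
    # is served from memory instead of a second database round trip.
    cache[name] = db_rows[name]
    return inserted
```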

def resolve_wildcard(
@@ -472,7 +506,15 @@ def getDatasetRef(self, id: DatasetId) -> DatasetRef | None:
row = sql_result.mappings().fetchone()
if row is None:
return None
storage = self._make_storage(self._record_from_row(row))
record = self._record_from_row(row)
storage: DatasetRecordStorage | None = None
if self._caching_context.dataset_types is not None:
_, storage = self._caching_context.dataset_types.get(record.dataset_type.name)
if storage is None:
storage = self._make_storage(record)
if self._caching_context.dataset_types is not None:
self._caching_context.dataset_types.add(storage.datasetType, storage)
assert isinstance(storage, ByDimensionsDatasetRecordStorage), "Not expected storage class"
return DatasetRef(
storage.datasetType,
dataId=storage.getDataId(id=id),
@@ -516,9 +558,17 @@ def _dataset_type_from_row(self, row: Mapping) -> DatasetType:

def _fetch_dataset_types(self) -> list[DatasetType]:
"""Fetch list of all defined dataset types."""
if self._caching_context.dataset_types is not None:
if self._caching_context.dataset_types.full:
return [dataset_type for dataset_type, _ in self._caching_context.dataset_types.items()]
with self._db.query(self._static.dataset_type.select()) as sql_result:
sql_rows = sql_result.mappings().fetchall()
return [self._record_from_row(row).dataset_type for row in sql_rows]
records = [self._record_from_row(row) for row in sql_rows]
# Cache everything and specify that cache is complete.
if self._caching_context.dataset_types is not None:
cache_data = [(record.dataset_type, self._make_storage(record)) for record in records]
self._caching_context.dataset_types.set(cache_data, full=True)
return [record.dataset_type for record in records]

def getCollectionSummary(self, collection: CollectionRecord) -> CollectionSummary:
# Docstring inherited from DatasetRecordStorageManager.
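`_fetch_dataset_types` is where the ``full`` flag is finally set: a single query materializes every record, and `set(..., full=True)` records that completeness so later listings and negative `find()` answers come from memory. A condensed sketch under the same assumptions as above (simplified names):

```python
def fetch_all_dataset_types(cache, query_all_records):
    """Populate the cache completely on first use, then serve from it."""
    if cache.full:
        # The cache already holds every dataset type; no query needed.
        return [dataset_type for dataset_type, _ in cache.items()]
    records = query_all_records()  # one database query for all records
    # Cache every definition together with its record and mark complete.
    cache.set([(record.dataset_type, record) for record in records], full=True)
    return [record.dataset_type for record in records]
```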
