Skip to content

Commit

Permalink
Refactor dimension record storage and caching.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Jan 8, 2024
1 parent 5beb2e9 commit 0cb8b8e
Show file tree
Hide file tree
Showing 20 changed files with 833 additions and 2,117 deletions.
54 changes: 1 addition & 53 deletions python/lsst/daf/butler/dimensions/_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,14 @@
from types import MappingProxyType
from typing import TYPE_CHECKING

from lsst.utils import doImportType
from lsst.utils.classes import cached_getter

from .._named import NamedKeyMapping, NamedValueAbstractSet, NamedValueSet
from .._named import NamedValueAbstractSet, NamedValueSet
from .._topology import TopologicalFamily, TopologicalSpace
from ._elements import Dimension, DimensionCombination, DimensionElement, KeyColumnSpec, MetadataColumnSpec
from .construction import DimensionConstructionBuilder, DimensionConstructionVisitor

if TYPE_CHECKING:
from ..registry.interfaces import (
Database,
DatabaseDimensionRecordStorage,
GovernorDimensionRecordStorage,
StaticTablesContext,
)
from ._governor import GovernorDimension
from ._universe import DimensionUniverse

Expand Down Expand Up @@ -252,51 +245,6 @@ def temporal(self) -> DatabaseTopologicalFamily | None:
# Docstring inherited from TopologicalRelationshipEndpoint
return self.topology.get(TopologicalSpace.TEMPORAL)

def makeStorage(
    self,
    db: Database,
    *,
    context: StaticTablesContext | None = None,
    governors: NamedKeyMapping[GovernorDimension, GovernorDimensionRecordStorage],
    view_target: DatabaseDimensionRecordStorage | None = None,
) -> DatabaseDimensionRecordStorage:
    """Construct the `DimensionRecordStorage` instance that should back
    this element in a registry.

    Parameters
    ----------
    db : `Database`
        Interface to the underlying database engine and namespace.
    context : `StaticTablesContext`, optional
        If provided, an object to use to create any new tables.  If not
        provided, ``db.ensureTableExists`` should be used instead.
    governors : `NamedKeyMapping`
        Mapping from `GovernorDimension` to the record storage backend for
        that dimension, containing all governor dimensions.
    view_target : `DatabaseDimensionRecordStorage`, optional
        Storage object for the element this target's storage is a view of
        (i.e. when `viewOf` is not `None`).

    Returns
    -------
    storage : `DatabaseDimensionRecordStorage`
        Storage object that should back this element in a registry.
    """
    # Imported locally to avoid a circular import at module load time.
    from ..registry.interfaces import DatabaseDimensionRecordStorage

    # The concrete storage class is configured by fully-qualified name in
    # this element's storage configuration; import it on demand.
    storage_cls = doImportType(self._storage["cls"])
    assert issubclass(storage_cls, DatabaseDimensionRecordStorage)
    return storage_cls.initialize(
        db,
        self,
        context=context,
        config=self._storage,
        governors=governors,
        view_target=view_target,
    )


class DatabaseDimension(Dimension, DatabaseDimensionElement):
"""A `Dimension` class that maps directly to a database table or query.
Expand Down
36 changes: 0 additions & 36 deletions python/lsst/daf/butler/dimensions/_governor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,12 @@

from collections.abc import Iterable, Mapping, Set
from types import MappingProxyType
from typing import TYPE_CHECKING

from lsst.utils import doImportType

from .._named import NamedValueAbstractSet, NamedValueSet
from .._topology import TopologicalFamily, TopologicalSpace
from ._elements import Dimension, KeyColumnSpec, MetadataColumnSpec
from .construction import DimensionConstructionBuilder, DimensionConstructionVisitor

if TYPE_CHECKING:
from ..registry.interfaces import Database, GovernorDimensionRecordStorage, StaticTablesContext


class GovernorDimension(Dimension):
"""Governor dimension.
Expand Down Expand Up @@ -160,36 +154,6 @@ def documentation(self) -> str:
# Docstring inherited from DimensionElement.
return self._doc

def makeStorage(
    self,
    db: Database,
    *,
    context: StaticTablesContext | None = None,
) -> GovernorDimensionRecordStorage:
    """Construct the `DimensionRecordStorage` instance that should back
    this governor dimension in a registry.

    Parameters
    ----------
    db : `Database`
        Interface to the underlying database engine and namespace.
    context : `StaticTablesContext`, optional
        If provided, an object to use to create any new tables.  If not
        provided, ``db.ensureTableExists`` should be used instead.

    Returns
    -------
    storage : `GovernorDimensionRecordStorage`
        Storage object that should back this element in a registry.
    """
    # Imported locally to avoid a circular import at module load time.
    from ..registry.interfaces import GovernorDimensionRecordStorage

    # The concrete storage class is configured by fully-qualified name in
    # this dimension's storage configuration; import it on demand.
    storage_cls = doImportType(self._storage["cls"])
    assert issubclass(storage_cls, GovernorDimensionRecordStorage)
    return storage_cls.initialize(db, self, context=context, config=self._storage)


class GovernorDimensionConstructionVisitor(DimensionConstructionVisitor):
"""A construction visitor for `GovernorDimension`.
Expand Down
19 changes: 14 additions & 5 deletions python/lsst/daf/butler/dimensions/_record_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,16 @@ def find_with_required_values(
self._by_required_values[required_values] = result
return result

def add(self, value: DimensionRecord) -> None:
def add(self, value: DimensionRecord, replace: bool = True) -> None:
"""Add a new record to the set.
Parameters
----------
value : `DimensionRecord`
Record to add.
replace : `bool`, optional
If `True` (default) replace any existing record with the same data
ID. If `False` the existing record will be kept.
Raises
------
Expand All @@ -396,23 +399,29 @@ def add(self, value: DimensionRecord) -> None:
raise ValueError(
f"Cannot add record {value} for {value.definition.name!r} to set for {self.element!r}."
)
self._by_required_values[value.dataId.required_values] = value
if replace:
self._by_required_values[value.dataId.required_values] = value
else:
self._by_required_values.setdefault(value.dataId.required_values, value)

def update(self, values: Iterable[DimensionRecord]) -> None:
def update(self, values: Iterable[DimensionRecord], replace: bool = True) -> None:
    """Add new records to the set.

    Parameters
    ----------
    values : `~collections.abc.Iterable` [ `DimensionRecord` ]
        Records to add.
    replace : `bool`, optional
        If `True` (default) replace any existing records with the same data
        IDs. If `False` the existing records will be kept.

    Raises
    ------
    ValueError
        Raised if ``value.element != self.element``.
    """
    # Per-record validation and the replace-vs-keep policy are delegated
    # entirely to `add`.
    for record in values:
        self.add(record, replace=replace)

def update_from_data_coordinates(self, data_coordinates: Iterable[DataCoordinate]) -> None:
"""Add records to the set by extracting and deduplicating them from
Expand Down
16 changes: 0 additions & 16 deletions python/lsst/daf/butler/dimensions/_skypix.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
from .construction import DimensionConstructionBuilder, DimensionConstructionVisitor

if TYPE_CHECKING:
from ..registry.interfaces import SkyPixDimensionRecordStorage
from ._universe import DimensionUniverse


Expand Down Expand Up @@ -168,21 +167,6 @@ def has_own_table(self) -> bool:
# Docstring inherited from DimensionElement.
return False

def makeStorage(self) -> SkyPixDimensionRecordStorage:
    """Construct the `DimensionRecordStorage` instance that should back
    this skypix element in a registry.

    Returns
    -------
    storage : `SkyPixDimensionRecordStorage`
        Storage object that should back this element in a registry.
    """
    # Imported locally to avoid a circular import at module load time.
    from ..registry.dimensions.skypix import BasicSkyPixDimensionRecordStorage

    storage = BasicSkyPixDimensionRecordStorage(self)
    return storage

@property
def unique_keys(self) -> NamedValueAbstractSet[KeyColumnSpec]:
# Docstring inherited from DimensionElement.
Expand Down
74 changes: 69 additions & 5 deletions python/lsst/daf/butler/dimensions/record_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import copy
from collections.abc import Callable, Iterator, Mapping
from contextlib import contextmanager

from ._record_set import DimensionRecordSet
from ._universe import DimensionUniverse
Expand All @@ -51,6 +52,11 @@ class DimensionRecordCache(Mapping[str, DimensionRecordSet]):
element name to a `DimensionRecordSet` of all records for that element.
The keys of the returned `dict` must be exactly the elements in
``universe`` for which `DimensionElement.is_cached` is `True`.
Notes
-----
The nested `DimensionRecordSet` objects should never be modified in place
except when returned by the `modifying` context manager.
"""

def __init__(self, universe: DimensionUniverse, fetch: Callable[[], dict[str, DimensionRecordSet]]):
Expand All @@ -63,6 +69,63 @@ def reset(self) -> None:
"""Reset the cache, causing it to be fetched again on next use."""
self._records = None

@contextmanager
def modifying(self, element: str) -> Iterator[DimensionRecordSet | None]:
    """Return a context manager for modifying the cache and database
    content consistently.

    Parameters
    ----------
    element : `str`
        Name of the dimension element whose records will be modified.
        If this is not a cached record, `None` will be returned and the
        context manager does nothing.

    Returns
    -------
    context : `contextlib.AbstractContextManager` [ `DimensionRecordSet` \
            or `None` ]
        A context manager that when entered returns a `DimensionRecordSet`
        that should be modified in-place, or `None` if the cache is
        currently reset.

    Notes
    -----
    The returned context manager resets the cache when entered, and only
    restores the cache (along with the modifications to the returned
    `DimensionRecordSet`) if an exception is not raised during the context.
    It also takes care of updating any cache for "implied union" dimensions
    (e.g. ``band``, in the default dimension universe) when their targets
    are updated (e.g. ``physical_filter``).
    """
    if element in self:
        # Detach the cached records and reset the cache before yielding,
        # so a failure inside the context leaves the cache empty (to be
        # re-fetched) rather than inconsistent with the database.
        records = self._records
        self._records = None
        if records is None:
            # Cache was already reset; there is nothing to modify in place.
            yield None
        else:
            yield records[element]
            # Only reached if the body raised no exception: propagate the
            # modifications to any dependent cached sets, then restore.
            for other_element_records in records.values():
                other_element = other_element_records.element
                # If we've just updated the records of a dimension element
                # that is the implied union target of another (i.e. we've
                # updated physical_filter, and thus possibly updated the
                # set of band values). We need to update the cache for
                # the implied union target (i.e. band), too.
                if (
                    other_element.implied_union_target is not None
                    and other_element.implied_union_target.name == element
                ):
                    other_element_records.update(
                        other_element.RecordClass(
                            **{other_element.name: getattr(record, other_element.name)}
                        )
                        for record in records[element]
                    )
            # Success: reinstall the (now modified) records as the cache.
            self._records = records
    else:
        # Element is not cached at all; nothing for the caller to modify.
        yield None

def load_from(self, other: DimensionRecordCache) -> None:
"""Load records from another cache, but do nothing if it doesn't
currently have any records.
Expand All @@ -74,11 +137,12 @@ def load_from(self, other: DimensionRecordCache) -> None:
"""
self._records = copy.deepcopy(other._records)

def __contains__(self, element: object) -> bool:
    # Before the first fetch (`_records` is None) fall back to the
    # precomputed set of cacheable element names; afterwards, membership
    # is defined by the keys actually present in the fetched records.
    if self._records is None:
        return element in self._keys
    else:
        return element in self._records
def __contains__(self, key: object) -> bool:
    """Report whether ``key`` names a dimension element whose records are
    cached here (i.e. `DimensionElement.is_cached` is `True`).
    """
    # Only string element names can ever be members.
    if not isinstance(key, str):
        return False
    element = self._universe.get(key)
    if element is None:
        return False
    return element.is_cached

def __getitem__(self, element: str) -> DimensionRecordSet:
if self._records is None:
Expand Down

0 comments on commit 0cb8b8e

Please sign in to comment.