Commit b175c21

Merge pull request #951 from lsst/tickets/DM-42676

DM-42676: Fix quantum backed butler trust mode with chained datastore

timj committed Jan 29, 2024
2 parents 6af44e7 + 7f485cf

Showing 18 changed files with 106 additions and 96 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml

@@ -8,7 +8,7 @@ repos:
       - id: end-of-file-fixer
       - id: trailing-whitespace
   - repo: https://github.com/psf/black-pre-commit-mirror
-    rev: 23.12.1
+    rev: 24.1.0
     hooks:
       - id: black
         # It is recommended to specify the latest version of Python
@@ -23,7 +23,7 @@ repos:
         name: isort (python)
   - repo: https://github.com/astral-sh/ruff-pre-commit
     # Ruff version.
-    rev: v0.1.13
+    rev: v0.1.14
     hooks:
       - id: ruff
   - repo: https://github.com/numpy/numpydoc
12 changes: 4 additions & 8 deletions python/lsst/daf/butler/_named.py

@@ -121,12 +121,10 @@ def __getitem__(self, key: str | K) -> V_co:
         raise NotImplementedError()

     @overload
-    def get(self, key: object) -> V_co | None:
-        ...
+    def get(self, key: object) -> V_co | None: ...

     @overload
-    def get(self, key: object, default: V) -> V_co | V:
-        ...
+    def get(self, key: object, default: V) -> V_co | V: ...

     def get(self, key: Any, default: Any = None) -> Any:
         return super().get(key, default)

@@ -312,12 +310,10 @@ def __getitem__(self, key: str | K_co) -> K_co:
         raise NotImplementedError()

     @overload
-    def get(self, key: object) -> K_co | None:
-        ...
+    def get(self, key: object) -> K_co | None: ...

     @overload
-    def get(self, key: object, default: V) -> K_co | V:
-        ...
+    def get(self, key: object, default: V) -> K_co | V: ...

     def get(self, key: Any, default: Any = None) -> Any:
         """Return the element with the given name.
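Most of the churn in this file, and in _record_table.py, _universe.py, json.py, and logging.py below, is mechanical reformatting from the black 24.1.0 bump in .pre-commit-config.yaml: the 2024 stable style collapses ellipsis-only bodies, typical of @overload stubs, onto the def line. A minimal before/after illustration with a made-up class (not the butler's own mappings):

    from typing import overload

    class Example:
        # black 23.x kept the stub body on its own line:
        #
        #     @overload
        #     def get(self, key: str) -> int | None:
        #         ...
        #
        # black 24.1 collapses ellipsis-only bodies onto the def line:
        @overload
        def get(self, key: str) -> int | None: ...
        @overload
        def get(self, key: str, default: int) -> int: ...

        def get(self, key: str, default: int | None = None) -> int | None:
            return default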
11 changes: 6 additions & 5 deletions python/lsst/daf/butler/_quantum_backed.py

@@ -352,12 +352,13 @@ def _initialize(
             datasets=_DatasetRecordStorageManagerDatastoreConstructionMimic,  # type: ignore
             universe=dimensions,
         )
-        # TODO: We need to inform `Datastore` here that it needs to support
-        # predictive reads; right now that's a configuration option, but after
-        # execution butler is retired it could just be a kwarg we pass here.
-        # For now just force this option as we cannot work without it.
-        butler_config["datastore", "trust_get_request"] = True
         datastore = Datastore.fromConfig(butler_config, bridge_manager, butler_root)
+
+        # TODO: We need to inform `Datastore` here that it needs to support
+        # predictive reads; This only really works for file datastore but
+        # we need to try everything in case there is a chained datastore.
+        datastore._set_trust_mode(True)
+
         if datastore_records is not None:
             datastore.import_records(datastore_records)
         storageClasses = StorageClassFactory()
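This hunk is the substance of the fix named in the commit title. Trust mode used to be forced through the trust_get_request configuration option, which evidently did not reach the children of a ChainedDatastore; the new _set_trust_mode(True) call propagates through the datastore object tree at runtime instead. A self-contained sketch of the pattern, using stand-in classes rather than the real butler ones:

    class BaseStore:
        def _set_trust_mode(self, mode: bool) -> None:
            # Base class: trust mode is optional, so the default is a no-op
            # (mirrors the new Datastore._set_trust_mode in the next file).
            return

    class FileStore(BaseStore):
        # Stand-in for FileDatastore, the one datastore that honors trust mode.
        def __init__(self) -> None:
            self.trust_get_request = False

        def _set_trust_mode(self, mode: bool) -> None:
            self.trust_get_request = mode

    class ChainedStore(BaseStore):
        # Stand-in for ChainedDatastore: forward the setting to every child.
        def __init__(self, children: list[BaseStore]) -> None:
            self.children = children

        def _set_trust_mode(self, mode: bool) -> None:
            for child in self.children:
                child._set_trust_mode(mode)

    chain = ChainedStore([FileStore(), BaseStore()])
    chain._set_trust_mode(True)
    # The file-backed child now trusts get requests for unknown datasets;
    # children that do not understand trust mode simply ignore the call.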
24 changes: 21 additions & 3 deletions python/lsst/daf/butler/datastore/_datastore.py

@@ -45,7 +45,7 @@
 import time
 from abc import ABCMeta, abstractmethod
 from collections import abc, defaultdict
-from collections.abc import Callable, Iterable, Iterator, Mapping
+from collections.abc import Callable, Collection, Iterable, Iterator, Mapping
 from typing import TYPE_CHECKING, Any, ClassVar

 from lsst.utils import doImportType

@@ -476,6 +476,24 @@ def transaction(self) -> Iterator[DatastoreTransaction]:
                 self._transaction.commit()
             self._transaction = self._transaction.parent

+    def _set_trust_mode(self, mode: bool) -> None:
+        """Set the trust mode for this datastore.
+
+        Parameters
+        ----------
+        mode : `bool`
+            If `True`, get requests will be attempted even if the datastore
+            does not know about the dataset.
+
+        Notes
+        -----
+        This is a private method to indicate that trust mode might be a
+        transitory property that we do not want to make fully public. For now
+        only a `~lsst.daf.butler.datastores.FileDatastore` understands this
+        concept. By default this method does nothing.
+        """
+        return
+
     @abstractmethod
     def knows(self, ref: DatasetRef) -> bool:
         """Check if the dataset is known to the datastore.

@@ -835,7 +853,7 @@ def ingest(
     def transfer_from(
         self,
         source_datastore: Datastore,
-        refs: Iterable[DatasetRef],
+        refs: Collection[DatasetRef],
         transfer: str = "auto",
         artifact_existence: dict[ResourcePath, bool] | None = None,
         dry_run: bool = False,

@@ -847,7 +865,7 @@ def transfer_from(
         source_datastore : `Datastore`
             The datastore from which to transfer artifacts. That datastore
             must be compatible with this datastore receiving the artifacts.
-        refs : iterable of `DatasetRef`
+        refs : `~collections.abc.Collection` of `DatasetRef`
             The datasets to transfer from the source datastore.
         transfer : `str`, optional
             How (and whether) the dataset should be added to the datastore.
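The Iterable to Collection narrowing on transfer_from is a real contract change, not just a hint: the implementations traverse refs more than once (and, in ChainedDatastore, test emptiness), which a one-shot generator cannot support. A toy illustration of the failure mode the stricter annotation rules out (not the butler API itself):

    from collections.abc import Collection, Iterable

    def two_passes_iterable(refs: Iterable[int]) -> tuple[list[int], list[int]]:
        first = [r for r in refs]
        second = [r for r in refs]  # silently empty if refs was a generator
        return first, second

    def two_passes_collection(refs: Collection[int]) -> tuple[list[int], list[int]]:
        # A Collection is sized, re-iterable, and supports "in", so both
        # passes see the same elements.
        return [r for r in refs], [r for r in refs]

    gen = (n for n in range(3))
    assert two_passes_iterable(gen) == ([0, 1, 2], [])  # exhausted after one pass
    assert two_passes_collection([0, 1, 2]) == ([0, 1, 2], [0, 1, 2])
    # Passing the generator to two_passes_collection would be flagged by mypy.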
12 changes: 10 additions & 2 deletions python/lsst/daf/butler/datastores/chainedDatastore.py

@@ -35,7 +35,7 @@
 import logging
 import time
 import warnings
-from collections.abc import Iterable, Mapping, Sequence
+from collections.abc import Collection, Iterable, Mapping, Sequence
 from typing import TYPE_CHECKING, Any

 from lsst.daf.butler import DatasetRef, DatasetTypeNotSupportedError, FileDataset

@@ -264,6 +264,10 @@ def __str__(self) -> str:
         chainName = ", ".join(str(ds) for ds in self.datastores)
         return chainName

+    def _set_trust_mode(self, mode: bool) -> None:
+        for datastore in self.datastores:
+            datastore._set_trust_mode(mode)
+
     def knows(self, ref: DatasetRef) -> bool:
         """Check if the dataset is known to any of the datastores.

@@ -1101,7 +1105,7 @@ def export(
     def transfer_from(
         self,
         source_datastore: Datastore,
-        refs: Iterable[DatasetRef],
+        refs: Collection[DatasetRef],
         transfer: str = "auto",
         artifact_existence: dict[ResourcePath, bool] | None = None,
         dry_run: bool = False,

@@ -1116,6 +1120,10 @@
         # child datastores.
         source_datastores = (source_datastore,)

+        if not refs:
+            # Nothing to transfer.
+            return set(), set()
+
         # Need to know the set of all possible refs that could be transferred.
         remaining_refs = set(refs)
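The new guard on empty refs leans on the Collection annotation from the previous file: truthiness on a sized container is an O(1) check, whereas a bare generator is always truthy, so the same guard would silently never fire. A small sketch of the guard's behavior (stand-in types, not the real transfer_from signature):

    from collections.abc import Collection

    def transfer_from_sketch(refs: Collection[str]) -> tuple[set[str], set[str]]:
        # Mirrors the new guard: with a sized Collection, emptiness is a
        # cheap check that consumes nothing.
        if not refs:
            # Nothing to transfer.
            return set(), set()
        accepted = {r for r in refs if r.startswith("ok")}
        rejected = set(refs) - accepted  # second pass over the same refs
        return accepted, rejected

    assert transfer_from_sketch([]) == (set(), set())
    assert transfer_from_sketch(["ok1", "bad"]) == ({"ok1"}, {"bad"})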
11 changes: 5 additions & 6 deletions python/lsst/daf/butler/datastores/fileDatastore.py

@@ -35,7 +35,7 @@
 import hashlib
 import logging
 from collections import defaultdict
-from collections.abc import Callable, Iterable, Mapping, Sequence
+from collections.abc import Callable, Collection, Iterable, Mapping, Sequence
 from typing import TYPE_CHECKING, Any, ClassVar, cast

 from lsst.daf.butler import (

@@ -359,6 +359,9 @@ def roots(self) -> dict[str, ResourcePath | None]:
         # Docstring inherited.
         return {self.name: self.root}

+    def _set_trust_mode(self, mode: bool) -> None:
+        self.trustGetRequest = mode
+
     def _artifact_exists(self, location: Location) -> bool:
         """Check that an artifact exists in this datastore at the specified
         location.

@@ -2367,7 +2370,7 @@ def emptyTrash(self, ignore_errors: bool = True) -> None:
     def transfer_from(
         self,
         source_datastore: Datastore,
-        refs: Iterable[DatasetRef],
+        refs: Collection[DatasetRef],
         transfer: str = "auto",
         artifact_existence: dict[ResourcePath, bool] | None = None,
         dry_run: bool = False,

@@ -2400,10 +2403,6 @@
         if artifact_existence is None:
             artifact_existence = {}

-        # We will go through the list multiple times so must convert
-        # generators to lists.
-        refs = list(refs)
-
         # In order to handle disassembled composites the code works
         # at the records level since it can assume that internal APIs
         # can be used.
16 changes: 8 additions & 8 deletions python/lsst/daf/butler/dimensions/_config.py

@@ -306,9 +306,9 @@ class _TopologySectionConfig(pydantic.BaseModel):
 class _LegacyGovernorDimensionStorage(pydantic.BaseModel):
     """Legacy storage configuration for governor dimensions."""

-    cls: Literal[
-        "lsst.daf.butler.registry.dimensions.governor.BasicGovernorDimensionRecordStorage"
-    ] = "lsst.daf.butler.registry.dimensions.governor.BasicGovernorDimensionRecordStorage"
+    cls: Literal["lsst.daf.butler.registry.dimensions.governor.BasicGovernorDimensionRecordStorage"] = (
+        "lsst.daf.butler.registry.dimensions.governor.BasicGovernorDimensionRecordStorage"
+    )

     has_own_table: ClassVar[Literal[True]] = True
     """Whether this dimension needs a database table to be defined."""

@@ -328,9 +328,9 @@ class _LegacyTableDimensionStorage(pydantic.BaseModel):
     database.
     """

-    cls: Literal[
-        "lsst.daf.butler.registry.dimensions.table.TableDimensionRecordStorage"
-    ] = "lsst.daf.butler.registry.dimensions.table.TableDimensionRecordStorage"
+    cls: Literal["lsst.daf.butler.registry.dimensions.table.TableDimensionRecordStorage"] = (
+        "lsst.daf.butler.registry.dimensions.table.TableDimensionRecordStorage"
+    )

     has_own_table: ClassVar[Literal[True]] = True
     """Whether this dimension element needs a database table to be defined."""

@@ -351,9 +351,9 @@ class _LegacyImpliedUnionDimensionStorage(pydantic.BaseModel):
     this one.
     """

-    cls: Literal[
-        "lsst.daf.butler.registry.dimensions.query.QueryDimensionRecordStorage"
-    ] = "lsst.daf.butler.registry.dimensions.query.QueryDimensionRecordStorage"
+    cls: Literal["lsst.daf.butler.registry.dimensions.query.QueryDimensionRecordStorage"] = (
+        "lsst.daf.butler.registry.dimensions.query.QueryDimensionRecordStorage"
+    )

     view_of: str
     """The dimension that implies this one and defines its values."""

@@ -378,9 +378,9 @@ class _LegacyCachingDimensionStorage(pydantic.BaseModel):
     records should be cached.
     """

-    cls: Literal[
-        "lsst.daf.butler.registry.dimensions.caching.CachingDimensionRecordStorage"
-    ] = "lsst.daf.butler.registry.dimensions.caching.CachingDimensionRecordStorage"
+    cls: Literal["lsst.daf.butler.registry.dimensions.caching.CachingDimensionRecordStorage"] = (
+        "lsst.daf.butler.registry.dimensions.caching.CachingDimensionRecordStorage"
+    )

     nested: _LegacyTableDimensionStorage | _LegacyImpliedUnionDimensionStorage
     """Dimension storage configuration wrapped by this one."""
6 changes: 2 additions & 4 deletions python/lsst/daf/butler/dimensions/_record_table.py

@@ -151,12 +151,10 @@ def __iter__(self) -> Iterator[DimensionRecord]:
             yield self._get_record_at(self._table, i)

     @overload
-    def __getitem__(self, index: int) -> DimensionRecord:
-        ...
+    def __getitem__(self, index: int) -> DimensionRecord: ...

     @overload
-    def __getitem__(self, index: slice) -> DimensionRecordTable:
-        ...
+    def __getitem__(self, index: slice) -> DimensionRecordTable: ...

     def __getitem__(self, index: int | slice) -> DimensionRecord | DimensionRecordTable:
         if isinstance(index, slice):
6 changes: 2 additions & 4 deletions python/lsst/daf/butler/dimensions/_universe.py

@@ -520,14 +520,12 @@ def conform(
         return DimensionGroup(self, names)

     @overload
-    def sorted(self, elements: Iterable[Dimension], *, reverse: bool = False) -> Sequence[Dimension]:
-        ...
+    def sorted(self, elements: Iterable[Dimension], *, reverse: bool = False) -> Sequence[Dimension]: ...

     @overload
     def sorted(
         self, elements: Iterable[DimensionElement | str], *, reverse: bool = False
-    ) -> Sequence[DimensionElement]:
-        ...
+    ) -> Sequence[DimensionElement]: ...

     def sorted(self, elements: Iterable[Any], *, reverse: bool = False) -> list[Any]:
         """Return a sorted version of the given iterable of dimension elements.
6 changes: 3 additions & 3 deletions python/lsst/daf/butler/direct_butler.py

@@ -1471,9 +1471,9 @@ def ingest(

         # Track DataIDs that are being ingested so we can spot issues early
         # with duplication. Retain previous FileDataset so we can report it.
-        groupedDataIds: MutableMapping[
-            tuple[DatasetType, str], dict[DataCoordinate, FileDataset]
-        ] = defaultdict(dict)
+        groupedDataIds: MutableMapping[tuple[DatasetType, str], dict[DataCoordinate, FileDataset]] = (
+            defaultdict(dict)
+        )

         # And the nested loop that populates it:
         for dataset in progress.wrap(datasets, desc="Grouping by dataset type"):
6 changes: 2 additions & 4 deletions python/lsst/daf/butler/json.py

@@ -44,14 +44,12 @@ class SupportsSimple(Protocol):

     _serializedType: Type

-    def to_simple(self, minimal: bool) -> Any:
-        ...
+    def to_simple(self, minimal: bool) -> Any: ...

     @classmethod
     def from_simple(
         cls, simple: Any, universe: DimensionUniverse | None = None, registry: Registry | None = None
-    ) -> SupportsSimple:
-        ...
+    ) -> SupportsSimple: ...


 def to_json_pydantic(self: SupportsSimple, minimal: bool = False) -> str:
6 changes: 2 additions & 4 deletions python/lsst/daf/butler/logging.py

@@ -486,12 +486,10 @@ def __setitem__(self, index: int, value: Record) -> None:
         self.root[index] = self._validate_record(value)

     @overload
-    def __getitem__(self, index: int) -> ButlerLogRecord:
-        ...
+    def __getitem__(self, index: int) -> ButlerLogRecord: ...

     @overload
-    def __getitem__(self, index: slice) -> "ButlerLogRecords":
-        ...
+    def __getitem__(self, index: slice) -> "ButlerLogRecords": ...

     def __getitem__(self, index: slice | int) -> "Union[ButlerLogRecords, ButlerLogRecord]":
         # Handles slices and returns a new collection in that
6 changes: 3 additions & 3 deletions python/lsst/daf/butler/persistence_context.py

@@ -87,9 +87,9 @@ class PersistenceContextVars:
     the `SerializedDatasetRef`\ s was worth the memory savings.
     """

-    serializedDatasetTypeMapping: ContextVar[
-        dict[tuple[str, str], SerializedDatasetType] | None
-    ] = ContextVar("serializedDatasetTypeMapping", default=None)
+    serializedDatasetTypeMapping: ContextVar[dict[tuple[str, str], SerializedDatasetType] | None] = (
+        ContextVar("serializedDatasetTypeMapping", default=None)
+    )
     r"""A cache of `SerializedDatasetType`\ s.
     """
