Improve storage class override logic (DM-37995)
In trusted mode the datastore has to use the original registry storage
class to search for artifacts. Because the datastore has no access to the
registry, a new method was added to the Datastore class to specify a way
to retrieve a registry dataset type by name (dependency inversion).

Butler instances use this new method to provide access to the dataset type
mapping. In the case of QBB, where no registry exists, this mapping has to
be provided when constructing the QBB.
andy-slac committed Feb 21, 2023
1 parent d99b741 commit 89534f8
Showing 6 changed files with 147 additions and 16 deletions.
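
Before the per-file diffs, here is a minimal, self-contained sketch of the dependency inversion described in the commit message. The stand-in classes below are simplified assumptions for illustration, not the real lsst.daf.butler API:

from typing import Callable, Optional

class DatasetType:
    """Stand-in for lsst.daf.butler.DatasetType."""

    def __init__(self, name: str, storage_class: str) -> None:
        self.name = name
        self.storageClass = storage_class

class MissingDatasetTypeError(KeyError):
    """Stand-in for the registry exception of the same name."""

class Datastore:
    """The datastore holds only a callback; it never sees the Registry."""

    def __init__(self) -> None:
        self._retrieve_dataset_method: Optional[Callable[[str], Optional[DatasetType]]] = None

    def set_retrieve_dataset_type_method(
        self, method: Optional[Callable[[str], Optional[DatasetType]]]
    ) -> None:
        self._retrieve_dataset_method = method

class Butler:
    def __init__(self, registry, datastore: Datastore) -> None:
        self.registry = registry
        self.datastore = datastore
        # Dependency inversion: hand the datastore a plain callable instead
        # of a reference to the registry itself.
        self.datastore.set_retrieve_dataset_type_method(self._retrieve_dataset_type)

    def _retrieve_dataset_type(self, name: str) -> Optional[DatasetType]:
        try:
            return self.registry.getDatasetType(name)
        except MissingDatasetTypeError:
            return None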
14 changes: 14 additions & 0 deletions python/lsst/daf/butler/_butler.py
@@ -96,6 +96,7 @@
ConflictingDefinitionError,
DataIdError,
DatasetIdGenEnum,
MissingDatasetTypeError,
Registry,
RegistryConfig,
RegistryDefaults,
@@ -296,6 +297,12 @@ def __init__(
log.error(f"Failed to instantiate Butler from config {self._config.configFile}.")
raise

# For execution butler the datastore needs a special
# dependency-inversion trick. This is not used by regular butler,
# but we do not have a way to distinguish regular butler from execution
# butler.
self.datastore.set_retrieve_dataset_type_method(self._retrieve_dataset_type)

if "run" in self._config or "collection" in self._config:
raise ValueError("Passing a run or collection via configuration is no longer supported.")

@@ -307,6 +314,13 @@ def __init__(
code.
"""

def _retrieve_dataset_type(self, name: str) -> DatasetType | None:
"""Return DatasetType defined in registry given dataset type name."""
try:
return self.registry.getDatasetType(name)
except MissingDatasetTypeError:
return None

@classmethod
def get_repo_uri(cls, label: str) -> ResourcePath:
"""Look up the label in a butler repository index.
30 changes: 29 additions & 1 deletion python/lsst/daf/butler/_quantum_backed.py
@@ -38,6 +38,7 @@
Config,
DatasetId,
DatasetRef,
DatasetType,
Datastore,
DatastoreRecordData,
DimensionUniverse,
@@ -153,6 +154,7 @@ def __init__(
dimensions: DimensionUniverse,
datastore: Datastore,
storageClasses: StorageClassFactory,
dataset_types: Mapping[str, DatasetType] | None = None,
):
self._dimensions = dimensions
self._predicted_inputs = set(predicted_inputs)
@@ -163,6 +165,10 @@
self._actual_output_refs: Set[DatasetRef] = set()
self.datastore = datastore
self.storageClasses = storageClasses
self._dataset_types: Mapping[str, DatasetType] = {}
if dataset_types is not None:
self._dataset_types = dataset_types
self.datastore.set_retrieve_dataset_type_method(self._retrieve_dataset_type)

@classmethod
def initialize(
@@ -174,6 +180,7 @@
OpaqueManagerClass: Type[OpaqueTableStorageManager] = ByNameOpaqueTableStorageManager,
BridgeManagerClass: Type[DatastoreRegistryBridgeManager] = MonolithicDatastoreRegistryBridgeManager,
search_paths: Optional[List[str]] = None,
dataset_types: Mapping[str, DatasetType] | None = None,
) -> QuantumBackedButler:
"""Construct a new `QuantumBackedButler` from repository configuration
and helper types.
@@ -200,6 +207,8 @@
location records. Default is a SQL-backed implementation.
search_paths : `list` of `str`, optional
Additional search paths for butler configuration.
dataset_types : `Mapping` [`str`, `DatasetType`], optional
Mapping of the dataset type name to its registry definition.
"""
predicted_inputs = [
ref.getCheckedId() for ref in itertools.chain.from_iterable(quantum.inputs.values())
@@ -218,6 +227,7 @@
OpaqueManagerClass=OpaqueManagerClass,
BridgeManagerClass=BridgeManagerClass,
search_paths=search_paths,
dataset_types=dataset_types,
)

@classmethod
Expand All @@ -232,6 +242,7 @@ def from_predicted(
OpaqueManagerClass: Type[OpaqueTableStorageManager] = ByNameOpaqueTableStorageManager,
BridgeManagerClass: Type[DatastoreRegistryBridgeManager] = MonolithicDatastoreRegistryBridgeManager,
search_paths: Optional[List[str]] = None,
dataset_types: Mapping[str, DatasetType] | None = None,
) -> QuantumBackedButler:
"""Construct a new `QuantumBackedButler` from sets of input and output
dataset IDs.
@@ -261,6 +272,8 @@ def from_predicted(
location records. Default is a SQL-backed implementation.
search_paths : `list` of `str`, optional
Additional search paths for butler configuration.
dataset_types : `Mapping` [`str`, `DatasetType`], optional
Mapping of the dataset type name to its registry definition.
"""
return cls._initialize(
config=config,
Expand All @@ -272,6 +285,7 @@ def from_predicted(
OpaqueManagerClass=OpaqueManagerClass,
BridgeManagerClass=BridgeManagerClass,
search_paths=search_paths,
dataset_types=dataset_types,
)

@classmethod
@@ -287,6 +301,7 @@
OpaqueManagerClass: Type[OpaqueTableStorageManager] = ByNameOpaqueTableStorageManager,
BridgeManagerClass: Type[DatastoreRegistryBridgeManager] = MonolithicDatastoreRegistryBridgeManager,
search_paths: Optional[List[str]] = None,
dataset_types: Mapping[str, DatasetType] | None = None,
) -> QuantumBackedButler:
"""Internal method with common implementation used by `initialize` and
`from_predicted`.
@@ -315,6 +330,8 @@
location records. Default is a SQL-backed implementation.
search_paths : `list` of `str`, optional
Additional search paths for butler configuration.
dataset_types : `Mapping` [`str`, `DatasetType`], optional
Mapping of the dataset type name to its registry definition.
"""
butler_config = ButlerConfig(config, searchPaths=search_paths)
if "root" in butler_config:
@@ -342,7 +359,18 @@
datastore.import_records(datastore_records)
storageClasses = StorageClassFactory()
storageClasses.addFromConfig(butler_config)
return cls(predicted_inputs, predicted_outputs, dimensions, datastore, storageClasses=storageClasses)
return cls(
predicted_inputs,
predicted_outputs,
dimensions,
datastore,
storageClasses=storageClasses,
dataset_types=dataset_types,
)

def _retrieve_dataset_type(self, name: str) -> DatasetType | None:
"""Return DatasetType defined in registry given dataset type name."""
return self._dataset_types.get(name)

def isWriteable(self) -> bool:
# Docstring inherited.
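
For QBB the mapping must be supplied at construction time, since there is no registry to fall back on. A hedged usage sketch follows; the repository path, dataset type names, and the quantum object are illustrative assumptions, not taken from this commit:

from lsst.daf.butler import Butler, QuantumBackedButler

# Build the name -> registry definition mapping while a full butler is
# still available (e.g. at graph-building time, not on the compute node).
full_butler = Butler("/repo/main")  # illustrative repository path
dataset_types = {
    name: full_butler.registry.getDatasetType(name)
    for name in ("calexp", "src")  # illustrative dataset type names
}

qbb = QuantumBackedButler.initialize(
    config="/repo/main",
    quantum=quantum,  # assumed: a predicted Quantum from the pipeline graph
    dimensions=full_butler.registry.dimensions,
    dataset_types=dataset_types,
)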
19 changes: 19 additions & 0 deletions python/lsst/daf/butler/core/datastore.py
@@ -1201,3 +1201,22 @@ def export_records(
Exported datastore records indexed by datastore name.
"""
raise NotImplementedError()

def set_retrieve_dataset_type_method(self, method: Callable[[str], DatasetType | None] | None) -> None:
"""Specify a method that can be used by the datastore to retrieve a
registry-defined dataset type.

Parameters
----------
method : `~collections.abc.Callable` | `None`
Method that takes the name of a dataset type and returns the
corresponding `DatasetType` instance as defined in Registry. If the
dataset type name is not known to the registry, `None` is returned.

Notes
-----
This method is only needed for a Datastore supporting a "trusted" mode,
when it does not have access to datastore records and needs to guess
the dataset location based on its stored dataset type.
"""
pass
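
As a usage note, any callable of the right shape satisfies this contract. A hypothetical standalone lookup (the datastore variable and the known_types dict are assumptions for illustration):

known_types: dict[str, DatasetType] = {}  # e.g. filled from a quantum graph

def lookup(name: str) -> DatasetType | None:
    # Honor the contract: return None for names the registry does not know.
    return known_types.get(name)

datastore.set_retrieve_dataset_type_method(lookup)
datastore.set_retrieve_dataset_type_method(None)  # detach when no longer needed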
35 changes: 34 additions & 1 deletion python/lsst/daf/butler/datastores/fileDatastore.py
@@ -27,6 +27,7 @@
import hashlib
import logging
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
@@ -182,6 +183,11 @@ class FileDatastore(GenericBaseDatastore):
or relative to a search path. Can be None if no defaults specified.
"""

_retrieve_dataset_method: Callable[[str], DatasetType | None] | None = None
"""Callable that is used in trusted mode to retrieve registry definition
of a named dataset type.
"""

@classmethod
def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None:
"""Set any filesystem-dependent config options for this Datastore to
@@ -592,6 +598,9 @@ def _prepare_for_get(
"""
log.debug("Retrieve %s from %s with parameters %s", ref, self.name, parameters)

# For trusted mode we need to reset the storage class.
ref = self._cast_storage_class(ref)

# Get file metadata and internal metadata
fileLocations = self._get_dataset_locations_info(ref)
if not fileLocations:
@@ -1348,7 +1357,7 @@ def _read_artifact_into_memory(
self.cacheManager.move_to_cache(local_uri, cache_ref)

return self._post_process_get(
result, getInfo.readStorageClass, getInfo.assemblerParams, isComponent=isComponent
result, ref.datasetType.storageClass, getInfo.assemblerParams, isComponent=isComponent
)

def knows(self, ref: DatasetRef) -> bool:
@@ -1645,6 +1654,7 @@ def exists(self, ref: DatasetRef) -> bool:
compute node could remove the file from the object store even
though it is present in the local cache.
"""
ref = self._cast_storage_class(ref)
fileLocations = self._get_dataset_locations_info(ref)

# if we are being asked to trust that registry might not be correct
@@ -2890,3 +2900,26 @@ def export_records(self, refs: Iterable[DatasetIdRef]) -> Mapping[str, Datastore

record_data = DatastoreRecordData(records=records)
return {self.name: record_data}

def set_retrieve_dataset_type_method(self, method: Callable[[str], DatasetType | None] | None) -> None:
# Docstring inherited from the base class.
self._retrieve_dataset_method = method

def _cast_storage_class(self, ref: DatasetRef) -> DatasetRef:
"""Update dataset reference to use the storage class from registry.

This does nothing for regular datastores, and is only enabled in
trusted mode, where we need to use the registry definition of the
storage class for some datastore methods. `set_retrieve_dataset_type_method`
has to be called beforehand.
"""
if self.trustGetRequest:
if self._retrieve_dataset_method is None:
# We could raise an exception here but unit tests do not define
# this method.
return ref
dataset_type = self._retrieve_dataset_method(ref.datasetType.name)
if dataset_type is not None:
if ref.datasetType.storageClass.name != dataset_type.storageClass.name:
ref = ref.overrideStorageClass(dataset_type.storageClass.name)
return ref
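
To see why the cast matters, consider the trusted-mode read exercised by the test below. A hedged illustration, with datastore and ref assumed in scope and storage class names borrowed from that test:

# The artifact was written under the registry storage class
# ("StructuredData"), but the caller requests a converted type.
metrics_as_dict = datastore.get(ref, storageClass="StructuredDataDictJson")
# Internally _cast_storage_class() rewrites the incoming ref back to the
# registry storage class so _get_dataset_locations_info() can locate the
# file, while the requested storage class still drives conversion of the
# returned object.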
11 changes: 11 additions & 0 deletions tests/test_datastore.py
@@ -38,6 +38,7 @@
DataCoordinate,
DatasetRef,
DatasetRefURIs,
DatasetType,
DatasetTypeNotSupportedError,
Datastore,
DatastoreCacheManager,
@@ -453,6 +454,14 @@ def testTrustGetRequest(self):

# Check for compatible storage class
if sc_name in ("StructuredDataNoComponents", "StructuredData"):
# Need a special method to generate the stored dataset type.
def _stored_dataset_type(name: str) -> DatasetType:
if name == ref.datasetType.name:
return ref.datasetType
raise ValueError(f"Unexpected dataset type name {name}")

datastore.set_retrieve_dataset_type_method(_stored_dataset_type)

# Storage class override with original dataset ref
metrics_as_dict = datastore.get(ref, storageClass="StructuredDataDictJson")
self.assertIsInstance(metrics_as_dict, dict)
@@ -467,6 +476,8 @@
# exists() should work as well
self.assertTrue(datastore.exists(ref_comp))

datastore.set_retrieve_dataset_type_method(None)

def testDisassembly(self):
"""Test disassembly within datastore."""
metrics = makeExampleMetrics()
