Merge pull request #955 from lsst/tickets/DM-41880
DM-41880: Support ChainedDatastore for Butler server
dhirving committed Feb 2, 2024
2 parents 97c358c + 3a6d763 commit af21ca5
Showing 8 changed files with 70 additions and 15 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-41880.feature.rst
@@ -0,0 +1 @@
+Butler server can now be configured to use a ChainedDatastore.
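For orientation, a minimal sketch of what such a configuration looks like (the root paths here are placeholders, not part of this commit; the complete configuration added by this change appears in tests/config/basic/server.yaml below):

    datastore:
      cls: lsst.daf.butler.datastores.chainedDatastore.ChainedDatastore
      datastores:
        - datastore:
            cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
            root: s3://example-read-bucket/repo   # placeholder root
        - datastore:
            cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
            root: s3://example-write-bucket/repo  # placeholder root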
1 change: 1 addition & 0 deletions doc/changes/DM-41880.perf.rst
@@ -0,0 +1 @@
+``FileDatastore.knows()`` no longer requires database I/O if its input ``DatasetRef`` has datastore records attached.
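A sketch of the access pattern this speeds up (illustrative only: `datastore`, `ref`, `table_name`, and `file_infos` are stand-ins; `DatasetRef.replace(datastore_records=...)` is used the same way in the put_new diff below):

    # Illustrative: a ref that already carries datastore records can be
    # checked for existence without a registry database round trip.
    ref_with_records = ref.replace(datastore_records={table_name: file_infos})
    datastore.knows(ref_with_records)  # answered from the attached records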
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/_dataset_ref.py
@@ -60,7 +60,7 @@

# Per-dataset records grouped by opaque table name, usually there is just one
# opaque table.
-DatasetDatastoreRecords: TypeAlias = Mapping[str, Iterable[StoredDatastoreItemInfo]]
+DatasetDatastoreRecords: TypeAlias = Mapping[str, list[StoredDatastoreItemInfo]]


class AmbiguousDatasetError(Exception):
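Narrowing the value type from `Iterable` to `list` guarantees that attached records can be iterated more than once (and indexed), which the record-trusting code paths below rely on. A hypothetical value conforming to the narrowed alias:

    # Hypothetical contents; the table name and record object are stand-ins.
    records: DatasetDatastoreRecords = {
        "file_datastore_records": [stored_file_info],  # a list, not a one-shot iterator
    }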
11 changes: 11 additions & 0 deletions python/lsst/daf/butler/datastores/chainedDatastore.py
@@ -399,6 +399,17 @@ def get(

        raise FileNotFoundError(f"Dataset {ref} could not be found in any of the datastores")

+    def prepare_get_for_external_client(self, ref: DatasetRef) -> object:
+        return self._get_matching_datastore(ref).prepare_get_for_external_client(ref)
+
+    def _get_matching_datastore(self, ref: DatasetRef) -> Datastore:
+        """Return the first child datastore that owns the specified dataset."""
+        for datastore in self.datastores:
+            if datastore.knows(ref):
+                return datastore
+
+        raise FileNotFoundError(f"Dataset {ref} could not be found in any of the datastores")

    def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
        """Write a InMemoryDataset with a given `DatasetRef` to each
        datastore.
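The delegation added above asks each child datastore in configuration order and forwards to the first one that knows the ref, raising `FileNotFoundError` if none does. A usage sketch (the `chained` and `ref` setup is assumed):

    payload = chained.prepare_get_for_external_client(ref)
    # Internally this scans chained.datastores and stops at the first child
    # whose knows(ref) returns True.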
25 changes: 14 additions & 11 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -40,6 +40,7 @@

from lsst.daf.butler import (
    Config,
+    DatasetDatastoreRecords,
    DatasetId,
    DatasetRef,
    DatasetType,
@@ -457,12 +458,12 @@ def getStoredItemsInfo(
"""
# Try to get them from the ref first.
if ref._datastore_records is not None and not ignore_datastore_records:
if (ref_records := ref._datastore_records.get(self._table.name)) is not None:
# Need to make sure they have correct type.
for record in ref_records:
if not isinstance(record, StoredFileInfo):
raise TypeError(f"Datastore record has unexpected type {record.__class__.__name__}")
return cast(list[StoredFileInfo], ref_records)
ref_records = ref._datastore_records.get(self._table.name, [])
# Need to make sure they have correct type.
for record in ref_records:
if not isinstance(record, StoredFileInfo):
raise TypeError(f"Datastore record has unexpected type {record.__class__.__name__}")
return cast(list[StoredFileInfo], ref_records)

        # Look for the dataset_id -- there might be multiple matches
        # if we have disassembled the dataset.
@@ -1333,9 +1334,7 @@ def knows(self, ref: DatasetRef) -> bool:
        exists : `bool`
            `True` if the dataset is known to the datastore.
        """
-        # We cannot trust datastore records from ref, as many unit tests delete
-        # datasets and check their existence.
-        fileLocations = self._get_dataset_locations_info(ref, ignore_datastore_records=True)
+        fileLocations = self._get_dataset_locations_info(ref)
        if fileLocations:
            return True
        return False
@@ -2065,10 +2064,14 @@ def to_file_info_payload(info: DatasetLocationInformation) -> FileDatastoreGetPa
                datastoreRecords=file_info.to_simple(),
            )

+        locations = self._get_dataset_locations_info(ref)
+        if len(locations) == 0:
+            raise FileNotFoundError(f"No artifacts found for DatasetId '{ref.id}'")
+
        return FileDatastoreGetPayload(
            datastore_type="file",
            dataset_ref=ref.to_simple(),
-            file_info=[to_file_info_payload(info) for info in self._get_dataset_locations_info(ref)],
+            file_info=[to_file_info_payload(info) for info in locations],
        )

    @transactional
@@ -2153,7 +2156,7 @@ def put_new(self, in_memory_dataset: Any, ref: DatasetRef) -> Mapping[str, Datas
            storedInfo = self._write_in_memory_to_artifact(in_memory_dataset, ref)
            artifacts.append((ref, storedInfo))

-        ref_records = {self._opaque_table_name: [info for _, info in artifacts]}
+        ref_records: DatasetDatastoreRecords = {self._opaque_table_name: [info for _, info in artifacts]}
        ref = ref.replace(datastore_records=ref_records)
        return {self.name: ref}

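The `getStoredItemsInfo` change above works hand in hand with the `knows()` simplification: previously, a ref whose attached records covered only other datastores returned `None` from `.get()` and fell through to a database lookup; now the `[]` default means "this datastore does not hold the dataset" and is answered without I/O. A sketch of the difference (dict contents are hypothetical):

    records = {"some_other_table": [info]}     # ref carries records, but none for this datastore
    records.get("file_datastore_records")      # None -> old code fell through to a DB query
    records.get("file_datastore_records", [])  # []   -> new code returns "not stored here" with no I/O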
26 changes: 26 additions & 0 deletions tests/config/basic/server.yaml
@@ -0,0 +1,26 @@
+# This is a rough approximation of the configuration on the idfprod environment
+datastore:
+  cls: lsst.daf.butler.datastores.chainedDatastore.ChainedDatastore
+  datastore_constraints:
+    # One entry per datastore in datastores section
+    # Use empty `-` if no constraint override required
+    - constraints:
+        reject:
+          - all
+    - constraints:
+        accept:
+          - all
+  datastores:
+    - datastore:
+        name: FileDatastore@s3://immutable-bucket/dc2
+        cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
+        root: s3://immutable-bucket/dc2
+    - datastore:
+        name: FileDatastore@s3://mutable-bucket
+        cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
+        root: s3://mutable-bucket/
+        records:
+          table: user_datastore_records
+formatters: !include formatters.yaml
+composites: !include composites.yaml
+storageClasses: !include storageClasses.yaml
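A reading of the constraints above (explanatory, not part of the commit): the first child rejects every put, keeping the DC2 bucket read-only, while the second accepts everything, so new datasets land in the mutable bucket. Illustrative routing, with the `butler` and refs assumed:

    butler.put(dataset, ref)  # rejected by the immutable child, stored in s3://mutable-bucket
    butler.get(existing_ref)  # reads can be served by whichever child knows the ref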
11 changes: 11 additions & 0 deletions tests/test_datastore.py
@@ -1086,6 +1086,17 @@ def test_roots(self):
            if root is not None:
                self.assertTrue(root.exists())

+    def test_prepare_get_for_external_client(self):
+        datastore = self.makeDatastore()
+        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
+        dimensions = self.universe.conform(("visit", "physical_filter"))
+        dataId = {"instrument": "dummy", "visit": 52, "physical_filter": "V", "band": "v"}
+        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
+        with self.assertRaises(FileNotFoundError):
+            # Most of the coverage for this function is in test_server.py,
+            # because it requires a file backend that supports URL signing.
+            datastore.prepare_get_for_external_client(ref)


class PosixDatastoreNoChecksumsTestCase(PosixDatastoreTestCase):
"""Posix datastore tests but with checksums disabled."""
8 changes: 5 additions & 3 deletions tests/test_server.py
@@ -105,16 +105,18 @@ def setUpClass(cls):
        # Note that all files are stored in memory.
        cls.enterClassContext(clean_test_environment_for_s3())
        cls.enterClassContext(mock_aws())
-        bucket_name = "anybucketname"  # matches s3Datastore.yaml
-        getS3Client().create_bucket(Bucket=bucket_name)
+
+        # matches server.yaml
+        for bucket in ["mutable-bucket", "immutable-bucket"]:
+            getS3Client().create_bucket(Bucket=bucket)

        cls.storageClassFactory = StorageClassFactory()

        # First create a butler and populate it.
        cls.root = makeTestTempDir(TESTDIR)
        cls.repo = MetricTestRepo(
            root=cls.root,
-            configFile=os.path.join(TESTDIR, "config/basic/butler-s3store.yaml"),
+            configFile=os.path.join(TESTDIR, "config/basic/server.yaml"),
            forceConfigRoot=False,
        )
        # Add a file with corrupted data for testing error conditions
