Merge pull request #646 from lsst/tickets/DM-33638
DM-33638: Check datastore cache for existence before checking remote datastore
timj committed Feb 23, 2022
2 parents 0bfdc4b + b608633 commit 40211b7
Showing 9 changed files with 253 additions and 48 deletions.
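In outline, the change treats a hit in the local datastore cache as a proxy for existence, so the remote object store is only queried for files the cache does not know about. A minimal, self-contained sketch of that idea, using hypothetical helper names rather than the real butler API:

from typing import Callable, Dict, Set


def check_existence(
    uris: Set[str],
    cached: Callable[[str], bool],
    remote_exists: Callable[[str], bool],
) -> Dict[str, bool]:
    """Treat a local cache hit as a proxy for existence and only query
    the remote store for URIs the cache does not know about."""
    existence: Dict[str, bool] = {}
    for uri in uris:
        if cached(uri):
            existence[uri] = True                # cache hit: skip the remote call
        else:
            existence[uri] = remote_exists(uri)  # cache miss: ask the store
    return existence


# Example: only b.fits triggers a remote check.
print(check_existence(
    {"s3://bucket/a.fits", "s3://bucket/b.fits"},
    cached=lambda uri: uri.endswith("a.fits"),
    remote_exists=lambda uri: False,
))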

1 change: 0 additions & 1 deletion python/lsst/daf/butler/configs/datastores/s3Datastore.yaml

2 changes: 1 addition & 1 deletion python/lsst/daf/butler/core/datasets/type.py
@@ -457,7 +457,7 @@ def componentTypeName(self, component: str) -> str:
"""
if component in self.storageClass.allComponents():
return self.nameWithComponent(self.name, component)
raise KeyError("Requested component ({}) not understood by this DatasetType".format(component))
raise KeyError(f"Requested component ({component}) not understood by this DatasetType ({self})")

def makeCompositeDatasetType(self) -> DatasetType:
"""Return a composite dataset type from the component.
146 changes: 142 additions & 4 deletions python/lsst/daf/butler/core/datastoreCacheManager.py
@@ -171,6 +171,10 @@ def from_file(cls, file: ResourcePath, root: ResourcePath) -> CacheEntry:
)


class _MarkerEntry(CacheEntry):
pass


class CacheRegistry(BaseModel):
"""Collection of cache entries."""

@@ -180,6 +184,9 @@ class CacheRegistry(BaseModel):
_entries: Dict[str, CacheEntry] = PrivateAttr({})
"""Internal collection of cache entries."""

_ref_map: Dict[DatasetId, List[str]] = PrivateAttr({})
"""Mapping of DatasetID to corresponding keys in cache registry."""

@property
def cache_size(self) -> int:
return self._size
@@ -191,9 +198,15 @@ def __setitem__(self, key: str, entry: CacheEntry) -> None:
self._size += entry.size
self._entries[key] = entry

# Update the mapping from ref to path.
if entry.ref not in self._ref_map:
self._ref_map[entry.ref] = []
self._ref_map[entry.ref].append(key)

def __delitem__(self, key: str) -> None:
entry = self._entries.pop(key)
self._decrement(entry)
self._ref_map[entry.ref].remove(key)

def _decrement(self, entry: Optional[CacheEntry]) -> None:
if entry:
@@ -220,11 +233,51 @@ def values(self) -> ValuesView[CacheEntry]:
def items(self) -> ItemsView[str, CacheEntry]:
return self._entries.items()

def pop(self, key: str, default: Optional[CacheEntry] = None) -> Optional[CacheEntry]:
entry = self._entries.pop(key, default)
# A private marker to indicate that pop() should raise if no default
# is given.
__marker = _MarkerEntry(name="marker", size=0, ref=0, ctime=datetime.datetime.utcfromtimestamp(0))

def pop(self, key: str, default: Optional[CacheEntry] = __marker) -> Optional[CacheEntry]:
# The marker for dict.pop is not the same as our marker.
if default is self.__marker:
entry = self._entries.pop(key)
else:
entry = self._entries.pop(key, self.__marker)
# Should not attempt to correct for this entry being removed
# if we got the default value.
if entry is self.__marker:
return default

self._decrement(entry)
# The default entry given to this method may not even be in the cache.
if entry and entry.ref in self._ref_map:
keys = self._ref_map[entry.ref]
if key in keys:
keys.remove(key)
return entry

def get_dataset_keys(self, dataset_id: Optional[DatasetId]) -> Optional[List[str]]:
"""Retrieve all keys associated with the given dataset ID.
Parameters
----------
dataset_id : `DatasetId` or `None`
The dataset ID to look up. Returns `None` if the ID is `None`.
Returns
-------
keys : `list` [`str`]
Keys associated with this dataset. These keys can be used to lookup
the cache entry information in the `CacheRegistry`. Returns
`None` if the dataset is not known to the cache.
"""
if dataset_id not in self._ref_map:
return None
keys = self._ref_map[dataset_id]
if not keys:
return None
return keys


class DatastoreCacheManagerConfig(ConfigSubset):
"""Configuration information for `DatastoreCacheManager`."""
@@ -282,6 +335,35 @@ def should_be_cached(self, entity: Union[DatasetRef, DatasetType, StorageClass])
"""
raise NotImplementedError()

@abstractmethod
def known_to_cache(self, ref: DatasetRef, extension: Optional[str] = None) -> bool:
"""Report if the dataset is known to the cache.
Parameters
----------
ref : `DatasetRef`
Dataset to check for in the cache.
extension : `str`, optional
File extension expected. Should include the leading "``.``".
If `None` the extension is ignored and the dataset ID alone is
used to check in the cache. The extension must be defined if
a specific component is being checked.
Returns
-------
known : `bool`
Returns `True` if the dataset is currently known to the cache
and `False` otherwise.
Notes
-----
This method can only report if the dataset is known to the cache
in this specific instant and does not indicate whether the file
can be read from the cache later. `find_in_cache()` should be called
if the cached file is to be used.
"""
raise NotImplementedError()

@abstractmethod
def move_to_cache(self, uri: ResourcePath, ref: DatasetRef) -> Optional[ResourcePath]:
"""Move a file to the cache.
@@ -651,7 +733,56 @@ def scan_cache(self) -> None:
"Entries no longer on disk but thought to be in cache and so removed: %s", ",".join(missing)
)
for path_in_cache in missing:
self._cache_entries.pop(path_in_cache)
self._cache_entries.pop(path_in_cache, None)

def known_to_cache(self, ref: DatasetRef, extension: Optional[str] = None) -> bool:
"""Report if the dataset is known to the cache.
Parameters
----------
ref : `DatasetRef`
Dataset to check for in the cache.
extension : `str`, optional
File extension expected. Should include the leading "``.``".
If `None` the extension is ignored and the dataset ID alone is
used to check in the cache. The extension must be defined if
a specific component is being checked.
Returns
-------
known : `bool`
Returns `True` if the dataset is currently known to the cache
and `False` otherwise. If the dataset refers to a component and
an extension is given then only that component is checked.
Notes
-----
This method can only report if the dataset is known to the cache
in this specific instant and does not indicate whether the file
can be read from the cache later. `find_in_cache()` should be called
if the cached file is to be used.
This method does not force the cache to be re-scanned and so can miss
cached datasets that have recently been written by other processes.
"""
if self._cache_directory is None:
return False
if self.file_count == 0:
return False

if extension is None:
# Look solely for matching dataset ref ID and not specific
# components.
cached_paths = self._cache_entries.get_dataset_keys(ref.id)
return True if cached_paths else False

else:
# Extension is known so we can do an explicit look up for the
# cache entry.
cached_location = self._construct_cache_name(ref, extension)
path_in_cache = cached_location.relative_to(self.cache_directory)
assert path_in_cache is not None # For mypy
return path_in_cache in self._cache_entries
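The two branches above amount to two lookup modes: with no extension, any key recorded for the dataset ID counts; with an extension, the exact expected cache file name is reconstructed and looked up. A reduced sketch of that distinction (the "<id><extension>" naming below is an assumption for illustration, not the real `_construct_cache_name` scheme):

from typing import Dict, List, Optional


def known_to_cache_sketch(
    entries: Dict[str, int],           # cache key -> file size
    keys_by_id: Dict[str, List[str]],  # dataset ID -> cache keys
    dataset_id: str,
    extension: Optional[str] = None,
) -> bool:
    if extension is None:
        # ID-only check: any cached file for this dataset counts.
        return bool(keys_by_id.get(dataset_id))
    # Extension given: look up the single expected cache file name.
    expected_key = f"{dataset_id}{extension}"
    return expected_key in entries


entries = {"d1.fits": 100}
keys_by_id = {"d1": ["d1.fits"]}
assert known_to_cache_sketch(entries, keys_by_id, "d1")           # ID-only hit
assert known_to_cache_sketch(entries, keys_by_id, "d1", ".fits")  # exact-name hit
assert not known_to_cache_sketch(entries, keys_by_id, "d1", ".json")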

def _remove_from_cache(self, cache_entries: Iterable[str]) -> None:
"""Remove the specified cache entries from cache.
@@ -665,7 +796,7 @@ def _remove_from_cache(self, cache_entries: Iterable[str]) -> None:
for entry in cache_entries:
path = self.cache_directory.join(entry)

self._cache_entries.pop(entry)
self._cache_entries.pop(entry, None)
log.debug("Removing file from cache: %s", path)
try:
path.remove()
@@ -822,5 +953,12 @@ def remove_from_cache(self, ref: Union[DatasetRef, Iterable[DatasetRef]]) -> None:
"""
return

def known_to_cache(self, ref: DatasetRef, extension: Optional[str] = None) -> bool:
"""Report if a dataset is known to the cache.
Always returns `False`.
"""
return False

def __str__(self) -> str:
return f"{type(self).__name__}()"
57 changes: 51 additions & 6 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -1397,13 +1397,29 @@ def _process_mexists_records(

location_factory = self.locationFactory

for ref_id, info in records.items():
# Key is the dataId, value is list of StoredItemInfo
uris = [info.file_location(location_factory).uri for info in info]
uris_to_check.extend(uris)
uri_existence: Dict[ResourcePath, bool] = {}
for ref_id, infos in records.items():
# Key is the dataset Id, value is list of StoredItemInfo
uris = [info.file_location(location_factory).uri for info in infos]
location_map.update({uri: ref_id for uri in uris})

uri_existence: Dict[ResourcePath, bool] = {}
# Check the local cache directly for a dataset corresponding
# to the remote URI.
if self.cacheManager.file_count > 0:
ref = id_to_ref[ref_id]
for uri, storedFileInfo in zip(uris, infos):
check_ref = ref
if not ref.datasetType.isComponent() and (component := storedFileInfo.component):
check_ref = ref.makeComponentRef(component)
if self.cacheManager.known_to_cache(check_ref, uri.getExtension()):
# Proxy for URI existence.
uri_existence[uri] = True
else:
uris_to_check.append(uri)
else:
# Check all of them.
uris_to_check.extend(uris)

if artifact_existence is not None:
# If a URI has already been checked remove it from the list
# and immediately add the status to the output dict.
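When the stored file belongs to a component of a composite, the cache is keyed by the component ref, so the loop above wraps the parent ref with `makeComponentRef` before asking the cache. A toy illustration of that guard (the `ToyRef` type is invented for the example and is not `DatasetRef`):

from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyRef:
    """Invented stand-in for DatasetRef; a '.' in the name marks a component."""

    name: str

    def is_component(self) -> bool:
        return "." in self.name

    def make_component_ref(self, component: str) -> "ToyRef":
        return replace(self, name=f"{self.name}.{component}")


def ref_for_cache_lookup(ref: ToyRef, stored_component: Optional[str]) -> ToyRef:
    # Only wrap in a component ref when the stored file names a component
    # and the incoming ref is not already a component ref.
    if not ref.is_component() and stored_component:
        return ref.make_component_ref(stored_component)
    return ref


assert ref_for_cache_lookup(ToyRef("calexp"), "wcs").name == "calexp.wcs"
assert ref_for_cache_lookup(ToyRef("calexp.wcs"), "wcs").name == "calexp.wcs"
assert ref_for_cache_lookup(ToyRef("calexp"), None).name == "calexp"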
@@ -1455,6 +1471,15 @@ def mexists(
-------
existence : `dict` of [`DatasetRef`, `bool`]
Mapping from dataset to boolean indicating existence.

Notes
-----
To minimize potentially costly remote existence checks, the local
cache is checked as a proxy for existence. If a file for this
`DatasetRef` does exist, no check is done for the actual URI. This
could result in unexpected behavior if the dataset itself
has been removed from the datastore by another process whilst it is
still in the cache.
"""
chunk_size = 10_000
dataset_existence: Dict[DatasetRef, bool] = {}
@@ -1556,6 +1581,13 @@ def exists(self, ref: DatasetRef) -> bool:
-------
exists : `bool`
`True` if the entity exists in the `Datastore`.

Notes
-----
The local cache is checked as a proxy for existence in the remote
object store. It is possible that another process on a different
compute node could remove the file from the object store even
though it is present in the local cache.
"""
fileLocations = self._get_dataset_locations_info(ref)

@@ -1565,6 +1597,12 @@ def exists(self, ref: DatasetRef) -> bool:
if not self.trustGetRequest:
return False

# First check the cache. If it is not found we must check
# the datastore itself. Assume that any component in the cache
# means that the dataset does exist somewhere.
if self.cacheManager.known_to_cache(ref):
return True

# When we are guessing a dataset location we can not check
# for the existence of every component since we can not
# know if every component was written. Instead we check
@@ -1575,7 +1613,14 @@ def exists(self, ref: DatasetRef) -> bool:
return False

# All listed artifacts must exist.
for location, _ in fileLocations:
for location, storedFileInfo in fileLocations:
# Checking in cache needs the component ref.
check_ref = ref
if not ref.datasetType.isComponent() and (component := storedFileInfo.component):
check_ref = ref.makeComponentRef(component)
if self.cacheManager.known_to_cache(check_ref, location.getExtension()):
continue

if not self._artifact_exists(location):
return False

