DM-36918: Protect against the new cache file being deleted before it can be registered #750

Merged: 5 commits, Nov 8, 2022. Changes from all commits.
4 changes: 2 additions & 2 deletions python/lsst/daf/butler/_butler.py
@@ -2160,7 +2160,7 @@ def transfer_from(
source_butler: Butler,
source_refs: Iterable[DatasetRef],
transfer: str = "auto",
- id_gen_map: Dict[str, DatasetIdGenEnum] = None,
+ id_gen_map: Dict[str, DatasetIdGenEnum] | None = None,
skip_missing: bool = True,
register_dataset_types: bool = False,
transfer_dimensions: bool = False,
@@ -2397,7 +2397,7 @@ def validateConfiguration(
self,
logFailures: bool = False,
datasetTypeNames: Optional[Iterable[str]] = None,
- ignore: Iterable[str] = None,
+ ignore: Iterable[str] | None = None,
) -> None:
"""Validate butler configuration.

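The recurring annotation fix in this PR: parameters defaulting to `None` were declared without `None` in their type (e.g. `ignore: Iterable[str] = None`), which only passed under mypy's deprecated implicit-Optional behaviour. The corrected signatures spell the union out. A minimal sketch of the pattern, assuming `from __future__ import annotations` is in effect so the `X | None` syntax also runs on Python releases before 3.10:

    from __future__ import annotations

    from typing import Iterable


    def validate(names: Iterable[str] | None = None) -> list[str]:
        # `None` is an explicit member of the type rather than an
        # implicit exception granted only to the default value.
        return [] if names is None else list(names)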
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/_butlerConfig.py
@@ -66,7 +66,7 @@ class ButlerConfig(Config):
def __init__(
self,
other: Optional[Union[ResourcePathExpression, Config]] = None,
- searchPaths: Sequence[ResourcePathExpression] = None,
+ searchPaths: Sequence[ResourcePathExpression] | None = None,
):

self.configDir: Optional[ResourcePath] = None
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/core/datasets/ref.py
@@ -88,7 +88,7 @@ def direct(
id: Optional[Union[str, int]] = None,
datasetType: Optional[Dict[str, Any]] = None,
dataId: Optional[Dict[str, Any]] = None,
- run: str = None,
+ run: str | None = None,
component: Optional[str] = None,
) -> SerializedDatasetRef:
"""Construct a `SerializedDatasetRef` directly without validators.
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/core/datastore.py
@@ -484,7 +484,7 @@ def exists(self, datasetRef: DatasetRef) -> bool:
def get(
self,
datasetRef: DatasetRef,
- parameters: Mapping[str, Any] = None,
+ parameters: Mapping[str, Any] | None = None,
storageClass: Optional[Union[StorageClass, str]] = None,
) -> Any:
"""Load an `InMemoryDataset` from the store.
15 changes: 10 additions & 5 deletions python/lsst/daf/butler/core/datastoreCacheManager.py
@@ -751,7 +751,7 @@ def remove_from_cache(self, refs: Union[DatasetRef, Iterable[DatasetRef]]) -> No
keys_to_remove.append(key)
self._remove_from_cache(keys_to_remove)

- def _register_cache_entry(self, cached_location: ResourcePath, can_exist: bool = False) -> str:
+ def _register_cache_entry(self, cached_location: ResourcePath, can_exist: bool = False) -> Optional[str]:
"""Record the file in the cache registry.

Parameters
@@ -766,8 +766,9 @@ def _register_cache_entry(self, cached_location: ResourcePath, can_exist: bool =

Returns
-------
- cache_key : `str`
-     The key used in the registry for this file.
+ cache_key : `str` or `None`
+     The key used in the registry for this file. `None` if the file
+     no longer exists (it could have been expired by another process).
"""
path_in_cache = cached_location.relative_to(self.cache_directory)
if path_in_cache is None:
@@ -783,7 +784,10 @@ def _register_cache_entry(self, cached_location: ResourcePath, can_exist: bool =
f"Cached file {cached_location} is already known to the registry"
" but this was expected to be a new file."
)
- details = CacheEntry.from_file(cached_location, root=self.cache_directory)
+ try:
+     details = CacheEntry.from_file(cached_location, root=self.cache_directory)
+ except FileNotFoundError:
+     return None
self._cache_entries[path_in_cache] = details
return path_in_cache

@@ -799,7 +803,8 @@ def scan_cache(self) -> None:
continue

path_in_cache = self._register_cache_entry(file, can_exist=True)
- found.add(path_in_cache)
+ if path_in_cache:
+     found.add(path_in_cache)

# Find any files that were recorded in the cache but are no longer
# on disk. (something else cleared them out?)
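This hunk is the heart of DM-36918. With a cache directory shared between processes, another process can expire and delete the freshly cached file in the window between the file landing on disk and `CacheEntry.from_file` stat-ing it, so registration now treats `FileNotFoundError` as "already evicted" and returns `None` instead of failing the whole operation. A standalone sketch of the guard, with hypothetical names rather than the butler API:

    from __future__ import annotations

    import os
    from typing import Dict, Optional


    def register_cache_entry(registry: Dict[str, int], path: str) -> Optional[str]:
        """Record the file size under its path; return the registry key,
        or None if the file vanished before it could be registered."""
        try:
            # A process sharing the cache may expire the file between
            # its creation and this stat call.
            size = os.stat(path).st_size
        except FileNotFoundError:
            # The cache is advisory: report "not cached" and let the
            # caller fall back to re-reading the original dataset.
            return None
        registry[path] = size
        return path

Callers then treat a `None` key as a cache miss, which is exactly what the updated `scan_cache` does when it only adds truthy keys to `found`.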
24 changes: 16 additions & 8 deletions python/lsst/daf/butler/core/named.py
@@ -68,7 +68,7 @@ def name(self) -> str:
V_co = TypeVar("V_co", covariant=True)


- class NamedKeyMapping(Mapping[K_co, V_co]):
+ class NamedKeyMapping(Mapping[K, V_co]):
"""Custom mapping class.

An abstract base class for custom mappings whose keys are objects with
@@ -107,21 +107,21 @@ def byName(self) -> Dict[str, V_co]:
return dict(zip(self.names, self.values()))

@abstractmethod
- def keys(self) -> NamedValueAbstractSet[K_co]:  # type: ignore
+ def keys(self) -> NamedValueAbstractSet[K]:  # type: ignore
# TODO: docs
raise NotImplementedError()

@abstractmethod
- def __getitem__(self, key: Union[str, K_co]) -> V_co:
+ def __getitem__(self, key: Union[str, K]) -> V_co:
raise NotImplementedError()

- def get(self, key: Union[str, K_co], default: Any = None) -> Any:
+ def get(self, key: Union[str, K], default: Any = None) -> Any:
# Delegating to super is not allowed by typing, because it doesn't
# accept str, but we know it just delegates to __getitem__, which does.
return super().get(key, default) # type: ignore


- NameLookupMapping = Union[NamedKeyMapping[K_co, V_co], Mapping[str, V_co]]
+ NameLookupMapping = Union[NamedKeyMapping[K, V_co], Mapping[str, V_co]]
"""A type annotation alias for signatures that want to use ``mapping[s]``
(or ``mapping.get(s)``) where ``s`` is a `str`, and don't care whether
``mapping.keys()`` returns named objects or direct `str` instances.
@@ -532,9 +532,17 @@ def discard(self, element: Union[str, K]) -> Any:
def pop(self, *args: str) -> K:
# Docstring inherited.
if not args:
-     return super().pop()
- else:
-     return self._mapping.pop(*args)
+     # Parent is abstract method and we cannot call MutableSet
+     # implementation directly. Instead follow MutableSet and
+     # choose first element from iteration.
+     it = iter(self._mapping)
+     try:
+         value = next(it)
+     except StopIteration:
+         raise KeyError from None
+     args = (value,)
+
+ return self._mapping.pop(*args)

def update(self, elements: Iterable[K]) -> None:
"""Add multiple new elements to the set.
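Per the comment in the new code, the immediate parent declares `pop` abstract, so `super().pop()` cannot supply the no-argument case; the rewrite instead follows the documented `MutableSet.pop` recipe (take an arbitrary element from iteration, raise `KeyError` when empty) and then delegates every call shape to the name-keyed backing dict. A toy dict-backed version of the same pattern, assuming elements expose a `.name` attribute as `NamedValueSet` elements do:

    from __future__ import annotations

    from typing import Any, Dict, Iterable, Iterator


    class ToyNamedSet:
        """Minimal dict-backed set keyed by each element's ``name``."""

        def __init__(self, elements: Iterable[Any] = ()):
            self._mapping: Dict[str, Any] = {e.name: e for e in elements}

        def __iter__(self) -> Iterator[Any]:
            return iter(self._mapping.values())

        def pop(self, *args: str) -> Any:
            if not args:
                # Mirror MutableSet.pop: remove and return an arbitrary
                # element, raising KeyError when the set is empty.
                it = iter(self._mapping)
                try:
                    name = next(it)
                except StopIteration:
                    raise KeyError from None
                args = (name,)
            # dict.pop supplies pop(name) and pop(name, default) for free.
            return self._mapping.pop(*args)

The new `testPop` below exercises all three call shapes: `pop(name)`, bare `pop()`, and `pop(name, default)`.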
6 changes: 4 additions & 2 deletions python/lsst/daf/butler/core/storageClass.py
@@ -350,7 +350,7 @@ def knownParameters(self) -> Set[str]:
known.update(sc.knownParameters())
return known

- def validateParameters(self, parameters: Collection = None) -> None:
+ def validateParameters(self, parameters: Collection | None = None) -> None:
"""Check that the parameters are known to this `StorageClass`.

Does not check the values.
@@ -381,7 +381,7 @@ def validateParameters(self, parameters: Collection | None = None) -> None:
unknown = "', '".join(diff)
raise KeyError(f"Parameter{s} '{unknown}' not understood by StorageClass {self.name}")

- def filterParameters(self, parameters: Mapping[str, Any], subset: Collection = None) -> Mapping[str, Any]:
+ def filterParameters(
+     self, parameters: Mapping[str, Any], subset: Collection | None = None
+ ) -> Mapping[str, Any]:
"""Filter out parameters that are not known to this `StorageClass`.

Parameters
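For context on the two methods touched here: `validateParameters` raises `KeyError` for parameter names the storage class does not understand, while `filterParameters` silently drops them, with `subset` presumably narrowing which known names survive. A simplified, hypothetical sketch of that filtering contract (not the actual butler implementation):

    from __future__ import annotations

    from typing import Any, Collection, Mapping


    def filter_parameters(
        known: Collection[str],
        parameters: Mapping[str, Any],
        subset: Collection[str] | None = None,
    ) -> Mapping[str, Any]:
        # Keep only names this storage class understands; an optional
        # subset restricts the result further.
        wanted = set(known) if subset is None else set(known) & set(subset)
        return {k: v for k, v in parameters.items() if k in wanted}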
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/datastores/chainedDatastore.py
@@ -180,7 +180,7 @@ def __init__(
self,
config: Union[Config, str],
bridgeManager: DatastoreRegistryBridgeManager,
- butlerRoot: str = None,
+ butlerRoot: str | None = None,
):
super().__init__(config, bridgeManager)

2 changes: 1 addition & 1 deletion python/lsst/daf/butler/datastores/fileDatastore.py
@@ -244,7 +244,7 @@ def __init__(
self,
config: Union[DatastoreConfig, str],
bridgeManager: DatastoreRegistryBridgeManager,
- butlerRoot: str = None,
+ butlerRoot: str | None = None,
):
super().__init__(config, bridgeManager)
if "root" not in self.config:
6 changes: 2 additions & 4 deletions python/lsst/daf/butler/formatters/file.py
@@ -223,8 +223,7 @@ def fromBytes(self, serializedDataset: bytes, component: Optional[str] = None) -
if not hasattr(self, "_fromBytes"):
raise NotImplementedError("Type does not support reading from bytes.")

- # mypy can not understand that the previous line protects this call
- data = self._fromBytes(serializedDataset, self.fileDescriptor.storageClass.pytype)  # type: ignore
+ data = self._fromBytes(serializedDataset, self.fileDescriptor.storageClass.pytype)

# Assemble the requested dataset and potentially return only its
# component coercing it to its appropriate pytype
@@ -281,5 +280,4 @@ def toBytes(self, inMemoryDataset: Any) -> bytes:
if not hasattr(self, "_toBytes"):
raise NotImplementedError("Type does not support reading from bytes.")

- # mypy can not understand that the previous line protects this call
- return self._toBytes(inMemoryDataset)  # type: ignore
+ return self._toBytes(inMemoryDataset)
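The deleted `# type: ignore` comments had papered over mypy's historical inability to see that a `hasattr` guard protects the call on the next line. mypy 0.980 (September 2022) gained narrowing on `hasattr()` checks, which is presumably why the ignores can now go. A hedged sketch of the guard pattern that type-checks cleanly under such a mypy, using hypothetical names:

    from typing import Any


    class Formatter:
        """Optional-hook pattern: only some subclasses define _to_bytes."""

        def to_bytes(self, obj: Any) -> bytes:
            if not hasattr(self, "_to_bytes"):
                raise NotImplementedError("Type does not support writing to bytes.")
            # A hasattr-aware mypy narrows `self` here, so the call no
            # longer needs a `# type: ignore`.
            return self._to_bytes(obj)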
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/formatters/yaml.py
@@ -47,7 +47,7 @@ class YamlFormatter(FileFormatter):
"""Allow the normal yaml.dump to be used to write the YAML. Use this
if you know that your class has registered representers."""

- def _readFile(self, path: str, pytype: Type[Any] = None) -> Any:
+ def _readFile(self, path: str, pytype: Type[Any] | None = None) -> Any:
"""Read a file from the path in YAML format.

Parameters
@@ -179,7 +179,7 @@ def initialize(
specs = CollectionSummaryTables.makeTableSpecs(collections, dimensions)
tables = CollectionSummaryTables(
datasetType=context.addTable("collection_summary_dataset_type", specs.datasetType),
- dimensions=NamedKeyDict(
+ dimensions=NamedKeyDict[GovernorDimension, sqlalchemy.schema.Table](
{
dimension: context.addTable(f"collection_summary_{dimension.name}", spec)
for dimension, spec in specs.dimensions.items()
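Without explicit type arguments, mypy must infer `NamedKeyDict`'s key and value parameters from the comprehension, and the return type of `context.addTable` is presumably not precise enough to pin the values as `sqlalchemy.schema.Table`. Supplying the parameters at the constructor keeps the stronger type. A self-contained illustration of the same trick with a toy generic class (not the butler's `NamedKeyDict`):

    from __future__ import annotations

    from typing import Any, Dict, Generic, TypeVar

    K = TypeVar("K")
    V = TypeVar("V")


    class NamedDict(Generic[K, V]):
        """Toy stand-in for a generic, name-keyed mapping."""

        def __init__(self, mapping: Dict[K, V]):
            self._mapping = mapping


    def add_table(name: str) -> Any:
        # Stands in for a factory whose declared return type is imprecise.
        return name.upper()


    # Inferred from the comprehension alone this would be NamedDict[str, Any];
    # the explicit parameters preserve the stronger value type.
    tables = NamedDict[str, str]({name: add_table(name) for name in ("a", "b")})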
@@ -85,7 +85,7 @@ class Node(ABC):
Possibly empty list of sub-nodes.
"""

- def __init__(self, children: Tuple[Node, ...] = None):
+ def __init__(self, children: Tuple[Node, ...] | None = None):
self.children = tuple(children or ())

@abstractmethod
21 changes: 21 additions & 0 deletions tests/test_utils.py
@@ -148,6 +148,27 @@ def testOperators(self):
self.checkOperator(ab ^ bc, {self.a, self.c})
self.checkOperator(ab - bc, {self.a})

+ def testPop(self):
+     # Construct with list for repeatable ordering.
+     nvs = NamedValueSet([self.a, self.b, self.c])
+     self.assertEqual(nvs.pop("c"), self.c)
+     self.assertEqual(nvs.pop(), self.a)
+     self.assertEqual(nvs.pop(), self.b)
+     self.assertEqual(nvs.pop("d", self.c), self.c)
+     with self.assertRaises(KeyError):
+         nvs.pop()
+
+ def testRemove(self):
+     nvs = NamedValueSet([self.a, self.b, self.c])
+     nvs.remove("b")
+     self.assertIn("a", nvs)
+     self.assertNotIn("b", nvs)
+     with self.assertRaises(KeyError):
+         nvs.remove("d")
+     nvs.discard("d")
+     nvs.discard("a")
+     self.assertNotIn("a", nvs)


class GlobToRegexTestCase(unittest.TestCase):
def testStarInList(self):