Skip to content

Commit

Permalink
Provide API for setting the fallback cache directory environment vari…
Browse files Browse the repository at this point in the history
…able

Also change the behavior such that if that variable gets set
after the cache manager has been instantiated, it will still
use that environment variable to make the default cache
directory if it has not yet been defined.
  • Loading branch information
timj committed Oct 6, 2022
1 parent 1474ca6 commit e0b746e
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 6 deletions.
76 changes: 70 additions & 6 deletions python/lsst/daf/butler/core/datastoreCacheManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import tempfile
from abc import ABC, abstractmethod
from collections import defaultdict
from random import Random
from typing import (
TYPE_CHECKING,
Dict,
Expand Down Expand Up @@ -455,9 +456,15 @@ class DatastoreCacheManager(AbstractDatastoreCacheManager):
The expiration mode should take the form ``mode=threshold`` so for
example to configure expiration to limit the cache directory to 5 datasets
the value would be ``datasets=5``.
Additionally the ``$DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET`` environment
variable can be used to indicate that this directory should be used
if no explicit directory has been specified from configuration or from
the ``$DAF_BUTLER_CACHE_DIRECTORY`` environment variable.
"""

_temp_exemption_prefix = "exempt/"
_tmpdir_prefix = "butler-cache-dir-"

def __init__(self, config: Union[str, DatastoreCacheManagerConfig], universe: DimensionUniverse):
super().__init__(config, universe)
Expand Down Expand Up @@ -524,13 +531,24 @@ def __init__(self, config: Union[str, DatastoreCacheManagerConfig], universe: Di
@property
def cache_directory(self) -> ResourcePath:
if self._cache_directory is None:
# Create on demand.
self._cache_directory = ResourcePath(
tempfile.mkdtemp(prefix="butler-"), forceDirectory=True, isTemporary=True
)
log.debug("Creating temporary cache directory at %s", self._cache_directory)
# Create on demand. Allow the override environment variable
# to be used in case it got set after this object was created
# but before a cache was used.
if cache_dir := os.environ.get("DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"):
# Someone else will clean this up.
isTemporary = False
msg = "deferred fallback"
else:
cache_dir = tempfile.mkdtemp(prefix=self._tmpdir_prefix)
isTemporary = True
msg = "temporary"

self._cache_directory = ResourcePath(cache_dir, forceDirectory=True, isTemporary=isTemporary)
log.debug("Using %s cache directory at %s", msg, self._cache_directory)

# Remove when we no longer need it.
atexit.register(remove_cache_directory, self._cache_directory.ospath)
if isTemporary:
atexit.register(remove_cache_directory, self._cache_directory.ospath)
return self._cache_directory

@property
Expand All @@ -548,6 +566,52 @@ def cache_size(self) -> int:
def file_count(self) -> int:
return len(self._cache_entries)

@classmethod
def set_fallback_cache_directory_if_unset(cls) -> tuple[bool, str]:
"""Defines a fallback cache directory if a fallback not set already.
Returns
-------
defined : `bool`
`True` if the fallback directory was newly-defined in this method.
`False` if it had already been set.
cache_dir : `str`
Returns the path to the cache directory that will be used if it's
needed. This can allow the caller to run a directory cleanup
when it's no longer needed (something that the cache manager
can not do because forks should not clean up directories defined
by the parent process).
Notes
-----
The fallback directory will not be defined if one has already been
defined. This method sets the ``DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET``
environment variable only if a value has not previously been stored
in that environment variable. Setting the environment variable allows
this value to survive into spawned subprocesses. Calling this method
will lead to all subsequently created cache managers sharing the same
cache.
"""
if cache_dir := os.environ.get("DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"):
# A value has already been set.
return (False, cache_dir)

# As a class method, we do not know at this point whether a cache
# directory will be needed so it would be impolite to create a
# directory that will never be used.

# Construct our own temp name -- 16 characters should have a fairly
# low chance of clashing when combined with the process ID.
characters = "abcdefghijklmnopqrstuvwxyz0123456789_"
rng = Random()
tempchars = "".join(rng.choice(characters) for _ in range(16))

tempname = f"{cls._tmpdir_prefix}{os.getpid()}-{tempchars}"

cache_dir = os.path.join(tempfile.gettempdir(), tempname)
os.environ["DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"] = cache_dir
return (True, cache_dir)

def should_be_cached(self, entity: Union[DatasetRef, DatasetType, StorageClass]) -> bool:
# Docstring inherited
matchName: Union[LookupKey, str] = "{} (via default)".format(entity)
Expand Down
23 changes: 23 additions & 0 deletions tests/test_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,29 @@ def testEnvvarCacheDir(self):
cache_manager = self._make_cache_manager(config_str)
self.assertEqual(cache_manager.cache_directory, env_dir)

# Use the API to set the environment variable, making sure that the
# variable is reset on exit.
with unittest.mock.patch.dict(
os.environ,
{"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": ""},
):
defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset()
self.assertTrue(defined)
cache_manager = self._make_cache_manager(config_str)
self.assertEqual(cache_manager.cache_directory, ResourcePath(cache_dir, forceDirectory=True))

# Now create the cache manager ahead of time and set the fallback
# later.
cache_manager = self._make_cache_manager(config_str)
self.assertIsNone(cache_manager._cache_directory)
with unittest.mock.patch.dict(
os.environ,
{"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": ""},
):
defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset()
self.assertTrue(defined)
self.assertEqual(cache_manager.cache_directory, ResourcePath(cache_dir, forceDirectory=True))

def testExplicitCacheDir(self):
config_str = f"""
cached:
Expand Down

0 comments on commit e0b746e

Please sign in to comment.