Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-36412: Add new env var DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET #738

Merged
merged 4 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-36412.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
A bug has been fixed in ``DatastoreCacheManager`` that was triggered if two processes tried to cache the same dataset simultaneously.
3 changes: 3 additions & 0 deletions doc/changes/DM-36412.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
``DatastoreCacheManager`` can now use an environment variable, ``$DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET``, to specify a cache directory to use if no explicit directory has been specified by configuration or by the ``$DAF_BUTLER_CACHE_DIRECTORY`` environment variable.
Additionally, a ``DatastoreCacheManager.set_fallback_cache_directory_if_unset()`` class method has been added that will set this environment variable with a suitable value.
This is useful for multiprocessing where each forked or spawned subprocess needs to share the same cache directory.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ datastore:
threshold: 4
# Default cache value. This will be the default decision if no specific
# match is found in the "cacheable" section later on.
default: false
default: true
# Use a dict over list to simplify merging logic.
cacheable:
Exposure: true
Expand Down
90 changes: 84 additions & 6 deletions python/lsst/daf/butler/core/datastoreCacheManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import tempfile
from abc import ABC, abstractmethod
from collections import defaultdict
from random import Random
from typing import (
TYPE_CHECKING,
Dict,
Expand Down Expand Up @@ -455,16 +456,28 @@ class DatastoreCacheManager(AbstractDatastoreCacheManager):
The expiration mode should take the form ``mode=threshold`` so for
example to configure expiration to limit the cache directory to 5 datasets
the value would be ``datasets=5``.

Additionally the ``$DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET`` environment
variable can be used to indicate that this directory should be used
if no explicit directory has been specified from configuration or from
the ``$DAF_BUTLER_CACHE_DIRECTORY`` environment variable.
"""

_temp_exemption_prefix = "exempt/"
_tmpdir_prefix = "butler-cache-dir-"

def __init__(self, config: Union[str, DatastoreCacheManagerConfig], universe: DimensionUniverse):
super().__init__(config, universe)

# Set cache directory if it pre-exists, else defer creation until
# requested. Allow external override from environment.
root = os.environ.get("DAF_BUTLER_CACHE_DIRECTORY") or self.config.get("root")

# Allow the execution environment to override the default values
# so long as no default value has been set from the line above.
if root is None:
root = os.environ.get("DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET")

self._cache_directory = (
ResourcePath(root, forceAbsolute=True, forceDirectory=True) if root is not None else None
)
Expand Down Expand Up @@ -518,13 +531,24 @@ def __init__(self, config: Union[str, DatastoreCacheManagerConfig], universe: Di
@property
def cache_directory(self) -> ResourcePath:
if self._cache_directory is None:
# Create on demand.
self._cache_directory = ResourcePath(
tempfile.mkdtemp(prefix="butler-"), forceDirectory=True, isTemporary=True
)
log.debug("Creating temporary cache directory at %s", self._cache_directory)
# Create on demand. Allow the override environment variable
# to be used in case it got set after this object was created
# but before a cache was used.
if cache_dir := os.environ.get("DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"):
# Someone else will clean this up.
isTemporary = False
msg = "deferred fallback"
else:
cache_dir = tempfile.mkdtemp(prefix=self._tmpdir_prefix)
isTemporary = True
msg = "temporary"

self._cache_directory = ResourcePath(cache_dir, forceDirectory=True, isTemporary=isTemporary)
log.debug("Using %s cache directory at %s", msg, self._cache_directory)

# Remove when we no longer need it.
atexit.register(remove_cache_directory, self._cache_directory.ospath)
if isTemporary:
atexit.register(remove_cache_directory, self._cache_directory.ospath)
return self._cache_directory

@property
Expand All @@ -542,6 +566,52 @@ def cache_size(self) -> int:
def file_count(self) -> int:
return len(self._cache_entries)

@classmethod
def set_fallback_cache_directory_if_unset(cls) -> tuple[bool, str]:
    """Define a fallback cache directory if a fallback is not set already.

    Returns
    -------
    defined : `bool`
        `True` if the fallback directory was newly-defined in this method.
        `False` if it had already been set.
    cache_dir : `str`
        Returns the path to the cache directory that will be used if it's
        needed. This can allow the caller to run a directory cleanup
        when it's no longer needed (something that the cache manager
        can not do because forks should not clean up directories defined
        by the parent process).

    Notes
    -----
    The fallback directory will not be defined if one has already been
    defined. This method sets the ``DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET``
    environment variable only if a value has not previously been stored
    in that environment variable; an empty value is treated the same as
    an unset one. Setting the environment variable allows this value to
    survive into spawned subprocesses. Calling this method will lead to
    all subsequently created cache managers sharing the same cache.

    Only the path is chosen here; the directory itself is not created by
    this method.
    """
    if cache_dir := os.environ.get("DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"):
        # A (non-empty) value has already been set, so leave it alone.
        return (False, cache_dir)

    # As a class method, we do not know at this point whether a cache
    # directory will be needed so it would be impolite to create a
    # directory that will never be used.

    # Construct our own temp name -- 16 characters should have a fairly
    # low chance of clashing when combined with the process ID.
    characters = "abcdefghijklmnopqrstuvwxyz0123456789_"
    rng = Random()
    tempchars = "".join(rng.choice(characters) for _ in range(16))

    tempname = f"{cls._tmpdir_prefix}{os.getpid()}-{tempchars}"

    # Publish the chosen path through the environment so that it is
    # inherited by any subprocess created after this call.
    cache_dir = os.path.join(tempfile.gettempdir(), tempname)
    os.environ["DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"] = cache_dir
    return (True, cache_dir)

def should_be_cached(self, entity: Union[DatasetRef, DatasetType, StorageClass]) -> bool:
# Docstring inherited
matchName: Union[LookupKey, str] = "{} (via default)".format(entity)
Expand Down Expand Up @@ -595,6 +665,14 @@ def move_to_cache(self, uri: ResourcePath, ref: DatasetRef) -> Optional[Resource
# item.
self._expire_cache()

# The above reset the in-memory cache status. It's entirely possible
# that another process has just cached this file (if multiple
# processes are caching on read), so check our in-memory cache
# before attempting to cache the dataset.
path_in_cache = cached_location.relative_to(self.cache_directory)
if path_in_cache and path_in_cache in self._cache_entries:
return cached_location
timj marked this conversation as resolved.
Show resolved Hide resolved

# Move into the cache. Given that multiple processes might be
# sharing a single cache directory, and the file we need might have
# been copied in whilst we were checking, allow overwrite without
Expand Down
73 changes: 73 additions & 0 deletions tests/test_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import tempfile
import time
import unittest
import unittest.mock
from collections import UserDict
from dataclasses import dataclass

Expand Down Expand Up @@ -1352,6 +1353,78 @@ def testNoCacheDirReversed(self):

self.assertCache(cache_manager)

def testEnvvarCacheDir(self):
# Check how the DAF_BUTLER_CACHE_DIRECTORY and
# DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET environment variables interact with
# the ``root`` value given in the cache configuration.
config_str = f"""
cached:
root: '{self.root}'
cacheable:
metric0: true
"""

# Three distinct locations so that the assertions below can tell which
# source the cache directory came from.
root = ResourcePath(self.root, forceDirectory=True)
env_dir = root.join("somewhere", forceDirectory=True)
elsewhere = root.join("elsewhere", forceDirectory=True)

# Environment variable should override the config value.
with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY": env_dir.ospath}):
cache_manager = self._make_cache_manager(config_str)
self.assertEqual(cache_manager.cache_directory, env_dir)

# The IF_UNSET fallback variable should not override a root that has
# been set in the config.
with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": env_dir.ospath}):
cache_manager = self._make_cache_manager(config_str)
self.assertEqual(cache_manager.cache_directory, root)

# Configuration with no cache root set (``root: null``).
config_str = """
cached:
root: null
default: true
cacheable:
metric1: false
"""
# NOTE(review): this manager is replaced below before it is used.
cache_manager = self._make_cache_manager(config_str)

# With no root in the config the IF_UNSET fallback variable should be
# used for the cache directory.
with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": env_dir.ospath}):
cache_manager = self._make_cache_manager(config_str)
self.assertEqual(cache_manager.cache_directory, env_dir)

# If both environment variables are set the main (not IF_UNSET)
# variable should win.
with unittest.mock.patch.dict(
os.environ,
{
"DAF_BUTLER_CACHE_DIRECTORY": env_dir.ospath,
"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": elsewhere.ospath,
},
):
cache_manager = self._make_cache_manager(config_str)
self.assertEqual(cache_manager.cache_directory, env_dir)

# Use the API to set the environment variable, making sure that the
# variable is reset on exit. The variable is patched to an empty string,
# which the API is expected to treat as unset (``defined`` is True).
with unittest.mock.patch.dict(
os.environ,
{"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": ""},
):
defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset()
self.assertTrue(defined)
cache_manager = self._make_cache_manager(config_str)
self.assertEqual(cache_manager.cache_directory, ResourcePath(cache_dir, forceDirectory=True))

# Now create the cache manager ahead of time and set the fallback
# later. No directory has been chosen at construction, so the fallback
# should be picked up when cache_directory is first accessed.
cache_manager = self._make_cache_manager(config_str)
self.assertIsNone(cache_manager._cache_directory)
with unittest.mock.patch.dict(
os.environ,
{"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": ""},
):
defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset()
self.assertTrue(defined)
self.assertEqual(cache_manager.cache_directory, ResourcePath(cache_dir, forceDirectory=True))

def testExplicitCacheDir(self):
config_str = f"""
cached:
Expand Down