
Give Memory.reduce_size() items_limit and age_limit options #1200

Merged: 13 commits, Jun 1, 2023
24 changes: 15 additions & 9 deletions CHANGES.rst
@@ -4,15 +4,15 @@ Latest changes
In development
--------------

- Ensure native byte order for memmap arrays in `joblib.load`.
- Ensure native byte order for memmap arrays in ``joblib.load``.
https://github.com/joblib/joblib/issues/1353

- Add ability to change default Parallel backend in tests by setting the
JOBLIB_TESTS_DEFAULT_PARALLEL_BACKEND environment variable.
``JOBLIB_TESTS_DEFAULT_PARALLEL_BACKEND`` environment variable.
https://github.com/joblib/joblib/pull/1356

- Fix temporary folder creation in `joblib.Parallel` on Linux subsystems on Windows
which do have `/dev/shm` but don't have the `os.statvfs` function
https://github.com/joblib/joblib/issues/1353

- Drop runtime dependency on ``distutils``. ``distutils`` is going away
@@ -24,7 +24,7 @@ In development

- A warning is raised when a pickling error occurs during caching operations.
In version 1.5, this warning will be turned into an error. For all other
errors, a new warning has been introduced: `joblib.memory.CacheWarning`.
errors, a new warning has been introduced: ``joblib.memory.CacheWarning``.
https://github.com/joblib/joblib/pull/1359
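
  A minimal sketch of opting into the stricter behavior ahead of the 1.5
  release (the import path is the one named above; the filter policy is
  illustrative):

      import warnings
      from joblib.memory import CacheWarning

      # Turn joblib's new cache warnings into hard errors so caching
      # problems surface immediately instead of passing silently.
      warnings.simplefilter("error", category=CacheWarning)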

- Avoid (module, name) collisions when caching nested functions. This fix
@@ -40,12 +40,18 @@ In development
tracebacks and more efficient running time.
https://github.com/joblib/joblib/pull/1393

- Add the `parallel_config` context manager to allow for more fine-grained
- Add the ``parallel_config`` context manager to allow for more fine-grained
control over the backend configuration. It should be used in place of the
`parallel_backend` context manager. In particular, it has the advantage
``parallel_backend`` context manager. In particular, it has the advantage
of not requiring a specific backend to be set in the context manager.
https://github.com/joblib/joblib/pull/1392
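
  For instance, a minimal sketch (the workload is illustrative):

      from joblib import Parallel, delayed, parallel_config

      # Unlike parallel_backend, no backend name has to be given: only
      # n_jobs is tuned here and the default backend is kept.
      with parallel_config(n_jobs=2):
          results = Parallel()(delayed(abs)(-i) for i in range(8))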

- Add ``items_limit`` and ``age_limit`` in :meth:`joblib.Memory.reduce_size`
to make it easy to limit the number of items in the cache and to remove
cache items that have not been accessed for a long time, as sketched below.
https://github.com/joblib/joblib/pull/1200
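
  For instance, a minimal sketch (the cache location is illustrative):

      import datetime
      from joblib import Memory

      memory = Memory("./cache", verbose=0)

      # ... populate the cache through memory.cache-decorated calls ...

      # Keep at most 100 items and drop anything not accessed in 30 days.
      memory.reduce_size(items_limit=100,
                         age_limit=datetime.timedelta(days=30))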


Release 1.2.0
-------------

@@ -59,7 +65,7 @@ Release 1.2.0

- Avoid unnecessary warnings when workers and main process delete
the temporary memmap folder contents concurrently.
https://github.com/joblib/joblib/pull/1263

- Fix memory alignment bug for pickles containing numpy arrays.
This is especially important when loading the pickle with
@@ -118,7 +124,7 @@ Release 1.0.1

- Add check_call_in_cache method to check cache without calling function.
https://github.com/joblib/joblib/pull/820
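
  For example (a sketch; the cache location and function are illustrative):

      from joblib import Memory

      memory = Memory("./cache", verbose=0)

      @memory.cache
      def square(x):
          return x ** 2

      square(2)
      assert square.check_call_in_cache(2)      # already cached
      assert not square.check_call_in_cache(3)  # would trigger a computation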

- dask: avoid redundant scattering of large arguments to make a more
efficient use of the network resources and avoid crashing dask with
"OSError: [Errno 55] No buffer space available"
@@ -134,7 +140,7 @@ Release 1.0.0
or a third party library involved in the cached values definition is
upgraded. In particular, users updating `joblib` to a release that includes
this fix will see their previous cache invalidated if they contained
reference to `numpy` objects.
https://github.com/joblib/joblib/pull/1136

- Remove deprecated `check_pickle` argument in `delayed`.
50 changes: 42 additions & 8 deletions joblib/_store_backends.py
@@ -294,9 +294,15 @@ def clear(self):
"""Clear the whole store content."""
self.clear_location(self.location)

def reduce_store_size(self, bytes_limit):
"""Reduce store size to keep it under the given bytes limit."""
items_to_delete = self._get_items_to_delete(bytes_limit)
def enforce_store_limits(
self, bytes_limit, items_limit=None, age_limit=None
):
"""
Remove the store's oldest files to enforce item, byte, and age limits.
"""
items_to_delete = self._get_items_to_delete(
bytes_limit, items_limit, age_limit
)

for item in items_to_delete:
if self.verbose > 10:
@@ -310,16 +316,38 @@ def reduce_store_size(self, bytes_limit):
# the folder already.
pass

def _get_items_to_delete(self, bytes_limit):
"""Get items to delete to keep the store under a size limit."""
def _get_items_to_delete(
self, bytes_limit, items_limit=None, age_limit=None
):
"""
Get items to delete to keep the store under size, item-count, and age limits.
"""
if isinstance(bytes_limit, str):
bytes_limit = memstr_to_bytes(bytes_limit)

items = self.get_items()
size = sum(item.size for item in items)

to_delete_size = size - bytes_limit
if to_delete_size < 0:
if bytes_limit is not None:
to_delete_size = size - bytes_limit
else:
to_delete_size = 0

if items_limit is not None:
to_delete_items = len(items) - items_limit
else:
to_delete_items = 0

if age_limit is not None:
older_item = min(item.last_access for item in items)
deadline = datetime.datetime.now() - age_limit
else:
deadline = None

if (
to_delete_size <= 0 and to_delete_items <= 0
and (deadline is None or older_item > deadline)
):
return []

# We want to delete first the cache items that were accessed a
@@ -328,13 +356,19 @@ def _get_items_to_delete(self, bytes_limit):

items_to_delete = []
size_so_far = 0
items_so_far = 0

for item in items:
if size_so_far > to_delete_size:
if (
(size_so_far >= to_delete_size)
and items_so_far >= to_delete_items
and (deadline is None or deadline < item.last_access)
):
break

items_to_delete.append(item)
size_so_far += item.size
items_so_far += 1

return items_to_delete
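
To make the deletion rule concrete, a worked toy example (hypothetical
numbers; ``Item`` is a stand-in for joblib's cache item records, which
expose ``size`` and ``last_access``):

    import datetime
    from collections import namedtuple

    Item = namedtuple("Item", ["path", "size", "last_access"])
    now = datetime.datetime.now()

    # Three 100-byte items, least recently accessed first.
    items = [
        Item("a", 100, now - datetime.timedelta(hours=3)),
        Item("b", 100, now - datetime.timedelta(hours=2)),
        Item("c", 100, now - datetime.timedelta(minutes=5)),
    ]

    # bytes_limit=250 -> to_delete_size = 300 - 250 = 50:
    #     deleting "a" frees 100 >= 50 bytes.
    # items_limit=2   -> to_delete_items = 3 - 2 = 1:
    #     deleting "a" also satisfies the item count.
    # age_limit=1h    -> deadline = now - 1h:
    #     "b" was last accessed before the deadline, so it is deleted
    #     as well; "c" is recent enough and survives.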

53 changes: 46 additions & 7 deletions joblib/memory.py
@@ -926,10 +926,12 @@
Verbosity flag, controls the debug messages that are issued
as functions are evaluated.

bytes_limit: int, optional
bytes_limit: int | str, optional
Limit in bytes of the size of the cache. By default, the size of
the cache is unlimited. When reducing the size of the cache,
``joblib`` keeps the most recently accessed items first.
``joblib`` keeps the most recently accessed items first. If a
str is passed, it is converted to a number of bytes using units
{K | M | G} for kilo, mega, giga.

**Note:** You need to call :meth:`joblib.Memory.reduce_size` to
actually reduce the cache size to be less than ``bytes_limit``.
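
A sketch of the pattern this note describes (the location is illustrative;
the string form assumes joblib's 1024-based memory-string units):

    from joblib import Memory

    # "100M" is parsed as 100 * 1024 ** 2 bytes.
    memory = Memory("./cache", bytes_limit="100M", verbose=0)

    # ... cache grows through normal use; the limit is not applied yet ...

    memory.reduce_size()  # only now is bytes_limit actually enforced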
@@ -1022,16 +1024,53 @@
if self.store_backend is not None:
self.store_backend.clear()

# As the cache in completely clear, make sure the _FUNCTION_HASHES
# As the cache is completely clear, make sure the _FUNCTION_HASHES
# cache is also reset. Else, for a function that is present in this
table, results cached after this clear will have a cache miss
# as the function code is not re-written.
_FUNCTION_HASHES.clear()

def reduce_size(self):
"""Remove cache elements to make cache size fit in ``bytes_limit``."""
if self.bytes_limit is not None and self.store_backend is not None:
self.store_backend.reduce_store_size(self.bytes_limit)
def reduce_size(self, bytes_limit=None, items_limit=None, age_limit=None):
"""Remove cache elements to make the cache fit its limits.

The limits can require that the cache size stays under ``bytes_limit``,
that the number of cache items is no more than ``items_limit``, and
that no file in the cache was last accessed longer ago than ``age_limit``.

Parameters
----------
bytes_limit: int | str, optional
Limit in bytes of the size of the cache. By default, the size of
the cache is unlimited. When reducing the size of the cache,
``joblib`` keeps the most recently accessed items first. If a
str is passed, it is converted to a number of bytes using units
{K | M | G} for kilo, mega, giga.

items_limit: int, optional
Number of items to limit the cache to. By default, the number of
items in the cache is unlimited. When reducing the size of the
cache, ``joblib`` keeps the most recently accessed items first.

age_limit: datetime.timedelta, optional
Maximum age of items to limit the cache to. When reducing the size
of the cache, any items last accessed more than the given length of
time ago are deleted.
"""
if bytes_limit is None:
bytes_limit = self.bytes_limit

if self.store_backend is None:
# No cached results, this function does nothing.
return

[Codecov / codecov/patch annotation: added line joblib/memory.py#L1064 was not covered by tests]

if bytes_limit is None and items_limit is None and age_limit is None:
# No limits to enforce, nothing to do
return

# Defers the actual limits enforcing to the store backend.
self.store_backend.enforce_store_limits(
bytes_limit, items_limit, age_limit
)

def eval(self, func, *args, **kwargs):
""" Eval function func with arguments `*args` and `**kwargs`,
60 changes: 59 additions & 1 deletion joblib/test/test_memory.py
@@ -942,7 +942,7 @@ def test__get_items_to_delete(tmpdir):
min(ci.last_access for ci in surviving_items))


def test_memory_reduce_size(tmpdir):
def test_memory_reduce_size_bytes_limit(tmpdir):
memory, _, _ = _setup_toy_cache(tmpdir)
ref_cache_items = memory.store_backend.get_items()

@@ -973,6 +973,64 @@ def test_memory_reduce_size(tmpdir):
assert cache_items == []


def test_memory_reduce_size_items_limit(tmpdir):
memory, _, _ = _setup_toy_cache(tmpdir)
ref_cache_items = memory.store_backend.get_items()

# By default reduce_size is a noop
memory.reduce_size()
cache_items = memory.store_backend.get_items()
assert sorted(ref_cache_items) == sorted(cache_items)

# No cache items deleted if items_limit is greater than the number
# of items in the cache
memory.reduce_size(items_limit=10)
cache_items = memory.store_backend.get_items()
assert sorted(ref_cache_items) == sorted(cache_items)

# items_limit is set so that only two cache items are kept
memory.reduce_size(items_limit=2)
cache_items = memory.store_backend.get_items()
assert set.issubset(set(cache_items), set(ref_cache_items))
assert len(cache_items) == 2

# items_limit set so that no cache item is kept
memory.reduce_size(items_limit=0)
cache_items = memory.store_backend.get_items()
assert cache_items == []


def test_memory_reduce_size_age_limit(tmpdir):
import time
import datetime
memory, _, put_cache = _setup_toy_cache(tmpdir)
ref_cache_items = memory.store_backend.get_items()

# By default reduce_size is a noop
memory.reduce_size()
cache_items = memory.store_backend.get_items()
assert sorted(ref_cache_items) == sorted(cache_items)

# No cache items deleted if age_limit is large.
memory.reduce_size(age_limit=datetime.timedelta(days=1))
cache_items = memory.store_backend.get_items()
assert sorted(ref_cache_items) == sorted(cache_items)

# age_limit is set so that only two cache items are kept
time.sleep(1)
put_cache(-1)
put_cache(-2)
memory.reduce_size(age_limit=datetime.timedelta(seconds=1))
cache_items = memory.store_backend.get_items()
assert not set.issubset(set(cache_items), set(ref_cache_items))
assert len(cache_items) == 2

# age_limit set so that no cache item is kept
memory.reduce_size(age_limit=datetime.timedelta(seconds=0))
cache_items = memory.store_backend.get_items()
assert cache_items == []


def test_memory_clear(tmpdir):
memory, _, g = _setup_toy_cache(tmpdir)
memory.clear()