Use the resource_tracker to unlink shared temporary memmaps #966

Merged: 81 commits, May 3, 2020

Commits
e91e716
try to remove preparation code in failing example
pierreglaser Mar 30, 2020
431d4c3
remove pickling workarounds related to old sphinx-gallery
pierreglaser Mar 30, 2020
550d66e
Merge remote-tracking branch 'upstream/master'
pierreglaser Apr 3, 2020
d83b6d1
Merge branch 'master' of github.com:joblib/joblib
pierreglaser Apr 22, 2020
7366062
vendor loky 2.6.1
pierreglaser Feb 21, 2020
10ffeca
add the new file_plus_plus resource type
pierreglaser Feb 21, 2020
c13a157
TST add a new test reproducing the crash in #806
pierreglaser Dec 4, 2019
4a077b8
FIX delegate memmap cleanup to resource_tracker
pierreglaser Dec 4, 2019
832438f
TST WIP add initial reproducer of #806 as a test
pierreglaser Dec 5, 2019
ee67f94
MNT port vendor_loky.sh over to osx and windows
pierreglaser Dec 22, 2019
f6d75bb
FIX mature refcounting of memmaps
pierreglaser Dec 23, 2019
363c69b
ENH start re-writing delete_folder
pierreglaser Dec 23, 2019
4614a98
temporary changes of the tests
pierreglaser Feb 21, 2020
7461081
limit race possibility in file_plus_plus_unlink
pierreglaser Feb 25, 2020
85b92e3
log using mp.util.debug instead of print
pierreglaser Feb 25, 2020
abf7336
start tracking temporary memmaps
pierreglaser Feb 25, 2020
de43355
test cleanup + adapt current tests to new changes
pierreglaser Feb 25, 2020
cd7f274
python2/3 compat
pierreglaser Feb 25, 2020
6f8b1be
use weakref.ref instead of finalize (python2 compatible)
pierreglaser Feb 25, 2020
87aeedf
unused imports
pierreglaser Feb 25, 2020
73ba88b
fix loky vendoring issues
pierreglaser Feb 25, 2020
127bb2a
unused imports
pierreglaser Feb 25, 2020
f67676e
run new tests for all multiprocessing backends
pierreglaser Feb 25, 2020
f0de5fe
give time to a linux-only test to cleanup
pierreglaser Feb 25, 2020
b46a0da
fix assertion in prev test
pierreglaser Feb 25, 2020
92e0837
weakref python2/3 compat + increase rtracker robustness
pierreglaser Feb 26, 2020
9d3ce9e
unused imports
pierreglaser Feb 26, 2020
7bb5f8f
Include reproducer of #942
pierreglaser Feb 26, 2020
7cd1113
disable memmapping for multiprocessing backend
pierreglaser Feb 26, 2020
19f33fb
treat memmap as files, restore delete_folder logic
pierreglaser Mar 6, 2020
47ae7f9
add an allow_non_empty option to delete_folder
pierreglaser Mar 6, 2020
edcbe83
serialize temporary memmaps as np.array in children
pierreglaser Mar 6, 2020
5e1313c
make flake8 happy
pierreglaser Mar 6, 2020
bdb9388
unused imports
pierreglaser Mar 6, 2020
575a9cf
restore memory mapping in multiprocessing backends
pierreglaser Mar 6, 2020
6ed5068
linting
pierreglaser Mar 6, 2020
4b4be4e
improve the reproducer (spawn-compatibility)
pierreglaser Mar 6, 2020
18cdfbd
add new testutils file
pierreglaser Mar 6, 2020
082754a
some more if..main guards in tests
pierreglaser Mar 6, 2020
01dbb94
limit potential race condition in a test
pierreglaser Mar 6, 2020
e4339e9
fix a bug making children still sending memmaps
pierreglaser Mar 6, 2020
6cf0deb
remove duplicated function
pierreglaser Mar 7, 2020
9774a40
unused imports
pierreglaser Mar 7, 2020
2b41215
catch OSError when deleting folder at exit
pierreglaser Mar 7, 2020
cc1c95d
refactor test and add some more
pierreglaser Mar 7, 2020
c02f1f2
various fixes/cleanups/test enhancements
pierreglaser Mar 7, 2020
3364c25
Apply suggestions from code review
pierreglaser Mar 10, 2020
a7613a9
Update joblib/test/test_memmapping.py
ogrisel Mar 11, 2020
5de7b4d
separate backward/forward reducers logic for ndarrays
pierreglaser Mar 30, 2020
e8a9ea5
remove unused imports
pierreglaser Mar 30, 2020
a68d0cf
remove stale usage of JOBLIB_MEMMAPS
pierreglaser Mar 30, 2020
f56e16f
fixup! remove stale usage of JOBLIB_MEMMAPS
pierreglaser Mar 31, 2020
a69a7aa
put resource management logic in a separate mixin
pierreglaser Mar 31, 2020
b5d35da
mock TemporaryResourcesManager when no multiprocessing
pierreglaser Mar 31, 2020
c6f222f
track_memmap_in_child -> unlink_on_gc_collect
pierreglaser Mar 31, 2020
9d91108
remove reduce_memmap (unused function)
pierreglaser Mar 31, 2020
18f264a
reformulate a comment
pierreglaser Mar 31, 2020
f85eafd
don't pass kwargs as args in shutil.rmtree call
pierreglaser Mar 31, 2020
a670ece
echo an informative error message in delete_folder
pierreglaser Mar 31, 2020
7a52c64
correct a few typos in a nearby comment
pierreglaser Mar 31, 2020
b68c573
mention unlink_on_gc_collect in make_memmap doc
pierreglaser Mar 31, 2020
3bbee3d
rename crash->print_filename_and_raise
pierreglaser Mar 31, 2020
0f4220e
document what test_child...no_resource_leak tests
pierreglaser Mar 31, 2020
a7f4965
test_permission_error: make sure stdout is empty
pierreglaser Mar 31, 2020
e2e3b1e
rename testutils.test_data -> return_slice_of_data
pierreglaser Mar 31, 2020
bc0cd49
rephrase comment in reduce_array_memmap_backward
pierreglaser Mar 31, 2020
38256dd
remove vendor_loky.ps1 (to be discussed)
pierreglaser Mar 31, 2020
e161ba5
remove Python 2 weakref compat trick
pierreglaser Apr 22, 2020
c4e75d6
unused import
pierreglaser Apr 22, 2020
2c8e9ac
add some pytest pragmas
pierreglaser Apr 22, 2020
152c9d8
add allow_empty arg to try_delete_folder
pierreglaser Apr 23, 2020
120d2ad
maximum explicitness when calling delete_folder
pierreglaser Apr 23, 2020
62669d3
Parallel fails: delete temp files after __call__
pierreglaser Apr 23, 2020
8303c92
add a missing with_multiprocessing pragma
pierreglaser Apr 23, 2020
24cf682
minor touch-ups
pierreglaser Apr 23, 2020
0f3c229
test this PR against windows + Python3.6
pierreglaser Apr 23, 2020
0e967c0
test against Python 3.7 + windows
pierreglaser Apr 23, 2020
31f74f3
retry sensitive tests many times in test suite
pierreglaser Apr 24, 2020
39428f6
repeat the sensitive test 50 (!) times
pierreglaser Apr 24, 2020
375897c
remove debug code
pierreglaser Apr 27, 2020
ce67429
Merge branch 'master' into memmap_reference_counting
ogrisel May 3, 2020
3 changes: 0 additions & 3 deletions azure-pipelines.yml
@@ -70,9 +70,6 @@ jobs:
imageName: "vs2017-win2016"
PYTHON_VERSION: "3.8"
EXTRA_CONDA_PACKAGES: "numpy=1.18"
windows_py36_no_numpy:
imageName: "vs2017-win2016"
PYTHON_VERSION: "3.6"

macos_py38:
imageName: "macos-10.14"
201 changes: 151 additions & 50 deletions joblib/_memmapping_reducer.py
@@ -15,6 +15,7 @@
import warnings
import weakref
from uuid import uuid4
from multiprocessing import util

from pickle import whichmodule, loads, dumps, HIGHEST_PROTOCOL, PicklingError

@@ -29,10 +30,10 @@
except ImportError:
np = None

from .numpy_pickle import load
from .numpy_pickle import dump
from .numpy_pickle import dump, load, load_temporary_memmap
from .backports import make_memmap
from .disk import delete_folder
from .externals.loky.backend import resource_tracker

# Some system have a ramdisk mounted by default, we can use it instead of /tmp
# as the default folder to dump big arrays to share with subprocesses.
@@ -48,6 +49,29 @@
FOLDER_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
FILE_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR

# Set used in joblib workers, referencing the filenames of temporary memmaps
# created by joblib to speed up data communication. In child processes, we add
# a finalizer to these memmaps that sends a maybe_unlink call to the
# resource_tracker, in order to free main memory as fast as possible.
JOBLIB_MMAPS = set()


def _log_and_unlink(filename):
from .externals.loky.backend.resource_tracker import _resource_tracker
util.debug(
"[FINALIZER CALL] object mapping to {} about to be deleted,"
" decrementing the refcount of the file (pid: {})".format(
os.path.basename(filename), os.getpid()))
_resource_tracker.maybe_unlink(filename, "file")


def add_maybe_unlink_finalizer(memmap):
util.debug(
"[FINALIZER ADD] adding finalizer to {} (id {}, filename {}, pid {})"
"".format(type(memmap), id(memmap), os.path.basename(memmap.filename),
os.getpid()))
weakref.finalize(memmap, _log_and_unlink, memmap.filename)


class _WeakArrayKeyMap:
"""A variant of weakref.WeakKeyDictionary for unhashable numpy arrays.
@@ -175,21 +199,25 @@ def has_shareable_memory(a):


def _strided_from_memmap(filename, dtype, mode, offset, order, shape, strides,
total_buffer_len):
total_buffer_len, unlink_on_gc_collect):
"""Reconstruct an array view on a memory mapped file."""
if mode == 'w+':
# Do not zero the original data when unpickling
mode = 'r+'

if strides is None:
# Simple, contiguous memmap
return make_memmap(filename, dtype=dtype, shape=shape, mode=mode,
offset=offset, order=order)
return make_memmap(
filename, dtype=dtype, shape=shape, mode=mode, offset=offset,
order=order, unlink_on_gc_collect=unlink_on_gc_collect
)
else:
# For non-contiguous data, memmap the total enclosing buffer and then
# extract the non-contiguous view with the stride-tricks API
base = make_memmap(filename, dtype=dtype, shape=total_buffer_len,
mode=mode, offset=offset, order=order)
base = make_memmap(
filename, dtype=dtype, shape=total_buffer_len, offset=offset,
mode=mode, order=order, unlink_on_gc_collect=unlink_on_gc_collect
)
return as_strided(base, shape=shape, strides=strides)
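
The non-contiguous branch is easiest to see with plain numpy. A small sketch follows, with an in-memory array standing in for the re-opened memmap: a view taken with a step of 2 over int64 data has a stride of 16 bytes, and as_strided rebuilds it from the recorded shape and strides alone.

import numpy as np
from numpy.lib.stride_tricks import as_strided

base = np.arange(10, dtype=np.int64)  # stands in for the re-mapped buffer
# Rebuild ``base[::2]`` from (shape, strides), as _strided_from_memmap does:
view = as_strided(base, shape=(5,), strides=(16,))  # 2 * itemsize (8 bytes)
assert (view == base[::2]).all()
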


@@ -201,6 +229,8 @@ def _reduce_memmap_backed(a, m):
attribute ancestry of a. ``m.base`` should be the real python mmap object.
"""
# offset that comes from the striding differences between a and m
util.debug('[MEMMAP REDUCE] reducing a memmap-backed array '
'(shape, {}, pid: {})'.format(a.shape, os.getpid()))
a_start, a_end = np.byte_bounds(a)
m_start = np.byte_bounds(m)[0]
offset = a_start - m_start
@@ -224,26 +254,32 @@
# view will be extracted.
strides = a.strides
total_buffer_len = (a_end - a_start) // a.itemsize

return (_strided_from_memmap,
(m.filename, a.dtype, m.mode, offset, order, a.shape, strides,
total_buffer_len))
total_buffer_len, False))


def reduce_memmap(a):
"""Pickle the descriptors of a memmap instance to reopen on same file."""
def reduce_array_memmap_backward(a):
"""reduce a np.array or a np.memmap from a child process"""
m = _get_backing_memmap(a)
if m is not None:
# m is a real mmap backed memmap instance, reduce a preserving striding
# information
if isinstance(m, np.memmap) and m.filename not in JOBLIB_MMAPS:
# if a is backed by a memmaped file, reconstruct a using the
# memmaped file.
return _reduce_memmap_backed(a, m)
else:
# This memmap instance is actually backed by a regular in-memory
# buffer: this can happen when using binary operators on numpy.memmap
# instances
return (loads, (dumps(np.asarray(a), protocol=HIGHEST_PROTOCOL),))


class ArrayMemmapReducer(object):
# a is either a regular (not memmap-backed) numpy array, or an array
# backed by a shared temporary file created by joblib. In the latter
# case, in order to limit the lifespan of these temporary files, we
# serialize the memmap as a regular numpy array, and decref the
# file backing the memmap (done implicitly in a previously registered
# finalizer, see ``unlink_on_gc_collect`` for more details)
return (
loads, (dumps(np.asarray(a), protocol=HIGHEST_PROTOCOL), )
)
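
The effect of this backward path can be sketched in isolation: pickling np.asarray(m) instead of the memmap itself hands the parent a plain in-memory copy, so no reference to the temporary file survives in the parent and the resource_tracker is free to unlink it. A minimal sketch:

import tempfile

import numpy as np
from pickle import dumps, loads, HIGHEST_PROTOCOL

f = tempfile.NamedTemporaryFile(suffix=".mmap", delete=False)
f.close()
m = np.memmap(f.name, dtype="int64", shape=(4,), mode="w+")
m[:] = [0, 1, 2, 3]

# Round-trip the data the way reduce_array_memmap_backward does:
result = loads(dumps(np.asarray(m), protocol=HIGHEST_PROTOCOL))
assert type(result) is np.ndarray  # a regular array, not a np.memmap
assert (result == m).all()
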


class ArrayMemmapForwardReducer(object):
"""Reducer callable to dump large arrays to memmap files.

Parameters
@@ -267,26 +303,29 @@ class ArrayMemmapReducer(object):
same data array is passed to different worker processes.
"""

def __init__(self, max_nbytes, temp_folder, mmap_mode, verbose=0,
prewarm=True):
def __init__(self, max_nbytes, temp_folder, mmap_mode,
unlink_on_gc_collect, verbose=0, prewarm=True):
self._max_nbytes = max_nbytes
self._temp_folder = temp_folder
self._mmap_mode = mmap_mode
self.verbose = int(verbose)
self._prewarm = prewarm
self._memmaped_arrays = _WeakArrayKeyMap()
self._temporary_memmaped_filenames = set()
self._unlink_on_gc_collect = unlink_on_gc_collect

def __reduce__(self):
# The ArrayMemmapReducer is passed to the children processes: it needs
# to be pickled but the _WeakArrayKeyMap need to be skipped as it's
# only guaranteed to be consistent with the parent process memory
# The ArrayMemmapForwardReducer is passed to the children processes: it
# needs to be pickled but the _WeakArrayKeyMap needs to be skipped as
# it's only guaranteed to be consistent with the parent process memory
# garbage collection.
args = (self._max_nbytes, self._temp_folder, self._mmap_mode)
args = (self._max_nbytes, self._temp_folder, self._mmap_mode,
self._unlink_on_gc_collect)
kwargs = {
'verbose': self.verbose,
'prewarm': self._prewarm,
}
return ArrayMemmapReducer, args, kwargs
return ArrayMemmapForwardReducer, args, kwargs

def __call__(self, a):
m = _get_backing_memmap(a)
@@ -320,13 +359,39 @@ def __call__(self, a):
# In case the same array with the same content is passed several
# times to the pool subprocess children, serialize it only once

# XXX: implement an explicit reference counting scheme to make it
# possible to delete temporary files as soon as the workers are
# done processing this data.
is_new_memmap = filename not in self._temporary_memmaped_filenames

# add the memmap to the list of temporary memmaps created by joblib
self._temporary_memmaped_filenames.add(filename)

if self._unlink_on_gc_collect:
# Bump reference count of the memmap by 1 to account for
# shared usage of the memmap by a child process. The
# corresponding decref call will be executed upon calling
# resource_tracker.maybe_unlink, registered as a finalizer in
# the child.
# the incref/decref calls here are only possible when the child
# and the parent share the same resource_tracker. It is not the
# case for the multiprocessing backend, but it does not matter
# because unlinking a memmap from a child process is only
# useful to control the memory usage of long-lasting child
# processes, while the multiprocessing-based pools terminate
# their workers at the end of a map() call.
resource_tracker.register(filename, "file")

if is_new_memmap:
# Incref each temporary memmap created by joblib one extra
# time. This means that these memmaps will only be deleted
# once an extra maybe_unlink() is called, which is done once
# all the jobs have completed (or been canceled) in the
# Parallel._terminate_backend() method.
resource_tracker.register(filename, "file")

if not os.path.exists(filename):
if self.verbose > 0:
print("Memmapping (shape={}, dtype={}) to new file {}"
.format(a.shape, a.dtype, filename))
util.debug(
"[ARRAY DUMP] Pickling new array (shape={}, dtype={}) "
"creating a new memmap".format(
a.shape, a.dtype, os.path.basename(filename)))
for dumped_filename in dump(a, filename):
os.chmod(dumped_filename, FILE_PERMISSIONS)

@@ -337,25 +402,31 @@ def __call__(self, a):
# concurrent memmap creation in multiple children
# processes.
load(filename, mmap_mode=self._mmap_mode).max()
elif self.verbose > 1:
print("Memmapping (shape={}, dtype={}) to old file {}"
.format(a.shape, a.dtype, filename))

else:
util.debug(
"[ARRAY DUMP] Pickling known array (shape={}, dtype={}) "
"reusing memmap file: {}".format(
a.shape, a.dtype, os.path.basename(filename)))

# The worker process will use joblib.load to memmap the data
return (load, (filename, self._mmap_mode))
return (
(load_temporary_memmap, (filename, self._mmap_mode,
self._unlink_on_gc_collect))
)
else:
# do not convert a into memmap, let pickler do its usual copy with
# the default system pickler
if self.verbose > 1:
print("Pickling array (shape={}, dtype={})."
.format(a.shape, a.dtype))
util.debug(
'[ARRAY DUMP] Pickling array (NO MEMMAPPING) (shape={}, '
' dtype={}).'.format(a.shape, a.dtype))
return (loads, (dumps(a, protocol=HIGHEST_PROTOCOL),))
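
The bookkeeping described in the comments above can be replayed with a toy refcount table standing in for loky's resource_tracker: one register() per child that receives the file, plus one extra register() the first time the file is created; the file only goes away after the matching maybe_unlink() calls, the last of which is issued by Parallel._terminate_backend(). A pure-Python sketch with a hypothetical filename:

refcounts = {}

def register(name):
    refcounts[name] = refcounts.get(name, 0) + 1

def maybe_unlink(name):
    refcounts[name] -= 1
    if refcounts[name] == 0:
        print("unlinking", name)  # the real tracker calls os.unlink here

register("a.mmap")      # new memmap: extra incref held for the parent
register("a.mmap")      # incref for the first child receiving it
register("a.mmap")      # dispatch to a second child (no extra incref)
maybe_unlink("a.mmap")  # first child's finalizer fires
maybe_unlink("a.mmap")  # second child's finalizer fires
maybe_unlink("a.mmap")  # Parallel._terminate_backend(): refcount hits 0
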


def get_memmapping_reducers(
pool_id, forward_reducers=None, backward_reducers=None,
temp_folder=None, max_nbytes=1e6, mmap_mode='r', verbose=0,
prewarm=False, **kwargs):
prewarm=False, unlink_on_gc_collect=True, **kwargs):
"""Construct a pair of memmapping reducer linked to a tmpdir.

This function manages the creation and the cleanup of the temporary folders
@@ -381,6 +452,7 @@ def get_memmapping_reducers(
# the pool instance and related file handler resources such as POSIX
# semaphores and pipes
pool_module_name = whichmodule(delete_folder, 'delete_folder')
resource_tracker.register(pool_folder, "folder")

def _cleanup():
# In some cases the Python runtime seems to set delete_folder to
@@ -394,8 +466,9 @@ def _cleanup():
delete_folder = __import__(
pool_module_name, fromlist=['delete_folder']).delete_folder
try:
delete_folder(pool_folder)
except WindowsError:
delete_folder(pool_folder, allow_non_empty=True)
resource_tracker.unregister(pool_folder, "folder")
except OSError:
warnings.warn("Failed to clean temporary folder: {}"
.format(pool_folder))

@@ -407,19 +480,47 @@
# arrays over the max_nbytes threshold
if prewarm == "auto":
prewarm = not use_shared_mem
forward_reduce_ndarray = ArrayMemmapReducer(
max_nbytes, pool_folder, mmap_mode, verbose,
forward_reduce_ndarray = ArrayMemmapForwardReducer(
max_nbytes, pool_folder, mmap_mode, unlink_on_gc_collect, verbose,
prewarm=prewarm)
forward_reducers[np.ndarray] = forward_reduce_ndarray
forward_reducers[np.memmap] = reduce_memmap
forward_reducers[np.memmap] = forward_reduce_ndarray

# Communication from child process to the parent process always
# pickles in-memory numpy.ndarray without dumping them as memmap
# to avoid confusing the caller and make it tricky to collect the
# temporary folder
backward_reduce_ndarray = ArrayMemmapReducer(
None, pool_folder, mmap_mode, verbose)
backward_reducers[np.ndarray] = backward_reduce_ndarray
backward_reducers[np.memmap] = reduce_memmap
backward_reducers[np.ndarray] = reduce_array_memmap_backward
backward_reducers[np.memmap] = reduce_array_memmap_backward

return forward_reducers, backward_reducers, pool_folder


class TemporaryResourcesManagerMixin(object):
def _unlink_temporary_resources(self):
"""Unlink temporary resources created by a process-based pool"""
if os.path.exists(self._temp_folder):
for filename in os.listdir(self._temp_folder):
resource_tracker.maybe_unlink(
os.path.join(self._temp_folder, filename), "file"
)
# XXX: calling shutil.rmtree inside delete_folder is likely to
# cause a race condition with the lines above.
self._try_delete_folder(allow_non_empty=False)

def _unregister_temporary_resources(self):
"""Unregister temporary resources created by a process-based pool"""
if os.path.exists(self._temp_folder):
for filename in os.listdir(self._temp_folder):
resource_tracker.unregister(
os.path.join(self._temp_folder, filename), "file"
)

def _try_delete_folder(self, allow_non_empty):
try:
delete_folder(self._temp_folder, allow_non_empty=allow_non_empty)
except OSError:
# Temporary folder cannot be deleted right now. No need to
# handle it though, as this folder will be cleaned up by an
# atexit finalizer registered by the memmapping_reducer.
pass
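
A standalone sketch of the cleanup ordering this mixin implements, with direct os.unlink calls standing in for resource_tracker.maybe_unlink: decrement every tracked file first, then attempt to remove the folder, swallowing OSError because the atexit finalizer registered by the memmapping reducer will retry later.

import os
import shutil
import tempfile

temp_folder = tempfile.mkdtemp()
open(os.path.join(temp_folder, "x.mmap"), "wb").close()

for filename in os.listdir(temp_folder):
    # maybe_unlink stand-in: the real call decrefs, and unlinks at zero.
    os.unlink(os.path.join(temp_folder, filename))
try:
    shutil.rmtree(temp_folder)
except OSError:
    pass  # left to the atexit finalizer in the real implementation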