MTN vendor joblib 0.11.0dev0
tomMoral committed Jun 22, 2018
1 parent addf05e commit 923d766
Showing 30 changed files with 2,981 additions and 1,653 deletions.
49 changes: 21 additions & 28 deletions sklearn/externals/joblib/__init__.py
@@ -1,27 +1,25 @@
"""Joblib is a set of tools to provide **lightweight pipelining in
Python**. In particular, joblib offers:
Python**. In particular:
1. transparent disk-caching of the output values and lazy re-evaluation
1. transparent disk-caching of functions and lazy re-evaluation
(memoize pattern)
2. easy simple parallel computing
3. logging and tracing of the execution
Joblib is optimized to be **fast** and **robust** in particular on large
data and has specific optimizations for `numpy` arrays. It is
**BSD-licensed**.
========================= ================================================
**User documentation:** http://pythonhosted.org/joblib
==================== ===============================================
**Documentation:** http://pythonhosted.org/joblib
**Download packages:** http://pypi.python.org/pypi/joblib#downloads
**Download:** http://pypi.python.org/pypi/joblib#downloads
**Source code:** http://github.com/joblib/joblib
**Source code:** http://github.com/joblib/joblib
**Report issues:** http://github.com/joblib/joblib/issues
========================= ================================================
**Report issues:** http://github.com/joblib/joblib/issues
==================== ===============================================
Vision
@@ -43,9 +41,8 @@
good for resuming an application status or computational job, eg
after a crash.
Joblib strives to address these problems while **leaving your code and
your flow control as unmodified as possible** (no framework, no new
paradigms).
Joblib addresses these problems while **leaving your code and your flow
control as unmodified as possible** (no framework, no new paradigms).
Main features
------------------
@@ -59,16 +56,17 @@
computation to disk and rerun it only if necessary::
>>> from sklearn.externals.joblib import Memory
>>> mem = Memory(cachedir='/tmp/joblib')
>>> cachedir = 'your_cache_dir_goes_here'
>>> mem = Memory(cachedir)
>>> import numpy as np
>>> a = np.vander(np.arange(3)).astype(np.float)
>>> square = mem.cache(np.square)
>>> b = square(a) # doctest: +ELLIPSIS
________________________________________________________________________________
[Memory] Calling square...
square(array([[ 0., 0., 1.],
[ 1., 1., 1.],
[ 4., 2., 1.]]))
square(array([[0., 0., 1.],
[1., 1., 1.],
[4., 2., 1.]]))
___________________________________________________________square - 0...s, 0.0min
>>> c = square(a)
@@ -83,19 +81,12 @@
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
3) **Logging/tracing:** The different functionalities will
progressively acquire better logging mechanism to help track what
has been ran, and capture I/O easily. In addition, Joblib will
provide a few I/O primitives, to easily define logging and
display streams, and provide a way of compiling a report.
We want to be able to quickly inspect what has been run.
4) **Fast compressed Persistence**: a replacement for pickle to work
3) **Fast compressed Persistence**: a replacement for pickle to work
efficiently on Python objects containing large data (
*joblib.dump* & *joblib.load* ).
..
>>> import shutil ; shutil.rmtree('/tmp/joblib/')
>>> import shutil ; shutil.rmtree(cachedir)
"""

@@ -118,12 +109,13 @@
__version__ = '0.11.1.dev0'


from .memory import Memory, MemorizedResult
from .memory import Memory, MemorizedResult, register_store_backend
from .logger import PrintTime
from .logger import Logger
from .hashing import hash
from .numpy_pickle import dump
from .numpy_pickle import load
from .compressor import register_compressor
from .parallel import Parallel
from .parallel import delayed
from .parallel import cpu_count
@@ -134,4 +126,5 @@

__all__ = ['Memory', 'MemorizedResult', 'PrintTime', 'Logger', 'hash', 'dump',
'load', 'Parallel', 'delayed', 'cpu_count', 'effective_n_jobs',
'register_parallel_backend', 'parallel_backend']
'register_parallel_backend', 'parallel_backend',
'register_store_backend', 'register_compressor']
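
[Editor's note] The docstring changes above describe joblib's fast compressed persistence (joblib.dump / joblib.load) without an inline example, and this hunk also re-exports the new register_store_backend and register_compressor extension hooks. Below is a minimal, illustrative sketch of the persistence API as vendored in sklearn.externals.joblib; the file path and payload are placeholders, not part of the commit:

    >>> import numpy as np
    >>> from sklearn.externals.joblib import dump, load
    >>> payload = {'weights': np.arange(1e6)}                  # illustrative data
    >>> _ = dump(payload, '/tmp/weights.joblib', compress=3)   # compressed pickle replacement
    >>> restored = load('/tmp/weights.joblib')
    >>> np.array_equal(restored['weights'], payload['weights'])
    True
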
97 changes: 84 additions & 13 deletions sklearn/externals/joblib/_memmapping_reducer.py
@@ -13,6 +13,8 @@
import atexit
import tempfile
import warnings
import weakref
from uuid import uuid4

try:
WindowsError
@@ -28,7 +30,7 @@
from pickle import loads
from pickle import dumps

from pickle import HIGHEST_PROTOCOL
from pickle import HIGHEST_PROTOCOL, PicklingError

try:
import numpy as np
@@ -38,20 +40,64 @@

from .numpy_pickle import load
from .numpy_pickle import dump
from .hashing import hash
from .backports import make_memmap
from .disk import delete_folder

# Some systems have a ramdisk mounted by default; we can use it instead of /tmp
# as the default folder to dump big arrays to share with subprocesses
# as the default folder to dump big arrays to share with subprocesses.
SYSTEM_SHARED_MEM_FS = '/dev/shm'

# Minimal number of bytes available on SYSTEM_SHARED_MEM_FS to consider using
# it as the default folder to dump big arrays to share with subprocesses.
SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(2e9)

# Folder and file permissions to chmod temporary files generated by the
# memmapping pool. Only the owner of the Python process can access the
# temporary files and folder.
FOLDER_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
FILE_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR


class _WeakArrayKeyMap:
"""A variant of weakref.WeakKeyDictionary for unhashable numpy arrays.
This data structure will be used with numpy arrays as obj keys, therefore we
do not use the __get__ / __set__ methods to avoid any conflict with the
numpy fancy indexing syntax.
"""

def __init__(self):
self._data = {}

def get(self, obj):
ref, val = self._data[id(obj)]
if ref() is not obj:
# In case of race condition with on_destroy: could never be
# triggered by the joblib tests with CPython.
raise KeyError(obj)
return val

def set(self, obj, value):
key = id(obj)
try:
ref, _ = self._data[key]
if ref() is not obj:
# In case of race condition with on_destroy: could never be
# triggered by the joblib tests with CPython.
raise KeyError(obj)
except KeyError:
# Insert the new entry in the mapping along with a weakref
# callback to automatically delete the entry from the mapping
# as soon as the object used as key is garbage collected.
def on_destroy(_):
del self._data[key]
ref = weakref.ref(obj, on_destroy)
self._data[key] = ref, value

def __getstate__(self):
raise PicklingError("_WeakArrayKeyMap is not pickleable")


###############################################################################
# Support for efficient transient pickling of numpy data structures

@@ -109,12 +155,18 @@ def _get_temp_dir(pool_folder_name, temp_folder=None):
if temp_folder is None:
if os.path.exists(SYSTEM_SHARED_MEM_FS):
try:
temp_folder = SYSTEM_SHARED_MEM_FS
pool_folder = os.path.join(temp_folder, pool_folder_name)
if not os.path.exists(pool_folder):
os.makedirs(pool_folder)
use_shared_mem = True
except IOError:
shm_stats = os.statvfs(SYSTEM_SHARED_MEM_FS)
available_nbytes = shm_stats.f_bsize * shm_stats.f_bavail
if available_nbytes > SYSTEM_SHARED_MEM_FS_MIN_SIZE:
# Try to see if we have write access to the shared mem
# folder only if it is reasonably large (that is 2GB or
# more).
temp_folder = SYSTEM_SHARED_MEM_FS
pool_folder = os.path.join(temp_folder, pool_folder_name)
if not os.path.exists(pool_folder):
os.makedirs(pool_folder)
use_shared_mem = True
except (IOError, OSError):
# Missing rights in the /dev/shm partition,
# fallback to regular temp folder.
temp_folder = None
@@ -231,6 +283,19 @@ def __init__(self, max_nbytes, temp_folder, mmap_mode, verbose=0,
self._mmap_mode = mmap_mode
self.verbose = int(verbose)
self._prewarm = prewarm
self._memmaped_arrays = _WeakArrayKeyMap()

def __reduce__(self):
# The ArrayMemmapReducer is passed to the children processes: it needs
# to be pickled but the _WeakArrayKeyMap needs to be skipped as it's
# only guaranteed to be consistent with the parent process memory
# garbage collection.
args = (self._max_nbytes, self._temp_folder, self._mmap_mode)
kwargs = {
'verbose': self.verbose,
'prewarm': self._prewarm,
}
return ArrayMemmapReducer, args, kwargs

def __call__(self, a):
m = _get_backing_memmap(a)
@@ -249,10 +314,16 @@ def __call__(self, a):
if e.errno != errno.EEXIST:
raise e

# Find a unique, concurrent safe filename for writing the
# content of this array only once.
basename = "{}-{}-{}.pkl".format(
os.getpid(), id(threading.current_thread()), hash(a))
try:
basename = self._memmaped_arrays.get(a)
except KeyError:
# Generate a new unique random filename. The process and thread
# ids are only useful for debugging purposes and to make it
# easier to clean up orphaned files in case of hard process
# kill (e.g. by "kill -9" or segfault).
basename = "{}-{}-{}.pkl".format(
os.getpid(), id(threading.current_thread()), uuid4().hex)
self._memmaped_arrays.set(a, basename)
filename = os.path.join(self._temp_folder, basename)

# In case the same array with the same content is passed several

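[Editor's note] For readers unfamiliar with the pattern used by _WeakArrayKeyMap above, here is a standalone sketch (not part of the vendored code) of keying a cache by id(obj) together with a weakref callback. This is how the reducer can hand back the same memmap filename for repeated dumps of one array without keeping the array alive; the class and variable names below are illustrative:

    import weakref
    from uuid import uuid4

    import numpy as np


    class IdKeyedWeakMap:
        """Simplified stand-in for _WeakArrayKeyMap: maps unhashable objects
        (such as numpy arrays) to values, keyed by id(), with automatic
        cleanup when the key object is garbage collected."""

        def __init__(self):
            self._data = {}

        def get(self, obj):
            ref, value = self._data[id(obj)]
            if ref() is not obj:
                # The id() was recycled by a different object.
                raise KeyError(obj)
            return value

        def set(self, obj, value):
            key = id(obj)
            try:
                ref, _ = self._data[key]
                if ref() is not obj:
                    raise KeyError(obj)
            except KeyError:
                # New key: the weakref callback removes the entry as soon as
                # obj is garbage collected, so the map never keeps obj alive.
                def on_destroy(_):
                    del self._data[key]
                ref = weakref.ref(obj, on_destroy)
            self._data[key] = (ref, value)


    mapping = IdKeyedWeakMap()
    a = np.ones(3)
    mapping.set(a, '{}.pkl'.format(uuid4().hex))  # filename chosen once per array
    assert mapping.get(a) == mapping.get(a)       # same name on every later lookup
    del a                                         # the entry is dropped with the array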