From 96c8e1cd6dd7ee7b374e318a64fdf28db7ba140e Mon Sep 17 00:00:00 2001 From: Olivier Grisel Date: Mon, 13 Sep 2021 09:27:33 +0200 Subject: [PATCH] Vendor up (#1218) --- joblib/externals/cloudpickle/__init__.py | 2 +- joblib/externals/cloudpickle/cloudpickle.py | 195 +++++++++++++++--- .../externals/cloudpickle/cloudpickle_fast.py | 69 +++++-- joblib/externals/loky/__init__.py | 2 +- .../loky/backend/popen_loky_win32.py | 2 +- joblib/externals/loky/backend/queues.py | 2 +- .../loky/backend/resource_tracker.py | 2 +- joblib/externals/loky/backend/spawn.py | 2 +- joblib/externals/loky/backend/synchronize.py | 34 +-- joblib/externals/loky/backend/utils.py | 2 +- joblib/externals/loky/initializers.py | 78 +++++++ joblib/externals/loky/process_executor.py | 40 ++-- 12 files changed, 349 insertions(+), 81 deletions(-) create mode 100644 joblib/externals/loky/initializers.py diff --git a/joblib/externals/cloudpickle/__init__.py b/joblib/externals/cloudpickle/__init__.py index f461d65e9..c00537770 100644 --- a/joblib/externals/cloudpickle/__init__.py +++ b/joblib/externals/cloudpickle/__init__.py @@ -8,4 +8,4 @@ # expose their Pickler subclass at top-level under the "Pickler" name. Pickler = CloudPickler -__version__ = '1.6.0' +__version__ = '2.0.0' diff --git a/joblib/externals/cloudpickle/cloudpickle.py b/joblib/externals/cloudpickle/cloudpickle.py index 05d52afa0..347b38695 100644 --- a/joblib/externals/cloudpickle/cloudpickle.py +++ b/joblib/externals/cloudpickle/cloudpickle.py @@ -55,6 +55,7 @@ import warnings from .compat import pickle +from collections import OrderedDict from typing import Generic, Union, Tuple, Callable from pickle import _getattribute from importlib._bootstrap import _find_spec @@ -87,8 +88,11 @@ def g(): # communication speed over compatibility: DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL +# Names of modules whose resources should be treated as dynamic. +_PICKLE_BY_VALUE_MODULES = set() + # Track the provenance of reconstructed dynamic classes to make it possible to -# recontruct instances from the matching singleton class definition when +# reconstruct instances from the matching singleton class definition when # appropriate and preserve the usual "isinstance" semantics of Python objects. _DYNAMIC_CLASS_TRACKER_BY_CLASS = weakref.WeakKeyDictionary() _DYNAMIC_CLASS_TRACKER_BY_ID = weakref.WeakValueDictionary() @@ -123,6 +127,77 @@ def _lookup_class_or_track(class_tracker_id, class_def): return class_def +def register_pickle_by_value(module): + """Register a module to make it functions and classes picklable by value. + + By default, functions and classes that are attributes of an importable + module are to be pickled by reference, that is relying on re-importing + the attribute from the module at load time. + + If `register_pickle_by_value(module)` is called, all its functions and + classes are subsequently to be pickled by value, meaning that they can + be loaded in Python processes where the module is not importable. + + This is especially useful when developing a module in a distributed + execution environment: restarting the client Python process with the new + source code is enough: there is no need to re-install the new version + of the module on all the worker nodes nor to restart the workers. + + Note: this feature is considered experimental. See the cloudpickle + README.md file for more details and limitations. 
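
For illustration, a minimal usage sketch of the API described in this docstring; the module name `mylib` is hypothetical, and the snippet assumes the vendored package re-exports these helpers at the top level the way upstream cloudpickle 2.0.0 does:

    import mylib                                       # hypothetical user module
    from joblib.externals import cloudpickle

    cloudpickle.register_pickle_by_value(mylib)
    payload = cloudpickle.dumps(mylib.some_function)   # pickled by value: the code itself is
                                                       # embedded, so it loads without mylib installed
    cloudpickle.unregister_pickle_by_value(mylib)
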
+ """ + if not isinstance(module, types.ModuleType): + raise ValueError( + f"Input should be a module object, got {str(module)} instead" + ) + # In the future, cloudpickle may need a way to access any module registered + # for pickling by value in order to introspect relative imports inside + # functions pickled by value. (see + # https://github.com/cloudpipe/cloudpickle/pull/417#issuecomment-873684633). + # This access can be ensured by checking that module is present in + # sys.modules at registering time and assuming that it will still be in + # there when accessed during pickling. Another alternative would be to + # store a weakref to the module. Even though cloudpickle does not implement + # this introspection yet, in order to avoid a possible breaking change + # later, we still enforce the presence of module inside sys.modules. + if module.__name__ not in sys.modules: + raise ValueError( + f"{module} was not imported correctly, have you used an " + f"`import` statement to access it?" + ) + _PICKLE_BY_VALUE_MODULES.add(module.__name__) + + +def unregister_pickle_by_value(module): + """Unregister that the input module should be pickled by value.""" + if not isinstance(module, types.ModuleType): + raise ValueError( + f"Input should be a module object, got {str(module)} instead" + ) + if module.__name__ not in _PICKLE_BY_VALUE_MODULES: + raise ValueError(f"{module} is not registered for pickle by value") + else: + _PICKLE_BY_VALUE_MODULES.remove(module.__name__) + + +def list_registry_pickle_by_value(): + return _PICKLE_BY_VALUE_MODULES.copy() + + +def _is_registered_pickle_by_value(module): + module_name = module.__name__ + if module_name in _PICKLE_BY_VALUE_MODULES: + return True + while True: + parent_name = module_name.rsplit(".", 1)[0] + if parent_name == module_name: + break + if parent_name in _PICKLE_BY_VALUE_MODULES: + return True + module_name = parent_name + return False + + def _whichmodule(obj, name): """Find the module an object belongs to. @@ -136,11 +211,14 @@ def _whichmodule(obj, name): # Workaround bug in old Python versions: prior to Python 3.7, # T.__module__ would always be set to "typing" even when the TypeVar T # would be defined in a different module. - # - # For such older Python versions, we ignore the __module__ attribute of - # TypeVar instances and instead exhaustively lookup those instances in - # all currently imported modules. - module_name = None + if name is not None and getattr(typing, name, None) is obj: + # Built-in TypeVar defined in typing such as AnyStr + return 'typing' + else: + # User defined or third-party TypeVar: __module__ attribute is + # irrelevant, thus trigger a exhaustive search for obj in all + # modules. + module_name = None else: module_name = getattr(obj, '__module__', None) @@ -166,18 +244,35 @@ def _whichmodule(obj, name): return None -def _is_importable(obj, name=None): - """Dispatcher utility to test the importability of various constructs.""" - if isinstance(obj, types.FunctionType): - return _lookup_module_and_qualname(obj, name=name) is not None - elif issubclass(type(obj), type): - return _lookup_module_and_qualname(obj, name=name) is not None +def _should_pickle_by_reference(obj, name=None): + """Test whether an function or a class should be pickled by reference + + Pickling by reference means by that the object (typically a function or a + class) is an attribute of a module that is assumed to be importable in the + target Python environment. 
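
As a rough standalone sketch of the distinction drawn here (not part of the patch): a dynamically defined function is pickled by value, while an attribute of an importable module is pickled by reference, i.e. as a module name plus attribute name:

    import os
    from joblib.externals import cloudpickle

    square = lambda x: x * x                     # defined on the fly: pickled by value
    by_value = cloudpickle.dumps(square)
    by_ref = cloudpickle.dumps(os.path.join)     # importable attribute: pickled by reference
    print(len(by_value) > len(by_ref))           # typically True: by-value embeds the code object
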
Loading will therefore rely on importing the + module and then calling `getattr` on it to access the function or class. + + Pickling by reference is the only option to pickle functions and classes + in the standard library. In cloudpickle the alternative option is to + pickle by value (for instance for interactively or locally defined + functions and classes or for attributes of modules that have been + explicitly registered to be pickled by value. + """ + if isinstance(obj, types.FunctionType) or issubclass(type(obj), type): + module_and_name = _lookup_module_and_qualname(obj, name=name) + if module_and_name is None: + return False + module, name = module_and_name + return not _is_registered_pickle_by_value(module) + elif isinstance(obj, types.ModuleType): # We assume that sys.modules is primarily used as a cache mechanism for # the Python import machinery. Checking if a module has been added in - # is sys.modules therefore a cheap and simple heuristic to tell us whether - # we can assume that a given module could be imported by name in - # another Python process. + # is sys.modules therefore a cheap and simple heuristic to tell us + # whether we can assume that a given module could be imported by name + # in another Python process. + if _is_registered_pickle_by_value(obj): + return False return obj.__name__ in sys.modules else: raise TypeError( @@ -233,10 +328,13 @@ def _extract_code_globals(co): out_names = _extract_code_globals_cache.get(co) if out_names is None: names = co.co_names - out_names = {names[oparg] for _, oparg in _walk_global_ops(co)} + # We use a dict with None values instead of a set to get a + # deterministic order (assuming Python 3.6+) and avoid introducing + # non-deterministic pickle bytes as a results. + out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)} # Declaring a function inside another one using the "def ..." - # syntax generates a constant code object corresonding to the one + # syntax generates a constant code object corresponding to the one # of the nested function's As the nested function may itself need # global variables, we need to introspect its code, extract its # globals, (look for code object in it's co_consts attribute..) and @@ -244,7 +342,7 @@ def _extract_code_globals(co): if co.co_consts: for const in co.co_consts: if isinstance(const, types.CodeType): - out_names |= _extract_code_globals(const) + out_names.update(_extract_code_globals(const)) _extract_code_globals_cache[co] = out_names @@ -452,15 +550,31 @@ def _extract_class_dict(cls): if sys.version_info[:2] < (3, 7): # pragma: no branch def _is_parametrized_type_hint(obj): - # This is very cheap but might generate false positives. + # This is very cheap but might generate false positives. So try to + # narrow it down is good as possible. 
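
The dict-with-None-values idiom used in `_extract_code_globals` above is simply an insertion-ordered set; a small standalone illustration (not from the patch):

    out_names = {"a": None, "b": None}
    out_names.update({"c": None, "a": None})   # plays the role of `|=` on sets
    print(list(out_names))                     # ['a', 'b', 'c'] - deduplicated, stable order
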
+ type_module = getattr(type(obj), '__module__', None) + from_typing_extensions = type_module == 'typing_extensions' + from_typing = type_module == 'typing' + # general typing Constructs is_typing = getattr(obj, '__origin__', None) is not None # typing_extensions.Literal - is_litteral = getattr(obj, '__values__', None) is not None + is_literal = ( + (getattr(obj, '__values__', None) is not None) + and from_typing_extensions + ) # typing_extensions.Final - is_final = getattr(obj, '__type__', None) is not None + is_final = ( + (getattr(obj, '__type__', None) is not None) + and from_typing_extensions + ) + + # typing.ClassVar + is_classvar = ( + (getattr(obj, '__type__', None) is not None) and from_typing + ) # typing.Union/Tuple for old Python 3.5 is_union = getattr(obj, '__union_params__', None) is not None @@ -469,8 +583,8 @@ def _is_parametrized_type_hint(obj): getattr(obj, '__result__', None) is not None and getattr(obj, '__args__', None) is not None ) - return any((is_typing, is_litteral, is_final, is_union, is_tuple, - is_callable)) + return any((is_typing, is_literal, is_final, is_classvar, is_union, + is_tuple, is_callable)) def _create_parametrized_type_hint(origin, args): return origin[args] @@ -557,8 +671,11 @@ def _rebuild_tornado_coroutine(func): loads = pickle.loads -# hack for __import__ not working as desired def subimport(name): + # We cannot do simply: `return __import__(name)`: Indeed, if ``name`` is + # the name of a submodule, __import__ will return the top-level root module + # of this submodule. For instance, __import__('os.path') returns the `os` + # module. __import__(name) return sys.modules[name] @@ -699,7 +816,7 @@ def _make_skel_func(code, cell_count, base_globals=None): """ # This function is deprecated and should be removed in cloudpickle 1.7 warnings.warn( - "A pickle file created using an old (<=1.4.1) version of cloudpicke " + "A pickle file created using an old (<=1.4.1) version of cloudpickle " "is currently being loaded. This is not supported by cloudpickle and " "will break in cloudpickle 1.7", category=UserWarning ) @@ -813,10 +930,15 @@ def _decompose_typevar(obj): def _typevar_reduce(obj): - # TypeVar instances have no __qualname__ hence we pass the name explicitly. 
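
The comment added to `subimport` above can be verified directly; this standalone snippet shows why indexing `sys.modules` is needed when the requested name is a submodule:

    import sys

    top = __import__("os.path")        # returns the top-level package, i.e. the 'os' module
    sub = sys.modules["os.path"]       # the submodule that was actually requested
    print(top.__name__, sub.__name__)  # "os posixpath" (or "os ntpath" on Windows)
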
+ # TypeVar instances require the module information hence why we + # are not using the _should_pickle_by_reference directly module_and_name = _lookup_module_and_qualname(obj, name=obj.__name__) + if module_and_name is None: return (_make_typevar, _decompose_typevar(obj)) + elif _is_registered_pickle_by_value(module_and_name[0]): + return (_make_typevar, _decompose_typevar(obj)) + return (getattr, module_and_name) @@ -830,13 +952,22 @@ def _get_bases(typ): return getattr(typ, bases_attr) -def _make_dict_keys(obj): - return dict.fromkeys(obj).keys() +def _make_dict_keys(obj, is_ordered=False): + if is_ordered: + return OrderedDict.fromkeys(obj).keys() + else: + return dict.fromkeys(obj).keys() -def _make_dict_values(obj): - return {i: _ for i, _ in enumerate(obj)}.values() +def _make_dict_values(obj, is_ordered=False): + if is_ordered: + return OrderedDict((i, _) for i, _ in enumerate(obj)).values() + else: + return {i: _ for i, _ in enumerate(obj)}.values() -def _make_dict_items(obj): - return obj.items() +def _make_dict_items(obj, is_ordered=False): + if is_ordered: + return OrderedDict(obj).items() + else: + return obj.items() diff --git a/joblib/externals/cloudpickle/cloudpickle_fast.py b/joblib/externals/cloudpickle/cloudpickle_fast.py index fa8da0f63..6db059eb8 100644 --- a/joblib/externals/cloudpickle/cloudpickle_fast.py +++ b/joblib/externals/cloudpickle/cloudpickle_fast.py @@ -6,7 +6,7 @@ is only available for Python versions 3.8+, a lot of backward-compatibility code is also removed. -Note that the C Pickler sublassing API is CPython-specific. Therefore, some +Note that the C Pickler subclassing API is CPython-specific. Therefore, some guards present in cloudpickle.py that were written to handle PyPy specificities are not present in cloudpickle_fast.py """ @@ -23,12 +23,12 @@ import typing from enum import Enum -from collections import ChainMap +from collections import ChainMap, OrderedDict from .compat import pickle, Pickler from .cloudpickle import ( _extract_code_globals, _BUILTIN_TYPE_NAMES, DEFAULT_PROTOCOL, - _find_imported_submodules, _get_cell_contents, _is_importable, + _find_imported_submodules, _get_cell_contents, _should_pickle_by_reference, _builtin_type, _get_or_create_tracker_id, _make_skeleton_class, _make_skeleton_enum, _extract_class_dict, dynamic_subimport, subimport, _typevar_reduce, _get_bases, _make_cell, _make_empty_cell, CellType, @@ -180,7 +180,7 @@ def _class_getstate(obj): clsdict.pop('__weakref__', None) if issubclass(type(obj), abc.ABCMeta): - # If obj is an instance of an ABCMeta subclass, dont pickle the + # If obj is an instance of an ABCMeta subclass, don't pickle the # cache/negative caches populated during isinstance/issubclass # checks, but pickle the list of registered subclasses of obj. clsdict.pop('_abc_cache', None) @@ -244,7 +244,19 @@ def _enum_getstate(obj): def _code_reduce(obj): """codeobject reducer""" - if hasattr(obj, "co_posonlyargcount"): # pragma: no branch + if hasattr(obj, "co_linetable"): # pragma: no branch + # Python 3.10 and later: obj.co_lnotab is deprecated and constructor + # expects obj.co_linetable instead. 
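
The `is_ordered` flag matters because `OrderedDict` views have their own types; a quick illustration (not part of the patch) of what the helpers above reconstruct:

    from collections import OrderedDict

    keys = ["a", "b"]
    plain = dict.fromkeys(keys).keys()            # dict_keys(['a', 'b'])
    ordered = OrderedDict.fromkeys(keys).keys()   # odict_keys(['a', 'b'])
    print(type(plain) is type(ordered))           # False, hence the separate reducers added below
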
+ args = ( + obj.co_argcount, obj.co_posonlyargcount, + obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, + obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, + obj.co_varnames, obj.co_filename, obj.co_name, + obj.co_firstlineno, obj.co_linetable, obj.co_freevars, + obj.co_cellvars + ) + elif hasattr(obj, "co_posonlyargcount"): + # Backward compat for 3.9 and older args = ( obj.co_argcount, obj.co_posonlyargcount, obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, @@ -254,6 +266,7 @@ def _code_reduce(obj): obj.co_cellvars ) else: + # Backward compat for even older versions of Python args = ( obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code, obj.co_consts, @@ -339,11 +352,16 @@ def _memoryview_reduce(obj): def _module_reduce(obj): - if _is_importable(obj): + if _should_pickle_by_reference(obj): return subimport, (obj.__name__,) else: - obj.__dict__.pop('__builtins__', None) - return dynamic_subimport, (obj.__name__, vars(obj)) + # Some external libraries can populate the "__builtins__" entry of a + # module's `__dict__` with unpicklable objects (see #316). For that + # reason, we do not attempt to pickle the "__builtins__" entry, and + # restore a default value for it at unpickling time. + state = obj.__dict__.copy() + state.pop('__builtins__', None) + return dynamic_subimport, (obj.__name__, state) def _method_reduce(obj): @@ -396,7 +414,7 @@ def _class_reduce(obj): return type, (NotImplemented,) elif obj in _BUILTIN_TYPE_NAMES: return _builtin_type, (_BUILTIN_TYPE_NAMES[obj],) - elif not _is_importable(obj): + elif not _should_pickle_by_reference(obj): return _dynamic_class_reduce(obj) return NotImplemented @@ -419,6 +437,24 @@ def _dict_items_reduce(obj): return _make_dict_items, (dict(obj), ) +def _odict_keys_reduce(obj): + # Safer not to ship the full dict as sending the rest might + # be unintended and could potentially cause leaking of + # sensitive information + return _make_dict_keys, (list(obj), True) + + +def _odict_values_reduce(obj): + # Safer not to ship the full dict as sending the rest might + # be unintended and could potentially cause leaking of + # sensitive information + return _make_dict_values, (list(obj), True) + + +def _odict_items_reduce(obj): + return _make_dict_items, (dict(obj), True) + + # COLLECTIONS OF OBJECTS STATE SETTERS # ------------------------------------ # state setters are called at unpickling time, once the object is created and @@ -426,7 +462,7 @@ def _dict_items_reduce(obj): def _function_setstate(obj, state): - """Update the state of a dynaamic function. + """Update the state of a dynamic function. As __closure__ and __globals__ are readonly attributes of a function, we cannot rely on the native setstate routine of pickle.load_build, that calls @@ -495,6 +531,9 @@ class CloudPickler(Pickler): _dispatch_table[_collections_abc.dict_keys] = _dict_keys_reduce _dispatch_table[_collections_abc.dict_values] = _dict_values_reduce _dispatch_table[_collections_abc.dict_items] = _dict_items_reduce + _dispatch_table[type(OrderedDict().keys())] = _odict_keys_reduce + _dispatch_table[type(OrderedDict().values())] = _odict_values_reduce + _dispatch_table[type(OrderedDict().items())] = _odict_items_reduce dispatch_table = ChainMap(_dispatch_table, copyreg.dispatch_table) @@ -520,7 +559,7 @@ def _function_reduce(self, obj): As opposed to cloudpickle.py, There no special handling for builtin pypy functions because cloudpickle_fast is CPython-specific. 
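
A minimal check of the feature detection used in `_code_reduce` above; both attributes carry the bytecode's line-number information as bytes:

    import sys

    code = (lambda: None).__code__
    if hasattr(code, "co_linetable"):   # Python 3.10+
        line_info = code.co_linetable
    else:                               # Python 3.9 and older
        line_info = code.co_lnotab
    print(sys.version_info[:2], type(line_info))  # e.g. (3, 10) <class 'bytes'>
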
""" - if _is_importable(obj): + if _should_pickle_by_reference(obj): return NotImplemented else: return self._dynamic_function_reduce(obj) @@ -579,7 +618,7 @@ def dump(self, obj): # `dispatch` attribute. Earlier versions of the protocol 5 CloudPickler # used `CloudPickler.dispatch` as a class-level attribute storing all # reducers implemented by cloudpickle, but the attribute name was not a - # great choice given the meaning of `Cloudpickler.dispatch` when + # great choice given the meaning of `CloudPickler.dispatch` when # `CloudPickler` extends the pure-python pickler. dispatch = dispatch_table @@ -653,7 +692,7 @@ def reducer_override(self, obj): return self._function_reduce(obj) else: # fallback to save_global, including the Pickler's - # distpatch_table + # dispatch_table return NotImplemented else: @@ -724,7 +763,7 @@ def save_global(self, obj, name=None, pack=struct.pack): ) elif name is not None: Pickler.save_global(self, obj, name=name) - elif not _is_importable(obj, name=name): + elif not _should_pickle_by_reference(obj, name=name): self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj) else: Pickler.save_global(self, obj, name=name) @@ -736,7 +775,7 @@ def save_function(self, obj, name=None): Determines what kind of function obj is (e.g. lambda, defined at interactive prompt, etc) and handles the pickling appropriately. """ - if _is_importable(obj, name=name): + if _should_pickle_by_reference(obj, name=name): return Pickler.save_global(self, obj, name=name) elif PYPY and isinstance(obj.__code__, builtin_code_type): return self.save_pypy_builtin_func(obj) diff --git a/joblib/externals/loky/__init__.py b/joblib/externals/loky/__init__.py index 21f3bb6b9..d46e59f41 100644 --- a/joblib/externals/loky/__init__.py +++ b/joblib/externals/loky/__init__.py @@ -22,4 +22,4 @@ "wrap_non_picklable_objects", "set_loky_pickler"] -__version__ = '2.9.0' +__version__ = '3.0.0' diff --git a/joblib/externals/loky/backend/popen_loky_win32.py b/joblib/externals/loky/backend/popen_loky_win32.py index 523bd078c..e8c4fd070 100644 --- a/joblib/externals/loky/backend/popen_loky_win32.py +++ b/joblib/externals/loky/backend/popen_loky_win32.py @@ -170,4 +170,4 @@ def main(): from_parent.close() exitcode = self._bootstrap() - exit(exitcode) + sys.exit(exitcode) diff --git a/joblib/externals/loky/backend/queues.py b/joblib/externals/loky/backend/queues.py index 567146429..62735db3a 100644 --- a/joblib/externals/loky/backend/queues.py +++ b/joblib/externals/loky/backend/queues.py @@ -209,7 +209,7 @@ def __init__(self, reducers=None, ctx=None): else: self._wlock = ctx.Lock() - # Add possibility to use custom reducers + # Add possiblity to use custom reducers self._reducers = reducers def close(self): diff --git a/joblib/externals/loky/backend/resource_tracker.py b/joblib/externals/loky/backend/resource_tracker.py index 48e49be51..95dff35d0 100644 --- a/joblib/externals/loky/backend/resource_tracker.py +++ b/joblib/externals/loky/backend/resource_tracker.py @@ -36,7 +36,7 @@ # Note that this behavior differs from CPython's resource_tracker, which only # implements list of shared resources, and not a proper refcounting scheme. # Also, CPython's resource tracker will only attempt to cleanup those shared -# resources once all procsses connected to the resource tracker have exited. +# resources once all procsses connected to the resouce tracker have exited. 
import os diff --git a/joblib/externals/loky/backend/spawn.py b/joblib/externals/loky/backend/spawn.py index d4a6a8c9e..2a16c844b 100644 --- a/joblib/externals/loky/backend/spawn.py +++ b/joblib/externals/loky/backend/spawn.py @@ -105,7 +105,7 @@ def get_preparation_data(name, init_main_module=True): _resource_tracker as mp_resource_tracker ) # multiprocessing's resource_tracker must be running before loky - # process is created (otherwise the child won't be able to use it if it + # process is created (othewise the child won't be able to use it if it # is created later on) mp_resource_tracker.ensure_running() d["mp_tracker_args"] = { diff --git a/joblib/externals/loky/backend/synchronize.py b/joblib/externals/loky/backend/synchronize.py index 592de3c02..6db77b0de 100644 --- a/joblib/externals/loky/backend/synchronize.py +++ b/joblib/externals/loky/backend/synchronize.py @@ -60,23 +60,29 @@ class SemLock(object): _rand = tempfile._RandomNameSequence() - def __init__(self, kind, value, maxvalue): + def __init__(self, kind, value, maxvalue, name=None): # unlink_now is only used on win32 or when we are using fork. unlink_now = False - for i in range(100): - try: - self._semlock = _SemLock( - kind, value, maxvalue, SemLock._make_name(), - unlink_now) - except FileExistsError: # pragma: no cover - pass - else: - break - else: # pragma: no cover - raise FileExistsError('cannot find name for semaphore') - + if name is None: + # Try to find an unused name for the SemLock instance. + for i in range(100): + try: + self._semlock = _SemLock( + kind, value, maxvalue, SemLock._make_name(), unlink_now + ) + except FileExistsError: # pragma: no cover + pass + else: + break + else: # pragma: no cover + raise FileExistsError('cannot find name for semaphore') + else: + self._semlock = _SemLock( + kind, value, maxvalue, name, unlink_now + ) + self.name = name util.debug('created semlock with handle %s and name "%s"' - % (self._semlock.handle, self._semlock.name)) + % (self._semlock.handle, self.name)) self._make_methods() diff --git a/joblib/externals/loky/backend/utils.py b/joblib/externals/loky/backend/utils.py index 5342204c0..dc1b82af2 100644 --- a/joblib/externals/loky/backend/utils.py +++ b/joblib/externals/loky/backend/utils.py @@ -117,7 +117,7 @@ def _recursive_terminate(pid): def get_exitcodes_terminated_worker(processes): - """Return a formatted string with the exitcodes of terminated workers. + """Return a formated string with the exitcodes of terminated workers. If necessary, wait (up to .25s) for the system to correctly set the exitcode of one terminated worker. diff --git a/joblib/externals/loky/initializers.py b/joblib/externals/loky/initializers.py new file mode 100644 index 000000000..b734c29e7 --- /dev/null +++ b/joblib/externals/loky/initializers.py @@ -0,0 +1,78 @@ +import warnings + + +def _viztracer_init(init_kwargs): + """Initialize viztracer's profiler in worker processes""" + from viztracer import VizTracer + tracer = VizTracer(**init_kwargs) + tracer.register_exit() + tracer.start() + + +def _make_viztracer_initializer_and_initargs(): + try: + import viztracer + tracer = viztracer.get_tracer() + if tracer is not None and getattr(tracer, 'enable', False): + # Profiler is active: introspect its configuration to + # initialize the workers with the same configuration. 
+ return _viztracer_init, (tracer.init_kwargs,) + except ImportError: + # viztracer is not installed: nothing to do + pass + except Exception as e: + # In case viztracer's API evolve, we do not want to crash loky but + # we want to know about it to be able to update loky. + warnings.warn("Unable to introspect viztracer state: {}" + .format(e)) + return None, () + + +class _ChainedInitializer(): + """Compound worker initializer + + This is meant to be used in conjunction with _chain_initializers to + produce the necessary chained_args list to be passed to __call__. + """ + + def __init__(self, initializers): + self._initializers = initializers + + def __call__(self, *chained_args): + for initializer, args in zip(self._initializers, chained_args): + initializer(*args) + + +def _chain_initializers(initializer_and_args): + """Convenience helper to combine a sequence of initializers. + + If some initializers are None, they are filtered out. + """ + filtered_initializers = [] + filtered_initargs = [] + for initializer, initargs in initializer_and_args: + if initializer is not None: + filtered_initializers.append(initializer) + filtered_initargs.append(initargs) + + if len(filtered_initializers) == 0: + return None, () + elif len(filtered_initializers) == 1: + return filtered_initializers[0], filtered_initargs[0] + else: + return _ChainedInitializer(filtered_initializers), filtered_initargs + + +def _prepare_initializer(initializer, initargs): + if initializer is not None and not callable(initializer): + raise TypeError( + "initializer must be a callable, got: {!r}" + .format(initializer) + ) + + # Introspect runtime to determine if we need to propagate the viztracer + # profiler information to the workers: + return _chain_initializers([ + (initializer, initargs), + _make_viztracer_initializer_and_initargs(), + ]) diff --git a/joblib/externals/loky/process_executor.py b/joblib/externals/loky/process_executor.py index 41e4a2b57..3737ecf51 100644 --- a/joblib/externals/loky/process_executor.py +++ b/joblib/externals/loky/process_executor.py @@ -82,6 +82,7 @@ from .backend.queues import Queue, SimpleQueue from .backend.reduction import set_loky_pickler, get_loky_pickler_name from .backend.utils import recursive_terminate, get_exitcodes_terminated_worker +from .initializers import _prepare_initializer try: from concurrent.futures.process import BrokenProcessPool as _BPPException @@ -115,7 +116,9 @@ def _get_memory_usage(pid, force_gc=False): if force_gc: gc.collect() - return Process(pid).memory_info().rss + mem_size = Process(pid).memory_info().rss + mp.util.debug('psutil return memory size: {}'.format(mem_size)) + return mem_size except ImportError: _USE_PSUTIL = False @@ -421,11 +424,13 @@ def _process_worker(call_queue, result_queue, initializer, initargs, # If we cannot format correctly the exception, at least print # the traceback. 
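
To make the behaviour of the new chaining helpers concrete, here is a small sketch built on the private helper defined above; the two worker initializers are made up for the example:

    from joblib.externals.loky.initializers import _chain_initializers

    def set_threads(n):
        import os
        os.environ["OMP_NUM_THREADS"] = str(n)

    def announce(msg):
        print("worker ready:", msg)

    initializer, initargs = _chain_initializers([
        (set_threads, (1,)),
        (None, ()),               # None entries are filtered out
        (announce, ("hello",)),
    ])
    initializer(*initargs)        # runs set_threads(1), then announce("hello")
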
print(previous_tb) + mp.util.debug('Exiting with code 1') sys.exit(1) if call_item is None: # Notify queue management thread about clean worker shutdown result_queue.put(pid) with worker_exit_lock: + mp.util.debug('Exited cleanly') return try: r = call_item() @@ -467,6 +472,7 @@ def _process_worker(call_queue, result_queue, initializer, initargs, mp.util.info("Memory leak detected: shutting down worker") result_queue.put(pid) with worker_exit_lock: + mp.util.debug('Exit due to memory leak') return else: # if psutil is not installed, trigger gc.collect events @@ -542,7 +548,9 @@ def weakref_cb(_, # of new processes or shut down self.processes_management_lock = executor._processes_management_lock - super(_ExecutorManagerThread, self).__init__() + super(_ExecutorManagerThread, self).__init__( + name="ExecutorManagerThread" + ) if sys.version_info < (3, 9): self.daemon = True @@ -645,7 +653,13 @@ def wait_result_broken_or_wakeup(self): # unstable, therefore they are not appended in the exception # message. exit_codes = "\nThe exit codes of the workers are {}".format( - get_exitcodes_terminated_worker(self.processes)) + get_exitcodes_terminated_worker(self.processes) + ) + mp.util.debug('A worker unexpectedly terminated. Workers that ' + 'might have caused the breakage: ' + + str({p.name: p.exitcode + for p in list(self.processes.values()) + if p is not None and p.sentinel in ready})) bpe = TerminatedWorkerError( "A worker process managed by the executor was unexpectedly " "terminated. This could be caused by a segmentation fault " @@ -733,7 +747,7 @@ def terminate_broken(self, bpe): # Terminate remaining workers forcibly: the queues or their # locks may be in a dirty state and block forever. - self.kill_workers() + self.kill_workers(reason="broken executor") # clean up resources self.join_executor_internals() @@ -753,15 +767,16 @@ def flag_executor_shutting_down(self): del work_item # Kill the remaining worker forcibly to no waste time joining them - self.kill_workers() + self.kill_workers(reason="executor shutting down") - def kill_workers(self): + def kill_workers(self, reason=''): # Terminate the remaining workers using SIGKILL. This function also # terminates descendant workers of the children in case there is some # nested parallelism. while self.processes: _, p = self.processes.popitem() - mp.util.debug('terminate process {}'.format(p.name)) + mp.util.debug("terminate process {}, reason: {}" + .format(p.name, reason)) try: recursive_terminate(p) except ProcessLookupError: # pragma: no cover @@ -964,11 +979,9 @@ def __init__(self, max_workers=None, job_reducers=None, self._context = context self._env = env - if initializer is not None and not callable(initializer): - raise TypeError("initializer must be a callable") - self._initializer = initializer - self._initargs = initargs - + self._initializer, self._initargs = _prepare_initializer( + initializer, initargs + ) _check_max_depth(self._context) if result_reducers is None: @@ -1152,7 +1165,8 @@ def map(self, fn, *iterables, **kwargs): results = super(ProcessPoolExecutor, self).map( partial(_process_chunk, fn), _get_chunks(chunksize, *iterables), - timeout=timeout) + timeout=timeout + ) return _chain_from_iterable_of_lists(results) def shutdown(self, wait=True, kill_workers=False):
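
Finally, an end-to-end sketch of the public path these changes serve: passing `initializer`/`initargs` to loky's executor, which now validates and chains them through `_prepare_initializer`. The worker setup function is illustrative only:

    import os
    from joblib.externals.loky.process_executor import ProcessPoolExecutor

    def _limit_threads(n):
        # Runs once in every worker process before it picks up tasks.
        os.environ["OMP_NUM_THREADS"] = str(n)

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=2, initializer=_limit_threads,
                                 initargs=(1,)) as executor:
            print(list(executor.map(abs, [-1, -2, -3])))   # [1, 2, 3]
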