Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-90622: Prevent max_tasks_per_child use with a fork mp_context. #91587

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,11 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.

*max_tasks_per_child* is an optional argument that specifies the maximum
number of tasks a single process can execute before it will exit and be
replaced with a fresh worker process. The default *max_tasks_per_child* is
``None`` which means worker processes will live as long as the pool.
replaced with a fresh worker process. By default *max_tasks_per_child* is
``None`` which means worker processes will live as long as the pool. When
a max is specified, the "spawn" multiprocessing start method will be used by
default in the absence of an *mp_context* parameter. This feature is incompatible
with the "fork" start method.

.. versionchanged:: 3.3
When one of the worker processes terminates abruptly, a
Expand Down
24 changes: 17 additions & 7 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,14 +614,16 @@ def __init__(self, max_workers=None, mp_context=None,
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
mp_context: A multiprocessing context to launch the workers. This
object should provide SimpleQueue, Queue and Process.
object should provide SimpleQueue, Queue and Process. Useful
to allow specific multiprocessing start methods.
initializer: A callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
max_tasks_per_child: The maximum number of tasks a worker process can
complete before it will exit and be replaced with a fresh
worker process, to enable unused resources to be freed. The
default value is None, which means worker process will live
as long as the executor will live.
max_tasks_per_child: The maximum number of tasks a worker process
can complete before it will exit and be replaced with a fresh
worker process. The default of None means worker process will
live as long as the executor. Requires a non-'fork' mp_context
start method. When given, we default to using 'spawn' if no
gpshead marked this conversation as resolved.
Show resolved Hide resolved
mp_context is supplied.
"""
_check_system_limits()

Expand All @@ -641,7 +643,10 @@ def __init__(self, max_workers=None, mp_context=None,
self._max_workers = max_workers

if mp_context is None:
mp_context = mp.get_context()
if max_tasks_per_child is not None:
mp_context = mp.get_context("spawn")
else:
mp_context = mp.get_context()
self._mp_context = mp_context

if initializer is not None and not callable(initializer):
Expand All @@ -654,6 +659,11 @@ def __init__(self, max_workers=None, mp_context=None,
raise TypeError("max_tasks_per_child must be an integer")
elif max_tasks_per_child <= 0:
raise ValueError("max_tasks_per_child must be >= 1")
if self._mp_context.get_start_method(allow_none=False) == "fork":
# https://github.com/python/cpython/issues/90622
raise ValueError("max_tasks_per_child is incompatible with"
" the 'fork' multiprocessing start method;"
" supply a different mp_context.")
self._max_tasks_per_child = max_tasks_per_child

# Management thread
Expand Down
18 changes: 16 additions & 2 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,10 +1039,15 @@ def test_idle_process_reuse_multiple(self):
executor.shutdown()

def test_max_tasks_per_child(self):
context = self.get_context()
if context.get_start_method(allow_none=False) == "fork":
with self.assertRaises(ValueError):
self.executor_type(1, mp_context=context, max_tasks_per_child=3)
return
# not using self.executor as we need to control construction.
# arguably this could go in another class w/o that mixin.
executor = self.executor_type(
1, mp_context=self.get_context(), max_tasks_per_child=3)
1, mp_context=context, max_tasks_per_child=3)
f1 = executor.submit(os.getpid)
original_pid = f1.result()
# The worker pid remains the same as the worker could be reused
Expand All @@ -1061,11 +1066,20 @@ def test_max_tasks_per_child(self):

executor.shutdown()

def test_max_tasks_per_child_defaults_to_spawn_context(self):
    """When max_tasks_per_child is given without an explicit mp_context,
    the executor must default to the "spawn" start method (the "fork"
    method is incompatible with this feature; see gh-90622)."""
    # not using self.executor as we need to control construction.
    # arguably this could go in another class w/o that mixin.
    executor = self.executor_type(1, max_tasks_per_child=3)
    self.assertEqual(executor._mp_context.get_start_method(), "spawn")
    # Shut down explicitly so the worker process does not outlive the test.
    executor.shutdown()

def test_max_tasks_early_shutdown(self):
context = self.get_context()
if context.get_start_method(allow_none=False) == "fork":
raise unittest.SkipTest("Incompatible with the fork start method.")
# not using self.executor as we need to control construction.
# arguably this could go in another class w/o that mixin.
executor = self.executor_type(
3, mp_context=self.get_context(), max_tasks_per_child=1)
3, mp_context=context, max_tasks_per_child=1)
futures = []
for i in range(6):
futures.append(executor.submit(mul, i, i))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
In ``concurrent.futures.process.ProcessPoolExecutor`` disallow the "fork"
multiprocessing start method when the new ``max_tasks_per_child`` feature is
used as the mix of threads+fork can hang the child processes. Default to
using the safe "spawn" start method in that circumstance if no
``mp_context`` was supplied.