bpo-31699 Deadlocks in concurrent.futures.ProcessPoolExecutor with pickling error #3895

Merged: 15 commits, Jan 5, 2018
Changes from 13 commits
208 changes: 159 additions & 49 deletions Lib/concurrent/futures/process.py
@@ -8,10 +8,10 @@
 |======================= In-process =====================|== Out-of-process ==|

 +----------+     +----------+       +--------+     +-----------+    +---------+
-|          |  => | Work Ids |  =>   |        |  => | Call Q    | => |         |
-|          |     +----------+       |        |     +-----------+    |         |
-|          |     | ...      |       |        |     | ...       |    |         |
-|          |     | 6        |       |        |     | 5, call() |    |         |
+|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
+|          |     +----------+       |        |     +-----------+    |  Pool   |
+|          |     | ...      |       |        |     | ...       |    +---------+
+|          |     | 6        |    => |        |  => | 5, call() | => |         |
 |          |     | 7        |       |        |     | ...       |    |         |
 | Process  |     | ...      |       | Local  |     +-----------+    | Process |
 |  Pool    |     +----------+       | Worker |                      |  #1..n  |
@@ -52,6 +52,7 @@
from queue import Full
import multiprocessing as mp
from multiprocessing.connection import wait
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
@@ -72,16 +73,31 @@
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

-_threads_queues = weakref.WeakKeyDictionary()
+_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


class _ThreadWakeup:
    __slots__ = ["_reader", "_writer"]

    def __init__(self):
        self._reader, self._writer = mp.Pipe(duplex=False)

    def wakeup(self):
        self._writer.send_bytes(b"")

    def clear(self):
        while self._reader.poll():
            self._reader.recv_bytes()

def _python_exit():
    global _global_shutdown
    _global_shutdown = True
-    items = list(_threads_queues.items())
-    for t, q in items:
-        q.put(None)
-    for t, q in items:
+    items = list(_threads_wakeups.items())
+    for _, thread_wakeup in items:
+        thread_wakeup.wakeup()
+    for t, _ in items:
        t.join()
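
For context, a sketch of how such an exit hook is wired up; the actual registration happens elsewhere in process.py and is not part of this hunk:

import atexit

# Run _python_exit at interpreter shutdown, waking every queue
# management thread so it can exit cleanly before being joined.
atexit.register(_python_exit)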

# Controls how many more calls than processes will be queued in the call queue.
@@ -90,6 +106,7 @@ def _python_exit():
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1


# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
@@ -132,6 +149,25 @@ def __init__(self, work_id, fn, args, kwargs):
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Safe Queue that sets the pickling exception on the future linked to the failed job."""
    def __init__(self, max_size=0, *, ctx, pending_work_items):
        self.pending_work_items = pending_work_items
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = traceback.format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            # work_item can be None if another process terminated. In this
            # case, the queue_manager_thread fails all work_items with
            # BrokenProcessPool.
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)
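
The user-visible effect, as a hedged sketch (illustrative names, not from the patch): submitting a call whose arguments fail to pickle no longer deadlocks the pool; the error lands on that future and the executor stays usable.

from concurrent.futures import ProcessPoolExecutor

class Unpicklable:
    def __reduce__(self):
        # Force a pickling failure in the call queue's feeder thread.
        raise TypeError("deliberately unpicklable")

def identity(x):
    return x

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as pool:
        fut = pool.submit(identity, Unpicklable())
        try:
            fut.result()
        except TypeError as exc:
            # The feeder thread's error was routed to this future.
            print("submit failed cleanly:", exc)
        print(pool.submit(identity, 42).result())  # pool still works: 42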


def _get_chunks(*iterables, chunksize):
    """Iterates over zip()ed iterables in chunks."""
    it = zip(*iterables)
@@ -152,6 +188,17 @@ def _process_chunk(fn, chunk):
"""
return [fn(*args) for args in chunk]
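
An illustrative use of these two helpers together (a sketch; Executor.map drives them in roughly this way):

# Split two zipped input iterables into chunks of two calls each,
# then evaluate one chunk the way a worker would.
chunks = list(_get_chunks([1, 2, 3], [10, 20, 30], chunksize=2))
# chunks == [((1, 10), (2, 20)), ((3, 30),)]
print(_process_chunk(pow, chunks[0]))  # [1, 1048576], i.e. [1**10, 2**20]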


def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc))
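
What this guards against, sketched from the caller's side (illustrative names, not from the patch): a call whose return value cannot be pickled now fails just that future instead of killing the worker's result pipeline.

from concurrent.futures import ProcessPoolExecutor

def make_closure():
    return lambda: None   # lambdas cannot be pickled

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as pool:
        fut = pool.submit(make_closure)
        try:
            fut.result()
        except Exception as exc:
            # The worker caught its own pickling error and sent it back.
            print("result failed cleanly:", type(exc).__name__)
        print(pool.submit(len, "still alive").result())  # 11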


def _process_worker(call_queue, result_queue, initializer, initargs):
"""Evaluates calls from call_queue and places the results in result_queue.

@@ -183,10 +230,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
-            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
+            _sendback_result(result_queue, call_item.work_id, exception=exc)
        else:
-            result_queue.put(_ResultItem(call_item.work_id,
-                                         result=r))
+            _sendback_result(result_queue, call_item.work_id, result=r)

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
@@ -230,12 +276,14 @@ def _add_call_item_to_queue(pending_work_items,
                del pending_work_items[work_id]
                continue


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
-                             result_queue):
+                             result_queue,
+                             thread_wakeup):
"""Manages the communication between this process and the worker processes.

This function is run in a local thread.
@@ -253,6 +301,9 @@ def _queue_management_worker(executor_reference,
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers.
        thread_wakeup: A _ThreadWakeup to allow waking up the
            queue_manager_thread from the main Thread and avoid deadlocks
            caused by permanently locked queues.
    """
    executor = None

@@ -261,30 +312,59 @@ def shutting_down():
                or executor._shutdown_thread)

    def shutdown_worker():
-        # This is an upper bound
-        nb_children_alive = sum(p.is_alive() for p in processes.values())
-        for i in range(0, nb_children_alive):
-            call_queue.put_nowait(None)
+        # This is an upper bound on the number of children alive.
+        n_children_alive = sum(p.is_alive() for p in processes.values())
+        n_children_to_stop = n_children_alive
+        n_sentinels_sent = 0
+        # Send the right number of sentinels, to make sure all children are
+        # properly terminated.
+        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
+            for i in range(n_children_to_stop - n_sentinels_sent):
+                try:
+                    call_queue.put_nowait(None)
+                    n_sentinels_sent += 1
+                except Full:
+                    break
+            n_children_alive = sum(p.is_alive() for p in processes.values())
+
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()
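
The sentinel pattern this implements, as a standalone hedged sketch (plain multiprocessing, illustrative names): each worker exits when it dequeues None, so exactly one sentinel must reach each live child, even if the queue is momentarily full.

import multiprocessing as mp

def worker(q):
    while True:
        item = q.get()
        if item is None:   # sentinel: stop this worker
            return
        print("processing", item)

if __name__ == "__main__":
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(q,)) for _ in range(2)]
    for p in procs:
        p.start()
    q.put("job-1")
    for _ in procs:        # one sentinel per live worker
        q.put(None)
    for p in procs:
        p.join()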

-    reader = result_queue._reader
+    result_reader = result_queue._reader
+    wakeup_reader = thread_wakeup._reader
+    readers = [result_reader, wakeup_reader]

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

-        sentinels = [p.sentinel for p in processes.values()]
-        assert sentinels
-        ready = wait([reader] + sentinels)
-        if reader in ready:
-            result_item = reader.recv()
-        else:
+        # Wait for a result to be ready in the result_queue while checking
+        # that all worker processes are still running, or for a wake-up
+        # signal to be sent. The wake-up signals come either from new tasks
+        # being submitted, from the executor being shut down/gc-ed, or from
+        # the shutdown of the Python interpreter.
+        worker_sentinels = [p.sentinel for p in processes.values()]
+        ready = wait(readers + worker_sentinels)
+
+        cause = None
+        is_broken = True
+        if result_reader in ready:
+            try:
+                result_item = result_reader.recv()
+                is_broken = False
+            except BaseException as e:
+                cause = traceback.format_exception(type(e), e, e.__traceback__)
+
+        elif wakeup_reader in ready:
+            is_broken = False
+            result_item = None
Comment by @ogrisel (Contributor), Nov 28, 2017:

I think we should always clear the wakeup_reader even if there is a result_item in the result queue:

cause = None
is_broken = True
result_item = None

if wakeup_reader in ready:
    thread_wakeup.clear()
    is_broken = False

if result_reader in ready:
    try:
        result_item = result_reader.recv()
        is_broken = False
    except BaseException as e:
        cause = traceback.format_exception(type(e), e, e.__traceback__)
        is_broken = True

...

Otherwise we might not empty the wakeup pipe quickly enough in case of a large number of submissions of very short tasks.

+        thread_wakeup.clear()
Reply by a Contributor:

Indeed this is simpler :)

        if is_broken:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
@@ -293,14 +373,15 @@ def shutdown_worker():
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
+            bpe = BrokenProcessPool("A process in the process pool was "
+                                    "terminated abruptly while the future was "
+                                    "running or pending.")
+            if cause is not None:
+                bpe.__cause__ = _RemoteTraceback(
+                    f"\n'''\n{''.join(cause)}'''")
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
-                work_item.future.set_exception(
-                    BrokenProcessPool(
-                        "A process in the process pool was "
-                        "terminated abruptly while the future was "
-                        "running or pending."
-                    ))
+                work_item.future.set_exception(bpe)
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
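
How this surfaces to callers, as a hedged sketch (illustrative; os._exit stands in for a crashing worker): the BrokenProcessPool raised from fut.result() now chains the formatted remote traceback whenever one could be captured.

import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def crash():
    os._exit(1)   # simulate a worker dying abruptly

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as pool:
        fut = pool.submit(crash)
        try:
            fut.result()
        except BrokenProcessPool as exc:
            # exc.__cause__ is a _RemoteTraceback when a recv() error was
            # captured; here the process simply died, so it may be None.
            print("pool broken:", exc.__cause__)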
@@ -329,6 +410,9 @@ def shutdown_worker():
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
+            # Delete reference to result_item
+            del result_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
@@ -348,8 +432,11 @@ def _check_system_limits():
                pass
        executor = None


_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
@@ -369,7 +456,8 @@ def _check_system_limits():
        # minimum number of semaphores available
        # according to POSIX
        return
-    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
+    _system_limited = ("system provides too few semaphores (%d"
+                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


@@ -415,6 +503,7 @@ def __init__(self, max_workers=None, mp_context=None,
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context
@@ -424,34 +513,52 @@ def __init__(self, max_workers=None, mp_context=None,
        self._initializer = initializer
        self._initargs = initargs

+        # Management thread
+        self._queue_management_thread = None
+
+        # Map of pids to processes
+        self._processes = {}
+
+        # Shutdown is a two-step process.
+        self._shutdown_thread = False
+        self._shutdown_lock = threading.Lock()
+        self._broken = False
+        self._queue_count = 0
+        self._pending_work_items = {}
+
+        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
-        self._call_queue = mp_context.Queue(queue_size)
+        self._call_queue = _SafeQueue(
+            max_size=queue_size, ctx=self._mp_context,
+            pending_work_items=self._pending_work_items)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()
-        self._queue_management_thread = None
-        # Map of pids to processes
-        self._processes = {}
-
-        # Shutdown is a two-step process.
-        self._shutdown_thread = False
-        self._shutdown_lock = threading.Lock()
-        self._broken = False
-        self._queue_count = 0
-        self._pending_work_items = {}
+
+        # _ThreadWakeup is a communication channel used to interrupt the wait
+        # of the main loop of queue_manager_thread from another thread (e.g.
+        # when calling executor.submit or executor.shutdown). We do not use the
+        # _result_queue to send the wakeup signal to the queue_manager_thread
+        # as it could result in a deadlock if a worker process dies with the
+        # _result_queue write lock still acquired.
Comment by a Member:

Nice explanation :-)

+        self._queue_management_thread_wakeup = _ThreadWakeup()

    def _start_queue_management_thread(self):
-        # When the executor gets lost, the weakref callback will wake up
-        # the queue management thread.
-        def weakref_cb(_, q=self._result_queue):
-            q.put(None)
+        if self._queue_management_thread is None:
+            # When the executor gets garbage collected, the weakref callback
+            # will wake up the queue management thread so that it can
+            # terminate if there is no pending work item.
+            def weakref_cb(_,
+                           thread_wakeup=self._queue_management_thread_wakeup):
+                mp.util.debug('Executor collected: triggering callback for'
+                              ' QueueManager wakeup')
+                thread_wakeup.wakeup()
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
@@ -461,10 +568,13 @@ def weakref_cb(_, q=self._result_queue):
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
-                      self._result_queue))
+                      self._result_queue,
+                      self._queue_management_thread_wakeup),
+                name="QueueManagerThread")
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
-            _threads_queues[self._queue_management_thread] = self._result_queue
+            _threads_wakeups[self._queue_management_thread] = \
+                self._queue_management_thread_wakeup
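
A hedged sketch of the behavior this weakref callback enables (illustrative; relies on CPython's prompt reference counting): even if the executor object is dropped right after submit, the management thread is woken, finishes the pending future, and then shuts the pool down.

from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == "__main__":
    # No reference to the executor is kept, so it can be collected
    # immediately; the weakref callback wakes the management thread.
    fut = ProcessPoolExecutor(max_workers=1).submit(square, 7)
    print(fut.result())  # 49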

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
@@ -491,7 +601,7 @@ def submit(self, fn, *args, **kwargs):
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
-            self._result_queue.put(None)
+            self._queue_management_thread_wakeup.wakeup()

            self._start_queue_management_thread()
            return f
Expand Down Expand Up @@ -531,7 +641,7 @@ def shutdown(self, wait=True):
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
-            self._result_queue.put(None)
+            self._queue_management_thread_wakeup.wakeup()
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to