Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-39995: Fix concurrent.futures _ThreadWakeup #19760

Merged
merged 1 commit into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 27 additions & 14 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def _python_exit():
_global_shutdown = True
items = list(_threads_wakeups.items())
for _, thread_wakeup in items:
# call not protected by ProcessPoolExecutor._shutdown_lock
thread_wakeup.wakeup()
for t, _ in items:
t.join()
Expand Down Expand Up @@ -157,8 +158,10 @@ def __init__(self, work_id, fn, args, kwargs):

class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
thread_wakeup):
self.pending_work_items = pending_work_items
self.shutdown_lock = shutdown_lock
self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)

Expand All @@ -167,7 +170,8 @@ def _on_queue_feeder_error(self, e, obj):
tb = traceback.format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
self.thread_wakeup.wakeup()
with self.shutdown_lock:
self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this
# case, the executor_manager_thread fails all work_items
# with BrokenProcessPool
Expand Down Expand Up @@ -268,17 +272,21 @@ def __init__(self, executor):
# A _ThreadWakeup to allow waking up the queue_manager_thread from the
# main Thread and avoid deadlocks caused by permanently locked queues.
self.thread_wakeup = executor._executor_manager_thread_wakeup
self.shutdown_lock = executor._shutdown_lock

# A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
# to determine if the ProcessPoolExecutor has been garbage collected
# and that the manager can exit.
# When the executor gets garbage collected, the weakref callback
# will wake up the queue management thread so that it can terminate
# if there is no pending work item.
def weakref_cb(_, thread_wakeup=self.thread_wakeup):
def weakref_cb(_,
thread_wakeup=self.thread_wakeup,
shutdown_lock=self.shutdown_lock):
mp.util.debug('Executor collected: triggering callback for'
' QueueManager wakeup')
thread_wakeup.wakeup()
with shutdown_lock:
thread_wakeup.wakeup()

self.executor_reference = weakref.ref(executor, weakref_cb)

Expand Down Expand Up @@ -363,6 +371,7 @@ def wait_result_broken_or_wakeup(self):
# submitted, from the executor being shutdown/gc-ed, or from the
# shutdown of the python interpreter.
result_reader = self.result_queue._reader
assert not self.thread_wakeup._closed
wakeup_reader = self.thread_wakeup._reader
readers = [result_reader, wakeup_reader]
worker_sentinels = [p.sentinel for p in self.processes.values()]
Expand All @@ -380,7 +389,9 @@ def wait_result_broken_or_wakeup(self):

elif wakeup_reader in ready:
is_broken = False
self.thread_wakeup.clear()

with self.shutdown_lock:
Copy link
Contributor

@tomMoral tomMoral Apr 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this lock is necessary, as the goal is to protect against `_ThreadWakeup.close`, which should only be called in the same thread.

self.thread_wakeup.clear()

return result_item, is_broken, cause

Expand Down Expand Up @@ -500,7 +511,8 @@ def join_executor_internals(self):
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
self.thread_wakeup.close()
with self.shutdown_lock:
self.thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
Expand Down Expand Up @@ -619,6 +631,8 @@ def __init__(self, max_workers=None, mp_context=None,
# _result_queue to send wakeup signals to the executor_manager_thread
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
#
# _shutdown_lock must be locked to access _ThreadWakeup.
self._executor_manager_thread_wakeup = _ThreadWakeup()

# Create communication channels for the executor
Expand All @@ -629,6 +643,7 @@ def __init__(self, max_workers=None, mp_context=None,
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
shutdown_lock=self._shutdown_lock,
thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
Expand Down Expand Up @@ -718,12 +733,12 @@ def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
self._cancel_pending_futures = cancel_futures
self._shutdown_thread = True
if self._executor_manager_thread_wakeup is not None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced `if self._executor_manager_thread:` with `if self._executor_manager_thread_wakeup is not None:`. Or was the previous code written that way on purpose?

cc @pitrou @tomMoral

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the two checks are equivalent, as both attributes are set to None in the same call to shutdown (L.744/750).
This way is more explicit, so it's perfect.

# Wake up queue management thread
self._executor_manager_thread_wakeup.wakeup()

if self._executor_manager_thread:
# Wake up queue management thread
self._executor_manager_thread_wakeup.wakeup()
if wait:
self._executor_manager_thread.join()
if self._executor_manager_thread is not None and wait:
self._executor_manager_thread.join()
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._executor_manager_thread = None
Expand All @@ -732,8 +747,6 @@ def shutdown(self, wait=True, *, cancel_futures=False):
self._result_queue.close()
self._result_queue = None
self._processes = None

if self._executor_manager_thread_wakeup:
self._executor_manager_thread_wakeup = None
self._executor_manager_thread_wakeup = None

shutdown.__doc__ = _base.Executor.shutdown.__doc__
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix a race condition in concurrent.futures._ThreadWakeup: access to
_ThreadWakeup is now protected with the shutdown lock.