Skip to content

Commit

Permalink
pythongh-109917, pythongh-105829: Fix concurrent.futures _ThreadWakeu…
Browse files Browse the repository at this point in the history
…p.wakeup()

Replace test_gh105829_should_not_deadlock_if_wakeup_pipe_full() test
which was mocking too many concurrent.futures internals with a new
test_wakeup() functional test.

Co-Authored-By: elfstrom <elfstrom@users.noreply.github.com>
  • Loading branch information
vstinner and elfstrom committed Sep 30, 2023
1 parent cbdacc7 commit 1aa24ee
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 81 deletions.
32 changes: 23 additions & 9 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,43 @@


class _ThreadWakeup:
# Constant overriden by tests to make them faster
_wakeup_message = b'x'

def __init__(self):
self._closed = False
self._reader, self._writer = mp.Pipe(duplex=False)
self._awaken = False

def close(self):
# Please note that we do not take the shutdown lock when
# calling clear() (to avoid deadlocking) so this method can
# only be called safely from the same thread as all calls to
# clear() even if you hold the shutdown lock. Otherwise we
# might try to read from the closed pipe.
if not self._closed:
self._closed = True
self._writer.close()
self._reader.close()
if self._closed:
return
self._closed = True
self._writer.close()
self._reader.close()

def wakeup(self):
if not self._closed:
self._writer.send_bytes(b"")
if self._closed:
return
if self._awaken:
# gh-105829: Send a single message to not block if the pipe is
# full. wait_result_broken_or_wakeup() ignores the message anyway,
# it just calls clear().
return
self._awaken = True
self._writer.send_bytes(self._wakeup_message)

def clear(self):
if not self._closed:
while self._reader.poll():
self._reader.recv_bytes()
if self._closed:
return
while self._reader.poll():
self._reader.recv_bytes()
self._awaken = False


def _python_exit():
Expand Down
75 changes: 3 additions & 72 deletions Lib/test/test_concurrent_futures/test_deadlock.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import contextlib
import queue
import signal
import sys
import time
import unittest
Expand Down Expand Up @@ -203,7 +201,7 @@ def test_shutdown_deadlock(self):
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
self.executor = executor # Allow clean up in _fail_on_deadlock
f = executor.submit(_crash, delay=.1)
executor.shutdown(wait=True)
with self.assertRaises(BrokenProcessPool):
Expand All @@ -216,7 +214,7 @@ def test_shutdown_deadlock_pickle(self):
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
self.executor = executor # Allow clean up in _fail_on_deadlock

# Start the executor and get the executor_manager_thread to collect
# the threads and avoid dangling thread that should be cleaned up
Expand Down Expand Up @@ -244,79 +242,12 @@ def test_crash_big_data(self):
data = "a" * support.PIPE_MAX_SIZE
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
self.executor = executor # Allow clean up in _fail_on_deadlock
with self.assertRaises(BrokenProcessPool):
list(executor.map(_crash_with_data, [data] * 10))

executor.shutdown(wait=True)

def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
    # Issue #105829: The _ExecutorManagerThread wakeup pipe could
    # fill up and block. See: https://github.com/python/cpython/issues/105829

    # Regression test: mock enough of the executor internals to force the
    # wakeup pipe to fill while the manager thread is not yet draining it.

    self.executor.shutdown(wait=True)

    if not hasattr(signal, 'alarm'):
        raise unittest.SkipTest(
            "Tested platform does not support the alarm signal")

    def timeout(_signum, _frame):
        # Dump all thread stacks first to aid debugging the deadlock.
        import faulthandler
        faulthandler.dump_traceback()

        raise RuntimeError("timed out while submitting jobs?")

    thread_run = futures.process._ExecutorManagerThread.run
    def mock_run(self):
        # Delay thread startup so the wakeup pipe can fill up and block
        time.sleep(3)
        thread_run(self)

    class MockWakeup(_ThreadWakeup):
        """Mock wakeup object to force the wakeup to block"""
        def __init__(self):
            super().__init__()
            # maxsize=1: a second un-cleared wakeup() call blocks here,
            # simulating a full OS pipe.
            self._dummy_queue = queue.Queue(maxsize=1)

        def wakeup(self):
            self._dummy_queue.put(None, block=True)
            super().wakeup()

        def clear(self):
            try:
                while True:
                    self._dummy_queue.get_nowait()
            except queue.Empty:
                super().clear()

    with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
                                     'run', mock_run),
          unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
                              MockWakeup)):
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            job_num = 100
            job_data = range(job_num)

            # Need to use sigalarm for timeout detection because
            # Executor.submit is not guarded by any timeout (both
            # self._work_ids.put(self._queue_count) and
            # self._executor_manager_thread_wakeup.wakeup() might
            # timeout, maybe more?). In this specific case it was
            # the wakeup call that deadlocked on a blocking pipe.
            old_handler = signal.signal(signal.SIGALRM, timeout)
            try:
                signal.alarm(int(self.TIMEOUT))
                self.assertEqual(job_num, len(list(executor.map(int, job_data))))
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)


create_executor_tests(globals(), ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down
46 changes: 46 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,52 @@ def mock_start_new_thread(func, *args):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()

def test_wakeup(self):
    # gh-105829: Check that calling _ExecutorManagerThread wakeup() many
    # times in ProcessPoolExecutor.submit() does not block if the
    # _ThreadWakeup pipe becomes full.

    def get_pipe_size(connection):
        """Best-effort size in bytes of the OS pipe behind *connection*."""
        try:
            import fcntl
            # F_GETPIPE_SZ is Linux-only: on other POSIX platforms fcntl
            # imports fine but the constant is missing (AttributeError),
            # and the fcntl() call itself may fail with OSError.
            return fcntl.fcntl(connection.fileno(), fcntl.F_GETPIPE_SZ)
        except (ImportError, AttributeError, OSError):
            # Assume 64 KiB pipe if we fail, makes test take longer
            return 65_536

    executor = self.executor
    with executor:
        # Wrap _ThreadWakeup.wakeup() to count how many times it has been
        # called
        thread_wakeup = executor._executor_manager_thread_wakeup
        orig_wakeup = thread_wakeup.wakeup
        nwakeup = 0
        def wrap_wakeup():
            nonlocal nwakeup
            nwakeup += 1
            orig_wakeup()
        thread_wakeup.wakeup = wrap_wakeup

        # Use a longer "wakeup message" to make the hang more likely
        # and to speed up the test
        min_jobs = self.worker_count * 2  # at least 2 jobs per worker
        pipe_size = get_pipe_size(thread_wakeup._writer)
        msg_len = min(pipe_size // min_jobs, 512)
        thread_wakeup._wakeup_message = b'x' * msg_len
        # Connection.send_bytes() prefixes each message with a 4-byte
        # length header, so account for it when sizing the job count.
        msg_size = 4 + len(thread_wakeup._wakeup_message)

        # Enough jobs to overfill the pipe if every wakeup really sent
        # a message (the fixed wakeup() sends at most one).
        njob = pipe_size // msg_size
        job_data = range(njob)
        if support.verbose:
            print(f"run {njob:,} jobs")

        self.assertEqual(len(list(executor.map(int, job_data))), njob)
        self.assertGreaterEqual(nwakeup, njob)


create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down

0 comments on commit 1aa24ee

Please sign in to comment.