diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 392d1960e05950..321608ab174cfb 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -497,6 +497,10 @@ def terminate_broken(self, cause): for p in self.processes.values(): p.terminate() + # Prevent queue writing to a pipe which is no longer read. + # https://github.com/python/cpython/issues/94777 + self.call_queue._reader.close() + # clean up resources self.join_executor_internals() diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 1bb6156a18ddbf..4558560e1a992f 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -1167,6 +1167,11 @@ def _crash(delay=None): faulthandler._sigsegv() +def _crash_with_data(data): + """Induces a segfault with dummy data in input.""" + _crash() + + def _exit(): """Induces a sys exit with exitcode 1.""" sys.exit(1) @@ -1366,6 +1371,19 @@ def test_shutdown_deadlock_pickle(self): # dangling threads executor_manager.join() + def test_crash_big_data(self): + # Test that there is a clean exception instad of a deadlock when a + # child process crashes while some data is being written into the + # queue. + # https://github.com/python/cpython/issues/94777 + self.executor.shutdown(wait=True) + data = "a" * support.PIPE_MAX_SIZE + with self.executor_type(max_workers=2, + mp_context=self.get_context()) as executor: + self.executor = executor # Allow clean up in fail_on_deadlock + with self.assertRaises(BrokenProcessPool): + list(executor.map(_crash_with_data, [data] * 10)) + create_executor_tests(ExecutorDeadlockTest, executor_mixins=(ProcessPoolForkMixin, diff --git a/Misc/NEWS.d/next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst b/Misc/NEWS.d/next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst new file mode 100644 index 00000000000000..2c04a35fbfce13 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-07-12-18-45-13.gh-issue-94777.mOybx7.rst @@ -0,0 +1 @@ +Fix hanging :mod:`multiprocessing` ``ProcessPoolExecutor`` when a child process crashes while data is being written in the call queue.