From d77cd4a20044bebbf3457027d8c7e35f63b7a37e Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Sat, 13 Jan 2018 11:20:43 +0100 Subject: [PATCH 01/12] FIX reuse semaphore tracker for child processes --- Lib/multiprocessing/semaphore_tracker.py | 26 ++++++++++++++---------- Lib/multiprocessing/spawn.py | 10 +++++++++ 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index 82833bcf861a49..0d9bb8541084ab 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -44,18 +44,11 @@ def ensure_running(self): This can be run from any process. Usually a child process will use the semaphore created by its parent.''' with self._lock: - if self._pid is not None: + if self._fd is not None: # semaphore tracker was launched before, is it still running? - try: - pid, _ = os.waitpid(self._pid, os.WNOHANG) - except ChildProcessError: - # The process terminated - pass - else: - if not pid: - # => still alive - return - + if self._check_alive(): + # => still alive + return # => dead, launch it again os.close(self._fd) self._fd = None @@ -99,6 +92,15 @@ def ensure_running(self): finally: os.close(r) + def _check_alive(self): + '''Check for that the pipe has not been closed by sending a probe.''' + try: + os.write(self._fd, b'PROBE:0\n') + except BrokenPipeError: + return False + else: + return True + def register(self, name): '''Register name of semaphore with semaphore tracker.''' self._send('REGISTER', name) @@ -150,6 +152,8 @@ def main(fd): cache.add(name) elif cmd == b'UNREGISTER': cache.remove(name) + elif cmd == b'PROBE': + pass else: raise RuntimeError('unrecognized command %r' % cmd) except Exception: diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index 6759351f13abce..c3156d9270f61f 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -178,6 +178,12 @@ def get_preparation_data(name): start_method=get_start_method(), ) + if sys.platform != "win32": + # Pass the semaphore_tracker pid to avoid re-spawning it in every child + from . import semaphore_tracker + semaphore_tracker.ensure_running() + d['tracker_pid'] = semaphore_tracker._semaphore_tracker._pid + # Figure out whether to initialise main in the subprocess as a module # or through direct execution (or to leave it alone entirely) main_module = sys.modules['__main__'] @@ -231,6 +237,10 @@ def prepare(data): if 'start_method' in data: set_start_method(data['start_method'], force=True) + if 'tacker_pid' in data: + from . import semaphore_tracker + semaphore_tracker._semaphore_tracker._pid = data["tracker_pid"] + if 'init_main_from_name' in data: _fixup_main_from_name(data['init_main_from_name']) elif 'init_main_from_path' in data: From 398216f858f5d8c159017a45be4e6d74916ec15c Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Sat, 24 Feb 2018 10:52:17 +0100 Subject: [PATCH 02/12] FIX typo in spawn.prepare --- Lib/multiprocessing/spawn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index c3156d9270f61f..eae8fd9f464e42 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -237,7 +237,7 @@ def prepare(data): if 'start_method' in data: set_start_method(data['start_method'], force=True) - if 'tacker_pid' in data: + if 'tracker_pid' in data: from . import semaphore_tracker semaphore_tracker._semaphore_tracker._pid = data["tracker_pid"] From 87964fc80cfaf13fc82057c4a6ef24fbbefefec5 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 6 Apr 2018 11:05:16 +0200 Subject: [PATCH 03/12] TST add test for reused semtracker + CLN unused code --- Lib/multiprocessing/semaphore_tracker.py | 2 ++ Lib/multiprocessing/spawn.py | 10 --------- Lib/test/_test_multiprocessing.py | 27 ++++++++++++++++++++++++ 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index 0d9bb8541084ab..835c3b798c119d 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -95,6 +95,8 @@ def ensure_running(self): def _check_alive(self): '''Check for that the pipe has not been closed by sending a probe.''' try: + # We cannot use send here as it calls ensure_running, creating + # a cycle. os.write(self._fd, b'PROBE:0\n') except BrokenPipeError: return False diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index eae8fd9f464e42..6759351f13abce 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -178,12 +178,6 @@ def get_preparation_data(name): start_method=get_start_method(), ) - if sys.platform != "win32": - # Pass the semaphore_tracker pid to avoid re-spawning it in every child - from . import semaphore_tracker - semaphore_tracker.ensure_running() - d['tracker_pid'] = semaphore_tracker._semaphore_tracker._pid - # Figure out whether to initialise main in the subprocess as a module # or through direct execution (or to leave it alone entirely) main_module = sys.modules['__main__'] @@ -237,10 +231,6 @@ def prepare(data): if 'start_method' in data: set_start_method(data['start_method'], force=True) - if 'tracker_pid' in data: - from . import semaphore_tracker - semaphore_tracker._semaphore_tracker._pid = data["tracker_pid"] - if 'init_main_from_name' in data: _fixup_main_from_name(data['init_main_from_name']) elif 'init_main_from_path' in data: diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index a860d9db44fbe9..b40bd391eb021a 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4895,6 +4895,33 @@ def test_semaphore_tracker_sigkill(self): # Uncatchable signal. self.check_semaphore_tracker_death(signal.SIGKILL, True) + @staticmethod + def _is_semaphore_tracker_reused(conn): + from multiprocessing.semaphore_tracker import _semaphore_tracker + _semaphore_tracker.ensure_running() + reused = _semaphore_tracker._pid is None + reused &= _semaphore_tracker._check_alive() + conn.send(reused) + + def test_semaphore_tracker_reused(self): + from multiprocessing.semaphore_tracker import _semaphore_tracker + _semaphore_tracker.ensure_running() + pid = _semaphore_tracker._pid + + ctx = multiprocessing.get_context("spawn") + r, w = ctx.Pipe(duplex=False) + p = ctx.Process(target=self._is_semaphore_tracker_reused, + args=(w,)) + p.start() + is_semaphore_tracker_reused = r.recv() + + # Clean up + p.join() + w.close() + r.close() + + self.assertTrue(is_semaphore_tracker_reused) + class TestSimpleQueue(unittest.TestCase): From 49e96f109fa495b8a3e0deaa1cfdfda5a3bef31b Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 6 Apr 2018 11:06:36 +0200 Subject: [PATCH 04/12] DOC add what's new --- .../NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst diff --git a/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst b/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst new file mode 100644 index 00000000000000..32ebf4efb74bdb --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst @@ -0,0 +1 @@ +Fix the multiprocessing.semaphore_tracker so it is reused by child processes From d9b23e1b351366f951bf8b48bb5c6b8671ac56bd Mon Sep 17 00:00:00 2001 From: tomMoral Date: Fri, 15 Mar 2019 10:10:22 +0100 Subject: [PATCH 05/12] ENH test check pid of semaphore_tracker --- Lib/multiprocessing/spawn.py | 9 +++++++++ Lib/test/_test_multiprocessing.py | 6 +++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index 6759351f13abce..5a4ad8892ca501 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -169,6 +169,11 @@ def get_preparation_data(name): else: sys_path[i] = process.ORIGINAL_DIR + if sys.platform != "win32": + from .semaphore_tracker import _semaphore_tracker + _semaphore_tracker.ensure_running() + d['semaphore_tracker_pid'] = _semaphore_tracker._pid + d.update( name=name, sys_path=sys_path, @@ -231,6 +236,10 @@ def prepare(data): if 'start_method' in data: set_start_method(data['start_method'], force=True) + if 'semaphore_tracker_pid' in data: + from .semaphore_tracker import _semaphore_tracker + _semaphore_tracker._pid = data['semaphore_tracker_pid'] + if 'init_main_from_name' in data: _fixup_main_from_name(data['init_main_from_name']) elif 'init_main_from_path' in data: diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index b40bd391eb021a..125220d11e1d3d 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4896,10 +4896,10 @@ def test_semaphore_tracker_sigkill(self): self.check_semaphore_tracker_death(signal.SIGKILL, True) @staticmethod - def _is_semaphore_tracker_reused(conn): + def _is_semaphore_tracker_reused(conn, pid): from multiprocessing.semaphore_tracker import _semaphore_tracker _semaphore_tracker.ensure_running() - reused = _semaphore_tracker._pid is None + reused = _semaphore_tracker._pid == pid reused &= _semaphore_tracker._check_alive() conn.send(reused) @@ -4911,7 +4911,7 @@ def test_semaphore_tracker_reused(self): ctx = multiprocessing.get_context("spawn") r, w = ctx.Pipe(duplex=False) p = ctx.Process(target=self._is_semaphore_tracker_reused, - args=(w,)) + args=(w, pid)) p.start() is_semaphore_tracker_reused = r.recv() From b61bba22489d03d28607461de8e65507a39a70ae Mon Sep 17 00:00:00 2001 From: tomMoral Date: Sun, 17 Mar 2019 12:11:23 +0100 Subject: [PATCH 06/12] FIX dangling process in test_sem_tracker --- Lib/test/_test_multiprocessing.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 125220d11e1d3d..8ea5bbee304890 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4883,6 +4883,11 @@ def check_semaphore_tracker_death(self, signum, should_die): else: self.assertEqual(len(all_warn), 0) + # make sure to clean-up the killed semaphore_tracker to avoid dangling + # processes. + if signum == signal.SIGKILL: + os.waitpid(pid, 0) + def test_semaphore_tracker_sigint(self): # Catchable signal (ignored by semaphore tracker) self.check_semaphore_tracker_death(signal.SIGINT, False) From 96cb6432855b40d02fabd9310b8f4767373dca3c Mon Sep 17 00:00:00 2001 From: tomMoral Date: Mon, 18 Mar 2019 14:10:05 +0100 Subject: [PATCH 07/12] CLN correctly avoid dangling processes --- Lib/multiprocessing/semaphore_tracker.py | 7 +++++++ Lib/test/_test_multiprocessing.py | 5 ----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index 835c3b798c119d..a6f4c7e79f82be 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -51,6 +51,13 @@ def ensure_running(self): return # => dead, launch it again os.close(self._fd) + try: + # Clean-up to avoid dangling processes. + os.waitpid(self._pid, 0) + except ChildProcessError: + # The process terminated or is a child from an ancestor of + # the current process. + pass self._fd = None self._pid = None diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 8ea5bbee304890..125220d11e1d3d 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4883,11 +4883,6 @@ def check_semaphore_tracker_death(self, signum, should_die): else: self.assertEqual(len(all_warn), 0) - # make sure to clean-up the killed semaphore_tracker to avoid dangling - # processes. - if signum == signal.SIGKILL: - os.waitpid(pid, 0) - def test_semaphore_tracker_sigint(self): # Catchable signal (ignored by semaphore tracker) self.check_semaphore_tracker_death(signal.SIGINT, False) From cbfefd1cf81e71cc809cc8ab3e2251fee9047f9d Mon Sep 17 00:00:00 2001 From: Zackery Spytz Date: Thu, 11 Apr 2019 11:04:41 +0200 Subject: [PATCH 08/12] CLN improve docstring for _check_alive in semaphore_tracker Co-Authored-By: tomMoral --- Lib/multiprocessing/semaphore_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index a6f4c7e79f82be..bf562fc35907f7 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -100,7 +100,7 @@ def ensure_running(self): os.close(r) def _check_alive(self): - '''Check for that the pipe has not been closed by sending a probe.''' + '''Check that the pipe has not been closed by sending a probe.''' try: # We cannot use send here as it calls ensure_running, creating # a cycle. From 8f8e82f21ae68ee3de53b67d93cbc3aa18f97edc Mon Sep 17 00:00:00 2001 From: tomMoral Date: Fri, 19 Apr 2019 16:11:40 +0200 Subject: [PATCH 09/12] CLN improve tests and avoid passing unused variables --- Lib/multiprocessing/semaphore_tracker.py | 11 +++++++---- Lib/multiprocessing/spawn.py | 9 --------- Lib/test/_test_multiprocessing.py | 9 ++++----- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index bf562fc35907f7..528d819f926afc 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -51,12 +51,15 @@ def ensure_running(self): return # => dead, launch it again os.close(self._fd) + + # Clean-up to avoid dangling processes. try: - # Clean-up to avoid dangling processes. - os.waitpid(self._pid, 0) + # _pid can be None if this process is a child from another + # python process, which has started the semaphore_tracker. + if self._pid is not None: + os.waitpid(self._pid, 0) except ChildProcessError: - # The process terminated or is a child from an ancestor of - # the current process. + # The semaphore_tracker is already terminated. pass self._fd = None self._pid = None diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index 5a4ad8892ca501..6759351f13abce 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -169,11 +169,6 @@ def get_preparation_data(name): else: sys_path[i] = process.ORIGINAL_DIR - if sys.platform != "win32": - from .semaphore_tracker import _semaphore_tracker - _semaphore_tracker.ensure_running() - d['semaphore_tracker_pid'] = _semaphore_tracker._pid - d.update( name=name, sys_path=sys_path, @@ -236,10 +231,6 @@ def prepare(data): if 'start_method' in data: set_start_method(data['start_method'], force=True) - if 'semaphore_tracker_pid' in data: - from .semaphore_tracker import _semaphore_tracker - _semaphore_tracker._pid = data['semaphore_tracker_pid'] - if 'init_main_from_name' in data: _fixup_main_from_name(data['init_main_from_name']) elif 'init_main_from_path' in data: diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 125220d11e1d3d..c62f0345621e31 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4899,7 +4899,7 @@ def test_semaphore_tracker_sigkill(self): def _is_semaphore_tracker_reused(conn, pid): from multiprocessing.semaphore_tracker import _semaphore_tracker _semaphore_tracker.ensure_running() - reused = _semaphore_tracker._pid == pid + reused = _semaphore_tracker._pid is None reused &= _semaphore_tracker._check_alive() conn.send(reused) @@ -4908,10 +4908,9 @@ def test_semaphore_tracker_reused(self): _semaphore_tracker.ensure_running() pid = _semaphore_tracker._pid - ctx = multiprocessing.get_context("spawn") - r, w = ctx.Pipe(duplex=False) - p = ctx.Process(target=self._is_semaphore_tracker_reused, - args=(w, pid)) + r, w = multiprocessing.Pipe(duplex=False) + p = multiprocessing.Process(target=self._is_semaphore_tracker_reused, + args=(w, pid)) p.start() is_semaphore_tracker_reused = r.recv() From 9a5c94e1043a1bde90ed5877a26d52f91a0cabd4 Mon Sep 17 00:00:00 2001 From: tomMoral Date: Fri, 19 Apr 2019 16:59:16 +0200 Subject: [PATCH 10/12] FIX semaphore_tracker for the fork context --- Lib/multiprocessing/semaphore_tracker.py | 2 +- Lib/test/_test_multiprocessing.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index 528d819f926afc..d871f80f17c358 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -59,7 +59,7 @@ def ensure_running(self): if self._pid is not None: os.waitpid(self._pid, 0) except ChildProcessError: - # The semaphore_tracker is already terminated. + # The semaphore_tracker has already been terminated. pass self._fd = None self._pid = None diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index c62f0345621e31..ccab5321bbb13e 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4899,7 +4899,9 @@ def test_semaphore_tracker_sigkill(self): def _is_semaphore_tracker_reused(conn, pid): from multiprocessing.semaphore_tracker import _semaphore_tracker _semaphore_tracker.ensure_running() - reused = _semaphore_tracker._pid is None + # The pid should be None in the child process, expect for the fork + # context. It should not be a new value. + reused = _semaphore_tracker._pid in (None, pid) reused &= _semaphore_tracker._check_alive() conn.send(reused) From ada8a9d4ec8e51728c65236e7f17a5739103098c Mon Sep 17 00:00:00 2001 From: tomMoral Date: Wed, 24 Apr 2019 11:59:07 +0200 Subject: [PATCH 11/12] ENH more robust sem_tracker.ensure_running --- Lib/multiprocessing/semaphore_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index d871f80f17c358..3c2c3ad61aeeec 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -108,7 +108,7 @@ def _check_alive(self): # We cannot use send here as it calls ensure_running, creating # a cycle. os.write(self._fd, b'PROBE:0\n') - except BrokenPipeError: + except OSError: return False else: return True From f6d6a31de51ff640f8d1349851132186d843914e Mon Sep 17 00:00:00 2001 From: tomMoral Date: Wed, 24 Apr 2019 18:27:56 +0200 Subject: [PATCH 12/12] DBG trigger appveyor