Skip to content

Commit 004b93e

Browse files
tomMoralpitrou
authored andcommitted
bpo-36668: FIX reuse semaphore tracker for child processes (#5172)
Fix the multiprocessing.semaphore_tracker so it is reused by child processes.
1 parent 09d434c commit 004b93e

File tree

3 files changed

+55
-10
lines changed

3 files changed

+55
-10
lines changed

Lib/multiprocessing/semaphore_tracker.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,20 +44,23 @@ def ensure_running(self):
4444
This can be run from any process. Usually a child process will use
4545
the semaphore created by its parent.'''
4646
with self._lock:
47-
if self._pid is not None:
47+
if self._fd is not None:
4848
# semaphore tracker was launched before, is it still running?
49+
if self._check_alive():
50+
# => still alive
51+
return
52+
# => dead, launch it again
53+
os.close(self._fd)
54+
55+
# Clean-up to avoid dangling processes.
4956
try:
50-
pid, _ = os.waitpid(self._pid, os.WNOHANG)
57+
# _pid can be None if this process is a child from another
58+
# python process, which has started the semaphore_tracker.
59+
if self._pid is not None:
60+
os.waitpid(self._pid, 0)
5161
except ChildProcessError:
52-
# The process terminated
62+
# The semaphore_tracker has already been terminated.
5363
pass
54-
else:
55-
if not pid:
56-
# => still alive
57-
return
58-
59-
# => dead, launch it again
60-
os.close(self._fd)
6164
self._fd = None
6265
self._pid = None
6366

@@ -99,6 +102,17 @@ def ensure_running(self):
99102
finally:
100103
os.close(r)
101104

105+
def _check_alive(self):
106+
'''Check that the pipe has not been closed by sending a probe.'''
107+
try:
108+
# We cannot use send here as it calls ensure_running, creating
109+
# a cycle.
110+
os.write(self._fd, b'PROBE:0\n')
111+
except OSError:
112+
return False
113+
else:
114+
return True
115+
102116
def register(self, name):
103117
'''Register name of semaphore with semaphore tracker.'''
104118
self._send('REGISTER', name)
@@ -150,6 +164,8 @@ def main(fd):
150164
cache.add(name)
151165
elif cmd == b'UNREGISTER':
152166
cache.remove(name)
167+
elif cmd == b'PROBE':
168+
pass
153169
else:
154170
raise RuntimeError('unrecognized command %r' % cmd)
155171
except Exception:

Lib/test/_test_multiprocessing.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4891,6 +4891,34 @@ def test_semaphore_tracker_sigkill(self):
48914891
# Uncatchable signal.
48924892
self.check_semaphore_tracker_death(signal.SIGKILL, True)
48934893

4894+
@staticmethod
4895+
def _is_semaphore_tracker_reused(conn, pid):
4896+
from multiprocessing.semaphore_tracker import _semaphore_tracker
4897+
_semaphore_tracker.ensure_running()
4898+
# The pid should be None in the child process, expect for the fork
4899+
# context. It should not be a new value.
4900+
reused = _semaphore_tracker._pid in (None, pid)
4901+
reused &= _semaphore_tracker._check_alive()
4902+
conn.send(reused)
4903+
4904+
def test_semaphore_tracker_reused(self):
4905+
from multiprocessing.semaphore_tracker import _semaphore_tracker
4906+
_semaphore_tracker.ensure_running()
4907+
pid = _semaphore_tracker._pid
4908+
4909+
r, w = multiprocessing.Pipe(duplex=False)
4910+
p = multiprocessing.Process(target=self._is_semaphore_tracker_reused,
4911+
args=(w, pid))
4912+
p.start()
4913+
is_semaphore_tracker_reused = r.recv()
4914+
4915+
# Clean up
4916+
p.join()
4917+
w.close()
4918+
r.close()
4919+
4920+
self.assertTrue(is_semaphore_tracker_reused)
4921+
48944922

48954923
class TestSimpleQueue(unittest.TestCase):
48964924

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix the multiprocessing.semaphore_tracker so it is reused by child processes

0 commit comments

Comments
 (0)