Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 45 additions & 18 deletions Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ def __init__(self):
self._exitcode = None
self._reentrant_messages = deque()

# True to use colon-separated lines, rather than JSON lines,
# for internal communication. (Mainly for testing).
# Filenames not supported by the simple format will always be sent
# using JSON.
# The reader should understand all formats.
self._use_simple_format = False

def _reentrant_call_error(self):
# gh-109629: this happens if an explicit call to the ResourceTracker
# gets interrupted by a garbage collection, invoking a finalizer (*)
Expand Down Expand Up @@ -200,7 +207,9 @@ def _launch(self):
os.close(r)

def _make_probe_message(self):
"""Return a JSON-encoded probe message."""
"""Return a probe message."""
if self._use_simple_format:
return b'PROBE:0:noop\n'
return (
json.dumps(
{"cmd": "PROBE", "rtype": "noop"},
Expand Down Expand Up @@ -267,6 +276,15 @@ def _write(self, msg):
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"

def _send(self, cmd, name, rtype):
if self._use_simple_format and '\n' not in name:
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
if len(msg) > 512:
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
raise ValueError('msg too long')
self._ensure_running_and_write(msg)
return

# POSIX guarantees that writes to a pipe of less than PIPE_BUF (512 on Linux)
# bytes are atomic. Therefore, we want the message to be shorter than 512 bytes.
# POSIX shm_open() and sem_open() require the name, including its leading slash,
Expand All @@ -286,6 +304,7 @@ def _send(self, cmd, name, rtype):

# The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by construction.
assert len(msg) <= 512, f"internal error: message too long ({len(msg)} bytes)"
assert msg.startswith(b'{')

self._ensure_running_and_write(msg)

Expand All @@ -296,6 +315,30 @@ def _send(self, cmd, name, rtype):
getfd = _resource_tracker.getfd


def _decode_message(line):
if line.startswith(b'{'):
try:
obj = json.loads(line.decode('ascii'))
except Exception as e:
raise ValueError("malformed resource_tracker message: %r" % (line,)) from e

cmd = obj["cmd"]
rtype = obj["rtype"]
b64 = obj.get("base64_name", "")

if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
raise ValueError("malformed resource_tracker fields: %r" % (obj,))

try:
name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
except ValueError as e:
raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
else:
cmd, rest = line.strip().decode('ascii').split(':', maxsplit=1)
name, rtype = rest.rsplit(':', maxsplit=1)
return cmd, rtype, name


def main(fd):
'''Run resource tracker.'''
# protect the process from ^C and "killall python" etc
Expand All @@ -318,23 +361,7 @@ def main(fd):
with open(fd, 'rb') as f:
for line in f:
try:
try:
obj = json.loads(line.decode('ascii'))
except Exception as e:
raise ValueError("malformed resource_tracker message: %r" % (line,)) from e

cmd = obj["cmd"]
rtype = obj["rtype"]
b64 = obj.get("base64_name", "")

if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
raise ValueError("malformed resource_tracker fields: %r" % (obj,))

try:
name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
except ValueError as e:
raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e

cmd, rtype, name = _decode_message(line)
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
if cleanup_func is None:
raise ValueError(
Expand Down
26 changes: 24 additions & 2 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper
from test.support import subTests
from test.support.script_helper import assert_python_failure, assert_python_ok

# Skip tests if _multiprocessing wasn't built.
Expand Down Expand Up @@ -4383,6 +4384,19 @@ def test_copy(self):
self.assertEqual(bar.z, 2 ** 33)


def resource_tracker_format_subtests(func):
"""Run given test using both resource tracker communication formats"""
def _inner(self, *args, **kwargs):
tracker = resource_tracker._resource_tracker
for use_simple_format in False, True:
with (
self.subTest(use_simple_format=use_simple_format),
unittest.mock.patch.object(
tracker, '_use_simple_format', use_simple_format)
):
func(self, *args, **kwargs)
return _inner

@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
@hashlib_helper.requires_hashdigest('sha256')
class _TestSharedMemory(BaseTestCase):
Expand Down Expand Up @@ -4662,6 +4676,7 @@ def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
smm.shutdown()

@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
@resource_tracker_format_subtests
def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
# bpo-36867: test that a SharedMemoryManager uses the
# same resource_tracker process as its parent.
Expand Down Expand Up @@ -4913,6 +4928,7 @@ def test_shared_memory_cleaned_after_process_termination(self):
"shared_memory objects to clean up at shutdown", err)

@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
@resource_tracker_format_subtests
def test_shared_memory_untracking(self):
# gh-82300: When a separate Python process accesses shared memory
# with track=False, it must not cause the memory to be deleted
Expand Down Expand Up @@ -4940,6 +4956,7 @@ def test_shared_memory_untracking(self):
mem.close()

@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
@resource_tracker_format_subtests
def test_shared_memory_tracking(self):
# gh-82300: When a separate Python process accesses shared memory
# with track=True, it must cause the memory to be deleted when
Expand Down Expand Up @@ -7353,13 +7370,18 @@ def test_forkpty(self):

@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
class TestSharedMemoryNames(unittest.TestCase):
def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self):
@subTests('use_simple_format', (True, False))
def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(
self, use_simple_format):
# Test script that creates and cleans up shared memory with colon in name
test_script = textwrap.dedent("""
import sys
from multiprocessing import shared_memory
from multiprocessing import resource_tracker
import time

resource_tracker._resource_tracker._use_simple_format = %s

# Test various patterns of colons in names
test_names = [
"a:b",
Expand Down Expand Up @@ -7387,7 +7409,7 @@ def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self
sys.exit(1)

print("SUCCESS")
""")
""" % use_simple_format)

rc, out, err = assert_python_ok("-c", test_script)
self.assertIn(b"SUCCESS", out)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
The resource tracker in the :mod:`multiprocessing` module can now understand
messages from older versions of itself. This avoids issues with upgrading
Python while it is running. (Note that such 'in-place' upgrades are not
tested.)
Loading