From d02546d7fb2847f6f4cfac9c92f6764c7e59779c Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Wed, 3 Dec 2025 09:36:48 +0100 Subject: [PATCH 1/3] gh-142206: multiprocessing.resource_tracker: Decode messages using older protocol --- Lib/multiprocessing/resource_tracker.py | 62 +++++++++++++------ Lib/test/_test_multiprocessing.py | 18 ++++++ ...-12-03-09-36-29.gh-issue-142206.ilwegH.rst | 4 ++ 3 files changed, 66 insertions(+), 18 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2025-12-03-09-36-29.gh-issue-142206.ilwegH.rst diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index b0f9099f4a59f3..22cdfcfcde4d08 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -68,6 +68,13 @@ def __init__(self): self._exitcode = None self._reentrant_messages = deque() + # True to use colon-separated lines, rather than JSON lines, + # for internal communication. (Mainly for testing). + # Filenames not supported by the simple format will always be sent + # using JSON. + # The reader should understand all formats. + self._use_simple_format = False + def _reentrant_call_error(self): # gh-109629: this happens if an explicit call to the ResourceTracker # gets interrupted by a garbage collection, invoking a finalizer (*) @@ -200,7 +207,9 @@ def _launch(self): os.close(r) def _make_probe_message(self): - """Return a JSON-encoded probe message.""" + """Return a probe message.""" + if self._use_simple_format: + return b'PROBE:0:noop\n' return ( json.dumps( {"cmd": "PROBE", "rtype": "noop"}, @@ -267,6 +276,15 @@ def _write(self, msg): assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}" def _send(self, cmd, name, rtype): + if self._use_simple_format and ':' not in name: + msg = f"{cmd}:{name}:{rtype}\n".encode("ascii") + if len(msg) > 512: + # posix guarantees that writes to a pipe of less than PIPE_BUF + # bytes are atomic, and that PIPE_BUF >= 512 + raise ValueError('msg too long') + self._ensure_running_and_write(msg) + return + # POSIX guarantees that writes to a pipe of less than PIPE_BUF (512 on Linux) # bytes are atomic. Therefore, we want the message to be shorter than 512 bytes. # POSIX shm_open() and sem_open() require the name, including its leading slash, @@ -286,6 +304,7 @@ def _send(self, cmd, name, rtype): # The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by construction. assert len(msg) <= 512, f"internal error: message too long ({len(msg)} bytes)" + assert msg.startswith(b'{') self._ensure_running_and_write(msg) @@ -296,6 +315,29 @@ def _send(self, cmd, name, rtype): getfd = _resource_tracker.getfd +def decode_message(line): + if line.startswith(b'{'): + try: + obj = json.loads(line.decode('ascii')) + except Exception as e: + raise ValueError("malformed resource_tracker message: %r" % (line,)) from e + + cmd = obj["cmd"] + rtype = obj["rtype"] + b64 = obj.get("base64_name", "") + + if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str): + raise ValueError("malformed resource_tracker fields: %r" % (obj,)) + + try: + name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape') + except ValueError as e: + raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e + else: + cmd, name, rtype = line.strip().decode('ascii').split(':') + return cmd, rtype, name + + def main(fd): '''Run resource tracker.''' # protect the process from ^C and "killall python" etc @@ -318,23 +360,7 @@ def main(fd): with open(fd, 'rb') as f: for line in f: try: - try: - obj = json.loads(line.decode('ascii')) - except Exception as e: - raise ValueError("malformed resource_tracker message: %r" % (line,)) from e - - cmd = obj["cmd"] - rtype = obj["rtype"] - b64 = obj.get("base64_name", "") - - if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str): - raise ValueError("malformed resource_tracker fields: %r" % (obj,)) - - try: - name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape') - except ValueError as e: - raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e - + cmd, rtype, name = decode_message(line) cleanup_func = _CLEANUP_FUNCS.get(rtype, None) if cleanup_func is None: raise ValueError( diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d718a27231897f..d30bd179098a19 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -39,6 +39,7 @@ from test.support import socket_helper from test.support import threading_helper from test.support import warnings_helper +from test.support import subTests from test.support.script_helper import assert_python_failure, assert_python_ok # Skip tests if _multiprocessing wasn't built. @@ -4383,6 +4384,19 @@ def test_copy(self): self.assertEqual(bar.z, 2 ** 33) +def resource_tracker_format_subtests(func): + """Run given test using both resource tracker communication formats""" + def _inner(self, *args, **kwargs): + tracker = resource_tracker._resource_tracker + for use_simple_format in False, True: + with ( + self.subTest(use_simple_format=use_simple_format), + unittest.mock.patch.object( + tracker, '_use_simple_format', use_simple_format) + ): + func(self, *args, **kwargs) + return _inner + @unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory") @hashlib_helper.requires_hashdigest('sha256') class _TestSharedMemory(BaseTestCase): @@ -4662,6 +4676,7 @@ def test_shared_memory_SharedMemoryServer_ignores_sigint(self): smm.shutdown() @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") + @resource_tracker_format_subtests def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self): # bpo-36867: test that a SharedMemoryManager uses the # same resource_tracker process as its parent. @@ -4913,6 +4928,7 @@ def test_shared_memory_cleaned_after_process_termination(self): "shared_memory objects to clean up at shutdown", err) @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") + @resource_tracker_format_subtests def test_shared_memory_untracking(self): # gh-82300: When a separate Python process accesses shared memory # with track=False, it must not cause the memory to be deleted @@ -4940,6 +4956,7 @@ def test_shared_memory_untracking(self): mem.close() @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") + @resource_tracker_format_subtests def test_shared_memory_tracking(self): # gh-82300: When a separate Python process accesses shared memory # with track=True, it must cause the memory to be deleted when @@ -7353,6 +7370,7 @@ def test_forkpty(self): @unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory") class TestSharedMemoryNames(unittest.TestCase): + @resource_tracker_format_subtests def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self): # Test script that creates and cleans up shared memory with colon in name test_script = textwrap.dedent(""" diff --git a/Misc/NEWS.d/next/Library/2025-12-03-09-36-29.gh-issue-142206.ilwegH.rst b/Misc/NEWS.d/next/Library/2025-12-03-09-36-29.gh-issue-142206.ilwegH.rst new file mode 100644 index 00000000000000..90e4dd96985979 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-12-03-09-36-29.gh-issue-142206.ilwegH.rst @@ -0,0 +1,4 @@ +The resource tracker in the :mod:`multiprocessing` module can now understand +messages from older versions of itself. This avoids issues with upgrading +Python while it is running. (Note that such 'in-place' upgrades are not +tested.) From be437028217ce0f0b3caa34c9bc29f85e65c08dd Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Wed, 3 Dec 2025 10:10:30 +0100 Subject: [PATCH 2/3] Split on first and last colons when reading older messages --- Lib/multiprocessing/resource_tracker.py | 5 +++-- Lib/test/_test_multiprocessing.py | 10 +++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 22cdfcfcde4d08..d000cc93e8acb4 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -276,7 +276,7 @@ def _write(self, msg): assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}" def _send(self, cmd, name, rtype): - if self._use_simple_format and ':' not in name: + if self._use_simple_format and '\n' not in name: msg = f"{cmd}:{name}:{rtype}\n".encode("ascii") if len(msg) > 512: # posix guarantees that writes to a pipe of less than PIPE_BUF @@ -334,7 +334,8 @@ def decode_message(line): except ValueError as e: raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e else: - cmd, name, rtype = line.strip().decode('ascii').split(':') + cmd, rest = line.strip().decode('ascii').split(':', maxsplit=1) + name, rtype = rest.rsplit(':', maxsplit=1) return cmd, rtype, name diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d30bd179098a19..d03eb1dfb253ec 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -7370,14 +7370,18 @@ def test_forkpty(self): @unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory") class TestSharedMemoryNames(unittest.TestCase): - @resource_tracker_format_subtests - def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self): + @subTests('use_simple_format', (True, False)) + def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors( + self, use_simple_format): # Test script that creates and cleans up shared memory with colon in name test_script = textwrap.dedent(""" import sys from multiprocessing import shared_memory + from multiprocessing import resource_tracker import time + resource_tracker._resource_tracker._use_simple_format = %s + # Test various patterns of colons in names test_names = [ "a:b", @@ -7405,7 +7409,7 @@ def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self sys.exit(1) print("SUCCESS") - """) + """ % use_simple_format) rc, out, err = assert_python_ok("-c", test_script) self.assertIn(b"SUCCESS", out) From 9353b904bf0738763f79fabec5f866d852d57f22 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Wed, 3 Dec 2025 11:01:41 +0100 Subject: [PATCH 3/3] Private name --- Lib/multiprocessing/resource_tracker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index d000cc93e8acb4..3606d1effb495b 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -315,7 +315,7 @@ def _send(self, cmd, name, rtype): getfd = _resource_tracker.getfd -def decode_message(line): +def _decode_message(line): if line.startswith(b'{'): try: obj = json.loads(line.decode('ascii')) @@ -361,7 +361,7 @@ def main(fd): with open(fd, 'rb') as f: for line in f: try: - cmd, rtype, name = decode_message(line) + cmd, rtype, name = _decode_message(line) cleanup_func = _CLEANUP_FUNCS.get(rtype, None) if cleanup_func is None: raise ValueError(