Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 87 additions & 12 deletions Lib/test/test_profiling/test_sampling_profiler/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,88 @@
SubprocessInfo = namedtuple("SubprocessInfo", ["process", "socket"])


def _wait_for_signal(sock, expected_signals, timeout=SHORT_TIMEOUT):
"""
Wait for expected signal(s) from a socket with proper timeout and EOF handling.

Args:
sock: Connected socket to read from
expected_signals: Single bytes object or list of bytes objects to wait for
timeout: Socket timeout in seconds

Returns:
bytes: Complete accumulated response buffer

Raises:
RuntimeError: If connection closed before signal received or timeout
"""
if isinstance(expected_signals, bytes):
expected_signals = [expected_signals]

sock.settimeout(timeout)
buffer = b""

while True:
# Check if all expected signals are in buffer
if all(sig in buffer for sig in expected_signals):
return buffer

try:
chunk = sock.recv(4096)
if not chunk:
raise RuntimeError(
f"Connection closed before receiving expected signals. "
f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
)
buffer += chunk
except socket.timeout:
raise RuntimeError(
f"Timeout waiting for signals. "
f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
) from None
except OSError as e:
raise RuntimeError(
f"Socket error while waiting for signals: {e}. "
f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
) from None


def _cleanup_sockets(*sockets):
"""Safely close multiple sockets, ignoring errors."""
for sock in sockets:
if sock is not None:
try:
sock.close()
except OSError:
pass


def _cleanup_process(proc, timeout=SHORT_TIMEOUT):
"""Terminate a process gracefully, escalating to kill if needed."""
if proc.poll() is not None:
return
proc.terminate()
try:
proc.wait(timeout=timeout)
return
except subprocess.TimeoutExpired:
pass
proc.kill()
try:
proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
pass # Process refuses to die, nothing more we can do


@contextlib.contextmanager
def test_subprocess(script):
def test_subprocess(script, wait_for_working=False):
"""Context manager to create a test subprocess with socket synchronization.

Args:
script: Python code to execute in the subprocess
script: Python code to execute in the subprocess. If wait_for_working
is True, script should send b"working" after starting work.
wait_for_working: If True, wait for both "ready" and "working" signals.
Default False for backward compatibility.

Yields:
SubprocessInfo: Named tuple with process and socket objects
Expand Down Expand Up @@ -80,19 +156,18 @@ def test_subprocess(script):
# Wait for process to connect and send ready signal
client_socket, _ = server_socket.accept()
server_socket.close()
response = client_socket.recv(1024)
if response != b"ready":
raise RuntimeError(
f"Unexpected response from subprocess: {response!r}"
)
server_socket = None

# Wait for ready signal, and optionally working signal
if wait_for_working:
_wait_for_signal(client_socket, [b"ready", b"working"])
else:
_wait_for_signal(client_socket, b"ready")

yield SubprocessInfo(proc, client_socket)
finally:
if client_socket is not None:
client_socket.close()
if proc.poll() is None:
proc.kill()
proc.wait()
_cleanup_sockets(client_socket, server_socket)
_cleanup_process(proc)


def close_and_unlink(file):
Expand Down
36 changes: 11 additions & 25 deletions Lib/test/test_profiling/test_sampling_profiler/test_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,32 +39,26 @@ def setUpClass(cls):
import gc

class ExpensiveGarbage:
"""Class that triggers GC with expensive finalizer (callback)."""
def __init__(self):
self.cycle = self

def __del__(self):
# CPU-intensive work in the finalizer callback
result = 0
for i in range(100000):
result += i * i
if i % 1000 == 0:
result = result % 1000000

def main_loop():
"""Main loop that triggers GC with expensive callback."""
while True:
ExpensiveGarbage()
gc.collect()

if __name__ == "__main__":
main_loop()
_test_sock.sendall(b"working")
while True:
ExpensiveGarbage()
gc.collect()
'''

def test_gc_frames_enabled(self):
"""Test that GC frames appear when gc tracking is enabled."""
with (
test_subprocess(self.gc_test_script) as subproc,
test_subprocess(self.gc_test_script, wait_for_working=True) as subproc,
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
Expand Down Expand Up @@ -94,7 +88,7 @@ def test_gc_frames_enabled(self):
def test_gc_frames_disabled(self):
"""Test that GC frames do not appear when gc tracking is disabled."""
with (
test_subprocess(self.gc_test_script) as subproc,
test_subprocess(self.gc_test_script, wait_for_working=True) as subproc,
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
Expand Down Expand Up @@ -133,18 +127,13 @@ def setUpClass(cls):
cls.native_test_script = """
import operator

def main_loop():
while True:
# Native code in the middle of the stack:
operator.call(inner)

def inner():
# Python code at the top of the stack:
for _ in range(1_000_0000):
pass

if __name__ == "__main__":
main_loop()
_test_sock.sendall(b"working")
while True:
operator.call(inner)
"""

def test_native_frames_enabled(self):
Expand All @@ -154,10 +143,7 @@ def test_native_frames_enabled(self):
)
self.addCleanup(close_and_unlink, collapsed_file)

with (
test_subprocess(self.native_test_script) as subproc,
):
# Suppress profiler output when testing file export
with test_subprocess(self.native_test_script, wait_for_working=True) as subproc:
with (
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
Expand Down Expand Up @@ -199,7 +185,7 @@ def test_native_frames_enabled(self):
def test_native_frames_disabled(self):
"""Test that native frames do not appear when native tracking is disabled."""
with (
test_subprocess(self.native_test_script) as subproc,
test_subprocess(self.native_test_script, wait_for_working=True) as subproc,
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
Expand Down
Loading
Loading