Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,360 changes: 0 additions & 3,360 deletions Lib/test/test_profiling/test_sampling_profiler.py

This file was deleted.

9 changes: 9 additions & 0 deletions Lib/test/test_profiling/test_sampling_profiler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Tests for the sampling profiler (profiling.sampling)."""

import os
from test.support import load_package_tests


def load_tests(*args):
"""Load all tests from this subpackage."""
return load_package_tests(os.path.dirname(__file__), *args)
101 changes: 101 additions & 0 deletions Lib/test/test_profiling/test_sampling_profiler/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Helper utilities for sampling profiler tests."""

import contextlib
import socket
import subprocess
import sys
import unittest
from collections import namedtuple

from test.support import SHORT_TIMEOUT
from test.support.socket_helper import find_unused_port
from test.support.os_helper import unlink


PROCESS_VM_READV_SUPPORTED = False

try:
from _remote_debugging import PROCESS_VM_READV_SUPPORTED # noqa: F401
import _remote_debugging # noqa: F401
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
else:
import profiling.sampling # noqa: F401
from profiling.sampling.sample import SampleProfiler # noqa: F401


skip_if_not_supported = unittest.skipIf(
(
sys.platform != "darwin"
and sys.platform != "linux"
and sys.platform != "win32"
),
"Test only runs on Linux, Windows and MacOS",
)

SubprocessInfo = namedtuple("SubprocessInfo", ["process", "socket"])


@contextlib.contextmanager
def test_subprocess(script):
"""Context manager to create a test subprocess with socket synchronization.

Args:
script: Python code to execute in the subprocess

Yields:
SubprocessInfo: Named tuple with process and socket objects
"""
# Find an unused port for socket communication
port = find_unused_port()

# Inject socket connection code at the beginning of the script
socket_code = f"""
import socket
_test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_test_sock.connect(('localhost', {port}))
_test_sock.sendall(b"ready")
"""

# Combine socket code with user script
full_script = socket_code + script

# Create server socket to wait for process to be ready
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)

proc = subprocess.Popen(
[sys.executable, "-c", full_script],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)

client_socket = None
try:
# Wait for process to connect and send ready signal
client_socket, _ = server_socket.accept()
server_socket.close()
response = client_socket.recv(1024)
if response != b"ready":
raise RuntimeError(
f"Unexpected response from subprocess: {response}"
)

yield SubprocessInfo(proc, client_socket)
finally:
if client_socket is not None:
client_socket.close()
if proc.poll() is None:
proc.kill()
proc.wait()


def close_and_unlink(file):
"""Close a file and unlink it from the filesystem."""
file.close()
unlink(file.name)
38 changes: 38 additions & 0 deletions Lib/test/test_profiling/test_sampling_profiler/mocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Mock classes for sampling profiler tests."""


class MockFrameInfo:
"""Mock FrameInfo for testing since the real one isn't accessible."""

def __init__(self, filename, lineno, funcname):
self.filename = filename
self.lineno = lineno
self.funcname = funcname

def __repr__(self):
return f"MockFrameInfo(filename='{self.filename}', lineno={self.lineno}, funcname='{self.funcname}')"


class MockThreadInfo:
"""Mock ThreadInfo for testing since the real one isn't accessible."""

def __init__(
self, thread_id, frame_info, status=0
): # Default to THREAD_STATE_RUNNING (0)
self.thread_id = thread_id
self.frame_info = frame_info
self.status = status

def __repr__(self):
return f"MockThreadInfo(thread_id={self.thread_id}, frame_info={self.frame_info}, status={self.status})"


class MockInterpreterInfo:
"""Mock InterpreterInfo for testing since the real one isn't accessible."""

def __init__(self, interpreter_id, threads):
self.interpreter_id = interpreter_id
self.threads = threads

def __repr__(self):
return f"MockInterpreterInfo(interpreter_id={self.interpreter_id}, threads={self.threads})"
Loading
Loading