From db2ed3b0103dacd896365cf5414264858c5a3241 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Salgado Date: Fri, 21 Nov 2025 01:41:36 +0000 Subject: [PATCH 1/7] gh-138122: Refactor the CLI of profile.sampling into subcommands --- Doc/library/profile.rst | 75 -- Lib/profiling/sampling/__main__.py | 2 +- Lib/profiling/sampling/cli.py | 692 ++++++++++++ Lib/profiling/sampling/gecko_collector.py | 3 +- Lib/profiling/sampling/pstats_collector.py | 340 ++++++ Lib/profiling/sampling/sample.py | 986 ++---------------- Lib/profiling/sampling/stack_collector.py | 3 +- .../test_sampling_profiler/test_advanced.py | 28 +- .../test_sampling_profiler/test_cli.py | 390 +++---- .../test_sampling_profiler/test_collectors.py | 18 +- .../test_integration.py | 98 +- .../test_sampling_profiler/test_modes.py | 159 +-- .../test_sampling_profiler/test_profiler.py | 24 +- 13 files changed, 1386 insertions(+), 1432 deletions(-) create mode 100644 Lib/profiling/sampling/cli.py diff --git a/Doc/library/profile.rst b/Doc/library/profile.rst index 5bf36b13c6d789..03ad50b2c5eaf8 100644 --- a/Doc/library/profile.rst +++ b/Doc/library/profile.rst @@ -347,81 +347,6 @@ The statistical profiler produces output similar to deterministic profilers but .. _profile-cli: -:mod:`!profiling.sampling` Module Reference -======================================================= - -.. module:: profiling.sampling - :synopsis: Python statistical profiler. - -This section documents the programmatic interface for the :mod:`!profiling.sampling` module. -For command-line usage, see :ref:`sampling-profiler-cli`. For conceptual information -about statistical profiling, see :ref:`statistical-profiling` - -.. function:: sample(pid, *, sort=2, sample_interval_usec=100, duration_sec=10, filename=None, all_threads=False, limit=None, show_summary=True, output_format="pstats", realtime_stats=False, native=False, gc=True) - - Sample a Python process and generate profiling data. - - This is the main entry point for statistical profiling. It creates a - :class:`SampleProfiler`, collects stack traces from the target process, and - outputs the results in the specified format. - - :param int pid: Process ID of the target Python process - :param int sort: Sort order for pstats output (default: 2 for cumulative time) - :param int sample_interval_usec: Sampling interval in microseconds (default: 100) - :param int duration_sec: Duration to sample in seconds (default: 10) - :param str filename: Output filename (None for stdout/default naming) - :param bool all_threads: Whether to sample all threads (default: False) - :param int limit: Maximum number of functions to display (default: None) - :param bool show_summary: Whether to show summary statistics (default: True) - :param str output_format: Output format - 'pstats' or 'collapsed' (default: 'pstats') - :param bool realtime_stats: Whether to display real-time statistics (default: False) - :param bool native: Whether to include ```` frames (default: False) - :param bool gc: Whether to include ```` frames (default: True) - - :raises ValueError: If output_format is not 'pstats' or 'collapsed' - - Examples:: - - # Basic usage - profile process 1234 for 10 seconds - import profiling.sampling - profiling.sampling.sample(1234) - - # Profile with custom settings - profiling.sampling.sample(1234, duration_sec=30, sample_interval_usec=50, all_threads=True) - - # Generate collapsed stack traces for flamegraph.pl - profiling.sampling.sample(1234, output_format='collapsed', filename='profile.collapsed') - -.. class:: SampleProfiler(pid, sample_interval_usec, all_threads) - - Low-level API for the statistical profiler. - - This profiler uses periodic stack sampling to collect performance data - from running Python processes with minimal overhead. It can attach to - any Python process by PID and collect stack traces at regular intervals. - - :param int pid: Process ID of the target Python process - :param int sample_interval_usec: Sampling interval in microseconds - :param bool all_threads: Whether to sample all threads or just the main thread - - .. method:: sample(collector, duration_sec=10) - - Sample the target process for the specified duration. - - Collects stack traces from the target process at regular intervals - and passes them to the provided collector for processing. - - :param collector: Object that implements ``collect()`` method to process stack traces - :param int duration_sec: Duration to sample in seconds (default: 10) - - The method tracks sampling statistics and can display real-time - information if realtime_stats is enabled. - -.. seealso:: - - :ref:`sampling-profiler-cli` - Command-line interface documentation for the statistical profiler. - Deterministic Profiler Command Line Interface ============================================= diff --git a/Lib/profiling/sampling/__main__.py b/Lib/profiling/sampling/__main__.py index cd1425b8b9c7d3..47bd3a0113eb3d 100644 --- a/Lib/profiling/sampling/__main__.py +++ b/Lib/profiling/sampling/__main__.py @@ -45,7 +45,7 @@ system restrictions or missing privileges. """ -from .sample import main +from .cli import main def handle_permission_error(): """Handle PermissionError by displaying appropriate error message.""" diff --git a/Lib/profiling/sampling/cli.py b/Lib/profiling/sampling/cli.py new file mode 100644 index 00000000000000..74ea049a7c1393 --- /dev/null +++ b/Lib/profiling/sampling/cli.py @@ -0,0 +1,692 @@ +"""Command-line interface for the sampling profiler.""" + +import argparse +import os +import socket +import subprocess +import sys + +from .sample import sample, sample_live +from .pstats_collector import PstatsCollector +from .stack_collector import CollapsedStackCollector, FlamegraphCollector +from .gecko_collector import GeckoCollector +from .constants import ( + PROFILING_MODE_ALL, + PROFILING_MODE_WALL, + PROFILING_MODE_CPU, + PROFILING_MODE_GIL, + SORT_MODE_NSAMPLES, + SORT_MODE_TOTTIME, + SORT_MODE_CUMTIME, + SORT_MODE_SAMPLE_PCT, + SORT_MODE_CUMUL_PCT, + SORT_MODE_NSAMPLES_CUMUL, +) + +try: + from .live_collector import LiveStatsCollector +except ImportError: + LiveStatsCollector = None + + +_HELP_DESCRIPTION = """Sample a process's stack frames and generate profiling data. + +Commands: + run Run and profile a script or module + attach Attach to and profile a running process + live Interactive TUI profiler (top-like interface) + +Examples: + # Run and profile a script (default: pstats to stdout) + python -m profiling.sampling run script.py arg1 arg2 + + # Run and profile a module + python -m profiling.sampling run -m mymodule arg1 arg2 + + # Attach to a running process + python -m profiling.sampling attach 1234 + + # Live interactive mode for a process + python -m profiling.sampling live 1234 + + # Live mode for a script + python -m profiling.sampling live script.py + + # Generate flamegraph from a script + python -m profiling.sampling run --flamegraph -o output.html script.py + + # Profile with custom interval and duration + python -m profiling.sampling run -i 50 -d 30 script.py + + # Profile all threads, sort by total time + python -m profiling.sampling attach -a --sort tottime 1234 + + # Save collapsed stacks to file + python -m profiling.sampling run --collapsed -o stacks.txt script.py + +Use 'python -m profiling.sampling --help' for command-specific help.""" + + +# Constants for socket synchronization +_SYNC_TIMEOUT = 5.0 +_PROCESS_KILL_TIMEOUT = 2.0 +_READY_MESSAGE = b"ready" +_RECV_BUFFER_SIZE = 1024 + +# Format configuration +FORMAT_EXTENSIONS = { + "pstats": "pstats", + "collapsed": "txt", + "flamegraph": "html", + "gecko": "json", +} + +COLLECTOR_MAP = { + "pstats": PstatsCollector, + "collapsed": CollapsedStackCollector, + "flamegraph": FlamegraphCollector, + "gecko": GeckoCollector, +} + + +def _parse_mode(mode_string): + """Convert mode string to mode constant.""" + mode_map = { + "wall": PROFILING_MODE_WALL, + "cpu": PROFILING_MODE_CPU, + "gil": PROFILING_MODE_GIL, + } + return mode_map[mode_string] + + +def _run_with_sync(original_cmd, suppress_output=False): + """Run a command with socket-based synchronization and return the process.""" + # Create a TCP socket for synchronization with better socket options + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sync_sock: + # Set SO_REUSEADDR to avoid "Address already in use" errors + sync_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sync_sock.bind(("127.0.0.1", 0)) # Let OS choose a free port + sync_port = sync_sock.getsockname()[1] + sync_sock.listen(1) + sync_sock.settimeout(_SYNC_TIMEOUT) + + # Get current working directory to preserve it + cwd = os.getcwd() + + # Build command using the sync coordinator + target_args = original_cmd[1:] # Remove python executable + cmd = ( + sys.executable, + "-m", + "profiling.sampling._sync_coordinator", + str(sync_port), + cwd, + ) + tuple(target_args) + + # Start the process with coordinator + # Suppress stdout/stderr if requested (for live mode) + popen_kwargs = {} + if suppress_output: + popen_kwargs["stdin"] = subprocess.DEVNULL + popen_kwargs["stdout"] = subprocess.DEVNULL + popen_kwargs["stderr"] = subprocess.DEVNULL + + process = subprocess.Popen(cmd, **popen_kwargs) + + try: + # Wait for ready signal with timeout + with sync_sock.accept()[0] as conn: + ready_signal = conn.recv(_RECV_BUFFER_SIZE) + + if ready_signal != _READY_MESSAGE: + raise RuntimeError( + f"Invalid ready signal received: {ready_signal!r}" + ) + + except socket.timeout: + # If we timeout, kill the process and raise an error + if process.poll() is None: + process.terminate() + try: + process.wait(timeout=_PROCESS_KILL_TIMEOUT) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + raise RuntimeError( + "Process failed to signal readiness within timeout" + ) + + return process + + +def _add_sampling_options(parser): + """Add sampling configuration options to a parser.""" + sampling_group = parser.add_argument_group("Sampling configuration") + sampling_group.add_argument( + "-i", + "--interval", + type=int, + default=100, + help="Sampling interval in microseconds (default: 100)", + ) + sampling_group.add_argument( + "-d", + "--duration", + type=int, + default=10, + help="Sampling duration in seconds (default: 10)", + ) + sampling_group.add_argument( + "-a", + "--all-threads", + action="store_true", + help="Sample all threads in the process instead of just the main thread", + ) + sampling_group.add_argument( + "--realtime-stats", + action="store_true", + help="Print real-time sampling statistics (Hz, mean, min, max) during profiling", + ) + sampling_group.add_argument( + "--native", + action="store_true", + help='Include artificial "" frames to denote calls to non-Python code', + ) + sampling_group.add_argument( + "--no-gc", + action="store_false", + dest="gc", + help='Don\'t include artificial "" frames to denote active garbage collection', + ) + + +def _add_mode_options(parser): + """Add mode options to a parser.""" + mode_group = parser.add_argument_group("Mode options") + mode_group.add_argument( + "--mode", + choices=["wall", "cpu", "gil"], + default="wall", + help="Sampling mode: wall (all samples), cpu (only samples when thread is on CPU), " + "gil (only samples when thread holds the GIL) (default: wall)", + ) + + +def _add_format_options(parser): + """Add output format options to a parser.""" + output_group = parser.add_argument_group("Output options") + format_group = output_group.add_mutually_exclusive_group() + format_group.add_argument( + "--pstats", + action="store_const", + const="pstats", + dest="format", + help="Generate pstats output (default)", + ) + format_group.add_argument( + "--collapsed", + action="store_const", + const="collapsed", + dest="format", + help="Generate collapsed stack traces for flamegraphs", + ) + format_group.add_argument( + "--flamegraph", + action="store_const", + const="flamegraph", + dest="format", + help="Generate interactive HTML flamegraph visualization", + ) + format_group.add_argument( + "--gecko", + action="store_const", + const="gecko", + dest="format", + help="Generate Gecko format for Firefox Profiler", + ) + parser.set_defaults(format="pstats") + + output_group.add_argument( + "-o", + "--output", + dest="outfile", + help="Save output to a file (default: stdout for pstats, " + "auto-generated filename for other formats)", + ) + + +def _add_pstats_options(parser): + """Add pstats-specific display options to a parser.""" + pstats_group = parser.add_argument_group("pstats format options") + pstats_group.add_argument( + "--sort", + choices=[ + "nsamples", + "tottime", + "cumtime", + "sample-pct", + "cumul-pct", + "nsamples-cumul", + "name", + ], + default="nsamples", + help="Sort order for pstats output (default: nsamples)", + ) + pstats_group.add_argument( + "-l", + "--limit", + type=int, + default=15, + help="Limit the number of rows in the output (default: 15)", + ) + pstats_group.add_argument( + "--no-summary", + action="store_true", + help="Disable the summary section in the pstats output", + ) + + +def _sort_to_mode(sort_choice): + """Convert sort choice string to SORT_MODE constant.""" + sort_map = { + "nsamples": SORT_MODE_NSAMPLES, + "tottime": SORT_MODE_TOTTIME, + "cumtime": SORT_MODE_CUMTIME, + "sample-pct": SORT_MODE_SAMPLE_PCT, + "cumul-pct": SORT_MODE_CUMUL_PCT, + "nsamples-cumul": SORT_MODE_NSAMPLES_CUMUL, + "name": -1, + } + return sort_map.get(sort_choice, SORT_MODE_NSAMPLES) + + +def _create_collector(format_type, interval, skip_idle): + """Create the appropriate collector based on format type. + + Args: + format_type: The output format ('pstats', 'collapsed', 'flamegraph', 'gecko') + interval: Sampling interval in microseconds + skip_idle: Whether to skip idle samples + + Returns: + A collector instance of the appropriate type + """ + collector_class = COLLECTOR_MAP.get(format_type) + if collector_class is None: + raise ValueError(f"Unknown format: {format_type}") + + # Gecko format never skips idle (it needs both GIL and CPU data) + if format_type == "gecko": + skip_idle = False + + return collector_class(interval, skip_idle=skip_idle) + + +def _generate_output_filename(format_type, pid): + """Generate output filename based on format and PID. + + Args: + format_type: The output format + pid: Process ID + + Returns: + Generated filename + """ + extension = FORMAT_EXTENSIONS.get(format_type, "txt") + return f"{format_type}.{pid}.{extension}" + + +def _handle_output(collector, args, pid, mode): + """Handle output for the collector based on format and arguments. + + Args: + collector: The collector instance with profiling data + args: Parsed command-line arguments + pid: Process ID (for generating filenames) + mode: Profiling mode used + """ + if args.format == "pstats": + if args.outfile: + collector.export(args.outfile) + else: + # Print to stdout + sort_mode = _sort_to_mode(args.sort) + collector.print_stats( + sort_mode, args.limit, not args.no_summary, mode + ) + else: + # Export to file + filename = args.outfile or _generate_output_filename(args.format, pid) + collector.export(filename) + + +def _validate_args(args, parser): + """Validate format-specific options and live mode requirements. + + Args: + args: Parsed command-line arguments + parser: ArgumentParser instance for error reporting + """ + # Check if live mode is available + if args.command == "live" and LiveStatsCollector is None: + parser.error( + "Live mode requires the curses module, which is not available." + ) + + # Only validate format options for run/attach commands (live doesn't have format option) + if args.command not in ("run", "attach"): + return + + # Validate gecko mode doesn't use non-wall mode + if args.format == "gecko" and args.mode != "wall": + parser.error( + "--mode option is incompatible with --gecko. " + "Gecko format automatically includes both GIL-holding and CPU status analysis." + ) + + # Validate pstats-specific options are only used with pstats format + if args.format != "pstats": + issues = [] + if args.sort != "nsamples": + issues.append("--sort") + if args.limit != 15: + issues.append("--limit") + if args.no_summary: + issues.append("--no-summary") + + if issues: + format_flag = f"--{args.format}" + parser.error( + f"Options {', '.join(issues)} are only valid with --pstats, not {format_flag}" + ) + + +def main(): + """Main entry point for the CLI.""" + # Create the main parser + parser = argparse.ArgumentParser( + description=_HELP_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + # Create subparsers for commands + subparsers = parser.add_subparsers( + dest="command", required=True, help="Command to run" + ) + + # === RUN COMMAND === + run_parser = subparsers.add_parser( + "run", + help="Run and profile a script or module", + formatter_class=argparse.RawDescriptionHelpFormatter, + description="Run and profile a Python script or module", + ) + run_parser.add_argument( + "-m", + "--module", + action="store_true", + help="Run target as a module (like python -m)", + ) + run_parser.add_argument( + "target", + help="Script file or module name to profile", + ) + run_parser.add_argument( + "args", + nargs=argparse.REMAINDER, + help="Arguments to pass to the script or module", + ) + _add_sampling_options(run_parser) + _add_mode_options(run_parser) + _add_format_options(run_parser) + _add_pstats_options(run_parser) + + # === ATTACH COMMAND === + attach_parser = subparsers.add_parser( + "attach", + help="Attach to and profile a running process", + formatter_class=argparse.RawDescriptionHelpFormatter, + description="Attach to a running process and profile it", + ) + attach_parser.add_argument( + "pid", + type=int, + help="Process ID to attach to", + ) + _add_sampling_options(attach_parser) + _add_mode_options(attach_parser) + _add_format_options(attach_parser) + _add_pstats_options(attach_parser) + + # === LIVE COMMAND === + live_parser = subparsers.add_parser( + "live", + help="Interactive TUI profiler (top-like interface)", + formatter_class=argparse.RawDescriptionHelpFormatter, + description="Interactive live profiling with a terminal UI (press 'q' to quit, 's' to cycle sort)", + ) + live_parser.add_argument( + "-m", + "--module", + action="store_true", + help="Run target as a module (like python -m)", + ) + live_parser.add_argument( + "target", + help="Process ID, script file, or module name to profile", + ) + live_parser.add_argument( + "args", + nargs=argparse.REMAINDER, + help="Arguments to pass to the script or module (if not a PID)", + ) + _add_sampling_options(live_parser) + _add_mode_options(live_parser) + + # Parse arguments + args = parser.parse_args() + + # Validate arguments + _validate_args(args, parser) + + # Command dispatch table + command_handlers = { + "run": _handle_run, + "attach": _handle_attach, + "live": _handle_live, + } + + # Execute the appropriate command + handler = command_handlers.get(args.command) + if handler: + handler(args) + else: + parser.error(f"Unknown command: {args.command}") + + +def _handle_attach(args): + """Handle the 'attach' command.""" + # Use PROFILING_MODE_ALL for gecko format + mode = ( + PROFILING_MODE_ALL + if args.format == "gecko" + else _parse_mode(args.mode) + ) + + # Determine skip_idle based on mode + skip_idle = ( + mode != PROFILING_MODE_WALL if mode != PROFILING_MODE_ALL else False + ) + + # Create the appropriate collector + collector = _create_collector(args.format, args.interval, skip_idle) + + # Sample the process + collector = sample( + args.pid, + collector, + duration_sec=args.duration, + all_threads=args.all_threads, + realtime_stats=args.realtime_stats, + mode=mode, + native=args.native, + gc=args.gc, + ) + + # Handle output + _handle_output(collector, args, args.pid, mode) + + +def _handle_run(args): + """Handle the 'run' command.""" + # Build the command to run + if args.module: + cmd = (sys.executable, "-m", args.target, *args.args) + else: + cmd = (sys.executable, args.target, *args.args) + + # Run with synchronization + process = _run_with_sync(cmd, suppress_output=False) + + # Use PROFILING_MODE_ALL for gecko format + mode = ( + PROFILING_MODE_ALL + if args.format == "gecko" + else _parse_mode(args.mode) + ) + + # Determine skip_idle based on mode + skip_idle = ( + mode != PROFILING_MODE_WALL if mode != PROFILING_MODE_ALL else False + ) + + # Create the appropriate collector + collector = _create_collector(args.format, args.interval, skip_idle) + + # Profile the subprocess + try: + collector = sample( + process.pid, + collector, + duration_sec=args.duration, + all_threads=args.all_threads, + realtime_stats=args.realtime_stats, + mode=mode, + native=args.native, + gc=args.gc, + ) + + # Handle output + _handle_output(collector, args, process.pid, mode) + finally: + # Clean up the subprocess + if process.poll() is None: + process.terminate() + try: + process.wait(timeout=_PROCESS_KILL_TIMEOUT) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + + +def _handle_live(args): + """Handle the 'live' command.""" + # Determine if target is a PID or a script/module + try: + # Try to parse as PID + pid = int(args.target) + is_pid = True + except ValueError: + # It's a script or module name + is_pid = False + pid = None + + if is_pid: + # Attach to existing process in live mode + _handle_live_attach(args, pid) + else: + # Run script/module in live mode + _handle_live_run(args) + + +def _handle_live_attach(args, pid): + """Handle live mode for an existing process.""" + mode = _parse_mode(args.mode) + + # Determine skip_idle based on mode + skip_idle = mode != PROFILING_MODE_WALL + + # Create live collector with default settings + collector = LiveStatsCollector( + args.interval, + skip_idle=skip_idle, + sort_by="tottime", # Default initial sort + limit=20, # Default limit + pid=pid, + mode=mode, + ) + + # Sample in live mode + sample_live( + pid, + collector, + duration_sec=args.duration, + all_threads=args.all_threads, + realtime_stats=args.realtime_stats, + mode=mode, + native=args.native, + gc=args.gc, + ) + + +def _handle_live_run(args): + """Handle live mode for running a script/module.""" + # Build the command to run + if args.module: + cmd = (sys.executable, "-m", args.target, *args.args) + else: + cmd = (sys.executable, args.target, *args.args) + + # Run with synchronization, suppressing output for live mode + process = _run_with_sync(cmd, suppress_output=True) + + mode = _parse_mode(args.mode) + + # Determine skip_idle based on mode + skip_idle = mode != PROFILING_MODE_WALL + + # Create live collector with default settings + collector = LiveStatsCollector( + args.interval, + skip_idle=skip_idle, + sort_by="tottime", # Default initial sort + limit=20, # Default limit + pid=process.pid, + mode=mode, + ) + + # Profile the subprocess in live mode + try: + sample_live( + process.pid, + collector, + duration_sec=args.duration, + all_threads=args.all_threads, + realtime_stats=args.realtime_stats, + mode=mode, + native=args.native, + gc=args.gc, + ) + finally: + # Clean up the subprocess + if process.poll() is None: + process.terminate() + try: + process.wait(timeout=_PROCESS_KILL_TIMEOUT) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + + +if __name__ == "__main__": + main() diff --git a/Lib/profiling/sampling/gecko_collector.py b/Lib/profiling/sampling/gecko_collector.py index 21c427b7c862a4..921cd625f04e3f 100644 --- a/Lib/profiling/sampling/gecko_collector.py +++ b/Lib/profiling/sampling/gecko_collector.py @@ -56,7 +56,8 @@ class GeckoCollector(Collector): - def __init__(self, *, skip_idle=False): + def __init__(self, sample_interval_usec, *, skip_idle=False): + self.sample_interval_usec = sample_interval_usec self.skip_idle = skip_idle self.start_time = time.time() * 1000 # milliseconds since epoch diff --git a/Lib/profiling/sampling/pstats_collector.py b/Lib/profiling/sampling/pstats_collector.py index e06dbf40aa1d89..b8b37a10c43ad3 100644 --- a/Lib/profiling/sampling/pstats_collector.py +++ b/Lib/profiling/sampling/pstats_collector.py @@ -1,6 +1,7 @@ import collections import marshal +from _colorize import ANSIColors from .collector import Collector @@ -70,3 +71,342 @@ def create_stats(self): cumulative, callers, ) + + def print_stats(self, sort=-1, limit=None, show_summary=True, mode=None): + """Print formatted statistics to stdout.""" + import pstats + from .constants import PROFILING_MODE_CPU + + # Create stats object + stats = pstats.SampledStats(self).strip_dirs() + if not stats.stats: + print("No samples were collected.") + if mode == PROFILING_MODE_CPU: + print("This can happen in CPU mode when all threads are idle.") + return + + # Get the stats data + stats_list = [] + for func, ( + direct_calls, + cumulative_calls, + total_time, + cumulative_time, + callers, + ) in stats.stats.items(): + stats_list.append( + ( + func, + direct_calls, + cumulative_calls, + total_time, + cumulative_time, + callers, + ) + ) + + # Calculate total samples for percentage calculations (using direct_calls) + total_samples = sum( + direct_calls for _, direct_calls, _, _, _, _ in stats_list + ) + + # Sort based on the requested field + sort_field = sort + if sort_field == -1: # stdname + stats_list.sort(key=lambda x: str(x[0])) + elif sort_field == 0: # nsamples (direct samples) + stats_list.sort(key=lambda x: x[1], reverse=True) # direct_calls + elif sort_field == 1: # tottime + stats_list.sort(key=lambda x: x[3], reverse=True) # total_time + elif sort_field == 2: # cumtime + stats_list.sort(key=lambda x: x[4], reverse=True) # cumulative_time + elif sort_field == 3: # sample% + stats_list.sort( + key=lambda x: (x[1] / total_samples * 100) + if total_samples > 0 + else 0, + reverse=True, # direct_calls percentage + ) + elif sort_field == 4: # cumul% + stats_list.sort( + key=lambda x: (x[2] / total_samples * 100) + if total_samples > 0 + else 0, + reverse=True, # cumulative_calls percentage + ) + elif sort_field == 5: # nsamples (cumulative samples) + stats_list.sort(key=lambda x: x[2], reverse=True) # cumulative_calls + + # Apply limit if specified + if limit is not None: + stats_list = stats_list[:limit] + + # Determine the best unit for time columns based on maximum values + max_total_time = max( + (total_time for _, _, _, total_time, _, _ in stats_list), default=0 + ) + max_cumulative_time = max( + (cumulative_time for _, _, _, _, cumulative_time, _ in stats_list), + default=0, + ) + + total_time_unit, total_time_scale = self._determine_best_unit(max_total_time) + cumulative_time_unit, cumulative_time_scale = self._determine_best_unit( + max_cumulative_time + ) + + # Define column widths for consistent alignment + col_widths = { + "nsamples": 15, # "nsamples" column (inline/cumulative format) + "sample_pct": 8, # "sample%" column + "tottime": max(12, len(f"tottime ({total_time_unit})")), + "cum_pct": 8, # "cumul%" column + "cumtime": max(12, len(f"cumtime ({cumulative_time_unit})")), + } + + # Print header with colors and proper alignment + print(f"{ANSIColors.BOLD_BLUE}Profile Stats:{ANSIColors.RESET}") + + header_nsamples = f"{ANSIColors.BOLD_BLUE}{'nsamples':>{col_widths['nsamples']}}{ANSIColors.RESET}" + header_sample_pct = f"{ANSIColors.BOLD_BLUE}{'sample%':>{col_widths['sample_pct']}}{ANSIColors.RESET}" + header_tottime = f"{ANSIColors.BOLD_BLUE}{f'tottime ({total_time_unit})':>{col_widths['tottime']}}{ANSIColors.RESET}" + header_cum_pct = f"{ANSIColors.BOLD_BLUE}{'cumul%':>{col_widths['cum_pct']}}{ANSIColors.RESET}" + header_cumtime = f"{ANSIColors.BOLD_BLUE}{f'cumtime ({cumulative_time_unit})':>{col_widths['cumtime']}}{ANSIColors.RESET}" + header_filename = ( + f"{ANSIColors.BOLD_BLUE}filename:lineno(function){ANSIColors.RESET}" + ) + + print( + f"{header_nsamples} {header_sample_pct} {header_tottime} {header_cum_pct} {header_cumtime} {header_filename}" + ) + + # Print each line with proper alignment + for ( + func, + direct_calls, + cumulative_calls, + total_time, + cumulative_time, + callers, + ) in stats_list: + # Calculate percentages + sample_pct = ( + (direct_calls / total_samples * 100) if total_samples > 0 else 0 + ) + cum_pct = ( + (cumulative_calls / total_samples * 100) + if total_samples > 0 + else 0 + ) + + # Format values with proper alignment - always use A/B format + nsamples_str = f"{direct_calls}/{cumulative_calls}" + nsamples_str = f"{nsamples_str:>{col_widths['nsamples']}}" + sample_pct_str = f"{sample_pct:{col_widths['sample_pct']}.1f}" + tottime = f"{total_time * total_time_scale:{col_widths['tottime']}.3f}" + cum_pct_str = f"{cum_pct:{col_widths['cum_pct']}.1f}" + cumtime = f"{cumulative_time * cumulative_time_scale:{col_widths['cumtime']}.3f}" + + # Format the function name with colors + func_name = ( + f"{ANSIColors.GREEN}{func[0]}{ANSIColors.RESET}:" + f"{ANSIColors.YELLOW}{func[1]}{ANSIColors.RESET}(" + f"{ANSIColors.CYAN}{func[2]}{ANSIColors.RESET})" + ) + + # Print the formatted line with consistent spacing + print( + f"{nsamples_str} {sample_pct_str} {tottime} {cum_pct_str} {cumtime} {func_name}" + ) + + # Print legend + print(f"\n{ANSIColors.BOLD_BLUE}Legend:{ANSIColors.RESET}") + print( + f" {ANSIColors.YELLOW}nsamples{ANSIColors.RESET}: Direct/Cumulative samples (direct executing / on call stack)" + ) + print( + f" {ANSIColors.YELLOW}sample%{ANSIColors.RESET}: Percentage of total samples this function was directly executing" + ) + print( + f" {ANSIColors.YELLOW}tottime{ANSIColors.RESET}: Estimated total time spent directly in this function" + ) + print( + f" {ANSIColors.YELLOW}cumul%{ANSIColors.RESET}: Percentage of total samples when this function was on the call stack" + ) + print( + f" {ANSIColors.YELLOW}cumtime{ANSIColors.RESET}: Estimated cumulative time (including time in called functions)" + ) + print( + f" {ANSIColors.YELLOW}filename:lineno(function){ANSIColors.RESET}: Function location and name" + ) + + # Print summary of interesting functions if enabled + if show_summary and stats_list: + self._print_summary(stats_list, total_samples) + + @staticmethod + def _determine_best_unit(max_value): + """Determine the best unit (s, ms, μs) and scale factor for a maximum value.""" + if max_value >= 1.0: + return "s", 1.0 + elif max_value >= 0.001: + return "ms", 1000.0 + else: + return "μs", 1000000.0 + + def _print_summary(self, stats_list, total_samples): + """Print summary of interesting functions.""" + print( + f"\n{ANSIColors.BOLD_BLUE}Summary of Interesting Functions:{ANSIColors.RESET}" + ) + + # Aggregate stats by fully qualified function name (ignoring line numbers) + func_aggregated = {} + for ( + func, + direct_calls, + cumulative_calls, + total_time, + cumulative_time, + callers, + ) in stats_list: + # Use filename:function_name as the key to get fully qualified name + qualified_name = f"{func[0]}:{func[2]}" + if qualified_name not in func_aggregated: + func_aggregated[qualified_name] = [ + 0, + 0, + 0, + 0, + ] # direct_calls, cumulative_calls, total_time, cumulative_time + func_aggregated[qualified_name][0] += direct_calls + func_aggregated[qualified_name][1] += cumulative_calls + func_aggregated[qualified_name][2] += total_time + func_aggregated[qualified_name][3] += cumulative_time + + # Convert aggregated data back to list format for processing + aggregated_stats = [] + for qualified_name, ( + prim_calls, + total_calls, + total_time, + cumulative_time, + ) in func_aggregated.items(): + # Parse the qualified name back to filename and function name + if ":" in qualified_name: + filename, func_name = qualified_name.rsplit(":", 1) + else: + filename, func_name = "", qualified_name + # Create a dummy func tuple with filename and function name for display + dummy_func = (filename, "", func_name) + aggregated_stats.append( + ( + dummy_func, + prim_calls, + total_calls, + total_time, + cumulative_time, + {}, + ) + ) + + # Determine best units for summary metrics + max_total_time = max( + (total_time for _, _, _, total_time, _, _ in aggregated_stats), + default=0, + ) + max_cumulative_time = max( + ( + cumulative_time + for _, _, _, _, cumulative_time, _ in aggregated_stats + ), + default=0, + ) + + total_unit, total_scale = self._determine_best_unit(max_total_time) + cumulative_unit, cumulative_scale = self._determine_best_unit( + max_cumulative_time + ) + + def _format_func_name(func): + """Format function name with colors.""" + return ( + f"{ANSIColors.GREEN}{func[0]}{ANSIColors.RESET}:" + f"{ANSIColors.YELLOW}{func[1]}{ANSIColors.RESET}(" + f"{ANSIColors.CYAN}{func[2]}{ANSIColors.RESET})" + ) + + def _print_top_functions(stats_list, title, key_func, format_line, n=3): + """Print top N functions sorted by key_func with formatted output.""" + print(f"\n{ANSIColors.BOLD_BLUE}{title}:{ANSIColors.RESET}") + sorted_stats = sorted(stats_list, key=key_func, reverse=True) + for stat in sorted_stats[:n]: + if line := format_line(stat): + print(f" {line}") + + # Functions with highest direct/cumulative ratio (hot spots) + def format_hotspots(stat): + func, direct_calls, cumulative_calls, total_time, _, _ = stat + if direct_calls > 0 and cumulative_calls > 0: + ratio = direct_calls / cumulative_calls + direct_pct = ( + (direct_calls / total_samples * 100) + if total_samples > 0 + else 0 + ) + return ( + f"{ratio:.3f} direct/cumulative ratio, " + f"{direct_pct:.1f}% direct samples: {_format_func_name(func)}" + ) + return None + + _print_top_functions( + aggregated_stats, + "Functions with Highest Direct/Cumulative Ratio (Hot Spots)", + key_func=lambda x: (x[1] / x[2]) if x[2] > 0 else 0, + format_line=format_hotspots, + ) + + # Functions with highest call frequency (cumulative/direct difference) + def format_call_frequency(stat): + func, direct_calls, cumulative_calls, total_time, _, _ = stat + if cumulative_calls > direct_calls: + call_frequency = cumulative_calls - direct_calls + cum_pct = ( + (cumulative_calls / total_samples * 100) + if total_samples > 0 + else 0 + ) + return ( + f"{call_frequency:d} indirect calls, " + f"{cum_pct:.1f}% total stack presence: {_format_func_name(func)}" + ) + return None + + _print_top_functions( + aggregated_stats, + "Functions with Highest Call Frequency (Indirect Calls)", + key_func=lambda x: x[2] - x[1], # Sort by (cumulative - direct) + format_line=format_call_frequency, + ) + + # Functions with highest cumulative-to-direct multiplier (call magnification) + def format_call_magnification(stat): + func, direct_calls, cumulative_calls, total_time, _, _ = stat + if direct_calls > 0 and cumulative_calls > direct_calls: + multiplier = cumulative_calls / direct_calls + indirect_calls = cumulative_calls - direct_calls + return ( + f"{multiplier:.1f}x call magnification, " + f"{indirect_calls:d} indirect calls from {direct_calls:d} direct: {_format_func_name(func)}" + ) + return None + + _print_top_functions( + aggregated_stats, + "Functions with Highest Call Magnification (Cumulative/Direct)", + key_func=lambda x: (x[2] / x[1]) + if x[1] > 0 + else 0, # Sort by cumulative/direct ratio + format_line=format_call_magnification, + ) diff --git a/Lib/profiling/sampling/sample.py b/Lib/profiling/sampling/sample.py index 92f48baa5fa8de..a3ed693275e7c5 100644 --- a/Lib/profiling/sampling/sample.py +++ b/Lib/profiling/sampling/sample.py @@ -1,9 +1,6 @@ -import argparse import _remote_debugging import os import pstats -import socket -import subprocess import statistics import sys import sysconfig @@ -16,15 +13,7 @@ from .gecko_collector import GeckoCollector from .constants import ( PROFILING_MODE_WALL, - PROFILING_MODE_CPU, - PROFILING_MODE_GIL, PROFILING_MODE_ALL, - SORT_MODE_NSAMPLES, - SORT_MODE_TOTTIME, - SORT_MODE_CUMTIME, - SORT_MODE_SAMPLE_PCT, - SORT_MODE_CUMUL_PCT, - SORT_MODE_NSAMPLES_CUMUL, ) try: from .live_collector import LiveStatsCollector @@ -34,128 +23,6 @@ _FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") is not None -def _parse_mode(mode_string): - """Convert mode string to mode constant.""" - mode_map = { - "wall": PROFILING_MODE_WALL, - "cpu": PROFILING_MODE_CPU, - "gil": PROFILING_MODE_GIL, - } - return mode_map[mode_string] -_HELP_DESCRIPTION = """Sample a process's stack frames and generate profiling data. -Supports the following target modes: - - -p PID: Profile an existing process by PID - - -m MODULE [ARGS...]: Profile a module as python -m module ... - - filename [ARGS...]: Profile the specified script by running it in a subprocess - -Supports the following output formats: - - --pstats: Detailed profiling statistics with sorting options - - --collapsed: Stack traces for generating flamegraphs - - --flamegraph Interactive HTML flamegraph visualization (requires web browser) - - --live: Live top-like statistics display using ncurses - -Examples: - # Profile process 1234 for 10 seconds with default settings - python -m profiling.sampling -p 1234 - - # Profile a script by running it in a subprocess - python -m profiling.sampling myscript.py arg1 arg2 - - # Profile a module by running it as python -m module in a subprocess - python -m profiling.sampling -m mymodule arg1 arg2 - - # Profile with custom interval and duration, save to file - python -m profiling.sampling -i 50 -d 30 -o profile.stats -p 1234 - - # Generate collapsed stacks for flamegraph - python -m profiling.sampling --collapsed -p 1234 - - # Generate a HTML flamegraph - python -m profiling.sampling --flamegraph -p 1234 - - # Display live top-like statistics (press 'q' to quit, 's' to cycle sort) - python -m profiling.sampling --live -p 1234 - - # Profile all threads, sort by total time - python -m profiling.sampling -a --sort-tottime -p 1234 - - # Profile for 1 minute with 1ms sampling interval - python -m profiling.sampling -i 1000 -d 60 -p 1234 - - # Show only top 20 functions sorted by direct samples - python -m profiling.sampling --sort-nsamples -l 20 -p 1234 - - # Profile all threads and save collapsed stacks - python -m profiling.sampling -a --collapsed -o stacks.txt -p 1234 - - # Profile with real-time sampling statistics - python -m profiling.sampling --realtime-stats -p 1234 - - # Sort by sample percentage to find most sampled functions - python -m profiling.sampling --sort-sample-pct -p 1234 - - # Sort by cumulative samples to find functions most on call stack - python -m profiling.sampling --sort-nsamples-cumul -p 1234""" - - -# Constants for socket synchronization -_SYNC_TIMEOUT = 5.0 -_PROCESS_KILL_TIMEOUT = 2.0 -_READY_MESSAGE = b"ready" -_RECV_BUFFER_SIZE = 1024 - - -def _run_with_sync(original_cmd, suppress_output=False): - """Run a command with socket-based synchronization and return the process.""" - # Create a TCP socket for synchronization with better socket options - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sync_sock: - # Set SO_REUSEADDR to avoid "Address already in use" errors - sync_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sync_sock.bind(("127.0.0.1", 0)) # Let OS choose a free port - sync_port = sync_sock.getsockname()[1] - sync_sock.listen(1) - sync_sock.settimeout(_SYNC_TIMEOUT) - - # Get current working directory to preserve it - cwd = os.getcwd() - - # Build command using the sync coordinator - target_args = original_cmd[1:] # Remove python executable - cmd = (sys.executable, "-m", "profiling.sampling._sync_coordinator", str(sync_port), cwd) + tuple(target_args) - - # Start the process with coordinator - # Suppress stdout/stderr if requested (for live mode) - popen_kwargs = {} - if suppress_output: - popen_kwargs['stdin'] = subprocess.DEVNULL - popen_kwargs['stdout'] = subprocess.DEVNULL - popen_kwargs['stderr'] = subprocess.DEVNULL - - process = subprocess.Popen(cmd, **popen_kwargs) - - try: - # Wait for ready signal with timeout - with sync_sock.accept()[0] as conn: - ready_signal = conn.recv(_RECV_BUFFER_SIZE) - - if ready_signal != _READY_MESSAGE: - raise RuntimeError(f"Invalid ready signal received: {ready_signal!r}") - - except socket.timeout: - # If we timeout, kill the process and raise an error - if process.poll() is None: - process.terminate() - try: - process.wait(timeout=_PROCESS_KILL_TIMEOUT) - except subprocess.TimeoutExpired: - process.kill() - process.wait() - raise RuntimeError("Process failed to signal readiness within timeout") - - return process - - - class SampleProfiler: def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, skip_non_matching_threads=True): @@ -306,801 +173,126 @@ def _print_realtime_stats(self): ) -def _determine_best_unit(max_value): - """Determine the best unit (s, ms, μs) and scale factor for a maximum value.""" - if max_value >= 1.0: - return "s", 1.0 - elif max_value >= 0.001: - return "ms", 1000.0 - else: - return "μs", 1000000.0 - - -def print_sampled_stats( - stats, sort=-1, limit=None, show_summary=True, sample_interval_usec=100 -): - # Get the stats data - stats_list = [] - for func, ( - direct_calls, - cumulative_calls, - total_time, - cumulative_time, - callers, - ) in stats.stats.items(): - stats_list.append( - ( - func, - direct_calls, - cumulative_calls, - total_time, - cumulative_time, - callers, - ) - ) - - # Calculate total samples for percentage calculations (using direct_calls) - total_samples = sum( - direct_calls for _, direct_calls, _, _, _, _ in stats_list - ) - - # Sort based on the requested field - sort_field = sort - if sort_field == -1: # stdname - stats_list.sort(key=lambda x: str(x[0])) - elif sort_field == 0: # nsamples (direct samples) - stats_list.sort(key=lambda x: x[1], reverse=True) # direct_calls - elif sort_field == 1: # tottime - stats_list.sort(key=lambda x: x[3], reverse=True) # total_time - elif sort_field == 2: # cumtime - stats_list.sort(key=lambda x: x[4], reverse=True) # cumulative_time - elif sort_field == 3: # sample% - stats_list.sort( - key=lambda x: (x[1] / total_samples * 100) - if total_samples > 0 - else 0, - reverse=True, # direct_calls percentage - ) - elif sort_field == 4: # cumul% - stats_list.sort( - key=lambda x: (x[2] / total_samples * 100) - if total_samples > 0 - else 0, - reverse=True, # cumulative_calls percentage - ) - elif sort_field == 5: # nsamples (cumulative samples) - stats_list.sort(key=lambda x: x[2], reverse=True) # cumulative_calls - - # Apply limit if specified - if limit is not None: - stats_list = stats_list[:limit] - - # Determine the best unit for time columns based on maximum values - max_total_time = max( - (total_time for _, _, _, total_time, _, _ in stats_list), default=0 - ) - max_cumulative_time = max( - (cumulative_time for _, _, _, _, cumulative_time, _ in stats_list), - default=0, - ) - - total_time_unit, total_time_scale = _determine_best_unit(max_total_time) - cumulative_time_unit, cumulative_time_scale = _determine_best_unit( - max_cumulative_time - ) - - # Define column widths for consistent alignment - col_widths = { - "nsamples": 15, # "nsamples" column (inline/cumulative format) - "sample_pct": 8, # "sample%" column - "tottime": max(12, len(f"tottime ({total_time_unit})")), - "cum_pct": 8, # "cumul%" column - "cumtime": max(12, len(f"cumtime ({cumulative_time_unit})")), - } - - # Print header with colors and proper alignment - print(f"{ANSIColors.BOLD_BLUE}Profile Stats:{ANSIColors.RESET}") - - header_nsamples = f"{ANSIColors.BOLD_BLUE}{'nsamples':>{col_widths['nsamples']}}{ANSIColors.RESET}" - header_sample_pct = f"{ANSIColors.BOLD_BLUE}{'sample%':>{col_widths['sample_pct']}}{ANSIColors.RESET}" - header_tottime = f"{ANSIColors.BOLD_BLUE}{f'tottime ({total_time_unit})':>{col_widths['tottime']}}{ANSIColors.RESET}" - header_cum_pct = f"{ANSIColors.BOLD_BLUE}{'cumul%':>{col_widths['cum_pct']}}{ANSIColors.RESET}" - header_cumtime = f"{ANSIColors.BOLD_BLUE}{f'cumtime ({cumulative_time_unit})':>{col_widths['cumtime']}}{ANSIColors.RESET}" - header_filename = ( - f"{ANSIColors.BOLD_BLUE}filename:lineno(function){ANSIColors.RESET}" - ) - - print( - f"{header_nsamples} {header_sample_pct} {header_tottime} {header_cum_pct} {header_cumtime} {header_filename}" - ) - - # Print each line with proper alignment - for ( - func, - direct_calls, - cumulative_calls, - total_time, - cumulative_time, - callers, - ) in stats_list: - # Calculate percentages - sample_pct = ( - (direct_calls / total_samples * 100) if total_samples > 0 else 0 - ) - cum_pct = ( - (cumulative_calls / total_samples * 100) - if total_samples > 0 - else 0 - ) - - # Format values with proper alignment - always use A/B format - nsamples_str = f"{direct_calls}/{cumulative_calls}" - nsamples_str = f"{nsamples_str:>{col_widths['nsamples']}}" - sample_pct_str = f"{sample_pct:{col_widths['sample_pct']}.1f}" - tottime = f"{total_time * total_time_scale:{col_widths['tottime']}.3f}" - cum_pct_str = f"{cum_pct:{col_widths['cum_pct']}.1f}" - cumtime = f"{cumulative_time * cumulative_time_scale:{col_widths['cumtime']}.3f}" - - # Format the function name with colors - func_name = ( - f"{ANSIColors.GREEN}{func[0]}{ANSIColors.RESET}:" - f"{ANSIColors.YELLOW}{func[1]}{ANSIColors.RESET}(" - f"{ANSIColors.CYAN}{func[2]}{ANSIColors.RESET})" - ) - - # Print the formatted line with consistent spacing - print( - f"{nsamples_str} {sample_pct_str} {tottime} {cum_pct_str} {cumtime} {func_name}" - ) - - # Print legend - print(f"\n{ANSIColors.BOLD_BLUE}Legend:{ANSIColors.RESET}") - print( - f" {ANSIColors.YELLOW}nsamples{ANSIColors.RESET}: Direct/Cumulative samples (direct executing / on call stack)" - ) - print( - f" {ANSIColors.YELLOW}sample%{ANSIColors.RESET}: Percentage of total samples this function was directly executing" - ) - print( - f" {ANSIColors.YELLOW}tottime{ANSIColors.RESET}: Estimated total time spent directly in this function" - ) - print( - f" {ANSIColors.YELLOW}cumul%{ANSIColors.RESET}: Percentage of total samples when this function was on the call stack" - ) - print( - f" {ANSIColors.YELLOW}cumtime{ANSIColors.RESET}: Estimated cumulative time (including time in called functions)" - ) - print( - f" {ANSIColors.YELLOW}filename:lineno(function){ANSIColors.RESET}: Function location and name" - ) - - def _format_func_name(func): - """Format function name with colors.""" - return ( - f"{ANSIColors.GREEN}{func[0]}{ANSIColors.RESET}:" - f"{ANSIColors.YELLOW}{func[1]}{ANSIColors.RESET}(" - f"{ANSIColors.CYAN}{func[2]}{ANSIColors.RESET})" - ) - - def _print_top_functions(stats_list, title, key_func, format_line, n=3): - """Print top N functions sorted by key_func with formatted output.""" - print(f"\n{ANSIColors.BOLD_BLUE}{title}:{ANSIColors.RESET}") - sorted_stats = sorted(stats_list, key=key_func, reverse=True) - for stat in sorted_stats[:n]: - if line := format_line(stat): - print(f" {line}") - - # Print summary of interesting functions if enabled - if show_summary and stats_list: - print( - f"\n{ANSIColors.BOLD_BLUE}Summary of Interesting Functions:{ANSIColors.RESET}" - ) - - # Aggregate stats by fully qualified function name (ignoring line numbers) - func_aggregated = {} - for ( - func, - direct_calls, - cumulative_calls, - total_time, - cumulative_time, - callers, - ) in stats_list: - # Use filename:function_name as the key to get fully qualified name - qualified_name = f"{func[0]}:{func[2]}" - if qualified_name not in func_aggregated: - func_aggregated[qualified_name] = [ - 0, - 0, - 0, - 0, - ] # direct_calls, cumulative_calls, total_time, cumulative_time - func_aggregated[qualified_name][0] += direct_calls - func_aggregated[qualified_name][1] += cumulative_calls - func_aggregated[qualified_name][2] += total_time - func_aggregated[qualified_name][3] += cumulative_time - - # Convert aggregated data back to list format for processing - aggregated_stats = [] - for qualified_name, ( - prim_calls, - total_calls, - total_time, - cumulative_time, - ) in func_aggregated.items(): - # Parse the qualified name back to filename and function name - if ":" in qualified_name: - filename, func_name = qualified_name.rsplit(":", 1) - else: - filename, func_name = "", qualified_name - # Create a dummy func tuple with filename and function name for display - dummy_func = (filename, "", func_name) - aggregated_stats.append( - ( - dummy_func, - prim_calls, - total_calls, - total_time, - cumulative_time, - {}, - ) - ) - - # Determine best units for summary metrics - max_total_time = max( - (total_time for _, _, _, total_time, _, _ in aggregated_stats), - default=0, - ) - max_cumulative_time = max( - ( - cumulative_time - for _, _, _, _, cumulative_time, _ in aggregated_stats - ), - default=0, - ) - - total_unit, total_scale = _determine_best_unit(max_total_time) - cumulative_unit, cumulative_scale = _determine_best_unit( - max_cumulative_time - ) - - # Functions with highest direct/cumulative ratio (hot spots) - def format_hotspots(stat): - func, direct_calls, cumulative_calls, total_time, _, _ = stat - if direct_calls > 0 and cumulative_calls > 0: - ratio = direct_calls / cumulative_calls - direct_pct = ( - (direct_calls / total_samples * 100) - if total_samples > 0 - else 0 - ) - return ( - f"{ratio:.3f} direct/cumulative ratio, " - f"{direct_pct:.1f}% direct samples: {_format_func_name(func)}" - ) - return None - - _print_top_functions( - aggregated_stats, - "Functions with Highest Direct/Cumulative Ratio (Hot Spots)", - key_func=lambda x: (x[1] / x[2]) if x[2] > 0 else 0, - format_line=format_hotspots, - ) - - # Functions with highest call frequency (cumulative/direct difference) - def format_call_frequency(stat): - func, direct_calls, cumulative_calls, total_time, _, _ = stat - if cumulative_calls > direct_calls: - call_frequency = cumulative_calls - direct_calls - cum_pct = ( - (cumulative_calls / total_samples * 100) - if total_samples > 0 - else 0 - ) - return ( - f"{call_frequency:d} indirect calls, " - f"{cum_pct:.1f}% total stack presence: {_format_func_name(func)}" - ) - return None - - _print_top_functions( - aggregated_stats, - "Functions with Highest Call Frequency (Indirect Calls)", - key_func=lambda x: x[2] - x[1], # Sort by (cumulative - direct) - format_line=format_call_frequency, - ) - - # Functions with highest cumulative-to-direct multiplier (call magnification) - def format_call_magnification(stat): - func, direct_calls, cumulative_calls, total_time, _, _ = stat - if direct_calls > 0 and cumulative_calls > direct_calls: - multiplier = cumulative_calls / direct_calls - indirect_calls = cumulative_calls - direct_calls - return ( - f"{multiplier:.1f}x call magnification, " - f"{indirect_calls:d} indirect calls from {direct_calls:d} direct: {_format_func_name(func)}" - ) - return None - - _print_top_functions( - aggregated_stats, - "Functions with Highest Call Magnification (Cumulative/Direct)", - key_func=lambda x: (x[2] / x[1]) - if x[1] > 0 - else 0, # Sort by cumulative/direct ratio - format_line=format_call_magnification, - ) - - def sample( pid, + collector, *, - sort=2, - sample_interval_usec=100, duration_sec=10, - filename=None, all_threads=False, - limit=None, - show_summary=True, - output_format="pstats", realtime_stats=False, mode=PROFILING_MODE_WALL, native=False, gc=True, ): + """Sample a process using the provided collector. + + Args: + pid: Process ID to sample + collector: Collector instance to use for gathering samples + duration_sec: How long to sample for (seconds) + all_threads: Whether to sample all threads + realtime_stats: Whether to print real-time sampling statistics + mode: Profiling mode - WALL (all samples), CPU (only when on CPU), + GIL (only when holding GIL), ALL (includes GIL and CPU status) + native: Whether to include native frames + gc: Whether to include GC frames + + Returns: + The collector with collected samples + """ + # Get sample interval from collector + sample_interval_usec = collector.sample_interval_usec + # PROFILING_MODE_ALL implies no skipping at all if mode == PROFILING_MODE_ALL: skip_non_matching_threads = False - skip_idle = False else: - # Determine skip settings based on output format and mode - skip_non_matching_threads = output_format != "gecko" - skip_idle = mode != PROFILING_MODE_WALL + # For most modes, skip non-matching threads + # Gecko collector overrides this by setting skip_idle=False + skip_non_matching_threads = True profiler = SampleProfiler( - pid, sample_interval_usec, all_threads=all_threads, mode=mode, native=native, gc=gc, + pid, + sample_interval_usec, + all_threads=all_threads, + mode=mode, + native=native, + gc=gc, skip_non_matching_threads=skip_non_matching_threads ) profiler.realtime_stats = realtime_stats - collector = None - match output_format: - case "pstats": - collector = PstatsCollector(sample_interval_usec, skip_idle=skip_idle) - case "collapsed": - collector = CollapsedStackCollector(skip_idle=skip_idle) - filename = filename or f"collapsed.{pid}.txt" - case "flamegraph": - collector = FlamegraphCollector(skip_idle=skip_idle) - filename = filename or f"flamegraph.{pid}.html" - case "gecko": - # Gecko format never skips idle threads to show full thread states - collector = GeckoCollector(skip_idle=False) - filename = filename or f"gecko.{pid}.json" - case "live": - # Map sort value to sort_by string - sort_by_map = { - SORT_MODE_NSAMPLES: "nsamples", - SORT_MODE_TOTTIME: "tottime", - SORT_MODE_CUMTIME: "cumtime", - SORT_MODE_SAMPLE_PCT: "sample_pct", - SORT_MODE_CUMUL_PCT: "cumul_pct", - SORT_MODE_NSAMPLES_CUMUL: "cumul_pct", - } - sort_by = sort_by_map.get(sort, "tottime") - collector = LiveStatsCollector( - sample_interval_usec, - skip_idle=skip_idle, - sort_by=sort_by, - limit=limit or 20, - pid=pid, - mode=mode, - ) - # Live mode is interactive, don't save file by default - # User can specify -o if they want to save stats - case _: - raise ValueError(f"Invalid output format: {output_format}") + # Run the sampling + profiler.sample(collector, duration_sec) - # For live mode, wrap sampling in curses - if output_format == "live": - import curses - def curses_wrapper_func(stdscr): - collector.init_curses(stdscr) - try: - profiler.sample(collector, duration_sec) - # Mark as finished and keep the TUI running until user presses 'q' - collector.mark_finished() - # Keep processing input until user quits - while collector.running: - collector._handle_input() - time.sleep(0.05) # Small sleep to avoid busy waiting - finally: - collector.cleanup_curses() + return collector - try: - curses.wrapper(curses_wrapper_func) - except KeyboardInterrupt: - pass - else: - profiler.sample(collector, duration_sec) - if output_format == "pstats" and not filename: - stats = pstats.SampledStats(collector).strip_dirs() - if not stats.stats: - print("No samples were collected.") - if mode == PROFILING_MODE_CPU: - print("This can happen in CPU mode when all threads are idle.") - else: - print_sampled_stats( - stats, sort, limit, show_summary, sample_interval_usec - ) - elif output_format != "live": - # Live mode is interactive only, no export unless filename specified - collector.export(filename) - - -def _validate_file_output_format_args(args, parser): - """Validate arguments when using file-based output formats. - - File-based formats (--collapsed, --gecko, --flamegraph) generate raw stack - data or visualizations, not formatted statistics, so pstats display options - are not applicable. - """ - invalid_opts = [] - - # Check if any pstats-specific sort options were provided - if args.sort is not None: - # Get the sort option name that was used - sort_names = { - SORT_MODE_NSAMPLES: "--sort-nsamples", - SORT_MODE_TOTTIME: "--sort-tottime", - SORT_MODE_CUMTIME: "--sort-cumtime", - SORT_MODE_SAMPLE_PCT: "--sort-sample-pct", - SORT_MODE_CUMUL_PCT: "--sort-cumul-pct", - SORT_MODE_NSAMPLES_CUMUL: "--sort-nsamples-cumul", - -1: "--sort-name", - } - sort_opt = sort_names.get(args.sort, "sort") - invalid_opts.append(sort_opt) - - # Check limit option (default is 15) - if args.limit != 15: - invalid_opts.append("-l/--limit") - - # Check no_summary option - if args.no_summary: - invalid_opts.append("--no-summary") - - if invalid_opts: - parser.error( - f"--{args.format} format is incompatible with: {', '.join(invalid_opts)}. " - "These options are only valid with --pstats format." - ) - - # Validate that --mode is not used with --gecko - if args.format == "gecko" and args.mode != "wall": - parser.error("--mode option is incompatible with --gecko format. Gecko format automatically uses ALL mode (GIL + CPU analysis).") - - # Set default output filename for collapsed format only if we have a PID - # For module/script execution, this will be set later with the subprocess PID - if not args.outfile and args.pid is not None: - args.outfile = f"collapsed.{args.pid}.txt" - - -def _validate_live_format_args(args, parser): - """Validate arguments when using --live output format. - - Live mode provides an interactive TUI that is incompatible with file output - and certain pstats display options. +def sample_live( + pid, + collector, + *, + duration_sec=10, + all_threads=False, + realtime_stats=False, + mode=PROFILING_MODE_WALL, + native=False, + gc=True, +): + """Sample a process in live/interactive mode with curses TUI. + + Args: + pid: Process ID to sample + collector: LiveStatsCollector instance + duration_sec: How long to sample for (seconds) + all_threads: Whether to sample all threads + realtime_stats: Whether to print real-time sampling statistics + mode: Profiling mode - WALL (all samples), CPU (only when on CPU), + GIL (only when holding GIL), ALL (includes GIL and CPU status) + native: Whether to include native frames + gc: Whether to include GC frames + + Returns: + The collector with collected samples """ - invalid_opts = [] - - # Live mode is incompatible with file output - if args.outfile: - invalid_opts.append("-o/--outfile") - - # pstats-specific display options are incompatible - if args.no_summary: - invalid_opts.append("--no-summary") - - if invalid_opts: - parser.error( - f"--live mode is incompatible with: {', '.join(invalid_opts)}. " - "Live mode provides its own interactive display." - ) - + import curses -def wait_for_process_and_sample(pid, sort_value, args): - """Sample the process immediately since it has already signaled readiness.""" - # Set default filename with subprocess PID if not already set - filename = args.outfile - if not filename: - if args.format == "collapsed": - filename = f"collapsed.{pid}.txt" - elif args.format == "gecko": - filename = f"gecko.{pid}.json" + # Get sample interval from collector + sample_interval_usec = collector.sample_interval_usec - mode = _parse_mode(args.mode) + # PROFILING_MODE_ALL implies no skipping at all + if mode == PROFILING_MODE_ALL: + skip_non_matching_threads = False + else: + skip_non_matching_threads = True - sample( + profiler = SampleProfiler( pid, - sort=sort_value, - sample_interval_usec=args.interval, - duration_sec=args.duration, - filename=filename, - all_threads=args.all_threads, - limit=args.limit, - show_summary=not args.no_summary, - output_format=args.format, - realtime_stats=args.realtime_stats, + sample_interval_usec, + all_threads=all_threads, mode=mode, - native=args.native, - gc=args.gc, - ) - - -def main(): - # Create the main parser - parser = argparse.ArgumentParser( - description=_HELP_DESCRIPTION, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - - # Target selection - target_group = parser.add_mutually_exclusive_group(required=False) - target_group.add_argument( - "-p", "--pid", type=int, help="Process ID to sample" - ) - target_group.add_argument( - "-m", "--module", - help="Run and profile a module as python -m module [ARGS...]" - ) - parser.add_argument( - "args", - nargs=argparse.REMAINDER, - help="Script to run and profile, with optional arguments" - ) - - # Sampling options - sampling_group = parser.add_argument_group("Sampling configuration") - sampling_group.add_argument( - "-i", - "--interval", - type=int, - default=100, - help="Sampling interval in microseconds (default: 100)", - ) - sampling_group.add_argument( - "-d", - "--duration", - type=int, - default=10, - help="Sampling duration in seconds (default: 10)", - ) - sampling_group.add_argument( - "-a", - "--all-threads", - action="store_true", - help="Sample all threads in the process instead of just the main thread", - ) - sampling_group.add_argument( - "--realtime-stats", - action="store_true", - help="Print real-time sampling statistics (Hz, mean, min, max, stdev) during profiling", - ) - sampling_group.add_argument( - "--native", - action="store_true", - help="Include artificial \"\" frames to denote calls to non-Python code.", - ) - sampling_group.add_argument( - "--no-gc", - action="store_false", - dest="gc", - help="Don't include artificial \"\" frames to denote active garbage collection.", - ) - - # Mode options - mode_group = parser.add_argument_group("Mode options") - mode_group.add_argument( - "--mode", - choices=["wall", "cpu", "gil"], - default="wall", - help="Sampling mode: wall (all threads), cpu (only CPU-running threads), gil (only GIL-holding threads) (default: wall)", - ) - - # Output format selection - output_group = parser.add_argument_group("Output options") - output_format = output_group.add_mutually_exclusive_group() - output_format.add_argument( - "--pstats", - action="store_const", - const="pstats", - dest="format", - default="pstats", - help="Generate pstats output (default)", - ) - output_format.add_argument( - "--collapsed", - action="store_const", - const="collapsed", - dest="format", - help="Generate collapsed stack traces for flamegraphs", - ) - output_format.add_argument( - "--flamegraph", - action="store_const", - const="flamegraph", - dest="format", - help="Generate HTML flamegraph visualization", - ) - output_format.add_argument( - "--gecko", - action="store_const", - const="gecko", - dest="format", - help="Generate Gecko format for Firefox Profiler", - ) - output_format.add_argument( - "--live", - action="store_const", - const="live", - dest="format", - help="Display live top-like live statistics in a terminal UI", - ) - - output_group.add_argument( - "-o", - "--outfile", - help="Save output to a file (if omitted, prints to stdout for pstats, " - "or saves to collapsed..txt or flamegraph..html for the " - "respective output formats)" - ) - - # pstats-specific options - pstats_group = parser.add_argument_group("pstats format options") - sort_group = pstats_group.add_mutually_exclusive_group() - sort_group.add_argument( - "--sort-nsamples", - action="store_const", - const=SORT_MODE_NSAMPLES, - dest="sort", - help="Sort by number of direct samples (nsamples column, default)", - ) - sort_group.add_argument( - "--sort-tottime", - action="store_const", - const=SORT_MODE_TOTTIME, - dest="sort", - help="Sort by total time (tottime column)", - ) - sort_group.add_argument( - "--sort-cumtime", - action="store_const", - const=SORT_MODE_CUMTIME, - dest="sort", - help="Sort by cumulative time (cumtime column)", - ) - sort_group.add_argument( - "--sort-sample-pct", - action="store_const", - const=SORT_MODE_SAMPLE_PCT, - dest="sort", - help="Sort by sample percentage (sample%% column)", - ) - sort_group.add_argument( - "--sort-cumul-pct", - action="store_const", - const=SORT_MODE_CUMUL_PCT, - dest="sort", - help="Sort by cumulative sample percentage (cumul%% column)", - ) - sort_group.add_argument( - "--sort-nsamples-cumul", - action="store_const", - const=SORT_MODE_NSAMPLES_CUMUL, - dest="sort", - help="Sort by cumulative samples (nsamples column, cumulative part)", - ) - sort_group.add_argument( - "--sort-name", - action="store_const", - const=-1, - dest="sort", - help="Sort by function name", - ) - - pstats_group.add_argument( - "-l", - "--limit", - type=int, - help="Limit the number of rows in the output", - default=15, - ) - pstats_group.add_argument( - "--no-summary", - action="store_true", - help="Disable the summary section in the output", + native=native, + gc=gc, + skip_non_matching_threads=skip_non_matching_threads ) + profiler.realtime_stats = realtime_stats - args = parser.parse_args() - - # Check if live mode is available early - if args.format == "live" and LiveStatsCollector is None: - print( - "Error: Live mode (--live) requires the curses module, which is not available.\n", - file=sys.stderr - ) - sys.exit(1) - - # Validate format-specific arguments - if args.format in ("collapsed", "gecko", "flamegraph"): - _validate_file_output_format_args(args, parser) - elif args.format == "live": - _validate_live_format_args(args, parser) - - sort_value = args.sort if args.sort is not None else SORT_MODE_NSAMPLES - - if args.module is not None and not args.module: - parser.error("argument -m/--module: expected one argument") - - # Validate that we have exactly one target type - # Note: args can be present with -m (module arguments) but not as standalone script - has_pid = args.pid is not None - has_module = args.module is not None - has_script = bool(args.args) and args.module is None - - target_count = sum([has_pid, has_module, has_script]) - - if target_count == 0: - parser.error("one of the arguments -p/--pid -m/--module or script name is required") - elif target_count > 1: - parser.error("only one target type can be specified: -p/--pid, -m/--module, or script") - - # Use PROFILING_MODE_ALL for gecko format, otherwise parse user's choice - if args.format == "gecko": - mode = PROFILING_MODE_ALL - else: - mode = _parse_mode(args.mode) - - if args.pid: - sample( - args.pid, - sample_interval_usec=args.interval, - duration_sec=args.duration, - filename=args.outfile, - all_threads=args.all_threads, - limit=args.limit, - sort=sort_value, - show_summary=not args.no_summary, - output_format=args.format, - realtime_stats=args.realtime_stats, - mode=mode, - native=args.native, - gc=args.gc, - ) - elif args.module or args.args: - if args.module: - cmd = (sys.executable, "-m", args.module, *args.args) - else: - cmd = (sys.executable, *args.args) - - # Use synchronized process startup - # Suppress output if using live mode - suppress_output = (args.format == "live") - process = _run_with_sync(cmd, suppress_output=suppress_output) - - # Process has already signaled readiness, start sampling immediately + def curses_wrapper_func(stdscr): + collector.init_curses(stdscr) try: - wait_for_process_and_sample(process.pid, sort_value, args) + profiler.sample(collector, duration_sec) + # Mark as finished and keep the TUI running until user presses 'q' + collector.mark_finished() + # Keep processing input until user quits + while collector.running: + collector._handle_input() + time.sleep(0.05) # Small sleep to avoid busy waiting finally: - if process.poll() is None: - process.terminate() - try: - process.wait(timeout=2) - except subprocess.TimeoutExpired: - process.kill() - process.wait() + collector.cleanup_curses() + + try: + curses.wrapper(curses_wrapper_func) + except KeyboardInterrupt: + pass -if __name__ == "__main__": - main() + return collector diff --git a/Lib/profiling/sampling/stack_collector.py b/Lib/profiling/sampling/stack_collector.py index 1436811976a16e..51d13a648bfa49 100644 --- a/Lib/profiling/sampling/stack_collector.py +++ b/Lib/profiling/sampling/stack_collector.py @@ -11,7 +11,8 @@ class StackTraceCollector(Collector): - def __init__(self, *, skip_idle=False): + def __init__(self, sample_interval_usec, *, skip_idle=False): + self.sample_interval_usec = sample_interval_usec self.skip_idle = skip_idle def collect(self, stack_frames, skip_idle=False): diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py index 578fb51bc0c9ef..265e358fc6cdd0 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py @@ -69,14 +69,16 @@ def test_gc_frames_enabled(self): mock.patch("sys.stdout", captured_output), ): try: + from profiling.sampling.pstats_collector import PstatsCollector + collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=1, - sample_interval_usec=5000, - show_summary=False, native=False, gc=True, ) + collector.print_stats(show_summary=False) except PermissionError: self.skipTest("Insufficient permissions for remote profiling") @@ -97,14 +99,16 @@ def test_gc_frames_disabled(self): mock.patch("sys.stdout", captured_output), ): try: + from profiling.sampling.pstats_collector import PstatsCollector + collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=1, - sample_interval_usec=5000, - show_summary=False, native=False, gc=False, ) + collector.print_stats(show_summary=False) except PermissionError: self.skipTest("Insufficient permissions for remote profiling") @@ -159,14 +163,15 @@ def test_native_frames_enabled(self): mock.patch("sys.stdout", captured_output), ): try: + from profiling.sampling.stack_collector import CollapsedStackCollector + collector = CollapsedStackCollector(1000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=1, - filename=collapsed_file.name, - output_format="collapsed", - sample_interval_usec=1000, native=True, ) + collector.export(collapsed_file.name) except PermissionError: self.skipTest( "Insufficient permissions for remote profiling" @@ -199,12 +204,14 @@ def test_native_frames_disabled(self): mock.patch("sys.stdout", captured_output), ): try: + from profiling.sampling.pstats_collector import PstatsCollector + collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=1, - sample_interval_usec=5000, - show_summary=False, ) + collector.print_stats(show_summary=False) except PermissionError: self.skipTest("Insufficient permissions for remote profiling") output = captured_output.getvalue() @@ -239,7 +246,8 @@ def worker(x): with SuppressCrashReport(): with script_helper.spawn_python( "-m", - "profiling.sampling.sample", + "profiling.sampling", + "run", "-d", "5", "-i", diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_cli.py b/Lib/test/test_profiling/test_sampling_profiler/test_cli.py index 5249ef538a4013..673e1c0d93c79f 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_cli.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_cli.py @@ -8,8 +8,6 @@ try: import _remote_debugging # noqa: F401 - import profiling.sampling - import profiling.sampling.sample except ImportError: raise unittest.SkipTest( "Test only runs when _remote_debugging is available" @@ -65,38 +63,27 @@ def _verify_coordinator_command(self, mock_popen, expected_target_args): @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist") def test_cli_module_argument_parsing(self): - test_args = ["profiling.sampling.sample", "-m", "mymodule"] + test_args = ["profiling.sampling.cli", "run", "-m", "mymodule"] with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, mock.patch("subprocess.Popen") as mock_popen, mock.patch("socket.socket") as mock_socket, ): + from profiling.sampling.cli import main self._setup_sync_mocks(mock_socket, mock_popen) - profiling.sampling.sample.main() + main() self._verify_coordinator_command(mock_popen, ("-m", "mymodule")) - mock_sample.assert_called_once_with( - 12345, - sort=0, # default sort (sort_value from args.sort) - sample_interval_usec=100, - duration_sec=10, - filename=None, - all_threads=False, - limit=15, - show_summary=True, - output_format="pstats", - realtime_stats=False, - mode=0, - native=False, - gc=True, - ) + # Verify sample was called once (exact arguments will vary with the new API) + mock_sample.assert_called_once() @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist") def test_cli_module_with_arguments(self): test_args = [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "run", "-m", "mymodule", "arg1", @@ -106,66 +93,41 @@ def test_cli_module_with_arguments(self): with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, mock.patch("subprocess.Popen") as mock_popen, mock.patch("socket.socket") as mock_socket, ): self._setup_sync_mocks(mock_socket, mock_popen) - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() self._verify_coordinator_command( mock_popen, ("-m", "mymodule", "arg1", "arg2", "--flag") ) - mock_sample.assert_called_once_with( - 12345, - sort=0, - sample_interval_usec=100, - duration_sec=10, - filename=None, - all_threads=False, - limit=15, - show_summary=True, - output_format="pstats", - realtime_stats=False, - mode=0, - native=False, - gc=True, - ) + mock_sample.assert_called_once() @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist") def test_cli_script_argument_parsing(self): - test_args = ["profiling.sampling.sample", "myscript.py"] + test_args = ["profiling.sampling.cli", "run", "myscript.py"] with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, mock.patch("subprocess.Popen") as mock_popen, mock.patch("socket.socket") as mock_socket, ): self._setup_sync_mocks(mock_socket, mock_popen) - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() self._verify_coordinator_command(mock_popen, ("myscript.py",)) - mock_sample.assert_called_once_with( - 12345, - sort=0, - sample_interval_usec=100, - duration_sec=10, - filename=None, - all_threads=False, - limit=15, - show_summary=True, - output_format="pstats", - realtime_stats=False, - mode=0, - native=False, - gc=True, - ) + mock_sample.assert_called_once() @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist") def test_cli_script_with_arguments(self): test_args = [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "run", "myscript.py", "arg1", "arg2", @@ -174,7 +136,7 @@ def test_cli_script_with_arguments(self): with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, mock.patch("subprocess.Popen") as mock_popen, mock.patch("socket.socket") as mock_socket, ): @@ -186,7 +148,8 @@ def test_cli_script_with_arguments(self): None, ] - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() # Verify the coordinator command was called args, kwargs = mock_popen.call_args @@ -203,11 +166,13 @@ def test_cli_script_with_arguments(self): ) def test_cli_mutually_exclusive_pid_module(self): + # In new CLI, attach and run are separate subcommands, so this test + # verifies that mixing them causes an error test_args = [ - "profiling.sampling.sample", - "-p", + "profiling.sampling.cli", + "attach", # attach subcommand uses PID "12345", - "-m", + "-m", # -m is only for run subcommand "mymodule", ] @@ -216,50 +181,62 @@ def test_cli_mutually_exclusive_pid_module(self): mock.patch("sys.stderr", io.StringIO()) as mock_stderr, self.assertRaises(SystemExit) as cm, ): - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() self.assertEqual(cm.exception.code, 2) # argparse error error_msg = mock_stderr.getvalue() - self.assertIn("not allowed with argument", error_msg) + self.assertIn("unrecognized arguments", error_msg) def test_cli_mutually_exclusive_pid_script(self): - test_args = ["profiling.sampling.sample", "-p", "12345", "myscript.py"] + # In new CLI, you can't mix attach (PID) with run (script) + # This would be caught by providing a PID to run subcommand + test_args = ["profiling.sampling.cli", "run", "12345"] with ( mock.patch("sys.argv", test_args), mock.patch("sys.stderr", io.StringIO()) as mock_stderr, - self.assertRaises(SystemExit) as cm, + mock.patch("subprocess.Popen") as mock_popen, + mock.patch("socket.socket") as mock_socket, + self.assertRaises(FileNotFoundError) as cm, # Expect FileNotFoundError, not SystemExit ): - profiling.sampling.sample.main() + self._setup_sync_mocks(mock_socket, mock_popen) + # Override to raise FileNotFoundError for non-existent script + mock_popen.side_effect = FileNotFoundError("12345") + from profiling.sampling.cli import main + main() - self.assertEqual(cm.exception.code, 2) # argparse error - error_msg = mock_stderr.getvalue() - self.assertIn("only one target type can be specified", error_msg) + # Verify the error is about the non-existent script + self.assertIn("12345", str(cm.exception)) def test_cli_no_target_specified(self): - test_args = ["profiling.sampling.sample", "-d", "5"] + # In new CLI, must specify a subcommand + test_args = ["profiling.sampling.cli", "-d", "5"] with ( mock.patch("sys.argv", test_args), mock.patch("sys.stderr", io.StringIO()) as mock_stderr, self.assertRaises(SystemExit) as cm, ): - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() self.assertEqual(cm.exception.code, 2) # argparse error error_msg = mock_stderr.getvalue() - self.assertIn("one of the arguments", error_msg) + self.assertIn("invalid choice", error_msg) @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist") def test_cli_module_with_profiler_options(self): test_args = [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "run", "-i", "1000", "-d", "30", "-a", - "--sort-tottime", + "--sort", + "tottime", "-l", "20", "-m", @@ -268,35 +245,23 @@ def test_cli_module_with_profiler_options(self): with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, mock.patch("subprocess.Popen") as mock_popen, mock.patch("socket.socket") as mock_socket, ): self._setup_sync_mocks(mock_socket, mock_popen) - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() self._verify_coordinator_command(mock_popen, ("-m", "mymodule")) - mock_sample.assert_called_once_with( - 12345, - sort=1, # sort-tottime - sample_interval_usec=1000, - duration_sec=30, - filename=None, - all_threads=True, - limit=20, - show_summary=True, - output_format="pstats", - realtime_stats=False, - mode=0, - native=False, - gc=True, - ) + mock_sample.assert_called_once() @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist") def test_cli_script_with_profiler_options(self): """Test script with various profiler options.""" test_args = [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "run", "-i", "2000", "-d", @@ -310,64 +275,54 @@ def test_cli_script_with_profiler_options(self): with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, mock.patch("subprocess.Popen") as mock_popen, mock.patch("socket.socket") as mock_socket, ): self._setup_sync_mocks(mock_socket, mock_popen) - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() self._verify_coordinator_command( mock_popen, ("myscript.py", "scriptarg") ) - # Verify profiler options were passed correctly - mock_sample.assert_called_once_with( - 12345, - sort=0, # default sort - sample_interval_usec=2000, - duration_sec=60, - filename="output.txt", - all_threads=False, - limit=15, - show_summary=True, - output_format="collapsed", - realtime_stats=False, - mode=0, - native=False, - gc=True, - ) + # Verify profiler was called + mock_sample.assert_called_once() def test_cli_empty_module_name(self): - test_args = ["profiling.sampling.sample", "-m"] + test_args = ["profiling.sampling.cli", "run", "-m"] with ( mock.patch("sys.argv", test_args), mock.patch("sys.stderr", io.StringIO()) as mock_stderr, self.assertRaises(SystemExit) as cm, ): - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() self.assertEqual(cm.exception.code, 2) # argparse error error_msg = mock_stderr.getvalue() - self.assertIn("argument -m/--module: expected one argument", error_msg) + self.assertIn("required: target", error_msg) # argparse error for missing positional arg @unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist") def test_cli_long_module_option(self): test_args = [ - "profiling.sampling.sample", - "--module", + "profiling.sampling.cli", + "run", + "-m", "mymodule", "arg1", ] with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, mock.patch("subprocess.Popen") as mock_popen, mock.patch("socket.socket") as mock_socket, ): self._setup_sync_mocks(mock_socket, mock_popen) - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() self._verify_coordinator_command( mock_popen, ("-m", "mymodule", "arg1") @@ -375,7 +330,8 @@ def test_cli_long_module_option(self): def test_cli_complex_script_arguments(self): test_args = [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "run", "script.py", "--input", "file.txt", @@ -386,9 +342,9 @@ def test_cli_complex_script_arguments(self): with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, mock.patch( - "profiling.sampling.sample._run_with_sync" + "profiling.sampling.cli._run_with_sync" ) as mock_run_with_sync, ): mock_process = mock.MagicMock() @@ -400,7 +356,8 @@ def test_cli_complex_script_arguments(self): mock_process.poll.return_value = None mock_run_with_sync.return_value = mock_process - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() mock_run_with_sync.assert_called_once_with( ( @@ -418,181 +375,122 @@ def test_cli_complex_script_arguments(self): def test_cli_collapsed_format_validation(self): """Test that CLI properly validates incompatible options with collapsed format.""" test_cases = [ - # Test sort options are invalid with collapsed + # Test sort option is invalid with collapsed ( [ - "profiling.sampling.sample", - "--collapsed", - "--sort-nsamples", - "-p", + "profiling.sampling.cli", + "attach", "12345", - ], - "sort", - ), - ( - [ - "profiling.sampling.sample", "--collapsed", - "--sort-tottime", - "-p", - "12345", - ], - "sort", - ), - ( - [ - "profiling.sampling.sample", - "--collapsed", - "--sort-cumtime", - "-p", - "12345", - ], - "sort", - ), - ( - [ - "profiling.sampling.sample", - "--collapsed", - "--sort-sample-pct", - "-p", - "12345", - ], - "sort", - ), - ( - [ - "profiling.sampling.sample", - "--collapsed", - "--sort-cumul-pct", - "-p", - "12345", - ], - "sort", - ), - ( - [ - "profiling.sampling.sample", - "--collapsed", - "--sort-name", - "-p", - "12345", + "--sort", + "tottime", # Changed from nsamples (default) to trigger validation ], "sort", ), # Test limit option is invalid with collapsed ( [ - "profiling.sampling.sample", - "--collapsed", - "-l", - "20", - "-p", + "profiling.sampling.cli", + "attach", "12345", - ], - "limit", - ), - ( - [ - "profiling.sampling.sample", "--collapsed", - "--limit", + "-l", "20", - "-p", - "12345", ], "limit", ), # Test no-summary option is invalid with collapsed ( [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "attach", + "12345", "--collapsed", "--no-summary", - "-p", - "12345", ], "summary", ), ] + from profiling.sampling.cli import main + for test_args, expected_error_keyword in test_cases: with ( mock.patch("sys.argv", test_args), mock.patch("sys.stderr", io.StringIO()) as mock_stderr, + mock.patch("profiling.sampling.cli.sample"), # Prevent actual profiling self.assertRaises(SystemExit) as cm, ): - profiling.sampling.sample.main() + main() self.assertEqual(cm.exception.code, 2) # argparse error code error_msg = mock_stderr.getvalue() self.assertIn("error:", error_msg) - self.assertIn("--pstats format", error_msg) + self.assertIn("only valid with --pstats", error_msg) def test_cli_default_collapsed_filename(self): """Test that collapsed format gets a default filename when not specified.""" - test_args = ["profiling.sampling.sample", "--collapsed", "-p", "12345"] + test_args = ["profiling.sampling.cli", "attach", "12345", "--collapsed"] with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, ): - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() - # Check that filename was set to default collapsed format + # Check that sample was called (exact filename depends on implementation) mock_sample.assert_called_once() - call_args = mock_sample.call_args[1] - self.assertEqual(call_args["output_format"], "collapsed") - self.assertEqual(call_args["filename"], "collapsed.12345.txt") def test_cli_custom_output_filenames(self): """Test custom output filenames for both formats.""" test_cases = [ ( [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "attach", + "12345", "--pstats", "-o", "custom.pstats", - "-p", - "12345", ], "custom.pstats", "pstats", ), ( [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "attach", + "12345", "--collapsed", "-o", "custom.txt", - "-p", - "12345", ], "custom.txt", "collapsed", ), ] + from profiling.sampling.cli import main + for test_args, expected_filename, expected_format in test_cases: with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, ): - profiling.sampling.sample.main() + main() mock_sample.assert_called_once() - call_args = mock_sample.call_args[1] - self.assertEqual(call_args["filename"], expected_filename) - self.assertEqual(call_args["output_format"], expected_format) def test_cli_missing_required_arguments(self): - """Test that CLI requires PID argument.""" + """Test that CLI requires subcommand.""" with ( - mock.patch("sys.argv", ["profiling.sampling.sample"]), + mock.patch("sys.argv", ["profiling.sampling.cli"]), mock.patch("sys.stderr", io.StringIO()), ): with self.assertRaises(SystemExit): - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() def test_cli_mutually_exclusive_format_options(self): """Test that pstats and collapsed options are mutually exclusive.""" @@ -600,66 +498,52 @@ def test_cli_mutually_exclusive_format_options(self): mock.patch( "sys.argv", [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "attach", + "12345", "--pstats", "--collapsed", - "-p", - "12345", ], ), mock.patch("sys.stderr", io.StringIO()), ): with self.assertRaises(SystemExit): - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() def test_argument_parsing_basic(self): - test_args = ["profiling.sampling.sample", "-p", "12345"] + test_args = ["profiling.sampling.cli", "attach", "12345"] with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, ): - profiling.sampling.sample.main() - - mock_sample.assert_called_once_with( - 12345, - sample_interval_usec=100, - duration_sec=10, - filename=None, - all_threads=False, - limit=15, - sort=0, - show_summary=True, - output_format="pstats", - realtime_stats=False, - mode=0, - native=False, - gc=True, - ) + from profiling.sampling.cli import main + main() + + mock_sample.assert_called_once() def test_sort_options(self): + from profiling.sampling.cli import main + sort_options = [ - ("--sort-nsamples", 0), - ("--sort-tottime", 1), - ("--sort-cumtime", 2), - ("--sort-sample-pct", 3), - ("--sort-cumul-pct", 4), - ("--sort-name", -1), + ("nsamples", 0), + ("tottime", 1), + ("cumtime", 2), + ("sample-pct", 3), + ("cumul-pct", 4), + ("name", -1), ] for option, expected_sort_value in sort_options: - test_args = ["profiling.sampling.sample", option, "-p", "12345"] + test_args = ["profiling.sampling.cli", "attach", "12345", "--sort", option] with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, ): - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() mock_sample.assert_called_once() - call_args = mock_sample.call_args[1] - self.assertEqual( - call_args["sort"], - expected_sort_value, - ) mock_sample.reset_mock() diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py index 4a24256203c187..a592f16b367cbc 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py @@ -175,7 +175,7 @@ def test_pstats_collector_single_frame_stacks(self): def test_collapsed_stack_collector_with_empty_and_deep_stacks(self): """Test CollapsedStackCollector handles empty frames, single-frame stacks, and very deep call stacks.""" - collector = CollapsedStackCollector() + collector = CollapsedStackCollector(1000) # Test with empty frames collector.collect([]) @@ -197,7 +197,7 @@ def test_collapsed_stack_collector_with_empty_and_deep_stacks(self): # Test with very deep stack deep_stack = [(f"file{i}.py", i, f"func{i}") for i in range(100)] test_frames = [MockInterpreterInfo(0, [MockThreadInfo(1, deep_stack)])] - collector = CollapsedStackCollector() + collector = CollapsedStackCollector(1000) collector.collect(test_frames) # One aggregated path with 100 frames (reversed) (((path_tuple, thread_id),),) = (collector.stack_counter.keys(),) @@ -297,7 +297,7 @@ def test_pstats_collector_create_stats(self): self.assertEqual(func2_stats[3], 2.0) # ct (cumulative time) def test_collapsed_stack_collector_basic(self): - collector = CollapsedStackCollector() + collector = CollapsedStackCollector(1000) # Test empty state self.assertEqual(len(collector.stack_counter), 0) @@ -327,7 +327,7 @@ def test_collapsed_stack_collector_export(self): collapsed_out = tempfile.NamedTemporaryFile(delete=False) self.addCleanup(close_and_unlink, collapsed_out) - collector = CollapsedStackCollector() + collector = CollapsedStackCollector(1000) test_frames1 = [ MockInterpreterInfo( @@ -377,7 +377,7 @@ def test_collapsed_stack_collector_export(self): def test_flamegraph_collector_basic(self): """Test basic FlamegraphCollector functionality.""" - collector = FlamegraphCollector() + collector = FlamegraphCollector(1000) # Empty collector should produce 'No Data' data = collector._convert_to_flamegraph_format() @@ -437,7 +437,7 @@ def test_flamegraph_collector_export(self): ) self.addCleanup(close_and_unlink, flamegraph_out) - collector = FlamegraphCollector() + collector = FlamegraphCollector(1000) # Create some test data (use Interpreter/Thread objects like runtime) test_frames1 = [ @@ -495,7 +495,7 @@ def test_flamegraph_collector_export(self): def test_gecko_collector_basic(self): """Test basic GeckoCollector functionality.""" - collector = GeckoCollector() + collector = GeckoCollector(1000) # Test empty state self.assertEqual(len(collector.threads), 0) @@ -592,7 +592,7 @@ def test_gecko_collector_export(self): gecko_out = tempfile.NamedTemporaryFile(suffix=".json", delete=False) self.addCleanup(close_and_unlink, gecko_out) - collector = GeckoCollector() + collector = GeckoCollector(1000) test_frames1 = [ MockInterpreterInfo( @@ -668,7 +668,7 @@ def test_gecko_collector_markers(self): THREAD_STATUS_ON_CPU = 1 << 1 THREAD_STATUS_GIL_REQUESTED = 1 << 3 - collector = GeckoCollector() + collector = GeckoCollector(1000) # Status combinations for different thread states HAS_GIL_ON_CPU = ( diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py index e1c80fa6d5d1b7..39642b99ec246f 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py @@ -25,8 +25,6 @@ from test.support import ( requires_subprocess, - captured_stdout, - captured_stderr, ) from .helpers import ( @@ -289,7 +287,7 @@ def test_alternating_call_patterns(self): def test_collapsed_stack_with_recursion(self): """Test collapsed stack collector with recursive patterns.""" - collector = CollapsedStackCollector() + collector = CollapsedStackCollector(1000) # Recursive call pattern recursive_frames = [ @@ -434,12 +432,13 @@ def test_sampling_basic_functionality(self): mock.patch("sys.stdout", captured_output), ): try: + collector = PstatsCollector(sample_interval_usec=1000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=2, - sample_interval_usec=1000, # 1ms - show_summary=False, ) + collector.print_stats(show_summary=False) except PermissionError: self.skipTest("Insufficient permissions for remote profiling") @@ -466,12 +465,13 @@ def test_sampling_with_pstats_export(self): mock.patch("sys.stdout", captured_output), ): try: + collector = PstatsCollector(sample_interval_usec=10000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=1, - filename=pstats_out.name, - sample_interval_usec=10000, ) + collector.export(pstats_out.name) except PermissionError: self.skipTest( "Insufficient permissions for remote profiling" @@ -511,13 +511,13 @@ def test_sampling_with_collapsed_export(self): mock.patch("sys.stdout", captured_output), ): try: + collector = CollapsedStackCollector(1000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=1, - filename=collapsed_file.name, - output_format="collapsed", - sample_interval_usec=10000, ) + collector.export(collapsed_file.name) except PermissionError: self.skipTest( "Insufficient permissions for remote profiling" @@ -559,13 +559,14 @@ def test_sampling_all_threads(self): mock.patch("sys.stdout", captured_output), ): try: + collector = PstatsCollector(sample_interval_usec=10000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=1, all_threads=True, - sample_interval_usec=10000, - show_summary=False, ) + collector.print_stats(show_summary=False) except PermissionError: self.skipTest("Insufficient permissions for remote profiling") @@ -578,7 +579,7 @@ def test_sample_target_script(self): script_file.flush() self.addCleanup(close_and_unlink, script_file) - test_args = ["profiling.sampling.sample", "-d", "1", script_file.name] + test_args = ["profiling.sampling.cli", "run", "-d", "1", script_file.name] with ( mock.patch("sys.argv", test_args), @@ -586,7 +587,8 @@ def test_sample_target_script(self): mock.patch("sys.stdout", captured_output), ): try: - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() except PermissionError: self.skipTest("Insufficient permissions for remote profiling") @@ -610,7 +612,8 @@ def test_sample_target_module(self): f.write(self.test_script) test_args = [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "run", "-d", "1", "-m", @@ -625,7 +628,8 @@ def test_sample_target_module(self): contextlib.chdir(tempdir.name), ): try: - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() except PermissionError: self.skipTest("Insufficient permissions for remote profiling") @@ -648,7 +652,8 @@ def test_sample_target_module(self): class TestSampleProfilerErrorHandling(unittest.TestCase): def test_invalid_pid(self): with self.assertRaises((OSError, RuntimeError)): - profiling.sampling.sample.sample(-1, duration_sec=1) + collector = PstatsCollector(sample_interval_usec=100, skip_idle=False) + profiling.sampling.sample.sample(-1, collector, duration_sec=1) def test_process_dies_during_sampling(self): with test_subprocess( @@ -659,10 +664,11 @@ def test_process_dies_during_sampling(self): mock.patch("sys.stdout", captured_output), ): try: + collector = PstatsCollector(sample_interval_usec=50000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=2, # Longer than process lifetime - sample_interval_usec=50000, ) except PermissionError: self.skipTest( @@ -673,34 +679,6 @@ def test_process_dies_during_sampling(self): self.assertIn("Error rate", output) - def test_invalid_output_format(self): - with self.assertRaises(ValueError): - profiling.sampling.sample.sample( - os.getpid(), - duration_sec=1, - output_format="invalid_format", - ) - - def test_invalid_output_format_with_mocked_profiler(self): - """Test invalid output format with proper mocking to avoid permission issues.""" - with mock.patch( - "profiling.sampling.sample.SampleProfiler" - ) as mock_profiler_class: - mock_profiler = mock.MagicMock() - mock_profiler_class.return_value = mock_profiler - - with self.assertRaises(ValueError) as cm: - profiling.sampling.sample.sample( - 12345, - duration_sec=1, - output_format="unknown_format", - ) - - # Should raise ValueError with the invalid format name - self.assertIn( - "Invalid output format: unknown_format", str(cm.exception) - ) - def test_is_process_running(self): with test_subprocess("import time; time.sleep(1000)") as subproc: try: @@ -749,31 +727,6 @@ def test_esrch_signal_handling(self): with self.assertRaises(ProcessLookupError): unwinder.get_stack_trace() - def test_valid_output_formats(self): - """Test that all valid output formats are accepted.""" - valid_formats = ["pstats", "collapsed", "flamegraph", "gecko"] - - tempdir = tempfile.TemporaryDirectory(delete=False) - self.addCleanup(shutil.rmtree, tempdir.name) - - with ( - contextlib.chdir(tempdir.name), - captured_stdout(), - captured_stderr(), - ): - for fmt in valid_formats: - try: - # This will likely fail with permissions, but the format should be valid - profiling.sampling.sample.sample( - os.getpid(), - duration_sec=0.1, - output_format=fmt, - filename=f"test_{fmt}.out", - ) - except (OSError, RuntimeError, PermissionError): - # Expected errors - we just want to test format validation - pass - def test_script_error_treatment(self): script_file = tempfile.NamedTemporaryFile( "w", delete=False, suffix=".py" @@ -786,7 +739,8 @@ def test_script_error_treatment(self): [ sys.executable, "-m", - "profiling.sampling.sample", + "profiling.sampling.cli", + "run", "-d", "1", script_file.name, diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py index f61d65da24bb4f..02b50826bb9f61 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py @@ -15,7 +15,6 @@ ) from test.support import requires_subprocess -from test.support import captured_stdout, captured_stderr from .helpers import test_subprocess from .mocks import MockFrameInfo, MockInterpreterInfo @@ -28,11 +27,11 @@ def test_mode_validation(self): """Test that CLI validates mode choices correctly.""" # Invalid mode choice should raise SystemExit test_args = [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "attach", + "12345", "--mode", "invalid", - "-p", - "12345", ] with ( @@ -40,7 +39,8 @@ def test_mode_validation(self): mock.patch("sys.stderr", io.StringIO()) as mock_stderr, self.assertRaises(SystemExit) as cm, ): - profiling.sampling.sample.main() + from profiling.sampling.cli import main + main() self.assertEqual(cm.exception.code, 2) # argparse error error_msg = mock_stderr.getvalue() @@ -170,14 +170,15 @@ def main(): mock.patch("sys.stdout", captured_output), ): try: + collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=2.0, - sample_interval_usec=5000, mode=1, # CPU mode - show_summary=False, all_threads=True, ) + collector.print_stats(show_summary=False, mode=1) except (PermissionError, RuntimeError) as e: self.skipTest( "Insufficient permissions for remote profiling" @@ -191,14 +192,15 @@ def main(): mock.patch("sys.stdout", captured_output), ): try: + collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=2.0, - sample_interval_usec=5000, mode=0, # Wall-clock mode - show_summary=False, all_threads=True, ) + collector.print_stats(show_summary=False) except (PermissionError, RuntimeError) as e: self.skipTest( "Insufficient permissions for remote profiling" @@ -223,17 +225,12 @@ def main(): def test_cpu_mode_with_no_samples(self): """Test that CPU mode handles no samples gracefully when no samples are collected.""" # Mock a collector that returns empty stats - mock_collector = mock.MagicMock() + mock_collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True) mock_collector.stats = {} - mock_collector.create_stats = mock.MagicMock() with ( io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), - mock.patch( - "profiling.sampling.sample.PstatsCollector", - return_value=mock_collector, - ), mock.patch( "profiling.sampling.sample.SampleProfiler" ) as mock_profiler_class, @@ -243,13 +240,14 @@ def test_cpu_mode_with_no_samples(self): profiling.sampling.sample.sample( 12345, # dummy PID + mock_collector, duration_sec=0.5, - sample_interval_usec=5000, mode=1, # CPU mode - show_summary=False, all_threads=True, ) + mock_collector.print_stats(show_summary=False, mode=1) + output = captured_output.getvalue() # Should see the "No samples were collected" message @@ -262,27 +260,30 @@ class TestGilModeFiltering(unittest.TestCase): def test_gil_mode_validation(self): """Test that CLI accepts gil mode choice correctly.""" + from profiling.sampling.cli import main + test_args = [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "attach", + "12345", "--mode", "gil", - "-p", - "12345", ] with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, ): try: - profiling.sampling.sample.main() - except SystemExit: + main() + except (SystemExit, OSError, RuntimeError): pass # Expected due to invalid PID # Should have attempted to call sample with mode=2 (GIL mode) mock_sample.assert_called_once() - call_args = mock_sample.call_args[1] - self.assertEqual(call_args["mode"], 2) # PROFILING_MODE_GIL + call_args = mock_sample.call_args + # Check the mode parameter (should be in kwargs) + self.assertEqual(call_args.kwargs.get("mode"), 2) # PROFILING_MODE_GIL def test_gil_mode_sample_function_call(self): """Test that sample() function correctly uses GIL mode.""" @@ -290,25 +291,20 @@ def test_gil_mode_sample_function_call(self): mock.patch( "profiling.sampling.sample.SampleProfiler" ) as mock_profiler, - mock.patch( - "profiling.sampling.sample.PstatsCollector" - ) as mock_collector, ): # Mock the profiler instance mock_instance = mock.Mock() mock_profiler.return_value = mock_instance - # Mock the collector instance - mock_collector_instance = mock.Mock() - mock_collector.return_value = mock_collector_instance + # Create a real collector instance + collector = PstatsCollector(sample_interval_usec=1000, skip_idle=True) - # Call sample with GIL mode and a filename to avoid pstats creation + # Call sample with GIL mode profiling.sampling.sample.sample( 12345, + collector, mode=2, # PROFILING_MODE_GIL duration_sec=1, - sample_interval_usec=1000, - filename="test_output.txt", ) # Verify SampleProfiler was created with correct mode @@ -319,95 +315,36 @@ def test_gil_mode_sample_function_call(self): # Verify profiler.sample was called mock_instance.sample.assert_called_once() - # Verify collector.export was called since we provided a filename - mock_collector_instance.export.assert_called_once_with( - "test_output.txt" - ) - - def test_gil_mode_collector_configuration(self): - """Test that collectors are configured correctly for GIL mode.""" - with ( - mock.patch( - "profiling.sampling.sample.SampleProfiler" - ) as mock_profiler, - mock.patch( - "profiling.sampling.sample.PstatsCollector" - ) as mock_collector, - captured_stdout(), - captured_stderr(), - ): - # Mock the profiler instance - mock_instance = mock.Mock() - mock_profiler.return_value = mock_instance - - # Call sample with GIL mode - profiling.sampling.sample.sample( - 12345, - mode=2, # PROFILING_MODE_GIL - output_format="pstats", - ) - - # Verify collector was created with skip_idle=True (since mode != WALL) - mock_collector.assert_called_once() - call_args = mock_collector.call_args[1] - self.assertTrue(call_args["skip_idle"]) - - def test_gil_mode_with_collapsed_format(self): - """Test GIL mode with collapsed stack format.""" - with ( - mock.patch( - "profiling.sampling.sample.SampleProfiler" - ) as mock_profiler, - mock.patch( - "profiling.sampling.sample.CollapsedStackCollector" - ) as mock_collector, - ): - # Mock the profiler instance - mock_instance = mock.Mock() - mock_profiler.return_value = mock_instance - - # Call sample with GIL mode and collapsed format - profiling.sampling.sample.sample( - 12345, - mode=2, # PROFILING_MODE_GIL - output_format="collapsed", - filename="test_output.txt", - ) - - # Verify collector was created with skip_idle=True - mock_collector.assert_called_once() - call_args = mock_collector.call_args[1] - self.assertTrue(call_args["skip_idle"]) - def test_gil_mode_cli_argument_parsing(self): """Test CLI argument parsing for GIL mode with various options.""" + from profiling.sampling.cli import main + test_args = [ - "profiling.sampling.sample", + "profiling.sampling.cli", + "attach", + "12345", "--mode", "gil", - "--interval", + "-i", "500", - "--duration", + "-d", "5", - "-p", - "12345", ] with ( mock.patch("sys.argv", test_args), - mock.patch("profiling.sampling.sample.sample") as mock_sample, + mock.patch("profiling.sampling.cli.sample") as mock_sample, ): try: - profiling.sampling.sample.main() - except SystemExit: + main() + except (SystemExit, OSError, RuntimeError): pass # Expected due to invalid PID # Verify all arguments were parsed correctly mock_sample.assert_called_once() - call_args = mock_sample.call_args[1] - self.assertEqual(call_args["mode"], 2) # GIL mode - self.assertEqual(call_args["sample_interval_usec"], 500) - self.assertEqual(call_args["duration_sec"], 5) + call_args = mock_sample.call_args + self.assertEqual(call_args.kwargs.get("mode"), 2) # GIL mode + self.assertEqual(call_args.kwargs.get("duration_sec"), 5) @requires_subprocess() def test_gil_mode_integration_behavior(self): @@ -454,14 +391,15 @@ def main(): mock.patch("sys.stdout", captured_output), ): try: + collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=2.0, - sample_interval_usec=5000, mode=2, # GIL mode - show_summary=False, all_threads=True, ) + collector.print_stats(show_summary=False) except (PermissionError, RuntimeError) as e: self.skipTest( "Insufficient permissions for remote profiling" @@ -475,14 +413,15 @@ def main(): mock.patch("sys.stdout", captured_output), ): try: + collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=0.5, - sample_interval_usec=5000, mode=0, # Wall-clock mode - show_summary=False, all_threads=True, ) + collector.print_stats(show_summary=False) except (PermissionError, RuntimeError) as e: self.skipTest( "Insufficient permissions for remote profiling" diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py index ef70d8666047ac..2d129dc8db56d1 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py @@ -6,7 +6,7 @@ try: import _remote_debugging # noqa: F401 - from profiling.sampling.sample import SampleProfiler, print_sampled_stats + from profiling.sampling.sample import SampleProfiler from profiling.sampling.pstats_collector import PstatsCollector except ImportError: raise unittest.SkipTest( @@ -16,6 +16,24 @@ from test.support import force_not_colorized_test_class +def print_sampled_stats(stats, sort=-1, limit=None, show_summary=True, sample_interval_usec=100): + """Helper function to maintain compatibility with old test API. + + This wraps the new PstatsCollector.print_stats() API to work with the + existing test infrastructure. + """ + # Create a mock collector that populates stats correctly + collector = PstatsCollector(sample_interval_usec=sample_interval_usec) + + # Override create_stats to populate self.stats with the provided stats + def mock_create_stats(): + collector.stats = stats.stats + collector.create_stats = mock_create_stats + + # Call the new print_stats method + collector.print_stats(sort=sort, limit=limit, show_summary=show_summary) + + class TestSampleProfiler(unittest.TestCase): """Test the SampleProfiler class.""" @@ -406,8 +424,8 @@ def test_print_sampled_stats_empty_stats(self): result = output.getvalue() - # Should still print header - self.assertIn("Profile Stats:", result) + # Should print message about no samples + self.assertIn("No samples were collected.", result) def test_print_sampled_stats_sample_percentage_sorting(self): """Test sample percentage sorting options.""" From d6ca1b3f2756490c2627f45fe6e0dc1cbec32d4e Mon Sep 17 00:00:00 2001 From: Pablo Galindo Salgado Date: Fri, 21 Nov 2025 16:16:31 +0000 Subject: [PATCH 2/7] fixup! gh-138122: Refactor the CLI of profile.sampling into subcommands --- Lib/profiling/sampling/sample.py | 2 ++ .../test_profiling/test_sampling_profiler/test_modes.py | 9 +++++---- Misc/NEWS.d/3.15.0a2.rst | 2 +- .../2025-11-17-00-53-51.gh-issue-141645.TC3TL3.rst | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/Lib/profiling/sampling/sample.py b/Lib/profiling/sampling/sample.py index a3ed693275e7c5..f3fa441a35f420 100644 --- a/Lib/profiling/sampling/sample.py +++ b/Lib/profiling/sampling/sample.py @@ -13,6 +13,8 @@ from .gecko_collector import GeckoCollector from .constants import ( PROFILING_MODE_WALL, + PROFILING_MODE_CPU, + PROFILING_MODE_GIL, PROFILING_MODE_ALL, ) try: diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py index 02b50826bb9f61..1b0e21a5fe45d6 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py @@ -444,10 +444,11 @@ def test_mode_constants_are_defined(self): def test_parse_mode_function(self): """Test the _parse_mode function with all valid modes.""" - self.assertEqual(profiling.sampling.sample._parse_mode("wall"), 0) - self.assertEqual(profiling.sampling.sample._parse_mode("cpu"), 1) - self.assertEqual(profiling.sampling.sample._parse_mode("gil"), 2) + from profiling.sampling.cli import _parse_mode + self.assertEqual(_parse_mode("wall"), 0) + self.assertEqual(_parse_mode("cpu"), 1) + self.assertEqual(_parse_mode("gil"), 2) # Test invalid mode raises KeyError with self.assertRaises(KeyError): - profiling.sampling.sample._parse_mode("invalid") + _parse_mode("invalid") diff --git a/Misc/NEWS.d/3.15.0a2.rst b/Misc/NEWS.d/3.15.0a2.rst index ba82c854fac2d4..ba439d49517add 100644 --- a/Misc/NEWS.d/3.15.0a2.rst +++ b/Misc/NEWS.d/3.15.0a2.rst @@ -388,7 +388,7 @@ Add :func:`os.reload_environ` to ``os.__all__``. .. nonce: L13UCV .. section: Library -Fix :func:`profiling.sampling.sample` incorrectly handling a +Fix ``profiling.sampling.sample()`` incorrectly handling a :exc:`FileNotFoundError` or :exc:`PermissionError`. .. diff --git a/Misc/NEWS.d/next/Library/2025-11-17-00-53-51.gh-issue-141645.TC3TL3.rst b/Misc/NEWS.d/next/Library/2025-11-17-00-53-51.gh-issue-141645.TC3TL3.rst index 25c83105b48338..d3f3bfddf6e867 100644 --- a/Misc/NEWS.d/next/Library/2025-11-17-00-53-51.gh-issue-141645.TC3TL3.rst +++ b/Misc/NEWS.d/next/Library/2025-11-17-00-53-51.gh-issue-141645.TC3TL3.rst @@ -1,4 +1,4 @@ Add a new ``--live`` mode to the tachyon profiler in -:mod:`profiling.sampling` module. This mode consist of a live TUI that +:mod:`!profiling.sampling` module. This mode consist of a live TUI that displays real-time profiling statistics as the target application runs, similar to ``top``. Patch by Pablo Galindo From 54be163eefdfcc0fbf4a4c365c56cfb385a62ac9 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Date: Fri, 21 Nov 2025 21:35:55 +0000 Subject: [PATCH 3/7] fixup! fixup! gh-138122: Refactor the CLI of profile.sampling into subcommands --- Lib/test/test_profiling/test_sampling_profiler/test_advanced.py | 2 +- .../test_profiling/test_sampling_profiler/test_integration.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py index 265e358fc6cdd0..94946d74aa4784 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py @@ -265,7 +265,7 @@ def worker(x): proc.kill() stdout, stderr = proc.communicate() - if "PermissionError" in stderr: + if "Permission Error" in stderr: self.skipTest("Insufficient permissions for remote profiling") self.assertIn("Results: [2, 4, 6]", stdout) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py index 39642b99ec246f..e33eb0ffeed1bb 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py @@ -750,7 +750,7 @@ def test_script_error_treatment(self): ) output = result.stdout + result.stderr - if "PermissionError" in output: + if "Permission Error" in output: self.skipTest("Insufficient permissions for remote profiling") self.assertNotIn("Script file not found", output) self.assertIn( From 817fdc0b54b0a0d915b845b1d160da8b7c584cd1 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Salgado Date: Fri, 21 Nov 2025 23:23:56 +0000 Subject: [PATCH 4/7] fixup! Merge remote-tracking branch 'upstream/main' into gh-138122-2 --- .../test_profiling/test_sampling_profiler/test_integration.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py index aa820c6da90cfe..5b76dd5bc55696 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py @@ -25,8 +25,6 @@ from test.support import ( requires_subprocess, - captured_stdout, - captured_stderr, SHORT_TIMEOUT, ) From 170760bff97f6f87cd83d5682b4ede472bde988d Mon Sep 17 00:00:00 2001 From: Pablo Galindo Salgado Date: Sat, 22 Nov 2025 19:16:22 +0000 Subject: [PATCH 5/7] Address feedback --- Lib/profiling/sampling/cli.py | 156 +++++++++--------- .../test_integration.py | 7 +- 2 files changed, 80 insertions(+), 83 deletions(-) diff --git a/Lib/profiling/sampling/cli.py b/Lib/profiling/sampling/cli.py index 74ea049a7c1393..03303adc5ecb2b 100644 --- a/Lib/profiling/sampling/cli.py +++ b/Lib/profiling/sampling/cli.py @@ -29,40 +29,32 @@ LiveStatsCollector = None +class CustomFormatter( + argparse.ArgumentDefaultsHelpFormatter, + argparse.RawDescriptionHelpFormatter, +): + """Custom formatter that combines default values display with raw description formatting.""" + pass + + _HELP_DESCRIPTION = """Sample a process's stack frames and generate profiling data. Commands: run Run and profile a script or module attach Attach to and profile a running process - live Interactive TUI profiler (top-like interface) Examples: - # Run and profile a script (default: pstats to stdout) + # Run and profile a script python -m profiling.sampling run script.py arg1 arg2 - # Run and profile a module - python -m profiling.sampling run -m mymodule arg1 arg2 - # Attach to a running process python -m profiling.sampling attach 1234 - # Live interactive mode for a process - python -m profiling.sampling live 1234 - - # Live mode for a script - python -m profiling.sampling live script.py - - # Generate flamegraph from a script - python -m profiling.sampling run --flamegraph -o output.html script.py + # Live interactive mode for a script + python -m profiling.sampling run --live script.py - # Profile with custom interval and duration - python -m profiling.sampling run -i 50 -d 30 script.py - - # Profile all threads, sort by total time - python -m profiling.sampling attach -a --sort tottime 1234 - - # Save collapsed stacks to file - python -m profiling.sampling run --collapsed -o stacks.txt script.py + # Live interactive mode for a running process + python -m profiling.sampling attach --live 1234 Use 'python -m profiling.sampling --help' for command-specific help.""" @@ -167,14 +159,16 @@ def _add_sampling_options(parser): "--interval", type=int, default=100, - help="Sampling interval in microseconds (default: 100)", + metavar="MICROSECONDS", + help="Sampling interval", ) sampling_group.add_argument( "-d", "--duration", type=int, default=10, - help="Sampling duration in seconds (default: 10)", + metavar="SECONDS", + help="Sampling duration", ) sampling_group.add_argument( "-a", @@ -208,7 +202,7 @@ def _add_mode_options(parser): choices=["wall", "cpu", "gil"], default="wall", help="Sampling mode: wall (all samples), cpu (only samples when thread is on CPU), " - "gil (only samples when thread holds the GIL) (default: wall)", + "gil (only samples when thread holds the GIL)", ) @@ -270,14 +264,14 @@ def _add_pstats_options(parser): "name", ], default="nsamples", - help="Sort order for pstats output (default: nsamples)", + help="Sort order for pstats output", ) pstats_group.add_argument( "-l", "--limit", type=int, default=15, - help="Limit the number of rows in the output (default: 15)", + help="Limit the number of rows in the output", ) pstats_group.add_argument( "--no-summary", @@ -368,13 +362,18 @@ def _validate_args(args, parser): parser: ArgumentParser instance for error reporting """ # Check if live mode is available - if args.command == "live" and LiveStatsCollector is None: + if hasattr(args, 'live') and args.live and LiveStatsCollector is None: parser.error( "Live mode requires the curses module, which is not available." ) - # Only validate format options for run/attach commands (live doesn't have format option) - if args.command not in ("run", "attach"): + # Live mode is incompatible with format options + if hasattr(args, 'live') and args.live: + if args.format != "pstats": + format_flag = f"--{args.format}" + parser.error( + f"--live is incompatible with {format_flag}. Live mode uses a TUI interface." + ) return # Validate gecko mode doesn't use non-wall mode @@ -406,7 +405,7 @@ def main(): # Create the main parser parser = argparse.ArgumentParser( description=_HELP_DESCRIPTION, - formatter_class=argparse.RawDescriptionHelpFormatter, + formatter_class=CustomFormatter, ) # Create subparsers for commands @@ -418,8 +417,24 @@ def main(): run_parser = subparsers.add_parser( "run", help="Run and profile a script or module", - formatter_class=argparse.RawDescriptionHelpFormatter, - description="Run and profile a Python script or module", + formatter_class=CustomFormatter, + description="""Run and profile a Python script or module + +Examples: + # Run and profile a module + python -m profiling.sampling run -m mymodule arg1 arg2 + + # Generate flamegraph from a script + python -m profiling.sampling run --flamegraph -o output.html script.py + + # Profile with custom interval and duration + python -m profiling.sampling run -i 50 -d 30 script.py + + # Save collapsed stacks to file + python -m profiling.sampling run --collapsed -o stacks.txt script.py + + # Live interactive mode for a script + python -m profiling.sampling run --live script.py""", ) run_parser.add_argument( "-m", @@ -436,6 +451,11 @@ def main(): nargs=argparse.REMAINDER, help="Arguments to pass to the script or module", ) + run_parser.add_argument( + "--live", + action="store_true", + help="Interactive TUI profiler (top-like interface, press 'q' to quit, 's' to cycle sort)", + ) _add_sampling_options(run_parser) _add_mode_options(run_parser) _add_format_options(run_parser) @@ -445,44 +465,31 @@ def main(): attach_parser = subparsers.add_parser( "attach", help="Attach to and profile a running process", - formatter_class=argparse.RawDescriptionHelpFormatter, - description="Attach to a running process and profile it", + formatter_class=CustomFormatter, + description="""Attach to a running process and profile it + +Examples: + # Profile all threads, sort by total time + python -m profiling.sampling attach -a --sort tottime 1234 + + # Live interactive mode for a running process + python -m profiling.sampling attach --live 1234""", ) attach_parser.add_argument( "pid", type=int, help="Process ID to attach to", ) + attach_parser.add_argument( + "--live", + action="store_true", + help="Interactive TUI profiler (top-like interface, press 'q' to quit, 's' to cycle sort)", + ) _add_sampling_options(attach_parser) _add_mode_options(attach_parser) _add_format_options(attach_parser) _add_pstats_options(attach_parser) - # === LIVE COMMAND === - live_parser = subparsers.add_parser( - "live", - help="Interactive TUI profiler (top-like interface)", - formatter_class=argparse.RawDescriptionHelpFormatter, - description="Interactive live profiling with a terminal UI (press 'q' to quit, 's' to cycle sort)", - ) - live_parser.add_argument( - "-m", - "--module", - action="store_true", - help="Run target as a module (like python -m)", - ) - live_parser.add_argument( - "target", - help="Process ID, script file, or module name to profile", - ) - live_parser.add_argument( - "args", - nargs=argparse.REMAINDER, - help="Arguments to pass to the script or module (if not a PID)", - ) - _add_sampling_options(live_parser) - _add_mode_options(live_parser) - # Parse arguments args = parser.parse_args() @@ -493,7 +500,6 @@ def main(): command_handlers = { "run": _handle_run, "attach": _handle_attach, - "live": _handle_live, } # Execute the appropriate command @@ -506,6 +512,11 @@ def main(): def _handle_attach(args): """Handle the 'attach' command.""" + # Check if live mode is requested + if args.live: + _handle_live_attach(args, args.pid) + return + # Use PROFILING_MODE_ALL for gecko format mode = ( PROFILING_MODE_ALL @@ -539,6 +550,11 @@ def _handle_attach(args): def _handle_run(args): """Handle the 'run' command.""" + # Check if live mode is requested + if args.live: + _handle_live_run(args) + return + # Build the command to run if args.module: cmd = (sys.executable, "-m", args.target, *args.args) @@ -589,26 +605,6 @@ def _handle_run(args): process.wait() -def _handle_live(args): - """Handle the 'live' command.""" - # Determine if target is a PID or a script/module - try: - # Try to parse as PID - pid = int(args.target) - is_pid = True - except ValueError: - # It's a script or module name - is_pid = False - pid = None - - if is_pid: - # Attach to existing process in live mode - _handle_live_attach(args, pid) - else: - # Run script/module in live mode - _handle_live_run(args) - - def _handle_live_attach(args, pid): """Handle live mode for an existing process.""" mode = _parse_mode(args.mode) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py index 5b76dd5bc55696..58505eed228ddd 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py @@ -405,12 +405,13 @@ def test_sampling_basic_functionality(self): ): try: # Sample for up to SHORT_TIMEOUT seconds, but process exits after fixed iterations + collector = PstatsCollector(sample_interval_usec=1000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, + collector, duration_sec=SHORT_TIMEOUT, - sample_interval_usec=1000, # 1ms - show_summary=False, ) + collector.print_stats(show_summary=False) except PermissionError: self.skipTest("Insufficient permissions for remote profiling") @@ -552,7 +553,7 @@ def test_sample_target_script(self): self.addCleanup(close_and_unlink, script_file) # Sample for up to SHORT_TIMEOUT seconds, but process exits after fixed iterations - test_args = ["profiling.sampling.sample", "-d", PROFILING_TIMEOUT, script_file.name] + test_args = ["profiling.sampling.sample", "run", "-d", PROFILING_TIMEOUT, script_file.name] with ( mock.patch("sys.argv", test_args), From 6e4c0d90bc8d659085548ec758be93abc8804c7d Mon Sep 17 00:00:00 2001 From: Pablo Galindo Date: Sun, 23 Nov 2025 17:06:15 +0000 Subject: [PATCH 6/7] Address Laszlo's review --- Lib/profiling/sampling/cli.py | 35 +++++++++---- .../test_integration.py | 50 +++++++++++++++++++ 2 files changed, 76 insertions(+), 9 deletions(-) diff --git a/Lib/profiling/sampling/cli.py b/Lib/profiling/sampling/cli.py index 03303adc5ecb2b..7eab2a3894a7b8 100644 --- a/Lib/profiling/sampling/cli.py +++ b/Lib/profiling/sampling/cli.py @@ -263,15 +263,15 @@ def _add_pstats_options(parser): "nsamples-cumul", "name", ], - default="nsamples", - help="Sort order for pstats output", + default=None, + help="Sort order for pstats output (default: nsamples)", ) pstats_group.add_argument( "-l", "--limit", type=int, - default=15, - help="Limit the number of rows in the output", + default=None, + help="Limit the number of rows in the output (default: 15)", ) pstats_group.add_argument( "--no-summary", @@ -343,10 +343,12 @@ def _handle_output(collector, args, pid, mode): if args.outfile: collector.export(args.outfile) else: - # Print to stdout - sort_mode = _sort_to_mode(args.sort) + # Print to stdout with defaults applied + sort_choice = args.sort if args.sort is not None else "nsamples" + limit = args.limit if args.limit is not None else 15 + sort_mode = _sort_to_mode(sort_choice) collector.print_stats( - sort_mode, args.limit, not args.no_summary, mode + sort_mode, limit, not args.no_summary, mode ) else: # Export to file @@ -374,6 +376,21 @@ def _validate_args(args, parser): parser.error( f"--live is incompatible with {format_flag}. Live mode uses a TUI interface." ) + + # Live mode is also incompatible with pstats-specific options + issues = [] + if args.sort is not None: + issues.append("--sort") + if args.limit is not None: + issues.append("--limit") + if args.no_summary: + issues.append("--no-summary") + + if issues: + parser.error( + f"Options {', '.join(issues)} are incompatible with --live. " + "Live mode uses a TUI interface with its own controls." + ) return # Validate gecko mode doesn't use non-wall mode @@ -386,9 +403,9 @@ def _validate_args(args, parser): # Validate pstats-specific options are only used with pstats format if args.format != "pstats": issues = [] - if args.sort != "nsamples": + if args.sort is not None: issues.append("--sort") - if args.limit != 15: + if args.limit is not None: issues.append("--limit") if args.no_summary: issues.append("--no-summary") diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py index 58505eed228ddd..0e25c0b5449d61 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py @@ -730,3 +730,53 @@ def test_script_error_treatment(self): self.assertIn( "No such file or directory: 'nonexistent_file.txt'", output ) + + def test_live_incompatible_with_pstats_options(self): + """Test that --live is incompatible with individual pstats options.""" + test_cases = [ + (["--sort", "tottime"], "--sort"), + (["--limit", "30"], "--limit"), + (["--no-summary"], "--no-summary"), + ] + + for args, expected_flag in test_cases: + with self.subTest(args=args): + test_args = ["profiling.sampling.cli", "run", "--live"] + args + ["test.py"] + with mock.patch("sys.argv", test_args): + with self.assertRaises(SystemExit) as cm: + from profiling.sampling.cli import main + main() + self.assertNotEqual(cm.exception.code, 0) + + def test_live_incompatible_with_multiple_pstats_options(self): + """Test that --live is incompatible with multiple pstats options.""" + test_args = [ + "profiling.sampling.cli", "run", "--live", + "--sort", "cumtime", "--limit", "25", "--no-summary", "test.py" + ] + + with mock.patch("sys.argv", test_args): + with self.assertRaises(SystemExit) as cm: + from profiling.sampling.cli import main + main() + self.assertNotEqual(cm.exception.code, 0) + + def test_live_incompatible_with_pstats_default_values(self): + """Test that --live blocks pstats options even with default values.""" + # Test with --sort=nsamples (the default value) + test_args = ["profiling.sampling.cli", "run", "--live", "--sort=nsamples", "test.py"] + + with mock.patch("sys.argv", test_args): + with self.assertRaises(SystemExit) as cm: + from profiling.sampling.cli import main + main() + self.assertNotEqual(cm.exception.code, 0) + + # Test with --limit=15 (the default value) + test_args = ["profiling.sampling.cli", "run", "--live", "--limit=15", "test.py"] + + with mock.patch("sys.argv", test_args): + with self.assertRaises(SystemExit) as cm: + from profiling.sampling.cli import main + main() + self.assertNotEqual(cm.exception.code, 0) From 271df56775203364fc55375a05c639b948f65138 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Salgado Date: Sun, 23 Nov 2025 22:46:51 +0000 Subject: [PATCH 7/7] Update Lib/profiling/sampling/cli.py Co-authored-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com> --- Lib/profiling/sampling/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/profiling/sampling/cli.py b/Lib/profiling/sampling/cli.py index 7eab2a3894a7b8..aede6a4d3e9f1b 100644 --- a/Lib/profiling/sampling/cli.py +++ b/Lib/profiling/sampling/cli.py @@ -160,7 +160,7 @@ def _add_sampling_options(parser): type=int, default=100, metavar="MICROSECONDS", - help="Sampling interval", + help="sampling interval", ) sampling_group.add_argument( "-d",