Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
af65c15
It might work but the code is bad
savannahostrowski Nov 3, 2025
ec28f88
Account for function doing CPU work before/after spawning workers
savannahostrowski Nov 3, 2025
1e01766
Merge branch 'main' into async-tachyon
savannahostrowski Nov 3, 2025
2a2e197
Code cleanup
savannahostrowski Nov 3, 2025
61dc0bb
WIP
savannahostrowski Nov 3, 2025
c9c34a5
Merge branch 'main' into async-tachyon
savannahostrowski Nov 13, 2025
cc9e9ab
Remove depth
savannahostrowski Nov 13, 2025
9b22f1e
Make keyword only
savannahostrowski Nov 13, 2025
890474d
Fix tests
savannahostrowski Nov 13, 2025
563ecff
Bruuuh, it worked
savannahostrowski Nov 13, 2025
112ce73
Simplify algo
pablogsal Nov 14, 2025
2beed97
Fix multiple parents
pablogsal Nov 14, 2025
7315953
Good shit
pablogsal Nov 14, 2025
f8e9d72
Deque, deduplicate yields, propagate thread_id
savannahostrowski Nov 14, 2025
9a4875f
📜🤖 Added by blurb_it.
blurb-it[bot] Nov 14, 2025
ec6fb51
Remove deduplication of leaves to ensure call stacks can be properly …
savannahostrowski Nov 14, 2025
67e1f74
Merge branch 'async-tachyon' of https://github.com/savannahostrowski/…
savannahostrowski Nov 14, 2025
e9ae950
Fix WASI
savannahostrowski Nov 14, 2025
acef9a0
More WASI fixes
savannahostrowski Nov 14, 2025
2953454
Merge main
savannahostrowski Nov 23, 2025
09f5205
Fix tests
savannahostrowski Nov 23, 2025
dc7abae
Fix broken imports
savannahostrowski Nov 24, 2025
36c8b3c
Remove old test file
savannahostrowski Nov 24, 2025
be6d228
Merge remote-tracking branch 'upstream/main' into async-tachyon
pablogsal Nov 24, 2025
64ccb1a
Fixes
pablogsal Nov 24, 2025
fca9c88
fixup! Fixes
pablogsal Nov 25, 2025
394069d
Merge main
savannahostrowski Dec 1, 2025
3d9d2fb
Fix test error
savannahostrowski Dec 1, 2025
1134431
Fix quotations for consistency
savannahostrowski Dec 1, 2025
f0242e1
Merge remote-tracking branch 'upstream/main' into async-tachyon
pablogsal Dec 6, 2025
56661dc
Update to latest main
pablogsal Dec 6, 2025
ff983d8
Fix tests
pablogsal Dec 6, 2025
2203021
Fix tests
pablogsal Dec 6, 2025
e6eaa2c
CLI update
pablogsal Dec 6, 2025
47ebc11
Small fixes
pablogsal Dec 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion Lib/profiling/sampling/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ def _add_sampling_options(parser):
dest="gc",
help='Don\'t include artificial "<GC>" frames to denote active garbage collection',
)
sampling_group.add_argument(
"--async-aware",
action="store_true",
help="Enable async-aware profiling (uses task-based stack reconstruction)",
)


def _add_mode_options(parser):
Expand All @@ -205,7 +210,14 @@ def _add_mode_options(parser):
choices=["wall", "cpu", "gil"],
default="wall",
help="Sampling mode: wall (all samples), cpu (only samples when thread is on CPU), "
"gil (only samples when thread holds the GIL)",
"gil (only samples when thread holds the GIL). Incompatible with --async-aware",
)
mode_group.add_argument(
"--async-mode",
choices=["running", "all"],
default="running",
help='Async profiling mode: "running" (only running task) '
'or "all" (all tasks including waiting). Requires --async-aware',
)


Expand Down Expand Up @@ -382,6 +394,27 @@ def _validate_args(args, parser):
"Live mode requires the curses module, which is not available."
)

# Async-aware mode is incompatible with --native, --no-gc, --mode, and --all-threads
if args.async_aware:
issues = []
if args.native:
issues.append("--native")
if not args.gc:
issues.append("--no-gc")
if hasattr(args, 'mode') and args.mode != "wall":
issues.append(f"--mode={args.mode}")
if hasattr(args, 'all_threads') and args.all_threads:
issues.append("--all-threads")
if issues:
parser.error(
f"Options {', '.join(issues)} are incompatible with --async-aware. "
"Async-aware profiling uses task-based stack reconstruction."
)

# --async-mode requires --async-aware
if hasattr(args, 'async_mode') and args.async_mode != "running" and not args.async_aware:
parser.error("--async-mode requires --async-aware to be enabled.")

# Live mode is incompatible with format options
if hasattr(args, 'live') and args.live:
if args.format != "pstats":
Expand Down Expand Up @@ -570,6 +603,7 @@ def _handle_attach(args):
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_mode if args.async_aware else None,
native=args.native,
gc=args.gc,
)
Expand Down Expand Up @@ -618,6 +652,7 @@ def _handle_run(args):
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_mode if args.async_aware else None,
native=args.native,
gc=args.gc,
)
Expand Down Expand Up @@ -650,6 +685,7 @@ def _handle_live_attach(args, pid):
limit=20, # Default limit
pid=pid,
mode=mode,
async_aware=args.async_mode if args.async_aware else None,
)

# Sample in live mode
Expand All @@ -660,6 +696,7 @@ def _handle_live_attach(args, pid):
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_mode if args.async_aware else None,
native=args.native,
gc=args.gc,
)
Expand Down Expand Up @@ -689,6 +726,7 @@ def _handle_live_run(args):
limit=20, # Default limit
pid=process.pid,
mode=mode,
async_aware=args.async_mode if args.async_aware else None,
)

# Profile the subprocess in live mode
Expand All @@ -700,6 +738,7 @@ def _handle_live_run(args):
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_mode if args.async_aware else None,
native=args.native,
gc=args.gc,
)
Expand Down
97 changes: 96 additions & 1 deletion Lib/profiling/sampling/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
from .constants import (
THREAD_STATUS_HAS_GIL,
THREAD_STATUS_ON_CPU,
THREAD_STATUS_UNKNOWN,
THREAD_STATUS_GIL_REQUESTED,
THREAD_STATUS_UNKNOWN,
)

try:
from _remote_debugging import FrameInfo
except ImportError:
# Fallback definition if _remote_debugging is not available
FrameInfo = None

class Collector(ABC):
@abstractmethod
def collect(self, stack_frames):
Expand Down Expand Up @@ -33,6 +39,95 @@ def _iter_all_frames(self, stack_frames, skip_idle=False):
if frames:
yield frames, thread_info.thread_id

def _iter_async_frames(self, awaited_info_list):
    """Yield one linear call stack per leaf task found in *awaited_info_list*.

    The work is delegated to three helpers, in order:

    1. ``_build_task_graph`` indexes every task and records which parent
       each child will follow.
    2. ``_find_leaf_tasks`` picks the tasks that nobody else lists as a
       parent.
    3. ``_build_linear_stacks`` walks from each leaf up its parent chain.

    Yields whatever ``_build_linear_stacks`` yields, i.e.
    ``(frames, thread_id, leaf_task_id)`` tuples.
    """
    graph = self._build_task_graph(awaited_info_list)
    task_map, child_to_parent, all_task_ids, all_parent_ids = graph

    leaves = self._find_leaf_tasks(all_task_ids, all_parent_ids)

    yield from self._build_linear_stacks(leaves, task_map, child_to_parent)

def _build_task_graph(self, awaited_info_list):
    """Index every task and pre-select the parent each child will follow.

    Args:
        awaited_info_list: iterable of objects exposing ``thread_id`` and
            ``awaited_by`` (an iterable of task records). Each task record
            exposes ``task_id`` and its own ``awaited_by`` collection.

    Returns:
        A 4-tuple ``(task_map, child_to_parent, all_task_ids,
        all_parent_ids)`` where

        * ``task_map`` maps ``task_id -> (task_info, thread_id)``; a later
          record with the same id replaces an earlier one,
        * ``child_to_parent`` maps ``task_id -> (selected_parent,
          parent_count)`` and only has entries for tasks whose
          ``awaited_by`` is non-empty,
        * ``all_task_ids`` is the set of every task id seen,
        * ``all_parent_ids`` is the set of every parent id seen (all of
          them, not just the selected one), used for leaf detection.
    """
    tasks_by_id = {}
    parent_choice = {}
    known_ids = set()
    awaited_ids = set()

    for info in awaited_info_list:
        owner_thread = info.thread_id
        for task in info.awaited_by:
            tid = task.task_id
            tasks_by_id[tid] = (task, owner_thread)
            known_ids.add(tid)

            awaiters = task.awaited_by
            if not awaiters:
                # Nothing awaits this task: it gets no parent entry.
                continue

            # NOTE(review): the parent identifier is read from the
            # ``task_name`` attribute of each awaited_by entry and is later
            # used as a task_map key - confirm against the data layout
            # produced by the remote debugging module.
            candidates = [awaiter.task_name for awaiter in awaiters]
            awaited_ids.update(candidates)

            # With several parents the smallest identifier is followed;
            # min() of a single-element list is that element.
            parent_choice[tid] = (min(candidates), len(candidates))

    return tasks_by_id, parent_choice, known_ids, awaited_ids

def _find_leaf_tasks(self, all_task_ids, all_parent_ids):
    """Return the ids in *all_task_ids* that never appear in *all_parent_ids*.

    A leaf is a task that is not the parent of any other task, so the
    result is simply the set difference of the two arguments.
    """
    leaf_ids = all_task_ids - all_parent_ids
    return leaf_ids

def _build_linear_stacks(self, leaf_task_ids, task_map, child_to_parent):
    """Yield one flattened stack per leaf by climbing its parent chain.

    Args:
        leaf_task_ids: iterable of task ids to start from.
        task_map: ``task_id -> (task_info, thread_id)``.
        child_to_parent: ``task_id -> (selected_parent, parent_count)``;
            a task without an entry is treated as a root.

    Yields:
        ``(frames, thread_id, leaf_id)`` for every leaf that produced at
        least one frame and for which a non-None thread id was found.
        ``frames`` holds, for each task on the chain, the frames of all of
        its coroutines followed by a ``"<task>"`` marker frame carrying
        the task's name.
    """
    for leaf_id in leaf_task_ids:
        stack = []
        seen = set()
        owner_thread = None
        cursor = leaf_id

        # Climb until the root is passed, a cycle is hit, or the chain
        # points at a task that was never indexed.
        while cursor is not None and cursor not in seen and cursor in task_map:
            seen.add(cursor)
            task, task_thread = task_map[cursor]

            # The stack is attributed to the first task on the chain that
            # has a non-None thread id.
            if owner_thread is None:
                owner_thread = task_thread

            for coro in task.coroutine_stack or ():
                stack.extend(coro.call_stack)

            label = task.task_name or "Task-" + str(task.task_id)
            link = child_to_parent.get(cursor)
            if link:
                cursor, n_parents = link
                # Only one parent is followed; note how many there were.
                if n_parents > 1:
                    label = f"{label} ({n_parents} parents)"
            else:
                # Root task - no parent, so the climb stops after this marker.
                cursor = None

            # Task boundary marker closing this task's section of the stack.
            stack.append(FrameInfo(("<task>", 0, label)))

        if stack and owner_thread is not None:
            yield stack, owner_thread, leaf_id

def _is_gc_frame(self, frame):
if isinstance(frame, tuple):
funcname = frame[2] if len(frame) >= 3 else ""
Expand Down
116 changes: 46 additions & 70 deletions Lib/profiling/sampling/live_collector/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def __init__(
pid=None,
display=None,
mode=None,
async_aware=None,
):
"""
Initialize the live stats collector.
Expand All @@ -115,6 +116,7 @@ def __init__(
pid: Process ID being profiled
display: DisplayInterface implementation (None means curses will be used)
mode: Profiling mode ('cpu', 'gil', etc.) - affects what stats are shown
async_aware: Async tracing mode - None (sync only), "all" or "running"
"""
self.result = collections.defaultdict(
lambda: dict(total_rec_calls=0, direct_calls=0, cumulative_calls=0)
Expand All @@ -133,6 +135,9 @@ def __init__(
self.running = True
self.pid = pid
self.mode = mode # Profiling mode
self.async_aware = async_aware # Async tracing mode
# Pre-select frame iterator method to avoid per-call dispatch overhead
self._get_frame_iterator = self._get_async_frame_iterator if async_aware else self._get_sync_frame_iterator
self._saved_stdout = None
self._saved_stderr = None
self._devnull = None
Expand Down Expand Up @@ -294,6 +299,15 @@ def process_frames(self, frames, thread_id=None):
if thread_data:
thread_data.result[top_location]["direct_calls"] += 1

def _get_sync_frame_iterator(self, stack_frames):
    """Return the per-thread frame iterator used for non-async sampling.

    Delegates to ``_iter_all_frames``, forwarding this collector's
    ``skip_idle`` setting.
    """
    iterate = self._iter_all_frames
    return iterate(stack_frames, skip_idle=self.skip_idle)

def _get_async_frame_iterator(self, stack_frames):
    """Yield ``(frames, thread_id)`` pairs for async sampling.

    ``_iter_async_frames`` produces ``(frames, thread_id, task_id)``
    triples; the task id is dropped so the output has the same shape as
    the sync iterator.
    """
    async_stacks = self._iter_async_frames(stack_frames)
    for stack, owner_thread, _task_id in async_stacks:
        yield stack, owner_thread

def collect_failed_sample(self):
self.failed_samples += 1
self.total_samples += 1
Expand All @@ -304,78 +318,40 @@ def collect(self, stack_frames):
self.start_time = time.perf_counter()
self._last_display_update = self.start_time

# Thread status counts for this sample
temp_status_counts = {
"has_gil": 0,
"on_cpu": 0,
"gil_requested": 0,
"unknown": 0,
"total": 0,
}
has_gc_frame = False

# Always collect data, even when paused
# Track thread status flags and GC frames
for interpreter_info in stack_frames:
threads = getattr(interpreter_info, "threads", [])
for thread_info in threads:
temp_status_counts["total"] += 1

# Track thread status using bit flags
status_flags = getattr(thread_info, "status", 0)
thread_id = getattr(thread_info, "thread_id", None)

# Update aggregated counts
if status_flags & THREAD_STATUS_HAS_GIL:
temp_status_counts["has_gil"] += 1
if status_flags & THREAD_STATUS_ON_CPU:
temp_status_counts["on_cpu"] += 1
if status_flags & THREAD_STATUS_GIL_REQUESTED:
temp_status_counts["gil_requested"] += 1
if status_flags & THREAD_STATUS_UNKNOWN:
temp_status_counts["unknown"] += 1

# Update per-thread status counts
if thread_id is not None:
thread_data = self._get_or_create_thread_data(thread_id)
thread_data.increment_status_flag(status_flags)

# Process frames (respecting skip_idle)
if self.skip_idle:
has_gil = bool(status_flags & THREAD_STATUS_HAS_GIL)
on_cpu = bool(status_flags & THREAD_STATUS_ON_CPU)
if not (has_gil or on_cpu):
continue

frames = getattr(thread_info, "frame_info", None)
if frames:
self.process_frames(frames, thread_id=thread_id)

# Track thread IDs only for threads that actually have samples
if (
thread_id is not None
and thread_id not in self.thread_ids
):
self.thread_ids.append(thread_id)

# Increment per-thread sample count and check for GC frames
thread_has_gc_frame = False
for frame in frames:
funcname = getattr(frame, "funcname", "")
if "<GC>" in funcname or "gc_collect" in funcname:
has_gc_frame = True
thread_has_gc_frame = True
break

if thread_id is not None:
thread_data = self._get_or_create_thread_data(thread_id)
thread_data.sample_count += 1
if thread_has_gc_frame:
thread_data.gc_frame_samples += 1

# Update cumulative thread status counts
for key, count in temp_status_counts.items():
self.thread_status_counts[key] += count
# Collect thread status stats (only available in sync mode)
if not self.async_aware:
status_counts, sample_has_gc, per_thread_stats = self._collect_thread_status_stats(stack_frames)
for key, count in status_counts.items():
self.thread_status_counts[key] += count
if sample_has_gc:
has_gc_frame = True

for thread_id, stats in per_thread_stats.items():
thread_data = self._get_or_create_thread_data(thread_id)
thread_data.has_gil += stats.get("has_gil", 0)
thread_data.on_cpu += stats.get("on_cpu", 0)
thread_data.gil_requested += stats.get("gil_requested", 0)
thread_data.unknown += stats.get("unknown", 0)
thread_data.total += stats.get("total", 0)
if stats.get("gc_samples", 0):
thread_data.gc_frame_samples += stats["gc_samples"]

# Process frames using pre-selected iterator
for frames, thread_id in self._get_frame_iterator(stack_frames):
if not frames:
continue

self.process_frames(frames, thread_id=thread_id)

# Track thread IDs
if thread_id is not None and thread_id not in self.thread_ids:
self.thread_ids.append(thread_id)

if thread_id is not None:
thread_data = self._get_or_create_thread_data(thread_id)
thread_data.sample_count += 1

if has_gc_frame:
self.gc_frame_samples += 1
Expand Down
10 changes: 8 additions & 2 deletions Lib/profiling/sampling/pstats_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,14 @@ def _process_frames(self, frames):
self.callers[callee][caller] += 1

def collect(self, stack_frames):
    """Fold one sample into the collector, processing each stack exactly once.

    The sample is treated as async data when it is non-empty and its first
    element has an ``awaited_by`` attribute; in that case stacks come from
    ``_iter_async_frames``. Otherwise stacks come from ``_iter_all_frames``
    with this collector's ``skip_idle`` setting. Every stack is handed to
    ``_process_frames``.

    Only one of the two paths runs per call: the unconditional
    ``_iter_all_frames`` loop that preceded the dispatch is gone, since it
    would count sync stacks twice and feed async data to the sync iterator.
    """
    if stack_frames and hasattr(stack_frames[0], "awaited_by"):
        # Async frame processing
        for frames, _thread_id, _task_id in self._iter_async_frames(stack_frames):
            self._process_frames(frames)
    else:
        # Regular frame processing
        for frames, _thread_id in self._iter_all_frames(
            stack_frames, skip_idle=self.skip_idle
        ):
            self._process_frames(frames)

def export(self, filename):
self.create_stats()
Expand Down
Loading
Loading