Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Include/cpython/pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ struct _ts {
/* Pointer to currently executing frame. */
struct _PyInterpreterFrame *current_frame;

/* Pointer to the base frame (bottommost sentinel frame).
Used by profilers to validate complete stack unwinding.
Points to the embedded base_frame in _PyThreadStateImpl.
The frame is embedded there rather than here because _PyInterpreterFrame
is defined in internal headers that cannot be exposed in the public API. */
struct _PyInterpreterFrame *base_frame;

struct _PyInterpreterFrame *last_profiled_frame;

Py_tracefunc c_profilefunc;
Expand Down
2 changes: 2 additions & 0 deletions Include/internal/pycore_debug_offsets.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ typedef struct _Py_DebugOffsets {
uint64_t next;
uint64_t interp;
uint64_t current_frame;
uint64_t base_frame;
uint64_t last_profiled_frame;
uint64_t thread_id;
uint64_t native_thread_id;
Expand Down Expand Up @@ -273,6 +274,7 @@ typedef struct _Py_DebugOffsets {
.next = offsetof(PyThreadState, next), \
.interp = offsetof(PyThreadState, interp), \
.current_frame = offsetof(PyThreadState, current_frame), \
.base_frame = offsetof(PyThreadState, base_frame), \
.last_profiled_frame = offsetof(PyThreadState, last_profiled_frame), \
.thread_id = offsetof(PyThreadState, thread_id), \
.native_thread_id = offsetof(PyThreadState, native_thread_id), \
Expand Down
5 changes: 5 additions & 0 deletions Include/internal/pycore_tstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ extern "C" {

#include "pycore_brc.h" // struct _brc_thread_state
#include "pycore_freelist_state.h" // struct _Py_freelists
#include "pycore_interpframe_structs.h" // _PyInterpreterFrame
#include "pycore_mimalloc.h" // struct _mimalloc_thread_state
#include "pycore_qsbr.h" // struct qsbr
#include "pycore_uop.h" // struct _PyUOpInstruction
Expand Down Expand Up @@ -61,6 +62,10 @@ typedef struct _PyThreadStateImpl {
// semi-public fields are in PyThreadState.
PyThreadState base;

// Embedded base frame - sentinel at the bottom of the frame stack.
// Used by profiling/sampling to detect incomplete stack traces.
_PyInterpreterFrame base_frame;

// The reference count field is used to synchronize deallocation of the
// thread state during runtime finalization.
Py_ssize_t refcount;
Expand Down
29 changes: 29 additions & 0 deletions InternalDocs/frames.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,35 @@ The shim frame points to a special code object containing the `INTERPRETER_EXIT`
instruction which cleans up the shim frame and returns.


### Base frame

Each thread state contains an embedded `_PyInterpreterFrame` called the "base frame"
that serves as a sentinel at the bottom of the frame stack. The frame is stored inline
in `_PyThreadStateImpl` (the internal extension of `PyThreadState`) and is initialized
when the thread state is created, with its `owner` field set to `FRAME_OWNED_BY_INTERPRETER`.

External profilers and sampling tools can validate that they have successfully unwound
the complete call stack by checking that the frame chain terminates at the base frame.
The `PyThreadState.base_frame` pointer provides the expected address to compare against.
If a stack walk doesn't reach this frame, the sample is incomplete (possibly due to a
race condition) and should be discarded.

The base frame is embedded in `_PyThreadStateImpl` rather than `PyThreadState` because
`_PyInterpreterFrame` is defined in internal headers that cannot be exposed in the
public API. A pointer (`PyThreadState.base_frame`) is provided for profilers to access
the address without needing internal headers.

See the initialization in `new_threadstate()` in [Python/pystate.c](../Python/pystate.c).

#### How profilers should use the base frame

External profilers should first read the value of `tstate->base_frame`, then walk the
stack from `tstate->current_frame`, following `frame->previous` pointers until reaching
a frame with `owner == FRAME_OWNED_BY_INTERPRETER`. After the walk, verify that the
address of the final frame visited equals the `tstate->base_frame` value read earlier.
If it does not, discard the sample as incomplete: the frame chain may have been caught
in an inconsistent state due to concurrent updates by the running thread.


### Remote Profiling Frame Cache

The `last_profiled_frame` field in `PyThreadState` supports an optimization for
Expand Down
126 changes: 33 additions & 93 deletions Lib/test/test_external_inspection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import contextlib
import unittest
import os
import textwrap
import contextlib
import importlib
import sys
import socket
Expand Down Expand Up @@ -216,33 +216,13 @@ def requires_subinterpreters(meth):
# Simple wrapper functions for RemoteUnwinder
# ============================================================================

# Errors that can occur transiently when reading process memory without synchronization
RETRIABLE_ERRORS = (
"Task list appears corrupted",
"Invalid linked list structure reading remote memory",
"Unknown error reading memory",
"Unhandled frame owner",
"Failed to parse initial frame",
"Failed to process frame chain",
"Failed to unwind stack",
)


def _is_retriable_error(exc):
"""Check if an exception is a transient error that should be retried."""
msg = str(exc)
return any(msg.startswith(err) or err in msg for err in RETRIABLE_ERRORS)


def get_stack_trace(pid):
for _ in busy_retry(SHORT_TIMEOUT):
try:
unwinder = RemoteUnwinder(pid, all_threads=True, debug=True)
return unwinder.get_stack_trace()
except RuntimeError as e:
if _is_retriable_error(e):
continue
raise
continue
raise RuntimeError("Failed to get stack trace after retries")


Expand All @@ -252,9 +232,7 @@ def get_async_stack_trace(pid):
unwinder = RemoteUnwinder(pid, debug=True)
return unwinder.get_async_stack_trace()
except RuntimeError as e:
if _is_retriable_error(e):
continue
raise
continue
raise RuntimeError("Failed to get async stack trace after retries")


Expand All @@ -264,9 +242,7 @@ def get_all_awaited_by(pid):
unwinder = RemoteUnwinder(pid, debug=True)
return unwinder.get_all_awaited_by()
except RuntimeError as e:
if _is_retriable_error(e):
continue
raise
continue
raise RuntimeError("Failed to get all awaited_by after retries")


Expand Down Expand Up @@ -2268,18 +2244,13 @@ def make_unwinder(cache_frames=True):
def _get_frames_with_retry(self, unwinder, required_funcs):
"""Get frames containing required_funcs, with retry for transient errors."""
for _ in range(MAX_TRIES):
try:
with contextlib.suppress(OSError, RuntimeError):
traces = unwinder.get_stack_trace()
for interp in traces:
for thread in interp.threads:
funcs = {f.funcname for f in thread.frame_info}
if required_funcs.issubset(funcs):
return thread.frame_info
except RuntimeError as e:
if _is_retriable_error(e):
pass
else:
raise
time.sleep(0.1)
return None

Expand Down Expand Up @@ -2802,70 +2773,39 @@ def foo2():
make_unwinder,
):
unwinder = make_unwinder(cache_frames=True)
buffer = b""

def recv_msg():
"""Receive a single message from socket."""
nonlocal buffer
while b"\n" not in buffer:
chunk = client_socket.recv(256)
if not chunk:
return None
buffer += chunk
msg, buffer = buffer.split(b"\n", 1)
return msg

def get_thread_frames(target_funcs):
"""Get frames for thread matching target functions."""
retries = 0
for _ in busy_retry(SHORT_TIMEOUT):
if retries >= 5:
break
retries += 1
# On Windows, ReadProcessMemory can fail with OSError
# (WinError 299) when frame pointers are in flux
with contextlib.suppress(RuntimeError, OSError):
traces = unwinder.get_stack_trace()
for interp in traces:
for thread in interp.threads:
funcs = [f.funcname for f in thread.frame_info]
if any(f in funcs for f in target_funcs):
return funcs
return None

# Message dispatch table: signal -> required functions for that thread
dispatch = {
b"t1:baz1": {"baz1", "bar1", "foo1"},
b"t2:baz2": {"baz2", "bar2", "foo2"},
b"t1:blech1": {"blech1", "foo1"},
b"t2:blech2": {"blech2", "foo2"},
}

# Track results for each sync point
results = {}

# Process 4 sync points: baz1, baz2, blech1, blech2
# With the lock, threads are serialized - handle one at a time
for _ in range(4):
msg = recv_msg()
self.assertIsNotNone(msg, "Expected message from subprocess")

# Determine which thread/function and take snapshot
if msg == b"t1:baz1":
funcs = get_thread_frames(["baz1", "bar1", "foo1"])
self.assertIsNotNone(funcs, "Thread 1 not found at baz1")
results["t1:baz1"] = funcs
elif msg == b"t2:baz2":
funcs = get_thread_frames(["baz2", "bar2", "foo2"])
self.assertIsNotNone(funcs, "Thread 2 not found at baz2")
results["t2:baz2"] = funcs
elif msg == b"t1:blech1":
funcs = get_thread_frames(["blech1", "foo1"])
self.assertIsNotNone(funcs, "Thread 1 not found at blech1")
results["t1:blech1"] = funcs
elif msg == b"t2:blech2":
funcs = get_thread_frames(["blech2", "foo2"])
self.assertIsNotNone(funcs, "Thread 2 not found at blech2")
results["t2:blech2"] = funcs

# Release thread to continue
# Process 4 sync points (order depends on thread scheduling)
buffer = _wait_for_signal(client_socket, b"\n")
for i in range(4):
# Extract first message from buffer
msg, sep, buffer = buffer.partition(b"\n")
self.assertIn(msg, dispatch, f"Unexpected message: {msg!r}")

# Sample frames for the thread at this sync point
required_funcs = dispatch[msg]
frames = self._get_frames_with_retry(unwinder, required_funcs)
self.assertIsNotNone(frames, f"Thread not found for {msg!r}")
results[msg] = [f.funcname for f in frames]

# Release thread and wait for next message (if not last)
client_socket.sendall(b"k")
if i < 3:
buffer += _wait_for_signal(client_socket, b"\n")

# Validate Phase 1: baz snapshots
t1_baz = results.get("t1:baz1")
t2_baz = results.get("t2:baz2")
t1_baz = results.get(b"t1:baz1")
t2_baz = results.get(b"t2:baz2")
self.assertIsNotNone(t1_baz, "Missing t1:baz1 snapshot")
self.assertIsNotNone(t2_baz, "Missing t2:baz2 snapshot")

Expand All @@ -2890,8 +2830,8 @@ def get_thread_frames(target_funcs):
self.assertNotIn("foo1", t2_baz)

# Validate Phase 2: blech snapshots (cache invalidation test)
t1_blech = results.get("t1:blech1")
t2_blech = results.get("t2:blech2")
t1_blech = results.get(b"t1:blech1")
t2_blech = results.get(b"t2:blech2")
self.assertIsNotNone(t1_blech, "Missing t1:blech1 snapshot")
self.assertIsNotNone(t2_blech, "Missing t2:blech2 snapshot")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Add incomplete sample detection to prevent corrupted profiling data. Each
thread state now contains an embedded base frame (sentinel at the bottom of
the frame stack) with owner type ``FRAME_OWNED_BY_INTERPRETER``. The profiler
validates that stack unwinding terminates at this sentinel frame. Samples that
fail to reach the base frame (due to race conditions, memory corruption, or
other errors) are now rejected rather than being included as spurious data.
1 change: 1 addition & 0 deletions Modules/_remote_debugging/_remote_debugging.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ extern int process_frame_chain(
uintptr_t initial_frame_addr,
StackChunkList *chunks,
PyObject *frame_info,
uintptr_t base_frame_addr,
uintptr_t gc_frame,
uintptr_t last_profiled_frame,
int *stopped_at_cached_frame,
Expand Down
30 changes: 21 additions & 9 deletions Modules/_remote_debugging/frames.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,13 @@ is_frame_valid(

void* frame = (void*)frame_addr;

if (GET_MEMBER(char, frame, unwinder->debug_offsets.interpreter_frame.owner) == FRAME_OWNED_BY_INTERPRETER) {
return 0; // C frame
char owner = GET_MEMBER(char, frame, unwinder->debug_offsets.interpreter_frame.owner);
if (owner == FRAME_OWNED_BY_INTERPRETER) {
return 0; // C frame or sentinel base frame
}

if (GET_MEMBER(char, frame, unwinder->debug_offsets.interpreter_frame.owner) != FRAME_OWNED_BY_GENERATOR
&& GET_MEMBER(char, frame, unwinder->debug_offsets.interpreter_frame.owner) != FRAME_OWNED_BY_THREAD) {
PyErr_Format(PyExc_RuntimeError, "Unhandled frame owner %d.\n",
GET_MEMBER(char, frame, unwinder->debug_offsets.interpreter_frame.owner));
if (owner != FRAME_OWNED_BY_GENERATOR && owner != FRAME_OWNED_BY_THREAD) {
PyErr_Format(PyExc_RuntimeError, "Unhandled frame owner %d.\n", owner);
set_exception_cause(unwinder, PyExc_RuntimeError, "Unhandled frame owner type in async frame");
return -1;
}
Expand Down Expand Up @@ -260,6 +259,7 @@ process_frame_chain(
uintptr_t initial_frame_addr,
StackChunkList *chunks,
PyObject *frame_info,
uintptr_t base_frame_addr,
uintptr_t gc_frame,
uintptr_t last_profiled_frame,
int *stopped_at_cached_frame,
Expand All @@ -269,6 +269,7 @@ process_frame_chain(
{
uintptr_t frame_addr = initial_frame_addr;
uintptr_t prev_frame_addr = 0;
uintptr_t last_frame_addr = 0; // Track last frame visited for validation
const size_t MAX_FRAMES = 1024 + 512;
size_t frame_count = 0;

Expand Down Expand Up @@ -296,14 +297,14 @@ process_frame_chain(
PyObject *frame = NULL;
uintptr_t next_frame_addr = 0;
uintptr_t stackpointer = 0;
last_frame_addr = frame_addr; // Remember this frame address

if (++frame_count > MAX_FRAMES) {
PyErr_SetString(PyExc_RuntimeError, "Too many stack frames (possible infinite loop)");
set_exception_cause(unwinder, PyExc_RuntimeError, "Frame chain iteration limit exceeded");
return -1;
}

// Try chunks first, fallback to direct memory read
if (parse_frame_from_chunks(unwinder, &frame, frame_addr, &next_frame_addr, &stackpointer, chunks) < 0) {
PyErr_Clear();
uintptr_t address_of_code_object = 0;
Expand Down Expand Up @@ -377,6 +378,17 @@ process_frame_chain(
frame_addr = next_frame_addr;
}

// Validate we reached the base frame (sentinel at bottom of stack)
// Only validate if we walked the full chain (didn't stop at cached frame)
// and base_frame_addr is provided (non-zero)
int stopped_early = stopped_at_cached_frame && *stopped_at_cached_frame;
if (!stopped_early && base_frame_addr != 0 && last_frame_addr != base_frame_addr) {
PyErr_Format(PyExc_RuntimeError,
"Incomplete sample: did not reach base frame (expected 0x%lx, got 0x%lx)",
base_frame_addr, last_frame_addr);
return -1;
}

return 0;
}

Expand Down Expand Up @@ -540,7 +552,7 @@ collect_frames_with_cache(
Py_ssize_t frames_before = PyList_GET_SIZE(frame_info);

int stopped_at_cached = 0;
if (process_frame_chain(unwinder, frame_addr, chunks, frame_info, gc_frame,
if (process_frame_chain(unwinder, frame_addr, chunks, frame_info, 0, gc_frame,
last_profiled_frame, &stopped_at_cached,
addrs, &num_addrs, FRAME_CACHE_MAX_FRAMES) < 0) {
return -1;
Expand All @@ -562,7 +574,7 @@ collect_frames_with_cache(
// Cache miss - continue walking from last_profiled_frame to get the rest
STATS_INC(unwinder, frame_cache_misses);
Py_ssize_t frames_before_walk = PyList_GET_SIZE(frame_info);
if (process_frame_chain(unwinder, last_profiled_frame, chunks, frame_info, gc_frame,
if (process_frame_chain(unwinder, last_profiled_frame, chunks, frame_info, 0, gc_frame,
0, NULL, addrs, &num_addrs, FRAME_CACHE_MAX_FRAMES) < 0) {
return -1;
}
Expand Down
Loading
Loading