Add "request read" event (#2705)
If a page is not mapped in a process, `bpf_probe_read_user` will not be
able to read it (even if the page is resident, in which case an ordinary
read would only have caused a minor fault).

In this case, instead of giving up, we can ask the agent to fault it
into the process by reading /proc/<pid>/mem at the specified offset.
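
As a rough sketch of what that agent-side read can look like (the commit's
actual implementation is `handleRequestRead` in pkg/profiler/cpu/cpu.go;
`faultInPage` and the pid/address in `main` are hypothetical, for
illustration only):

    package main

    import (
            "fmt"
            "os"
            "strconv"
    )

    // faultInPage reads a few bytes from /proc/<pid>/mem at addr. The data
    // itself is not used; the useful side effect is that the kernel handles
    // the fault on behalf of the target process, so a later
    // bpf_probe_read_user at that address can succeed.
    func faultInPage(pid int, addr uint64) error {
            f, err := os.Open("/proc/" + strconv.Itoa(pid) + "/mem")
            if err != nil {
                    return err
            }
            defer f.Close()

            buf := make([]byte, 8)
            _, err = f.ReadAt(buf, int64(addr))
            return err
    }

    func main() {
            // Example values only: fault in the page containing this address in this pid.
            if err := faultInPage(4242, 0x7ffd12345000); err != nil {
                    fmt.Println("read failed:", err)
            }
    }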

To test, compile the following program with `-O0` and attempt to profile
it:

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>

    void x(uint64_t orig_page) {
            uint64_t this_page = ((uint64_t)(&orig_page) >> 12);

            if (this_page != orig_page) {
                    pid_t pid = fork();
                    if (pid)
                            printf("Forked child pid: %d\n", pid);
                    else
                            for (;;)
                                    ;
            } else
                    x(orig_page);
    }

    int main(int argc, char *argv[]) {
            x((uint64_t)(&argc) >> 12);
    }

This program does the following:

1. Recurses until the stack crosses a page boundary
2. Forks -- the new process will initially not have any pages mapped in
its address space
3. Loops forever -- thus it will never go back to the old stack frame
and read it.

This program consistently produces 100% `PreviousRipZero` errors before
this commit, but with this commit, it eventually starts being profiled
successfully.
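
For reference, the perf events emitted to user space now start with a
one-byte type tag: plain pid events remain 8 bytes, while the new read
request is a 16-byte payload that also carries the faulting address. Below
is a minimal decoding sketch on the Go side; the struct mirrors the
`RequestReadPayload` added in pkg/profiler/cpu/cpu.go, while
`decodeReadRequest` and the sample bytes are made up for illustration:

    package main

    import (
            "bytes"
            "encoding/binary"
            "fmt"
    )

    // readRequest mirrors the 16-byte payload emitted by request_read in
    // native.bpf.c: u8 type, 3 bytes of padding, u32 pid, u64 addr.
    type readRequest struct {
            Type                         uint8
            Padding1, Padding2, Padding3 uint8
            Pid                          int32
            Addr                         uint64
    }

    // decodeReadRequest is a hypothetical helper; the agent itself calls
    // binary.Read with the byte order reported by its BPF maps.
    func decodeReadRequest(raw []byte, order binary.ByteOrder) (readRequest, error) {
            var req readRequest
            err := binary.Read(bytes.NewReader(raw), order, &req)
            return req, err
    }

    func main() {
            // type=3 (REQUEST_READ), pid=12345, addr=0xdeadbeef, little-endian.
            raw := []byte{3, 0, 0, 0, 0x39, 0x30, 0, 0, 0xef, 0xbe, 0xad, 0xde, 0, 0, 0, 0}
            req, err := decodeReadRequest(raw, binary.LittleEndian)
            fmt.Printf("%+v err=%v\n", req, err)
    }
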
kakkoyun committed Apr 16, 2024
2 parents 8917e7c + bd1bc11 commit 31bdda2
Showing 6 changed files with 150 additions and 23 deletions.
60 changes: 49 additions & 11 deletions bpf/unwinders/native.bpf.c
@@ -91,9 +91,10 @@ _Static_assert(1 << MAX_MAPPINGS_BINARY_SEARCH_DEPTH >= MAX_MAPPINGS_PER_PROCESS
#define BINARY_SEARCH_NOT_FOUND(var) (var == BINARY_SEARCH_DEFAULT)
#define BINARY_SEARCH_FAILED(var) (var == BINARY_SEARCH_SHOULD_NEVER_HAPPEN || var == BINARY_SEARCH_EXHAUSTED_ITERATIONS)

#define REQUEST_UNWIND_INFORMATION (1ULL << 63)
#define REQUEST_PROCESS_MAPPINGS (1ULL << 62)
#define REQUEST_REFRESH_PROCINFO (1ULL << 61)
#define REQUEST_UNWIND_INFORMATION 0
#define REQUEST_PROCESS_MAPPINGS 1
#define REQUEST_REFRESH_PROCINFO 2
#define REQUEST_READ 3

#define ENABLE_STATS_PRINTING false

@@ -129,6 +130,7 @@ struct unwinder_config_t {
u32 rate_limit_unwind_info;
u32 rate_limit_process_mappings;
u32 rate_limit_refresh_process_info;
u32 rate_limit_reads;
};

struct unwinder_stats_t {
@@ -155,6 +157,7 @@ struct unwinder_stats_t {
u64 event_request_unwind_information;
u64 event_request_process_mappings;
u64 event_request_refresh_process_info;
u64 event_request_read;

u64 total_zero_pids;
u64 total_kthreads;
@@ -269,7 +272,13 @@ BPF_HASH(unwind_info_chunks, u64, unwind_info_chunks_t,
BPF_HASH(unwind_tables, u64, stack_unwind_table_t,
5); // Table size will be updated in userspace.

BPF_HASH(events_count, u64, u32, MAX_PROCESSES);
typedef struct {
u8 type;
int pid;
} pid_event_t;
_Static_assert(sizeof(pid_event_t) == 8, "event payload expected to be 64 bits");

BPF_HASH(events_count, pid_event_t, u32, MAX_PROCESSES);

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
@@ -338,6 +347,7 @@ DEFINE_COUNTER(success_dwarf_reach_bottom);
DEFINE_COUNTER(event_request_unwind_information);
DEFINE_COUNTER(event_request_process_mappings);
DEFINE_COUNTER(event_request_refresh_process_info);
DEFINE_COUNTER(event_request_read);

DEFINE_COUNTER(total_zero_pids);
DEFINE_COUNTER(total_kthreads);
@@ -392,7 +402,7 @@ static void bump_samples() {

/*================================= EVENTS ==================================*/

static __always_inline bool event_rate_limited(u64 event_id, int rate) {
static __always_inline bool event_rate_limited(pid_event_t event_id, int rate) {
u32 zero = 0;
u32 *val = bpf_map_lookup_or_try_init(&events_count, &event_id, &zero);
if (val) {
@@ -411,7 +421,7 @@ static __always_inline void request_unwind_information(struct bpf_perf_event_dat
bpf_get_current_comm(comm, 20);
LOG("[debug] requesting unwind info for PID: %d, comm: %s ctx IP: %llx", user_pid, comm, PT_REGS_IP(&ctx->regs));

u64 payload = REQUEST_UNWIND_INFORMATION | user_pid;
pid_event_t payload = {REQUEST_UNWIND_INFORMATION, user_pid};
if (event_rate_limited(payload, unwinder_config.rate_limit_unwind_info)) {
return;
}
@@ -421,7 +431,7 @@
}

static __always_inline void request_process_mappings(struct bpf_perf_event_data *ctx, int user_pid) {
u64 payload = REQUEST_PROCESS_MAPPINGS | user_pid;
pid_event_t payload = {REQUEST_PROCESS_MAPPINGS, user_pid};
if (event_rate_limited(payload, unwinder_config.rate_limit_process_mappings)) {
return;
}
@@ -430,14 +440,36 @@ static __always_inline void request_process_mappings(struct bpf_perf_event_data
}

static __always_inline void request_refresh_process_info(struct bpf_perf_event_data *ctx, int user_pid) {
u64 payload = REQUEST_REFRESH_PROCINFO | user_pid;
pid_event_t payload = {REQUEST_REFRESH_PROCINFO, user_pid};
if (event_rate_limited(payload, unwinder_config.rate_limit_process_mappings)) {
return;
}
bump_unwind_event_request_refresh_process_info();
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &payload, sizeof(u64));
}

static __always_inline void request_read(struct bpf_perf_event_data *ctx, int user_pid, u64 addr) {
typedef struct {
u8 type;
u32 pid;
u64 addr;
} payload_t;
_Static_assert(sizeof(payload_t) == 16, "request_read_addr payload expected to be 128 bits");
// `event_rate_limited` can fail open in case the map is already full.
// We want to have `rate_limit_reads == 0` act as a kill switch where we can be sure
// to NEVER try to read process memory from the agent, so let's just bail early in that case.
if (!unwinder_config.rate_limit_reads) {
return;
}
payload_t payload = {REQUEST_READ, user_pid, addr};
pid_event_t payload_for_rate_limiting = {REQUEST_READ, user_pid};
if (event_rate_limited(payload_for_rate_limiting, unwinder_config.rate_limit_reads)) {
return;
}
bump_unwind_event_request_read();
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &payload, sizeof(payload));
}

// Binary search the executable mappings to find the one that covers a given pc.
static u64 find_mapping(process_info_t *proc_info, u64 pc) {
u64 left = 0;
@@ -1012,12 +1044,13 @@ int native_unwind(struct bpf_perf_event_data *ctx) {
}

u64 previous_rip = 0;
u64 previous_rip_addr;

// HACK(javierhonduco): This is an architectural shortcut we can take. As we
// only support x86_64 at the minute, we can assume that the return address
// is *always* 8 bytes ahead of the previous stack pointer.
#if __TARGET_ARCH_x86
u64 previous_rip_addr = previous_rsp - 8;
previous_rip_addr = previous_rsp - 8;
int err = bpf_probe_read_user(&previous_rip, 8, (void *)(previous_rip_addr));
if (err < 0) {
LOG("\t[error] Failed to read previous rip with error: %d", err);
@@ -1029,8 +1062,9 @@
// For the leaf frame, the saved pc/ip is always stored in the link register itself
if (found_lr_offset == 0) {
previous_rip = canonicalize_addr(PT_REGS_RET(&ctx->regs));
previous_rip_addr = 0;
} else {
u64 previous_rip_addr = previous_rsp + found_lr_offset;
previous_rip_addr = previous_rsp + found_lr_offset;
int err = bpf_probe_read_user(&previous_rip, 8, (void *)(previous_rip_addr));
previous_rip = canonicalize_addr(previous_rip);
if (err < 0) {
@@ -1052,7 +1086,11 @@
return 1;
}

LOG("[error] previous_rip should not be zero. This can mean that the read failed, ret=%d while reading previous_rip_addr", err);
LOG("[warn] previous_rip should not be zero. This can mean that the read failed, ret=%d while reading previous_rip_addr", err);
if (err == -EFAULT && previous_rip_addr) {
LOG("[info] requesting that the user-space process attempt to fault in the memory at 0x%lx", previous_rip_addr);
request_read(ctx, user_pid, previous_rip_addr);
}
bump_unwind_error_catchall();
BUMP_UNWIND_FAILED_COUNT(per_process_id, previous_rip_zero);
return 1;
2 changes: 2 additions & 0 deletions cmd/parca-agent/main.go
@@ -256,6 +256,7 @@ type FlagsHidden struct {
RateLimitUnwindInfo uint32 `default:"50" hidden:""`
RateLimitProcessMappings uint32 `default:"50" hidden:""`
RateLimitRefreshProcessInfo uint32 `default:"50" hidden:""`
RateLimitRead uint32 `default:"50" hidden:""`
}

type FlagsBPF struct {
@@ -967,6 +968,7 @@ func run(logger log.Logger, reg *prometheus.Registry, flags flags, cpus cpuinfo.
RateLimitUnwindInfo: flags.Hidden.RateLimitUnwindInfo,
RateLimitProcessMappings: flags.Hidden.RateLimitProcessMappings,
RateLimitRefreshProcessInfo: flags.Hidden.RateLimitRefreshProcessInfo,
RateLimitRead: flags.Hidden.RateLimitRead,
CollectTraceID: flags.CollectTraceID,
},
bpfProgramLoaded,
11 changes: 8 additions & 3 deletions pkg/profiler/cpu/bpf/maps/maps.go
@@ -169,9 +169,10 @@ const (
)

const (
RequestUnwindInformation = 1 << 63
RequestProcessMappings = 1 << 62
RequestRefreshProcInfo = 1 << 61
RequestUnwindInformation byte = iota
RequestProcessMappings
RequestRefreshProcInfo
RequestRead
)

var (
@@ -1529,6 +1530,10 @@ func (m *Maps) GetUnwindFailedReasons() (map[int]profiler.UnwindFailedReasons, e
return ret, nil
}

func (m *Maps) ByteOrder() binary.ByteOrder {
return m.byteOrder
}

// 1. Find executable sections
// 2. For each section, generate compact table
// 3. Add table to maps
1 change: 1 addition & 0 deletions pkg/profiler/cpu/bpf/metrics/collector.go
@@ -52,6 +52,7 @@ type unwinderStats struct {
EventRequestUnwindInformation uint64
EventRequestProcessMappings uint64
EventRequestRefreshProcessInfo uint64
EventRequestRead uint64

TotalZeroPids uint64
TotalKthreads uint64
89 changes: 80 additions & 9 deletions pkg/profiler/cpu/cpu.go
@@ -21,10 +21,12 @@ import (
"context"
"debug/elf"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
@@ -74,6 +76,7 @@ type UnwinderConfig struct {
RateLimitUnwindInfo uint32
RateLimitProcessMappings uint32
RateLimitRefreshProcessInfo uint32
RateLimitRead uint32
}

type Config struct {
@@ -100,6 +103,7 @@ type Config struct {
RateLimitUnwindInfo uint32
RateLimitProcessMappings uint32
RateLimitRefreshProcessInfo uint32
RateLimitRead uint32

CollectTraceID bool
}
@@ -108,6 +112,11 @@ func (c Config) DebugModeEnabled() bool {
return len(c.DebugProcessNames) > 0
}

type requestReadCacheKey struct {
Pid int32
Addr uint64
}

type CPU struct {
config *Config

@@ -124,6 +133,7 @@ type CPU struct {
bpfMaps *bpfmaps.Maps

framePointerCache unwind.FramePointerCache
requestReadCache *cache.CacheWithTTL[requestReadCacheKey, struct{}]
interpSymTab profile.InterpreterSymbolTable

byteOrder binary.ByteOrder
@@ -140,6 +150,19 @@ type CPU struct {
cpus cpuinfo.CPUSet
}

type PidEventPayload struct {
Type uint8
Padding1, Padding2, Padding3 uint8
Pid int32
}

type RequestReadPayload struct {
Type uint8
Padding1, Padding2, Padding3 uint8
Pid int32
Addr uint64
}

func NewCPUProfiler(
logger log.Logger,
reg prometheus.Registerer,
@@ -165,6 +188,12 @@ func NewCPUProfiler(

// CPU profiler specific caches.
framePointerCache: unwind.NewHasFramePointersCache(logger, reg, compilerInfoManager),
// Cache for debouncing /proc/<pid>/mem reads: only attempt to read the
// same pid and address every 10 seconds at most.
requestReadCache: cache.NewLRUCacheWithTTL[requestReadCacheKey, struct{}](
prometheus.WrapRegistererWith(prometheus.Labels{"cache": "request_read"}, reg),
10000,
time.Second*10),

byteOrder: byteorder.GetHostByteOrder(),

@@ -350,6 +379,7 @@ func loadBPFModules(logger log.Logger, reg prometheus.Registerer, memlockRlimit
RateLimitUnwindInfo: config.RateLimitUnwindInfo,
RateLimitProcessMappings: config.RateLimitProcessMappings,
RateLimitRefreshProcessInfo: config.RateLimitRefreshProcessInfo,
RateLimitRead: config.RateLimitRead,
}); err != nil {
return nil, nil, fmt.Errorf("init global variable: %w", err)
}
@@ -436,6 +466,23 @@ func loadBPFModules(logger log.Logger, reg prometheus.Registerer, memlockRlimit
return nil, nil, lerr
}

func handleRequestRead(pid int, addr uint64) ([]byte, error) {
filePath := "/proc/" + strconv.Itoa(pid) + "/mem"
file, err := os.Open(filePath)
if err != nil {
return nil, err
}
defer file.Close()

buffer := make([]byte, 8)
_, err = file.ReadAt(buffer, int64(addr))
if err != nil {
return nil, err
}

return buffer, nil
}

// listenEvents listens for events from the BPF program and handles them.
// It also listens for lost events and logs them.
func (p *CPU) listenEvents(ctx context.Context, eventsChan <-chan []byte, lostChan <-chan uint64, requestUnwindInfoChan chan<- int) {
@@ -509,27 +556,51 @@ func (p *CPU) listenEvents(ctx context.Context, eventsChan <-chan []byte, lostCh
continue
}

payload := binary.LittleEndian.Uint64(receivedBytes)
// Get the 4 more significant bytes and convert to int as they are different types.
// On x86_64:
// - unsafe.Sizeof(int(0)) = 8
// - unsafe.Sizeof(uint32(0)) = 4
pid := int(int32(payload))
if receivedBytes[0] == bpfmaps.RequestRead {
var payload RequestReadPayload
if err := binary.Read(bytes.NewBuffer(receivedBytes), p.bpfMaps.ByteOrder(), &payload); err != nil {
level.Error(p.logger).Log("msg", "failed reading request read event payload",
"payload", hex.EncodeToString(receivedBytes),
"err", err, "byteOrder", p.bpfMaps.ByteOrder())
continue
}
pid, addr := payload.Pid, payload.Addr
key := requestReadCacheKey{pid, addr}
if _, has := p.requestReadCache.Get(key); has {
continue
}
p.requestReadCache.Add(key, struct{}{})
if _, err := handleRequestRead(int(pid), addr); err != nil {
level.Warn(p.logger).Log("msg", "failed reading memory", "pid", pid, "addr", addr, "err", err)
p.metrics.requestReadAttempts.WithLabelValues(labelFailed)
} else {
p.metrics.requestReadAttempts.WithLabelValues(labelSuccess)
}
continue
}

var payload PidEventPayload
if err := binary.Read(bytes.NewBuffer(receivedBytes), p.bpfMaps.ByteOrder(), &payload); err != nil {
level.Error(p.logger).Log("msg", "failed reading event payload", "payload", hex.EncodeToString(receivedBytes), "err", err, "byteOrder", p.bpfMaps.ByteOrder())
continue
}
pid, typ := int(payload.Pid), payload.Type

switch {
case payload&bpfmaps.RequestUnwindInformation == bpfmaps.RequestUnwindInformation:
case typ == bpfmaps.RequestUnwindInformation:
if p.config.DWARFUnwindingDisabled {
continue
}
p.metrics.eventsReceived.WithLabelValues(labelEventUnwindInfo).Inc()
// See onDemandUnwindInfoBatcher for consumer.
requestUnwindInfoChan <- pid
case payload&bpfmaps.RequestProcessMappings == bpfmaps.RequestProcessMappings:
case typ == bpfmaps.RequestProcessMappings:
p.metrics.eventsReceived.WithLabelValues(labelEventProcessMappings).Inc()
if _, exists := fetchInProgress.LoadOrStore(pid, struct{}{}); exists {
continue
}
prefetch <- pid
case payload&bpfmaps.RequestRefreshProcInfo == bpfmaps.RequestRefreshProcInfo:
case typ == bpfmaps.RequestRefreshProcInfo:
p.metrics.eventsReceived.WithLabelValues(labelEventRefreshProcInfo).Inc()
// Refresh mappings and their unwind info if they've changed.
if _, exists := refreshInProgress.LoadOrStore(pid, struct{}{}); exists {
