Add "request read" event #2705

Merged 3 commits on Apr 16, 2024
60 changes: 49 additions & 11 deletions bpf/unwinders/native.bpf.c
@@ -91,9 +91,10 @@ _Static_assert(1 << MAX_MAPPINGS_BINARY_SEARCH_DEPTH >= MAX_MAPPINGS_PER_PROCESS
#define BINARY_SEARCH_NOT_FOUND(var) (var == BINARY_SEARCH_DEFAULT)
#define BINARY_SEARCH_FAILED(var) (var == BINARY_SEARCH_SHOULD_NEVER_HAPPEN || var == BINARY_SEARCH_EXHAUSTED_ITERATIONS)

#define REQUEST_UNWIND_INFORMATION (1ULL << 63)
#define REQUEST_PROCESS_MAPPINGS (1ULL << 62)
#define REQUEST_REFRESH_PROCINFO (1ULL << 61)
#define REQUEST_UNWIND_INFORMATION 0
#define REQUEST_PROCESS_MAPPINGS 1
#define REQUEST_REFRESH_PROCINFO 2
#define REQUEST_READ 3

#define ENABLE_STATS_PRINTING false

@@ -129,6 +130,7 @@ struct unwinder_config_t {
u32 rate_limit_unwind_info;
u32 rate_limit_process_mappings;
u32 rate_limit_refresh_process_info;
u32 rate_limit_reads;
};

struct unwinder_stats_t {
@@ -155,6 +157,7 @@ struct unwinder_stats_t {
u64 event_request_unwind_information;
u64 event_request_process_mappings;
u64 event_request_refresh_process_info;
u64 event_request_read;

u64 total_zero_pids;
u64 total_kthreads;
@@ -269,7 +272,13 @@ BPF_HASH(unwind_info_chunks, u64, unwind_info_chunks_t,
BPF_HASH(unwind_tables, u64, stack_unwind_table_t,
5); // Table size will be updated in userspace.

BPF_HASH(events_count, u64, u32, MAX_PROCESSES);
typedef struct {
u8 type;
int pid;
} pid_event_t;
_Static_assert(sizeof(pid_event_t) == 8, "event payload expected to be 64 bits");

BPF_HASH(events_count, pid_event_t, u32, MAX_PROCESSES);

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
@@ -338,6 +347,7 @@ DEFINE_COUNTER(success_dwarf_reach_bottom);
DEFINE_COUNTER(event_request_unwind_information);
DEFINE_COUNTER(event_request_process_mappings);
DEFINE_COUNTER(event_request_refresh_process_info);
DEFINE_COUNTER(event_request_read);

DEFINE_COUNTER(total_zero_pids);
DEFINE_COUNTER(total_kthreads);
@@ -392,7 +402,7 @@ static void bump_samples() {

/*================================= EVENTS ==================================*/

static __always_inline bool event_rate_limited(u64 event_id, int rate) {
static __always_inline bool event_rate_limited(pid_event_t event_id, int rate) {
u32 zero = 0;
u32 *val = bpf_map_lookup_or_try_init(&events_count, &event_id, &zero);
if (val) {
@@ -411,7 +421,7 @@ static __always_inline void request_unwind_information(struct bpf_perf_event_dat
bpf_get_current_comm(comm, 20);
LOG("[debug] requesting unwind info for PID: %d, comm: %s ctx IP: %llx", user_pid, comm, PT_REGS_IP(&ctx->regs));

u64 payload = REQUEST_UNWIND_INFORMATION | user_pid;
pid_event_t payload = {REQUEST_UNWIND_INFORMATION, user_pid};
if (event_rate_limited(payload, unwinder_config.rate_limit_unwind_info)) {
return;
}
@@ -421,7 +431,7 @@ static __always_inline void request_unwind_information(struct bpf_perf_event_dat
}

static __always_inline void request_process_mappings(struct bpf_perf_event_data *ctx, int user_pid) {
u64 payload = REQUEST_PROCESS_MAPPINGS | user_pid;
pid_event_t payload = {REQUEST_PROCESS_MAPPINGS, user_pid};
if (event_rate_limited(payload, unwinder_config.rate_limit_process_mappings)) {
return;
}
@@ -430,14 +440,36 @@ static __always_inline void request_process_mappings(struct bpf_perf_event_data
}

static __always_inline void request_refresh_process_info(struct bpf_perf_event_data *ctx, int user_pid) {
u64 payload = REQUEST_REFRESH_PROCINFO | user_pid;
pid_event_t payload = {REQUEST_REFRESH_PROCINFO, user_pid};
if (event_rate_limited(payload, unwinder_config.rate_limit_process_mappings)) {
return;
}
bump_unwind_event_request_refresh_process_info();
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &payload, sizeof(u64));
}

static __always_inline void request_read(struct bpf_perf_event_data *ctx, int user_pid, u64 addr) {
typedef struct {
u8 type;
u32 pid;
u64 addr;
} payload_t;
_Static_assert(sizeof(payload_t) == 16, "request_read_addr payload expected to be 128 bits");
// `event_rate_limited` can fail open in case the map is already full.
// We want to have `rate_limit_reads == 0` act as a kill switch where we can be sure
// to NEVER try to read process memory from the agent, so let's just bail early in that case.
if (!unwinder_config.rate_limit_reads) {
return;
}
payload_t payload = {REQUEST_READ, user_pid, addr};
pid_event_t payload_for_rate_limiting = {REQUEST_READ, user_pid};
if (event_rate_limited(payload_for_rate_limiting, unwinder_config.rate_limit_reads)) {
return;
}
bump_unwind_event_request_read();
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &payload, sizeof(payload));
}

// Binary search the executable mappings to find the one that covers a given pc.
static u64 find_mapping(process_info_t *proc_info, u64 pc) {
u64 left = 0;
@@ -1012,12 +1044,13 @@ int native_unwind(struct bpf_perf_event_data *ctx) {
}

u64 previous_rip = 0;
u64 previous_rip_addr;

// HACK(javierhonduco): This is an architectural shortcut we can take. As we
// only support x86_64 at the minute, we can assume that the return address
// is *always* 8 bytes ahead of the previous stack pointer.
#if __TARGET_ARCH_x86
u64 previous_rip_addr = previous_rsp - 8;
previous_rip_addr = previous_rsp - 8;
int err = bpf_probe_read_user(&previous_rip, 8, (void *)(previous_rip_addr));
if (err < 0) {
LOG("\t[error] Failed to read previous rip with error: %d", err);
@@ -1029,8 +1062,9 @@ int native_unwind(struct bpf_perf_event_data *ctx) {
// For the leaf frame, the saved pc/ip is always stored in the link register itself
if (found_lr_offset == 0) {
previous_rip = canonicalize_addr(PT_REGS_RET(&ctx->regs));
previous_rip_addr = 0;
} else {
u64 previous_rip_addr = previous_rsp + found_lr_offset;
previous_rip_addr = previous_rsp + found_lr_offset;
int err = bpf_probe_read_user(&previous_rip, 8, (void *)(previous_rip_addr));
previous_rip = canonicalize_addr(previous_rip);
if (err < 0) {
@@ -1052,7 +1086,11 @@ int native_unwind(struct bpf_perf_event_data *ctx) {
return 1;
}

LOG("[error] previous_rip should not be zero. This can mean that the read failed, ret=%d while reading previous_rip_addr", err);
LOG("[warn] previous_rip should not be zero. This can mean that the read failed, ret=%d while reading previous_rip_addr", err);
if (err == -EFAULT && previous_rip_addr) {
LOG("[info] requesting that the user-space process attempt to fault in the memory at 0x%lx", previous_rip_addr);
request_read(ctx, user_pid, previous_rip_addr);
}
bump_unwind_error_catchall();
BUMP_UNWIND_FAILED_COUNT(per_process_id, previous_rip_zero);
return 1;
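Note: a minimal Go sketch (not part of this PR) of how the new 16-byte request_read payload decodes on the userspace side. The three explicit padding bytes mirror the C struct padding pinned down by the _Static_assert above: type at offset 0, pid at offset 4, addr at offset 8; a little-endian host is assumed.

    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
    )

    // Mirrors the BPF-side payload_t; the padding fields keep the
    // struct at 16 bytes, matching the _Static_assert.
    type requestReadPayload struct {
        Type                         uint8
        Padding1, Padding2, Padding3 uint8
        Pid                          int32
        Addr                         uint64
    }

    func main() {
        // Example raw perf-event bytes as they would arrive over the buffer.
        raw := make([]byte, 16)
        raw[0] = 3                                            // REQUEST_READ
        binary.LittleEndian.PutUint32(raw[4:], 4242)          // pid
        binary.LittleEndian.PutUint64(raw[8:], 0x7ffff7a3b000) // addr

        var p requestReadPayload
        if err := binary.Read(bytes.NewReader(raw), binary.LittleEndian, &p); err != nil {
            panic(err)
        }
        fmt.Printf("type=%d pid=%d addr=%#x\n", p.Type, p.Pid, p.Addr)
    }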
2 changes: 2 additions & 0 deletions cmd/parca-agent/main.go
@@ -256,6 +256,7 @@ type FlagsHidden struct {
RateLimitUnwindInfo uint32 `default:"50" hidden:""`
RateLimitProcessMappings uint32 `default:"50" hidden:""`
RateLimitRefreshProcessInfo uint32 `default:"50" hidden:""`
RateLimitRead uint32 `default:"50" hidden:""`
}

type FlagsBPF struct {
@@ -967,6 +968,7 @@ func run(logger log.Logger, reg *prometheus.Registry, flags flags, cpus cpuinfo.
RateLimitUnwindInfo: flags.Hidden.RateLimitUnwindInfo,
RateLimitProcessMappings: flags.Hidden.RateLimitProcessMappings,
RateLimitRefreshProcessInfo: flags.Hidden.RateLimitRefreshProcessInfo,
RateLimitRead: flags.Hidden.RateLimitRead,
CollectTraceID: flags.CollectTraceID,
},
bpfProgramLoaded,
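Note: a sketch of how the new hidden knob could be exercised as a kill switch; the --rate-limit-read flag name is inferred from kong's default CamelCase-to-kebab-case field mapping and should be treated as an assumption.

    package main

    import "github.com/alecthomas/kong"

    type flagsHidden struct {
        RateLimitRead uint32 `default:"50" hidden:""`
    }

    func main() {
        // Presumably invoked as: parca-agent --rate-limit-read=0
        // A value of 0 makes the BPF side bail out of request_read before
        // emitting any event, so the agent never reads process memory.
        var flags flagsHidden
        kong.Parse(&flags)
        _ = flags.RateLimitRead
    }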
11 changes: 8 additions & 3 deletions pkg/profiler/cpu/bpf/maps/maps.go
@@ -169,9 +169,10 @@
)

const (
RequestUnwindInformation = 1 << 63
RequestProcessMappings = 1 << 62
RequestRefreshProcInfo = 1 << 61
RequestUnwindInformation byte = iota
RequestProcessMappings
RequestRefreshProcInfo
RequestRead
)

var (
@@ -1529,6 +1530,10 @@ func (m *Maps) GetUnwindFailedReasons() (map[int]profiler.UnwindFailedReasons, e
return ret, nil
}

func (m *Maps) ByteOrder() binary.ByteOrder {
return m.byteOrder
}

// 1. Find executable sections
// 2. For each section, generate compact table
// 3. Add table to maps
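Note: because the first iota constant above is explicitly typed byte, all four constants are typed byte and compare directly against the first byte of an event payload, with none of the high-bit masking the old 1<<63-style flags required. A self-contained illustration (the payload bytes are made up):

    package main

    import "fmt"

    const (
        RequestUnwindInformation byte = iota // 0
        RequestProcessMappings               // 1
        RequestRefreshProcInfo               // 2
        RequestRead                          // 3
    )

    func main() {
        // Hypothetical 16-byte request-read event: type 3, then pid and addr.
        payload := []byte{3, 0, 0, 0, 0x92, 0x10, 0, 0, 0, 0xb0, 0xa3, 0xf7, 0xff, 0x7f, 0, 0}
        // Both sides are byte, so the comparison needs no conversion or mask.
        fmt.Println(payload[0] == RequestRead) // true
    }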
1 change: 1 addition & 0 deletions pkg/profiler/cpu/bpf/metrics/collector.go
@@ -52,6 +52,7 @@ type unwinderStats struct {
EventRequestUnwindInformation uint64
EventRequestProcessMappings uint64
EventRequestRefreshProcessInfo uint64
EventRequestRead uint64

TotalZeroPids uint64
TotalKthreads uint64
89 changes: 80 additions & 9 deletions pkg/profiler/cpu/cpu.go
@@ -21,10 +21,12 @@ import (
"context"
"debug/elf"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
@@ -74,6 +76,7 @@ type UnwinderConfig struct {
RateLimitUnwindInfo uint32
RateLimitProcessMappings uint32
RateLimitRefreshProcessInfo uint32
RateLimitRead uint32
}

type Config struct {
@@ -100,6 +103,7 @@ type Config struct {
RateLimitUnwindInfo uint32
RateLimitProcessMappings uint32
RateLimitRefreshProcessInfo uint32
RateLimitRead uint32

CollectTraceID bool
}
@@ -108,6 +112,11 @@ func (c Config) DebugModeEnabled() bool {
return len(c.DebugProcessNames) > 0
}

type requestReadCacheKey struct {
Pid int32
Addr uint64
}

type CPU struct {
config *Config

@@ -124,6 +133,7 @@ type CPU struct {
bpfMaps *bpfmaps.Maps

framePointerCache unwind.FramePointerCache
requestReadCache *cache.CacheWithTTL[requestReadCacheKey, struct{}]
interpSymTab profile.InterpreterSymbolTable

byteOrder binary.ByteOrder
@@ -140,6 +150,19 @@ type CPU struct {
cpus cpuinfo.CPUSet
}

type PidEventPayload struct {
Type uint8
Padding1, Padding2, Padding3 uint8
Pid int32
}

type RequestReadPayload struct {
Type uint8
Padding1, Padding2, Padding3 uint8
Pid int32
Addr uint64
}

func NewCPUProfiler(
logger log.Logger,
reg prometheus.Registerer,
@@ -165,6 +188,12 @@

// CPU profiler specific caches.
framePointerCache: unwind.NewHasFramePointersCache(logger, reg, compilerInfoManager),
// Cache for debouncing /proc/<pid>/mem reads: attempt to read the
// same pid and address at most once every 10 seconds.
requestReadCache: cache.NewLRUCacheWithTTL[requestReadCacheKey, struct{}](
prometheus.WrapRegistererWith(prometheus.Labels{"cache": "request_read"}, reg),
10000,
time.Second*10),

byteOrder: byteorder.GetHostByteOrder(),

@@ -350,6 +379,7 @@ func loadBPFModules(logger log.Logger, reg prometheus.Registerer, memlockRlimit
RateLimitUnwindInfo: config.RateLimitUnwindInfo,
RateLimitProcessMappings: config.RateLimitProcessMappings,
RateLimitRefreshProcessInfo: config.RateLimitRefreshProcessInfo,
RateLimitRead: config.RateLimitRead,
}); err != nil {
return nil, nil, fmt.Errorf("init global variable: %w", err)
}
@@ -436,6 +466,23 @@ func loadBPFModules(logger log.Logger, reg prometheus.Registerer, memlockRlimit
return nil, nil, lerr
}

func handleRequestRead(pid int, addr uint64) ([]byte, error) {
filePath := "/proc/" + strconv.Itoa(pid) + "/mem"
file, err := os.Open(filePath)
if err != nil {
return nil, err
}
defer file.Close()

buffer := make([]byte, 8)
_, err = file.ReadAt(buffer, int64(addr))
if err != nil {
return nil, err
}

return buffer, nil
}

// listenEvents listens for events from the BPF program and handles them.
// It also listens for lost events and logs them.
func (p *CPU) listenEvents(ctx context.Context, eventsChan <-chan []byte, lostChan <-chan uint64, requestUnwindInfoChan chan<- int) {
@@ -509,27 +556,51 @@ func (p *CPU) listenEvents(ctx context.Context, eventsChan <-chan []byte, lostCh
continue
}

payload := binary.LittleEndian.Uint64(receivedBytes)
// Get the 4 more significant bytes and convert to int as they are different types.
// On x86_64:
// - unsafe.Sizeof(int(0)) = 8
// - unsafe.Sizeof(uint32(0)) = 4
pid := int(int32(payload))
if receivedBytes[0] == bpfmaps.RequestRead {
var payload RequestReadPayload
if err := binary.Read(bytes.NewBuffer(receivedBytes), p.bpfMaps.ByteOrder(), &payload); err != nil {
level.Error(p.logger).Log("msg", "failed reading request read event payload",
"payload", hex.EncodeToString(receivedBytes),
"err", err, "byteOrder", p.bpfMaps.ByteOrder())
continue
}
pid, addr := payload.Pid, payload.Addr
key := requestReadCacheKey{pid, addr}
if _, has := p.requestReadCache.Get(key); has {
continue
}
p.requestReadCache.Add(key, struct{}{})
if _, err := handleRequestRead(int(pid), addr); err != nil {
level.Warn(p.logger).Log("msg", "failed reading memory", "pid", pid, "addr", addr, "err", err)
p.metrics.requestReadAttempts.WithLabelValues(labelFailed).Inc()
} else {
p.metrics.requestReadAttempts.WithLabelValues(labelSuccess).Inc()
}
continue
}

var payload PidEventPayload
if err := binary.Read(bytes.NewBuffer(receivedBytes), p.bpfMaps.ByteOrder(), &payload); err != nil {
level.Error(p.logger).Log("msg", "failed reading event payload", "payload", hex.EncodeToString(receivedBytes), "err", err, "byteOrder", p.bpfMaps.ByteOrder())
continue
}
pid, typ := int(payload.Pid), payload.Type

switch {
case payload&bpfmaps.RequestUnwindInformation == bpfmaps.RequestUnwindInformation:
case typ == bpfmaps.RequestUnwindInformation:
if p.config.DWARFUnwindingDisabled {
continue
}
p.metrics.eventsReceived.WithLabelValues(labelEventUnwindInfo).Inc()
// See onDemandUnwindInfoBatcher for consumer.
requestUnwindInfoChan <- pid
case payload&bpfmaps.RequestProcessMappings == bpfmaps.RequestProcessMappings:
case typ == bpfmaps.RequestProcessMappings:
p.metrics.eventsReceived.WithLabelValues(labelEventProcessMappings).Inc()
if _, exists := fetchInProgress.LoadOrStore(pid, struct{}{}); exists {
continue
}
prefetch <- pid
case payload&bpfmaps.RequestRefreshProcInfo == bpfmaps.RequestRefreshProcInfo:
case typ == bpfmaps.RequestRefreshProcInfo:
p.metrics.eventsReceived.WithLabelValues(labelEventRefreshProcInfo).Inc()
// Refresh mappings and their unwind info if they've changed.
if _, exists := refreshInProgress.LoadOrStore(pid, struct{}{}); exists {
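Note: a hedged sanity check (not in the PR) that the Go payload structs stay in lockstep with the sizes the BPF side asserts; if either side drifts, binary.Read would silently mis-decode events.

    package main

    import (
        "fmt"
        "unsafe"
    )

    type PidEventPayload struct {
        Type                         uint8
        Padding1, Padding2, Padding3 uint8
        Pid                          int32
    }

    type RequestReadPayload struct {
        Type                         uint8
        Padding1, Padding2, Padding3 uint8
        Pid                          int32
        Addr                         uint64
    }

    func main() {
        // Must match the BPF _Static_asserts: 8 and 16 bytes respectively.
        fmt.Println(unsafe.Sizeof(PidEventPayload{}) == 8)
        fmt.Println(unsafe.Sizeof(RequestReadPayload{}) == 16)
    }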