Skip to content

Commit

Permalink
Utilize libbpf batch APIs
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun authored and Your Name committed May 5, 2022
1 parent e56ee06 commit 227837a
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 103 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Expand Up @@ -6,3 +6,4 @@ Dockerfile
/images
/.github
/tmp
*.bpf.o
2 changes: 2 additions & 0 deletions Makefile
Expand Up @@ -106,6 +106,8 @@ bpf_compile_tools = $(CMD_LLC) $(CMD_CLANG)
.PHONY: $(bpf_compile_tools)
$(bpf_compile_tools): % : check_%

# TODO(kakkoyun): To prevent out of sync libbpf dependency, we nmight want to try directly linking/updating the submodule in the libbpf-go.
# - Determining the location of the go module cache dir and initializing the submodule in there and linking in here, should be doable.
$(LIBBPF_SRC):
test -d $(LIBBPF_SRC) || (echo "missing libbpf source - maybe do 'git submodule init && git submodule update'" ; false)

Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/write_client.go
Expand Up @@ -105,7 +105,7 @@ func (b *Batcher) batchLoop(ctx context.Context) error {
}

if len(batch) > 0 {
level.Debug(b.logger).Log("msg", "batch write client sent profiles", "count", len(batch))
level.Debug(b.logger).Log("msg", "batch write client has sent profiles", "count", len(batch))
}
return nil
}
Expand Down
209 changes: 107 additions & 102 deletions pkg/profiler/profiler.go
Expand Up @@ -66,11 +66,6 @@ const (

type stack [doubleStackDepth]uint64

type bpfMaps struct {
counts *bpf.BPFMap
stackTraces *bpf.BPFMap
}

// stackCountKey mirrors the struct in parca-agent.bpf.c
// NOTICE: The memory layout and alignment of the struct currently matches the struct in parca-agent.bpf.c.
// However, keep in mind that Go compiler injects padding to align the struct fields to be a multiple of 8 bytes.
Expand All @@ -83,54 +78,9 @@ type stackCountKey struct {
KernelStackID int32
}

func (m bpfMaps) clean() error {
// BPF iterators need the previous value to iterate to the next, so we
// can only delete the "previous" item once we've already iterated to
// the next.

it := m.stackTraces.Iterator()
var prev []byte = nil
for it.Next() {
if prev != nil {
err := m.stackTraces.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
return fmt.Errorf("failed to delete stack trace: %w", err)
}
}

key := it.Key()
prev = make([]byte, len(key))
copy(prev, key)
}
if prev != nil {
err := m.stackTraces.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
return fmt.Errorf("failed to delete stack trace: %w", err)
}
}

it = m.counts.Iterator()
prev = nil
for it.Next() {
if prev != nil {
err := m.counts.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
return fmt.Errorf("failed to delete count: %w", err)
}
}

key := it.Key()
prev = make([]byte, len(key))
copy(prev, key)
}
if prev != nil {
err := m.counts.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
return fmt.Errorf("failed to delete count: %w", err)
}
}

return nil
type bpfMaps struct {
counts *bpf.BPFMap
traces *bpf.BPFMap
}

type metrics struct {
Expand Down Expand Up @@ -199,6 +149,7 @@ type CgroupProfiler struct {

bpfMaps *bpfMaps
byteOrder binary.ByteOrder
countKeys []stackCountKey

lastError error
lastProfileTakenAt time.Time
Expand Down Expand Up @@ -366,11 +317,20 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
return fmt.Errorf("get counts map: %w", err)
}

stackTraces, err := m.GetMap("stack_traces")
traces, err := m.GetMap("stack_traces")
if err != nil {
return fmt.Errorf("get stack traces map: %w", err)
}
p.bpfMaps = &bpfMaps{counts: counts, stackTraces: stackTraces}
p.bpfMaps = &bpfMaps{counts: counts, traces: traces}

// Allocate this here, so it's only allocated once instead of every
// time that p.profileLoop is called below. This is because, as of now,
// this slice will be around 122Kb. We allocate enough to read the entire
// map instead of using the batch iteration feature because it vastly
// simplifies the code in profileLoop and the batch operations are a bit tricky to get right.
// If allocating this much memory upfront is a problem we can always revisit and use
// smaller batch sizes.
p.countKeys = make([]stackCountKey, counts.GetMaxEntries())

ticker := time.NewTicker(p.profilingDuration)
defer ticker.Stop()
Expand All @@ -393,7 +353,15 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
}
}

func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) (err error) {
func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
p.mtx.Lock()
defer p.mtx.Unlock()

p.lastProfileTakenAt = lastProfileTakenAt
p.lastError = lastError
}

func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) error {
var (
mappings = maps.NewMapping(p.pidMappingFileCache)
kernelMapping = &profile.Mapping{
Expand All @@ -405,33 +373,77 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
kernelLocations = []*profile.Location{}
userLocations = map[uint32][]*profile.Location{} // PID -> []*profile.Location
locationIndices = map[[2]uint64]int{} // [PID, Address] -> index in locations

// Variables needed for eBPF map batch iteration.
countKeysPtr = unsafe.Pointer(&p.countKeys[0])
nextCountKey = uintptr(1)
)

// TODO(kakkoyun): Use libbpf batch functions.
// Reset count keys before collecting new traces from the kernel.
memsetCountKeys(p.countKeys, stackCountKey{})

batchSize := 0
it := p.bpfMaps.counts.Iterator()
for it.Next() {
// This byte slice is only valid for this iteration, so it must be
// copied if we want to do anything with it outside this loop.
keyBytes := it.Key()

var key stackCountKey
// NOTICE: This works because the key struct in Go and the key struct in C has exactly the same memory layout.
// See the comment in stackCountKey for more details.
if err := binary.Read(bytes.NewBuffer(keyBytes), p.byteOrder, &key); err != nil {
return fmt.Errorf("read stack count key: %w", err)
batchSize++
}
if err := it.Err(); err != nil {
return fmt.Errorf("iterate over counts map: %w", err)
}

if batchSize == 0 {
return nil
}
level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize)
time.Sleep(1 * time.Second)

var (
values [][]byte
err error
)
values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), uint32(batchSize))
if err != nil {
switch {
case errors.Is(err, syscall.EPERM):
level.Error(p.logger).Log("msg", "get value and delete batch: requested number of items is probably greater than existed", "err", err)
// return fmt.Errorf("get value and delete batch: requested number of items is probably greater than existed: %w", err)
return nil

case errors.Is(err, syscall.ENOENT):
level.Debug(p.logger).Log("msg", "no values in batch")
return nil

default:
return fmt.Errorf("get value and delete batch: %w", err)
}
}
if len(values) == 0 {
level.Debug(p.logger).Log("msg", "no values in batch")
return nil
}

for i, key := range p.countKeys {
var (
pid = key.PID
userStackID = key.UserStackID
kernelStackID = key.KernelStackID
)

if pid == 0 {
continue
}

// Twice the stack depth because we have a user and a potential Kernel stack.
// Read order matters, since we read from the key buffer.
stack := stack{}
userErr := p.readUserStack(key.UserStackID, &stack)
userErr := p.getAndDeleteUserStack(userStackID, &stack)
if userErr != nil {
if errors.Is(userErr, errUnrecoverable) {
return userErr
}
level.Debug(p.logger).Log("msg", "failed to read user stack", "err", userErr)
}
kernelErr := p.readKernelStack(key.KernelStackID, &stack)
kernelErr := p.getAndDeleteKernelStack(kernelStackID, &stack)
if kernelErr != nil {
if errors.Is(kernelErr, errUnrecoverable) {
return kernelErr
Expand All @@ -443,10 +455,7 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
continue
}

value, err := p.readValue(keyBytes)
if err != nil {
return fmt.Errorf("read value: %w", err)
}
value := p.byteOrder.Uint64(values[i])
if value == 0 {
// This should never happen, but it's here just in case.
// If we have a zero value, we don't want to add it to the profile.
Expand Down Expand Up @@ -519,9 +528,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
}
samples[stack] = sample
}
if it.Err() != nil {
return fmt.Errorf("failed iterator: %w", it.Err())
}

prof, err := p.buildProfile(ctx, captureTime, samples, locations, kernelLocations, userLocations, mappings, kernelMapping)
if err != nil {
Expand All @@ -532,10 +538,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
level.Error(p.logger).Log("msg", "failed to send profile", "err", err)
}

if err := p.bpfMaps.clean(); err != nil {
level.Warn(p.logger).Log("msg", "failed to clean BPF maps", "err", err)
}

ksymCacheStats := p.ksymCache.Stats
level.Debug(p.logger).Log("msg", "Kernel symbol cache stats", "stats", ksymCacheStats.String())
p.metrics.ksymCacheHitRate.WithLabelValues("hits").Add(float64(ksymCacheStats.Hits))
Expand All @@ -544,14 +546,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
return nil
}

func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
p.mtx.Lock()
defer p.mtx.Unlock()

p.lastProfileTakenAt = lastProfileTakenAt
p.lastError = lastError
}

func (p *CgroupProfiler) buildProfile(
ctx context.Context,
captureTime time.Time,
Expand Down Expand Up @@ -690,14 +684,14 @@ func (p *CgroupProfiler) resolveKernelFunctions(kernelLocations []*profile.Locat
return kernelFunctions, nil
}

// readUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer.
func (p *CgroupProfiler) readUserStack(userStackID int32, stack *stack) error {
// getAndDeleteUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) getAndDeleteUserStack(userStackID int32, stack *stack) error {
if userStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("user").Inc()
return errors.New("user stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&userStackID))
stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&userStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("user").Inc()
return fmt.Errorf("read user stack trace: %w", err)
Expand All @@ -707,17 +701,21 @@ func (p *CgroupProfiler) readUserStack(userStackID int32, stack *stack) error {
return fmt.Errorf("read user stack bytes, %s: %w", err, errUnrecoverable)
}

if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&userStackID)); err != nil {
return fmt.Errorf("unable to delete stack trace key: %w", err)
}

return nil
}

// readKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer.
func (p *CgroupProfiler) readKernelStack(kernelStackID int32, stack *stack) error {
// getAndDeleteKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) getAndDeleteKernelStack(kernelStackID int32, stack *stack) error {
if kernelStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("kernel").Inc()
return errors.New("kernel stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&kernelStackID))
stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&kernelStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("kernel").Inc()
return fmt.Errorf("read kernel stack trace: %w", err)
Expand All @@ -727,16 +725,10 @@ func (p *CgroupProfiler) readKernelStack(kernelStackID int32, stack *stack) erro
return fmt.Errorf("read kernel stack bytes, %s: %w", err, errUnrecoverable)
}

return nil
}

// readValue reads the value of the given key from the counts ebpf map.
func (p *CgroupProfiler) readValue(keyBytes []byte) (uint64, error) {
valueBytes, err := p.bpfMaps.counts.GetValue(unsafe.Pointer(&keyBytes[0]))
if err != nil {
return 0, fmt.Errorf("get count value: %w", err)
if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&kernelStackID)); err != nil {
return fmt.Errorf("unable to delete stack trace key: %w", err)
}
return p.byteOrder.Uint64(valueBytes), nil
return nil
}

// normalizeProfile calculates the base addresses of a position-independent binary and normalizes captured locations accordingly.
Expand Down Expand Up @@ -827,3 +819,16 @@ func (p *CgroupProfiler) bumpMemlockRlimit() error {

return nil
}

// memsetCountKeys will reset the given slice to the given value.
// This function makes use of the highly optimized copy builtin function
// and is able to fill the entire slice in O(log n) time.
func memsetCountKeys(in []stackCountKey, v stackCountKey) {
if len(in) == 0 {
return
}
in[0] = v
for bp := 1; bp < len(in); bp *= 2 {
copy(in[bp:], in[:bp])
}
}

0 comments on commit 227837a

Please sign in to comment.