-
Notifications
You must be signed in to change notification settings - Fork 29
/
tracer_map.go
137 lines (123 loc) · 4.16 KB
/
tracer_map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package flow
import (
"context"
"sync"
"time"
"github.com/gavv/monotime"
"github.com/netobserv/gopipes/pkg/node"
"github.com/sirupsen/logrus"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
)
// mtlog is the logger used by the MapTracer; every entry is tagged with the
// "component" field set to "flow.MapTracer".
var mtlog = logrus.WithField("component", "flow.MapTracer")
// MapTracer reads flows from a mapped source (the eBPF PerCPU HashMap), turns each
// entry into a flow Record, and merges the per-CPU metrics of a flow into a single
// aggregated value before forwarding it.
type MapTracer struct {
	// mapFetcher is the source the flows are looked up (and deleted) from.
	mapFetcher mapFetcher

	// evictionTimeout is the period of the ticker that triggers timed evictions.
	evictionTimeout time.Duration

	// evictionCond serializes the eviction routines so that two evictions never
	// run at the same time. Flush broadcasts on it to request an eviction.
	evictionCond *sync.Cond

	// lastEvictionNs is a monotonic timestamp, in nanoseconds. Per-CPU metrics whose
	// start or end time is not strictly greater than it are discarded as leftovers
	// from a previous eviction.
	lastEvictionNs uint64
}
// mapFetcher abstracts the flows map that the MapTracer evicts from.
type mapFetcher interface {
	// LookupAndDeleteMap returns, for each flow ID, the slice of metrics stored for
	// that flow (presumably one element per CPU, as the source is a PerCPU map).
	// As the name states, the returned entries are also removed from the source.
	LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
}
// NewMapTracer builds a MapTracer that evicts flows from the given fetcher every
// evictionTimeout. The stale-entry threshold (lastEvictionNs) starts at the current
// monotonic time, and the eviction condition gets its own fresh mutex.
func NewMapTracer(fetcher mapFetcher, evictionTimeout time.Duration) *MapTracer {
	tracer := new(MapTracer)
	tracer.mapFetcher = fetcher
	tracer.evictionTimeout = evictionTimeout
	tracer.lastEvictionNs = uint64(monotime.Now())
	tracer.evictionCond = sync.NewCond(new(sync.Mutex))
	return tracer
}
// Flush requests an immediate eviction: every goroutine waiting on the eviction
// condition is woken up, which causes all the flows to be read (and removed) from
// the source eBPF map and sent to the next stage in the pipeline.
// Flush itself does not block waiting for the eviction to complete.
func (m *MapTracer) Flush() {
	cond := m.evictionCond
	cond.Broadcast()
}
// TraceLoop returns the pipeline start function of the MapTracer.
//
// When invoked, the returned function starts the eviction goroutine (which forwards
// the evicted flows through the out channel) and then triggers a Flush every
// evictionTimeout, until ctx is canceled. Note that time.NewTicker panics if
// evictionTimeout is not positive.
func (m *MapTracer) TraceLoop(ctx context.Context) node.StartFunc[[]*Record] {
	return func(out chan<- []*Record) {
		evictionTicker := time.NewTicker(m.evictionTimeout)
		defer evictionTicker.Stop()

		go m.evictionSynchronization(ctx, out)
		for {
			select {
			case <-ctx.Done():
				mtlog.Debug("exiting trace loop due to context cancellation")
				// The eviction goroutine is usually parked in evictionCond.Wait() and
				// only checks the context after being woken up. Without this signal
				// nobody would wake it once the ticker is stopped, leaking the goroutine.
				m.evictionCond.Broadcast()
				return
			case <-evictionTicker.C:
				mtlog.Debug("triggering flow eviction on timer")
				m.Flush()
			}
		}
	}
}
// evictionSynchronization loops waiting for the evictionCond signal (see Flush) and
// runs the actual eviction each time it is received. The eviction runs while holding
// the condition's lock, so only one eviction is in progress at any time.
//
// The loop ends when ctx is canceled. Cancellation is checked before waiting and
// right after being woken up; a goroutine already parked in Wait needs a Broadcast
// on evictionCond to notice it.
func (m *MapTracer) evictionSynchronization(ctx context.Context, out chan<- []*Record) {
	for {
		// make sure we only evict once at a time, even if there are multiple eviction signals
		m.evictionCond.L.Lock()

		// Do not park if the context was canceled in the meantime (e.g. during the
		// previous eviction): there might be nobody left to wake us up.
		if ctx.Err() == nil {
			m.evictionCond.Wait()
		}

		if ctx.Err() != nil {
			// Release the lock before leaving, so it is not left locked forever.
			m.evictionCond.L.Unlock()
			mtlog.Debug("context canceled. Stopping goroutine before evicting flows")
			return
		}

		mtlog.Debug("evictionSynchronization signal received")
		m.evictFlows(ctx, out)
		m.evictionCond.L.Unlock()
	}
}
// evictFlows reads (and removes) all the entries from the flows map, aggregates the
// metrics of each flow into a single Record, and forwards the resulting batch through
// the forwardFlows channel.
//
// Flows whose aggregated metrics are empty (EndMonoTimeTs == 0) are dropped. The batch
// is not forwarded if ctx is canceled before the receiver accepts it.
func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Record) {
	const skipMsg = "skipping flow eviction as agent is being stopped"

	// it's important that this monotonic timer reports same or approximate values as kernel-side bpf_ktime_get_ns()
	monotonicTimeNow := monotime.Now()
	currentTime := time.Now()

	var forwardingFlows []*Record
	laterFlowNs := uint64(0)
	for flowKey, flowMetrics := range m.mapFetcher.LookupAndDeleteMap() {
		aggregatedMetrics := m.aggregate(flowMetrics)
		// we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored)
		if aggregatedMetrics.EndMonoTimeTs == 0 {
			continue
		}
		// keep track of the most recent end time seen in this eviction
		if aggregatedMetrics.EndMonoTimeTs > laterFlowNs {
			laterFlowNs = aggregatedMetrics.EndMonoTimeTs
		}
		forwardingFlows = append(forwardingFlows, NewRecord(
			flowKey,
			aggregatedMetrics,
			currentTime,
			uint64(monotonicTimeNow),
		))
	}

	// Only move the threshold forward. If this round produced no valid flow,
	// laterFlowNs is still 0 and overwriting lastEvictionNs with it would make the
	// next round accept stale entries that aggregate is meant to discard.
	if laterFlowNs > m.lastEvictionNs {
		m.lastEvictionNs = laterFlowNs
	}

	select {
	case <-ctx.Done():
		// cancellation takes priority over forwarding
		mtlog.Debug(skipMsg)
	default:
		// Keep listening for cancellation while the receiver is not ready, so a
		// stopped pipeline cannot block this goroutine forever.
		select {
		case forwardFlows <- forwardingFlows:
		case <-ctx.Done():
			mtlog.Debug(skipMsg)
		}
	}
	mtlog.Debugf("%d flows evicted", len(forwardingFlows))
}
// aggregate merges the given metrics (the values stored for a single flow) into one
// BpfFlowMetrics by means of Accumulate.
//
// eBPF hashmap values are not zeroed when the entry is removed, so the slice might
// contain values from previous collect-eviction timeslots. Only the values whose
// start and end times are both strictly newer than the last eviction are accumulated.
// If none qualifies, or the slice is empty, the zero value is returned.
func (m *MapTracer) aggregate(metrics []ebpf.BpfFlowMetrics) ebpf.BpfFlowMetrics {
	var total ebpf.BpfFlowMetrics
	if len(metrics) == 0 {
		mtlog.Warn("invoked aggregate with no values")
		return total
	}
	threshold := m.lastEvictionNs
	for i := range metrics {
		// work on a copy, so Accumulate never gets a pointer into the caller's slice
		current := metrics[i]
		if current.StartMonoTimeTs > threshold && current.EndMonoTimeTs > threshold {
			Accumulate(&total, &current)
		}
	}
	return total
}