-
Notifications
You must be signed in to change notification settings - Fork 29
/
account.go
90 lines (82 loc) · 2.87 KB
/
account.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package flow
import (
"time"
"github.com/sirupsen/logrus"
)
// Accounter accumulates flows metrics in memory and eventually evicts them via an evictor channel.
// The accounting process is usually done at kernel-space. This type reimplements it at userspace
// for the edge case where packets are submitted directly via ring-buffer because the kernel-side
// accounting map is full.
type Accounter struct {
maxEntries int
evictTimeout time.Duration
entries map[RecordKey]*RecordMetrics
clock func() time.Time
monoClock func() time.Duration
namer InterfaceNamer
}
var alog = logrus.WithField("component", "flow/Accounter")
// NewAccounter creates a new Accounter.
// The cache has no limit and it's assumed that eviction is done by the caller.
func NewAccounter(
maxEntries int, evictTimeout time.Duration, ifaceNamer InterfaceNamer,
clock func() time.Time,
monoClock func() time.Duration,
) *Accounter {
return &Accounter{
maxEntries: maxEntries,
evictTimeout: evictTimeout,
entries: make(map[RecordKey]*RecordMetrics, maxEntries),
namer: ifaceNamer,
clock: clock,
monoClock: monoClock,
}
}
// Account runs in a new goroutine. It reads all the records from the input channel
// and accumulate their metrics internally. Once the metrics have reached their max size
// or the eviction times out, it evicts all the accumulated flows by the returned channel.
func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) {
evictTick := time.NewTicker(c.evictTimeout)
defer evictTick.Stop()
for {
select {
case <-evictTick.C:
if len(c.entries) == 0 {
break
}
evictingEntries := c.entries
c.entries = make(map[RecordKey]*RecordMetrics, c.maxEntries)
go c.evict(evictingEntries, out)
case record, ok := <-in:
if !ok {
alog.Debug("input channel closed. Evicting entries")
// if the records channel is closed, we evict the entries in the
// same goroutine to wait for all the entries to be sent before
// closing the channel
c.evict(c.entries, out)
alog.Debug("exiting account routine")
return
}
if stored, ok := c.entries[record.RecordKey]; ok {
stored.Accumulate(&record.RecordMetrics)
} else {
if len(c.entries) >= c.maxEntries {
evictingEntries := c.entries
c.entries = make(map[RecordKey]*RecordMetrics, c.maxEntries)
go c.evict(evictingEntries, out)
}
c.entries[record.RecordKey] = &record.RecordMetrics
}
}
}
}
func (c *Accounter) evict(entries map[RecordKey]*RecordMetrics, evictor chan<- []*Record) {
now := c.clock()
monotonicNow := uint64(c.monoClock())
records := make([]*Record, 0, len(entries))
for key, metrics := range entries {
records = append(records, NewRecord(key, *metrics, now, monotonicNow, c.namer))
}
alog.WithField("numEntries", len(records)).Debug("records evicted from userspace accounter")
evictor <- records
}