-
Notifications
You must be signed in to change notification settings - Fork 29
/
deduper.go
100 lines (92 loc) · 2.83 KB
/
deduper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package flow
import (
"container/list"
"time"
"github.com/sirupsen/logrus"
)
var (
	// dlog is the logger used by the deduper; every message is tagged with
	// the "component" field below.
	dlog = logrus.WithField("component", "flow/Deduper")

	// timeNow is the clock used by the deduper cache. It is a variable
	// (rather than a direct call to time.Now) so that it can be reassigned.
	timeNow = time.Now
)
// deduperCache implements an LRU cache whose elements are evicted if they haven't been
// accessed during the expire duration.
// It is not safe for concurrent access.
type deduperCache struct {
// expire is the duration added to the current time to compute the expiry time of an
// entry each time it is registered or seen again.
expire time.Duration
// key: RecordKey with the interface (IFIndex) and data link (MACs) fields zeroed, so the
// same flow observed on different interfaces maps to the same key
// value: list element (belonging to entries) whose Value is a *entry
ifaces map[RecordKey]*list.Element
// element: *entry values referenced from the ifaces map. Entries are pushed/moved to the
// front when registered or refreshed, so the back holds the entry that expires first.
entries *list.List
}
// entry is the value stored in each element of deduperCache.entries.
type entry struct {
// key points to the normalized RecordKey under which this entry is stored in
// deduperCache.ifaces; it is used to delete the map entry on eviction.
key *RecordKey
// ifIndex is the interface index of the flow that first registered this entry.
// Flows with the same normalized key but a different interface index are duplicates.
ifIndex uint32
// expiryTime is the instant after which the entry is eligible for eviction.
expiryTime time.Time
}
// Dedupe returns a pipeline stage that filters out flows reported by duplicate
// interfaces. For each flow, records coming from the first interface that reported
// it are forwarded; records for the same flow coming from any other interface are
// dropped until the flow expires in the cache (no activity for it during expireTime).
//
// The returned function reads batches from in until that channel is closed. Before
// processing each batch it evicts expired cache entries, and it only writes to out
// the batches that still contain at least one record after filtering. It does not
// close out.
func Dedupe(expireTime time.Duration) func(in <-chan []*Record, out chan<- []*Record) {
	cache := &deduperCache{
		expire:  expireTime,
		entries: list.New(),
		ifaces:  make(map[RecordKey]*list.Element),
	}
	return func(in <-chan []*Record, out chan<- []*Record) {
		for batch := range in {
			cache.removeExpired()

			// Collect the records that survive deduplication, preserving their order.
			unique := make([]*Record, 0, len(batch))
			for _, rec := range batch {
				if cache.isDupe(&rec.RecordKey) {
					continue
				}
				unique = append(unique, rec)
			}

			// Nothing left to forward for this batch.
			if len(unique) == 0 {
				continue
			}
			out <- unique
		}
	}
}
// isDupe reports whether the flow identified by key has already been registered in
// the cache by a different interface.
//
// The lookup uses a copy of key with IFIndex and DataLink zeroed, so the same flow
// seen on several interfaces resolves to a single cache entry. Every call refreshes
// the expiry time of that entry (or creates it), placing it at the front of the list.
func (c *deduperCache) isDupe(key *RecordKey) bool {
	// Interface-independent version of the key, used only for the cache lookup.
	normalized := *key
	normalized.DataLink = DataLink{}
	normalized.IFIndex = 0

	deadline := timeNow().Add(c.expire)

	known, found := c.ifaces[normalized]
	if !found {
		// First time this flow is seen (or it was forgotten after expiration):
		// register it as owned by the interface of the incoming record.
		c.ifaces[normalized] = c.entries.PushFront(&entry{
			key:        &normalized,
			ifIndex:    key.IFIndex,
			expiryTime: deadline,
		})
		return false
	}

	// The flow is already accounted for, whatever its interface was: keep it alive.
	owner := known.Value.(*entry)
	owner.expiryTime = deadline
	c.entries.MoveToFront(known)

	// Only records from the interface that first registered the flow are kept.
	return key.IFIndex != owner.ifIndex
}
// removeExpired evicts, starting from the back of the entries list, every entry whose
// expiry time is strictly before the current time, removing it from both the list and
// the ifaces map. It stops at the first entry that has not expired yet. When at least
// one entry was evicted, a debug message with the resulting cache size is logged.
func (c *deduperCache) removeExpired() {
	now := timeNow()
	dropped := 0
	for {
		oldest := c.entries.Back()
		if oldest == nil {
			break
		}
		stale := oldest.Value.(*entry)
		if !now.After(stale.expiryTime) {
			// Entries are refreshed at the front, so everything ahead of this
			// one was touched more recently.
			break
		}
		c.entries.Remove(oldest)
		delete(c.ifaces, *stale.key)
		dropped++
	}
	if dropped == 0 {
		return
	}
	dlog.WithFields(logrus.Fields{
		"current":    c.entries.Len(),
		"evicted":    dropped,
		"expiryTime": c.expire,
	}).Debug("entries evicted from the deduper cache")
}