Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 26 additions & 15 deletions pkg/flow/deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type deduperCache struct {

type entry struct {
key *ebpf.BpfFlowId
dnsRecord *ebpf.BpfDnsRecordT
ifIndex uint32
expiryTime time.Time
}
Expand All @@ -46,14 +47,7 @@ func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, o
cache.removeExpired()
fwd := make([]*Record, 0, len(records))
for _, record := range records {
if cache.isDupe((*ebpf.BpfFlowId)(&record.Id)) {
if justMark {
record.Duplicate = true
} else {
continue
}
}
fwd = append(fwd, record)
cache.checkDupe(record, justMark, &fwd)
}
if len(fwd) > 0 {
out <- fwd
Expand All @@ -62,10 +56,9 @@ func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, o
}
}

// isDupe returns whether the passed record has been already checked for duplicate for
// another interface
func (c *deduperCache) isDupe(key *ebpf.BpfFlowId) bool {
rk := *key
// checkDupe check current record if its already available nad if not added to fwd records list
func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) {
rk := r.Id
// zeroes fields from key that should be ignored from the flow comparison
rk.IfIndex = 0
rk.SrcMac = [MacLen]uint8{0, 0, 0, 0, 0, 0}
Expand All @@ -79,17 +72,35 @@ func (c *deduperCache) isDupe(key *ebpf.BpfFlowId) bool {
c.entries.MoveToFront(ele)
// The input flow is duplicate if its interface is different to the interface
// of the non-duplicate flow that was first registered in the cache
return fEntry.ifIndex != key.IfIndex
// except if the new flow has DNS enrichment in this case will enrich the flow in the cache
// with DNS info and mark the current flow as duplicate
if r.Metrics.DnsRecord.Latency != 0 && fEntry.dnsRecord.Latency == 0 {
// copy DNS record to the cached entry and mark it as duplicate
fEntry.dnsRecord.Flags = r.Metrics.DnsRecord.Flags
fEntry.dnsRecord.Id = r.Metrics.DnsRecord.Id
fEntry.dnsRecord.Latency = r.Metrics.DnsRecord.Latency
// fall through to do interface check
}
if fEntry.ifIndex != r.Id.IfIndex {
if justMark {
r.Duplicate = true
*fwd = append(*fwd, r)
}
return
}
*fwd = append(*fwd, r)
return
}
// The flow has not been accounted previously (or was forgotten after expiration)
// so we register it for that concrete interface
e := entry{
key: &rk,
ifIndex: key.IfIndex,
dnsRecord: &r.Metrics.DnsRecord,
ifIndex: r.Id.IfIndex,
expiryTime: timeNow().Add(c.expire),
}
c.ifaces[rk] = c.entries.PushFront(&e)
return false
*fwd = append(*fwd, r)
}

func (c *deduperCache) removeExpired() {
Expand Down
37 changes: 29 additions & 8 deletions pkg/flow/deduper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (
}, Metrics: ebpf.BpfFlowMetrics{
Packets: 2, Bytes: 456, Flags: 1,
}}, Interface: "123456789"}
// another fow from 2 different interfaces and directions
// another flow from 2 different interfaces and directions
twoIf1 = &Record{RawRecord: RawRecord{Id: ebpf.BpfFlowId{
EthProtocol: 1, Direction: 1, SrcPort: 333, DstPort: 456,
DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}, IfIndex: 1,
Expand All @@ -36,6 +36,20 @@ var (
}, Metrics: ebpf.BpfFlowMetrics{
Packets: 2, Bytes: 456, Flags: 1,
}}, Interface: "123456789"}
// another flow from 2 different interfaces and directions with DNS latency set on the latest
threeIf1 = &Record{RawRecord: RawRecord{Id: ebpf.BpfFlowId{
EthProtocol: 1, Direction: 1, SrcPort: 433, DstPort: 456,
DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}, IfIndex: 1,
}, Metrics: ebpf.BpfFlowMetrics{
Packets: 2, Bytes: 456, Flags: 1,
}}, Interface: "eth0"}
threeIf2 = &Record{RawRecord: RawRecord{Id: ebpf.BpfFlowId{
EthProtocol: 1, Direction: 0, SrcPort: 433, DstPort: 456,
DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}, IfIndex: 2,
}, Metrics: ebpf.BpfFlowMetrics{
Packets: 2, Bytes: 456, Flags: 1,
DnsRecord: ebpf.BpfDnsRecordT{Id: 1, Flags: 100, Latency: 1000},
}}, Interface: "123456789", DNSLatency: time.Millisecond}
)

func TestDedupe(t *testing.T) {
Expand All @@ -45,21 +59,28 @@ func TestDedupe(t *testing.T) {
go Dedupe(time.Minute, false)(input, output)

input <- []*Record{
oneIf2, // record 1 at interface 2: should be accepted
twoIf1, // record 2 at interface 1: should be accepted
oneIf1, // record 1 duplicate at interface 1: should NOT be accepted
oneIf1, // (same record key, different interface)
twoIf2, // record 2 duplicate at interface 2: should NOT be accepted
oneIf2, // record 1 at interface 1: should be accepted (same record key, same interface)
oneIf2, // record 1 at interface 2: should be accepted
twoIf1, // record 2 at interface 1: should be accepted
oneIf1, // record 1 duplicate at interface 1: should NOT be accepted
oneIf1, // (same record key, different interface)
twoIf2, // record 2 duplicate at interface 2: should NOT be accepted
oneIf2, // record 1 at interface 1: should be accepted (same record key, same interface)
threeIf1, // record 1 has no DNS so it get enriched with DNS record from the following record
threeIf2, // record 2 is duplicate of record1 and have DNS info , should not be accepted
}
deduped := receiveTimeout(t, output)
assert.Equal(t, []*Record{oneIf2, twoIf1, oneIf2}, deduped)
assert.Equal(t, []*Record{oneIf2, twoIf1, oneIf2, threeIf1}, deduped)

// should still accept records with same key, same interface,
// and discard these with same key, different interface
input <- []*Record{oneIf1, oneIf2}
deduped = receiveTimeout(t, output)
assert.Equal(t, []*Record{oneIf2}, deduped)

// make sure flow with no DNS get enriched with the DNS record
assert.Equal(t, threeIf1.Metrics.DnsRecord.Id, threeIf2.Metrics.DnsRecord.Id)
assert.Equal(t, threeIf1.Metrics.DnsRecord.Flags, threeIf2.Metrics.DnsRecord.Flags)
assert.Equal(t, threeIf1.Metrics.DnsRecord.Latency, threeIf2.Metrics.DnsRecord.Latency)
}

func TestDedupe_EvictFlows(t *testing.T) {
Expand Down