Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/flow/deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type entry struct {
// (no activity for it during the expiration time)
// The justMark argument tells that the deduper should not drop the duplicate flows but
// set their Duplicate field.
// An exception is made for flows created by DNS tracker since these are unique
func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, out chan<- []*Record) {
cache := &deduperCache{
expire: expireTime,
Expand All @@ -46,7 +47,7 @@ func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, o
cache.removeExpired()
fwd := make([]*Record, 0, len(records))
for _, record := range records {
if cache.isDupe((*ebpf.BpfFlowId)(&record.Id)) {
if record.Metrics.DnsRecord.Id == 0 && cache.isDupe((*ebpf.BpfFlowId)(&record.Id)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an option we mentioned with @msherif1234 was also adding some logic:
Set it as duplicate IF:
(dnsId unset AND cache.isDupe)
OR
(ndsId unset AND DNS-tracker-feature enabled AND Source-port is dns port)

This assumes every response from a DNS port is duplicated anyway by the dns tracker so we can mark all of them as duplicates, except the one holding the DNS info.

That wouldn't be a perfect solution, it has some risks; e.g. if for any reason the DNS tracker doesn't work then all DNS flows would be marked duplicates. I think we should still try to find better .... but that could be "good enough" if we don't find better

if justMark {
record.Duplicate = true
} else {
Expand Down
21 changes: 14 additions & 7 deletions pkg/flow/deduper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ var (
}, Metrics: ebpf.BpfFlowMetrics{
Packets: 2, Bytes: 456, Flags: 1,
}}, Interface: "123456789"}
twoIf2DNS = &Record{RawRecord: RawRecord{Id: ebpf.BpfFlowId{
EthProtocol: 1, Direction: 0, SrcPort: 333, DstPort: 456,
DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}, IfIndex: 2,
}, Metrics: ebpf.BpfFlowMetrics{
Packets: 2, Bytes: 456, Flags: 1, DnsRecord: ebpf.BpfDnsRecordT{Id: 1},
}}, Interface: "123456789"}
)

func TestDedupe(t *testing.T) {
Expand All @@ -45,15 +51,16 @@ func TestDedupe(t *testing.T) {
go Dedupe(time.Minute, false)(input, output)

input <- []*Record{
oneIf2, // record 1 at interface 2: should be accepted
twoIf1, // record 2 at interface 1: should be accepted
oneIf1, // record 1 duplicate at interface 1: should NOT be accepted
oneIf1, // (same record key, different interface)
twoIf2, // record 2 duplicate at interface 2: should NOT be accepted
oneIf2, // record 1 at interface 1: should be accepted (same record key, same interface)
oneIf2, // record 1 at interface 2: should be accepted
twoIf1, // record 2 at interface 1: should be accepted
oneIf1, // record 1 duplicate at interface 1: should NOT be accepted
oneIf1, // (same record key, different interface)
twoIf2, // record 2 duplicate at interface 2: should NOT be accepted
oneIf2, // record 1 at interface 1: should be accepted (same record key, same interface)
twoIf2DNS, // record 2 duplicate is accepted because it contains DNS info
}
deduped := receiveTimeout(t, output)
assert.Equal(t, []*Record{oneIf2, twoIf1, oneIf2}, deduped)
assert.Equal(t, []*Record{oneIf2, twoIf1, oneIf2, twoIf2DNS}, deduped)

// should still accept records with same key, same interface,
// and discard these with same key, different interface
Expand Down