diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go
index be7a1fb5e..25853bb08 100644
--- a/pkg/ebpf/tracer.go
+++ b/pkg/ebpf/tracer.go
@@ -53,17 +53,18 @@ var plog = logrus.WithField("component", "ebpf.PacketFetcher")
 // and to flows that are forwarded by the kernel via ringbuffer because could not be aggregated
 // in the map
 type FlowFetcher struct {
-	objects            *BpfObjects
-	qdiscs             map[ifaces.Interface]*netlink.GenericQdisc
-	egressFilters      map[ifaces.Interface]*netlink.BpfFilter
-	ingressFilters     map[ifaces.Interface]*netlink.BpfFilter
-	ringbufReader      *ringbuf.Reader
-	cacheMaxSize       int
-	enableIngress      bool
-	enableEgress       bool
-	pktDropsTracePoint link.Link
-	rttFentryLink      link.Link
-	rttKprobeLink      link.Link
+	objects                  *BpfObjects
+	qdiscs                   map[ifaces.Interface]*netlink.GenericQdisc
+	egressFilters            map[ifaces.Interface]*netlink.BpfFilter
+	ingressFilters           map[ifaces.Interface]*netlink.BpfFilter
+	ringbufReader            *ringbuf.Reader
+	cacheMaxSize             int
+	enableIngress            bool
+	enableEgress             bool
+	pktDropsTracePoint       link.Link
+	rttFentryLink            link.Link
+	rttKprobeLink            link.Link
+	lookupAndDeleteSupported bool
 }
 
 type FlowFetcherConfig struct {
@@ -119,7 +120,10 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
 		return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
 	}
 
-	oldKernel := utils.IskernelOlderthan514()
+	oldKernel := utils.IsKernelOlderThan("5.14.0")
+	if oldKernel {
+		log.Infof("kernel older than 5.14.0 detected: not all hooks are supported")
+	}
 	objects, err := kernelSpecificLoadAndAssign(oldKernel, spec)
 	if err != nil {
 		return nil, err
@@ -165,17 +169,18 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
 	}
 
 	return &FlowFetcher{
-		objects:            &objects,
-		ringbufReader:      flows,
-		egressFilters:      map[ifaces.Interface]*netlink.BpfFilter{},
-		ingressFilters:     map[ifaces.Interface]*netlink.BpfFilter{},
-		qdiscs:             map[ifaces.Interface]*netlink.GenericQdisc{},
-		cacheMaxSize:       cfg.CacheMaxSize,
-		enableIngress:      cfg.EnableIngress,
-		enableEgress:       cfg.EnableEgress,
-		pktDropsTracePoint: pktDropsLink,
-		rttFentryLink:      rttFentryLink,
-		rttKprobeLink:      rttKprobeLink,
+		objects:                  &objects,
+		ringbufReader:            flows,
+		egressFilters:            map[ifaces.Interface]*netlink.BpfFilter{},
+		ingressFilters:           map[ifaces.Interface]*netlink.BpfFilter{},
+		qdiscs:                   map[ifaces.Interface]*netlink.GenericQdisc{},
+		cacheMaxSize:             cfg.CacheMaxSize,
+		enableIngress:            cfg.EnableIngress,
+		enableEgress:             cfg.EnableEgress,
+		pktDropsTracePoint:       pktDropsLink,
+		rttFentryLink:            rttFentryLink,
+		rttKprobeLink:            rttKprobeLink,
+		lookupAndDeleteSupported: true, // this will be turned off later if found to be unsupported
 	}, nil
 }
 
@@ -404,35 +409,41 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
 }
 
 // LookupAndDeleteMap reads all the entries from the eBPF map and removes them from it.
-// It returns a map where the key
-// For synchronization purposes, we get/delete a whole snapshot of the flows map.
-// This way we avoid missing packets that could be updated on the
-// ebpf side while we process/aggregate them here
-// Changing this method invocation by BatchLookupAndDelete could improve performance
 // TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
 // Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
-// Race conditions here causes that some flows are lost in high-load scenarios
 func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
+	if !m.lookupAndDeleteSupported {
+		return m.legacyLookupAndDeleteMap(met)
+	}
+
 	flowMap := m.objects.AggregatedFlows
 	iterator := flowMap.Iterate()
 	var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
+	var ids []BpfFlowId
 	var id BpfFlowId
 	var metrics []BpfFlowMetrics
 
-	count := 0
-	// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
-	// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
+	// First, collect all the ids, ignoring the metrics (we need Lookup+Delete to be atomic)
	for iterator.Next(&id, &metrics) {
+		ids = append(ids, id)
+	}
+
+	count := 0
+	// Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
+	for i, id := range ids {
 		count++
-		if err := flowMap.Delete(id); err != nil {
+		if err := flowMap.LookupAndDelete(&id, &metrics); err != nil {
+			if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
+				log.WithError(err).Warnf("switching to legacy mode")
+				m.lookupAndDeleteSupported = false
+				return m.legacyLookupAndDeleteMap(met)
+			}
 			log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
 			met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc()
+			continue
 		}
-		// We observed that eBFP PerCPU map might insert multiple times the same key in the map
-		// (probably due to race conditions) so we need to re-join metrics again at userspace
-		// TODO: instrument how many times the keys are is repeated in the same eviction
-		flows[id] = append(flows[id], metrics...)
+		flows[id] = metrics
 	}
 
 	met.BufferSizeGauge.WithBufferName("hashmap-total").Set(float64(count))
 	met.BufferSizeGauge.WithBufferName("hashmap-unique").Set(float64(len(flows)))
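The lookupAndDeleteSupported flag above is flipped lazily, on the first LookupAndDelete call that fails with ebpf.ErrNotSupported. An alternative would be to probe support once at startup against a scratch map. The sketch below is only an illustration of that alternative (probeLookupAndDelete is hypothetical, not part of this change, and reuses tracer.go's existing errors and github.com/cilium/ebpf imports); BPF_MAP_LOOKUP_AND_DELETE_ELEM for hash maps landed in Linux 5.14:

// probeLookupAndDelete reports whether the running kernel accepts
// BPF_MAP_LOOKUP_AND_DELETE_ELEM on hash maps (Linux >= 5.14).
func probeLookupAndDelete() bool {
	m, err := ebpf.NewMap(&ebpf.MapSpec{
		Type:       ebpf.Hash,
		KeySize:    4,
		ValueSize:  4,
		MaxEntries: 1,
	})
	if err != nil {
		return false // could not even create the scratch map; be conservative
	}
	defer m.Close()

	var key, val uint32
	err = m.LookupAndDelete(&key, &val)
	// ErrKeyNotExist means the syscall itself worked (the map is just empty);
	// only ErrNotSupported indicates the kernel lacks the command.
	return !errors.Is(err, ebpf.ErrNotSupported)
}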
@@ -451,16 +462,21 @@ func (m *FlowFetcher) lookupAndDeleteDNSMap(timeOut time.Duration) {
 	monotonicTimeNow := monotime.Now()
 	dnsMap := m.objects.DnsFlows
 	var dnsKey BpfDnsFlowId
+	var keysToDelete []BpfDnsFlowId
 	var dnsVal uint64
 
 	if dnsMap != nil {
+		// Ideally the Lookup+Delete would be atomic; however, we cannot use LookupAndDelete here since the deletion is conditional
+		// Do not delete while iterating, as it causes severe performance degradation
 		iterator := dnsMap.Iterate()
 		for iterator.Next(&dnsKey, &dnsVal) {
 			if time.Duration(uint64(monotonicTimeNow)-dnsVal) >= timeOut {
-				if err := dnsMap.Delete(dnsKey); err != nil {
-					log.WithError(err).WithField("dnsKey", dnsKey).
-						Warnf("couldn't delete DNS record entry")
-				}
+				keysToDelete = append(keysToDelete, dnsKey)
+			}
+		}
+		for _, dnsKey = range keysToDelete {
+			if err := dnsMap.Delete(dnsKey); err != nil {
+				log.WithError(err).WithField("dnsKey", dnsKey).Warnf("couldn't delete DNS record entry")
 			}
 		}
 	}
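Background for the two-pass sweep above: deleting from a BPF hash map while iterating it can force the kernel-side iterator to restart from the first bucket, which is why keys are collected first and deleted afterwards. A generic sketch of the same collect-then-delete pattern (helper name and key/value types are illustrative, with errors and fmt assumed imported), including the MapIterator Err() check that cilium/ebpf provides for aborted iterations:

// sweepExpired deletes all entries whose value satisfies the expired
// predicate, iterating first and deleting in a second pass.
func sweepExpired(m *ebpf.Map, expired func(val uint64) bool) error {
	var (
		key, val uint64
		toDelete []uint64
	)
	it := m.Iterate()
	for it.Next(&key, &val) {
		if expired(val) {
			toDelete = append(toDelete, key)
		}
	}
	if err := it.Err(); err != nil {
		return fmt.Errorf("iterating map: %w", err) // e.g. ebpf.ErrIterationAborted
	}
	for _, k := range toDelete {
		if err := m.Delete(k); err != nil && !errors.Is(err, ebpf.ErrKeyNotExist) {
			return err
		}
	}
	return nil
}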
@@ -529,14 +545,15 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (BpfObjects, error) {
 
 // It provides access to packets from the kernel space (via PerfCPU hashmap)
 type PacketFetcher struct {
-	objects        *BpfObjects
-	qdiscs         map[ifaces.Interface]*netlink.GenericQdisc
-	egressFilters  map[ifaces.Interface]*netlink.BpfFilter
-	ingressFilters map[ifaces.Interface]*netlink.BpfFilter
-	perfReader     *perf.Reader
-	cacheMaxSize   int
-	enableIngress  bool
-	enableEgress   bool
+	objects                  *BpfObjects
+	qdiscs                   map[ifaces.Interface]*netlink.GenericQdisc
+	egressFilters            map[ifaces.Interface]*netlink.BpfFilter
+	ingressFilters           map[ifaces.Interface]*netlink.BpfFilter
+	perfReader               *perf.Reader
+	cacheMaxSize             int
+	enableIngress            bool
+	enableEgress             bool
+	lookupAndDeleteSupported bool
 }
 
 func NewPacketFetcher(
@@ -605,14 +622,15 @@ func NewPacketFetcher(
 	}
 
 	return &PacketFetcher{
-		objects:        &objects,
-		perfReader:     packets,
-		egressFilters:  map[ifaces.Interface]*netlink.BpfFilter{},
-		ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
-		qdiscs:         map[ifaces.Interface]*netlink.GenericQdisc{},
-		cacheMaxSize:   cacheMaxSize,
-		enableIngress:  ingress,
-		enableEgress:   egress,
+		objects:                  &objects,
+		perfReader:               packets,
+		egressFilters:            map[ifaces.Interface]*netlink.BpfFilter{},
+		ingressFilters:           map[ifaces.Interface]*netlink.BpfFilter{},
+		qdiscs:                   map[ifaces.Interface]*netlink.GenericQdisc{},
+		cacheMaxSize:             cacheMaxSize,
+		enableIngress:            ingress,
+		enableEgress:             egress,
+		lookupAndDeleteSupported: true, // this will be turned off later if found to be unsupported
 	}, nil
 }
 
@@ -797,19 +815,35 @@ func (p *PacketFetcher) ReadPerf() (perf.Record, error) {
 }
 
 func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
+	if !p.lookupAndDeleteSupported {
+		return p.legacyLookupAndDeleteMap(met)
+	}
+
 	packetMap := p.objects.PacketRecord
 	iterator := packetMap.Iterate()
 	packets := make(map[int][]*byte, p.cacheMaxSize)
 	var id int
+	var ids []int
 	var packet []*byte
 
+	// First, collect all the ids, ignoring the content (we need Lookup+Delete to be atomic)
 	for iterator.Next(&id, &packet) {
-		if err := packetMap.Delete(id); err != nil {
-			log.WithError(err).WithField("packetID ", id).
-				Warnf("couldn't delete entry")
-			met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteFlows").Inc()
+		ids = append(ids, id)
+	}
+
+	// Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
+	for i, id := range ids {
+		if err := packetMap.LookupAndDelete(&id, &packet); err != nil {
+			if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
+				log.WithError(err).Warnf("switching to legacy mode")
+				p.lookupAndDeleteSupported = false
+				return p.legacyLookupAndDeleteMap(met)
+			}
+			log.WithError(err).WithField("packetID", id).Warnf("couldn't delete entry")
+			met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteEntry").Inc()
 		}
-		packets[id] = append(packets[id], packet...)
+		packets[id] = packet
 	}
+
 	return packets
 }
diff --git a/pkg/ebpf/tracer_legacy.go b/pkg/ebpf/tracer_legacy.go
new file mode 100644
index 000000000..68d879ef7
--- /dev/null
+++ b/pkg/ebpf/tracer_legacy.go
@@ -0,0 +1,49 @@
+package ebpf
+
+import "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
+
+// This file contains legacy implementations kept for old kernels
+
+func (m *FlowFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
+	flowMap := m.objects.AggregatedFlows
+
+	iterator := flowMap.Iterate()
+	var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
+	var id BpfFlowId
+	var metrics []BpfFlowMetrics
+	count := 0
+
+	// Deleting while iterating is really bad for performance (like, really!) as it causes the same keys to be seen multiple times
+	// This is solved by LookupAndDelete on newer kernels (4.20 for queue/stack maps, 5.14 for hash maps)
+	for iterator.Next(&id, &metrics) {
+		count++
+		if err := flowMap.Delete(id); err != nil {
+			log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
+			met.Errors.WithErrorName("flow-fetcher-legacy", "CannotDeleteFlows").Inc()
+		}
+		// We observed that the eBPF PerCPU map might insert the same key multiple times
+		// (probably due to race conditions), so we need to re-join the metrics in userspace
+		flows[id] = append(flows[id], metrics...)
+	}
+	met.BufferSizeGauge.WithBufferName("hashmap-legacy-total").Set(float64(count))
+	met.BufferSizeGauge.WithBufferName("hashmap-legacy-unique").Set(float64(len(flows)))
+
+	return flows
+}
+
+func (p *PacketFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
+	packetMap := p.objects.PacketRecord
+	iterator := packetMap.Iterate()
+	packets := make(map[int][]*byte, p.cacheMaxSize)
+
+	var id int
+	var packet []*byte
+	for iterator.Next(&id, &packet) {
+		if err := packetMap.Delete(id); err != nil {
+			log.WithError(err).WithField("packetID", id).Warnf("couldn't delete entry")
+			met.Errors.WithErrorName("pkt-fetcher-legacy", "CannotDeleteEntry").Inc()
+		}
+		packets[id] = append(packets[id], packet...)
+	}
+	return packets
+}
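Since the legacy path can visit the same key several times, the per-flow slices accumulate samples from repeated visits and from every CPU, and callers have to reduce them in userspace. A minimal sketch of such a reduction; the struct here is illustrative, not the generated BpfFlowMetrics layout:

// flowMetrics stands in for the generated per-CPU metrics value; the real
// BpfFlowMetrics carries more fields, these two are just for illustration.
type flowMetrics struct {
	Packets uint32
	Bytes   uint64
}

// reduce folds all per-CPU (and duplicated-key) samples into one total.
func reduce(samples []flowMetrics) flowMetrics {
	var total flowMetrics
	for _, s := range samples {
		total.Packets += s.Packets
		total.Bytes += s.Bytes
	}
	return total
}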
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index f813fd9ca..4691364ba 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -12,10 +12,18 @@ import (
 )
 
 var (
-	getCurrentKernelVersion = currentKernelVersion
-	log                     = logrus.WithField("component", "utils")
+	kernelVersion uint32
+	log           = logrus.WithField("component", "utils")
 )
 
+func init() {
+	var err error
+	kernelVersion, err = currentKernelVersion()
+	if err != nil {
+		log.Errorf("failed to get current kernel version: %v", err)
+	}
+}
+
 // GetSocket returns socket string in the correct format based on address family
 func GetSocket(hostIP string, hostPort int) string {
 	socket := fmt.Sprintf("%s:%d", hostIP, hostPort)
@@ -26,22 +34,13 @@ func GetSocket(hostIP string, hostPort int) string {
 	return socket
 }
 
-func IskernelOlderthan514() bool {
-	kernelVersion514, err := kernelVersionFromReleaseString("5.14.0")
+func IsKernelOlderThan(version string) bool {
+	refVersion, err := kernelVersionFromReleaseString(version)
 	if err != nil {
 		log.Warnf("failed to get kernel version from release string: %v", err)
 		return false
 	}
-	currentVersion, err := getCurrentKernelVersion()
-	if err != nil {
-		log.Warnf("failed to get current kernel version: %v", err)
-		return false
-	}
-	if currentVersion < kernelVersion514 {
-		log.Infof("older kernel version not all hooks will be supported")
-		return true
-	}
-	return false
+	return kernelVersion != 0 && kernelVersion < refVersion
 }
 
 var versionRegex = regexp.MustCompile(`^(\d+)\.(\d+).(\d+).*$`)
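IsKernelOlderThan can compare plain integers because kernelVersionFromReleaseString packs "major.minor.patch" into a single uint32. A sketch of the KERNEL_VERSION-style encoding assumed here (packVersion is hypothetical; the clamping detail follows libbpf and may differ from the actual helper):

// packVersion encodes a release as (major<<16) | (minor<<8) | patch, so that
// numeric order matches version order: 5.13.0 -> 331008 < 5.14.0 -> 331264.
func packVersion(major, minor, patch uint32) uint32 {
	if patch > 255 {
		patch = 255 // libbpf's KERNEL_VERSION clamps the patch level to one byte
	}
	return major<<16 | minor<<8 | patch
}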
diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go
index 8f212a8d4..24a64df66 100644
--- a/pkg/utils/utils_test.go
+++ b/pkg/utils/utils_test.go
@@ -1,52 +1,50 @@
 package utils
 
 import (
-	"errors"
 	"testing"
+
+	"github.com/stretchr/testify/require"
 )
 
 func TestIskernelOlderthan514(t *testing.T) {
 	tests := []struct {
-		name              string
-		mockKernelVersion func() (uint32, error)
-		want              bool
+		name          string
+		kernelVersion string
+		want          bool
+		wantError     bool
 	}{
 		{
-			name: "Kernel version < 5.14.0",
-			mockKernelVersion: func() (uint32, error) {
-				ver, _ := kernelVersionFromReleaseString("5.13.0")
-				return ver, nil
-			},
-			want: true,
+			name:          "Kernel version < 5.14.0",
+			kernelVersion: "5.13.0",
+			want:          true,
 		},
 		{
-			name: "Kernel version = 5.14.0",
-			mockKernelVersion: func() (uint32, error) {
-				ver, _ := kernelVersionFromReleaseString("5.14.0")
-				return ver, nil
-			},
-			want: false,
+			name:          "Kernel version = 5.14.0",
+			kernelVersion: "5.14.0",
+			want:          false,
 		},
 		{
-			name: "Kernel version > 5.14.0",
-			mockKernelVersion: func() (uint32, error) {
-				ver, _ := kernelVersionFromReleaseString("5.15.0")
-				return ver, nil
-			},
-			want: false,
+			name:          "Kernel version > 5.14.0",
+			kernelVersion: "5.15.0",
+			want:          false,
 		},
 		{
-			name: "Error getting kernel version",
-			mockKernelVersion: func() (uint32, error) {
-				return 0, errors.New("error")
-			},
-			want: false,
+			name:          "Error getting kernel version",
+			kernelVersion: "invalid version",
+			want:          false,
+			wantError:     true,
 		},
 	}
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			getCurrentKernelVersion = tt.mockKernelVersion
-			got := IskernelOlderthan514()
+			versionUint, err := kernelVersionFromReleaseString(tt.kernelVersion)
+			if tt.wantError {
+				require.Errorf(t, err, "%s: expecting error, got none", tt.name)
+			} else {
+				require.NoErrorf(t, err, "%s: expecting no error, got %v", tt.name, err)
+			}
+			kernelVersion = versionUint
+			got := IsKernelOlderThan("5.14.0")
 			if got != tt.want {
 				t.Errorf("%s: IskernelOlderthan514() = %v, want %v",
 					tt.name, got, tt.want)
 			}