diff --git a/felix/dataplane/linux/bpf_ep_mgr.go b/felix/dataplane/linux/bpf_ep_mgr.go index 13420995264..d387e7d49b3 100644 --- a/felix/dataplane/linux/bpf_ep_mgr.go +++ b/felix/dataplane/linux/bpf_ep_mgr.go @@ -101,6 +101,7 @@ var ( Name: "felix_bpf_happy_dataplane_endpoints", Help: "Number of BPF endpoints that are successfully programmed.", }) + errApplyingPolicy = errors.New("error applying policy") ) var ( @@ -140,12 +141,13 @@ type fileDescriptor interface { type bpfDataplane interface { ensureStarted() - ensureProgramAttached(ap attachPoint) (bpf.AttachResult, error) - ensureNoProgram(ap attachPoint) error + ensureProgramAttached(attachPoint) (qDiscInfo, error) + ensureProgramLoaded(ap attachPoint, ipFamily proto.IPVersion) error + ensureNoProgram(attachPoint) error ensureQdisc(iface string) (bool, error) ensureBPFDevices() error - updatePolicyProgram(rules polprog.Rules, polDir string, ap attachPoint) error - removePolicyProgram(ap attachPoint) error + updatePolicyProgram(rules polprog.Rules, polDir string, ap attachPoint, ipFamily proto.IPVersion) error + removePolicyProgram(ap attachPoint, ipFamily proto.IPVersion) error setAcceptLocal(iface string, val bool) error setRPFilter(iface string, val int) error setRoute(ip.CIDR) @@ -178,8 +180,8 @@ type bpfInterface struct { } func (i *bpfInterfaceState) clearJumps() { - i.policyIdx = [hook.Count]int{-1, -1, -1} - i.filterIdx = [hook.Count]int{-1, -1, -1} + i.v4.clearJumps() + i.v6.clearJumps() } var zeroIface bpfInterface = func() bpfInterface { @@ -209,12 +211,22 @@ const ( ) type bpfInterfaceState struct { - policyIdx [hook.Count]int - filterIdx [hook.Count]int + v4 bpfInterfaceJumpIndices + v6 bpfInterfaceJumpIndices readiness ifaceReadiness qdisc qDiscInfo } +type bpfInterfaceJumpIndices struct { + policyIdx [hook.Count]int + filterIdx [hook.Count]int +} + +func (d *bpfInterfaceJumpIndices) clearJumps() { + d.policyIdx = [hook.Count]int{-1, -1, -1} + d.filterIdx = [hook.Count]int{-1, -1, -1} +} + type qDiscInfo struct { valid bool prio int @@ -252,7 +264,6 @@ type bpfEndpointManager struct { logFilters map[string]string bpfLogLevel string hostname string - hostIP net.IP fibLookupEnabled bool dataIfaceRegex *regexp.Regexp l3IfaceRegex *regexp.Regexp @@ -294,7 +305,7 @@ type bpfEndpointManager struct { polProgsMap maps.Map, opts ...polprog.Option, ) ([]fileDescriptor, []asm.Insns, error) - updatePolicyProgramFn func(rules polprog.Rules, polDir string, ap attachPoint) error + updatePolicyProgramFn func(rules polprog.Rules, polDir string, ap attachPoint, ipFamily proto.IPVersion) error // HEP processing. hostIfaceToEpMap map[string]proto.HostEndpoint @@ -304,8 +315,8 @@ type bpfEndpointManager struct { // UT-able BPF dataplane interface. dp bpfDataplane - ifaceToIpMap map[string]net.IP - opReporter logutils.OpRecorder + //ifaceToIpMap map[string]net.IP + opReporter logutils.OpRecorder // XDP xdpModes []bpf.XDPMode @@ -313,9 +324,6 @@ type bpfEndpointManager struct { // IPv6 Support ipv6Enabled bool - // IP of the tunnel / overlay device - tunnelIP net.IP - // Detected features Features *environment.Features @@ -343,6 +351,20 @@ type bpfEndpointManager struct { natOutIdx int arpMap maps.Map + + v4 *bpfEndpointManagerDataplane + v6 *bpfEndpointManagerDataplane +} + +type bpfEndpointManagerDataplane struct { + ipFamily proto.IPVersion + hostIP net.IP + mgr *bpfEndpointManager + + ifaceToIpMap map[string]net.IP + + // IP of the tunnel / overlay device + tunnelIP net.IP } type serviceKey struct { @@ -415,7 +437,6 @@ func newBPFEndpointManager( profilesToWorkloads: map[proto.ProfileID]set.Set[any]{}, dirtyIfaceNames: set.New[string](), bpfLogLevel: config.BPFLogLevel, - logFilters: config.BPFLogFilters, hostname: config.Hostname, fibLookupEnabled: fibLookupEnabled, dataIfaceRegex: config.BPFDataIfacePattern, @@ -445,8 +466,8 @@ func newBPFEndpointManager( iptablesFilterTable: iptablesFilterTable, onStillAlive: livenessCallback, hostIfaceToEpMap: map[string]proto.HostEndpoint{}, - ifaceToIpMap: map[string]net.IP{}, - opReporter: opReporter, + //ifaceToIpMap: map[string]net.IP{}, + opReporter: opReporter, // ipv6Enabled Should be set to config.Ipv6Enabled, but for now it is better // to set it to BPFIpv6Enabled which is a dedicated flag for development of IPv6. // TODO: set ipv6Enabled to config.Ipv6Enabled when IPv6 support is complete @@ -486,6 +507,12 @@ func newBPFEndpointManager( m.hostNetworkedNATMode = hostNetworkedNATEnabled } + m.v4 = newBPFEndpointManagerDataplane(proto.IPVersion_IPV4, m) + + if m.ipv6Enabled { + m.v6 = newBPFEndpointManagerDataplane(proto.IPVersion_IPV6, m) + } + if m.hostNetworkedNATMode != hostNetworkedNATDisabled { log.Infof("HostNetworkedNATMode is %d", m.hostNetworkedNATMode) family := uint8(4) @@ -575,6 +602,18 @@ func newBPFEndpointManager( return m, nil } +func newBPFEndpointManagerDataplane( + ipFamily proto.IPVersion, + epMgr *bpfEndpointManager, +) *bpfEndpointManagerDataplane { + d := &bpfEndpointManagerDataplane{ + ipFamily: ipFamily, + ifaceToIpMap: map[string]net.IP{}, + mgr: epMgr, + } + return d +} + var _ hasLoadPolicyProgram = (*bpfEndpointManager)(nil) func (m *bpfEndpointManager) repinJumpMaps() error { @@ -657,9 +696,13 @@ func (m *bpfEndpointManager) withIface(ifaceName string, fn func(iface *bpfInter m.dirtyIfaceNames.Add(ifaceName) } -func (m *bpfEndpointManager) updateHostIP(ip net.IP) { +func (m *bpfEndpointManager) updateHostIP(ip net.IP, ipFamily int) { if ip != nil { - m.hostIP = ip + if ipFamily == 4 { + m.v4.hostIP = ip + } else { + m.v6.hostIP = ip + } // Should be safe without the lock since there shouldn't be any active background threads // but taking it now makes us robust to refactoring. m.ifacesLock.Lock() @@ -700,25 +743,25 @@ func (m *bpfEndpointManager) OnUpdate(msg interface{}) { m.onProfileRemove(msg) case *proto.HostMetadataUpdate: - if !m.ipv6Enabled && msg.Hostname == m.hostname { + if msg.Hostname == m.hostname { log.WithField("HostMetadataUpdate", msg).Infof("Host IP changed: %s", msg.Ipv4Addr) - m.updateHostIP(net.ParseIP(msg.Ipv4Addr)) + m.updateHostIP(net.ParseIP(msg.Ipv4Addr), 4) } case *proto.HostMetadataV6Update: if m.ipv6Enabled && msg.Hostname == m.hostname { log.WithField("HostMetadataV6Update", msg).Infof("Host IPv6 changed: %s", msg.Ipv6Addr) - m.updateHostIP(net.ParseIP(msg.Ipv6Addr)) + m.updateHostIP(net.ParseIP(msg.Ipv6Addr), 6) } case *proto.HostMetadataV4V6Update: if msg.Hostname != m.hostname { break } + log.WithField("HostMetadataV4V6Update", msg).Infof("Host IP changed: %s", msg.Ipv4Addr) + m.updateHostIP(net.ParseIP(msg.Ipv4Addr), 4) + if m.ipv6Enabled { log.WithField("HostMetadataV4V6Update", msg).Infof("Host IPv6 changed: %s", msg.Ipv6Addr) - m.updateHostIP(net.ParseIP(msg.Ipv6Addr)) - } else { - log.WithField("HostMetadataV4V6Update", msg).Infof("Host IP changed: %s", msg.Ipv4Addr) - m.updateHostIP(net.ParseIP(msg.Ipv4Addr)) + m.updateHostIP(net.ParseIP(msg.Ipv6Addr), 6) } case *proto.ServiceUpdate: m.onServiceUpdate(msg) @@ -737,37 +780,45 @@ func (m *bpfEndpointManager) onRouteUpdate(update *proto.RouteUpdate) { return } if m.ipv6Enabled { - if ip.To4() != nil { - return // skip ipv4 in ipv6 mode - } - } else { if ip.To4() == nil { - return // skip ipv6 in ipv4 mode + m.v6.tunnelIP = ip } } - m.tunnelIP = ip + if ip.To4() != nil { + m.v4.tunnelIP = ip + } log.WithField("ip", update.Dst).Info("host tunnel") m.dirtyIfaceNames.Add(bpfOutDev) } } func (m *bpfEndpointManager) onInterfaceAddrsUpdate(update *ifaceAddrsUpdate) { - var ipAddrs []net.IP m.ifacesLock.Lock() defer m.ifacesLock.Unlock() + var v6AddrsUpdate bool + v4AddrsUpdate := m.v4.updateIfaceIP(update) + if m.ipv6Enabled { + v6AddrsUpdate = m.v6.updateIfaceIP(update) + } + if v4AddrsUpdate || v6AddrsUpdate { + m.dirtyIfaceNames.Add(update.Name) + } +} + +func (d *bpfEndpointManagerDataplane) updateIfaceIP(update *ifaceAddrsUpdate) bool { + var ipAddrs []net.IP + isDirty := false if update.Addrs != nil && update.Addrs.Len() > 0 { log.Debugf("Interface %+v received address update %+v", update.Name, update.Addrs) update.Addrs.Iter(func(item string) error { ip := net.ParseIP(item) - if m.ipv6Enabled { + if d.ipFamily == proto.IPVersion_IPV6 { if ip.To4() == nil && !ip.IsLinkLocalUnicast() { ipAddrs = append(ipAddrs, ip) } - } else { - if ip.To4() != nil { - ipAddrs = append(ipAddrs, ip) - } + } else if ip.To4() != nil { + ipAddrs = append(ipAddrs, ip) } return nil }) @@ -775,18 +826,41 @@ func (m *bpfEndpointManager) onInterfaceAddrsUpdate(update *ifaceAddrsUpdate) { return bytes.Compare(ipAddrs[i], ipAddrs[j]) < 0 }) if len(ipAddrs) > 0 { - ip, ok := m.ifaceToIpMap[update.Name] + ip, ok := d.ifaceToIpMap[update.Name] if !ok || !ip.Equal(ipAddrs[0]) { - m.ifaceToIpMap[update.Name] = ipAddrs[0] - m.dirtyIfaceNames.Add(update.Name) + d.ifaceToIpMap[update.Name] = ipAddrs[0] + isDirty = true } - } } else { - _, ok := m.ifaceToIpMap[update.Name] + _, ok := d.ifaceToIpMap[update.Name] if ok { - delete(m.ifaceToIpMap, update.Name) - m.dirtyIfaceNames.Add(update.Name) + delete(d.ifaceToIpMap, update.Name) + isDirty = true + } + } + return isDirty +} + +func (m *bpfEndpointManager) reclaimPolicyIdx(name string, ipFamily int, iface *bpfInterface) { + idx := &iface.dpState.v4 + if ipFamily == 6 { + idx = &iface.dpState.v6 + } + for _, attachHook := range []hook.Hook{hook.XDP, hook.Ingress, hook.Egress} { + if err := m.jumpMapDelete(attachHook, idx.policyIdx[attachHook]); err != nil { + log.WithError(err).Warn("Policy program may leak.") + } + if err := m.xdpJumpMapAlloc.Put(idx.policyIdx[attachHook], name); err != nil { + log.WithError(err).Error(attachHook.String()) + } + if attachHook != hook.XDP { + if err := m.jumpMapDelete(attachHook, idx.filterIdx[attachHook]); err != nil { + log.WithError(err).Warn("Filter program may leak.") + } + if err := m.jumpMapAlloc.Put(idx.filterIdx[attachHook], name); err != nil { + log.WithError(err).Error(attachHook.String()) + } } } } @@ -794,6 +868,10 @@ func (m *bpfEndpointManager) onInterfaceAddrsUpdate(update *ifaceAddrsUpdate) { func (m *bpfEndpointManager) updateIfaceStateMap(name string, iface *bpfInterface) { k := ifstate.NewKey(uint32(iface.info.ifIndex)) if iface.info.ifaceIsUp() { + idx := iface.dpState.v4 + if m.ipv6Enabled { + idx = iface.dpState.v6 + } flags := uint32(0) if m.isWorkloadIface(name) { flags |= ifstate.FlgWEP @@ -802,49 +880,19 @@ func (m *bpfEndpointManager) updateIfaceStateMap(name string, iface *bpfInterfac flags |= ifstate.FlgReady } v := ifstate.NewValue(flags, name, - iface.dpState.policyIdx[hook.XDP], - iface.dpState.policyIdx[hook.Ingress], - iface.dpState.policyIdx[hook.Egress], - iface.dpState.filterIdx[hook.Ingress], - iface.dpState.filterIdx[hook.Egress], + idx.policyIdx[hook.XDP], + idx.policyIdx[hook.Ingress], + idx.policyIdx[hook.Egress], + idx.filterIdx[hook.Ingress], + idx.filterIdx[hook.Egress], ) m.ifStateMap.Desired().Set(k, v) } else { - if err := m.jumpMapDelete(hook.XDP, iface.dpState.policyIdx[hook.XDP]); err != nil { - log.WithError(err).Warn("Policy program may leak.") - } - if err := m.xdpJumpMapAlloc.Put(iface.dpState.policyIdx[hook.XDP], name); err != nil { - log.WithError(err).Error("XDP") - } - - if err := m.jumpMapDelete(hook.Ingress, iface.dpState.policyIdx[hook.Ingress]); err != nil { - log.WithError(err).Warn("Policy program may leak.") - } - if err := m.jumpMapAlloc.Put(iface.dpState.policyIdx[hook.Ingress], name); err != nil { - log.WithError(err).Error("Ingress") - } - - if err := m.jumpMapDelete(hook.Egress, iface.dpState.policyIdx[hook.Egress]); err != nil { - log.WithError(err).Warn("Policy program may leak.") - } - if err := m.jumpMapAlloc.Put(iface.dpState.policyIdx[hook.Egress], name); err != nil { - log.WithError(err).Error("Ingress") - } - - if err := m.jumpMapDelete(hook.Egress, iface.dpState.filterIdx[hook.Egress]); err != nil { - log.WithError(err).Warn("Filter program may leak.") - } - if err := m.jumpMapAlloc.Put(iface.dpState.filterIdx[hook.Egress], name); err != nil { - log.WithError(err).Error("Ingress") - } - - if err := m.jumpMapDelete(hook.Ingress, iface.dpState.filterIdx[hook.Ingress]); err != nil { - log.WithError(err).Warn("Filter program may leak.") - } - if err := m.jumpMapAlloc.Put(iface.dpState.filterIdx[hook.Ingress], name); err != nil { - log.WithError(err).Error("Ingress") + if !m.ipv6Enabled { + m.reclaimPolicyIdx(name, 4, iface) + } else { + m.reclaimPolicyIdx(name, 6, iface) } - m.ifStateMap.Desired().Delete(k) iface.dpState.clearJumps() } @@ -1215,15 +1263,24 @@ func (m *bpfEndpointManager) syncIfStateMap() { indexMap[h] = idx } - if !m.isWorkloadIface(netiface.Name) { - // We don't use XDP for WEPs so any ID we read back must be a mistake. - checkAndReclaimIdx(v.XDPPolicy(), hook.XDP, iface.dpState.policyIdx[:]) + if m.ipv6Enabled { + checkAndReclaimIdx(v.IngressPolicy(), hook.Ingress, iface.dpState.v6.policyIdx[:]) + checkAndReclaimIdx(v.EgressPolicy(), hook.Egress, iface.dpState.v6.policyIdx[:]) + checkAndReclaimIdx(v.TcIngressFilter(), hook.Ingress, iface.dpState.v6.filterIdx[:]) + checkAndReclaimIdx(v.TcEgressFilter(), hook.Egress, iface.dpState.v6.filterIdx[:]) + if !m.isWorkloadIface(netiface.Name) { + checkAndReclaimIdx(v.XDPPolicy(), hook.XDP, iface.dpState.v6.policyIdx[:]) + } + } else { + checkAndReclaimIdx(v.IngressPolicy(), hook.Ingress, iface.dpState.v4.policyIdx[:]) + checkAndReclaimIdx(v.EgressPolicy(), hook.Egress, iface.dpState.v4.policyIdx[:]) + checkAndReclaimIdx(v.TcIngressFilter(), hook.Ingress, iface.dpState.v4.filterIdx[:]) + checkAndReclaimIdx(v.TcEgressFilter(), hook.Egress, iface.dpState.v4.filterIdx[:]) + if !m.isWorkloadIface(netiface.Name) { + // We don't use XDP for WEPs so any ID we read back must be a mistake. + checkAndReclaimIdx(v.XDPPolicy(), hook.XDP, iface.dpState.v4.policyIdx[:]) + } } - checkAndReclaimIdx(v.IngressPolicy(), hook.Ingress, iface.dpState.policyIdx[:]) - checkAndReclaimIdx(v.EgressPolicy(), hook.Egress, iface.dpState.policyIdx[:]) - - checkAndReclaimIdx(v.TcIngressFilter(), hook.Ingress, iface.dpState.filterIdx[:]) - checkAndReclaimIdx(v.TcEgressFilter(), hook.Egress, iface.dpState.filterIdx[:]) // Mark all interfaces that we knew about, that we still manage and // that exist as dirty. Since they exist, we either have to deal @@ -1331,7 +1388,6 @@ func (m *bpfEndpointManager) loadDefaultPolicies() error { return fmt.Errorf("failed to load default allow policy program: %w", err) } m.policyTcAllowFD = bpf.ProgFD(fd) - return nil } @@ -1446,10 +1502,9 @@ func (m *bpfEndpointManager) applyProgramsToDirtyDataInterfaces() { } var ( - err error - up bool - xdpIdx, ingressIdx, egressIdx int - ingressFilterIdx, egressFilterIdx int + err error + up bool + state bpfInterfaceState ) m.ifacesLock.Lock() @@ -1457,46 +1512,10 @@ func (m *bpfEndpointManager) applyProgramsToDirtyDataInterfaces() { ifaceName := iface m.withIface(iface, func(iface *bpfInterface) bool { up = iface.info.ifaceIsUp() - - if xdpIdx = iface.dpState.policyIdx[hook.XDP]; xdpIdx == -1 { - if xdpIdx, err = m.xdpJumpMapAlloc.Get(ifaceName); err != nil { - return false - } + if err := m.dataIfaceStateFillJumps(ifaceName, &iface.dpState); err != nil { + return false } - iface.dpState.policyIdx[hook.XDP] = xdpIdx - - if ingressIdx = iface.dpState.policyIdx[hook.Ingress]; ingressIdx == -1 { - if ingressIdx, err = m.jumpMapAlloc.Get(ifaceName); err != nil { - return false - } - } - iface.dpState.policyIdx[hook.Ingress] = ingressIdx - - if egressIdx = iface.dpState.policyIdx[hook.Egress]; egressIdx == -1 { - if egressIdx, err = m.jumpMapAlloc.Get(ifaceName); err != nil { - return false - } - } - iface.dpState.policyIdx[hook.Egress] = egressIdx - - if ingressFilterIdx = iface.dpState.filterIdx[hook.Ingress]; ingressFilterIdx == -1 { - if m.bpfLogLevel == "debug" { - if ingressFilterIdx, err = m.jumpMapAlloc.Get(ifaceName); err != nil { - return false - } - } - } - iface.dpState.filterIdx[hook.Ingress] = ingressFilterIdx - - if egressFilterIdx = iface.dpState.filterIdx[hook.Egress]; egressFilterIdx == -1 { - if m.bpfLogLevel == "debug" { - if egressFilterIdx, err = m.jumpMapAlloc.Get(ifaceName); err != nil { - return false - } - } - } - iface.dpState.filterIdx[hook.Egress] = egressFilterIdx - + state = iface.dpState return false }) @@ -1531,29 +1550,79 @@ func (m *bpfEndpointManager) applyProgramsToDirtyDataInterfaces() { } var parallelWG sync.WaitGroup - var ingressErr, xdpErr error + var ingressErr, xdpErr, err4, err6 error + var ingressAP4, egressAP4 *tc.AttachPoint + var ingressAP6, egressAP6 *tc.AttachPoint + var xdpAP4, xdpAP6 *xdp.AttachPoint + + tcAttachPoint := m.calculateTCAttachPoint(iface) + xdpAttachPoint := &xdp.AttachPoint{ + AttachPoint: bpf.AttachPoint{ + Hook: hook.XDP, + Iface: iface, + LogLevel: m.bpfLogLevel, + }, + Modes: m.xdpModes, + } + + if m.v6 != nil { + parallelWG.Add(1) + go func() { + defer parallelWG.Done() + ingressAP6, egressAP6, xdpAP6, err6 = m.v6.applyPolicyToDataIface(iface, hepPtr, &state, + tcAttachPoint, xdpAttachPoint) + }() + } else if m.v4 != nil { + ingressAP4, egressAP4, xdpAP4, err4 = m.v4.applyPolicyToDataIface(iface, hepPtr, &state, + tcAttachPoint, xdpAttachPoint) + } + + parallelWG.Wait() + + // Attach ingress program. parallelWG.Add(1) go func() { defer parallelWG.Done() - ingressErr = m.attachDataIfaceProgram(iface, hepPtr, PolDirnIngress, ingressIdx, ingressFilterIdx) + ingressAP := mergeAttachPoints(ingressAP4, ingressAP6) + if ingressAP != nil { + _, ingressErr = m.dp.ensureProgramAttached(ingressAP) + } }() + // Attach xdp program. if !m.ipv6Enabled { parallelWG.Add(1) go func() { defer parallelWG.Done() - xdpErr = m.attachXDPProgram(iface, hepPtr, xdpIdx) + xdpAP := mergeAttachPoints(xdpAP4, xdpAP6) + if hepPtr != nil && len(hepPtr.UntrackedTiers) == 1 && xdpAP != nil { + _, xdpErr = m.dp.ensureProgramAttached(xdpAP4) + } else { + xdpErr = m.dp.ensureNoProgram(xdpAP) + } }() } - err = m.attachDataIfaceProgram(iface, hepPtr, PolDirnEgress, egressIdx, egressFilterIdx) + // Attach egress program. + egressAP := mergeAttachPoints(egressAP4, egressAP6) + if egressAP != nil { + _, err = m.dp.ensureProgramAttached(egressAP) + } + parallelWG.Wait() + if err == nil { err = ingressErr } if err == nil { err = xdpErr } + if err == nil { + err = err4 + } + if err == nil { + err = err6 + } if err == nil { // This is required to allow NodePort forwarding with // encapsulation with the host's IP as the source address @@ -1691,39 +1760,106 @@ func (m *bpfEndpointManager) updateWEPsInDataplane() { } } -func (m *bpfEndpointManager) wepStateFillJumps(ifaceName string, state *bpfInterfaceState) error { +func (m *bpfEndpointManager) allocJumpIndicesForWEP(ifaceName string, idx *bpfInterfaceJumpIndices) error { var err error - - if state.policyIdx[hook.Ingress] == -1 { - state.policyIdx[hook.Ingress], err = m.jumpMapAlloc.Get(ifaceName) + if idx.policyIdx[hook.Ingress] == -1 { + idx.policyIdx[hook.Ingress], err = m.jumpMapAlloc.Get(ifaceName) if err != nil { return err } } - if state.policyIdx[hook.Egress] == -1 { - state.policyIdx[hook.Egress], err = m.jumpMapAlloc.Get(ifaceName) + if idx.policyIdx[hook.Egress] == -1 { + idx.policyIdx[hook.Egress], err = m.jumpMapAlloc.Get(ifaceName) if err != nil { return err } } if m.bpfLogLevel == "debug" { - if state.filterIdx[hook.Ingress] == -1 { - state.filterIdx[hook.Ingress], err = m.jumpMapAlloc.Get(ifaceName) + if idx.filterIdx[hook.Ingress] == -1 { + idx.filterIdx[hook.Ingress], err = m.jumpMapAlloc.Get(ifaceName) + if err != nil { + return err + } + } + if idx.filterIdx[hook.Egress] == -1 { + idx.filterIdx[hook.Egress], err = m.jumpMapAlloc.Get(ifaceName) if err != nil { return err } } + } + return nil +} + +func (m *bpfEndpointManager) allocJumpIndicesForDataIface(ifaceName string, idx *bpfInterfaceJumpIndices) error { + var err error + if idx.policyIdx[hook.Ingress] == -1 { + idx.policyIdx[hook.Ingress], err = m.jumpMapAlloc.Get(ifaceName) + if err != nil { + return err + } + } + + if idx.policyIdx[hook.Egress] == -1 { + idx.policyIdx[hook.Egress], err = m.jumpMapAlloc.Get(ifaceName) + if err != nil { + return err + } + } - if state.filterIdx[hook.Egress] == -1 { - state.filterIdx[hook.Egress], err = m.jumpMapAlloc.Get(ifaceName) + if idx.policyIdx[hook.XDP] == -1 { + idx.policyIdx[hook.XDP], err = m.xdpJumpMapAlloc.Get(ifaceName) + if err != nil { + return err + } + } + + if m.bpfLogLevel == "debug" { + if idx.filterIdx[hook.Ingress] == -1 { + idx.filterIdx[hook.Ingress], err = m.jumpMapAlloc.Get(ifaceName) if err != nil { return err } } + if idx.filterIdx[hook.Egress] == -1 { + idx.filterIdx[hook.Egress], err = m.jumpMapAlloc.Get(ifaceName) + if err != nil { + return err + } + } + } + return nil +} + +func (m *bpfEndpointManager) wepStateFillJumps(ifaceName string, state *bpfInterfaceState) error { + if m.ipv6Enabled { + err := m.allocJumpIndicesForWEP(ifaceName, &state.v6) + if err != nil { + return err + } + } else { + err := m.allocJumpIndicesForWEP(ifaceName, &state.v4) + if err != nil { + return err + } } + return nil +} +func (m *bpfEndpointManager) dataIfaceStateFillJumps(ifaceName string, state *bpfInterfaceState) error { + if m.ipv6Enabled { + err := m.allocJumpIndicesForDataIface(ifaceName, &state.v6) + if err != nil { + return err + } + } else { + err := m.allocJumpIndicesForDataIface(ifaceName, &state.v4) + if err != nil { + return err + } + } return nil } @@ -1786,7 +1922,10 @@ func (m *bpfEndpointManager) doApplyPolicy(ifaceName string) (bpfInterfaceState, var ( ingressErr, egressErr error + err4, err6 error ingressQdisc, egressQdisc qDiscInfo + ingressAP4, egressAP4 *tc.AttachPoint + ingressAP6, egressAP6 *tc.AttachPoint wg sync.WaitGroup wep *proto.WorkloadEndpoint ) @@ -1795,38 +1934,59 @@ func (m *bpfEndpointManager) doApplyPolicy(ifaceName string) (bpfInterfaceState, wep = m.allWEPs[*endpointID] } - wg.Add(2) - go func() { - defer wg.Done() - readiness := state.readiness - if readiness == ifaceIsReady { - if _, err := m.dp.queryClassifier(ifindex, state.qdisc.handle, state.qdisc.prio, true); err != nil { - readiness = ifaceNotReady - } + readiness := state.readiness + if readiness == ifaceIsReady { + if _, err := m.dp.queryClassifier(ifindex, state.qdisc.handle, state.qdisc.prio, true); err != nil { + readiness = ifaceNotReady } - ingressQdisc, ingressErr = m.wepApplyPolicyToDirection(readiness, ifaceName, - state.policyIdx[hook.Ingress], state.filterIdx[hook.Ingress], wep, PolDirnIngress) - }() - go func() { - defer wg.Done() - readiness := state.readiness - if readiness == ifaceIsReady { - if _, err := m.dp.queryClassifier(ifindex, state.qdisc.handle, state.qdisc.prio, false); err != nil { - readiness = ifaceNotReady + } + + ap := m.calculateTCAttachPoint(ifaceName) + + if m.v6 != nil { + wg.Add(1) + go func() { + defer wg.Done() + ingressAP6, egressAP6, err6 = m.v6.applyPolicyToWeps(readiness, ifaceName, &state, wep, ap) + }() + } else if m.v4 != nil { + ingressAP4, egressAP4, err4 = m.v4.applyPolicyToWeps(readiness, ifaceName, &state, wep, ap) + } + wg.Wait() + + //Attach preamble TC program + if readiness != ifaceIsReady { + wg.Add(1) + go func() { + defer wg.Done() + ingressAP := mergeAttachPoints(ingressAP4, ingressAP6) + if ingressAP != nil { + ingressQdisc, ingressErr = m.dp.ensureProgramAttached(ingressAP) } + }() + egressAP := mergeAttachPoints(egressAP4, egressAP6) + if egressAP != nil { + egressQdisc, egressErr = m.dp.ensureProgramAttached(egressAP) } - egressQdisc, egressErr = m.wepApplyPolicyToDirection(readiness, ifaceName, - state.policyIdx[hook.Egress], state.filterIdx[hook.Egress], wep, PolDirnEgress) - }() - wg.Wait() + wg.Wait() + } if ingressErr != nil { return state, ingressErr } + if egressErr != nil { return state, egressErr } + if err4 != nil { + return state, err4 + } + + if err6 != nil { + return state, err6 + } + if egressQdisc != ingressQdisc { return state, fmt.Errorf("ingress qdisc info (%v) does not equal egress qdist info (%v)", ingressQdisc, egressQdisc) @@ -1842,6 +2002,20 @@ func (m *bpfEndpointManager) doApplyPolicy(ifaceName string) (bpfInterfaceState, return state, nil } +func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) (qDiscInfo, error) { + var qdisc qDiscInfo + res, err := ap.AttachProgram() + if err != nil { + return qdisc, err + } + if tcRes, ok := res.(*tc.AttachResult); ok { + qdisc.valid = true + qdisc.prio = tcRes.Prio() + qdisc.handle = tcRes.Handle() + } + return qdisc, nil +} + // applyPolicy actually applies the policy to the given workload. func (m *bpfEndpointManager) applyPolicy(ifaceName string) error { state, err := m.doApplyPolicy(ifaceName) @@ -1856,6 +2030,26 @@ func (m *bpfEndpointManager) applyPolicy(ifaceName string) error { return err } +func mergeAttachPoints(ap4, ap6 attachPoint) attachPoint { + if aptcV4, v4ok := ap4.(*tc.AttachPoint); v4ok { + if aptcV6, v6ok := ap6.(*tc.AttachPoint); v6ok { + if aptcV4 != nil && aptcV6 == nil { + return aptcV4 + } else if aptcV6 != nil && aptcV4 == nil { + return aptcV6 + } + } + } else if apxdpV4, v4ok := ap4.(*xdp.AttachPoint); v4ok { + apxdpV6, _ := ap6.(*xdp.AttachPoint) + if apxdpV4 != nil && apxdpV6 == nil { + return apxdpV4 + } else if apxdpV6 != nil && apxdpV4 == nil { + return apxdpV6 + } + } + return nil +} + func isLinkNotFoundError(err error) bool { if errors.Is(err, tc.ErrDeviceNotFound) { // From the tc package. return true @@ -1868,67 +2062,75 @@ func isLinkNotFoundError(err error) bool { var calicoRouterIP = net.IPv4(169, 254, 1, 1).To4() -func (m *bpfEndpointManager) wepTCAttachPoint(ifaceName string, policyIdx, filterIdx int, +func (d *bpfEndpointManagerDataplane) wepTCAttachPoint(ap *tc.AttachPoint, policyIdx, filterIdx int, polDirection PolDirection) *tc.AttachPoint { - ap := m.calculateTCAttachPoint(polDirection, ifaceName) - ap.HostIP = m.hostIP + ap = d.configureTCAttachPoint(polDirection, ap, false) + ifaceName := ap.IfaceName() + ap.HostIP = d.hostIP // * Since we don't pass packet length when doing fib lookup, MTU check is skipped. // * Hence it is safe to set the tunnel mtu same as veth mtu - ap.TunnelMTU = uint16(m.vxlanMTU) - if m.ipv6Enabled { - ip, err := m.getInterfaceIP(ifaceName) + if d.ipFamily == proto.IPVersion_IPV6 { + ip, err := d.getInterfaceIP(ifaceName) if err != nil { log.Debugf("Error getting IP for interface %+v: %+v", ifaceName, err) - ap.IntfIP = m.hostIP + ap.IntfIP = d.hostIP } else { ap.IntfIP = *ip } } else { ap.IntfIP = calicoRouterIP } - ap.ExtToServiceConnmark = uint32(m.bpfExtToServiceConnmark) ap.PolicyIdx = policyIdx ap.LogFilterIdx = filterIdx return ap } -func (m *bpfEndpointManager) wepApplyPolicyToDirection(readiness ifaceReadiness, ifaceName string, policyIdx, - filterIdx int, endpoint *proto.WorkloadEndpoint, polDirection PolDirection) (qDiscInfo, error) { +func (d *bpfEndpointManagerDataplane) wepApplyPolicyToDirection(readiness ifaceReadiness, state *bpfInterfaceState, + endpoint *proto.WorkloadEndpoint, polDirection PolDirection, ap *tc.AttachPoint) (*tc.AttachPoint, error) { - var qdisc qDiscInfo + var policyIdx, filterIdx int - if m.hostIP == nil { + if d.hostIP == nil { // Do not bother and wait - return qdisc, fmt.Errorf("unknown host IP") + return nil, fmt.Errorf("unknown host IP") } - ap := m.wepTCAttachPoint(ifaceName, policyIdx, filterIdx, polDirection) + indices := state.v4 + if d.ipFamily == proto.IPVersion_IPV6 { + indices = state.v6 + } + + attachHook := hook.Ingress + if polDirection == PolDirnEgress { + attachHook = hook.Egress + } + policyIdx = indices.policyIdx[attachHook] + filterIdx = indices.filterIdx[attachHook] - log.WithField("iface", ifaceName).Debugf("readiness: %d", readiness) + ap = d.wepTCAttachPoint(ap, policyIdx, filterIdx, polDirection) + + log.WithField("iface", ap.IfaceName()).Debugf("readiness: %d", readiness) if readiness != ifaceIsReady { - res, err := m.wepAttachProgram(ap) + err := d.mgr.loadPrograms(ap, d.ipFamily) if err != nil { - return qdisc, fmt.Errorf("attaching program to wep: %w", err) + return nil, fmt.Errorf("attaching program to wep: %w", err) } - qdisc.valid = true - qdisc.prio = res.(tc.AttachResult).Prio() - qdisc.handle = res.(tc.AttachResult).Handle() ap.Log().Info("Attached programs to the WEP") } - if err := m.wepApplyPolicy(ap, endpoint, polDirection); err != nil { - return qdisc, fmt.Errorf("applying policy to wep: %w", err) + if err := d.wepApplyPolicy(ap, endpoint, polDirection); err != nil { + return ap, errApplyingPolicy } - return qdisc, nil + return ap, nil } -func (m *bpfEndpointManager) wepAttachProgram(ap *tc.AttachPoint) (bpf.AttachResult, error) { - res, err := m.dp.ensureProgramAttached(ap) +func (m *bpfEndpointManager) loadPrograms(ap *tc.AttachPoint, ipFamily proto.IPVersion) error { + err := m.dp.ensureProgramLoaded(ap, ipFamily) if err != nil { - return nil, err + return err } if ap.LogLevel == "debug" { @@ -1937,10 +2139,10 @@ func (m *bpfEndpointManager) wepAttachProgram(ap *tc.AttachPoint) (bpf.AttachRes } } - return res, nil + return nil } -func (m *bpfEndpointManager) wepApplyPolicy(ap *tc.AttachPoint, +func (d *bpfEndpointManagerDataplane) wepApplyPolicy(ap *tc.AttachPoint, endpoint *proto.WorkloadEndpoint, polDirection PolDirection) error { var profileIDs []string @@ -1955,13 +2157,14 @@ func (m *bpfEndpointManager) wepApplyPolicy(ap *tc.AttachPoint, "Workload interface with no endpoint in datastore, installing default-drop program.") } + m := d.mgr // If tier or profileIDs is nil, this will return an empty set of rules but updatePolicyProgram appends a // drop rule, giving us default drop behaviour in that case. rules := m.extractRules(tier, profileIDs, polDirection) // If host-* endpoint is configured, add in its policy. if m.wildcardExists { - m.addHostPolicy(&rules, &m.wildcardHostEndpoint, polDirection.Inverse()) + m.addHostPolicy(&rules, &d.mgr.wildcardHostEndpoint, polDirection.Inverse()) } // Intentionally leaving this code here until the *-hep takes precedence. @@ -1982,7 +2185,7 @@ func (m *bpfEndpointManager) wepApplyPolicy(ap *tc.AttachPoint, rules.SuppressNormalHostPolicy = true } - return m.updatePolicyProgramFn(rules, polDirection.RuleDir(), ap) + return m.updatePolicyProgramFn(rules, polDirection.RuleDir(), ap, d.ipFamily) } func (m *bpfEndpointManager) addHostPolicy(rules *polprog.Rules, hostEndpoint *proto.HostEndpoint, polDirection PolDirection) { @@ -2007,42 +2210,117 @@ func (m *bpfEndpointManager) addHostPolicy(rules *polprog.Rules, hostEndpoint *p rules.HostProfiles = m.extractProfiles(hostEndpoint.ProfileIds, polDirection) } -func (m *bpfEndpointManager) attachDataIfaceProgram( +func (d *bpfEndpointManagerDataplane) applyPolicyToWeps( + readiness ifaceReadiness, + ifaceName string, + state *bpfInterfaceState, + endpoint *proto.WorkloadEndpoint, + ap *tc.AttachPoint, +) (*tc.AttachPoint, *tc.AttachPoint, error) { + + ingressAttachPoint := *ap + egressAttachPoint := *ap + + var parallelWG sync.WaitGroup + var ingressAP *tc.AttachPoint + var ingressErr error + + parallelWG.Add(1) + go func() { + defer parallelWG.Done() + ingressAP, ingressErr = d.wepApplyPolicyToDirection(readiness, + state, endpoint, PolDirnIngress, &ingressAttachPoint) + }() + + egressAP, egressErr := d.wepApplyPolicyToDirection(readiness, + state, endpoint, PolDirnEgress, &egressAttachPoint) + parallelWG.Wait() + + return ingressAP, egressAP, errors.Join(ingressErr, egressErr) +} + +func (d *bpfEndpointManagerDataplane) applyPolicyToDataIface( + ifaceName string, + ep *proto.HostEndpoint, + state *bpfInterfaceState, + ap *tc.AttachPoint, + xdpAP *xdp.AttachPoint, +) (*tc.AttachPoint, *tc.AttachPoint, *xdp.AttachPoint, error) { + + ingressAttachPoint := *ap + egressAttachPoint := *ap + + var parallelWG sync.WaitGroup + var ingressAP, egressAP *tc.AttachPoint + var ingressErr, egressErr, xdpErr error + + parallelWG.Add(2) + go func() { + defer parallelWG.Done() + ingressAP, ingressErr = d.attachDataIfaceProgram(ifaceName, ep, PolDirnIngress, state, &ingressAttachPoint) + }() + + go func() { + defer parallelWG.Done() + // XDP IPv6 is not supported yet. + if d.ipFamily == proto.IPVersion_IPV4 { + xdpErr = d.attachXDPProgram(xdpAP, ep, state) + } else { + xdpAP = nil + } + }() + + egressAP, egressErr = d.attachDataIfaceProgram(ifaceName, ep, PolDirnEgress, state, &egressAttachPoint) + parallelWG.Wait() + + return ingressAP, egressAP, xdpAP, errors.Join(ingressErr, egressErr, xdpErr) +} + +func (d *bpfEndpointManagerDataplane) attachDataIfaceProgram( ifaceName string, ep *proto.HostEndpoint, polDirection PolDirection, - policyIdx, filterIdx int, -) error { + state *bpfInterfaceState, + ap *tc.AttachPoint, +) (*tc.AttachPoint, error) { - if m.hostIP == nil { + if d.hostIP == nil { // Do not bother and wait - return fmt.Errorf("unknown host IP") + return nil, fmt.Errorf("unknown host IP") } - ap := m.calculateTCAttachPoint(polDirection, ifaceName) - ap.HostIP = m.hostIP - ap.TunnelMTU = uint16(m.vxlanMTU) - ap.ExtToServiceConnmark = uint32(m.bpfExtToServiceConnmark) - ip, err := m.getInterfaceIP(ifaceName) + ap = d.configureTCAttachPoint(polDirection, ap, true) + ap.HostIP = d.hostIP + + ip, err := d.getInterfaceIP(ifaceName) if err != nil { log.Debugf("Error getting IP for interface %+v: %+v", ifaceName, err) - ap.IntfIP = m.hostIP + ap.IntfIP = d.hostIP } else { ap.IntfIP = *ip } - ap.NATin = uint32(m.natInIdx) - ap.NATout = uint32(m.natOutIdx) + + var policyIdx, filterIdx int + indices := state.v4 + if d.ipFamily == proto.IPVersion_IPV6 { + indices = state.v6 + } + + if polDirection == PolDirnIngress { + policyIdx = indices.policyIdx[hook.Ingress] + filterIdx = indices.filterIdx[hook.Ingress] + } else { + policyIdx = indices.policyIdx[hook.Egress] + filterIdx = indices.filterIdx[hook.Egress] + } + ap.PolicyIdx = policyIdx ap.LogFilterIdx = filterIdx - if _, err := m.dp.ensureProgramAttached(ap); err != nil { - return err - } + m := d.mgr - if ap.LogLevel == "debug" { - if err := m.updateLogFilter(ap); err != nil { - ap.Log().WithError(err).Warn("Failed to update logging filter, logging may be incorrect.") - } + if err := m.loadPrograms(ap, d.ipFamily); err != nil { + return nil, err } if ep != nil { @@ -2050,28 +2328,27 @@ func (m *bpfEndpointManager) attachDataIfaceProgram( ForHostInterface: true, } m.addHostPolicy(&rules, ep, polDirection) - return m.updatePolicyProgramFn(rules, polDirection.RuleDir(), ap) - } - - if err := m.dp.removePolicyProgram(ap); err != nil { - return err + if err := m.updatePolicyProgramFn(rules, polDirection.RuleDir(), ap, d.ipFamily); err != nil { + return ap, err + } + } else { + if err := m.dp.removePolicyProgram(ap, d.ipFamily); err != nil { + return ap, err + } } - return nil + return ap, nil } -func (m *bpfEndpointManager) attachXDPProgram(ifaceName string, ep *proto.HostEndpoint, jumpIdx int) error { - ap := &xdp.AttachPoint{ - AttachPoint: bpf.AttachPoint{ - Hook: hook.XDP, - Iface: ifaceName, - LogLevel: m.bpfLogLevel, - PolicyIdx: jumpIdx, - }, - Modes: m.xdpModes, +func (d *bpfEndpointManagerDataplane) attachXDPProgram(ap *xdp.AttachPoint, ep *proto.HostEndpoint, state *bpfInterfaceState) error { + if d.ipFamily == proto.IPVersion_IPV6 { + ap.PolicyIdx = state.v6.policyIdx[hook.XDP] + } else { + ap.PolicyIdx = state.v4.policyIdx[hook.XDP] } + m := d.mgr if ep != nil && len(ep.UntrackedTiers) == 1 { - _, err := m.dp.ensureProgramAttached(ap) + err := m.dp.ensureProgramLoaded(ap, d.ipFamily) if err != nil { return err } @@ -2083,13 +2360,11 @@ func (m *bpfEndpointManager) attachXDPProgram(ifaceName string, ep *proto.HostEn ForXDP: true, } ap.Log().Debugf("Rules: %v", rules) - err = m.updatePolicyProgramFn(rules, "xdp", ap) + err = m.updatePolicyProgramFn(rules, "xdp", ap, d.ipFamily) ap.Log().WithError(err).Debugf("Applied untracked policy hep=%v", ep.Name) - return err - } else { - return m.dp.ensureNoProgram(ap) } + return nil } // PolDirection is the Calico datamodel direction of policy. On a host endpoint, ingress is towards the host. @@ -2142,7 +2417,7 @@ func (m *bpfEndpointManager) apLogFilter(ap *tc.AttachPoint, iface string) (stri return m.bpfLogLevel, exp } -func (m *bpfEndpointManager) calculateTCAttachPoint(policyDirection PolDirection, ifaceName string) *tc.AttachPoint { +func (m *bpfEndpointManager) calculateTCAttachPoint(ifaceName string) *tc.AttachPoint { ap := &tc.AttachPoint{ AttachPoint: bpf.AttachPoint{ Iface: ifaceName, @@ -2156,8 +2431,6 @@ func (m *bpfEndpointManager) calculateTCAttachPoint(policyDirection PolDirection endpointType = tcdefs.EpTypeWorkload } else if ifaceName == "lo" { endpointType = tcdefs.EpTypeLO - ap.HostTunnelIP = m.tunnelIP - log.Debugf("Setting tunnel ip %s on ap %s", m.tunnelIP, ifaceName) if m.hostNetworkedNATMode == hostNetworkedNATUDPOnly { ap.UDPOnly = true } @@ -2171,17 +2444,51 @@ func (m *bpfEndpointManager) calculateTCAttachPoint(policyDirection PolDirection endpointType = tcdefs.EpTypeL3Device } else if ifaceName == bpfInDev || ifaceName == bpfOutDev { endpointType = tcdefs.EpTypeNAT - ap.HostTunnelIP = m.tunnelIP - log.Debugf("Setting tunnel ip %s on ap %s", m.tunnelIP, ifaceName) } else if m.isDataIface(ifaceName) { endpointType = tcdefs.EpTypeHost - ap.HostTunnelIP = m.tunnelIP - log.Debugf("Setting tunnel ip %s on ap %s", m.tunnelIP, ifaceName) + ap.NATin = uint32(m.natInIdx) + ap.NATout = uint32(m.natOutIdx) } else { log.Panicf("Unsupported ifaceName %v", ifaceName) } + ap.Type = endpointType + if ap.Type != tcdefs.EpTypeWorkload { + ap.WgPort = m.wgPort + ap.NATin = uint32(m.natInIdx) + ap.NATout = uint32(m.natOutIdx) + } else { + ap.ExtToServiceConnmark = uint32(m.bpfExtToServiceConnmark) + } + + ap.ToHostDrop = (m.epToHostAction == "DROP") + ap.FIB = m.fibLookupEnabled + ap.DSR = m.dsrEnabled + ap.DSROptoutCIDRs = m.dsrOptoutCidrs + ap.LogLevel, ap.LogFilter = m.apLogFilter(ap, ifaceName) + ap.VXLANPort = m.vxlanPort + ap.PSNATStart = m.psnatPorts.MinPort + ap.PSNATEnd = m.psnatPorts.MaxPort + ap.TunnelMTU = uint16(m.vxlanMTU) + + switch m.rpfEnforceOption { + case "Strict": + ap.RPFEnforceOption = tcdefs.RPFEnforceOptionStrict + case "Loose": + ap.RPFEnforceOption = tcdefs.RPFEnforceOptionLoose + default: + ap.RPFEnforceOption = tcdefs.RPFEnforceOptionDisabled + } - if endpointType == tcdefs.EpTypeWorkload { + return ap +} + +func (d *bpfEndpointManagerDataplane) configureTCAttachPoint(policyDirection PolDirection, ap *tc.AttachPoint, isDataIface bool) *tc.AttachPoint { + if ap.Type == tcdefs.EpTypeLO || ap.Type == tcdefs.EpTypeNAT || isDataIface { + ap.HostTunnelIP = d.tunnelIP + log.Debugf("Setting tunnel ip %s on ap %s", d.tunnelIP, ap.IfaceName()) + } + + if ap.Type == tcdefs.EpTypeWorkload { // Policy direction is relative to the workload so, from the host namespace it's flipped. if policyDirection == PolDirnIngress { ap.Hook = hook.Egress @@ -2189,7 +2496,6 @@ func (m *bpfEndpointManager) calculateTCAttachPoint(policyDirection PolDirection ap.Hook = hook.Ingress } } else { - ap.WgPort = m.wgPort // Host endpoints have the natural relationship between policy direction and hook. if policyDirection == PolDirnIngress { ap.Hook = hook.Ingress @@ -2205,27 +2511,8 @@ func (m *bpfEndpointManager) calculateTCAttachPoint(policyDirection PolDirection toOrFrom = tcdefs.ToEp } - ap.Type = endpointType ap.ToOrFrom = toOrFrom - ap.ToHostDrop = (m.epToHostAction == "DROP") - ap.FIB = m.fibLookupEnabled - ap.DSR = m.dsrEnabled - ap.DSROptoutCIDRs = m.dsrOptoutCidrs - ap.LogLevel, ap.LogFilter = m.apLogFilter(ap, ifaceName) - ap.VXLANPort = m.vxlanPort - ap.PSNATStart = m.psnatPorts.MinPort - ap.PSNATEnd = m.psnatPorts.MaxPort - ap.IPv6Enabled = m.ipv6Enabled - - switch m.rpfEnforceOption { - case "Strict": - ap.RPFEnforceOption = tcdefs.RPFEnforceOptionStrict - case "Loose": - ap.RPFEnforceOption = tcdefs.RPFEnforceOptionLoose - default: - ap.RPFEnforceOption = tcdefs.RPFEnforceOptionDisabled - } - + ap.IPv6Enabled = (d.ipFamily == proto.IPVersion_IPV6) return ap } @@ -2269,7 +2556,7 @@ func (m *bpfEndpointManager) extractTiers(tier *proto.TierInfo, direction PolDir for ri, r := range prules { policy.Rules[ri] = polprog.Rule{ Rule: r, - MatchID: m.dp.ruleMatchID(dir, r.Action, "Policy", polName, ri), + MatchID: m.ruleMatchID(dir, r.Action, "Policy", polName, ri), } } @@ -2308,7 +2595,7 @@ func (m *bpfEndpointManager) extractProfiles(profileNames []string, direction Po for ri, r := range prules { profile.Rules[ri] = polprog.Rule{ Rule: r, - MatchID: m.dp.ruleMatchID(dir, r.Action, "Profile", profName, ri), + MatchID: m.ruleMatchID(dir, r.Action, "Profile", profName, ri), } } @@ -2737,7 +3024,7 @@ func (m *bpfEndpointManager) loadTCObj(at hook.AttachType) (hook.Layout, error) } // Ensure TC/XDP program is attached to the specified interface. -func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) (bpf.AttachResult, error) { +func (m *bpfEndpointManager) ensureProgramLoaded(ap attachPoint, ipFamily proto.IPVersion) error { var err error if aptc, ok := ap.(*tc.AttachPoint); ok { @@ -2750,15 +3037,10 @@ func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) (bpf.AttachRe DSR: aptc.DSR, } - ipFamily := 4 - if m.ipv6Enabled { - ipFamily = 6 - } - - at.Family = ipFamily - ap.Log().Debugf("ensureProgramAttached %d", ipFamily) + at.Family = int(ipFamily) + ap.Log().Debugf("ensureProgramLoaded %d", ipFamily) if aptc.HookLayout, err = m.loadTCObj(at); err != nil { - return nil, fmt.Errorf("loading generic v%d tc hook program: %w", ipFamily, err) + return fmt.Errorf("loading generic v%d tc hook program: %w", ipFamily, err) } // Load default policy before the real policy is created and loaded. @@ -2772,7 +3054,7 @@ func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) (bpf.AttachRe } if err != nil { - return nil, fmt.Errorf("failed to set default policy: %w", err) + return fmt.Errorf("failed to set default policy: %w", err) } } else if apxdp, ok := ap.(*xdp.AttachPoint); ok { at := hook.AttachType{ @@ -2782,13 +3064,12 @@ func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) (bpf.AttachRe pm := m.bpfmaps.XDPProgramsMap.(*hook.ProgramsMap) if apxdp.HookLayout, err = pm.LoadObj(at); err != nil { - return nil, fmt.Errorf("loading generic xdp hook program: %w", err) + return fmt.Errorf("loading generic xdp hook program: %w", err) } } else { - return nil, fmt.Errorf("unknown attach type") + return fmt.Errorf("unknown attach type") } - - return ap.AttachProgram() + return nil } // Ensure that the specified attach point does not have our program. @@ -2801,7 +3082,11 @@ func (m *bpfEndpointManager) ensureNoProgram(ap attachPoint) error { } // Forget the policy debug info - m.removePolicyDebugInfo(ap.IfaceName(), 4, ap.HookName()) + if m.ipv6Enabled { + m.removePolicyDebugInfo(ap.IfaceName(), 6, ap.HookName()) + } else { + m.removePolicyDebugInfo(ap.IfaceName(), 4, ap.HookName()) + } return err } @@ -2875,13 +3160,9 @@ func (m *bpfEndpointManager) writePolicyDebugInfo(insns []asm.Insns, ifaceName s return nil } -func (m *bpfEndpointManager) updatePolicyProgram(rules polprog.Rules, polDir string, ap attachPoint) error { - ipFamily := proto.IPVersion_IPV4 - if m.ipv6Enabled { - ipFamily = proto.IPVersion_IPV6 - } +func (m *bpfEndpointManager) updatePolicyProgram(rules polprog.Rules, polDir string, ap attachPoint, ipFamily proto.IPVersion) error { - progName := policyProgramName(ap.IfaceName(), polDir, ipFamily) + progName := policyProgramName(ap.IfaceName(), polDir, proto.IPVersion(ipFamily)) var opts []polprog.Option if apj, ok := ap.(attachPointWithPolicyJumps); ok { @@ -2973,7 +3254,6 @@ func (m *bpfEndpointManager) loadPolicyProgram( log.WithFields(log.Fields{ "progName": progName, "ipFamily": ipFamily, - "rules": rules, }).Debug("Generating policy program...") if ipFamily == proto.IPVersion_IPV6 { @@ -3114,35 +3394,27 @@ func (m *bpfEndpointManager) jumpMapDelete(h hook.Hook, idx int) error { return jumpMapDeleteEntry(jumpMap, idx, stride) } -func (m *bpfEndpointManager) removePolicyProgram(ap attachPoint) error { - ipVersions := []proto.IPVersion{proto.IPVersion_IPV4} - if m.ipv6Enabled { - ipVersions = append(ipVersions, proto.IPVersion_IPV6) +func (m *bpfEndpointManager) removePolicyProgram(ap attachPoint, ipFamily proto.IPVersion) error { + idx := ap.PolicyJmp() + if idx == -1 { + return fmt.Errorf("invalid policy jump map idx %d", idx) } - for _, ipFamily := range ipVersions { - idx := ap.PolicyJmp() - if idx == -1 { - continue - } - - var pm maps.Map - var stride int - if ap.HookName() == hook.XDP { - stride = jump.XDPMaxEntryPoints - pm = m.bpfmaps.XDPJumpMap - } else { - stride = jump.TCMaxEntryPoints - pm = m.bpfmaps.JumpMap - } - - if err := jumpMapDeleteEntry(pm, idx, stride); err != nil { - return fmt.Errorf("removing policy iface %s hook %s: %w", ap.IfaceName(), ap.HookName(), err) - } + var pm maps.Map + var stride int + if ap.HookName() == hook.XDP { + stride = jump.XDPMaxEntryPoints + pm = m.bpfmaps.XDPJumpMap + } else { + stride = jump.TCMaxEntryPoints + pm = m.bpfmaps.JumpMap + } - m.removePolicyDebugInfo(ap.IfaceName(), ipFamily, ap.HookName()) + if err := jumpMapDeleteEntry(pm, idx, stride); err != nil { + return fmt.Errorf("removing policy iface %s hook %s: %w", ap.IfaceName(), ap.HookName(), err) } + m.removePolicyDebugInfo(ap.IfaceName(), ipFamily, ap.HookName()) return nil } @@ -3193,9 +3465,9 @@ func FindJumpMap(progID int, ifaceName string) (mapFD maps.FD, err error) { return 0, fmt.Errorf("failed to find jump map for iface=%s progID=%d", ifaceName, progID) } -func (m *bpfEndpointManager) getInterfaceIP(ifaceName string) (*net.IP, error) { +func (d *bpfEndpointManagerDataplane) getInterfaceIP(ifaceName string) (*net.IP, error) { var ipAddrs []net.IP - if ip, ok := m.ifaceToIpMap[ifaceName]; ok { + if ip, ok := d.ifaceToIpMap[ifaceName]; ok { return &ip, nil } intf, err := net.InterfaceByName(ifaceName) @@ -3210,11 +3482,7 @@ func (m *bpfEndpointManager) getInterfaceIP(ifaceName string) (*net.IP, error) { for _, addr := range addrs { switch t := addr.(type) { case *net.IPNet: - if !m.ipv6Enabled && t.IP.To4() != nil { - ipAddrs = append(ipAddrs, t.IP) - } else if m.ipv6Enabled && t.IP.To4() == nil { - ipAddrs = append(ipAddrs, t.IP) - } + ipAddrs = append(ipAddrs, t.IP) } } sort.Slice(ipAddrs, func(i, j int) bool { diff --git a/felix/dataplane/linux/bpf_ep_mgr_test.go b/felix/dataplane/linux/bpf_ep_mgr_test.go index d611d670abd..a9ec1f4cf5f 100644 --- a/felix/dataplane/linux/bpf_ep_mgr_test.go +++ b/felix/dataplane/linux/bpf_ep_mgr_test.go @@ -100,11 +100,16 @@ func (m *mockDataplane) loadDefaultPolicies() error { return nil } -func (m *mockDataplane) ensureProgramAttached(ap attachPoint) (bpf.AttachResult, error) { +func (m *mockDataplane) ensureProgramAttached(ap attachPoint) (qDiscInfo, error) { + var qdisc qDiscInfo + return qdisc, nil +} + +func (m *mockDataplane) ensureProgramLoaded(ap attachPoint, ipFamily proto.IPVersion) error { m.mutex.Lock() defer m.mutex.Unlock() - var res tc.AttachResult // we don't care about the values + //var res tc.AttachResult // we don't care about the values if apxdp, ok := ap.(*xdp.AttachPoint); ok { apxdp.HookLayout = hook.Layout{ @@ -115,11 +120,11 @@ func (m *mockDataplane) ensureProgramAttached(ap attachPoint) (bpf.AttachResult, key := ap.IfaceName() + ":" + ap.HookName().String() if _, exists := m.progs[key]; exists { - return res, nil + return nil } m.lastProgID += 1 m.progs[key] = m.lastProgID - return res, nil + return nil } func (m *mockDataplane) ensureNoProgram(ap attachPoint) error { @@ -140,7 +145,7 @@ func (m *mockDataplane) ensureQdisc(iface string) (bool, error) { return false, nil } -func (m *mockDataplane) updatePolicyProgram(rules polprog.Rules, polDir string, ap attachPoint) error { +func (m *mockDataplane) updatePolicyProgram(rules polprog.Rules, polDir string, ap attachPoint, ipFamily proto.IPVersion) error { m.mutex.Lock() defer m.mutex.Unlock() key := ap.IfaceName() + ":" + ap.HookName().String() @@ -148,7 +153,7 @@ func (m *mockDataplane) updatePolicyProgram(rules polprog.Rules, polDir string, return nil } -func (m *mockDataplane) removePolicyProgram(ap attachPoint) error { +func (m *mockDataplane) removePolicyProgram(ap attachPoint, ipFamily proto.IPVersion) error { m.mutex.Lock() defer m.mutex.Unlock() key := ap.IfaceName() + ":" + ap.HookName().String() @@ -373,7 +378,7 @@ var _ = Describe("BPF Endpoint Manager", func() { ) Expect(err).NotTo(HaveOccurred()) bpfEpMgr.Features = environment.NewFeatureDetector(nil).GetFeatures() - bpfEpMgr.hostIP = net.ParseIP("1.2.3.4") + bpfEpMgr.v4.hostIP = net.ParseIP("1.2.3.4") } genIfaceUpdate := func(name string, state ifacemonitor.State, index int) func() {