Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IC/ICNI: Remove the need for k8s.ovn.org/external-gw-pod-ips annotation in #3933

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 15 additions & 8 deletions go-controller/pkg/node/default_node_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,15 +1079,22 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error {
}

if config.OvnKubeNode.Mode != types.NodeModeDPUHost {
util.SetARPTimeout()
err := nc.WatchNamespaces()
if err != nil {
return fmt.Errorf("failed to watch namespaces: %w", err)
// If interconnect is disabled OR interconnect is running in single-zone-mode,
// the ovnkube-master is responsible for patching ICNI managed namespaces with
// "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush
// conntrack on every node. In multi-zone-interconnect case, we will handle the flushing
// directly on the ovnkube-controller code to avoid an extra namespace annotation
if !config.OVNKubernetesFeature.EnableInterconnect || sbZone == types.OvnDefaultZone {
util.SetARPTimeout()
err := nc.WatchNamespaces()
if err != nil {
return fmt.Errorf("failed to watch namespaces: %w", err)
}
// every minute cleanup stale conntrack entries if any
go wait.Until(func() {
nc.checkAndDeleteStaleConntrackEntries()
}, time.Minute*1, nc.stopChan)
}
// every minute cleanup stale conntrack entries if any
go wait.Until(func() {
nc.checkAndDeleteStaleConntrackEntries()
}, time.Minute*1, nc.stopChan)
err = nc.WatchEndpointSlices()
if err != nil {
return fmt.Errorf("failed to watch endpointSlices: %w", err)
Expand Down
19 changes: 17 additions & 2 deletions go-controller/pkg/node/obj_retry_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
discovery "k8s.io/api/discovery/v1"
cache "k8s.io/client-go/tools/cache"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/retry"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
)

type nodeEventHandler struct {
Expand Down Expand Up @@ -159,8 +162,20 @@ func (h *nodeEventHandler) AddResource(obj interface{}, fromRetryLoop bool) erro
func (h *nodeEventHandler) UpdateResource(oldObj, newObj interface{}, inRetryCache bool) error {
switch h.objType {
case factory.NamespaceExGwType:
newNs := newObj.(*kapi.Namespace)
return h.nc.syncConntrackForExternalGateways(newNs)
// If interconnect is disabled OR interconnect is running in single-zone-mode,
// the ovnkube-master is responsible for patching ICNI managed namespaces with
// "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush
// conntrack on every node. In multi-zone-interconnect case, we will handle the flushing
// directly on the ovnkube-controller code to avoid an extra namespace annotation
node, err := h.nc.watchFactory.GetNode(h.nc.name)
if err != nil {
return fmt.Errorf("error retrieving node %s: %v", h.nc.name, err)
}
if !config.OVNKubernetesFeature.EnableInterconnect || util.GetNodeZone(node) == types.OvnDefaultZone {
newNs := newObj.(*kapi.Namespace)
return h.nc.syncConntrackForExternalGateways(newNs)
}
return nil

case factory.EndpointSliceForStaleConntrackRemovalType:
oldEndpointSlice := oldObj.(*discovery.EndpointSlice)
Expand Down
13 changes: 13 additions & 0 deletions go-controller/pkg/ovn/controller/apbroute/master_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,19 @@ func (c *ExternalGatewayMasterController) processNextPolicyWorkItem(wg *sync.Wai
return true
}

func (c *ExternalGatewayMasterController) GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespaceName string) (sets.Set[string], error) {
gwIPs, err := c.mgr.getDynamicGatewayIPsForTargetNamespace(namespaceName)
if err != nil {
return nil, err
}
tmpIPs, err := c.mgr.getStaticGatewayIPsForTargetNamespace(namespaceName)
if err != nil {
return nil, err
}

return gwIPs.Union(tmpIPs), nil
}

func (c *ExternalGatewayMasterController) onPolicyAdd(obj interface{}) {
_, ok := obj.(*adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute)
if !ok {
Expand Down
12 changes: 12 additions & 0 deletions go-controller/pkg/ovn/default_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

kapi "k8s.io/api/core/v1"
knet "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/util/wait"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -530,6 +531,17 @@ func (oc *DefaultNetworkController) Run(ctx context.Context) error {
if err = oc.apbExternalRouteController.Run(oc.wg, 1); err != nil {
return err
}
// If interconnect is enabled and it is a multi-zone setup, then we flush conntrack
// on ovnkube-controller side and not on ovnkube-node side, since they are run in the
// same process. TODO(tssurya): In upstream ovnk, its possible to run these as different processes
// in which case this flushing feature is not supported.
if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != ovntypes.OvnDefaultZone {
util.SetARPTimeout()
// every minute cleanup stale conntrack entries if any
go wait.Until(func() {
oc.checkAndDeleteStaleConntrackEntries()
}, time.Minute*1, oc.stopChan)
}
}

end := time.Since(start)
Expand Down
167 changes: 163 additions & 4 deletions go-controller/pkg/ovn/egressgw.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"regexp"
"strings"
"sync"

utilnet "k8s.io/utils/net"

Expand All @@ -20,6 +21,8 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
"github.com/pkg/errors"
"github.com/vishvananda/netlink"
utilapierrors "k8s.io/apimachinery/pkg/util/errors"

nettypes "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"

Expand Down Expand Up @@ -89,6 +92,9 @@ func (oc *DefaultNetworkController) addPodExternalGWForNamespace(namespace strin
for _, gwInfo := range nsInfo.routingExternalPodGWs {
existingGWs.Insert(gwInfo.gws.UnsortedList()...)
}
if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != types.OvnDefaultZone {
existingGWs.Insert(nsInfo.routingExternalGWs.gws.UnsortedList()...)
}
Comment on lines +95 to +97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this conditionalized to single node zone IC? Isn't this something generic you are fixing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its conditionalized to multizone IC (not single zone).
because for nonIC and single zone IC we want to use the tranditional legacy method of annotation flush. The annotation k8s.ovn.org/external-gw-pod-ips must only contain ips of the nsInfo.routingExternalPodGWs NOT nsInfo.routingExternalGWs (this is already present as part of k8s.ovn.org/routing-external-gws so we don't want to duplicate it.) So TLDR; this condition is to compliment the

} else {
		// flush here since we know we have added an egressgw pod and we also know the full list of existing gatewayIPs

below this block.

nsUnlock()

klog.Infof("Adding routes for external gateway pod: %s, next hops: %q, namespace: %s, bfd-enabled: %t",
Expand All @@ -98,12 +104,143 @@ func (oc *DefaultNetworkController) addPodExternalGWForNamespace(namespace strin
return err
}
// add the exgw podIP to the namespace's k8s.ovn.org/external-gw-pod-ips list
if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, existingGWs.List()); err != nil {
klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err)
if !config.OVNKubernetesFeature.EnableInterconnect || oc.zone == types.OvnDefaultZone {
// If interconnect is disabled OR interconnect is running in single-zone-mode,
// the ovnkube-master is responsible for patching ICNI managed namespaces with
// "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush
// conntrack on every node. In multi-zone-interconnect case, we will handle the flushing
// directly on the ovnkube-controller code to avoid an extra namespace annotation
if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, existingGWs.List()); err != nil {
klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err)
}
} else {
// flush here since we know we have added an egressgw pod and we also know the full list of existing gatewayIPs
gatewayIPs, err := oc.apbExternalRouteController.GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespace)
if err != nil {
return fmt.Errorf("unable to retrieve gateway IPs for Admin Policy Based External Route objects: %w", err)
}
gatewayIPs = gatewayIPs.Insert(existingGWs.List()...)
err = oc.syncConntrackForExternalGateways(namespace, gatewayIPs) // best effort
if err != nil {
klog.Errorf("Syncing conntrack entries for egressGW pod %v serving the namespace %s failed: %v",
egress, namespace, err)
}
}
return nil
}

func (oc *DefaultNetworkController) syncConntrackForExternalGateways(namespace string, gwIPsToKeep sets.Set[string]) error {
var wg sync.WaitGroup
wg.Add(len(gwIPsToKeep))
validMACs := sync.Map{}
for gwIP := range gwIPsToKeep {
go func(gwIP string) {
defer wg.Done()
if len(gwIP) > 0 && !utilnet.IsIPv6String(gwIP) {
// TODO: Add support for IPv6 external gateways
if hwAddr, err := util.GetMACAddressFromARP(net.ParseIP(gwIP)); err != nil {
klog.Errorf("Failed to lookup hardware address for gatewayIP %s: %v", gwIP, err)
} else if len(hwAddr) > 0 {
// we need to reverse the mac before passing it to the conntrack filter since OVN saves the MAC in the following format
// +------------------------------------------------------------ +
// | 128 ... 112 ... 96 ... 80 ... 64 ... 48 ... 32 ... 16 ... 0|
// +------------------+-------+--------------------+-------------|
// | | UNUSED| MAC ADDRESS | UNUSED |
// +------------------+-------+--------------------+-------------+
for i, j := 0, len(hwAddr)-1; i < j; i, j = i+1, j-1 {
hwAddr[i], hwAddr[j] = hwAddr[j], hwAddr[i]
}
validMACs.Store(gwIP, []byte(hwAddr))
}
}
}(gwIP)
}
wg.Wait()

validNextHopMACs := [][]byte{}
validMACs.Range(func(key interface{}, value interface{}) bool {
validNextHopMACs = append(validNextHopMACs, value.([]byte))
return true
})
// Handle corner case where there are 0 IPs on the annotations OR none of the ARPs were successful; i.e allowMACList={empty}.
// This means we *need to* pass a label > 128 bits that will not match on any conntrack entry labels for these pods.
// That way any remaining entries with labels having MACs set will get purged.
if len(validNextHopMACs) == 0 {
validNextHopMACs = append(validNextHopMACs, []byte("does-not-contain-anything"))
}
pods, err := oc.watchFactory.GetPods(namespace)
if err != nil {
return fmt.Errorf("unable to get pods from informer: %v", err)
}

var errs []error
for _, pod := range pods {
pod := pod
// Since it's executed in ovnkube-controller only for multi-zone-ic the following hack of filtering
// local pods will work. Error will be treated as best-effort and ignored
if localPod, _ := oc.isPodInLocalZone(pod); !localPod {
continue
}
podIPs, err := util.GetPodIPsOfNetwork(pod, &util.DefaultNetInfo{})
if err != nil && !errors.Is(err, util.ErrNoPodIPFound) {
errs = append(errs, fmt.Errorf("unable to fetch IP for pod %s/%s: %v", pod.Namespace, pod.Name, err))
}
for _, podIP := range podIPs { // flush conntrack only for UDP
// for this pod, we check if the conntrack entry has a label that is not in the provided allowlist of MACs
// only caveat here is we assume egressGW served pods shouldn't have conntrack entries with other labels set
err := util.DeleteConntrack(podIP.String(), 0, kapi.ProtocolUDP, netlink.ConntrackOrigDstIP, validNextHopMACs)
if err != nil {
errs = append(errs, fmt.Errorf("failed to delete conntrack entry for pod %s: %v", podIP.String(), err))
}
}
}
return utilapierrors.NewAggregate(errs)
}

func (oc *DefaultNetworkController) checkAndDeleteStaleConntrackEntries() {
namespaces, err := oc.watchFactory.GetNamespaces()
if err != nil {
klog.Errorf("Unable to get pods from informer: %v", err)
return
}
for _, namespace := range namespaces {
// flush here since we know we have added an egressgw pod and we also know the full list of existing gatewayIPs
existingGWs, err := oc.apbExternalRouteController.GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespace.Name)
if err != nil {
klog.Errorf("Unable to retrieve gateway IPs for Admin Policy Based External Route objects for ns %s: %w", namespace.Name, err)
return
}
// by now the nsInfo cache must be repaired for this feature fully;
tssurya marked this conversation as resolved.
Show resolved Hide resolved
// however this introduces cache lock scale concern by doing this every minute
// versus previously this was done purely using annotations
nsInfo, nsUnlock, err := oc.ensureNamespaceLocked(namespace.Name, false, nil)
if err != nil {
klog.Errorf("Failed to ensure namespace %s locked: %v", namespace, err)
return
}
for _, gwInfo := range nsInfo.routingExternalPodGWs {
existingGWs.Insert(gwInfo.gws.UnsortedList()...)
}
existingGWs.Insert(nsInfo.routingExternalGWs.gws.UnsortedList()...)
nsUnlock()
if len(existingGWs) > 0 {
pods, err := oc.watchFactory.GetPods(namespace.Name)
if err != nil {
klog.Warningf("Unable to get pods from informer for namespace %s: %v", namespace.Name, err)
}
if len(pods) > 0 || err != nil {
// we only need to proceed if there is at least one pod in this namespace on this node
// OR if we couldn't fetch the pods for some reason at this juncture
err = oc.syncConntrackForExternalGateways(namespace.Name, existingGWs)
if err != nil {
klog.Errorf("Syncing conntrack entries for egressGWs %+v serving the namespace %s failed: %v",
existingGWs, namespace.Name, err)
}
}
}
}
}

// addExternalGWsForNamespace handles adding annotated gw routes to all pods in namespace
// This should only be called with a lock on nsInfo
func (oc *DefaultNetworkController) addExternalGWsForNamespace(egress gatewayInfo, nsInfo *namespaceInfo, namespace string) error {
Expand Down Expand Up @@ -293,6 +430,9 @@ func (oc *DefaultNetworkController) deletePodGWRoutesForNamespace(pod *kapi.Pod,
for _, gwInfo := range nsInfo.routingExternalPodGWs {
existingGWs.Insert(gwInfo.gws.UnsortedList()...)
}
if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != types.OvnDefaultZone {
existingGWs.Insert(nsInfo.routingExternalGWs.gws.UnsortedList()...)
}
nsUnlock()

if !ok || len(foundGws.gws) == 0 {
Expand All @@ -313,8 +453,27 @@ func (oc *DefaultNetworkController) deletePodGWRoutesForNamespace(pod *kapi.Pod,
return fmt.Errorf("failed to delete GW routes for pod %s: %w", pod.Name, err)
}
// remove the exgw podIP from the namespace's k8s.ovn.org/external-gw-pod-ips list
if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, sets.List(existingGWs)); err != nil {
klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err)
if !config.OVNKubernetesFeature.EnableInterconnect || oc.zone == types.OvnDefaultZone {
// If interconnect is disabled OR interconnect is running in single-zone-mode,
// the ovnkube-master is responsible for patching ICNI managed namespaces with
// "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush
// conntrack on every node. In multi-zone-interconnect case, we will handle the flushing
// directly on the ovnkube-controller code to avoid an extra namespace annotation
if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, sets.List(existingGWs)); err != nil {
klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err)
}
} else {
// flush here since we know we have deleted an egressgw pod and we also know the full list of existing gatewayIPs
gatewayIPs, err := oc.apbExternalRouteController.GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespace)
if err != nil {
return fmt.Errorf("unable to retrieve gateway IPs for Admin Policy Based External Route objects: %w", err)
}
gatewayIPs = gatewayIPs.Insert(sets.List(existingGWs)...)
err = oc.syncConntrackForExternalGateways(namespace, gatewayIPs) // best effort
if err != nil {
klog.Errorf("Syncing conntrack entries for egressGWs %+v serving the namespace %s failed: %v",
gatewayIPs, namespace, err)
}
}
return nil
}
Expand Down
20 changes: 20 additions & 0 deletions go-controller/pkg/ovn/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,26 @@ func (oc *DefaultNetworkController) updateNamespace(old, newer *kapi.Namespace)
}
}
}
if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != types.OvnDefaultZone {
// If interconnect is disabled OR interconnect is running in single-zone-mode,
// the ovnkube-master is responsible for patching ICNI managed namespaces with
// "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush
// conntrack on every node. In multi-zone-interconnect case, we will handle the flushing
// directly on the ovnkube-controller code to avoid an extra namespace annotation
gatewayIPs, err := oc.apbExternalRouteController.GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(old.Name)
if err != nil {
return fmt.Errorf("unable to retrieve gateway IPs for Admin Policy Based External Route objects for namespace %s: %w", old.Name, err)
}
for _, gwInfo := range nsInfo.routingExternalPodGWs {
gatewayIPs.Insert(gwInfo.gws.UnsortedList()...)
}
gatewayIPs.Insert(nsInfo.routingExternalGWs.gws.UnsortedList()...)
err = oc.syncConntrackForExternalGateways(old.Name, gatewayIPs) // best effort
if err != nil {
klog.Errorf("Syncing conntrack entries for egressGWs %+v serving the namespace %s failed: %v",
gatewayIPs, old.Name, err)
}
}
// if new annotation is empty, exgws were removed, may need to add SNAT per pod
// check if there are any pod gateways serving this namespace as well
if gwAnnotation == "" && len(nsInfo.routingExternalPodGWs) == 0 && config.Gateway.DisableSNATMultipleGWs {
Expand Down