Skip to content

Commit

Permalink
IC/ICNI: Remove the need for namespace updates
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Seetharaman <suryaseetharaman.9@gmail.com>
  • Loading branch information
tssurya committed Sep 27, 2023
1 parent 4f36908 commit fd2435a
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 14 deletions.
23 changes: 15 additions & 8 deletions go-controller/pkg/node/default_node_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,15 +1080,22 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error {
}

if config.OvnKubeNode.Mode != types.NodeModeDPUHost {
util.SetARPTimeout()
err := nc.WatchNamespaces()
if err != nil {
return fmt.Errorf("failed to watch namespaces: %w", err)
// If interconnect is disabled OR interconnect is running in single-zone-mode,
// the ovnkube-master is responsible for patching ICNI managed namespaces with
// "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush
// conntrack on every node. In multi-zone-interconnect case, we will handle the flushing
// directly on the ovnkube-controller code to avoid an extra namespace annotation
if !config.OVNKubernetesFeature.EnableInterconnect || sbZone == types.OvnDefaultZone {
util.SetARPTimeout()
err := nc.WatchNamespaces()
if err != nil {
return fmt.Errorf("failed to watch namespaces: %w", err)
}
// every minute cleanup stale conntrack entries if any
go wait.Until(func() {
nc.checkAndDeleteStaleConntrackEntries()
}, time.Minute*1, nc.stopChan)
}
// every minute cleanup stale conntrack entries if any
go wait.Until(func() {
nc.checkAndDeleteStaleConntrackEntries()
}, time.Minute*1, nc.stopChan)
err = nc.WatchEndpointSlices()
if err != nil {
return fmt.Errorf("failed to watch endpointSlices: %w", err)
Expand Down
19 changes: 17 additions & 2 deletions go-controller/pkg/node/obj_retry_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
discovery "k8s.io/api/discovery/v1"
cache "k8s.io/client-go/tools/cache"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/retry"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
)

type nodeEventHandler struct {
Expand Down Expand Up @@ -159,8 +162,20 @@ func (h *nodeEventHandler) AddResource(obj interface{}, fromRetryLoop bool) erro
func (h *nodeEventHandler) UpdateResource(oldObj, newObj interface{}, inRetryCache bool) error {
switch h.objType {
case factory.NamespaceExGwType:
newNs := newObj.(*kapi.Namespace)
return h.nc.syncConntrackForExternalGateways(newNs)
// If interconnect is disabled OR interconnect is running in single-zone-mode,
// the ovnkube-master is responsible for patching ICNI managed namespaces with
// "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush
// conntrack on every node. In multi-zone-interconnect case, we will handle the flushing
// directly on the ovnkube-controller code to avoid an extra namespace annotation
node, err := h.nc.Kube.GetNode(h.nc.name)
if err != nil {
return fmt.Errorf("error retrieving node %s: %v", h.nc.name, err)
}
if !config.OVNKubernetesFeature.EnableInterconnect || util.GetNodeZone(node) == types.OvnDefaultZone {
newNs := newObj.(*kapi.Namespace)
return h.nc.syncConntrackForExternalGateways(newNs)
}
return nil

case factory.EndpointSliceForStaleConntrackRemovalType:
oldEndpointSlice := oldObj.(*discovery.EndpointSlice)
Expand Down
13 changes: 13 additions & 0 deletions go-controller/pkg/ovn/controller/apbroute/master_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,19 @@ func (c *ExternalGatewayMasterController) processNextPolicyWorkItem(wg *sync.Wai
return true
}

func (c *ExternalGatewayMasterController) GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespaceName string) (sets.Set[string], error) {
gwIPs, err := c.mgr.getDynamicGatewayIPsForTargetNamespace(namespaceName)
if err != nil {
return nil, err
}
tmpIPs, err := c.mgr.getStaticGatewayIPsForTargetNamespace(namespaceName)
if err != nil {
return nil, err
}

return gwIPs.Union(tmpIPs), nil
}

func (c *ExternalGatewayMasterController) onPolicyAdd(obj interface{}) {
_, ok := obj.(*adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute)
if !ok {
Expand Down
12 changes: 12 additions & 0 deletions go-controller/pkg/ovn/default_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
kapi "k8s.io/api/core/v1"
knet "k8s.io/api/networking/v1"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -534,6 +535,17 @@ func (oc *DefaultNetworkController) Run(ctx context.Context) error {
if err = oc.apbExternalRouteController.Run(oc.wg, 1); err != nil {
return err
}
// If interconnect is enabled and it is a multi-zone setup, then we flush conntrack
// on ovnkube-controller side and not on ovnkube-node side, since they are run in the
// same process. TODO(tssurya): In upstream ovnk, its possible to run these as different processes
// in which case this flushing feature is not supported.
if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != types.OvnDefaultZone {
util.SetARPTimeout()
// every minute cleanup stale conntrack entries if any
go wait.Until(func() {
oc.checkAndDeleteStaleConntrackEntries()
}, time.Minute*1, oc.stopChan)
}
}

end := time.Since(start)
Expand Down
156 changes: 152 additions & 4 deletions go-controller/pkg/ovn/egressgw.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"regexp"
"strings"
"sync"

utilnet "k8s.io/utils/net"

Expand All @@ -20,6 +21,8 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
"github.com/pkg/errors"
"github.com/vishvananda/netlink"
utilapierrors "k8s.io/apimachinery/pkg/util/errors"

nettypes "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"

Expand Down Expand Up @@ -173,6 +176,9 @@ func (oc *DefaultNetworkController) addPodExternalGWForNamespace(namespace strin
for _, gwInfo := range nsInfo.routingExternalPodGWs {
existingGWs.Insert(gwInfo.gws.UnsortedList()...)
}
if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != types.OvnDefaultZone {
existingGWs.Insert(nsInfo.routingExternalGWs.gws.UnsortedList()...)
}
nsUnlock()

klog.Infof("Adding routes for external gateway pod: %s, next hops: %q, namespace: %s, bfd-enabled: %t",
Expand All @@ -182,12 +188,135 @@ func (oc *DefaultNetworkController) addPodExternalGWForNamespace(namespace strin
return err
}
// add the exgw podIP to the namespace's k8s.ovn.org/external-gw-pod-ips list
if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, existingGWs.List()); err != nil {
klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err)
if !config.OVNKubernetesFeature.EnableInterconnect || oc.zone == types.OvnDefaultZone {
// If interconnect is disabled OR interconnect is running in single-zone-mode,
// the ovnkube-master is responsible for patching ICNI managed namespaces with
// "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush
// conntrack on every node. In multi-zone-interconnect case, we will handle the flushing
// directly on the ovnkube-controller code to avoid an extra namespace annotation
if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, existingGWs.List()); err != nil {
klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err)
}
} else {
// flush here since we know we have added an egressgw pod and we also know the full list of existing gatewayIPs
gatewayIPs, err := oc.apbExternalRouteController.GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespace)
if err != nil {
return fmt.Errorf("unable to retrieve gateway IPs for Admin Policy Based External Route objects: %w", err)
}
// loop through all the IPs on the annotations; ARP for their MACs and form an allowlist
gatewayIPs = gatewayIPs.Insert(existingGWs.List()...)
_ = oc.syncConntrackForExternalGateways(namespace, gatewayIPs) // best effort
}
return nil
}

func (oc *DefaultNetworkController) syncConntrackForExternalGateways(namespace string, gwIPsToKeep sets.Set[string]) error {
var wg sync.WaitGroup
wg.Add(len(gwIPsToKeep))
validMACs := sync.Map{}
for gwIP := range gwIPsToKeep {
go func(gwIP string) {
defer wg.Done()
if len(gwIP) > 0 && !utilnet.IsIPv6String(gwIP) {
// TODO: Add support for IPv6 external gateways
if hwAddr, err := util.GetMACAddressFromARP(net.ParseIP(gwIP)); err != nil {
klog.Errorf("Failed to lookup hardware address for gatewayIP %s: %v", gwIP, err)
} else if len(hwAddr) > 0 {
// we need to reverse the mac before passing it to the conntrack filter since OVN saves the MAC in the following format
// +------------------------------------------------------------ +
// | 128 ... 112 ... 96 ... 80 ... 64 ... 48 ... 32 ... 16 ... 0|
// +------------------+-------+--------------------+-------------|
// | | UNUSED| MAC ADDRESS | UNUSED |
// +------------------+-------+--------------------+-------------+
for i, j := 0, len(hwAddr)-1; i < j; i, j = i+1, j-1 {
hwAddr[i], hwAddr[j] = hwAddr[j], hwAddr[i]
}
validMACs.Store(gwIP, []byte(hwAddr))
}
}
}(gwIP)
}
wg.Wait()

validNextHopMACs := [][]byte{}
validMACs.Range(func(key interface{}, value interface{}) bool {
validNextHopMACs = append(validNextHopMACs, value.([]byte))
return true
})
// Handle corner case where there are 0 IPs on the annotations OR none of the ARPs were successful; i.e allowMACList={empty}.
// This means we *need to* pass a label > 128 bits that will not match on any conntrack entry labels for these pods.
// That way any remaining entries with labels having MACs set will get purged.
if len(validNextHopMACs) == 0 {
validNextHopMACs = append(validNextHopMACs, []byte("does-not-contain-anything"))
}
// TODO: ASK TROZET FOR LOCAL PODS ON THE NODE?!
pods, err := oc.watchFactory.GetPods(namespace)
if err != nil {
return fmt.Errorf("unable to get pods from informer: %v", err)
}

var errs []error
for _, pod := range pods {
pod := pod
podIPs, err := util.GetPodIPsOfNetwork(pod, &util.DefaultNetInfo{})
if err != nil && !errors.Is(err, util.ErrNoPodIPFound) {
errs = append(errs, fmt.Errorf("unable to fetch IP for pod %s/%s: %v", pod.Namespace, pod.Name, err))
}
for _, podIP := range podIPs { // flush conntrack only for UDP
// for this pod, we check if the conntrack entry has a label that is not in the provided allowlist of MACs
// only caveat here is we assume egressGW served pods shouldn't have conntrack entries with other labels set
err := util.DeleteConntrack(podIP.String(), 0, kapi.ProtocolUDP, netlink.ConntrackOrigDstIP, validNextHopMACs)
if err != nil {
errs = append(errs, fmt.Errorf("failed to delete conntrack entry for pod %s: %v", podIP.String(), err))
}
}
}
return utilapierrors.NewAggregate(errs)
}

func (oc *DefaultNetworkController) checkAndDeleteStaleConntrackEntries() {
namespaces, err := oc.watchFactory.GetNamespaces()
if err != nil {
klog.Errorf("Unable to get pods from informer: %v", err)
return
}
for _, namespace := range namespaces {
// flush here since we know we have added an egressgw pod and we also know the full list of existing gatewayIPs
existingGWs, err := oc.apbExternalRouteController.GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespace.Name)
if err != nil {
klog.Errorf("Unable to retrieve gateway IPs for Admin Policy Based External Route objects for ns %s: %w", namespace.Name, err)
return
}
// by now the nsInfo cache must be repaired for this feature fully;
// however this introduces cache lock scale concerns by doing this every minute
// versus previously this was done purely using annotations
nsInfo, nsUnlock, err := oc.ensureNamespaceLocked(namespace.Name, false, nil)
if err != nil {
klog.Errorf("Failed to ensure namespace %s locked: %v", namespace, err)
return
}
for _, gwInfo := range nsInfo.routingExternalPodGWs {
existingGWs.Insert(gwInfo.gws.UnsortedList()...)
}
if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != types.OvnDefaultZone {
existingGWs.Insert(nsInfo.routingExternalGWs.gws.UnsortedList()...)
}
nsUnlock()
if len(existingGWs) > 0 {
// TODO: ASK TROZET FOR LOCAL PODS ON THE NODE?!
pods, err := oc.watchFactory.GetPods(namespace.Name)
if err != nil {
klog.Warningf("Unable to get pods from informer for namespace %s: %v", namespace.Name, err)
}
if len(pods) > 0 || err != nil {
// we only need to proceed if there is at least one pod in this namespace on this node
// OR if we couldn't fetch the pods for some reason at this juncture
_ = oc.syncConntrackForExternalGateways(namespace.Name, existingGWs)
}
}
}
}

// addExternalGWsForNamespace handles adding annotated gw routes to all pods in namespace
// This should only be called with a lock on nsInfo
func (oc *DefaultNetworkController) addExternalGWsForNamespace(egress gatewayInfo, nsInfo *namespaceInfo, namespace string) error {
Expand Down Expand Up @@ -377,6 +506,9 @@ func (oc *DefaultNetworkController) deletePodGWRoutesForNamespace(pod *kapi.Pod,
for _, gwInfo := range nsInfo.routingExternalPodGWs {
existingGWs.Insert(gwInfo.gws.UnsortedList()...)
}
if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != types.OvnDefaultZone {
existingGWs.Insert(nsInfo.routingExternalGWs.gws.UnsortedList()...)
}
nsUnlock()

if !ok || len(foundGws.gws) == 0 {
Expand All @@ -397,8 +529,24 @@ func (oc *DefaultNetworkController) deletePodGWRoutesForNamespace(pod *kapi.Pod,
return fmt.Errorf("failed to delete GW routes for pod %s: %w", pod.Name, err)
}
// remove the exgw podIP from the namespace's k8s.ovn.org/external-gw-pod-ips list
if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, sets.List(existingGWs)); err != nil {
klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err)
if !config.OVNKubernetesFeature.EnableInterconnect || oc.zone == types.OvnDefaultZone {
// If interconnect is disabled OR interconnect is running in single-zone-mode,
// the ovnkube-master is responsible for patching ICNI managed namespaces with
// "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush
// conntrack on every node. In multi-zone-interconnect case, we will handle the flushing
// directly on the ovnkube-controller code to avoid an extra namespace annotation
if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, sets.List(existingGWs)); err != nil {
klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err)
}
} else {
// flush here since we know we have deleted an egressgw pod and we also know the full list of existing gatewayIPs
gatewayIPs, err := oc.apbExternalRouteController.GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespace)
if err != nil {
return fmt.Errorf("unable to retrieve gateway IPs for Admin Policy Based External Route objects: %w", err)
}
// loop through all the IPs on the annotations; ARP for their MACs and form an allowlist
gatewayIPs = gatewayIPs.Insert(sets.List(existingGWs)...)
_ = oc.syncConntrackForExternalGateways(namespace, gatewayIPs) // best effort
}
return nil
}
Expand Down

0 comments on commit fd2435a

Please sign in to comment.