From af5edf866119b18432cf6b2c136416504b22dae4 Mon Sep 17 00:00:00 2001 From: jordigilh Date: Wed, 16 Aug 2023 17:29:28 -0400 Subject: [PATCH 1/7] [OCPBUGS-17692] Fixes an issue in APB External Route where deleting a pod and recreating it failed due to the Deleted state in the routeInfo. The solution involves getting the status before obtaining the routeInfo lock and comparing it after obtaining lock: if the values differ, return with error. If the values match, there has been no changes and it is safe to reuse the routeInfo in case it was deleted. Signed-off-by: jordigilh (cherry picked from commit c7c7be4aab5bfbeaf6108cdec408f75789ae1615) --- .../node/default_node_network_controller.go | 1 - .../apbroute/external_controller.go | 38 ++++++++++- .../controller/apbroute/master_controller.go | 40 +++++------- .../ovn/controller/apbroute/network_client.go | 58 ++++++++++------- .../controller/apbroute/node_controller.go | 6 -- .../pkg/ovn/default_network_controller.go | 10 +-- go-controller/pkg/ovn/egressgw.go | 65 ++++++++++--------- 7 files changed, 127 insertions(+), 91 deletions(-) diff --git a/go-controller/pkg/node/default_node_network_controller.go b/go-controller/pkg/node/default_node_network_controller.go index 2788862855..5c45e3a6ce 100644 --- a/go-controller/pkg/node/default_node_network_controller.go +++ b/go-controller/pkg/node/default_node_network_controller.go @@ -1035,7 +1035,6 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error { if config.HybridOverlay.Enabled { // Not supported with DPUs, enforced in config // TODO(adrianc): Revisit above comment - // TODO(jtanenba): LocalPodInformer informs on all pods nodeController, err := honode.NewNode( nc.Kube, nc.name, diff --git a/go-controller/pkg/ovn/controller/apbroute/external_controller.go b/go-controller/pkg/ovn/controller/apbroute/external_controller.go index d16b636752..93033301ea 100644 --- a/go-controller/pkg/ovn/controller/apbroute/external_controller.go +++ 
b/go-controller/pkg/ovn/controller/apbroute/external_controller.go @@ -37,7 +37,7 @@ func newPodInfo() *podInfo { } } -type ExternalRouteInfo struct { +type RouteInfo struct { sync.Mutex Deleted bool PodName ktypes.NamespacedName @@ -47,6 +47,42 @@ type ExternalRouteInfo struct { PodExternalRoutes map[string]map[string]string } +type ExternalGatewayRouteInfoCache struct { + // External gateway caches + // Make them public so that they can be used by the annotation logic to lock on namespaces and share the same external route information + ExternalGWCache map[ktypes.NamespacedName]*RouteInfo + ExGWCacheMutex *sync.RWMutex + routeInfoLocks *syncmap.SyncMap[string] +} + +func NewExternalGatewayRouteInfoCache() *ExternalGatewayRouteInfoCache { + return &ExternalGatewayRouteInfoCache{ + ExternalGWCache: make(map[ktypes.NamespacedName]*RouteInfo), + ExGWCacheMutex: &sync.RWMutex{}, + routeInfoLocks: syncmap.NewSyncMap[string](), + } +} + +func (e *ExternalGatewayRouteInfoCache) GetRouteInfoDeletedStatus(podName ktypes.NamespacedName) bool { + var status bool + _ = e.routeInfoLocks.DoWithLock(podName.String(), func(_ string) error { + if v, ok := e.ExternalGWCache[podName]; ok { + status = v.Deleted + } + return nil + }) + return status +} + +func (e *ExternalGatewayRouteInfoCache) SetRouteInfoDeletedStatus(podName ktypes.NamespacedName, status bool) { + _ = e.routeInfoLocks.DoWithLock(podName.String(), func(_ string) error { + if v, ok := e.ExternalGWCache[podName]; ok { + v.Deleted = status + } + return nil + }) +} + // routePolicyState contains current policy state as it was applied. // Since every config is applied to a pod, podInfo stores current state for every target pod. 
type routePolicyState struct { diff --git a/go-controller/pkg/ovn/controller/apbroute/master_controller.go b/go-controller/pkg/ovn/controller/apbroute/master_controller.go index fb75d1fea2..ce87d84fab 100644 --- a/go-controller/pkg/ovn/controller/apbroute/master_controller.go +++ b/go-controller/pkg/ovn/controller/apbroute/master_controller.go @@ -15,7 +15,6 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ktypes "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -38,10 +37,12 @@ import ( ) const ( - maxRetries = 15 + resyncInterval = 0 + maxRetries = 15 ) // Admin Policy Based Route services + type ExternalGatewayMasterController struct { client kubernetes.Interface apbRoutePolicyClient adminpolicybasedrouteclient.Interface @@ -62,13 +63,9 @@ type ExternalGatewayMasterController struct { namespaceLister corev1listers.NamespaceLister namespaceInformer cache.SharedIndexInformer - // External gateway caches - // Make them public so that they can be used by the annotation logic to lock on namespaces and share the same external route information - ExternalGWCache map[ktypes.NamespacedName]*ExternalRouteInfo - ExGWCacheMutex *sync.RWMutex - - mgr *externalPolicyManager - nbClient *northBoundClient + mgr *externalPolicyManager + nbClient *northBoundClient + ExternalGWRouteInfoCache *ExternalGatewayRouteInfoCache } func NewExternalMasterController( @@ -84,22 +81,20 @@ func NewExternalMasterController( controllerName string, ) (*ExternalGatewayMasterController, error) { - externalGWCache := make(map[ktypes.NamespacedName]*ExternalRouteInfo) - exGWCacheMutex := &sync.RWMutex{} + externalGWRouteInfo := NewExternalGatewayRouteInfoCache() zone, err := libovsdbutil.GetNBZone(nbClient) if err != nil { return nil, err } nbCli := &northBoundClient{ - routeLister: apbRouteInformer.Lister(), - 
nodeLister: nodeLister, - podLister: podInformer.Lister(), - nbClient: nbClient, - addressSetFactory: addressSetFactory, - externalGWCache: externalGWCache, - exGWCacheMutex: exGWCacheMutex, - controllerName: controllerName, - zone: zone, + routeLister: apbRouteInformer.Lister(), + nodeLister: nodeLister, + podLister: podInformer.Lister(), + nbClient: nbClient, + addressSetFactory: addressSetFactory, + controllerName: controllerName, + zone: zone, + externalGatewayRouteInfo: externalGWRouteInfo, } c := &ExternalGatewayMasterController{ @@ -124,9 +119,8 @@ func NewExternalMasterController( workqueue.NewItemFastSlowRateLimiter(time.Second, 5*time.Second, 5), "apbexternalroutenamespaces", ), - ExternalGWCache: externalGWCache, - ExGWCacheMutex: exGWCacheMutex, - nbClient: nbCli, + ExternalGWRouteInfoCache: externalGWRouteInfo, + nbClient: nbCli, mgr: newExternalPolicyManager( stopCh, podInformer.Lister(), diff --git a/go-controller/pkg/ovn/controller/apbroute/network_client.go b/go-controller/pkg/ovn/controller/apbroute/network_client.go index 615aebe1d0..31aa6bfe04 100644 --- a/go-controller/pkg/ovn/controller/apbroute/network_client.go +++ b/go-controller/pkg/ovn/controller/apbroute/network_client.go @@ -44,10 +44,10 @@ type northBoundClient struct { nbClient libovsdbclient.Client // An address set factory that creates address sets - addressSetFactory addressset.AddressSetFactory - externalGWCache map[ktypes.NamespacedName]*ExternalRouteInfo - exGWCacheMutex *sync.RWMutex - controllerName string + addressSetFactory addressset.AddressSetFactory + externalGatewayRouteInfo *ExternalGatewayRouteInfoCache + + controllerName string zone string } @@ -128,7 +128,7 @@ func (nb *northBoundClient) delAllLegacyHybridRoutePolicies() error { func (nb *northBoundClient) deleteGatewayIPs(podNsName ktypes.NamespacedName, toBeDeletedGWIPs, _ sets.Set[string]) error { for _, routeInfo := range nb.getRouteInfosForPod(podNsName) { // if we encounter error while deleting routes for one 
pod; we return and don't try subsequent pods - if err := nb.deletePodGWRoutes(routeInfo, toBeDeletedGWIPs); err != nil { + if err := nb.deletePodGWRoutes(routeInfo, toBeDeletedGWIPs, podNsName); err != nil { return err } } @@ -136,10 +136,10 @@ func (nb *northBoundClient) deleteGatewayIPs(podNsName ktypes.NamespacedName, to } // deletePodGWRoutes removes known exgw routes for a pod via routeInfo for a list of given GW IPs -func (nb *northBoundClient) deletePodGWRoutes(routeInfo *ExternalRouteInfo, toBeDeletedGWIPs sets.Set[string]) error { +func (nb *northBoundClient) deletePodGWRoutes(routeInfo *RouteInfo, toBeDeletedGWIPs sets.Set[string], podNsName ktypes.NamespacedName) error { routeInfo.Lock() defer routeInfo.Unlock() - if routeInfo.Deleted { + if nb.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podNsName) { return nil } pod, err := nb.podLister.Pods(routeInfo.PodName.Namespace).Get(routeInfo.PodName.Name) @@ -170,17 +170,17 @@ func (nb *northBoundClient) deletePodGWRoutes(routeInfo *ExternalRouteInfo, toBe } } } - routeInfo.Deleted = deletedPod + nb.externalGatewayRouteInfo.SetRouteInfoDeletedStatus(podNsName, deletedPod) return nil } // getRouteInfosForPod returns all routeInfos for a specific namespace -func (nb *northBoundClient) getRouteInfosForPod(podNsName ktypes.NamespacedName) []*ExternalRouteInfo { - nb.exGWCacheMutex.RLock() - defer nb.exGWCacheMutex.RUnlock() +func (nb *northBoundClient) getRouteInfosForPod(podNsName ktypes.NamespacedName) []*RouteInfo { + nb.externalGatewayRouteInfo.ExGWCacheMutex.RLock() + defer nb.externalGatewayRouteInfo.ExGWCacheMutex.RUnlock() - routes := make([]*ExternalRouteInfo, 0) - for namespacedName, routeInfo := range nb.externalGWCache { + routes := make([]*RouteInfo, 0) + for namespacedName, routeInfo := range nb.externalGatewayRouteInfo.ExternalGWCache { if namespacedName == podNsName { routes = append(routes, routeInfo) } @@ -478,27 +478,32 @@ func (nb *northBoundClient) 
updateExternalGWInfoCacheForPodIPWithGatewayIP(podIP } // ensureRouteInfoLocked either gets the current routeInfo in the cache with a lock, or creates+locks a new one if missing -func (nb *northBoundClient) ensureRouteInfoLocked(podName ktypes.NamespacedName) (*ExternalRouteInfo, error) { +func (nb *northBoundClient) ensureRouteInfoLocked(podName ktypes.NamespacedName) (*RouteInfo, error) { // We don't want to hold the cache lock while we try to lock the routeInfo (unless we are creating it, then we know // no one else is using it). This could lead to dead lock. Therefore the steps here are: // 1. Get the cache lock, try to find the routeInfo // 2. If routeInfo existed, release the cache lock // 3. If routeInfo did not exist, safe to hold the cache lock while we create the new routeInfo - nb.exGWCacheMutex.Lock() - routeInfo, ok := nb.externalGWCache[podName] + nb.externalGatewayRouteInfo.ExGWCacheMutex.Lock() + routeInfo, ok := nb.externalGatewayRouteInfo.ExternalGWCache[podName] + var isDeleted bool if !ok { - routeInfo = &ExternalRouteInfo{ + routeInfo = &RouteInfo{ PodExternalRoutes: make(map[string]map[string]string), PodName: podName, } // we are creating routeInfo and going to set it in podExternalRoutes map // so safe to hold the lock while we create and add it - defer nb.exGWCacheMutex.Unlock() - nb.externalGWCache[podName] = routeInfo + defer nb.externalGatewayRouteInfo.ExGWCacheMutex.Unlock() + nb.externalGatewayRouteInfo.ExternalGWCache[podName] = routeInfo } else { + // capture the current status of the routeInfo. Compare it once + // the route info lock is secured to check if the status was changed + // while waiting for the lock. + isDeleted = nb.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podName) // if we found an existing routeInfo, do not hold the cache lock // while waiting for routeInfo to Lock - nb.exGWCacheMutex.Unlock() + nb.externalGatewayRouteInfo.ExGWCacheMutex.Unlock() } // 4. 
Now lock the routeInfo @@ -507,15 +512,20 @@ func (nb *northBoundClient) ensureRouteInfoLocked(podName ktypes.NamespacedName) // 5. If routeInfo was deleted between releasing the cache lock and grabbing // the routeInfo lock, return an error so the caller doesn't use it and // retries the operation later - if routeInfo.Deleted { - routeInfo.Unlock() - return nil, fmt.Errorf("routeInfo for pod %s, was altered during ensure route info", podName) + if nb.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podName) { + if !isDeleted { + // info was modified while waiting for unlock, return error and retry later + routeInfo.Unlock() + return nil, fmt.Errorf("routeInfo for pod %s, was altered during ensure route info", podName) + } + // it was already deleted before the lock, so change the status as not deleted + nb.externalGatewayRouteInfo.SetRouteInfoDeletedStatus(podName, false) } return routeInfo, nil } -func (nb *northBoundClient) deletePodGWRoute(routeInfo *ExternalRouteInfo, podIP, gw, gr string) error { +func (nb *northBoundClient) deletePodGWRoute(routeInfo *RouteInfo, podIP, gw, gr string) error { if utilnet.IsIPv6String(gw) != utilnet.IsIPv6String(podIP) { return nil } diff --git a/go-controller/pkg/ovn/controller/apbroute/node_controller.go b/go-controller/pkg/ovn/controller/apbroute/node_controller.go index 052a859ac3..40b17838a2 100644 --- a/go-controller/pkg/ovn/controller/apbroute/node_controller.go +++ b/go-controller/pkg/ovn/controller/apbroute/node_controller.go @@ -8,7 +8,6 @@ import ( "time" v1 "k8s.io/api/core/v1" - ktypes "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -48,11 +47,6 @@ type ExternalGatewayNodeController struct { namespaceLister corev1listers.NamespaceLister namespaceInformer cache.SharedIndexInformer - //external gateway caches - //make them public so that they can be used by the annotation logic to lock on namespaces and 
share the same external route information - ExternalGWCache map[ktypes.NamespacedName]*ExternalRouteInfo - ExGWCacheMutex *sync.RWMutex - mgr *externalPolicyManager } diff --git a/go-controller/pkg/ovn/default_network_controller.go b/go-controller/pkg/ovn/default_network_controller.go index bc84affc8d..a03f203bb1 100644 --- a/go-controller/pkg/ovn/default_network_controller.go +++ b/go-controller/pkg/ovn/default_network_controller.go @@ -28,13 +28,11 @@ import ( zoneic "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/zone_interconnect" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/retry" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/syncmap" - "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" ovntypes "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" kapi "k8s.io/api/core/v1" knet "k8s.io/api/networking/v1" - ktypes "k8s.io/apimachinery/pkg/types" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -52,8 +50,7 @@ type DefaultNetworkController struct { // cluster's east-west traffic. 
loadbalancerClusterCache map[kapi.Protocol]string - externalGWCache map[ktypes.NamespacedName]*apbroutecontroller.ExternalRouteInfo - exGWCacheMutex *sync.RWMutex + externalGatewayRouteInfo *apbroutecontroller.ExternalGatewayRouteInfoCache // egressFirewalls is a map of namespaces and the egressFirewall attached to it egressFirewalls sync.Map @@ -206,8 +203,7 @@ func newDefaultNetworkControllerCommon(cnci *CommonNetworkControllerInfo, zoneICHandler: zoneICHandler, cancelableCtx: util.NewCancelableContext(), }, - externalGWCache: apbExternalRouteController.ExternalGWCache, - exGWCacheMutex: apbExternalRouteController.ExGWCacheMutex, + externalGatewayRouteInfo: apbExternalRouteController.ExternalGWRouteInfoCache, eIPC: egressIPZoneController{ nodeIPUpdateMutex: &sync.Mutex{}, podAssignmentMutex: &sync.Mutex{}, @@ -229,7 +225,7 @@ func newDefaultNetworkControllerCommon(cnci *CommonNetworkControllerInfo, // allocate the first IPs in the join switch subnets. gwLRPIfAddrs, err := oc.getOVNClusterRouterPortToJoinSwitchIfAddrs() if err != nil { - return nil, fmt.Errorf("failed to allocate join switch IP address connected to %s: %v", types.OVNClusterRouter, err) + return nil, fmt.Errorf("failed to allocate join switch IP address connected to %s: %v", ovntypes.OVNClusterRouter, err) } oc.ovnClusterLRPToJoinIfAddrs = gwLRPIfAddrs diff --git a/go-controller/pkg/ovn/egressgw.go b/go-controller/pkg/ovn/egressgw.go index 8a9c6d13ba..5f586151f5 100644 --- a/go-controller/pkg/ovn/egressgw.go +++ b/go-controller/pkg/ovn/egressgw.go @@ -36,27 +36,29 @@ type gatewayInfo struct { } // ensureRouteInfoLocked either gets the current routeInfo in the cache with a lock, or creates+locks a new one if missing -func (oc *DefaultNetworkController) ensureRouteInfoLocked(podName ktypes.NamespacedName) (*apbroutecontroller.ExternalRouteInfo, error) { +func (oc *DefaultNetworkController) ensureRouteInfoLocked(podName ktypes.NamespacedName) (*apbroutecontroller.RouteInfo, error) { // We don't want 
to hold the cache lock while we try to lock the routeInfo (unless we are creating it, then we know // no one else is using it). This could lead to dead lock. Therefore the steps here are: // 1. Get the cache lock, try to find the routeInfo // 2. If routeInfo existed, release the cache lock // 3. If routeInfo did not exist, safe to hold the cache lock while we create the new routeInfo - oc.exGWCacheMutex.Lock() - routeInfo, ok := oc.externalGWCache[podName] + oc.externalGatewayRouteInfo.ExGWCacheMutex.Lock() + routeInfo, ok := oc.externalGatewayRouteInfo.ExternalGWCache[podName] + var isDeleted bool if !ok { - routeInfo = &apbroutecontroller.ExternalRouteInfo{ + routeInfo = &apbroutecontroller.RouteInfo{ PodExternalRoutes: make(map[string]map[string]string), PodName: podName, } // we are creating routeInfo and going to set it in podExternalRoutes map // so safe to hold the lock while we create and add it - defer oc.exGWCacheMutex.Unlock() - oc.externalGWCache[podName] = routeInfo + defer oc.externalGatewayRouteInfo.ExGWCacheMutex.Unlock() + oc.externalGatewayRouteInfo.ExternalGWCache[podName] = routeInfo } else { // if we found an existing routeInfo, do not hold the cache lock // while waiting for routeInfo to Lock - oc.exGWCacheMutex.Unlock() + isDeleted = oc.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podName) + oc.externalGatewayRouteInfo.ExGWCacheMutex.Unlock() } // 4. Now lock the routeInfo @@ -65,23 +67,28 @@ func (oc *DefaultNetworkController) ensureRouteInfoLocked(podName ktypes.Namespa // 5. 
If routeInfo was deleted between releasing the cache lock and grabbing // the routeInfo lock, return an error so the caller doesn't use it and // retries the operation later - if routeInfo.Deleted { - routeInfo.Unlock() - return nil, fmt.Errorf("routeInfo for pod %s, was altered during ensure route info", podName) + if oc.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podName) { + if !isDeleted { + // info was modified while waiting for unlock, return error and retry later + routeInfo.Unlock() + return nil, fmt.Errorf("routeInfo for pod %s, was altered during ensure route info", podName) + } + // it was already deleted before the lock, so change the status as not deleted + oc.externalGatewayRouteInfo.SetRouteInfoDeletedStatus(podName, false) } return routeInfo, nil } // getRouteInfosForNamespace returns all routeInfos for a specific namespace -func (oc *DefaultNetworkController) getRouteInfosForNamespace(namespace string) []*apbroutecontroller.ExternalRouteInfo { - oc.exGWCacheMutex.RLock() - defer oc.exGWCacheMutex.RUnlock() +func (oc *DefaultNetworkController) getRouteInfosForNamespace(namespace string) map[ktypes.NamespacedName]*apbroutecontroller.RouteInfo { + oc.externalGatewayRouteInfo.ExGWCacheMutex.RLock() + defer oc.externalGatewayRouteInfo.ExGWCacheMutex.RUnlock() - routes := make([]*apbroutecontroller.ExternalRouteInfo, 0) - for namespacedName, routeInfo := range oc.externalGWCache { + routes := make(map[ktypes.NamespacedName]*apbroutecontroller.RouteInfo) + for namespacedName, routeInfo := range oc.externalGatewayRouteInfo.ExternalGWCache { if namespacedName.Namespace == namespace { - routes = append(routes, routeInfo) + routes[namespacedName] = routeInfo } } @@ -89,30 +96,30 @@ func (oc *DefaultNetworkController) getRouteInfosForNamespace(namespace string) } // deleteRouteInfoLocked removes a routeInfo from the cache, and returns it locked -func (oc *DefaultNetworkController) deleteRouteInfoLocked(name ktypes.NamespacedName) 
*apbroutecontroller.ExternalRouteInfo { +func (oc *DefaultNetworkController) deleteRouteInfoLocked(name ktypes.NamespacedName) *apbroutecontroller.RouteInfo { // Attempt to find the routeInfo in the cache, release the cache lock while // we try to lock the routeInfo to avoid any deadlock - oc.exGWCacheMutex.RLock() - routeInfo := oc.externalGWCache[name] - oc.exGWCacheMutex.RUnlock() + oc.externalGatewayRouteInfo.ExGWCacheMutex.RLock() + routeInfo := oc.externalGatewayRouteInfo.ExternalGWCache[name] + oc.externalGatewayRouteInfo.ExGWCacheMutex.RUnlock() if routeInfo == nil { return nil } routeInfo.Lock() - if routeInfo.Deleted { + if oc.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(name) { routeInfo.Unlock() return nil } - routeInfo.Deleted = true + oc.externalGatewayRouteInfo.SetRouteInfoDeletedStatus(name, true) go func() { - oc.exGWCacheMutex.Lock() - defer oc.exGWCacheMutex.Unlock() - if newRouteInfo := oc.externalGWCache[name]; routeInfo == newRouteInfo { - delete(oc.externalGWCache, name) + oc.externalGatewayRouteInfo.ExGWCacheMutex.Lock() + defer oc.externalGatewayRouteInfo.ExGWCacheMutex.Unlock() + if newRouteInfo := oc.externalGatewayRouteInfo.ExternalGWCache[name]; routeInfo == newRouteInfo { + delete(oc.externalGatewayRouteInfo.ExternalGWCache, name) } }() @@ -305,7 +312,7 @@ func (oc *DefaultNetworkController) deleteLogicalRouterStaticRoute(podIP, mask, // deletePodGWRoute deletes all associated gateway routing resources for one // pod gateway route // this MUST be called with a lock on routeInfo -func (oc *DefaultNetworkController) deletePodGWRoute(routeInfo *apbroutecontroller.ExternalRouteInfo, podIP, gw, gr string) error { +func (oc *DefaultNetworkController) deletePodGWRoute(routeInfo *apbroutecontroller.RouteInfo, podIP, gw, gr string) error { if utilnet.IsIPv6String(gw) != utilnet.IsIPv6String(podIP) { return nil } @@ -418,9 +425,9 @@ func (oc *DefaultNetworkController) deleteGWRoutesForNamespace(namespace string, return err } policyGWIPs = 
policyGWIPs.Union(policyStaticGWIPs) - for _, routeInfo := range oc.getRouteInfosForNamespace(namespace) { + for podNsName, routeInfo := range oc.getRouteInfosForNamespace(namespace) { routeInfo.Lock() - if routeInfo.Deleted { + if oc.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podNsName) { routeInfo.Unlock() continue } From 2917c28f1037c38667822283f5f1a01588bd9f24 Mon Sep 17 00:00:00 2001 From: Nadia Pinaeva Date: Thu, 17 Aug 2023 11:49:08 +0200 Subject: [PATCH 2/7] Update SyncMap to enable any comparable key type, not just string. Signed-off-by: Nadia Pinaeva (cherry picked from commit 6a56e3ba737eab131875cbedf351f0caa2125c32) --- go-controller/pkg/syncmap/syncmap.go | 44 ++++++++++++++++------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/go-controller/pkg/syncmap/syncmap.go b/go-controller/pkg/syncmap/syncmap.go index bc59ef6cc0..66064e98ef 100644 --- a/go-controller/pkg/syncmap/syncmap.go +++ b/go-controller/pkg/syncmap/syncmap.go @@ -27,9 +27,9 @@ func newKeyLock() *keyLock { return &c } -// SyncMap is a map with lockable keys. It allows to lock the key regardless of whether the entry for +// SyncMapComparableKey is a map with lockable keys. It allows to lock the key regardless of whether the entry for // given key exists. When key is locked other threads can't read/write the key. -type SyncMap[T any] struct { +type SyncMapComparableKey[T1 comparable, T2 any] struct { // keyLocksMutex needs to be locked for every read/write operation with keyLocks. // refCounter should be updated for keyLock before keyLocksMutex lock is released. // to avoid deadlocks make sure no other locks are acquired when keyLocksMutex is locked. 
@@ -37,28 +37,28 @@ type SyncMap[T any] struct { // map of key mutexes, should only be accessed with keyLocksMutex lock // keyLock exists for a key that was locked with LockKey and until all threads that called LockKey // execute UnlockKey - keyLocks map[string]*keyLock + keyLocks map[T1]*keyLock // entriesMutex needs to be locked for every read/write operation with entries // to avoid deadlocks make sure no other locks are acquired when entriesMutex is locked entriesMutex sync.Mutex // cache entries // should only be accessed with entriesMutex, also // read/write for a given key is only allowed with keyLock - entries map[string]T + entries map[T1]T2 } -func NewSyncMap[T any]() *SyncMap[T] { - c := SyncMap[T]{ +func NewSyncMapComparableKey[T1 comparable, T2 any]() *SyncMapComparableKey[T1, T2] { + c := SyncMapComparableKey[T1, T2]{ sync.Mutex{}, - make(map[string]*keyLock), + make(map[T1]*keyLock), sync.Mutex{}, - make(map[string]T), + make(map[T1]T2), } return &c } // UnlockKey unlocks previously locked key. Call it when all the operations with the given key are done. -func (c *SyncMap[T]) UnlockKey(lockedKey string) { +func (c *SyncMapComparableKey[T1, T2]) UnlockKey(lockedKey T1) { c.keyLocksMutex.Lock() defer c.keyLocksMutex.Unlock() kLock, ok := c.keyLocks[lockedKey] @@ -82,7 +82,7 @@ func (c *SyncMap[T]) UnlockKey(lockedKey string) { // Otherwise, it stores and returns the given value. // The loaded result is true if the value was loaded, false if stored. // loadOrStoreKeyLock will increase refCounter for returned value -func (c *SyncMap[T]) loadOrStoreKeyLock(lockedKey string, value *keyLock) (*keyLock, bool) { +func (c *SyncMapComparableKey[T1, T2]) loadOrStoreKeyLock(lockedKey T1, value *keyLock) (*keyLock, bool) { c.keyLocksMutex.Lock() defer c.keyLocksMutex.Unlock() if kLock, ok := c.keyLocks[lockedKey]; ok { @@ -99,7 +99,7 @@ func (c *SyncMap[T]) loadOrStoreKeyLock(lockedKey string, value *keyLock) (*keyL // it guarantees exclusive access to the key. 
// Unlock(key) should be called once the work for this key is done to unlock other threads // After the key is unlocked there are no guarantees for the entry for given key -func (c *SyncMap[T]) LockKey(key string) { +func (c *SyncMapComparableKey[T1, T2]) LockKey(key T1) { // if the kLock is not present, we create a new one // lock it before adding, to prevent other threads from getting the key lock after we add it newKLock := newKeyLock() @@ -120,7 +120,7 @@ func (c *SyncMap[T]) LockKey(key string) { // Load returns the value stored in the map for a key, or nil if no value is present. // The loaded result indicates whether value was found in the map. -func (c *SyncMap[T]) Load(lockedKey string) (value T, loaded bool) { +func (c *SyncMapComparableKey[T1, T2]) Load(lockedKey T1) (value T2, loaded bool) { c.entriesMutex.Lock() defer c.entriesMutex.Unlock() entry, ok := c.entries[lockedKey] @@ -129,7 +129,7 @@ func (c *SyncMap[T]) Load(lockedKey string) (value T, loaded bool) { // LoadOrStore gets the key value if it's present or creates a new one if it isn't, // loaded return value signals if the object was present. -func (c *SyncMap[T]) LoadOrStore(lockedKey string, newEntry T) (value T, loaded bool) { +func (c *SyncMapComparableKey[T1, T2]) LoadOrStore(lockedKey T1, newEntry T2) (value T2, loaded bool) { c.entriesMutex.Lock() defer c.entriesMutex.Unlock() if entry, ok := c.entries[lockedKey]; ok { @@ -142,14 +142,14 @@ func (c *SyncMap[T]) LoadOrStore(lockedKey string, newEntry T) (value T, loaded // Store sets the value for a key. 
// If key-value was already present, it will be over-written -func (c *SyncMap[T]) Store(lockedKey string, newEntry T) { +func (c *SyncMapComparableKey[T1, T2]) Store(lockedKey T1, newEntry T2) { c.entriesMutex.Lock() defer c.entriesMutex.Unlock() c.entries[lockedKey] = newEntry } // Delete deletes object from the entries map -func (c *SyncMap[T]) Delete(lockedKey string) { +func (c *SyncMapComparableKey[T1, T2]) Delete(lockedKey T1) { c.entriesMutex.Lock() defer c.entriesMutex.Unlock() delete(c.entries, lockedKey) @@ -157,10 +157,10 @@ func (c *SyncMap[T]) Delete(lockedKey string) { // GetKeys returns a snapshot of all keys from entries map. // After this function returns there are no guarantees that the keys in the real entries map are still the same -func (c *SyncMap[T]) GetKeys() []string { +func (c *SyncMapComparableKey[T1, T2]) GetKeys() []T1 { c.entriesMutex.Lock() defer c.entriesMutex.Unlock() - keys := make([]string, len(c.entries)) + keys := make([]T1, len(c.entries)) i := 0 for k := range c.entries { keys[i] = k @@ -170,8 +170,16 @@ func (c *SyncMap[T]) GetKeys() []string { } // DoWithLock takes care of locking and unlocking key. -func (c *SyncMap[T]) DoWithLock(key string, f func(key string) error) error { +func (c *SyncMapComparableKey[T1, T2]) DoWithLock(key T1, f func(key T1) error) error { c.LockKey(key) defer c.UnlockKey(key) return f(key) } + +type SyncMap[T any] struct { + SyncMapComparableKey[string, T] +} + +func NewSyncMap[T any]() *SyncMap[T] { + return &SyncMap[T]{*NewSyncMapComparableKey[string, T]()} +} From c947bd4954622f999b28b450fb11df2eba0a5471 Mon Sep 17 00:00:00 2001 From: Nadia Pinaeva Date: Thu, 17 Aug 2023 11:54:30 +0200 Subject: [PATCH 3/7] Update ExternalGatewayRouteInfoCache to only expose methods, and not internal locks. Use key-based locking to simplify locking logic. 
Add cleanup functionality for routeInfo: previously only individual routes were deleted, but podIPs with empty lists of gateways and routeInfos themselves were not cleaned up. Signed-off-by: Nadia Pinaeva (cherry picked from commit 23058d92bc13a6676a5973f32b6319911c327cb4) --- .../apbroute/external_controller.go | 60 ++-- .../ovn/controller/apbroute/network_client.go | 261 ++++++------------ go-controller/pkg/ovn/egressgw.go | 259 ++++++----------- 3 files changed, 205 insertions(+), 375 deletions(-) diff --git a/go-controller/pkg/ovn/controller/apbroute/external_controller.go b/go-controller/pkg/ovn/controller/apbroute/external_controller.go index 93033301ea..be848ae0d6 100644 --- a/go-controller/pkg/ovn/controller/apbroute/external_controller.go +++ b/go-controller/pkg/ovn/controller/apbroute/external_controller.go @@ -38,8 +38,6 @@ func newPodInfo() *podInfo { } type RouteInfo struct { - sync.Mutex - Deleted bool PodName ktypes.NamespacedName // PodExternalRoutes is a cache keeping the LR routes added to the GRs when // external gateways are used. 
The first map key is the podIP (src-ip of the route), @@ -49,40 +47,60 @@ type RouteInfo struct { type ExternalGatewayRouteInfoCache struct { // External gateway caches - // Make them public so that they can be used by the annotation logic to lock on namespaces and share the same external route information - ExternalGWCache map[ktypes.NamespacedName]*RouteInfo - ExGWCacheMutex *sync.RWMutex - routeInfoLocks *syncmap.SyncMap[string] + routeInfos *syncmap.SyncMapComparableKey[ktypes.NamespacedName, *RouteInfo] } func NewExternalGatewayRouteInfoCache() *ExternalGatewayRouteInfoCache { return &ExternalGatewayRouteInfoCache{ - ExternalGWCache: make(map[ktypes.NamespacedName]*RouteInfo), - ExGWCacheMutex: &sync.RWMutex{}, - routeInfoLocks: syncmap.NewSyncMap[string](), + routeInfos: syncmap.NewSyncMapComparableKey[ktypes.NamespacedName, *RouteInfo](), } } -func (e *ExternalGatewayRouteInfoCache) GetRouteInfoDeletedStatus(podName ktypes.NamespacedName) bool { - var status bool - _ = e.routeInfoLocks.DoWithLock(podName.String(), func(_ string) error { - if v, ok := e.ExternalGWCache[podName]; ok { - status = v.Deleted +func (e *ExternalGatewayRouteInfoCache) CreateOrLoad(podName ktypes.NamespacedName, f func(routeInfo *RouteInfo) error) error { + return e.routeInfos.DoWithLock(podName, func(key ktypes.NamespacedName) error { + routeInfo := &RouteInfo{ + PodExternalRoutes: make(map[string]map[string]string), + PodName: podName, } - return nil + routeInfo, _ = e.routeInfos.LoadOrStore(key, routeInfo) + return f(routeInfo) }) - return status } -func (e *ExternalGatewayRouteInfoCache) SetRouteInfoDeletedStatus(podName ktypes.NamespacedName, status bool) { - _ = e.routeInfoLocks.DoWithLock(podName.String(), func(_ string) error { - if v, ok := e.ExternalGWCache[podName]; ok { - v.Deleted = status +// Cleanup will call given function with lock and remove empty maps from the routeInfo. 
+// If routeInfo is empty after cleanup, it will be deleted from the cache +func (e *ExternalGatewayRouteInfoCache) Cleanup(podName ktypes.NamespacedName, f func(routeInfo *RouteInfo) error) error { + return e.routeInfos.DoWithLock(podName, func(key ktypes.NamespacedName) error { + routeInfo, loaded := e.routeInfos.Load(key) + if !loaded { + return nil } - return nil + err := f(routeInfo) + for podIP, routes := range routeInfo.PodExternalRoutes { + if len(routes) == 0 { + delete(routeInfo.PodExternalRoutes, podIP) + } + } + if err == nil && len(routeInfo.PodExternalRoutes) == 0 { + e.routeInfos.Delete(key) + } + return err }) } +// CleanupNamespace calls e.Cleanup for every pod from namespace=nsName +func (e *ExternalGatewayRouteInfoCache) CleanupNamespace(nsName string, f func(routeInfo *RouteInfo) error) error { + for _, podName := range e.routeInfos.GetKeys() { + if podName.Namespace == nsName { + err := e.Cleanup(podName, f) + if err != nil { + return err + } + } + } + return nil +} + // routePolicyState contains current policy state as it was applied. // Since every config is applied to a pod, podInfo stores current state for every target pod. type routePolicyState struct { diff --git a/go-controller/pkg/ovn/controller/apbroute/network_client.go b/go-controller/pkg/ovn/controller/apbroute/network_client.go index 31aa6bfe04..a490d27c88 100644 --- a/go-controller/pkg/ovn/controller/apbroute/network_client.go +++ b/go-controller/pkg/ovn/controller/apbroute/network_client.go @@ -126,67 +126,37 @@ func (nb *northBoundClient) delAllLegacyHybridRoutePolicies() error { // If a set of gateways is given, only routes for that gateway are deleted. If no gateways // are given, all routes for the namespace are deleted. 
func (nb *northBoundClient) deleteGatewayIPs(podNsName ktypes.NamespacedName, toBeDeletedGWIPs, _ sets.Set[string]) error { - for _, routeInfo := range nb.getRouteInfosForPod(podNsName) { - // if we encounter error while deleting routes for one pod; we return and don't try subsequent pods - if err := nb.deletePodGWRoutes(routeInfo, toBeDeletedGWIPs, podNsName); err != nil { - return err - } - } - return nil -} - -// deletePodGWRoutes removes known exgw routes for a pod via routeInfo for a list of given GW IPs -func (nb *northBoundClient) deletePodGWRoutes(routeInfo *RouteInfo, toBeDeletedGWIPs sets.Set[string], podNsName ktypes.NamespacedName) error { - routeInfo.Lock() - defer routeInfo.Unlock() - if nb.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podNsName) { - return nil - } - pod, err := nb.podLister.Pods(routeInfo.PodName.Namespace).Get(routeInfo.PodName.Name) - var deletedPod bool - if err != nil && apierrors.IsNotFound(err) { - // Mark this routeInfo as deleted - deletedPod = true - } - if err == nil { - local, err := nb.isPodInLocalZone(pod) - if err != nil { - return err - } - if !local { - klog.V(4).Infof("APB will not delete exgw routes for pod %s not in the local zone %s", routeInfo.PodName, nb.zone) - return nil + return nb.externalGatewayRouteInfo.Cleanup(podNsName, func(routeInfo *RouteInfo) error { + pod, err := nb.podLister.Pods(routeInfo.PodName.Namespace).Get(routeInfo.PodName.Name) + var deletedPod bool + if err != nil && apierrors.IsNotFound(err) { + // Mark this routeInfo as deleted + deletedPod = true + } + if err == nil { + local, err := nb.isPodInLocalZone(pod) + if err != nil { + return err + } + if !local { + klog.V(4).Infof("APB will not delete exgw routes for pod %s not in the local zone %s", routeInfo.PodName, nb.zone) + return nil + } } - } - for podIP, routes := range routeInfo.PodExternalRoutes { - for gw, gr := range routes { - if toBeDeletedGWIPs.Has(gw) || deletedPod { - // we cannot delete an external gateway IP from the 
north bound if it's also being provided by an external gateway annotation or if it is also - // defined by a coexisting policy in the same namespace - if err := nb.deletePodGWRoute(routeInfo, podIP, gw, gr); err != nil { - return fmt.Errorf("APB delete pod GW route failed: %w", err) + for podIP, routes := range routeInfo.PodExternalRoutes { + for gw, gr := range routes { + if toBeDeletedGWIPs.Has(gw) || deletedPod { + // we cannot delete an external gateway IP from the north bound if it's also being provided by an external gateway annotation or if it is also + // defined by a coexisting policy in the same namespace + if err := nb.deletePodGWRoute(routeInfo, podIP, gw, gr); err != nil { + return fmt.Errorf("APB delete pod GW route failed: %w", err) + } + delete(routes, gw) } - delete(routes, gw) } } - } - nb.externalGatewayRouteInfo.SetRouteInfoDeletedStatus(podNsName, deletedPod) - return nil -} - -// getRouteInfosForPod returns all routeInfos for a specific namespace -func (nb *northBoundClient) getRouteInfosForPod(podNsName ktypes.NamespacedName) []*RouteInfo { - nb.externalGatewayRouteInfo.ExGWCacheMutex.RLock() - defer nb.externalGatewayRouteInfo.ExGWCacheMutex.RUnlock() - - routes := make([]*RouteInfo, 0) - for namespacedName, routeInfo := range nb.externalGatewayRouteInfo.ExternalGWCache { - if namespacedName == podNsName { - routes = append(routes, routeInfo) - } - } - - return routes + return nil + }) } func (nb *northBoundClient) addGatewayIPs(pod *v1.Pod, egress *gateway_info.GatewayInfoList) error { @@ -269,50 +239,47 @@ func (nb *northBoundClient) addGWRoutesForPod(gateways []*gateway_info.GatewayIn } port := portPrefix + types.GWRouterToExtSwitchPrefix + gr - routeInfo, err := nb.ensureRouteInfoLocked(podNsName) - if err != nil { - return fmt.Errorf("failed to ensure routeInfo for %s, error: %v", podNsName, err) - } - defer routeInfo.Unlock() - for _, podIPNet := range podIfAddrs { - for _, gateway := range gateways { - // TODO (trozet): use the go 
bindings here and batch commands - // validate the ip and gateway belong to the same address family - gws, err := util.MatchAllIPStringFamily(utilnet.IsIPv6(podIPNet.IP), gateway.Gateways.UnsortedList()) - if err != nil { - klog.Warningf("Address families for the pod address %s and gateway %s did not match", podIPNet.IP.String(), gateway.Gateways) - continue - } - podIP := podIPNet.IP.String() - for _, gw := range gws { - // if route was already programmed, skip it - if foundGR, ok := routeInfo.PodExternalRoutes[podIP][gw]; ok && foundGR == gr { - routesAdded++ + return nb.externalGatewayRouteInfo.CreateOrLoad(podNsName, func(routeInfo *RouteInfo) error { + for _, podIPNet := range podIfAddrs { + for _, gateway := range gateways { + // TODO (trozet): use the go bindings here and batch commands + // validate the ip and gateway belong to the same address family + gws, err := util.MatchAllIPStringFamily(utilnet.IsIPv6(podIPNet.IP), gateway.Gateways.UnsortedList()) + if err != nil { + klog.Warningf("Address families for the pod address %s and gateway %s did not match", podIPNet.IP.String(), gateway.Gateways) continue } - mask := util.GetIPFullMaskString(podIP) - if err := nb.createOrUpdateBFDStaticRoute(gateway.BFDEnabled, gw, podIP, gr, port, mask); err != nil { - return err - } - if routeInfo.PodExternalRoutes[podIP] == nil { - routeInfo.PodExternalRoutes[podIP] = make(map[string]string) - } - routeInfo.PodExternalRoutes[podIP][gw] = gr - routesAdded++ - if len(routeInfo.PodExternalRoutes[podIP]) == 1 { - if err := nb.addHybridRoutePolicyForPod(podIPNet.IP, node); err != nil { + podIP := podIPNet.IP.String() + for _, gw := range gws { + // if route was already programmed, skip it + if foundGR, ok := routeInfo.PodExternalRoutes[podIP][gw]; ok && foundGR == gr { + routesAdded++ + continue + } + mask := util.GetIPFullMaskString(podIP) + if err := nb.createOrUpdateBFDStaticRoute(gateway.BFDEnabled, gw, podIP, gr, port, mask); err != nil { return err } + if 
routeInfo.PodExternalRoutes[podIP] == nil { + routeInfo.PodExternalRoutes[podIP] = make(map[string]string) + } + routeInfo.PodExternalRoutes[podIP][gw] = gr + routesAdded++ + if len(routeInfo.PodExternalRoutes[podIP]) == 1 { + if err := nb.addHybridRoutePolicyForPod(podIPNet.IP, node); err != nil { + return err + } + } } } } - } - // if no routes are added return an error - if routesAdded < 1 { - return fmt.Errorf("gateway specified for namespace %s with gateway addresses %v but no valid routes exist for pod: %s", - podNsName.Namespace, podIfAddrs, podNsName.Name) - } - return nil + // if no routes are added return an error + if routesAdded < 1 { + return fmt.Errorf("gateway specified for namespace %s with gateway addresses %v but no valid routes exist for pod: %s", + podNsName.Namespace, podIfAddrs, podNsName.Name) + } + return nil + }) } // AddHybridRoutePolicyForPod handles adding a higher priority allow policy to allow traffic to be routed normally @@ -437,92 +404,42 @@ func (nb *northBoundClient) createOrUpdateBFDStaticRoute(bfdEnabled bool, gw str func (nb *northBoundClient) updateExternalGWInfoCacheForPodIPWithGatewayIP(podIP, gwIP, nodeName string, bfdEnabled bool, namespacedName ktypes.NamespacedName) error { gr := util.GetGatewayRouterFromNode(nodeName) - routeInfo, err := nb.ensureRouteInfoLocked(namespacedName) - if err != nil { - return fmt.Errorf("failed to ensure routeInfo for %s, error: %v", namespacedName.Name, err) - } - defer routeInfo.Unlock() - // if route was already programmed, skip it - if foundGR, ok := routeInfo.PodExternalRoutes[podIP][gwIP]; ok && foundGR == gr { - return nil - } - mask := util.GetIPFullMaskString(podIP) - portPrefix, err := nb.extSwitchPrefix(nodeName) - if err != nil { - klog.Warningf("Failed to find ext switch prefix for %s %v", nodeName, err) - return err - } - if bfdEnabled { - port := portPrefix + types.GWRouterToExtSwitchPrefix + gr - // update the BFD static route just in case it has changed - if err := 
nb.createOrUpdateBFDStaticRoute(bfdEnabled, gwIP, podIP, gr, port, mask); err != nil { - return err + return nb.externalGatewayRouteInfo.CreateOrLoad(namespacedName, func(routeInfo *RouteInfo) error { + // if route was already programmed, skip it + if foundGR, ok := routeInfo.PodExternalRoutes[podIP][gwIP]; ok && foundGR == gr { + return nil } - } else { - _, err := nb.lookupBFDEntry(gwIP, gr, portPrefix) + mask := util.GetIPFullMaskString(podIP) + + portPrefix, err := nb.extSwitchPrefix(nodeName) if err != nil { - err = nb.cleanUpBFDEntry(gwIP, gr, portPrefix) - if err != nil { + klog.Warningf("Failed to find ext switch prefix for %s %v", nodeName, err) + return err + } + if bfdEnabled { + port := portPrefix + types.GWRouterToExtSwitchPrefix + gr + // update the BFD static route just in case it has changed + if err := nb.createOrUpdateBFDStaticRoute(bfdEnabled, gwIP, podIP, gr, port, mask); err != nil { return err } + } else { + _, err := nb.lookupBFDEntry(gwIP, gr, portPrefix) + if err != nil { + err = nb.cleanUpBFDEntry(gwIP, gr, portPrefix) + if err != nil { + return err + } + } } - } - if routeInfo.PodExternalRoutes[podIP] == nil { - routeInfo.PodExternalRoutes[podIP] = make(map[string]string) - } - routeInfo.PodExternalRoutes[podIP][gwIP] = gr - - return nil -} + if routeInfo.PodExternalRoutes[podIP] == nil { + routeInfo.PodExternalRoutes[podIP] = make(map[string]string) + } + routeInfo.PodExternalRoutes[podIP][gwIP] = gr -// ensureRouteInfoLocked either gets the current routeInfo in the cache with a lock, or creates+locks a new one if missing -func (nb *northBoundClient) ensureRouteInfoLocked(podName ktypes.NamespacedName) (*RouteInfo, error) { - // We don't want to hold the cache lock while we try to lock the routeInfo (unless we are creating it, then we know - // no one else is using it). This could lead to dead lock. Therefore the steps here are: - // 1. Get the cache lock, try to find the routeInfo - // 2. 
If routeInfo existed, release the cache lock - // 3. If routeInfo did not exist, safe to hold the cache lock while we create the new routeInfo - nb.externalGatewayRouteInfo.ExGWCacheMutex.Lock() - routeInfo, ok := nb.externalGatewayRouteInfo.ExternalGWCache[podName] - var isDeleted bool - if !ok { - routeInfo = &RouteInfo{ - PodExternalRoutes: make(map[string]map[string]string), - PodName: podName, - } - // we are creating routeInfo and going to set it in podExternalRoutes map - // so safe to hold the lock while we create and add it - defer nb.externalGatewayRouteInfo.ExGWCacheMutex.Unlock() - nb.externalGatewayRouteInfo.ExternalGWCache[podName] = routeInfo - } else { - // capture the current status of the routeInfo. Compare it once - // the route info lock is secured to check if the status was changed - // while waiting for the lock. - isDeleted = nb.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podName) - // if we found an existing routeInfo, do not hold the cache lock - // while waiting for routeInfo to Lock - nb.externalGatewayRouteInfo.ExGWCacheMutex.Unlock() - } - - // 4. Now lock the routeInfo - routeInfo.Lock() - - // 5. 
If routeInfo was deleted between releasing the cache lock and grabbing - // the routeInfo lock, return an error so the caller doesn't use it and - // retries the operation later - if nb.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podName) { - if !isDeleted { - // info was modified while waiting for unlock, return error and retry later - routeInfo.Unlock() - return nil, fmt.Errorf("routeInfo for pod %s, was altered during ensure route info", podName) - } - // it was already deleted before the lock, so change the status as not deleted - nb.externalGatewayRouteInfo.SetRouteInfoDeletedStatus(podName, false) - } - - return routeInfo, nil + return nil + }) } func (nb *northBoundClient) deletePodGWRoute(routeInfo *RouteInfo, podIP, gw, gr string) error { diff --git a/go-controller/pkg/ovn/egressgw.go b/go-controller/pkg/ovn/egressgw.go index 5f586151f5..73c1dd1e8e 100644 --- a/go-controller/pkg/ovn/egressgw.go +++ b/go-controller/pkg/ovn/egressgw.go @@ -35,97 +35,6 @@ type gatewayInfo struct { bfdEnabled bool } -// ensureRouteInfoLocked either gets the current routeInfo in the cache with a lock, or creates+locks a new one if missing -func (oc *DefaultNetworkController) ensureRouteInfoLocked(podName ktypes.NamespacedName) (*apbroutecontroller.RouteInfo, error) { - // We don't want to hold the cache lock while we try to lock the routeInfo (unless we are creating it, then we know - // no one else is using it). This could lead to dead lock. Therefore the steps here are: - // 1. Get the cache lock, try to find the routeInfo - // 2. If routeInfo existed, release the cache lock - // 3. 
If routeInfo did not exist, safe to hold the cache lock while we create the new routeInfo - oc.externalGatewayRouteInfo.ExGWCacheMutex.Lock() - routeInfo, ok := oc.externalGatewayRouteInfo.ExternalGWCache[podName] - var isDeleted bool - if !ok { - routeInfo = &apbroutecontroller.RouteInfo{ - PodExternalRoutes: make(map[string]map[string]string), - PodName: podName, - } - // we are creating routeInfo and going to set it in podExternalRoutes map - // so safe to hold the lock while we create and add it - defer oc.externalGatewayRouteInfo.ExGWCacheMutex.Unlock() - oc.externalGatewayRouteInfo.ExternalGWCache[podName] = routeInfo - } else { - // if we found an existing routeInfo, do not hold the cache lock - // while waiting for routeInfo to Lock - isDeleted = oc.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podName) - oc.externalGatewayRouteInfo.ExGWCacheMutex.Unlock() - } - - // 4. Now lock the routeInfo - routeInfo.Lock() - - // 5. If routeInfo was deleted between releasing the cache lock and grabbing - // the routeInfo lock, return an error so the caller doesn't use it and - // retries the operation later - if oc.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podName) { - if !isDeleted { - // info was modified while waiting for unlock, return error and retry later - routeInfo.Unlock() - return nil, fmt.Errorf("routeInfo for pod %s, was altered during ensure route info", podName) - } - // it was already deleted before the lock, so change the status as not deleted - oc.externalGatewayRouteInfo.SetRouteInfoDeletedStatus(podName, false) - } - - return routeInfo, nil -} - -// getRouteInfosForNamespace returns all routeInfos for a specific namespace -func (oc *DefaultNetworkController) getRouteInfosForNamespace(namespace string) map[ktypes.NamespacedName]*apbroutecontroller.RouteInfo { - oc.externalGatewayRouteInfo.ExGWCacheMutex.RLock() - defer oc.externalGatewayRouteInfo.ExGWCacheMutex.RUnlock() - - routes := 
make(map[ktypes.NamespacedName]*apbroutecontroller.RouteInfo) - for namespacedName, routeInfo := range oc.externalGatewayRouteInfo.ExternalGWCache { - if namespacedName.Namespace == namespace { - routes[namespacedName] = routeInfo - } - } - - return routes -} - -// deleteRouteInfoLocked removes a routeInfo from the cache, and returns it locked -func (oc *DefaultNetworkController) deleteRouteInfoLocked(name ktypes.NamespacedName) *apbroutecontroller.RouteInfo { - // Attempt to find the routeInfo in the cache, release the cache lock while - // we try to lock the routeInfo to avoid any deadlock - oc.externalGatewayRouteInfo.ExGWCacheMutex.RLock() - routeInfo := oc.externalGatewayRouteInfo.ExternalGWCache[name] - oc.externalGatewayRouteInfo.ExGWCacheMutex.RUnlock() - - if routeInfo == nil { - return nil - } - routeInfo.Lock() - - if oc.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(name) { - routeInfo.Unlock() - return nil - } - - oc.externalGatewayRouteInfo.SetRouteInfoDeletedStatus(name, true) - - go func() { - oc.externalGatewayRouteInfo.ExGWCacheMutex.Lock() - defer oc.externalGatewayRouteInfo.ExGWCacheMutex.Unlock() - if newRouteInfo := oc.externalGatewayRouteInfo.ExternalGWCache[name]; routeInfo == newRouteInfo { - delete(oc.externalGatewayRouteInfo.ExternalGWCache, name) - } - }() - - return routeInfo -} - // addPodExternalGW handles detecting if a pod is serving as an external gateway for namespace(s) and adding routes // to all pods in that namespace func (oc *DefaultNetworkController) addPodExternalGW(pod *kapi.Pod) error { @@ -425,68 +334,57 @@ func (oc *DefaultNetworkController) deleteGWRoutesForNamespace(namespace string, return err } policyGWIPs = policyGWIPs.Union(policyStaticGWIPs) - for podNsName, routeInfo := range oc.getRouteInfosForNamespace(namespace) { - routeInfo.Lock() - if oc.externalGatewayRouteInfo.GetRouteInfoDeletedStatus(podNsName) { - routeInfo.Unlock() - continue - } + return oc.externalGatewayRouteInfo.CleanupNamespace(namespace, 
func(routeInfo *apbroutecontroller.RouteInfo) error { for podIP, routes := range routeInfo.PodExternalRoutes { for gw, gr := range routes { if (deleteAll || matchGWs.Has(gw)) && !policyGWIPs.Has(gw) { if err := oc.deletePodGWRoute(routeInfo, podIP, gw, gr); err != nil { // if we encounter error while deleting routes for one pod; we return and don't try subsequent pods - routeInfo.Unlock() return fmt.Errorf("delete pod GW route failed: %w", err) } delete(routes, gw) } } } - routeInfo.Unlock() - } - return nil + return nil + }) } // deleteGwRoutesForPod handles deleting all routes to gateways for a pod IP on a specific GR func (oc *DefaultNetworkController) deleteGWRoutesForPod(name ktypes.NamespacedName, podIPNets []*net.IPNet) (err error) { - routeInfo := oc.deleteRouteInfoLocked(name) - if routeInfo == nil { - return nil - } - defer routeInfo.Unlock() - - policyGWIPs, err := oc.apbExternalRouteController.GetDynamicGatewayIPsForTargetNamespace(name.Namespace) - if err != nil { - return err - } - policyStaticGWIPs, err := oc.apbExternalRouteController.GetStaticGatewayIPsForTargetNamespace(name.Namespace) - if err != nil { - return err - } - policyGWIPs = policyGWIPs.Union(policyStaticGWIPs) - - for _, podIPNet := range podIPNets { - podIP := podIPNet.IP.String() - routes, ok := routeInfo.PodExternalRoutes[podIP] - if !ok { - continue + return oc.externalGatewayRouteInfo.Cleanup(name, func(routeInfo *apbroutecontroller.RouteInfo) error { + policyGWIPs, err := oc.apbExternalRouteController.GetDynamicGatewayIPsForTargetNamespace(name.Namespace) + if err != nil { + return err } - if len(routes) == 0 { - delete(routeInfo.PodExternalRoutes, podIP) - continue + policyStaticGWIPs, err := oc.apbExternalRouteController.GetStaticGatewayIPsForTargetNamespace(name.Namespace) + if err != nil { + return err } - for gw, gr := range routes { - if !policyGWIPs.Has(gw) { - if err := oc.deletePodGWRoute(routeInfo, podIP, gw, gr); err != nil { - // if we encounter error while deleting 
routes for one pod; we return and don't try subsequent pods - return fmt.Errorf("delete pod GW route failed: %w", err) + policyGWIPs = policyGWIPs.Union(policyStaticGWIPs) + + for _, podIPNet := range podIPNets { + podIP := podIPNet.IP.String() + routes, ok := routeInfo.PodExternalRoutes[podIP] + if !ok { + continue + } + if len(routes) == 0 { + delete(routeInfo.PodExternalRoutes, podIP) + continue + } + for gw, gr := range routes { + if !policyGWIPs.Has(gw) { + if err := oc.deletePodGWRoute(routeInfo, podIP, gw, gr); err != nil { + // if we encounter error while deleting routes for one pod; we return and don't try subsequent pods + return fmt.Errorf("delete pod GW route failed: %w", err) + } + delete(routes, gw) } - delete(routes, gw) } } - } - return nil + return nil + }) } // addEgressGwRoutesForPod handles adding all routes to gateways for a pod on a specific GR @@ -515,63 +413,60 @@ func (oc *DefaultNetworkController) addGWRoutesForPod(gateways []*gatewayInfo, p } port := portPrefix + types.GWRouterToExtSwitchPrefix + gr - routeInfo, err := oc.ensureRouteInfoLocked(podNsName) - if err != nil { - return fmt.Errorf("failed to ensure routeInfo for %s, error: %v", podNsName, err) - } - policyGWIPs, err := oc.apbExternalRouteController.GetDynamicGatewayIPsForTargetNamespace(podNsName.Namespace) - if err != nil { - return err - } - policyStaticGWIPs, err := oc.apbExternalRouteController.GetStaticGatewayIPsForTargetNamespace(podNsName.Namespace) - if err != nil { - return err - } - policyGWIPs = policyGWIPs.Union(policyStaticGWIPs) - defer routeInfo.Unlock() - - for _, podIPNet := range podIfAddrs { - for _, gateway := range gateways { - // TODO (trozet): use the go bindings here and batch commands - // validate the ip and gateway belong to the same address family - gws, err := util.MatchAllIPStringFamily(utilnet.IsIPv6(podIPNet.IP), gateway.gws.UnsortedList()) - if err == nil { - podIP := podIPNet.IP.String() - for _, gw := range gws { - // if route was already 
programmed, skip it - foundGR, ok := routeInfo.PodExternalRoutes[podIP][gw] - if (ok && foundGR == gr) || policyGWIPs.Has(gw) { - routesAdded++ - continue - } - mask := util.GetIPFullMaskString(podIP) + return oc.externalGatewayRouteInfo.CreateOrLoad(podNsName, func(routeInfo *apbroutecontroller.RouteInfo) error { + policyGWIPs, err := oc.apbExternalRouteController.GetDynamicGatewayIPsForTargetNamespace(podNsName.Namespace) + if err != nil { + return err + } + policyStaticGWIPs, err := oc.apbExternalRouteController.GetStaticGatewayIPsForTargetNamespace(podNsName.Namespace) + if err != nil { + return err + } + policyGWIPs = policyGWIPs.Union(policyStaticGWIPs) + + for _, podIPNet := range podIfAddrs { + for _, gateway := range gateways { + // TODO (trozet): use the go bindings here and batch commands + // validate the ip and gateway belong to the same address family + gws, err := util.MatchAllIPStringFamily(utilnet.IsIPv6(podIPNet.IP), gateway.gws.UnsortedList()) + if err == nil { + podIP := podIPNet.IP.String() + for _, gw := range gws { + // if route was already programmed, skip it + foundGR, ok := routeInfo.PodExternalRoutes[podIP][gw] + if (ok && foundGR == gr) || policyGWIPs.Has(gw) { + routesAdded++ + continue + } + mask := util.GetIPFullMaskString(podIP) - if err := oc.createBFDStaticRoute(gateway.bfdEnabled, gw, podIP, gr, port, mask); err != nil { - return err - } - if routeInfo.PodExternalRoutes[podIP] == nil { - routeInfo.PodExternalRoutes[podIP] = make(map[string]string) - } - routeInfo.PodExternalRoutes[podIP][gw] = gr - routesAdded++ - if len(routeInfo.PodExternalRoutes[podIP]) == 1 { - if err := oc.addHybridRoutePolicyForPod(podIPNet.IP, node); err != nil { + if err := oc.createBFDStaticRoute(gateway.bfdEnabled, gw, podIP, gr, port, mask); err != nil { return err } + if routeInfo.PodExternalRoutes[podIP] == nil { + routeInfo.PodExternalRoutes[podIP] = make(map[string]string) + } + routeInfo.PodExternalRoutes[podIP][gw] = gr + routesAdded++ + if 
len(routeInfo.PodExternalRoutes[podIP]) == 1 { + if err := oc.addHybridRoutePolicyForPod(podIPNet.IP, node); err != nil { + return err + } + } } + } else { + klog.Warningf("Address families for the pod address %s and gateway %s did not match", podIPNet.IP.String(), gateway.gws) } - } else { - klog.Warningf("Address families for the pod address %s and gateway %s did not match", podIPNet.IP.String(), gateway.gws) } } - } - // if no routes are added return an error - if routesAdded < 1 { - return fmt.Errorf("gateway specified for namespace %s with gateway addresses %v but no valid routes exist for pod: %s", - podNsName.Namespace, podIfAddrs, podNsName.Name) - } - return nil + // if no routes are added return an error + if routesAdded < 1 { + return fmt.Errorf("gateway specified for namespace %s with gateway addresses %v but no valid routes exist for pod: %s", + podNsName.Namespace, podIfAddrs, podNsName.Name) + } + return nil + }) } // deletePodSNAT removes per pod SNAT rules towards the nodeIP that are applied to the GR where the pod resides From b8e052dd88f097b4055d5d6f15dceada5ea625e5 Mon Sep 17 00:00:00 2001 From: jordigilh Date: Tue, 29 Aug 2023 14:27:23 -0400 Subject: [PATCH 4/7] [APB External Route]: Add unit test to cover pod delete -> recreate scenario Signed-off-by: jordigilh Signed-off-by: Jordi Gil (cherry picked from commit 1c67bacbf08908fe390c0e90f817750102481f4a) --- .../apbroute/external_controller_pod_test.go | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/go-controller/pkg/ovn/controller/apbroute/external_controller_pod_test.go b/go-controller/pkg/ovn/controller/apbroute/external_controller_pod_test.go index 93c45ecce8..2837e423c9 100644 --- a/go-controller/pkg/ovn/controller/apbroute/external_controller_pod_test.go +++ b/go-controller/pkg/ovn/controller/apbroute/external_controller_pod_test.go @@ -280,6 +280,40 @@ var _ = Describe("OVN External Gateway pod", func() { eventuallyExpectConfig(dynamicPolicy.Name, expectedPolicy1, 
expectedRefs1) eventuallyExpectConfig(dynamicPolicyDiffTargetNS.Name, expectedPolicy2, expectedRefs2) }) + + It("deletes a target pod that matches a policy and creates it again", func() { + initController([]runtime.Object{namespaceGW, namespaceTarget2, targetPod2, pod1}, + []runtime.Object{dynamicPolicy}) + + expectedPolicy1, expectedRefs1 := expectedPolicyStateAndRefs( + []*namespaceWithPods{namespaceTarget2WithPod}, + nil, + []*namespaceWithPods{namespaceGWWithPod}, false) + + eventuallyExpectNumberOfPolicies(1) + eventuallyExpectConfig(dynamicPolicy.Name, expectedPolicy1, expectedRefs1) + + By("delete one of the target pods") + deletePod(targetPod2, fakeClient) + expectedPolicy1, expectedRefs1 = expectedPolicyStateAndRefs( + []*namespaceWithPods{{nsName: targetNamespaceName2}}, + nil, + []*namespaceWithPods{namespaceGWWithPod}, false) + + eventuallyExpectNumberOfPolicies(1) + eventuallyExpectConfig(dynamicPolicy.Name, expectedPolicy1, expectedRefs1) + + By("create the deleted target pod") + + createPod(targetPod2, fakeClient) + expectedPolicy1, expectedRefs1 = expectedPolicyStateAndRefs( + []*namespaceWithPods{namespaceTarget2WithPod}, + nil, + []*namespaceWithPods{namespaceGWWithPod}, false) + + eventuallyExpectNumberOfPolicies(1) + eventuallyExpectConfig(dynamicPolicy.Name, expectedPolicy1, expectedRefs1) + }) }) var _ = Context("When updating a pod", func() { From af933eddad6d8c3e03e130605480c8dc229013e9 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Mon, 25 Sep 2023 12:19:41 -0400 Subject: [PATCH 5/7] Added missing documentation for new functions Signed-off-by: Jordi Gil (cherry picked from commit 99767392fb8c70e70ed2092a0819b6d8f2e179b9) --- .../ovn/controller/apbroute/external_controller.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/go-controller/pkg/ovn/controller/apbroute/external_controller.go b/go-controller/pkg/ovn/controller/apbroute/external_controller.go index be848ae0d6..21295d10af 100644 --- 
a/go-controller/pkg/ovn/controller/apbroute/external_controller.go +++ b/go-controller/pkg/ovn/controller/apbroute/external_controller.go @@ -56,6 +56,11 @@ func NewExternalGatewayRouteInfoCache() *ExternalGatewayRouteInfoCache { } } +// CreateOrLoad provides a mechanism to initialize keys in the cache before calling the argument function `f`. This approach +// hides the logic for initializing and retrieving the key's routeInfo and allows reusability by exposing a function signature as argument +// that has a routeInfo instance as argument. The function will attempt to retrieve the routeInfo for a given key, +// and create an empty routeInfo structure in the cache when not found. Then it will execute the function argument `f` passing +// the routeInfo as argument. func (e *ExternalGatewayRouteInfoCache) CreateOrLoad(podName ktypes.NamespacedName, f func(routeInfo *RouteInfo) error) error { return e.routeInfos.DoWithLock(podName, func(key ktypes.NamespacedName) error { routeInfo := &RouteInfo{ @@ -67,8 +72,9 @@ func (e *ExternalGatewayRouteInfoCache) CreateOrLoad(podName ktypes.NamespacedNa }) } -// Cleanup will call given function with lock and remove empty maps from the routeInfo. -// If routeInfo is empty after cleanup, it will be deleted from the cache +// Cleanup will lock the key `podName` and use the routeInfo associated with the key to pass it as an argument to function `f`. +// After the function `f` completes, it will delete any empty PodExternalRoutes references for each given podIP in the routeInfo object, +// as well as deleting the key itself if it contains no entries in its `PodExternalRoutes` map.
func (e *ExternalGatewayRouteInfoCache) Cleanup(podName ktypes.NamespacedName, f func(routeInfo *RouteInfo) error) error { return e.routeInfos.DoWithLock(podName, func(key ktypes.NamespacedName) error { routeInfo, loaded := e.routeInfos.Load(key) @@ -88,7 +94,8 @@ func (e *ExternalGatewayRouteInfoCache) Cleanup(podName ktypes.NamespacedName, f }) } -// CleanupNamespace calls e.Cleanup for every pod from namespace=nsName +// CleanupNamespace wraps the cleanup call for all the pods in a given namespace. +// The routeInfo reference for each pod in the given namespace is processed by the `f` function inside the `Cleanup` function func (e *ExternalGatewayRouteInfoCache) CleanupNamespace(nsName string, f func(routeInfo *RouteInfo) error) error { for _, podName := range e.routeInfos.GetKeys() { if podName.Namespace == nsName { From 259e70b3e39f195577b0612e366961905f5b67c0 Mon Sep 17 00:00:00 2001 From: Surya Seetharaman Date: Tue, 26 Sep 2023 21:12:38 +0200 Subject: [PATCH 6/7] IC/ICNI: Remove the need for namespace updates Signed-off-by: Surya Seetharaman (cherry picked from commit 7b0ba0daa60b674af8287a2b7e783dbd4e61d34e) --- .../node/default_node_network_controller.go | 23 ++- go-controller/pkg/node/obj_retry_node.go | 19 +- .../controller/apbroute/master_controller.go | 13 ++ .../pkg/ovn/default_network_controller.go | 12 ++ go-controller/pkg/ovn/egressgw.go | 167 +++++++++++++++++- go-controller/pkg/ovn/namespace.go | 20 +++ test/e2e/external_gateways.go | 56 +++--- 7 files changed, 272 insertions(+), 38 deletions(-) diff --git a/go-controller/pkg/node/default_node_network_controller.go b/go-controller/pkg/node/default_node_network_controller.go index 5c45e3a6ce..5f1d7ef697 100644 --- a/go-controller/pkg/node/default_node_network_controller.go +++ b/go-controller/pkg/node/default_node_network_controller.go @@ -1079,15 +1079,22 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error { } if config.OvnKubeNode.Mode != types.NodeModeDPUHost { - 
util.SetARPTimeout() - err := nc.WatchNamespaces() - if err != nil { - return fmt.Errorf("failed to watch namespaces: %w", err) + // If interconnect is disabled OR interconnect is running in single-zone-mode, + // the ovnkube-master is responsible for patching ICNI managed namespaces with + // "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush + // conntrack on every node. In multi-zone-interconnect case, we will handle the flushing + // directly on the ovnkube-controller code to avoid an extra namespace annotation + if !config.OVNKubernetesFeature.EnableInterconnect || sbZone == types.OvnDefaultZone { + util.SetARPTimeout() + err := nc.WatchNamespaces() + if err != nil { + return fmt.Errorf("failed to watch namespaces: %w", err) + } + // every minute cleanup stale conntrack entries if any + go wait.Until(func() { + nc.checkAndDeleteStaleConntrackEntries() + }, time.Minute*1, nc.stopChan) } - // every minute cleanup stale conntrack entries if any - go wait.Until(func() { - nc.checkAndDeleteStaleConntrackEntries() - }, time.Minute*1, nc.stopChan) err = nc.WatchEndpointSlices() if err != nil { return fmt.Errorf("failed to watch endpointSlices: %w", err) diff --git a/go-controller/pkg/node/obj_retry_node.go b/go-controller/pkg/node/obj_retry_node.go index cf508aef92..a52f775df7 100644 --- a/go-controller/pkg/node/obj_retry_node.go +++ b/go-controller/pkg/node/obj_retry_node.go @@ -8,8 +8,11 @@ import ( discovery "k8s.io/api/discovery/v1" cache "k8s.io/client-go/tools/cache" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/retry" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" ) type nodeEventHandler struct { @@ -159,8 +162,20 @@ func (h *nodeEventHandler) AddResource(obj interface{}, fromRetryLoop bool) erro func (h *nodeEventHandler) 
UpdateResource(oldObj, newObj interface{}, inRetryCache bool) error { switch h.objType { case factory.NamespaceExGwType: - newNs := newObj.(*kapi.Namespace) - return h.nc.syncConntrackForExternalGateways(newNs) + // If interconnect is disabled OR interconnect is running in single-zone-mode, + // the ovnkube-master is responsible for patching ICNI managed namespaces with + // "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush + // conntrack on every node. In multi-zone-interconnect case, we will handle the flushing + // directly on the ovnkube-controller code to avoid an extra namespace annotation + node, err := h.nc.watchFactory.GetNode(h.nc.name) + if err != nil { + return fmt.Errorf("error retrieving node %s: %v", h.nc.name, err) + } + if !config.OVNKubernetesFeature.EnableInterconnect || util.GetNodeZone(node) == types.OvnDefaultZone { + newNs := newObj.(*kapi.Namespace) + return h.nc.syncConntrackForExternalGateways(newNs) + } + return nil case factory.EndpointSliceForStaleConntrackRemovalType: oldEndpointSlice := oldObj.(*discovery.EndpointSlice) diff --git a/go-controller/pkg/ovn/controller/apbroute/master_controller.go b/go-controller/pkg/ovn/controller/apbroute/master_controller.go index ce87d84fab..73a56b22d7 100644 --- a/go-controller/pkg/ovn/controller/apbroute/master_controller.go +++ b/go-controller/pkg/ovn/controller/apbroute/master_controller.go @@ -239,6 +239,19 @@ func (c *ExternalGatewayMasterController) processNextPolicyWorkItem(wg *sync.Wai return true } +func (c *ExternalGatewayMasterController) GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespaceName string) (sets.Set[string], error) { + gwIPs, err := c.mgr.getDynamicGatewayIPsForTargetNamespace(namespaceName) + if err != nil { + return nil, err + } + tmpIPs, err := c.mgr.getStaticGatewayIPsForTargetNamespace(namespaceName) + if err != nil { + return nil, err + } + + return gwIPs.Union(tmpIPs), nil +} + func (c *ExternalGatewayMasterController) 
onPolicyAdd(obj interface{}) { _, ok := obj.(*adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute) if !ok { diff --git a/go-controller/pkg/ovn/default_network_controller.go b/go-controller/pkg/ovn/default_network_controller.go index a03f203bb1..dd12a32098 100644 --- a/go-controller/pkg/ovn/default_network_controller.go +++ b/go-controller/pkg/ovn/default_network_controller.go @@ -33,6 +33,7 @@ import ( kapi "k8s.io/api/core/v1" knet "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/util/wait" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -530,6 +531,17 @@ func (oc *DefaultNetworkController) Run(ctx context.Context) error { if err = oc.apbExternalRouteController.Run(oc.wg, 1); err != nil { return err } + // If interconnect is enabled and it is a multi-zone setup, then we flush conntrack + // on ovnkube-controller side and not on ovnkube-node side, since they are run in the + // same process. TODO(tssurya): In upstream ovnk, it's possible to run these as different processes + // in which case this flushing feature is not supported. 
+ if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != ovntypes.OvnDefaultZone { + util.SetARPTimeout() + // every minute cleanup stale conntrack entries if any + go wait.Until(func() { + oc.checkAndDeleteStaleConntrackEntries() + }, time.Minute*1, oc.stopChan) + } } end := time.Since(start) diff --git a/go-controller/pkg/ovn/egressgw.go b/go-controller/pkg/ovn/egressgw.go index 73c1dd1e8e..9147c8f6d9 100644 --- a/go-controller/pkg/ovn/egressgw.go +++ b/go-controller/pkg/ovn/egressgw.go @@ -6,6 +6,7 @@ import ( "net" "regexp" "strings" + "sync" utilnet "k8s.io/utils/net" @@ -20,6 +21,8 @@ import ( "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" "github.com/pkg/errors" + "github.com/vishvananda/netlink" + utilapierrors "k8s.io/apimachinery/pkg/util/errors" nettypes "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" @@ -89,6 +92,9 @@ func (oc *DefaultNetworkController) addPodExternalGWForNamespace(namespace strin for _, gwInfo := range nsInfo.routingExternalPodGWs { existingGWs.Insert(gwInfo.gws.UnsortedList()...) } + if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != types.OvnDefaultZone { + existingGWs.Insert(nsInfo.routingExternalGWs.gws.UnsortedList()...) 
+ } nsUnlock() klog.Infof("Adding routes for external gateway pod: %s, next hops: %q, namespace: %s, bfd-enabled: %t", @@ -98,12 +104,143 @@ func (oc *DefaultNetworkController) addPodExternalGWForNamespace(namespace strin return err } // add the exgw podIP to the namespace's k8s.ovn.org/external-gw-pod-ips list - if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, existingGWs.List()); err != nil { - klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err) + if !config.OVNKubernetesFeature.EnableInterconnect || oc.zone == types.OvnDefaultZone { + // If interconnect is disabled OR interconnect is running in single-zone-mode, + // the ovnkube-master is responsible for patching ICNI managed namespaces with + // "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush + // conntrack on every node. In multi-zone-interconnect case, we will handle the flushing + // directly on the ovnkube-controller code to avoid an extra namespace annotation + if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, existingGWs.List()); err != nil { + klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err) + } + } else { + // flush here since we know we have added an egressgw pod and we also know the full list of existing gatewayIPs + gatewayIPs, err := oc.apbExternalRouteController.GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespace) + if err != nil { + return fmt.Errorf("unable to retrieve gateway IPs for Admin Policy Based External Route objects: %w", err) + } + gatewayIPs = gatewayIPs.Insert(existingGWs.List()...) 
+ err = oc.syncConntrackForExternalGateways(namespace, gatewayIPs) // best effort + if err != nil { + klog.Errorf("Syncing conntrack entries for egressGW pod %v serving the namespace %s failed: %v", + egress, namespace, err) + } } return nil } +func (oc *DefaultNetworkController) syncConntrackForExternalGateways(namespace string, gwIPsToKeep sets.Set[string]) error { + var wg sync.WaitGroup + wg.Add(len(gwIPsToKeep)) + validMACs := sync.Map{} + for gwIP := range gwIPsToKeep { + go func(gwIP string) { + defer wg.Done() + if len(gwIP) > 0 && !utilnet.IsIPv6String(gwIP) { + // TODO: Add support for IPv6 external gateways + if hwAddr, err := util.GetMACAddressFromARP(net.ParseIP(gwIP)); err != nil { + klog.Errorf("Failed to lookup hardware address for gatewayIP %s: %v", gwIP, err) + } else if len(hwAddr) > 0 { + // we need to reverse the mac before passing it to the conntrack filter since OVN saves the MAC in the following format + // +------------------------------------------------------------ + + // | 128 ... 112 ... 96 ... 80 ... 64 ... 48 ... 32 ... 16 ... 0| + // +------------------+-------+--------------------+-------------| + // | | UNUSED| MAC ADDRESS | UNUSED | + // +------------------+-------+--------------------+-------------+ + for i, j := 0, len(hwAddr)-1; i < j; i, j = i+1, j-1 { + hwAddr[i], hwAddr[j] = hwAddr[j], hwAddr[i] + } + validMACs.Store(gwIP, []byte(hwAddr)) + } + } + }(gwIP) + } + wg.Wait() + + validNextHopMACs := [][]byte{} + validMACs.Range(func(key interface{}, value interface{}) bool { + validNextHopMACs = append(validNextHopMACs, value.([]byte)) + return true + }) + // Handle corner case where there are 0 IPs on the annotations OR none of the ARPs were successful; i.e allowMACList={empty}. + // This means we *need to* pass a label > 128 bits that will not match on any conntrack entry labels for these pods. + // That way any remaining entries with labels having MACs set will get purged. 
+ if len(validNextHopMACs) == 0 { + validNextHopMACs = append(validNextHopMACs, []byte("does-not-contain-anything")) + } + pods, err := oc.watchFactory.GetPods(namespace) + if err != nil { + return fmt.Errorf("unable to get pods from informer: %v", err) + } + + var errs []error + for _, pod := range pods { + pod := pod + // Since it's executed in ovnkube-controller only for multi-zone-ic the following hack of filtering + // local pods will work. Error will be treated as best-effort and ignored + if localPod, _ := oc.isPodInLocalZone(pod); !localPod { + continue + } + podIPs, err := util.GetPodIPsOfNetwork(pod, &util.DefaultNetInfo{}) + if err != nil && !errors.Is(err, util.ErrNoPodIPFound) { + errs = append(errs, fmt.Errorf("unable to fetch IP for pod %s/%s: %v", pod.Namespace, pod.Name, err)) + } + for _, podIP := range podIPs { // flush conntrack only for UDP + // for this pod, we check if the conntrack entry has a label that is not in the provided allowlist of MACs + // only caveat here is we assume egressGW served pods shouldn't have conntrack entries with other labels set + err := util.DeleteConntrack(podIP.String(), 0, kapi.ProtocolUDP, netlink.ConntrackOrigDstIP, validNextHopMACs) + if err != nil { + errs = append(errs, fmt.Errorf("failed to delete conntrack entry for pod %s: %v", podIP.String(), err)) + } + } + } + return utilapierrors.NewAggregate(errs) +} + +func (oc *DefaultNetworkController) checkAndDeleteStaleConntrackEntries() { + namespaces, err := oc.watchFactory.GetNamespaces() + if err != nil { + klog.Errorf("Unable to get pods from informer: %v", err) + return + } + for _, namespace := range namespaces { + // collect the full list of existing gatewayIPs serving this namespace before syncing its conntrack entries + existingGWs, err := oc.apbExternalRouteController.GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespace.Name) + if err != nil { + klog.Errorf("Unable to retrieve gateway IPs for Admin Policy Based External Route objects 
for ns %s: %w", namespace.Name, err) + return + } + // by now the nsInfo cache must be repaired for this feature fully; + // however this introduces cache lock scale concern by doing this every minute + // versus previously this was done purely using annotations + nsInfo, nsUnlock, err := oc.ensureNamespaceLocked(namespace.Name, false, nil) + if err != nil { + klog.Errorf("Failed to ensure namespace %s locked: %v", namespace, err) + return + } + for _, gwInfo := range nsInfo.routingExternalPodGWs { + existingGWs.Insert(gwInfo.gws.UnsortedList()...) + } + existingGWs.Insert(nsInfo.routingExternalGWs.gws.UnsortedList()...) + nsUnlock() + if len(existingGWs) > 0 { + pods, err := oc.watchFactory.GetPods(namespace.Name) + if err != nil { + klog.Warningf("Unable to get pods from informer for namespace %s: %v", namespace.Name, err) + } + if len(pods) > 0 || err != nil { + // we only need to proceed if there is at least one pod in this namespace on this node + // OR if we couldn't fetch the pods for some reason at this juncture + err = oc.syncConntrackForExternalGateways(namespace.Name, existingGWs) + if err != nil { + klog.Errorf("Syncing conntrack entries for egressGWs %+v serving the namespace %s failed: %v", + existingGWs, namespace.Name, err) + } + } + } + } +} + // addExternalGWsForNamespace handles adding annotated gw routes to all pods in namespace // This should only be called with a lock on nsInfo func (oc *DefaultNetworkController) addExternalGWsForNamespace(egress gatewayInfo, nsInfo *namespaceInfo, namespace string) error { @@ -293,6 +430,9 @@ func (oc *DefaultNetworkController) deletePodGWRoutesForNamespace(pod *kapi.Pod, for _, gwInfo := range nsInfo.routingExternalPodGWs { existingGWs.Insert(gwInfo.gws.UnsortedList()...) } + if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != types.OvnDefaultZone { + existingGWs.Insert(nsInfo.routingExternalGWs.gws.UnsortedList()...) 
+ } nsUnlock() if !ok || len(foundGws.gws) == 0 { @@ -313,8 +453,27 @@ func (oc *DefaultNetworkController) deletePodGWRoutesForNamespace(pod *kapi.Pod, return fmt.Errorf("failed to delete GW routes for pod %s: %w", pod.Name, err) } // remove the exgw podIP from the namespace's k8s.ovn.org/external-gw-pod-ips list - if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, sets.List(existingGWs)); err != nil { - klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err) + if !config.OVNKubernetesFeature.EnableInterconnect || oc.zone == types.OvnDefaultZone { + // If interconnect is disabled OR interconnect is running in single-zone-mode, + // the ovnkube-master is responsible for patching ICNI managed namespaces with + // "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush + // conntrack on every node. In multi-zone-interconnect case, we will handle the flushing + // directly on the ovnkube-controller code to avoid an extra namespace annotation + if err := util.UpdateExternalGatewayPodIPsAnnotation(oc.kube, namespace, sets.List(existingGWs)); err != nil { + klog.Errorf("Unable to update %s/%v annotation for namespace %s: %v", util.ExternalGatewayPodIPsAnnotation, existingGWs, namespace, err) + } + } else { + // flush here since we know we have deleted an egressgw pod and we also know the full list of existing gatewayIPs + gatewayIPs, err := oc.apbExternalRouteController.GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespace) + if err != nil { + return fmt.Errorf("unable to retrieve gateway IPs for Admin Policy Based External Route objects: %w", err) + } + gatewayIPs = gatewayIPs.Insert(sets.List(existingGWs)...) 
+ err = oc.syncConntrackForExternalGateways(namespace, gatewayIPs) // best effort + if err != nil { + klog.Errorf("Syncing conntrack entries for egressGWs %+v serving the namespace %s failed: %v", + gatewayIPs, namespace, err) + } } return nil } diff --git a/go-controller/pkg/ovn/namespace.go b/go-controller/pkg/ovn/namespace.go index 21d149b1d0..938bcc709e 100644 --- a/go-controller/pkg/ovn/namespace.go +++ b/go-controller/pkg/ovn/namespace.go @@ -210,6 +210,26 @@ func (oc *DefaultNetworkController) updateNamespace(old, newer *kapi.Namespace) } } } + if config.OVNKubernetesFeature.EnableInterconnect && oc.zone != types.OvnDefaultZone { + // If interconnect is disabled OR interconnect is running in single-zone-mode, + // the ovnkube-master is responsible for patching ICNI managed namespaces with + // "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush + // conntrack on every node. In multi-zone-interconnect case, we will handle the flushing + // directly on the ovnkube-controller code to avoid an extra namespace annotation + gatewayIPs, err := oc.apbExternalRouteController.GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(old.Name) + if err != nil { + return fmt.Errorf("unable to retrieve gateway IPs for Admin Policy Based External Route objects for namespace %s: %w", old.Name, err) + } + for _, gwInfo := range nsInfo.routingExternalPodGWs { + gatewayIPs.Insert(gwInfo.gws.UnsortedList()...) + } + gatewayIPs.Insert(nsInfo.routingExternalGWs.gws.UnsortedList()...) 
+ err = oc.syncConntrackForExternalGateways(old.Name, gatewayIPs) // best effort + if err != nil { + klog.Errorf("Syncing conntrack entries for egressGWs %+v serving the namespace %s failed: %v", + gatewayIPs, old.Name, err) + } + } // if new annotation is empty, exgws were removed, may need to add SNAT per pod // check if there are any pod gateways serving this namespace as well if gwAnnotation == "" && len(nsInfo.routingExternalPodGWs) == 0 && config.Gateway.DisableSNATMultipleGWs { diff --git a/test/e2e/external_gateways.go b/test/e2e/external_gateways.go index 732984489b..c2d1d7fcef 100644 --- a/test/e2e/external_gateways.go +++ b/test/e2e/external_gateways.go @@ -512,12 +512,14 @@ var _ = ginkgo.Describe("External Gateway", func() { } // ensure the conntrack deletion tracker annotation is updated - ginkgo.By("Check if the k8s.ovn.org/external-gw-pod-ips got updated for the app namespace") - err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { - ns := getNamespace(f, f.Namespace.Name) - return (ns.Annotations[externalGatewayPodIPsAnnotation] == fmt.Sprintf("%s,%s", addresses.gatewayIPs[0], addresses.gatewayIPs[1])), nil - }) - framework.ExpectNoError(err, "Check if the k8s.ovn.org/external-gw-pod-ips got updated, failed: %v", err) + if !isInterconnectEnabled() { + ginkgo.By("Check if the k8s.ovn.org/external-gw-pod-ips got updated for the app namespace") + err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { + ns := getNamespace(f, f.Namespace.Name) + return (ns.Annotations[externalGatewayPodIPsAnnotation] == fmt.Sprintf("%s,%s", addresses.gatewayIPs[0], addresses.gatewayIPs[1])), nil + }) + framework.ExpectNoError(err, "Check if the k8s.ovn.org/external-gw-pod-ips got updated, failed: %v", err) + } setupIperf3Client := func(container, address string, port int) { // note iperf3 even when using udp also spawns tcp connection first; so we indirectly also have the tcp connection when using "-u" flag @@ -547,12 
+549,14 @@ var _ = ginkgo.Describe("External Gateway", func() { annotatePodForGateway(gatewayPodName2, servingNamespace, "", addresses.gatewayIPs[1], false) // ensure the conntrack deletion tracker annotation is updated - ginkgo.By("Check if the k8s.ovn.org/external-gw-pod-ips got updated for the app namespace") - err = wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { - ns := getNamespace(f, f.Namespace.Name) - return (ns.Annotations[externalGatewayPodIPsAnnotation] == addresses.gatewayIPs[0]), nil - }) - framework.ExpectNoError(err, "Check if the k8s.ovn.org/external-gw-pod-ips got updated, failed: %v", err) + if !isInterconnectEnabled() { + ginkgo.By("Check if the k8s.ovn.org/external-gw-pod-ips got updated for the app namespace") + err = wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { + ns := getNamespace(f, f.Namespace.Name) + return (ns.Annotations[externalGatewayPodIPsAnnotation] == addresses.gatewayIPs[0]), nil + }) + framework.ExpectNoError(err, "Check if the k8s.ovn.org/external-gw-pod-ips got updated, failed: %v", err) + } ginkgo.By("Check if conntrack entries for ECMP routes are removed for the deleted external gateway if traffic is UDP") podConnEntriesWithMACLabelsSet = pokeConntrackEntries(nodeName, addresses.srcPodIP, protocol, macAddressGW) @@ -569,12 +573,14 @@ var _ = ginkgo.Describe("External Gateway", func() { annotatePodForGateway(gatewayPodName1, servingNamespace, "", addresses.gatewayIPs[0], false) // ensure the conntrack deletion tracker annotation is updated - ginkgo.By("Check if the k8s.ovn.org/external-gw-pod-ips got updated for the app namespace") - err = wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { - ns := getNamespace(f, f.Namespace.Name) - return (ns.Annotations[externalGatewayPodIPsAnnotation] == ""), nil - }) - framework.ExpectNoError(err, "Check if the k8s.ovn.org/external-gw-pod-ips got updated, failed: %v", err) + if !isInterconnectEnabled() { + 
ginkgo.By("Check if the k8s.ovn.org/external-gw-pod-ips got updated for the app namespace") + err = wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { + ns := getNamespace(f, f.Namespace.Name) + return (ns.Annotations[externalGatewayPodIPsAnnotation] == ""), nil + }) + framework.ExpectNoError(err, "Check if the k8s.ovn.org/external-gw-pod-ips got updated, failed: %v", err) + } ginkgo.By("Check if conntrack entries for ECMP routes are removed for the deleted external gateway if traffic is UDP") podConnEntriesWithMACLabelsSet = pokeConntrackEntries(nodeName, addresses.srcPodIP, protocol, macAddressGW) @@ -2205,12 +2211,14 @@ var _ = ginkgo.Describe("External Gateway", func() { } createAPBExternalRouteCRWithDynamicHop(defaultPolicyName, f.Namespace.Name, servingNamespace, false, addressesv4.gatewayIPs) // ensure the conntrack deletion tracker annotation is updated - ginkgo.By("Check if the k8s.ovn.org/external-gw-pod-ips got updated for the app namespace") - err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { - ns := getNamespace(f, f.Namespace.Name) - return (ns.Annotations[externalGatewayPodIPsAnnotation] == fmt.Sprintf("%s,%s", addresses.gatewayIPs[0], addresses.gatewayIPs[1])), nil - }) - framework.ExpectNoError(err, "Check if the k8s.ovn.org/external-gw-pod-ips got updated, failed: %v", err) + if !isInterconnectEnabled() { + ginkgo.By("Check if the k8s.ovn.org/external-gw-pod-ips got updated for the app namespace") + err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { + ns := getNamespace(f, f.Namespace.Name) + return (ns.Annotations[externalGatewayPodIPsAnnotation] == fmt.Sprintf("%s,%s", addresses.gatewayIPs[0], addresses.gatewayIPs[1])), nil + }) + framework.ExpectNoError(err, "Check if the k8s.ovn.org/external-gw-pod-ips got updated, failed: %v", err) + } annotatePodForGateway(gatewayPodName2, servingNamespace, "", addresses.gatewayIPs[1], false) annotatePodForGateway(gatewayPodName1, 
servingNamespace, "", addresses.gatewayIPs[0], false) From 49826e7229481b5b2bf19c87906493a538768c35 Mon Sep 17 00:00:00 2001 From: Patryk Diak Date: Thu, 28 Sep 2023 16:45:37 +0200 Subject: [PATCH 7/7] Run node certificate manager in hybrid overlay hybrid-overlay-node runs on windows nodes and should be able to set the same annotations as ovnkube-node. Signed-off-by: Patryk Diak (cherry picked from commit ac329f818509ca282a5791dd4a4751c1cf2bba84) --- go-controller/cmd/ovnkube/ovnkube.go | 2 +- .../hybrid-overlay-node.go | 26 +++++++++++++++++-- go-controller/pkg/util/kube.go | 14 +++++----- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/go-controller/cmd/ovnkube/ovnkube.go b/go-controller/cmd/ovnkube/ovnkube.go index 333a85c7a3..db137bcbbe 100644 --- a/go-controller/cmd/ovnkube/ovnkube.go +++ b/go-controller/cmd/ovnkube/ovnkube.go @@ -286,7 +286,7 @@ func startOvnKube(ctx *cli.Context, cancel context.CancelFunc) error { }() if config.Kubernetes.BootstrapKubeconfig != "" { - if err := util.StartNodeCertificateManager(ctx.Context, ovnKubeStartWg, &config.Kubernetes); err != nil { + if err := util.StartNodeCertificateManager(ctx.Context, ovnKubeStartWg, os.Getenv("K8S_NODE"), &config.Kubernetes); err != nil { return fmt.Errorf("failed to start the node certificate manager: %w", err) } } diff --git a/go-controller/hybrid-overlay/cmd/hybrid-overlay-node/hybrid-overlay-node.go b/go-controller/hybrid-overlay/cmd/hybrid-overlay-node/hybrid-overlay-node.go index a156458d80..c4ac74ad17 100644 --- a/go-controller/hybrid-overlay/cmd/hybrid-overlay-node/hybrid-overlay-node.go +++ b/go-controller/hybrid-overlay/cmd/hybrid-overlay-node/hybrid-overlay-node.go @@ -9,6 +9,7 @@ import ( "syscall" "github.com/urfave/cli/v2" + "k8s.io/client-go/tools/clientcmd" "github.com/ovn-org/ovn-kubernetes/go-controller/hybrid-overlay/pkg/controller" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" @@ -99,7 +100,29 @@ func runHybridOverlay(ctx *cli.Context) error { 
return fmt.Errorf("missing node name; use the 'node' flag to provide one") } - clientset, err := util.NewKubernetesClientset(&config.Kubernetes) + wg := &sync.WaitGroup{} + clientCfg := config.Kubernetes + if config.Kubernetes.BootstrapKubeconfig != "" { + if err := util.StartNodeCertificateManager(ctx.Context, wg, nodeName, &config.Kubernetes); err != nil { + return fmt.Errorf("failed to start the node certificate manager: %w", err) + } + + bootstrapConfig, err := clientcmd.BuildConfigFromFlags("", config.Kubernetes.BootstrapKubeconfig) + if err != nil { + return err + } + // Copy the APIServer and CAData from the bootstrap kubeconfig + clientCfg.APIServer = bootstrapConfig.Host + clientCfg.CAData = bootstrapConfig.CAData + if bootstrapConfig.CAFile != "" { + bytes, err := os.ReadFile(bootstrapConfig.CAFile) + if err != nil { + return err + } + clientCfg.CAData = bytes + } + } + clientset, err := util.NewKubernetesClientset(&clientCfg) if err != nil { return err } @@ -121,7 +144,6 @@ func runHybridOverlay(ctx *cli.Context) error { } f.Start(stopChan) - wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() diff --git a/go-controller/pkg/util/kube.go b/go-controller/pkg/util/kube.go index 218601b19e..057a699b28 100644 --- a/go-controller/pkg/util/kube.go +++ b/go-controller/pkg/util/kube.go @@ -8,7 +8,6 @@ import ( "fmt" "net" "os" - "path" "path/filepath" "runtime" "strings" @@ -208,8 +207,8 @@ func newKubernetesRestConfig(conf *config.KubernetesConfig) (*rest.Config, error // uses the current context in kubeconfig kconfig, err = clientcmd.BuildConfigFromFlags("", conf.Kubeconfig) } else if strings.HasPrefix(conf.APIServer, "https") { - if conf.Token == "" || len(conf.CAData) == 0 { - return nil, fmt.Errorf("TLS-secured apiservers require token and CA certificate") + if (conf.Token == "" && conf.CertDir == "") || len(conf.CAData) == 0 { + return nil, fmt.Errorf("TLS-secured apiservers require token/cert and CA certificate") } if _, err := 
cert.NewPoolFromBytes(conf.CAData); err != nil { return nil, err @@ -224,8 +223,8 @@ func newKubernetesRestConfig(conf *config.KubernetesConfig) (*rest.Config, error kconfig = &rest.Config{ Host: conf.APIServer, TLSClientConfig: rest.TLSClientConfig{ - KeyFile: path.Join(conf.CertDir, certNamePrefix+"-current.pem"), - CertFile: path.Join(conf.CertDir, certNamePrefix+"-current.pem"), + KeyFile: filepath.Join(conf.CertDir, certNamePrefix+"-current.pem"), + CertFile: filepath.Join(conf.CertDir, certNamePrefix+"-current.pem"), CAData: conf.CAData, }, } @@ -254,10 +253,9 @@ func newKubernetesRestConfig(conf *config.KubernetesConfig) (*rest.Config, error // StartNodeCertificateManager manages the creation and rotation of the node-specific client certificate. // When there is no existing certificate, it will use the BootstrapKubeconfig kubeconfig to create a CSR and it will // wait for the certificate before returning. -func StartNodeCertificateManager(ctx context.Context, wg *sync.WaitGroup, conf *config.KubernetesConfig) error { - nodeName := os.Getenv("K8S_NODE") +func StartNodeCertificateManager(ctx context.Context, wg *sync.WaitGroup, nodeName string, conf *config.KubernetesConfig) error { if nodeName == "" { - return fmt.Errorf("failed to get the node name required for the certificate from K8S_NODE env") + return fmt.Errorf("the provided node name cannot be empty") } defaultKConfig, err := newKubernetesRestConfig(conf) if err != nil {