Skip to content

Commit

Permalink
Merge pull request #1920 from tssurya/ds-merge-29th-september-2023
Browse files Browse the repository at this point in the history
OCPBUGS-19932: OCPBUGS-19931: DownStream Batch Merge Blocker Bug 29th september 2023
  • Loading branch information
openshift-merge-robot committed Sep 29, 2023
2 parents b6b3810 + 49826e7 commit 5366ede
Show file tree
Hide file tree
Showing 15 changed files with 619 additions and 450 deletions.
2 changes: 1 addition & 1 deletion go-controller/cmd/ovnkube/ovnkube.go
Expand Up @@ -286,7 +286,7 @@ func startOvnKube(ctx *cli.Context, cancel context.CancelFunc) error {
}()

if config.Kubernetes.BootstrapKubeconfig != "" {
if err := util.StartNodeCertificateManager(ctx.Context, ovnKubeStartWg, &config.Kubernetes); err != nil {
if err := util.StartNodeCertificateManager(ctx.Context, ovnKubeStartWg, os.Getenv("K8S_NODE"), &config.Kubernetes); err != nil {
return fmt.Errorf("failed to start the node certificate manager: %w", err)
}
}
Expand Down
Expand Up @@ -9,6 +9,7 @@ import (
"syscall"

"github.com/urfave/cli/v2"
"k8s.io/client-go/tools/clientcmd"

"github.com/ovn-org/ovn-kubernetes/go-controller/hybrid-overlay/pkg/controller"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
Expand Down Expand Up @@ -99,7 +100,29 @@ func runHybridOverlay(ctx *cli.Context) error {
return fmt.Errorf("missing node name; use the 'node' flag to provide one")
}

clientset, err := util.NewKubernetesClientset(&config.Kubernetes)
wg := &sync.WaitGroup{}
clientCfg := config.Kubernetes
if config.Kubernetes.BootstrapKubeconfig != "" {
if err := util.StartNodeCertificateManager(ctx.Context, wg, nodeName, &config.Kubernetes); err != nil {
return fmt.Errorf("failed to start the node certificate manager: %w", err)
}

bootstrapConfig, err := clientcmd.BuildConfigFromFlags("", config.Kubernetes.BootstrapKubeconfig)
if err != nil {
return err
}
// Copy the APIServer and CAData from the bootstrap kubeconfig
clientCfg.APIServer = bootstrapConfig.Host
clientCfg.CAData = bootstrapConfig.CAData
if bootstrapConfig.CAFile != "" {
bytes, err := os.ReadFile(bootstrapConfig.CAFile)
if err != nil {
return err
}
clientCfg.CAData = bytes
}
}
clientset, err := util.NewKubernetesClientset(&clientCfg)
if err != nil {
return err
}
Expand All @@ -121,7 +144,6 @@ func runHybridOverlay(ctx *cli.Context) error {
}

f.Start(stopChan)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
24 changes: 15 additions & 9 deletions go-controller/pkg/node/default_node_network_controller.go
Expand Up @@ -1035,7 +1035,6 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error {
if config.HybridOverlay.Enabled {
// Not supported with DPUs, enforced in config
// TODO(adrianc): Revisit above comment
// TODO(jtanenba): LocalPodInformer informs on all pods
nodeController, err := honode.NewNode(
nc.Kube,
nc.name,
Expand Down Expand Up @@ -1080,15 +1079,22 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error {
}

if config.OvnKubeNode.Mode != types.NodeModeDPUHost {
util.SetARPTimeout()
err := nc.WatchNamespaces()
if err != nil {
return fmt.Errorf("failed to watch namespaces: %w", err)
// If interconnect is disabled OR interconnect is running in single-zone-mode,
// the ovnkube-master is responsible for patching ICNI managed namespaces with
// "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush
// conntrack on every node. In multi-zone-interconnect case, we will handle the flushing
// directly on the ovnkube-controller code to avoid an extra namespace annotation
if !config.OVNKubernetesFeature.EnableInterconnect || sbZone == types.OvnDefaultZone {
util.SetARPTimeout()
err := nc.WatchNamespaces()
if err != nil {
return fmt.Errorf("failed to watch namespaces: %w", err)
}
// every minute cleanup stale conntrack entries if any
go wait.Until(func() {
nc.checkAndDeleteStaleConntrackEntries()
}, time.Minute*1, nc.stopChan)
}
// every minute cleanup stale conntrack entries if any
go wait.Until(func() {
nc.checkAndDeleteStaleConntrackEntries()
}, time.Minute*1, nc.stopChan)
err = nc.WatchEndpointSlices()
if err != nil {
return fmt.Errorf("failed to watch endpointSlices: %w", err)
Expand Down
19 changes: 17 additions & 2 deletions go-controller/pkg/node/obj_retry_node.go
Expand Up @@ -8,8 +8,11 @@ import (
discovery "k8s.io/api/discovery/v1"
cache "k8s.io/client-go/tools/cache"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/retry"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
)

type nodeEventHandler struct {
Expand Down Expand Up @@ -159,8 +162,20 @@ func (h *nodeEventHandler) AddResource(obj interface{}, fromRetryLoop bool) erro
func (h *nodeEventHandler) UpdateResource(oldObj, newObj interface{}, inRetryCache bool) error {
switch h.objType {
case factory.NamespaceExGwType:
newNs := newObj.(*kapi.Namespace)
return h.nc.syncConntrackForExternalGateways(newNs)
// If interconnect is disabled OR interconnect is running in single-zone-mode,
// the ovnkube-master is responsible for patching ICNI managed namespaces with
// "k8s.ovn.org/external-gw-pod-ips". In that case, we need ovnkube-node to flush
// conntrack on every node. In multi-zone-interconnect case, we will handle the flushing
// directly on the ovnkube-controller code to avoid an extra namespace annotation
node, err := h.nc.watchFactory.GetNode(h.nc.name)
if err != nil {
return fmt.Errorf("error retrieving node %s: %v", h.nc.name, err)
}
if !config.OVNKubernetesFeature.EnableInterconnect || util.GetNodeZone(node) == types.OvnDefaultZone {
newNs := newObj.(*kapi.Namespace)
return h.nc.syncConntrackForExternalGateways(newNs)
}
return nil

case factory.EndpointSliceForStaleConntrackRemovalType:
oldEndpointSlice := oldObj.(*discovery.EndpointSlice)
Expand Down
67 changes: 64 additions & 3 deletions go-controller/pkg/ovn/controller/apbroute/external_controller.go
Expand Up @@ -37,16 +37,77 @@ func newPodInfo() *podInfo {
}
}

type ExternalRouteInfo struct {
sync.Mutex
Deleted bool
// RouteInfo holds the external gateway routes recorded for a single pod.
type RouteInfo struct {
// PodName is the namespaced name of the pod this entry belongs to.
PodName ktypes.NamespacedName
// PodExternalRoutes is a cache keeping the LR routes added to the GRs when
// external gateways are used. The outer map key is the podIP (src-ip of the route),
// the inner map key is the GW IP (next hop), and the inner map value is the GR name.
PodExternalRoutes map[string]map[string]string
}

// ExternalGatewayRouteInfoCache stores one RouteInfo per pod, keyed by the
// pod's namespaced name.
type ExternalGatewayRouteInfoCache struct {
// routeInfos maps a pod's namespaced name to its RouteInfo. The methods in
// this file reach it through DoWithLock, so work on an entry happens while
// that entry's key is locked.
routeInfos *syncmap.SyncMapComparableKey[ktypes.NamespacedName, *RouteInfo]
}

// NewExternalGatewayRouteInfoCache returns a cache with an empty, ready-to-use
// route info map.
func NewExternalGatewayRouteInfoCache() *ExternalGatewayRouteInfoCache {
	cache := &ExternalGatewayRouteInfoCache{}
	cache.routeInfos = syncmap.NewSyncMapComparableKey[ktypes.NamespacedName, *RouteInfo]()
	return cache
}

// CreateOrLoad runs `f` against the RouteInfo cached for `podName` while the
// key is locked. When the key has no entry yet, an empty RouteInfo (no routes,
// PodName set to `podName`) is stored first and handed to `f`; otherwise the
// already cached entry is used. Callers therefore never deal with key
// initialization or lookup themselves.
//
// The error returned by `f` is returned unchanged.
func (e *ExternalGatewayRouteInfoCache) CreateOrLoad(podName ktypes.NamespacedName, f func(routeInfo *RouteInfo) error) error {
	withEntry := func(key ktypes.NamespacedName) error {
		// Candidate entry; it only ends up in the cache when the key is new.
		fresh := &RouteInfo{
			PodName:           podName,
			PodExternalRoutes: make(map[string]map[string]string),
		}
		entry, _ := e.routeInfos.LoadOrStore(key, fresh)
		return f(entry)
	}
	return e.routeInfos.DoWithLock(podName, withEntry)
}

// Cleanup locks the key `podName` and, if the key has a cached RouteInfo,
// passes it to `f`. A missing key is a no-op that returns nil.
//
// Once `f` has returned (with or without an error), every podIP whose route
// map is empty is dropped from PodExternalRoutes. The key itself is removed
// from the cache only when `f` succeeded and no podIP entries remain.
//
// The error returned by `f` is returned unchanged.
func (e *ExternalGatewayRouteInfoCache) Cleanup(podName ktypes.NamespacedName, f func(routeInfo *RouteInfo) error) error {
	prune := func(key ktypes.NamespacedName) error {
		entry, found := e.routeInfos.Load(key)
		if !found {
			return nil
		}

		fErr := f(entry)

		// Pruning empty per-IP maps happens regardless of f's outcome.
		for ip, gwRoutes := range entry.PodExternalRoutes {
			if len(gwRoutes) != 0 {
				continue
			}
			delete(entry.PodExternalRoutes, ip)
		}

		if fErr != nil {
			// Keep the key so a later attempt can find the remaining state.
			return fErr
		}
		if len(entry.PodExternalRoutes) == 0 {
			e.routeInfos.Delete(key)
		}
		return nil
	}
	return e.routeInfos.DoWithLock(podName, prune)
}

// CleanupNamespace calls Cleanup with `f` for every cached pod whose namespace
// equals `nsName`. Processing stops at the first error, which is returned;
// pods not yet visited at that point are left untouched.
func (e *ExternalGatewayRouteInfoCache) CleanupNamespace(nsName string, f func(routeInfo *RouteInfo) error) error {
	for _, key := range e.routeInfos.GetKeys() {
		if key.Namespace != nsName {
			continue
		}
		if err := e.Cleanup(key, f); err != nil {
			return err
		}
	}
	return nil
}

// routePolicyState contains current policy state as it was applied.
// Since every config is applied to a pod, podInfo stores current state for every target pod.
type routePolicyState struct {
Expand Down
Expand Up @@ -280,6 +280,40 @@ var _ = Describe("OVN External Gateway pod", func() {
eventuallyExpectConfig(dynamicPolicy.Name, expectedPolicy1, expectedRefs1)
eventuallyExpectConfig(dynamicPolicyDiffTargetNS.Name, expectedPolicy2, expectedRefs2)
})

// Verifies that the processed policy state follows a target pod being
// deleted and then created again.
It("deletes a target pod that matches a policy and creates it again", func() {
// Start the controller with the namespaces and pods as the first object list
// and the dynamic policy as the second.
initController([]runtime.Object{namespaceGW, namespaceTarget2, targetPod2, pod1},
[]runtime.Object{dynamicPolicy})

// Expected state while the target pod exists.
expectedPolicy1, expectedRefs1 := expectedPolicyStateAndRefs(
[]*namespaceWithPods{namespaceTarget2WithPod},
nil,
[]*namespaceWithPods{namespaceGWWithPod}, false)

eventuallyExpectNumberOfPolicies(1)
eventuallyExpectConfig(dynamicPolicy.Name, expectedPolicy1, expectedRefs1)

By("delete one of the target pods")
deletePod(targetPod2, fakeClient)
// After the deletion the expectation is built from the target namespace name
// alone, i.e. without the pod-carrying fixture used above.
expectedPolicy1, expectedRefs1 = expectedPolicyStateAndRefs(
[]*namespaceWithPods{{nsName: targetNamespaceName2}},
nil,
[]*namespaceWithPods{namespaceGWWithPod}, false)

eventuallyExpectNumberOfPolicies(1)
eventuallyExpectConfig(dynamicPolicy.Name, expectedPolicy1, expectedRefs1)

By("create the deleted target pod")

createPod(targetPod2, fakeClient)
// Re-creating the pod is expected to bring back the same state as at the start.
expectedPolicy1, expectedRefs1 = expectedPolicyStateAndRefs(
[]*namespaceWithPods{namespaceTarget2WithPod},
nil,
[]*namespaceWithPods{namespaceGWWithPod}, false)

eventuallyExpectNumberOfPolicies(1)
eventuallyExpectConfig(dynamicPolicy.Name, expectedPolicy1, expectedRefs1)
})
})

var _ = Context("When updating a pod", func() {
Expand Down
53 changes: 30 additions & 23 deletions go-controller/pkg/ovn/controller/apbroute/master_controller.go
Expand Up @@ -15,7 +15,6 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ktypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -38,10 +37,12 @@ import (
)

const (
maxRetries = 15
resyncInterval = 0
maxRetries = 15
)

// Admin Policy Based Route services

type ExternalGatewayMasterController struct {
client kubernetes.Interface
apbRoutePolicyClient adminpolicybasedrouteclient.Interface
Expand All @@ -62,13 +63,9 @@ type ExternalGatewayMasterController struct {
namespaceLister corev1listers.NamespaceLister
namespaceInformer cache.SharedIndexInformer

// External gateway caches
// Make them public so that they can be used by the annotation logic to lock on namespaces and share the same external route information
ExternalGWCache map[ktypes.NamespacedName]*ExternalRouteInfo
ExGWCacheMutex *sync.RWMutex

mgr *externalPolicyManager
nbClient *northBoundClient
mgr *externalPolicyManager
nbClient *northBoundClient
ExternalGWRouteInfoCache *ExternalGatewayRouteInfoCache
}

func NewExternalMasterController(
Expand All @@ -84,22 +81,20 @@ func NewExternalMasterController(
controllerName string,
) (*ExternalGatewayMasterController, error) {

externalGWCache := make(map[ktypes.NamespacedName]*ExternalRouteInfo)
exGWCacheMutex := &sync.RWMutex{}
externalGWRouteInfo := NewExternalGatewayRouteInfoCache()
zone, err := libovsdbutil.GetNBZone(nbClient)
if err != nil {
return nil, err
}
nbCli := &northBoundClient{
routeLister: apbRouteInformer.Lister(),
nodeLister: nodeLister,
podLister: podInformer.Lister(),
nbClient: nbClient,
addressSetFactory: addressSetFactory,
externalGWCache: externalGWCache,
exGWCacheMutex: exGWCacheMutex,
controllerName: controllerName,
zone: zone,
routeLister: apbRouteInformer.Lister(),
nodeLister: nodeLister,
podLister: podInformer.Lister(),
nbClient: nbClient,
addressSetFactory: addressSetFactory,
controllerName: controllerName,
zone: zone,
externalGatewayRouteInfo: externalGWRouteInfo,
}

c := &ExternalGatewayMasterController{
Expand All @@ -124,9 +119,8 @@ func NewExternalMasterController(
workqueue.NewItemFastSlowRateLimiter(time.Second, 5*time.Second, 5),
"apbexternalroutenamespaces",
),
ExternalGWCache: externalGWCache,
ExGWCacheMutex: exGWCacheMutex,
nbClient: nbCli,
ExternalGWRouteInfoCache: externalGWRouteInfo,
nbClient: nbCli,
mgr: newExternalPolicyManager(
stopCh,
podInformer.Lister(),
Expand Down Expand Up @@ -245,6 +239,19 @@ func (c *ExternalGatewayMasterController) processNextPolicyWorkItem(wg *sync.Wai
return true
}

// GetAdminPolicyBasedExternalRouteIPsForTargetNamespace returns the union of
// the dynamic and the static gateway IPs that the policy manager reports for
// the target namespace `namespaceName`. The dynamic lookup runs first; if
// either lookup fails, its error is returned together with a nil set.
func (c *ExternalGatewayMasterController) GetAdminPolicyBasedExternalRouteIPsForTargetNamespace(namespaceName string) (sets.Set[string], error) {
	dynamicIPs, err := c.mgr.getDynamicGatewayIPsForTargetNamespace(namespaceName)
	if err != nil {
		return nil, err
	}

	staticIPs, err := c.mgr.getStaticGatewayIPsForTargetNamespace(namespaceName)
	if err != nil {
		return nil, err
	}

	allIPs := dynamicIPs.Union(staticIPs)
	return allIPs, nil
}

func (c *ExternalGatewayMasterController) onPolicyAdd(obj interface{}) {
_, ok := obj.(*adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute)
if !ok {
Expand Down

0 comments on commit 5366ede

Please sign in to comment.