Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds retry mechanism for Network Policy #2809

Merged
merged 1 commit into from Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions go-controller/pkg/factory/factory.go
Expand Up @@ -32,6 +32,7 @@ import (
v1coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
netlisters "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -586,6 +587,12 @@ func (wf *WatchFactory) GetNamespacesBySelector(labelSelector metav1.LabelSelect
return namespaceLister.List(selector)
}

// GetNetworkPolicy gets a specific network policy by the namespace/name
func (wf *WatchFactory) GetNetworkPolicy(namespace, name string) (*knet.NetworkPolicy, error) {
networkPolicyLister := wf.informers[policyType].lister.(netlisters.NetworkPolicyLister)
return networkPolicyLister.NetworkPolicies(namespace).Get(name)
}

func (wf *WatchFactory) NodeInformer() cache.SharedIndexInformer {
return wf.informers[nodeType].inf
}
Expand Down
14 changes: 12 additions & 2 deletions go-controller/pkg/ovn/namespace.go
Expand Up @@ -365,8 +365,18 @@ func (oc *Controller) deleteNamespace(ns *kapi.Namespace) {

klog.V(5).Infof("Deleting Namespace's NetworkPolicy entities")
for _, np := range nsInfo.networkPolicies {
delete(nsInfo.networkPolicies, np.name)
oc.destroyNetworkPolicy(np, nsInfo)
oc.checkAndSkipRetryPolicy(np.policy)
// add the full np object to the retry entry, since the namespace is going to be removed
// along with any mappings of nsInfo -> network policies
oc.initRetryPolicyWithDelete(np.policy, np)
isLastPolicyInNamespace := len(nsInfo.networkPolicies) == 1
if err := oc.destroyNetworkPolicy(np, isLastPolicyInNamespace); err != nil {
klog.Errorf("Failed to delete network policy: %s, error: %v", getPolicyNamespacedName(np.policy), err)
oc.unSkipRetryPolicy(np.policy)
} else {
oc.checkAndDeleteRetryPolicy(np.policy)
delete(nsInfo.networkPolicies, np.name)
trozet marked this conversation as resolved.
Show resolved Hide resolved
}
}
oc.deleteGWRoutesForNamespace(ns.Name)
oc.multicastDeleteNamespace(ns, nsInfo)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do the same for deleteMulticastAllowPolicy and return the error from the multicastDeleteNamespace function or is that kinda outa scope and we focus only on the main one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have no retry mechanism for multicast, or anything else that is explicitly controlled by namespace events. The multicast "policy" is in policy.go, but its not tied to network policy (I think someone just put it in the file cause of its title). We should make things retry when the namespace handler fails its operation as well, but it's outside of the scope of this PR.

Expand Down
73 changes: 69 additions & 4 deletions go-controller/pkg/ovn/ovn.go
Expand Up @@ -200,6 +200,14 @@ type Controller struct {
// channel to indicate we need to retry pods immediately
retryPodsChan chan struct{}

// Map of network policies that need to be retried, and the timestamp of when they last failed
// keyed by namespace/name
retryNetPolices map[string]*retryNetPolEntry
retryNetPolLock sync.Mutex

// channel to indicate we need to retry policy immediately
retryPolicyChan chan struct{}

metricsRecorder *metrics.ControlPlaneRecorder
}

Expand All @@ -211,6 +219,16 @@ type retryEntry struct {
ignore bool
}

type retryNetPolEntry struct {
newPolicy *kapisnetworking.NetworkPolicy
oldPolicy *kapisnetworking.NetworkPolicy
np *networkPolicy
timeStamp time.Time
backoffSec time.Duration
// whether to include this NP in retry iterations
ignore bool
}

const (
// TCP is the constant string for the string "TCP"
TCP = "TCP"
Expand Down Expand Up @@ -283,6 +301,8 @@ func NewOvnController(ovnClient *util.OVNClientset, wf *factory.WatchFactory, st
joinSwIPManager: nil,
retryPods: make(map[types.UID]*retryEntry),
retryPodsChan: make(chan struct{}, 1),
retryNetPolices: make(map[string]*retryNetPolEntry),
retryPolicyChan: make(chan struct{}, 1),
recorder: recorder,
nbClient: libovsdbOvnNBClient,
sbClient: libovsdbOvnSBClient,
Expand Down Expand Up @@ -741,23 +761,68 @@ func (oc *Controller) WatchPods() {
// WatchNetworkPolicy starts the watching of network policy resource and calls
// back the appropriate handler logic
func (oc *Controller) WatchNetworkPolicy() {
go func() {
// track the retryNetworkPolicies map and every 30 seconds check if any pods need to be retried
for {
select {
case <-time.After(30 * time.Second):
oc.iterateRetryNetworkPolicies(false)
case <-oc.retryPolicyChan:
oc.iterateRetryNetworkPolicies(true)
case <-oc.stopChan:
return
}
}
}()

start := time.Now()
oc.watchFactory.AddPolicyHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
policy := obj.(*kapisnetworking.NetworkPolicy)
oc.addNetworkPolicy(policy)
oc.initRetryPolicy(policy)
if err := oc.addNetworkPolicy(policy); err != nil {
klog.Errorf("Failed to create network policy %s, error: %v",
getPolicyNamespacedName(policy), err)
oc.unSkipRetryPolicy(policy)
return
}
oc.checkAndDeleteRetryPolicy(policy)
},
UpdateFunc: func(old, newer interface{}) {
oldPolicy := old.(*kapisnetworking.NetworkPolicy)
newPolicy := newer.(*kapisnetworking.NetworkPolicy)
if !reflect.DeepEqual(oldPolicy, newPolicy) {
oc.deleteNetworkPolicy(oldPolicy)
oc.addNetworkPolicy(newPolicy)
oc.checkAndSkipRetryPolicy(oldPolicy)
trozet marked this conversation as resolved.
Show resolved Hide resolved
if err := oc.deleteNetworkPolicy(oldPolicy, nil); err != nil {
oc.initRetryPolicyWithDelete(oldPolicy, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit complicated. specially since we have initRetryPolicyWithDelete and initRetryPolicy.... need to go back and forth between functions to see what they do...should we add a comment to this part to explain what we do so that 4 months from now we are good?

self-note-attached-for-when-I-come-back-to-this-PR-in-the-future
IMG_6231

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice diagram! Initially I made just a single "initRetryPolicy(old, new)" type of of function, but then I thought that was more confusing when reading the code in other places. Also, it made things weird about implying what a nil old, or new object means.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah for me it was confusing going back and forth between pod_retry code and ovn handler code but in the end I think I grasped the logic, its genius with all this skip and unskip :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dcbw idea not me :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I want any of this to be called genius...

oc.initRetryPolicy(newPolicy)
oc.unSkipRetryPolicy(oldPolicy)
klog.Errorf("Failed to delete network policy %s, during update: %v",
getPolicyNamespacedName(oldPolicy), err)
return
}
// remove the old policy from retry entry since it was correctly deleted
oc.removeDeleteFromRetryPolicy(oldPolicy)
if err := oc.addNetworkPolicy(newPolicy); err != nil {
oc.initRetryPolicy(newPolicy)
oc.unSkipRetryPolicy(newPolicy)
klog.Errorf("Failed to create network policy %s, during update: %v",
getPolicyNamespacedName(newPolicy), err)
return
}
oc.checkAndDeleteRetryPolicy(newPolicy)
}
},
DeleteFunc: func(obj interface{}) {
policy := obj.(*kapisnetworking.NetworkPolicy)
oc.deleteNetworkPolicy(policy)
oc.checkAndSkipRetryPolicy(policy)
oc.initRetryPolicyWithDelete(policy, nil)
if err := oc.deleteNetworkPolicy(policy, nil); err != nil {
oc.unSkipRetryPolicy(policy)
klog.Errorf("Failed to delete network policy %s, error: %v", getPolicyNamespacedName(policy), err)
return
}
oc.checkAndDeleteRetryPolicy(policy)
},
}, oc.syncNetworkPolicies)
klog.Infof("Bootstrapping existing policies and cleaning stale policies took %v", time.Since(start))
Expand Down
14 changes: 13 additions & 1 deletion go-controller/pkg/ovn/ovn_test.go
@@ -1,6 +1,7 @@
package ovn

import (
"context"
"github.com/onsi/gomega"
libovsdbclient "github.com/ovn-org/libovsdb/client"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
Expand All @@ -9,7 +10,6 @@ import (
libovsdbtest "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing/libovsdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
util "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -106,3 +106,15 @@ func (o *FakeOVN) init() {
o.controller.multicastSupport = true
o.controller.loadBalancerGroupUUID = types.ClusterLBGroupName + "-UUID"
}

func (o *FakeOVN) resetNBClient(ctx context.Context) {
if o.controller.nbClient.Connected() {
o.controller.nbClient.Close()
}
gomega.Eventually(o.controller.nbClient.Connected()).Should(gomega.BeFalse())
err := o.controller.nbClient.Connect(ctx)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Eventually(o.controller.nbClient.Connected()).Should(gomega.BeTrue())
_, err = o.controller.nbClient.MonitorAll(ctx)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}