Skip to content

Commit

Permalink
netpolEventHandler: fix stopChan.
Browse files Browse the repository at this point in the history
We used to pass oc.stopChan to the retryFramework for
network policy handlers, but that means that retry loop
for failed objects will not be stopped on network policy
delete, therefore leaking goroutines.
Create getChildStopChan function to pass stop signal both
on oc and network policy delete.

Signed-off-by: Nadia Pinaeva <npinaeva@redhat.com>
  • Loading branch information
npinaeva authored and jcaamano committed Jul 10, 2023
1 parent ef362a8 commit 5d6b136
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 8 deletions.
17 changes: 16 additions & 1 deletion go-controller/pkg/ovn/base_network_controller_policy.go
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"

kapi "k8s.io/api/core/v1"
knet "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -182,6 +184,8 @@ type networkPolicy struct {
// or this value will be set to true and handler can't proceed.
// Use networkPolicy.RLock to read this field and hold it for the whole event handling.
deleted bool

stopChan chan struct{}
}

func NewNetworkPolicy(policy *knet.NetworkPolicy) *networkPolicy {
Expand Down Expand Up @@ -818,7 +822,8 @@ func (bnc *BaseNetworkController) addLocalPodHandler(policy *knet.NetworkPolicy,
syncFunc,
&NetworkPolicyExtraParameters{
np: np,
})
},
np.stopChan)

podHandler, err := retryLocalPods.WatchResourceFiltered(policy.Namespace, sel)
if err != nil {
Expand Down Expand Up @@ -1014,6 +1019,10 @@ func (bnc *BaseNetworkController) createNetworkPolicy(policy *knet.NetworkPolicy
np.Unlock()
npLocked = false

if np.stopChan == nil {
np.stopChan = util.GetChildStopChan(bnc.stopChan)
}

// 6. Start peer handlers to update all allow rules first
for _, handler := range policyHandlers {
// For each peer namespace selector, we create a watcher that
Expand Down Expand Up @@ -1440,6 +1449,7 @@ func (bnc *BaseNetworkController) addPeerNamespaceHandler(
factory.PeerNamespaceSelectorType,
syncFunc,
&NetworkPolicyExtraParameters{gp: gress, np: np},
np.stopChan,
)

namespaceHandler, err := retryPeerNamespaces.WatchResourceFiltered("", sel)
Expand All @@ -1453,6 +1463,11 @@ func (bnc *BaseNetworkController) addPeerNamespaceHandler(
}

func (bnc *BaseNetworkController) shutdownHandlers(np *networkPolicy) {
if np.stopChan != nil {
close(np.stopChan)
np.stopChan = nil
}

if np.localPodHandler != nil {
bnc.watchFactory.RemovePodHandler(np.localPodHandler)
np.localPodHandler = nil
Expand Down
Expand Up @@ -2,11 +2,10 @@ package ovn

import (
"fmt"
"reflect"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/retry"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
"reflect"

kapi "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -27,7 +26,8 @@ import (
func (bnc *BaseNetworkController) newNetpolRetryFramework(
objectType reflect.Type,
syncFunc func([]interface{}) error,
extraParameters interface{}) *retry.RetryFramework {
extraParameters interface{},
stopChan <-chan struct{}) *retry.RetryFramework {
eventHandler := &networkControllerPolicyEventHandler{
objType: objectType,
watchFactory: bnc.watchFactory,
Expand All @@ -42,7 +42,7 @@ func (bnc *BaseNetworkController) newNetpolRetryFramework(
EventHandler: eventHandler,
}
return retry.NewRetryFramework(
bnc.stopChan,
stopChan,
bnc.wg,
bnc.watchFactory,
resourceHandler,
Expand Down
18 changes: 17 additions & 1 deletion go-controller/pkg/ovn/pod_selector_address_set.go
Expand Up @@ -52,6 +52,8 @@ type PodSelectorAddressSet struct {

// handlerResources holds the data that is used and updated by the handlers.
handlerResources *PodSelectorAddrSetHandlerInfo

stopChan chan struct{}
}

// EnsurePodSelectorAddressSet returns address set for requested (podSelector, namespaceSelector, namespace).
Expand Down Expand Up @@ -159,6 +161,9 @@ func (bnc *BaseNetworkController) DeletePodSelectorAddressSet(addrSetKey, backRe

func (psas *PodSelectorAddressSet) init(bnc *BaseNetworkController) error {
// create pod handler resources before starting the handlers
if psas.stopChan == nil {
psas.stopChan = util.GetChildStopChan(bnc.stopChan)
}
if psas.handlerResources == nil {
as, err := bnc.addressSetFactory.NewAddressSet(psas.addrSetDbIDs, nil)
if err != nil {
Expand All @@ -174,6 +179,7 @@ func (psas *PodSelectorAddressSet) init(bnc *BaseNetworkController) error {
netInfo: bnc.NetInfo,
ipv4Mode: ipv4Mode,
ipv6Mode: ipv6Mode,
stopChan: psas.stopChan,
}
}

Expand Down Expand Up @@ -210,6 +216,11 @@ func (psas *PodSelectorAddressSet) init(bnc *BaseNetworkController) error {

func (psas *PodSelectorAddressSet) destroy(bnc *BaseNetworkController) error {
klog.Infof("Deleting shared address set for pod selector %s", psas.key)
if psas.stopChan != nil {
close(psas.stopChan)
psas.stopChan = nil
}

psas.needsCleanup = true
if psas.handlerResources != nil {
err := psas.handlerResources.destroy(bnc)
Expand Down Expand Up @@ -241,7 +252,8 @@ func (bnc *BaseNetworkController) addPodSelectorHandler(psAddrSet *PodSelectorAd
retryFramework := bnc.newNetpolRetryFramework(
factory.AddressSetPodSelectorType,
syncFunc,
podHandlerResources)
podHandlerResources,
psAddrSet.stopChan)

podHandler, err := retryFramework.WatchResourceFiltered(namespace, podSelector)
if err != nil {
Expand All @@ -263,6 +275,7 @@ func (bnc *BaseNetworkController) addNamespacedPodSelectorHandler(psAddrSet *Pod
factory.AddressSetNamespaceAndPodSelectorType,
nil,
psAddrSet.handlerResources,
psAddrSet.stopChan,
)
namespaceHandler, err := retryFramework.WatchResourceFiltered("", psAddrSet.namespaceSelector)
if err != nil {
Expand Down Expand Up @@ -306,6 +319,8 @@ type PodSelectorAddrSetHandlerInfo struct {
netInfo util.NetInfo
ipv4Mode bool
ipv6Mode bool

stopChan chan struct{}
}

// idempotent
Expand Down Expand Up @@ -540,6 +555,7 @@ func (bnc *BaseNetworkController) handleNamespaceAddUpdate(podHandlerInfo *PodSe
factory.AddressSetPodSelectorType,
syncFunc,
podHandlerInfo,
podHandlerInfo.stopChan,
)
// syncFunc and factory.AddressSetPodSelectorType add event handler also take np.RLock,
// and will be called form the same thread. The same thread shouldn't take the same rlock twice.
Expand Down
31 changes: 29 additions & 2 deletions go-controller/pkg/ovn/pod_selector_address_set_test.go
Expand Up @@ -13,6 +13,7 @@ import (
libovsdbtest "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing/libovsdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"net"
"runtime"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -141,10 +142,9 @@ var _ = ginkgo.Describe("OVN PodSelectorAddressSet", func() {
})
ginkgo.It("creates one address set for multiple users with the same selector", func() {
namespace1 := *newNamespace(namespaceName1)
namespace2 := *newNamespace(namespaceName2)
networkPolicy1 := getMatchLabelsNetworkPolicy(netPolicyName1, namespace1.Name,
"", "label1", true, true)
networkPolicy2 := getMatchLabelsNetworkPolicy(netPolicyName2, namespace2.Name,
networkPolicy2 := getMatchLabelsNetworkPolicy(netPolicyName2, namespace1.Name,
"", "label1", true, true)
startOvn(initialDB, []v1.Namespace{namespace1}, []knet.NetworkPolicy{*networkPolicy1, *networkPolicy2},
nil, nil)
Expand Down Expand Up @@ -468,6 +468,33 @@ var _ = ginkgo.Describe("OVN PodSelectorAddressSet", func() {
// should not be present in given address set
eventuallyExpectEmptyAddressSetsExist(fakeOvn, peer, namespace1.Name)
})
ginkgo.It("cleans up retryFramework resources", func() {
namespace1 := *newNamespace(namespaceName1)
namespace1.Labels = map[string]string{"key": "value"}
startOvn(initialDB, []v1.Namespace{namespace1}, nil, nil, nil)
selector := &metav1.LabelSelector{
MatchLabels: map[string]string{"key": "value"},
}

goroutinesNumInit := runtime.NumGoroutine()
// namespace selector will be run because it is not empty.
// one namespace should match the label and start a pod watchFactory.
// that gives us 2 retryFrameworks, so 2 periodicallyRetryResources goroutines.
// The request itself will create one child stopChannel, that is one more goroutine.
peerASKey, _, _, err := fakeOvn.controller.EnsurePodSelectorAddressSet(
selector, selector, namespaceName1, "backRef")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Eventually(func() int {
return runtime.NumGoroutine()
}).Should(gomega.Equal(goroutinesNumInit + 3))

err = fakeOvn.controller.DeletePodSelectorAddressSet(peerASKey, "backRef")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// expect goroutines number to get back
gomega.Eventually(func() int {
return runtime.NumGoroutine()
}).Should(gomega.Equal(goroutinesNumInit))
})
})

var _ = ginkgo.Describe("shortLabelSelectorString function", func() {
Expand Down
42 changes: 42 additions & 0 deletions go-controller/pkg/ovn/policy_test.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"runtime"
"sort"
"strconv"
"time"
Expand Down Expand Up @@ -1853,6 +1854,47 @@ var _ = ginkgo.Describe("OVN NetworkPolicy Operations", func() {
}
gomega.Expect(app.Run([]string{app.Name})).To(gomega.Succeed())
})

ginkgo.It("cleans up retryFramework resources", func() {
app.Action = func(ctx *cli.Context) error {
namespace1 := *newNamespace(namespaceName1)
namespace1.Labels = map[string]string{"name": "label1"}
networkPolicy := newNetworkPolicy(netPolicyName2, namespace1.Name, metav1.LabelSelector{},
[]knet.NetworkPolicyIngressRule{{
From: []knet.NetworkPolicyPeer{{
NamespaceSelector: &metav1.LabelSelector{
MatchLabels: namespace1.Labels,
}},
}},
}, nil)
startOvn(initialDB, []v1.Namespace{namespace1}, nil, nil, nil)

goroutinesNumInit := runtime.NumGoroutine()
fmt.Printf("goroutinesNumInit %v", goroutinesNumInit)
// network policy will create 1 watchFactory for local pods selector, and 1 peer namespace selector
// that gives us 2 retryFrameworks, so 2 periodicallyRetryResources goroutines.
// The networkPolicy itself will create one child stopChannel, that is one more goroutine.
_, err := fakeOvn.fakeClient.KubeClient.NetworkingV1().NetworkPolicies(networkPolicy.Namespace).
Create(context.TODO(), networkPolicy, metav1.CreateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Eventually(func() int {
return runtime.NumGoroutine()
}).Should(gomega.Equal(goroutinesNumInit + 3))

// Delete network policy
err = fakeOvn.fakeClient.KubeClient.NetworkingV1().NetworkPolicies(networkPolicy.Namespace).
Delete(context.TODO(), networkPolicy.Name, metav1.DeleteOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// expect goroutines number to get back
gomega.Eventually(func() int {
return runtime.NumGoroutine()
}).Should(gomega.Equal(goroutinesNumInit))

return nil
}

gomega.Expect(app.Run([]string{app.Name})).To(gomega.Succeed())
})
})

ginkgo.Context("ACL logging for network policies", func() {
Expand Down
27 changes: 27 additions & 0 deletions go-controller/pkg/util/sync.go
@@ -0,0 +1,27 @@
package util

// GetChildStopChan returns a new channel that doesn't affect parentStopChan, but will be closed when
// parentStopChan is closed. May be used for child goroutines that may need to be stopped with the main goroutine or
// separately.
func GetChildStopChan(parentStopChan <-chan struct{}) chan struct{} {
childStopChan := make(chan struct{})

select {
case <-parentStopChan:
// parent is already canceled
close(childStopChan)
return childStopChan
default:
}

go func() {
select {
case <-parentStopChan:
close(childStopChan)
return
case <-childStopChan:
return
}
}()
return childStopChan
}

0 comments on commit 5d6b136

Please sign in to comment.