Skip to content

Commit

Permalink
Replace child stop channels with cancelable contexts
Browse files Browse the repository at this point in the history
Commit 5d6b136 added child stop channels to stop the network policy
handlers independently from the network controller when the policy is
deleted while also stopping them if the network controller is stopped.

Unfortunately, when both things happen at the same time, one of those
events ends up attempting to close an already-closed channel, which panics.

Introduce a CancelableContext utility that will wrap a cancelable context
that can be chained to achieve the same effect.

Signed-off-by: Jaime Caamaño Ruiz <jcaamano@redhat.com>
  • Loading branch information
jcaamano committed Jul 19, 2023
1 parent 8c2c616 commit 30f005f
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 48 deletions.
5 changes: 5 additions & 0 deletions go-controller/pkg/ovn/base_network_controller.go
Expand Up @@ -141,6 +141,11 @@ type BaseNetworkController struct {
// waitGroup per-Controller
wg *sync.WaitGroup

// Some downstream components need to stop either on their own or when the
// network controller is stopped; a chain of cancelable contexts is used
// to achieve this.
cancelableCtx util.CancelableContext

// List of nodes which belong to the local zone (stored as a sync map)
// If the map is nil, it means the controller is not tracking the node events
// and all the nodes are considered as local zone nodes.
Expand Down
17 changes: 9 additions & 8 deletions go-controller/pkg/ovn/base_network_controller_policy.go
Expand Up @@ -185,7 +185,7 @@ type networkPolicy struct {
// Use networkPolicy.RLock to read this field and hold it for the whole event handling.
deleted bool

stopChan chan struct{}
cancelableContext *util.CancelableContext
}

func NewNetworkPolicy(policy *knet.NetworkPolicy) *networkPolicy {
Expand Down Expand Up @@ -823,7 +823,7 @@ func (bnc *BaseNetworkController) addLocalPodHandler(policy *knet.NetworkPolicy,
&NetworkPolicyExtraParameters{
np: np,
},
np.stopChan)
np.cancelableContext.Done())

podHandler, err := retryLocalPods.WatchResourceFiltered(policy.Namespace, sel)
if err != nil {
Expand Down Expand Up @@ -1019,8 +1019,9 @@ func (bnc *BaseNetworkController) createNetworkPolicy(policy *knet.NetworkPolicy
np.Unlock()
npLocked = false

if np.stopChan == nil {
np.stopChan = util.GetChildStopChan(bnc.stopChan)
if np.cancelableContext == nil {
cancelableContext := util.NewCancelableContextChild(bnc.cancelableCtx)
np.cancelableContext = &cancelableContext
}

// 6. Start peer handlers to update all allow rules first
Expand Down Expand Up @@ -1450,7 +1451,7 @@ func (bnc *BaseNetworkController) addPeerNamespaceHandler(
factory.PeerNamespaceSelectorType,
syncFunc,
&NetworkPolicyExtraParameters{gp: gress, np: np},
np.stopChan,
np.cancelableContext.Done(),
)

namespaceHandler, err := retryPeerNamespaces.WatchResourceFiltered("", sel)
Expand All @@ -1464,9 +1465,9 @@ func (bnc *BaseNetworkController) addPeerNamespaceHandler(
}

func (bnc *BaseNetworkController) shutdownHandlers(np *networkPolicy) {
if np.stopChan != nil {
close(np.stopChan)
np.stopChan = nil
if np.cancelableContext != nil {
np.cancelableContext.Cancel()
np.cancelableContext = nil
}

if np.localPodHandler != nil {
Expand Down
Expand Up @@ -186,6 +186,7 @@ func (oc *BaseSecondaryLayer2NetworkController) newRetryFramework(
func (oc *BaseSecondaryLayer2NetworkController) stop() {
klog.Infof("Stop secondary %s network controller of network %s", oc.TopologyType(), oc.GetNetworkName())
close(oc.stopChan)
oc.cancelableCtx.Cancel()
oc.wg.Wait()

if oc.policyHandler != nil {
Expand Down
2 changes: 2 additions & 0 deletions go-controller/pkg/ovn/default_network_controller.go
Expand Up @@ -199,6 +199,7 @@ func newDefaultNetworkControllerCommon(cnci *CommonNetworkControllerInfo,
wg: defaultWg,
localZoneNodes: &sync.Map{},
zoneICHandler: zoneICHandler,
cancelableCtx: util.NewCancelableContext(),
},
externalGWCache: apbExternalRouteController.ExternalGWCache,
exGWCacheMutex: apbExternalRouteController.ExGWCacheMutex,
Expand Down Expand Up @@ -322,6 +323,7 @@ func (oc *DefaultNetworkController) Start(ctx context.Context) error {
// Stop gracefully stops the controller.
//
// It closes oc.stopChan, cancels oc.cancelableCtx and then blocks until
// oc.wg reports that everything registered on it has finished.
//
// NOTE(review): close panics on an already-closed channel, so a second call
// to Stop would panic on the first statement unless callers guarantee a
// single invocation — confirm against callers.
func (oc *DefaultNetworkController) Stop() {
close(oc.stopChan)
// Canceling the controller-level context also cancels every context that
// was derived from it with util.NewCancelableContextChild.
oc.cancelableCtx.Cancel()
// Wait only after both stop signals have been raised.
oc.wg.Wait()
}

Expand Down
1 change: 1 addition & 0 deletions go-controller/pkg/ovn/ovn_test.go
Expand Up @@ -146,6 +146,7 @@ func (o *FakeOVN) startWithDBSetup(dbSetup libovsdbtest.TestSetup, objects ...ru
func (o *FakeOVN) shutdown() {
o.watcher.Shutdown()
close(o.stopChan)
o.controller.cancelableCtx.Cancel()
o.wg.Wait()
o.egressQoSWg.Wait()
o.egressSVCWg.Wait()
Expand Down
21 changes: 11 additions & 10 deletions go-controller/pkg/ovn/pod_selector_address_set.go
Expand Up @@ -54,7 +54,7 @@ type PodSelectorAddressSet struct {
// handlerResources holds the data that is used and updated by the handlers.
handlerResources *PodSelectorAddrSetHandlerInfo

stopChan chan struct{}
cancelableContext *util.CancelableContext
}

// EnsurePodSelectorAddressSet returns address set for requested (podSelector, namespaceSelector, namespace).
Expand Down Expand Up @@ -162,8 +162,9 @@ func (bnc *BaseNetworkController) DeletePodSelectorAddressSet(addrSetKey, backRe

func (psas *PodSelectorAddressSet) init(bnc *BaseNetworkController) error {
// create pod handler resources before starting the handlers
if psas.stopChan == nil {
psas.stopChan = util.GetChildStopChan(bnc.stopChan)
if psas.cancelableContext == nil {
cancelableContext := util.NewCancelableContextChild(bnc.cancelableCtx)
psas.cancelableContext = &cancelableContext
}
if psas.handlerResources == nil {
as, err := bnc.addressSetFactory.NewAddressSet(psas.addrSetDbIDs, nil)
Expand All @@ -180,7 +181,7 @@ func (psas *PodSelectorAddressSet) init(bnc *BaseNetworkController) error {
netInfo: bnc.NetInfo,
ipv4Mode: ipv4Mode,
ipv6Mode: ipv6Mode,
stopChan: psas.stopChan,
stopChan: psas.cancelableContext.Done(),
}
}

Expand Down Expand Up @@ -217,9 +218,9 @@ func (psas *PodSelectorAddressSet) init(bnc *BaseNetworkController) error {

func (psas *PodSelectorAddressSet) destroy(bnc *BaseNetworkController) error {
klog.Infof("Deleting shared address set for pod selector %s", psas.key)
if psas.stopChan != nil {
close(psas.stopChan)
psas.stopChan = nil
if psas.cancelableContext != nil {
psas.cancelableContext.Cancel()
psas.cancelableContext = nil
}

psas.needsCleanup = true
Expand Down Expand Up @@ -254,7 +255,7 @@ func (bnc *BaseNetworkController) addPodSelectorHandler(psAddrSet *PodSelectorAd
factory.AddressSetPodSelectorType,
syncFunc,
podHandlerResources,
psAddrSet.stopChan)
psAddrSet.cancelableContext.Done())

podHandler, err := retryFramework.WatchResourceFiltered(namespace, podSelector)
if err != nil {
Expand All @@ -276,7 +277,7 @@ func (bnc *BaseNetworkController) addNamespacedPodSelectorHandler(psAddrSet *Pod
factory.AddressSetNamespaceAndPodSelectorType,
nil,
psAddrSet.handlerResources,
psAddrSet.stopChan,
psAddrSet.cancelableContext.Done(),
)
namespaceHandler, err := retryFramework.WatchResourceFiltered("", psAddrSet.namespaceSelector)
if err != nil {
Expand Down Expand Up @@ -321,7 +322,7 @@ type PodSelectorAddrSetHandlerInfo struct {
ipv4Mode bool
ipv6Mode bool

stopChan chan struct{}
stopChan <-chan struct{}
}

// idempotent
Expand Down
3 changes: 1 addition & 2 deletions go-controller/pkg/ovn/pod_selector_address_set_test.go
Expand Up @@ -485,13 +485,12 @@ var _ = ginkgo.Describe("OVN PodSelectorAddressSet", func() {
// namespace selector will be run because it is not empty.
// one namespace should match the label and start a pod watchFactory.
// that gives us 2 retryFrameworks, so 2 periodicallyRetryResources goroutines.
// The request itself will create one child stopChannel, that is one more goroutine.
peerASKey, _, _, err := fakeOvn.controller.EnsurePodSelectorAddressSet(
selector, selector, namespaceName1, "backRef")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Eventually(func() int {
return runtime.NumGoroutine()
}).Should(gomega.Equal(goroutinesNumInit + 3))
}).Should(gomega.Equal(goroutinesNumInit + 2))

err = fakeOvn.controller.DeletePodSelectorAddressSet(peerASKey, "backRef")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
Expand Down
3 changes: 1 addition & 2 deletions go-controller/pkg/ovn/policy_test.go
Expand Up @@ -1886,13 +1886,12 @@ var _ = ginkgo.Describe("OVN NetworkPolicy Operations", func() {
fmt.Printf("goroutinesNumInit %v", goroutinesNumInit)
// network policy will create 1 watchFactory for local pods selector, and 1 peer namespace selector
// that gives us 2 retryFrameworks, so 2 periodicallyRetryResources goroutines.
// The networkPolicy itself will create one child stopChannel, that is one more goroutine.
_, err := fakeOvn.fakeClient.KubeClient.NetworkingV1().NetworkPolicies(networkPolicy.Namespace).
Create(context.TODO(), networkPolicy, metav1.CreateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Eventually(func() int {
return runtime.NumGoroutine()
}).Should(gomega.Equal(goroutinesNumInit + 3))
}).Should(gomega.Equal(goroutinesNumInit + 2))

// Delete network policy
err = fakeOvn.fakeClient.KubeClient.NetworkingV1().NetworkPolicies(networkPolicy.Namespace).
Expand Down
Expand Up @@ -55,6 +55,7 @@ func NewSecondaryLayer2NetworkController(cnci *CommonNetworkControllerInfo, netI
stopChan: stopChan,
wg: &sync.WaitGroup{},
localZoneNodes: &sync.Map{},
cancelableCtx: util.NewCancelableContext(),
},
},
},
Expand Down
2 changes: 2 additions & 0 deletions go-controller/pkg/ovn/secondary_layer3_network_controller.go
Expand Up @@ -258,6 +258,7 @@ func NewSecondaryLayer3NetworkController(cnci *CommonNetworkControllerInfo, netI
wg: &sync.WaitGroup{},
localZoneNodes: &sync.Map{},
zoneICHandler: zoneICHandler,
cancelableCtx: util.NewCancelableContext(),
},
},
addNodeFailed: sync.Map{},
Expand Down Expand Up @@ -331,6 +332,7 @@ func (oc *SecondaryLayer3NetworkController) Start(ctx context.Context) error {
func (oc *SecondaryLayer3NetworkController) Stop() {
klog.Infof("Stop secondary %s network controller of network %s", oc.TopologyType(), oc.GetNetworkName())
close(oc.stopChan)
oc.cancelableCtx.Cancel()
oc.wg.Wait()

if oc.policyHandler != nil {
Expand Down
Expand Up @@ -47,6 +47,7 @@ func NewSecondaryLocalnetNetworkController(cnci *CommonNetworkControllerInfo, ne
podSelectorAddressSets: syncmap.NewSyncMap[*PodSelectorAddressSet](),
stopChan: stopChan,
wg: &sync.WaitGroup{},
cancelableCtx: util.NewCancelableContext(),
},
},
},
Expand Down
36 changes: 36 additions & 0 deletions go-controller/pkg/util/context.go
@@ -0,0 +1,36 @@
package util

import "context"

// CancelableContext wraps a context.Context together with its cancel function
// so that a component can be stopped either on its own or together with the
// parent it was derived from.
//
// The zero value and a nil *CancelableContext behave like a context that is
// never canceled: Done returns a nil channel (which blocks forever, exactly
// like context.Background().Done()) and Cancel is a no-op. This keeps callers
// that hold the type by value, or that reset a pointer to nil after
// canceling, from panicking on a late Done or Cancel call.
type CancelableContext struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// Done returns a channel that is closed when this or any parent context is
// canceled. It returns nil when the receiver is nil or was never initialized.
func (c *CancelableContext) Done() <-chan struct{} {
	if c == nil || c.ctx == nil {
		return nil
	}
	return c.ctx.Done()
}

// Cancel cancels this context and every context derived from it. It may be
// called more than once and from multiple goroutines; calls after the first
// do nothing. It is a no-op when the receiver is nil or was never initialized.
func (c *CancelableContext) Cancel() {
	if c == nil || c.cancel == nil {
		return
	}
	c.cancel()
}

// NewCancelableContext returns a new root CancelableContext that is only
// canceled through its own Cancel method.
func NewCancelableContext() CancelableContext {
	return newCancelableContext(context.Background())
}

// NewCancelableContextChild returns a CancelableContext that is canceled
// either through its own Cancel method or when ctx is canceled. Canceling the
// child does not affect ctx. If ctx is a zero value, the child is created as
// a root context instead of panicking.
func NewCancelableContextChild(ctx CancelableContext) CancelableContext {
	parent := ctx.ctx
	if parent == nil {
		// context.WithCancel panics on a nil parent; fall back to a root.
		parent = context.Background()
	}
	return newCancelableContext(parent)
}

// newCancelableContext derives a cancelable context from parent and bundles
// it with its cancel function.
func newCancelableContext(parent context.Context) CancelableContext {
	ctx, cancel := context.WithCancel(parent)
	return CancelableContext{
		ctx:    ctx,
		cancel: cancel,
	}
}
26 changes: 0 additions & 26 deletions go-controller/pkg/util/sync.go
Expand Up @@ -8,32 +8,6 @@ import (
"k8s.io/client-go/tools/cache"
)

// GetChildStopChan derives a stop channel from parentStopChan. The returned
// channel is closed as soon as parentStopChan is closed, while closing the
// returned channel leaves parentStopChan untouched. It is meant for child
// goroutines that must stop with the main goroutine but may also be stopped
// separately.
//
// Unless the parent is already closed, one watcher goroutine is started per
// call; it exits when either channel is closed. Callers must not close the
// returned channel once the parent has been closed: the channel is then
// already closed and a second close panics.
func GetChildStopChan(parentStopChan <-chan struct{}) chan struct{} {
	child := make(chan struct{})

	select {
	case <-parentStopChan:
		// The parent was stopped before we were asked; hand back a channel
		// that is already closed and skip the watcher goroutine.
		close(child)
	default:
		go func() {
			select {
			case <-parentStopChan:
				// Propagate the parent's stop to the child.
				close(child)
			case <-child:
				// The child was stopped on its own; nothing left to watch.
			}
		}()
	}

	return child
}

func GetChildStopChanWithTimeout(parentStopChan <-chan struct{}, duration time.Duration) chan struct{} {
childStopChan := make(chan struct{})
timer := time.NewTicker(duration)
Expand Down

0 comments on commit 30f005f

Please sign in to comment.