refactor retry to close watchFactory
leverage the stop channel on the watch factory to stop the periodic retry loop

Problem: The hybrid test was creating an ovn-controller for some tests.
The tests could not use the waitGroup, so the afterEach function
(the cleanup function run after each test) could execute while leaving
controllers up. In the tests, we were leveraging the code in ovn.go to run
the controllers and creating a watcher pod on them. This was just a symptom
of the main issue, which was that we were using the incorrect stopChannel
in the retry logic.

Solution: The stopChannel that we use in the watchFactory is private, so we
added a function that, when called, blocks until the watcher's stopChannel
is closed and then closes a channel supplied by the caller. The retry loop
receives on that channel in its select statement (a select in Go is similar
to a switch statement, but over channel operations) and exits when it fires,
because once we stop watching an object we no longer care about it and
should stop retrying it.
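
For illustration, here is a minimal, self-contained sketch of that pattern.
The names (watcher, waitForStop) are hypothetical stand-ins, not the actual
ovn-kubernetes types: a goroutine bridges the factory's private stop channel
to a local channel that the retry loop can select on alongside its ticker.

```go
package main

import (
	"fmt"
	"time"
)

// watcher stands in for the WatchFactory: its stop channel is private.
type watcher struct {
	stopChan chan struct{}
}

// waitForStop blocks until the watcher's private stop channel is closed,
// then closes the caller-supplied channel so the caller can select on it.
func (w *watcher) waitForStop(stopCh chan struct{}) {
	<-w.stopChan
	close(stopCh)
}

func main() {
	w := &watcher{stopChan: make(chan struct{})}

	waitCh := make(chan struct{})
	go w.waitForStop(waitCh) // bridge the private stop channel to waitCh

	// Simulate the watcher being shut down a little later.
	go func() {
		time.Sleep(30 * time.Millisecond)
		close(w.stopChan)
	}()

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("retrying failed objects")
		case <-waitCh:
			fmt.Println("watcher stopped: no longer retrying")
			return
		}
	}
}
```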

This also adds a public WaitGroup field to the watchFactory, which we use
to ensure that the retry loop has exited before the watcher shutdown
completes, to avoid any races there.
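
A similarly simplified sketch (again with hypothetical names, not the real
API) of that WaitGroup coordination: the retry loop registers itself before
it starts, and shutdown closes the stop channel first, then waits for the
loop to return.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// factoryLike stands in for the WatchFactory, pairing the private stop
// channel with the exported WaitGroup described above.
type factoryLike struct {
	stopChan chan struct{}
	Wg       sync.WaitGroup
}

func (f *factoryLike) waitForStop(stopCh chan struct{}) {
	<-f.stopChan
	close(stopCh)
}

// startRetryLoop registers on the WaitGroup before launching the goroutine,
// so Shutdown cannot return while the loop is still running.
func (f *factoryLike) startRetryLoop() {
	f.Wg.Add(1)
	go func() {
		defer f.Wg.Done()
		waitCh := make(chan struct{})
		go f.waitForStop(waitCh)
		<-waitCh // retry work elided; exit once the watcher stops
		fmt.Println("retry loop exited")
	}()
}

// Shutdown closes the stop channel, then waits for the retry loop to return
// before teardown completes, avoiding the race described above.
func (f *factoryLike) Shutdown() {
	close(f.stopChan)
	f.Wg.Wait()
}

func main() {
	f := &factoryLike{stopChan: make(chan struct{})}
	f.startRetryLoop()
	time.Sleep(20 * time.Millisecond)
	f.Shutdown()
	fmt.Println("shutdown complete")
}
```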

Since we no longer use the stopChannel in the RetryFramework, this also
removes that stopChannel from the NewRetryFramework function definition
and all of its references (which at this time were only in obj_retry_master).

Signed-off-by: Ben Pickard <bpickard@redhat.com>
Closes: #3203

Remove stopChannel from retryFramework

We no longer use the controller stopChannel passed to the retryFramework,
since we now use the watcher stopChannel, so we can remove it.

Signed-off-by: Ben Pickard <bpickard@redhat.com>
bpickard22 committed Oct 27, 2022
1 parent 6e00308 commit fc2c3b9
Showing 3 changed files with 16 additions and 5 deletions.
9 changes: 9 additions & 0 deletions go-controller/pkg/factory/factory.go
@@ -3,6 +3,7 @@ package factory
 import (
 	"fmt"
 	"reflect"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -60,6 +61,7 @@ type WatchFactory struct {
 	informers map[reflect.Type]*informer

 	stopChan chan struct{}
+	Wg       sync.WaitGroup
 }

 // WatchFactory implements the ObjectCacheInterface interface.
@@ -346,13 +348,20 @@ func NewNodeWatchFactory(ovnClientset *util.OVNClientset, nodeName string) (*Wat
 	return wf, nil
 }

+func (wf *WatchFactory) WaitForWatchFactoryStopChannel(stopChan chan struct{}) {
+	<-wf.stopChan
+	close(stopChan)
+}
+
 func (wf *WatchFactory) Shutdown() {
 	close(wf.stopChan)

 	// Remove all informer handlers
 	for _, inf := range wf.informers {
 		inf.shutdown()
 	}
+
+	wf.Wg.Wait() //waiting for periodicallyRetry to return
 }

 func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) {
1 change: 0 additions & 1 deletion go-controller/pkg/ovn/obj_retry_master.go
@@ -59,7 +59,6 @@ func (oc *Controller) newRetryFrameworkMasterWithParameters(
 		},
 	}
 	r := retry.NewRetryFramework(
-		oc.stopChan,
 		oc.watchFactory,
 		resourceHandler,
 	)
11 changes: 7 additions & 4 deletions go-controller/pkg/retry/obj_retry.go
@@ -76,7 +76,6 @@ type RetryFramework struct {
 	retryChan chan struct{}

 	watchFactory *factory.WatchFactory
-	StopChan <-chan struct{}
 	ResourceHandler *ResourceHandler
 }

@@ -86,14 +85,12 @@ type RetryFramework struct {
 // ovnk node) will have to override the functions in the returned struct with the desired
 // per-resource logic.
 func NewRetryFramework(
-	stopChan <-chan struct{},
 	watchFactory *factory.WatchFactory,
 	resourceHandler *ResourceHandler) *RetryFramework {
 	return &RetryFramework{
 		retryEntries: syncmap.NewSyncMap[*retryObjEntry](),
 		retryChan: make(chan struct{}, 1),
 		watchFactory: watchFactory,
-		StopChan: stopChan,
 		ResourceHandler: resourceHandler,
 	}
 }
@@ -363,7 +360,12 @@ func (r *RetryFramework) iterateRetryResources() {
 // periodicallyRetryResources tracks RetryFramework and checks if any object needs to be retried for add or delete every
 // RetryObjInterval seconds or when requested through retryChan.
 func (r *RetryFramework) periodicallyRetryResources() {
+	defer r.watchFactory.Wg.Done()
 	timer := time.NewTicker(RetryObjInterval)
+	waitCh := make(chan struct{})
+	go func() {
+		r.watchFactory.WaitForWatchFactoryStopChannel(waitCh)
+	}()
 	defer timer.Stop()
 	for {
 		select {
@@ -375,7 +377,7 @@
 			r.iterateRetryResources()
 			timer.Reset(RetryObjInterval)

-		case <-r.StopChan:
+		case <-waitCh:
 			klog.V(5).Infof("Stop channel got triggered: will stop retrying failed objects of type %s", r.ResourceHandler.ObjType)
 			return
 		}
Expand Down Expand Up @@ -657,6 +659,7 @@ func (r *RetryFramework) WatchResourceFiltered(namespaceForFilteredHandler strin

// track the retry entries and every 30 seconds (or upon explicit request) check if any objects
// need to be retried
r.watchFactory.Wg.Add(1) //add to the watchFactory waitgroup, the waitgroup done is inside the periodicRetryResources
go r.periodicallyRetryResources()

return handler, nil
