Skip to content

Commit

Permalink
Merge pull request #3226 from bpickard22/data-race
Browse files Browse the repository at this point in the history
 refactor retry to close watchFactory
  • Loading branch information
trozet committed Oct 27, 2022
2 parents 70e0e13 + fc2c3b9 commit 34e3601
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
9 changes: 9 additions & 0 deletions go-controller/pkg/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package factory
import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -60,6 +61,7 @@ type WatchFactory struct {
informers map[reflect.Type]*informer

stopChan chan struct{}
Wg sync.WaitGroup
}

// WatchFactory implements the ObjectCacheInterface interface.
Expand Down Expand Up @@ -346,13 +348,20 @@ func NewNodeWatchFactory(ovnClientset *util.OVNClientset, nodeName string) (*Wat
return wf, nil
}

func (wf *WatchFactory) WaitForWatchFactoryStopChannel(stopChan chan struct{}) {
<-wf.stopChan
close(stopChan)
}

func (wf *WatchFactory) Shutdown() {
close(wf.stopChan)

// Remove all informer handlers
for _, inf := range wf.informers {
inf.shutdown()
}

wf.Wg.Wait() //waiting for periodicallyRetry to return
}

func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) {
Expand Down
1 change: 0 additions & 1 deletion go-controller/pkg/ovn/obj_retry_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func (oc *Controller) newRetryFrameworkMasterWithParameters(
},
}
r := retry.NewRetryFramework(
oc.stopChan,
oc.watchFactory,
resourceHandler,
)
Expand Down
11 changes: 7 additions & 4 deletions go-controller/pkg/retry/obj_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ type RetryFramework struct {
retryChan chan struct{}

watchFactory *factory.WatchFactory
StopChan <-chan struct{}
ResourceHandler *ResourceHandler
}

Expand All @@ -86,14 +85,12 @@ type RetryFramework struct {
// ovnk node) will have to override the functions in the returned struct with the desired
// per-resource logic.
func NewRetryFramework(
stopChan <-chan struct{},
watchFactory *factory.WatchFactory,
resourceHandler *ResourceHandler) *RetryFramework {
return &RetryFramework{
retryEntries: syncmap.NewSyncMap[*retryObjEntry](),
retryChan: make(chan struct{}, 1),
watchFactory: watchFactory,
StopChan: stopChan,
ResourceHandler: resourceHandler,
}
}
Expand Down Expand Up @@ -363,7 +360,12 @@ func (r *RetryFramework) iterateRetryResources() {
// periodicallyRetryResources tracks RetryFramework and checks if any object needs to be retried for add or delete every
// RetryObjInterval seconds or when requested through retryChan.
func (r *RetryFramework) periodicallyRetryResources() {
defer r.watchFactory.Wg.Done()
timer := time.NewTicker(RetryObjInterval)
waitCh := make(chan struct{})
go func() {
r.watchFactory.WaitForWatchFactoryStopChannel(waitCh)
}()
defer timer.Stop()
for {
select {
Expand All @@ -375,7 +377,7 @@ func (r *RetryFramework) periodicallyRetryResources() {
r.iterateRetryResources()
timer.Reset(RetryObjInterval)

case <-r.StopChan:
case <-waitCh:
klog.V(5).Infof("Stop channel got triggered: will stop retrying failed objects of type %s", r.ResourceHandler.ObjType)
return
}
Expand Down Expand Up @@ -657,6 +659,7 @@ func (r *RetryFramework) WatchResourceFiltered(namespaceForFilteredHandler strin

// track the retry entries and every 30 seconds (or upon explicit request) check if any objects
// need to be retried
r.watchFactory.Wg.Add(1) //add to the watchFactory waitgroup, the waitgroup done is inside the periodicRetryResources
go r.periodicallyRetryResources()

return handler, nil
Expand Down

0 comments on commit 34e3601

Please sign in to comment.