Skip to content
This repository has been archived by the owner on Mar 26, 2021. It is now read-only.

Commit

Permalink
Merge pull request #36613 from mwielgus/watcher-fix-2
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

Ensure proper serialization of updates and creates in federation test watcher

Fix for finalizer test problems. The issue there was that the updates were coming out of order. It was caused by firing updates in new goroutines in test watcher. The proper solution is to order them in a queue and fire in order on a single goroutine.

Ref: kubernetes/kubernetes#36473 (comment)

cc: @nikhiljindal @madhusudancs
  • Loading branch information
Kubernetes Submit Queue committed Nov 11, 2016
2 parents 29638d7 + ebeb7eb commit e32ce82
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions pkg/federation-controller/util/test/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ import (
// A structure that distributes eventes to multiple watchers.
type WatcherDispatcher struct {
sync.Mutex
watchers []*watch.RaceFreeFakeWatcher
eventsSoFar []*watch.Event
watchers []*watch.RaceFreeFakeWatcher
eventsSoFar []*watch.Event
orderExecution chan func()
stopChan chan struct{}
}

func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) {
Expand All @@ -55,6 +57,7 @@ func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) {
func (wd *WatcherDispatcher) Stop() {
wd.Lock()
defer wd.Unlock()
close(wd.stopChan)
for _, watcher := range wd.watchers {
watcher.Stop()
}
Expand Down Expand Up @@ -136,9 +139,21 @@ func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object)
// All subsequent requests for a watch on the client will result in returning this fake watcher.
func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher {
dispatcher := &WatcherDispatcher{
watchers: make([]*watch.RaceFreeFakeWatcher, 0),
eventsSoFar: make([]*watch.Event, 0),
watchers: make([]*watch.RaceFreeFakeWatcher, 0),
eventsSoFar: make([]*watch.Event, 0),
orderExecution: make(chan func()),
stopChan: make(chan struct{}),
}
go func() {
for {
select {
case fun := <-dispatcher.orderExecution:
fun()
case <-dispatcher.stopChan:
return
}
}
}()

client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) {
watcher := watch.NewRaceFreeFake()
Expand Down Expand Up @@ -166,11 +181,11 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *Watch
originalObj := createAction.GetObject()
// Create a copy of the object here to prevent data races while reading the object in go routine.
obj := copy(originalObj)
go func() {
watcher.orderExecution <- func() {
glog.V(4).Infof("Object created. Writing to channel: %v", obj)
watcher.Add(obj)
objChan <- obj
}()
}
return true, originalObj, nil
})
return objChan
Expand All @@ -186,11 +201,11 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch
originalObj := updateAction.GetObject()
// Create a copy of the object here to prevent data races while reading the object in go routine.
obj := copy(originalObj)
go func() {
watcher.orderExecution <- func() {
glog.V(4).Infof("Object updated. Writing to channel: %v", obj)
watcher.Modify(obj)
objChan <- obj
}()
}
return true, originalObj, nil
})
return objChan
Expand Down

0 comments on commit e32ce82

Please sign in to comment.