From ebeb7eb334954e972650d42882df560ee05b5111 Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Thu, 10 Nov 2016 23:23:24 +0100 Subject: [PATCH] Ensure proper serialization of updates and creates in federation test watcher --- .../util/test/test_helper.go | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/pkg/federation-controller/util/test/test_helper.go b/pkg/federation-controller/util/test/test_helper.go index 91657924..6df7b0a4 100644 --- a/pkg/federation-controller/util/test/test_helper.go +++ b/pkg/federation-controller/util/test/test_helper.go @@ -39,8 +39,10 @@ import ( // A structure that distributes eventes to multiple watchers. type WatcherDispatcher struct { sync.Mutex - watchers []*watch.RaceFreeFakeWatcher - eventsSoFar []*watch.Event + watchers []*watch.RaceFreeFakeWatcher + eventsSoFar []*watch.Event + orderExecution chan func() + stopChan chan struct{} } func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) { @@ -55,6 +57,7 @@ func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) { func (wd *WatcherDispatcher) Stop() { wd.Lock() defer wd.Unlock() + close(wd.stopChan) for _, watcher := range wd.watchers { watcher.Stop() } @@ -136,9 +139,21 @@ func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object) // All subsequent requests for a watch on the client will result in returning this fake watcher. func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher { dispatcher := &WatcherDispatcher{ - watchers: make([]*watch.RaceFreeFakeWatcher, 0), - eventsSoFar: make([]*watch.Event, 0), + watchers: make([]*watch.RaceFreeFakeWatcher, 0), + eventsSoFar: make([]*watch.Event, 0), + orderExecution: make(chan func()), + stopChan: make(chan struct{}), } + go func() { + for { + select { + case fun := <-dispatcher.orderExecution: + fun() + case <-dispatcher.stopChan: + return + } + } + }() client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { watcher := watch.NewRaceFreeFake() @@ -166,11 +181,11 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *Watch originalObj := createAction.GetObject() // Create a copy of the object here to prevent data races while reading the object in go routine. obj := copy(originalObj) - go func() { + watcher.orderExecution <- func() { glog.V(4).Infof("Object created. Writing to channel: %v", obj) watcher.Add(obj) objChan <- obj - }() + } return true, originalObj, nil }) return objChan @@ -186,11 +201,11 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch originalObj := updateAction.GetObject() // Create a copy of the object here to prevent data races while reading the object in go routine. obj := copy(originalObj) - go func() { + watcher.orderExecution <- func() { glog.V(4).Infof("Object updated. Writing to channel: %v", obj) watcher.Modify(obj) objChan <- obj - }() + } return true, originalObj, nil }) return objChan