Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race in watch tests - attempt 3 #19006

Merged
merged 1 commit into from
Dec 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/storage/etcd/etcd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type etcdWatcher struct {
userStop chan struct{}
stopped bool
stopLock sync.Mutex
// wg is used to avoid calls to etcd after Stop()
wg sync.WaitGroup

// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)
Expand Down Expand Up @@ -129,6 +131,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
stopped: false,
wg: sync.WaitGroup{},
cache: cache,
ctx: nil,
cancel: nil,
Expand All @@ -145,6 +148,11 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st
defer close(w.etcdError)
defer close(w.etcdIncoming)

// All calls to etcd are coming from this function - once it is finished
// no other call to etcd should be generated by this watcher.
w.wg.Add(1)
defer w.wg.Done()

// We need to be prepared, that Stop() can be called at any time.
// It can potentially also be called, even before this function is called.
// If that is the case, we simply skip all the code here.
Expand Down Expand Up @@ -456,4 +464,7 @@ func (w *etcdWatcher) Stop() {
w.stopped = true
close(w.userStop)
}
// Wait until all calls to etcd are finished and no other
// will be issued.
w.wg.Wait()
}
10 changes: 1 addition & 9 deletions pkg/storage/etcd/etcd_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestWatch(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watching.Stop()
// watching is explicitly closed below.

// Test normal case
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
Expand Down Expand Up @@ -327,8 +327,6 @@ func TestWatchEtcdState(t *testing.T) {
if e, a := endpoint, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a)
}

watching.Stop()
}

func TestWatchFromZeroIndex(t *testing.T) {
Expand Down Expand Up @@ -379,8 +377,6 @@ func TestWatchFromZeroIndex(t *testing.T) {
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a)
}

watching.Stop()
}

func TestWatchListFromZeroIndex(t *testing.T) {
Expand Down Expand Up @@ -411,8 +407,6 @@ func TestWatchListFromZeroIndex(t *testing.T) {
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a)
}

watching.Stop()
}

func TestWatchListIgnoresRootKey(t *testing.T) {
Expand Down Expand Up @@ -444,8 +438,6 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
default:
// fall through, expected behavior
}

watching.Stop()
}

func TestWatchPurposefulShutdown(t *testing.T) {
Expand Down