Skip to content

Commit

Permalink
Terminate watchers when watch cache is destroyed
Browse files Browse the repository at this point in the history
  • Loading branch information
liggitt committed May 17, 2019
1 parent 8c3d596 commit f1c00f3
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Expand Up @@ -234,6 +234,7 @@ func NewCacherFromConfig(config Config) *Cacher {
cacher.stopWg.Add(1)
go func() {
defer cacher.stopWg.Done()
defer cacher.terminateAllWatchers()
wait.Until(
func() {
if !cacher.isStopped() {
Expand Down
Expand Up @@ -438,3 +438,39 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
}
}
}

func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 1000)
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()

w, err := cacher.Watch(context.Background(), "pods/ns", "0", storage.Everything)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}

watchClosed := make(chan struct{})
go func() {
defer close(watchClosed)
for event := range w.ResultChan() {
switch event.Type {
case watch.Added, watch.Modified, watch.Deleted:
// ok
default:
t.Errorf("unexpected event %#v", event)
}
}
}()

cacher.Stop()

select {
case <-watchClosed:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for watch to close")
}

}

0 comments on commit f1c00f3

Please sign in to comment.