Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add to #16916 #16947

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 19 additions & 8 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
Expand Down Expand Up @@ -352,10 +353,12 @@ func (c *Cacher) terminateAllWatchers() {
}
}

func forgetWatcher(c *Cacher, index int) func() {
return func() {
c.Lock()
defer c.Unlock()
func forgetWatcher(c *Cacher, index int) func(bool) {
return func(lock bool) {
if lock {
c.Lock()
defer c.Unlock()
}
// It's possible that the watcher is already not in the map (e.g. in case of
// simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
delete(c.watchers, index)
Expand Down Expand Up @@ -428,10 +431,10 @@ type cacheWatcher struct {
result chan watch.Event
filter FilterFunc
stopped bool
forget func()
forget func(bool)
}

func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher {
func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan watchCacheEvent, 10),
result: make(chan watch.Event, 10),
Expand All @@ -450,7 +453,7 @@ func (c *cacheWatcher) ResultChan() <-chan watch.Event {

// Implements watch.Interface.
func (c *cacheWatcher) Stop() {
c.forget()
c.forget(true)
c.stop()
}

Expand All @@ -464,7 +467,15 @@ func (c *cacheWatcher) stop() {
}

func (c *cacheWatcher) add(event watchCacheEvent) {
c.input <- event
select {
case c.input <- event:
case <-time.After(5 * time.Second):
// This means that we couldn't send event to that watcher.
// Since we don't want to blockin on it infinitely,
// we simply terminate it.
c.forget(false)
c.stop()
}
}

func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
Expand Down
26 changes: 26 additions & 0 deletions pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,32 @@ func TestWatch(t *testing.T) {
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
}

func TestWatcherTimeout(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)

// Create a watcher that will not be reading any result.
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()

// Create a second watcher that will be reading result.
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer readingWatcher.Stop()

for i := 1; i <= 22; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are these magic numbers? Is it that the cacheWatchers have input and result channels with buffer size 10? so enough to block both?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - exactly.
Both input and output have size 10 and there can we 1 "in flight" during the translation.
So with 21 the tests was still passing without the fix.

pod := makeTestPod(strconv.Itoa(i))
_ = updatePod(t, etcdStorage, pod, nil)
verifyWatchEvent(t, readingWatcher, watch.Added, pod)
}
}

func TestFiltering(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
Expand Down