Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cacher: do not simply popExpiredWatchers when the cacher hasn't dispatched any event #117014

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 14 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Expand Up @@ -922,12 +922,24 @@ func (c *Cacher) dispatchEvents() {
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
// Never send a bookmark event if we did not see an event here, this is fine
// because we don't provide any guarantees on sending bookmarks.
//
// Just pop closed watchers and requeue others if needed.
//
// TODO(#115478): rework the following logic
// in a way that would allow more
// efficient cleanup of closed watchers
if lastProcessedResourceVersion == 0 {
func() {
c.Lock()
defer c.Unlock()
// pop expired watchers in case there has been no update
c.bookmarkWatchers.popExpiredWatchersThreadUnsafe()
for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() {
for _, watcher := range watchers {
if watcher.stopped {
continue
}
c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
}
}
}()
continue
}
Expand Down
Expand Up @@ -1973,3 +1973,59 @@ func BenchmarkCacher_GetList(b *testing.B) {
})
}
}

// TestDoNotPopExpiredWatchersWhenNoEventsSeen makes sure that
// a bookmark event will be delivered after the cacher has seen an event.
// Previously the watchers have been removed from the "want bookmark" queue.
func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()

// wait until cacher is initialized.
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

pred := storage.Everything
pred.AllowWatchBookmarks = true
opts := storage.ListOptions{
Predicate: pred,
SendInitialEvents: pointer.Bool(true),
}
w, err := cacher.Watch(context.Background(), "pods/ns", opts)
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()

// Ensure that popExpiredWatchers is called to ensure that our watch isn't removed from bookmarkWatchers.
// We do that every ~1s, so waiting 2 seconds seems enough.
time.Sleep(2 * time.Second)

// Send an event to ensure that lastProcessedResourceVersion in Cacher will change to non-zero value.
makePod := func(rv uint64) *example.Pod {
wojtek-t marked this conversation as resolved.
Show resolved Hide resolved
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", rv),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", rv),
Annotations: map[string]string{},
},
}
}
err = cacher.watchCache.Add(makePod(102))
require.NoError(t, err)

verifyEvents(t, w, []watch.Event{
{Type: watch.Added, Object: makePod(102)},
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
}, true)
}