Skip to content

Commit

Permalink
cacher: do not popExpiredWatchers when the cacher hasn't dispatched a…
Browse files Browse the repository at this point in the history
…ny event

If the cacher hasn't seen any event (when lastProcessedResourceVersion is zero) and
the bookmarkTimer has ticked then we shouldn't popExpiredWatchers. This is
because the watchers wont' be re-added and will miss future bookmark events when
the cacher finally receives an event via the c.incoming chan.
  • Loading branch information
p0lyn0mial committed Mar 30, 2023
1 parent c3e7eca commit ab0877c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
2 changes: 0 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,8 +912,6 @@ func (c *Cacher) dispatchEvents() {
// Never send a bookmark event if we did not see an event here, this is fine
// because we don't provide any guarantees on sending bookmarks.
if lastProcessedResourceVersion == 0 {
// pop expired watchers in case there has been no update
c.bookmarkWatchers.popExpiredWatchers()
continue
}
bookmarkEvent := &watchCacheEvent{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1830,3 +1830,54 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
require.NoError(t, err)
require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)")
}

// TestDoNotPopExpiredWatchersWhenNoEventsSeen makes sure that
// a bookmark event will be delivered after the cacher has seen an event.
// Previously the watchers have been removed from the "want bookmark" queue.
func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()

// wait until cacher is initialized.
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

opts := storage.ListOptions{Predicate: storage.Everything}
opts.Predicate.AllowWatchBookmarks = true
opts.SendInitialEvents = pointer.Bool(true)
w, err := cacher.Watch(context.Background(), "pods/ns", opts)
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()

// the default bookmarkTimer is ~1 sec, waiting 2 seconds is enough
time.Sleep(2 * time.Second)

makePod := func(rv uint64) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", rv),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", rv),
Annotations: map[string]string{},
},
}
}
err = cacher.watchCache.Add(makePod(102))
require.NoError(t, err)

verifyEvents(t, w, []watch.Event{
{Type: watch.Added, Object: makePod(102)},
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
}, true)
}

0 comments on commit ab0877c

Please sign in to comment.