diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go
index 999f3f7639ac..d4914e1fa94b 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go
@@ -334,6 +334,13 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
 		chanSize = 1000
 	}
 
+	// With some events already sent, update resourceVersion so that
+	// events that were buffered and not yet processed won't be delivered
+	// to this watcher a second time, causing it to go back in time.
+	if len(initEvents) > 0 {
+		watchRV = initEvents[len(initEvents)-1].ResourceVersion
+	}
+
 	c.Lock()
 	defer c.Unlock()
 	forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go
index 7f636caba496..db91f0b344b1 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go
@@ -212,7 +212,15 @@ func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
 	return fmt.Errorf("unimplemented")
 }
 func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
-	return 0, fmt.Errorf("unimplemented")
+	accessor, err := meta.Accessor(obj)
+	if err != nil {
+		return 0, err
+	}
+	version := accessor.GetResourceVersion()
+	if len(version) == 0 {
+		return 0, nil
+	}
+	return strconv.ParseUint(version, 10, 64)
 }
 func (testVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
 	return strconv.ParseUint(resourceVersion, 10, 64)
@@ -353,3 +361,82 @@ func TestGetToListWithLimitAndRV0(t *testing.T) {
 		t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
 	}
 }
+
+func TestWatcherNotGoingBackInTime(t *testing.T) {
+	backingStorage := &dummyStorage{}
+	cacher, _ := newTestCacher(backingStorage, 1000)
+	defer cacher.Stop()
+
+	// Wait until cacher is initialized.
+	cacher.ready.wait()
+
+	// Ensure there is some budget for slowing down processing.
+	cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
+
+	makePod := func(i int) *examplev1.Pod {
+		return &examplev1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            fmt.Sprintf("pod-%d", 1000+i),
+				Namespace:       "ns",
+				ResourceVersion: fmt.Sprintf("%d", 1000+i),
+			},
+		}
+	}
+	if err := cacher.watchCache.Add(makePod(0)); err != nil {
+		t.Errorf("error: %v", err)
+	}
+
+	totalPods := 100
+
+	// Create a watcher that is slow at reading its events.
+	w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", Everything)
+	if err != nil {
+		t.Fatalf("Failed to create watch: %v", err)
+	}
+	defer w1.Stop()
+	go func() {
+		a := 0
+		for range w1.ResultChan() {
+			time.Sleep(time.Millisecond)
+			a++
+			if a == 100 {
+				break
+			}
+		}
+	}()
+
+	// Now push a ton of objects into the cache.
+	for i := 1; i < totalPods; i++ {
+		cacher.watchCache.Add(makePod(i))
+	}
+
+	// Create a fast watcher and ensure it never receives an event older
+	// than one it has already seen.
+	w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", Everything)
+	if err != nil {
+		t.Fatalf("Failed to create watch: %v", err)
+	}
+	defer w2.Stop()
+
+	shouldContinue := true
+	currentRV := uint64(0)
+	for shouldContinue {
+		select {
+		case event, ok := <-w2.ResultChan():
+			if !ok {
+				shouldContinue = false
+				break
+			}
+			rv, err := testVersioner{}.ParseListResourceVersion(event.Object.(*examplev1.Pod).ResourceVersion)
+			if err != nil {
+				t.Errorf("unexpected parsing error: %v", err)
+			} else {
+				if rv < currentRV {
+					t.Errorf("watcher going back in time")
+				}
+				currentRV = rv
+			}
+		case <-time.After(time.Second):
+			w2.Stop()
+		}
+	}
+}
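For reference, a minimal self-contained sketch of the invariant the cacher.go change enforces; this is not part of the patch, and the event type and startingRV helper below are illustrative stand-ins rather than the Cacher's real internals. Once init events have been sent to a new watcher, its effective resourceVersion is advanced to the last init event, so older events still sitting in the dispatch buffer are filtered out instead of being re-delivered.

// Illustrative only: event and startingRV are hypothetical stand-ins used to
// demonstrate the idea behind the cacher.go change above.
package main

import "fmt"

type event struct {
	ResourceVersion uint64
}

// startingRV mirrors the fix: if init events were already sent, advance the
// watcher's resourceVersion to the last init event instead of keeping the
// resourceVersion the client asked for.
func startingRV(requestedRV uint64, initEvents []event) uint64 {
	if len(initEvents) > 0 {
		return initEvents[len(initEvents)-1].ResourceVersion
	}
	return requestedRV
}

func main() {
	// Snapshot delivered to the new watcher as init events.
	initEvents := []event{{1000}, {1001}, {1002}}
	// Events that were buffered for dispatch before the watcher was added.
	buffered := []event{{1001}, {1002}, {1003}}

	watchRV := startingRV(999, initEvents)
	for _, e := range buffered {
		if e.ResourceVersion <= watchRV {
			// Already covered by the init events; skipping avoids delivering
			// an older ResourceVersion, i.e. going back in time.
			continue
		}
		fmt.Println("deliver", e.ResourceVersion)
	}
}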