diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index bb78ea81dc72b..e42c93b5b5768 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -1006,6 +1006,87 @@ func (f *fakeTimeBudget) takeAvailable() time.Duration { func (f *fakeTimeBudget) returnUnused(_ time.Duration) {} +func TestStartingResourceVersion(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // Wait until cacher is initialized. + cacher.ready.wait() + + // Ensure there is some budget for slowing down processing. + // We use the fakeTimeBudget to prevent this test from flaking under + // the following conditions: + // 1) in total we create 11 events that has to be processed by the watcher + // 2) the size of the channels are set to 10 for the watcher + // 3) if the test is cpu-starved and the internal goroutine is not picking + // up these events from the channel, after consuming the whole time + // budget (defaulted to 100ms) on waiting, we will simply close the watch, + // which will cause the test failure + // Using fakeTimeBudget gives us always a budget to wait and have a test + // pick up something from ResultCh in the meantime. + // + // The same can potentially happen in production, but in that case a watch + // can be resumed by the client. This doesn't work in the case of this test, + // because we explicitly want to test the behavior that object changes are + // happening after the watch was initiated. + cacher.dispatchTimeoutBudget = &fakeTimeBudget{} + + makePod := func(i int) *examplev1.Pod { + return &examplev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "ns", + Labels: map[string]string{"foo": strconv.Itoa(i)}, + ResourceVersion: fmt.Sprintf("%d", i), + }, + } + } + + if err := cacher.watchCache.Add(makePod(1000)); err != nil { + t.Errorf("error: %v", err) + } + // Advance RV by 10. + startVersion := uint64(1010) + + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: strconv.FormatUint(startVersion, 10), Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() + + for i := 1; i <= 11; i++ { + if err := cacher.watchCache.Update(makePod(1000 + i)); err != nil { + t.Errorf("error: %v", err) + } + } + + select { + case e, ok := <-watcher.ResultChan(): + if !ok { + t.Errorf("unexpectedly closed watch") + break + } + object := e.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + pod := object.(*examplev1.Pod) + podRV, err := cacher.versioner.ParseResourceVersion(pod.ResourceVersion) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // event should have at least rv + 1, since we're starting the watch at rv + if podRV <= startVersion { + t.Errorf("expected event with resourceVersion of at least %d, got %d", startVersion+1, podRV) + } + } +} + func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) @@ -1018,8 +1099,8 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { cacher.ready.wait() // Ensure there is some budget for slowing down processing. - // When using the official `timeBudgetImpl` we were observing flakiness - // due under the following conditions: + // We use the fakeTimeBudget to prevent this test from flaking under + // the following conditions: // 1) the watch w1 is blocked, so we were consuming the whole budget once // its buffer was filled in (10 items) // 2) the budget is refreshed once per second, so it basically wasn't diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index af16b97d51105..fbc1d27455e80 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -603,61 +603,6 @@ func TestFiltering(t *testing.T) { verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime) } -func TestStartingResourceVersion(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) - defer server.Terminate(t) - cacher, v, err := newTestCacher(etcdStorage) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } - defer cacher.Stop() - - // add 1 object - podFoo := makeTestPod("foo") - fooCreated := updatePod(t, etcdStorage, podFoo, nil) - - // Set up Watch starting at fooCreated.ResourceVersion + 10 - rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - rv += 10 - startVersion := strconv.Itoa(int(rv)) - - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watcher.Stop() - - lastFoo := fooCreated - for i := 0; i < 11; i++ { - podFooForUpdate := makeTestPod("foo") - podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)} - lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo) - } - - select { - case e := <-watcher.ResultChan(): - object := e.Object - if co, ok := object.(runtime.CacheableObject); ok { - object = co.GetObject() - } - pod := object.(*example.Pod) - podRV, err := v.ParseResourceVersion(pod.ResourceVersion) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // event should have at least rv + 1, since we're starting the watch at rv - if podRV <= rv { - t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV) - } - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("timed out waiting for event") - } -} - func TestEmptyWatchEventCache(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t)