Skip to content

Commit

Permalink
Merge pull request #96662 from wojtek-t/fix_starting_rv_test
Browse files Browse the repository at this point in the history
Fix TestStartingResourceVersion flakiness
  • Loading branch information
k8s-ci-robot committed Nov 20, 2020
2 parents 264a533 + 37b0004 commit 06b0179
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 57 deletions.
Expand Up @@ -1006,6 +1006,87 @@ func (f *fakeTimeBudget) takeAvailable() time.Duration {

// returnUnused is a deliberate no-op: the duration handed back by the caller
// is ignored and no state on the fake budget is modified.
func (*fakeTimeBudget) returnUnused(time.Duration) {
}

// TestStartingResourceVersion checks that a watch opened at a resource version
// ahead of the object currently in the watch cache only delivers events whose
// resourceVersion is strictly greater than the requested starting version.
//
// The test seeds the watch cache with a pod at RV 1000, opens a watch at
// RV 1010, pushes updates with RVs 1001..1011 and inspects the first event
// that reaches the watcher.
func TestStartingResourceVersion(t *testing.T) {
	// Upper bound on how long we wait for the first event, so that a lost
	// event results in a clean test failure rather than a hang that is only
	// interrupted by the global `go test` deadline.
	const eventTimeout = 30 * time.Second

	backingStorage := &dummyStorage{}
	cacher, _, err := newTestCacher(backingStorage)
	if err != nil {
		t.Fatalf("Couldn't create cacher: %v", err)
	}
	defer cacher.Stop()

	// Wait until cacher is initialized.
	cacher.ready.wait()

	// Ensure there is some budget for slowing down processing.
	// We use the fakeTimeBudget to prevent this test from flaking under
	// the following conditions:
	// 1) in total we create 11 events that have to be processed by the watcher
	// 2) the size of the channels is set to 10 for the watcher
	// 3) if the test is cpu-starved and the internal goroutine is not picking
	//    up these events from the channel, after consuming the whole time
	//    budget (defaulted to 100ms) on waiting, we will simply close the watch,
	//    which will cause the test failure
	// Using fakeTimeBudget always gives us a budget to wait and lets the test
	// pick up something from ResultChan in the meantime.
	//
	// The same can potentially happen in production, but in that case a watch
	// can be resumed by the client. This doesn't work in the case of this test,
	// because we explicitly want to test the behavior that object changes are
	// happening after the watch was initiated.
	cacher.dispatchTimeoutBudget = &fakeTimeBudget{}

	// makePod builds the single pod under test; i is used both as the label
	// value and as the pod's resourceVersion.
	makePod := func(i int) *examplev1.Pod {
		rv := strconv.Itoa(i)
		return &examplev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:            "foo",
				Namespace:       "ns",
				Labels:          map[string]string{"foo": rv},
				ResourceVersion: rv,
			},
		}
	}

	if err := cacher.watchCache.Add(makePod(1000)); err != nil {
		t.Errorf("error: %v", err)
	}
	// Advance RV by 10.
	startVersion := uint64(1010)

	watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: strconv.FormatUint(startVersion, 10), Predicate: storage.Everything})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer watcher.Stop()

	// Updates 1001..1011: only the last one is newer than startVersion.
	for i := 1; i <= 11; i++ {
		if err := cacher.watchCache.Update(makePod(1000 + i)); err != nil {
			t.Errorf("error: %v", err)
		}
	}

	select {
	case e, ok := <-watcher.ResultChan():
		if !ok {
			t.Fatalf("unexpectedly closed watch")
		}
		object := e.Object
		if co, ok := object.(runtime.CacheableObject); ok {
			object = co.GetObject()
		}
		// Checked assertion: report an unexpected payload as a test failure
		// instead of panicking.
		pod, ok := object.(*examplev1.Pod)
		if !ok {
			t.Fatalf("unexpected object type in watch event: %T", object)
		}
		podRV, err := cacher.versioner.ParseResourceVersion(pod.ResourceVersion)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}

		// event should have at least rv + 1, since we're starting the watch at rv
		if podRV <= startVersion {
			t.Errorf("expected event with resourceVersion of at least %d, got %d", startVersion+1, podRV)
		}
	case <-time.After(eventTimeout):
		t.Errorf("timed out waiting for event")
	}
}

func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
Expand All @@ -1018,8 +1099,8 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
cacher.ready.wait()

// Ensure there is some budget for slowing down processing.
// When using the official `timeBudgetImpl` we were observing flakiness
// due under the following conditions:
// We use the fakeTimeBudget to prevent this test from flaking under
// the following conditions:
// 1) the watch w1 is blocked, so we were consuming the whole budget once
// its buffer was filled in (10 items)
// 2) the budget is refreshed once per second, so it basically wasn't
Expand Down
55 changes: 0 additions & 55 deletions staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go
Expand Up @@ -603,61 +603,6 @@ func TestFiltering(t *testing.T) {
verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
}

// TestStartingResourceVersion checks that a watch opened at a resource version
// 10 ahead of the created object's only delivers events whose resourceVersion
// is strictly greater than that starting version.
//
// The pod is created once, the watch is opened at createdRV+10, and the pod is
// then updated 11 times through the backing storage before the first watch
// event is inspected.
func TestStartingResourceVersion(t *testing.T) {
	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
	defer server.Terminate(t)
	cacher, v, err := newTestCacher(etcdStorage)
	if err != nil {
		t.Fatalf("Couldn't create cacher: %v", err)
	}
	defer cacher.Stop()

	// add 1 object
	podFoo := makeTestPod("foo")
	fooCreated := updatePod(t, etcdStorage, podFoo, nil)

	// Set up Watch starting at fooCreated.ResourceVersion + 10
	rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	rv += 10
	startVersion := strconv.Itoa(int(rv))

	watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer watcher.Stop()

	// Each update is issued against the previously returned object.
	lastFoo := fooCreated
	for i := 0; i < 11; i++ {
		podFooForUpdate := makeTestPod("foo")
		podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)}
		lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo)
	}

	select {
	case e, ok := <-watcher.ResultChan():
		// A closed channel yields a zero-value event with a nil Object;
		// fail explicitly instead of panicking on the type assertion below.
		if !ok {
			t.Fatalf("unexpectedly closed watch")
		}
		object := e.Object
		if co, ok := object.(runtime.CacheableObject); ok {
			object = co.GetObject()
		}
		pod, ok := object.(*example.Pod)
		if !ok {
			t.Fatalf("unexpected object type in watch event: %T", object)
		}
		podRV, err := v.ParseResourceVersion(pod.ResourceVersion)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}

		// event should have at least rv + 1, since we're starting the watch at rv
		if podRV <= rv {
			t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Errorf("timed out waiting for event")
	}
}

func TestEmptyWatchEventCache(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
Expand Down

0 comments on commit 06b0179

Please sign in to comment.