Skip to content

Commit

Permalink
Make watch order conformance test reliable
Browse files Browse the repository at this point in the history
  • Loading branch information
liggitt committed May 13, 2021
1 parent f1eb4b6 commit 410c89f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
2 changes: 2 additions & 0 deletions test/e2e/apimachinery/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ go_library(
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
"//staging/src/k8s.io/client-go/util/keyutil:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
Expand Down
23 changes: 19 additions & 4 deletions test/e2e/apimachinery/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
cachetools "k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/test/e2e/framework"

"github.com/onsi/ginkgo"
Expand Down Expand Up @@ -336,6 +338,11 @@ var _ = SIGDescribe("Watchers", func() {

iterations := 100

ginkgo.By("getting a starting resourceVersion")
configmaps, err := c.CoreV1().ConfigMaps(ns).List(context.TODO(), metav1.ListOptions{})
framework.ExpectNoError(err, "Failed to list configmaps in the namespace %s", ns)
resourceVersion := configmaps.ResourceVersion

ginkgo.By("starting a background goroutine to produce watch events")
donec := make(chan struct{})
stopc := make(chan struct{})
Expand All @@ -345,11 +352,16 @@ var _ = SIGDescribe("Watchers", func() {
produceConfigMapEvents(f, stopc, 5*time.Millisecond)
}()

listWatcher := &cachetools.ListWatch{
WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
return c.CoreV1().ConfigMaps(ns).Watch(context.TODO(), listOptions)
},
}

ginkgo.By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order")
wcs := []watch.Interface{}
resourceVersion := "0"
for i := 0; i < iterations; i++ {
wc, err := c.CoreV1().ConfigMaps(ns).Watch(context.TODO(), metav1.ListOptions{ResourceVersion: resourceVersion})
wc, err := watchtools.NewRetryWatcher(resourceVersion, listWatcher)
framework.ExpectNoError(err, "Failed to watch configmaps in the namespace %s", ns)
wcs = append(wcs, wc)
resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion
Expand Down Expand Up @@ -436,11 +448,14 @@ func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject ru

func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap {
select {
case event := <-watch.ResultChan():
case event, ok := <-watch.ResultChan():
if !ok {
framework.Failf("Watch closed unexpectedly")
}
if configMap, ok := event.Object.(*v1.ConfigMap); ok {
return configMap
}
framework.Failf("expected config map")
framework.Failf("expected config map, got %T", event.Object)
case <-time.After(10 * time.Second):
framework.Failf("timed out waiting for watch event")
}
Expand Down

0 comments on commit 410c89f

Please sign in to comment.