Add api-machinery 'watch-consistency' e2e test #69829

Merged · 1 commit · Oct 20, 2018
112 changes: 112 additions & 0 deletions test/e2e/apimachinery/watch.go
@@ -17,6 +17,8 @@ limitations under the License.
package apimachinery

import (
"fmt"
"math/rand"
"time"

"k8s.io/api/core/v1"
@@ -314,6 +316,49 @@ var _ = SIGDescribe("Watchers", func() {
expectEvent(testWatch, watch.Modified, testConfigMapThirdUpdate)
expectEvent(testWatch, watch.Deleted, nil)
})

/*
Testname: watch-consistency
Description: Ensure that concurrent watches are consistent with each other: for each event received
on the first watch, start an additional watch at that event's resource version, then verify that
every open watch reports the subsequent resource versions in the same order. For example, if the
first watch observes resource versions v1, v2, v3, then a watch started at v1 must observe v2 and
v3 in that order. Events are produced by writes on a background goroutine.
*/
It("should receive events on concurrent watches in same order", func() {
c := f.ClientSet
ns := f.Namespace.Name

iterations := 100

By("starting a background goroutine to produce watch events")
donec := make(chan struct{})
stopc := make(chan struct{})
go func() {
defer GinkgoRecover()
defer close(donec)
produceConfigMapEvents(f, stopc, 5*time.Millisecond)
}()

By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order")
wcs := []watch.Interface{}
resourceVersion := "0"
for i := 0; i < iterations; i++ {
wc, err := c.CoreV1().ConfigMaps(ns).Watch(metav1.ListOptions{ResourceVersion: resourceVersion})
Expect(err).NotTo(HaveOccurred())
wcs = append(wcs, wc)
resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion
for _, wc := range wcs[1:] {
Member commented:

I bet doing these all in parallel would significantly reduce the wall time.

@jpbetz (Contributor, Author) commented on Oct 16, 2018:

A bit to my surprise, this didn't speed things up. I'm guessing that the buffered channels read via <-watch.ResultChan() in waitForNextConfigMapEvent() are already sufficiently concurrent, since the watches are initiated in the outer for loop.

Member commented:

Interesting.
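
For reference, a parallel variant along the lines of this suggestion might look like the sketch below. This is not the code merged in this PR; it reuses resourceVersion, wcs, waitForNextConfigMapEvent, and framework.Failf from the surrounding diff and assumes "sync" is added to the imports.

	var wg sync.WaitGroup
	for _, wc := range wcs[1:] {
		wg.Add(1)
		go func(wc watch.Interface) {
			defer GinkgoRecover() // route Failf panics from this goroutine back to Ginkgo
			defer wg.Done()
			// Each goroutine reads the next event from its own watch and compares it
			// against the resource version just observed on the first watch.
			e := waitForNextConfigMapEvent(wc)
			if resourceVersion != e.ResourceVersion {
				framework.Failf("resource version mismatch, expected %s but got %s", resourceVersion, e.ResourceVersion)
			}
		}(wc)
	}
	wg.Wait()

Since each serial read already blocks only on its own watch channel while the server pushes events to all watches concurrently, the serial loop overlaps most of the server-side work, which may explain why the author saw no speedup.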

e := waitForNextConfigMapEvent(wc)
if resourceVersion != e.ResourceVersion {
framework.Failf("resource version mismatch, expected %s but got %s", resourceVersion, e.ResourceVersion)
}
}
}
close(stopc)
for _, wc := range wcs {
wc.Stop()
}
<-donec
})
})

func watchConfigMaps(f *framework.Framework, resourceVersion string, labels ...string) (watch.Interface, error) {
@@ -381,3 +426,70 @@ func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject ru
}
}
}

func waitForNextConfigMapEvent(w watch.Interface) *v1.ConfigMap {
select {
case event := <-w.ResultChan():
if configMap, ok := event.Object.(*v1.ConfigMap); ok {
return configMap
}
framework.Failf("expected config map")
case <-time.After(10 * time.Second):
framework.Failf("timed out waiting for watch event")
}
return nil // unreachable; Failf fails the test
}

const (
createEvent = iota
updateEvent
deleteEvent
)

func produceConfigMapEvents(f *framework.Framework, stopc <-chan struct{}, minWaitBetweenEvents time.Duration) {
c := f.ClientSet
ns := f.Namespace.Name

name := func(i int) string {
return fmt.Sprintf("cm-%d", i)
}

existing := []int{}
tc := time.NewTicker(minWaitBetweenEvents)
defer tc.Stop()
i := 0
for range tc.C {
op := rand.Intn(3)
if len(existing) == 0 {
op = createEvent
}

cm := &v1.ConfigMap{}
switch op {
case createEvent:
cm.Name = name(i)
_, err := c.CoreV1().ConfigMaps(ns).Create(cm)
Expect(err).NotTo(HaveOccurred())
existing = append(existing, i)
i += 1
case updateEvent:
idx := rand.Intn(len(existing))
cm.Name = name(existing[idx])
_, err := c.CoreV1().ConfigMaps(ns).Update(cm)
Expect(err).NotTo(HaveOccurred())
case deleteEvent:
idx := rand.Intn(len(existing))
err := c.CoreV1().ConfigMaps(ns).Delete(name(existing[idx]), &metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
existing = append(existing[:idx], existing[idx+1:]...)
default:
framework.Failf("Unsupported event operation: %d", op)
}
select {
case <-stopc:
return
default:
}
}
}