Skip to content

Commit

Permalink
Avoid going back in time in watchcache watchers
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-t committed Feb 11, 2019
1 parent eadd14e commit f64db3f
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 1 deletion.
7 changes: 7 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher.go
Expand Up @@ -334,6 +334,13 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
chanSize = 1000
}

// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher second time causing going back in time.
if len(initEvents) > 0 {
watchRV = initEvents[len(initEvents)-1].ResourceVersion
}

c.Lock()
defer c.Unlock()
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
Expand Down
89 changes: 88 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go
Expand Up @@ -212,7 +212,15 @@ func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
return 0, fmt.Errorf("unimplemented")
accessor, err := meta.Accessor(obj)
if err != nil {
return 0, err
}
version := accessor.GetResourceVersion()
if len(version) == 0 {
return 0, nil
}
return strconv.ParseUint(version, 10, 64)
}
func (testVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64)
Expand Down Expand Up @@ -353,3 +361,82 @@ func TestGetToListWithLimitAndRV0(t *testing.T) {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
}
}

func TestWatcherNotGoingBackInTime(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 1000)
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()

// Ensure there is some budget for slowing down processing.
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)

makePod := func(i int) *examplev1.Pod {
return &examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", 1000+i),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", 1000+i),
},
}
}
if err := cacher.watchCache.Add(makePod(0)); err != nil {
t.Errorf("error: %v", err)
}

totalPods := 100

// Create watcher that will be slowing down reading.
w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", Everything)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
defer w1.Stop()
go func() {
a := 0
for range w1.ResultChan() {
time.Sleep(time.Millisecond)
a++
if a == 100 {
break
}
}
}()

// Now push a ton of object to cache.
for i := 1; i < totalPods; i++ {
cacher.watchCache.Add(makePod(i))
}

// Create fast watcher and ensure it will get each object exactly once.
w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", Everything)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
defer w2.Stop()

shouldContinue := true
currentRV := uint64(0)
for shouldContinue {
select {
case event, ok := <-w2.ResultChan():
if !ok {
shouldContinue = false
break
}
rv, err := testVersioner{}.ParseListResourceVersion(event.Object.(*examplev1.Pod).ResourceVersion)
if err != nil {
t.Errorf("unexpected parsing error: %v", err)
} else {
if rv < currentRV {
t.Errorf("watcher going back in time")
}
currentRV = rv
}
case <-time.After(time.Second):
w2.Stop()
}
}
}

0 comments on commit f64db3f

Please sign in to comment.