Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #73845 upstream release 1.13 #73909

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
chanSize = 1000
}

// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher second time causing going back in time.
if len(initEvents) > 0 {
watchRV = initEvents[len(initEvents)-1].ResourceVersion
}

c.Lock()
defer c.Unlock()
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,15 @@ func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
return 0, fmt.Errorf("unimplemented")
accessor, err := meta.Accessor(obj)
if err != nil {
return 0, err
}
version := accessor.GetResourceVersion()
if len(version) == 0 {
return 0, nil
}
return strconv.ParseUint(version, 10, 64)
}
func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64)
Expand Down Expand Up @@ -351,3 +359,82 @@ func TestGetToListWithLimitAndRV0(t *testing.T) {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
}
}

func TestWatcherNotGoingBackInTime(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 1000)
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()

// Ensure there is some budget for slowing down processing.
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)

makePod := func(i int) *examplev1.Pod {
return &examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", 1000+i),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", 1000+i),
},
}
}
if err := cacher.watchCache.Add(makePod(0)); err != nil {
t.Errorf("error: %v", err)
}

totalPods := 100

// Create watcher that will be slowing down reading.
w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
defer w1.Stop()
go func() {
a := 0
for range w1.ResultChan() {
time.Sleep(time.Millisecond)
a++
if a == 100 {
break
}
}
}()

// Now push a ton of object to cache.
for i := 1; i < totalPods; i++ {
cacher.watchCache.Add(makePod(i))
}

// Create fast watcher and ensure it will get each object exactly once.
w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
defer w2.Stop()

shouldContinue := true
currentRV := uint64(0)
for shouldContinue {
select {
case event, ok := <-w2.ResultChan():
if !ok {
shouldContinue = false
break
}
rv, err := testVersioner{}.ParseResourceVersion(event.Object.(*examplev1.Pod).ResourceVersion)
if err != nil {
t.Errorf("unexpected parsing error: %v", err)
} else {
if rv < currentRV {
t.Errorf("watcher going back in time")
}
currentRV = rv
}
case <-time.After(time.Second):
w2.Stop()
}
}
}