Skip to content

Commit

Permalink
Treat replaced events that didn't change resourceVersion as resync ev…
Browse files Browse the repository at this point in the history
…ents

Kubernetes-commit: a6caa0a4726ba97737056175494516367cf98cae
  • Loading branch information
liggitt authored and k8s-publishing-bot committed Feb 8, 2020
1 parent af50d22 commit 08cc531
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
17 changes: 16 additions & 1 deletion tools/cache/shared_informer.go
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -486,7 +487,21 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := d.Type == Sync

isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions tools/cache/shared_informer_test.go
Expand Up @@ -271,8 +271,8 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()

source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})

// create the shared informer and resync every 1s
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
Expand Down Expand Up @@ -301,8 +301,8 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
}

// Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist
source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3"}})
source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}})
source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3", ResourceVersion: "3"}})
source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "4"}})

// Ensure that nobody saw any changes
for _, listener := range listeners {
Expand All @@ -315,7 +315,7 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
listener.receivedItemNames = []string{}
}

listenerNoResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
listenerNoResync.expectedItemNames = sets.NewString("pod2", "pod3")
listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")

// This calls shouldSync, which deletes noResync from the list of syncingListeners
Expand Down

0 comments on commit 08cc531

Please sign in to comment.