diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index 0762da3befac..96005ff58535 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -353,17 +353,6 @@ func NewIndexerInformer( return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil) } -// TransformFunc allows for transforming an object before it will be processed -// and put into the controller cache and before the corresponding handlers will -// be called on it. -// TransformFunc (similarly to ResourceEventHandler functions) should be able -// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown -// -// The most common usage pattern is to clean-up some parts of the object to -// reduce component memory usage if a given component doesn't care about them. -// given controller doesn't care for them -type TransformFunc func(interface{}) (interface{}, error) - // NewTransformingInformer returns a Store and a controller for populating // the store while also providing event notifications. You should only used // the returned Store for Get/List operations; Add/Modify/Deletes will cause @@ -411,19 +400,11 @@ func processDeltas( // Object which receives event notifications from the given deltas handler ResourceEventHandler, clientState Store, - transformer TransformFunc, deltas Deltas, ) error { // from oldest to newest for _, d := range deltas { obj := d.Object - if transformer != nil { - var err error - obj, err = transformer(obj) - if err != nil { - return err - } - } switch d.Type { case Sync, Replaced, Added, Updated: @@ -475,6 +456,7 @@ func newInformer( fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: clientState, EmitDeltaTypeReplaced: true, + Transformer: transformer, }) cfg := &Config{ @@ -486,7 +468,7 @@ func newInformer( Process: func(obj interface{}) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(h, clientState, transformer, deltas) + return processDeltas(h, clientState, deltas) } return errors.New("object given as Process argument is not Deltas") }, diff --git a/staging/src/k8s.io/client-go/tools/cache/controller_test.go b/staging/src/k8s.io/client-go/tools/cache/controller_test.go index cf42478e057e..ca4ba63ce392 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -32,7 +32,7 @@ import ( "k8s.io/apimachinery/pkg/watch" fcache "k8s.io/client-go/tools/cache/testing" - "github.com/google/gofuzz" + fuzz "github.com/google/gofuzz" ) func Example() { diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 0c13a41f065b..84f3ab9ca13f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -51,6 +51,10 @@ type DeltaFIFOOptions struct { // When true, `Replaced` events will be sent for items passed to a Replace() call. // When false, `Sync` events will be sent instead. EmitDeltaTypeReplaced bool + + // If set, will be called for objects before enqueueing them. Please + // see the comment on TransformFunc for details. + Transformer TransformFunc } // DeltaFIFO is like FIFO, but differs in two ways. One is that the @@ -129,8 +133,32 @@ type DeltaFIFO struct { // emitDeltaTypeReplaced is whether to emit the Replaced or Sync // DeltaType when Replace() is called (to preserve backwards compat). emitDeltaTypeReplaced bool + + // Called with every object if non-nil. + transformer TransformFunc } +// TransformFunc allows for transforming an object before it will be processed. +// TransformFunc (similarly to ResourceEventHandler functions) should be able +// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown. +// +// New in v1.27: In such cases, the contained object will already have gone +// through the transform object separately (when it was added / updated prior +// to the delete), so the TransformFunc can likely safely ignore such objects +// (i.e., just return the input object). +// +// The most common usage pattern is to clean-up some parts of the object to +// reduce component memory usage if a given component doesn't care about them. +// +// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc +// sees the object before any other actor, and it is now safe to mutate the +// object in place instead of making a copy. +// +// Note that TransformFunc is called while inserting objects into the +// notification queue and is therefore extremely performance sensitive; please +// do not do anything that will take a long time. +type TransformFunc func(interface{}) (interface{}, error) + // DeltaType is the type of a change (addition, deletion, etc) type DeltaType string @@ -227,6 +255,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { knownObjects: opts.KnownObjects, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, + transformer: opts.Transformer, } f.cond.L = &f.lock return f @@ -411,6 +440,21 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err if err != nil { return KeyError{obj, err} } + + // Every object comes through this code path once, so this is a good + // place to call the transform func. If obj is a + // DeletedFinalStateUnknown tombstone, then the containted inner object + // will already have gone through the transformer, but we document that + // this can happen. In cases involving Replace(), such an object can + // come through multiple times. + if f.transformer != nil { + var err error + obj, err = f.transformer(obj) + if err != nil { + return err + } + } + oldDeltas := f.items[id] newDeltas := append(oldDeltas, Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) @@ -566,12 +610,11 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { // using the Sync or Replace DeltaType and then (2) it does some deletions. // In particular: for every pre-existing key K that is not the key of // an object in `list` there is the effect of -// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object -// of K. If `f.knownObjects == nil` then the pre-existing keys are -// those in `f.items` and the current object of K is the `.Newest()` -// of the Deltas associated with K. Otherwise the pre-existing keys -// are those listed by `f.knownObjects` and the current object of K is -// what `f.knownObjects.GetByKey(K)` returns. +// `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known +// object of K. The pre-existing keys are those in the union set of the keys in +// `f.items` and `f.knownObjects` (if not nil). The last known object for key K is +// the one present in the last delta in `f.items`. If there is no delta for K +// in `f.items`, it is the object in `f.knownObjects` func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { f.lock.Lock() defer f.lock.Unlock() @@ -595,51 +638,23 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { } } - if f.knownObjects == nil { - // Do deletion detection against our own list. - queuedDeletions := 0 - for k, oldItem := range f.items { - if keys.Has(k) { - continue - } - // Delete pre-existing items not in the new list. - // This could happen if watch deletion event was missed while - // disconnected from apiserver. - var deletedObj interface{} - if n := oldItem.Newest(); n != nil { - deletedObj = n.Object - } - queuedDeletions++ - if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { - return err - } - } - - if !f.populated { - f.populated = true - // While there shouldn't be any queued deletions in the initial - // population of the queue, it's better to be on the safe side. - f.initialPopulationCount = keys.Len() + queuedDeletions - } - - return nil - } - - // Detect deletions not already in the queue. - knownKeys := f.knownObjects.ListKeys() + // Do deletion detection against objects in the queue queuedDeletions := 0 - for _, k := range knownKeys { + for k, oldItem := range f.items { if keys.Has(k) { continue } - - deletedObj, exists, err := f.knownObjects.GetByKey(k) - if err != nil { - deletedObj = nil - klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) - } else if !exists { - deletedObj = nil - klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) + // Delete pre-existing items not in the new list. + // This could happen if watch deletion event was missed while + // disconnected from apiserver. + var deletedObj interface{} + if n := oldItem.Newest(); n != nil { + deletedObj = n.Object + + // if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object + if d, ok := deletedObj.(DeletedFinalStateUnknown); ok { + deletedObj = d.Obj + } } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { @@ -647,6 +662,32 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { } } + if f.knownObjects != nil { + // Detect deletions for objects not present in the queue, but present in KnownObjects + knownKeys := f.knownObjects.ListKeys() + for _, k := range knownKeys { + if keys.Has(k) { + continue + } + if len(f.items[k]) > 0 { + continue + } + + deletedObj, exists, err := f.knownObjects.GetByKey(k) + if err != nil { + deletedObj = nil + klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) + } else if !exists { + deletedObj = nil + klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) + } + queuedDeletions++ + if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { + return err + } + } + } + if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go index f17240da5c86..a7054e69cfee 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go @@ -121,6 +121,130 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { } } +func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { + obj := mkFifoObj("foo", 2) + objV2 := mkFifoObj("foo", 3) + table := []struct { + name string + operations func(f *DeltaFIFO) + expectedDeltas Deltas + }{ + { + name: "Added object should be deleted on Replace", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Replaced object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.emitDeltaTypeReplaced = true + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Replaced, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Deleted object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + }, + }, + { + name: "Synced objects should have a single delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Sync, obj}, + {Sync, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added objects should have a single delete on multiple Replaces", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + f.Replace([]interface{}{}, "1") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added and deleted and added object should be deleted", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Add(objV2) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + {Added, objV2}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, + }, + }, + } + for _, tt := range table { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + // Test with a DeltaFIFO with a backing KnownObjects + fWithKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{} + }), + }) + tt.operations(fWithKnownObjects) + actualDeltasWithKnownObjects := Pop(fWithKnownObjects) + if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithKnownObjects) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithKnownObjects) + } + if len(fWithKnownObjects.items) != 0 { + t.Errorf("expected no extra deltas (empty map), got %#v", fWithKnownObjects.items) + } + + // Test with a DeltaFIFO without a backing KnownObjects + fWithoutKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + }) + tt.operations(fWithoutKnownObjects) + actualDeltasWithoutKnownObjects := Pop(fWithoutKnownObjects) + if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithoutKnownObjects) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithoutKnownObjects) + } + if len(fWithoutKnownObjects.items) != 0 { + t.Errorf("expected no extra deltas (empty map), got %#v", fWithoutKnownObjects.items) + } + }) + } +} + func TestDeltaFIFO_requeueOnPop(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) @@ -203,6 +327,88 @@ func TestDeltaFIFO_addUpdate(t *testing.T) { } } +type rvAndXfrm struct { + rv int + xfrm int +} + +func TestDeltaFIFO_transformer(t *testing.T) { + mk := func(name string, rv int) testFifoObject { + return mkFifoObj(name, &rvAndXfrm{rv, 0}) + } + xfrm := TransformFunc(func(obj interface{}) (interface{}, error) { + switch v := obj.(type) { + case testFifoObject: + v.val.(*rvAndXfrm).xfrm++ + case DeletedFinalStateUnknown: + if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 { + return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj) + } + default: + return nil, fmt.Errorf("unexpected object: %#v", obj) + } + return obj, nil + }) + + must := func(err error) { + if err != nil { + t.Fatal(err) + } + } + + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + Transformer: xfrm, + }) + must(f.Add(mk("foo", 10))) + must(f.Add(mk("bar", 11))) + must(f.Update(mk("foo", 12))) + must(f.Delete(mk("foo", 15))) + must(f.Replace([]interface{}{}, "")) + must(f.Add(mk("bar", 16))) + must(f.Replace([]interface{}{}, "")) + + // Should be empty + if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %+v, got %+v", e, a) + } + + for i := 0; i < 2; i++ { + obj, err := f.Pop(func(o interface{}) error { return nil }) + if err != nil { + t.Fatalf("got nothing on try %v?", i) + } + obj = obj.(Deltas).Newest().Object + switch v := obj.(type) { + case testFifoObject: + if v.name != "foo" { + t.Errorf("expected regular deletion of foo, got %q", v.name) + } + rx := v.val.(*rvAndXfrm) + if rx.rv != 15 { + t.Errorf("expected last message, got %#v", obj) + } + if rx.xfrm != 1 { + t.Errorf("obj %v transformed wrong number of times.", obj) + } + case DeletedFinalStateUnknown: + tf := v.Obj.(testFifoObject) + rx := tf.val.(*rvAndXfrm) + if tf.name != "bar" { + t.Errorf("expected tombstone deletion of bar, got %q", tf.name) + } + if rx.rv != 16 { + t.Errorf("expected last message, got %#v", obj) + } + if rx.xfrm != 1 { + t.Errorf("tombstoned obj %v transformed wrong number of times.", obj) + } + default: + t.Errorf("unknown item %#v", obj) + } + } +} + func TestDeltaFIFO_enqueueingNoLister(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("foo", 10)) @@ -371,7 +577,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { expectedList = []Deltas{ {{Added, mkFifoObj("baz", 10)}, - {Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, {{Sync, mkFifoObj("foo", 5)}}, // Since "bar" didn't have a delete event and wasn't in the Replace list // it should get a tombstone key with the right Obj. @@ -385,6 +591,67 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } } + // Now try deleting and recreating the object in the queue, then delete it by a Replace call + f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + }) + f.Delete(mkFifoObj("bar", 6)) + f.Add(mkFifoObj("bar", 100)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList = []Deltas{ + { + {Deleted, mkFifoObj("bar", 6)}, + {Added, mkFifoObj("bar", 100)}, + // Since "bar" has a newer object in the queue than in the state, + // it should get a tombstone key with the latest object from the queue + {Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}, + }, + {{Sync, mkFifoObj("foo", 5)}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try syncing it first to ensure the delete use the latest version + f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + }) + f.Replace([]interface{}{mkFifoObj("bar", 100), mkFifoObj("foo", 5)}, "0") + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList = []Deltas{ + { + {Sync, mkFifoObj("bar", 100)}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}, + }, + { + {Sync, mkFifoObj("foo", 5)}, + {Sync, mkFifoObj("foo", 5)}, + }, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + // Now try starting without an explicit KeyListerGetter f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("baz", 10)) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 9f42782d1792..35ebd396cc55 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -190,10 +190,7 @@ type SharedInformer interface { // // Must be set before starting the informer. // - // Note: Since the object given to the handler may be already shared with - // other goroutines, it is advisable to copy the object being - // transform before mutating it at all and returning the copy to prevent - // data races. + // Please see the comment on TransformFunc for more details. SetTransform(handler TransformFunc) error } @@ -404,6 +401,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, + Transformer: s.transform, }) cfg := &Config{ @@ -568,7 +566,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { defer s.blockDeltas.Unlock() if deltas, ok := obj.(Deltas); ok { - return processDeltas(s, s.indexer, s.transform, deltas) + return processDeltas(s, s.indexer, deltas) } return errors.New("object given as Process argument is not Deltas") } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 3f2ac71bfe4c..0712c9313bc1 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -371,9 +371,8 @@ func TestSharedInformerTransformer(t *testing.T) { name := pod.GetName() if upper := strings.ToUpper(name); upper != name { - copied := pod.DeepCopyObject().(*v1.Pod) - copied.SetName(upper) - return copied, nil + pod.SetName(upper) + return pod, nil } } return obj, nil