diff --git a/staging/src/k8s.io/client-go/tools/cache/controller_test.go b/staging/src/k8s.io/client-go/tools/cache/controller_test.go index 4058581154fa..992c1f5a4a48 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller_test.go @@ -20,6 +20,7 @@ import ( "fmt" "math/rand" "sync" + "sync/atomic" "testing" "time" @@ -575,6 +576,114 @@ func TestTransformingInformer(t *testing.T) { close(stopCh) } +func TestTransformingInformerRace(t *testing.T) { + // source simulates an apiserver object endpoint. + source := fcache.NewFakeControllerSource() + + label := "to-be-transformed" + makePod := func(name string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "namespace", + Labels: map[string]string{label: "true"}, + }, + Spec: v1.PodSpec{ + Hostname: "hostname", + }, + } + } + + badTransform := atomic.Bool{} + podTransformer := func(obj interface{}) (interface{}, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return nil, fmt.Errorf("unexpected object type: %T", obj) + } + if pod.ObjectMeta.Labels[label] != "true" { + badTransform.Store(true) + return nil, fmt.Errorf("object already transformed: %#v", obj) + } + pod.ObjectMeta.Labels[label] = "false" + return pod, nil + } + + numObjs := 5 + for i := 0; i < numObjs; i++ { + source.Add(makePod(fmt.Sprintf("pod-%d", i))) + } + + type event struct{} + events := make(chan event, numObjs) + recordEvent := func(eventType watch.EventType, previous, current interface{}) { + events <- event{} + } + checkEvents := func(count int) { + for i := 0; i < count; i++ { + <-events + } + } + store, controller := NewTransformingInformer( + source, + &v1.Pod{}, + 5*time.Millisecond, + ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) }, + DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) }, + }, + podTransformer, + ) + + stopCh := make(chan struct{}) + go controller.Run(stopCh) + + checkEvents(numObjs) + + // Periodically fetch objects to ensure no access races. + wg := sync.WaitGroup{} + errors := make(chan error, numObjs) + for i := 0; i < numObjs; i++ { + wg.Add(1) + go func(index int) { + defer wg.Done() + key := fmt.Sprintf("namespace/pod-%d", index) + for { + select { + case <-stopCh: + return + default: + } + + obj, ok, err := store.GetByKey(key) + if !ok || err != nil { + errors <- fmt.Errorf("couldn't get the object for %v", key) + return + } + pod := obj.(*v1.Pod) + if pod.ObjectMeta.Labels[label] != "false" { + errors <- fmt.Errorf("unexpected object: %#v", pod) + return + } + } + }(i) + } + + // Let resyncs to happen for some time. + time.Sleep(time.Second) + + close(stopCh) + wg.Wait() + close(errors) + for err := range errors { + t.Error(err) + } + + if badTransform.Load() { + t.Errorf("unexpected transformation happened") + } +} + func TestDeletionHandlingObjectToName(t *testing.T) { cm := &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 7160bb1ee72e..ff007f982e49 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -139,20 +139,13 @@ type DeltaFIFO struct { } // TransformFunc allows for transforming an object before it will be processed. -// TransformFunc (similarly to ResourceEventHandler functions) should be able -// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown. -// -// New in v1.27: In such cases, the contained object will already have gone -// through the transform object separately (when it was added / updated prior -// to the delete), so the TransformFunc can likely safely ignore such objects -// (i.e., just return the input object). +// TransformFunc (similarly to ResourceEventHandler functions). // // The most common usage pattern is to clean-up some parts of the object to // reduce component memory usage if a given component doesn't care about them. // -// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc -// sees the object before any other actor, and it is now safe to mutate the -// object in place instead of making a copy. +// New in v1.27: TransformFunc sees the object before any other actor, and it +// is now safe to mutate the object in place instead of making a copy. // // Note that TransformFunc is called while inserting objects into the // notification queue and is therefore extremely performance sensitive; please @@ -440,22 +433,44 @@ func isDeletionDup(a, b *Delta) *Delta { // queueActionLocked appends to the delta list for the object. // Caller must lock first. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { + return f.queueReplaceActionLocked(actionType, false, obj) +} + +// queueReplaceActionLocked appends to the delta list for the object. +// Called must lock first. +func (f *DeltaFIFO) queueReplaceActionLocked(actionType DeltaType, isReplace bool, obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } // Every object comes through this code path once, so this is a good - // place to call the transform func. If obj is a - // DeletedFinalStateUnknown tombstone, then the containted inner object - // will already have gone through the transformer, but we document that - // this can happen. In cases involving Replace(), such an object can - // come through multiple times. + // place to call the transform func. + // However, we need to ensure that we will not call the transformer for + // objects that have already existed in the store as it may lead to races + // because someone may be reading the object while we modify it. + // The cases involve: + // - DeletedFinalStateUnknown tombstones - the contained inned object have + // already gone through the transformer + // - actions being a result of Resync() operation + // We need a way of distinguishing the actions being a result of Resync from + // those with Replace (by default Replace uses Sync action - for that reason + // we introduced the `isReplace` parameter. + // + // TODO(wojtek-t): It is theoretically possible that the same will happen + // for Relist operations, as nothing prevents users from reusing the + // objects if they didn't change. However, in practice Relist() is always + // propagated with a result of new List/WatchList and objects are not + // shared here. if f.transformer != nil { - var err error - obj, err = f.transformer(obj) - if err != nil { - return err + _, isTombstone := obj.(DeletedFinalStateUnknown) + isSyncAction := actionType == Sync && !isReplace + if !isTombstone && !isSyncAction { + var err error + obj, err = f.transformer(obj) + if err != nil { + return err + } } } @@ -638,7 +653,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { return KeyError{item, err} } keys.Insert(key) - if err := f.queueActionLocked(action, item); err != nil { + if err := f.queueReplaceActionLocked(action, true, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } }