Skip to content

Commit

Permalink
Fix race in informer transformers
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-t committed Apr 17, 2024
1 parent d831b13 commit 7bf49d5
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 20 deletions.
109 changes: 109 additions & 0 deletions staging/src/k8s.io/client-go/tools/cache/controller_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -575,6 +576,114 @@ func TestTransformingInformer(t *testing.T) {
close(stopCh)
}

func TestTransformingInformerRace(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()

label := "to-be-transformed"
makePod := func(name string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "namespace",
Labels: map[string]string{label: "true"},
},
Spec: v1.PodSpec{
Hostname: "hostname",
},
}
}

badTransform := atomic.Bool{}
podTransformer := func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object type: %T", obj)
}
if pod.ObjectMeta.Labels[label] != "true" {
badTransform.Store(true)
return nil, fmt.Errorf("object already transformed: %#v", obj)
}
pod.ObjectMeta.Labels[label] = "false"
return pod, nil
}

numObjs := 5
for i := 0; i < numObjs; i++ {
source.Add(makePod(fmt.Sprintf("pod-%d", i)))
}

type event struct{}
events := make(chan event, numObjs)
recordEvent := func(eventType watch.EventType, previous, current interface{}) {
events <- event{}
}
checkEvents := func(count int) {
for i := 0; i < count; i++ {
<-events
}
}
store, controller := NewTransformingInformer(
source,
&v1.Pod{},
5*time.Millisecond,
ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
},
podTransformer,
)

stopCh := make(chan struct{})
go controller.Run(stopCh)

checkEvents(numObjs)

// Periodically fetch objects to ensure no access races.
wg := sync.WaitGroup{}
errors := make(chan error, numObjs)
for i := 0; i < numObjs; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
key := fmt.Sprintf("namespace/pod-%d", index)
for {
select {
case <-stopCh:
return
default:
}

obj, ok, err := store.GetByKey(key)
if !ok || err != nil {
errors <- fmt.Errorf("couldn't get the object for %v", key)
return
}
pod := obj.(*v1.Pod)
if pod.ObjectMeta.Labels[label] != "false" {
errors <- fmt.Errorf("unexpected object: %#v", pod)
return
}
}
}(i)
}

// Let resyncs to happen for some time.
time.Sleep(time.Second)

close(stopCh)
wg.Wait()
close(errors)
for err := range errors {
t.Error(err)
}

if badTransform.Load() {
t.Errorf("unexpected transformation happened")
}
}

func TestDeletionHandlingObjectToName(t *testing.T) {
cm := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Expand Down
55 changes: 35 additions & 20 deletions staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
Expand Up @@ -139,20 +139,13 @@ type DeltaFIFO struct {
}

// TransformFunc allows for transforming an object before it will be processed.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
//
// New in v1.27: In such cases, the contained object will already have gone
// through the transform object separately (when it was added / updated prior
// to the delete), so the TransformFunc can likely safely ignore such objects
// (i.e., just return the input object).
// TransformFunc (similarly to ResourceEventHandler functions).
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
//
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
// sees the object before any other actor, and it is now safe to mutate the
// object in place instead of making a copy.
// New in v1.27: TransformFunc sees the object before any other actor, and it
// is now safe to mutate the object in place instead of making a copy.
//
// Note that TransformFunc is called while inserting objects into the
// notification queue and is therefore extremely performance sensitive; please
Expand Down Expand Up @@ -440,22 +433,44 @@ func isDeletionDup(a, b *Delta) *Delta {
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
return f.queueReplaceActionLocked(actionType, false, obj)
}

// queueReplaceActionLocked appends to the delta list for the object.
// Called must lock first.
func (f *DeltaFIFO) queueReplaceActionLocked(actionType DeltaType, isReplace bool, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}

// Every object comes through this code path once, so this is a good
// place to call the transform func. If obj is a
// DeletedFinalStateUnknown tombstone, then the containted inner object
// will already have gone through the transformer, but we document that
// this can happen. In cases involving Replace(), such an object can
// come through multiple times.
// place to call the transform func.
// However, we need to ensure that we will not call the transformer for
// objects that have already existed in the store as it may lead to races
// because someone may be reading the object while we modify it.
// The cases involve:
// - DeletedFinalStateUnknown tombstones - the contained inned object have
// already gone through the transformer
// - actions being a result of Resync() operation
// We need a way of distinguishing the actions being a result of Resync from
// those with Replace (by default Replace uses Sync action - for that reason
// we introduced the `isReplace` parameter.
//
// TODO(wojtek-t): It is theoretically possible that the same will happen
// for Relist operations, as nothing prevents users from reusing the
// objects if they didn't change. However, in practice Relist() is always
// propagated with a result of new List/WatchList and objects are not
// shared here.
if f.transformer != nil {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
_, isTombstone := obj.(DeletedFinalStateUnknown)
isSyncAction := actionType == Sync && !isReplace
if !isTombstone && !isSyncAction {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
}
}
}

Expand Down Expand Up @@ -638,7 +653,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(action, item); err != nil {
if err := f.queueReplaceActionLocked(action, true, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
Expand Down

0 comments on commit 7bf49d5

Please sign in to comment.