Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration point to SharedInformer to transform objects before storing #107507

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 47 additions & 31 deletions staging/src/k8s.io/client-go/tools/cache/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cache

import (
"errors"
"sync"
"time"

Expand Down Expand Up @@ -406,6 +407,49 @@ func NewTransformingIndexerInformer(
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
}

// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}

switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
handler.OnDelete(obj)
}
}
return nil
}

// newInformer returns a controller for populating the store while also
// providing event notifications.
//
Expand Down Expand Up @@ -444,38 +488,10 @@ func newInformer(
RetryOnError: false,

Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}

switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
h.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
h.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
h.OnDelete(obj)
}
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas)
}
return nil
return errors.New("object given as Process argument is not Deltas")
},
}
return New(cfg)
Expand Down
103 changes: 67 additions & 36 deletions staging/src/k8s.io/client-go/tools/cache/shared_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cache

import (
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -180,6 +181,20 @@ type SharedInformer interface {
// The handler should return quickly - any expensive processing should be
// offloaded.
SetWatchErrorHandler(handler WatchErrorHandler) error

// The TransformFunc is called for each object which is about to be stored.
//
// This function is intended for you to take the opportunity to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be helpful to list the limitations of this. Does the returned type have to be the same type as the input?

From what I can tell it does not but must implement runtime.Object and metav1.ObjectMetaAccessor or equivalent interfaces

// remove, transform, or normalize fields. One use case is to strip unused
// metadata fields out of objects to save on RAM cost.
//
// Must be set before starting the informer.
//
// Note: Since the object given to the handler may be already shared with
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this happen? Seems like a sharedIndexInformer can only run once, and ProcessDeltas is called in controller.processLoop which is not concurrent.

DeepCopy may be expensive so given one of the primary use cases for this is performance it seems fairly important to not do extra work when not needed?

Copy link
Contributor Author

@alexzielenski alexzielenski Feb 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a relist of existing data occurs it is possible for handledeltas to be called multiple times for the same object. The first time the transformation is applied it the object instance will not have been shared. Assuming you have a way of marking an already- transformed object in order to skip repeated transformation applications then a copy should not be necessary.

// other goroutines, it is advisable to copy the object being
// transform before mutating it at all and returning the copy to prevent
// data races.
SetTransform(handler TransformFunc) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit concerned that if an error occurs there is, as far as I can tell, no logging at all. This bubbles up to processLoop, which discards the error

}

// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
Expand Down Expand Up @@ -318,6 +333,8 @@ type sharedIndexInformer struct {

// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler

transform TransformFunc
}

// dummyController hides the fact that a SharedInformer is different from a dedicated one
Expand Down Expand Up @@ -365,6 +382,18 @@ func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) er
return nil
}

func (s *sharedIndexInformer) SetTransform(handler TransformFunc) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()

if s.started {
return fmt.Errorf("informer has already started")
}

s.transform = handler
return nil
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

Expand Down Expand Up @@ -538,45 +567,47 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
}
return errors.New("object given as Process argument is not Deltas")
}

isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.cacheMutationDetector.AddObject(obj)
s.processor.distribute(addNotification{newObj: obj}, false)
}

// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
isSync := false

// If is a Sync event, isSync should be true
// If is a Replaced event, isSync is true if resource version is unchanged.
// If RV is unchanged: this is a Sync/Replaced event, so isSync is true

alexzielenski marked this conversation as resolved.
Show resolved Hide resolved
if accessor, err := meta.Accessor(new); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
return nil

// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.cacheMutationDetector.AddObject(new)
s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}

// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnDelete(old interface{}) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.processor.distribute(deleteNotification{oldObj: old}, false)
}

// sharedProcessor has a collection of processorListener and can
Expand Down
35 changes: 34 additions & 1 deletion staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"testing"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -357,3 +357,36 @@ func TestSharedInformerErrorHandling(t *testing.T) {
}
close(stop)
}

func TestSharedInformerTransformer(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()

source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})

informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
informer.SetTransform(func(obj interface{}) (interface{}, error) {
if pod, ok := obj.(*v1.Pod); ok {
name := pod.GetName()

if upper := strings.ToUpper(name); upper != name {
copied := pod.DeepCopyObject().(*v1.Pod)
copied.SetName(upper)
return copied, nil
}
}
return obj, nil
})

listenerTransformer := newTestListener("listenerTransformer", 0, "POD1", "POD2")
informer.AddEventHandler(listenerTransformer)

stop := make(chan struct{})
go informer.Run(stop)
defer close(stop)

if !listenerTransformer.ok() {
t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames)
}
}