Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid computing keys multiple times in Cacher #35029

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 19 additions & 25 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
}
}

type filterObjectFunc func(string, runtime.Object) bool

// Cacher is responsible for serving WATCH and LIST requests for a given
// resource from its internal cache and updating its cache in the background
// based on the underlying storage contents.
Expand Down Expand Up @@ -161,9 +163,6 @@ type Cacher struct {
// Versioner is used to handle resource versions.
versioner Versioner

// keyFunc is used to get a key in the underyling storage for a given object.
keyFunc func(runtime.Object) (string, error)

// triggerFunc is used for optimizing amount of watchers that needs to process
// an incoming event.
triggerFunc TriggerPublisherFunc
Expand All @@ -183,7 +182,7 @@ type Cacher struct {
// internal cache and updating its cache in the background based on the given
// configuration.
func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache := newWatchCache(config.CacheCapacity)
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)

// Give this error when it is constructed rather than when you get the
Expand All @@ -201,7 +200,6 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
versioner: config.Versioner,
keyFunc: config.KeyFunc,
triggerFunc: config.TriggerPublisherFunc,
watcherIdx: 0,
watchers: indexedWatchers{
Expand Down Expand Up @@ -328,7 +326,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
c.Lock()
defer c.Unlock()
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, c.keyFunc, pred), forget)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, pred), forget)

c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++
Expand Down Expand Up @@ -382,20 +380,20 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
if err != nil || listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterFunction(key, c.keyFunc, pred)
filter := filterFunction(key, pred)

objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
if err != nil {
return fmt.Errorf("failed to wait for fresh list: %v", err)
}
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
for _, obj := range objs {
object, ok := obj.(runtime.Object)
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(object) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
if filter(elem.Key, elem.Object) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
Expand Down Expand Up @@ -524,14 +522,9 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
}
}

func filterFunction(key string, keyFunc func(runtime.Object) (string, error), p SelectionPredicate) FilterFunc {
func filterFunction(key string, p SelectionPredicate) filterObjectFunc {
f := SimpleFilter(p)
filterFunc := func(obj runtime.Object) bool {
objKey, err := keyFunc(obj)
if err != nil {
glog.Errorf("invalid object for filter. Obj: %v. Err: %v", obj, err)
return false
}
filterFunc := func(objKey string, obj runtime.Object) bool {
if !hasPathPrefix(objKey, key) {
return false
}
Expand Down Expand Up @@ -626,12 +619,12 @@ type cacheWatcher struct {
sync.Mutex
input chan watchCacheEvent
result chan watch.Event
filter FilterFunc
filter filterObjectFunc
stopped bool
forget func(bool)
}

func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter filterObjectFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
Expand Down Expand Up @@ -707,11 +700,12 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
}
}

func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
curObjPasses := event.Type != watch.Deleted && c.filter(event.Object)
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.Object)
oldObjPasses := false
if event.PrevObject != nil {
oldObjPasses = c.filter(event.PrevObject)
oldObjPasses = c.filter(event.Key, event.PrevObject)
}
if !curObjPasses && !oldObjPasses {
// Watcher is not interested in that object.
Expand Down Expand Up @@ -752,7 +746,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
const initProcessThreshold = 500 * time.Millisecond
startTime := time.Now()
for _, event := range initEvents {
c.sendWatchCacheEvent(event)
c.sendWatchCacheEvent(&event)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this depend on all watchers treating the incoming event as immutable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes (and this is true assumption today). I added explicit comment for that method.

}
processingTime := time.Since(startTime)
if processingTime > initProcessThreshold {
Expand All @@ -772,7 +766,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
}
// only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion {
c.sendWatchCacheEvent(event)
c.sendWatchCacheEvent(&event)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question here, what happens if a watcher mutates an event (or object in an event)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above - added comment to "sendWatchCacheEvent" method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, just realized sendWatchCacheEvent copies the object and constructs a watch.Event per watcher, so that's fine

}
}
}
Expand Down
106 changes: 90 additions & 16 deletions pkg/storage/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,27 @@ type watchCacheEvent struct {
Type watch.EventType
Object runtime.Object
PrevObject runtime.Object
Key string
ResourceVersion uint64
}

// Computing a key of an object is generally non-trivial (it performs
// e.g. validation underneath). To avoid computing it multiple times
// (to serve the event in different List/Watch requests), in the
// underlying store we are keeping pair (key, object).
type storeElement struct {
Key string
Object runtime.Object
}

func storeElementKey(obj interface{}) (string, error) {
elem, ok := obj.(*storeElement)
if !ok {
return "", fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Key, nil
}

// watchCacheElement is a single "watch event" stored in a cache.
// It contains the resource version of the object and the object
// itself.
Expand All @@ -72,6 +90,9 @@ type watchCache struct {
// Maximum size of history window.
capacity int

// keyFunc is used to get a key in the underlying storage for a given object.
keyFunc func(runtime.Object) (string, error)

// cache is used a cyclic buffer - its first element (with the smallest
// resourceVersion) is defined by startIndex, its last element is defined
// by endIndex (if cache is full it will be startIndex + capacity).
Expand Down Expand Up @@ -100,50 +121,54 @@ type watchCache struct {
clock clock.Clock
}

func newWatchCache(capacity int) *watchCache {
func newWatchCache(capacity int, keyFunc func(runtime.Object) (string, error)) *watchCache {
wc := &watchCache{
capacity: capacity,
keyFunc: keyFunc,
cache: make([]watchCacheElement, capacity),
startIndex: 0,
endIndex: 0,
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
store: cache.NewStore(storeElementKey),
resourceVersion: 0,
clock: clock.RealClock{},
}
wc.cond = sync.NewCond(wc.RLocker())
return wc
}

// Add takes runtime.Object as an argument.
func (w *watchCache) Add(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
event := watch.Event{Type: watch.Added, Object: object}

f := func(obj runtime.Object) error { return w.store.Add(obj) }
f := func(elem *storeElement) error { return w.store.Add(elem) }
return w.processEvent(event, resourceVersion, f)
}

// Update takes runtime.Object as an argument.
func (w *watchCache) Update(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
event := watch.Event{Type: watch.Modified, Object: object}

f := func(obj runtime.Object) error { return w.store.Update(obj) }
f := func(elem *storeElement) error { return w.store.Update(elem) }
return w.processEvent(event, resourceVersion, f)
}

// Delete takes runtime.Object as an argument.
func (w *watchCache) Delete(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
event := watch.Event{Type: watch.Deleted, Object: object}

f := func(obj runtime.Object) error { return w.store.Delete(obj) }
f := func(elem *storeElement) error { return w.store.Delete(elem) }
return w.processEvent(event, resourceVersion, f)
}

Expand All @@ -170,43 +195,57 @@ func parseResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64)
}

func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
key, err := w.keyFunc(event.Object)
if err != nil {
return fmt.Errorf("couldn't compute key: %v", err)
}
elem := &storeElement{Key: key, Object: event.Object}

w.Lock()
defer w.Unlock()
previous, exists, err := w.store.Get(event.Object)
previous, exists, err := w.store.Get(elem)
if err != nil {
return err
}
var prevObject runtime.Object
if exists {
prevObject = previous.(runtime.Object)
prevObject = previous.(*storeElement).Object
}
watchCacheEvent := watchCacheEvent{
Type: event.Type,
Object: event.Object,
PrevObject: prevObject,
Key: key,
ResourceVersion: resourceVersion,
}
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion}
if w.onEvent != nil {
w.onEvent(watchCacheEvent)
}
w.updateCache(resourceVersion, watchCacheEvent)
w.updateCache(resourceVersion, &watchCacheEvent)
w.resourceVersion = resourceVersion
w.cond.Broadcast()
return updateFunc(event.Object)
return updateFunc(elem)
}

// Assumes that lock is already held for write.
func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) {
func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
if w.endIndex == w.startIndex+w.capacity {
// Cache is full - remove the oldest element.
w.startIndex++
}
w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, *event}
w.endIndex++
}

// List returns list of pointers to <storeElement> objects.
func (w *watchCache) List() []interface{} {
w.RLock()
defer w.RUnlock()
return w.store.List()
}

// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) {
startTime := w.clock.Now()
go func() {
Expand Down Expand Up @@ -244,30 +283,56 @@ func (w *watchCache) ListKeys() []string {
return w.store.ListKeys()
}

// Get takes runtime.Object as a parameter. However, it returns
// pointer to <storeElement>.
func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
object, ok := obj.(runtime.Object)
if !ok {
return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
}
key, err := w.keyFunc(object)
if err != nil {
return nil, false, fmt.Errorf("couldn't compute key: %v", err)
}

w.RLock()
defer w.RUnlock()
return w.store.Get(obj)
return w.store.Get(&storeElement{Key: key, Object: object})
}

// GetByKey returns pointer to <storeElement>.
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
w.RLock()
defer w.RUnlock()
return w.store.GetByKey(key)
}

// Replace takes slice of runtime.Object as a paramater.
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
version, err := parseResourceVersion(resourceVersion)
if err != nil {
return err
}

toReplace := make([]interface{}, 0, len(objs))
for _, obj := range objs {
object, ok := obj.(runtime.Object)
if !ok {
return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj)
}
key, err := w.keyFunc(object)
if err != nil {
return fmt.Errorf("couldn't compute key: %v", err)
}
toReplace = append(toReplace, &storeElement{Key: key, Object: object})
}

w.Lock()
defer w.Unlock()

w.startIndex = 0
w.endIndex = 0
if err := w.store.Replace(objs, resourceVersion); err != nil {
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
return err
}
w.resourceVersion = version
Expand Down Expand Up @@ -306,7 +371,16 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa
allItems := w.store.List()
result := make([]watchCacheEvent, len(allItems))
for i, item := range allItems {
result[i] = watchCacheEvent{Type: watch.Added, Object: item.(runtime.Object)}
elem, ok := item.(*storeElement)
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", elem)
}
result[i] = watchCacheEvent{
Type: watch.Added,
Object: elem.Object,
Key: elem.Key,
ResourceVersion: w.resourceVersion,
}
}
return result, nil
}
Expand Down