Skip to content

Don't use CachingObject if the number of watchers is small #84043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,37 @@ func (c *Cacher) dispatchEvents() {
}
}

func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
switch event.Type {
case watch.Added, watch.Modified:
if object, err := newCachingObject(event.Object); err == nil {
event.Object = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
}
// Don't wrap PrevObject for update event (for create events it is nil).
// We only encode those to deliver DELETE watch events, so if
// event.Object is not nil it can be used only for watchers for which
// selector was satisfied for its previous version and is no longer
// satisfied for the current version.
// This is rare enough that it doesn't justify making deep-copy of the
// object (done by newCachingObject) every time.
case watch.Deleted:
// Don't wrap Object for delete events - these are not to deliver any
// events. Only wrap PrevObject.
if object, err := newCachingObject(event.PrevObject); err == nil {
// Update resource version of the underlying object.
// event.PrevObject is used to deliver DELETE watch events and
// for them, we set resourceVersion to <current> instead of
// the resourceVersion of the last modification of the object.
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
event.PrevObject = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
}
}
}

func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
c.startDispatching(event)
defer c.finishDispatching()
Expand All @@ -843,6 +874,23 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
watcher.nonblockingAdd(event)
}
} else {
// Set up caching of object serializations only for dispatching this event.
//
// Storing serializations in memory would result in increased memory usage,
// but it would help for caching encodings for watches started from old
// versions. However, we still don't have a convincing data that the gain
// from it justifies increased memory usage, so for now we drop the cached
// serializations after dispatching this event.
//
// Given the deep-copies that are done to create cachingObjects,
// we try to cache serializations only if there are at least 3 watchers.
if len(c.watchersBuffer) >= 3 {
// Make a shallow copy to allow overwriting Object and PrevObject.
wcEvent := *event
setCachingObjects(&wcEvent, c.versioner)
event = &wcEvent
}

c.blockedWatchers = c.blockedWatchers[:0]
for _, watcher := range c.watchersBuffer {
if !watcher.nonblockingAdd(event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,3 +1012,104 @@ func TestCachingDeleteEvents(t *testing.T) {
verifyEvents(t, fooEventsWatcher, fooEvents)
verifyEvents(t, barEventsWatcher, barEvents)
}

func testCachingObjects(t *testing.T, watchersCount int) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 10)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()

dispatchedEvents := []*watchCacheEvent{}
cacher.watchCache.eventHandler = func(event *watchCacheEvent) {
dispatchedEvents = append(dispatchedEvents, event)
cacher.processEvent(event)
}

watchers := make([]watch.Interface, 0, watchersCount)
for i := 0; i < watchersCount; i++ {
w, err := cacher.Watch(context.TODO(), "pods/ns", "1000", storage.Everything)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
defer w.Stop()
watchers = append(watchers, w)
}

makePod := func(name, rv string) *examplev1.Pod {
return &examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "ns",
ResourceVersion: rv,
},
}
}
pod1 := makePod("pod", "1001")
pod2 := makePod("pod", "1002")
pod3 := makePod("pod", "1003")

cacher.watchCache.Add(pod1)
cacher.watchCache.Update(pod2)
cacher.watchCache.Delete(pod3)

// At this point, we already have dispatchedEvents fully propagated.

verifyEvents := func(w watch.Interface) {
var event watch.Event
for index := range dispatchedEvents {
select {
case event = <-w.ResultChan():
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout watiching for the event")
}

var object runtime.Object
if watchersCount >= 3 {
if _, ok := event.Object.(runtime.CacheableObject); !ok {
t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object)
}
object = event.Object.(runtime.CacheableObject).GetObject()
} else {
if _, ok := event.Object.(runtime.CacheableObject); ok {
t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object)
}
object = event.Object.DeepCopyObject()
}

if event.Type == watch.Deleted {
resourceVersion, err := cacher.versioner.ObjectResourceVersion(cacher.watchCache.cache[index].PrevObject)
if err != nil {
t.Fatalf("Failed to parse resource version: %v", err)
}
updateResourceVersionIfNeeded(object, cacher.versioner, resourceVersion)
}

var e runtime.Object
switch event.Type {
case watch.Added, watch.Modified:
e = cacher.watchCache.cache[index].Object
case watch.Deleted:
e = cacher.watchCache.cache[index].PrevObject
default:
t.Errorf("unexpected watch event: %#v", event)
}
if a := object; !reflect.DeepEqual(a, e) {
t.Errorf("event object messed up for %s: %#v, expected: %#v", event.Type, a, e)
}
}
}

for i := range watchers {
verifyEvents(watchers[i])
}
}

func TestCachingObjects(t *testing.T) {
t.Run("single watcher", func(t *testing.T) { testCachingObjects(t, 1) })
t.Run("many watcher", func(t *testing.T) { testCachingObjects(t, 3) })
}
44 changes: 1 addition & 43 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,37 +210,6 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob
return object, resourceVersion, nil
}

func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
switch event.Type {
case watch.Added, watch.Modified:
if object, err := newCachingObject(event.Object); err == nil {
event.Object = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
}
// Don't wrap PrevObject for update event (for create events it is nil).
// We only encode those to deliver DELETE watch events, so if
// event.Object is not nil it can be used only for watchers for which
// selector was satisfied for its previous version and is no longer
// satisfied for the current version.
// This is rare enough that it doesn't justify making deep-copy of the
// object (done by newCachingObject) every time.
case watch.Deleted:
// Don't wrap Object for delete events - these are not to deliver any
// events. Only wrap PrevObject.
if object, err := newCachingObject(event.PrevObject); err == nil {
// Update resource version of the underlying object.
// event.PrevObject is used to deliver DELETE watch events and
// for them, we set resourceVersion to <current> instead of
// the resourceVersion of the last modification of the object.
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
event.PrevObject = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
}
}
}

// processEvent is safe as long as there is at most one call to it in flight
// at any point in time.
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
Expand Down Expand Up @@ -295,18 +264,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
// This is safe as long as there is at most one call to processEvent in flight
// at any point in time.
if w.eventHandler != nil {
// Set up caching of object serializations only for dispatching this event.
//
// Storing serializations in memory would result in increased memory usage,
// but it would help for caching encodings for watches started from old
// versions. However, we still don't have a convincing data that the gain
// from it justifies increased memory usage, so for now we drop the cached
// serializations after dispatching this event.

// Make a shallow copy to allow overwriting Object and PrevObject.
wce := *wcEvent
setCachingObjects(&wce, w.versioner)
w.eventHandler(&wce)
w.eventHandler(wcEvent)
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package cacher

import (
"fmt"
"reflect"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -436,53 +435,3 @@ func TestReflectorForWatchCache(t *testing.T) {
}
}
}

func TestCachingObjects(t *testing.T) {
store := newTestWatchCache(5)

index := 0
store.eventHandler = func(event *watchCacheEvent) {
switch event.Type {
case watch.Added, watch.Modified:
if _, ok := event.Object.(runtime.CacheableObject); !ok {
t.Fatalf("Object in %s event should support caching: %#v", event.Type, event.Object)
}
if _, ok := event.PrevObject.(runtime.CacheableObject); ok {
t.Fatalf("PrevObject in %s event should not support caching: %#v", event.Type, event.Object)
}
case watch.Deleted:
if _, ok := event.Object.(runtime.CacheableObject); ok {
t.Fatalf("Object in %s event should not support caching: %#v", event.Type, event.Object)
}
if _, ok := event.PrevObject.(runtime.CacheableObject); !ok {
t.Fatalf("PrevObject in %s event should support caching: %#v", event.Type, event.Object)
}
}

// Verify that delivered event is the same as cached one modulo Object/PrevObject.
switch event.Type {
case watch.Added, watch.Modified:
event.Object = event.Object.(runtime.CacheableObject).GetObject()
case watch.Deleted:
event.PrevObject = event.PrevObject.(runtime.CacheableObject).GetObject()
// In events store in watchcache, we also don't update ResourceVersion.
// So we need to ensure that we don't fail on it.
resourceVersion, err := store.versioner.ObjectResourceVersion(store.cache[index].PrevObject)
if err != nil {
t.Fatalf("Failed to parse resource version: %v", err)
}
updateResourceVersionIfNeeded(event.PrevObject, store.versioner, resourceVersion)
}
if a, e := event, store.cache[index]; !reflect.DeepEqual(a, e) {
t.Errorf("watchCacheEvent messed up: %#v, expected: %#v", a, e)
}
index++
}

pod1 := makeTestPod("pod", 1)
pod2 := makeTestPod("pod", 2)
pod3 := makeTestPod("pod", 3)
store.Add(pod1)
store.Update(pod2)
store.Delete(pod3)
}