Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce contention in watchcache by not calling event handler under lock #76702

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 21 additions & 20 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,30 +291,18 @@ type Cacher struct {
// given configuration.
func NewCacherFromConfig(config Config) *Cacher {
stopCh := make(chan struct{})

watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix

obj := config.NewFunc()
// Give this error when it is constructed rather than when you get the
// first watch item, because it's much easier to track down that way.
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
panic("storage codec doesn't seem to match given type: " + err.Error())
}

reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
reflector.WatchListPageSize = storageWatchListPageSize

clock := clock.RealClock{}
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
objectType: reflect.TypeOf(obj),
watchCache: watchCache,
reflector: reflector,
versioner: config.Versioner,
newFunc: config.NewFunc,
triggerFunc: config.TriggerPublisherFunc,
Expand All @@ -337,7 +325,27 @@ func NewCacherFromConfig(config Config) *Cacher {
bookmarkWatchers: newTimeBucketWatchers(clock),
watchBookmarkEnabled: utilfeature.DefaultFeatureGate.Enabled(features.WatchBookmark),
}
watchCache.SetOnEvent(cacher.processEvent)

// Ensure that timer is stopped.
if !cacher.timer.Stop() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to keep this code? Is there any reason to keep it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — we need this code, because we create time.NewTimer(0) above.
So, to ensure there is no element left in the .C channel, we need to consume it here.

// Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout.
<-cacher.timer.C
}

watchCache := newWatchCache(
config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix

reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
reflector.WatchListPageSize = storageWatchListPageSize

cacher.watchCache = watchCache
cacher.reflector = reflector

go cacher.dispatchEvents()

cacher.stopWg.Add(1)
Expand All @@ -352,13 +360,6 @@ func NewCacherFromConfig(config Config) *Cacher {
)
}()

// Ensure that timer is stopped.
if !cacher.timer.Stop() {
// Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout.
<-cacher.timer.C
}

return cacher
}

Expand Down
63 changes: 36 additions & 27 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type watchCache struct {

// This handler is run at the end of every Add/Update/Delete method
// and additionally gets the previous value of the object.
onEvent func(*watchCacheEvent)
eventHandler func(*watchCacheEvent)

// for testing timeouts.
clock clock.Clock
Expand All @@ -137,6 +137,7 @@ type watchCache struct {
func newWatchCache(
capacity int,
keyFunc func(runtime.Object) (string, error),
eventHandler func(*watchCacheEvent),
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
versioner storage.Versioner) *watchCache {
wc := &watchCache{
Expand All @@ -149,6 +150,7 @@ func newWatchCache(
store: cache.NewStore(storeElementKey),
resourceVersion: 0,
listResourceVersion: 0,
eventHandler: eventHandler,
clock: clock.RealClock{},
versioner: versioner,
}
Expand Down Expand Up @@ -204,6 +206,8 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob
return object, resourceVersion, nil
}

// processEvent is safe as long as there is at most one call to it in flight
// at any point in time.
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
key, err := w.keyFunc(event.Object)
if err != nil {
Expand All @@ -224,30 +228,41 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
ResourceVersion: resourceVersion,
}

// TODO: We should consider moving this lock below after the watchCacheEvent
// is created. In such situation, the only problematic scenario is Replace(
// happening after getting object from store and before acquiring a lock.
// Maybe introduce another lock for this purpose.
w.Lock()
defer w.Unlock()
previous, exists, err := w.store.Get(elem)
if err != nil {
if err := func() error {
// TODO: We should consider moving this lock below after the watchCacheEvent
// is created. In such situation, the only problematic scenario is Replace(
// happening after getting object from store and before acquiring a lock.
// Maybe introduce another lock for this purpose.
w.Lock()
defer w.Unlock()

previous, exists, err := w.store.Get(elem)
if err != nil {
return err
}
if exists {
previousElem := previous.(*storeElement)
watchCacheEvent.PrevObject = previousElem.Object
watchCacheEvent.PrevObjLabels = previousElem.Labels
watchCacheEvent.PrevObjFields = previousElem.Fields
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we acquire the lock after watchCacheEvent is constructed (after PrevObjFields is assigned)?
We can check whether the previousElem is the same as the (current) object from the store. If they are not the same, we construct the watchCacheEvent again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is what I meant:
https://pastebin.com/G2EM5B6v

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand how that helps — you still do Get() under the lock, so this buys us almost nothing in terms of lock contention, and it adds additional load.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can call Get() under read lock.
But w.updateCache() should be called under write lock (by which time the entry may have changed).

Let me think more on this.

}

w.updateCache(watchCacheEvent)
w.resourceVersion = resourceVersion
defer w.cond.Broadcast()

return updateFunc(elem)
}(); err != nil {
return err
}
if exists {
previousElem := previous.(*storeElement)
watchCacheEvent.PrevObject = previousElem.Object
watchCacheEvent.PrevObjLabels = previousElem.Labels
watchCacheEvent.PrevObjFields = previousElem.Fields
}
w.updateCache(watchCacheEvent)
w.resourceVersion = resourceVersion

if w.onEvent != nil {
w.onEvent(watchCacheEvent)
// Avoid calling event handler under lock.
// This is safe as long as there is at most one call to processEvent in flight
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Include this info in the godoc for the function? It's an important part of this function's contract.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use w.Lock() to protect w.cache and w.onEvent both, which should be fixed too.

func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
	w.Lock()
	defer w.Unlock()
	w.onEvent = onEvent
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to eliminate SetOnEvent method - we don't need to allow modifying this once watchCache is created.

// at any point in time.
if w.eventHandler != nil {
w.eventHandler(watchCacheEvent)
}
w.cond.Broadcast()
return updateFunc(elem)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should updateFunc be removed from the function parameters as well, since we're no longer calling it? Any callers that might be impacted by no longer calling the provided updateFunc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return nil
}

// Assumes that lock is already held for write.
Expand Down Expand Up @@ -397,12 +412,6 @@ func (w *watchCache) SetOnReplace(onReplace func()) {
w.onReplace = onReplace
}

// SetOnEvent sets the handler that is run at the end of every
// Add/Update/Delete method with the resulting *watchCacheEvent
// (which also carries the previous value of the object).
// The assignment is done under the cache's write lock.
func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
	w.Lock()
	defer w.Unlock()
	w.onEvent = onEvent
}

func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
size := w.endIndex - w.startIndex
var oldest uint64
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func newTestWatchCache(capacity int) *watchCache {
return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil
}
versioner := etcd.APIObjectVersioner{}
wc := newWatchCache(capacity, keyFunc, getAttrsFunc, versioner)
mockHandler := func(*watchCacheEvent) {}
wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner)
wc.clock = clock.NewFakeClock(time.Now())
return wc
}
Expand Down