Skip to content

Commit

Permalink
Merge pull request kubernetes#121615 from p0lyn0mial/upstream-cacher-…
Browse files Browse the repository at this point in the history
…forget-watcher

cacher: when forgeting a watcher, call stopWatcherLocked multiple times
  • Loading branch information
k8s-ci-robot committed Oct 31, 2023
2 parents d337523 + bbca4a4 commit 5bac451
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 9 deletions.
16 changes: 7 additions & 9 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Expand Up @@ -112,11 +112,8 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
wm[number] = w
}

func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) {
if watcher, ok := wm[number]; ok {
delete(wm, number)
done(watcher)
}
func (wm watchersMap) deleteWatcher(number int) {
delete(wm, number)
}

func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
Expand Down Expand Up @@ -147,14 +144,14 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespac
}
}

func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) {
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool) {
if supported {
i.valueWatchers[value].deleteWatcher(number, done)
i.valueWatchers[value].deleteWatcher(number)
if len(i.valueWatchers[value]) == 0 {
delete(i.valueWatchers, value)
}
} else {
i.allWatchers[scope].deleteWatcher(number, done)
i.allWatchers[scope].deleteWatcher(number)
if len(i.allWatchers[scope]) == 0 {
delete(i.allWatchers, scope)
}
Expand Down Expand Up @@ -1223,7 +1220,8 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
// It's possible that the watcher is already not in the structure (e.g. in case of
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
// on a watcher multiple times.
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked)
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported)
c.stopWatcherLocked(w)
}
}

Expand Down
Expand Up @@ -47,6 +47,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -1807,3 +1808,45 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
}},
}, true)
}

func TestForgetWatcher(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
require.NoError(t, err)
defer cacher.Stop()

require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))

var forgetWatcherFn func(bool)
var forgetCounter int
forgetWatcherWrapped := func(drainWatcher bool) {
forgetCounter++
forgetWatcherFn(drainWatcher)
}
w := newCacheWatcher(
0,
func(_ string, _ labels.Set, _ fields.Set) bool { return true },
nil,
storage.APIObjectVersioner{},
testingclock.NewFakeClock(time.Now()).Now().Add(2*time.Minute),
true,
schema.GroupResource{Resource: "pods"},
"1",
)
forgetWatcherFn = forgetWatcher(cacher, w, 0, namespacedName{}, "", false)

cacher.watchers.addWatcher(w, 0, namespacedName{}, "", false)
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 1, len(cacher.watchers.allWatchers))

forgetWatcherWrapped(false)
require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 1, forgetCounter)

forgetWatcherWrapped(false)
require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 2, forgetCounter)
}

0 comments on commit 5bac451

Please sign in to comment.