Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serve watch without resourceVersion from cache and introduce a WatchFromStorageWithoutResourceVersion feature gate to allow serving watch from storage #123935

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/features/kube_features.go
Expand Up @@ -1307,6 +1307,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

genericfeatures.WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},

genericfeatures.WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},

genericfeatures.WatchList: {Default: false, PreRelease: featuregate.Alpha},

genericfeatures.ZeroLimitedNominalConcurrencyShares: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
Expand Down
8 changes: 8 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/features/kube_features.go
Expand Up @@ -258,6 +258,12 @@ const (
// Enables support for watch bookmark events.
WatchBookmark featuregate.Feature = "WatchBookmark"

// owner: @serathius
// beta: 1.30
// Enables watches without resourceVersion to be served from storage.
// Used to prevent https://github.com/kubernetes/kubernetes/issues/123072 until etcd fixes the issue.
WatchFromStorageWithoutResourceVersion featuregate.Feature = "WatchFromStorageWithoutResourceVersion"

// owner: @vinaykul
// kep: http://kep.k8s.io/1287
// alpha: v1.27
Expand Down Expand Up @@ -349,6 +355,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},

WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},

InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},

WatchList: {Default: false, PreRelease: featuregate.Alpha},
Expand Down
8 changes: 5 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Expand Up @@ -523,7 +523,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
opts.SendInitialEvents = nil
}
// TODO: we should eventually get rid of this legacy case
if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts)
}
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
Expand Down Expand Up @@ -1282,12 +1282,14 @@ func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(parsedResourceVersion
//
// if SendInitiaEvents != nil => ResourceVersionMatch = NotOlderThan
// if ResourceVersionmatch != nil => ResourceVersionMatch = NotOlderThan & SendInitialEvents != nil
//
// to satisfy the legacy case (SendInitialEvents = true, RV="") we skip checking opts.Predicate.AllowWatchBookmarks
func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (uint64, error) {
if len(opts.ResourceVersion) != 0 {
return parsedWatchResourceVersion, nil
}
// legacy case
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we add it here?

If we're already at this point, we will be serving watch from cache. So the problem we're facing [being watches on etcd overloading etcd] is not a problem here. I don't think this is needed here - I would revert changes to this function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The refactors for WatchList have changed the default behavior of watch cache, that was not a problem was previously this response was served from etcd, however to pass the tests (WatchSemantics and conformance test) I need to update this function.

Reason is that getWatchCacheResourceVersion returns a ResourceVersion to which the watch cache must be synchronized to. Without this change getWatchCacheResourceVersion will call GetCurrentResourceVersionFromStorage which is meant for consistent read, and wait for such revision.

With my change I will return zero, which means serve the data available from cache, which is consistent with the old behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK - we will need to update tests when we will switch FG default, but that's fine.

So the tests should effectively be [if FG set, then 100, otherwise 0].
So maybe please add a short comment for those tests that when we switch FG, those should be reverted to 100.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a hard time following what this bit does, but at least the 1.27 / 1.28 / 1.29 backports won't need this and will be a straight gating of the etcd pass-through, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes - this bit will not be part of backports

return 0, nil
}
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
return rv, err
}
Expand Down
15 changes: 12 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go
Expand Up @@ -381,9 +381,18 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
}

func TestWatchSemantics(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
t.Run("WatchFromStorageWithoutResourceVersion=true", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)()
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
})
t.Run("WatchFromStorageWithoutResourceVersion=false", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)()
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
Copy link
Member

@liggitt liggitt Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this doubling of the RunWatchSemantics test is responsible for the increased timeout rate in https://testgrid.k8s.io/sig-release-master-blocking#ci-kubernetes-unit&width=20 and #123850

})
}

func TestWatchSemanticInitialEventsExtended(t *testing.T) {
Expand Down
Expand Up @@ -338,8 +338,6 @@ func TestWatchCacheBypass(t *testing.T) {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "0",
Predicate: storage.Everything,
Expand All @@ -348,12 +346,32 @@ func TestWatchCacheBypass(t *testing.T) {
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
}

// With unset RV, check if cacher is bypassed.
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
if err != errDummy {
t.Errorf("Watch with unset RV should bypass cacher: %v", err)
if err != nil {
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
}

defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)()
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
if err != nil {
t.Errorf("With WatchFromStorageWithoutResourceVersion disabled, watch with unset RV should be served from cache: %v", err)
}

// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)()
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
if !errors.Is(err, errDummy) {
t.Errorf("With WatchFromStorageWithoutResourceVersion enabled, watch with unset RV should be served from storage: %v", err)
}
}

Expand Down Expand Up @@ -2032,9 +2050,11 @@ func TestGetWatchCacheResourceVersion(t *testing.T) {
// | Unset | true/false | nil/true/false |
// +-----------------+---------------------+-----------------------+
{
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil",
opts: listOptions(true, nil, ""),
expectedWatchResourceVersion: 100,
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil",
opts: listOptions(true, nil, ""),
// Expecting RV 0, due to https://github.com/kubernetes/kubernetes/pull/123935 reverted to serving those requests from watch cache.
// Set to 100, when WatchFromStorageWithoutResourceVersion is set to true.
expectedWatchResourceVersion: 0,
},
{
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=true",
Expand All @@ -2047,9 +2067,11 @@ func TestGetWatchCacheResourceVersion(t *testing.T) {
expectedWatchResourceVersion: 100,
},
{
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil",
opts: listOptions(false, nil, ""),
expectedWatchResourceVersion: 100,
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil",
opts: listOptions(false, nil, ""),
// Expecting RV 0, due to https://github.com/kubernetes/kubernetes/pull/123935 reverted to serving those requests from watch cache.
// Set to 100, when WatchFromStorageWithoutResourceVersion is set to true.
expectedWatchResourceVersion: 0,
},
{
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=true, legacy",
Expand Down