Skip to content

Commit

Permalink
Serve watch without resourceVersion from cache and introduce a WatchF…
Browse files Browse the repository at this point in the history
…romStorageWithoutResourceVersion feature gate to allow serving watch from storage.
  • Loading branch information
serathius committed Mar 20, 2024
1 parent c37a819 commit 017248f
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 10 deletions.
2 changes: 2 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

genericfeatures.UnauthenticatedHTTP2DOSMitigation: {Default: false, PreRelease: featuregate.Beta},

genericfeatures.WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},

// features that enable backwards compatibility but are scheduled to be removed
// ...
HPAScaleToZero: {Default: false, PreRelease: featuregate.Alpha},
Expand Down
8 changes: 8 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ const (
// Enables support for watch bookmark events.
WatchBookmark featuregate.Feature = "WatchBookmark"

// owner: @serathius
// beta: 1.30
// Enables watches without resourceVersion to be served from storage.
// Used to prevent https://github.com/kubernetes/kubernetes/issues/123072 until etcd fixes the issue.
WatchFromStorageWithoutResourceVersion featuregate.Feature = "WatchFromStorageWithoutResourceVersion"

// owner: @vinaykul
// kep: http://kep.k8s.io/1287
// alpha: v1.27
Expand Down Expand Up @@ -297,6 +303,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},

WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},

InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},

WatchList: {Default: false, PreRelease: featuregate.Alpha},
Expand Down
2 changes: 1 addition & 1 deletion staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
opts.SendInitialEvents = nil
}
if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts)
}
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,6 @@ func TestWatchCacheBypass(t *testing.T) {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "0",
Predicate: storage.Everything,
Expand All @@ -327,12 +325,32 @@ func TestWatchCacheBypass(t *testing.T) {
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
}

// With unset RV, check if cacher is bypassed.
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
if err != errDummy {
t.Errorf("Watch with unset RV should bypass cacher: %v", err)
if err != nil {
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
}

defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)()
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
if err != nil {
t.Errorf("With WatchFromStorageWithoutResourceVersion disabled, watch with unset RV should be served from cache: %v", err)
}

// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)()
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
if !errors.Is(err, errDummy) {
t.Errorf("With WatchFromStorageWithoutResourceVersion enabled, watch with unset RV should be served from storage: %v", err)
}
}

Expand Down Expand Up @@ -1679,10 +1697,10 @@ func TestCacherWatchSemantics(t *testing.T) {
},
{
// note we set storage's RV to some future value, mustn't be used by this scenario
name: "legacy, RV=unset, storageRV=105",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
// no events because the watch is delegated to the underlying storage
name: "legacy, RV=unset, storageRV=105",
storageResourceVersion: "105",
initialPods: []*example.Pod{makePod(101), makePod(102)},
expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
},
}
for _, scenario := range scenarios {
Expand Down

0 comments on commit 017248f

Please sign in to comment.