diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 1fafb58bbcaf8..8b5072aa2d4c4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -182,9 +182,20 @@ func TestListWithListFromCache(t *testing.T) { } func TestGetListNonRecursive(t *testing.T) { - ctx, cacher, terminate := testSetup(t) + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)() + ctx, cacher, server, terminate := testSetupWithEtcdServer(t) + t.Cleanup(terminate) + storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher) +} + +func TestGetListNonRecursiveWithConsistentListFromCache(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)() + ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestGetListNonRecursive(ctx, t, cacher) + // Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507 + // TODO(https://github.com/etcd-io/etcd/issues/17507): Remove sleep when etcd is upgraded to version with fix. + time.Sleep(100 * time.Millisecond) + storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher) } func checkStorageCalls(t *testing.T, pageSize, estimatedProcessedObjects uint64) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index bb875fe692d6c..9c4d93f176a08 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -536,7 +536,14 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool { // WaitUntilFreshAndGet returns a pointers to object. func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) { - err := w.waitUntilFreshAndBlock(ctx, resourceVersion) + var err error + if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) { + w.waitingUntilFresh.Add() + err = w.waitUntilFreshAndBlock(ctx, resourceVersion) + w.waitingUntilFresh.Remove() + } else { + err = w.waitUntilFreshAndBlock(ctx, resourceVersion) + } defer w.RUnlock() if err != nil { return nil, false, 0, err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 182a4aea5c49c..410da44ba8f43 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -162,8 +162,8 @@ func TestPreconditionalDeleteWithSuggestionPass(t *testing.T) { } func TestGetListNonRecursive(t *testing.T) { - ctx, store, _ := testSetup(t) - storagetesting.RunTestGetListNonRecursive(ctx, t, store) + ctx, store, client := testSetup(t) + storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client), store) } type storeWithPrefixTransformer struct { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index c5a1d98ec7c21..b6234b58066bf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -1341,7 +1341,7 @@ func seedMultiLevelData(ctx context.Context, store storage.Interface) (string, [ return initialRV, created, nil } -func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage.Interface) { +func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, compaction Compaction, store storage.Interface) { key, prevStoredObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion) @@ -1354,7 +1354,11 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage }, nil); err != nil { t.Fatalf("update failed: %v", err) } - currentRV, _ := strconv.Atoi(storedObj.ResourceVersion) + objRV, _ := strconv.Atoi(storedObj.ResourceVersion) + // Use compact to increase etcd global revision without changes to any resources. + // The increase in resources version comes from Kubernetes compaction updating hidden key. + // Used to test consistent List to confirm it returns latest etcd revision. + compaction(ctx, t, prevStoredObj.ResourceVersion) tests := []struct { name string @@ -1388,13 +1392,13 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage key: key, pred: storage.Everything, expectedOut: []example.Pod{*storedObj}, - rv: fmt.Sprintf("%d", currentRV), + rv: fmt.Sprintf("%d", objRV), }, { name: "existing key, resourceVersion=current, resourceVersionMatch=notOlderThan", key: key, pred: storage.Everything, expectedOut: []example.Pod{*storedObj}, - rv: fmt.Sprintf("%d", currentRV), + rv: fmt.Sprintf("%d", objRV), rvMatch: metav1.ResourceVersionMatchNotOlderThan, }, { name: "existing key, resourceVersion=previous, resourceVersionMatch=notOlderThan", @@ -1408,7 +1412,7 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage key: key, pred: storage.Everything, expectedOut: []example.Pod{*storedObj}, - rv: fmt.Sprintf("%d", currentRV), + rv: fmt.Sprintf("%d", objRV), rvMatch: metav1.ResourceVersionMatchExact, }, { name: "existing key, resourceVersion=previous, resourceVersionMatch=exact", @@ -1453,7 +1457,7 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage }, }, expectedOut: []example.Pod{}, - rv: fmt.Sprintf("%d", currentRV), + rv: fmt.Sprintf("%d", objRV), }} for _, tt := range tests {