Skip to content

Commit

Permalink
Fix enabling consistent list from watch cache also works for resource…
Browse files Browse the repository at this point in the history
…Version=0
  • Loading branch information
serathius committed Mar 14, 2024
1 parent 3ec6a38 commit 0b8e795
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 8 deletions.
2 changes: 1 addition & 1 deletion staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Expand Up @@ -773,7 +773,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
}
if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil {
return err
Expand Down
35 changes: 31 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go
Expand Up @@ -167,20 +167,47 @@ func TestPreconditionalDeleteWithSuggestionPass(t *testing.T) {
}

func TestList(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
}

func TestListWithListFromCache(t *testing.T) {
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove skip.
t.Skip("This test flakes flakes due to https://github.com/etcd-io/etcd/issues/17507")
func TestListWithConsistentListFromCache(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
// Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove the wait when etcd is upgraded to version with fix.
err := cacher.ready.wait(ctx)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
}

func TestConsistentList(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, false)
}

func TestConsistentListWithConsistentListFromCache(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
// Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove the wait when etcd is upgraded to version with fix.
err := cacher.ready.wait(ctx)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, true)
}

func TestGetListNonRecursive(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
Expand Down Expand Up @@ -216,7 +243,7 @@ func TestListInconsistentContinuation(t *testing.T) {
storagetesting.RunTestListInconsistentContinuation(ctx, t, cacher, nil)
}

func TestConsistentList(t *testing.T) {
func TestListResourceVersionMatch(t *testing.T) {
// TODO(#109831): Enable use of this test and run it.
}

Expand Down
9 changes: 7 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go
Expand Up @@ -216,6 +216,11 @@ func TestList(t *testing.T) {
storagetesting.RunTestList(ctx, t, store, compactStorage(client), false)
}

func TestConsistentList(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client), false, true)
}

func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) {
if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects {
Expand Down Expand Up @@ -285,9 +290,9 @@ func TestListInconsistentContinuation(t *testing.T) {
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client))
}

func TestConsistentList(t *testing.T) {
func TestListResourceVersionMatch(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestConsistentList(ctx, t, &storeWithPrefixTransformer{store})
storagetesting.RunTestListResourceVersionMatch(ctx, t, &storeWithPrefixTransformer{store})
}

func TestCount(t *testing.T) {
Expand Down
71 changes: 70 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go
Expand Up @@ -1170,6 +1170,14 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com
expectRV: currentRV,
expectedOut: []example.Pod{},
},
{
name: "test non-consistent List",
prefix: "/pods/empty",
pred: storage.Everything,
rv: "0",
expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion),
expectedOut: []example.Pod{},
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -1242,6 +1250,67 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com
}
}

func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, cacheEnabled, consistentReadsSupported bool) {
outPod := &example.Pod{}
inPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}}
err := store.Create(ctx, computePodKey(inPod), inPod, outPod, 0)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
lastObjecRV := outPod.ResourceVersion
compaction(ctx, t, outPod.ResourceVersion)
parsedRV, _ := strconv.Atoi(outPod.ResourceVersion)
currentRV := fmt.Sprintf("%d", parsedRV+1)

firstNonConsistentReadRV := lastObjecRV
if consistentReadsSupported && !cacheEnabled {
firstNonConsistentReadRV = currentRV
}

secondNonConsistentReadRV := lastObjecRV
if consistentReadsSupported {
secondNonConsistentReadRV = currentRV
}

tcs := []struct {
name string
requestRV string
expectResponseRV string
}{
{
name: "Non-consistent list before sync",
requestRV: "0",
expectResponseRV: firstNonConsistentReadRV,
},
{
name: "Consistent request returns currentRV",
requestRV: "",
expectResponseRV: currentRV,
},
{
name: "Non-consistent request after sync returns currentRV",
requestRV: "0",
expectResponseRV: secondNonConsistentReadRV,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
out := &example.PodList{}
opts := storage.ListOptions{
ResourceVersion: tc.requestRV,
Predicate: storage.Everything,
}
err = store.GetList(ctx, "/pods/empty", opts, out)
if err != nil {
t.Fatalf("GetList failed: %v", err)
}
if out.ResourceVersion != tc.expectResponseRV {
t.Errorf("resourceVersion in list response want=%s, got=%s", tc.expectResponseRV, out.ResourceVersion)
}
})
}
}

// seedMultiLevelData creates a set of keys with a multi-level structure, returning a resourceVersion
// from before any were created along with the full set of objects that were persisted
func seedMultiLevelData(ctx context.Context, store storage.Interface) (string, []*example.Pod, error) {
Expand Down Expand Up @@ -1952,7 +2021,7 @@ type InterfaceWithPrefixTransformer interface {
UpdatePrefixTransformer(PrefixTransformerModifier) func()
}

func RunTestConsistentList(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
func RunTestListResourceVersionMatch(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
nextPod := func(index uint32) (string, *example.Pod) {
obj := &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit 0b8e795

Please sign in to comment.