diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go index b2a588d85fec..3532ccea522d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go @@ -37,9 +37,6 @@ import ( "k8s.io/apiserver/pkg/storage/value" ) -// basePath for storage keys used across tests -const basePath = "/keybase" - // CreateObjList will create a list from the array of objects. func CreateObjList(prefix string, helper storage.Interface, items []runtime.Object) error { for i := range items { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index 1644f32e1bd1..4455f574851b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -44,65 +44,76 @@ func RunTestWatch(ctx context.Context, t *testing.T, store storage.Interface) { // - update should trigger Modified event // - update that gets filtered should trigger Deleted event func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recursive bool) { - podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} + basePod := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: example.PodSpec{NodeName: ""}, + } + basePodAssigned := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: example.PodSpec{NodeName: "bar"}, + } tests := []struct { name string + namespace string key string pred storage.SelectionPredicate watchTests []*testWatchStruct }{{ name: "create a key", - key: basePath + "/somekey-1", - watchTests: []*testWatchStruct{{podFoo, true, watch.Added}}, + namespace: fmt.Sprintf("test-ns-1-%t", recursive), + watchTests: []*testWatchStruct{{basePod, true, watch.Added}}, pred: storage.Everything, }, { name: "key updated to match predicate", - key: basePath + "/somekey-3", - watchTests: []*testWatchStruct{{podFoo, false, ""}, {podBar, true, watch.Added}}, + namespace: fmt.Sprintf("test-ns-2-%t", recursive), + watchTests: []*testWatchStruct{{basePod, false, ""}, {basePodAssigned, true, watch.Added}}, pred: storage.SelectionPredicate{ Label: labels.Everything(), - Field: fields.ParseSelectorOrDie("metadata.name=bar"), + Field: fields.ParseSelectorOrDie("spec.nodename=bar"), GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil + return nil, fields.Set{"spec.nodename": pod.Spec.NodeName}, nil }, }, }, { name: "update", - key: basePath + "/somekey-4", - watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}}, + namespace: fmt.Sprintf("test-ns-3-%t", recursive), + watchTests: []*testWatchStruct{{basePod, true, watch.Added}, {basePodAssigned, true, watch.Modified}}, pred: storage.Everything, }, { name: "delete because of being filtered", - key: basePath + "/somekey-5", - watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}}, + namespace: fmt.Sprintf("test-ns-4-%t", recursive), + watchTests: []*testWatchStruct{{basePod, true, watch.Added}, {basePodAssigned, true, watch.Deleted}}, pred: storage.SelectionPredicate{ Label: labels.Everything(), - Field: fields.ParseSelectorOrDie("metadata.name!=bar"), + Field: fields.ParseSelectorOrDie("spec.nodename!=bar"), GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil + return nil, fields.Set{"spec.nodename": pod.Spec.NodeName}, nil }, }, }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - w, err := store.Watch(ctx, tt.key, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred, Recursive: recursive}) + watchKey := fmt.Sprintf("pods/%s", tt.namespace) + key := watchKey + "/foo" + if !recursive { + watchKey = key + } + + w, err := store.Watch(ctx, watchKey, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred, Recursive: recursive}) if err != nil { t.Fatalf("Watch failed: %v", err) } var prevObj *example.Pod for _, watchTest := range tt.watchTests { out := &example.Pod{} - key := tt.key - if recursive { - key = key + "/item" - } err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { - return watchTest.obj, nil + obj := watchTest.obj.DeepCopy() + obj.Namespace = tt.namespace + return obj, nil }), nil) if err != nil { t.Fatalf("GuaranteedUpdate failed: %v", err) @@ -127,7 +138,7 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur // - watch from 0 should sync up and grab the object added before // - watch from 0 is able to return events for objects whose previous version has been compacted func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) { - key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) + key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { @@ -140,7 +151,7 @@ func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Inter out := &example.Pod{} err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { - return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns", Annotations: map[string]string{"a": "1"}}}, nil }), nil) if err != nil { t.Fatalf("GuaranteedUpdate failed: %v", err) @@ -162,7 +173,7 @@ func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Inter out = &example.Pod{} err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { - return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}, nil }), nil) if err != nil { t.Fatalf("GuaranteedUpdate failed: %v", err) @@ -180,7 +191,7 @@ func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Inter } func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) { - key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) @@ -192,7 +203,7 @@ func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage. } func RunTestWatchFromNoneZero(ctx context.Context, t *testing.T, store storage.Interface) { - key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) if err != nil { @@ -201,12 +212,17 @@ func RunTestWatchFromNoneZero(ctx context.Context, t *testing.T, store storage.I out := &example.Pod{} store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { - return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, err + newObj := storedObj.DeepCopy() + newObj.Annotations = map[string]string{"version": "2"} + return newObj, nil }), nil) testCheckResult(t, watch.Modified, w, out) } func RunTestWatchError(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) { + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}} + key := computePodKey(obj) + // Compute the initial resource version from which we can start watching later. list := &example.PodList{} storageOpts := storage.ListOptions{ @@ -214,13 +230,13 @@ func RunTestWatchError(ctx context.Context, t *testing.T, store InterfaceWithPre Predicate: storage.Everything, Recursive: true, } - if err := store.GetList(ctx, basePath, storageOpts, list); err != nil { + if err := store.GetList(ctx, "/pods", storageOpts, list); err != nil { t.Errorf("Unexpected error: %v", err) } - if err := store.GuaranteedUpdate(ctx, basePath+"//foo", &example.Pod{}, true, nil, storage.SimpleUpdate( + if err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { - return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil + return obj, nil }), nil); err != nil { t.Fatalf("GuaranteedUpdate failed: %v", err) } @@ -232,7 +248,7 @@ func RunTestWatchError(ctx context.Context, t *testing.T, store InterfaceWithPre }) defer revertTransformer() - w, err := store.Watch(ctx, basePath+"//foo", storage.ListOptions{ResourceVersion: list.ResourceVersion, Predicate: storage.Everything}) + w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: list.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } @@ -244,7 +260,7 @@ func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage. cancel() // When we watch with a canceled context, we should detect that it's context canceled. // We won't take it as error and also close the watcher. - w, err := store.Watch(canceledCtx, basePath+"/abc", storage.ListOptions{ + w, err := store.Watch(canceledCtx, "/pods/not-existing", storage.ListOptions{ ResourceVersion: "0", Predicate: storage.Everything, }) @@ -263,7 +279,7 @@ func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage. } func RunTestWatchDeleteEventObjectHaveLatestRV(ctx context.Context, t *testing.T, store storage.Interface) { - key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout) w, err := store.Watch(watchCtx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) @@ -295,7 +311,7 @@ func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store s initSignal := utilflowcontrol.NewInitializationSignal() ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal) - key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) _, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) @@ -309,8 +325,8 @@ func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store s // (it rather is used by wrappers of storage.Interface to implement its functionalities) // this test is currently considered optional. func RunOptionalTestProgressNotify(ctx context.Context, t *testing.T, store storage.Interface) { - key := basePath + "/somekey" - input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}} + input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name", Namespace: "test-ns"}} + key := computePodKey(input) out := &example.Pod{} if err := store.Create(ctx, key, input, out, 0); err != nil { t.Fatalf("Create failed: %v", err)