Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #94002 upstream release 1.17 #94047

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 10 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
Expand Up @@ -547,7 +547,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor

newItemFunc := getNewItemFunc(listObj, v)

var returnedRV, continueRV int64
var returnedRV, continueRV, withRev int64
var continueKey string
switch {
case s.pagingEnabled && len(pred.Continue) > 0:
Expand All @@ -568,7 +568,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
options = append(options, clientv3.WithRev(continueRV))
withRev = continueRV
returnedRV = continueRV
}
case s.pagingEnabled && pred.Limit > 0:
Expand All @@ -578,7 +578,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(int64(fromRV)))
withRev = int64(fromRV)
}
returnedRV = int64(fromRV)
}
Expand All @@ -589,6 +589,9 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
default:
options = append(options, clientv3.WithPrefix())
}
if withRev != 0 {
options = append(options, clientv3.WithRev(withRev))
}

// loop until we have filled the requested limit from etcd or there are no more results
var lastKey []byte
Expand Down Expand Up @@ -650,6 +653,10 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
break
}
key = string(lastKey) + "\x00"
if withRev == 0 {
withRev = returnedRV
options = append(options, clientv3.WithRev(withRev))
}
}

// instruct the client to begin querying from immediately after the last key we returned
Expand Down
84 changes: 84 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go
Expand Up @@ -1597,3 +1597,87 @@ func Test_growSlice(t *testing.T) {
})
}
}

// fancyTransformer creates next object on each call to
// TransformFromStorage call.
type fancyTransformer struct {
transformer value.Transformer
store *store

lock sync.Mutex
index int
}

func (t *fancyTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) {
if err := t.createObject(); err != nil {
return nil, false, err
}
return t.transformer.TransformFromStorage(b, ctx)
}

func (t *fancyTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) {
return t.transformer.TransformToStorage(b, ctx)
}

func (t *fancyTransformer) createObject() error {
t.lock.Lock()
defer t.lock.Unlock()

t.index++
key := fmt.Sprintf("pod-%d", t.index)
obj := &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: key,
Labels: map[string]string{
"even": strconv.FormatBool(t.index%2 == 0),
},
},
}
out := &example.Pod{}
return t.store.Create(context.TODO(), key, obj, out, 0)
}

func TestConsistentList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

transformer := &fancyTransformer{
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
}
store := newStore(cluster.RandClient(), true, codec, "", transformer)
transformer.store = store

for i := 0; i < 5; i++ {
if err := transformer.createObject(); err != nil {
t.Fatalf("failed to create object: %v", err)
}
}

getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod, ok := obj.(*example.Pod)
if !ok {
return nil, nil, fmt.Errorf("invalid object")
}
return labels.Set(pod.Labels), nil, nil
}
predicate := storage.SelectionPredicate{
Label: labels.Set{"even": "true"}.AsSelector(),
GetAttrs: getAttrs,
Limit: 4,
}

result1 := example.PodList{}
if err := store.List(context.TODO(), "/", "", predicate, &result1); err != nil {
t.Fatalf("failed to list objects: %v", err)
}

result2 := example.PodList{}
if err := store.List(context.TODO(), "/", result1.ResourceVersion, predicate, &result2); err != nil {
t.Fatalf("failed to list objects: %v", err)
}

if !reflect.DeepEqual(result1, result2) {
t.Errorf("inconsistent lists: %#v, %#v", result1, result2)
}
}