Skip to content

Commit

Permalink
Merge pull request #28524 from hongchaodeng/listopt
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

ListOptions: add test for ResourceVersion > 0 in List

ref: #28472 

Done:
- Add a test for ResourceVersion > 0 in registry (cache) store List()
- Fix the docs.
  • Loading branch information
k8s-merge-robot committed Jul 12, 2016
2 parents baa956b + 73821d2 commit 6a4de09
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 51 deletions.
6 changes: 5 additions & 1 deletion pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2232,7 +2232,11 @@ type ListOptions struct {
FieldSelector fields.Selector
// If true, watch for changes to this list
Watch bool
// The resource version to watch (no effect on list yet)
// For watch, it's the resource version to watch.
// For list,
// - if unset, then the result is returned from remote storage based on quorum-read flag;
// - if it's 0, then we simply return what we currently have in cache, no guarantee;
// - if set to non zero, then the result is as fresh as given rv.
ResourceVersion string
// Timeout for the list/watch call.
TimeoutSeconds *int64
Expand Down
171 changes: 121 additions & 50 deletions pkg/registry/generic/registry/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"path"
"reflect"
"strconv"
"testing"

"sync"
"testing"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
Expand All @@ -35,12 +35,14 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
storagetesting "k8s.io/kubernetes/pkg/storage/testing"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/util/wait"
)

type testRESTStrategy struct {
Expand Down Expand Up @@ -77,55 +79,8 @@ func (t *testRESTStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Obje
}
func (t *testRESTStrategy) Canonicalize(obj runtime.Object) {}

func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
return func(obj runtime.Object) bool {
actualPod := obj.(*api.Pod)
if !api.Semantic.DeepDerivative(pod.Status, actualPod.Status) {
t.Errorf("not a deep derivative %#v", actualPod)
return false
}
return api.HasObjectMetaSystemFieldValues(&actualPod.ObjectMeta)
}
}

func NewTestGenericStoreRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Store) {
podPrefix := "/pods"
server := etcdtesting.NewEtcdTestClientServer(t)
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}

return server, &Store{
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
QualifiedResource: api.Resource("pods"),
CreateStrategy: strategy,
UpdateStrategy: strategy,
DeleteStrategy: strategy,
KeyRootFunc: func(ctx api.Context) string {
return podPrefix
},
KeyFunc: func(ctx api.Context, id string) (string, error) {
if _, ok := api.NamespaceFrom(ctx); !ok {
return "", fmt.Errorf("namespace is required")
}
return path.Join(podPrefix, id), nil
},
ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil },
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
return &generic.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, nil, fmt.Errorf("not a pod")
}
return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(pod.ObjectMeta, true), nil
},
}
},
Storage: s,
}
return newTestGenericStoreRegistry(t, false)
}

// setMatcher is a matcher that matches any pod with id in the set.
Expand Down Expand Up @@ -235,6 +190,70 @@ func TestStoreList(t *testing.T) {
}
}

// TestStoreListResourceVersion tests that if List with ResourceVersion > 0, it will wait until
// the results are as fresh as given version.
func TestStoreListResourceVersion(t *testing.T) {
fooPod := &api.Pod{
ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "foo"},
Spec: api.PodSpec{NodeName: "machine"},
}
barPod := &api.Pod{
ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "bar"},
Spec: api.PodSpec{NodeName: "machine"},
}
ctx := api.WithNamespace(api.NewContext(), "test")

server, registry := newTestGenericStoreRegistry(t, true)
defer server.Terminate(t)

obj, err := registry.Create(ctx, fooPod)
if err != nil {
t.Fatal(err)
}

versioner := etcdstorage.APIObjectVersioner{}
rev, err := versioner.ObjectResourceVersion(obj)
if err != nil {
t.Fatal(err)
}

waitListCh := make(chan runtime.Object, 1)
go func(listRev uint64) {
option := &api.ListOptions{ResourceVersion: strconv.FormatUint(listRev, 10)}
// It will wait until we create the second pod.
l, err := registry.List(ctx, option)
if err != nil {
close(waitListCh)
t.Fatal(err)
return
}
waitListCh <- l
}(rev + 1)

select {
case <-time.After(500 * time.Millisecond):
case l := <-waitListCh:
t.Fatalf("expected waiting, but get %#v", l)
}

if _, err := registry.Create(ctx, barPod); err != nil {
t.Fatal(err)
}

select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
case l, ok := <-waitListCh:
if !ok {
return
}
pl := l.(*api.PodList).Items
if len(pl) != 2 {
t.Errorf("Expected get 2 items, but got %d", len(pl))
}
}
}

func TestStoreCreate(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "test"},
Expand Down Expand Up @@ -983,3 +1002,55 @@ func TestStoreWatch(t *testing.T) {
server.Terminate(t)
}
}

func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesting.EtcdTestServer, *Store) {
podPrefix := "/pods"
server := etcdtesting.NewEtcdTestClientServer(t)
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
if hasCacheEnabled {
config := storage.CacherConfig{
CacheCapacity: 10,
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
Type: &api.Pod{},
ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
NewListFunc: func() runtime.Object { return &api.PodList{} },
}
s = storage.NewCacherFromConfig(config)
}

return server, &Store{
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
QualifiedResource: api.Resource("pods"),
CreateStrategy: strategy,
UpdateStrategy: strategy,
DeleteStrategy: strategy,
KeyRootFunc: func(ctx api.Context) string {
return podPrefix
},
KeyFunc: func(ctx api.Context, id string) (string, error) {
if _, ok := api.NamespaceFrom(ctx); !ok {
return "", fmt.Errorf("namespace is required")
}
return path.Join(podPrefix, id), nil
},
ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil },
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
return &generic.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, nil, fmt.Errorf("not a pod")
}
return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(pod.ObjectMeta, true), nil
},
}
},
Storage: s,
}
}

0 comments on commit 6a4de09

Please sign in to comment.