Use watch cache when rv=0 even when limit is set #70735

Merged · 1 commit · Nov 13, 2018
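For orientation, the request shape this PR affects is a paginated list pinned to resourceVersion "0" (meaning "any reasonably recent snapshot is acceptable"). Below is a minimal client-go sketch of such a request; it is illustrative only and not part of this PR, assumes a kubeconfig at the default location, and uses the context-free List signature from the client-go release of that era (newer releases take a context argument):

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Illustrative setup: load the default kubeconfig and build a clientset.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// ResourceVersion "0" tells the apiserver that any recent snapshot is
	// acceptable. With this change such a list is served from the watch
	// cache even though Limit is set, because limits are ignored when
	// resourceVersion is "0" anyway.
	pods, err := client.CoreV1().Pods("default").List(metav1.ListOptions{
		ResourceVersion: "0",
		Limit:           500,
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("got %d pods at list resourceVersion %s\n", len(pods.Items), pods.ResourceVersion)
}
```

The premise, spelled out in the cacher.go comment in the diff below, is that the watch cache cannot perform continuations and that limits are ignored when the resource version is zero, so the cache can answer these requests directly instead of falling through to etcd.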
4 changes: 4 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD
@@ -50,10 +50,14 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
9 changes: 7 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -404,10 +404,15 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
 // GetToList implements storage.Interface.
 func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
 	pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
-	if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) {
+	hasContinuation := pagingEnabled && len(pred.Continue) > 0
+	hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
+	if resourceVersion == "" || hasContinuation || hasLimit {
 		// If resourceVersion is not specified, serve it from underlying
-		// storage (for backward compatibility). If a continuation or limit is
+		// storage (for backward compatibility). If a continuation is
 		// requested, serve it from the underlying storage as well.
+		// Limits are only sent to storage when resourceVersion is non-zero
+		// since the watch cache isn't able to perform continuations, and
+		// limits are ignored when resource version is zero
 		return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
 	}

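Restated outside the diff: after this change, a read is delegated to the underlying storage only when no resourceVersion is given, when a continue token is present, or when a limit is combined with a resourceVersion other than "0"; a limited list at rv="0" now stays in the watch cache. Here is a standalone sketch of that decision, where the function name and the bare pagingEnabled flag are illustrative and not part of the patch:

```go
package main

import "fmt"

// shouldDelegateToStorage mirrors the condition added to GetToList: bypass
// the cacher for unversioned reads, for continuations, and for limited reads
// pinned to a specific (non-zero) resourceVersion.
func shouldDelegateToStorage(pagingEnabled bool, resourceVersion, continueToken string, limit int64) bool {
	hasContinuation := pagingEnabled && len(continueToken) > 0
	hasLimit := pagingEnabled && limit > 0 && resourceVersion != "0"
	return resourceVersion == "" || hasContinuation || hasLimit
}

func main() {
	fmt.Println(shouldDelegateToStorage(true, "0", "", 500))     // false: served from the watch cache (the new behavior)
	fmt.Println(shouldDelegateToStorage(true, "", "", 500))      // true: no resourceVersion, go to etcd
	fmt.Println(shouldDelegateToStorage(true, "1234", "", 500))  // true: limit with a specific resourceVersion
	fmt.Println(shouldDelegateToStorage(true, "0", "token", 0))  // true: continuations always go to etcd
}
```

The whitebox test changes in the next file of the diff (same package, cacher) exercise both branches by injecting an error into the backing storage and checking whether it surfaces.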
@@ -17,6 +17,7 @@ limitations under the License.
package cacher

import (
"context"
"fmt"
"reflect"
"strconv"
@@ -30,9 +31,14 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/diff"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
)

// verifies the cacheWatcher.process goroutine is properly cleaned up even if
@@ -195,7 +201,13 @@ func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) er
 	return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10))
 }
 func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error {
-	return fmt.Errorf("unimplemented")
+	listAccessor, err := meta.ListAccessor(obj)
+	if err != nil || listAccessor == nil {
+		return err
+	}
+	listAccessor.SetResourceVersion(strconv.FormatUint(resourceVersion, 10))
+	listAccessor.SetContinue(continueValue)
+	return nil
 }
 func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
 	return fmt.Errorf("unimplemented")
@@ -206,3 +218,136 @@ func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64)
}

var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
errDummy = fmt.Errorf("dummy error")
)

func init() {
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
utilruntime.Must(example.AddToScheme(scheme))
utilruntime.Must(examplev1.AddToScheme(scheme))
}

func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) {
prefix := "pods"
config := Config{
CacheCapacity: cap,
Storage: s,
Versioner: testVersioner{},
Type: &example.Pod{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { return nil, nil, true, nil },
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
}
return NewCacherFromConfig(config), testVersioner{}
}

type dummyStorage struct {
err error
}

type dummyWatch struct {
ch chan watch.Event
}

func (w *dummyWatch) ResultChan() <-chan watch.Event {
return w.ch
}

func (w *dummyWatch) Stop() {
close(w.ch)
}

func newDummyWatch() watch.Interface {
return &dummyWatch{
ch: make(chan watch.Event),
}
}

func (d *dummyStorage) Versioner() storage.Versioner { return nil }
func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object, _ uint64) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) Watch(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) {
return newDummyWatch(), nil
}
func (d *dummyStorage) WatchList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) {
return newDummyWatch(), nil
}
func (d *dummyStorage) Get(_ context.Context, _ string, _ string, _ runtime.Object, _ bool) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) GetToList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, _ runtime.Object) error {
return d.err
}
func (d *dummyStorage) List(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, listObj runtime.Object) error {
podList := listObj.(*example.PodList)
podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
return d.err
}
func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.Object, _ bool, _ *storage.Preconditions, _ storage.UpdateFunc, _ ...runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) Count(_ string) (int64, error) {
return 0, fmt.Errorf("unimplemented")
}

func TestListWithLimitAndRV0(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 0)
defer cacher.Stop()

pred := storage.SelectionPredicate{
Limit: 500,
}
result := &example.PodList{}

// Wait until cacher is initialized.
cacher.ready.wait()

// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err := cacher.List(context.TODO(), "pods/ns", "0", pred, result)
Review comment (Member): make a second call without rv=0 to make sure our dummy error makes it through the layers?
Reply (Member, Author): done
if err != nil {
t.Errorf("List with Limit and RV=0 should be served from cache: %v", err)
}

err = cacher.List(context.TODO(), "pods/ns", "", pred, result)
if err != errDummy {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
}
}

func TestGetToListWithLimitAndRV0(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 0)
defer cacher.Stop()

pred := storage.SelectionPredicate{
Limit: 500,
}
result := &example.PodList{}

// Wait until cacher is initialized.
cacher.ready.wait()

// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err := cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result)
Review comment (Member): same here, make a second call that hits storage to make sure our error is surfaced when storage is called
Reply (Member, Author): done
if err != nil {
t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err)
}

err = cacher.GetToList(context.TODO(), "pods/ns", "", pred, result)
if err != errDummy {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
}
}