Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ const (
//
// Allow the API server to stream individual items instead of chunking
WatchList featuregate.Feature = "WatchList"

// owner: @serathius
// kep: http://kep.k8s.io/2340
// alpha: v1.28
//
// Allow the API server to serve consistent lists from cache
ConsistentListFromCache featuregate.Feature = "ConsistentListFromCache"
)

func init() {
Expand Down Expand Up @@ -264,4 +271,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},

WatchList: {Default: false, PreRelease: featuregate.Alpha},

ConsistentListFromCache: {Default: false, PreRelease: featuregate.Alpha},
}
15 changes: 9 additions & 6 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,9 +726,10 @@ func shouldDelegateList(opts storage.ListOptions) bool {
pred := opts.Predicate
match := opts.ResourceVersionMatch
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)

// Serve consistent reads from storage
consistentReadFromStorage := resourceVersion == ""
// Serve consistent reads from storage if ConsistentListFromCache is disabled
consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled
// Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := pagingEnabled && len(pred.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
Expand Down Expand Up @@ -762,19 +763,21 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
return c.storage.GetList(ctx, key, opts, listObj)
}

// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return err
}

if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
}
if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
listRV, err = c.getCurrentResourceVersionFromStorage(ctx)
if err != nil {
return err
}
}

ctx, span := tracing.Start(ctx, "cacher list",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
Expand Down
10 changes: 10 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
)

Expand Down Expand Up @@ -150,6 +153,13 @@ func TestList(t *testing.T) {
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
}

func TestListWithListFromCache(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
}

func TestListWithoutPaging(t *testing.T) {
ctx, cacher, terminate := testSetup(t, withoutPaging)
t.Cleanup(terminate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ func TestGetListCacheBypass(t *testing.T) {
opts storage.ListOptions
expectBypass bool
}
testCases := []testCase{
{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true},
commonTestCases := []testCase{
{opts: storage.ListOptions{ResourceVersion: "0"}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "1"}, expectBypass: false},

Expand All @@ -180,9 +179,25 @@ func TestGetListCacheBypass(t *testing.T) {
{opts: storage.ListOptions{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, expectBypass: true},
}

for _, tc := range testCases {
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
}
t.Run("ConsistentListFromStorage", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
testCases := append(commonTestCases,
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true},
)
for _, tc := range testCases {
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
}

})
t.Run("ConsistentListFromCache", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
testCases := append(commonTestCases,
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false},
)
for _, tc := range testCases {
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
}
})
}

func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectBypass bool) {
Expand All @@ -200,7 +215,23 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
currentResourceVersion := "42"
switch {
// request made by getCurrentResourceVersionFromStorage by checking Limit
case key == cacher.resourcePrefix:
podList := listObj.(*example.PodList)
podList.ResourceVersion = currentResourceVersion
return nil
// request made by storage.GetList with revision from original request or
// returned by getCurrentResourceVersionFromStorage
case opts.ResourceVersion == options.ResourceVersion || opts.ResourceVersion == currentResourceVersion:
return errDummy
default:
t.Fatalf("Unexpected request %+v", opts)
return nil
}
}
err = cacher.GetList(context.TODO(), "pods/ns", options, result)
if err != nil && err != errDummy {
t.Fatalf("Unexpected error for List request with options: %v, err: %v", options, err)
Expand Down
18 changes: 17 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -412,6 +414,7 @@ func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
w.Lock()
defer w.Unlock()
w.resourceVersion = rv
w.cond.Broadcast()
}()

// Avoid calling event handler under lock.
Expand Down Expand Up @@ -490,7 +493,14 @@ func (s sortableStoreElements) Swap(i, j int) {
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
err := w.waitUntilFreshAndBlock(ctx, resourceVersion)
var err error
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
}
defer w.RUnlock()
if err != nil {
return nil, 0, "", err
Expand All @@ -513,6 +523,12 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
return result, rv, index, err
}

func (w *watchCache) notFresh(resourceVersion uint64) bool {
w.RLock()
defer w.RUnlock()
return resourceVersion > w.resourceVersion
}

// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
err := w.waitUntilFreshAndBlock(ctx, resourceVersion)
Expand Down
98 changes: 74 additions & 24 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
)
Expand Down Expand Up @@ -516,6 +519,33 @@ func TestWaitUntilFreshAndList(t *testing.T) {
}
}

func TestWaitUntilFreshAndListFromCache(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
defer store.Stop()
// In background, update the store.
go func() {
store.Add(makeTestPod("pod1", 2))
store.bookmarkRevision <- 3
}()

// list from future revision. Requires watch cache to request bookmark to get it.
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resourceVersion != 3 {
t.Errorf("unexpected resourceVersion: %v, expected: 6", resourceVersion)
}
if len(list) != 1 {
t.Errorf("unexpected list returned: %#v", list)
}
if indexUsed != "" {
t.Errorf("Used index %q but expected none to be used", indexUsed)
}
}

func TestWaitUntilFreshAndGet(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
Expand Down Expand Up @@ -544,31 +574,51 @@ func TestWaitUntilFreshAndGet(t *testing.T) {
}

func TestWaitUntilFreshAndListTimeout(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
defer store.Stop()
fc := store.clock.(*testingclock.FakeClock)

// In background, step clock after the below call starts the timer.
go func() {
for !fc.HasWaiters() {
time.Sleep(time.Millisecond)
}
fc.Step(blockTimeout)

// Add an object to make sure the test would
// eventually fail instead of just waiting
// forever.
time.Sleep(30 * time.Second)
store.Add(makeTestPod("bar", 5))
}()

_, _, _, err := store.WaitUntilFreshAndList(ctx, 5, nil)
if !errors.IsTimeout(err) {
t.Errorf("expected timeout error but got: %v", err)
tcs := []struct {
name string
ConsistentListFromCache bool
}{
{
name: "FromStorage",
ConsistentListFromCache: false,
},
{
name: "FromCache",
ConsistentListFromCache: true,
},
}
if !storage.IsTooLargeResourceVersion(err) {
t.Errorf("expected 'Too large resource version' cause in error but got: %v", err)
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.ConsistentListFromCache)()
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
defer store.Stop()
fc := store.clock.(*testingclock.FakeClock)

// In background, step clock after the below call starts the timer.
go func() {
for !fc.HasWaiters() {
time.Sleep(time.Millisecond)
}
store.Add(makeTestPod("foo", 2))
store.bookmarkRevision <- 3
fc.Step(blockTimeout)

// Add an object to make sure the test would
// eventually fail instead of just waiting
// forever.
time.Sleep(30 * time.Second)
store.Add(makeTestPod("bar", 4))
}()

_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, nil)
if !errors.IsTimeout(err) {
t.Errorf("expected timeout error but got: %v", err)
}
if !storage.IsTooLargeResourceVersion(err) {
t.Errorf("expected 'Too large resource version' cause in error but got: %v", err)
}
})
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)

// Serve consistent reads from storage
consistentReadFromStorage := resourceVersion == ""
// Serve consistent reads from storage if ConsistentListFromCache is disabled
consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled
// Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := pagingEnabled && len(opts.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
Expand Down