Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cacher allow context cancellation if not ready #116024

Merged
merged 2 commits into from Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -252,7 +252,7 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down
8 changes: 4 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Expand Up @@ -493,7 +493,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return nil, err
}

if err := c.ready.wait(); err != nil {
if err := c.ready.wait(ctx); err != nil {
return nil, errors.NewServiceUnavailable(err.Error())
}

Expand Down Expand Up @@ -592,7 +592,7 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o

// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
if err := c.ready.wait(); err != nil {
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}

Expand Down Expand Up @@ -684,7 +684,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
attribute.Stringer("type", c.groupResource))
defer span.End(500 * time.Millisecond)

if err := c.ready.wait(); err != nil {
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
span.AddEvent("Ready")
Expand Down Expand Up @@ -1097,7 +1097,7 @@ func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWit

// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
if err := c.ready.wait(); err != nil {
if err := c.ready.wait(context.Background()); err != nil {
return 0, errors.NewServiceUnavailable(err.Error())
}

Expand Down
Expand Up @@ -28,6 +28,7 @@ import (
"time"

apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -185,7 +186,7 @@ func TestGetListCacheBypass(t *testing.T) {
result := &example.PodList{}

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down Expand Up @@ -224,7 +225,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
result := &example.PodList{}

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down Expand Up @@ -258,7 +259,7 @@ func TestGetCacheBypass(t *testing.T) {
result := &example.Pod{}

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down Expand Up @@ -290,7 +291,7 @@ func TestWatchCacheBypass(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand All @@ -312,6 +313,34 @@ func TestWatchCacheBypass(t *testing.T) {
}
}

// TestWatchNotHangingOnStartupFailure checks that a Watch call blocked on a
// cacher that never becomes ready returns once its context is cancelled,
// instead of hanging forever.
func TestWatchNotHangingOnStartupFailure(t *testing.T) {
	// Back the cacher with a storage configured with a fixed error, so that
	// the cacher can't initialize because lists to the underlying storage
	// keep failing.
	failingStorage := &dummyStorage{err: fmt.Errorf("dummy")}
	cacher, _, err := newTestCacher(failingStorage)
	if err != nil {
		t.Fatalf("Couldn't create cacher: %v", err)
	}
	defer cacher.Stop()

	// Emulate a request that is terminated after some time: the context is
	// cancelled once the cacher's clock has slept for five seconds.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		defer cancel()
		cacher.clock.Sleep(5 * time.Second)
	}()

	// Watch blocks waiting for the watchcache to be initialized; the
	// cancellation has to surface as a ServiceUnavailable error carrying
	// the context's cancellation message.
	_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0"})
	expected := apierrors.NewServiceUnavailable(context.Canceled.Error()).Error()
	if err == nil || err.Error() != expected {
		t.Errorf("Unexpected error: %#v", err)
	}
}

func TestWatcherNotGoingBackInTime(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
Expand All @@ -321,7 +350,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down Expand Up @@ -407,7 +436,7 @@ func TestCacheDontAcceptRequestsStopped(t *testing.T) {
}

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down Expand Up @@ -470,7 +499,7 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
Expand Down Expand Up @@ -568,7 +597,7 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
Expand Down Expand Up @@ -668,7 +697,7 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
cacher.bookmarkWatchers.bookmarkFrequency = time.Second

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
Expand Down Expand Up @@ -738,7 +767,7 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down Expand Up @@ -816,7 +845,7 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down Expand Up @@ -894,7 +923,7 @@ func TestStartingResourceVersion(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down Expand Up @@ -974,7 +1003,7 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down Expand Up @@ -1085,7 +1114,7 @@ func TestCachingDeleteEvents(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down Expand Up @@ -1167,7 +1196,7 @@ func testCachingObjects(t *testing.T, watchersCount int) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

Expand Down Expand Up @@ -1263,7 +1292,7 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Ensure there is enough budget for slow processing since
Expand Down
107 changes: 76 additions & 31 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cacher

import (
"context"
"fmt"
"sync"
)
Expand All @@ -30,67 +31,111 @@ const (
)

// ready is a three state condition variable that blocks waiters until it is
// Ready, unless it is Stopped.
// Its initial state is Pending.
// Its initial state is Pending and its state machine diagram is as follows.
//
//	Pending <------> Ready -----> Stopped
//
//	   |                           ^
//	   └---------------------------┘
type ready struct {
state status
c *sync.Cond
state status // current state of the variable: Pending, Ready or Stopped
lock sync.RWMutex // protects the state field
restartLock sync.Mutex // protects waitCh while it is recreated on the transition from Ready to Pending
waitCh chan struct{} // waiters block on it while Pending; it is closed once the state is Ready or Stopped
}

// newReady returns a ready whose state is Pending.
func newReady() *ready {
return &ready{
c: sync.NewCond(&sync.RWMutex{}),
state: Pending,
// waitCh starts open, so callers of wait() block until it is closed.
waitCh: make(chan struct{}),
state: Pending,
}
}

// done returns the channel that wait() blocks on. The channel is closed once
// the state becomes Ready or Stopped, and set() replaces it with a fresh one
// when the state moves from Ready back to Pending; restartLock is held while
// reading the field so that the replacement is never observed mid-swap.
func (r *ready) done() chan struct{} {
	r.restartLock.Lock()
	ch := r.waitCh
	r.restartLock.Unlock()
	return ch
}

// wait blocks until it is Ready or Stopped; it returns an error if it is Stopped.
func (r *ready) wait() error {
r.c.L.Lock()
defer r.c.L.Unlock()
for r.state == Pending {
r.c.Wait()
}
switch r.state {
case Ready:
return nil
case Stopped:
return fmt.Errorf("apiserver cacher is stopped")
default:
return fmt.Errorf("unexpected apiserver cache state: %v", r.state)
// wait blocks until the state is Ready or Stopped, or until ctx is done.
//
// It returns nil once the state is Ready, ctx.Err() if ctx is cancelled or
// expires while waiting, and a non-nil error if the state is Stopped.
func (r *ready) wait(ctx context.Context) error {
	for {
		// r.done() only blocks if state is Pending
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-r.done():
		}

		// Snapshot the state under the read lock so that nothing below,
		// including the error message of the default branch, reads r.state
		// without holding the lock.
		r.lock.RLock()
		state := r.state
		r.lock.RUnlock()

		switch state {
		case Pending:
			// since we allow to switch between the states Pending and Ready
			// if there is a quick transition from Pending -> Ready -> Pending
			// a process that was waiting can get unblocked and see a Pending
			// state again. If the state is Pending we have to wait again to
			// avoid an inconsistent state on the system, with some processes not
			// waiting despite the state moved back to Pending.
		case Ready:
			return nil
		case Stopped:
			return fmt.Errorf("apiserver cacher is stopped")
		default:
			return fmt.Errorf("unexpected apiserver cache state: %v", state)
		}
	}
}

// check reports whether the state is Ready. It only takes the read lock and
// inspects the current state; it never waits for a state transition.
func (r *ready) check() bool {
// TODO: Make check() function more sophisticated, in particular
// allow it to behave as "waitWithTimeout".
rwMutex := r.c.L.(*sync.RWMutex)
rwMutex.RLock()
defer rwMutex.RUnlock()
r.lock.RLock()
defer r.lock.RUnlock()
return r.state == Ready
}

// set moves the state to Ready (ok == true) or back to Pending (ok == false).
// It has no effect if the state is Stopped.
func (r *ready) set(ok bool) {
r.c.L.Lock()
defer r.c.L.Unlock()
r.lock.Lock()
defer r.lock.Unlock()
if r.state == Stopped {
return
}
if ok {
if ok && r.state == Pending {
r.state = Ready
} else {
// Close waitCh to release the waiters, unless it is already closed:
// the non-blocking receive succeeds only on a closed channel, which
// keeps close() from being called twice.
select {
case <-r.waitCh:
default:
close(r.waitCh)
}
} else if !ok && r.state == Ready {
// creating the waitCh can be racy if
// something enters the wait() method
// Only replace waitCh when the current one is already closed, and do it
// under restartLock, the lock done() takes to read the field.
select {
case <-r.waitCh:
r.restartLock.Lock()
r.waitCh = make(chan struct{})
r.restartLock.Unlock()
default:
}
r.state = Pending
}
r.c.Broadcast()
}

// stop sets the state to Stopped and releases the waiters. This state is
// irreversible: set() returns without changes once the state is Stopped.
func (r *ready) stop() {
r.c.L.Lock()
defer r.c.L.Unlock()
r.lock.Lock()
defer r.lock.Unlock()
if r.state != Stopped {
r.state = Stopped
r.c.Broadcast()
}
// Close waitCh so that blocked wait() calls wake up, unless it is already
// closed: the non-blocking receive succeeds only on a closed channel.
select {
case <-r.waitCh:
default:
close(r.waitCh)
}
}