Merge pull request #116488 from natherz97/automated-cherry-pick-of-#116024-remotes-upstream-release-1.25

Automated cherry pick of #116024 remotes upstream release 1.25

Kubernetes-commit: 2a114e626c99e6c882ecb6874c02785077ecb401
k8s-publishing-bot committed Mar 28, 2023
2 parents 8adff38 + 91e1812 commit 6bda68d
Showing 4 changed files with 219 additions and 54 deletions.
8 changes: 4 additions & 4 deletions pkg/storage/cacher/cacher.go
@@ -477,7 +477,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return nil, err
}

if err := c.ready.wait(); err != nil {
if err := c.ready.wait(ctx); err != nil {
return nil, errors.NewServiceUnavailable(err.Error())
}

@@ -567,7 +567,7 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o

// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
if err := c.ready.wait(); err != nil {
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}

@@ -657,7 +657,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
utiltrace.Field{Key: "type", Value: c.objectType.String()})
defer trace.LogIfLong(500 * time.Millisecond)

if err := c.ready.wait(); err != nil {
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
trace.Step("Ready")
@@ -1066,7 +1066,7 @@ func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWit

// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
if err := c.ready.wait(); err != nil {
if err := c.ready.wait(context.Background()); err != nil {
return 0, errors.NewServiceUnavailable(err.Error())
}

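Each call site above now passes a context into ready.wait, so a request that is cancelled or hits its deadline stops waiting for the cache and is surfaced as ServiceUnavailable instead of blocking indefinitely; LastSyncResourceVersion, which has no request context, passes context.Background() and keeps the old blocking behaviour. A minimal stand-alone sketch of why threading the context matters — waitReady and the ready channel are illustrative stand-ins, not the Cacher or ready API:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitReady mirrors the shape of a context-aware wait: it returns as soon as
// either the readiness channel is closed or the caller's context ends.
func waitReady(ctx context.Context, ready <-chan struct{}) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // cancelled or deadline exceeded
	case <-ready:
		return nil
	}
}

func main() {
	ready := make(chan struct{}) // never closed: simulates a cache that never initializes

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// With a context-less wait this call would hang forever; here it returns
	// after 100ms and the caller can translate the error into a 503.
	if err := waitReady(ctx, ready); err != nil {
		fmt.Println("aborted:", err) // aborted: context deadline exceeded
	}
}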
61 changes: 45 additions & 16 deletions pkg/storage/cacher/cacher_whitebox_test.go
@@ -29,6 +29,7 @@ import (

v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@@ -339,7 +340,7 @@ func TestGetListCacheBypass(t *testing.T) {
result := &example.PodList{}

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -378,7 +379,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
result := &example.PodList{}

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -412,7 +413,7 @@ func TestGetCacheBypass(t *testing.T) {
result := &example.Pod{}

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -435,6 +436,34 @@ }
}
}

func TestWatchNotHangingOnStartupFailure(t *testing.T) {
// Configure cacher so that it can't initialize, because of
// constantly failing lists to the underlying storage.
dummyErr := fmt.Errorf("dummy")
backingStorage := &dummyStorage{err: dummyErr}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()

ctx, cancel := context.WithCancel(context.Background())
// Cancel the watch after some time to check if it will properly
// terminate instead of hanging forever.
go func() {
defer cancel()
cacher.clock.Sleep(5 * time.Second)
}()

// Watch hangs waiting on watchcache being initialized.
// Ensure that it terminates when its context is cancelled
// (e.g. the request is terminated for whatever reason).
_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0"})
if err == nil || err.Error() != apierrors.NewServiceUnavailable(context.Canceled.Error()).Error() {
t.Errorf("Unexpected error: %#v", err)
}
}

func TestWatcherNotGoingBackInTime(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
@@ -444,7 +473,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -573,7 +602,7 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -613,7 +642,7 @@ func TestCacheDontAcceptRequestsStopped(t *testing.T) {
}

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -718,7 +747,7 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
@@ -816,7 +845,7 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
@@ -916,7 +945,7 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
cacher.bookmarkWatchers.bookmarkFrequency = time.Second

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
@@ -986,7 +1015,7 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -1064,7 +1093,7 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -1142,7 +1171,7 @@ func TestStartingResourceVersion(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -1222,7 +1251,7 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -1333,7 +1362,7 @@ func TestCachingDeleteEvents(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -1415,7 +1444,7 @@ func testCachingObjects(t *testing.T, watchersCount int) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

@@ -1511,7 +1540,7 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Ensure there is enough budget for slow processing since
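The new TestWatchNotHangingOnStartupFailure drives a cacher whose underlying storage always fails, cancels the request context after a delay, and expects Watch to return ServiceUnavailable(context.Canceled) instead of hanging. A stripped-down, hypothetical illustration of that cancel-after-delay pattern in plain Go — block stands in for the hanging call and is not part of this package:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// block simulates a call that can only finish when its context ends,
// like a watch against a cache that never becomes ready.
func block(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Cancel after a short delay, mirroring the test's goroutine that
	// sleeps and then calls cancel().
	go func() {
		time.Sleep(50 * time.Millisecond)
		cancel()
	}()

	err := block(ctx)
	fmt.Println(errors.Is(err, context.Canceled)) // true: the call terminated instead of hanging
}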
107 changes: 76 additions & 31 deletions pkg/storage/cacher/ready.go
@@ -17,6 +17,7 @@ limitations under the License.
package cacher

import (
"context"
"fmt"
"sync"
)
@@ -30,67 +31,111 @@ const (
)

// ready is a three state condition variable that blocks until is Ready if is not Stopped.
// Its initial state is Pending.
// Its initial state is Pending and its state machine diagram is as follow.
//
// Pending <------> Ready -----> Stopped
//
//    |                            ^
//    └----------------------------┘
type ready struct {
state status
c *sync.Cond
state status // represent the state of the variable
lock sync.RWMutex // protect the state variable
restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated
waitCh chan struct{} // blocks until is ready or stopped
}

func newReady() *ready {
return &ready{
c: sync.NewCond(&sync.RWMutex{}),
state: Pending,
waitCh: make(chan struct{}),
state: Pending,
}
}

// done close the channel once the state is Ready or Stopped
func (r *ready) done() chan struct{} {
r.restartLock.Lock()
defer r.restartLock.Unlock()
return r.waitCh
}

// wait blocks until it is Ready or Stopped, it returns an error if is Stopped.
func (r *ready) wait() error {
r.c.L.Lock()
defer r.c.L.Unlock()
for r.state == Pending {
r.c.Wait()
}
switch r.state {
case Ready:
return nil
case Stopped:
return fmt.Errorf("apiserver cacher is stopped")
default:
return fmt.Errorf("unexpected apiserver cache state: %v", r.state)
func (r *ready) wait(ctx context.Context) error {
for {
// r.done() only blocks if state is Pending
select {
case <-ctx.Done():
return ctx.Err()
case <-r.done():
}

r.lock.RLock()
switch r.state {
case Pending:
// since we allow to switch between the states Pending and Ready
// if there is a quick transition from Pending -> Ready -> Pending
// a process that was waiting can get unblocked and see a Pending
// state again. If the state is Pending we have to wait again to
// avoid an inconsistent state on the system, with some processes not
// waiting despite the state moved back to Pending.
r.lock.RUnlock()
case Ready:
r.lock.RUnlock()
return nil
case Stopped:
r.lock.RUnlock()
return fmt.Errorf("apiserver cacher is stopped")
default:
r.lock.RUnlock()
return fmt.Errorf("unexpected apiserver cache state: %v", r.state)
}
}
}

// check returns true only if it is Ready.
func (r *ready) check() bool {
// TODO: Make check() function more sophisticated, in particular
// allow it to behave as "waitWithTimeout".
rwMutex := r.c.L.(*sync.RWMutex)
rwMutex.RLock()
defer rwMutex.RUnlock()
r.lock.RLock()
defer r.lock.RUnlock()
return r.state == Ready
}

// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
func (r *ready) set(ok bool) {
r.c.L.Lock()
defer r.c.L.Unlock()
r.lock.Lock()
defer r.lock.Unlock()
if r.state == Stopped {
return
}
if ok {
if ok && r.state == Pending {
r.state = Ready
} else {
select {
case <-r.waitCh:
default:
close(r.waitCh)
}
} else if !ok && r.state == Ready {
// creating the waitCh can be racy if
// something enter the wait() method
select {
case <-r.waitCh:
r.restartLock.Lock()
r.waitCh = make(chan struct{})
r.restartLock.Unlock()
default:
}
r.state = Pending
}
r.c.Broadcast()
}

// stop the condition variable and set it as Stopped. This state is irreversible.
func (r *ready) stop() {
r.c.L.Lock()
defer r.c.L.Unlock()
r.lock.Lock()
defer r.lock.Unlock()
if r.state != Stopped {
r.state = Stopped
r.c.Broadcast()
}
select {
case <-r.waitCh:
default:
close(r.waitCh)
}
}
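
The rewrite replaces the sync.Cond with a channel that is closed on Ready/Stopped and recreated (under restartLock) when the cacher falls back to Pending. Waiters select on both that channel and their own context, and re-check the state after waking, because a fast Pending -> Ready -> Pending flip can unblock them while the cacher is again not ready. A simplified sketch of the underlying channel-gate technique — a hypothetical gate type that only models the Pending/Ready half, without the Stopped state or the channel recreation:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type gate struct {
	mu sync.Mutex
	ch chan struct{}
}

func newGate() *gate { return &gate{ch: make(chan struct{})} }

// Open releases every current and future waiter by closing the channel once.
func (g *gate) Open() {
	g.mu.Lock()
	defer g.mu.Unlock()
	select {
	case <-g.ch: // already open
	default:
		close(g.ch)
	}
}

// Wait blocks until the gate opens or ctx ends, whichever comes first.
func (g *gate) Wait(ctx context.Context) error {
	g.mu.Lock()
	ch := g.ch
	g.mu.Unlock()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-ch:
		return nil
	}
}

func main() {
	g := newGate()
	go func() { time.Sleep(50 * time.Millisecond); g.Open() }()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(g.Wait(ctx)) // <nil>: the gate opened before the deadline
}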