cacher: replace usable lock with conditional variable #26860

Merged (1 commit) on Jul 5, 2016
68 changes: 36 additions & 32 deletions pkg/storage/cacher.go
@@ -72,19 +72,13 @@ type CacherConfig struct {
type Cacher struct {
sync.RWMutex

// Each user-facing method that is not simply redirected to the underlying
// storage has to read-lock on this mutex before starting any processing.
// Before accessing the cacher's cache, wait for the ready to be ok.
// This is necessary to prevent users from accessing structures that are
// uninitialized or are being repopulated right now.
// NOTE: We cannot easily reuse the main mutex for it due to multi-threaded
// interactions of Cacher with the underlying WatchCache. Since Cacher is
// caling WatchCache directly and WatchCache is calling Cacher methods
// via its OnEvent and OnReplace hooks, we explicitly assume that if mutexes
// of both structures are held, the one from WatchCache is acquired first
// to avoid deadlocks. Unfortunately, forcing this rule in startCaching
// would be very difficult and introducing one more mutex seems to be much
// easier.
usable sync.RWMutex
// ready needs to be set to false when the cacher is paused or stopped.
// ready needs to be set to true when the cacher is ready to use after
// initialization.
ready *ready

// Underlying storage.Interface.
storage Interface
@@ -126,6 +120,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
}

cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
@@ -139,8 +134,6 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: make(chan struct{}),
}
// See startCaching method for explanation and where this is unlocked.
cacher.usable.Lock()
watchCache.SetOnEvent(cacher.processEvent)

stopCh := cacher.stopCh
@@ -168,11 +161,11 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
successfulList := false
c.watchCache.SetOnReplace(func() {
successfulList = true
c.usable.Unlock()
c.ready.set(true)
})
defer func() {
if successfulList {
c.usable.Lock()
c.ready.set(false)
}
}()

@@ -213,9 +206,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
return nil, err
}

// Do NOT allow Watch to start when the underlying structures are not propagated.
c.usable.RLock()
defer c.usable.RUnlock()
c.ready.wait()

// We explicitly use thread unsafe version and do locking ourself to ensure that
// no new events will be processed in the meantime. The watchCache will be unlocked
@@ -272,13 +263,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
return err
}

// To avoid situation when List is processed before the underlying
// watchCache is propagated for the first time, we acquire and immediately
// release the 'usable' lock.
// We don't need to hold it all the time, because watchCache is thread-safe
// and it would complicate already very difficult locking pattern.
c.usable.RLock()
c.usable.RUnlock()
c.ready.wait()

// List elements from cache, with at least 'listRV'.
listPtr, err := meta.GetItemsPtr(listObj)
@@ -381,18 +366,13 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi

// Returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
// To avoid situation when LastSyncResourceVersion is processed before the
// underlying watchCache is propagated, we acquire 'usable' lock.
c.usable.RLock()
defer c.usable.RUnlock()

c.RLock()
defer c.RUnlock()
c.ready.wait()
Member:

This looks like a functionality change to me; we've had bugs here before, and this is pretty tricky to get right. I will let @wojtek-t do a careful review. Let's not make this change until after 1.3, at least.

Contributor (author):

I am not sure why exactly we need to hold c.RLock here. It would be great if @wojtek-t could help review. Also, if it is really tricky, I would love to write a test to make sure we get the tricky part right.

Member:

That's true, I think I added a test last time I fixed a bug here.

Contributor (author):

Yeah. It seems all the tests pass, so we can probably assume it works correctly. Anyway, I am happy to write more tests to address any concerns or potential regressions. Thank you!


resourceVersion := c.reflector.LastSyncResourceVersion()
if resourceVersion == "" {
return 0, nil
}

return strconv.ParseUint(resourceVersion, 10, 64)
}

@@ -591,3 +571,27 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
}
}
}

type ready struct {
ok bool
c *sync.Cond
}

func newReady() *ready {
return &ready{c: sync.NewCond(&sync.Mutex{})}
}

func (r *ready) wait() {
r.c.L.Lock()
for !r.ok {
r.c.Wait()
}
r.c.L.Unlock()
}

func (r *ready) set(ok bool) {
r.c.L.Lock()
Contributor (@0xmichalis, Jun 5, 2016):

Can this lock be acquired if something else already wait()s?

Contributor (author):

I am not sure what you mean. Do you mean you are worried about someone entering at line 622 without releasing the lock? That is how a condition variable works: Wait releases the lock while it is waiting. See https://golang.org/pkg/sync/#Cond.Wait
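
For readers unfamiliar with sync.Cond, here is a minimal standalone sketch (not part of this PR) of the pattern the new ready type relies on: the waiter holds the lock only to check the predicate, and Wait atomically releases the lock while blocked and re-acquires it before returning, which is why set() can take the same mutex and Broadcast.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var ok bool
	c := sync.NewCond(&sync.Mutex{})

	go func() {
		c.L.Lock()
		for !ok {
			c.Wait() // releases c.L while blocked, re-acquires it before returning
		}
		c.L.Unlock()
		fmt.Println("waiter woke up")
	}()

	time.Sleep(100 * time.Millisecond)

	c.L.Lock() // succeeds even though the waiter locked first, because Wait released it
	ok = true
	c.Broadcast()
	c.L.Unlock()

	time.Sleep(100 * time.Millisecond)
}
```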

Contributor:

Now I see, thanks

defer r.c.L.Unlock()
r.ok = ok
r.c.Broadcast()
}
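
Following up on the suggestion above to add tests, here is a hypothetical test sketch (not part of this PR) that exercises the new ready gate; it assumes it lives in pkg/storage next to cacher.go and uses only the newReady, wait, and set helpers added above.

```go
package storage

import (
	"testing"
	"time"
)

// TestReadyGate checks that wait() blocks until set(true) is called and
// that a single Broadcast releases every waiter.
func TestReadyGate(t *testing.T) {
	r := newReady()
	released := make(chan struct{})

	for i := 0; i < 3; i++ {
		go func() {
			r.wait() // blocks until r.ok becomes true
			released <- struct{}{}
		}()
	}

	// No waiter should get through before set(true).
	select {
	case <-released:
		t.Fatal("wait() returned before ready was set")
	case <-time.After(50 * time.Millisecond):
	}

	r.set(true)

	// All waiters should now be released.
	for i := 0; i < 3; i++ {
		select {
		case <-released:
		case <-time.After(time.Second):
			t.Fatal("wait() did not return after set(true)")
		}
	}
}
```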