Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop controller when the stop channel is closed (when queue is empty and Pop is hanging) #37137

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions staging/src/k8s.io/client-go/tools/cache/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func New(c *Config) Controller {
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
Expand Down Expand Up @@ -127,6 +131,9 @@ func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == FIFOClosedError {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
Expand Down
31 changes: 31 additions & 0 deletions staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ type DeltaFIFO struct {
// purpose of figuring out which items have been deleted
// when Replace() or Delete() is called.
knownObjects KeyListerGetter

// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
}

var (
Expand All @@ -132,6 +138,14 @@ var (
ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
)

// Close the queue.
func (f *DeltaFIFO) Close() {
f.closedLock.Lock()
defer f.closedLock.Unlock()
f.closed = true
f.cond.Broadcast()
}

// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
// DeletedFinalStateUnknown objects.
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
Expand Down Expand Up @@ -387,6 +401,16 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
return d, exists, nil
}

// Checks if the queue is closed
func (f *DeltaFIFO) IsClosed() bool {
f.closedLock.Lock()
defer f.closedLock.Unlock()
if f.closed {
return true
}
return false
}

// Pop blocks until an item is added to the queue, and then returns it. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
Expand All @@ -404,6 +428,13 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, FIFOClosedError
}

f.cond.Wait()
}
id := f.queue[0]
Expand Down
37 changes: 37 additions & 0 deletions staging/src/k8s.io/client-go/tools/cache/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cache

import (
"errors"
"sync"

"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -33,6 +34,8 @@ type ErrRequeue struct {
Err error
}

var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue")

func (e ErrRequeue) Error() string {
if e.Err == nil {
return "the popped item should be requeued without returning an error"
Expand All @@ -58,6 +61,9 @@ type Queue interface {

// Return true if the first batch of items has been popped
HasSynced() bool

// Close queue
Close()
}

// Helper function for popping from Queue.
Expand Down Expand Up @@ -100,12 +106,26 @@ type FIFO struct {
// keyFunc is used to make the key used for queued item insertion and retrieval, and
// should be deterministic.
keyFunc KeyFunc

// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
}

var (
_ = Queue(&FIFO{}) // FIFO is a Queue
)

// Close the queue.
func (f *FIFO) Close() {
f.closedLock.Lock()
defer f.closedLock.Unlock()
f.closed = true
f.cond.Broadcast()
}

// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
func (f *FIFO) HasSynced() bool {
Expand Down Expand Up @@ -222,6 +242,16 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
return item, exists, nil
}

// Checks if the queue is closed
func (f *FIFO) IsClosed() bool {
f.closedLock.Lock()
defer f.closedLock.Unlock()
if f.closed {
return true
}
return false
}

// Pop waits until an item is ready and processes it. If multiple items are
// ready, they are returned in the order in which they were added/updated.
// The item is removed from the queue (and the store) before it is processed,
Expand All @@ -233,6 +263,13 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, FIFOClosedError
}

f.cond.Wait()
}
id := f.queue[0]
Expand Down