Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,18 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)

w.notifyItemOrWaiterAdded()

item := <-w.get

return item.Key, item.Priority, w.shutdown.Load()
select {
case <-w.done:
// Return if the queue was shutdown while we were already waiting for an item here.
// For example controller workers are continuously calling GetWithPriority and
// GetWithPriority is blocking the workers if there are no items in the queue.
// If the controller and accordingly the queue is then shut down, without this code
// branch the controller workers remain blocked here and are unable to shut down.
var zero T
return zero, 0, true
case item := <-w.get:
return item.Key, item.Priority, w.shutdown.Load()
}
}

func (w *priorityqueue[T]) Get() (item T, shutdown bool) {
Expand Down
26 changes: 26 additions & 0 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,32 @@ var _ = Describe("Controllerworkqueue", func() {
Expect(isShutDown).To(BeTrue())
})

It("Get from priority queue should get unblocked when the priority queue is shut down", func() {
q, _ := newQueue()

getUnblocked := make(chan struct{})

go func() {
defer GinkgoRecover()
defer close(getUnblocked)

item, priority, isShutDown := q.GetWithPriority()
Expect(item).To(Equal(""))
Expect(priority).To(Equal(0))
Expect(isShutDown).To(BeTrue())
}()

// Verify the go routine above is now waiting for an item.
Eventually(q.(*priorityqueue[string]).waiters.Load).Should(Equal(int64(1)))
Consistently(getUnblocked).ShouldNot(BeClosed())

// shut down
q.ShutDown()

// Verify the shutdown unblocked the go routine.
Eventually(getUnblocked).Should(BeClosed())
})

It("items are included in Len() and the queueDepth metric once they are ready", func() {
q, metrics := newQueue()
defer q.ShutDown()
Expand Down