Description
Hi team,
Recently I've been testing the new priority queue implementation, and I noticed that, right now, getting from the priority queue after it has been shut down can get stuck. It can be triggered easily if we:
1. create a priority queue
2. add an item to the queue
3. call the `ShutDown()` method
4. get from the priority queue
5. get again from the priority queue, and this call blocks
This happens because, when the priority queue is shut down, the `spin()` method stops, but the `get` channel is never closed, so the caller (step 5) that gets from the queue is stuck waiting on the `get` channel.
By contrast, the rate-limiting queue from the `workqueue` package handles this situation properly by immediately reporting that the queue has been shut down.
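For comparison, here is a simplified model of the condition-variable pattern that, as far as I can tell, the client-go queue uses. `Get` only waits while the queue is empty *and* not shut down, so it first drains items that are still queued and then returns the zero value with `shutDown == true`. The `condQueue` type is hypothetical and is not client-go's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// condQueue is a hypothetical FIFO queue guarded by a mutex and a condition variable.
type condQueue struct {
	mu           sync.Mutex
	cond         *sync.Cond
	items        []int
	shuttingDown bool
}

func newCondQueue() *condQueue {
	q := &condQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add enqueues an item and wakes one waiter; items added after shutdown are dropped.
func (q *condQueue) Add(item int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.shuttingDown {
		return
	}
	q.items = append(q.items, item)
	q.cond.Signal()
}

// ShutDown marks the queue as shutting down and wakes every waiting Get.
func (q *condQueue) ShutDown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.shuttingDown = true
	q.cond.Broadcast()
}

// Get blocks until an item is available or the queue is shut down.
// Remaining items are handed out first; after that it returns (0, true).
func (q *condQueue) Get() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return 0, true
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, false
}

func main() {
	q := newCondQueue()
	q.Add(1)
	q.ShutDown()
	fmt.Println(q.Get()) // 1 false
	fmt.Println(q.Get()) // 0 true
}
```

Because the shutdown flag is checked under the same lock that `Get` waits on, a `Get` that starts after `ShutDown` and a `Get` that was already waiting both return instead of blocking.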
This concerns me a bit because it can block graceful shutdown. In my project we use the `envtest` package. Normally, when the emulated cluster environment is torn down, controller-runtime sees that the main context has been cancelled, and all reconcilers stop once they see that the workqueue has been shut down. However, when the priority queue is in use, shutting down the emulated environment times out, because some workers may be stuck waiting for an item to arrive from the priority queue.
Attached is a simple test that I used to trigger the behavior (with v0.21.0):
```go
package repro // The package name is arbitrary.

import (
	"fmt"
	"testing"

	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
)

func TestBlocker(t *testing.T) {
	// Baseline: the rate-limiting queue from client-go's workqueue package.
	rateLimiter := workqueue.DefaultTypedControllerRateLimiter[any]()
	wq := workqueue.NewTypedRateLimitingQueue(rateLimiter)

	fmt.Println("Writing to the workqueue")
	wq.Add(1)
	fmt.Println("Shutting down the workqueue")
	wq.ShutDown()

	fmt.Println("Attempting to read from the workqueue (#1)")
	// This returns 1 and false.
	v, isShutDown := wq.Get()
	fmt.Printf("Value: %v, isShutDown: %v\n", v, isShutDown)

	fmt.Println("Attempting to read from the workqueue (#2)")
	// This returns nil and true.
	v, isShutDown = wq.Get()
	fmt.Printf("Value: %v, isShutDown: %v\n", v, isShutDown)

	// Same sequence against the controller-runtime priority queue.
	pq := priorityqueue.New[int]("testing")

	fmt.Println("Writing to the priority queue")
	pq.Add(1)
	fmt.Println("Shutting down the priority queue")
	pq.ShutDown()

	fmt.Println("Attempting to read from the priority queue (#1)")
	// This returns 1, 0, and true.
	v, pri, isShutDown := pq.GetWithPriority()
	fmt.Printf("Value: %v, Priority: %v, isShutDown: %v\n", v, pri, isShutDown)

	fmt.Println("Attempting to read from the priority queue (#2)")
	// This never returns.
	v, pri, isShutDown = pq.GetWithPriority()
	fmt.Printf("Value: %v, Priority: %v, isShutDown: %v\n", v, pri, isShutDown)
}
```
If this is already a known issue, or if I'm misunderstanding anything, please let me know. Thanks.