Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ type TypedOptions[request comparable] struct {
// Only use a custom NewQueue if you know what you are doing.
NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request]

// PriorityQueueOptions are options for the priority queue.
// Only used if UsePriorityQueue is true and NewQueue is not set.
//
// NOTE: LOW LEVEL PRIMITIVE!
// Only pass custom options to the priority queue if you know what you are doing.
PriorityQueueOptions []priorityqueue.Opt[request]

// Logger will be used to build a default LogConstructor if unset.
Logger logr.Logger

Expand Down Expand Up @@ -260,10 +267,13 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
if options.NewQueue == nil {
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
if ptr.Deref(options.UsePriorityQueue, true) {
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
var opts []priorityqueue.Opt[request]
opts = append(opts, func(o *priorityqueue.Opts[request]) {
o.Log = options.Logger.WithValues("controller", controllerName)
o.RateLimiter = rateLimiter
})
opts = append(opts, options.PriorityQueueOptions...)
return priorityqueue.New(controllerName, opts...)
}
return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{
Name: controllerName,
Expand Down
58 changes: 58 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
. "github.com/onsi/gomega"
"go.uber.org/goleak"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"

Expand All @@ -38,6 +39,22 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

type mockHooks struct {
onBecameReadyCalls []onBecameReadyCall
}

type onBecameReadyCall struct {
item reconcile.Request
priority int
}

func (m *mockHooks) OnBecameReady(item reconcile.Request, priority int) {
m.onBecameReadyCalls = append(m.onBecameReadyCalls, onBecameReadyCall{
item: item,
priority: priority,
})
}

var _ = Describe("controller.Controller", func() {
rec := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, nil
Expand Down Expand Up @@ -476,6 +493,47 @@ var _ = Describe("controller.Controller", func() {
Expect(ok).To(BeFalse())
})

It("should use the PriorityQueueOptions if specified and NewQueue is not specified", func() {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

customRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Millisecond, 1000*time.Second)

hooks := &mockHooks{}

c, err := controller.New("new-controller-18", m, controller.Options{
Reconciler: reconcile.Func(nil),
RateLimiter: customRateLimiter,
PriorityQueueOptions: []priorityqueue.Opt[reconcile.Request]{
func(o *priorityqueue.Opts[reconcile.Request]) {
o.Hooks = hooks
},
},
})
Expect(err).NotTo(HaveOccurred())

ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())

Expect(ctrl.RateLimiter).To(BeIdenticalTo(customRateLimiter))
q := ctrl.NewQueue("controller-pq-hooks", nil)
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: "foo",
Namespace: "bar",
}})
item, shutdown := q.Get()
Expect(shutdown).To(BeFalse())
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{
Name: "foo",
Namespace: "bar",
}}))
Expect(hooks.onBecameReadyCalls).To(HaveLen(1))
Expect(hooks.onBecameReadyCalls[0].item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{
Name: "foo",
Namespace: "bar",
}}))
})

It("should set EnableWarmup correctly", func() {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())
Expand Down
31 changes: 31 additions & 0 deletions pkg/controller/priorityqueue/hooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package priorityqueue

// Hooks represents a set of hooks that can be implemented to
// customize the behavior of the priority queue for elements
// of type T.
//
// NOTE: LOW LEVEL PRIMITIVE!
// Implementations must be goroutine-safe and considerate
// of the time spent in each hook, as they may be called
// in performance-sensitive paths. It's recommended to
// use non-blocking operations or offload heavy processing
// to separate goroutines through the use of channels or
// context-aware mechanisms.
type Hooks[T comparable] interface {
// OnBecameReady is called when an item becomes ready to be processed.
// For AddWithOpts() calls that result in the item being added with
// a delay, this hook is called only when the item becomes ready
// after the delay has elapsed.
OnBecameReady(item T, priority int)
}

// hooks is a wrapper around Hooks to allow optional implementation.
type hooks[T comparable] struct {
Hooks[T]
}

func (h hooks[T]) OnBecameReady(item T, priority int) {
if h.Hooks != nil {
h.Hooks.OnBecameReady(item, priority)
}
}
207 changes: 207 additions & 0 deletions pkg/controller/priorityqueue/hooks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package priorityqueue

import (
"sync"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/utils/ptr"
)

type mockHooks struct {
mu sync.Mutex
onBecameReadyCalls []onBecameReadyCall
}

type onBecameReadyCall struct {
item int
priority int
}

func (m *mockHooks) OnBecameReady(item int, priority int) {
m.mu.Lock()
defer m.mu.Unlock()
m.onBecameReadyCalls = append(m.onBecameReadyCalls, onBecameReadyCall{
item: item,
priority: priority,
})
}

func (m *mockHooks) getCalls() []onBecameReadyCall {
m.mu.Lock()
defer m.mu.Unlock()
result := make([]onBecameReadyCall, len(m.onBecameReadyCalls))
copy(result, m.onBecameReadyCalls)
return result
}

var _ = Describe("Hooks", func() {
It("works with nil hooks", func() {
q := New[int]("test")
defer q.ShutDown()

q.Add(10)
item, shutdown := q.Get()
Expect(shutdown).To(BeFalse())
Expect(item).To(Equal(10))
})

It("calls OnBecameReady when item is added without delay", func() {
hooks := &mockHooks{}
q := New("test", func(o *Opts[int]) {
o.Hooks = hooks
})
defer q.ShutDown()

q.AddWithOpts(AddOpts{Priority: ptr.To(5)}, 10)

item, priority, shutdown := q.GetWithPriority()
Expect(shutdown).To(BeFalse())
Expect(item).To(Equal(10))
Expect(priority).To(Equal(5))

calls := hooks.getCalls()
Expect(calls).To(HaveLen(1))
Expect(calls[0]).To(Equal(onBecameReadyCall{item: 10, priority: 5}))
})

It("calls OnBecameReady only once for duplicate items", func() {
hooks := &mockHooks{}
q := New("test", func(o *Opts[int]) {
o.Hooks = hooks
})
defer q.ShutDown()

q.Add(10)
q.Add(10)
q.Add(10)

item, shutdown := q.Get()
Expect(shutdown).To(BeFalse())
Expect(item).To(Equal(10))

calls := hooks.getCalls()
Expect(calls).To(HaveLen(1))
Expect(calls[0].item).To(Equal(10))
})

It("calls OnBecameReady when priority is increased for existing item", func() {
hooks := &mockHooks{}
q := New("test", func(o *Opts[int]) {
o.Hooks = hooks
})
defer q.ShutDown()

q.AddWithOpts(AddOpts{Priority: ptr.To(1)}, 10)
q.AddWithOpts(AddOpts{Priority: ptr.To(5)}, 10)

item, priority, shutdown := q.GetWithPriority()
Expect(shutdown).To(BeFalse())
Expect(item).To(Equal(10))
Expect(priority).To(Equal(5))

calls := hooks.getCalls()
Expect(calls).To(HaveLen(1))
Expect(calls[0]).To(Equal(onBecameReadyCall{item: 10, priority: 1}))
})

It("does not call OnBecameReady when item is added with delay", func() {
hooks := &mockHooks{}
q := New("test", func(o *Opts[int]) {
o.Hooks = hooks
})
defer q.ShutDown()

q.AddWithOpts(AddOpts{After: time.Hour}, 10)

Consistently(func() []onBecameReadyCall {
return hooks.getCalls()
}, "100ms").Should(BeEmpty())
})

It("calls OnBecameReady when delayed item becomes ready", func() {
hooks := &mockHooks{}
q := New("test", func(o *Opts[int]) {
o.Hooks = hooks
})
defer q.ShutDown()

pq := q.(*priorityqueue[int])
now := time.Now().Round(time.Second)
nowLock := sync.Mutex{}
tick := make(chan time.Time)

pq.now = func() time.Time {
nowLock.Lock()
defer nowLock.Unlock()
return now
}
pq.tick = func(d time.Duration) <-chan time.Time {
return tick
}

q.AddWithOpts(AddOpts{After: time.Second, Priority: ptr.To(3)}, 10)

Consistently(func() []onBecameReadyCall {
return hooks.getCalls()
}, "100ms").Should(BeEmpty())

// Forward time
nowLock.Lock()
now = now.Add(time.Second)
nowLock.Unlock()
tick <- now

Eventually(func() []onBecameReadyCall {
return hooks.getCalls()
}).Should(HaveLen(1))

calls := hooks.getCalls()
Expect(calls[0]).To(Equal(onBecameReadyCall{item: 10, priority: 3}))
})

It("calls OnBecameReady when delayed item is re-added without delay", func() {
hooks := &mockHooks{}
q := New("test", func(o *Opts[int]) {
o.Hooks = hooks
})
defer q.ShutDown()

q.AddWithOpts(AddOpts{After: time.Hour}, 10)
Expect(hooks.getCalls()).To(BeEmpty())

// Re-add without delay
q.AddWithOpts(AddOpts{Priority: ptr.To(2)}, 10)

Eventually(func() []onBecameReadyCall {
return hooks.getCalls()
}).Should(HaveLen(1))

calls := hooks.getCalls()
Expect(calls[0]).To(Equal(onBecameReadyCall{item: 10, priority: 2}))
})

It("calls OnBecameReady for each unique item", func() {
hooks := &mockHooks{}
q := New("test", func(o *Opts[int]) {
o.Hooks = hooks
})
defer q.ShutDown()

q.Add(10)
q.Add(20)
q.Add(30)

Eventually(func() int {
return len(hooks.getCalls())
}).Should(Equal(3))

calls := hooks.getCalls()
items := make([]int, len(calls))
for i, call := range calls {
items[i] = call.item
}
Expect(items).To(ConsistOf(10, 20, 30))
})
})
8 changes: 8 additions & 0 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Opts[T comparable] struct {
RateLimiter workqueue.TypedRateLimiter[T]
MetricProvider workqueue.MetricsProvider
Log logr.Logger
Hooks Hooks[T]
}

// Opt allows to configure a PriorityQueue.
Expand Down Expand Up @@ -80,6 +81,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
get: make(chan item[T]),
now: time.Now,
tick: time.Tick,
hooks: hooks[T]{opts.Hooks},
}

go pq.spin()
Expand Down Expand Up @@ -130,6 +132,9 @@ type priorityqueue[T comparable] struct {
// Configurable for testing
now func() time.Time
tick func(time.Duration) <-chan time.Time

// Hooks to customize behavior
hooks hooks[T]
}

func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
Expand Down Expand Up @@ -165,6 +170,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
w.queue.ReplaceOrInsert(item)
if item.ReadyAt == nil {
w.metrics.add(key, item.Priority)
w.hooks.OnBecameReady(key, item.Priority)
}
w.addedCounter++
continue
Expand All @@ -184,6 +190,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
if readyAt == nil && !w.becameReady.Has(key) {
w.metrics.add(key, item.Priority)
w.hooks.OnBecameReady(key, item.Priority)
}
item.ReadyAt = readyAt
}
Expand Down Expand Up @@ -266,6 +273,7 @@ func (w *priorityqueue[T]) spin() {
if !w.becameReady.Has(item.Key) {
w.metrics.add(item.Key, item.Priority)
w.becameReady.Insert(item.Key)
w.hooks.OnBecameReady(item.Key, item.Priority)
}
}

Expand Down