diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e48db41f94..4d6f6b8955 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -59,6 +59,18 @@ type Options struct { // The overall is a token bucket and the per-item is exponential. RateLimiter ratelimiter.RateLimiter + // NewQueue constructs the queue for this controller once the controller is ready to start. + // With NewQueue a custom queue implementation can be used, e.g. a priority queue to prioritize with which + // priority/order objects are reconciled (e.g. to reconcile objects with changes first). + // This is a func because the standard Kubernetes work queues start themselves immediately, which + // leads to goroutine leaks if something calls controller.New repeatedly. + // The NewQueue func gets the controller name and the RateLimiter option (defaulted if necessary) passed in. + // NewQueue defaults to NewRateLimitingQueueWithConfig. + // + // NOTE: LOW LEVEL PRIMITIVE! + // Only use a custom NewQueue if you know what you are doing. + NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface + // LogConstructor is used to construct a logger used for this controller and passed // to each reconciliation via the context field. LogConstructor func(request *reconcile.Request) logr.Logger @@ -147,6 +159,14 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller options.RateLimiter = workqueue.DefaultControllerRateLimiter() } + if options.NewQueue == nil { + options.NewQueue = func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface { + return workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{ + Name: controllerName, + }) + } + } + if options.RecoverPanic == nil { options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic } @@ -157,12 +177,9 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller // Create controller with dependencies set return &controller.Controller{ - Do: options.Reconciler, - MakeQueue: func() workqueue.RateLimitingInterface { - return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{ - Name: name, - }) - }, + Do: options.Reconciler, + RateLimiter: options.RateLimiter, + NewQueue: options.NewQueue, MaxConcurrentReconciles: options.MaxConcurrentReconciles, CacheSyncTimeout: options.CacheSyncTimeout, Name: name, diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index e49a2c5774..f1197827c5 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/gomega" "go.uber.org/goleak" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/config" @@ -32,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/ratelimiter" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -133,6 +135,48 @@ var _ = Describe("controller.Controller", func() { Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) }) + It("should default RateLimiter and NewQueue if not specified", func() { + m, err := manager.New(cfg, manager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + c, err := controller.New("new-controller", m, controller.Options{ + Reconciler: reconcile.Func(nil), + }) + Expect(err).NotTo(HaveOccurred()) + + ctrl, ok := c.(*internalcontroller.Controller) + Expect(ok).To(BeTrue()) + + Expect(ctrl.RateLimiter).NotTo(BeNil()) + Expect(ctrl.NewQueue).NotTo(BeNil()) + }) + + It("should not override RateLimiter and NewQueue if specified", func() { + m, err := manager.New(cfg, manager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + customRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second) + customNewQueueCalled := false + customNewQueue := func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface { + customNewQueueCalled = true + return nil + } + + c, err := controller.New("new-controller", m, controller.Options{ + Reconciler: reconcile.Func(nil), + RateLimiter: customRateLimiter, + NewQueue: customNewQueue, + }) + Expect(err).NotTo(HaveOccurred()) + + ctrl, ok := c.(*internalcontroller.Controller) + Expect(ok).To(BeTrue()) + + Expect(ctrl.RateLimiter).To(BeIdenticalTo(customRateLimiter)) + ctrl.NewQueue("controller1", nil) + Expect(customNewQueueCalled).To(BeTrue(), "Expected customNewQueue to be called") + }) + It("should default RecoverPanic from the manager", func() { m, err := manager.New(cfg, manager.Options{Controller: config.Controller{RecoverPanic: ptr.To(true)}}) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 33883647b9..40ba0685d0 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -33,6 +33,7 @@ import ( ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/ratelimiter" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -50,10 +51,13 @@ type Controller struct { // Defaults to the DefaultReconcileFunc. Do reconcile.Reconciler - // MakeQueue constructs the queue for this controller once the controller is ready to start. - // This exists because the standard Kubernetes workqueues start themselves immediately, which + // RateLimiter is used to limit how frequently requests may be queued into the work queue. + RateLimiter ratelimiter.RateLimiter + + // NewQueue constructs the queue for this controller once the controller is ready to start. + // This is a func because the standard Kubernetes work queues start themselves immediately, which // leads to goroutine leaks if something calls controller.New repeatedly. - MakeQueue func() workqueue.RateLimitingInterface + NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface // Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing @@ -158,7 +162,7 @@ func (c *Controller) Start(ctx context.Context) error { // Set the internal context. c.ctx = ctx - c.Queue = c.MakeQueue() + c.Queue = c.NewQueue(c.Name, c.RateLimiter) go func() { <-ctx.Done() c.Queue.ShutDown() diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index ce2245e60f..96cd27e1e3 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -43,6 +43,7 @@ import ( ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/ratelimiter" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -68,7 +69,7 @@ var _ = Describe("controller", func() { ctrl = &Controller{ MaxConcurrentReconciles: 1, Do: fakeReconcile, - MakeQueue: func() workqueue.RateLimitingInterface { return queue }, + NewQueue: func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return queue }, LogConstructor: func(_ *reconcile.Request) logr.Logger { return log.RuntimeLog.WithName("controller").WithName("test") }, @@ -408,8 +409,8 @@ var _ = Describe("controller", func() { // TODO(directxman12): we should ensure that backoff occurrs with error requeue It("should not reset backoff until there's a non-error result", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} - ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)} + ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -444,8 +445,8 @@ var _ = Describe("controller", func() { }) It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} - ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)} + ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -474,8 +475,8 @@ var _ = Describe("controller", func() { }) It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} - ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)} + ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -504,8 +505,8 @@ var _ = Describe("controller", func() { }) It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} - ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)} + ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } ctx, cancel := context.WithCancel(context.Background()) defer cancel()