diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e48db41f94..8537f77336 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -59,6 +59,12 @@ type Options struct { // The overall is a token bucket and the per-item is exponential. RateLimiter ratelimiter.RateLimiter + // NewWorkQueue constructs the queue for this controller once the controller is ready to start. + // This is a func because the standard Kubernetes work queues start themselves immediately, which + // leads to goroutine leaks if something calls controller.New repeatedly. + // Defaults to NewRateLimitingQueueWithConfig. + NewWorkQueue func(rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface + // LogConstructor is used to construct a logger used for this controller and passed // to each reconciliation via the context field. LogConstructor func(request *reconcile.Request) logr.Logger @@ -147,6 +153,14 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller options.RateLimiter = workqueue.DefaultControllerRateLimiter() } + if options.NewWorkQueue == nil { + options.NewWorkQueue = func(rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface { + return workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{ + Name: name, + }) + } + } + if options.RecoverPanic == nil { options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic } @@ -157,12 +171,9 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller // Create controller with dependencies set return &controller.Controller{ - Do: options.Reconciler, - MakeQueue: func() workqueue.RateLimitingInterface { - return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{ - Name: name, - }) - }, + Do: options.Reconciler, + RateLimiter: options.RateLimiter, + NewWorkQueue: options.NewWorkQueue, MaxConcurrentReconciles: options.MaxConcurrentReconciles, CacheSyncTimeout: options.CacheSyncTimeout, Name: name, diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index e49a2c5774..8b7b8d6305 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/gomega" "go.uber.org/goleak" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/config" @@ -32,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/ratelimiter" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -133,6 +135,48 @@ var _ = Describe("controller.Controller", func() { Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) }) + It("should default RateLimiter and NewWorkQueue if not specified", func() { + m, err := manager.New(cfg, manager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + c, err := controller.New("new-controller", m, controller.Options{ + Reconciler: reconcile.Func(nil), + }) + Expect(err).NotTo(HaveOccurred()) + + ctrl, ok := c.(*internalcontroller.Controller) + Expect(ok).To(BeTrue()) + + Expect(ctrl.RateLimiter).NotTo(BeNil()) + Expect(ctrl.NewWorkQueue).NotTo(BeNil()) + }) + + It("should not override RateLimiter and NewWorkQueue if specified", func() { + m, err := manager.New(cfg, manager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + customRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second) + customNewWorkQueueCalled := false + customNewWorkQueue := func(rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface { + customNewWorkQueueCalled = true + return nil + } + + c, err := controller.New("new-controller", m, controller.Options{ + Reconciler: reconcile.Func(nil), + RateLimiter: customRateLimiter, + NewWorkQueue: customNewWorkQueue, + }) + Expect(err).NotTo(HaveOccurred()) + + ctrl, ok := c.(*internalcontroller.Controller) + Expect(ok).To(BeTrue()) + + Expect(ctrl.RateLimiter).To(BeIdenticalTo(customRateLimiter)) + ctrl.NewWorkQueue(nil) + Expect(customNewWorkQueueCalled).To(BeTrue(), "Expected customNewWorkQueue to be called") + }) + It("should default RecoverPanic from the manager", func() { m, err := manager.New(cfg, manager.Options{Controller: config.Controller{RecoverPanic: ptr.To(true)}}) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 33883647b9..9e59e9f917 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -33,6 +33,7 @@ import ( ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/ratelimiter" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -50,10 +51,13 @@ type Controller struct { // Defaults to the DefaultReconcileFunc. Do reconcile.Reconciler - // MakeQueue constructs the queue for this controller once the controller is ready to start. - // This exists because the standard Kubernetes workqueues start themselves immediately, which + // RateLimiter is used to limit how frequently requests may be queued into the work queue. + RateLimiter ratelimiter.RateLimiter + + // NewWorkQueue constructs the queue for this controller once the controller is ready to start. + // This is a func because the standard Kubernetes work queues start themselves immediately, which // leads to goroutine leaks if something calls controller.New repeatedly. - MakeQueue func() workqueue.RateLimitingInterface + NewWorkQueue func(rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface // Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing @@ -158,7 +162,7 @@ func (c *Controller) Start(ctx context.Context) error { // Set the internal context. c.ctx = ctx - c.Queue = c.MakeQueue() + c.Queue = c.NewWorkQueue(c.RateLimiter) go func() { <-ctx.Done() c.Queue.ShutDown() diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index ce2245e60f..102ec1f0b2 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" "sigs.k8s.io/controller-runtime/pkg/client" @@ -43,6 +44,7 @@ import ( ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/ratelimiter" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -68,7 +70,7 @@ var _ = Describe("controller", func() { ctrl = &Controller{ MaxConcurrentReconciles: 1, Do: fakeReconcile, - MakeQueue: func() workqueue.RateLimitingInterface { return queue }, + NewWorkQueue: func(ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return queue }, LogConstructor: func(_ *reconcile.Request) logr.Logger { return log.RuntimeLog.WithName("controller").WithName("test") }, @@ -408,8 +410,8 @@ var _ = Describe("controller", func() { // TODO(directxman12): we should ensure that backoff occurrs with error requeue It("should not reset backoff until there's a non-error result", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} - ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewWorkQueue(nil)} + ctrl.NewWorkQueue = func(ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -444,8 +446,8 @@ var _ = Describe("controller", func() { }) It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} - ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewWorkQueue(nil)} + ctrl.NewWorkQueue = func(ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -474,8 +476,8 @@ var _ = Describe("controller", func() { }) It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} - ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewWorkQueue(nil)} + ctrl.NewWorkQueue = func(ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -504,8 +506,8 @@ var _ = Describe("controller", func() { }) It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() { - dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} - ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } + dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewWorkQueue(nil)} + ctrl.NewWorkQueue = func(ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq } ctx, cancel := context.WithCancel(context.Background()) defer cancel()