From 8c4d3df7b95c096af9ee15c3ae9f4ce36b728aa9 Mon Sep 17 00:00:00 2001 From: Danil Grigorev Date: Tue, 16 Apr 2024 18:02:19 +0200 Subject: [PATCH] Separate source into several interfaces - Add controller adapters Signed-off-by: Danil Grigorev --- examples/builtins/main.go | 8 +- pkg/builder/controller.go | 28 ++-- pkg/controller/controller.go | 17 +++ pkg/controller/controller_integration_test.go | 8 +- pkg/controller/controller_test.go | 3 +- pkg/controller/example_test.go | 10 +- pkg/handler/enqueue_mapped_typed.go | 16 +++ pkg/handler/example_test.go | 83 ++++++----- pkg/interfaces/source.go | 72 ++++++++++ .../recorder/recorder_integration_test.go | 3 +- pkg/internal/source/kind.go | 8 +- pkg/predicate/predicate.go | 2 +- pkg/source/example_test.go | 3 +- pkg/source/source.go | 73 ++++------ pkg/source/source_integration_test.go | 9 +- pkg/source/source_test.go | 131 +++++++++--------- 16 files changed, 267 insertions(+), 207 deletions(-) create mode 100644 pkg/interfaces/source.go diff --git a/examples/builtins/main.go b/examples/builtins/main.go index f83180a6cd..514f2663f1 100644 --- a/examples/builtins/main.go +++ b/examples/builtins/main.go @@ -59,17 +59,13 @@ func main() { } // Watch ReplicaSets and enqueue ReplicaSet object key - src := source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}) - src.Prepare(&handler.EnqueueRequestForObject{}) - if err := c.Watch(src); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}).Prepare(&handler.EnqueueRequestForObject{})); err != nil { entryLog.Error(err, "unable to watch ReplicaSets") os.Exit(1) } // Watch Pods and enqueue owning ReplicaSet key - src = source.Kind(mgr.GetCache(), &corev1.Pod{}) - src.Prepare(handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())) - if err := c.Watch(src); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}).Prepare(handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil { entryLog.Error(err, "unable to watch Pods") os.Exit(1) } diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index a3b68da323..b23468ee15 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -124,7 +124,7 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder { // WatchesInput represents the information set by Watches method. type WatchesInput struct { - src source.SourcePrepare + src source.PrepareSyncing eventHandler handler.EventHandler predicates []predicate.Predicate objectProjection objectProjection @@ -177,7 +177,7 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler // // STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead. // This method is only exposed for more advanced use cases, most users should use one of the higher level functions. -func (blder *Builder) WatchesRawSource(src source.SourcePrepare, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder { +func (blder *Builder) WatchesRawSource(src source.PrepareSyncing, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder { input := WatchesInput{src: src, eventHandler: eventHandler} for _, opt := range opts { opt.ApplyToWatches(&input) @@ -188,29 +188,24 @@ func (blder *Builder) WatchesRawSource(src source.SourcePrepare, eventHandler ha } func For[T client.Object](blder *Builder, object T, prct ...predicate.ObjectPredicate[T]) source.Source { - src := source.ObjectKind(blder.mgr.GetCache(), object) - src.PrepareObject(&handler.EnqueueRequest[T]{}, prct...) + blder.forInput = ForInput{object: object} - return src + return source.ObjectKind(blder.mgr.GetCache(), object).PrepareObject(&handler.EnqueueRequest[T]{}, prct...) } -func Owns[F, T client.Object](blder *Builder, owner F, owned T) source.Source { +func Owns[F, T client.Object](blder *Builder, owner F, owned T, prct ...predicate.ObjectPredicate[T]) source.Source { src := source.ObjectKind(blder.mgr.GetCache(), owned) hdler := handler.EnqueueRequestForOwner( blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(), owner, ) - src.PrepareObject(handler.ObjectFuncAdapter[T](hdler)) - return src + return src.PrepareObject(handler.ObjectFuncAdapter[T](hdler), prct...) } func Watches[T client.Object](blder *Builder, object T, eventHandler handler.ObjectHandler[T], prct ...predicate.ObjectPredicate[T]) source.Source { - src := source.ObjectKind(blder.mgr.GetCache(), object) - src.PrepareObject(eventHandler, prct...) - - return src + return source.ObjectKind(blder.mgr.GetCache(), object).PrepareObject(eventHandler, prct...) } func (blder *Builder) Add(src source.Source) *Builder { @@ -308,8 +303,7 @@ func (blder *Builder) doWatch() error { hdler := &handler.EnqueueRequestForObject{} allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, blder.forInput.predicates...) - src.Prepare(hdler, allPredicates...) - if err := blder.ctrl.Watch(src); err != nil { + if err := blder.ctrl.Watch(src.Prepare(hdler, allPredicates...)); err != nil { return err } } @@ -335,8 +329,7 @@ func (blder *Builder) doWatch() error { ) allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) - src.Prepare(hdler, allPredicates...) - if err := blder.ctrl.Watch(src); err != nil { + if err := blder.ctrl.Watch(src.Prepare(hdler, allPredicates...)); err != nil { return err } } @@ -356,8 +349,7 @@ func (blder *Builder) doWatch() error { } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) - w.src.Prepare(w.eventHandler, allPredicates...) - if err := blder.ctrl.Watch(w.src); err != nil { + if err := blder.ctrl.Watch(w.src.Prepare(w.eventHandler, allPredicates...)); err != nil { return err } } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d143af33df..72d15661af 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -25,8 +25,10 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/ratelimiter" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -189,3 +191,18 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller // ReconcileIDFromContext gets the reconcileID from the current context. var ReconcileIDFromContext = controller.ReconcileIDFromContext + +// ControllerAdapter is an adapter for old controller implementations +type ControllerAdapter struct { + Controller +} + +// Watch implements old controller Watch interface +func (c *ControllerAdapter) Watch(src source.Source, handler handler.EventHandler, predicates ...predicate.Predicate) error { + source, ok := src.(source.PrepareSource) + if !ok { + return fmt.Errorf("expected source to fulfill SourcePrepare interface") + } + + return c.Controller.Watch(source.Prepare(handler, predicates...)) +} diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index cf95f57141..75ec77c9e4 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -64,14 +64,10 @@ var _ = Describe("controller", func() { Expect(err).NotTo(HaveOccurred()) By("Watching Resources") - src := source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}) - src.Prepare(handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{})) - err = instance.Watch(src) + err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}).Prepare(handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}))) Expect(err).NotTo(HaveOccurred()) - src = source.Kind(cm.GetCache(), &appsv1.Deployment{}) - src.Prepare(&handler.EnqueueRequestForObject{}) - err = instance.Watch(src) + err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}).Prepare(&handler.EnqueueRequestForObject{})) Expect(err).NotTo(HaveOccurred()) err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{}) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index f26416f0bc..7ee524ae19 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -101,8 +101,7 @@ var _ = Describe("controller.Controller", func() { Expect(err).NotTo(HaveOccurred()) c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec}) - watch.Prepare(&handler.EnqueueRequestForObject{}) - Expect(c.Watch(watch)).To(Succeed()) + Expect(c.Watch(watch.Prepare(&handler.EnqueueRequestForObject{}))).To(Succeed()) Expect(err).NotTo(HaveOccurred()) go func() { diff --git a/pkg/controller/example_test.go b/pkg/controller/example_test.go index 7a9ee367d7..581abf8167 100644 --- a/pkg/controller/example_test.go +++ b/pkg/controller/example_test.go @@ -71,9 +71,7 @@ func ExampleController() { } // Watch for Pod create / update / delete events and call Reconcile - src := source.Kind(mgr.GetCache(), &corev1.Pod{}) - src.Prepare(&handler.EnqueueRequestForObject{}) - err = c.Watch(src) + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}).Prepare(&handler.EnqueueRequestForObject{})) if err != nil { log.Error(err, "unable to watch pods") os.Exit(1) @@ -111,8 +109,7 @@ func ExampleController_unstructured() { }) // Watch for Pod create / update / delete events and call Reconcile src := source.Kind(mgr.GetCache(), u) - src.Prepare(&handler.EnqueueRequestForObject{}) - err = c.Watch(src) + err = c.Watch(src.Prepare(&handler.EnqueueRequestForObject{})) if err != nil { log.Error(err, "unable to watch pods") os.Exit(1) @@ -144,8 +141,7 @@ func ExampleNewUnmanaged() { } src := source.Kind(mgr.GetCache(), &corev1.Pod{}) - src.Prepare(&handler.EnqueueRequestForObject{}) - if err := c.Watch(src); err != nil { + if err := c.Watch(src.Prepare(&handler.EnqueueRequestForObject{})); err != nil { log.Error(err, "unable to watch pods") os.Exit(1) } diff --git a/pkg/handler/enqueue_mapped_typed.go b/pkg/handler/enqueue_mapped_typed.go index 80f54bacef..98ffe8d27e 100644 --- a/pkg/handler/enqueue_mapped_typed.go +++ b/pkg/handler/enqueue_mapped_typed.go @@ -57,6 +57,22 @@ func EnqueueRequestsFromObjectMapFunc[T any](fn ObjectMapFunc[T]) EventHandler { } } +// EnqueueRequestsFromObjectMap enqueues Requests by running a transformation function that outputs a collection +// of reconcile.Requests on each Event. The reconcile.Requests may be for an arbitrary set of objects +// defined by some user specified transformation of the source Event. (e.g. trigger Reconciler for a set of objects +// in response to a cluster resize event caused by adding or deleting a Node) +// +// EnqueueRequestsFromObjectMap is frequently used to fan-out updates from one object to one or more other +// objects of a differing type. +// +// For UpdateEvents which contain both a new and old object, the transformation function is run on both +// objects and both sets of Requests are enqueue. +func EnqueueRequestsFromObjectMap[T any](fn ObjectMapFunc[T]) ObjectHandler[T] { + return &enqueueRequestsFromObjectMapFunc[T]{ + toRequests: fn, + } +} + var _ EventHandler = &enqueueRequestsFromObjectMapFunc[any]{} var _ ObjectHandler[any] = &enqueueRequestsFromObjectMapFunc[any]{} diff --git a/pkg/handler/example_test.go b/pkg/handler/example_test.go index 273167ab6b..38ae3a9f0f 100644 --- a/pkg/handler/example_test.go +++ b/pkg/handler/example_test.go @@ -42,8 +42,7 @@ var ( func ExampleEnqueueRequestForObject() { // controller is a controller.controller src := source.Kind(mgr.GetCache(), &corev1.Pod{}) - src.Prepare(&handler.EnqueueRequestForObject{}) - err := c.Watch(src) + err := c.Watch(src.Prepare(&handler.EnqueueRequestForObject{})) if err != nil { // handle it } @@ -65,19 +64,19 @@ func ExampleEnqueueRequestForOwner() { // objects (of Type: MyKind) using a mapping function defined by the user. func ExampleEnqueueRequestsFromMapFunc() { // controller is a controller.controller - src := source.Kind(mgr.GetCache(), &appsv1.Deployment{}) - src.Prepare(handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request { - return []reconcile.Request{ - {NamespacedName: types.NamespacedName{ - Name: a.GetName() + "-1", - Namespace: a.GetNamespace(), - }}, - {NamespacedName: types.NamespacedName{ - Name: a.GetName() + "-2", - Namespace: a.GetNamespace(), - }}, - } - })) + src := source.Kind(mgr.GetCache(), &appsv1.Deployment{}). + Prepare(handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request { + return []reconcile.Request{ + {NamespacedName: types.NamespacedName{ + Name: a.GetName() + "-1", + Namespace: a.GetNamespace(), + }}, + {NamespacedName: types.NamespacedName{ + Name: a.GetName() + "-2", + Namespace: a.GetNamespace(), + }}, + } + })) err := c.Watch(src) if err != nil { // handle it @@ -87,33 +86,33 @@ func ExampleEnqueueRequestsFromMapFunc() { // This example implements handler.EnqueueRequestForObject. func ExampleFuncs() { // controller is a controller.controller - src := source.Kind(mgr.GetCache(), &corev1.Pod{}) - src.Prepare(handler.Funcs{ - CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: e.Object.GetName(), - Namespace: e.Object.GetNamespace(), - }}) - }, - UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: e.ObjectNew.GetName(), - Namespace: e.ObjectNew.GetNamespace(), - }}) - }, - DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) { - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: e.Object.GetName(), - Namespace: e.Object.GetNamespace(), - }}) - }, - GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) { - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: e.Object.GetName(), - Namespace: e.Object.GetNamespace(), - }}) - }, - }) + src := source.Kind(mgr.GetCache(), &corev1.Pod{}). + Prepare(handler.Funcs{ + CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: e.Object.GetName(), + Namespace: e.Object.GetNamespace(), + }}) + }, + UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: e.ObjectNew.GetName(), + Namespace: e.ObjectNew.GetNamespace(), + }}) + }, + DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) { + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: e.Object.GetName(), + Namespace: e.Object.GetNamespace(), + }}) + }, + GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) { + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: e.Object.GetName(), + Namespace: e.Object.GetNamespace(), + }}) + }, + }) err := c.Watch(src) if err != nil { diff --git a/pkg/interfaces/source.go b/pkg/interfaces/source.go new file mode 100644 index 0000000000..bbb6a0b7d9 --- /dev/null +++ b/pkg/interfaces/source.go @@ -0,0 +1,72 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package interfaces + +import ( + "context" + + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) +// which should be processed by event.EventHandlers to enqueue reconcile.Requests. +// +// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update). +// +// * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls). +// +// Users may build their own Source implementations. +type Source interface { + // Start is internal and should be called only by the Controller to register an EventHandler with the Informer + // to enqueue reconcile.Requests. + Start(context.Context, workqueue.RateLimitingInterface) error +} + +// PrepareSource: Prepares a Source to be used with EventHandler and predicates +type PrepareSource interface { + Prepare(handler.EventHandler, ...predicate.Predicate) SyncingSource +} + +// PrepareSourceObject[T]: Prepares a Source preserving the object type +type PrepareSourceObject[T any] interface { + PrepareObject(handler.ObjectHandler[T], ...predicate.ObjectPredicate[T]) SyncingSource +} + +// Syncing allows to wait for synchronization with context +type Syncing interface { + WaitForSync(ctx context.Context) error +} + +// SyncingSource is a source that needs syncing prior to being usable. The controller +// will call its WaitForSync prior to starting workers. +type SyncingSource interface { + Source + Syncing +} + +// PrepareSyncing: A SyncingSource that also implements SourcePrepare and has WaitForSync method +type PrepareSyncing interface { + SyncingSource + PrepareSource +} + +// PrepareSyncingObject[T]: A SyncingSource that also implements PrepareSourceObject[T] and has WaitForSync method +type PrepareSyncingObject[T any] interface { + SyncingSource + PrepareSourceObject[T] +} diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index b30eab6353..477a35a594 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -56,8 +56,7 @@ var _ = Describe("recorder", func() { Expect(err).NotTo(HaveOccurred()) By("Watching Resources") - src := source.Kind(cm.GetCache(), &appsv1.Deployment{}) - src.Prepare(&handler.EnqueueRequestForObject{}) + src := source.Kind(cm.GetCache(), &appsv1.Deployment{}).Prepare(&handler.EnqueueRequestForObject{}) err = instance.Watch(src) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index adf8d31721..d8537eeb80 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -14,6 +14,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/interfaces" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -39,14 +40,17 @@ func (ks *Kind[T]) SetPredicates(...predicate.PredicateConstraint) { panic("unimplemented") } -func (ks *Kind[T]) PrepareObject(h handler.ObjectHandler[T], prct ...predicate.ObjectPredicate[T]) { +func (ks *Kind[T]) PrepareObject(h handler.ObjectHandler[T], prct ...predicate.ObjectPredicate[T]) interfaces.SyncingSource { ks.handler = h ks.predicates = prct + + return ks } -func (ks *Kind[T]) Prepare(h handler.EventHandler, prct ...predicate.Predicate) { +func (ks *Kind[T]) Prepare(h handler.EventHandler, prct ...predicate.Predicate) interfaces.SyncingSource { ks.handler = handler.ObjectFuncAdapter[T](h) ks.predicates = predicate.ObjectPredicatesAdapter[T](prct...) + return ks } // Start is internal and should be called only by the Controller to register an EventHandler with the Informer diff --git a/pkg/predicate/predicate.go b/pkg/predicate/predicate.go index 62e9965a0d..2f45bfca66 100644 --- a/pkg/predicate/predicate.go +++ b/pkg/predicate/predicate.go @@ -609,7 +609,7 @@ func ObjectPredicateAdapter[T client.Object](h Predicate) ObjectPredicate[T] { func ObjectPredicatesAdapter[T client.Object](predicates ...Predicate) (prdt []ObjectPredicate[T]) { for _, p := range predicates { - prdt = append(prdt, ObjectPredicatesAdapter[T](p)...) + prdt = append(prdt, ObjectPredicateAdapter[T](p)) } return diff --git a/pkg/source/example_test.go b/pkg/source/example_test.go index 60cfd83c6b..9611aecb23 100644 --- a/pkg/source/example_test.go +++ b/pkg/source/example_test.go @@ -32,8 +32,7 @@ var ctrl controller.Controller // with the Name and Namespace of the Pod. func ExampleKind() { instance := source.Kind(mgr.GetCache(), &corev1.Pod{}) - instance.Prepare(&handler.EnqueueRequestForObject{}) - err := ctrl.Watch(instance) + err := ctrl.Watch(instance.Prepare(&handler.EnqueueRequestForObject{})) if err != nil { // handle it } diff --git a/pkg/source/source.go b/pkg/source/source.go index 1a5d4f9ce7..b4d96691b2 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -25,6 +25,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/interfaces" internal "sigs.k8s.io/controller-runtime/pkg/internal/source" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -36,50 +37,14 @@ const ( defaultBufferSize = 1024 ) -// Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) -// which should be processed by event.EventHandlers to enqueue reconcile.Requests. -// -// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update). -// -// * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls). -// -// Users may build their own Source implementations. -type Source interface { - // Start is internal and should be called only by the Controller to register an EventHandler with the Informer - // to enqueue reconcile.Requests. - // Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error - Start(context.Context, workqueue.RateLimitingInterface) error -} - -type SourcePrepare interface { - Source - Prepare(handler.EventHandler, ...predicate.Predicate) -} - -type ObjectSourcePrepare[T any] interface { - Source - PrepareObject(handler.ObjectHandler[T], ...predicate.ObjectPredicate[T]) -} +type Source = interfaces.Source +type Syncing = interfaces.Syncing +type SyncingSource = interfaces.SyncingSource +type PrepareSyncing = interfaces.PrepareSyncing +type PrepareSource = interfaces.PrepareSource -// SyncingSource is a source that needs syncing prior to being usable. The controller -// will call its WaitForSync prior to starting workers. -type SyncingSource interface { - Source - Syncing -} - -type PrepareSyncing interface { - SyncingSource - SourcePrepare -} - -type ObjectPrepare[T any] interface { - SyncingSource - ObjectSourcePrepare[T] -} - -type Syncing interface { - WaitForSync(ctx context.Context) error +type PrepareSyncingObject[T any] interface { + interfaces.PrepareSyncingObject[T] } // Kind creates a KindSource with the given cache provider. @@ -88,11 +53,11 @@ func Kind(cache cache.Cache, object client.Object) PrepareSyncing { } // ObjectKind creates a typed KindSource with the given cache provider. -func ObjectKind[T client.Object](cache cache.Cache, object T) ObjectPrepare[T] { +func ObjectKind[T client.Object](cache cache.Cache, object T) PrepareSyncingObject[T] { return &internal.Kind[T]{Type: object, Cache: cache} } -var _ SourcePrepare = &Channel{} +var _ PrepareSource = &Channel{} // Channel is used to provide a source of events originating outside the cluster // (e.g. GitHub Webhook callback). Channel requires the user to wire the external @@ -123,13 +88,19 @@ func (cs *Channel) String() string { return fmt.Sprintf("channel source: %p", cs) } +func (cs *Channel) WaitForSync(ctx context.Context) error { + return nil +} + // Prepare implements Source preparation and should only be called when handler and predicates are available. func (cs *Channel) Prepare( handler handler.EventHandler, prct ...predicate.Predicate, -) { +) SyncingSource { cs.predicates = prct cs.handler = handler + + return cs } // Start implements Source and should only be called by the Controller. @@ -237,14 +208,20 @@ type Informer struct { handler handler.EventHandler } -var _ SourcePrepare = &Informer{} +var _ PrepareSource = &Informer{} func (is *Informer) Prepare( h handler.EventHandler, prct ...predicate.Predicate, -) { +) SyncingSource { is.handler = h is.predicates = prct + + return is +} + +func (cs *Informer) WaitForSync(ctx context.Context) error { + return nil } // Start is internal and should be called only by the Controller to register an EventHandler with the Informer diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index 2d724b6508..a1d9872b61 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -24,6 +24,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/interfaces" "sigs.k8s.io/controller-runtime/pkg/source" . "github.com/onsi/ginkgo/v2" @@ -37,7 +38,7 @@ import ( ) var _ = Describe("Source", func() { - var instance1, instance2 source.SourcePrepare + var instance1, instance2 interfaces.PrepareSource var obj client.Object var q workqueue.RateLimitingInterface var c1, c2 chan interface{} @@ -124,10 +125,8 @@ var _ = Describe("Source", func() { handler2 := newHandler(c2) // Create 2 instances - instance1.Prepare(handler1) - instance2.Prepare(handler2) - Expect(instance1.Start(ctx, q)).To(Succeed()) - Expect(instance2.Start(ctx, q)).To(Succeed()) + Expect(instance1.Prepare(handler1).Start(ctx, q)).To(Succeed()) + Expect(instance2.Prepare(handler2).Start(ctx, q)).To(Succeed()) By("Creating a Deployment and expecting the CreateEvent.") created, err = client.Create(ctx, deployment, metav1.CreateOptions{}) diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 8d7cedd164..11b05c751d 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -66,27 +66,27 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := source.Kind(ic, &corev1.Pod{}) - instance.Prepare(handler.Funcs{ - CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Expect(q2).To(Equal(q)) - Expect(evt.Object).To(Equal(p)) - close(c) - }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected UpdateEvent") - }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected DeleteEvent") - }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected GenericEvent") - }, - }) + instance := source.Kind(ic, &corev1.Pod{}). + Prepare(handler.Funcs{ + CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(q2).To(Equal(q)) + Expect(evt.Object).To(Equal(p)) + close(c) + }, + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected UpdateEvent") + }, + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected DeleteEvent") + }, + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected GenericEvent") + }, + }) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred()) @@ -104,30 +104,30 @@ var _ = Describe("Source", func() { ic := &informertest.FakeInformers{} q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := source.Kind(ic, &corev1.Pod{}) - instance.Prepare(handler.Funcs{ - CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected CreateEvent") - }, - UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Expect(q2).To(BeIdenticalTo(q)) - Expect(evt.ObjectOld).To(Equal(p)) + instance := source.Kind(ic, &corev1.Pod{}). + Prepare(handler.Funcs{ + CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected CreateEvent") + }, + UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(q2).To(BeIdenticalTo(q)) + Expect(evt.ObjectOld).To(Equal(p)) - Expect(evt.ObjectNew).To(Equal(p2)) + Expect(evt.ObjectNew).To(Equal(p2)) - close(c) - }, - DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected DeleteEvent") - }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected GenericEvent") - }, - }) + close(c) + }, + DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected DeleteEvent") + }, + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected GenericEvent") + }, + }) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred()) @@ -150,27 +150,27 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := source.Kind(ic, &corev1.Pod{}) - instance.Prepare(handler.Funcs{ - CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected DeleteEvent") - }, - UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected UpdateEvent") - }, - DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Expect(q2).To(BeIdenticalTo(q)) - Expect(evt.Object).To(Equal(p)) - close(c) - }, - GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { - defer GinkgoRecover() - Fail("Unexpected GenericEvent") - }, - }) + instance := source.Kind(ic, &corev1.Pod{}). + Prepare(handler.Funcs{ + CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected DeleteEvent") + }, + UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected UpdateEvent") + }, + DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(q2).To(BeIdenticalTo(q)) + Expect(evt.Object).To(Equal(p)) + close(c) + }, + GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected GenericEvent") + }, + }) err := instance.Start(ctx, q) Expect(err).NotTo(HaveOccurred()) Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred()) @@ -230,8 +230,7 @@ var _ = Describe("Source", func() { defer cancel() instance := source.Kind(ic, &corev1.Pod{}) - instance.Prepare(handler.Funcs{}) - err := instance.Start(ctx, q) + err := instance.Prepare(handler.Funcs{}).Start(ctx, q) Expect(err).NotTo(HaveOccurred()) Eventually(instance.WaitForSync).WithArguments(context.Background()).Should(HaveOccurred()) })