diff --git a/example_test.go b/example_test.go index e86719480f..d3a113ec39 100644 --- a/example_test.go +++ b/example_test.go @@ -87,7 +87,7 @@ func GenericExample() { b := ctrl.NewControllerManagedBy(manager) // Create the Controller // ReplicaSet is the Application API - b.Add(builder.For(manager, &appsv1.ReplicaSet{})). + err = b.Add(builder.For(manager, &appsv1.ReplicaSet{})). Add(builder.Owns(manager, &appsv1.ReplicaSet{}, &corev1.Pod{})). // ReplicaSet owns Pods created by it Complete(&ReplicaSetReconciler{Client: manager.GetClient()}) if err != nil { diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index f75bfaaaec..e495749abc 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -187,10 +187,12 @@ func (blder *Builder) WatchesRawSource(src source.PrepareSyncing, eventHandler h return blder } +// For defines the type of Object being reconciled and allows to respond to object events inheriting the object type at all cases. func For[T client.Object](mgr manager.Manager, object T, prct ...predicate.ObjectPredicate[T]) source.Source { return source.ObjectKind(mgr.GetCache(), object).PrepareObject(&handler.EnqueueRequest[T]{}, prct...) } +// Owns defines the type of owner and owned objects to watch with predicates inheriting the owned object type applying to owned object. func Owns[F, T client.Object](mgr manager.Manager, owner F, owned T, prct ...predicate.ObjectPredicate[T]) source.Source { src := source.ObjectKind(mgr.GetCache(), owned) @@ -202,10 +204,12 @@ func Owns[F, T client.Object](mgr manager.Manager, owner F, owned T, prct ...pre return src.PrepareObject(handler.ObjectFuncAdapter[T](hdler), prct...) } +// Watches defines the type of object to watch with ObjectHandler and predicates inheriting the object type. func Watches[T client.Object](mgr manager.Manager, object T, eventHandler handler.ObjectHandler[T], prct ...predicate.ObjectPredicate[T]) source.Source { return source.ObjectKind(mgr.GetCache(), object).PrepareObject(eventHandler, prct...) } +// Add allows to pass a prepared source object with a fully defined event handler and predicates list. func (blder *Builder) Add(src source.Source) *Builder { blder.rawWatches = append(blder.rawWatches, src) return blder diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index c104168aa5..e23045bf40 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -96,14 +96,6 @@ type Informers interface { client.FieldIndexer } -type ObjectInformers[T any] interface { - Informer - - // GetInformer fetches or constructs an informer for the given object that corresponds to a single - // API kind and resource. - GetInformer(ctx context.Context, obj T, opts ...InformerGetOption) (Informer, error) -} - // Informer allows you to interact with the underlying informer. type Informer interface { // AddEventHandler adds an event handler to the shared informer using the shared informer's resync diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 72d15661af..3f91769c03 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -192,13 +192,13 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller // ReconcileIDFromContext gets the reconcileID from the current context. var ReconcileIDFromContext = controller.ReconcileIDFromContext -// ControllerAdapter is an adapter for old controller implementations -type ControllerAdapter struct { +// Adapter is an adapter for old controller implementations +type Adapter struct { Controller } // Watch implements old controller Watch interface -func (c *ControllerAdapter) Watch(src source.Source, handler handler.EventHandler, predicates ...predicate.Predicate) error { +func (c *Adapter) Watch(src source.Source, handler handler.EventHandler, predicates ...predicate.Predicate) error { source, ok := src.(source.PrepareSource) if !ok { return fmt.Errorf("expected source to fulfill SourcePrepare interface") diff --git a/pkg/handler/enqueue_mapped.go b/pkg/handler/enqueue_mapped.go index c3a24089f7..aab04792be 100644 --- a/pkg/handler/enqueue_mapped.go +++ b/pkg/handler/enqueue_mapped.go @@ -38,7 +38,9 @@ type MapFunc func(context.Context, client.Object) []reconcile.Request // For UpdateEvents which contain both a new and old object, the transformation function is run on both // objects and both sets of Requests are enqueue. func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler { - return &enqueueRequestsFromObjectMapFunc[any]{ - toRequests: MapFuncAdapter(fn), + return &enqueueRequestsFromObjectMapFunc[client.Object]{ + toRequests: func(ctx context.Context, obj client.Object) (reqs []reconcile.Request) { + return fn(ctx, obj) + }, } } diff --git a/pkg/handler/enqueue_mapped_typed.go b/pkg/handler/enqueue_mapped_typed.go index 98ffe8d27e..125c772e88 100644 --- a/pkg/handler/enqueue_mapped_typed.go +++ b/pkg/handler/enqueue_mapped_typed.go @@ -20,7 +20,6 @@ import ( "context" "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -30,17 +29,6 @@ import ( // Unlike MapFunc, a specific object type can be used to process and create mapping requests. type ObjectMapFunc[T any] func(context.Context, T) []reconcile.Request -func MapFuncAdapter(m MapFunc) ObjectMapFunc[any] { - return func(ctx context.Context, a any) (reqs []reconcile.Request) { - obj, ok := a.(client.Object) - if ok { - return m(ctx, obj) - } - - return []reconcile.Request{} - } -} - // EnqueueRequestsFromObjectMapFunc enqueues Requests by running a transformation function that outputs a collection // of reconcile.Requests on each Event. The reconcile.Requests may be for an arbitrary set of objects // defined by some user specified transformation of the source Event. (e.g. trigger Reconciler for a set of objects @@ -116,10 +104,10 @@ func (e *enqueueRequestsFromObjectMapFunc[T]) Create(ctx context.Context, evt ev // Update implements EventHandler. func (e *enqueueRequestsFromObjectMapFunc[T]) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { - old, okOld := evt.ObjectOld.(T) - new, okNew := evt.ObjectNew.(T) + oldObj, okOld := evt.ObjectOld.(T) + newObj, okNew := evt.ObjectNew.(T) if okOld && okNew { - e.OnUpdate(ctx, old, new, q) + e.OnUpdate(ctx, oldObj, newObj, q) } } diff --git a/pkg/handler/enqueue_typed.go b/pkg/handler/enqueue_typed.go index 5801ee9f67..b26905d2ff 100644 --- a/pkg/handler/enqueue_typed.go +++ b/pkg/handler/enqueue_typed.go @@ -29,6 +29,7 @@ import ( var _ EventHandler = &EnqueueRequest[metav1.Object]{} var _ ObjectHandler[metav1.Object] = &EnqueueRequest[metav1.Object]{} +// Request is a minimal subset of a client.Object interface, allowing to enact on non kubernetes resources. type Request interface { GetName() string GetNamespace() string diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index 121e675b19..cab0ef002e 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -125,7 +125,7 @@ var _ EventHandler = Funcs{} var _ EventHandler = ObjectFuncs[any]{} var _ ObjectHandler[any] = ObjectFuncs[any]{} -// Funcs is a function that implements Predicate. +// ObjectFuncs is a function that implements ObjectPredicate. type ObjectFuncs[T any] struct { // Create is called in response to an add event. Defaults to no-op. // RateLimitingInterface is used to enqueue reconcile.Requests. @@ -146,10 +146,10 @@ type ObjectFuncs[T any] struct { // Update implements Predicate. func (p ObjectFuncs[T]) Update(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { - new, ok := e.ObjectNew.(T) - old, oldOk := e.ObjectOld.(T) - if ok && oldOk { - p.OnUpdate(ctx, old, new, q) + objNew, newOk := e.ObjectNew.(T) + objOld, oldOk := e.ObjectOld.(T) + if newOk && oldOk { + p.OnUpdate(ctx, objOld, objNew, q) } } @@ -177,34 +177,35 @@ func (p ObjectFuncs[T]) Delete(ctx context.Context, e event.DeleteEvent, q workq } } -// Update implements Predicate. +// OnUpdate implements ObjectPredicate. func (p ObjectFuncs[T]) OnUpdate(ctx context.Context, old, new T, q workqueue.RateLimitingInterface) { if p.UpdateFunc != nil { p.UpdateFunc(ctx, old, new, q) } } -// Generic implements Predicate. +// OnGeneric implements ObjectPredicate. func (p ObjectFuncs[T]) OnGeneric(ctx context.Context, obj T, q workqueue.RateLimitingInterface) { if p.GenericFunc != nil { p.GenericFunc(ctx, obj, q) } } -// Create implements Predicate. +// OnCreate implements ObjectPredicate. func (p ObjectFuncs[T]) OnCreate(ctx context.Context, obj T, q workqueue.RateLimitingInterface) { if p.CreateFunc != nil { p.CreateFunc(ctx, obj, q) } } -// Delete implements Predicate. +// OnDelete implements ObjectPredicate. func (p ObjectFuncs[T]) OnDelete(ctx context.Context, obj T, q workqueue.RateLimitingInterface) { if p.DeleteFunc != nil { p.DeleteFunc(ctx, obj, q) } } +// ObjectFuncAdapter allows to reuse existing EventHandler for a typed ObjectHandler func ObjectFuncAdapter[T client.Object](h EventHandler) ObjectHandler[T] { return ObjectFuncs[T]{ CreateFunc: func(ctx context.Context, obj T, queue workqueue.RateLimitingInterface) { @@ -222,6 +223,7 @@ func ObjectFuncAdapter[T client.Object](h EventHandler) ObjectHandler[T] { } } +// EventHandlerAdapter allows to reuse existing typed event handler as EventHandler func EventHandlerAdapter[T client.Object](h ObjectHandler[T]) EventHandler { return Funcs{ CreateFunc: func(ctx context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) { @@ -243,10 +245,10 @@ func EventHandlerAdapter[T client.Object](h ObjectHandler[T]) EventHandler { } }, UpdateFunc: func(ctx context.Context, e event.UpdateEvent, queue workqueue.RateLimitingInterface) { - new, ok := e.ObjectNew.(T) - old, oldOk := e.ObjectOld.(T) - if ok && oldOk { - h.OnUpdate(ctx, old, new, queue) + objNew, newOk := e.ObjectNew.(T) + objOld, oldOk := e.ObjectOld.(T) + if newOk && oldOk { + h.OnUpdate(ctx, objOld, objNew, queue) } }, } diff --git a/pkg/interfaces/source.go b/pkg/interfaces/source.go index bbb6a0b7d9..0e7f77f592 100644 --- a/pkg/interfaces/source.go +++ b/pkg/interfaces/source.go @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package interfaces import ( @@ -37,12 +38,12 @@ type Source interface { Start(context.Context, workqueue.RateLimitingInterface) error } -// PrepareSource: Prepares a Source to be used with EventHandler and predicates +// PrepareSource - Prepares a Source to be used with EventHandler and predicates type PrepareSource interface { Prepare(handler.EventHandler, ...predicate.Predicate) SyncingSource } -// PrepareSourceObject[T]: Prepares a Source preserving the object type +// PrepareSourceObject - Prepares a Source preserving the object type type PrepareSourceObject[T any] interface { PrepareObject(handler.ObjectHandler[T], ...predicate.ObjectPredicate[T]) SyncingSource } @@ -59,13 +60,13 @@ type SyncingSource interface { Syncing } -// PrepareSyncing: A SyncingSource that also implements SourcePrepare and has WaitForSync method +// PrepareSyncing - a SyncingSource that also implements SourcePrepare and has WaitForSync method type PrepareSyncing interface { SyncingSource PrepareSource } -// PrepareSyncingObject[T]: A SyncingSource that also implements PrepareSourceObject[T] and has WaitForSync method +// PrepareSyncingObject - a SyncingSource that also implements PrepareSourceObject[T] and has WaitForSync method type PrepareSyncingObject[T any] interface { SyncingSource PrepareSourceObject[T] diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index d8537eeb80..73c2d5b3cb 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -35,11 +35,7 @@ type Kind[T client.Object] struct { handler handler.ObjectHandler[T] } -// SetPredicates implements source.SyncingSource. -func (ks *Kind[T]) SetPredicates(...predicate.PredicateConstraint) { - panic("unimplemented") -} - +// PrepareObject implements PrepareSyncingObject preparation and should only be called when handler and predicates are available. func (ks *Kind[T]) PrepareObject(h handler.ObjectHandler[T], prct ...predicate.ObjectPredicate[T]) interfaces.SyncingSource { ks.handler = h ks.predicates = prct @@ -47,6 +43,7 @@ func (ks *Kind[T]) PrepareObject(h handler.ObjectHandler[T], prct ...predicate.O return ks } +// Prepare implements Source preparation and should only be called when handler and predicates are available. func (ks *Kind[T]) Prepare(h handler.EventHandler, prct ...predicate.Predicate) interfaces.SyncingSource { ks.handler = handler.ObjectFuncAdapter[T](h) ks.predicates = predicate.ObjectPredicatesAdapter[T](prct...) @@ -59,10 +56,10 @@ func (ks *Kind[T]) Start( ctx context.Context, queue workqueue.RateLimitingInterface, ) error { - return ks.Run(ctx, ks.handler, queue, ks.predicates...) + return ks.run(ctx, ks.handler, queue, ks.predicates...) } -func (ks *Kind[T]) Run(ctx context.Context, handler handler.ObjectHandler[T], queue workqueue.RateLimitingInterface, +func (ks *Kind[T]) run(ctx context.Context, handler handler.ObjectHandler[T], queue workqueue.RateLimitingInterface, prct ...predicate.ObjectPredicate[T]) error { if reflect.DeepEqual(ks.Type, *new(T)) { return fmt.Errorf("must create Kind with a non-nil object") diff --git a/pkg/predicate/predicate.go b/pkg/predicate/predicate.go index 2f45bfca66..288493807e 100644 --- a/pkg/predicate/predicate.go +++ b/pkg/predicate/predicate.go @@ -28,10 +28,6 @@ import ( var log = logf.RuntimeLog.WithName("predicate").WithName("eventFilters") -type PredicateConstraint interface { - Register() -} - // Predicate filters events before enqueuing the keys. type Predicate interface { // Create returns true if the Create event should be processed @@ -120,7 +116,7 @@ func (p Funcs) Generic(e event.GenericEvent) bool { return true } -// Funcs is a function that implements Predicate. +// ObjectFuncs is a function that implements Predicate and ObjectPrediace. type ObjectFuncs[T any] struct { // Create returns true if the Create event should be processed CreateFunc func(obj T) bool @@ -137,9 +133,9 @@ type ObjectFuncs[T any] struct { // Update implements Predicate. func (p ObjectFuncs[T]) Update(e event.UpdateEvent) bool { - new, ok := e.ObjectNew.(T) - old, oldOk := e.ObjectOld.(T) - return ok && oldOk && p.OnUpdate(old, new) + newObj, newOk := e.ObjectNew.(T) + oldObj, oldOk := e.ObjectOld.(T) + return newOk && oldOk && p.OnUpdate(oldObj, newObj) } // Generic implements Predicate. @@ -160,22 +156,22 @@ func (p ObjectFuncs[T]) Delete(e event.DeleteEvent) bool { return ok && p.OnDelete(obj) } -// Update implements Predicate. +// OnUpdate implements ObjectPredicate. func (p ObjectFuncs[T]) OnUpdate(old, new T) bool { return p.UpdateFunc == nil || p.UpdateFunc(old, new) } -// Generic implements Predicate. +// OnGeneric implements ObjectPredicate. func (p ObjectFuncs[T]) OnGeneric(obj T) bool { return p.GenericFunc == nil || p.GenericFunc(obj) } -// Create implements Predicate. +// OnCreate implements ObjectPredicate. func (p ObjectFuncs[T]) OnCreate(obj T) bool { return p.CreateFunc == nil || p.CreateFunc(obj) } -// Delete implements Predicate. +// OnDelete implements ObjectPredicate. func (p ObjectFuncs[T]) OnDelete(obj T) bool { return p.DeleteFunc == nil || p.DeleteFunc(obj) } @@ -200,7 +196,7 @@ func NewPredicateFuncs(filter func(object client.Object) bool) Funcs { } } -// NewPredicateFuncs returns a predicate funcs that applies the given filter function +// NewObjectPredicateFuncs returns a typed predicate funcs that applies the given filter function // on CREATE, UPDATE, DELETE and GENERIC events. For UPDATE events, the filter is applied // to the new object. func NewObjectPredicateFuncs[T any](filter func(object T) bool) ObjectFuncs[T] { @@ -590,6 +586,7 @@ func LabelSelectorPredicate(s metav1.LabelSelector) (Predicate, error) { }), nil } +// ObjectPredicateAdapter allows to reuse existing predicate as a typed ObjectPredicate func ObjectPredicateAdapter[T client.Object](h Predicate) ObjectPredicate[T] { return ObjectFuncs[T]{ CreateFunc: func(obj T) bool { @@ -607,6 +604,7 @@ func ObjectPredicateAdapter[T client.Object](h Predicate) ObjectPredicate[T] { } } +// ObjectPredicatesAdapter allows to reuse existing set of predicates as ObjectPredicates func ObjectPredicatesAdapter[T client.Object](predicates ...Predicate) (prdt []ObjectPredicate[T]) { for _, p := range predicates { prdt = append(prdt, ObjectPredicateAdapter[T](p)) diff --git a/pkg/source/source.go b/pkg/source/source.go index b4d96691b2..c70a6dacae 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -37,12 +37,30 @@ const ( defaultBufferSize = 1024 ) +// Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) +// which should be processed by event.EventHandlers to enqueue reconcile.Requests. +// +// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update). +// +// * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls). +// +// Users may build their own Source implementations. type Source = interfaces.Source + +// Syncing allows to wait for synchronization with context type Syncing = interfaces.Syncing + +// SyncingSource is a source that needs syncing prior to being usable. The controller +// will call its WaitForSync prior to starting workers. type SyncingSource = interfaces.SyncingSource + +// PrepareSyncing - a SyncingSource that also implements SourcePrepare and has WaitForSync method type PrepareSyncing = interfaces.PrepareSyncing + +// PrepareSource - Prepares a Source to be used with EventHandler and predicates type PrepareSource = interfaces.PrepareSource +// PrepareSyncingObject - a SyncingSource that also implements PrepareSourceObject[T] and has WaitForSync method type PrepareSyncingObject[T any] interface { interfaces.PrepareSyncingObject[T] } @@ -88,6 +106,7 @@ func (cs *Channel) String() string { return fmt.Sprintf("channel source: %p", cs) } +// WaitForSync implements the source.SyncingSource interface func (cs *Channel) WaitForSync(ctx context.Context) error { return nil } @@ -210,6 +229,7 @@ type Informer struct { var _ PrepareSource = &Informer{} +// Prepare implements the source.PrepareSyncing interface func (is *Informer) Prepare( h handler.EventHandler, prct ...predicate.Predicate, @@ -220,7 +240,8 @@ func (is *Informer) Prepare( return is } -func (cs *Informer) WaitForSync(ctx context.Context) error { +// WaitForSync implements the source.SyncingSource interface +func (is *Informer) WaitForSync(ctx context.Context) error { return nil }