Skip to content

Commit

Permalink
Separate source into several interfaces
Browse files Browse the repository at this point in the history
- Add controller adapters

Signed-off-by: Danil Grigorev <danil.grigorev@suse.com>
  • Loading branch information
Danil-Grigorev committed Apr 17, 2024
1 parent 3d2371d commit 8c4d3df
Show file tree
Hide file tree
Showing 16 changed files with 267 additions and 207 deletions.
8 changes: 2 additions & 6 deletions examples/builtins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,13 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
src := source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{})
src.Prepare(&handler.EnqueueRequestForObject{})
if err := c.Watch(src); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}).Prepare(&handler.EnqueueRequestForObject{})); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
src = source.Kind(mgr.GetCache(), &corev1.Pod{})
src.Prepare(handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))
if err := c.Watch(src); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}).Prepare(handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
28 changes: 10 additions & 18 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.SourcePrepare
src source.PrepareSyncing
eventHandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
Expand Down Expand Up @@ -177,7 +177,7 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
//
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
func (blder *Builder) WatchesRawSource(src source.SourcePrepare, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
func (blder *Builder) WatchesRawSource(src source.PrepareSyncing, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src, eventHandler: eventHandler}
for _, opt := range opts {
opt.ApplyToWatches(&input)
Expand All @@ -188,29 +188,24 @@ func (blder *Builder) WatchesRawSource(src source.SourcePrepare, eventHandler ha
}

func For[T client.Object](blder *Builder, object T, prct ...predicate.ObjectPredicate[T]) source.Source {
src := source.ObjectKind(blder.mgr.GetCache(), object)
src.PrepareObject(&handler.EnqueueRequest[T]{}, prct...)
blder.forInput = ForInput{object: object}

return src
return source.ObjectKind(blder.mgr.GetCache(), object).PrepareObject(&handler.EnqueueRequest[T]{}, prct...)
}

func Owns[F, T client.Object](blder *Builder, owner F, owned T) source.Source {
func Owns[F, T client.Object](blder *Builder, owner F, owned T, prct ...predicate.ObjectPredicate[T]) source.Source {
src := source.ObjectKind(blder.mgr.GetCache(), owned)

hdler := handler.EnqueueRequestForOwner(
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
owner,
)
src.PrepareObject(handler.ObjectFuncAdapter[T](hdler))

return src
return src.PrepareObject(handler.ObjectFuncAdapter[T](hdler), prct...)
}

func Watches[T client.Object](blder *Builder, object T, eventHandler handler.ObjectHandler[T], prct ...predicate.ObjectPredicate[T]) source.Source {
src := source.ObjectKind(blder.mgr.GetCache(), object)
src.PrepareObject(eventHandler, prct...)

return src
return source.ObjectKind(blder.mgr.GetCache(), object).PrepareObject(eventHandler, prct...)
}

func (blder *Builder) Add(src source.Source) *Builder {
Expand Down Expand Up @@ -308,8 +303,7 @@ func (blder *Builder) doWatch() error {
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
src.Prepare(hdler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
if err := blder.ctrl.Watch(src.Prepare(hdler, allPredicates...)); err != nil {
return err
}
}
Expand All @@ -335,8 +329,7 @@ func (blder *Builder) doWatch() error {
)
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
src.Prepare(hdler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
if err := blder.ctrl.Watch(src.Prepare(hdler, allPredicates...)); err != nil {
return err
}
}
Expand All @@ -356,8 +349,7 @@ func (blder *Builder) doWatch() error {
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
w.src.Prepare(w.eventHandler, allPredicates...)
if err := blder.ctrl.Watch(w.src); err != nil {
if err := blder.ctrl.Watch(w.src.Prepare(w.eventHandler, allPredicates...)); err != nil {
return err
}
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -189,3 +191,18 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller

// ReconcileIDFromContext gets the reconcileID from the current context.
var ReconcileIDFromContext = controller.ReconcileIDFromContext

// ControllerAdapter is an adapter for old controller implementations
type ControllerAdapter struct {
Controller
}

// Watch implements old controller Watch interface
func (c *ControllerAdapter) Watch(src source.Source, handler handler.EventHandler, predicates ...predicate.Predicate) error {
source, ok := src.(source.PrepareSource)
if !ok {
return fmt.Errorf("expected source to fulfill SourcePrepare interface")
}

return c.Controller.Watch(source.Prepare(handler, predicates...))
}
8 changes: 2 additions & 6 deletions pkg/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,10 @@ var _ = Describe("controller", func() {
Expect(err).NotTo(HaveOccurred())

By("Watching Resources")
src := source.Kind(cm.GetCache(), &appsv1.ReplicaSet{})
src.Prepare(handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}))
err = instance.Watch(src)
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}).Prepare(handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{})))
Expect(err).NotTo(HaveOccurred())

src = source.Kind(cm.GetCache(), &appsv1.Deployment{})
src.Prepare(&handler.EnqueueRequestForObject{})
err = instance.Watch(src)
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}).Prepare(&handler.EnqueueRequestForObject{}))
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ var _ = Describe("controller.Controller", func() {
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
watch.Prepare(&handler.EnqueueRequestForObject{})
Expect(c.Watch(watch)).To(Succeed())
Expect(c.Watch(watch.Prepare(&handler.EnqueueRequestForObject{}))).To(Succeed())
Expect(err).NotTo(HaveOccurred())

go func() {
Expand Down
10 changes: 3 additions & 7 deletions pkg/controller/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ func ExampleController() {
}

// Watch for Pod create / update / delete events and call Reconcile
src := source.Kind(mgr.GetCache(), &corev1.Pod{})
src.Prepare(&handler.EnqueueRequestForObject{})
err = c.Watch(src)
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}).Prepare(&handler.EnqueueRequestForObject{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -111,8 +109,7 @@ func ExampleController_unstructured() {
})
// Watch for Pod create / update / delete events and call Reconcile
src := source.Kind(mgr.GetCache(), u)
src.Prepare(&handler.EnqueueRequestForObject{})
err = c.Watch(src)
err = c.Watch(src.Prepare(&handler.EnqueueRequestForObject{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -144,8 +141,7 @@ func ExampleNewUnmanaged() {
}

src := source.Kind(mgr.GetCache(), &corev1.Pod{})
src.Prepare(&handler.EnqueueRequestForObject{})
if err := c.Watch(src); err != nil {
if err := c.Watch(src.Prepare(&handler.EnqueueRequestForObject{})); err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/handler/enqueue_mapped_typed.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ func EnqueueRequestsFromObjectMapFunc[T any](fn ObjectMapFunc[T]) EventHandler {
}
}

// EnqueueRequestsFromObjectMap enqueues Requests by running a transformation function that outputs a collection
// of reconcile.Requests on each Event. The reconcile.Requests may be for an arbitrary set of objects
// defined by some user specified transformation of the source Event. (e.g. trigger Reconciler for a set of objects
// in response to a cluster resize event caused by adding or deleting a Node)
//
// EnqueueRequestsFromObjectMap is frequently used to fan-out updates from one object to one or more other
// objects of a differing type.
//
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
// objects and both sets of Requests are enqueue.
func EnqueueRequestsFromObjectMap[T any](fn ObjectMapFunc[T]) ObjectHandler[T] {
return &enqueueRequestsFromObjectMapFunc[T]{
toRequests: fn,
}
}

var _ EventHandler = &enqueueRequestsFromObjectMapFunc[any]{}
var _ ObjectHandler[any] = &enqueueRequestsFromObjectMapFunc[any]{}

Expand Down
83 changes: 41 additions & 42 deletions pkg/handler/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ var (
func ExampleEnqueueRequestForObject() {
// controller is a controller.controller
src := source.Kind(mgr.GetCache(), &corev1.Pod{})
src.Prepare(&handler.EnqueueRequestForObject{})
err := c.Watch(src)
err := c.Watch(src.Prepare(&handler.EnqueueRequestForObject{}))
if err != nil {
// handle it
}
Expand All @@ -65,19 +64,19 @@ func ExampleEnqueueRequestForOwner() {
// objects (of Type: MyKind) using a mapping function defined by the user.
func ExampleEnqueueRequestsFromMapFunc() {
// controller is a controller.controller
src := source.Kind(mgr.GetCache(), &appsv1.Deployment{})
src.Prepare(handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
return []reconcile.Request{
{NamespacedName: types.NamespacedName{
Name: a.GetName() + "-1",
Namespace: a.GetNamespace(),
}},
{NamespacedName: types.NamespacedName{
Name: a.GetName() + "-2",
Namespace: a.GetNamespace(),
}},
}
}))
src := source.Kind(mgr.GetCache(), &appsv1.Deployment{}).
Prepare(handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
return []reconcile.Request{
{NamespacedName: types.NamespacedName{
Name: a.GetName() + "-1",
Namespace: a.GetNamespace(),
}},
{NamespacedName: types.NamespacedName{
Name: a.GetName() + "-2",
Namespace: a.GetNamespace(),
}},
}
}))
err := c.Watch(src)
if err != nil {
// handle it
Expand All @@ -87,33 +86,33 @@ func ExampleEnqueueRequestsFromMapFunc() {
// This example implements handler.EnqueueRequestForObject.
func ExampleFuncs() {
// controller is a controller.controller
src := source.Kind(mgr.GetCache(), &corev1.Pod{})
src.Prepare(handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
},
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.ObjectNew.GetName(),
Namespace: e.ObjectNew.GetNamespace(),
}})
},
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
},
})
src := source.Kind(mgr.GetCache(), &corev1.Pod{}).
Prepare(handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
},
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.ObjectNew.GetName(),
Namespace: e.ObjectNew.GetNamespace(),
}})
},
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
},
GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
Namespace: e.Object.GetNamespace(),
}})
},
})

err := c.Watch(src)
if err != nil {
Expand Down
72 changes: 72 additions & 0 deletions pkg/interfaces/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package interfaces

import (
"context"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
// which should be processed by event.EventHandlers to enqueue reconcile.Requests.
//
// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update).
//
// * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls).
//
// Users may build their own Source implementations.
type Source interface {
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
Start(context.Context, workqueue.RateLimitingInterface) error
}

// PrepareSource: Prepares a Source to be used with EventHandler and predicates
type PrepareSource interface {
Prepare(handler.EventHandler, ...predicate.Predicate) SyncingSource
}

// PrepareSourceObject[T]: Prepares a Source preserving the object type
type PrepareSourceObject[T any] interface {
PrepareObject(handler.ObjectHandler[T], ...predicate.ObjectPredicate[T]) SyncingSource
}

// Syncing allows to wait for synchronization with context
type Syncing interface {
WaitForSync(ctx context.Context) error
}

// SyncingSource is a source that needs syncing prior to being usable. The controller
// will call its WaitForSync prior to starting workers.
type SyncingSource interface {
Source
Syncing
}

// PrepareSyncing: A SyncingSource that also implements SourcePrepare and has WaitForSync method
type PrepareSyncing interface {
SyncingSource
PrepareSource
}

// PrepareSyncingObject[T]: A SyncingSource that also implements PrepareSourceObject[T] and has WaitForSync method
type PrepareSyncingObject[T any] interface {
SyncingSource
PrepareSourceObject[T]
}

0 comments on commit 8c4d3df

Please sign in to comment.