Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠️ [WIP] Extend builder with generic For, Owns and Watches #2784

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 84 additions & 0 deletions example_test.go
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -69,6 +70,37 @@ func Example() {
}
}

// This example creates a generic application Controller that is configured for ReplicaSets and Pods.
//
// * Create a new application for ReplicaSets that manages Pods owned by the ReplicaSet and calls into
// ReplicaSetReconciler.
//
// * Start the application.
func GenericExample() {
log := ctrl.Log.WithName("builder-examples")

manager, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
log.Error(err, "could not create manager")
os.Exit(1)
}

b := ctrl.NewControllerManagedBy(manager) // Create the Controller
// ReplicaSet is the Application API
err = b.Add(builder.For(manager, &appsv1.ReplicaSet{})).
Add(builder.Owns(manager, &appsv1.ReplicaSet{}, &corev1.Pod{})). // ReplicaSet owns Pods created by it
Complete(&ReplicaSetReconciler{Client: manager.GetClient()})
if err != nil {
log.Error(err, "could not create controller")
os.Exit(1)
}

if err := manager.Start(ctrl.SetupSignalHandler()); err != nil {
log.Error(err, "could not start manager")
os.Exit(1)
}
}

type ExampleCRDWithConfigMapRef struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand Down Expand Up @@ -157,6 +189,58 @@ func Example_customHandler() {
}
}

// This example creates a simple application Controller that is configured for ExampleCRDWithConfigMapRef CRD.
// Any change in the configMap referenced in this Custom Resource will cause the re-reconcile of the parent ExampleCRDWithConfigMapRef
// due to the implementation of the .Watches method of "sigs.k8s.io/controller-runtime/pkg/builder".Builder.
func Example_generic_customHandler() {
log := ctrl.Log.WithName("builder-examples")

manager, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
log.Error(err, "could not create manager")
os.Exit(1)
}

err = ctrl.
NewControllerManagedBy(manager).
For(&ExampleCRDWithConfigMapRef{}).
Add(builder.Watches(manager, &corev1.ConfigMap{}, handler.EnqueueRequestsFromObjectMap(func(ctx context.Context, cm *corev1.ConfigMap) []ctrl.Request {
// map a change from referenced configMap to ExampleCRDWithConfigMapRef, which causes its re-reconcile
crList := &ExampleCRDWithConfigMapRefList{}
if err := manager.GetClient().List(ctx, crList); err != nil {
manager.GetLogger().Error(err, "while listing ExampleCRDWithConfigMapRefs")
return nil
}

reqs := make([]ctrl.Request, 0, len(crList.Items))
for _, item := range crList.Items {
if item.ConfigMapRef.Name == cm.Name && cm.Data["Namespace"] == item.GetNamespace() {
reqs = append(reqs, ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: item.GetNamespace(),
Name: item.GetName(),
},
})
}
}

return reqs
}))).
Complete(reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
// Your business logic to implement the API by creating, updating, deleting objects goes here.
return reconcile.Result{}, nil
}))
if err != nil {
log.Error(err, "could not create controller")
os.Exit(1)
}

if err := manager.Start(ctrl.SetupSignalHandler()); err != nil {
log.Error(err, "could not start manager")
os.Exit(1)
}
}

// This example creates a simple application Controller that is configured for ReplicaSets and Pods.
// This application controller will be running leader election with the provided configuration in the manager options.
// If leader election configuration is not provided, controller runs leader election with default values.
Expand Down
5 changes: 2 additions & 3 deletions examples/builtins/main.go
Expand Up @@ -59,14 +59,13 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}).Prepare(&handler.EnqueueRequestForObject{})); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}).Prepare(handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
50 changes: 43 additions & 7 deletions pkg/builder/controller.go
Expand Up @@ -57,6 +57,7 @@ type Builder struct {
forInput ForInput
ownsInput []OwnsInput
watchesInput []WatchesInput
rawWatches []source.Source
mgr manager.Manager
globalPredicates []predicate.Predicate
ctrl controller.Controller
Expand Down Expand Up @@ -123,7 +124,7 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
src source.PrepareSyncing
eventHandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
Expand Down Expand Up @@ -176,7 +177,7 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
//
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
func (blder *Builder) WatchesRawSource(src source.PrepareSyncing, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src, eventHandler: eventHandler}
for _, opt := range opts {
opt.ApplyToWatches(&input)
Expand All @@ -186,6 +187,34 @@ func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.E
return blder
}

// For defines the type of Object being reconciled and allows to respond to object events inheriting the object type at all cases.
func For[T client.Object](mgr manager.Manager, object T, prct ...predicate.ObjectPredicate[T]) source.Source {
return source.ObjectKind(mgr.GetCache(), object).PrepareObject(&handler.EnqueueRequest[T]{}, prct...)
}

// Owns defines the type of owner and owned objects to watch with predicates inheriting the owned object type applying to owned object.
func Owns[F, T client.Object](mgr manager.Manager, owner F, owned T, prct ...predicate.ObjectPredicate[T]) source.Source {
src := source.ObjectKind(mgr.GetCache(), owned)

hdler := handler.EnqueueRequestForOwner(
mgr.GetScheme(), mgr.GetRESTMapper(),
owner,
)

return src.PrepareObject(handler.ObjectFuncAdapter[T](hdler), prct...)
}

// Watches defines the type of object to watch with ObjectHandler and predicates inheriting the object type.
func Watches[T client.Object](mgr manager.Manager, object T, eventHandler handler.ObjectHandler[T], prct ...predicate.ObjectPredicate[T]) source.Source {
return source.ObjectKind(mgr.GetCache(), object).PrepareObject(eventHandler, prct...)
}

// Add allows to pass a prepared source object with a fully defined event handler and predicates list.
func (blder *Builder) Add(src source.Source) *Builder {
blder.rawWatches = append(blder.rawWatches, src)
return blder
}

// WithEventFilter sets the event filters, to filter which create/update/delete/generic events eventually
// trigger reconciliations. For example, filtering on whether the resource version has changed.
// Given predicate is added for all watched objects.
Expand Down Expand Up @@ -276,7 +305,7 @@ func (blder *Builder) doWatch() error {
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
if err := blder.ctrl.Watch(src.Prepare(hdler, allPredicates...)); err != nil {
return err
}
}
Expand All @@ -302,18 +331,18 @@ func (blder *Builder) doWatch() error {
)
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
if err := blder.ctrl.Watch(src.Prepare(hdler, allPredicates...)); err != nil {
return err
}
}

// Do the watch requests
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawWatches) == 0 {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
}
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
if srcKind, ok := w.src.(*internalsource.Kind); ok {
if srcKind, ok := w.src.(*internalsource.Kind[client.Object]); ok {
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
Expand All @@ -322,10 +351,17 @@ func (blder *Builder) doWatch() error {
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil {
if err := blder.ctrl.Watch(w.src.Prepare(w.eventHandler, allPredicates...)); err != nil {
return err
}
}

for _, r := range blder.rawWatches {
if err := blder.ctrl.Watch(r); err != nil {
return err
}
}

return nil
}

Expand Down
17 changes: 16 additions & 1 deletion pkg/controller/controller.go
Expand Up @@ -90,7 +90,7 @@ type Controller interface {
// Watch may be provided one or more Predicates to filter events before
// they are given to the EventHandler. Events will be passed to the
// EventHandler if all provided Predicates evaluate to true.
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
Watch(src source.Source) error

// Start starts the controller. Start blocks until the context is closed or a
// controller has an error starting.
Expand Down Expand Up @@ -191,3 +191,18 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller

// ReconcileIDFromContext gets the reconcileID from the current context.
var ReconcileIDFromContext = controller.ReconcileIDFromContext

// Adapter is an adapter for old controller implementations
type Adapter struct {
Controller
}

// Watch implements old controller Watch interface
func (c *Adapter) Watch(src source.Source, handler handler.EventHandler, predicates ...predicate.Predicate) error {
source, ok := src.(source.PrepareSource)
if !ok {
return fmt.Errorf("expected source to fulfill SourcePrepare interface")
}

return c.Controller.Watch(source.Prepare(handler, predicates...))
}
7 changes: 2 additions & 5 deletions pkg/controller/controller_integration_test.go
Expand Up @@ -64,13 +64,10 @@ var _ = Describe("controller", func() {
Expect(err).NotTo(HaveOccurred())

By("Watching Resources")
err = instance.Watch(
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}),
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
)
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}).Prepare(handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{})))
Expect(err).NotTo(HaveOccurred())

err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{})
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}).Prepare(&handler.EnqueueRequestForObject{}))
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Expand Up @@ -101,7 +101,7 @@ var _ = Describe("controller.Controller", func() {
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
Expect(c.Watch(watch.Prepare(&handler.EnqueueRequestForObject{}))).To(Succeed())
Expect(err).NotTo(HaveOccurred())

go func() {
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/example_test.go
Expand Up @@ -71,7 +71,7 @@ func ExampleController() {
}

// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{})
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}).Prepare(&handler.EnqueueRequestForObject{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -108,7 +108,8 @@ func ExampleController_unstructured() {
Version: "v1",
})
// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), u), &handler.EnqueueRequestForObject{})
src := source.Kind(mgr.GetCache(), u)
err = c.Watch(src.Prepare(&handler.EnqueueRequestForObject{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -139,7 +140,8 @@ func ExampleNewUnmanaged() {
os.Exit(1)
}

if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}); err != nil {
src := source.Kind(mgr.GetCache(), &corev1.Pod{})
if err := c.Watch(src.Prepare(&handler.EnqueueRequestForObject{})); err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
}
Expand Down
50 changes: 4 additions & 46 deletions pkg/handler/enqueue_mapped.go
Expand Up @@ -19,9 +19,7 @@ package handler
import (
"context"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand All @@ -40,49 +38,9 @@ type MapFunc func(context.Context, client.Object) []reconcile.Request
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
// objects and both sets of Requests are enqueue.
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
return &enqueueRequestsFromMapFunc{
toRequests: fn,
}
}

var _ EventHandler = &enqueueRequestsFromMapFunc{}

type enqueueRequestsFromMapFunc struct {
// Mapper transforms the argument into a slice of keys to be reconciled
toRequests MapFunc
}

// Create implements EventHandler.
func (e *enqueueRequestsFromMapFunc) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
reqs := map[reconcile.Request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
}

// Update implements EventHandler.
func (e *enqueueRequestsFromMapFunc) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
reqs := map[reconcile.Request]empty{}
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
}

// Delete implements EventHandler.
func (e *enqueueRequestsFromMapFunc) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
reqs := map[reconcile.Request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
}

// Generic implements EventHandler.
func (e *enqueueRequestsFromMapFunc) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
reqs := map[reconcile.Request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
}

func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(ctx context.Context, q workqueue.RateLimitingInterface, object client.Object, reqs map[reconcile.Request]empty) {
for _, req := range e.toRequests(ctx, object) {
_, ok := reqs[req]
if !ok {
q.Add(req)
reqs[req] = empty{}
}
return &enqueueRequestsFromObjectMapFunc[client.Object]{
toRequests: func(ctx context.Context, obj client.Object) (reqs []reconcile.Request) {
return fn(ctx, obj)
},
}
}