Skip to content

Commit

Permalink
Cleanup and lint fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Danil Grigorev <danil.grigorev@suse.com>
  • Loading branch information
Danil-Grigorev committed Apr 17, 2024
1 parent 8984b30 commit 9179c03
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 67 deletions.
2 changes: 1 addition & 1 deletion example_test.go
Expand Up @@ -87,7 +87,7 @@ func GenericExample() {

b := ctrl.NewControllerManagedBy(manager) // Create the Controller
// ReplicaSet is the Application API
b.Add(builder.For(manager, &appsv1.ReplicaSet{})).
err = b.Add(builder.For(manager, &appsv1.ReplicaSet{})).
Add(builder.Owns(manager, &appsv1.ReplicaSet{}, &corev1.Pod{})). // ReplicaSet owns Pods created by it
Complete(&ReplicaSetReconciler{Client: manager.GetClient()})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/builder/controller.go
Expand Up @@ -187,10 +187,12 @@ func (blder *Builder) WatchesRawSource(src source.PrepareSyncing, eventHandler h
return blder
}

// For defines the type of Object being reconciled and allows to respond to object events inheriting the object type at all cases.
func For[T client.Object](mgr manager.Manager, object T, prct ...predicate.ObjectPredicate[T]) source.Source {
return source.ObjectKind(mgr.GetCache(), object).PrepareObject(&handler.EnqueueRequest[T]{}, prct...)
}

// Owns defines the type of owner and owned objects to watch with predicates inheriting the owned object type applying to owned object.
func Owns[F, T client.Object](mgr manager.Manager, owner F, owned T, prct ...predicate.ObjectPredicate[T]) source.Source {
src := source.ObjectKind(mgr.GetCache(), owned)

Expand All @@ -202,10 +204,12 @@ func Owns[F, T client.Object](mgr manager.Manager, owner F, owned T, prct ...pre
return src.PrepareObject(handler.ObjectFuncAdapter[T](hdler), prct...)
}

// Watches defines the type of object to watch with ObjectHandler and predicates inheriting the object type.
func Watches[T client.Object](mgr manager.Manager, object T, eventHandler handler.ObjectHandler[T], prct ...predicate.ObjectPredicate[T]) source.Source {
return source.ObjectKind(mgr.GetCache(), object).PrepareObject(eventHandler, prct...)
}

// Add allows to pass a prepared source object with a fully defined event handler and predicates list.
func (blder *Builder) Add(src source.Source) *Builder {
blder.rawWatches = append(blder.rawWatches, src)
return blder
Expand Down
8 changes: 0 additions & 8 deletions pkg/cache/cache.go
Expand Up @@ -96,14 +96,6 @@ type Informers interface {
client.FieldIndexer
}

type ObjectInformers[T any] interface {
Informer

// GetInformer fetches or constructs an informer for the given object that corresponds to a single
// API kind and resource.
GetInformer(ctx context.Context, obj T, opts ...InformerGetOption) (Informer, error)
}

// Informer allows you to interact with the underlying informer.
type Informer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/controller.go
Expand Up @@ -192,13 +192,13 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
// ReconcileIDFromContext gets the reconcileID from the current context.
var ReconcileIDFromContext = controller.ReconcileIDFromContext

// ControllerAdapter is an adapter for old controller implementations
type ControllerAdapter struct {
// Adapter is an adapter for old controller implementations
type Adapter struct {
Controller
}

// Watch implements old controller Watch interface
func (c *ControllerAdapter) Watch(src source.Source, handler handler.EventHandler, predicates ...predicate.Predicate) error {
func (c *Adapter) Watch(src source.Source, handler handler.EventHandler, predicates ...predicate.Predicate) error {
source, ok := src.(source.PrepareSource)
if !ok {
return fmt.Errorf("expected source to fulfill SourcePrepare interface")
Expand Down
6 changes: 4 additions & 2 deletions pkg/handler/enqueue_mapped.go
Expand Up @@ -38,7 +38,9 @@ type MapFunc func(context.Context, client.Object) []reconcile.Request
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
// objects and both sets of Requests are enqueue.
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
return &enqueueRequestsFromObjectMapFunc[any]{
toRequests: MapFuncAdapter(fn),
return &enqueueRequestsFromObjectMapFunc[client.Object]{
toRequests: func(ctx context.Context, obj client.Object) (reqs []reconcile.Request) {
return fn(ctx, obj)
},
}
}
18 changes: 3 additions & 15 deletions pkg/handler/enqueue_mapped_typed.go
Expand Up @@ -20,7 +20,6 @@ import (
"context"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand All @@ -30,17 +29,6 @@ import (
// Unlike MapFunc, a specific object type can be used to process and create mapping requests.
type ObjectMapFunc[T any] func(context.Context, T) []reconcile.Request

func MapFuncAdapter(m MapFunc) ObjectMapFunc[any] {
return func(ctx context.Context, a any) (reqs []reconcile.Request) {
obj, ok := a.(client.Object)
if ok {
return m(ctx, obj)
}

return []reconcile.Request{}
}
}

// EnqueueRequestsFromObjectMapFunc enqueues Requests by running a transformation function that outputs a collection
// of reconcile.Requests on each Event. The reconcile.Requests may be for an arbitrary set of objects
// defined by some user specified transformation of the source Event. (e.g. trigger Reconciler for a set of objects
Expand Down Expand Up @@ -116,10 +104,10 @@ func (e *enqueueRequestsFromObjectMapFunc[T]) Create(ctx context.Context, evt ev

// Update implements EventHandler.
func (e *enqueueRequestsFromObjectMapFunc[T]) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
old, okOld := evt.ObjectOld.(T)
new, okNew := evt.ObjectNew.(T)
oldObj, okOld := evt.ObjectOld.(T)
newObj, okNew := evt.ObjectNew.(T)
if okOld && okNew {
e.OnUpdate(ctx, old, new, q)
e.OnUpdate(ctx, oldObj, newObj, q)
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/handler/enqueue_typed.go
Expand Up @@ -29,6 +29,7 @@ import (
var _ EventHandler = &EnqueueRequest[metav1.Object]{}
var _ ObjectHandler[metav1.Object] = &EnqueueRequest[metav1.Object]{}

// Request is a minimal subset of a client.Object interface, allowing to enact on non kubernetes resources.
type Request interface {
GetName() string
GetNamespace() string
Expand Down
28 changes: 15 additions & 13 deletions pkg/handler/eventhandler.go
Expand Up @@ -125,7 +125,7 @@ var _ EventHandler = Funcs{}
var _ EventHandler = ObjectFuncs[any]{}
var _ ObjectHandler[any] = ObjectFuncs[any]{}

// Funcs is a function that implements Predicate.
// ObjectFuncs is a function that implements ObjectPredicate.
type ObjectFuncs[T any] struct {
// Create is called in response to an add event. Defaults to no-op.
// RateLimitingInterface is used to enqueue reconcile.Requests.
Expand All @@ -146,10 +146,10 @@ type ObjectFuncs[T any] struct {

// Update implements Predicate.
func (p ObjectFuncs[T]) Update(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
new, ok := e.ObjectNew.(T)
old, oldOk := e.ObjectOld.(T)
if ok && oldOk {
p.OnUpdate(ctx, old, new, q)
objNew, newOk := e.ObjectNew.(T)
objOld, oldOk := e.ObjectOld.(T)
if newOk && oldOk {
p.OnUpdate(ctx, objOld, objNew, q)
}
}

Expand Down Expand Up @@ -177,34 +177,35 @@ func (p ObjectFuncs[T]) Delete(ctx context.Context, e event.DeleteEvent, q workq
}
}

// Update implements Predicate.
// OnUpdate implements ObjectPredicate.
func (p ObjectFuncs[T]) OnUpdate(ctx context.Context, old, new T, q workqueue.RateLimitingInterface) {
if p.UpdateFunc != nil {
p.UpdateFunc(ctx, old, new, q)
}
}

// Generic implements Predicate.
// OnGeneric implements ObjectPredicate.
func (p ObjectFuncs[T]) OnGeneric(ctx context.Context, obj T, q workqueue.RateLimitingInterface) {
if p.GenericFunc != nil {
p.GenericFunc(ctx, obj, q)
}
}

// Create implements Predicate.
// OnCreate implements ObjectPredicate.
func (p ObjectFuncs[T]) OnCreate(ctx context.Context, obj T, q workqueue.RateLimitingInterface) {
if p.CreateFunc != nil {
p.CreateFunc(ctx, obj, q)
}
}

// Delete implements Predicate.
// OnDelete implements ObjectPredicate.
func (p ObjectFuncs[T]) OnDelete(ctx context.Context, obj T, q workqueue.RateLimitingInterface) {
if p.DeleteFunc != nil {
p.DeleteFunc(ctx, obj, q)
}
}

// ObjectFuncAdapter allows to reuse existing EventHandler for a typed ObjectHandler
func ObjectFuncAdapter[T client.Object](h EventHandler) ObjectHandler[T] {
return ObjectFuncs[T]{
CreateFunc: func(ctx context.Context, obj T, queue workqueue.RateLimitingInterface) {
Expand All @@ -222,6 +223,7 @@ func ObjectFuncAdapter[T client.Object](h EventHandler) ObjectHandler[T] {
}
}

// EventHandlerAdapter allows to reuse existing typed event handler as EventHandler
func EventHandlerAdapter[T client.Object](h ObjectHandler[T]) EventHandler {
return Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) {
Expand All @@ -243,10 +245,10 @@ func EventHandlerAdapter[T client.Object](h ObjectHandler[T]) EventHandler {
}
},
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
new, ok := e.ObjectNew.(T)
old, oldOk := e.ObjectOld.(T)
if ok && oldOk {
h.OnUpdate(ctx, old, new, queue)
objNew, newOk := e.ObjectNew.(T)
objOld, oldOk := e.ObjectOld.(T)
if newOk && oldOk {
h.OnUpdate(ctx, objOld, objNew, queue)
}
},
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/interfaces/source.go
Expand Up @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package interfaces

import (
Expand All @@ -37,12 +38,12 @@ type Source interface {
Start(context.Context, workqueue.RateLimitingInterface) error
}

// PrepareSource: Prepares a Source to be used with EventHandler and predicates
// PrepareSource - Prepares a Source to be used with EventHandler and predicates
type PrepareSource interface {
Prepare(handler.EventHandler, ...predicate.Predicate) SyncingSource
}

// PrepareSourceObject[T]: Prepares a Source preserving the object type
// PrepareSourceObject - Prepares a Source preserving the object type
type PrepareSourceObject[T any] interface {
PrepareObject(handler.ObjectHandler[T], ...predicate.ObjectPredicate[T]) SyncingSource
}
Expand All @@ -59,13 +60,13 @@ type SyncingSource interface {
Syncing
}

// PrepareSyncing: A SyncingSource that also implements SourcePrepare and has WaitForSync method
// PrepareSyncing - a SyncingSource that also implements SourcePrepare and has WaitForSync method
type PrepareSyncing interface {
SyncingSource
PrepareSource
}

// PrepareSyncingObject[T]: A SyncingSource that also implements PrepareSourceObject[T] and has WaitForSync method
// PrepareSyncingObject - a SyncingSource that also implements PrepareSourceObject[T] and has WaitForSync method
type PrepareSyncingObject[T any] interface {
SyncingSource
PrepareSourceObject[T]
Expand Down
11 changes: 4 additions & 7 deletions pkg/internal/source/kind.go
Expand Up @@ -35,18 +35,15 @@ type Kind[T client.Object] struct {
handler handler.ObjectHandler[T]
}

// SetPredicates implements source.SyncingSource.
func (ks *Kind[T]) SetPredicates(...predicate.PredicateConstraint) {
panic("unimplemented")
}

// PrepareObject implements PrepareSyncingObject preparation and should only be called when handler and predicates are available.
func (ks *Kind[T]) PrepareObject(h handler.ObjectHandler[T], prct ...predicate.ObjectPredicate[T]) interfaces.SyncingSource {
ks.handler = h
ks.predicates = prct

return ks
}

// Prepare implements Source preparation and should only be called when handler and predicates are available.
func (ks *Kind[T]) Prepare(h handler.EventHandler, prct ...predicate.Predicate) interfaces.SyncingSource {
ks.handler = handler.ObjectFuncAdapter[T](h)
ks.predicates = predicate.ObjectPredicatesAdapter[T](prct...)
Expand All @@ -59,10 +56,10 @@ func (ks *Kind[T]) Start(
ctx context.Context,
queue workqueue.RateLimitingInterface,
) error {
return ks.Run(ctx, ks.handler, queue, ks.predicates...)
return ks.run(ctx, ks.handler, queue, ks.predicates...)
}

func (ks *Kind[T]) Run(ctx context.Context, handler handler.ObjectHandler[T], queue workqueue.RateLimitingInterface,
func (ks *Kind[T]) run(ctx context.Context, handler handler.ObjectHandler[T], queue workqueue.RateLimitingInterface,
prct ...predicate.ObjectPredicate[T]) error {
if reflect.DeepEqual(ks.Type, *new(T)) {
return fmt.Errorf("must create Kind with a non-nil object")
Expand Down
24 changes: 11 additions & 13 deletions pkg/predicate/predicate.go
Expand Up @@ -28,10 +28,6 @@ import (

var log = logf.RuntimeLog.WithName("predicate").WithName("eventFilters")

type PredicateConstraint interface {
Register()
}

// Predicate filters events before enqueuing the keys.
type Predicate interface {
// Create returns true if the Create event should be processed
Expand Down Expand Up @@ -120,7 +116,7 @@ func (p Funcs) Generic(e event.GenericEvent) bool {
return true
}

// Funcs is a function that implements Predicate.
// ObjectFuncs is a function that implements Predicate and ObjectPrediace.
type ObjectFuncs[T any] struct {
// Create returns true if the Create event should be processed
CreateFunc func(obj T) bool
Expand All @@ -137,9 +133,9 @@ type ObjectFuncs[T any] struct {

// Update implements Predicate.
func (p ObjectFuncs[T]) Update(e event.UpdateEvent) bool {
new, ok := e.ObjectNew.(T)
old, oldOk := e.ObjectOld.(T)
return ok && oldOk && p.OnUpdate(old, new)
newObj, newOk := e.ObjectNew.(T)
oldObj, oldOk := e.ObjectOld.(T)
return newOk && oldOk && p.OnUpdate(oldObj, newObj)
}

// Generic implements Predicate.
Expand All @@ -160,22 +156,22 @@ func (p ObjectFuncs[T]) Delete(e event.DeleteEvent) bool {
return ok && p.OnDelete(obj)
}

// Update implements Predicate.
// OnUpdate implements ObjectPredicate.
func (p ObjectFuncs[T]) OnUpdate(old, new T) bool {
return p.UpdateFunc == nil || p.UpdateFunc(old, new)
}

// Generic implements Predicate.
// OnGeneric implements ObjectPredicate.
func (p ObjectFuncs[T]) OnGeneric(obj T) bool {
return p.GenericFunc == nil || p.GenericFunc(obj)
}

// Create implements Predicate.
// OnCreate implements ObjectPredicate.
func (p ObjectFuncs[T]) OnCreate(obj T) bool {
return p.CreateFunc == nil || p.CreateFunc(obj)
}

// Delete implements Predicate.
// OnDelete implements ObjectPredicate.
func (p ObjectFuncs[T]) OnDelete(obj T) bool {
return p.DeleteFunc == nil || p.DeleteFunc(obj)
}
Expand All @@ -200,7 +196,7 @@ func NewPredicateFuncs(filter func(object client.Object) bool) Funcs {
}
}

// NewPredicateFuncs returns a predicate funcs that applies the given filter function
// NewObjectPredicateFuncs returns a typed predicate funcs that applies the given filter function
// on CREATE, UPDATE, DELETE and GENERIC events. For UPDATE events, the filter is applied
// to the new object.
func NewObjectPredicateFuncs[T any](filter func(object T) bool) ObjectFuncs[T] {
Expand Down Expand Up @@ -590,6 +586,7 @@ func LabelSelectorPredicate(s metav1.LabelSelector) (Predicate, error) {
}), nil
}

// ObjectPredicateAdapter allows to reuse existing predicate as a typed ObjectPredicate
func ObjectPredicateAdapter[T client.Object](h Predicate) ObjectPredicate[T] {
return ObjectFuncs[T]{
CreateFunc: func(obj T) bool {
Expand All @@ -607,6 +604,7 @@ func ObjectPredicateAdapter[T client.Object](h Predicate) ObjectPredicate[T] {
}
}

// ObjectPredicatesAdapter allows to reuse existing set of predicates as ObjectPredicates
func ObjectPredicatesAdapter[T client.Object](predicates ...Predicate) (prdt []ObjectPredicate[T]) {
for _, p := range predicates {
prdt = append(prdt, ObjectPredicateAdapter[T](p))
Expand Down

0 comments on commit 9179c03

Please sign in to comment.