diff --git a/controller/generic_controller.go b/controller/generic_controller.go index d1b215fe8..ea06aa33a 100644 --- a/controller/generic_controller.go +++ b/controller/generic_controller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "reflect" "strings" "sync" "time" @@ -41,12 +42,12 @@ func init() { } } -type HandlerFunc func(key string) error +type HandlerFunc func(key string, obj interface{}) (interface{}, error) type GenericController interface { SetThreadinessOverride(count int) Informer() cache.SharedIndexInformer - AddHandler(name string, handler HandlerFunc) + AddHandler(ctx context.Context, name string, handler HandlerFunc) HandlerCount() int Enqueue(namespace, name string) Sync(ctx context.Context) error @@ -60,15 +61,22 @@ type Backend interface { } type handlerDef struct { - name string - handler HandlerFunc + name string + generation int + handler HandlerFunc +} + +type generationKey struct { + generation int + key string } type genericController struct { sync.Mutex threadinessOverride int + generation int informer cache.SharedIndexInformer - handlers []handlerDef + handlers []*handlerDef queue workqueue.RateLimitingInterface name string running bool @@ -116,11 +124,28 @@ func (g *genericController) Enqueue(namespace, name string) { } } -func (g *genericController) AddHandler(name string, handler HandlerFunc) { - g.handlers = append(g.handlers, handlerDef{ - name: name, - handler: handler, - }) +func (g *genericController) AddHandler(ctx context.Context, name string, handler HandlerFunc) { + g.Lock() + h := &handlerDef{ + name: name, + generation: g.generation, + handler: handler, + } + g.handlers = append(g.handlers, h) + g.Unlock() + + go func() { + <-ctx.Done() + g.Lock() + var handlers []*handlerDef + for _, handler := range g.handlers { + if handler != h { + handlers = append(handlers, h) + } + } + g.handlers = handlers + g.Unlock() + }() } func (g *genericController) Sync(ctx context.Context) error { @@ -175,6 +200,22 @@ func (g *genericController) Start(ctx context.Context, threadiness int) error { go g.run(ctx, threadiness) } + if g.running { + for _, h := range g.handlers { + if h.generation != g.generation { + continue + } + for _, key := range g.informer.GetStore().ListKeys() { + g.queueObject(generationKey{ + generation: g.generation, + key: key, + }) + } + break + } + } + + g.generation++ g.running = true return nil } @@ -211,7 +252,7 @@ func (g *genericController) processNextWorkItem() bool { defer g.queue.Done(key) // do your work on the key. This method will contains your "do stuff" logic - err := g.syncHandler(key.(string)) + err := g.syncHandler(key) checkErr := err if handlerErr, ok := checkErr.(*handlerError); ok { checkErr = handlerErr.err @@ -265,14 +306,39 @@ func filterConflictsError(err error) error { return err } -func (g *genericController) syncHandler(s string) (err error) { +func (g *genericController) syncHandler(key interface{}) (err error) { defer utilruntime.RecoverFromPanic(&err) + generation := -1 + var s string + var obj interface{} + + switch v := key.(type) { + case string: + s = v + case generationKey: + generation = v.generation + s = v.key + default: + return nil + } + + obj, exists, err := g.informer.GetStore().GetByKey(s) + if err != nil { + return err + } else if !exists { + obj = nil + } + var errs []error for _, handler := range g.handlers { + if generation > -1 && handler.generation != generation { + continue + } + logrus.Debugf("%s calling handler %s %s", g.name, handler.name, s) metrics.IncTotalHandlerExecution(g.name, handler.name) - if err := handler.handler(s); err != nil { + if newObj, err := handler.handler(s, obj); err != nil { if !ignoreError(err, false) { metrics.IncTotalHandlerFailure(g.name, handler.name, s) } @@ -280,6 +346,8 @@ func (g *genericController) syncHandler(s string) (err error) { name: handler.name, err: err, }) + } else if newObj != nil && !reflect.ValueOf(newObj).IsNil() { + obj = newObj } } err = types.NewErrors(errs...) diff --git a/generator/controller_template.go b/generator/controller_template.go index eed516056..26d52f494 100644 --- a/generator/controller_template.go +++ b/generator/controller_template.go @@ -41,7 +41,7 @@ type {{.schema.CodeName}}List struct { Items []{{.prefix}}{{.schema.CodeName}} } -type {{.schema.CodeName}}HandlerFunc func(key string, obj *{{.prefix}}{{.schema.CodeName}}) error +type {{.schema.CodeName}}HandlerFunc func(key string, obj *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) type {{.schema.CodeName}}Lister interface { List(namespace string, selector labels.Selector) (ret []*{{.prefix}}{{.schema.CodeName}}, err error) @@ -52,8 +52,8 @@ type {{.schema.CodeName}}Controller interface { Generic() controller.GenericController Informer() cache.SharedIndexInformer Lister() {{.schema.CodeName}}Lister - AddHandler(name string, handler {{.schema.CodeName}}HandlerFunc) - AddClusterScopedHandler(name, clusterName string, handler {{.schema.CodeName}}HandlerFunc) + AddHandler(ctx context.Context, name string, handler {{.schema.CodeName}}HandlerFunc) + AddClusterScopedHandler(ctx context.Context, name, clusterName string, handler {{.schema.CodeName}}HandlerFunc) Enqueue(namespace, name string) Sync(ctx context.Context) error Start(ctx context.Context, threadiness int) error @@ -71,10 +71,10 @@ type {{.schema.CodeName}}Interface interface { Watch(opts metav1.ListOptions) (watch.Interface, error) DeleteCollection(deleteOpts *metav1.DeleteOptions, listOpts metav1.ListOptions) error Controller() {{.schema.CodeName}}Controller - AddHandler(name string, sync {{.schema.CodeName}}HandlerFunc) - AddLifecycle(name string, lifecycle {{.schema.CodeName}}Lifecycle) - AddClusterScopedHandler(name, clusterName string, sync {{.schema.CodeName}}HandlerFunc) - AddClusterScopedLifecycle(name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle) + AddHandler(ctx context.Context, name string, sync {{.schema.CodeName}}HandlerFunc) + AddLifecycle(ctx context.Context, name string, lifecycle {{.schema.CodeName}}Lifecycle) + AddClusterScopedHandler(ctx context.Context, name, clusterName string, sync {{.schema.CodeName}}HandlerFunc) + AddClusterScopedLifecycle(ctx context.Context, name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle) } type {{.schema.ID}}Lister struct { @@ -123,34 +123,27 @@ func (c *{{.schema.ID}}Controller) Lister() {{.schema.CodeName}}Lister { } -func (c *{{.schema.ID}}Controller) AddHandler(name string, handler {{.schema.CodeName}}HandlerFunc) { - c.GenericController.AddHandler(name, func(key string) error { - obj, exists, err := c.Informer().GetStore().GetByKey(key) - if err != nil { - return err - } - if !exists { +func (c *{{.schema.ID}}Controller) AddHandler(ctx context.Context, name string, handler {{.schema.CodeName}}HandlerFunc) { + c.GenericController.AddHandler(ctx, name, func(key string, obj interface{}) (interface{}, error) { + if obj == nil { return handler(key, nil) + } else if v, ok := obj.(*{{.prefix}}{{.schema.CodeName}}); ok { + return handler(key, v) + } else { + return nil, nil } - return handler(key, obj.(*{{.prefix}}{{.schema.CodeName}})) }) } -func (c *{{.schema.ID}}Controller) AddClusterScopedHandler(name, cluster string, handler {{.schema.CodeName}}HandlerFunc) { - c.GenericController.AddHandler(name, func(key string) error { - obj, exists, err := c.Informer().GetStore().GetByKey(key) - if err != nil { - return err - } - if !exists { +func (c *{{.schema.ID}}Controller) AddClusterScopedHandler(ctx context.Context, name, cluster string, handler {{.schema.CodeName}}HandlerFunc) { + c.GenericController.AddHandler(ctx, name, func(key string, obj interface{}) (interface{}, error) { + if obj == nil { return handler(key, nil) + } else if v, ok := obj.(*{{.prefix}}{{.schema.CodeName}}); ok && controller.ObjectInCluster(cluster, obj) { + return handler(key, v) + } else { + return nil, nil } - - if !controller.ObjectInCluster(cluster, obj) { - return nil - } - - return handler(key, obj.(*{{.prefix}}{{.schema.CodeName}})) }) } @@ -245,21 +238,21 @@ func (s *{{.schema.ID}}Client) DeleteCollection(deleteOpts *metav1.DeleteOptions return s.objectClient.DeleteCollection(deleteOpts, listOpts) } -func (s *{{.schema.ID}}Client) AddHandler(name string, sync {{.schema.CodeName}}HandlerFunc) { - s.Controller().AddHandler(name, sync) +func (s *{{.schema.ID}}Client) AddHandler(ctx context.Context, name string, sync {{.schema.CodeName}}HandlerFunc) { + s.Controller().AddHandler(ctx, name, sync) } -func (s *{{.schema.ID}}Client) AddLifecycle(name string, lifecycle {{.schema.CodeName}}Lifecycle) { +func (s *{{.schema.ID}}Client) AddLifecycle(ctx context.Context, name string, lifecycle {{.schema.CodeName}}Lifecycle) { sync := New{{.schema.CodeName}}LifecycleAdapter(name, false, s, lifecycle) - s.AddHandler(name, sync) + s.Controller().AddHandler(ctx, name, sync) } -func (s *{{.schema.ID}}Client) AddClusterScopedHandler(name, clusterName string, sync {{.schema.CodeName}}HandlerFunc) { - s.Controller().AddClusterScopedHandler(name, clusterName, sync) +func (s *{{.schema.ID}}Client) AddClusterScopedHandler(ctx context.Context, name, clusterName string, sync {{.schema.CodeName}}HandlerFunc) { + s.Controller().AddClusterScopedHandler(ctx, name, clusterName, sync) } -func (s *{{.schema.ID}}Client) AddClusterScopedLifecycle(name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle) { +func (s *{{.schema.ID}}Client) AddClusterScopedLifecycle(ctx context.Context, name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle) { sync := New{{.schema.CodeName}}LifecycleAdapter(name+"_"+clusterName, true, s, lifecycle) - s.AddClusterScopedHandler(name, clusterName, sync) + s.Controller().AddClusterScopedHandler(ctx, name, clusterName, sync) } ` diff --git a/generator/lifecycle_template.go b/generator/lifecycle_template.go index b67b450fc..a9f14abdc 100644 --- a/generator/lifecycle_template.go +++ b/generator/lifecycle_template.go @@ -5,6 +5,7 @@ var lifecycleTemplate = `package {{.schema.Version.Version}} import ( {{.importPackage}} "k8s.io/apimachinery/pkg/runtime" + "github.com/rancher/norman/controller" "github.com/rancher/norman/lifecycle" ) @@ -45,11 +46,12 @@ func (w *{{.schema.ID}}LifecycleAdapter) Updated(obj runtime.Object) (runtime.Ob func New{{.schema.CodeName}}LifecycleAdapter(name string, clusterScoped bool, client {{.schema.CodeName}}Interface, l {{.schema.CodeName}}Lifecycle) {{.schema.CodeName}}HandlerFunc { adapter := &{{.schema.ID}}LifecycleAdapter{lifecycle: l} syncFn := lifecycle.NewObjectLifecycleAdapter(name, clusterScoped, adapter, client.ObjectClient()) - return func(key string, obj *{{.prefix}}{{.schema.CodeName}}) error { - if obj == nil { - return syncFn(key, nil) + return func(key string, obj *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) { + newObj, err := syncFn(key, obj) + if o, ok := newObj.(*{{.prefix}}{{.schema.CodeName}}); ok { + return o, err } - return syncFn(key, obj) + return nil, err } } ` diff --git a/lifecycle/object.go b/lifecycle/object.go index 211fac63c..10c472dfc 100644 --- a/lifecycle/object.go +++ b/lifecycle/object.go @@ -30,7 +30,7 @@ type objectLifecycleAdapter struct { objectClient *objectclient.ObjectClient } -func NewObjectLifecycleAdapter(name string, clusterScoped bool, lifecycle ObjectLifecycle, objectClient *objectclient.ObjectClient) func(key string, obj runtime.Object) error { +func NewObjectLifecycleAdapter(name string, clusterScoped bool, lifecycle ObjectLifecycle, objectClient *objectclient.ObjectClient) func(key string, obj interface{}) (interface{}, error) { o := objectLifecycleAdapter{ name: name, clusterScoped: clusterScoped, @@ -40,30 +40,39 @@ func NewObjectLifecycleAdapter(name string, clusterScoped bool, lifecycle Object return o.sync } -func (o *objectLifecycleAdapter) sync(key string, obj runtime.Object) error { - if obj == nil { - return nil +func (o *objectLifecycleAdapter) sync(key string, in interface{}) (interface{}, error) { + if in == nil || reflect.ValueOf(in).IsNil() { + return nil, nil + } + + obj, ok := in.(runtime.Object) + if !ok { + return nil, nil } metadata, err := meta.Accessor(obj) if err != nil { - return err + return nil, err } - if cont, err := o.finalize(metadata, obj); err != nil || !cont { - return err + if newObj, cont, err := o.finalize(metadata, obj); err != nil || !cont { + return nil, err + } else if newObj != nil { + obj = newObj } - if cont, err := o.create(metadata, obj); err != nil || !cont { - return err + if newObj, cont, err := o.create(metadata, obj); err != nil || !cont { + return nil, err + } else if newObj != nil { + obj = newObj } copyObj := obj.DeepCopyObject() newObj, err := o.lifecycle.Updated(copyObj) if newObj != nil { - o.update(metadata.GetName(), obj, newObj) + return o.update(metadata.GetName(), obj, newObj) } - return err + return nil, err } func (o *objectLifecycleAdapter) update(name string, orig, obj runtime.Object) (runtime.Object, error) { @@ -73,34 +82,36 @@ func (o *objectLifecycleAdapter) update(name string, orig, obj runtime.Object) ( return obj, nil } -func (o *objectLifecycleAdapter) finalize(metadata metav1.Object, obj runtime.Object) (bool, error) { +func (o *objectLifecycleAdapter) finalize(metadata metav1.Object, obj runtime.Object) (runtime.Object, bool, error) { // Check finalize if metadata.GetDeletionTimestamp() == nil { - return true, nil + return nil, true, nil } if !slice.ContainsString(metadata.GetFinalizers(), o.constructFinalizerKey()) { - return false, nil + return nil, false, nil } copyObj := obj.DeepCopyObject() if newObj, err := o.lifecycle.Finalize(copyObj); err != nil { if newObj != nil { - o.update(metadata.GetName(), obj, newObj) + newObj, _ := o.update(metadata.GetName(), obj, newObj) + return newObj, false, err } - return false, err + return nil, false, err } else if newObj != nil { copyObj = newObj } - return false, o.removeFinalizer(o.constructFinalizerKey(), copyObj) + newObj, err := o.removeFinalizer(o.constructFinalizerKey(), copyObj) + return newObj, false, err } -func (o *objectLifecycleAdapter) removeFinalizer(name string, obj runtime.Object) error { +func (o *objectLifecycleAdapter) removeFinalizer(name string, obj runtime.Object) (runtime.Object, error) { for i := 0; i < 3; i++ { metadata, err := meta.Accessor(obj) if err != nil { - return err + return nil, err } var finalizers []string @@ -112,18 +123,18 @@ func (o *objectLifecycleAdapter) removeFinalizer(name string, obj runtime.Object } metadata.SetFinalizers(finalizers) - _, err = o.objectClient.Update(metadata.GetName(), obj) + newObj, err := o.objectClient.Update(metadata.GetName(), obj) if err == nil { - return nil + return newObj, nil } obj, err = o.objectClient.GetNamespaced(metadata.GetNamespace(), metadata.GetName(), metav1.GetOptions{}) if err != nil { - return err + return nil, err } } - return fmt.Errorf("failed to remove finalizer on %s", name) + return nil, fmt.Errorf("failed to remove finalizer on %s", name) } func (o *objectLifecycleAdapter) createKey() string { @@ -137,25 +148,26 @@ func (o *objectLifecycleAdapter) constructFinalizerKey() string { return finalizerKey + o.name } -func (o *objectLifecycleAdapter) create(metadata metav1.Object, obj runtime.Object) (bool, error) { +func (o *objectLifecycleAdapter) create(metadata metav1.Object, obj runtime.Object) (runtime.Object, bool, error) { if o.isInitialized(metadata) { - return true, nil + return nil, true, nil } copyObj := obj.DeepCopyObject() copyObj, err := o.addFinalizer(copyObj) if err != nil { - return false, err + return copyObj, false, err } if newObj, err := o.lifecycle.Create(copyObj); err != nil { - o.update(metadata.GetName(), obj, newObj) - return false, err + newObj, _ = o.update(metadata.GetName(), obj, newObj) + return newObj, false, err } else if newObj != nil { copyObj = newObj } - return false, o.setInitialized(copyObj) + newObj, err := o.setInitialized(copyObj) + return newObj, false, err } func (o *objectLifecycleAdapter) isInitialized(metadata metav1.Object) bool { @@ -163,10 +175,10 @@ func (o *objectLifecycleAdapter) isInitialized(metadata metav1.Object) bool { return metadata.GetAnnotations()[initialized] == "true" } -func (o *objectLifecycleAdapter) setInitialized(obj runtime.Object) error { +func (o *objectLifecycleAdapter) setInitialized(obj runtime.Object) (runtime.Object, error) { metadata, err := meta.Accessor(obj) if err != nil { - return err + return nil, err } initialized := o.createKey() @@ -176,8 +188,7 @@ func (o *objectLifecycleAdapter) setInitialized(obj runtime.Object) error { } metadata.GetAnnotations()[initialized] = "true" - _, err = o.objectClient.Update(metadata.GetName(), obj) - return err + return o.objectClient.Update(metadata.GetName(), obj) } func (o *objectLifecycleAdapter) addFinalizer(obj runtime.Object) (runtime.Object, error) {