Skip to content

Commit

Permalink
Handlers return object and error
Browse files Browse the repository at this point in the history
  • Loading branch information
ibuildthecloud committed Oct 30, 2018
1 parent dfeffc8 commit 77869d2
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 86 deletions.
94 changes: 81 additions & 13 deletions controller/generic_controller.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -41,12 +42,12 @@ func init() {
}
}

type HandlerFunc func(key string) error
type HandlerFunc func(key string, obj interface{}) (interface{}, error)

type GenericController interface {
SetThreadinessOverride(count int)
Informer() cache.SharedIndexInformer
AddHandler(name string, handler HandlerFunc)
AddHandler(ctx context.Context, name string, handler HandlerFunc)
HandlerCount() int
Enqueue(namespace, name string)
Sync(ctx context.Context) error
Expand All @@ -60,15 +61,22 @@ type Backend interface {
}

type handlerDef struct {
name string
handler HandlerFunc
name string
generation int
handler HandlerFunc
}

type generationKey struct {
generation int
key string
}

type genericController struct {
sync.Mutex
threadinessOverride int
generation int
informer cache.SharedIndexInformer
handlers []handlerDef
handlers []*handlerDef
queue workqueue.RateLimitingInterface
name string
running bool
Expand Down Expand Up @@ -116,11 +124,28 @@ func (g *genericController) Enqueue(namespace, name string) {
}
}

func (g *genericController) AddHandler(name string, handler HandlerFunc) {
g.handlers = append(g.handlers, handlerDef{
name: name,
handler: handler,
})
func (g *genericController) AddHandler(ctx context.Context, name string, handler HandlerFunc) {
g.Lock()
h := &handlerDef{
name: name,
generation: g.generation,
handler: handler,
}
g.handlers = append(g.handlers, h)
g.Unlock()

go func() {
<-ctx.Done()
g.Lock()
var handlers []*handlerDef
for _, handler := range g.handlers {
if handler != h {
handlers = append(handlers, h)
}
}
g.handlers = handlers
g.Unlock()
}()
}

func (g *genericController) Sync(ctx context.Context) error {
Expand Down Expand Up @@ -175,6 +200,22 @@ func (g *genericController) Start(ctx context.Context, threadiness int) error {
go g.run(ctx, threadiness)
}

if g.running {
for _, h := range g.handlers {
if h.generation != g.generation {
continue
}
for _, key := range g.informer.GetStore().ListKeys() {
g.queueObject(generationKey{
generation: g.generation,
key: key,
})
}
break
}
}

g.generation++
g.running = true
return nil
}
Expand Down Expand Up @@ -211,7 +252,7 @@ func (g *genericController) processNextWorkItem() bool {
defer g.queue.Done(key)

// do your work on the key. This method will contains your "do stuff" logic
err := g.syncHandler(key.(string))
err := g.syncHandler(key)
checkErr := err
if handlerErr, ok := checkErr.(*handlerError); ok {
checkErr = handlerErr.err
Expand Down Expand Up @@ -265,21 +306,48 @@ func filterConflictsError(err error) error {
return err
}

func (g *genericController) syncHandler(s string) (err error) {
func (g *genericController) syncHandler(key interface{}) (err error) {
defer utilruntime.RecoverFromPanic(&err)

generation := -1
var s string
var obj interface{}

switch v := key.(type) {
case string:
s = v
case generationKey:
generation = v.generation
s = v.key
default:
return nil
}

obj, exists, err := g.informer.GetStore().GetByKey(s)
if err != nil {
return err
} else if !exists {
obj = nil
}

var errs []error
for _, handler := range g.handlers {
if generation > -1 && handler.generation != generation {
continue
}

logrus.Debugf("%s calling handler %s %s", g.name, handler.name, s)
metrics.IncTotalHandlerExecution(g.name, handler.name)
if err := handler.handler(s); err != nil {
if newObj, err := handler.handler(s, obj); err != nil {
if !ignoreError(err, false) {
metrics.IncTotalHandlerFailure(g.name, handler.name, s)
}
errs = append(errs, &handlerError{
name: handler.name,
err: err,
})
} else if newObj != nil && !reflect.ValueOf(newObj).IsNil() {
obj = newObj
}
}
err = types.NewErrors(errs...)
Expand Down
65 changes: 29 additions & 36 deletions generator/controller_template.go
Expand Up @@ -41,7 +41,7 @@ type {{.schema.CodeName}}List struct {
Items []{{.prefix}}{{.schema.CodeName}}
}
type {{.schema.CodeName}}HandlerFunc func(key string, obj *{{.prefix}}{{.schema.CodeName}}) error
type {{.schema.CodeName}}HandlerFunc func(key string, obj *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error)
type {{.schema.CodeName}}Lister interface {
List(namespace string, selector labels.Selector) (ret []*{{.prefix}}{{.schema.CodeName}}, err error)
Expand All @@ -52,8 +52,8 @@ type {{.schema.CodeName}}Controller interface {
Generic() controller.GenericController
Informer() cache.SharedIndexInformer
Lister() {{.schema.CodeName}}Lister
AddHandler(name string, handler {{.schema.CodeName}}HandlerFunc)
AddClusterScopedHandler(name, clusterName string, handler {{.schema.CodeName}}HandlerFunc)
AddHandler(ctx context.Context, name string, handler {{.schema.CodeName}}HandlerFunc)
AddClusterScopedHandler(ctx context.Context, name, clusterName string, handler {{.schema.CodeName}}HandlerFunc)
Enqueue(namespace, name string)
Sync(ctx context.Context) error
Start(ctx context.Context, threadiness int) error
Expand All @@ -71,10 +71,10 @@ type {{.schema.CodeName}}Interface interface {
Watch(opts metav1.ListOptions) (watch.Interface, error)
DeleteCollection(deleteOpts *metav1.DeleteOptions, listOpts metav1.ListOptions) error
Controller() {{.schema.CodeName}}Controller
AddHandler(name string, sync {{.schema.CodeName}}HandlerFunc)
AddLifecycle(name string, lifecycle {{.schema.CodeName}}Lifecycle)
AddClusterScopedHandler(name, clusterName string, sync {{.schema.CodeName}}HandlerFunc)
AddClusterScopedLifecycle(name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle)
AddHandler(ctx context.Context, name string, sync {{.schema.CodeName}}HandlerFunc)
AddLifecycle(ctx context.Context, name string, lifecycle {{.schema.CodeName}}Lifecycle)
AddClusterScopedHandler(ctx context.Context, name, clusterName string, sync {{.schema.CodeName}}HandlerFunc)
AddClusterScopedLifecycle(ctx context.Context, name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle)
}
type {{.schema.ID}}Lister struct {
Expand Down Expand Up @@ -123,34 +123,27 @@ func (c *{{.schema.ID}}Controller) Lister() {{.schema.CodeName}}Lister {
}
func (c *{{.schema.ID}}Controller) AddHandler(name string, handler {{.schema.CodeName}}HandlerFunc) {
c.GenericController.AddHandler(name, func(key string) error {
obj, exists, err := c.Informer().GetStore().GetByKey(key)
if err != nil {
return err
}
if !exists {
func (c *{{.schema.ID}}Controller) AddHandler(ctx context.Context, name string, handler {{.schema.CodeName}}HandlerFunc) {
c.GenericController.AddHandler(ctx, name, func(key string, obj interface{}) (interface{}, error) {
if obj == nil {
return handler(key, nil)
} else if v, ok := obj.(*{{.prefix}}{{.schema.CodeName}}); ok {
return handler(key, v)
} else {
return nil, nil
}
return handler(key, obj.(*{{.prefix}}{{.schema.CodeName}}))
})
}
func (c *{{.schema.ID}}Controller) AddClusterScopedHandler(name, cluster string, handler {{.schema.CodeName}}HandlerFunc) {
c.GenericController.AddHandler(name, func(key string) error {
obj, exists, err := c.Informer().GetStore().GetByKey(key)
if err != nil {
return err
}
if !exists {
func (c *{{.schema.ID}}Controller) AddClusterScopedHandler(ctx context.Context, name, cluster string, handler {{.schema.CodeName}}HandlerFunc) {
c.GenericController.AddHandler(ctx, name, func(key string, obj interface{}) (interface{}, error) {
if obj == nil {
return handler(key, nil)
} else if v, ok := obj.(*{{.prefix}}{{.schema.CodeName}}); ok && controller.ObjectInCluster(cluster, obj) {
return handler(key, v)
} else {
return nil, nil
}
if !controller.ObjectInCluster(cluster, obj) {
return nil
}
return handler(key, obj.(*{{.prefix}}{{.schema.CodeName}}))
})
}
Expand Down Expand Up @@ -245,21 +238,21 @@ func (s *{{.schema.ID}}Client) DeleteCollection(deleteOpts *metav1.DeleteOptions
return s.objectClient.DeleteCollection(deleteOpts, listOpts)
}
func (s *{{.schema.ID}}Client) AddHandler(name string, sync {{.schema.CodeName}}HandlerFunc) {
s.Controller().AddHandler(name, sync)
func (s *{{.schema.ID}}Client) AddHandler(ctx context.Context, name string, sync {{.schema.CodeName}}HandlerFunc) {
s.Controller().AddHandler(ctx, name, sync)
}
func (s *{{.schema.ID}}Client) AddLifecycle(name string, lifecycle {{.schema.CodeName}}Lifecycle) {
func (s *{{.schema.ID}}Client) AddLifecycle(ctx context.Context, name string, lifecycle {{.schema.CodeName}}Lifecycle) {
sync := New{{.schema.CodeName}}LifecycleAdapter(name, false, s, lifecycle)
s.AddHandler(name, sync)
s.Controller().AddHandler(ctx, name, sync)
}
func (s *{{.schema.ID}}Client) AddClusterScopedHandler(name, clusterName string, sync {{.schema.CodeName}}HandlerFunc) {
s.Controller().AddClusterScopedHandler(name, clusterName, sync)
func (s *{{.schema.ID}}Client) AddClusterScopedHandler(ctx context.Context, name, clusterName string, sync {{.schema.CodeName}}HandlerFunc) {
s.Controller().AddClusterScopedHandler(ctx, name, clusterName, sync)
}
func (s *{{.schema.ID}}Client) AddClusterScopedLifecycle(name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle) {
func (s *{{.schema.ID}}Client) AddClusterScopedLifecycle(ctx context.Context, name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle) {
sync := New{{.schema.CodeName}}LifecycleAdapter(name+"_"+clusterName, true, s, lifecycle)
s.AddClusterScopedHandler(name, clusterName, sync)
s.Controller().AddClusterScopedHandler(ctx, name, clusterName, sync)
}
`
10 changes: 6 additions & 4 deletions generator/lifecycle_template.go
Expand Up @@ -5,6 +5,7 @@ var lifecycleTemplate = `package {{.schema.Version.Version}}
import (
{{.importPackage}}
"k8s.io/apimachinery/pkg/runtime"
"github.com/rancher/norman/controller"
"github.com/rancher/norman/lifecycle"
)
Expand Down Expand Up @@ -45,11 +46,12 @@ func (w *{{.schema.ID}}LifecycleAdapter) Updated(obj runtime.Object) (runtime.Ob
func New{{.schema.CodeName}}LifecycleAdapter(name string, clusterScoped bool, client {{.schema.CodeName}}Interface, l {{.schema.CodeName}}Lifecycle) {{.schema.CodeName}}HandlerFunc {
adapter := &{{.schema.ID}}LifecycleAdapter{lifecycle: l}
syncFn := lifecycle.NewObjectLifecycleAdapter(name, clusterScoped, adapter, client.ObjectClient())
return func(key string, obj *{{.prefix}}{{.schema.CodeName}}) error {
if obj == nil {
return syncFn(key, nil)
return func(key string, obj *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) {
newObj, err := syncFn(key, obj)
if o, ok := newObj.(*{{.prefix}}{{.schema.CodeName}}); ok {
return o, err
}
return syncFn(key, obj)
return nil, err
}
}
`

0 comments on commit 77869d2

Please sign in to comment.