Skip to content

Commit

Permalink
Fix goroutine leak on failed sync() or before
Browse files Browse the repository at this point in the history
A leak would occur if either a GenericController was created
and Sync() or Start() was never called or if Sync() failed.
  • Loading branch information
ibuildthecloud authored and Denise committed Nov 26, 2019
1 parent 530c659 commit 6269ccd
Showing 1 changed file with 29 additions and 11 deletions.
40 changes: 29 additions & 11 deletions controller/generic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type genericController struct {
generation int
informer cache.SharedIndexInformer
handlers []*handlerDef
preStart []string
queue workqueue.RateLimitingInterface
name string
running bool
Expand All @@ -91,15 +92,8 @@ func NewGenericController(name string, genericClient Backend) GenericController
},
genericClient.ObjectFactory().Object(), resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

rl := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)

return &genericController{
informer: informer,
queue: workqueue.NewNamedRateLimitingQueue(rl, name),
name: name,
}
}
Expand All @@ -117,10 +111,14 @@ func (g *genericController) Informer() cache.SharedIndexInformer {
}

func (g *genericController) Enqueue(namespace, name string) {
if namespace == "" {
g.queue.Add(name)
key := name
if namespace != "" {
key = namespace + "/" + name
}
if g.queue == nil {
g.preStart = append(g.preStart, key)
} else {
g.queue.Add(namespace + "/" + name)
g.queue.AddRateLimited(key)
}
}

Expand Down Expand Up @@ -155,11 +153,31 @@ func (g *genericController) Sync(ctx context.Context) error {
return g.sync(ctx)
}

func (g *genericController) sync(ctx context.Context) error {
func (g *genericController) sync(ctx context.Context) (retErr error) {
if g.synced {
return nil
}

if g.queue == nil {
rl := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)

g.queue = workqueue.NewNamedRateLimitingQueue(rl, g.name)
for _, key := range g.preStart {
g.queue.Add(key)
}
g.preStart = nil

defer func() {
if retErr != nil {
g.queue.ShutDown()
}
}()
}

defer utilruntime.HandleCrash()

g.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down

0 comments on commit 6269ccd

Please sign in to comment.