Skip to content

Commit

Permalink
Add AddWorkers function to StoppableWorkers (#3899)
Browse files Browse the repository at this point in the history
It turns out several places set up background workers in multiple spots that can only be determined at runtime. So, this PR adds a way to put additional workers into the `StoppableWorkers` after the original ones are created.

I haven't tried this out on real hardware, but the compiler and linter are both happy, and I don't see anything obvious I could have broken that they wouldn't have caught.
  • Loading branch information
penguinland committed May 3, 2024
1 parent 30f6012 commit a543fe9
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions utils/stoppable_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

// StoppableWorkers is a collection of goroutines that can be stopped at a later time.
type StoppableWorkers interface {
AddWorkers(...func(context.Context))
Stop()
Context() context.Context
}
Expand All @@ -22,6 +23,7 @@ type StoppableWorkers interface {
// of NewStoppableWorkers() would make a copy of it), so we do everything through the
// StoppableWorkers interface to avoid making copies (since interfaces do everything by pointer).
type stoppableWorkersImpl struct {
mu sync.Mutex
cancelCtx context.Context
cancelFunc func()
activeBackgroundWorkers sync.WaitGroup
Expand All @@ -31,23 +33,39 @@ type stoppableWorkersImpl struct {
func NewStoppableWorkers(funcs ...func(context.Context)) StoppableWorkers {
cancelCtx, cancelFunc := context.WithCancel(context.Background())
workers := &stoppableWorkersImpl{cancelCtx: cancelCtx, cancelFunc: cancelFunc}
workers.activeBackgroundWorkers.Add(len(funcs))
workers.AddWorkers(funcs...)
return workers
}

// AddWorkers starts up additional goroutines for each function passed in. If you call this after
// calling Stop(), it will return immediately without starting any new goroutines.
func (sw *stoppableWorkersImpl) AddWorkers(funcs ...func(context.Context)) {
sw.mu.Lock()
defer sw.mu.Unlock()

if sw.cancelCtx.Err() != nil { // We've already stopped everything.
return
}

sw.activeBackgroundWorkers.Add(len(funcs))
for _, f := range funcs {
// In Go 1.21 and earlier, variables created in a loop were reused from one iteration to
// the next. Make a "fresh" copy of it here so that, if we're on to the next iteration of
// the loop before the goroutine starts up, it starts this function instead of the next
// one. For details, see https://go.dev/blog/loopvar-preview
f := f
goutils.PanicCapturingGo(func() {
defer workers.activeBackgroundWorkers.Done()
f(cancelCtx)
defer sw.activeBackgroundWorkers.Done()
f(sw.cancelCtx)
})
}
return workers
}

// Stop shuts down all the goroutines we started up.
func (sw *stoppableWorkersImpl) Stop() {
sw.mu.Lock()
defer sw.mu.Unlock()

sw.cancelFunc()
sw.activeBackgroundWorkers.Wait()
}
Expand Down

0 comments on commit a543fe9

Please sign in to comment.