Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-0.17] 🐛 Runnable group should check if stopped before enqueueing #2761

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/manager/runnable_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,15 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
r.start.Unlock()
}

// Recheck if we're stopped and hold the readlock, given that the stop and start can be called
// at the same time, we can end up in a situation where the runnable is added
// after the group is stopped and the channel is closed.
r.stop.RLock()
defer r.stop.RUnlock()
if r.stopped {
return errRunnableGroupStopped
}

// Enqueue the runnable.
r.ch <- readyRunnable
return nil
Expand All @@ -272,7 +281,11 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
func (r *runnableGroup) StopAndWait(ctx context.Context) {
r.stopOnce.Do(func() {
// Close the reconciler channel once we're done.
defer close(r.ch)
defer func() {
r.stop.Lock()
close(r.ch)
r.stop.Unlock()
}()

_ = r.Start(ctx)
r.stop.Lock()
Expand Down
36 changes: 36 additions & 0 deletions pkg/manager/runnable_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,42 @@ var _ = Describe("runnableGroup", func() {
}
})

It("should be able to handle adding runnables while stopping", func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
rg := newRunnableGroup(defaultBaseContext, errCh)

go func() {
defer GinkgoRecover()
<-time.After(1 * time.Millisecond)
Expect(rg.Start(ctx)).To(Succeed())
}()
go func() {
defer GinkgoRecover()
<-time.After(1 * time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
cancel()
rg.StopAndWait(ctx)
}()

for i := 0; i < 200; i++ {
go func(i int) {
defer GinkgoRecover()

<-time.After(time.Duration(i) * time.Microsecond)
Expect(rg.Add(RunnableFunc(func(c context.Context) error {
<-ctx.Done()
return nil
}), func(_ context.Context) bool {
return true
})).To(SatisfyAny(
Succeed(),
Equal(errRunnableGroupStopped),
))
}(i)
}
})

It("should not turn ready if some readiness check fail", func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down