From cd63766b8dec04cd969b638db8939867c7128cca Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Mon, 27 Nov 2023 17:01:50 -0600 Subject: [PATCH] respect provided context in Stop + StopAndCancel An earlier refactor caused these graceful shutdown routines to stop respecting the provided context. No matter what happened with the provided context, they would continue blocking, though they would ultimately return the context's error upon return. With this change, both stop methods will return early if the provided context is cancelled or timed out _prior_ to the clean shutdown completing. This makes it straightforward to implement a shutdown flow which first calls `Stop` with a context that expires after 5 seconds, and after that call `StopAndCancel`. If the shutdown _still_ isn't complete, the user can choose to exit anyway. Partially fixes #78. --- client.go | 19 ++++++----- client_test.go | 2 +- example_graceful_shutdown_test.go | 54 ++++++++++++------------------- 3 files changed, 31 insertions(+), 44 deletions(-) diff --git a/client.go b/client.go index a94dc033..d6a03ede 100644 --- a/client.go +++ b/client.go @@ -668,7 +668,8 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) { // Stop performs a graceful shutdown of the Client. It signals all producers // to stop fetching new jobs and waits for any fetched or in-progress jobs to -// complete before exiting. +// complete before exiting. If the provided context is done before shutdown has +// completed, Stop will return immediately with the context's error. // // There's no need to call this method if a hard stop has already been initiated // by cancelling the context passed to Start or by calling StopAndCancel. @@ -679,22 +680,20 @@ func (c *Client[TTx]) Stop(ctx context.Context) error { } func (c *Client[TTx]) awaitStop(ctx context.Context) error { - <-c.stopComplete - - // If context was cancelled, return that error. select { case <-ctx.Done(): return ctx.Err() - default: + case <-c.stopComplete: + return nil } - - return nil } // StopAndCancel shuts down the client and cancels all work in progress. It is a -// more aggressive stop than Stop because the contexts for any -// in-progress jobs are cancelled. However, it still waits for jobs to complete -// before returning, even though their contexts are cancelled. +// more aggressive stop than Stop because the contexts for any in-progress jobs +// are cancelled. However, it still waits for jobs to complete before returning, +// even though their contexts are cancelled. If the provided context is done +// before shutdown has completed, Stop will return immediately with the +// context's error. // // This can also be initiated by cancelling the context passed to Run. There is // no need to call this method if the context passed to Run is cancelled diff --git a/client_test.go b/client_test.go index 6488cc17..b24b7b01 100644 --- a/client_test.go +++ b/client_test.go @@ -437,8 +437,8 @@ func Test_Client_Stop(t *testing.T) { select { case <-jobDoneChan: + require.FailNow(t, "Expected Stop to return before job was done") default: - require.FailNow(t, "Expected job to be done before stop returns") } }) diff --git a/example_graceful_shutdown_test.go b/example_graceful_shutdown_test.go index c2a5e6ec..7feb5f36 100644 --- a/example_graceful_shutdown_test.go +++ b/example_graceful_shutdown_test.go @@ -105,52 +105,40 @@ func Example_gracefulShutdown() { <-sigintOrTerm fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n") - softStopSucceeded := make(chan struct{}) + softStopCtx, softStopCtxCancel := context.WithTimeout(ctx, 10*time.Second) + defer softStopCtxCancel() + go func() { - if err := riverClient.Stop(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - panic(err) - } + select { + case <-sigintOrTerm: + fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n") + softStopCtxCancel() + case <-softStopCtx.Done(): + fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n") } - close(softStopSucceeded) }() - // Wait for soft stop to succeed, or another SIGINT/SIGTERM. - select { - case <-sigintOrTerm: - fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n") - - case <-time.After(10 * time.Second): - fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n") - - case <-softStopSucceeded: - // Will never be reached in this example. - return + if err := riverClient.Stop(softStopCtx); err != nil { + if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { + panic(err) + } } - hardStopSucceeded := make(chan struct{}) - go func() { - if err := riverClient.StopAndCancel(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - panic(err) - } - } - close(hardStopSucceeded) - }() + hardStopCtx, hardStopCtxCancel := context.WithTimeout(ctx, 10*time.Second) + defer hardStopCtxCancel() // As long as all jobs respect context cancellation, StopAndCancel will // always work. However, in the case of a bug where a job blocks despite // being cancelled, it may be necessary to either ignore River's stop // result (what's shown here) or have a supervisor kill the process. - select { - case <-sigintOrTerm: - fmt.Printf("Received SIGINT/SIGTERM again; ignoring stop procedure and exiting unsafely\n") - - case <-time.After(10 * time.Second): + err = riverClient.StopAndCancel(hardStopCtx) + if err != nil && errors.Is(err, context.DeadlineExceeded) { fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n") - - case <-hardStopSucceeded: + } else if err != nil { + panic(err) } + + // hard stop succeeded }() // Make sure our job starts being worked before doing anything else.