diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d9963b7..628fbb72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- `Stop` and `StopAndCancel` have been changed to respect the provided context argument. When that context is cancelled or times out, those methods will now immediately return with the context's error, even if shutdown has not yet completed. PR #79. + ## [0.0.10] - 2023-11-26 ### Added diff --git a/client.go b/client.go index a94dc033..d6a03ede 100644 --- a/client.go +++ b/client.go @@ -668,7 +668,8 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) { // Stop performs a graceful shutdown of the Client. It signals all producers // to stop fetching new jobs and waits for any fetched or in-progress jobs to -// complete before exiting. +// complete before exiting. If the provided context is done before shutdown has +// completed, Stop will return immediately with the context's error. // // There's no need to call this method if a hard stop has already been initiated // by cancelling the context passed to Start or by calling StopAndCancel. @@ -679,22 +680,20 @@ func (c *Client[TTx]) Stop(ctx context.Context) error { } func (c *Client[TTx]) awaitStop(ctx context.Context) error { - <-c.stopComplete - - // If context was cancelled, return that error. select { case <-ctx.Done(): return ctx.Err() - default: + case <-c.stopComplete: + return nil } - - return nil } // StopAndCancel shuts down the client and cancels all work in progress. It is a -// more aggressive stop than Stop because the contexts for any -// in-progress jobs are cancelled. However, it still waits for jobs to complete -// before returning, even though their contexts are cancelled. +// more aggressive stop than Stop because the contexts for any in-progress jobs +// are cancelled. 
However, it still waits for jobs to complete before returning, +// even though their contexts are cancelled. If the provided context is done +// before shutdown has completed, StopAndCancel will return immediately with the +// context's error. // // This can also be initiated by cancelling the context passed to Run. There is // no need to call this method if the context passed to Run is cancelled diff --git a/client_test.go b/client_test.go index 6488cc17..b24b7b01 100644 --- a/client_test.go +++ b/client_test.go @@ -437,8 +437,8 @@ func Test_Client_Stop(t *testing.T) { select { case <-jobDoneChan: + require.FailNow(t, "Expected Stop to return before job was done") default: - require.FailNow(t, "Expected job to be done before stop returns") } }) diff --git a/example_graceful_shutdown_test.go b/example_graceful_shutdown_test.go index c2a5e6ec..7feb5f36 100644 --- a/example_graceful_shutdown_test.go +++ b/example_graceful_shutdown_test.go @@ -105,52 +105,40 @@ func Example_gracefulShutdown() { <-sigintOrTerm fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n") - softStopSucceeded := make(chan struct{}) + softStopCtx, softStopCtxCancel := context.WithTimeout(ctx, 10*time.Second) + defer softStopCtxCancel() + go func() { - if err := riverClient.Stop(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - panic(err) - } + select { + case <-sigintOrTerm: + fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n") + softStopCtxCancel() + case <-softStopCtx.Done(): + fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n") } - close(softStopSucceeded) }() - // Wait for soft stop to succeed, or another SIGINT/SIGTERM. 
- select { - case <-sigintOrTerm: - fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n") - - case <-time.After(10 * time.Second): - fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n") - - case <-softStopSucceeded: - // Will never be reached in this example. - return + if err := riverClient.Stop(softStopCtx); err != nil { + if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { + panic(err) + } } - hardStopSucceeded := make(chan struct{}) - go func() { - if err := riverClient.StopAndCancel(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - panic(err) - } - } - close(hardStopSucceeded) - }() + hardStopCtx, hardStopCtxCancel := context.WithTimeout(ctx, 10*time.Second) + defer hardStopCtxCancel() // As long as all jobs respect context cancellation, StopAndCancel will // always work. However, in the case of a bug where a job blocks despite // being cancelled, it may be necessary to either ignore River's stop // result (what's shown here) or have a supervisor kill the process. - select { - case <-sigintOrTerm: - fmt.Printf("Received SIGINT/SIGTERM again; ignoring stop procedure and exiting unsafely\n") - - case <-time.After(10 * time.Second): + err = riverClient.StopAndCancel(hardStopCtx) + if err != nil && errors.Is(err, context.DeadlineExceeded) { fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n") - - case <-hardStopSucceeded: + } else if err != nil { + panic(err) } + + // hard stop succeeded }() // Make sure our job starts being worked before doing anything else.