diff --git a/client.go b/client.go index a94dc033..d6a03ede 100644 --- a/client.go +++ b/client.go @@ -668,7 +668,8 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) { // Stop performs a graceful shutdown of the Client. It signals all producers // to stop fetching new jobs and waits for any fetched or in-progress jobs to -// complete before exiting. +// complete before exiting. If the provided context is done before shutdown has +// completed, Stop will return immediately with the context's error. // // There's no need to call this method if a hard stop has already been initiated // by cancelling the context passed to Start or by calling StopAndCancel. @@ -679,22 +680,20 @@ func (c *Client[TTx]) Stop(ctx context.Context) error { } func (c *Client[TTx]) awaitStop(ctx context.Context) error { - <-c.stopComplete - - // If context was cancelled, return that error. select { case <-ctx.Done(): return ctx.Err() - default: + case <-c.stopComplete: + return nil } - - return nil } // StopAndCancel shuts down the client and cancels all work in progress. It is a -// more aggressive stop than Stop because the contexts for any -// in-progress jobs are cancelled. However, it still waits for jobs to complete -// before returning, even though their contexts are cancelled. +// more aggressive stop than Stop because the contexts for any in-progress jobs +// are cancelled. However, it still waits for jobs to complete before returning, +// even though their contexts are cancelled. If the provided context is done +// before shutdown has completed, Stop will return immediately with the +// context's error. // // This can also be initiated by cancelling the context passed to Run. There is // no need to call this method if the context passed to Run is cancelled diff --git a/client_test.go b/client_test.go index 6488cc17..b24b7b01 100644 --- a/client_test.go +++ b/client_test.go @@ -437,8 +437,8 @@ func Test_Client_Stop(t *testing.T) { select { case <-jobDoneChan: + require.FailNow(t, "Expected Stop to return before job was done") default: - require.FailNow(t, "Expected job to be done before stop returns") } }) diff --git a/example_graceful_shutdown_test.go b/example_graceful_shutdown_test.go index c2a5e6ec..7feb5f36 100644 --- a/example_graceful_shutdown_test.go +++ b/example_graceful_shutdown_test.go @@ -105,52 +105,40 @@ func Example_gracefulShutdown() { <-sigintOrTerm fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n") - softStopSucceeded := make(chan struct{}) + softStopCtx, softStopCtxCancel := context.WithTimeout(ctx, 10*time.Second) + defer softStopCtxCancel() + go func() { - if err := riverClient.Stop(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - panic(err) - } + select { + case <-sigintOrTerm: + fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n") + softStopCtxCancel() + case <-softStopCtx.Done(): + fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n") } - close(softStopSucceeded) }() - // Wait for soft stop to succeed, or another SIGINT/SIGTERM. - select { - case <-sigintOrTerm: - fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n") - - case <-time.After(10 * time.Second): - fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n") - - case <-softStopSucceeded: - // Will never be reached in this example. - return + if err := riverClient.Stop(softStopCtx); err != nil { + if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { + panic(err) + } } - hardStopSucceeded := make(chan struct{}) - go func() { - if err := riverClient.StopAndCancel(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - panic(err) - } - } - close(hardStopSucceeded) - }() + hardStopCtx, hardStopCtxCancel := context.WithTimeout(ctx, 10*time.Second) + defer hardStopCtxCancel() // As long as all jobs respect context cancellation, StopAndCancel will // always work. However, in the case of a bug where a job blocks despite // being cancelled, it may be necessary to either ignore River's stop // result (what's shown here) or have a supervisor kill the process. - select { - case <-sigintOrTerm: - fmt.Printf("Received SIGINT/SIGTERM again; ignoring stop procedure and exiting unsafely\n") - - case <-time.After(10 * time.Second): + err = riverClient.StopAndCancel(hardStopCtx) + if err != nil && errors.Is(err, context.DeadlineExceeded) { fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n") - - case <-hardStopSucceeded: + } else if err != nil { + panic(err) } + + // hard stop succeeded }() // Make sure our job starts being worked before doing anything else.