Skip to content

Commit

Permalink
respect provided context in Stop + StopAndCancel
Browse files Browse the repository at this point in the history
An earlier refactor caused these graceful shutdown routines to stop
respecting the provided context. No matter what happened with the
provided context, they would continue blocking, though they would
ultimately return the context's error upon return.

With this change, both stop methods will return early if the provided
context is cancelled or timed out _prior_ to the clean shutdown
completing. This makes it straightforward to implement a shutdown flow
which first calls `Stop` with a context that expires after 5 seconds,
and after that call `StopAndCancel`. If the shutdown _still_ isn't
complete, the user can choose to exit anyway.

Partially fixes #78.
  • Loading branch information
bgentry committed Nov 27, 2023
1 parent dbab168 commit 4f0d0dd
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 44 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- `Stop` and `StopAndCancel` have been changed to respect the provided context argument. When that context is cancelled or times out, those methods will now immediately return with the context's error, even if shutdown has not yet completed. PR #79.

## [0.0.10] - 2023-11-26

### Added
Expand Down
19 changes: 9 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,8 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {

// Stop performs a graceful shutdown of the Client. It signals all producers
// to stop fetching new jobs and waits for any fetched or in-progress jobs to
// complete before exiting.
// complete before exiting. If the provided context is done before shutdown has
// completed, Stop will return immediately with the context's error.
//
// There's no need to call this method if a hard stop has already been initiated
// by cancelling the context passed to Start or by calling StopAndCancel.
Expand All @@ -679,22 +680,20 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {
}

func (c *Client[TTx]) awaitStop(ctx context.Context) error {
<-c.stopComplete

// If context was cancelled, return that error.
select {
case <-ctx.Done():
return ctx.Err()
default:
case <-c.stopComplete:
return nil
}

return nil
}

// StopAndCancel shuts down the client and cancels all work in progress. It is a
// more aggressive stop than Stop because the contexts for any
// in-progress jobs are cancelled. However, it still waits for jobs to complete
// before returning, even though their contexts are cancelled.
// more aggressive stop than Stop because the contexts for any in-progress jobs
// are cancelled. However, it still waits for jobs to complete before returning,
// even though their contexts are cancelled. If the provided context is done
// before shutdown has completed, Stop will return immediately with the
// context's error.
//
// This can also be initiated by cancelling the context passed to Run. There is
// no need to call this method if the context passed to Run is cancelled
Expand Down
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ func Test_Client_Stop(t *testing.T) {

select {
case <-jobDoneChan:
require.FailNow(t, "Expected Stop to return before job was done")
default:
require.FailNow(t, "Expected job to be done before stop returns")
}
})

Expand Down
54 changes: 21 additions & 33 deletions example_graceful_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,52 +105,40 @@ func Example_gracefulShutdown() {
<-sigintOrTerm
fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n")

softStopSucceeded := make(chan struct{})
softStopCtx, softStopCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer softStopCtxCancel()

go func() {
if err := riverClient.Stop(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
panic(err)
}
select {
case <-sigintOrTerm:
fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n")
softStopCtxCancel()
case <-softStopCtx.Done():
fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n")
}
close(softStopSucceeded)
}()

// Wait for soft stop to succeed, or another SIGINT/SIGTERM.
select {
case <-sigintOrTerm:
fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n")

case <-time.After(10 * time.Second):
fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n")

case <-softStopSucceeded:
// Will never be reached in this example.
return
if err := riverClient.Stop(softStopCtx); err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
panic(err)
}
}

hardStopSucceeded := make(chan struct{})
go func() {
if err := riverClient.StopAndCancel(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
panic(err)
}
}
close(hardStopSucceeded)
}()
hardStopCtx, hardStopCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer hardStopCtxCancel()

// As long as all jobs respect context cancellation, StopAndCancel will
// always work. However, in the case of a bug where a job blocks despite
// being cancelled, it may be necessary to either ignore River's stop
// result (what's shown here) or have a supervisor kill the process.
select {
case <-sigintOrTerm:
fmt.Printf("Received SIGINT/SIGTERM again; ignoring stop procedure and exiting unsafely\n")

case <-time.After(10 * time.Second):
err = riverClient.StopAndCancel(hardStopCtx)
if err != nil && errors.Is(err, context.DeadlineExceeded) {
fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n")

case <-hardStopSucceeded:
} else if err != nil {
panic(err)
}

// hard stop succeeded
}()

// Make sure our job starts being worked before doing anything else.
Expand Down

0 comments on commit 4f0d0dd

Please sign in to comment.