Skip to content

Commit

Permalink
double check the context for cancellation
Browse files Browse the repository at this point in the history
When the context is canceled, there's no guarantee that Consume() will return
context.Canceled. As a result, the context needs to be double-checked before
looping in order to shutdown cleanly.
  • Loading branch information
ewollesen committed Mar 29, 2024
1 parent f878754 commit 28473c8
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion events/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package events

import (
"context"
stderrors "errors"
"log"
"sync"

Expand Down Expand Up @@ -65,7 +66,15 @@ func (s *SaramaEventConsumer) Start() error {
// recreated to get the new claims
if err := cg.Consume(ctx, topics, handler); err != nil {
log.Printf("Error from consumer: %v", err)
if err == context.Canceled {
if stderrors.Is(err, context.Canceled) {
return ErrConsumerStopped
}
return err
}
// Double check the context isn't canceled before looping. This is necessary as
// Consume() sometimes returns nil when the context is canceled.
if err := ctx.Err(); err != nil {
if stderrors.Is(err, context.Canceled) {
return ErrConsumerStopped
}
return err
Expand Down

0 comments on commit 28473c8

Please sign in to comment.