Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Recreating ordered consumers on server restart #1425

Merged
merged 4 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 11 additions & 4 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var errOrderedSequenceMismatch = errors.New("sequence mismatch")

// Consume can be used to continuously receive messages and handle them with the provided callback function
func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) {
if c.consumerType == consumerTypeNotSet || c.consumerType == consumerTypeConsume && c.currentConsumer == nil {
if (c.consumerType == consumerTypeNotSet || c.consumerType == consumerTypeConsume) && c.currentConsumer == nil {
err := c.reset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -178,7 +178,8 @@ func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err err
}
if errors.Is(err, ErrNoHeartbeat) ||
errors.Is(err, errOrderedSequenceMismatch) ||
errors.Is(err, ErrConsumerDeleted) {
errors.Is(err, ErrConsumerDeleted) ||
errors.Is(err, ErrConsumerNotFound) {
// only reset if serial matches the current consumer serial and there is no reset in progress
if serial == c.serial && atomic.LoadUint32(&c.resetInProgress) == 0 {
atomic.StoreUint32(&c.resetInProgress, 1)
Expand All @@ -190,7 +191,7 @@ func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err err

// Messages returns [MessagesContext], allowing continuously iterating over messages on a stream.
func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error) {
if c.consumerType == consumerTypeNotSet || c.consumerType == consumerTypeConsume && c.currentConsumer == nil {
if (c.consumerType == consumerTypeNotSet || c.consumerType == consumerTypeConsume) && c.currentConsumer == nil {
err := c.reset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -386,13 +387,19 @@ func (c *orderedConsumer) reset() error {
defer c.Unlock()
defer atomic.StoreUint32(&c.resetInProgress, 0)
if c.currentConsumer != nil {
c.currentConsumer.Lock()
if c.currentConsumer.subscriptions[""] != nil {
c.currentConsumer.subscriptions[""].Stop()
}
consName := c.currentConsumer.CachedInfo().Name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we confident that the consumer name is updated after recreation, so that we always delete the proper one?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be good, in line 443 we do:

c.currentConsumer = cons.(*pullConsumer)

This is done right after a successful create.

c.currentConsumer.Unlock()
var err error
for i := 0; ; i++ {
if c.cfg.MaxResetAttempts > 0 && i == c.cfg.MaxResetAttempts {
return fmt.Errorf("%w: maximum number of delete attempts reached: %s", ErrOrderedConsumerReset, err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = c.jetStream.DeleteConsumer(ctx, c.stream, c.currentConsumer.CachedInfo().Name)
err = c.jetStream.DeleteConsumer(ctx, c.stream, consName)
cancel()
if err != nil {
if errors.Is(err, ErrConsumerNotFound) {
Expand Down
181 changes: 154 additions & 27 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,32 +280,46 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
if !isConnected {
isConnected = true
// try fetching consumer info several times to make sure consumer is available after reconnect
for i := 0; i < 5; i++ {
backoffOpts := backoffOpts{
attempts: 10,
initialInterval: 1 * time.Second,
disableInitialExecution: true,
factor: 2,
maxInterval: 10 * time.Second,
cancel: sub.done,
}
err = retryWithBackoff(func(attempt int) (bool, error) {
isClosed := atomic.LoadUint32(&sub.closed) == 1
if isClosed {
return false, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := p.Info(ctx)
cancel()
if err == nil {
break
}
if err != nil {
if i == 4 {
sub.cleanup()
if sub.consumeOpts.ErrHandler != nil {
sub.consumeOpts.ErrHandler(sub, err)
if sub.consumeOpts.ErrHandler != nil {
err = fmt.Errorf("[%d] attempting to fetch consumer info after reconnect: %w", attempt, err)
if attempt == backoffOpts.attempts-1 {
err = errors.Join(err, fmt.Errorf("maximum retry attempts reached"))
}
sub.Unlock()
return
sub.consumeOpts.ErrHandler(sub, err)
}
return true, err
}
time.Sleep(5 * time.Second)
}
batchSize := sub.consumeOpts.MaxMessages
if sub.consumeOpts.StopAfter > 0 {
batchSize = min(batchSize, sub.consumeOpts.StopAfter-sub.delivered)
return false, nil
}, backoffOpts)
if err != nil {
if sub.consumeOpts.ErrHandler != nil {
sub.consumeOpts.ErrHandler(sub, err)
}
sub.Unlock()
sub.cleanup()
return
}

sub.fetchNext <- &pullRequest{
Expires: sub.consumeOpts.Expires,
Batch: batchSize,
Batch: sub.consumeOpts.MaxMessages,
MaxBytes: sub.consumeOpts.MaxBytes,
Heartbeat: sub.consumeOpts.Heartbeat,
}
Expand Down Expand Up @@ -527,21 +541,38 @@ func (s *pullSubscription) Next() (Msg, error) {
if !isConnected {
isConnected = true
// try fetching consumer info several times to make sure consumer is available after reconnect
for i := 0; i < 5; i++ {
backoffOpts := backoffOpts{
attempts: 10,
initialInterval: 1 * time.Second,
disableInitialExecution: true,
factor: 2,
maxInterval: 10 * time.Second,
cancel: s.done,
}
err = retryWithBackoff(func(attempt int) (bool, error) {
isClosed := atomic.LoadUint32(&s.closed) == 1
if isClosed {
return false, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := s.consumer.Info(ctx)
cancel()
if err == nil {
break
}
if err != nil {
if i == 4 {
s.Stop()
return nil, err
if errors.Is(err, ErrConsumerNotFound) {
return false, err
}
if attempt == backoffOpts.attempts-1 {
return true, fmt.Errorf("could not get consumer info after server reconnect: %w", err)
}
return true, err
}
time.Sleep(5 * time.Second)
return false, nil
}, backoffOpts)
if err != nil {
s.Stop()
return nil, err
}

s.pending.msgCount = 0
s.pending.byteCount = 0
if hbMonitor != nil {
Expand Down Expand Up @@ -578,6 +609,7 @@ func (s *pullSubscription) handleStatusMsg(msg *nats.Msg, msgErr error) error {
if s.consumeOpts.ErrHandler != nil {
s.consumeOpts.ErrHandler(s, err)
}
return err
}
s.pending.msgCount -= msgsLeft
if s.pending.msgCount < 0 {
Expand Down Expand Up @@ -608,6 +640,7 @@ func (s *pullSubscription) Stop() {
if atomic.LoadUint32(&s.closed) == 1 {
return
}
atomic.StoreUint32(&s.closed, 1)
close(s.done)
if s.consumeOpts.stopAfterMsgsLeft != nil {
if s.delivered >= s.consumeOpts.StopAfter {
Expand All @@ -616,7 +649,6 @@ func (s *pullSubscription) Stop() {
s.consumeOpts.stopAfterMsgsLeft <- s.consumeOpts.StopAfter - s.delivered
}
}
atomic.StoreUint32(&s.closed, 1)
}

// Fetch sends a single request to retrieve given number of messages.
Expand Down Expand Up @@ -812,6 +844,7 @@ func (s *pullSubscription) cleanup() {
close(s.connStatusChanged)
s.subscription = nil
delete(s.consumer.subscriptions, s.id)
atomic.StoreUint32(&s.closed, 1)
}

// pull sends a pull request to the server and waits for messages using a subscription from [pullSubscription].
Expand Down Expand Up @@ -912,3 +945,97 @@ func (consumeOpts *consumeOpts) setDefaults() error {
}
return nil
}

// backoffOpts configures the retry schedule used by retryWithBackoff.
type backoffOpts struct {
	// attempts is the total number of retry attempts.
	// Use -1 for unlimited attempts. It must be non-zero unless
	// customBackoff is set.
	attempts int
	// initialInterval is the interval after which the first retry is
	// performed. Defaults to 1s.
	initialInterval time.Duration
	// disableInitialExecution determines whether the first execution of the
	// function is delayed by initialInterval instead of being performed
	// immediately.
	disableInitialExecution bool
	// factor is the multiplier applied to the interval on each attempt.
	// Defaults to 2.
	factor float64
	// maxInterval is the upper bound for the interval between retries; once
	// it is reached, all subsequent retries use this interval.
	// Defaults to 1 minute.
	maxInterval time.Duration
	// customBackoff holds custom backoff intervals. If set, it overrides all
	// other options except attempts. If attempts is set as well, the last
	// interval is reused for all retries past the end of this list.
	customBackoff []time.Duration
	// cancel is an optional cancellation channel. If set, retrying is
	// cancelled when this channel is closed.
	cancel <-chan struct{}
}

func retryWithBackoff(f func(int) (bool, error), opts backoffOpts) error {
Jarema marked this conversation as resolved.
Show resolved Hide resolved
var err error
var shouldContinue bool
// if custom backoff is set, use it instead of other options
if len(opts.customBackoff) > 0 {
for i := 0; ; i++ {
if opts.attempts > 0 && i >= opts.attempts {
break
}
if i >= len(opts.customBackoff) {
if opts.attempts == 0 {
break
}
i = len(opts.customBackoff) - 1
}
shouldContinue, err = f(i)
if !shouldContinue {
return err
}
select {
case <-opts.cancel:
return nil
case <-time.After(opts.customBackoff[i]):
}
}
return err
}

// set default options
if opts.initialInterval == 0 {
opts.initialInterval = 1 * time.Second
}
if opts.factor == 0 {
opts.factor = 2
}
if opts.maxInterval == 0 {
opts.maxInterval = 1 * time.Minute
}
if opts.attempts == 0 {
return fmt.Errorf("retry attempts have to be set when not using custom backoff intervals")
}
interval := opts.initialInterval
for i := 0; ; i++ {
if i == 0 && opts.disableInitialExecution {
time.Sleep(interval)
continue
}
shouldContinue, err = f(i)
if !shouldContinue {
return err
}
interval = time.Duration(float64(interval) * opts.factor)
if interval >= opts.maxInterval {
interval = opts.maxInterval
}
if opts.attempts > 0 && i >= opts.attempts {
break
}
select {
case <-opts.cancel:
return nil
case <-time.After(interval):
}
}
return err
}