Skip to content

Commit

Permalink
[FIXED] No attempt to redeliver message if error during lookup
Browse files Browse the repository at this point in the history
During a message redelivery, if the lookup were to fail (say due
to a transient storage error), the message would be implicitly
acknowledged and no other attempt to redeliver this message would
be made.
With this change, the server will try again to redeliver at the
next redelivery interval.

Resolves #1060

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jun 10, 2020
1 parent f47739e commit 3eb04d4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3677,6 +3677,7 @@ func (s *StanServer) getMsgForRedelivery(c *channel, sub *subState, seq uint64)
if err != nil {
s.log.Errorf("Error getting message for redelivery subid=%d, seq=%d, err=%v",
sub.ID, seq, err)
return nil
}
// Ack it so that it does not reincarnate on restart
s.processAck(c, sub, seq, false)
Expand Down
21 changes: 17 additions & 4 deletions server/server_storefailures_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2019 The NATS Authors
// Copyright 2016-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -215,9 +215,15 @@ func TestMsgLookupFailures(t *testing.T) {
sub.Unsubscribe()

// Create subscription, manual ack mode, don't ack, wait for redelivery
sub, err = sc.Subscribe("foo", func(_ *stan.Msg) {
rcvCh <- true
}, stan.DeliverAllAvailable(), stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(15)))
rdlvCh := make(chan bool)
sub, err = sc.Subscribe("foo", func(m *stan.Msg) {
if !m.Redelivered {
rcvCh <- true
} else if m.RedeliveryCount == 3 {
rdlvCh <- true
m.Ack()
}
}, stan.DeliverAllAvailable(), stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(50)))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
Expand Down Expand Up @@ -248,6 +254,13 @@ func TestMsgLookupFailures(t *testing.T) {
mms.Lock()
mms.fail = false
mms.Unlock()

// Now make sure that we do get it redelivered when the error clears
select {
case <-rdlvCh:
case <-time.After(time.Second):
t.Fatal("Redelivery should have continued until error cleared")
}
sub.Unsubscribe()
}

Expand Down

0 comments on commit 3eb04d4

Please sign in to comment.