Skip to content

Commit

Permalink
Merge pull request #1061 from nats-io/fix_1060
Browse files Browse the repository at this point in the history
[FIXED] No attempt to redeliver message if error during lookup
  • Loading branch information
kozlovic committed Jun 10, 2020
2 parents f47739e + 3eb04d4 commit 44ae40f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3677,6 +3677,7 @@ func (s *StanServer) getMsgForRedelivery(c *channel, sub *subState, seq uint64)
if err != nil {
s.log.Errorf("Error getting message for redelivery subid=%d, seq=%d, err=%v",
sub.ID, seq, err)
return nil
}
// Ack it so that it does not reincarnate on restart
s.processAck(c, sub, seq, false)
Expand Down
21 changes: 17 additions & 4 deletions server/server_storefailures_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2019 The NATS Authors
// Copyright 2016-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -215,9 +215,15 @@ func TestMsgLookupFailures(t *testing.T) {
sub.Unsubscribe()

// Create subscription, manual ack mode, don't ack, wait for redelivery
sub, err = sc.Subscribe("foo", func(_ *stan.Msg) {
rcvCh <- true
}, stan.DeliverAllAvailable(), stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(15)))
rdlvCh := make(chan bool)
sub, err = sc.Subscribe("foo", func(m *stan.Msg) {
if !m.Redelivered {
rcvCh <- true
} else if m.RedeliveryCount == 3 {
rdlvCh <- true
m.Ack()
}
}, stan.DeliverAllAvailable(), stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(50)))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
Expand Down Expand Up @@ -248,6 +254,13 @@ func TestMsgLookupFailures(t *testing.T) {
mms.Lock()
mms.fail = false
mms.Unlock()

// Now make sure that we do get it redelivered when the error clears
select {
case <-rdlvCh:
case <-time.After(time.Second):
t.Fatal("Redelivery should have continued until error cleared")
}
sub.Unsubscribe()
}

Expand Down

0 comments on commit 44ae40f

Please sign in to comment.