diff --git a/server/clustering_test.go b/server/clustering_test.go
index ca4f1e32..f29807e5 100644
--- a/server/clustering_test.go
+++ b/server/clustering_test.go
@@ -7808,7 +7808,7 @@ func TestClusteringRedeliveryCount(t *testing.T) {
 	atomic.StoreInt32(&restarted, 1)
 	s1 = runServerWithOpts(t, s1sOpts, nil)
 	defer s1.Shutdown()
-	getLeader(t, 10*time.Second, s1, s2, s3)
+	leader := getLeader(t, 10*time.Second, s1, s2, s3)
 
 	select {
 	case e := <-errCh:
@@ -7839,6 +7839,15 @@ func TestClusteringRedeliveryCount(t *testing.T) {
 	case <-time.After(time.Second):
 		t.Fatalf("Timedout")
 	}
+
+	// Make sure that the redelivery count map gets cleaned up once messages are acknowledged.
+	sub := leader.clients.getSubs(clientName)[0]
+	waitForCount(t, 0, func() (string, int) {
+		sub.RLock()
+		l := len(sub.rdlvCount)
+		sub.RUnlock()
+		return "redelivery map size", l
+	})
 }
 
 func testRemoveNode(t *testing.T, nc *nats.Conn, node string, timeoutExpected bool) {
diff --git a/server/server.go b/server/server.go
index 02eed93e..dcd3b020 100644
--- a/server/server.go
+++ b/server/server.go
@@ -5428,6 +5428,15 @@ func (s *StanServer) processAck(c *channel, sub *subState, sequence uint64, from
 			return
 		}
 		delete(sub.acksPending, sequence)
+		// Remove from redelivery count map only if processing an ACK from the user,
+		// not simply when reassigning to a new member of a queue group.
+		if fromUser {
+			if qs != nil {
+				delete(qs.rdlvCount, sequence)
+			} else {
+				delete(sub.rdlvCount, sequence)
+			}
+		}
 	} else if qs != nil && fromUser {
 		// For queue members, if this is not an internally generated ACK
 		// and we don't find the sequence in this sub's pending, we are
@@ -5438,13 +5447,19 @@ func (s *StanServer) processAck(c *channel, sub *subState, sequence uint64, from
 				continue
 			}
 			qsub.Lock()
-			if _, found := qsub.acksPending[sequence]; found {
+			_, found := qsub.acksPending[sequence]
+			if found {
 				delete(qsub.acksPending, sequence)
 				persistAck(qsub)
-				qsub.Unlock()
-				break
 			}
 			qsub.Unlock()
+			if found {
+				// We are still under the qstate lock. Since we found this message
+				// in one of the members of the group, remove it from the redelivery
+				// count map now.
+				delete(qs.rdlvCount, sequence)
+				break
+			}
 		}
 		sub.Lock()
 		// Proceed with original sub (regardless if member was found
diff --git a/server/server_queue_test.go b/server/server_queue_test.go
index af9345ad..7aff5b60 100644
--- a/server/server_queue_test.go
+++ b/server/server_queue_test.go
@@ -1281,6 +1281,7 @@ func TestQueueRedeliveryCount(t *testing.T) {
 	defer sc.Close()
 
 	errCh := make(chan error, 2)
+	ch := make(chan bool, 1)
 	var mu sync.Mutex
 	var prev uint32
 	cb := func(m *stan.Msg) {
@@ -1293,6 +1294,10 @@ func TestQueueRedeliveryCount(t *testing.T) {
 				return
 			}
 			prev = m.RedeliveryCount
+			if m.RedeliveryCount == 5 {
+				m.Ack()
+				ch <- true
+			}
 			mu.Unlock()
 		}
 	}
@@ -1308,7 +1313,20 @@ func TestQueueRedeliveryCount(t *testing.T) {
 	select {
 	case e := <-errCh:
 		t.Fatal(e.Error())
-	case <-time.After(500 * time.Millisecond):
-		// ok!
-	}
+	case <-ch:
+	case <-time.After(time.Second):
+		t.Fatalf("Timedout")
+	}
+
+	// Make sure that the redelivery count map gets cleaned up once messages are acknowledged.
+	sub := s.clients.getSubs(clientName)[0]
+	sub.RLock()
+	qs := sub.qstate
+	sub.RUnlock()
+	waitForCount(t, 0, func() (string, int) {
+		qs.RLock()
+		l := len(qs.rdlvCount)
+		qs.RUnlock()
+		return "queue redelivery map size", l
+	})
 }
diff --git a/server/server_redelivery_test.go b/server/server_redelivery_test.go
index 0998b44c..fdd6417b 100644
--- a/server/server_redelivery_test.go
+++ b/server/server_redelivery_test.go
@@ -1748,6 +1748,15 @@ func TestPersistentStoreRedeliveryCount(t *testing.T) {
 	case <-time.After(time.Second):
 		t.Fatalf("Timedout")
 	}
+
+	// Make sure that the redelivery count map gets cleaned up once messages are acknowledged.
+	sub := s.clients.getSubs(clientName)[0]
+	waitForCount(t, 0, func() (string, int) {
+		sub.RLock()
+		l := len(sub.rdlvCount)
+		sub.RUnlock()
+		return "redelivery map size", l
+	})
 }
 
 type testRdlvRaceWithAck struct {
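
Note for reviewers: the assertions added above poll through the repo's existing waitForCount test helper, whose body is not part of this patch. Polling is needed because the server processes the client's ACK asynchronously, so the rdlvCount map does not empty at the instant Ack() returns. A minimal sketch of what such a helper can look like, assuming a fixed deadline and poll interval (the 2s and 15ms values here are illustrative assumptions, not taken from this diff):

    package server

    import (
        "testing"
        "time"
    )

    // waitForCount polls getCount until it reports the expected value,
    // failing the test with the returned label if the deadline expires first.
    // Sketch only: the 2s deadline and 15ms interval are assumptions.
    func waitForCount(t *testing.T, expected int, getCount func() (string, int)) {
        t.Helper()
        deadline := time.Now().Add(2 * time.Second)
        for {
            label, count := getCount()
            if count == expected {
                return
            }
            if time.Now().After(deadline) {
                t.Fatalf("Expected %s to be %v, got %v", label, expected, count)
            }
            time.Sleep(15 * time.Millisecond)
        }
    }

Returning a label alongside the count lets one helper produce a readable failure message for whichever map a given test is watching ("redelivery map size" vs "queue redelivery map size").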