Skip to content

Commit

Permalink
[FIXED] Multiple deliveries of messages with delivery count going bac…
Browse files Browse the repository at this point in the history
…kwards. (#5305)

When we fail to deliver a message, we were not checking if this was a
redelivery already and would decrement o.sseq, meaning we would pick up
the same message after the next redelivery and would have a delivered
count of 1.

This could lead to a message being delivered from the redelivery queue,
but that could fail, then you would see same message twice, first with
dc of 2, then 1.

Now app only gets one copy with delivery count of 2.

Signed-off-by: Derek Collison <derek@nats.io>

---------

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
Signed-off-by: Derek Collison <derek@nats.io>
Co-authored-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
derekcollison and Jarema committed Apr 11, 2024
1 parent d83920c commit c3416ca
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 3 deletions.
6 changes: 3 additions & 3 deletions server/consumer.go
Expand Up @@ -4048,9 +4048,9 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
wr.hbt = time.Now().Add(wr.hb)
}
} else {
// We will redo this one.
o.sseq--
// We will redo this one as long as this is not a redelivery.
if dc == 1 {
o.sseq--
o.npc++
}
pmsg.returnToPool()
Expand Down Expand Up @@ -4492,7 +4492,7 @@ func (o *consumer) didNotDeliver(seq uint64, subj string) {
if _, ok := o.pending[seq]; ok {
// We found this messsage on pending, we need
// to queue it up for immediate redelivery since
// we know it was not delivered.
// we know it was not delivered
if !o.onRedeliverQueue(seq) {
o.addToRedeliverQueue(seq)
o.signalNewMessages()
Expand Down
88 changes: 88 additions & 0 deletions server/jetstream_consumer_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1078,6 +1079,93 @@ func TestJetStreamConsumerDelete(t *testing.T) {
}
}

func TestFetchWithDrain(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Retention: nats.LimitsPolicy,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C",
AckPolicy: nats.AckExplicitPolicy,
AckWait: time.Second * 10,
})
require_NoError(t, err)

const messages = 10_000

for i := 0; i < messages; i++ {
sendStreamMsg(t, nc, "foo", fmt.Sprintf("%d", i+1))
}

cr := JSApiConsumerGetNextRequest{
Batch: 100_000,
Expires: time.Second * 10,
}
crBytes, err := json.Marshal(cr)
require_NoError(t, err)

msgs := make(map[int]int)

processMsg := func(t *testing.T, sub *nats.Subscription, msgs map[int]int) bool {
msg, err := sub.NextMsg(time.Second * 1)
if err != nil {
return false
}
metadata, err := msg.Metadata()
require_NoError(t, err)
err = msg.Ack()
require_NoError(t, err)

v, err := strconv.Atoi(string(msg.Data))
require_NoError(t, err)
require_Equal(t, uint64(v), metadata.Sequence.Stream)

_, ok := msgs[int(metadata.Sequence.Stream)]
if _, ok := msgs[int(metadata.Sequence.Stream-1)]; !ok && len(msgs) > 0 {
t.Logf("Stream Sequence gap detected: current %d", metadata.Sequence.Stream)
}
if ok {
t.Fatalf("Message has been seen before")
}

msgs[int(metadata.Sequence.Stream)] = int(metadata.NumDelivered)

require_NoError(t, err)
return true
}

for {
inbox := nats.NewInbox()
sub, err := nc.SubscribeSync(inbox)
require_NoError(t, err)

err = nc.PublishRequest(fmt.Sprintf(JSApiRequestNextT, "TEST", "C"), inbox, crBytes)
require_NoError(t, err)

// Drain after first message processed.
processMsg(t, sub, msgs)
sub.Drain()

for {
if !processMsg(t, sub, msgs) {
if len(msgs) == messages {
return
}
break
}
}
}
}

func Benchmark____JetStreamConsumerIsFilteredMatch(b *testing.B) {
subject := "foo.bar.do.not.match.any.filter.subject"
for n := 1; n <= 1024; n *= 2 {
Expand Down

0 comments on commit c3416ca

Please sign in to comment.