Skip to content

Commit

Permalink
Merge pull request #1139 from nats-io/fix_1138
Browse files Browse the repository at this point in the history
[FIXED] Possible race causing ack'ed message to be redelivered
  • Loading branch information
kozlovic committed Jan 8, 2021
2 parents 564e335 + d6cf6e3 commit 71d1dbc
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 5 deletions.
2 changes: 2 additions & 0 deletions server/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,10 @@ func (n *natsStreamLayer) Dial(address raft.ServerAddress, timeout time.Duration
return nil, err
}

peerConn.mu.Lock()
peerConn.sub = sub
peerConn.outbox = resp.Inbox
peerConn.mu.Unlock()
n.mu.Lock()
n.conns[peerConn] = struct{}{}
n.mu.Unlock()
Expand Down
26 changes: 21 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3550,6 +3550,13 @@ func (sub *subState) makeSortedPendingMsgs() []*pendingMsg {
return results
}

// isMsgStillPending reports whether the sequence of the given message is
// still a key of this subscription's acksPending map.
// The subscription's lock must be held by the caller.
func (sub *subState) isMsgStillPending(m *pb.MsgProto) bool {
	_, found := sub.acksPending[m.Sequence]
	return found
}

func qsLock(qs *queueState) {
if qs != nil {
qs.Lock()
Expand Down Expand Up @@ -3602,7 +3609,9 @@ func (s *StanServer) performDurableRedelivery(c *channel, sub *subState) {
qsLock(qs)
sub.Lock()
// Force delivery
s.sendMsgToSub(sub, m, forceDelivery)
if sub.isMsgStillPending(m) {
s.sendMsgToSub(sub, m, forceDelivery)
}
sub.Unlock()
qsUnlock(qs)
}
Expand Down Expand Up @@ -3721,23 +3730,30 @@ func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup boo
// otherwise this could cause a message to be redelivered to multiple members.
if !isClustered && qs != nil && !isStartup {
qs.Lock()
pick, sent = s.sendMsgToQueueGroup(qs, m, forceDelivery)
sub.Lock()
msgPending := sub.isMsgStillPending(m)
sub.Unlock()
if msgPending {
pick, sent = s.sendMsgToQueueGroup(qs, m, forceDelivery)
}
qs.Unlock()
if pick == nil {
if msgPending && pick == nil {
s.log.Errorf("[Client:%s] Unable to find queue subscriber for subid=%d", clientID, subID)
break
}
// If the message is redelivered to a different queue subscriber,
// we need to process an implicit ack for the original subscriber.
// We do this only after confirmation that it was successfully added
// as pending on the other queue subscriber.
if pick != sub && sent {
if msgPending && pick != sub && sent {
s.processAck(c, sub, m.Sequence, false)
}
} else {
qsLock(qs)
sub.Lock()
s.sendMsgToSub(sub, m, forceDelivery)
if sub.isMsgStillPending(m) {
s.sendMsgToSub(sub, m, forceDelivery)
}
sub.Unlock()
qsUnlock(qs)
}
Expand Down
107 changes: 107 additions & 0 deletions server/server_redelivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"time"

natsdTest "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats-streaming-server/stores"
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
"github.com/nats-io/stan.go/pb"
)

func TestRedelivery(t *testing.T) {
Expand Down Expand Up @@ -1742,3 +1744,108 @@ func TestPersistentStoreRedeliveryCount(t *testing.T) {
t.Fatalf("Timedout")
}
}

// testRdlvRaceWithAck wraps a message store so that a test can pause
// Lookup calls at a chosen moment.
type testRdlvRaceWithAck struct {
	// Underlying store; Lookup ultimately delegates to it.
	stores.MsgStore
	// Guards the fields below.
	sync.Mutex
	// When true, Lookup signals on ch and then waits on dch before
	// delegating to the underlying store.
	block bool
	// ch receives a value each time a blocking Lookup starts;
	// dch is what a blocking Lookup waits on before proceeding.
	ch, dch chan struct{}
}

// Lookup delegates to the wrapped store's Lookup. When blocking has been
// enabled, it first sends a notification on ch and then waits on dch
// before delegating.
func (ms *testRdlvRaceWithAck) Lookup(seq uint64) (*pb.MsgProto, error) {
	// Snapshot the control fields under the lock so the channel
	// operations below happen without holding it.
	ms.Lock()
	shouldBlock, notifyCh, releaseCh := ms.block, ms.ch, ms.dch
	ms.Unlock()

	if !shouldBlock {
		return ms.MsgStore.Lookup(seq)
	}
	// Tell the test a lookup is in progress, then wait to be released.
	notifyCh <- struct{}{}
	<-releaseCh
	return ms.MsgStore.Lookup(seq)
}

// TestRedeliveryRaceWithAck checks that a message which gets acknowledged
// while a store Lookup is in flight is not delivered a second time.
//
// The channel's message store is wrapped with testRdlvRaceWithAck so the
// test can pause Lookup, ack the message while Lookup is paused, wait for
// the subscription's pending map to drain, and only then let Lookup
// return. The scenario runs once with a plain subscription and once with
// a queue subscription.
func TestRedeliveryRaceWithAck(t *testing.T) {
	for _, test := range []struct {
		name  string
		queue string
	}{
		{"plain", ""},
		{"queue", "queue"},
	} {
		t.Run(test.name, func(t *testing.T) {
			s := runServer(t, clusterName)
			defer s.Shutdown()

			c, err := s.lookupOrCreateChannel("foo")
			if err != nil {
				t.Fatalf("Error on lookup: %v", err)
			}
			// Wrap the message store so that Lookup can be paused later.
			// Blocking is initially off (block is false).
			ms := &testRdlvRaceWithAck{
				MsgStore: c.store.Msgs,
				ch:       make(chan struct{}, 1),
				dch:      make(chan struct{}),
			}
			s.clients.Lock()
			c.store.Msgs = ms
			s.clients.Unlock()

			sc := NewDefaultConnection(t)
			defer sc.Close()

			msgCh := make(chan *stan.Msg, 10)
			if _, err := sc.QueueSubscribe("foo", test.queue, func(m *stan.Msg) {
				msgCh <- m
			}, stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(250))); err != nil {
				t.Fatalf("Error on subscribe: %v", err)
			}

			// Fail right away on a publish error instead of waiting for
			// the "Did not get the message" timeout below.
			if err := sc.Publish("foo", []byte("msg")); err != nil {
				t.Fatalf("Error on publish: %v", err)
			}

			// Wait for the initial delivery.
			var m *stan.Msg
			select {
			case m = <-msgCh:
			case <-time.After(time.Second):
				t.Fatalf("Did not get the message")
			}
			// From now on, any Lookup will notify on ch and wait on dch.
			ms.Lock()
			ms.block = true
			ch := ms.ch
			dch := ms.dch
			ms.Unlock()

			select {
			case <-ch:
				// A Lookup is now paused. Ack the message while it is
				// paused and make sure the ack was actually sent.
				if err := m.Ack(); err != nil {
					t.Fatalf("Error on ack: %v", err)
				}
				sub := c.ss.getAllSubs()[0]
				// Wait until the subscription has no pending message left.
				waitFor(t, time.Second, 15*time.Millisecond, func() error {
					if test.queue != "" {
						sub.qstate.RLock()
					}
					sub.RLock()
					done := len(sub.acksPending) == 0
					sub.RUnlock()
					if test.queue != "" {
						sub.qstate.RUnlock()
					}
					if !done {
						return fmt.Errorf("message still pending")
					}
					return nil
				})
				// Release the paused Lookup.
				close(dch)
			case <-time.After(time.Second):
				t.Fatalf("Lookup not invoked for redelivery")
			}

			// The message was acked, so it must not show up again.
			select {
			case m := <-msgCh:
				t.Fatalf("Should not have received redelivered message: %+v", m)
			case <-time.After(250 * time.Millisecond):
				// OK
			}
		})
	}
}

0 comments on commit 71d1dbc

Please sign in to comment.