Skip to content

Commit

Permalink
[FIXED] Redelivery of unack'ed msgs of leaving queue member
Browse files Browse the repository at this point in the history
When a queue member has unacknowledged messages and is leaving the
group, these messages are supposed to be immediately redelivered
to the remaining queue members. However, if a target member had unack'ed
messages with a sequence lower than the reassigned messages, these
reassigned messages would not be redelivered until after the AckWait.

The issue was introduced in PR #729
by an incorrect change to the ordering algorithm for the messages that
should expire.

Resolves #869

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jul 2, 2019
1 parent 718ebd3 commit d049586
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 15 deletions.
70 changes: 70 additions & 0 deletions server/clustering_test.go
Expand Up @@ -6450,3 +6450,73 @@ func TestClusteringSQLMsgStoreFlushed(t *testing.T) {
t.Fatalf("Expected some messages, got none")
}
}

// TestClusteringQueueMemberPendingCount runs two clustered servers and two
// members of the same queue group. The first member uses manual acks, never
// acknowledges, and closes itself on its 5th message; the second member
// uses the default ack mode. After publishing, the test waits until server
// s2 reports exactly one subscription for the client with an empty
// acksPending map.
func TestClusteringQueueMemberPendingCount(t *testing.T) {
	cleanupDatastore(t)
	defer cleanupDatastore(t)
	cleanupRaftLog(t)
	defer cleanupRaftLog(t)

	// For this test, use a central NATS server.
	ns := natsdTest.RunDefaultServer()
	defer ns.Shutdown()

	// Configure first server
	s1sOpts := getTestDefaultOptsForClustering("a", true)
	s1 := runServerWithOpts(t, s1sOpts, nil)
	defer s1.Shutdown()

	// Configure second server.
	s2sOpts := getTestDefaultOptsForClustering("b", false)
	s2 := runServerWithOpts(t, s2sOpts, nil)
	defer s2.Shutdown()

	getLeader(t, 10*time.Second, s1, s2)

	sc := NewDefaultConnection(t)
	defer sc.Close()

	// qsub1: manual ack mode and no ack is ever sent, so every message it
	// receives stays pending. It leaves the group on its 5th message.
	count := int32(0)
	if _, err := sc.QueueSubscribe("foo", "bar",
		func(m *stan.Msg) {
			if atomic.AddInt32(&count, 1) == 5 {
				m.Sub.Close()
			}
		},
		stan.SetManualAckMode(),
		stan.MaxInflight(5)); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// qsub2: the member that stays in the group.
	if _, err := sc.QueueSubscribe("foo", "bar",
		func(m *stan.Msg) {
			// Delay a bit to give a chance to server to send to qsub1.
			// If not for this delay, it is likely that this qsub would
			// receive all messages past message 1.
			time.Sleep(10 * time.Millisecond)
		},
		stan.MaxInflight(5)); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	total := 50
	for i := 0; i < total; i++ {
		// Fail right away on a publish error instead of letting it show up
		// later as an unrelated timeout in the check below.
		if err := sc.Publish("foo", []byte("msg")); err != nil {
			t.Fatalf("Error on publish: %v", err)
		}
	}

	// Make sure that acksPending on follower get down to 0.
	waitFor(t, 5*time.Second, 15*time.Millisecond, func() error {
		subs := s2.clients.getSubs(clientName)
		if len(subs) != 1 {
			return fmt.Errorf("Should have only one sub")
		}
		sub := subs[0]
		sub.RLock()
		numPending := len(sub.acksPending)
		sub.RUnlock()
		if numPending != 0 {
			return fmt.Errorf("Expected no pending, got %v", numPending)
		}
		return nil
	})
}
13 changes: 8 additions & 5 deletions server/server.go
Expand Up @@ -3299,11 +3299,14 @@ type byExpire []*pendingMsg
// Len returns the number of pending messages (sort.Interface).
func (a byExpire) Len() int { return len(a) }

// Swap exchanges the elements at indexes i and j (sort.Interface).
func (a byExpire) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less reports whether the element at index i must sort before the
// element at index j (sort.Interface).
//
// Elements are ordered by expiration. If expire is 0, it means the
// server was restarted and we don't have the expiration time, which
// will be set later; in that case, or if both expirations are
// identical, elements are ordered by sequence instead.
func (a byExpire) Less(i, j int) bool {
	if a[i].expire == 0 || a[j].expire == 0 || a[i].expire == a[j].expire {
		return a[i].seq < a[j].seq
	}
	return a[i].expire < a[j].expire
}

// Returns an array of pendingMsg ordered by expiration date, unless
Expand Down
132 changes: 122 additions & 10 deletions server/server_redelivery_test.go
Expand Up @@ -1455,14 +1455,9 @@ func TestQueueRedeliveryAfterMembersCrash(t *testing.T) {
defer sc1.Close()

total := 20
count := int32(0)
ch := make(chan bool, 1)
cb := func(_ *stan.Msg) {
if n := int(atomic.AddInt32(&count, 1)); n == total {
ch <- true
}
}
if _, err := sc1.QueueSubscribe("foo", "bar", cb,

if _, err := sc1.QueueSubscribe("foo", "bar", func(_ *stan.Msg) {},
stan.SetManualAckMode(),
stan.DeliverAllAvailable(),
stan.DurableName("dur"),
stan.MaxInflight(1),
Expand All @@ -1473,7 +1468,8 @@ func TestQueueRedeliveryAfterMembersCrash(t *testing.T) {
sc2, nc2 := createConnectionWithNatsOpts(t, "member2")
defer nc2.Close()
defer sc2.Close()
if _, err := sc2.QueueSubscribe("foo", "bar", cb,
if _, err := sc2.QueueSubscribe("foo", "bar", func(_ *stan.Msg) {},
stan.SetManualAckMode(),
stan.DeliverAllAvailable(),
stan.DurableName("dur"),
stan.MaxInflight(1),
Expand All @@ -1485,14 +1481,20 @@ func TestQueueRedeliveryAfterMembersCrash(t *testing.T) {
waitForNumSubs(t, s, "member1", 1)
waitForNumSubs(t, s, "member2", 1)

for i := 0; i < 2; i++ {
if err := sc1.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}

// Simulate crash by closing the NATS connections.
nc1.Close()
nc2.Close()

// Produce messages
sc := NewDefaultConnection(t)
defer sc.Close()
for i := 0; i < total; i++ {
for i := 0; i < total-2; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
Expand All @@ -1512,6 +1514,14 @@ func TestQueueRedeliveryAfterMembersCrash(t *testing.T) {
}
defer sc2.Close()

count := int32(0)
ch := make(chan bool, 1)
cb := func(m *stan.Msg) {
if n := int(atomic.AddInt32(&count, 1)); n == total {
ch <- true
}
}

if _, err := sc1.QueueSubscribe("foo", "bar", cb,
stan.DeliverAllAvailable(),
stan.DurableName("dur"),
Expand All @@ -1532,3 +1542,105 @@ func TestQueueRedeliveryAfterMembersCrash(t *testing.T) {
t.Fatal("Did not get redelivered messages on time")
}
}

// TestQueueRedeliveryWithMsgsReassignedToMemberWithAcksPending checks that
// when a queue member closes with an unacknowledged message, that message
// is redelivered within 2 seconds to the remaining member, even though the
// remaining member itself has an unacknowledged message. Two scenarios are
// exercised: the reassigned message has a lower sequence than the
// remaining member's pending message (channel "foo"), then a higher one
// (channel "baz").
func TestQueueRedeliveryWithMsgsReassignedToMemberWithAcksPending(t *testing.T) {
	s := runServer(t, clusterName)
	defer s.Shutdown()

	sc := NewDefaultConnection(t)
	defer sc.Close()

	// First check the case where the queue sub that is leaving is the
	// one holding a message with a sequence smaller than the pending
	// message of the queue sub to which that message is reassigned.
	for i := 0; i < 2; i++ {
		if err := sc.Publish("foo", []byte("msg")); err != nil {
			t.Fatalf("Error on publish: %v", err)
		}
	}

	subCh := make(chan stan.Subscription, 1)
	if _, err := sc.QueueSubscribe("foo", "bar",
		func(m *stan.Msg) {
			// Hand the subscription over so the other member can close it.
			if m.Sequence == 1 && !m.Redelivered {
				subCh <- m.Sub
			}
		},
		stan.DeliverAllAvailable(),
		stan.SetManualAckMode(),
		stan.MaxInflight(1),
	); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Wait for qsub1 to receive message
	qsub1 := <-subCh

	ch := make(chan bool, 1)
	if _, err := sc.QueueSubscribe("foo", "bar",
		func(m *stan.Msg) {
			if m.Sequence == 2 && !m.Redelivered {
				// This member now has message 2 pending: make qsub1 leave
				// so that its unacknowledged message 1 gets reassigned.
				qsub1.Close()
			} else if m.Sequence == 1 && m.Redelivered {
				ch <- true
			}
		},
		stan.DeliverAllAvailable(),
		stan.MaxInflight(1),
		stan.SetManualAckMode(),
	); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	select {
	case <-ch:
	case <-time.After(2 * time.Second):
		// It is message 1 (owned by the closed qsub1) that is expected here.
		t.Fatalf("Message 1 was not immediately redelivered")
	}

	// Now test with situation where the leaving queue sub
	// has a message with seq higher than the pending msg
	// of the queue sub the message is reassigned to.

	for i := 0; i < 2; i++ {
		if err := sc.Publish("baz", []byte("msg")); err != nil {
			t.Fatalf("Error on publish: %v", err)
		}
	}

	if _, err := sc.QueueSubscribe("baz", "bar",
		func(m *stan.Msg) {
			if m.Redelivered && m.Sequence == 2 {
				ch <- true
			} else if m.Sequence == 1 && !m.Redelivered {
				subCh <- m.Sub
			}
		},
		stan.DeliverAllAvailable(),
		stan.SetManualAckMode(),
		stan.MaxInflight(1),
	); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	// Wait for first qsub to get message seq1
	<-subCh

	// Now that we got the first message to the first queue sub,
	// start a new queue sub that will get message 2 and close.
	// We want message 2 to be redelivered right away to qsub 1
	if _, err := sc.QueueSubscribe("baz", "bar",
		func(m *stan.Msg) {
			if m.Sequence == 2 {
				m.Sub.Close()
			}
		},
		stan.DeliverAllAvailable(),
		stan.MaxInflight(1),
		stan.SetManualAckMode(),
	); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	select {
	case <-ch:
	case <-time.After(2 * time.Second):
		t.Fatalf("Message 2 was not immediately redelivered")
	}
}

0 comments on commit d049586

Please sign in to comment.