Skip to content

Commit

Permalink
Merge pull request #1214 from nats-io/clustering_queue_redelivery_cha…
Browse files Browse the repository at this point in the history
…nges

[IMPROVED] Clustering: queue redelivery to different members
  • Loading branch information
kozlovic committed Oct 15, 2021
2 parents e06a494 + 771feb5 commit 3b51bc9
Show file tree
Hide file tree
Showing 3 changed files with 279 additions and 50 deletions.
6 changes: 5 additions & 1 deletion server/clustering.go
Expand Up @@ -51,9 +51,13 @@ var (
tportTimeout = defaultTPortTimeout
)

const (
testLazyReplicationInterval = 250 * time.Millisecond
)

func clusterSetupForTest() {
runningInTests = true
lazyReplicationInterval = 250 * time.Millisecond
lazyReplicationInterval = testLazyReplicationInterval
joinRaftGroupTimeout = 250 * time.Millisecond
tportTimeout = 250 * time.Millisecond
}
Expand Down
182 changes: 182 additions & 0 deletions server/clustering_test.go
Expand Up @@ -8502,3 +8502,185 @@ func TestClusteringMonitorQueueLastSentAfterHBTimeout(t *testing.T) {
// last_sent is set to 3.
checkLastSent()
}

func TestClusteringQueueRedelivery(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

redelivered := int32(0)
if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
if m.Redelivered {
atomic.AddInt32(&redelivered, 1)
}
}, stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(100))); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}

// Verify that it gets redelivered
waitForAcks(t, s1, clientName, 1, 1)
// Same on other server
waitForAcks(t, s2, clientName, 1, 1)

// After few redeliveries, start a second member.
waitFor(t, time.Second, 100*time.Millisecond, func() error {
if n := atomic.LoadInt32(&redelivered); n < 2 {
return fmt.Errorf("Redelivery count is still %v", n)
}
return nil
})

// Start a second queue member, it should get the message and
// will ack it (auto-ack)
ok := make(chan bool, 1)
if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
if m.Redelivered {
ok <- true
}
}); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

select {
case <-ok:
case <-time.After(time.Second):
t.Fatalf("Message was not redelivered to second queue member")
}

// Number of acks for sub1 and sub2 should be down to 0.
waitForAcks(t, s1, clientName, 1, 0)
waitForAcks(t, s1, clientName, 2, 0)
// Same on s2.
waitForAcks(t, s2, clientName, 1, 0)
waitForAcks(t, s2, clientName, 2, 0)
}

func TestClusteringQueueRedeliverySentAndAck(t *testing.T) {
// Set this to something very large so we can manually cause the flush.
lazyReplicationInterval = time.Hour
defer func() { lazyReplicationInterval = testLazyReplicationInterval }()

cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

qsubCh := make(chan stan.Subscription, 1)
ch := make(chan bool, 1)
// Create a queue sub with manual ack mode and ackwait of 250ms.
if _, err := sc.QueueSubscribe("foo", "queue", func(m *stan.Msg) {
if !m.Redelivered {
ch <- true
} else {
select {
case qs := <-qsubCh:
qs.Close()
ch <- true
default:
}
}
}, stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(250))); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
subs := s1.clients.getSubs(clientName)
qsub1 := subs[0]

// Send a message
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
// Wait for message to be received by qsub1
if err := Wait(ch); err != nil {
t.Fatalf("Did not our message")
}
// Now start a second queue sub member that should receive the message
// once the first qsub AckWait elapses
qsub2, err := sc.QueueSubscribe("foo", "queue", func(m *stan.Msg) {
if m.Redelivered {
ch <- true
}
}, stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(100)))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
qsubCh <- qsub2
// Wait for message to be received by qsub2
if err := Wait(ch); err != nil {
t.Fatalf("Did not our message")
}
// Now again wait that is redelivered to qsub1, which should
// close qsub2 and from now on, it should be the only one
// to get the message redelivered to.
if err := Wait(ch); err != nil {
t.Fatalf("Did not our message")
}
// Trigger the flush of sent/ack occurs now.
s1.replicateSubSentAndAck(qsub1)

// Wait for this to be replicated on s2
waitFor(t, time.Second, 15*time.Millisecond, func() error {
subs := s2.clients.getSubs(clientName)
if len(subs) != 1 {
return fmt.Errorf("Incorrect number of subs, expected 1, got %v", len(subs))
}
sub := subs[0]
sub.RLock()
if sub.ID != qsub1.ID {
sub.RUnlock()
return fmt.Errorf("Wrong subscription, expected subID %v, got %v", qsub1.ID, sub.ID)
}
lastSent := sub.LastSent
pending := len(sub.acksPending)
sub.RUnlock()
if lastSent != 1 {
return fmt.Errorf("Last sent should be 1, got %v", lastSent)
}
if pending != 1 {
return fmt.Errorf("There should be one message pending, got %v", pending)
}
return nil
})
}

0 comments on commit 3b51bc9

Please sign in to comment.