
Commit

Merge 684f812 into 57c6c84
kozlovic committed May 5, 2019
2 parents 57c6c84 + 684f812 commit a767485
Showing 9 changed files with 890 additions and 90 deletions.
331 changes: 326 additions & 5 deletions server/clustering_test.go
@@ -2392,9 +2392,13 @@ func TestClusteringRaftLogReplay(t *testing.T) {
subs := leader.clients.getSubs(clientName)
// Flush the replication of SentMsg
leader.replicateSubSentAndAck(subs[0])
// Wait for it to be received on followers.
servers = removeServer(servers, leader)
for _, s := range servers {
waitForAcks(t, s, clientName, 1, 1)
}

atomic.StoreInt32(&doAckMsg, 1)
servers = removeServer(servers, leader)
leader.Shutdown()
servers = removeServer(servers, leader)
getLeader(t, 10*time.Second, servers...)
@@ -5186,11 +5190,11 @@ func TestClusteringReplSubSentAckWhileClosing(t *testing.T) {
sc := NewDefaultConnection(t)
defer sc.Close()

for i := 0; i < 200; i++ {
if _, err := sc.PublishAsync("foo", []byte("hello"), nil); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
scPub, err := stan.Connect(clusterName, "pubconn")
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer scPub.Close()

count := 0
cb := func(_ *stan.Msg) {
@@ -5207,10 +5211,327 @@ func TestClusteringReplSubSentAckWhileClosing(t *testing.T) {
t.Fatalf("Error on subscribe: %v", err)
}

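// Publish continuously from a separate goroutine until quitCh is closed.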
wg := sync.WaitGroup{}
wg.Add(1)
quitCh := make(chan struct{})
go func() {
defer wg.Done()
for {
if _, err := scPub.PublishAsync("foo", []byte("hello"), nil); err != nil {
return
}
select {
case <-quitCh:
return
default:
}
}
}()

waitForNumClients(t, s1, 1)
close(quitCh)
wg.Wait()
scPub.Close()
waitForNumClients(t, s1, 0)
s1.Shutdown()
}

func TestClusteringSubSentAckReplication(t *testing.T) {
for _, test := range []struct {
name string
queue string
durable string
}{
{"plain_sub", "", ""},
{"queue_sub", "queue", ""},
{"durable", "", "dur"},
{"durable_queue_sub", "queue", "dur"},
} {
t.Run(test.name, func(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

opts := []stan.SubscriptionOption{
stan.SetManualAckMode(),
stan.AckWait(ackWaitInMs(100)),
}
if test.durable != "" {
opts = append(opts, stan.DurableName(test.durable))
}
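// ackNow and closeNow are toggled from the test body to control whether the
// callback acks the message or closes its own subscription.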
ackNow := int32(0)
closeNow := int32(0)
if _, err := sc.QueueSubscribe("foo", test.queue, func(m *stan.Msg) {
if atomic.LoadInt32(&ackNow) == 1 {
m.Ack()
}
if atomic.LoadInt32(&closeNow) == 1 {
m.Sub.Close()
}
}, opts...); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

waitForNumSubs(t, s2, clientName, 1)

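// Create a second subscription in auto-ack mode (with distinct queue/durable
// names when applicable) and later verify it never accumulates pending acks.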
var opts2 []stan.SubscriptionOption
if test.durable != "" {
opts2 = append(opts2, stan.DurableName(test.durable+"_auto"))
}
var queueName string
if test.queue != "" {
queueName += "_auto"
}
asub, err := sc.QueueSubscribe("foo", queueName, func(m *stan.Msg) {
if atomic.LoadInt32(&closeNow) == 1 {
m.Sub.Close()
}
}, opts2...)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer asub.Close()

waitForNumSubs(t, s2, clientName, 2)

// Check when lots of sent/ack need to be replicated
for i := 0; i < 300; i++ {
sc.PublishAsync("foo", []byte("hello"), nil)
}
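// Sub 1 (manual ack) should report 300 pending messages, sub 2 (auto ack) none.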
waitForAcks(t, s2, clientName, 1, 300)
waitForAcks(t, s2, clientName, 2, 0)

atomic.StoreInt32(&ackNow, 1)
waitForAcks(t, s2, clientName, 1, 0)

// Check when only a few sent/acks need to be replicated
atomic.StoreInt32(&ackNow, 0)
for i := 0; i < 20; i++ {
sc.PublishAsync("foo", []byte("hello"), nil)
}
waitForAcks(t, s2, clientName, 1, 20)
waitForAcks(t, s2, clientName, 2, 0)
atomic.StoreInt32(&ackNow, 1)
waitForAcks(t, s2, clientName, 1, 0)

// Check closing sub while publisher is publishing
wg := sync.WaitGroup{}
wg.Add(1)
quitCh := make(chan struct{})
started := make(chan bool, 1)
go func() {
defer wg.Done()
i := 0
for {
if _, err := sc.PublishAsync("foo", []byte("hello"), nil); err != nil {
return
}
if i++; i == 10 {
close(started)
}
select {
case <-quitCh:
return
default:
}
}
}()

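// Once the publisher has sent a few messages, have the callbacks close their
// subscriptions while messages are still flowing.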
<-started
atomic.StoreInt32(&closeNow, 1)
// Make sure subs are gone
waitForNumSubs(t, s1, clientName, 0)
waitForNumSubs(t, s2, clientName, 0)
// Stop sender
close(quitCh)
// Make sure we can create a sub, then close it, and that this works ok.
sub, err := sc.Subscribe("foo", func(_ *stan.Msg) {})
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
waitForNumSubs(t, s1, clientName, 1)
waitForNumSubs(t, s2, clientName, 1)
sub.Close()
waitForNumSubs(t, s1, clientName, 0)
waitForNumSubs(t, s2, clientName, 0)
// Make sure we can correctly close the connection
sc.Close()
waitForNumClients(t, s1, 0)
waitForNumClients(t, s2, 0)
// Create and close a connection and check that this works ok
sc = NewDefaultConnection(t)
waitForNumClients(t, s1, 1)
waitForNumClients(t, s2, 1)
sc.Close()
waitForNumClients(t, s1, 0)
waitForNumClients(t, s2, 0)
})
}
}

func TestClusteringSubSentAckReplResumeAfterLeadershipReacquired(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

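// The subscriber does not ack until ackNow is set, so messages stay pending.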
ackNow := int32(0)
_, err := sc.Subscribe("foo", func(m *stan.Msg) {
if atomic.LoadInt32(&ackNow) == 1 {
m.Ack()
}
}, stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(100)))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

for i := 0; i < 2; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}

// Wait for the 2 pending messages on s2
waitForAcks(t, s2, clientName, 1, 2)

// Kill s2 and restart it
s2.Shutdown()

verifyNoLeader(t, 5*time.Second, s1)

s2 = runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

servers := []*StanServer{s1, s2}
leader := getLeader(t, 10*time.Second, servers...)
servers = removeServer(servers, leader)
follower := servers[0]

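// Publish 2 more messages; with ackNow still 0, the subscription now has 4 pending.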
for i := 0; i < 2; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}

// Wait for the 4 pending messages on follower
waitForAcks(t, follower, clientName, 1, 4)

// Make the sub ack the messages now.
atomic.StoreInt32(&ackNow, 1)

// Wait for the follower to receive the replication of the acks
waitForAcks(t, follower, clientName, 1, 0)
sc.Close()
leader.Shutdown()
}

func TestClusteringSubSentAckReplResumeOnClusterRestart(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

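// Messages are not ack'ed until ackNow is set, so they remain pending across the restart.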
ackNow := int32(0)
_, err := sc.Subscribe("foo", func(m *stan.Msg) {
if atomic.LoadInt32(&ackNow) == 1 {
m.Ack()
}
}, stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(100)))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

for i := 0; i < 2; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}

// Wait for the 2 pending messages on s2
waitForAcks(t, s2, clientName, 1, 2)

// Restart the cluster.
s1.Shutdown()
s2.Shutdown()

s1 = runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()
s2 = runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

// Make sure we have 2 pending messages on both servers.
waitForAcks(t, s1, clientName, 1, 2)
waitForAcks(t, s2, clientName, 1, 2)

// Make the sub ack the messages now.
atomic.StoreInt32(&ackNow, 1)

// Wait for the number of pending messages to fall to 0.
waitForAcks(t, s1, clientName, 1, 0)
waitForAcks(t, s2, clientName, 1, 0)
sc.Close()
}

type msgStoreDoesntFlush struct {
stores.MsgStore
}
