Skip to content

Commit

Permalink
Merge e5eb101 into 07ec814
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Apr 26, 2019
2 parents 07ec814 + e5eb101 commit 9416503
Show file tree
Hide file tree
Showing 2 changed files with 325 additions and 91 deletions.
153 changes: 152 additions & 1 deletion server/clustering_test.go
Expand Up @@ -2391,7 +2391,7 @@ func TestClusteringRaftLogReplay(t *testing.T) {
waitForAcks(t, leader, clientName, 1, 1)
subs := leader.clients.getSubs(clientName)
// Flush the replication of SentMsg
leader.flushReplicatedSentAndAckSeqs(subs[0], true)
leader.replicateSubSentAndAck(subs[0])

atomic.StoreInt32(&doAckMsg, 1)
servers = removeServer(servers, leader)
Expand Down Expand Up @@ -5059,3 +5059,154 @@ func TestClusteringNoIncorrectMaxSubs(t *testing.T) {
sc.Close()
s1.Shutdown()
}

func TestClusteringDeadlockOnClientClose(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

getLeader(t, 10*time.Second, s1, s2, s3)

ch := make(chan bool, 1)
numMsgs := 1000
numSubs := 20
numQSubs := 5
gCount := int32(0)
total := int32((numSubs + numQSubs) * numMsgs)

cb := func(sc stan.Conn) func(*stan.Msg) {
count := 0
return func(_ *stan.Msg) {
count++
if count == numMsgs {
sc.Close()
}
if n := atomic.AddInt32(&gCount, 1); n == total {
ch <- true
}
}
}

for i := 0; i < numSubs; i++ {
sc, err := stan.Connect(clusterName, fmt.Sprintf("sub%d", i))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer sc.Close()

// Create some plain, some durables
if i >= numSubs/2 {
if _, err := sc.Subscribe("foo", cb(sc), stan.DurableName("dur")); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
} else {
if _, err := sc.Subscribe("foo", cb(sc)); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
}
}
// Create queue subs on different groups so they each get a message
for i := 0; i < numQSubs; i++ {
sc, err := stan.Connect(clusterName, fmt.Sprintf("qsub%d", i))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer sc.Close()

if _, err := sc.QueueSubscribe("foo", fmt.Sprintf("group%d", i), cb(sc)); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
}

pubConn := NewDefaultConnection(t)
defer pubConn.Close()

for i := 0; i < numMsgs; i++ {
if err := pubConn.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
pubConn.Close()

select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive all msgs")
}

waitForNumClients(t, s1, 0)
s1.Shutdown()
}

func TestClusteringReplSubSentAckWhileClosing(t *testing.T) {
testSubSentAndAckSlowApply = true
defer func() { testSubSentAndAckSlowApply = false }()

cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

for i := 0; i < 200; i++ {
if _, err := sc.PublishAsync("foo", []byte("hello"), nil); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}

count := 0
cb := func(_ *stan.Msg) {
count++
if count == 101 {
sc.Close()
}
}
if _, err := sc.Subscribe("foo",
cb,
stan.DurableName("dur"),
stan.DeliverAllAvailable(),
stan.SetManualAckMode()); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

waitForNumClients(t, s1, 0)
s1.Shutdown()
}

0 comments on commit 9416503

Please sign in to comment.