Skip to content

Commit

Permalink
Merge 214e2db into aa11d83
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Jul 20, 2021
2 parents aa11d83 + 214e2db commit 3824d98
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 0 deletions.
108 changes: 108 additions & 0 deletions server/clustering_test.go
Expand Up @@ -8274,3 +8274,111 @@ func TestClusteringRaceCausesFollowerToRedeliverMsgs(t *testing.T) {
// ok
}
}

func TestClusteringSnapshotQSubLastSent(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.ReplaceDurable = true
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Wait for it to bootstrap
getLeader(t, 10*time.Second, s1)

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.ReplaceDurable = true
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3sOpts.ReplaceDurable = true
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

sc1 := NewDefaultConnection(t)
defer sc1.Close()

ch := make(chan bool, 1)
total := uint64(100)
sc1.QueueSubscribe("foo", "queue", func(m *stan.Msg) {
if m.Sequence == total {
select {
case ch <- true:
default:
}
}
})

nc2 := newNatsConnection(t)
defer nc2.Close()
sc2, err := stan.Connect(clusterName, clientName+"2",
stan.ConnectWait(250*time.Millisecond),
stan.NatsConn(nc2))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}

sc2.QueueSubscribe("foo", "queue", func(m *stan.Msg) {}, stan.MaxInflight(1))
nc2.Close()

for i := 0; i < int(total); i++ {
sc1.Publish("foo", []byte("msg"))
}
if err := WaitTime(ch, 2*time.Second); err != nil {
t.Fatalf("Did not receive all messages: %v", err)
}
sc1.Close()

// Make sure this is processed in the servers
servers := []*StanServer{s1, s2, s3}
for _, s := range servers {
waitForNumClients(t, s, 1)
}
for _, s := range servers {
if err := s.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error during snapshot: %v", err)
}
}
for _, s := range servers {
s.Shutdown()
}

// Restart b and c
s2 = runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

s3 = runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

getLeader(t, 10*time.Second, s2, s3)

sc1 = NewDefaultConnection(t)
defer sc1.Close()

errCh := make(chan error, 1)
sc1.QueueSubscribe("foo", "queue", func(m *stan.Msg) {
// We should not be receiving anything.
select {
case errCh <- fmt.Errorf("Received message: %+v", m):
m.Sub.Unsubscribe()
default:
}
})
select {
case err := <-errCh:
t.Fatal(err)
case <-time.After(250 * time.Millisecond):
// OK
}
}
3 changes: 3 additions & 0 deletions server/snapshot.go
Expand Up @@ -164,6 +164,9 @@ func (s *serverSnapshot) snapshotChannels(snap *spb.RaftSnapshot) error {
i++
}
}
if sub.qstate != nil && sub.qstate.lastSent > state.LastSent {
state.LastSent = sub.qstate.lastSent
}
return snapSub
}

Expand Down

0 comments on commit 3824d98

Please sign in to comment.