From 214e2dbe0af5189aa32998a05a14a5a4e63237f9 Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Mon, 19 Jul 2021 18:13:37 -0600
Subject: [PATCH] [FIXED] Clustering: snapshot queue group's last_sent

In some situations where a member has fallen behind and other members
left the group, a snapshot would have the lone member with a lower
last_sent, which could cause older messages to be redelivered.

Signed-off-by: Ivan Kozlovic
---
 server/clustering_test.go | 108 ++++++++++++++++++++++++++++++++++++++
 server/snapshot.go        |   3 ++
 2 files changed, 111 insertions(+)

diff --git a/server/clustering_test.go b/server/clustering_test.go
index 2d582b41..3c92a02b 100644
--- a/server/clustering_test.go
+++ b/server/clustering_test.go
@@ -8274,3 +8274,111 @@ func TestClusteringRaceCausesFollowerToRedeliverMsgs(t *testing.T) {
 		// ok
 	}
 }
+
+func TestClusteringSnapshotQSubLastSent(t *testing.T) {
+	cleanupDatastore(t)
+	defer cleanupDatastore(t)
+	cleanupRaftLog(t)
+	defer cleanupRaftLog(t)
+
+	// For this test, use a central NATS server.
+	ns := natsdTest.RunDefaultServer()
+	defer ns.Shutdown()
+
+	// Configure first server
+	s1sOpts := getTestDefaultOptsForClustering("a", true)
+	s1sOpts.ReplaceDurable = true
+	s1 := runServerWithOpts(t, s1sOpts, nil)
+	defer s1.Shutdown()
+
+	// Wait for it to bootstrap
+	getLeader(t, 10*time.Second, s1)
+
+	// Configure second server.
+	s2sOpts := getTestDefaultOptsForClustering("b", false)
+	s2sOpts.ReplaceDurable = true
+	s2 := runServerWithOpts(t, s2sOpts, nil)
+	defer s2.Shutdown()
+
+	// Configure third server.
+	s3sOpts := getTestDefaultOptsForClustering("c", false)
+	s3sOpts.ReplaceDurable = true
+	s3 := runServerWithOpts(t, s3sOpts, nil)
+	defer s3.Shutdown()
+
+	sc1 := NewDefaultConnection(t)
+	defer sc1.Close()
+
+	ch := make(chan bool, 1)
+	total := uint64(100)
+	sc1.QueueSubscribe("foo", "queue", func(m *stan.Msg) {
+		if m.Sequence == total {
+			select {
+			case ch <- true:
+			default:
+			}
+		}
+	})
+
+	nc2 := newNatsConnection(t)
+	defer nc2.Close()
+	sc2, err := stan.Connect(clusterName, clientName+"2",
+		stan.ConnectWait(250*time.Millisecond),
+		stan.NatsConn(nc2))
+	if err != nil {
+		t.Fatalf("Error connecting: %v", err)
+	}
+
+	sc2.QueueSubscribe("foo", "queue", func(m *stan.Msg) {}, stan.MaxInflight(1))
+	nc2.Close()
+
+	for i := 0; i < int(total); i++ {
+		sc1.Publish("foo", []byte("msg"))
+	}
+	if err := WaitTime(ch, 2*time.Second); err != nil {
+		t.Fatalf("Did not receive all messages: %v", err)
+	}
+	sc1.Close()
+
+	// Make sure sc1's close has been processed by every server before snapshotting; presumably the 1 remaining client is sc2.
+	servers := []*StanServer{s1, s2, s3}
+	for _, s := range servers {
+		waitForNumClients(t, s, 1)
+	}
+	for _, s := range servers {
+		if err := s.raft.Snapshot().Error(); err != nil {
+			t.Fatalf("Error during snapshot: %v", err)
+		}
+	}
+	for _, s := range servers {
+		s.Shutdown()
+	}
+
+	// Restart only b and c; a is left down.
+	s2 = runServerWithOpts(t, s2sOpts, nil)
+	defer s2.Shutdown()
+
+	s3 = runServerWithOpts(t, s3sOpts, nil)
+	defer s3.Shutdown()
+
+	getLeader(t, 10*time.Second, s2, s3)
+
+	sc1 = NewDefaultConnection(t)
+	defer sc1.Close()
+
+	errCh := make(chan error, 1)
+	sc1.QueueSubscribe("foo", "queue", func(m *stan.Msg) {
+		// We should not be receiving anything: report the first message (without blocking) and unsubscribe.
+		select {
+		case errCh <- fmt.Errorf("Received message: %+v", m):
+			m.Sub.Unsubscribe()
+		default:
+		}
+	})
+	select {
+	case err := <-errCh:
+		t.Fatal(err)
+	case <-time.After(250 * time.Millisecond):
+		// OK: nothing was delivered within 250ms.
+	}
+}
diff --git a/server/snapshot.go b/server/snapshot.go
index 3fe16a6a..20ab1b41 100644
--- a/server/snapshot.go
+++ b/server/snapshot.go
@@ -164,6 +164,9 @@ func (s *serverSnapshot) snapshotChannels(snap *spb.RaftSnapshot) error {
 				i++
 			}
 		}
+		if sub.qstate != nil && sub.qstate.lastSent > state.LastSent {
+			state.LastSent = sub.qstate.lastSent
+		}
 		return snapSub
 	}
 