From 6689b0b55650c0313c499742d6db9e8bbaabaede Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 14 May 2019 10:06:20 -0600 Subject: [PATCH] [FIXED] Clustering: restore snapshot with some messages no longer avail When a snapshot is performed prior to messages being removed (either expired or removed due to count/size limits), a node that would restore from this snapshot would ask the leader to send messages whose indexes (first/last) are in the snapshot. Both leader and node would stop the restore when the first unavailable message was found. Resolves #834 Signed-off-by: Ivan Kozlovic --- server/clustering_test.go | 104 +++++++++++++++++++++++++++++++++++--- server/server.go | 3 -- server/snapshot.go | 21 ++++---- 3 files changed, 108 insertions(+), 20 deletions(-) diff --git a/server/clustering_test.go b/server/clustering_test.go index cf35487a..e0d52e54 100644 --- a/server/clustering_test.go +++ b/server/clustering_test.go @@ -37,6 +37,7 @@ import ( "github.com/nats-io/go-nats-streaming" "github.com/nats-io/go-nats-streaming/pb" "github.com/nats-io/nats-streaming-server/stores" + "github.com/nats-io/nats-streaming-server/test" ) var defaultRaftLog string @@ -59,6 +60,20 @@ func cleanupRaftLog(t *testing.T) { } } +func shutdownAndCleanupState(t *testing.T, s *StanServer, nodeID string) { + t.Helper() + s.Shutdown() + switch persistentStoreType { + case stores.TypeFile: + os.RemoveAll(filepath.Join(defaultDataStore, nodeID)) + os.RemoveAll(filepath.Join(defaultRaftLog, nodeID)) + case stores.TypeSQL: + test.CleanupSQLDatastore(t, testSQLDriver, testSQLSource+nodeID) + default: + t.Fatalf("This test needs to be updated for store type: %v", persistentStoreType) + } +} + func getTestDefaultOptsForClustering(id string, bootstrap bool) *Options { opts := getTestDefaultOptsForPersistentStore() if persistentStoreType == stores.TypeFile { @@ -4707,15 +4722,10 @@ func TestClusteringSubDontStallDueToMsgExpiration(t *testing.T) { } checkReceived() - s2.Shutdown() // We 
remove all state from node "b", but even if we didn't, on restart, // since "b" store would be behind the rest, it would be emptied because // the current first message is move than the "b"'s last sequence. - // For DB, let it recover its state... - if persistentStoreType == stores.TypeFile { - os.RemoveAll(filepath.Join(defaultDataStore, "b")) - os.RemoveAll(filepath.Join(defaultRaftLog, "b")) - } + shutdownAndCleanupState(t, s2, "b") for i := 0; i < secondBatch; i++ { if err := sc.Publish("foo", []byte("hello")); err != nil { @@ -5711,3 +5721,85 @@ func TestClusteringNumSubs(t *testing.T) { sc.Close() leader.Shutdown() } + +func TestClusteringRestoreSnapshotWithSomeMsgsNoLongerAvail(t *testing.T) { + cleanupDatastore(t) + defer cleanupDatastore(t) + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + // For this test, use a central NATS server. + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + // Configure first server + s1sOpts := getTestDefaultOptsForClustering("a", true) + s1sOpts.MaxMsgs = 10 + s1 := runServerWithOpts(t, s1sOpts, nil) + defer s1.Shutdown() + + // Configure second server. + s2sOpts := getTestDefaultOptsForClustering("b", false) + s2sOpts.MaxMsgs = 10 + s2 := runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + // Configure third server. + s3sOpts := getTestDefaultOptsForClustering("c", false) + s3sOpts.MaxMsgs = 10 + s3 := runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + getLeader(t, 10*time.Second, s1, s2, s3) + + sc := NewDefaultConnection(t) + defer sc.Close() + + for i := 0; i < 10; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + + // Create a snapshot that will indicate that there are messages from 1 to 10. + if err := s1.raft.Snapshot().Error(); err != nil { + t.Fatalf("Error on snapshot: %v", err) + } + + // Shutdown s3 and cleanup its state so it will have nothing in its stores + // and will restore from the snapshot. 
+ shutdownAndCleanupState(t, s3, "c") + + // Send 2 more messages that will make messages 1 and 2 disappear + for i := 0; i < 2; i++ { + if err := sc.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + sc.Close() + + // Start s3. It will restore from the snapshot that says that + // channel has message 1 to 10, and then should receive 2 raft + // logs with messages 11 and 12. + // When s3 will ask the leader for messages 1 and 2, it should + // get empty responses indicating that these messages are gone, + // but should be able to request messages 3 to 10, then 11 and + // 12 will be replayed from raft logs. + s3 = runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + waitFor(t, 5*time.Second, 15*time.Millisecond, func() error { + c := s3.channels.get("foo") + if c == nil { + return fmt.Errorf("Channel foo not recreated yet") + } + first, last, err := c.store.Msgs.FirstAndLastSequence() + if err != nil { + return fmt.Errorf("Error getting first/last seq: %v", err) + } + if first != 3 || last != 12 { + return fmt.Errorf("Expected first=3 last=12, got %v and %v", first, last) + } + return nil + }) +} diff --git a/server/server.go b/server/server.go index ebfcc033..9003a4a6 100644 --- a/server/server.go +++ b/server/server.go @@ -545,9 +545,6 @@ func (s *StanServer) subToSnapshotRestoreRequests() error { if err := s.ncsr.Publish(m.Reply, buf); err != nil { s.log.Errorf("Snapshot restore request error for channel %q, unable to send response for seq %v: %v", c.name, seq, err) } - if buf == nil { - return - } select { case <-s.shutdownCh: return diff --git a/server/snapshot.go b/server/snapshot.go index 776a506e..868d611d 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -445,17 +445,16 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64) error return err } // It is possible that the leader does not have this message because of - // channel limits. 
If resp.Data is empty, we are in this situation and - // we are done recovering snapshot. - if len(resp.Data) == 0 { - break - } - msg := &pb.MsgProto{} - if err := msg.Unmarshal(resp.Data); err != nil { - panic(err) - } - if _, err := c.store.Msgs.Store(msg); err != nil { - return err + // channel limits. If resp.Data is empty, we are in this situation. + // We need to continue to see if more recent messages are available though. + if len(resp.Data) != 0 { + msg := &pb.MsgProto{} + if err := msg.Unmarshal(resp.Data); err != nil { + panic(err) + } + if _, err := c.store.Msgs.Store(msg); err != nil { + return err + } } select { case <-r.server.shutdownCh: