diff --git a/server/clustering_test.go b/server/clustering_test.go
index d59ac536..43923c0b 100644
--- a/server/clustering_test.go
+++ b/server/clustering_test.go
@@ -6152,7 +6152,7 @@ func TestClusteringRestoreSnapshotWithSomeMsgsNoLongerAvailFromNewServer(t *test
 	msgsCount := int32(0)
 	notFound := int32(0)
 	if _, err := nc.Subscribe(nats.InboxPrefix+">", func(m *nats.Msg) {
-		if m.Reply != restoreMsgsV2 {
+		if !strings.HasPrefix(m.Reply, restoreMsgsV2) {
 			return
 		}
 		if len(m.Data) > 0 {
@@ -6618,3 +6618,235 @@ func TestClusteringRaftLogging(t *testing.T) {
 		t.Fatalf("Wrong tracing for raft log levels: %v", wrongLevels)
 	}
 }
+
+// blockingLookupStore wraps a channel's MsgStore and blocks in Lookup until
+// the test releases it, letting tests control the pace of message restores.
+type blockingLookupStore struct {
+	stores.MsgStore
+	inLookupCh chan struct{}
+	releaseCh  chan bool
+	skip       bool
+}
+
+// Lookup notifies the test that a lookup is in progress, then waits for the
+// test's go-ahead. The value received from releaseCh indicates whether
+// subsequent lookups should skip the blocking behavior.
+func (b *blockingLookupStore) Lookup(seq uint64) (*pb.MsgProto, error) {
+	if !b.skip {
+		b.inLookupCh <- struct{}{}
+		b.skip = <-b.releaseCh
+	}
+	return b.MsgStore.Lookup(seq)
+}
+
+func TestClusteringRestoreSnapshotErrorDontSkipSeq(t *testing.T) {
+	restoreMsgsRcvTimeout = 500 * time.Millisecond
+	defer func() { restoreMsgsRcvTimeout = defaultRestoreMsgsRcvTimeout }()
+
+	cleanupDatastore(t)
+	defer cleanupDatastore(t)
+	cleanupRaftLog(t)
+	defer cleanupRaftLog(t)
+
+	// Run a single NATS server to which all streaming servers connect.
+	ns := natsdTest.RunDefaultServer()
+	defer ns.Shutdown()
+
+	// Configure first server.
+	s1sOpts := getTestDefaultOptsForClustering("a", true)
+	s1 := runServerWithOpts(t, s1sOpts, nil)
+	defer s1.Shutdown()
+
+	// Configure second server.
+	s2sOpts := getTestDefaultOptsForClustering("b", false)
+	s2 := runServerWithOpts(t, s2sOpts, nil)
+	defer s2.Shutdown()
+
+	getLeader(t, 10*time.Second, s1, s2)
+
+	sc := NewDefaultConnection(t)
+	defer sc.Close()
+
+	total := 10
+	for i := 0; i < total; i++ {
+		if err := sc.Publish("foo", []byte("hello")); err != nil {
+			t.Fatalf("Error on publish: %v", err)
+		}
+	}
+	sc.Close()
+
+	// Create a snapshot that will indicate that there are messages from 1 to 10.
+	if err := s1.raft.Snapshot().Error(); err != nil {
+		t.Fatalf("Error on snapshot: %v", err)
+	}
+
+	c := s1.channels.get("foo")
+	ch1 := make(chan struct{})
+	ch2 := make(chan bool)
+	c.store.Msgs = &blockingLookupStore{MsgStore: c.store.Msgs, inLookupCh: ch1, releaseCh: ch2}
+
+	// Configure third server.
+	s3sOpts := getTestDefaultOptsForClustering("c", false)
+	s3 := runServerWithOpts(t, s3sOpts, nil)
+	defer s3.Shutdown()
+
+	// Let the leader send 3 messages, then make the connection fail.
+	for i := 0; i < 3; i++ {
+		<-ch1
+		ch2 <- false
+	}
+
+	// Now at sequence 4, cause a failure...
+	<-ch1
+
+	// Replace the connection used to replicate with a closed connection
+	// so that we get an error on publish.
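+	// (s1.ncsr is the NATS connection the leader uses to serve snapshot
+	// restore requests, so publishing on a closed connection will fail.)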
+	closedConn, err := nats.Connect("nats://127.0.0.1:4222")
+	if err != nil {
+		t.Fatalf("Error creating conn: %v", err)
+	}
+	closedConn.Close()
+	s1.mu.Lock()
+	savedConn := s1.ncsr
+	s1.ncsr = closedConn
+	s1.mu.Unlock()
+
+	// Release the lookup so that s1 now tries to send the message(s)
+	ch2 <- false
+
+	// Restore the connection now
+	<-ch1
+	s1.mu.Lock()
+	s1.ncsr = savedConn
+	s1.mu.Unlock()
+	// From now on, the store will not block on lookups
+	ch2 <- true
+
+	waitFor(t, 5*time.Second, 15*time.Millisecond, func() error {
+		c := s3.channels.get("foo")
+		if c != nil {
+			first, last, err := c.store.Msgs.FirstAndLastSequence()
+			if err != nil {
+				return fmt.Errorf("Error getting first/last seq: %v", err)
+			}
+			if first == 1 && last == uint64(total) {
+				return nil
+			}
+			return fmt.Errorf("Channel foo is not right: first=%v last=%v", first, last)
+		}
+		return fmt.Errorf("Channel foo still not restored")
+	})
+}
+
+func TestClusteringRestoreSnapshotGapInSeq(t *testing.T) {
+	cleanupDatastore(t)
+	defer cleanupDatastore(t)
+	cleanupRaftLog(t)
+	defer cleanupRaftLog(t)
+
+	// Run 3 routed NATS servers; ns3 will be bounced during the restore.
+	n1Opts := natsdTest.DefaultTestOptions
+	n1Opts.Host = "127.0.0.1"
+	n1Opts.Port = 4222
+	n1Opts.Cluster.Host = "127.0.0.1"
+	n1Opts.Cluster.Port = 6222
+	ns1 := natsdTest.RunServer(&n1Opts)
+	defer ns1.Shutdown()
+
+	n2Opts := natsdTest.DefaultTestOptions
+	n2Opts.Host = "127.0.0.1"
+	n2Opts.Port = 4223
+	n2Opts.Cluster.Host = "127.0.0.1"
+	n2Opts.Cluster.Port = 6223
+	n2Opts.Routes = natsd.RoutesFromStr("nats://127.0.0.1:6222")
+	ns2 := natsdTest.RunServer(&n2Opts)
+	defer ns2.Shutdown()
+
+	n3Opts := natsdTest.DefaultTestOptions
+	n3Opts.Host = "127.0.0.1"
+	n3Opts.Port = 4224
+	n3Opts.Cluster.Host = "127.0.0.1"
+	n3Opts.Cluster.Port = 6224
+	n3Opts.Routes = natsd.RoutesFromStr("nats://127.0.0.1:6222, nats://127.0.0.1:6223")
+	ns3 := natsdTest.RunServer(&n3Opts)
+	defer ns3.Shutdown()
+
+	s1sOpts := getTestDefaultOptsForClustering("a", true)
+	s1sOpts.NATSServerURL = "nats://127.0.0.1:4222"
+	s1 := runServerWithOpts(t, s1sOpts, nil)
+	defer s1.Shutdown()
+
+	s2sOpts := getTestDefaultOptsForClustering("b", false)
+	s2sOpts.NATSServerURL = "nats://127.0.0.1:4223"
+	s2 := runServerWithOpts(t, s2sOpts, nil)
+	defer s2.Shutdown()
+
+	getLeader(t, 10*time.Second, s1, s2)
+
+	sc := NewDefaultConnection(t)
+	defer sc.Close()
+
+	total := 10
+	for i := 0; i < total; i++ {
+		if err := sc.Publish("foo", []byte("hello")); err != nil {
+			t.Fatalf("Error on publish: %v", err)
+		}
+	}
+	sc.Close()
+
+	// Create a snapshot that will indicate that there are messages from 1 to 10.
+	if err := s1.raft.Snapshot().Error(); err != nil {
+		t.Fatalf("Error on snapshot: %v", err)
+	}
+
+	c := s1.channels.get("foo")
+	ch1 := make(chan struct{})
+	ch2 := make(chan bool)
+	c.store.Msgs = &blockingLookupStore{MsgStore: c.store.Msgs, inLookupCh: ch1, releaseCh: ch2}
+
+	// Configure third server.
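+	// Starting this server triggers a snapshot restore from the leader; its
+	// message lookups on s1 go through the blocking store installed above.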
+	s3sOpts := getTestDefaultOptsForClustering("c", false)
+	s3sOpts.NATSServerURL = "nats://127.0.0.1:4224"
+	s3 := runServerWithOpts(t, s3sOpts, nil)
+	defer s3.Shutdown()
+
+	// Let the leader send the first 2 messages.
+	for i := 0; i < 2; i++ {
+		<-ch1
+		ch2 <- false
+	}
+
+	// Shut down s3's NATS server so the next responses are lost in transit.
+	ns3.Shutdown()
+
+	for i := 0; i < 2; i++ {
+		<-ch1
+		ch2 <- false
+	}
+
+	ns3 = natsdTest.RunServer(&n3Opts)
+	defer ns3.Shutdown()
+
+	<-ch1
+	// Make the store stop blocking on Lookup
+	ch2 <- true
+
+	waitFor(t, 5*time.Second, 15*time.Millisecond, func() error {
+		c := s3.channels.get("foo")
+		if c != nil {
+			first, last, err := c.store.Msgs.FirstAndLastSequence()
+			if err != nil {
+				return fmt.Errorf("Error getting first/last seq: %v", err)
+			}
+			if first == 1 && last == uint64(total) {
+				return nil
+			}
+			return fmt.Errorf("Channel foo is not right: first=%v last=%v", first, last)
+		}
+		return fmt.Errorf("Channel foo still not restored")
+	})
+}
diff --git a/server/server.go b/server/server.go
index 908080ec..d17235db 100644
--- a/server/server.go
+++ b/server/server.go
@@ -46,7 +46,7 @@ import (
 // Server defaults.
 const (
 	// VERSION is the current version for the NATS Streaming server.
-	VERSION = "0.16.0"
+	VERSION = "0.16.1"
 
 	DefaultClusterID      = "test-cluster"
 	DefaultDiscoverPrefix = "_STAN.discover"
@@ -535,13 +535,15 @@ func (s *StanServer) subToSnapshotRestoreRequests() error {
 		// reply subject. The leader here can then send the first available
 		// message when getting a request for a message of a given sequence
 		// that is not found.
-		sendFirstAvail := strings.HasSuffix(m.Reply, "."+restoreMsgsV2)
+		v2 := strings.HasSuffix(m.Reply, "."+restoreMsgsV2)
 		// For the newer servers, include a "reply" subject to the response
 		// to let the follower know that this is coming from a 0.14.2+ server.
 		var reply string
-		if sendFirstAvail {
+		var replyFirstAvail string
+		if v2 {
 			reply = restoreMsgsV2
+			replyFirstAvail = restoreMsgsV2 + restoreMsgsFirstAvailSuffix
 		}
 
 		cname := m.Subject[prefixLen:]
 
@@ -554,6 +556,7 @@ func (s *StanServer) subToSnapshotRestoreRequests() error {
 			end := util.ByteOrder.Uint64(m.Data[8:])
 
 			for seq := start; seq <= end; seq++ {
+				sendingTheFirstAvail := false
 				msg, err := c.store.Msgs.Lookup(seq)
 				if err != nil {
 					s.log.Errorf("Snapshot restore request error for channel %q, error looking up message %v: %v", c.name, seq, err)
@@ -561,12 +564,13 @@ func (s *StanServer) subToSnapshotRestoreRequests() error {
 				}
 				// If the requestor is a server 0.14.2+, we will send the first
 				// available message.
-				if msg == nil && sendFirstAvail {
+				if msg == nil && v2 {
 					msg, err = c.store.Msgs.FirstMsg()
 					if err != nil {
 						s.log.Errorf("Snapshot restore request error for channel %q, error looking up first message: %v", c.name, err)
 						return
 					}
+					sendingTheFirstAvail = msg != nil
 				}
 				if msg == nil {
 					// We don't have this message because of channel limits.
@@ -580,8 +584,20 @@ func (s *StanServer) subToSnapshotRestoreRequests() error {
 					}
 					buf = msgBuf[:n]
 				}
-				if err := s.ncsr.PublishRequest(m.Reply, reply, buf); err != nil {
+				// This will be the empty string for old requestors, or
+				// restoreMsgsV2 for servers 0.14.2+.
+				respReply := reply
+				if sendingTheFirstAvail {
+					// Use the reply subject that indicates that this is the
+					// first available message. Requestors 0.16.1+ will make
+					// use of that.
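+					// (replyFirstAvail is restoreMsgsV2 with
+					// restoreMsgsFirstAvailSuffix appended, i.e. "sfa.first".)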
+					respReply = replyFirstAvail
+				}
+				if err := s.ncsr.PublishRequest(m.Reply, respReply, buf); err != nil {
 					s.log.Errorf("Snapshot restore request error for channel %q, unable to send response for seq %v: %v", c.name, seq, err)
+					return
 				}
 				// If we sent a message and seq was not the expected one,
 				// reset seq to proper value for the next iteration.
diff --git a/server/snapshot.go b/server/snapshot.go
index a6d65a48..ba7f73ca 100644
--- a/server/snapshot.go
+++ b/server/snapshot.go
@@ -16,6 +16,7 @@ package server
 
 import (
 	"fmt"
 	"io"
+	"strings"
 	"sync/atomic"
 	"time"
@@ -42,6 +43,9 @@ const (
 	// Used as requests inbox's suffix and reply subject of response messages
 	// to indicate that this is from a 0.14.2+ server.
 	restoreMsgsV2 = "sfa"
+	// Suffix added to the reply subject when the leader is sending the
+	// first available message.
+	restoreMsgsFirstAvailSuffix = ".first"
 )
 
 // serverSnapshot implements the raft.FSMSnapshot interface by snapshotting
@@ -346,7 +350,7 @@ func (r *raftFSM) Restore(snapshot io.ReadCloser) (retErr error) {
 
 	serverSnap := &spb.RaftSnapshot{}
 	if err := serverSnap.Unmarshal(buf); err != nil {
-		panic(err)
+		return fmt.Errorf("error decoding snapshot record: %v", err)
 	}
 	if err := r.restoreClientsFromSnapshot(serverSnap); err != nil {
 		return err
@@ -560,13 +564,26 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromAp
 			cnfcount = 0
 			msg := &pb.MsgProto{}
 			if err := msg.Unmarshal(resp.Data); err != nil {
-				panic(err)
+				return fmt.Errorf("error decoding message: %v", err)
 			}
 			// Server 0.14.2+ may send us the first available message if
 			// the requested one is not available. So check the message
-			// sequence. (we could ensure that reply subject is
-			// restoreMsgsV2, but it is not required).
+			// sequence.
 			if msg.Sequence != seq {
+				// Servers 0.16.1+ send the response with a Reply subject
+				// that indicates that this is the first available message.
+				// Without that hint, unless msg.Sequence is > reqEnd (the
+				// only case in which the leader may send a message, and
+				// only one, outside of our request range), there is no way
+				// to know if this really is the first available message or
+				// if some messages were lost in transit.
+				if !strings.HasSuffix(resp.Reply, restoreMsgsFirstAvailSuffix) && seq <= reqEnd {
+					// Ensure this really is the first available message;
+					// otherwise return an error so the restore is retried.
+					if first, err := s.hasLeaderSentFirstAvail(subject, seq, msg.Sequence); !first || err != nil {
+						return fmt.Errorf("expected to restore message %v, got %v (err=%v)", seq, msg.Sequence, err)
+					}
+				}
 				// Any prior messages are now invalid, so empty our store.
 				if err := c.store.Msgs.Empty(); err != nil {
 					return err
@@ -644,6 +661,33 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromAp
 	return c.store.Msgs.Flush()
 }
 
+
+// This function creates a new ephemeral subscription and asks the leader
+// for the single message sequence `reqSeq`, expecting to get a response
+// with a message of sequence `gotSeq`. If that is the case, this function
+// returns true; otherwise it returns false.
+func (s *StanServer) hasLeaderSentFirstAvail(subject string, reqSeq, gotSeq uint64) (bool, error) {
+	inbox := nats.NewInbox() + "." + restoreMsgsV2
+	sub, err := s.ncsr.SubscribeSync(inbox)
+	if err != nil {
+		return false, err
+	}
+	defer sub.Unsubscribe()
+	rawMsg, err := s.restoreMsgsFetchOne(subject, inbox, sub, reqSeq)
+	if err != nil {
+		return false, err
+	}
+	if len(rawMsg.Data) > 0 {
+		msg := &pb.MsgProto{}
+		if err := msg.Unmarshal(rawMsg.Data); err != nil {
+			return false, err
+		}
+		if msg.Sequence == gotSeq {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
 // Sends individual requests to leader server (pre 0.14.1) in order to
 // find the sequence of the first available message in the range.
 func (s *StanServer) restoreMsgsFindFirstAvailSeqFromOldLeader(