[FIXED] Clustering: Snapshot skip sequence on error
If the leader had an issue sending a requested message, it would
simply log the error and continue with the next one. The consequence
was that the follower would see a "gap" in the sequence and empty its
store up to that sequence, assuming this was the new first available
message.
The missing `return` statement has been added, but there would still
be situations where the follower could wrongly assume that a mismatch
between the requested and received sequence meant that this was the
new first available message in the leader.
The leader now indicates - through the reply subject - that the
message it is sending is the first available one. Absent that
indication, a follower getting a sequence mismatch has to verify
whether messages were lost in transit or this is indeed the first
available message. This is done by requesting a single message as
opposed to a batch.

Also replaced some panic() calls on Unmarshal() failures, where a
returned error is more appropriate.

Resolves #921

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed Aug 28, 2019
1 parent d9c28b6 commit 5c6d507
Showing 3 changed files with 288 additions and 10 deletions.
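
In short, the follower-side rule this commit introduces, sketched here
as a hypothetical helper (the real logic is in server/snapshot.go
below): a 0.16.1+ leader marks a first-available substitution through
the reply subject ("sfa" + ".first"), and without that mark the
follower must probe with a single-sequence request before treating a
sequence mismatch as the new first available message.

    package sketch

    import "strings"

    // shouldEmptyStore decides whether a response carrying gotSeq, received
    // while restoring reqSeq (from a batch request ending at reqEnd), means
    // the leader no longer has reqSeq and substituted its first available
    // message. probe re-requests a single sequence and reports the sequence
    // of the leader's answer.
    func shouldEmptyStore(reply string, reqSeq, gotSeq, reqEnd uint64,
    	probe func(uint64) (uint64, error)) (bool, error) {
    	if gotSeq == reqSeq {
    		return false, nil // got exactly what was asked for
    	}
    	// 0.16.1+ leaders say so explicitly via the reply subject; a
    	// sequence beyond the requested range can only be a substitution.
    	if strings.HasSuffix(reply, ".first") || gotSeq > reqEnd {
    		return true, nil
    	}
    	// Older leader: verify with a single-sequence probe. If the answer
    	// is still gotSeq, it really is the first available message;
    	// otherwise messages were lost in transit and the restore is
    	// retried.
    	got, err := probe(reqSeq)
    	if err != nil {
    		return false, err
    	}
    	return got == gotSeq, nil
    }

The single-sequence probe disambiguates because a request for exactly
one message gets exactly one response: if the answer to "send seq 4"
carries sequence 7, nothing can have been dropped in between, so 7
really is the leader's first available message.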
222 changes: 221 additions & 1 deletion server/clustering_test.go
@@ -6152,7 +6152,7 @@ func TestClusteringRestoreSnapshotWithSomeMsgsNoLongerAvailFromNewServer(t *test
msgsCount := int32(0)
notFound := int32(0)
if _, err := nc.Subscribe(nats.InboxPrefix+">", func(m *nats.Msg) {
- if m.Reply != restoreMsgsV2 {
+ if !strings.HasPrefix(m.Reply, restoreMsgsV2) {
return
}
if len(m.Data) > 0 {
@@ -6618,3 +6618,223 @@ func TestClusteringRaftLogging(t *testing.T) {
t.Fatalf("Wrong tracing for raft log levels: %v", wrongLevels)
}
}

// blockingLookupStore wraps a MsgStore so that tests can intercept message
// lookups: each Lookup signals on inLookupCh, then blocks until the test
// sends on releaseCh; sending true releases the call and disables blocking
// for all subsequent lookups. A standalone sketch of this handshake follows
// this file's diff.
type blockingLookupStore struct {
stores.MsgStore
inLookupCh chan struct{}
releaseCh chan bool
skip bool
}

func (b *blockingLookupStore) Lookup(seq uint64) (*pb.MsgProto, error) {
if !b.skip {
b.inLookupCh <- struct{}{}
b.skip = <-b.releaseCh
}
return b.MsgStore.Lookup(seq)
}

func TestClusteringRestoreSnapshotErrorDontSkipSeq(t *testing.T) {
restoreMsgsRcvTimeout = 500 * time.Millisecond
defer func() { restoreMsgsRcvTimeout = defaultRestoreMsgsRcvTimeout }()

cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// Run a single NATS server used by all the streaming servers.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

total := 10
for i := 0; i < total; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
sc.Close()

// Create a snapshot that will indicate that there are messages from 1 to 10.
if err := s1.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error on snapshot: %v", err)
}

c := s1.channels.get("foo")
ch1 := make(chan struct{})
ch2 := make(chan bool)
c.store.Msgs = &blockingLookupStore{MsgStore: c.store.Msgs, inLookupCh: ch1, releaseCh: ch2}

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

// Let the server send 3 messages, then make the connection fail.
for i := 0; i < 3; i++ {
<-ch1
ch2 <- false
}

// Now at sequence 4, cause a failure.
<-ch1

// Replace the connection used to replicate with a closed connection
// so that we get an error on publish.
closedConn, err := nats.Connect("nats://127.0.0.1:4222")
if err != nil {
t.Fatalf("Error creating conn: %v", err)
}
closedConn.Close()
s1.mu.Lock()
savedConn := s1.ncsr
s1.ncsr = closedConn
s1.mu.Unlock()

// Release the lookup so that s1 now tries to send the message(s)
ch2 <- false

// Restoring the connection now
<-ch1
s1.mu.Lock()
s1.ncsr = savedConn
s1.mu.Unlock()
// From now on, the store will not block on lookups
ch2 <- true

waitFor(t, 5*time.Second, 15*time.Millisecond, func() error {
c := s3.channels.get("foo")
if c != nil {
first, last, err := c.store.Msgs.FirstAndLastSequence()
if err != nil {
return fmt.Errorf("Error getting first/last seq: %v", err)
}
if first == 1 && last == uint64(total) {
return nil
}
return fmt.Errorf("Channel foo is not right: first=%v last=%v", first, last)
}
return fmt.Errorf("Channel foo still not restored")
})
}

func TestClusteringRestoreSnapshotGapInSeq(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

n1Opts := natsdTest.DefaultTestOptions
n1Opts.Host = "127.0.0.1"
n1Opts.Port = 4222
n1Opts.Cluster.Host = "127.0.0.1"
n1Opts.Cluster.Port = 6222
ns1 := natsdTest.RunServer(&n1Opts)
defer ns1.Shutdown()

n2Opts := natsdTest.DefaultTestOptions
n2Opts.Host = "127.0.0.1"
n2Opts.Port = 4223
n2Opts.Cluster.Host = "127.0.0.1"
n2Opts.Cluster.Port = 6223
n2Opts.Routes = natsd.RoutesFromStr("nats://127.0.0.1:6222")
ns2 := natsdTest.RunServer(&n2Opts)
defer ns2.Shutdown()

n3Opts := natsdTest.DefaultTestOptions
n3Opts.Host = "127.0.0.1"
n3Opts.Port = 4224
n3Opts.Cluster.Host = "127.0.0.1"
n3Opts.Cluster.Port = 6224
n3Opts.Routes = natsd.RoutesFromStr("nats://127.0.0.1:6222, nats://127.0.0.1:6223")
ns3 := natsdTest.RunServer(&n3Opts)
defer ns3.Shutdown()

s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.NATSServerURL = "nats://127.0.0.1:4222"
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.NATSServerURL = "nats://127.0.0.1:4223"
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

total := 10
for i := 0; i < total; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
sc.Close()

// Create a snapshot that will indicate that there are messages from 1 to 10.
if err := s1.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error on snapshot: %v", err)
}

c := s1.channels.get("foo")
ch1 := make(chan struct{})
ch2 := make(chan bool)
c.store.Msgs = &blockingLookupStore{MsgStore: c.store.Msgs, inLookupCh: ch1, releaseCh: ch2}

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3sOpts.NATSServerURL = "nats://127.0.0.1:4224"
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

for i := 0; i < 2; i++ {
<-ch1
ch2 <- false
}

ns3.Shutdown()

for i := 0; i < 2; i++ {
<-ch1
ch2 <- false
}

ns3 = natsdTest.RunServer(&n3Opts)
defer ns3.Shutdown()

<-ch1
// Make the store stop blocking on Lookup
ch2 <- true

waitFor(t, 5*time.Second, 15*time.Millisecond, func() error {
c := s3.channels.get("foo")
if c != nil {
first, last, err := c.store.Msgs.FirstAndLastSequence()
if err != nil {
return fmt.Errorf("Error getting first/last seq: %v", err)
}
if first == 1 && last == uint64(total) {
return nil
}
return fmt.Errorf("Channel foo is not right: first=%v last=%v", first, last)
}
return fmt.Errorf("Channel foo still not restored")
})
}
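
As referenced in the blockingLookupStore comment above, a standalone
sketch of the test handshake (illustrative only, independent of the
stores package):

    package main

    import "fmt"

    func main() {
    	inLookup := make(chan struct{})
    	release := make(chan bool)

    	skip := false
    	lookup := func(seq uint64) uint64 {
    		if !skip {
    			inLookup <- struct{}{} // announce that a lookup has started
    			skip = <-release       // true permanently disables blocking
    		}
    		return seq
    	}

    	done := make(chan struct{})
    	go func() {
    		for seq := uint64(1); seq <= 3; seq++ {
    			fmt.Println("looked up", lookup(seq))
    		}
    		close(done)
    	}()

    	<-inLookup       // lookup 1 is in progress
    	release <- false // let it complete, keep blocking the next one
    	<-inLookup       // lookup 2 is in progress
    	release <- true  // release and stop blocking; lookup 3 won't block
    	<-done
    }

The two new tests can be run on their own (assuming a checkout of the
nats-streaming-server repository):

    go test -v -run 'TestClusteringRestoreSnapshot(ErrorDontSkipSeq|GapInSeq)' ./server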
24 changes: 19 additions & 5 deletions server/server.go
@@ -46,7 +46,7 @@ import (
// Server defaults.
const (
// VERSION is the current version for the NATS Streaming server.
VERSION = "0.16.0"
VERSION = "0.16.1"

DefaultClusterID = "test-cluster"
DefaultDiscoverPrefix = "_STAN.discover"
@@ -535,13 +535,15 @@ func (s *StanServer) subToSnapshotRestoreRequests() error {
// reply subject. The leader here can then send the first available
// message when getting a request for a message of a given sequence
// that is not found.
- sendFirstAvail := strings.HasSuffix(m.Reply, "."+restoreMsgsV2)
+ v2 := strings.HasSuffix(m.Reply, "."+restoreMsgsV2)

// For the newer servers, include a "reply" subject to the response
// to let the follower know that this is coming from a 0.14.2+ server.
var reply string
- if sendFirstAvail {
+ var replyFirstAvail string
+ if v2 {
reply = restoreMsgsV2
+ replyFirstAvail = restoreMsgsV2 + restoreMsgsFirstAvailSuffix
}

cname := m.Subject[prefixLen:]
@@ -554,19 +556,21 @@
end := util.ByteOrder.Uint64(m.Data[8:])

for seq := start; seq <= end; seq++ {
+ sendingTheFirstAvail := false
msg, err := c.store.Msgs.Lookup(seq)
if err != nil {
s.log.Errorf("Snapshot restore request error for channel %q, error looking up message %v: %v", c.name, seq, err)
return
}
// If the requestor is a server 0.14.2+, we will send the first
// available message.
if msg == nil && sendFirstAvail {
if msg == nil && v2 {
msg, err = c.store.Msgs.FirstMsg()
if err != nil {
s.log.Errorf("Snapshot restore request error for channel %q, error looking up first message: %v", c.name, err)
return
}
+ sendingTheFirstAvail = msg != nil
}
if msg == nil {
// We don't have this message because of channel limits.
Expand All @@ -580,8 +584,18 @@ func (s *StanServer) subToSnapshotRestoreRequests() error {
}
buf = msgBuf[:n]
}
- if err := s.ncsr.PublishRequest(m.Reply, reply, buf); err != nil {
+ // This will be empty string for old requestors, or restoreMsgsV2
+ // for servers 0.14.2+
+ respReply := reply
+ if sendingTheFirstAvail {
+ // Use the reply subject that contains information that this
+ // is the first available message. Requestors 0.16.1+ will
+ // make use of that.
+ respReply = replyFirstAvail
+ }
+ if err := s.ncsr.PublishRequest(m.Reply, respReply, buf); err != nil {
s.log.Errorf("Snapshot restore request error for channel %q, unable to send response for seq %v: %v", c.name, seq, err)
+ return
}
// If we sent a message and seq was not the expected one,
// reset seq to proper value for the next iteration.
52 changes: 48 additions & 4 deletions server/snapshot.go
@@ -16,6 +16,7 @@ package server
import (
"fmt"
"io"
"strings"
"sync/atomic"
"time"

@@ -42,6 +43,9 @@
// Used as requests inbox's suffix and reply subject of response messages
// to indicate that this is from a 0.14.2+ server.
restoreMsgsV2 = "sfa"
+ // Suffix added to reply subject when leader is sending the first available
+ // message
+ restoreMsgsFirstAvailSuffix = ".first"
)

// serverSnapshot implements the raft.FSMSnapshot interface by snapshotting
@@ -346,7 +350,7 @@ func (r *raftFSM) Restore(snapshot io.ReadCloser) (retErr error) {

serverSnap := &spb.RaftSnapshot{}
if err := serverSnap.Unmarshal(buf); err != nil {
- panic(err)
+ return fmt.Errorf("error decoding snapshot record: %v", err)
}
if err := r.restoreClientsFromSnapshot(serverSnap); err != nil {
return err
@@ -560,13 +564,26 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromAp
cnfcount = 0
msg := &pb.MsgProto{}
if err := msg.Unmarshal(resp.Data); err != nil {
- panic(err)
+ return fmt.Errorf("error decoding message: %v", err)
}
// Server 0.14.2+ may send us the first available message if
// the requested one is not available. So check the message
- // sequence. (we could ensure that reply subject is
- // restoreMsgsV2, but it is not required).
+ // sequence.
if msg.Sequence != seq {
+ // Servers 0.16.1+ send the response with a Reply subject that
+ // indicates if this is the first available message. Unless
+ // msg.Sequence is > reqEnd - the only case where the leader
+ // could send a message (and only one) outside of our request
+ // range - there is no way to know whether this is really the
+ // first available message or some messages were lost in transit.
+ if !strings.HasSuffix(resp.Reply, restoreMsgsFirstAvailSuffix) && seq <= reqEnd {
+ // Ensure this is really the first available, otherwise
+ // return an error; the restore will be retried.
+ if first, err := s.hasLeaderSentFirstAvail(subject, seq, msg.Sequence); !first || err != nil {
+ return fmt.Errorf("expected to restore message %v, got %v (err=%v)", seq, msg.Sequence, err)
+ }
+ }
// Any prior messages are now invalid, so empty our store.
if err := c.store.Msgs.Empty(); err != nil {
return err
@@ -644,6 +661,33 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromAp
return c.store.Msgs.Flush()
}

// This function will create a new ephemeral subscription and ask for a
// single message sequence `reqSeq` and expect to get a response with
// a message sequence `gotSeq`. If that is the case, this function
// returns true, and returns false otherwise.
func (s *StanServer) hasLeaderSentFirstAvail(subject string, reqSeq, gotSeq uint64) (bool, error) {
inbox := nats.NewInbox() + "." + restoreMsgsV2
sub, err := s.ncsr.SubscribeSync(inbox)
if err != nil {
return false, err
}
defer sub.Unsubscribe()
rawMsg, err := s.restoreMsgsFetchOne(subject, inbox, sub, reqSeq)
if err != nil {
return false, err
}
if len(rawMsg.Data) > 0 {
msg := &pb.MsgProto{}
if err := msg.Unmarshal(rawMsg.Data); err != nil {
return false, err
}
if msg.Sequence == gotSeq {
return true, nil
}
}
return false, nil
}

// Sends individual requests to leader server (pre 0.14.1) in order to
// find the sequence of the first available message in the range.
func (s *StanServer) restoreMsgsFindFirstAvailSeqFromOldLeader(
