[FIXED] Clustering: Snapshot skip sequence on error #923

Merged 1 commit on Aug 28, 2019
222 changes: 221 additions & 1 deletion server/clustering_test.go
@@ -6152,7 +6152,7 @@ func TestClusteringRestoreSnapshotWithSomeMsgsNoLongerAvailFromNewServer(t *test
msgsCount := int32(0)
notFound := int32(0)
if _, err := nc.Subscribe(nats.InboxPrefix+">", func(m *nats.Msg) {
- if m.Reply != restoreMsgsV2 {
+ if !strings.HasPrefix(m.Reply, restoreMsgsV2) {
return
}
if len(m.Data) > 0 {
@@ -6618,3 +6618,223 @@ func TestClusteringRaftLogging(t *testing.T) {
t.Fatalf("Wrong tracing for raft log levels: %v", wrongLevels)
}
}

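// blockingLookupStore wraps a MsgStore and blocks on each Lookup call
// until the test releases it through releaseCh. Receiving `true` on
// releaseCh sets skip, which disables blocking for subsequent lookups.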
type blockingLookupStore struct {
stores.MsgStore
inLookupCh chan struct{}
releaseCh chan bool
skip bool
}

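// Lookup signals the test through inLookupCh, waits to be released,
// then delegates to the wrapped store.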
func (b *blockingLookupStore) Lookup(seq uint64) (*pb.MsgProto, error) {
if !b.skip {
b.inLookupCh <- struct{}{}
b.skip = <-b.releaseCh
}
return b.MsgStore.Lookup(seq)
}

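// TestClusteringRestoreSnapshotErrorDontSkipSeq checks that when the leader
// fails to send a message during a snapshot restore, the follower does not
// skip that sequence and eventually restores the full range.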
func TestClusteringRestoreSnapshotErrorDontSkipSeq(t *testing.T) {
restoreMsgsRcvTimeout = 500 * time.Millisecond
defer func() { restoreMsgsRcvTimeout = defaultRestoreMsgsRcvTimeout }()

cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a single NATS server that all streaming servers connect to.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

total := 10
for i := 0; i < total; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
sc.Close()

// Create a snapshot that will indicate that there are messages from 1 to 10.
if err := s1.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error on snapshot: %v", err)
}

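// Wrap the leader's message store so that the test can block and
// release each Lookup performed during the snapshot restore.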
c := s1.channels.get("foo")
ch1 := make(chan struct{})
ch2 := make(chan bool)
c.store.Msgs = &blockingLookupStore{MsgStore: c.store.Msgs, inLookupCh: ch1, releaseCh: ch2}

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

// Let the server send 3 messages, then make the connection fail.
for i := 0; i < 3; i++ {
<-ch1
ch2 <- false
}

// Now at sequence 4, cause a failure.
<-ch1

// Replace the connection used to replicate with a closed connection
// so that we get an error on publish.
closedConn, err := nats.Connect("nats://127.0.0.1:4222")
if err != nil {
t.Fatalf("Error creating conn: %v", err)
}
closedConn.Close()
s1.mu.Lock()
savedConn := s1.ncsr
s1.ncsr = closedConn
s1.mu.Unlock()

// Release the lookup so that s1 now tries to send the message(s).
ch2 <- false

// Restore the connection now.
<-ch1
s1.mu.Lock()
s1.ncsr = savedConn
s1.mu.Unlock()
// From now on, the store will not block on lookups
ch2 <- true

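// Verify that s3 eventually restores the full range of messages
// (1 to 10) with no sequence skipped.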
waitFor(t, 5*time.Second, 15*time.Millisecond, func() error {
c := s3.channels.get("foo")
if c != nil {
first, last, err := c.store.Msgs.FirstAndLastSequence()
if err != nil {
return fmt.Errorf("Error getting first/last seq: %v", err)
}
if first == 1 && last == uint64(total) {
return nil
}
return fmt.Errorf("Channel foo is not right: first=%v last=%v", first, last)
}
return fmt.Errorf("Channel foo still not restored")
})
}

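// TestClusteringRestoreSnapshotGapInSeq checks that a follower detects
// messages lost in transit during a snapshot restore and retries until
// the full range is restored.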
func TestClusteringRestoreSnapshotGapInSeq(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

n1Opts := natsdTest.DefaultTestOptions
n1Opts.Host = "127.0.0.1"
n1Opts.Port = 4222
n1Opts.Cluster.Host = "127.0.0.1"
n1Opts.Cluster.Port = 6222
ns1 := natsdTest.RunServer(&n1Opts)
defer ns1.Shutdown()

n2Opts := natsdTest.DefaultTestOptions
n2Opts.Host = "127.0.0.1"
n2Opts.Port = 4223
n2Opts.Cluster.Host = "127.0.0.1"
n2Opts.Cluster.Port = 6223
n2Opts.Routes = natsd.RoutesFromStr("nats://127.0.0.1:6222")
ns2 := natsdTest.RunServer(&n2Opts)
defer ns2.Shutdown()

n3Opts := natsdTest.DefaultTestOptions
n3Opts.Host = "127.0.0.1"
n3Opts.Port = 4224
n3Opts.Cluster.Host = "127.0.0.1"
n3Opts.Cluster.Port = 6224
n3Opts.Routes = natsd.RoutesFromStr("nats://127.0.0.1:6222, nats://127.0.0.1:6223")
ns3 := natsdTest.RunServer(&n3Opts)
defer ns3.Shutdown()

s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.NATSServerURL = "nats://127.0.0.1:4222"
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.NATSServerURL = "nats://127.0.0.1:4223"
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

total := 10
for i := 0; i < total; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
sc.Close()

// Create a snapshot that will indicate that there are messages from 1 to 10.
if err := s1.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error on snapshot: %v", err)
}

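// Wrap the leader's message store to control Lookup calls made
// during the snapshot restore.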
c := s1.channels.get("foo")
ch1 := make(chan struct{})
ch2 := make(chan bool)
c.store.Msgs = &blockingLookupStore{MsgStore: c.store.Msgs, inLookupCh: ch1, releaseCh: ch2}

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3sOpts.NATSServerURL = "nats://127.0.0.1:4224"
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

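// Let the leader look up and send the first 2 messages to s3.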
for i := 0; i < 2; i++ {
<-ch1
ch2 <- false
}

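// Shut down s3's NATS server so that the messages sent while it is
// down are lost in transit, creating a gap in the restored sequence.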
ns3.Shutdown()

for i := 0; i < 2; i++ {
<-ch1
ch2 <- false
}

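// Restart s3's NATS server so that s3 can receive messages again.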
ns3 = natsdTest.RunServer(&n3Opts)
defer ns3.Shutdown()

<-ch1
// Make the store stop blocking on Lookup
ch2 <- true

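// Despite the gap, s3 should eventually restore all messages from 1 to 10.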
waitFor(t, 5*time.Second, 15*time.Millisecond, func() error {
c := s3.channels.get("foo")
if c != nil {
first, last, err := c.store.Msgs.FirstAndLastSequence()
if err != nil {
return fmt.Errorf("Error getting first/last seq: %v", err)
}
if first == 1 && last == uint64(total) {
return nil
}
return fmt.Errorf("Channel foo is not right: first=%v last=%v", first, last)
}
return fmt.Errorf("Channel foo still not restored")
})
}
24 changes: 19 additions & 5 deletions server/server.go
@@ -46,7 +46,7 @@ import (
// Server defaults.
const (
// VERSION is the current version for the NATS Streaming server.
VERSION = "0.16.0"
VERSION = "0.16.1"

DefaultClusterID = "test-cluster"
DefaultDiscoverPrefix = "_STAN.discover"
@@ -535,13 +535,15 @@ func (s *StanServer) subToSnapshotRestoreRequests() error {
// reply subject. The leader here can then send the first available
// message when getting a request for a message of a given sequence
// that is not found.
- sendFirstAvail := strings.HasSuffix(m.Reply, "."+restoreMsgsV2)
+ v2 := strings.HasSuffix(m.Reply, "."+restoreMsgsV2)

// For the newer servers, include a "reply" subject in the response
// to let the follower know that this is coming from a 0.14.2+ server.
var reply string
- if sendFirstAvail {
+ var replyFirstAvail string
+ if v2 {
reply = restoreMsgsV2
replyFirstAvail = restoreMsgsV2 + restoreMsgsFirstAvailSuffix
}

cname := m.Subject[prefixLen:]
@@ -554,19 +556,21 @@
end := util.ByteOrder.Uint64(m.Data[8:])

for seq := start; seq <= end; seq++ {
sendingTheFirstAvail := false
msg, err := c.store.Msgs.Lookup(seq)
if err != nil {
s.log.Errorf("Snapshot restore request error for channel %q, error looking up message %v: %v", c.name, seq, err)
return
}
// If the requestor is a server 0.14.2+, we will send the first
// available message.
if msg == nil && sendFirstAvail {
if msg == nil && v2 {
msg, err = c.store.Msgs.FirstMsg()
if err != nil {
s.log.Errorf("Snapshot restore request error for channel %q, error looking up first message: %v", c.name, err)
return
}
sendingTheFirstAvail = msg != nil
}
if msg == nil {
// We don't have this message because of channel limits.
Expand All @@ -580,8 +584,18 @@ func (s *StanServer) subToSnapshotRestoreRequests() error {
}
buf = msgBuf[:n]
}
- if err := s.ncsr.PublishRequest(m.Reply, reply, buf); err != nil {
+ // This will be the empty string for old requestors, or restoreMsgsV2
+ // for servers 0.14.2+.
+ respReply := reply
+ if sendingTheFirstAvail {
+ // Use the reply subject that indicates that this is the first
+ // available message. Requestors 0.16.1+ will make use of that.
+ respReply = replyFirstAvail
+ }
+ if err := s.ncsr.PublishRequest(m.Reply, respReply, buf); err != nil {
s.log.Errorf("Snapshot restore request error for channel %q, unable to send response for seq %v: %v", c.name, seq, err)
return
}
// If we sent a message and seq was not the expected one,
// reset seq to proper value for the next iteration.
52 changes: 48 additions & 4 deletions server/snapshot.go
@@ -16,6 +16,7 @@ package server
import (
"fmt"
"io"
"strings"
"sync/atomic"
"time"

@@ -42,6 +43,9 @@
// Used as requests inbox's suffix and reply subject of response messages
// to indicate that this is from a 0.14.2+ server.
restoreMsgsV2 = "sfa"
// Suffix added to reply subject when leader is sending the first available
// message
restoreMsgsFirstAvailSuffix = ".first"
)

// serverSnapshot implements the raft.FSMSnapshot interface by snapshotting
@@ -346,7 +350,7 @@ func (r *raftFSM) Restore(snapshot io.ReadCloser) (retErr error) {

serverSnap := &spb.RaftSnapshot{}
if err := serverSnap.Unmarshal(buf); err != nil {
- panic(err)
+ return fmt.Errorf("error decoding snapshot record: %v", err)
}
if err := r.restoreClientsFromSnapshot(serverSnap); err != nil {
return err
@@ -560,13 +564,26 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromAp
cnfcount = 0
msg := &pb.MsgProto{}
if err := msg.Unmarshal(resp.Data); err != nil {
- panic(err)
+ return fmt.Errorf("error decoding message: %v", err)
}
// Server 0.14.2+ may send us the first available message if
// the requested one is not available. So check the message
- // sequence. (we could ensure that reply subject is
- // restoreMsgsV2, but it is not required).
+ // sequence.
if msg.Sequence != seq {
// Servers 0.16.1+ send the response with a Reply subject that
// indicates if this is the first available message. Unless
// msg.Sequence is > reqEnd, which is the only case where the
// leader could send a message (and only one) outside of our
// request range, there is no way to know if this is really the
// first available message or if some messages were lost in transit.
if !strings.HasSuffix(resp.Reply, restoreMsgsFirstAvailSuffix) && seq <= reqEnd {
// Ensure this is really the first available message, otherwise
// return an error; the restore will be retried.
if first, err := s.hasLeaderSentFirstAvail(subject, seq, msg.Sequence); !first || err != nil {
return fmt.Errorf("expected to restore message %v, got %v (err=%v)", seq, msg.Sequence, err)
}
}
// Any prior messages are now invalid, so empty our store.
if err := c.store.Msgs.Empty(); err != nil {
return err
Expand Down Expand Up @@ -644,6 +661,33 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64, fromAp
return c.store.Msgs.Flush()
}

// This function creates a new ephemeral subscription and asks for a
// single message of sequence `reqSeq`, expecting to get a response with
// a message of sequence `gotSeq`. If that is the case, this function
// returns true, and false otherwise.
func (s *StanServer) hasLeaderSentFirstAvail(subject string, reqSeq, gotSeq uint64) (bool, error) {
inbox := nats.NewInbox() + "." + restoreMsgsV2
sub, err := s.ncsr.SubscribeSync(inbox)
if err != nil {
return false, err
}
defer sub.Unsubscribe()
rawMsg, err := s.restoreMsgsFetchOne(subject, inbox, sub, reqSeq)
if err != nil {
return false, err
}
if len(rawMsg.Data) > 0 {
msg := &pb.MsgProto{}
if err := msg.Unmarshal(rawMsg.Data); err != nil {
return false, err
}
if msg.Sequence == gotSeq {
return true, nil
}
}
return false, nil
}

// Sends individual requests to leader server (pre 0.14.1) in order to
// find the sequence of the first available message in the range.
func (s *StanServer) restoreMsgsFindFirstAvailSeqFromOldLeader(