Skip to content

Commit

Permalink
Merge pull request #815 from nats-io/cluster_fix_issue_after_snapshot
Browse files Browse the repository at this point in the history
[FIXED] Clustering: possibly getting empty messages
  • Loading branch information
kozlovic committed Apr 30, 2019
2 parents 25d6ada + 72b0ebb commit 57c6c84
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 4 deletions.
13 changes: 9 additions & 4 deletions server/clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,9 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
case spb.RaftOperation_Publish:
// Message replication.
var (
c *channel
err error
c *channel
err error
lastSeq uint64
)
for _, msg := range op.PublishBatch.Messages {
// This is a batch for a given channel, so lookup channel once.
Expand All @@ -477,13 +478,17 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
if err == ErrChanDelInProgress {
return nil
}
lastSeq, err = c.store.Msgs.LastSequence()
}
if err == nil && lastSeq < msg.Sequence-1 {
err = s.raft.fsm.restoreMsgsFromSnapshot(c, lastSeq+1, msg.Sequence-1)
}
if err == nil {
_, err = c.store.Msgs.Store(msg)
}
if err != nil {
panic(fmt.Errorf("failed to store replicated message %d on channel %s: %v",
msg.Sequence, msg.Subject, err))
return fmt.Errorf("failed to store replicated message %d on channel %s: %v",
msg.Sequence, msg.Subject, err)
}
}
return nil
Expand Down
112 changes: 112 additions & 0 deletions server/clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5210,3 +5210,115 @@ func TestClusteringReplSubSentAckWhileClosing(t *testing.T) {
waitForNumClients(t, s1, 0)
s1.Shutdown()
}

// msgStoreDoesntFlush wraps a real stores.MsgStore and overrides Store so
// that messages handed to it are never persisted. Every other method is
// inherited unchanged from the embedded store.
type msgStoreDoesntFlush struct {
	stores.MsgStore
}

// Store pretends the message was written: it reports the message's own
// sequence and a nil error, but never passes the message on to the embedded
// store. Dropping the write is how the test simulates a store whose
// buffered data was never flushed.
func (s *msgStoreDoesntFlush) Store(m *pb.MsgProto) (uint64, error) {
	seq := m.Sequence
	return seq, nil
}

// TestClusteringGapsAfterSnapshotAndNoFlush checks that a clustered node
// whose message store silently dropped writes before a raft snapshot ends
// up, after a restart, with every message present and non-empty.
//
// Scenario:
//  1. Start a 3-node cluster and publish one message that s2 really stores.
//  2. Swap s2's message store for msgStoreDoesntFlush, publish more
//     messages (which s2 therefore never persists), snapshot and stop s2.
//  3. Publish additional messages while s2 is down, then restart s2 and
//     wait for its last sequence to reach the total published.
//  4. Restart s2 alone in standalone mode and verify that a subscriber
//     replaying the whole channel never receives an empty payload.
func TestClusteringGapsAfterSnapshotAndNoFlush(t *testing.T) {
	// Number of messages published in each phase. The total also counts
	// the very first message, which is the only one s2 stores for real.
	const (
		unflushedMsgs = 100
		offlineMsgs   = 10
		totalMsgs     = 1 + unflushedMsgs + offlineMsgs
	)

	cleanupDatastore(t)
	defer cleanupDatastore(t)
	cleanupRaftLog(t)
	defer cleanupRaftLog(t)

	// For this test, use a central NATS server.
	ns := natsdTest.RunDefaultServer()
	defer ns.Shutdown()

	// Configure first server
	s1sOpts := getTestDefaultOptsForClustering("a", true)
	s1 := runServerWithOpts(t, s1sOpts, nil)
	defer s1.Shutdown()

	// Configure second server. It gets a 1MB file store buffer
	// (presumably so that writes stay buffered instead of going
	// straight to disk — confirm against the file store options).
	s2sOpts := getTestDefaultOptsForClustering("b", false)
	s2sOpts.FileStoreOpts.BufferSize = 1024 * 1024
	s2 := runServerWithOpts(t, s2sOpts, nil)
	defer s2.Shutdown()

	// Configure third server.
	s3sOpts := getTestDefaultOptsForClustering("c", false)
	s3 := runServerWithOpts(t, s3sOpts, nil)
	defer s3.Shutdown()

	getLeader(t, 10*time.Second, s1, s2, s3)

	sc := NewDefaultConnection(t)
	defer sc.Close()

	if err := sc.Publish("foo", []byte("hello")); err != nil {
		t.Fatalf("Error on publish: %v", err)
	}

	verifyChannelExist(t, s2, "foo", true, 2*time.Second)

	// Flush the first message so that it is the only one s2 has for real.
	c := s2.channels.get("foo")
	if err := c.store.Msgs.Flush(); err != nil {
		t.Fatalf("Error on flush: %v", err)
	}
	// Replace with a store that does not write messages
	c.store.Msgs = &msgStoreDoesntFlush{c.store.Msgs}

	for i := 0; i < unflushedMsgs; i++ {
		if err := sc.Publish("foo", []byte("hello")); err != nil {
			t.Fatalf("Error on publish: %v", err)
		}
	}

	// Snapshot while s2's store is missing the messages above, then stop it.
	if err := s2.raft.Snapshot().Error(); err != nil {
		t.Fatalf("Error on snapshot: %v", err)
	}
	s2.Shutdown()

	// These messages are published while s2 is down.
	for i := 0; i < offlineMsgs; i++ {
		if err := sc.Publish("foo", []byte("hello")); err != nil {
			t.Fatalf("Error on publish: %v", err)
		}
	}

	s2 = runServerWithOpts(t, s2sOpts, nil)
	defer s2.Shutdown()

	// Wait until the restarted s2 reports every published message.
	waitForCount(t, totalMsgs, func() (string, int) {
		var last uint64
		c := s2.channels.get("foo")
		if c != nil {
			// The error is ignored on purpose: this closure is polled
			// and a failed lookup simply reports 0 for this attempt.
			last, _ = c.store.Msgs.LastSequence()
		}
		return "last sequence for channel foo", int(last)
	})

	sc.Close()
	s1.Shutdown()
	s3.Shutdown()

	// Restart s2 on its own, non-clustered, so that everything delivered
	// below comes from s2's own store.
	s2.Shutdown()
	s2sOpts.Clustering.Clustered = false
	s2 = runServerWithOpts(t, s2sOpts, nil)
	defer s2.Shutdown()

	sc = NewDefaultConnection(t)
	defer sc.Close()

	// Sized to hold every message so the subscription callback never blocks.
	ch := make(chan *stan.Msg, totalMsgs)
	if _, err := sc.Subscribe("foo", func(msg *stan.Msg) {
		ch <- msg
	}, stan.DeliverAllAvailable()); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	// Check every published message, including the last one.
	for i := 0; i < totalMsgs; i++ {
		select {
		case m := <-ch:
			if len(m.Data) == 0 {
				t.Fatalf("Received empty message: %+v", m)
			}
		case <-time.After(2 * time.Second):
			t.Fatalf("Did not get all messages")
		}
	}
}
7 changes: 7 additions & 0 deletions server/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ func (s *serverSnapshot) snapshotChannels(snap *spb.RaftSnapshot) error {
snap.Channels = make([]*spb.ChannelSnapshot, numChannels)
numChannel := 0
for _, c := range s.channels.channels {
// Flush msg and sub stores before persisting snapshot
if err := c.store.Subs.Flush(); err != nil {
return err
}
if err := c.store.Msgs.Flush(); err != nil {
return err
}
first, last, err := c.store.Msgs.FirstAndLastSequence()
if err != nil {
return err
Expand Down

0 comments on commit 57c6c84

Please sign in to comment.