
Commit

Merge 6689b0b into f03fa10
kozlovic committed May 14, 2019
2 parents f03fa10 + 6689b0b commit 87380f7
Showing 3 changed files with 108 additions and 20 deletions.
104 changes: 98 additions & 6 deletions server/clustering_test.go
@@ -37,6 +37,7 @@ import (
"github.com/nats-io/go-nats-streaming"
"github.com/nats-io/go-nats-streaming/pb"
"github.com/nats-io/nats-streaming-server/stores"
"github.com/nats-io/nats-streaming-server/test"
)

var defaultRaftLog string
@@ -59,6 +60,20 @@ func cleanupRaftLog(t *testing.T) {
}
}

func shutdownAndCleanupState(t *testing.T, s *StanServer, nodeID string) {
t.Helper()
s.Shutdown()
switch persistentStoreType {
case stores.TypeFile:
os.RemoveAll(filepath.Join(defaultDataStore, nodeID))
os.RemoveAll(filepath.Join(defaultRaftLog, nodeID))
case stores.TypeSQL:
test.CleanupSQLDatastore(t, testSQLDriver, testSQLSource+nodeID)
default:
t.Fatalf("This test needs to be updated for store type: %v", persistentStoreType)
}
}

func getTestDefaultOptsForClustering(id string, bootstrap bool) *Options {
opts := getTestDefaultOptsForPersistentStore()
if persistentStoreType == stores.TypeFile {
@@ -4707,15 +4722,10 @@ func TestClusteringSubDontStallDueToMsgExpiration(t *testing.T) {
}
checkReceived()

s2.Shutdown()
// We remove all state from node "b", but even if we didn't, on restart,
// since "b"'s store would be behind the rest, it would be emptied because
// the current first message is higher than "b"'s last sequence.
// For DB, let it recover its state...
if persistentStoreType == stores.TypeFile {
os.RemoveAll(filepath.Join(defaultDataStore, "b"))
os.RemoveAll(filepath.Join(defaultRaftLog, "b"))
}
shutdownAndCleanupState(t, s2, "b")

for i := 0; i < secondBatch; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
@@ -5711,3 +5721,85 @@ func TestClusteringNumSubs(t *testing.T) {
sc.Close()
leader.Shutdown()
}

func TestClusteringRestoreSnapshotWithSomeMsgsNoLongerAvail(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.MaxMsgs = 10
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.MaxMsgs = 10
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3sOpts.MaxMsgs = 10
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

getLeader(t, 10*time.Second, s1, s2, s3)

sc := NewDefaultConnection(t)
defer sc.Close()

for i := 0; i < 10; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}

// Create a snapshot that will indicate that there are messages from 1 to 10.
if err := s1.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error on snapshot: %v", err)
}

// Shut down s3 and clean up its state so that it has nothing in its stores
// and will restore from the snapshot.
shutdownAndCleanupState(t, s3, "c")

// Send 2 more messages, which will make messages 1 and 2 disappear
// (the resulting first/last arithmetic is sketched after this test).
for i := 0; i < 2; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
sc.Close()

// Start s3. It will restore from the snapshot, which says that
// the channel has messages 1 to 10, and then it should receive
// 2 raft logs with messages 11 and 12.
// When s3 asks the leader for messages 1 and 2, it should get
// empty responses indicating that these messages are gone, but
// it should still be able to request messages 3 to 10; messages
// 11 and 12 are then replayed from the raft logs.
s3 = runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

waitFor(t, 5*time.Second, 15*time.Millisecond, func() error {
c := s3.channels.get("foo")
if c == nil {
return fmt.Errorf("Channel foo not recreated yet")
}
first, last, err := c.store.Msgs.FirstAndLastSequence()
if err != nil {
return fmt.Errorf("Error getting first/last seq: %v", err)
}
if first != 3 || last != 12 {
return fmt.Errorf("Expected first=3 last=12, got %v and %v", first, last)
}
return nil
})
}
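
To sanity-check the arithmetic the test's waitFor assertion relies on, here is a standalone sketch (expectedFirstLast is a hypothetical helper, not part of the server code): with MaxMsgs=10, publishing 12 messages drops the oldest two, leaving sequences 3 through 12 in the store.

package main

import "fmt"

// expectedFirstLast mirrors how a MaxMsgs limit trims a channel: once more
// than maxMsgs messages have been published, the oldest are dropped so that
// exactly maxMsgs remain.
func expectedFirstLast(published, maxMsgs uint64) (first, last uint64) {
	first, last = 1, published
	if published > maxMsgs {
		first = published - maxMsgs + 1
	}
	return first, last
}

func main() {
	first, last := expectedFirstLast(12, 10)
	fmt.Println(first, last) // prints: 3 12 — matching the test's expectation
}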
3 changes: 0 additions & 3 deletions server/server.go
@@ -545,9 +545,6 @@ func (s *StanServer) subToSnapshotRestoreRequests() error {
if err := s.ncsr.Publish(m.Reply, buf); err != nil {
s.log.Errorf("Snapshot restore request error for channel %q, unable to send response for seq %v: %v", c.name, seq, err)
}
if buf == nil {
return
}
select {
case <-s.shutdownCh:
return
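
For context, a condensed, hypothetical sketch of the leader-side responder after this deletion (respondToRestoreRequest is an illustrative name; the real logic lives inline in subToSnapshotRestoreRequests, and error handling is trimmed). The point of removing the early return is that the leader now always replies and keeps serving: an empty payload tells the follower "this sequence is gone" without ending the exchange.

// Hypothetical simplification; assumes the server package's internal types.
func respondToRestoreRequest(s *StanServer, c *channel, reply string, seq uint64) {
	var buf []byte
	// Lookup can legitimately come back empty when channel limits
	// (e.g. MaxMsgs) have already removed this sequence.
	if msg, _ := c.store.Msgs.Lookup(seq); msg != nil {
		buf, _ = msg.Marshal()
	}
	// Always reply, even with an empty buf. Before this change, a nil buf
	// triggered an early return, stalling the follower's restore at the
	// first missing sequence.
	if err := s.ncsr.Publish(reply, buf); err != nil {
		s.log.Errorf("Snapshot restore request error for channel %q, unable to send response for seq %v: %v", c.name, seq, err)
	}
}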
21 changes: 10 additions & 11 deletions server/snapshot.go
@@ -445,17 +445,16 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64) error
return err
}
// It is possible that the leader does not have this message because of
// channel limits. If resp.Data is empty, we are in this situation and
// we are done recovering snapshot.
if len(resp.Data) == 0 {
break
}
msg := &pb.MsgProto{}
if err := msg.Unmarshal(resp.Data); err != nil {
panic(err)
}
if _, err := c.store.Msgs.Store(msg); err != nil {
return err
// channel limits. If resp.Data is empty, we are in this situation.
// We need to continue though, in case more recent messages are available.
if len(resp.Data) != 0 {
msg := &pb.MsgProto{}
if err := msg.Unmarshal(resp.Data); err != nil {
panic(err)
}
if _, err := c.store.Msgs.Store(msg); err != nil {
return err
}
}
select {
case <-r.server.shutdownCh:
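
And the follower side, reduced to its shape: a hypothetical, condensed view of the loop in restoreMsgsFromSnapshot, where requestMsg stands in for the actual NATS request/reply exchange with the leader (the real code also handles shutdown and timeouts). The fix is that an empty response skips one sequence instead of breaking out of the loop.

// Hypothetical sketch; c is the server package's internal channel type and
// requestMsg abstracts asking the leader for a given sequence.
func restoreLoopSketch(c *channel, first, last uint64, requestMsg func(uint64) (*nats.Msg, error)) error {
	for seq := first; seq <= last; seq++ {
		resp, err := requestMsg(seq)
		if err != nil {
			return err
		}
		if len(resp.Data) == 0 {
			// The leader no longer has this message (channel limits).
			// Keep going: later sequences may still be available.
			continue
		}
		msg := &pb.MsgProto{}
		if err := msg.Unmarshal(resp.Data); err != nil {
			panic(err)
		}
		if _, err := c.store.Msgs.Store(msg); err != nil {
			return err
		}
	}
	return nil
}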
