Merge pull request #498 from nats-io/no_raft_but_streaming_stores
[FIXED] Clustering: should not start if streaming but no raft state
kozlovic committed Mar 8, 2018
2 parents 68ec3c5 + f1d080c commit 08c1b25
Showing 3 changed files with 50 additions and 8 deletions.
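
Why the change matters: in clustered mode the Raft log is the source of truth, so a node whose streaming store recovered data while its Raft log directory is empty is in an inconsistent state and must not start. Below is a minimal sketch of that guard as a hypothetical standalone helper (the actual check is added inline in createServerRaftNode, shown in the first file):

package server

import "fmt"

// raftStateSanityCheck mirrors the guard this commit adds to
// createServerRaftNode: streaming state without any Raft state means
// the cluster log was lost or wiped, so fail fast instead of starting.
func raftStateSanityCheck(existingRaftState, hasStreamingState bool, raftLogPath string) error {
	if !existingRaftState && hasStreamingState {
		return fmt.Errorf("streaming state was recovered but cluster log path %q is empty", raftLogPath)
	}
	return nil
}
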
5 changes: 4 additions & 1 deletion server/clustering.go
@@ -100,7 +100,7 @@ func (r *raftNode) shutdown() error {
 }
 
 // createRaftNode creates and starts a new Raft node.
-func (s *StanServer) createServerRaftNode() error {
+func (s *StanServer) createServerRaftNode(hasStreamingState bool) error {
 	var (
 		name = s.info.ClusterID
 		addr = s.getClusteringAddr(name)
@@ -109,6 +109,9 @@
 	if err != nil {
 		return err
 	}
+	if !existingState && hasStreamingState {
+		return fmt.Errorf("streaming state was recovered but cluster log path %q is empty", s.opts.Clustering.RaftLogPath)
+	}
 	node := s.raft
 
 	// Bootstrap if there is no previous state and we are starting this node as
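
For context, existingState in the hunk above is determined when the Raft stores are opened; hashicorp/raft exposes this probe as raft.HasExistingState. The helper below is an illustrative sketch using BoltDB-backed stores, not the server's actual wiring:

package server

import (
	"os"
	"path/filepath"

	"github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb"
)

// hasExistingRaftState reports whether any Raft log, stable-store, or
// snapshot state already exists under dir. Store construction here is
// simplified; the server builds and reuses its own stores.
func hasExistingRaftState(dir string) (bool, error) {
	store, err := raftboltdb.NewBoltStore(filepath.Join(dir, "raft.db"))
	if err != nil {
		return false, err
	}
	defer store.Close()
	snapshots, err := raft.NewFileSnapshotStore(dir, 1, os.Stderr)
	if err != nil {
		return false, err
	}
	return raft.HasExistingState(store, store, snapshots)
}
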
39 changes: 37 additions & 2 deletions server/clustering_test.go
@@ -878,8 +878,6 @@ func TestClusteringDontRecoverFSClientsAndSubs(t *testing.T) {
 	s1.Shutdown()
 	s2.Shutdown()
 
-	cleanupRaftLog(t)
-
 	s1 = runServerWithOpts(t, s1sOpts, nil)
 	defer s1.Shutdown()
@@ -3210,3 +3208,40 @@ func TestClusteringCrashOnRestart(t *testing.T) {
 	s = runServerWithOpts(t, opts, nil)
 	defer s.Shutdown()
 }
+
+func TestClusteringNoRaftStateButStreamingState(t *testing.T) {
+	cleanupDatastore(t)
+	defer cleanupDatastore(t)
+	cleanupRaftLog(t)
+	defer cleanupRaftLog(t)
+
+	ns := natsdTest.RunDefaultServer()
+	defer ns.Shutdown()
+
+	opts := getTestDefaultOptsForClustering("a", true)
+	s := runServerWithOpts(t, opts, nil)
+	defer s.Shutdown()
+
+	getLeader(t, 10*time.Second, s)
+
+	sc := NewDefaultConnection(t)
+	defer sc.Close()
+	if err := sc.Publish("foo", []byte("hello")); err != nil {
+		t.Fatalf("Error on publish: %v", err)
+	}
+	sc.Close()
+
+	s.Shutdown()
+	cleanupRaftLog(t)
+	s, err := RunServerWithOpts(opts, nil)
+	if err == nil {
+		s.Shutdown()
+		t.Fatal("Expected error, got none")
+	}
+	// Start again and still should fail
+	s, err = RunServerWithOpts(opts, nil)
+	if err == nil {
+		s.Shutdown()
+		t.Fatal("Expected error, got none")
+	}
+}
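
The new test reproduces the exact failure sequence: publish so the streaming store holds data, shut the node down, wipe only the Raft log, then assert that restarting fails, twice, to ensure the first failed start leaves nothing behind that would let a second attempt slip through. Assuming the repository's standard Go test layout, it can be run in isolation with:

go test -run TestClusteringNoRaftStateButStreamingState ./server
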
14 changes: 9 additions & 5 deletions server/server.go
@@ -1819,8 +1819,9 @@ func (s *StanServer) start(runningState State) error {
 		if s.opts.Clustering.RaftLogPath == "" {
 			s.opts.Clustering.RaftLogPath = filepath.Join(s.opts.ID, s.opts.Clustering.NodeID)
 		}
-		s.log.Noticef("Cluster Node ID: %s", s.info.NodeID)
-		if err := s.startRaftNode(); err != nil {
+		s.log.Noticef("Cluster Node ID : %s", s.info.NodeID)
+		s.log.Noticef("Cluster Log Path: %s", s.opts.Clustering.RaftLogPath)
+		if err := s.startRaftNode(recoveredState != nil); err != nil {
 			return err
 		}
 	}
@@ -1849,6 +1850,9 @@
 	}
 
 	s.log.Noticef("Message store is %s", s.store.Name())
+	if s.opts.FilestoreDir != "" {
+		s.log.Noticef("Store location: %v", s.opts.FilestoreDir)
+	}
 	// The store has a copy of the limits and the inheritance
 	// was not applied to our limits. To have them displayed correctly,
 	// call Build() on them (we know that this is not going to fail,
@@ -1871,8 +1875,8 @@
 
 // startRaftNode creates and starts the Raft group.
 // This should only be called if the server is running in clustered mode.
-func (s *StanServer) startRaftNode() error {
-	if err := s.createServerRaftNode(); err != nil {
+func (s *StanServer) startRaftNode(hasStreamingState bool) error {
+	if err := s.createServerRaftNode(hasStreamingState); err != nil {
 		return err
 	}
 	node := s.raft
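
The new parameter is derived once, at the call site in start(): recoveredState != nil tells the Raft layer whether the streaming store recovered prior data, without the Raft code touching the stores itself. A condensed, hypothetical fragment of that flow (the method name below is invented for illustration; stores.RecoveredState is assumed to be the store-recovery result type):

// startClusteredNode condenses the clustered-mode path of start():
// store recovery is reduced to a single boolean before Raft starts.
func (s *StanServer) startClusteredNode(recoveredState *stores.RecoveredState) error {
	s.log.Noticef("Cluster Node ID : %s", s.info.NodeID)
	s.log.Noticef("Cluster Log Path: %s", s.opts.Clustering.RaftLogPath)
	return s.startRaftNode(recoveredState != nil)
}
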
@@ -3590,7 +3594,7 @@ func (s *StanServer) ioLoop(ready *sync.WaitGroup) {
 			// return an error (we are not using timeout in Apply())
 			// is if raft fails to store its log, but it would have
 			// then switched follower state. On leadership acquisition
-			// we do reet nextSequence based on lastSequence on store.
+			// we do reset nextSequence based on lastSequence on store.
 			// Regardless, do reset here in case of error.
 
 			// Note that each future contains a batch of messages for
