diff --git a/server/clustering.go b/server/clustering.go
index dea6b14a..e6f6e43f 100644
--- a/server/clustering.go
+++ b/server/clustering.go
@@ -100,7 +100,7 @@ func (r *raftNode) shutdown() error {
 }
 
 // createRaftNode creates and starts a new Raft node.
-func (s *StanServer) createServerRaftNode() error {
+func (s *StanServer) createServerRaftNode(hasStreamingState bool) error {
 	var (
 		name = s.info.ClusterID
 		addr = s.getClusteringAddr(name)
@@ -109,6 +109,9 @@
 	if err != nil {
 		return err
 	}
+	if !existingState && hasStreamingState {
+		return fmt.Errorf("streaming state was recovered but cluster log path %q is empty", s.opts.Clustering.RaftLogPath)
+	}
 	node := s.raft
 
 	// Bootstrap if there is no previous state and we are starting this node as
diff --git a/server/clustering_test.go b/server/clustering_test.go
index 3b3de9fe..b2d04b37 100644
--- a/server/clustering_test.go
+++ b/server/clustering_test.go
@@ -878,8 +878,6 @@ func TestClusteringDontRecoverFSClientsAndSubs(t *testing.T) {
 	s1.Shutdown()
 	s2.Shutdown()
 
-	cleanupRaftLog(t)
-
 	s1 = runServerWithOpts(t, s1sOpts, nil)
 	defer s1.Shutdown()
 
@@ -3210,3 +3208,40 @@ func TestClusteringCrashOnRestart(t *testing.T) {
 	s = runServerWithOpts(t, opts, nil)
 	defer s.Shutdown()
 }
+
+func TestClusteringNoRaftStateButStreamingState(t *testing.T) {
+	cleanupDatastore(t)
+	defer cleanupDatastore(t)
+	cleanupRaftLog(t)
+	defer cleanupRaftLog(t)
+
+	ns := natsdTest.RunDefaultServer()
+	defer ns.Shutdown()
+
+	opts := getTestDefaultOptsForClustering("a", true)
+	s := runServerWithOpts(t, opts, nil)
+	defer s.Shutdown()
+
+	getLeader(t, 10*time.Second, s)
+
+	sc := NewDefaultConnection(t)
+	defer sc.Close()
+	if err := sc.Publish("foo", []byte("hello")); err != nil {
+		t.Fatalf("Error on publish: %v", err)
+	}
+	sc.Close()
+
+	s.Shutdown()
+	cleanupRaftLog(t)
+	s, err := RunServerWithOpts(opts, nil)
+	if err == nil {
+		s.Shutdown()
+		t.Fatal("Expected error, got none")
+	}
+	// Start again and still should fail
+	s, err = RunServerWithOpts(opts, nil)
+	if err == nil {
+		s.Shutdown()
+		t.Fatal("Expected error, got none")
+	}
+}
diff --git a/server/server.go b/server/server.go
index 620e66a9..73ba6230 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1819,8 +1819,9 @@ func (s *StanServer) start(runningState State) error {
 		if s.opts.Clustering.RaftLogPath == "" {
 			s.opts.Clustering.RaftLogPath = filepath.Join(s.opts.ID, s.opts.Clustering.NodeID)
 		}
-		s.log.Noticef("Cluster Node ID: %s", s.info.NodeID)
-		if err := s.startRaftNode(); err != nil {
+		s.log.Noticef("Cluster Node ID : %s", s.info.NodeID)
+		s.log.Noticef("Cluster Log Path: %s", s.opts.Clustering.RaftLogPath)
+		if err := s.startRaftNode(recoveredState != nil); err != nil {
 			return err
 		}
 	}
@@ -1849,6 +1850,9 @@ func (s *StanServer) start(runningState State) error {
 	}
 
 	s.log.Noticef("Message store is %s", s.store.Name())
+	if s.opts.FilestoreDir != "" {
+		s.log.Noticef("Store location: %v", s.opts.FilestoreDir)
+	}
 	// The store has a copy of the limits and the inheritance
 	// was not applied to our limits. To have them displayed correctly,
 	// call Build() on them (we know that this is not going to fail,
@@ -1871,8 +1875,8 @@ func (s *StanServer) start(runningState State) error {
 
 // startRaftNode creates and starts the Raft group.
 // This should only be called if the server is running in clustered mode.
-func (s *StanServer) startRaftNode() error {
-	if err := s.createServerRaftNode(); err != nil {
+func (s *StanServer) startRaftNode(hasStreamingState bool) error {
+	if err := s.createServerRaftNode(hasStreamingState); err != nil {
 		return err
 	}
 	node := s.raft
@@ -3590,7 +3594,7 @@ func (s *StanServer) ioLoop(ready *sync.WaitGroup) {
 			// return an error (we are not using timeout in Apply())
 			// is if raft fails to store its log, but it would have
 			// then switched follower state. On leadership acquisition
-			// we do reet nextSequence based on lastSequence on store.
+			// we do reset nextSequence based on lastSequence on store.
 			// Regardless, do reset here in case of error.
 			// Note that each future contains a batch of messages for