diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 9dc0618446b..df84e460696 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -3403,3 +3403,106 @@ func TestJetStreamInterestLeakOnDisableJetStream(t *testing.T) {
 		t.Fatalf("unexpected dangling interests for JetStream assets after shutdown (%d $JSC, %d $NRG)", danglingJSC, danglingRaft)
 	}
 }
+
+func TestJetStreamNoLeadersDuringLameDuck(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	// Grab the first server and set the lameduck options directly.
+	s := c.servers[0]
+	s.optsMu.Lock()
+	s.opts.LameDuckDuration = 5 * time.Second
+	s.opts.LameDuckGracePeriod = -5 * time.Second
+	s.optsMu.Unlock()
+
+	// Connect to the third server.
+	nc, js := jsClientConnect(t, c.servers[2])
+	defer nc.Close()
+
+	allServersHaveLeaders := func() bool {
+		haveLeader := make(map[*Server]bool)
+		for _, s := range c.servers {
+			s.rnMu.RLock()
+			for _, n := range s.raftNodes {
+				if n.Leader() {
+					haveLeader[s] = true
+					break
+				}
+			}
+			s.rnMu.RUnlock()
+		}
+		return len(haveLeader) == len(c.servers)
+	}
+
+	// Create streams until we have a leader on all of the servers.
+	var index int
+	checkFor(t, 10*time.Second, time.Millisecond, func() error {
+		if allServersHaveLeaders() {
+			return nil
+		}
+		index++
+		_, err := js.AddStream(&nats.StreamConfig{
+			Name:     fmt.Sprintf("TEST_%d", index),
+			Subjects: []string{fmt.Sprintf("foo.%d", index)},
+			Replicas: 3,
+		})
+		require_NoError(t, err)
+		return fmt.Errorf("not all servers have a leader yet")
+	})
+
+	// Put our server into lameduck mode.
+	// Need a client.
+	dummy, _ := jsClientConnect(t, s)
+	defer dummy.Close()
+	go s.lameDuckMode()
+
+	// Wait for all leaders to move off.
+	checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
+		s.rnMu.RLock()
+		defer s.rnMu.RUnlock()
+		for _, n := range s.raftNodes {
+			if n.Leader() {
+				return fmt.Errorf("server still has a leader")
+			}
+		}
+		return nil
+	})
+
+	// All leaders have been evacuated.
+
+	// Create a goroutine that will create streams constantly.
+	qch := make(chan bool)
+	go func() {
+		var index int
+		for {
+			select {
+			case <-time.After(time.Millisecond):
+				index++
+				_, err := js.AddStream(&nats.StreamConfig{
+					Name:     fmt.Sprintf("NEW_TEST_%d", index),
+					Subjects: []string{fmt.Sprintf("bar.%d", index)},
+					Replicas: 3,
+				})
+				if err != nil {
+					return
+				}
+			case <-qch:
+				return
+			}
+		}
+	}()
+	defer close(qch)
+
+	// Make sure no leaders are ever placed on the lameduck server.
+	for s.isRunning() {
+		var hasLeader bool
+		s.rnMu.RLock()
+		for _, n := range s.raftNodes {
+			hasLeader = hasLeader || n.Leader()
+		}
+		s.rnMu.RUnlock()
+		if hasLeader {
+			t.Fatalf("Server had a leader when it should not due to lameduck mode")
+		}
+	}
+}
diff --git a/server/raft.go b/server/raft.go
index 11100b0e744..3daaa63f8b3 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -485,6 +485,12 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error
 
 	n.debug("Started")
 
+	// Check if we need to start in observer mode due to lame duck status.
+	if s.isLameDuckMode() {
+		n.debug("Will start in observer mode due to lame duck status")
+		n.SetObserver(true)
+	}
+
 	n.Lock()
 	n.resetElectionTimeout()
 	n.llqrt = time.Now()
@@ -611,6 +617,9 @@ func (s *Server) shutdownRaftNodes() {
 		}
 	}
 }
+// Used in lameduck mode to move the leaders off of this server.
+// We also put all nodes in observer mode so that new leaders
+// cannot be placed on this server.
 func (s *Server) transferRaftLeaders() bool {
 	if s == nil {
 		return false
@@ -631,6 +640,7 @@ func (s *Server) transferRaftLeaders() bool {
 			node.StepDown()
 			didTransfer = true
 		}
+		node.SetObserver(true)
 	}
 	return didTransfer
 }
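
Note on why both StepDown and SetObserver are needed: stepping down only relinquishes leadership for the current term, and nothing would stop the node from campaigning and winning the very next election. The observer flag is what keeps leadership off the lame-duck server for good. Below is a minimal, self-contained sketch of that pattern; the node type and its setObserver, stepDown, and electionTimerFired methods are hypothetical illustrations, not this repository's RaftNode API.

package main

import "fmt"

// node is a hypothetical stand-in for a raft node.
type node struct {
	leader   bool
	observer bool
}

// setObserver marks the node as an observer. Observers still follow the
// log but never campaign, so they can never become leader.
func (n *node) setObserver(obs bool) { n.observer = obs }

// stepDown relinquishes leadership for the current term only.
func (n *node) stepDown() { n.leader = false }

// electionTimerFired models the election timeout: a normal follower
// would campaign (modeled here as immediately winning), but an
// observer stays a follower.
func (n *node) electionTimerFired() {
	if n.observer {
		return // observers never campaign
	}
	n.leader = true // stand-in for winning an election
}

func main() {
	n := &node{leader: true}

	// Stepping down alone is not enough: the next election timeout
	// can make this node leader again.
	n.stepDown()
	n.electionTimerFired()
	fmt.Println("after step down only:", n.leader) // true

	// Step down plus observer mode keeps leadership off this node,
	// mirroring what transferRaftLeaders does for every local node.
	n.stepDown()
	n.setObserver(true)
	n.electionTimerFired()
	fmt.Println("after step down + observer:", n.leader) // false
}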