diff --git a/server/consumer.go b/server/consumer.go
index 3d9d8a9bc0..3fe0911464 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -2001,8 +2001,12 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
 		return
 	}
 
-	forwardProposals := func() {
+	forwardProposals := func() error {
 		o.mu.Lock()
+		if o.node != node || node.State() != Leader {
+			o.mu.Unlock()
+			return errors.New("no longer leader")
+		}
 		proposal := o.phead
 		o.phead, o.ptail = nil, nil
 		o.mu.Unlock()
@@ -2024,6 +2028,7 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
 		if len(entries) > 0 {
 			node.ProposeDirect(entries)
 		}
+		return nil
 	}
 
 	// In case we have anything pending on entry.
@@ -2035,7 +2040,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
 			forwardProposals()
 			return
 		case <-pch:
-			forwardProposals()
+			if err := forwardProposals(); err != nil {
+				return
+			}
 		}
 	}
 }
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 629f797661..0a8a65bca8 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -18,6 +18,7 @@ package server
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -5682,3 +5683,129 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
 	require_True(t, s.lookupRaftNode(sgn) == nil)
 	require_True(t, s.lookupRaftNode(ogn) == nil)
 }
+
+func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
+	t.Skip("This test takes too long, need to make shorter")
+
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	s := c.randomNonLeader()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	nc2, producer := jsClientConnect(t, s)
+	defer nc2.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+	c.waitOnStreamLeader(globalAccountName, "TEST")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	end := time.Now().Add(2 * time.Second)
+	for time.Now().Before(end) {
+		producer.Publish("foo", []byte(strings.Repeat("A", 128)))
+		time.Sleep(time.Millisecond)
+	}
+
+	var wg sync.WaitGroup
+	for i := 0; i < 5; i++ {
+		sub, err := js.PullSubscribe("foo", fmt.Sprintf("C-%d", i))
+		require_NoError(t, err)
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for range time.NewTicker(10 * time.Millisecond).C {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+
+				msgs, err := sub.Fetch(1)
+				if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, nats.ErrConnectionClosed) {
+					t.Logf("Pull Error: %v", err)
+				}
+				for _, msg := range msgs {
+					msg.Ack()
+				}
+			}
+		}()
+	}
+	c.lameDuckRestartAll()
+	c.waitOnStreamLeader(globalAccountName, "TEST")
+
+	// Swap the logger to try to detect the condition after the restart.
+	loggers := make([]*captureDebugLogger, 3)
+	for i, srv := range c.servers {
+		l := &captureDebugLogger{dbgCh: make(chan string, 10)}
+		loggers[i] = l
+		srv.SetLogger(l, true, false)
+	}
+	condition := `Direct proposal ignored, not leader (state: CLOSED)`
+	errCh := make(chan error, 10)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case dl := <-loggers[0].dbgCh:
+				if strings.Contains(dl, condition) {
+					errCh <- fmt.Errorf(condition)
+				}
+			case dl := <-loggers[1].dbgCh:
+				if strings.Contains(dl, condition) {
+					errCh <- fmt.Errorf(condition)
+				}
+			case dl := <-loggers[2].dbgCh:
+				if strings.Contains(dl, condition) {
+					errCh <- fmt.Errorf(condition)
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	// Start publishing again for a while.
+	end = time.Now().Add(2 * time.Second)
+	for time.Now().Before(end) {
+		producer.Publish("foo", []byte(strings.Repeat("A", 128)))
+		time.Sleep(time.Millisecond)
+	}
+
+	// Try to do a stream edit back to R=1 after doing all the upgrade.
+	info, _ := js.StreamInfo("TEST")
+	sconfig := info.Config
+	sconfig.Replicas = 1
+	_, err = js.UpdateStream(&sconfig)
+	require_NoError(t, err)
+
+	// Leave running for some time after the update.
+	time.Sleep(2 * time.Second)
+
+	info, _ = js.StreamInfo("TEST")
+	sconfig = info.Config
+	sconfig.Replicas = 3
+	_, err = js.UpdateStream(&sconfig)
+	require_NoError(t, err)
+
+	select {
+	case e := <-errCh:
+		t.Fatalf("Bad condition on raft node: %v", e)
+	case <-time.After(2 * time.Second):
+		// Done
+	}
+
+	// Stop goroutines and wait for them to exit.
+	cancel()
+	wg.Wait()
+}
diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go
index d4d31f4802..1d6813dd98 100644
--- a/server/jetstream_helpers_test.go
+++ b/server/jetstream_helpers_test.go
@@ -1541,6 +1541,21 @@ func (c *cluster) restartAll() {
 	c.waitOnClusterReady()
 }
 
+func (c *cluster) lameDuckRestartAll() {
+	c.t.Helper()
+	for i, s := range c.servers {
+		s.lameDuckMode()
+		s.WaitForShutdown()
+		if !s.Running() {
+			opts := c.opts[i]
+			s, o := RunServerWithConfig(opts.ConfigFile)
+			c.servers[i] = s
+			c.opts[i] = o
+		}
+	}
+	c.waitOnClusterReady()
+}
+
 func (c *cluster) restartAllSamePorts() {
 	c.t.Helper()
 	for i, s := range c.servers {
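
Note on the consumer.go change: forwardProposals now re-checks, under the consumer lock, that the raft node captured at loop start is still the consumer's node and still leader before draining o.phead; on failure it returns an error and loopAndForwardProposals exits rather than proposing through a stale or closed node. Below is a minimal, self-contained sketch of that guard pattern; the raftNode and forwarder types are hypothetical stand-ins for illustration, not the server's actual types.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// raftState mimics a raft node's lifecycle states (hypothetical).
type raftState int

const (
	follower raftState = iota
	leader
	closed
)

// raftNode is a hypothetical stand-in for the server's raft node.
type raftNode struct {
	mu    sync.Mutex
	state raftState
}

func (n *raftNode) State() raftState {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.state
}

// forwarder buffers proposals and flushes them through its current node.
type forwarder struct {
	mu      sync.Mutex
	node    *raftNode
	pending [][]byte
}

// forward drains pending proposals, but only if the node the caller captured
// is still this forwarder's node and still leader. This mirrors the guard in
// consumer.go: without it, proposals could be pushed through a node that was
// closed or replaced across a restart or replica scale-up.
func (f *forwarder) forward(n *raftNode) error {
	f.mu.Lock()
	if f.node != n || n.State() != leader {
		f.mu.Unlock()
		return errors.New("no longer leader")
	}
	pending := f.pending
	f.pending = nil
	f.mu.Unlock()

	for _, p := range pending {
		fmt.Printf("proposed %d bytes\n", len(p))
	}
	return nil
}

func main() {
	n := &raftNode{state: leader}
	f := &forwarder{node: n, pending: [][]byte{[]byte("p1")}}
	fmt.Println("err:", f.forward(n)) // err: <nil> while still leader

	n.mu.Lock()
	n.state = closed
	n.mu.Unlock()
	f.pending = [][]byte{[]byte("p2")}
	fmt.Println("err:", f.forward(n)) // err: no longer leader, so the loop exits
}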
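The test's watcher goroutine selects over three hardcoded logger channels, one per server. For clusters of arbitrary size, fanning the channels into one is an alternative; this is only a sketch of that idea, not the test's actual helper.

package main

import (
	"fmt"
	"strings"
	"sync"
)

// fanIn merges several log-line channels into one, closing the output once
// every input channel has been drained.
func fanIn(chs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, ch := range chs {
		wg.Add(1)
		go func(c <-chan string) {
			defer wg.Done()
			for line := range c {
				out <- line
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a, b := make(chan string, 1), make(chan string, 1)
	a <- "[DBG] Direct proposal ignored, not leader (state: CLOSED)"
	close(a)
	close(b)

	condition := "Direct proposal ignored, not leader"
	for line := range fanIn(a, b) {
		if strings.Contains(line, condition) {
			fmt.Println("bad condition detected:", line)
		}
	}
}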