From 27245891f2eddee34ed500e1410e0f9825f43e29 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 15 Sep 2023 16:11:00 -0700 Subject: [PATCH 1/4] Add test for scaling replica with pull consumers Signed-off-by: Waldemar Quevedo --- server/jetstream_cluster_3_test.go | 110 +++++++++++++++++++++++++++++ server/jetstream_helpers_test.go | 15 ++++ server/raft.go | 2 + 3 files changed, 127 insertions(+) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 629f797661..ed28ce5e49 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -18,6 +18,7 @@ package server import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -5682,3 +5683,112 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) { require_True(t, s.lookupRaftNode(sgn) == nil) require_True(t, s.lookupRaftNode(ogn) == nil) } + +func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + s := c.randomNonLeader() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + nc2, producer := jsClientConnect(t, s) + defer nc2.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + c.waitOnStreamLeader(globalAccountName, "TEST") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + end := time.Now().Add(10 * time.Second) + for time.Now().Before(end) { + select { + case <-ctx.Done(): + default: + } + producer.Publish("foo", []byte(strings.Repeat("A", 128))) + time.Sleep(time.Millisecond) + } + + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + sub, err := js.PullSubscribe("foo", fmt.Sprintf("C-%d", i)) + require_NoError(t, err) + + wg.Add(1) + go func() { + defer wg.Done() + for range time.NewTicker(10 * time.Millisecond).C { + select { + case <-ctx.Done(): + return + default: + } + + msgs, err := sub.Fetch(1) + if err != nil && !errors.Is(err, nats.ErrTimeout) { + t.Logf("Pull Error: %v", err) + } + for _, msg := range msgs { + msg.Ack() + } + } + }() + } + + c.lameDuckRestartAll() + c.waitOnStreamLeader(globalAccountName, "TEST") + + // Start publishing again for a while. + end = time.Now().Add(10 * time.Second) + for time.Now().Before(end) { + select { + case <-ctx.Done(): + default: + } + producer.Publish("foo", []byte(strings.Repeat("A", 128))) + } + + fmt.Printf("SCALE DOWN TO R1\n") + + // Try to do a stream edit back to R=1 after doing all the upgrade. + info, _ := js.StreamInfo("TEST") + sconfig := info.Config + sconfig.Replicas = 1 + _, err = js.UpdateStream(&sconfig) + require_NoError(t, err) + + // Let running for some time. + time.Sleep(10 * time.Second) + + fmt.Printf("SCALE UP TO R3\n") + + info, _ = js.StreamInfo("TEST") + sconfig = info.Config + sconfig.Replicas = 3 + _, err = js.UpdateStream(&sconfig) + require_NoError(t, err) + // Let running after the update... + time.Sleep(10 * time.Second) + + // Start publishing again for a while. + end = time.Now().Add(30 * time.Second) + for time.Now().Before(end) { + select { + case <-ctx.Done(): + default: + } + producer.Publish("foo", []byte(strings.Repeat("A", 128))) + time.Sleep(time.Millisecond) + } + + // Stop goroutines and wait for them to exit. 
+ cancel() + wg.Wait() +} diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index d4d31f4802..1d6813dd98 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -1541,6 +1541,21 @@ func (c *cluster) restartAll() { c.waitOnClusterReady() } +func (c *cluster) lameDuckRestartAll() { + c.t.Helper() + for i, s := range c.servers { + s.lameDuckMode() + s.WaitForShutdown() + if !s.Running() { + opts := c.opts[i] + s, o := RunServerWithConfig(opts.ConfigFile) + c.servers[i] = s + c.opts[i] = o + } + } + c.waitOnClusterReady() +} + func (c *cluster) restartAllSamePorts() { c.t.Helper() for i, s := range c.servers { diff --git a/server/raft.go b/server/raft.go index 076b8e0df9..920f6f41ba 100644 --- a/server/raft.go +++ b/server/raft.go @@ -674,7 +674,9 @@ func (n *raft) Propose(data []byte) error { func (n *raft) ProposeDirect(entries []*Entry) error { n.RLock() if n.state != Leader { + group := n.group n.RUnlock() + fmt.Printf("Direct proposal ignored, not leader (state: %v, group: %v)\n", n.state, group) n.debug("Direct proposal ignored, not leader (state: %v)", n.state) return errNotLeader } From 850c89e175dea0bf7a161ff98db026455de49e33 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 18 Sep 2023 11:00:37 -0700 Subject: [PATCH 2/4] When scaling a consumer down make sure to pop the loopAndForwardProposals go routine Signed-off-by: Derek Collison --- server/consumer.go | 14 ++++++ server/jetstream_cluster.go | 2 + server/jetstream_cluster_3_test.go | 73 ++++++++++++++++++------------ server/raft.go | 2 - 4 files changed, 60 insertions(+), 31 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 3d9d8a9bc0..ce14e101ff 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1122,6 +1122,16 @@ func (o *consumer) isLeader() bool { return true } +func (o *consumer) clearLoopAndForward() { + o.mu.Lock() + defer o.mu.Unlock() + if o.qch != nil { + close(o.qch) + // Note can not close pch here. + o.qch, o.pch = nil, nil + } +} + func (o *consumer) setLeader(isLeader bool) { o.mu.RLock() mset := o.mset @@ -2003,9 +2013,13 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) { forwardProposals := func() { o.mu.Lock() + node, pch = o.node, o.pch proposal := o.phead o.phead, o.ptail = nil, nil o.mu.Unlock() + if node == nil || pch == nil || node.State() != Leader { + return + } // 256k max for now per batch. const maxBatch = 256 * 1024 var entries []*Entry diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 311dd0ecd3..2d6f30b528 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4264,6 +4264,8 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state } else { // Check for scale down to 1.. if rg.node != nil && len(rg.Peers) == 1 { + // Need to pop loopAndForward by closing qch and nil out both qch and pch. + o.clearLoopAndForward() o.clearNode() o.setLeader(true) // Need to clear from rg too. 
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index ed28ce5e49..f619dc762a 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5706,12 +5706,8 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - end := time.Now().Add(10 * time.Second) + end := time.Now().Add(2 * time.Second) for time.Now().Before(end) { - select { - case <-ctx.Done(): - default: - } producer.Publish("foo", []byte(strings.Repeat("A", 128))) time.Sleep(time.Millisecond) } @@ -5732,7 +5728,7 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) { } msgs, err := sub.Fetch(1) - if err != nil && !errors.Is(err, nats.ErrTimeout) { + if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, nats.ErrConnectionClosed) { t.Logf("Pull Error: %v", err) } for _, msg := range msgs { @@ -5741,22 +5737,49 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) { } }() } - c.lameDuckRestartAll() c.waitOnStreamLeader(globalAccountName, "TEST") + // Swap the logger to try to detect the condition after the restart. + loggers := make([]*captureDebugLogger, 3) + for i, srv := range c.servers { + l := &captureDebugLogger{dbgCh: make(chan string, 10)} + loggers[i] = l + srv.SetLogger(l, true, false) + } + condition := `Direct proposal ignored, not leader (state: CLOSED)` + errCh := make(chan error, 10) + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case dl := <-loggers[0].dbgCh: + if strings.Contains(dl, condition) { + errCh <- fmt.Errorf(condition) + } + case dl := <-loggers[1].dbgCh: + if strings.Contains(dl, condition) { + errCh <- fmt.Errorf(condition) + } + case dl := <-loggers[2].dbgCh: + if strings.Contains(dl, condition) { + errCh <- fmt.Errorf(condition) + } + case <-ctx.Done(): + return + } + } + }() + // Start publishing again for a while. - end = time.Now().Add(10 * time.Second) + end = time.Now().Add(2 * time.Second) for time.Now().Before(end) { - select { - case <-ctx.Done(): - default: - } producer.Publish("foo", []byte(strings.Repeat("A", 128))) + time.Sleep(time.Millisecond) } - fmt.Printf("SCALE DOWN TO R1\n") - // Try to do a stream edit back to R=1 after doing all the upgrade. info, _ := js.StreamInfo("TEST") sconfig := info.Config @@ -5764,28 +5787,20 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) { _, err = js.UpdateStream(&sconfig) require_NoError(t, err) - // Let running for some time. - time.Sleep(10 * time.Second) - - fmt.Printf("SCALE UP TO R3\n") + // Leave running for some time after the update. + time.Sleep(2 * time.Second) info, _ = js.StreamInfo("TEST") sconfig = info.Config sconfig.Replicas = 3 _, err = js.UpdateStream(&sconfig) require_NoError(t, err) - // Let running after the update... - time.Sleep(10 * time.Second) - // Start publishing again for a while. - end = time.Now().Add(30 * time.Second) - for time.Now().Before(end) { - select { - case <-ctx.Done(): - default: - } - producer.Publish("foo", []byte(strings.Repeat("A", 128))) - time.Sleep(time.Millisecond) + select { + case e := <-errCh: + t.Fatalf("Bad condition on raft node: %v", e) + case <-time.After(2 * time.Second): + // Done } // Stop goroutines and wait for them to exit. 
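The test hunk above swaps each server's logger for a captureDebugLogger and watches the debug output for the "Direct proposal ignored, not leader (state: CLOSED)" message, failing the test if it ever shows up. A rough, stand-alone sketch of that capture-and-watch pattern follows; the captureLogger type, its Debugf method, and the channel sizes are simplified stand-ins for illustration, not the server's actual logger interface:

package main

import (
	"fmt"
	"strings"
	"time"
)

// captureLogger is a simplified stand-in for the test's captureDebugLogger:
// it forwards debug lines to a channel so a test goroutine can watch them.
type captureLogger struct {
	dbgCh chan string
}

func (l *captureLogger) Debugf(format string, args ...interface{}) {
	select {
	case l.dbgCh <- fmt.Sprintf(format, args...):
	default: // never block the component being observed
	}
}

func main() {
	l := &captureLogger{dbgCh: make(chan string, 10)}
	condition := "Direct proposal ignored, not leader (state: CLOSED)"
	errCh := make(chan error, 1)
	done := make(chan struct{})

	// Watcher: report if the condition ever shows up in debug output.
	go func() {
		for {
			select {
			case dl := <-l.dbgCh:
				if strings.Contains(dl, condition) {
					errCh <- fmt.Errorf("unexpected condition: %s", dl)
				}
			case <-done:
				return
			}
		}
	}()

	// Simulate the component logging a benign variant of the message.
	l.Debugf("Direct proposal ignored, not leader (state: %v)", "FOLLOWER")

	select {
	case err := <-errCh:
		fmt.Println("bad condition:", err)
	case <-time.After(100 * time.Millisecond):
		fmt.Println("no bad condition observed")
	}
	close(done)
}

In the actual test the condition string matches the raft debug line that the first patch temporarily surfaced with fmt.Printf; once that print is reverted, the n.debug call alone is enough for the capture logger to observe it.
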
diff --git a/server/raft.go b/server/raft.go index 920f6f41ba..076b8e0df9 100644 --- a/server/raft.go +++ b/server/raft.go @@ -674,9 +674,7 @@ func (n *raft) Propose(data []byte) error { func (n *raft) ProposeDirect(entries []*Entry) error { n.RLock() if n.state != Leader { - group := n.group n.RUnlock() - fmt.Printf("Direct proposal ignored, not leader (state: %v, group: %v)\n", n.state, group) n.debug("Direct proposal ignored, not leader (state: %v)", n.state) return errNotLeader } From ea775a80e81390703ffef1ca9ad5ab2a510d30d7 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Mon, 18 Sep 2023 12:46:53 -0700 Subject: [PATCH 3/4] Skip TestJetStreamClusterRestartThenScaleStreamReplicas for now Signed-off-by: Waldemar Quevedo --- server/jetstream_cluster.go | 3 ++- server/jetstream_cluster_3_test.go | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 2d6f30b528..205b598829 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4264,7 +4264,8 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state } else { // Check for scale down to 1.. if rg.node != nil && len(rg.Peers) == 1 { - // Need to pop loopAndForward by closing qch and nil out both qch and pch. + // Need to pop loopAndForward by closing qch and nil out both qch and pch + // to avoid leaving a closed raft node forwarding proposals. o.clearLoopAndForward() o.clearNode() o.setLeader(true) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index f619dc762a..0a8a65bca8 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5685,6 +5685,8 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) { } func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) { + t.Skip("This test takes too long, need to make shorter") + c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() From 71b8a334567998a612a8d77f5cb9f11a7945ef17 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 18 Sep 2023 13:27:27 -0700 Subject: [PATCH 4/4] Update to not pop directly, just bail when we detect leadership change Signed-off-by: Derek Collison --- server/consumer.go | 25 +++++++++---------------- server/jetstream_cluster.go | 3 --- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index ce14e101ff..3fe0911464 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1122,16 +1122,6 @@ func (o *consumer) isLeader() bool { return true } -func (o *consumer) clearLoopAndForward() { - o.mu.Lock() - defer o.mu.Unlock() - if o.qch != nil { - close(o.qch) - // Note can not close pch here. - o.qch, o.pch = nil, nil - } -} - func (o *consumer) setLeader(isLeader bool) { o.mu.RLock() mset := o.mset @@ -2011,15 +2001,15 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) { return } - forwardProposals := func() { + forwardProposals := func() error { o.mu.Lock() - node, pch = o.node, o.pch + if o.node != node || node.State() != Leader { + o.mu.Unlock() + return errors.New("no longer leader") + } proposal := o.phead o.phead, o.ptail = nil, nil o.mu.Unlock() - if node == nil || pch == nil || node.State() != Leader { - return - } // 256k max for now per batch. 
const maxBatch = 256 * 1024 var entries []*Entry @@ -2038,6 +2028,7 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) { if len(entries) > 0 { node.ProposeDirect(entries) } + return nil } // In case we have anything pending on entry. @@ -2049,7 +2040,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) { forwardProposals() return case <-pch: - forwardProposals() + if err := forwardProposals(); err != nil { + return + } } } } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 205b598829..311dd0ecd3 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4264,9 +4264,6 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state } else { // Check for scale down to 1.. if rg.node != nil && len(rg.Peers) == 1 { - // Need to pop loopAndForward by closing qch and nil out both qch and pch - // to avoid leaving a closed raft node forwarding proposals. - o.clearLoopAndForward() o.clearNode() o.setLeader(true) // Need to clear from rg too.
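
Taken together, the final patch drops the externally popped goroutine in favor of a loop that checks leadership itself: forwardProposals returns an error when the consumer's node has changed or is no longer leader, and loopAndForwardProposals exits on that error. A minimal, self-contained sketch of that shape follows, assuming a simplified node type that models only leadership state and a direct-propose call; names such as forwardLoop, pch, and qch are illustrative, not the server's internals:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// node is a simplified stand-in for the consumer's raft node: only leadership
// state and a direct-propose call are modeled.
type node struct {
	leader atomic.Bool
}

func (n *node) IsLeader() bool { return n.leader.Load() }

func (n *node) ProposeDirect(batch [][]byte) {
	fmt.Printf("proposed %d entries\n", len(batch))
}

// forwardLoop drains pending proposals and forwards them while the node is
// leader; it bails out on its own as soon as leadership is lost, rather than
// waiting for an external close of its quit channel.
func forwardLoop(n *node, pch <-chan []byte, qch <-chan struct{}) {
	var pending [][]byte

	flush := func() error {
		if !n.IsLeader() {
			return errors.New("no longer leader")
		}
		if len(pending) > 0 {
			n.ProposeDirect(pending)
			pending = nil
		}
		return nil
	}

	for {
		select {
		case <-qch:
			flush()
			return
		case p := <-pch:
			pending = append(pending, p)
			if err := flush(); err != nil {
				return
			}
		}
	}
}

func main() {
	n := &node{}
	n.leader.Store(true)

	pch := make(chan []byte, 8)
	qch := make(chan struct{})
	go forwardLoop(n, pch, qch)

	pch <- []byte("entry-1")
	time.Sleep(10 * time.Millisecond)

	// After leadership is lost, the next proposal makes the loop exit by itself.
	n.leader.Store(false)
	pch <- []byte("entry-2")
	time.Sleep(10 * time.Millisecond)
	close(qch)
}

The design point of the last revision is that the scale-down path in processClusterCreateConsumer no longer has to close the loop's quit channel explicitly; detecting a stale or non-leader node inside the forwarding loop covers that case and any other leadership change.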