From d107ba354936814f06751dbbd42f6e3e03739a0d Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Fri, 28 Apr 2023 17:11:08 -0700
Subject: [PATCH 1/8] Under certain scenarios we have witnessed healthz() that
 never returns healthy due to a stream or consumer being missing or stopped.
 This will now allow the healthz call to attempt to restart those assets.

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go        | 99 ++++++++++++++++++++++++++----
 server/jetstream_cluster_3_test.go | 71 +++++++++++++++++++++
 server/monitor.go                  |  5 +-
 3 files changed, 162 insertions(+), 13 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 9a29635759..b846f3eeee 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -434,24 +434,89 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
 	return false
 }
 
+// Restart the stream in question.
+// Should only be called when the stream is known to be in a bad state.
+func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
+	js.mu.Lock()
+	cc := js.cluster
+	if cc == nil {
+		js.mu.Unlock()
+		return
+	}
+	// Need to look up the one directly from the meta layer; what we get handed is a copy if coming from isStreamHealthy.
+	asa := cc.streams[acc.Name]
+	if asa == nil {
+		js.mu.Unlock()
+		return
+	}
+	sa := asa[csa.Config.Name]
+	if sa == nil {
+		js.mu.Unlock()
+		return
+	}
+	// Make sure to clear out the raft node if still present in the meta layer.
+	if rg := sa.Group; rg != nil && rg.node != nil {
+		rg.node = nil
+	}
+	js.mu.Unlock()
+
+	// Process stream assignment to recreate.
+	js.processStreamAssignment(sa)
+
+	// If we had consumers assigned to this server they will be present in the copy, csa.
+	// They also need to be processed. The csa consumers are a copy of only our consumers,
+	// those assigned to us, but the consumer assignments there are direct from the meta
+	// layer to make this part much easier and avoid excessive lookups.
+	for _, cca := range csa.consumers {
+		if cca.deleted {
+			continue
+		}
+		// Need to look up the original as well here to make sure the node is nil.
+		js.mu.Lock()
+		ca := sa.consumers[cca.Name]
+		if ca != nil && ca.Group != nil {
+			// Make sure node is wiped.
+			ca.Group.node = nil
+		}
+		js.mu.Unlock()
+		if ca != nil {
+			js.processConsumerAssignment(ca)
+		}
+	}
+}
+
 // isStreamHealthy will determine if the stream is up to date or very close.
 // For R1 it will make sure the stream is present on this server.
 func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
-	js.mu.RLock()
-	defer js.mu.RUnlock()
-
+	js.mu.Lock()
 	cc := js.cluster
 	if cc == nil {
 		// Non-clustered mode
+		js.mu.Unlock()
 		return true
 	}
+
+	// Pull the group out.
 	rg := sa.Group
 	if rg == nil {
+		js.mu.Unlock()
+		return false
+	}
+
+	streamName := sa.Config.Name
+	node := rg.node
+	js.mu.Unlock()
+
+	// First look up the stream and make sure it's there.
+	mset, err := acc.lookupStream(streamName)
+	if err != nil {
+		js.restartStream(acc, sa)
 		return false
 	}
-	if rg := sa.Group; rg != nil && (rg.node == nil || rg.node.Healthy()) {
+
+	if node == nil || node.Healthy() {
 		// Check if we are processing a snapshot and are catching up.
-		if mset, err := acc.lookupStream(sa.Config.Name); err == nil && !mset.isCatchingUp() {
+		if !mset.isCatchingUp() {
 			return true
 		}
 	}
@@ -460,23 +525,35 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
 
 // isConsumerCurrent will determine if the consumer is up to date.
 // For R1 it will make sure the consunmer is present on this server.
-func (js *jetStream) isConsumerCurrent(mset *stream, consumer string, ca *consumerAssignment) bool {
+func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consumerAssignment) bool {
+	if mset == nil {
+		return false
+	}
 	js.mu.RLock()
-	defer js.mu.RUnlock()
-
 	cc := js.cluster
+	js.mu.RUnlock()
+
 	if cc == nil {
 		// Non-clustered mode
 		return true
 	}
 	o := mset.lookupConsumer(consumer)
 	if o == nil {
+		js.mu.Lock()
+		if ca.Group != nil {
+			ca.Group.node = nil
+		}
+		deleted := ca.deleted
+		js.mu.Unlock()
+		if !deleted {
+			js.processConsumerAssignment(ca)
+		}
 		return false
 	}
-	if n := o.raftNode(); n != nil && !n.Current() {
-		return false
+	if node := o.raftNode(); node == nil || node.Healthy() {
+		return true
 	}
-	return true
+	return false
 }
 
 // subjectsOverlap checks all existing stream assignments for the account cross-cluster for subject overlap
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 3301ebb0c6..bd91380c7e 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -3763,3 +3763,74 @@ func TestJetStreamClusterConsumerInfoForJszForFollowers(t *testing.T) {
 		}
 	}
 }
+
+// Under certain scenarios we have seen consumers become stopped and cause healthz to fail.
+// The specific scenario is heavy loads and stream resets on upgrades that could orphan consumers.
+func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "NATS", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"*"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	for i := 0; i < 1000; i++ {
+		sendStreamMsg(t, nc, "foo", "HELLO")
+	}
+
+	sub, err := js.PullSubscribe("foo", "d")
+	require_NoError(t, err)
+
+	fetch, ack := 122, 22
+	msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second))
+	require_NoError(t, err)
+	require_True(t, len(msgs) == fetch)
+	for _, m := range msgs[:ack] {
+		m.AckSync()
+	}
+	// Let acks propagate.
+	time.Sleep(100 * time.Millisecond)
+
+	// We will now stop a stream on a given server.
+	s := c.randomServer()
+	mset, err := s.GlobalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+	// Stop the stream.
+	mset.stop(false, false)
+
+	// Wait for exit.
+	time.Sleep(100 * time.Millisecond)
+
+	checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
+		hs := s.healthz(nil)
+		if hs.Error != _EMPTY_ {
+			return errors.New(hs.Error)
+		}
+		return nil
+	})
+
+	// Now take out the consumer.
+	mset, err = s.GlobalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+
+	o := mset.lookupConsumer("d")
+	require_NotNil(t, o)
+
+	o.stop()
+	// Wait for exit.
+	time.Sleep(100 * time.Millisecond)
+
+	checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
+		hs := s.healthz(nil)
+		if hs.Error != _EMPTY_ {
+			return errors.New(hs.Error)
+		}
+		return nil
+	})
+}
diff --git a/server/monitor.go b/server/monitor.go
index 7d118ab433..66045cd084 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -3144,7 +3144,8 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
 			csa.consumers = make(map[string]*consumerAssignment)
 			for consumer, ca := range sa.consumers {
 				if ca.Group.isMember(ourID) {
-					csa.consumers[consumer] = ca.copyGroup()
+					// Use original here. Not a copy.
+					csa.consumers[consumer] = ca
 				}
 			}
 			nasa[stream] = csa
@@ -3173,7 +3174,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
 			mset, _ := acc.lookupStream(stream)
 			// Now check consumers.
 			for consumer, ca := range sa.consumers {
-				if !js.isConsumerCurrent(mset, consumer, ca) {
+				if !js.isConsumerHealthy(mset, consumer, ca) {
 					health.Status = na
 					health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer)
 					return health

From ac27fd046a6aa5a22f5c49a71690145f0616664a Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Fri, 28 Apr 2023 17:57:03 -0700
Subject: [PATCH 2/8] Fix data race

Signed-off-by: Derek Collison
---
 server/stream.go | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/server/stream.go b/server/stream.go
index 7be712af4b..f0f5065f62 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -601,6 +601,20 @@ func (mset *stream) streamAssignment() *streamAssignment {
 }
 
 func (mset *stream) setStreamAssignment(sa *streamAssignment) {
+	var node RaftNode
+
+	mset.mu.RLock()
+	js := mset.js
+	mset.mu.RUnlock()
+
+	if js != nil {
+		js.mu.RLock()
+		if sa.Group != nil {
+			node = sa.Group.node
+		}
+		js.mu.RUnlock()
+	}
+
 	mset.mu.Lock()
 	defer mset.mu.Unlock()
 
@@ -610,7 +624,7 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) {
 	}
 
 	// Set our node.
-	mset.node = sa.Group.node
+	mset.node = node
 	if mset.node != nil {
 		mset.node.UpdateKnownPeers(sa.Group.Peers)
 	}

From 85f6bfb2ac0208e1c8af02eb5abdd91ac8d8c656 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Fri, 28 Apr 2023 17:58:45 -0700
Subject: [PATCH 3/8] Check healthz periodically

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index b846f3eeee..c5a0262fb3 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1127,6 +1127,10 @@ func (js *jetStream) monitorCluster() {
 	lt := time.NewTicker(leaderCheckInterval)
 	defer lt.Stop()
 
+	const healthCheckInterval = 2 * time.Minute
+	ht := time.NewTicker(healthCheckInterval)
+	defer ht.Stop()
+
 	var (
 		isLeader     bool
 		lastSnapTime time.Time
@@ -1237,6 +1241,11 @@ func (js *jetStream) monitorCluster() {
 			if n.Leader() {
 				js.checkClusterSize()
 			}
+		case <-ht.C:
+			if hs := s.healthz(nil); hs.Error != _EMPTY_ {
+				s.Warnf("%v", hs.Error)
+			}
+
 		case <-lt.C:
 			s.Debugf("Checking JetStream cluster state")
 			// If we have a current leader or had one in the past we can cancel this here since the metaleader
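
[Reviewer note, not part of the patch series] Patches 3 and 6 together boil down to a ticker-driven self-check inside the cluster monitor loop. A condensed, standalone sketch of that pattern follows; checkHealth here is a hypothetical stand-in for s.healthz(nil), not a server API, and the interval mirrors the 2-minute healthCheckInterval the patch introduces.

// healthloop.go — illustrative sketch only; names are stand-ins, not nats-server APIs.
package main

import (
	"log"
	"time"
)

// checkHealth stands in for s.healthz(nil); it returns a non-empty string when unhealthy.
func checkHealth() string {
	// A real implementation would verify streams, consumers and raft nodes here.
	return ""
}

func monitorLoop(quit <-chan struct{}) {
	const healthCheckInterval = 2 * time.Minute
	ht := time.NewTicker(healthCheckInterval)
	defer ht.Stop()

	for {
		select {
		case <-quit:
			return
		case <-ht.C:
			// Run the check off the monitor loop (as patch 6 does with `go checkHealth()`)
			// so a slow health check cannot stall cluster processing.
			go func() {
				if errStr := checkHealth(); errStr != "" {
					log.Printf("health check failed: %s", errStr)
				}
			}()
		}
	}
}

func main() {
	quit := make(chan struct{})
	go monitorLoop(quit)
	time.Sleep(100 * time.Millisecond) // let the loop start; real code would block on a shutdown signal
	close(quit)
}
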
From 546dd0c9ab05bd2fca035949e9175342611319b4 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 29 Apr 2023 07:42:23 -0700
Subject: [PATCH 4/8] Make sure we can recover from an underlying node being
 stopped. Do not return healthy if the node is closed, and wait a bit longer
 for forward progress.

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go        | 15 +++++++++++++--
 server/jetstream_cluster_3_test.go | 15 +++++++++++++++
 server/raft.go                     | 11 +++++++++--
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index c5a0262fb3..39fcb0aef2 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -537,8 +537,10 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
 		// Non-clustered mode
 		return true
 	}
-	o := mset.lookupConsumer(consumer)
-	if o == nil {
+
+	// When we try to restart we nil out the node if applicable
+	// and reprocess the consumer assignment.
+	restartConsumer := func() {
 		js.mu.Lock()
 		if ca.Group != nil {
 			ca.Group.node = nil
@@ -548,10 +550,19 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
 		if !deleted {
 			js.processConsumerAssignment(ca)
 		}
+	}
+
+	o := mset.lookupConsumer(consumer)
+	if o == nil {
+		restartConsumer()
 		return false
 	}
 	if node := o.raftNode(); node == nil || node.Healthy() {
 		return true
+	} else if node != nil && node.State() == Closed {
+		// We have a consumer, and it should have a running node but it is closed.
+		o.stop()
+		restartConsumer()
 	}
 	return false
 }
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index bd91380c7e..6700e67845 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -3833,4 +3833,19 @@ func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) {
 		}
 		return nil
 	})
+
+	// Now just stop the raft node from underneath the consumer.
+	o = mset.lookupConsumer("d")
+	require_NotNil(t, o)
+	node := o.raftNode()
+	require_NotNil(t, node)
+	node.Stop()
+
+	checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
+		hs := s.healthz(nil)
+		if hs.Error != _EMPTY_ {
+			return errors.New(hs.Error)
+		}
+		return nil
+	})
 }
diff --git a/server/raft.go b/server/raft.go
index 3f3060164d..1cb2bdb5db 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -1172,9 +1172,16 @@ func (n *raft) isCatchingUp() bool {
 	return n.catchup != nil
 }
 
-// Lock should be held. This function may block for up to ~5ms to check
+// This function may block for up to ~10ms to check
 // forward progress in some cases.
+// Lock should be held.
 func (n *raft) isCurrent(includeForwardProgress bool) bool {
+	// Check if we are closed.
+	if n.state == Closed {
+		n.debug("Not current, node is closed")
+		return false
+	}
+
 	// Check whether we've made progress on any state, 0 is invalid so not healthy.
 	if n.commit == 0 {
 		n.debug("Not current, no commits")
@@ -1219,7 +1226,7 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool {
 	if startDelta := n.commit - n.applied; startDelta > 0 {
 		for i := 0; i < 10; i++ { // 5ms, in 0.5ms increments
 			n.Unlock()
-			time.Sleep(time.Millisecond / 2)
+			time.Sleep(time.Millisecond)
 			n.Lock()
 			if n.commit-n.applied < startDelta {
 				// The gap is getting smaller, so we're making forward progress.

From fac5658966157506a423e88e9bd3aaa56e2138f8 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 29 Apr 2023 08:15:33 -0700
Subject: [PATCH 5/8] If we fail to create a consumer, make sure to clean up
 any raft nodes in the meta layer and to shut down the consumer if it was
 created but we encountered an error.
Signed-off-by: Derek Collison
---
 server/consumer.go          | 2 +-
 server/jetstream_cluster.go | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/server/consumer.go b/server/consumer.go
index 7fce8ed023..e96ba84c48 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -621,7 +621,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 	mset.mu.Lock()
 	if mset.client == nil || mset.store == nil || mset.consumers == nil {
 		mset.mu.Unlock()
-		return nil, errors.New("invalid stream")
+		return nil, NewJSStreamInvalidError()
 	}
 
 	// If this one is durable and already exists, we let that be ok as long as only updating what should be allowed.
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 39fcb0aef2..cf17792607 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -4010,6 +4010,13 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 
 	if rg.node != nil {
 		rg.node.Delete()
+		// Clear the node here.
+		rg.node = nil
+	}
+
+	// If we did seem to create a consumer, make sure to stop it.
+	if o != nil {
+		o.stop()
 	}
 
 	var result *consumerAssignmentResult

From 4eb4e5496bdf3d37c776e09f58d42e037bb93706 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 29 Apr 2023 09:36:35 -0700
Subject: [PATCH 6/8] Do a health check on startup once we have processed
 existing state. Also do health checks in a separate go routine.

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index cf17792607..4cc59281a5 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1142,6 +1142,13 @@ func (js *jetStream) monitorCluster() {
 	ht := time.NewTicker(healthCheckInterval)
 	defer ht.Stop()
 
+	// Utility to check health.
+	checkHealth := func() {
+		if hs := s.healthz(nil); hs.Error != _EMPTY_ {
+			s.Warnf("%v", hs.Error)
+		}
+	}
+
 	var (
 		isLeader     bool
 		lastSnapTime time.Time
@@ -1216,6 +1223,8 @@ func (js *jetStream) monitorCluster() {
 				ru = nil
 				s.Debugf("Recovered JetStream cluster metadata")
 				js.checkForOrphans()
+				// Do a health check here as well.
+				go checkHealth()
 				continue
 			}
 			// FIXME(dlc) - Deal with errors.
@@ -1232,6 +1241,7 @@ func (js *jetStream) monitorCluster() {
 				}
 			}
 			aq.recycle(&ces)
+
 		case isLeader = <-lch:
 			// For meta layer synchronize everyone to our state on becoming leader.
 			if isLeader {
@@ -1253,9 +1263,8 @@ func (js *jetStream) monitorCluster() {
 				js.checkClusterSize()
 			}
 		case <-ht.C:
-			if hs := s.healthz(nil); hs.Error != _EMPTY_ {
-				s.Warnf("%v", hs.Error)
-			}
+			// Do this in a separate go routine.
+			go checkHealth()
 
 		case <-lt.C:
 			s.Debugf("Checking JetStream cluster state")

From db972048ceebc57f3d1d1309b1ee64600776280b Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 29 Apr 2023 11:18:10 -0700
Subject: [PATCH 7/8] Detect when we are shutting down or if a consumer is
 already closed when removing a stream.

Signed-off-by: Derek Collison
---
 server/consumer.go  |  7 +++++++
 server/jetstream.go |  9 +++++++--
 server/stream.go    | 18 +++++++++++++-----
 3 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index e96ba84c48..2fd466c06a 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -4395,6 +4395,13 @@ func (o *consumer) delete() error {
 	return o.stopWithFlags(true, false, true, true)
 }
 
+// To test for closed state.
+func (o *consumer) isClosed() bool {
+	o.mu.RLock()
+	defer o.mu.RUnlock()
+	return o.closed
+}
+
 func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
 	o.mu.Lock()
 	js := o.js
diff --git a/server/jetstream.go b/server/jetstream.go
index fd76e36e92..ae8cab5986 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -1,4 +1,4 @@
-// Copyright 2019-2022 The NATS Authors
+// Copyright 2019-2023 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -109,11 +109,14 @@ type jetStream struct {
 	started time.Time
 
 	// System level request to purge a stream move
-	accountPurge   *subscription
+	accountPurge *subscription
+
+	// Some bools regarding general state.
 	metaRecovering bool
 	standAlone     bool
 	disabled       bool
 	oos            bool
+	shuttingDown   bool
 }
 
 type remoteUsage struct {
@@ -887,6 +890,8 @@ func (s *Server) shutdownJetStream() {
 	}
 	accPurgeSub := js.accountPurge
 	js.accountPurge = nil
+	// Signal we are shutting down.
+	js.shuttingDown = true
 	js.mu.Unlock()
 
 	if accPurgeSub != nil {
diff --git a/server/stream.go b/server/stream.go
index f0f5065f62..e49f9d7592 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -4556,12 +4556,20 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 	}
 	mset.mu.Unlock()
 
+	js.mu.RLock()
+	isShuttingDown := js.shuttingDown
+	js.mu.RUnlock()
+
 	for _, o := range obs {
-		// Third flag says do not broadcast a signal.
-		// TODO(dlc) - If we have an err here we don't want to stop
-		// but should we log?
-		o.stopWithFlags(deleteFlag, deleteFlag, false, advisory)
-		o.monitorWg.Wait()
+		if !o.isClosed() {
+			// Third flag says do not broadcast a signal.
+			// TODO(dlc) - If we have an err here we don't want to stop
+			// but should we log?
+			o.stopWithFlags(deleteFlag, deleteFlag, false, advisory)
+			if !isShuttingDown {
+				o.monitorWg.Wait()
+			}
+		}
 	}
 
 	mset.mu.Lock()

From b27ce6de809fffb1b2b3823c56b4e33b61eda306 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sat, 29 Apr 2023 11:27:18 -0700
Subject: [PATCH 8/8] Add in a few more places to check on JetStream shutting
 down. Add in a helper method.

Signed-off-by: Derek Collison
---
 server/jetstream.go         |  7 +++++++
 server/jetstream_cluster.go | 16 +++++++++++++---
 server/stream.go            |  7 ++-----
 3 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/server/jetstream.go b/server/jetstream.go
index ae8cab5986..56e2a6fe99 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -858,6 +858,13 @@ func (s *Server) signalPullConsumers() {
 	}
 }
 
+// Helper for determining if we are shutting down.
+func (js *jetStream) isShuttingDown() bool {
+	js.mu.RLock()
+	defer js.mu.RUnlock()
+	return js.shuttingDown
+}
+
 // Shutdown jetstream for this server.
 func (s *Server) shutdownJetStream() {
 	s.mu.RLock()
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 4cc59281a5..693d8bf964 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2567,6 +2567,12 @@ func (mset *stream) resetClusteredState(err error) bool {
 		node.StepDown()
 	}
 
+	// If we detect we are shutting down, just return.
+	if js != nil && js.isShuttingDown() {
+		s.Debugf("Will not reset stream, jetstream shutting down")
+		return false
+	}
+
 	// Server
 	if js.limitsExceeded(stype) {
 		s.Debugf("Will not reset stream, server resources exceeded")
@@ -3203,19 +3209,23 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment)
 			node.StepDown(nsa.Group.Preferred)
 		}
 		node.ProposeRemovePeer(ourID)
-		// shut down monitor by shutting down raft
+		// shutdown monitor by shutting down raft.
 		node.Delete()
 	}
 
+	var isShuttingDown bool
 	// Make sure this node is no longer attached to our stream assignment.
 	if js, _ := s.getJetStreamCluster(); js != nil {
 		js.mu.Lock()
 		nsa.Group.node = nil
+		isShuttingDown = js.shuttingDown
 		js.mu.Unlock()
 	}
 
-	// wait for monitor to be shut down
-	mset.monitorWg.Wait()
+	if !isShuttingDown {
+		// wait for monitor to be shut down.
+		mset.monitorWg.Wait()
+	}
 	mset.stop(true, false)
 }
diff --git a/server/stream.go b/server/stream.go
index e49f9d7592..9d3e20004b 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -4474,7 +4474,7 @@ func (mset *stream) internalLoop() {
 	}
 }
 
-// Used to break consumers out of their
+// Used to break consumers out of their monitorConsumer go routines.
 func (mset *stream) resetAndWaitOnConsumers() {
 	mset.mu.RLock()
 	consumers := make([]*consumer, 0, len(mset.consumers))
@@ -4556,10 +4556,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 	}
 	mset.mu.Unlock()
 
-	js.mu.RLock()
-	isShuttingDown := js.shuttingDown
-	js.mu.RUnlock()
-
+	isShuttingDown := js.isShuttingDown()
 	for _, o := range obs {
 		if !o.isClosed() {
 			// Third flag says do not broadcast a signal.
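
[Reviewer note, not part of the patch series] One way to observe the recovery behavior end to end is to poll the monitoring endpoint's /healthz route while stopping a stream or consumer, which is essentially what the new test does through s.healthz(nil). The sketch below is illustrative only; it assumes a local nats-server started with JetStream and monitoring enabled (for example `nats-server -js -m 8222`), and the JSON field names mirror the HealthStatus status/error tags.

// healthzpoll.go — illustrative sketch, not part of the patches above.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// healthStatus mirrors the status/error fields returned by /healthz.
type healthStatus struct {
	Status string `json:"status"`
	Error  string `json:"error,omitempty"`
}

func main() {
	const url = "http://127.0.0.1:8222/healthz"
	deadline := time.Now().Add(30 * time.Second)
	for time.Now().Before(deadline) {
		resp, err := http.Get(url)
		if err != nil {
			fmt.Println("healthz not reachable:", err)
		} else {
			var hs healthStatus
			decErr := json.NewDecoder(resp.Body).Decode(&hs)
			resp.Body.Close()
			switch {
			case decErr != nil:
				fmt.Println("bad healthz response:", decErr)
			case hs.Error != "":
				// Unhealthy; with this series the server should now attempt to
				// restart the stopped stream/consumer and recover on its own.
				fmt.Println("unhealthy:", hs.Error)
			default:
				fmt.Println("healthy:", hs.Status)
				return
			}
		}
		time.Sleep(500 * time.Millisecond)
	}
	fmt.Println("gave up waiting for healthz to report healthy")
}
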