Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Health repair #4116

Merged
merged 8 commits into from
Apr 30, 2023
9 changes: 8 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
mset.mu.Lock()
if mset.client == nil || mset.store == nil || mset.consumers == nil {
mset.mu.Unlock()
return nil, errors.New("invalid stream")
return nil, NewJSStreamInvalidError()
}

// If this one is durable and already exists, we let that be ok as long as only updating what should be allowed.
Expand Down Expand Up @@ -4395,6 +4395,13 @@ func (o *consumer) delete() error {
return o.stopWithFlags(true, false, true, true)
}

// To test for closed state.
func (o *consumer) isClosed() bool {
o.mu.RLock()
defer o.mu.RUnlock()
return o.closed
}

func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
o.mu.Lock()
js := o.js
Expand Down
16 changes: 14 additions & 2 deletions server/jetstream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -109,11 +109,14 @@ type jetStream struct {
started time.Time

// System level request to purge a stream move
accountPurge *subscription
accountPurge *subscription

// Some bools regarding general state.
metaRecovering bool
standAlone bool
disabled bool
oos bool
shuttingDown bool
}

type remoteUsage struct {
Expand Down Expand Up @@ -855,6 +858,13 @@ func (s *Server) signalPullConsumers() {
}
}

// Helper for determining if we are shutting down.
func (js *jetStream) isShuttingDown() bool {
js.mu.RLock()
defer js.mu.RUnlock()
return js.shuttingDown
}

// Shutdown jetstream for this server.
func (s *Server) shutdownJetStream() {
s.mu.RLock()
Expand Down Expand Up @@ -887,6 +897,8 @@ func (s *Server) shutdownJetStream() {
}
accPurgeSub := js.accountPurge
js.accountPurge = nil
// Signal we are shutting down.
js.shuttingDown = true
js.mu.Unlock()

if accPurgeSub != nil {
Expand Down
151 changes: 137 additions & 14 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,24 +434,89 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
return false
}

// Restart the stream in question.
// Should only be called when the stream is know in a bad state.
func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
js.mu.Lock()
cc := js.cluster
if cc == nil {
js.mu.Unlock()
return
}
// Need to lookup the one directly from the meta layer, what we get handed is a copy if coming from isStreamHealthy.
asa := cc.streams[acc.Name]
if asa == nil {
js.mu.Unlock()
return
}
sa := asa[csa.Config.Name]
if sa == nil {
js.mu.Unlock()
return
}
// Make sure to clear out the raft node if still present in the meta layer.
if rg := sa.Group; rg != nil && rg.node != nil {
rg.node = nil
}
js.mu.Unlock()

// Process stream assignment to recreate.
js.processStreamAssignment(sa)

// If we had consumers assigned to this server they will be present in the copy, csa.
// They also need to be processed. The csa consumers is a copy of only our consumers,
// those assigned to us, but the consumer assignment's there are direct from the meta
// layer to make this part much easier and avoid excessive lookups.
for _, cca := range csa.consumers {
if cca.deleted {
continue
}
// Need to look up original as well here to make sure node is nil.
js.mu.Lock()
ca := sa.consumers[cca.Name]
if ca != nil && ca.Group != nil {
// Make sure node is wiped.
ca.Group.node = nil
}
js.mu.Unlock()
if ca != nil {
js.processConsumerAssignment(ca)
}
}
}

// isStreamHealthy will determine if the stream is up to date or very close.
// For R1 it will make sure the stream is present on this server.
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
js.mu.RLock()
defer js.mu.RUnlock()

js.mu.Lock()
cc := js.cluster
if cc == nil {
// Non-clustered mode
js.mu.Unlock()
return true
}

// Pull the group out.
rg := sa.Group
if rg == nil {
js.mu.Unlock()
return false
}
if rg := sa.Group; rg != nil && (rg.node == nil || rg.node.Healthy()) {

streamName := sa.Config.Name
node := rg.node
js.mu.Unlock()

// First lookup stream and make sure its there.
mset, err := acc.lookupStream(streamName)
if err != nil {
js.restartStream(acc, sa)
return false
}

if node == nil || node.Healthy() {
// Check if we are processing a snapshot and are catching up.
if mset, err := acc.lookupStream(sa.Config.Name); err == nil && !mset.isCatchingUp() {
if !mset.isCatchingUp() {
return true
}
}
Expand All @@ -460,23 +525,46 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {

// isConsumerCurrent will determine if the consumer is up to date.
// For R1 it will make sure the consunmer is present on this server.
func (js *jetStream) isConsumerCurrent(mset *stream, consumer string, ca *consumerAssignment) bool {
func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consumerAssignment) bool {
if mset == nil {
return false
}
js.mu.RLock()
defer js.mu.RUnlock()

cc := js.cluster
js.mu.RUnlock()

if cc == nil {
// Non-clustered mode
return true
}

// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
restartConsumer := func() {
js.mu.Lock()
if ca.Group != nil {
ca.Group.node = nil
}
deleted := ca.deleted
js.mu.Unlock()
if !deleted {
js.processConsumerAssignment(ca)
}
}

o := mset.lookupConsumer(consumer)
if o == nil {
restartConsumer()
return false
}
if n := o.raftNode(); n != nil && !n.Current() {
return false
if node := o.raftNode(); node == nil || node.Healthy() {
return true
} else if node != nil && node.State() == Closed {
// We have a consumer, and it should have a running node but it is closed.
o.stop()
restartConsumer()
}
return true
return false
}

// subjectsOverlap checks all existing stream assignments for the account cross-cluster for subject overlap
Expand Down Expand Up @@ -1050,6 +1138,17 @@ func (js *jetStream) monitorCluster() {
lt := time.NewTicker(leaderCheckInterval)
defer lt.Stop()

const healthCheckInterval = 2 * time.Minute
ht := time.NewTicker(healthCheckInterval)
defer ht.Stop()

// Utility to check health.
checkHealth := func() {
if hs := s.healthz(nil); hs.Error != _EMPTY_ {
s.Warnf("%v", hs.Error)
}
}

var (
isLeader bool
lastSnapTime time.Time
Expand Down Expand Up @@ -1124,6 +1223,8 @@ func (js *jetStream) monitorCluster() {
ru = nil
s.Debugf("Recovered JetStream cluster metadata")
js.checkForOrphans()
// Do a health check here as well.
go checkHealth()
continue
}
// FIXME(dlc) - Deal with errors.
Expand All @@ -1140,6 +1241,7 @@ func (js *jetStream) monitorCluster() {
}
}
aq.recycle(&ces)

case isLeader = <-lch:
// For meta layer synchronize everyone to our state on becoming leader.
if isLeader {
Expand All @@ -1160,6 +1262,10 @@ func (js *jetStream) monitorCluster() {
if n.Leader() {
js.checkClusterSize()
}
case <-ht.C:
// Do this in a separate go routine.
go checkHealth()

case <-lt.C:
s.Debugf("Checking JetStream cluster state")
// If we have a current leader or had one in the past we can cancel this here since the metaleader
Expand Down Expand Up @@ -2461,6 +2567,12 @@ func (mset *stream) resetClusteredState(err error) bool {
node.StepDown()
}

// If we detect we are shutting down just return.
if js != nil && js.isShuttingDown() {
s.Debugf("Will not reset stream, jetstream shutting down")
return false
}

// Server
if js.limitsExceeded(stype) {
s.Debugf("Will not reset stream, server resources exceeded")
Expand Down Expand Up @@ -3097,19 +3209,23 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment)
node.StepDown(nsa.Group.Preferred)
}
node.ProposeRemovePeer(ourID)
// shut down monitor by shutting down raft
// shutdown monitor by shutting down raft.
node.Delete()
}

var isShuttingDown bool
// Make sure this node is no longer attached to our stream assignment.
if js, _ := s.getJetStreamCluster(); js != nil {
js.mu.Lock()
nsa.Group.node = nil
isShuttingDown = js.shuttingDown
js.mu.Unlock()
}

// wait for monitor to be shut down
mset.monitorWg.Wait()
if !isShuttingDown {
// wait for monitor to be shutdown.
mset.monitorWg.Wait()
}
mset.stop(true, false)
}

Expand Down Expand Up @@ -3913,6 +4029,13 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state

if rg.node != nil {
rg.node.Delete()
// Clear the node here.
rg.node = nil
}

// If we did seem to create a consumer make sure to stop it.
if o != nil {
o.stop()
}

var result *consumerAssignmentResult
Expand Down
Loading