diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 924cf4f2d2..956648c4ef 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -436,63 +436,39 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { // isStreamHealthy will determine if the stream is up to date or very close. // For R1 it will make sure the stream is present on this server. -// Read lock should be held. -func (js *jetStream) isStreamHealthy(account, stream string) bool { +func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { + js.mu.RLock() + defer js.mu.RUnlock() + cc := js.cluster if cc == nil { // Non-clustered mode return true } - as := cc.streams[account] - if as == nil { - return false - } - sa := as[stream] - if sa == nil { - return false - } rg := sa.Group if rg == nil { return false } - - if rg.node == nil || rg.node.Healthy() { + if rg := sa.Group; rg != nil && (rg.node == nil || rg.node.Healthy()) { // Check if we are processing a snapshot and are catching up. - acc, err := cc.s.LookupAccount(account) - if err != nil { - return false - } - mset, err := acc.lookupStream(stream) - if err != nil { - return false - } - if mset.isCatchingUp() { - return false + if mset, err := acc.lookupStream(sa.Config.Name); err == nil && !mset.isCatchingUp() { + return true } - // Success. - return true } - return false } // isConsumerCurrent will determine if the consumer is up to date. // For R1 it will make sure the consunmer is present on this server. -// Read lock should be held. -func (js *jetStream) isConsumerCurrent(account, stream, consumer string) bool { +func (js *jetStream) isConsumerCurrent(mset *stream, consumer string, ca *consumerAssignment) bool { + js.mu.RLock() + defer js.mu.RUnlock() + cc := js.cluster if cc == nil { // Non-clustered mode return true } - acc, err := cc.s.LookupAccount(account) - if err != nil { - return false - } - mset, err := acc.lookupStream(stream) - if err != nil { - return false - } o := mset.lookupConsumer(consumer) if o == nil { return false diff --git a/server/monitor.go b/server/monitor.go index 4adbffb8d9..2e72861f1d 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1,4 +1,4 @@ -// Copyright 2013-2022 The NATS Authors +// Copyright 2013-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -3126,33 +3126,54 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { // If they are assigned to this server check their status. ourID := meta.ID() - // TODO(dlc) - Might be better here to not hold the lock the whole time. + // Copy the meta layer so we do not need to hold the js read lock for an extended period of time. js.mu.RLock() - defer js.mu.RUnlock() - + streams := make(map[string]map[string]*streamAssignment, len(cc.streams)) for acc, asa := range cc.streams { + nasa := make(map[string]*streamAssignment) for stream, sa := range asa { if sa.Group.isMember(ourID) { - // Make sure we can look up - if !js.isStreamHealthy(acc, stream) { - health.Status = na - health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", acc, stream) - return health - } - // Now check consumers. + csa := sa.copyGroup() + csa.consumers = make(map[string]*consumerAssignment) for consumer, ca := range sa.consumers { if ca.Group.isMember(ourID) { - if !js.isConsumerCurrent(acc, stream, consumer) { - health.Status = na - health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer) - return health - } + csa.consumers[consumer] = ca.copyGroup() } } + nasa[stream] = csa } } + streams[acc] = nasa } + js.mu.RUnlock() + + // Use our copy to traverse so we do not need to hold the js lock. + for accName, asa := range streams { + acc, err := s.LookupAccount(accName) + if err != nil && len(asa) > 0 { + health.Status = na + health.Error = fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err) + return health + } + for stream, sa := range asa { + // Make sure we can look up + if !js.isStreamHealthy(acc, sa) { + health.Status = na + health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current", accName, stream) + return health + } + mset, _ := acc.lookupStream(stream) + // Now check consumers. + for consumer, ca := range sa.consumers { + if !js.isConsumerCurrent(mset, consumer, ca) { + health.Status = na + health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current", acc, stream, consumer) + return health + } + } + } + } // Success. return health }