diff --git a/server/accounts.go b/server/accounts.go index e7808551c3..c26c09a108 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1702,21 +1702,19 @@ func (a *Account) addReverseRespMapEntry(acc *Account, reply, from string) { // This will be called from checkForReverseEntry when the reply arg is a wildcard subject. // This will usually be called in a go routine since we need to walk all the entries. func (a *Account) checkForReverseEntries(reply string, checkInterest, recursed bool) { - a.mu.RLock() - if len(a.imports.rrMap) == 0 { - a.mu.RUnlock() + if subjectIsLiteral(reply) { + a._checkForReverseEntry(reply, nil, checkInterest, recursed) return } - if subjectIsLiteral(reply) { + a.mu.RLock() + if len(a.imports.rrMap) == 0 { a.mu.RUnlock() - a._checkForReverseEntry(reply, nil, checkInterest, recursed) return } var _rs [64]string rs := _rs[:0] - if n := len(a.imports.rrMap); n > cap(rs) { rs = make([]string, 0, n) } @@ -1726,9 +1724,14 @@ func (a *Account) checkForReverseEntries(reply string, checkInterest, recursed b } a.mu.RUnlock() - // subjectIsSubsetMatch is heavy so make sure we do this without the lock. + tsa := [32]string{} + tts := tokenizeSubjectIntoSlice(tsa[:0], reply) + + rsa := [32]string{} for _, r := range rs { - if subjectIsSubsetMatch(r, reply) { + rts := tokenizeSubjectIntoSlice(rsa[:0], r) + // isSubsetMatchTokenized is heavy so make sure we do this without the lock. + if isSubsetMatchTokenized(rts, tts) { a._checkForReverseEntry(r, nil, checkInterest, recursed) } } diff --git a/server/client.go b/server/client.go index 7a521047f1..e3364c8a80 100644 --- a/server/client.go +++ b/server/client.go @@ -3014,6 +3014,10 @@ func queueMatches(queue string, qsubs [][]*subscription) bool { // Low level unsubscribe for a given client. func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool) { + if s := c.srv; s != nil && s.isShuttingDown() { + return + } + c.mu.Lock() if !force && sub.max > 0 && sub.nm < sub.max { c.Debugf( @@ -3067,7 +3071,8 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool } // Now check to see if this was part of a respMap entry for service imports. - if acc != nil { + // We can skip subscriptions on reserved replies. + if acc != nil && !isReservedReply(sub.subject) { acc.checkForReverseEntry(string(sub.subject), nil, true) } } @@ -5077,6 +5082,23 @@ func (c *client) closeConnection(reason ClosedState) { c.out.stc = nil } + // If we have remote latency tracking running shut that down. + if c.rrTracking != nil { + c.rrTracking.ptmr.Stop() + c.rrTracking = nil + } + + // If we are shutting down, no need to do all the accounting on subs, etc. + if reason == ServerShutdown { + s := c.srv + c.mu.Unlock() + if s != nil { + // Unregister + s.removeClient(c) + } + return + } + var ( kind = c.kind srv = c.srv @@ -5101,12 +5123,6 @@ func (c *client) closeConnection(reason ClosedState) { spoke = c.isSpokeLeafNode() } - // If we have remote latency tracking running shut that down. - if c.rrTracking != nil { - c.rrTracking.ptmr.Stop() - c.rrTracking = nil - } - c.mu.Unlock() // Remove client's or leaf node or jetstream subscriptions. diff --git a/server/gateway.go b/server/gateway.go index b7307e47ec..5c3b86f336 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -473,6 +473,10 @@ func (s *Server) startGateways() { // This starts the gateway accept loop in a go routine, unless it // is detected that the server has already been shutdown. func (s *Server) startGatewayAcceptLoop() { + if s.isShuttingDown() { + return + } + // Snapshot server options. opts := s.getOpts() @@ -482,10 +486,6 @@ func (s *Server) startGatewayAcceptLoop() { } s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } hp := net.JoinHostPort(opts.Gateway.Host, strconv.Itoa(port)) l, e := natsListen("tcp", hp) s.gatewayListenerErr = e @@ -1575,7 +1575,7 @@ func (s *Server) addGatewayURL(urlStr string) bool { // Returns true if the URL has been removed, false otherwise. // Server lock held on entry func (s *Server) removeGatewayURL(urlStr string) bool { - if s.shutdown { + if s.isShuttingDown() { return false } s.gateway.Lock() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 4c2a7a1ddf..6a94329a3c 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -198,11 +198,15 @@ func (s *Server) trackedJetStreamServers() (js, total int) { } func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) { + if s.isShuttingDown() { + return nil, nil + } + s.mu.RLock() - shutdown, js := s.shutdown, s.js + js := s.js s.mu.RUnlock() - if shutdown || js == nil { + if js == nil { return nil, nil } diff --git a/server/leafnode.go b/server/leafnode.go index 47bb8087a3..d5d41c5336 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -681,11 +681,11 @@ func (s *Server) startLeafNodeAcceptLoop() { port = 0 } - s.mu.Lock() - if s.shutdown { - s.mu.Unlock() + if s.isShuttingDown() { return } + + s.mu.Lock() hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port)) l, e := natsListen("tcp", hp) s.leafNodeListenerErr = e @@ -878,7 +878,7 @@ func (s *Server) addLeafNodeURL(urlStr string) bool { func (s *Server) removeLeafNodeURL(urlStr string) bool { // Don't need to do this if we are removing the route connection because // we are shuting down... - if s.shutdown { + if s.isShuttingDown() { return false } if s.leafURLsMap.removeUrl(urlStr) { diff --git a/server/mqtt.go b/server/mqtt.go index b285af35b5..e333333973 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -425,6 +425,10 @@ type mqttParsedPublishNATSHeader struct { } func (s *Server) startMQTT() { + if s.isShuttingDown() { + return + } + sopts := s.getOpts() o := &sopts.MQTT @@ -437,10 +441,6 @@ func (s *Server) startMQTT() { } hp := net.JoinHostPort(o.Host, strconv.Itoa(port)) s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } s.mqtt.sessmgr.sessions = make(map[string]*mqttAccountSessionManager) hl, err = net.Listen("tcp", hp) s.mqtt.listenerErr = err @@ -500,7 +500,7 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client { s.mu.Lock() if !s.isRunning() || s.ldm { - if s.shutdown { + if s.isShuttingDown() { conn.Close() } s.mu.Unlock() diff --git a/server/route.go b/server/route.go index 45732fda1c..2954b83458 100644 --- a/server/route.go +++ b/server/route.go @@ -977,7 +977,7 @@ func (s *Server) updateRemoteRoutePerms(c *client, info *Info) { func (s *Server) sendAsyncInfoToClients(regCli, wsCli bool) { // If there are no clients supporting async INFO protocols, we are done. // Also don't send if we are shutting down... - if s.cproto == 0 || s.shutdown { + if s.cproto == 0 || s.isShuttingDown() { return } info := s.copyInfo() @@ -2302,6 +2302,10 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del // is detected that the server has already been shutdown. // It will also start soliciting explicit routes. func (s *Server) startRouteAcceptLoop() { + if s.isShuttingDown() { + return + } + // Snapshot server options. opts := s.getOpts() @@ -2316,10 +2320,6 @@ func (s *Server) startRouteAcceptLoop() { clusterName := s.ClusterName() s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } s.Noticef("Cluster name is %s", clusterName) if s.isClusterNameDynamic() { s.Warnf("Cluster name was dynamically generated, consider setting one") diff --git a/server/routes_test.go b/server/routes_test.go index 8385a6adf4..7397c8e08a 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -2777,9 +2777,7 @@ func TestRoutePoolAndPerAccountWithServiceLatencyNoDataRace(t *testing.T) { defer nc.Close() // The service listener. - // serviceTime := 25 * time.Millisecond natsSub(t, nc, "req.echo", func(msg *nats.Msg) { - // time.Sleep(serviceTime) msg.Respond(msg.Data) }) diff --git a/server/server.go b/server/server.go index 8a34096199..5db5ca49c0 100644 --- a/server/server.go +++ b/server/server.go @@ -132,7 +132,7 @@ type Server struct { optsMu sync.RWMutex opts *Options running atomic.Bool - shutdown bool + shutdown atomic.Bool listener net.Listener listenerErr error gacc *Account @@ -2350,6 +2350,10 @@ func (s *Server) Start() { s.startOCSPResponseCache() } +func (s *Server) isShuttingDown() bool { + return s.shutdown.Load() +} + // Shutdown will shutdown the server instance by kicking out the AcceptLoop // and closing all associated clients. func (s *Server) Shutdown() { @@ -2369,19 +2373,19 @@ func (s *Server) Shutdown() { // eventing items associated with accounts. s.shutdownEventing() - s.mu.Lock() // Prevent issues with multiple calls. - if s.shutdown { - s.mu.Unlock() + if s.isShuttingDown() { return } + + s.mu.Lock() s.Noticef("Initiating Shutdown...") accRes := s.accResolver opts := s.getOpts() - s.shutdown = true + s.shutdown.Store(true) s.running.Store(false) s.grMu.Lock() s.grRunning = false @@ -2545,16 +2549,15 @@ func (s *Server) AcceptLoop(clr chan struct{}) { } }() + if s.isShuttingDown() { + return + } + // Snapshot server options. opts := s.getOpts() // Setup state that can enable shutdown s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } - hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port)) l, e := natsListen("tcp", hp) s.listenerErr = e @@ -2677,6 +2680,10 @@ func (s *Server) setInfoHostPort() error { // StartProfiler is called to enable dynamic profiling. func (s *Server) StartProfiler() { + if s.isShuttingDown() { + return + } + // Snapshot server options. opts := s.getOpts() @@ -2688,12 +2695,7 @@ func (s *Server) StartProfiler() { } s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } hp := net.JoinHostPort(opts.Host, strconv.Itoa(port)) - l, err := net.Listen("tcp", hp) if err != nil { @@ -2717,10 +2719,7 @@ func (s *Server) StartProfiler() { // if this errors out, it's probably because the server is being shutdown err := srv.Serve(l) if err != nil { - s.mu.Lock() - shutdown := s.shutdown - s.mu.Unlock() - if !shutdown { + if !s.isShuttingDown() { s.Fatalf("error starting profiler: %s", err) } } @@ -2827,6 +2826,10 @@ func (s *Server) getMonitoringTLSConfig(_ *tls.ClientHelloInfo) (*tls.Config, er // Start the monitoring server func (s *Server) startMonitoring(secure bool) error { + if s.isShuttingDown() { + return nil + } + // Snapshot server options. opts := s.getOpts() @@ -2908,11 +2911,6 @@ func (s *Server) startMonitoring(secure bool) error { ErrorLog: log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0), } s.mu.Lock() - if s.shutdown { - httpListener.Close() - s.mu.Unlock() - return nil - } s.http = httpListener s.httpHandler = mux s.monitoringServer = srv @@ -2920,10 +2918,7 @@ func (s *Server) startMonitoring(secure bool) error { go func() { if err := srv.Serve(httpListener); err != nil { - s.mu.Lock() - shutdown := s.shutdown - s.mu.Unlock() - if !shutdown { + if !s.isShuttingDown() { s.Fatalf("Error starting monitor on %q: %v", hp, err) } } @@ -3065,7 +3060,7 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client { // clients would branch here (since server is not running). However, // when a server was really running and has been shutdown, we must // close this connection. - if s.shutdown { + if s.isShuttingDown() { conn.Close() } s.mu.Unlock() @@ -3972,7 +3967,7 @@ func (s *Server) isLameDuckMode() bool { func (s *Server) lameDuckMode() { s.mu.Lock() // Check if there is actually anything to do - if s.shutdown || s.ldm || s.listener == nil { + if s.isShuttingDown() || s.ldm || s.listener == nil { s.mu.Unlock() return } @@ -4023,7 +4018,7 @@ func (s *Server) lameDuckMode() { s.mu.Lock() // Need to recheck few things - if s.shutdown || len(s.clients) == 0 { + if s.isShuttingDown() || len(s.clients) == 0 { s.mu.Unlock() // If there is no client, we need to call Shutdown() to complete // the LDMode. If server has been shutdown while lock was released, diff --git a/server/server_test.go b/server/server_test.go index 4030aafab4..3defd412b4 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -773,10 +773,7 @@ func TestLameDuckMode(t *testing.T) { // Check that if there is no client, server is shutdown srvA.lameDuckMode() - srvA.mu.Lock() - shutdown := srvA.shutdown - srvA.mu.Unlock() - if !shutdown { + if !srvA.isShuttingDown() { t.Fatalf("Server should have shutdown") } diff --git a/server/websocket.go b/server/websocket.go index 4eb93977b8..1afa30847e 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1050,6 +1050,10 @@ func (s *Server) wsConfigAuth(opts *WebsocketOpts) { } func (s *Server) startWebsocketServer() { + if s.isShuttingDown() { + return + } + sopts := s.getOpts() o := &sopts.Websocket @@ -1071,10 +1075,6 @@ func (s *Server) startWebsocketServer() { // avoid the possibility of it being "intercepted". s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } // Do not check o.NoTLS here. If a TLS configuration is available, use it, // regardless of NoTLS. If we don't have a TLS config, it means that the // user has configured NoTLS because otherwise the server would have failed @@ -1221,7 +1221,7 @@ func (s *Server) createWSClient(conn net.Conn, ws *websocket) *client { s.mu.Lock() if !s.isRunning() || s.ldm { - if s.shutdown { + if s.isShuttingDown() { conn.Close() } s.mu.Unlock()