From dba03dbc2f30734a282b618e6e917a864a276c26 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 30 Sep 2023 14:52:15 -0700 Subject: [PATCH] Optimizations to reduce contention for high connections in a JetStream enabled account with high API usage. Several strategies which are listed below. 1. Checking a RaftNode to see if it is the leader now uses atomics. 2. Checking if we are the JetStream meta leader from the server now uses an atomic. 3. Accessing the JetStream context no longer requires a server lock, uses atomic.Pointer. 4. Filestore syncBlocks would hold msgBlock locks during sync, now does not. Signed-off-by: Derek Collison --- server/consumer.go | 2 +- server/events.go | 2 +- server/filestore.go | 29 ++++++------ server/gateway.go | 2 +- server/jetstream.go | 76 ++++++++++-------------------- server/jetstream_api.go | 26 ++++++----- server/jetstream_cluster.go | 93 ++++++++++++++++++------------------- server/monitor.go | 31 +++++++------ server/raft.go | 13 +++--- server/reload.go | 2 +- server/server.go | 3 +- 11 files changed, 127 insertions(+), 152 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 54bd3fd032..dae9cec761 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1556,7 +1556,7 @@ func (o *consumer) deleteNotActive() { } } - s, js := o.mset.srv, o.mset.srv.js + s, js := o.mset.srv, o.srv.js.Load() acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct o.mu.Unlock() diff --git a/server/events.go b/server/events.go index bb249d4362..0f761a47c5 100644 --- a/server/events.go +++ b/server/events.go @@ -875,7 +875,7 @@ func (s *Server) sendStatsz(subj string) { m.Stats.ActiveServers = len(s.sys.servers) + 1 // JetStream - if js := s.js; js != nil { + if js := s.js.Load(); js != nil { jStat := &JetStreamVarz{} s.mu.RUnlock() js.mu.RLock() diff --git a/server/filestore.go b/server/filestore.go index 2e4a537878..909629eb6d 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4729,29 +4729,30 @@ func (fs *fileStore) syncBlocks() { mb.mu.Unlock() continue } + // See if we can close FDs due to being idle. + if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle { + mb.dirtyCloseWithRemove(false) + } + // Check if we need to sync. We will not hold lock during actual sync. + var fn string if mb.needSync { // Flush anything that may be pending. if mb.pendingWriteSizeLocked() > 0 { mb.flushPendingMsgsLocked() } - if mb.mfd != nil { - mb.mfd.Sync() - } else { - fd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms) - if err != nil { - mb.mu.Unlock() - continue - } + fn = mb.mfn + mb.needSync = false + } + mb.mu.Unlock() + + // Check if we need to sync. + // This is done not holding any locks. + if fn != _EMPTY_ { + if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil { fd.Sync() fd.Close() } - mb.needSync = false } - // See if we can close FDs due to being idle. - if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle { - mb.dirtyCloseWithRemove(false) - } - mb.mu.Unlock() } fs.mu.Lock() diff --git a/server/gateway.go b/server/gateway.go index 5c3b86f336..715a2c1dc5 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -1128,8 +1128,8 @@ func (c *client) processGatewayInfo(info *Info) { // connect events to switch those accounts into interest only mode. s.mu.Lock() s.ensureGWsInterestOnlyForLeafNodes() - js := s.js s.mu.Unlock() + js := s.js.Load() // If running in some tests, maintain the original behavior. if gwDoNotForceInterestOnlyMode && js != nil { diff --git a/server/jetstream.go b/server/jetstream.go index 04b7430ac6..756e75a562 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -117,9 +117,11 @@ type jetStream struct { // Some bools regarding general state. metaRecovering bool standAlone bool - disabled bool oos bool shuttingDown bool + + // Atomic versions + disabled atomic.Bool } type remoteUsage struct { @@ -372,9 +374,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { } s.gcbMu.Unlock() - s.mu.Lock() - s.js = js - s.mu.Unlock() + s.js.Store(js) // FIXME(dlc) - Allow memory only operation? if stat, err := os.Stat(cfg.StoreDir); os.IsNotExist(err) { @@ -530,10 +530,7 @@ func (s *Server) setupJetStreamExports() { } func (s *Server) jetStreamOOSPending() (wasPending bool) { - s.mu.Lock() - js := s.js - s.mu.Unlock() - if js != nil { + if js := s.getJetStream(); js != nil { js.mu.Lock() wasPending = js.oos js.oos = true @@ -543,13 +540,8 @@ func (s *Server) jetStreamOOSPending() (wasPending bool) { } func (s *Server) setJetStreamDisabled() { - s.mu.Lock() - js := s.js - s.mu.Unlock() - if js != nil { - js.mu.Lock() - js.disabled = true - js.mu.Unlock() + if js := s.getJetStream(); js != nil { + js.disabled.Store(true) } } @@ -738,16 +730,15 @@ func (s *Server) configAllJetStreamAccounts() error { // a non-default system account. s.checkJetStreamExports() - // Snapshot into our own list. Might not be needed. - s.mu.Lock() // Bail if server not enabled. If it was enabled and a reload turns it off // that will be handled elsewhere. - js := s.js + js := s.getJetStream() if js == nil { - s.mu.Unlock() return nil } + // Snapshot into our own list. Might not be needed. + s.mu.RLock() if s.sys != nil { // clustered stream removal will perform this cleanup as well // this is mainly for initial cleanup @@ -764,12 +755,12 @@ func (s *Server) configAllJetStreamAccounts() error { } var jsAccounts []*Account - s.accounts.Range(func(k, v interface{}) bool { + s.accounts.Range(func(k, v any) bool { jsAccounts = append(jsAccounts, v.(*Account)) return true }) accounts := &s.accounts - s.mu.Unlock() + s.mu.RUnlock() // Process any jetstream enabled accounts here. These will be accounts we are // already aware of at startup etc. @@ -809,9 +800,7 @@ func (js *jetStream) isEnabled() bool { if js == nil { return false } - js.mu.RLock() - defer js.mu.RUnlock() - return !js.disabled + return !js.disabled.Load() } // Mark that we will be in standlone mode. @@ -821,9 +810,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) { } js.mu.Lock() defer js.mu.Unlock() - js.standAlone = isStandAlone - - if isStandAlone { + if js.standAlone = isStandAlone; js.standAlone { + // Update our server atomic. + js.srv.isMetaLeader.Store(true) js.accountPurge, _ = js.srv.systemSubscribe(JSApiAccountPurge, _EMPTY_, false, nil, js.srv.jsLeaderAccountPurgeRequest) } else if js.accountPurge != nil { js.srv.sysUnsubscribe(js.accountPurge) @@ -832,11 +821,7 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) { // JetStreamEnabled reports if jetstream is enabled for this server. func (s *Server) JetStreamEnabled() bool { - var js *jetStream - s.mu.RLock() - js = s.js - s.mu.RUnlock() - return js.isEnabled() + return s.getJetStream().isEnabled() } // JetStreamEnabledForDomain will report if any servers have JetStream enabled within this domain. @@ -909,10 +894,7 @@ func (js *jetStream) isShuttingDown() bool { // Shutdown jetstream for this server. func (s *Server) shutdownJetStream() { - s.mu.RLock() - js := s.js - s.mu.RUnlock() - + js := s.getJetStream() if js == nil { return } @@ -951,9 +933,7 @@ func (s *Server) shutdownJetStream() { a.removeJetStream() } - s.mu.Lock() - s.js = nil - s.mu.Unlock() + s.js.Store(nil) js.mu.Lock() js.accounts = nil @@ -994,23 +974,20 @@ func (s *Server) shutdownJetStream() { // created a dynamic configuration. A copy is returned. func (s *Server) JetStreamConfig() *JetStreamConfig { var c *JetStreamConfig - s.mu.Lock() - if s.js != nil { - copy := s.js.config + if js := s.getJetStream(); js != nil { + copy := js.config c = &(copy) } - s.mu.Unlock() return c } // StoreDir returns the current JetStream directory. func (s *Server) StoreDir() string { - s.mu.Lock() - defer s.mu.Unlock() - if s.js == nil { + js := s.getJetStream() + if js == nil { return _EMPTY_ } - return s.js.config.StoreDir + return js.config.StoreDir } // JetStreamNumAccounts returns the number of enabled accounts this server is tracking. @@ -1036,10 +1013,7 @@ func (s *Server) JetStreamReservedResources() (int64, int64, error) { } func (s *Server) getJetStream() *jetStream { - s.mu.RLock() - js := s.js - s.mu.RUnlock() - return js + return s.js.Load() } func (a *Account) assignJetStreamLimits(limits map[string]JetStreamAccountLimits) { diff --git a/server/jetstream_api.go b/server/jetstream_api.go index dfade67dc1..d930298baa 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2314,14 +2314,15 @@ func (s *Server) peerSetToNames(ps []string) []string { // looks up the peer id for a given server name. Cluster and domain name are optional filter criteria func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string { js.mu.RLock() - cc := js.cluster defer js.mu.RUnlock() - for _, p := range cc.meta.Peers() { - si, ok := s.nodeToInfo.Load(p.ID) - if ok && si.(nodeInfo).name == serverName { - if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster { - if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain { - return p.ID + if cc := js.cluster; cc != nil { + for _, p := range cc.meta.Peers() { + si, ok := s.nodeToInfo.Load(p.ID) + if ok && si.(nodeInfo).name == serverName { + if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster { + if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain { + return p.ID + } } } } @@ -4217,11 +4218,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, } // We have a consumer assignment. js.mu.RLock() - - var node RaftNode - var leaderNotPartOfGroup bool - var isMember bool - + var ( + node RaftNode + leaderNotPartOfGroup bool + isMember bool + ) rg := ca.Group if rg != nil && rg.isMember(ourID) { isMember = true @@ -4233,6 +4234,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, } } js.mu.RUnlock() + // Check if we should ignore all together. if node == nil { // We have been assigned but have not created a node yet. If we are a member return diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index b5c02bd5fb..92e70501d8 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -202,10 +202,7 @@ func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) { return nil, nil } - s.mu.RLock() - js := s.js - s.mu.RUnlock() - + js := s.getJetStream() if js == nil { return nil, nil } @@ -223,13 +220,7 @@ func (s *Server) JetStreamIsClustered() bool { } func (s *Server) JetStreamIsLeader() bool { - js := s.getJetStream() - if js == nil { - return false - } - js.mu.RLock() - defer js.mu.RUnlock() - return js.cluster.isLeader() + return s.isMetaLeader.Load() } func (s *Server) JetStreamIsCurrent() bool { @@ -237,9 +228,20 @@ func (s *Server) JetStreamIsCurrent() bool { if js == nil { return false } + // Grab what we need and release js lock. js.mu.RLock() - defer js.mu.RUnlock() - return js.cluster.isCurrent() + var meta RaftNode + cc := js.cluster + if cc != nil { + meta = cc.meta + } + js.mu.RUnlock() + + if cc == nil { + // Non-clustered mode + return true + } + return meta.Current() } func (s *Server) JetStreamSnapshotMeta() error { @@ -385,19 +387,6 @@ func (cc *jetStreamCluster) isLeader() bool { return cc.meta != nil && cc.meta.Leader() } -// isCurrent will determine if this node is a leader or an up to date follower. -// Read lock should be held. -func (cc *jetStreamCluster) isCurrent() bool { - if cc == nil { - // Non-clustered mode - return true - } - if cc.meta == nil { - return false - } - return cc.meta.Current() -} - // isStreamCurrent will determine if the stream is up to date. // For R1 it will make sure the stream is present on this server. // Read lock should be held. @@ -647,9 +636,8 @@ func (a *Account) getJetStreamFromAccount() (*Server, *jetStream, *jsAccount) { if js == nil { return nil, nil, nil } - js.mu.RLock() + // Lock not needed, set on creation. s := js.srv - js.mu.RUnlock() return s, js, jsa } @@ -860,10 +848,8 @@ func (js *jetStream) getMetaGroup() RaftNode { } func (js *jetStream) server() *Server { - js.mu.RLock() - s := js.srv - js.mu.RUnlock() - return s + // Lock not needed, only set once on creation. + return js.srv } // Will respond if we do not think we have a metacontroller leader. @@ -1241,6 +1227,7 @@ func (js *jetStream) monitorCluster() { // Make sure to stop the raft group on exit to prevent accidental memory bloat. defer n.Stop() + defer s.isMetaLeader.Store(false) const compactInterval = time.Minute t := time.NewTicker(compactInterval) @@ -1728,6 +1715,11 @@ func (js *jetStream) processAddPeer(peer string) { } func (js *jetStream) processRemovePeer(peer string) { + // We may be already disabled. + if js == nil || js.disabled.Load() { + return + } + js.mu.Lock() s, cc := js.srv, js.cluster if cc == nil || cc.meta == nil { @@ -1737,14 +1729,8 @@ func (js *jetStream) processRemovePeer(peer string) { isLeader := cc.isLeader() // All nodes will check if this is them. isUs := cc.meta.ID() == peer - disabled := js.disabled js.mu.Unlock() - // We may be already disabled. - if disabled { - return - } - if isUs { s.Errorf("JetStream being DISABLED, our server was removed from the cluster") adv := &JSServerRemovedAdvisory{ @@ -5282,21 +5268,31 @@ func (js *jetStream) stopUpdatesSub() { } func (js *jetStream) processLeaderChange(isLeader bool) { + if js == nil { + return + } + s := js.srv + if s == nil { + return + } + // Update our server atomic. + s.isMetaLeader.Store(isLeader) + if isLeader { - js.srv.Noticef("Self is new JetStream cluster metadata leader") + s.Noticef("Self is new JetStream cluster metadata leader") } else { var node string if meta := js.getMetaGroup(); meta != nil { node = meta.GroupLeader() } if node == _EMPTY_ { - js.srv.Noticef("JetStream cluster no metadata leader") + s.Noticef("JetStream cluster no metadata leader") } else if srv := js.srv.serverNameForNode(node); srv == _EMPTY_ { - js.srv.Noticef("JetStream cluster new remote metadata leader") + s.Noticef("JetStream cluster new remote metadata leader") } else if clst := js.srv.clusterNameForNode(node); clst == _EMPTY_ { - js.srv.Noticef("JetStream cluster new metadata leader: %s", srv) + s.Noticef("JetStream cluster new metadata leader: %s", srv) } else { - js.srv.Noticef("JetStream cluster new metadata leader: %s/%s", srv, clst) + s.Noticef("JetStream cluster new metadata leader: %s/%s", srv, clst) } } @@ -5317,7 +5313,7 @@ func (js *jetStream) processLeaderChange(isLeader bool) { for acc, asa := range cc.streams { for _, sa := range asa { if sa.Sync == _EMPTY_ { - js.srv.Warnf("Stream assigment corrupt for stream '%s > %s'", acc, sa.Config.Name) + s.Warnf("Stream assigment corrupt for stream '%s > %s'", acc, sa.Config.Name) nsa := &streamAssignment{Group: sa.Group, Config: sa.Config, Subject: sa.Subject, Reply: sa.Reply, Client: sa.Client} nsa.Sync = syncSubjForStream() cc.meta.Propose(encodeUpdateStreamAssignment(nsa)) @@ -8141,19 +8137,18 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { s := js.srv if rg == nil || rg.node == nil { return &ClusterInfo{ - Name: s.ClusterName(), + Name: s.cachedClusterName(), Leader: s.Name(), } } - n := rg.node + n := rg.node ci := &ClusterInfo{ - Name: s.ClusterName(), + Name: s.cachedClusterName(), Leader: s.serverNameForNode(n.GroupLeader()), } now := time.Now() - id, peers := n.ID(), n.Peers() // If we are leaderless, do not suppress putting us in the peer list. @@ -8274,7 +8269,7 @@ func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _ func (mset *stream) processClusterStreamInfoRequest(reply string) { mset.mu.RLock() - sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg + sysc, js, sa, config := mset.sysc, mset.srv.js.Load(), mset.sa, mset.cfg isLeader := mset.isLeader() mset.mu.RUnlock() diff --git a/server/monitor.go b/server/monitor.go index 7a01cfe21b..66f5e81a36 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1465,14 +1465,14 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) { // We want to do that outside of the lock. pse.ProcUsage(&pcpu, &rss, &vss) - s.mu.Lock() - js := s.js + s.mu.RLock() // We need to create a new instance of Varz (with no reference // whatsoever to anything stored in the server) since the user // has access to the returned value. v := s.createVarz(pcpu, rss) - s.mu.Unlock() - if js != nil { + s.mu.RUnlock() + + if js := s.getJetStream(); js != nil { s.updateJszVarz(js, &v.JetStream, true) } @@ -1798,7 +1798,6 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { // Use server lock to create/update the server's varz object. s.mu.Lock() var created bool - js := s.js s.httpReqStats[VarzPath]++ if s.varz == nil { s.varz = s.createVarz(pcpu, rss) @@ -1809,19 +1808,20 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { s.mu.Unlock() // Since locking is jetStream -> Server, need to update jetstream // varz outside of server lock. - if js != nil { + + if js := s.getJetStream(); js != nil { var v JetStreamVarz // Work on stack variable s.updateJszVarz(js, &v, created) // Now update server's varz - s.mu.Lock() + s.mu.RLock() sv := &s.varz.JetStream if created { sv.Config = v.Config } sv.Stats = v.Stats sv.Meta = v.Meta - s.mu.Unlock() + s.mu.RUnlock() } // Do the marshaling outside of server lock, but under varzMu lock. @@ -2835,10 +2835,10 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, } jsa.mu.RUnlock() - if optStreams { + if js := s.getJetStream(); js != nil && optStreams { for _, stream := range streams { rgroup := stream.raftGroup() - ci := s.js.clusterInfo(rgroup) + ci := js.clusterInfo(rgroup) var cfg *StreamConfig if optCfg { c := stream.config() @@ -2884,7 +2884,8 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, } func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { - if s.js == nil { + js := s.getJetStream() + if js == nil { return nil, fmt.Errorf("jetstream not enabled") } acc := opts.Account @@ -2892,9 +2893,9 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { if !ok { return nil, fmt.Errorf("account %q not found", acc) } - s.js.mu.RLock() - jsa, ok := s.js.accounts[account.(*Account).Name] - s.js.mu.RUnlock() + js.mu.RLock() + jsa, ok := js.accounts[account.(*Account).Name] + js.mu.RUnlock() if !ok { return nil, fmt.Errorf("account %q not jetstream enabled", acc) } @@ -2916,7 +2917,7 @@ func (s *Server) raftNodeToClusterInfo(node RaftNode) *ClusterInfo { Peers: peerList, node: node, } - return s.js.clusterInfo(group) + return s.getJetStream().clusterInfo(group) } // Jsz returns a Jsz structure containing information about JetStream. diff --git a/server/raft.go b/server/raft.go index f7a0535372..5bcc2a1a19 100644 --- a/server/raft.go +++ b/server/raft.go @@ -134,6 +134,7 @@ type raft struct { track bool werr error state RaftState + isLeader atomic.Bool hh hash.Hash64 snapfile string csz int @@ -1158,14 +1159,12 @@ func (n *raft) loadLastSnapshot() (*snapshot, error) { } // Leader returns if we are the leader for our group. +// We use an atomic here now vs acquiring the read lock. func (n *raft) Leader() bool { if n == nil { return false } - n.RLock() - isLeader := n.state == Leader - n.RUnlock() - return isLeader + return n.isLeader.Load() } func (n *raft) isCatchingUp() bool { @@ -1688,8 +1687,7 @@ func (n *raft) run() { // We want to wait for some routing to be enabled, so we will wait for // at least a route, leaf or gateway connection to be established before // starting the run loop. - gw := s.gateway - for { + for gw := s.gateway; ; { s.mu.Lock() ready := s.numRemotes()+len(s.leafs) > 0 if !ready && gw.enabled { @@ -3831,6 +3829,9 @@ func (n *raft) quorumNeeded() int { // Lock should be held. func (n *raft) updateLeadChange(isLeader bool) { + // Update our atomic about being the leader. + n.isLeader.Store(isLeader) + // We don't care about values that have not been consumed (transitory states), // so we dequeue any state that is pending and push the new one. for { diff --git a/server/reload.go b/server/reload.go index 516b52d3e8..239881715e 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1873,7 +1873,7 @@ func (s *Server) reloadAuthorization() { awcsti, _ = s.configureAccounts(true) s.configureAuthorization() // Double check any JetStream configs. - checkJetStream = s.js != nil + checkJetStream = s.getJetStream() != nil } else if opts.AccountResolver != nil { s.configureResolver() if _, ok := s.accResolver.(*MemAccResolver); ok { diff --git a/server/server.go b/server/server.go index 7ea3000a5b..d1d0d109d4 100644 --- a/server/server.go +++ b/server/server.go @@ -137,7 +137,8 @@ type Server struct { listenerErr error gacc *Account sys *internal - js *jetStream + js atomic.Pointer[jetStream] + isMetaLeader atomic.Bool accounts sync.Map tmpAccounts sync.Map // Temporarily stores accounts that are being built activeAccounts int32