Skip to content

Commit

Permalink
[IMPROVED] Reduce contention for high connections in a JetStream enab…
Browse files Browse the repository at this point in the history
…led account with high API usage. (#4613)

Several strategies are used which are listed below.

1. Checking a RaftNode to see if it is the leader now uses atomics.
2. Checking if we are the JetStream meta leader from the server now uses
an atomic.
3. Accessing the JetStream context no longer requires a server lock,
uses atomic.Pointer.
4. Filestore syncBlocks would hold msgBlock locks during sync, now does
not.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Oct 1, 2023
2 parents 6eee1f7 + dba03db commit 0083928
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 152 deletions.
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1556,7 +1556,7 @@ func (o *consumer) deleteNotActive() {
}
}

s, js := o.mset.srv, o.mset.srv.js
s, js := o.mset.srv, o.srv.js.Load()
acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct
o.mu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ func (s *Server) sendStatsz(subj string) {
m.Stats.ActiveServers = len(s.sys.servers) + 1

// JetStream
if js := s.js; js != nil {
if js := s.js.Load(); js != nil {
jStat := &JetStreamVarz{}
s.mu.RUnlock()
js.mu.RLock()
Expand Down
29 changes: 15 additions & 14 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4729,29 +4729,30 @@ func (fs *fileStore) syncBlocks() {
mb.mu.Unlock()
continue
}
// See if we can close FDs due to being idle.
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
// Check if we need to sync. We will not hold lock during actual sync.
var fn string
if mb.needSync {
// Flush anything that may be pending.
if mb.pendingWriteSizeLocked() > 0 {
mb.flushPendingMsgsLocked()
}
if mb.mfd != nil {
mb.mfd.Sync()
} else {
fd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
if err != nil {
mb.mu.Unlock()
continue
}
fn = mb.mfn
mb.needSync = false
}
mb.mu.Unlock()

// Check if we need to sync.
// This is done not holding any locks.
if fn != _EMPTY_ {
if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
fd.Sync()
fd.Close()
}
mb.needSync = false
}
// See if we can close FDs due to being idle.
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
mb.mu.Unlock()
}

fs.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,8 +1128,8 @@ func (c *client) processGatewayInfo(info *Info) {
// connect events to switch those accounts into interest only mode.
s.mu.Lock()
s.ensureGWsInterestOnlyForLeafNodes()
js := s.js
s.mu.Unlock()
js := s.js.Load()

// If running in some tests, maintain the original behavior.
if gwDoNotForceInterestOnlyMode && js != nil {
Expand Down
76 changes: 25 additions & 51 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,11 @@ type jetStream struct {
// Some bools regarding general state.
metaRecovering bool
standAlone bool
disabled bool
oos bool
shuttingDown bool

// Atomic versions
disabled atomic.Bool
}

type remoteUsage struct {
Expand Down Expand Up @@ -372,9 +374,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
}
s.gcbMu.Unlock()

s.mu.Lock()
s.js = js
s.mu.Unlock()
s.js.Store(js)

// FIXME(dlc) - Allow memory only operation?
if stat, err := os.Stat(cfg.StoreDir); os.IsNotExist(err) {
Expand Down Expand Up @@ -530,10 +530,7 @@ func (s *Server) setupJetStreamExports() {
}

func (s *Server) jetStreamOOSPending() (wasPending bool) {
s.mu.Lock()
js := s.js
s.mu.Unlock()
if js != nil {
if js := s.getJetStream(); js != nil {
js.mu.Lock()
wasPending = js.oos
js.oos = true
Expand All @@ -543,13 +540,8 @@ func (s *Server) jetStreamOOSPending() (wasPending bool) {
}

func (s *Server) setJetStreamDisabled() {
s.mu.Lock()
js := s.js
s.mu.Unlock()
if js != nil {
js.mu.Lock()
js.disabled = true
js.mu.Unlock()
if js := s.getJetStream(); js != nil {
js.disabled.Store(true)
}
}

Expand Down Expand Up @@ -738,16 +730,15 @@ func (s *Server) configAllJetStreamAccounts() error {
// a non-default system account.
s.checkJetStreamExports()

// Snapshot into our own list. Might not be needed.
s.mu.Lock()
// Bail if server not enabled. If it was enabled and a reload turns it off
// that will be handled elsewhere.
js := s.js
js := s.getJetStream()
if js == nil {
s.mu.Unlock()
return nil
}

// Snapshot into our own list. Might not be needed.
s.mu.RLock()
if s.sys != nil {
// clustered stream removal will perform this cleanup as well
// this is mainly for initial cleanup
Expand All @@ -764,12 +755,12 @@ func (s *Server) configAllJetStreamAccounts() error {
}

var jsAccounts []*Account
s.accounts.Range(func(k, v interface{}) bool {
s.accounts.Range(func(k, v any) bool {
jsAccounts = append(jsAccounts, v.(*Account))
return true
})
accounts := &s.accounts
s.mu.Unlock()
s.mu.RUnlock()

// Process any jetstream enabled accounts here. These will be accounts we are
// already aware of at startup etc.
Expand Down Expand Up @@ -809,9 +800,7 @@ func (js *jetStream) isEnabled() bool {
if js == nil {
return false
}
js.mu.RLock()
defer js.mu.RUnlock()
return !js.disabled
return !js.disabled.Load()
}

// Mark that we will be in standlone mode.
Expand All @@ -821,9 +810,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) {
}
js.mu.Lock()
defer js.mu.Unlock()
js.standAlone = isStandAlone

if isStandAlone {
if js.standAlone = isStandAlone; js.standAlone {
// Update our server atomic.
js.srv.isMetaLeader.Store(true)
js.accountPurge, _ = js.srv.systemSubscribe(JSApiAccountPurge, _EMPTY_, false, nil, js.srv.jsLeaderAccountPurgeRequest)
} else if js.accountPurge != nil {
js.srv.sysUnsubscribe(js.accountPurge)
Expand All @@ -832,11 +821,7 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) {

// JetStreamEnabled reports if jetstream is enabled for this server.
func (s *Server) JetStreamEnabled() bool {
var js *jetStream
s.mu.RLock()
js = s.js
s.mu.RUnlock()
return js.isEnabled()
return s.getJetStream().isEnabled()
}

// JetStreamEnabledForDomain will report if any servers have JetStream enabled within this domain.
Expand Down Expand Up @@ -909,10 +894,7 @@ func (js *jetStream) isShuttingDown() bool {

// Shutdown jetstream for this server.
func (s *Server) shutdownJetStream() {
s.mu.RLock()
js := s.js
s.mu.RUnlock()

js := s.getJetStream()
if js == nil {
return
}
Expand Down Expand Up @@ -951,9 +933,7 @@ func (s *Server) shutdownJetStream() {
a.removeJetStream()
}

s.mu.Lock()
s.js = nil
s.mu.Unlock()
s.js.Store(nil)

js.mu.Lock()
js.accounts = nil
Expand Down Expand Up @@ -994,23 +974,20 @@ func (s *Server) shutdownJetStream() {
// created a dynamic configuration. A copy is returned.
func (s *Server) JetStreamConfig() *JetStreamConfig {
var c *JetStreamConfig
s.mu.Lock()
if s.js != nil {
copy := s.js.config
if js := s.getJetStream(); js != nil {
copy := js.config
c = &(copy)
}
s.mu.Unlock()
return c
}

// StoreDir returns the current JetStream directory.
func (s *Server) StoreDir() string {
s.mu.Lock()
defer s.mu.Unlock()
if s.js == nil {
js := s.getJetStream()
if js == nil {
return _EMPTY_
}
return s.js.config.StoreDir
return js.config.StoreDir
}

// JetStreamNumAccounts returns the number of enabled accounts this server is tracking.
Expand All @@ -1036,10 +1013,7 @@ func (s *Server) JetStreamReservedResources() (int64, int64, error) {
}

func (s *Server) getJetStream() *jetStream {
s.mu.RLock()
js := s.js
s.mu.RUnlock()
return js
return s.js.Load()
}

func (a *Account) assignJetStreamLimits(limits map[string]JetStreamAccountLimits) {
Expand Down
26 changes: 14 additions & 12 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2314,14 +2314,15 @@ func (s *Server) peerSetToNames(ps []string) []string {
// looks up the peer id for a given server name. Cluster and domain name are optional filter criteria
func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string {
js.mu.RLock()
cc := js.cluster
defer js.mu.RUnlock()
for _, p := range cc.meta.Peers() {
si, ok := s.nodeToInfo.Load(p.ID)
if ok && si.(nodeInfo).name == serverName {
if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
return p.ID
if cc := js.cluster; cc != nil {
for _, p := range cc.meta.Peers() {
si, ok := s.nodeToInfo.Load(p.ID)
if ok && si.(nodeInfo).name == serverName {
if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
return p.ID
}
}
}
}
Expand Down Expand Up @@ -4217,11 +4218,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
}
// We have a consumer assignment.
js.mu.RLock()

var node RaftNode
var leaderNotPartOfGroup bool
var isMember bool

var (
node RaftNode
leaderNotPartOfGroup bool
isMember bool
)
rg := ca.Group
if rg != nil && rg.isMember(ourID) {
isMember = true
Expand All @@ -4233,6 +4234,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
}
}
js.mu.RUnlock()

// Check if we should ignore all together.
if node == nil {
// We have been assigned but have not created a node yet. If we are a member return
Expand Down

0 comments on commit 0083928

Please sign in to comment.