diff --git a/locksordering.txt b/locksordering.txt index 88391838d8f..89832611940 100644 --- a/locksordering.txt +++ b/locksordering.txt @@ -8,3 +8,9 @@ jetStream -> jsAccount -> stream -> consumer A lock to protect jetstream account's usage has been introduced: jsAccount.usageMu. This lock is independent and can be invoked under any other lock: jsAccount -> jsa.usageMu, stream -> jsa.usageMu, etc... + +A lock to protect the account's leafnodes list was also introduced to +allow that lock to be held and then acquire a client lock which is not +possible with the normal account lock. + +accountLeafList -> client diff --git a/server/accounts.go b/server/accounts.go index 5e7520c6ddf..5539a6d8688 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -73,6 +73,7 @@ type Account struct { lqws map[string]int32 usersRevoked map[string]int64 mappings []*mapping + lmu sync.RWMutex lleafs []*client leafClusters map[string]uint64 imports importMap @@ -166,14 +167,17 @@ const ( Chunked ) -var commaSeparatorRegEx = regexp.MustCompile(`,\s*`) -var partitionMappingFunctionRegEx = regexp.MustCompile(`{{\s*[pP]artition\s*\((.*)\)\s*}}`) -var wildcardMappingFunctionRegEx = regexp.MustCompile(`{{\s*[wW]ildcard\s*\((.*)\)\s*}}`) -var splitFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[lL]eft\s*\((.*)\)\s*}}`) -var splitFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[rR]ight\s*\((.*)\)\s*}}`) -var sliceFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[lL]eft\s*\((.*)\)\s*}}`) -var sliceFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[rR]ight\s*\((.*)\)\s*}}`) -var splitMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit\s*\((.*)\)\s*}}`) +// Subject mapping and transform setups. 
+var ( + commaSeparatorRegEx = regexp.MustCompile(`,\s*`) + partitionMappingFunctionRegEx = regexp.MustCompile(`{{\s*[pP]artition\s*\((.*)\)\s*}}`) + wildcardMappingFunctionRegEx = regexp.MustCompile(`{{\s*[wW]ildcard\s*\((.*)\)\s*}}`) + splitFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[lL]eft\s*\((.*)\)\s*}}`) + splitFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[rR]ight\s*\((.*)\)\s*}}`) + sliceFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[lL]eft\s*\((.*)\)\s*}}`) + sliceFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[rR]ight\s*\((.*)\)\s*}}`) + splitMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit\s*\((.*)\)\s*}}`) +) // Enum for the subject mapping transform function types const ( @@ -374,12 +378,14 @@ func (a *Account) updateRemoteServer(m *AccountNumConns) []*client { mtlce := a.mleafs != jwt.NoLimit && (a.nleafs+a.nrleafs > a.mleafs) if mtlce { // Take ones from the end. + a.lmu.RLock() leafs := a.lleafs over := int(a.nleafs + a.nrleafs - a.mleafs) if over < len(leafs) { leafs = leafs[len(leafs)-over:] } clients = append(clients, leafs...) + a.lmu.RUnlock() } a.mu.Unlock() @@ -719,13 +725,15 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error { a.mappings = append(a.mappings, m) // If we have connected leafnodes make sure to update. - if len(a.lleafs) > 0 { - leafs := append([]*client(nil), a.lleafs...) + if a.nleafs > 0 { // Need to release because lock ordering is client -> account a.mu.Unlock() - for _, lc := range leafs { + // Now grab the leaf list lock. We can hold client lock under this one. 
+ a.lmu.RLock() + for _, lc := range a.lleafs { lc.forceAddToSmap(src) } + a.lmu.RUnlock() a.mu.Lock() } return nil @@ -911,11 +919,17 @@ func (a *Account) addClient(c *client) int { a.sysclients++ } else if c.kind == LEAF { a.nleafs++ - a.lleafs = append(a.lleafs, c) } } a.mu.Unlock() + // If we added a new leaf use the list lock and add it to the list. + if added && c.kind == LEAF { + a.lmu.Lock() + a.lleafs = append(a.lleafs, c) + a.lmu.Unlock() + } + if c != nil && c.srv != nil && added { c.srv.accConnsUpdate(a) } @@ -949,8 +963,12 @@ func (a *Account) isLeafNodeClusterIsolated(cluster string) bool { // Helper function to remove leaf nodes. If number of leafnodes gets large // this may need to be optimized out of linear search but believe number // of active leafnodes per account scope to be small and therefore cache friendly. -// Lock should be held on account. +// Lock should not be held on general account lock. func (a *Account) removeLeafNode(c *client) { + // Make sure we hold the list lock as well. + a.lmu.Lock() + defer a.lmu.Unlock() + ll := len(a.lleafs) for i, l := range a.lleafs { if l == c { @@ -960,15 +978,6 @@ func (a *Account) removeLeafNode(c *client) { } else { a.lleafs = a.lleafs[:ll-1] } - // Do cluster accounting if we are a hub. - if l.isHubLeafNode() { - cluster := l.remoteCluster() - if count := a.leafClusters[cluster]; count > 1 { - a.leafClusters[cluster]-- - } else if count == 1 { - delete(a.leafClusters, cluster) - } - } return } } @@ -985,11 +994,24 @@ func (a *Account) removeClient(c *client) int { a.sysclients-- } else if c.kind == LEAF { a.nleafs-- - a.removeLeafNode(c) + // Need to do cluster accounting here. + // Do cluster accounting if we are a hub. 
+ if c.isHubLeafNode() { + cluster := c.remoteCluster() + if count := a.leafClusters[cluster]; count > 1 { + a.leafClusters[cluster]-- + } else if count == 1 { + delete(a.leafClusters, cluster) + } + } } } a.mu.Unlock() + if removed && c.kind == LEAF { + a.removeLeafNode(c) + } + if c != nil && c.srv != nil && removed { c.srv.mu.Lock() doRemove := a != c.srv.gacc @@ -2022,7 +2044,7 @@ func (a *Account) addServiceImportSub(si *serviceImport) error { // This is similar to what initLeafNodeSmapAndSendSubs does // TODO we need to consider performing this update as we get client subscriptions. // This behavior would result in subscription propagation only where actually used. - a.srv.updateLeafNodes(a, sub, 1) + a.updateLeafNodes(sub, 1) return nil } diff --git a/server/client.go b/server/client.go index 662dc7df850..b1d680b9b8b 100644 --- a/server/client.go +++ b/server/client.go @@ -2560,7 +2560,7 @@ func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForw } } // Now check on leafnode updates. - srv.updateLeafNodes(acc, sub, 1) + acc.updateLeafNodes(sub, 1) return sub, nil } @@ -2859,7 +2859,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool } } // Now check on leafnode updates. - c.srv.updateLeafNodes(nsub.im.acc, nsub, -1) + nsub.im.acc.updateLeafNodes(nsub, -1) } // Now check to see if this was part of a respMap entry for service imports. @@ -2923,7 +2923,7 @@ func (c *client) processUnsub(arg []byte) error { } } // Now check on leafnode updates. 
- srv.updateLeafNodes(acc, sub, -1) + acc.updateLeafNodes(sub, -1) } return nil @@ -4911,7 +4911,7 @@ func (c *client) closeConnection(reason ClosedState) { srv.gatewayUpdateSubInterest(acc.Name, sub, -1) } } - srv.updateLeafNodes(acc, sub, -1) + acc.updateLeafNodes(sub, -1) } else { // We handle queue subscribers special in case we // have a bunch we can just send one update to the @@ -4936,7 +4936,7 @@ func (c *client) closeConnection(reason ClosedState) { srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n)) } } - srv.updateLeafNodes(acc, esub.sub, -(esub.n)) + acc.updateLeafNodes(esub.sub, -(esub.n)) } if prev := acc.removeClient(c); prev == 1 { srv.decActiveAccounts() diff --git a/server/consumer.go b/server/consumer.go index 2fd466c06a0..c9fd4055874 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1043,10 +1043,12 @@ func (o *consumer) setLeader(isLeader bool) { } var err error - if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil { - o.mu.Unlock() - o.deleteWithoutAdvisory() - return + if o.cfg.AckPolicy != AckNone { + if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil { + o.mu.Unlock() + o.deleteWithoutAdvisory() + return + } } // Setup the internal sub for next message requests regardless. diff --git a/server/leafnode.go b/server/leafnode.go index 407f225d4c1..2c727adffde 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1639,11 +1639,11 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { return } // Collect all account subs here. - _subs := [32]*subscription{} + _subs := [1024]*subscription{} subs := _subs[:0] ims := []string{} - acc.mu.Lock() + acc.mu.RLock() accName := acc.Name accNTag := acc.nameTag @@ -1682,11 +1682,15 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { // Create a unique subject that will be used for loop detection. lds := acc.lds + acc.mu.RUnlock() + + // Check if we have to create the LDS. 
if lds == _EMPTY_ { lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next() + acc.mu.Lock() acc.lds = lds + acc.mu.Unlock() } - acc.mu.Unlock() // Now check for gateway interest. Leafnodes will put this into // the proper mode to propagate, but they are not held in the account. @@ -1790,16 +1794,28 @@ func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscrip s.Debugf("No or bad account for %q, failed to update interest from gateway", accName) return } - s.updateLeafNodes(acc, sub, delta) + acc.updateLeafNodes(sub, delta) } -// updateLeafNodes will make sure to update the smap for the subscription. Will -// also forward to all leaf nodes as needed. -func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) { +// updateLeafNodes will make sure to update the account smap for the subscription. +// Will also forward to all leaf nodes as needed. +func (acc *Account) updateLeafNodes(sub *subscription, delta int32) { if acc == nil || sub == nil { return } + // We will do checks for no leafnodes and same cluster here inline and under the + // general account read lock. + // If we feel we need to update the leafnodes we will do that out of line to avoid + // blocking routes or GWs. + + acc.mu.RLock() + // First check if we even have leafnodes here. + if acc.nleafs == 0 { + acc.mu.RUnlock() + return + } + // Is this a loop detection subject. isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix)) @@ -1809,43 +1825,45 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) { cluster = string(sub.origin) } - acc.mu.RLock() // If we have an isolated cluster we can return early, as long as it is not a loop detection subject. // Empty clusters will return false for the check. if !isLDS && acc.isLeafNodeClusterIsolated(cluster) { acc.mu.RUnlock() return } - // Grab all leaf nodes. - const numStackClients = 64 - var _l [numStackClients]*client - leafs := append(_l[:0], acc.lleafs...) 
+ + // We can release the general account lock. acc.mu.RUnlock() - for _, ln := range leafs { + // We can hold the list lock here to avoid having to copy a large slice. + acc.lmu.RLock() + defer acc.lmu.RUnlock() + + // Do this once. + subject := string(sub.subject) + + // Walk the connected leafnodes. + for _, ln := range acc.lleafs { if ln == sub.client { continue } // Check to make sure this sub does not have an origin cluster that matches the leafnode. ln.mu.Lock() - skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject)) - ln.mu.Unlock() + skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || (delta > 0 && !ln.canSubscribe(subject)) // If skipped, make sure that we still let go the "$LDS." subscription that allows // the detection of a loop. if isLDS || !skip { ln.updateSmap(sub, delta) } + ln.mu.Unlock() } } // This will make an update to our internal smap and determine if we should send out // an interest update to the remote side. +// Lock should be held. func (c *client) updateSmap(sub *subscription, delta int32) { - key := keyFromSub(sub) - - c.mu.Lock() if c.leaf.smap == nil { - c.mu.Unlock() return } @@ -1853,7 +1871,6 @@ func (c *client) updateSmap(sub *subscription, delta int32) { skind := sub.client.kind updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT if c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) { - c.mu.Unlock() return } @@ -1866,12 +1883,16 @@ func (c *client) updateSmap(sub *subscription, delta int32) { c.leaf.tsubt.Stop() c.leaf.tsubt = nil } - c.mu.Unlock() return } } - n := c.leaf.smap[key] + key := keyFromSub(sub) + n, ok := c.leaf.smap[key] + if delta < 0 && !ok { + return + } + // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0. 
update := sub.queue != nil || n == 0 || n+delta <= 0 n += delta @@ -1883,7 +1904,6 @@ func (c *client) updateSmap(sub *subscription, delta int32) { if update { c.sendLeafNodeSubUpdate(key, n) } - c.mu.Unlock() } // Used to force add subjects to the subject map. @@ -2097,7 +2117,7 @@ func (c *client) processLeafSub(argo []byte) (err error) { } // Now check on leafnode updates for other leaf nodes. We understand solicited // and non-solicited state in this call so we will do the right thing. - srv.updateLeafNodes(acc, sub, delta) + acc.updateLeafNodes(sub, delta) return nil } @@ -2154,7 +2174,7 @@ func (c *client) processLeafUnsub(arg []byte) error { } } // Now check on leafnode updates for other leaf nodes. - srv.updateLeafNodes(acc, sub, -1) + acc.updateLeafNodes(sub, -1) return nil } diff --git a/server/route.go b/server/route.go index b135ce500e5..9f9dba7eb41 100644 --- a/server/route.go +++ b/server/route.go @@ -902,7 +902,7 @@ func (c *client) removeRemoteSubs() { if srv.gateway.enabled { srv.gatewayUpdateSubInterest(accountName, sub, -1) } - srv.updateLeafNodes(ase.acc, sub, -1) + ase.acc.updateLeafNodes(sub, -1) } // Now remove the subs by batch for each account sublist. @@ -972,7 +972,7 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) { } // Now check on leafnode updates. - srv.updateLeafNodes(acc, sub, -1) + acc.updateLeafNodes(sub, -1) if c.opts.Verbose { c.sendOK() @@ -1109,7 +1109,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { } // Now check on leafnode updates. 
- srv.updateLeafNodes(acc, sub, delta) + acc.updateLeafNodes(sub, delta) if c.opts.Verbose { c.sendOK() diff --git a/server/server.go b/server/server.go index 7754d6343df..545b4b1cec3 100644 --- a/server/server.go +++ b/server/server.go @@ -3765,7 +3765,7 @@ func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta s.gatewayUpdateSubInterest(acc.Name, sub, delta) } - s.updateLeafNodes(acc, sub, delta) + acc.updateLeafNodes(sub, delta) } func (s *Server) startRateLimitLogExpiration() {