Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Optimizations for large single hub account leafnode fleets. #4135

Merged
merged 1 commit into from
May 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions locksordering.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,9 @@ jetStream -> jsAccount -> stream -> consumer

A lock to protect jetstream account's usage has been introduced: jsAccount.usageMu.
This lock is independent and can be invoked under any other lock: jsAccount -> jsa.usageMu, stream -> jsa.usageMu, etc...

A lock to protect the account's leafnodes list was also introduced to
allow that lock to be held and the acquire a client lock which is not
possible with the normal account lock.

accountLeafList -> client
70 changes: 46 additions & 24 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Account struct {
lqws map[string]int32
usersRevoked map[string]int64
mappings []*mapping
lmu sync.RWMutex
lleafs []*client
leafClusters map[string]uint64
imports importMap
Expand Down Expand Up @@ -166,14 +167,17 @@ const (
Chunked
)

var commaSeparatorRegEx = regexp.MustCompile(`,\s*`)
var partitionMappingFunctionRegEx = regexp.MustCompile(`{{\s*[pP]artition\s*\((.*)\)\s*}}`)
var wildcardMappingFunctionRegEx = regexp.MustCompile(`{{\s*[wW]ildcard\s*\((.*)\)\s*}}`)
var splitFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[lL]eft\s*\((.*)\)\s*}}`)
var splitFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[rR]ight\s*\((.*)\)\s*}}`)
var sliceFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[lL]eft\s*\((.*)\)\s*}}`)
var sliceFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[rR]ight\s*\((.*)\)\s*}}`)
var splitMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit\s*\((.*)\)\s*}}`)
// Subject mapping and transform setups.
var (
commaSeparatorRegEx = regexp.MustCompile(`,\s*`)
partitionMappingFunctionRegEx = regexp.MustCompile(`{{\s*[pP]artition\s*\((.*)\)\s*}}`)
wildcardMappingFunctionRegEx = regexp.MustCompile(`{{\s*[wW]ildcard\s*\((.*)\)\s*}}`)
splitFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[lL]eft\s*\((.*)\)\s*}}`)
splitFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[rR]ight\s*\((.*)\)\s*}}`)
sliceFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[lL]eft\s*\((.*)\)\s*}}`)
sliceFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[rR]ight\s*\((.*)\)\s*}}`)
splitMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit\s*\((.*)\)\s*}}`)
)

// Enum for the subject mapping transform function types
const (
Expand Down Expand Up @@ -374,12 +378,14 @@ func (a *Account) updateRemoteServer(m *AccountNumConns) []*client {
mtlce := a.mleafs != jwt.NoLimit && (a.nleafs+a.nrleafs > a.mleafs)
if mtlce {
// Take ones from the end.
a.lmu.RLock()
leafs := a.lleafs
over := int(a.nleafs + a.nrleafs - a.mleafs)
if over < len(leafs) {
leafs = leafs[len(leafs)-over:]
}
clients = append(clients, leafs...)
a.lmu.RUnlock()
}
a.mu.Unlock()

Expand Down Expand Up @@ -719,13 +725,15 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
a.mappings = append(a.mappings, m)

// If we have connected leafnodes make sure to update.
if len(a.lleafs) > 0 {
leafs := append([]*client(nil), a.lleafs...)
if a.nleafs > 0 {
// Need to release because lock ordering is client -> account
a.mu.Unlock()
for _, lc := range leafs {
// Now grab the leaf list lock. We can hold client lock under this one.
a.lmu.RLock()
for _, lc := range a.lleafs {
lc.forceAddToSmap(src)
}
a.lmu.RUnlock()
a.mu.Lock()
}
return nil
Expand Down Expand Up @@ -911,11 +919,17 @@ func (a *Account) addClient(c *client) int {
a.sysclients++
} else if c.kind == LEAF {
a.nleafs++
a.lleafs = append(a.lleafs, c)
}
}
a.mu.Unlock()

// If we added a new leaf use the list lock and add it to the list.
if added && c.kind == LEAF {
a.lmu.Lock()
a.lleafs = append(a.lleafs, c)
a.lmu.Unlock()
}

if c != nil && c.srv != nil && added {
c.srv.accConnsUpdate(a)
}
Expand Down Expand Up @@ -949,8 +963,12 @@ func (a *Account) isLeafNodeClusterIsolated(cluster string) bool {
// Helper function to remove leaf nodes. If number of leafnodes gets large
// this may need to be optimized out of linear search but believe number
// of active leafnodes per account scope to be small and therefore cache friendly.
// Lock should be held on account.
// Lock should not be held on general account lock.
func (a *Account) removeLeafNode(c *client) {
// Make sure we hold the list lock as well.
a.lmu.Lock()
defer a.lmu.Unlock()

ll := len(a.lleafs)
for i, l := range a.lleafs {
if l == c {
Expand All @@ -960,15 +978,6 @@ func (a *Account) removeLeafNode(c *client) {
} else {
a.lleafs = a.lleafs[:ll-1]
}
// Do cluster accounting if we are a hub.
if l.isHubLeafNode() {
cluster := l.remoteCluster()
if count := a.leafClusters[cluster]; count > 1 {
a.leafClusters[cluster]--
} else if count == 1 {
delete(a.leafClusters, cluster)
}
}
return
}
}
Expand All @@ -985,11 +994,24 @@ func (a *Account) removeClient(c *client) int {
a.sysclients--
} else if c.kind == LEAF {
a.nleafs--
a.removeLeafNode(c)
// Need to do cluster accounting here.
// Do cluster accounting if we are a hub.
if c.isHubLeafNode() {
cluster := c.remoteCluster()
if count := a.leafClusters[cluster]; count > 1 {
a.leafClusters[cluster]--
} else if count == 1 {
delete(a.leafClusters, cluster)
}
}
}
}
a.mu.Unlock()

if removed && c.kind == LEAF {
a.removeLeafNode(c)
}

if c != nil && c.srv != nil && removed {
c.srv.mu.Lock()
doRemove := a != c.srv.gacc
Expand Down Expand Up @@ -2022,7 +2044,7 @@ func (a *Account) addServiceImportSub(si *serviceImport) error {
// This is similar to what initLeafNodeSmapAndSendSubs does
// TODO we need to consider performing this update as we get client subscriptions.
// This behavior would result in subscription propagation only where actually used.
a.srv.updateLeafNodes(a, sub, 1)
a.updateLeafNodes(sub, 1)
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2560,7 +2560,7 @@ func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForw
}
}
// Now check on leafnode updates.
srv.updateLeafNodes(acc, sub, 1)
acc.updateLeafNodes(sub, 1)
return sub, nil
}

Expand Down Expand Up @@ -2859,7 +2859,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
}
}
// Now check on leafnode updates.
c.srv.updateLeafNodes(nsub.im.acc, nsub, -1)
nsub.im.acc.updateLeafNodes(nsub, -1)
}

// Now check to see if this was part of a respMap entry for service imports.
Expand Down Expand Up @@ -2923,7 +2923,7 @@ func (c *client) processUnsub(arg []byte) error {
}
}
// Now check on leafnode updates.
srv.updateLeafNodes(acc, sub, -1)
acc.updateLeafNodes(sub, -1)
}

return nil
Expand Down Expand Up @@ -4911,7 +4911,7 @@ func (c *client) closeConnection(reason ClosedState) {
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
}
}
srv.updateLeafNodes(acc, sub, -1)
acc.updateLeafNodes(sub, -1)
} else {
// We handle queue subscribers special in case we
// have a bunch we can just send one update to the
Expand All @@ -4936,7 +4936,7 @@ func (c *client) closeConnection(reason ClosedState) {
srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n))
}
}
srv.updateLeafNodes(acc, esub.sub, -(esub.n))
acc.updateLeafNodes(esub.sub, -(esub.n))
}
if prev := acc.removeClient(c); prev == 1 {
srv.decActiveAccounts()
Expand Down
10 changes: 6 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,10 +1043,12 @@ func (o *consumer) setLeader(isLeader bool) {
}

var err error
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
if o.cfg.AckPolicy != AckNone {
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
}
}

// Setup the internal sub for next message requests regardless.
Expand Down
70 changes: 45 additions & 25 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1639,11 +1639,11 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
return
}
// Collect all account subs here.
_subs := [32]*subscription{}
_subs := [1024]*subscription{}
subs := _subs[:0]
ims := []string{}

acc.mu.Lock()
acc.mu.RLock()
accName := acc.Name
accNTag := acc.nameTag

Expand Down Expand Up @@ -1682,11 +1682,15 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {

// Create a unique subject that will be used for loop detection.
lds := acc.lds
acc.mu.RUnlock()

// Check if we have to create the LDS.
if lds == _EMPTY_ {
lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
acc.mu.Lock()
acc.lds = lds
acc.mu.Unlock()
}
acc.mu.Unlock()

// Now check for gateway interest. Leafnodes will put this into
// the proper mode to propagate, but they are not held in the account.
Expand Down Expand Up @@ -1790,16 +1794,28 @@ func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscrip
s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
return
}
s.updateLeafNodes(acc, sub, delta)
acc.updateLeafNodes(sub, delta)
}

// updateLeafNodes will make sure to update the smap for the subscription. Will
// also forward to all leaf nodes as needed.
func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
if acc == nil || sub == nil {
return
}

// We will do checks for no leafnodes and same cluster here inline and under the
// general account read lock.
// If we feel we need to update the leafnodes we will do that out of line to avoid
// blocking routes or GWs.

acc.mu.RLock()
// First check if we even have leafnodes here.
if acc.nleafs == 0 {
acc.mu.RUnlock()
return
}

// Is this a loop detection subject.
isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

Expand All @@ -1809,51 +1825,52 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
cluster = string(sub.origin)
}

acc.mu.RLock()
// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
// Empty clusters will return false for the check.
if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
acc.mu.RUnlock()
return
}
// Grab all leaf nodes.
const numStackClients = 64
var _l [numStackClients]*client
leafs := append(_l[:0], acc.lleafs...)

// We can release the general account lock.
acc.mu.RUnlock()

for _, ln := range leafs {
// We can hold the list lock here to avoid having to copy a large slice.
acc.lmu.RLock()
defer acc.lmu.RUnlock()

// Do this once.
subject := string(sub.subject)

// Walk the connected leafnodes.
for _, ln := range acc.lleafs {
if ln == sub.client {
continue
}
// Check to make sure this sub does not have an origin cluster that matches the leafnode.
ln.mu.Lock()
skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject))
ln.mu.Unlock()
skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || (delta > 0 && !ln.canSubscribe(subject))
// If skipped, make sure that we still let go the "$LDS." subscription that allows
// the detection of a loop.
if isLDS || !skip {
ln.updateSmap(sub, delta)
}
ln.mu.Unlock()
}
}

// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// Lock should be held.
func (c *client) updateSmap(sub *subscription, delta int32) {
key := keyFromSub(sub)

c.mu.Lock()
if c.leaf.smap == nil {
c.mu.Unlock()
return
}

// If we are solicited make sure this is a local client or a non-solicited leaf node
skind := sub.client.kind
updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
if c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
c.mu.Unlock()
return
}

Expand All @@ -1866,12 +1883,16 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
c.leaf.tsubt.Stop()
c.leaf.tsubt = nil
}
c.mu.Unlock()
return
}
}

n := c.leaf.smap[key]
key := keyFromSub(sub)
n, ok := c.leaf.smap[key]
if delta < 0 && !ok {
return
}

// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
update := sub.queue != nil || n == 0 || n+delta <= 0
n += delta
Expand All @@ -1883,7 +1904,6 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
if update {
c.sendLeafNodeSubUpdate(key, n)
}
c.mu.Unlock()
}

// Used to force add subjects to the subject map.
Expand Down Expand Up @@ -2097,7 +2117,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
}
// Now check on leafnode updates for other leaf nodes. We understand solicited
// and non-solicited state in this call so we will do the right thing.
srv.updateLeafNodes(acc, sub, delta)
acc.updateLeafNodes(sub, delta)

return nil
}
Expand Down Expand Up @@ -2154,7 +2174,7 @@ func (c *client) processLeafUnsub(arg []byte) error {
}
}
// Now check on leafnode updates for other leaf nodes.
srv.updateLeafNodes(acc, sub, -1)
acc.updateLeafNodes(sub, -1)
return nil
}

Expand Down
Loading