Skip to content

Commit

Permalink
Merge 1783951 into f5e574c
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Apr 25, 2019
2 parents f5e574c + 1783951 commit 0d66baa
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 50 deletions.
94 changes: 81 additions & 13 deletions server/accounts.go
Expand Up @@ -43,9 +43,11 @@ type Account struct {
sl *Sublist
etmr *time.Timer
ctmr *time.Timer
strack map[string]int32
strack map[string]sconns
nrclients int32
sysclients int32
nleafs int32
nrleafs int32
clients map[*client]*client
rm map[string]int32
imports importMap
Expand All @@ -63,10 +65,17 @@ type limits struct {
mpay int32
msubs int32
mconns int32
mleafs int32
maxnae int32
maxaettl time.Duration
}

// Used to track remote clients and leafnodes per remote server.
type sconns struct {
conns int32
leafs int32
}

// Import stream mapping struct
type streamImport struct {
acc *Account
Expand Down Expand Up @@ -110,7 +119,7 @@ func NewAccount(name string) *Account {
a := &Account{
Name: name,
sl: NewSublist(),
limits: limits{-1, -1, -1, 0, 0},
limits: limits{-1, -1, -1, -1, 0, 0},
}
return a
}
Expand Down Expand Up @@ -146,10 +155,14 @@ func (a *Account) NumLocalConnections() int {

// Do not account for the system accounts.
func (a *Account) numLocalConnections() int {
return len(a.clients) - int(a.sysclients)
return len(a.clients) - int(a.sysclients) - int(a.nleafs)
}

func (a *Account) numLocalLeafNodes() int {
return int(a.nleafs)
}

// MaxClientsReached returns if we have reached our limit for number of connections.
// MaxTotalConnectionsReached returns if we have reached our limit for number of connections.
func (a *Account) MaxTotalConnectionsReached() bool {
a.mu.RLock()
mtc := a.maxTotalConnectionsReached()
Expand All @@ -168,8 +181,52 @@ func (a *Account) maxTotalConnectionsReached() bool {
// wide for total number of active connections.
func (a *Account) MaxActiveConnections() int {
a.mu.RLock()
defer a.mu.RUnlock()
return int(a.mconns)
mconns := int(a.mconns)
a.mu.RUnlock()
return mconns
}

// MaxTotalLeafNodesReached() returns if we have reached our limit for number of leafnodes.
func (a *Account) MaxTotalLeafNodesReached() bool {
a.mu.RLock()
mtc := a.maxTotalLeafNodesReached()
a.mu.RUnlock()
return mtc
}

func (a *Account) maxTotalLeafNodesReached() bool {
if a.mleafs != jwt.NoLimit {
return a.nleafs+a.nrleafs >= a.mleafs
}
return false
}

// NumLeafNodes returns the active number of local and remote
// leaf node connections.
func (a *Account) NumLeafNodes() int {
a.mu.RLock()
nln := int(a.nleafs + a.nrleafs)
a.mu.RUnlock()
return nln
}

// NumRemoteLeafNodes returns the active number of remote
// leaf node connections.
func (a *Account) NumRemoteLeafNodes() int {
a.mu.RLock()
nrn := int(a.nrleafs)
a.mu.RUnlock()
return nrn
}

// MaxActiveLeafnodes return the set limit for the account system
// wide for total number of leavenode connections.
// NOTE: these are tracked separately.
func (a *Account) MaxActiveLeafNodes() int {
a.mu.RLock()
mleafs := int(a.mleafs)
a.mu.RUnlock()
return mleafs
}

// RoutedSubs returns how many subjects we would send across a route when first
Expand All @@ -187,19 +244,24 @@ func (a *Account) TotalSubs() int {
return int(a.sl.Count())
}

// addClient keeps our accounting of local active clients updated.
// addClient keeps our accounting of local active clients or leafnodes updated.
// Returns previous total.
func (a *Account) addClient(c *client) int {
a.mu.Lock()
n := len(a.clients)
if a.clients != nil {
a.clients[c] = c
}
if c.kind == SYSTEM {
a.sysclients++
added := n != len(a.clients)
if added {
if c.kind == SYSTEM {
a.sysclients++
} else if c.kind == LEAF {
a.nleafs++
}
}
a.mu.Unlock()
if c != nil && c.srv != nil && a != c.srv.gacc {
if c != nil && c.srv != nil && a != c.srv.gacc && added {
c.srv.accConnsUpdate(a)
}
return n
Expand All @@ -210,11 +272,16 @@ func (a *Account) removeClient(c *client) int {
a.mu.Lock()
n := len(a.clients)
delete(a.clients, c)
if c.kind == SYSTEM {
a.sysclients--
removed := n != len(a.clients)
if removed {
if c.kind == SYSTEM {
a.sysclients--
} else if c.kind == LEAF {
a.nleafs--
}
}
a.mu.Unlock()
if c != nil && c.srv != nil && a != c.srv.gacc {
if c != nil && c.srv != nil && a != c.srv.gacc && removed {
c.srv.accConnsUpdate(a)
}
return n
Expand Down Expand Up @@ -1006,6 +1073,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
a.msubs = int32(ac.Limits.Subs)
a.mpay = int32(ac.Limits.Payload)
a.mconns = int32(ac.Limits.Conn)
a.mleafs = int32(ac.Limits.LeafNodeConn)
a.mu.Unlock()

clients := gatherClients()
Expand Down
7 changes: 3 additions & 4 deletions server/auth.go
Expand Up @@ -634,10 +634,9 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool {
}

nkey := buildInternalNkeyUser(juc, acc)
c.RegisterNkeyUser(nkey)

// Generate a connect event if we have a system account.
// FIXME(dlc) - Make one for leafnodes if we track active connections.
if err := c.RegisterNkeyUser(nkey); err != nil {
return false
}

// Check if we need to set an auth timer if the user jwt expires.
c.checkExpiration(juc.Claims())
Expand Down
47 changes: 31 additions & 16 deletions server/client.go
Expand Up @@ -444,21 +444,26 @@ func (c *client) registerWithAccount(acc *Account) error {
c.srv.decActiveAccounts()
}
}

c.mu.Lock()
kind := c.kind
srv := c.srv
c.acc = acc
c.applyAccountLimits()
c.mu.Unlock()

// Check if we have a max connections violation
if c.kind == CLIENT && acc.MaxTotalConnectionsReached() {
if kind == CLIENT && acc.MaxTotalConnectionsReached() {
return ErrTooManyAccountConnections
} else if kind == LEAF && acc.MaxTotalLeafNodesReached() {
return ErrTooManyAccountConnections
}

// Add in new one.
if prev := acc.addClient(c); prev == 0 && c.srv != nil {
c.srv.incActiveAccounts()
if prev := acc.addClient(c); prev == 0 && srv != nil {
srv.incActiveAccounts()
}

c.mu.Lock()
c.acc = acc
c.applyAccountLimits()
c.mu.Unlock()

return nil
}

Expand All @@ -471,7 +476,7 @@ func (c *client) subsAtLimit() bool {
// Lock is held on entry.
// FIXME(dlc) - Should server be able to override here?
func (c *client) applyAccountLimits() {
if c.acc == nil || c.kind != CLIENT {
if c.acc == nil || (c.kind != CLIENT && c.kind != LEAF) {
return
}

Expand Down Expand Up @@ -534,12 +539,12 @@ func (c *client) RegisterUser(user *User) {
// RegisterNkey allows auth to call back into a new nkey
// client with the authenticated user. This is used to map
// any permissions into the client and setup accounts.
func (c *client) RegisterNkeyUser(user *NkeyUser) {
func (c *client) RegisterNkeyUser(user *NkeyUser) error {
// Register with proper account and sublist.
if user.Account != nil {
if err := c.registerWithAccount(user.Account); err != nil {
c.reportErrRegisterAccount(user.Account, err)
return
return err
}
}

Expand All @@ -552,10 +557,10 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) {
// Reset perms to nil in case client previously had them.
c.perms = nil
c.mperms = nil
return
} else {
c.setPermissions(user.Permissions)
}

c.setPermissions(user.Permissions)
return nil
}

// Initializes client.perms structure.
Expand Down Expand Up @@ -1123,6 +1128,7 @@ func (c *client) processConnect(arg []byte) error {
lang := c.opts.Lang
account := c.opts.Account
accountNew := c.opts.AccountNew
ujwt := c.opts.JWT
c.mu.Unlock()

if srv != nil {
Expand All @@ -1139,11 +1145,20 @@ func (c *client) processConnect(arg []byte) error {

// Check for Auth
if ok := srv.checkAuthentication(c); !ok {
// We may fail here because we reached max limits on an account.
if ujwt != "" {
c.mu.Lock()
acc := c.acc
c.mu.Unlock()
if acc != nil && acc != srv.gacc {
return ErrTooManyAccountConnections
}
}
c.authViolation()
return ErrAuthentication
}

// Check for Account designation
// Check for Account designation, this section should be only used when there is not a jwt.
if account != "" {
var acc *Account
var wasNew bool
Expand All @@ -1152,7 +1167,7 @@ func (c *client) processConnect(arg []byte) error {
acc, err = srv.LookupAccount(account)
if err != nil {
c.Errorf(err.Error())
c.sendErr("Account Not Found")
c.sendErr(ErrMissingAccount.Error())
return err
} else if accountNew && acc != nil {
c.sendErrAndErr(ErrAccountExists.Error())
Expand Down
35 changes: 23 additions & 12 deletions server/events.go
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/nats-io/gnatsd/server/pse"
"github.com/nats-io/jwt"
)

const (
Expand Down Expand Up @@ -91,6 +92,7 @@ type AccountNumConns struct {
Server ServerInfo `json:"server"`
Account string `json:"acc"`
Conns int `json:"conns"`
LeafNodes int `json:"leafnodes"`
TotalConns int `json:"total_conns"`
}

Expand Down Expand Up @@ -482,7 +484,8 @@ func (s *Server) processRemoteServerShutdown(sid string) {
a.mu.Lock()
prev := a.strack[sid]
delete(a.strack, sid)
a.nrclients -= prev
a.nrclients -= prev.conns
a.nrleafs -= prev.leafs
a.mu.Unlock()
return true
})
Expand Down Expand Up @@ -625,11 +628,18 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg
s.sys.client.Errorf("Error unmarshalling account connection event message: %v", err)
return
}

// See if we have the account registered, if not drop it.
acc, _ := s.lookupAccount(m.Account)

s.mu.Lock()
defer s.mu.Unlock()

// check again here if we have been shutdown.
if !s.running || !s.eventsEnabled() {
return
}

// Double check that this is not us, should never happen, so error if it does.
if m.Server.ID == s.info.ID {
s.sys.client.Errorf("Processing our own account connection event message: ignored")
Expand All @@ -642,12 +652,13 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg
// If we are here we have interest in tracking this account. Update our accounting.
acc.mu.Lock()
if acc.strack == nil {
acc.strack = make(map[string]int32)
acc.strack = make(map[string]sconns)
}
// This does not depend on receiving all updates since each one is idempotent.
prev := acc.strack[m.Server.ID]
acc.strack[m.Server.ID] = int32(m.Conns)
acc.nrclients += int32(m.Conns) - prev
acc.strack[m.Server.ID] = sconns{conns: int32(m.Conns), leafs: int32(m.LeafNodes)}
acc.nrclients += int32(m.Conns) - prev.conns
acc.nrleafs += int32(m.LeafNodes) - prev.leafs
acc.mu.Unlock()

s.updateRemoteServer(&m.Server)
Expand Down Expand Up @@ -675,8 +686,7 @@ func (s *Server) enableAccountTracking(a *Account) {
// Lock should NOT be held on entry.
func (s *Server) sendLeafNodeConnect(a *Account) {
s.mu.Lock()
// If we do not have any gateways defined this should also be a no-op.
// FIXME(dlc) - if we do accounting for operator limits might have to send regardless.
// If we are not in operator mode, or do not have any gateways defined, this should also be a no-op.
if a == nil || !s.eventsEnabled() || !s.gateway.enabled {
s.mu.Unlock()
return
Expand All @@ -699,21 +709,22 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
if !s.eventsEnabled() || a == nil || a == s.gacc {
return
}
a.mu.Lock()
a.mu.RLock()

// If no limits set, don't update, no need to.
if a.mconns == 0 {
a.mu.Unlock()
if a.mconns == jwt.NoLimit && a.mleafs == jwt.NoLimit {
a.mu.RUnlock()
return
}

// Build event with account name and number of local clients.
// Build event with account name and number of local clients and leafnodes.
m := AccountNumConns{
Account: a.Name,
Conns: a.numLocalConnections(),
TotalConns: len(s.clients),
LeafNodes: a.numLocalLeafNodes(),
TotalConns: a.numLocalConnections() + a.numLocalLeafNodes(),
}
a.mu.Unlock()
a.mu.RUnlock()

s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)

Expand Down

0 comments on commit 0d66baa

Please sign in to comment.