Skip to content

Commit

Permalink
Merge 2228146 into 36345ee
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Dec 2, 2018
2 parents 36345ee + 2228146 commit b9f7cf7
Show file tree
Hide file tree
Showing 11 changed files with 950 additions and 157 deletions.
64 changes: 46 additions & 18 deletions server/accounts.go
Expand Up @@ -40,18 +40,21 @@ type rme struct {
// Account are subject namespace definitions. By default no messages are shared between accounts.
// You can share via exports and imports of streams and services.
type Account struct {
Name string
Nkey string
Issuer string
claimJWT string
updated time.Time
mu sync.RWMutex
sl *Sublist
etmr *time.Timer
clients map[*client]*client
rm map[string]*rme
imports importMap
exports exportMap
Name string
Nkey string
Issuer string
claimJWT string
updated time.Time
mu sync.RWMutex
sl *Sublist
etmr *time.Timer
ctmr *time.Timer
strack map[string]int
nrclients int
clients map[*client]*client
rm map[string]*rme
imports importMap
exports exportMap
limits
nae int
pruning bool
Expand Down Expand Up @@ -106,19 +109,28 @@ type exportMap struct {
services map[string]*exportAuth
}

// NumClients returns active number of clients for this account.
// NumClients returns active number of clients for this account for
// all known servers.
func (a *Account) NumClients() int {
a.mu.RLock()
defer a.mu.RUnlock()
return len(a.clients) + a.nrclients
}

// NumLocalClients returns active number of clients for this account
// on this server.
func (a *Account) NumLocalClients() int {
a.mu.RLock()
defer a.mu.RUnlock()
return len(a.clients)
}

// MaxClientsReached returns if we have reached our limit for number of connections.
func (a *Account) MaxClientsReached() bool {
func (a *Account) MaxTotalClientsReached() bool {
a.mu.RLock()
defer a.mu.RUnlock()
if a.mconns != 0 {
return len(a.clients) >= a.mconns
return len(a.clients)+a.nrclients >= a.mconns
}
return false
}
Expand All @@ -138,25 +150,41 @@ func (a *Account) TotalSubs() int {
return int(a.sl.Count())
}

// addClient keeps our accounting of active clients updated.
// addClient keeps our accounting of local active clients updated.
// Returns previous total.
func (a *Account) addClient(c *client) int {
a.mu.Lock()
n := len(a.clients)
a.clients[c] = c
if a.clients != nil {
a.clients[c] = c
}
a.mu.Unlock()
if c != nil && c.srv != nil && a != c.srv.gacc {
c.srv.accConnsUpdate(a)
}
return n
}

// removeClient keeps our accounting of active clients updated.
// removeClient keeps our accounting of local active clients updated.
func (a *Account) removeClient(c *client) int {
a.mu.Lock()
n := len(a.clients)
delete(a.clients, c)
a.mu.Unlock()
if c != nil && c.srv != nil && a != c.srv.gacc {
c.srv.accConnsUpdate(a)
}
return n
}

func (a *Account) randomClient() *client {
var c *client
for _, c = range a.clients {
break
}
return c
}

// AddServiceExport will configure the account with the defined export.
func (a *Account) AddServiceExport(subject string, accounts []*Account) error {
a.mu.Lock()
Expand Down
13 changes: 6 additions & 7 deletions server/client.go
Expand Up @@ -406,14 +406,14 @@ func (c *client) registerWithAccount(acc *Account) error {
if acc == nil || acc.sl == nil {
return ErrBadAccount
}
// If we were previously register, usually to $G, do accounting here to remove.
// If we were previously registered, usually to $G, do accounting here to remove.
if c.acc != nil {
if prev := c.acc.removeClient(c); prev == 1 && c.srv != nil {
c.srv.decActiveAccounts()
}
}
// Check if we have a max connections violation
if acc.MaxClientsReached() {
if acc.MaxTotalClientsReached() {
return ErrTooManyAccountConnections
}

Expand Down Expand Up @@ -490,7 +490,6 @@ func (c *client) RegisterUser(user *User) {
c.mperms = nil
return
}

c.setPermissions(user.Permissions)
}

Expand Down Expand Up @@ -1514,7 +1513,7 @@ func (c *client) processSub(argo []byte) (err error) {
c.Errorf(err.Error())
}
// If we are routing and this is a local sub, add to the route map for the associated account.
if kind == CLIENT {
if kind == CLIENT || kind == SYSTEM {
c.srv.updateRouteSubscriptionMap(acc, sub, 1)
}
}
Expand Down Expand Up @@ -1704,7 +1703,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) {
for _, nsub := range sub.shadow {
if err := nsub.im.acc.sl.Remove(nsub); err != nil {
c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name)
} else if c.kind == CLIENT && c.srv != nil {
} else if c.kind == CLIENT || c.kind == SYSTEM && c.srv != nil {
c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1)
}
}
Expand Down Expand Up @@ -1758,7 +1757,7 @@ func (c *client) processUnsub(arg []byte) error {

if unsub {
c.unsubscribe(acc, sub, false)
if acc != nil && kind == CLIENT {
if acc != nil && kind == CLIENT || kind == SYSTEM {
c.srv.updateRouteSubscriptionMap(acc, sub, -1)
}
}
Expand Down Expand Up @@ -1833,7 +1832,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
defer client.removeReplySub(sub)
} else {
// For routing..
shouldForward := client.kind == CLIENT && client.srv != nil
shouldForward := client.kind == CLIENT || client.kind == SYSTEM && client.srv != nil
// If we are at the exact number, unsubscribe but
// still process the message in hand, otherwise
// unsubscribe and drop message on the floor.
Expand Down

0 comments on commit b9f7cf7

Please sign in to comment.