Skip to content

Commit

Permalink
Merge 3a87217 into 8fe0fa7
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Nov 26, 2018
2 parents 8fe0fa7 + 3a87217 commit c66bd0b
Show file tree
Hide file tree
Showing 7 changed files with 515 additions and 34 deletions.
56 changes: 45 additions & 11 deletions server/accounts.go
Expand Up @@ -52,11 +52,19 @@ type Account struct {
rm map[string]*rme
imports importMap
exports exportMap
nae int
limits
nae int
pruning bool
expired bool
}

// Account based limits.
type limits struct {
mpay int32
msubs int
mconns int
maxnae int
maxaettl time.Duration
pruning bool
expired bool
}

// Import stream mapping struct
Expand Down Expand Up @@ -104,6 +112,16 @@ func (a *Account) NumClients() int {
return len(a.clients)
}

// MaxClientsReached returns if we have reached our limit for number of connections.
func (a *Account) MaxClientsReached() bool {
a.mu.RLock()
defer a.mu.RUnlock()
if a.mconns != 0 {
return len(a.clients) >= a.mconns
}
return false
}

// RoutedSubs returns how many subjects we would send across a route when first
// connected or expressing interest. Local client subs.
func (a *Account) RoutedSubs() int {
Expand Down Expand Up @@ -693,6 +711,16 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
a.exports = exportMap{}
a.imports = importMap{}

gatherClients := func() []*client {
a.mu.RLock()
clients := make([]*client, 0, len(a.clients))
for _, c := range a.clients {
clients = append(clients, c)
}
a.mu.RUnlock()
return clients
}

for _, e := range ac.Exports {
switch e.Type {
case jwt.Stream:
Expand Down Expand Up @@ -723,13 +751,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
// Now let's apply any needed changes from import/export changes.
if !a.checkStreamImportsEqual(old) {
awcsti := map[string]struct{}{a.Name: struct{}{}}
a.mu.RLock()
clients := make([]*client, 0, len(a.clients))
for _, c := range a.clients {
clients = append(clients, c)
}
a.mu.RUnlock()
for _, c := range clients {
for _, c := range gatherClients() {
c.processSubsOnConfigReload(awcsti)
}
}
Expand Down Expand Up @@ -759,7 +781,19 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
}
}

// FIXME(dlc) - Limits etc..
// Now do limits if they are present.
a.msubs = int(ac.Limits.Subs)
a.mpay = int32(ac.Limits.Payload)
a.mconns = int(ac.Limits.Conn)
for i, c := range gatherClients() {
if a.mconns > 0 && i >= a.mconns {
c.maxAccountConnExceeded()
continue
}
c.mu.Lock()
c.applyAccountLimits()
c.mu.Unlock()
}
}

// Helper to build an internal account structure from a jwt.AccountClaims.
Expand Down
80 changes: 70 additions & 10 deletions server/client.go
Expand Up @@ -124,9 +124,11 @@ const (
ProtocolViolation
BadClientProtocolVersion
WrongPort
MaxAccountConnectionsExceeded
MaxConnectionsExceeded
MaxPayloadExceeded
MaxControlLineExceeded
MaxSubscriptionsExceeded
DuplicateRoute
RouteRemoved
ServerShutdown
Expand All @@ -136,7 +138,7 @@ const (
type client struct {
// Here first because of use of atomics, and memory alignment.
stats
mpay int64
mpay int32
msubs int
mu sync.Mutex
typ int
Expand Down Expand Up @@ -359,6 +361,16 @@ func (c *client) initClient() {
}
}

// Helper function to report errors.
func (c *client) reportErrRegisterAccount(acc *Account, err error) {
if err == ErrTooManyAccountConnections {
c.maxAccountConnExceeded()
return
}
c.Errorf("Problem registering with account [%s]", acc.Name)
c.sendErr("Failed Account Registration")
}

// RegisterWithAccount will register the given user with a specific
// account. This will change the subject namespace.
func (c *client) registerWithAccount(acc *Account) error {
Expand All @@ -371,25 +383,70 @@ func (c *client) registerWithAccount(acc *Account) error {
c.srv.decActiveAccounts()
}
}
// Check if we have a max connections violation
if acc.MaxClientsReached() {
return ErrTooManyAccountConnections
}

// Add in new one.
if prev := acc.addClient(c); prev == 0 && c.srv != nil {
c.srv.incActiveAccounts()
}

c.mu.Lock()
c.acc = acc
c.applyAccountLimits()
c.mu.Unlock()

return nil
}

// Apply account limits
// Lock is held on entry.
// FIXME(dlc) - Should server be able to override here?
func (c *client) applyAccountLimits() {
if c.acc == nil {
return
}
// Set here, check for more details below. Only set if non-zero.
if c.acc.msubs > 0 {
c.msubs = c.acc.msubs
}
if c.acc.mpay > 0 {
c.mpay = c.acc.mpay
}

opts := c.srv.getOpts()

// We check here if the server has an option set that is lower than the account limit.
if c.mpay != 0 && opts.MaxPayload != 0 && int32(opts.MaxPayload) < c.acc.mpay {
c.Errorf("Max Payload set to %d from server config which overrides %d from account claims", opts.MaxPayload, c.acc.mpay)
c.mpay = int32(opts.MaxPayload)
}

// We check here if the server has an option set that is lower than the account limit.
if c.msubs != 0 && opts.MaxSubs != 0 && opts.MaxSubs < c.acc.msubs {
c.Errorf("Max Subscriptions set to %d from server config which overrides %d from account claims", opts.MaxSubs, c.acc.msubs)
c.msubs = opts.MaxSubs
}

if c.msubs > 0 && len(c.subs) > c.msubs {
go func() {
c.maxSubsExceeded()
time.Sleep(20 * time.Millisecond)
c.closeConnection(MaxSubscriptionsExceeded)
}()
}
}

// RegisterUser allows auth to call back into a new client
// with the authenticated user. This is used to map
// any permissions into the client and setup accounts.
func (c *client) RegisterUser(user *User) {
// Register with proper account and sublist.
if user.Account != nil {
if err := c.registerWithAccount(user.Account); err != nil {
c.Errorf("Problem registering with account [%s]", user.Account.Name)
c.sendErr("Failed Account Registration")
c.reportErrRegisterAccount(user.Account, err)
return
}
}
Expand All @@ -415,8 +472,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) {
// Register with proper account and sublist.
if user.Account != nil {
if err := c.registerWithAccount(user.Account); err != nil {
c.Errorf("Problem registering with account [%s]", user.Account.Name)
c.sendErr("Failed Account Registration")
c.reportErrRegisterAccount(user.Account, err)
return
}
}
Expand Down Expand Up @@ -954,8 +1010,7 @@ func (c *client) processConnect(arg []byte) error {
}
// If we are here we can register ourselves with the new account.
if err := c.registerWithAccount(acc); err != nil {
c.Errorf("Problem registering with account [%s]", account)
c.sendErr("Failed Account Registration")
c.reportErrRegisterAccount(acc, err)
return ErrBadAccount
}
} else if c.acc == nil {
Expand Down Expand Up @@ -1047,6 +1102,11 @@ func (c *client) authViolation() {
c.closeConnection(AuthenticationViolation)
}

func (c *client) maxAccountConnExceeded() {
c.sendErrAndErr(ErrTooManyAccountConnections.Error())
c.closeConnection(MaxAccountConnectionsExceeded)
}

func (c *client) maxConnExceeded() {
c.sendErrAndErr(ErrTooManyConnections.Error())
c.closeConnection(MaxConnectionsExceeded)
Expand All @@ -1056,7 +1116,7 @@ func (c *client) maxSubsExceeded() {
c.sendErrAndErr(ErrTooManySubs.Error())
}

func (c *client) maxPayloadViolation(sz int, max int64) {
func (c *client) maxPayloadViolation(sz int, max int32) {
c.Errorf("%s: %d vs %d", ErrMaxPayload.Error(), sz, max)
c.sendErr("Maximum Payload Violation")
c.closeConnection(MaxPayloadExceeded)
Expand Down Expand Up @@ -1304,8 +1364,8 @@ func (c *client) processPub(trace bool, arg []byte) error {
if c.pa.size < 0 {
return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg)
}
maxPayload := atomic.LoadInt64(&c.mpay)
if maxPayload > 0 && int64(c.pa.size) > maxPayload {
maxPayload := atomic.LoadInt32(&c.mpay)
if maxPayload > 0 && int32(c.pa.size) > maxPayload {
c.maxPayloadViolation(c.pa.size, maxPayload)
return ErrMaxPayload
}
Expand Down
4 changes: 4 additions & 0 deletions server/errors.go
Expand Up @@ -47,6 +47,10 @@ var (
// server has been reached.
ErrTooManyConnections = errors.New("Maximum Connections Exceeded")

// ErrTooManyAccountConnections signals that an acount has reached its maximum number of active
// connections.
ErrTooManyAccountConnections = errors.New("Maximum Account Active Connections Exceeded")

// ErrTooManySubs signals a client that the maximum number of subscriptions per connection
// has been reached.
ErrTooManySubs = errors.New("Maximum Subscriptions Exceeded")
Expand Down

0 comments on commit c66bd0b

Please sign in to comment.