Skip to content

Commit

Permalink
Updates based on comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Oct 24, 2018
1 parent d35bb56 commit 9a1cb08
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 30 deletions.
5 changes: 2 additions & 3 deletions server/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type Account struct {
maxnae int
maxaettl time.Duration
pruning bool
checksti bool // check stream imports during a config reload
}

// Import stream mapping struct
Expand Down Expand Up @@ -341,12 +340,12 @@ func (a *Account) checkStreamImportAuthorized(account *Account, subject string)
return false
}

// Returns true of `a` and `b` stream imports are the same. Note that the
// Returns true if `a` and `b` stream imports are the same. Note that the
// check is done with the account's name, not the pointer. This is used
// during config reload where we are comparing current and new config
// in which pointers are different.
// No lock is acquired in this function, so it is assumed that the
// import maps are not changed while this execute.
// import maps are not changed while this executes.
func (a *Account) checkStreamImportsEqual(b *Account) bool {
if len(a.imports.streams) != len(b.imports.streams) {
return false
Expand Down
12 changes: 6 additions & 6 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2025,7 +2025,7 @@ func (c *client) typeString() string {

// processSubsOnConfigReload removes any subscriptions the client has that are no
// longer authorized, and check for imports (accounts) due to a config reload.
func (c *client) processSubsOnConfigReload() {
func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) {
c.mu.Lock()
var (
checkPerms = c.perms != nil
Expand All @@ -2051,9 +2051,9 @@ func (c *client) processSubsOnConfigReload() {
}
if checkAcc {
// We actually only want to check if stream imports have changed.
c.acc.mu.RLock()
checkAcc = c.acc.checksti
c.acc.mu.RUnlock()
if _, ok := awcsti[c.acc.Name]; !ok {
checkAcc = false
}
}
// Collect client's subs under the lock
for _, sub := range c.subs {
Expand Down Expand Up @@ -2082,9 +2082,9 @@ func (c *client) processSubsOnConfigReload() {

// Report back to client and logs.
for _, sub := range removed {
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)",
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %q)",
sub.subject, sub.sid))
srv.Noticef("Removed sub %q (sid %s) for user %q - not authorized",
srv.Noticef("Removed sub %q (sid %q) for user %q - not authorized",
sub.subject, sub.sid, userInfo)
}
}
Expand Down
45 changes: 24 additions & 21 deletions server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,11 @@ func (s *Server) applyOptions(opts []option) {
func (s *Server) reloadAuthorization() {
s.mu.Lock()
s.configureAuthorization()

// This map will contain the names of accounts that have their streams
// import configuration changed.
awcsti := make(map[string]struct{}, len(s.opts.Accounts))

oldAccounts := s.accounts
gAccount := oldAccounts[globalAccountName]
s.accounts = make(map[string]*Account)
Expand All @@ -701,30 +706,37 @@ func (s *Server) reloadAuthorization() {
newAcc.sl = sl
// Check if current and new config of this account are same
// in term of stream imports.
newAcc.checksti = !acc.checkStreamImportsEqual(newAcc)
if !acc.checkStreamImportsEqual(newAcc) {
awcsti[newAcc.Name] = struct{}{}
}
newAcc.mu.Unlock()
}
s.registerAccount(newAcc)
}
// Gather clients that changed accounts. We will close them and they
// will reconnect, doing the right thing.
closeClients := make(map[uint64]*client, len(s.clients))
clients := make(map[uint64]*client, len(s.clients))
for i, client := range s.clients {
var (
cclientsa [64]*client
cclients = cclientsa[:0]
clientsa [64]*client
clients = clientsa[:0]
routesa [64]*client
routes = routesa[:0]
)
for _, client := range s.clients {
if s.clientHasMovedToDifferentAccount(client) {
closeClients[i] = client
cclients = append(cclients, client)
} else {
clients[i] = client
clients = append(clients, client)
}
}
routes := make(map[uint64]*client, len(s.routes))
for i, route := range s.routes {
routes[i] = route
for _, route := range s.routes {
routes = append(routes, route)
}
s.mu.Unlock()

// Close clients that have moved accounts
for _, client := range closeClients {
for _, client := range cclients {
client.closeConnection(ClientClosed)
}

Expand All @@ -735,18 +747,9 @@ func (s *Server) reloadAuthorization() {
continue
}
// Remove any unauthorized subscriptions and check for account imports.
client.processSubsOnConfigReload()
client.processSubsOnConfigReload(awcsti)
}

// Reset the check import flag now
s.mu.Lock()
for _, a := range s.accounts {
a.mu.Lock()
a.checksti = false
a.mu.Unlock()
}
s.mu.Unlock()

for _, route := range routes {
// Disconnect any unauthorized routes.
// Do this only for route that were accepted, not initiated
Expand Down Expand Up @@ -791,7 +794,7 @@ func (s *Server) clientHasMovedToDifferentAccount(c *client) bool {
} else if u != nil && u.Account != nil {
return curAccName != u.Account.Name
}
// There is no longer this user/nkey in the new config
// user/nkey no longer exists.
return true
}

Expand Down

0 comments on commit 9a1cb08

Please sign in to comment.