Skip to content

Commit

Permalink
Merge d35bb56 into 77548d4
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Oct 23, 2018
2 parents 77548d4 + d35bb56 commit 55072ec
Show file tree
Hide file tree
Showing 5 changed files with 939 additions and 35 deletions.
96 changes: 96 additions & 0 deletions server/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,102 @@ func TestAccountGlobalDefault(t *testing.T) {
}
}

func TestAccountCheckStreamImportsEqual(t *testing.T) {
// Create bare accounts for this test
fooAcc := &Account{Name: "foo"}
if err := fooAcc.addStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}

barAcc := &Account{Name: "bar"}
if err := barAcc.addStreamImport(fooAcc, "foo", "myPrefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
bazAcc := &Account{Name: "baz"}
if err := bazAcc.addStreamImport(fooAcc, "foo", "myPrefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if !barAcc.checkStreamImportsEqual(bazAcc) {
t.Fatal("Expected stream imports to be the same")
}

if err := bazAcc.addStreamImport(fooAcc, "foo.>", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if barAcc.checkStreamImportsEqual(bazAcc) {
t.Fatal("Expected stream imports to be different")
}
if err := barAcc.addStreamImport(fooAcc, "foo.>", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if !barAcc.checkStreamImportsEqual(bazAcc) {
t.Fatal("Expected stream imports to be the same")
}

// Create another account that is named "foo". We want to make sure
// that the comparison still works (based on account name, not pointer)
newFooAcc := &Account{Name: "foo"}
if err := newFooAcc.addStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
batAcc := &Account{Name: "bat"}
if err := batAcc.addStreamImport(newFooAcc, "foo", "myPrefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if err := batAcc.addStreamImport(newFooAcc, "foo.>", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if !batAcc.checkStreamImportsEqual(barAcc) {
t.Fatal("Expected stream imports to be the same")
}
if !batAcc.checkStreamImportsEqual(bazAcc) {
t.Fatal("Expected stream imports to be the same")
}

// Test with account with different "from"
expAcc := &Account{Name: "new_acc"}
if err := expAcc.addStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
aAcc := &Account{Name: "a"}
if err := aAcc.addStreamImport(expAcc, "bar", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
bAcc := &Account{Name: "b"}
if err := bAcc.addStreamImport(expAcc, "baz", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if aAcc.checkStreamImportsEqual(bAcc) {
t.Fatal("Expected stream imports to be different")
}

// Test with account with different "prefix"
aAcc = &Account{Name: "a"}
if err := aAcc.addStreamImport(expAcc, "bar", "prefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
bAcc = &Account{Name: "b"}
if err := bAcc.addStreamImport(expAcc, "bar", "diff_prefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if aAcc.checkStreamImportsEqual(bAcc) {
t.Fatal("Expected stream imports to be different")
}

// Test with account with different "name"
expAcc = &Account{Name: "diff_name"}
if err := expAcc.addStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
bAcc = &Account{Name: "b"}
if err := bAcc.addStreamImport(expAcc, "bar", "prefix"); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
if aAcc.checkStreamImportsEqual(bAcc) {
t.Fatal("Expected stream imports to be different")
}
}

func BenchmarkNewRouteReply(b *testing.B) {
opts := defaultServerOptions
s := New(&opts)
Expand Down
23 changes: 23 additions & 0 deletions server/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Account struct {
maxnae int
maxaettl time.Duration
pruning bool
checksti bool // check stream imports during a config reload
}

// Import stream mapping struct
Expand Down Expand Up @@ -340,6 +341,28 @@ func (a *Account) checkStreamImportAuthorized(account *Account, subject string)
return false
}

// Returns true of `a` and `b` stream imports are the same. Note that the
// check is done with the account's name, not the pointer. This is used
// during config reload where we are comparing current and new config
// in which pointers are different.
// No lock is acquired in this function, so it is assumed that the
// import maps are not changed while this execute.
func (a *Account) checkStreamImportsEqual(b *Account) bool {
if len(a.imports.streams) != len(b.imports.streams) {
return false
}
for subj, aim := range a.imports.streams {
bim := b.imports.streams[subj]
if bim == nil {
return false
}
if aim.acc.Name != bim.acc.Name || aim.from != bim.from || aim.prefix != bim.prefix {
return false
}
}
return true
}

// Check if another account is authorized to route requests to this service.
func (a *Account) checkServiceImportAuthorized(account *Account, subject string) bool {
// Find the subject in the services list.
Expand Down
84 changes: 56 additions & 28 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ func (c *client) processSub(argo []byte) (err error) {
c.sendOK()
}
if chkImports {
if err := c.checkAccountImports(sub); err != nil {
if err := c.addShadowSubscriptions(sub); err != nil {
c.Errorf(err.Error())
}
}
Expand All @@ -1375,9 +1375,10 @@ func (c *client) processSub(argo []byte) (err error) {
return nil
}

// Check to see if we need to create a shadow subscription due to imports
// in other accounts.
func (c *client) checkAccountImports(sub *subscription) error {
// If the client's account has stream imports and there are matches for
// this subscription's subject, then add shadow subscriptions in
// other accounts that can export this subject.
func (c *client) addShadowSubscriptions(sub *subscription) error {
c.mu.Lock()
acc := c.acc
c.mu.Unlock()
Expand Down Expand Up @@ -1454,10 +1455,10 @@ func (c *client) canSubscribe(subject []byte) bool {
}

// Low level unsubscribe for a given client.
func (c *client) unsubscribe(sub *subscription) {
func (c *client) unsubscribe(sub *subscription, force bool) {
c.mu.Lock()
defer c.mu.Unlock()
if sub.max > 0 && sub.nm < sub.max {
if !force && sub.max > 0 && sub.nm < sub.max {
c.Debugf(
"Deferring actual UNSUB(%s): %d max, %d received\n",
string(sub.subject), sub.max, sub.nm)
Expand Down Expand Up @@ -1527,7 +1528,7 @@ func (c *client) processUnsub(arg []byte) error {
c.mu.Unlock()

if unsub {
c.unsubscribe(sub)
c.unsubscribe(sub, false)
}
if shouldForward {
c.srv.broadcastUnSubscribe(sub)
Expand Down Expand Up @@ -1585,11 +1586,11 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
if shouldForward {
defer srv.broadcastUnSubscribe(sub)
}
defer client.unsubscribe(sub)
defer client.unsubscribe(sub, true)
} else if sub.nm > sub.max {
c.Debugf("Auto-unsubscribe limit [%d] exceeded\n", sub.max)
client.mu.Unlock()
client.unsubscribe(sub)
client.unsubscribe(sub, true)
if shouldForward {
srv.broadcastUnSubscribe(sub)
}
Expand Down Expand Up @@ -2022,42 +2023,69 @@ func (c *client) typeString() string {
return "Unknown Type"
}

// removeUnauthorizedSubs removes any subscriptions the client has that are no
// longer authorized, e.g. due to a config reload.
func (c *client) removeUnauthorizedSubs() {
// processSubsOnConfigReload removes any subscriptions the client has that are no
// longer authorized, and check for imports (accounts) due to a config reload.
func (c *client) processSubsOnConfigReload() {
c.mu.Lock()
if c.perms == nil || c.sl == nil {
var (
checkPerms = c.perms != nil
checkAcc = c.acc != nil
)
if c.sl == nil || (!checkPerms && !checkAcc) {
c.mu.Unlock()
return
}
srv := c.srv

var subsa [32]*subscription
subs := subsa[:0]
var (
_subs [32]*subscription
subs = _subs[:0]
_removed [32]*subscription
removed = _removed[:0]
srv = c.srv
userInfo = c.opts.Nkey
)
if userInfo == "" {
userInfo = c.opts.Username
if userInfo == "" {
userInfo = fmt.Sprintf("%v", c.cid)
}
}
if checkAcc {
// We actually only want to check if stream imports have changed.
c.acc.mu.RLock()
checkAcc = c.acc.checksti
c.acc.mu.RUnlock()
}
// Collect client's subs under the lock
for _, sub := range c.subs {
subs = append(subs, sub)
}
c.mu.Unlock()

var removedSubs [32]*subscription
removed := removedSubs[:0]

// We can call canSubscribe() without locking since the permissions are updated
// from config reload code prior to calling this function. So there is no risk
// of concurrent access to c.perms.
for _, sub := range subs {
if !c.canSubscribe(sub.subject) {
if checkPerms && !c.canSubscribe(sub.subject) {
removed = append(removed, sub)
delete(c.subs, string(sub.sid))
c.unsubscribe(sub, true)
} else if checkAcc {
c.mu.Lock()
oldShadows := sub.shadow
sub.shadow = nil
c.mu.Unlock()
c.addShadowSubscriptions(sub)
for _, nsub := range oldShadows {
nsub.im.acc.sl.Remove(nsub)
}
}
}
c.mu.Unlock()

// Remove unauthorized clients subscriptions.
c.sl.RemoveBatch(removed)

// Report back to client and logs.
for _, sub := range removed {
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)",
sub.subject, sub.sid))
srv.Noticef("Removed sub %q for user %q - not authorized",
string(sub.subject), c.opts.Username)
srv.Noticef("Removed sub %q (sid %s) for user %q - not authorized",
sub.subject, sub.sid, userInfo)
}
}

Expand Down

0 comments on commit 55072ec

Please sign in to comment.