Skip to content

Commit

Permalink
Merge pull request #775 from nats-io/shadow
Browse files Browse the repository at this point in the history
Cleanup of shadowed subscriptions, fixes #772
  • Loading branch information
derekcollison authored Oct 8, 2018
2 parents 3438468 + 05bfedd commit 71eb6d8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
29 changes: 25 additions & 4 deletions server/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,10 +529,8 @@ func TestSimpleMapping(t *testing.T) {
t.Fatalf("Error adding account import to client bar: %v", err)
}

// Normal Subscription on bar client.
go cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\nPING\r\n"))
_, err := crBar.ReadString('\n') // Make sure subscriptions were processed.
if err != nil {
// Normal and Queue Subscription on bar client.
if err := cbar.parse([]byte("SUB import.foo 1\r\nSUB import.foo bar 2\r\n")); err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}

Expand Down Expand Up @@ -568,6 +566,29 @@ func TestSimpleMapping(t *testing.T) {
}
checkMsg(l, "2")
checkPayload(crBar, []byte("hello\r\n"), t)

// We should have 2 subscriptions in both. Normal and Queue Subscriber
// for barAcc which are local, and 2 that are shadowed in fooAcc.
// Now make sure that when we unsubscribe we clean up properly for both.
if bslc := barAcc.sl.Count(); bslc != 2 {
t.Fatalf("Expected 2 normal subscriptions on barAcc, got %d", bslc)
}
if fslc := fooAcc.sl.Count(); fslc != 2 {
t.Fatalf("Expected 2 shadowed subscriptions on fooAcc, got %d", fslc)
}

// Now unsubscribe.
if err := cbar.parse([]byte("UNSUB 1\r\nUNSUB 2\r\n")); err != nil {
t.Fatalf("Error for client 'bar' from server: %v", err)
}

// We should have zero on both.
if bslc := barAcc.sl.Count(); bslc != 0 {
t.Fatalf("Expected no normal subscriptions on barAcc, got %d", bslc)
}
if fslc := fooAcc.sl.Count(); fslc != 0 {
t.Fatalf("Expected no shadowed subscriptions on fooAcc, got %d", fslc)
}
}

func TestNoPrefixWildcardMapping(t *testing.T) {
Expand Down
27 changes: 25 additions & 2 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ func (c *client) GetTLSConnectionState() *tls.ConnectionState {
// interest in published messages.
type subscription struct {
client *client
im *streamImport // This is for importing support.
im *streamImport // This is for import stream support.
shadow []*subscription // This is to track shadowed accounts.
subject []byte
queue []byte
sid []byte
Expand Down Expand Up @@ -1382,6 +1383,7 @@ func (c *client) checkAccountImports(sub *subscription) error {

var rims [32]*streamImport
var ims = rims[:0]

acc.mu.RLock()
for _, im := range acc.imports.streams {
if isSubsetMatch(tokens, im.prefix+im.from) {
Expand All @@ -1390,6 +1392,8 @@ func (c *client) checkAccountImports(sub *subscription) error {
}
acc.mu.RUnlock()

var shadow []*subscription

// Now walk through collected importMaps
for _, im := range ims {
// We have a match for a local subscription with an import from another account.
Expand All @@ -1402,9 +1406,20 @@ func (c *client) checkAccountImports(sub *subscription) error {
nsub.subject = sub.subject[len(im.prefix):]
}
if err := im.acc.sl.Insert(&nsub); err != nil {
return fmt.Errorf("Could not add shadow import subscription for account %q", im.acc.Name)
errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name)
c.Debugf(errs)
return fmt.Errorf(errs)
}
if shadow == nil {
shadow = make([]*subscription, 0, len(ims))
}
shadow = append(shadow, &nsub)
}

c.mu.Lock()
sub.shadow = shadow
c.mu.Unlock()

return nil
}

Expand Down Expand Up @@ -1455,6 +1470,14 @@ func (c *client) unsubscribe(sub *subscription) {
if c.typ == CLIENT && c.srv != nil && len(sub.queue) > 0 {
c.srv.holdRemoteQSub(sub)
}

// Check to see if we have shadown subscriptions.
for _, nsub := range sub.shadow {
if err := nsub.im.acc.sl.Remove(nsub); err != nil {
c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name)
}
}
sub.shadow = nil
}

func (c *client) processUnsub(arg []byte) error {
Expand Down

0 comments on commit 71eb6d8

Please sign in to comment.