diff --git a/server/accounts_test.go b/server/accounts_test.go index a46582b262d..29e116b3702 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -1046,6 +1046,102 @@ func TestAccountGlobalDefault(t *testing.T) { } } +func TestAccountCheckStreamImportsEqual(t *testing.T) { + // Create bare accounts for this test + fooAcc := &Account{Name: "foo"} + if err := fooAcc.addStreamExport(">", nil); err != nil { + t.Fatalf("Error adding stream export: %v", err) + } + + barAcc := &Account{Name: "bar"} + if err := barAcc.addStreamImport(fooAcc, "foo", "myPrefix"); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + bazAcc := &Account{Name: "baz"} + if err := bazAcc.addStreamImport(fooAcc, "foo", "myPrefix"); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + if !barAcc.checkStreamImportsEqual(bazAcc) { + t.Fatal("Expected stream imports to be the same") + } + + if err := bazAcc.addStreamImport(fooAcc, "foo.>", ""); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + if barAcc.checkStreamImportsEqual(bazAcc) { + t.Fatal("Expected stream imports to be different") + } + if err := barAcc.addStreamImport(fooAcc, "foo.>", ""); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + if !barAcc.checkStreamImportsEqual(bazAcc) { + t.Fatal("Expected stream imports to be the same") + } + + // Create another account that is named "foo". We want to make sure + // that the comparison still works (based on account name, not pointer) + newFooAcc := &Account{Name: "foo"} + if err := newFooAcc.addStreamExport(">", nil); err != nil { + t.Fatalf("Error adding stream export: %v", err) + } + batAcc := &Account{Name: "bat"} + if err := batAcc.addStreamImport(newFooAcc, "foo", "myPrefix"); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + if err := batAcc.addStreamImport(newFooAcc, "foo.>", ""); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + if !batAcc.checkStreamImportsEqual(barAcc) { + t.Fatal("Expected stream imports to be the same") + } + if !batAcc.checkStreamImportsEqual(bazAcc) { + t.Fatal("Expected stream imports to be the same") + } + + // Test with account with different "from" + expAcc := &Account{Name: "new_acc"} + if err := expAcc.addStreamExport(">", nil); err != nil { + t.Fatalf("Error adding stream export: %v", err) + } + aAcc := &Account{Name: "a"} + if err := aAcc.addStreamImport(expAcc, "bar", ""); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + bAcc := &Account{Name: "b"} + if err := bAcc.addStreamImport(expAcc, "baz", ""); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + if aAcc.checkStreamImportsEqual(bAcc) { + t.Fatal("Expected stream imports to be different") + } + + // Test with account with different "prefix" + aAcc = &Account{Name: "a"} + if err := aAcc.addStreamImport(expAcc, "bar", "prefix"); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + bAcc = &Account{Name: "b"} + if err := bAcc.addStreamImport(expAcc, "bar", "diff_prefix"); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + if aAcc.checkStreamImportsEqual(bAcc) { + t.Fatal("Expected stream imports to be different") + } + + // Test with account with different "name" + expAcc = &Account{Name: "diff_name"} + if err := expAcc.addStreamExport(">", nil); err != nil { + t.Fatalf("Error adding stream export: %v", err) + } + bAcc = &Account{Name: "b"} + if err := bAcc.addStreamImport(expAcc, "bar", "prefix"); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + if aAcc.checkStreamImportsEqual(bAcc) { + t.Fatal("Expected stream imports to be different") + } +} + func BenchmarkNewRouteReply(b *testing.B) { opts := defaultServerOptions s := New(&opts) diff --git a/server/auth.go b/server/auth.go index 20a607c92a0..88f4e5ba710 100644 --- a/server/auth.go +++ b/server/auth.go @@ -340,6 +340,28 @@ func (a *Account) checkStreamImportAuthorized(account *Account, subject string) return false } +// Returns true if `a` and `b` stream imports are the same. Note that the +// check is done with the account's name, not the pointer. This is used +// during config reload where we are comparing current and new config +// in which pointers are different. +// No lock is acquired in this function, so it is assumed that the +// import maps are not changed while this executes. +func (a *Account) checkStreamImportsEqual(b *Account) bool { + if len(a.imports.streams) != len(b.imports.streams) { + return false + } + for subj, aim := range a.imports.streams { + bim := b.imports.streams[subj] + if bim == nil { + return false + } + if aim.acc.Name != bim.acc.Name || aim.from != bim.from || aim.prefix != bim.prefix { + return false + } + } + return true +} + // Check if another account is authorized to route requests to this service. func (a *Account) checkServiceImportAuthorized(account *Account, subject string) bool { // Find the subject in the services list. diff --git a/server/client.go b/server/client.go index 90662b6084f..b0fa2ce5e57 100644 --- a/server/client.go +++ b/server/client.go @@ -1364,7 +1364,7 @@ func (c *client) processSub(argo []byte) (err error) { c.sendOK() } if chkImports { - if err := c.checkAccountImports(sub); err != nil { + if err := c.addShadowSubscriptions(sub); err != nil { c.Errorf(err.Error()) } } @@ -1375,9 +1375,10 @@ func (c *client) processSub(argo []byte) (err error) { return nil } -// Check to see if we need to create a shadow subscription due to imports -// in other accounts. -func (c *client) checkAccountImports(sub *subscription) error { +// If the client's account has stream imports and there are matches for +// this subscription's subject, then add shadow subscriptions in +// other accounts that can export this subject. +func (c *client) addShadowSubscriptions(sub *subscription) error { c.mu.Lock() acc := c.acc c.mu.Unlock() @@ -1454,10 +1455,10 @@ func (c *client) canSubscribe(subject []byte) bool { } // Low level unsubscribe for a given client. -func (c *client) unsubscribe(sub *subscription) { +func (c *client) unsubscribe(sub *subscription, force bool) { c.mu.Lock() defer c.mu.Unlock() - if sub.max > 0 && sub.nm < sub.max { + if !force && sub.max > 0 && sub.nm < sub.max { c.Debugf( "Deferring actual UNSUB(%s): %d max, %d received\n", string(sub.subject), sub.max, sub.nm) @@ -1527,7 +1528,7 @@ func (c *client) processUnsub(arg []byte) error { c.mu.Unlock() if unsub { - c.unsubscribe(sub) + c.unsubscribe(sub, false) } if shouldForward { c.srv.broadcastUnSubscribe(sub) @@ -1585,11 +1586,11 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { if shouldForward { defer srv.broadcastUnSubscribe(sub) } - defer client.unsubscribe(sub) + defer client.unsubscribe(sub, true) } else if sub.nm > sub.max { c.Debugf("Auto-unsubscribe limit [%d] exceeded\n", sub.max) client.mu.Unlock() - client.unsubscribe(sub) + client.unsubscribe(sub, true) if shouldForward { srv.broadcastUnSubscribe(sub) } @@ -2022,42 +2023,69 @@ func (c *client) typeString() string { return "Unknown Type" } -// removeUnauthorizedSubs removes any subscriptions the client has that are no -// longer authorized, e.g. due to a config reload. -func (c *client) removeUnauthorizedSubs() { +// processSubsOnConfigReload removes any subscriptions the client has that are no +// longer authorized, and check for imports (accounts) due to a config reload. +func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) { c.mu.Lock() - if c.perms == nil || c.sl == nil { + var ( + checkPerms = c.perms != nil + checkAcc = c.acc != nil + ) + if c.sl == nil || (!checkPerms && !checkAcc) { c.mu.Unlock() return } - srv := c.srv - - var subsa [32]*subscription - subs := subsa[:0] + var ( + _subs [32]*subscription + subs = _subs[:0] + _removed [32]*subscription + removed = _removed[:0] + srv = c.srv + userInfo = c.opts.Nkey + ) + if userInfo == "" { + userInfo = c.opts.Username + if userInfo == "" { + userInfo = fmt.Sprintf("%v", c.cid) + } + } + if checkAcc { + // We actually only want to check if stream imports have changed. + if _, ok := awcsti[c.acc.Name]; !ok { + checkAcc = false + } + } + // Collect client's subs under the lock for _, sub := range c.subs { subs = append(subs, sub) } + c.mu.Unlock() - var removedSubs [32]*subscription - removed := removedSubs[:0] - + // We can call canSubscribe() without locking since the permissions are updated + // from config reload code prior to calling this function. So there is no risk + // of concurrent access to c.perms. for _, sub := range subs { - if !c.canSubscribe(sub.subject) { + if checkPerms && !c.canSubscribe(sub.subject) { removed = append(removed, sub) - delete(c.subs, string(sub.sid)) + c.unsubscribe(sub, true) + } else if checkAcc { + c.mu.Lock() + oldShadows := sub.shadow + sub.shadow = nil + c.mu.Unlock() + c.addShadowSubscriptions(sub) + for _, nsub := range oldShadows { + nsub.im.acc.sl.Remove(nsub) + } } } - c.mu.Unlock() - - // Remove unauthorized clients subscriptions. - c.sl.RemoveBatch(removed) // Report back to client and logs. for _, sub := range removed { - c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)", + c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %q)", sub.subject, sub.sid)) - srv.Noticef("Removed sub %q for user %q - not authorized", - string(sub.subject), c.opts.Username) + srv.Noticef("Removed sub %q (sid %q) for user %q - not authorized", + sub.subject, sub.sid, userInfo) } } diff --git a/server/reload.go b/server/reload.go index 161863bcfbf..e85ad4d8177 100644 --- a/server/reload.go +++ b/server/reload.go @@ -229,13 +229,22 @@ func (a *authTimeoutOption) Apply(server *Server) { // setting. type usersOption struct { authOption - newValue []*User } func (u *usersOption) Apply(server *Server) { server.Noticef("Reloaded: authorization users") } +// nkeysOption implements the option interface for the authorization `users` +// setting. +type nkeysOption struct { + authOption +} + +func (u *nkeysOption) Apply(server *Server) { + server.Noticef("Reloaded: authorization nkey users") +} + // clusterOption implements the option interface for the `cluster` setting. type clusterOption struct { authOption @@ -462,6 +471,17 @@ func (c *clientAdvertiseOption) Apply(server *Server) { server.Noticef("Reload: client_advertise = %s", c.newValue) } +// accountsOption implements the option interface. +// Ensure that authorization code is executed if any change in accounts +type accountsOption struct { + authOption +} + +// Apply is a no-op. Changes will be applied in reloadAuthorization +func (a *accountsOption) Apply(s *Server) { + s.Noticef("Reloaded: accounts") +} + // Reload reads the current configuration file and applies any supported // changes. This returns an error if the server was not started with a config // file or an option which doesn't support hot-swapping was changed. @@ -567,7 +587,9 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { case "authtimeout": diffOpts = append(diffOpts, &authTimeoutOption{newValue: newValue.(float64)}) case "users": - diffOpts = append(diffOpts, &usersOption{newValue: newValue.([]*User)}) + diffOpts = append(diffOpts, &usersOption{}) + case "nkeys": + diffOpts = append(diffOpts, &nkeysOption{}) case "cluster": newClusterOpts := newValue.(ClusterOpts) oldClusterOpts := oldValue.(ClusterOpts) @@ -604,6 +626,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { } } diffOpts = append(diffOpts, &clientAdvertiseOption{newValue: cliAdv}) + case "accounts": + diffOpts = append(diffOpts, &accountsOption{}) case "nolog", "nosigs": // Ignore NoLog and NoSigs options since they are not parsed and only used in // testing. @@ -663,25 +687,67 @@ func (s *Server) applyOptions(opts []option) { func (s *Server) reloadAuthorization() { s.mu.Lock() s.configureAuthorization() - clients := make(map[uint64]*client, len(s.clients)) - for i, client := range s.clients { - clients[i] = client + + // This map will contain the names of accounts that have their streams + // import configuration changed. + awcsti := make(map[string]struct{}, len(s.opts.Accounts)) + + oldAccounts := s.accounts + gAccount := oldAccounts[globalAccountName] + s.accounts = make(map[string]*Account) + s.registerAccount(gAccount) + for _, newAcc := range s.opts.Accounts { + // For existing accounts, moved updated config to existing account + if acc, ok := oldAccounts[newAcc.Name]; ok { + acc.mu.RLock() + sl := acc.sl + acc.mu.RUnlock() + newAcc.mu.Lock() + newAcc.sl = sl + // Check if current and new config of this account are same + // in term of stream imports. + if !acc.checkStreamImportsEqual(newAcc) { + awcsti[newAcc.Name] = struct{}{} + } + newAcc.mu.Unlock() + } + s.registerAccount(newAcc) } - routes := make(map[uint64]*client, len(s.routes)) - for i, route := range s.routes { - routes[i] = route + // Gather clients that changed accounts. We will close them and they + // will reconnect, doing the right thing. + var ( + cclientsa [64]*client + cclients = cclientsa[:0] + clientsa [64]*client + clients = clientsa[:0] + routesa [64]*client + routes = routesa[:0] + ) + for _, client := range s.clients { + if s.clientHasMovedToDifferentAccount(client) { + cclients = append(cclients, client) + } else { + clients = append(clients, client) + } + } + for _, route := range s.routes { + routes = append(routes, route) } s.mu.Unlock() + // Close clients that have moved accounts + for _, client := range cclients { + client.closeConnection(ClientClosed) + } + for _, client := range clients { // Disconnect any unauthorized clients. if !s.isClientAuthorized(client) { client.authViolation() continue } - - // Remove any unauthorized subscriptions. - client.removeUnauthorizedSubs() + // Remove any unauthorized subscriptions and check for account imports. + client.processSubsOnConfigReload(awcsti) } for _, route := range routes { @@ -696,6 +762,42 @@ func (s *Server) reloadAuthorization() { } } +// Returns true if given client current account has changed (or user +// no longer exist) in the new config, false if the user did not +// change account. +// Server lock is held on entry. +func (s *Server) clientHasMovedToDifferentAccount(c *client) bool { + var ( + nu *NkeyUser + u *User + ) + if c.opts.Nkey != "" { + if s.nkeys != nil { + nu = s.nkeys[c.opts.Nkey] + } + } else if c.opts.Username != "" { + if s.users != nil { + u = s.users[c.opts.Username] + } + } else { + return false + } + // Get the current account name + c.mu.Lock() + var curAccName string + if c.acc != nil { + curAccName = c.acc.Name + } + c.mu.Unlock() + if nu != nil && nu.Account != nil { + return curAccName != nu.Account.Name + } else if u != nil && u.Account != nil { + return curAccName != u.Account.Name + } + // user/nkey no longer exists. + return true +} + // reloadClusterPermissions reconfigures the cluster's permssions // and set the permissions to all existing routes, sending an // update INFO protocol so that remote can resend their local diff --git a/server/reload_test.go b/server/reload_test.go index fa064fb344f..03d0bc01638 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -14,8 +14,10 @@ package server import ( + "encoding/base64" "encoding/json" "fmt" + "github.com/nats-io/nkeys" "io/ioutil" "net" "os" @@ -1543,11 +1545,12 @@ func TestConfigReloadClusterRemoveSolicitedRoutes(t *testing.T) { } func reloadUpdateConfig(t *testing.T, s *Server, conf, content string) { + t.Helper() if err := ioutil.WriteFile(conf, []byte(content), 0666); err != nil { - stackFatalf(t, "Error creating config file: %v", err) + t.Fatalf("Error creating config file: %v", err) } if err := s.Reload(); err != nil { - stackFatalf(t, "Error on reload: %v", err) + t.Fatalf("Error on reload: %v", err) } } @@ -2431,3 +2434,658 @@ func TestConfigReloadClusterPermsOldServer(t *testing.T) { // Check that new route gets created check(t) } + +func TestConfigReloadAccountUsers(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: "127.0.0.1:-1" + accounts { + synadia { + users = [ + {user: derek, password: derek} + {user: foo, password: foo} + ] + } + nats.io { + users = [ + {user: ivan, password: ivan} + {user: bar, password: bar} + ] + } + acc_deleted_after_reload { + users = [ + {user: gone, password: soon} + {user: baz, password: baz} + {user: bat, password: bat} + ] + } + } + `)) + defer os.Remove(conf) + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + + // Connect as exisiting users, should work. + nc, err := nats.Connect(fmt.Sprintf("nats://derek:derek@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + ch := make(chan bool, 2) + cb := func(_ *nats.Conn) { + ch <- true + } + nc2, err := nats.Connect( + fmt.Sprintf("nats://ivan:ivan@%s:%d", opts.Host, opts.Port), + nats.NoReconnect(), + nats.ClosedHandler(cb)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + nc3, err := nats.Connect( + fmt.Sprintf("nats://gone:soon@%s:%d", opts.Host, opts.Port), + nats.NoReconnect(), + nats.ClosedHandler(cb)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc3.Close() + // These users will be moved from an account to another (to a specific or to global account) + // We will create subscriptions to ensure that they are moved to proper sublists too. + rch := make(chan bool, 4) + rcb := func(_ *nats.Conn) { + rch <- true + } + nc4, err := nats.Connect(fmt.Sprintf("nats://foo:foo@%s:%d", opts.Host, opts.Port), + nats.ReconnectWait(50*time.Millisecond), nats.ReconnectHandler(rcb)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc4.Close() + if _, err := nc4.SubscribeSync("foo"); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + nc5, err := nats.Connect(fmt.Sprintf("nats://bar:bar@%s:%d", opts.Host, opts.Port), + nats.ReconnectWait(50*time.Millisecond), nats.ReconnectHandler(rcb)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc5.Close() + if _, err := nc5.SubscribeSync("bar"); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + nc6, err := nats.Connect(fmt.Sprintf("nats://baz:baz@%s:%d", opts.Host, opts.Port), + nats.ReconnectWait(50*time.Millisecond), nats.ReconnectHandler(rcb)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc6.Close() + if _, err := nc6.SubscribeSync("baz"); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + nc7, err := nats.Connect(fmt.Sprintf("nats://bat:bat@%s:%d", opts.Host, opts.Port), + nats.ReconnectWait(50*time.Millisecond), nats.ReconnectHandler(rcb)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc7.Close() + if _, err := nc7.SubscribeSync("bat"); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + // Remove user from account and whole account + reloadUpdateConfig(t, s, conf, ` + listen: "127.0.0.1:-1" + authorization { + users = [ + {user: foo, password: foo} + {user: baz, password: baz} + ] + } + accounts { + synadia { + users = [ + {user: derek, password: derek} + {user: bar, password: bar} + ] + } + nats.io { + users = [ + {user: bat, password: bat} + ] + } + } + `) + // nc2 and nc3 should be closed + if err := wait(ch); err != nil { + t.Fatal("Did not get the closed callback") + } + if err := wait(ch); err != nil { + t.Fatal("Did not get the closed callback") + } + // And first connection should still be connected + if !nc.IsConnected() { + t.Fatal("First connection should still be connected") + } + + // Old account should be gone + if s.LookupAccount("acc_deleted_after_reload") != nil { + t.Fatal("old account should be gone") + } + + // Check subscriptions. Since most of the users have been + // moving accounts, make sure we account for the reconnect + for i := 0; i < 4; i++ { + if err := wait(rch); err != nil { + t.Fatal("Did not get the reconnect cb") + } + } + // Still need to do the tests in a checkFor() because clients + // being reconnected does not mean that resent of subscriptions + // has already been processed. + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + gAcc := s.LookupAccount(globalAccountName) + gAcc.mu.RLock() + n := gAcc.sl.Count() + fooMatch := gAcc.sl.Match("foo") + bazMatch := gAcc.sl.Match("baz") + gAcc.mu.RUnlock() + if n != 2 { + return fmt.Errorf("Global account should have 2 subs, got %v", n) + } + if len(fooMatch.psubs) != 1 { + return fmt.Errorf("Global account should have foo sub") + } + if len(bazMatch.psubs) != 1 { + return fmt.Errorf("Global account should have baz sub") + } + + sAcc := s.LookupAccount("synadia") + sAcc.mu.RLock() + n = sAcc.sl.Count() + barMatch := sAcc.sl.Match("bar") + sAcc.mu.RUnlock() + if n != 1 { + return fmt.Errorf("Synadia account should have 1 sub, got %v", n) + } + if len(barMatch.psubs) != 1 { + return fmt.Errorf("Synadia account should have bar sub") + } + + nAcc := s.LookupAccount("nats.io") + nAcc.mu.RLock() + n = nAcc.sl.Count() + batMatch := nAcc.sl.Match("bat") + nAcc.mu.RUnlock() + if n != 1 { + return fmt.Errorf("Nats.io account should have 1 sub, got %v", n) + } + if len(batMatch.psubs) != 1 { + return fmt.Errorf("Synadia account should have bar sub") + } + return nil + }) +} + +func TestConfigReloadAccountNKeyUsers(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: "127.0.0.1:-1" + accounts { + synadia { + users = [ + # Derek + {nkey : UCNGL4W5QX66CFX6A6DCBVDH5VOHMI7B2UZZU7TXAUQQSI2JPHULCKBR} + ] + } + nats.io { + users = [ + # Ivan + {nkey : UDPGQVFIWZ7Q5UH4I5E6DBCZULQS6VTVBG6CYBD7JV3G3N2GMQOMNAUH} + ] + } + } + `)) + defer os.Remove(conf) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + synadia := s.LookupAccount("synadia") + nats := s.LookupAccount("nats.io") + + seed1 := "SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM" + seed2 := "SUAIS5JPX4X4GJ7EIIJEQ56DH2GWPYJRPWN5XJEDENJOZHCBLI7SEPUQDE" + + kp, _ := nkeys.FromSeed(seed1) + pubKey, _ := kp.PublicKey() + + c, cr, l := newClientForServer(s) + // Check for Nonce + var info nonceInfo + if err := json.Unmarshal([]byte(l[5:]), &info); err != nil { + t.Fatalf("Could not parse INFO json: %v\n", err) + } + if info.Nonce == "" { + t.Fatalf("Expected a non-empty nonce with nkeys defined") + } + sigraw, err := kp.Sign([]byte(info.Nonce)) + if err != nil { + t.Fatalf("Failed signing nonce: %v", err) + } + sig := base64.StdEncoding.EncodeToString(sigraw) + + // PING needed to flush the +OK to us. + cs := fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig) + go c.parse([]byte(cs)) + l, _ = cr.ReadString('\n') + if !strings.HasPrefix(l, "+OK") { + t.Fatalf("Expected an OK, got: %v", l) + } + if c.acc != synadia { + t.Fatalf("Expected the nkey client's account to match 'synadia', got %v", c.acc) + } + // Also test client sublist. + if c.sl != synadia.sl { + t.Fatalf("Expected the client's sublist to match 'synadia' account") + } + + // Now nats account nkey user. + kp, _ = nkeys.FromSeed(seed2) + pubKey, _ = kp.PublicKey() + + c, cr, l = newClientForServer(s) + // Check for Nonce + err = json.Unmarshal([]byte(l[5:]), &info) + if err != nil { + t.Fatalf("Could not parse INFO json: %v\n", err) + } + if info.Nonce == "" { + t.Fatalf("Expected a non-empty nonce with nkeys defined") + } + sigraw, err = kp.Sign([]byte(info.Nonce)) + if err != nil { + t.Fatalf("Failed signing nonce: %v", err) + } + sig = base64.StdEncoding.EncodeToString(sigraw) + + // PING needed to flush the +OK to us. + cs = fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig) + go c.parse([]byte(cs)) + l, _ = cr.ReadString('\n') + if !strings.HasPrefix(l, "+OK") { + t.Fatalf("Expected an OK, got: %v", l) + } + if c.acc != nats { + t.Fatalf("Expected the nkey client's account to match 'nats', got %v", c.acc) + } + // Also test client sublist. + if c.sl != nats.sl { + t.Fatalf("Expected the client's sublist to match 'nats' account") + } + + // Remove user from account and whole account + reloadUpdateConfig(t, s, conf, ` + listen: "127.0.0.1:-1" + authorization { + users = [ + # Ivan + {nkey : UDPGQVFIWZ7Q5UH4I5E6DBCZULQS6VTVBG6CYBD7JV3G3N2GMQOMNAUH} + ] + } + accounts { + nats.io { + users = [ + # Derek + {nkey : UCNGL4W5QX66CFX6A6DCBVDH5VOHMI7B2UZZU7TXAUQQSI2JPHULCKBR} + ] + } + } + `) + + s.mu.Lock() + nkeys := s.nkeys + s.mu.Unlock() + if n := len(nkeys); n != 2 { + t.Fatalf("NKeys map should have 2 users, got %v", n) + } + derek := nkeys["UCNGL4W5QX66CFX6A6DCBVDH5VOHMI7B2UZZU7TXAUQQSI2JPHULCKBR"] + if derek == nil { + t.Fatal("NKey for user Derek not found") + } + if derek.Account == nil || derek.Account.Name != "nats.io" { + t.Fatalf("Invalid account for user Derek: %#v", derek.Account) + } + ivan := nkeys["UDPGQVFIWZ7Q5UH4I5E6DBCZULQS6VTVBG6CYBD7JV3G3N2GMQOMNAUH"] + if ivan == nil { + t.Fatal("NKey for user Ivan not found") + } + if ivan.Account != nil { + t.Fatalf("Invalid account for user Ivan: %#v", ivan.Account) + } + if s.LookupAccount("synadia") != nil { + t.Fatal("Account Synadia should have been removed") + } +} + +func TestConfigReloadAccountStreamsImportExport(t *testing.T) { + template := ` + listen: "127.0.0.1:-1" + accounts { + synadia { + users [{user: derek, password: foo}] + exports = [ + {stream: "private.>", accounts: [nats.io]} + {stream: %s} + ] + } + nats.io { + users [ + {user: ivan, password: bar, permissions: {subscribe: {deny: %s}}} + ] + imports = [ + {stream: {account: "synadia", subject: %s}} + {stream: {account: "synadia", subject: "private.natsio.*"}, prefix: %s} + ] + } + } + ` + // synadia account exports "private.>" to nats.io + // synadia account exports "foo.*" + // user ivan denies subscription on "xxx" + // nats.io account imports "foo.*" from synadia + // nats.io account imports "private.natsio.*" from synadia with prefix "ivan" + conf := createConfFile(t, []byte(fmt.Sprintf(template, `"foo.*"`, `"xxx"`, `"foo.*"`, `"ivan"`))) + defer os.Remove(conf) + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + + derek, err := nats.Connect(fmt.Sprintf("nats://derek:foo@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer derek.Close() + checkClientsCount(t, s, 1) + + ch := make(chan bool, 1) + ivan, err := nats.Connect(fmt.Sprintf("nats://ivan:bar@%s:%d", opts.Host, opts.Port), + nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { + if strings.Contains(err.Error(), "permissions violation") { + ch <- true + } + })) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ivan.Close() + checkClientsCount(t, s, 2) + + subscribe := func(t *testing.T, nc *nats.Conn, subj string) *nats.Subscription { + t.Helper() + s, err := nc.SubscribeSync(subj) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + return s + } + + subFooBar := subscribe(t, ivan, "foo.bar") + subFooBaz := subscribe(t, ivan, "foo.baz") + subFooBat := subscribe(t, ivan, "foo.bat") + subPriv := subscribe(t, ivan, "ivan.private.natsio.*") + ivan.Flush() + + publish := func(t *testing.T, nc *nats.Conn, subj string) { + t.Helper() + if err := nc.Publish(subj, []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + + nextMsg := func(t *testing.T, sub *nats.Subscription, expected bool) { + t.Helper() + dur := 100 * time.Millisecond + if expected { + dur = time.Second + } + _, err := sub.NextMsg(dur) + if expected && err != nil { + t.Fatalf("Expected a message on %s, got %v", sub.Subject, err) + } else if !expected && err != nats.ErrTimeout { + t.Fatalf("Expected a timeout on %s, got %v", sub.Subject, err) + } + } + + // Checks the derek's user sublist for presence of given subject + // interest. Boolean says if interest is expected or not. + checkSublist := func(t *testing.T, subject string, shouldBeThere bool) { + t.Helper() + dcli := s.getClient(1) + dcli.mu.Lock() + r := dcli.sl.Match(subject) + dcli.mu.Unlock() + if shouldBeThere && len(r.psubs) != 1 { + t.Fatalf("%s should have 1 match in derek's sublist, got %v", subject, len(r.psubs)) + } else if !shouldBeThere && len(r.psubs) > 0 { + t.Fatalf("%s should not be in derek's sublist", subject) + } + } + + // Publish on all subjects and the subs should receive and + // subjects should be in sublist + publish(t, derek, "foo.bar") + nextMsg(t, subFooBar, true) + checkSublist(t, "foo.bar", true) + + publish(t, derek, "foo.baz") + nextMsg(t, subFooBaz, true) + checkSublist(t, "foo.baz", true) + + publish(t, derek, "foo.bat") + nextMsg(t, subFooBat, true) + checkSublist(t, "foo.bat", true) + + publish(t, derek, "private.natsio.foo") + nextMsg(t, subPriv, true) + checkSublist(t, "private.natsio.foo", true) + + // Also make sure that intra-account subscription works OK + ivanSub := subscribe(t, ivan, "ivan.sub") + publish(t, ivan, "ivan.sub") + nextMsg(t, ivanSub, true) + derekSub := subscribe(t, derek, "derek.sub") + publish(t, derek, "derek.sub") + nextMsg(t, derekSub, true) + + // synadia account exports "private.>" to nats.io + // synadia account exports "foo.*" + // user ivan denies subscription on "foo.bat" + // nats.io account imports "foo.baz" from synadia + // nats.io account imports "private.natsio.*" from synadia with prefix "yyyy" + reloadUpdateConfig(t, s, conf, fmt.Sprintf(template, `"foo.*"`, `"foo.bat"`, `"foo.baz"`, `"yyyy"`)) + + // Sub on foo.bar should now fail to receive + publish(t, derek, "foo.bar") + nextMsg(t, subFooBar, false) + checkSublist(t, "foo.bar", false) + // But foo.baz should be received + publish(t, derek, "foo.baz") + nextMsg(t, subFooBaz, true) + checkSublist(t, "foo.baz", true) + // Due to permissions, foo.bat should not + publish(t, derek, "foo.bat") + nextMsg(t, subFooBat, false) + checkSublist(t, "foo.bat", false) + // Prefix changed, so should not be received + publish(t, derek, "private.natsio.foo") + nextMsg(t, subPriv, false) + checkSublist(t, "private.natsio.foo", false) + + // Wait for client notification of permissions error + if err := wait(ch); err != nil { + t.Fatal("Did not the permissions error") + } + + publish(t, ivan, "ivan.sub") + nextMsg(t, ivanSub, true) + publish(t, derek, "derek.sub") + nextMsg(t, derekSub, true) + + // Change export so that foo.* is no longer exported + // synadia account exports "private.>" to nats.io + // synadia account exports "xxx" + // user ivan denies subscription on "foo.bat" + // nats.io account imports "xxx" from synadia + // nats.io account imports "private.natsio.*" from synadia with prefix "ivan" + reloadUpdateConfig(t, s, conf, fmt.Sprintf(template, `"xxx"`, `"foo.bat"`, `"xxx"`, `"ivan"`)) + + publish(t, derek, "foo.bar") + nextMsg(t, subFooBar, false) + checkSublist(t, "foo.bar", false) + + publish(t, derek, "foo.baz") + nextMsg(t, subFooBaz, false) + checkSublist(t, "foo.baz", false) + + publish(t, derek, "foo.bat") + nextMsg(t, subFooBat, false) + checkSublist(t, "foo.bat", false) + + // Prefix changed back, so should receive + publish(t, derek, "private.natsio.foo") + nextMsg(t, subPriv, true) + checkSublist(t, "private.natsio.foo", true) + + publish(t, ivan, "ivan.sub") + nextMsg(t, ivanSub, true) + publish(t, derek, "derek.sub") + nextMsg(t, derekSub, true) +} + +func TestConfigReloadAccountServicesImportExport(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: "127.0.0.1:-1" + accounts { + synadia { + users [{user: derek, password: foo}] + exports = [ + {service: "pub.request"} + {service: "pub.special.request", accounts: [nats.io]} + ] + } + nats.io { + users [{user: ivan, password: bar}] + imports = [ + {service: {account: "synadia", subject: "pub.special.request"}, to: "foo"} + {service: {account: "synadia", subject: "pub.request"}, to: "bar"} + ] + } + } + `)) + defer os.Remove(conf) + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + + derek, err := nats.Connect(fmt.Sprintf("nats://derek:foo@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer derek.Close() + checkClientsCount(t, s, 1) + + ivan, err := nats.Connect(fmt.Sprintf("nats://ivan:bar@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ivan.Close() + checkClientsCount(t, s, 2) + + if _, err := derek.Subscribe("pub.special.request", func(m *nats.Msg) { + derek.Publish(m.Reply, []byte("reply1")) + }); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + if _, err := derek.Subscribe("pub.request", func(m *nats.Msg) { + derek.Publish(m.Reply, []byte("reply2")) + }); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + if _, err := derek.Subscribe("pub.special.request.new", func(m *nats.Msg) { + derek.Publish(m.Reply, []byte("reply3")) + }); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + // Also create one that will be used for intra-account communication + if _, err := derek.Subscribe("derek.sub", func(m *nats.Msg) { + derek.Publish(m.Reply, []byte("private")) + }); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + derek.Flush() + + // Create an intra-account sub for ivan too + if _, err := ivan.Subscribe("ivan.sub", func(m *nats.Msg) { + ivan.Publish(m.Reply, []byte("private")) + }); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + req := func(t *testing.T, nc *nats.Conn, subj string, reply string) { + t.Helper() + var timeout time.Duration + if reply != "" { + timeout = time.Second + } else { + timeout = 100 * time.Millisecond + } + msg, err := nc.Request(subj, []byte("request"), timeout) + if reply != "" { + if err != nil { + t.Fatalf("Expected reply %s on subject %s, got %v", reply, subj, err) + } + if string(msg.Data) != reply { + t.Fatalf("Expected reply %s on subject %s, got %s", reply, subj, msg.Data) + } + } else if err != nats.ErrTimeout { + t.Fatalf("Expected timeout on subject %s, got %v", subj, err) + } + } + + req(t, ivan, "foo", "reply1") + req(t, ivan, "bar", "reply2") + // This not exported/imported, so should timeout + req(t, ivan, "baz", "") + + // Check intra-account communication + req(t, ivan, "ivan.sub", "private") + req(t, derek, "derek.sub", "private") + + reloadUpdateConfig(t, s, conf, ` + listen: "127.0.0.1:-1" + accounts { + synadia { + users [{user: derek, password: foo}] + exports = [ + {service: "pub.request"} + {service: "pub.special.request", accounts: [nats.io]} + {service: "pub.special.request.new", accounts: [nats.io]} + ] + } + nats.io { + users [{user: ivan, password: bar}] + imports = [ + {service: {account: "synadia", subject: "pub.special.request"}, to: "foo"} + {service: {account: "synadia", subject: "pub.special.request.new"}, to: "baz"} + ] + } + } + `) + // This still should work + req(t, ivan, "foo", "reply1") + // This should not + req(t, ivan, "bar", "") + // This now should work + req(t, ivan, "baz", "reply3") + + // Check intra-account communication + req(t, ivan, "ivan.sub", "private") + req(t, derek, "derek.sub", "private") +}