diff --git a/server/accounts.go b/server/accounts.go index c739d0ea0f..5e7520c6dd 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -262,8 +262,7 @@ func (a *Account) String() string { // Used to create shallow copies of accounts for transfer // from opts to real accounts in server struct. -func (a *Account) shallowCopy() *Account { - na := NewAccount(a.Name) +func (a *Account) shallowCopy(na *Account) { na.Nkey = a.Nkey na.Issuer = a.Issuer @@ -303,12 +302,14 @@ func (a *Account) shallowCopy() *Account { } } } + na.mappings = a.mappings + if len(na.mappings) > 0 && na.prand == nil { + na.prand = rand.New(rand.NewSource(time.Now().UnixNano())) + } // JetStream na.jsLimits = a.jsLimits // Server config account limits. na.limits = a.limits - - return na } // nextEventID uses its own lock for better concurrency. @@ -2834,7 +2835,12 @@ func (a *Account) checkStreamImportsEqual(b *Account) bool { return true } +// Returns true if `a` and `b` stream exports are the same. +// Acquires `a` read lock, but `b` is assumed to not be accessed +// by anyone but the caller (`b` is not registered anywhere). func (a *Account) checkStreamExportsEqual(b *Account) bool { + a.mu.RLock() + defer a.mu.RUnlock() if len(a.exports.streams) != len(b.exports.streams) { return false } @@ -2843,14 +2849,29 @@ func (a *Account) checkStreamExportsEqual(b *Account) bool { if !ok { return false } - if !reflect.DeepEqual(aea, bea) { + if !isStreamExportEqual(aea, bea) { return false } } return true } +func isStreamExportEqual(a, b *streamExport) bool { + if a == nil && b == nil { + return true + } + if (a == nil && b != nil) || (a != nil && b == nil) { + return false + } + return isExportAuthEqual(&a.exportAuth, &b.exportAuth) +} + +// Returns true if `a` and `b` service exports are the same. +// Acquires `a` read lock, but `b` is assumed to not be accessed +// by anyone but the caller (`b` is not registered anywhere). func (a *Account) checkServiceExportsEqual(b *Account) bool { + a.mu.RLock() + defer a.mu.RUnlock() if len(a.exports.services) != len(b.exports.services) { return false } @@ -2859,7 +2880,66 @@ func (a *Account) checkServiceExportsEqual(b *Account) bool { if !ok { return false } - if !reflect.DeepEqual(aea, bea) { + if !isServiceExportEqual(aea, bea) { + return false + } + } + return true +} + +func isServiceExportEqual(a, b *serviceExport) bool { + if a == nil && b == nil { + return true + } + if (a == nil && b != nil) || (a != nil && b == nil) { + return false + } + if !isExportAuthEqual(&a.exportAuth, &b.exportAuth) { + return false + } + if a.acc.Name != b.acc.Name { + return false + } + if a.respType != b.respType { + return false + } + if a.latency != nil || b.latency != nil { + if (a.latency != nil && b.latency == nil) || (a.latency == nil && b.latency != nil) { + return false + } + if a.latency.sampling != b.latency.sampling { + return false + } + if a.latency.subject != b.latency.subject { + return false + } + } + return true +} + +// Returns true if `a` and `b` exportAuth structures are equal. +// Both `a` and `b` are guaranteed to be non-nil. +// Locking is handled by the caller. 
+func isExportAuthEqual(a, b *exportAuth) bool { + if a.tokenReq != b.tokenReq { + return false + } + if a.accountPos != b.accountPos { + return false + } + if len(a.approved) != len(b.approved) { + return false + } + for ak, av := range a.approved { + if bv, ok := b.approved[ak]; !ok || av.Name != bv.Name { + return false + } + } + if len(a.actsRevoked) != len(b.actsRevoked) { + return false + } + for ak, av := range a.actsRevoked { + if bv, ok := b.actsRevoked[ak]; !ok || av != bv { return false } } diff --git a/server/auth.go b/server/auth.go index 5045858dc0..ff63b323b0 100644 --- a/server/auth.go +++ b/server/auth.go @@ -171,13 +171,14 @@ func (p *Permissions) clone() *Permissions { // Lock is assumed held. func (s *Server) checkAuthforWarnings() { warn := false - if s.opts.Password != _EMPTY_ && !isBcrypt(s.opts.Password) { + opts := s.getOpts() + if opts.Password != _EMPTY_ && !isBcrypt(opts.Password) { warn = true } for _, u := range s.users { // Skip warn if using TLS certs based auth // unless a password has been left in the config. - if u.Password == _EMPTY_ && s.opts.TLSMap { + if u.Password == _EMPTY_ && opts.TLSMap { continue } // Check if this is our internal sys client created on the fly. diff --git a/server/client.go b/server/client.go index 1852f021e8..662dc7df85 100644 --- a/server/client.go +++ b/server/client.go @@ -4732,12 +4732,19 @@ func (c *client) kindString() string { // an older one. func (c *client) swapAccountAfterReload() { c.mu.Lock() - defer c.mu.Unlock() - if c.srv == nil { + srv := c.srv + an := c.acc.GetName() + c.mu.Unlock() + if srv == nil { return } - acc, _ := c.srv.LookupAccount(c.acc.Name) - c.acc = acc + if acc, _ := srv.LookupAccount(an); acc != nil { + c.mu.Lock() + if c.acc != acc { + c.acc = acc + } + c.mu.Unlock() + } } // processSubsOnConfigReload removes any subscriptions the client has that are no diff --git a/server/client_test.go b/server/client_test.go index bc1a9df959..ebbc036bc8 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1543,71 +1543,6 @@ func TestClientOutboundQueueCoalesce(t *testing.T) { } } -// This test ensures that outbound queues don't cause a run on -// memory when sending something to lots of clients. 
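Why the switch away from reflect.DeepEqual in the accounts.go hunks above: the exportAuth maps hold *Account values, and after a config reload the same logical account is a fresh instance carrying different runtime-only state (clients, timers), so DeepEqual can report a difference even when the configuration is identical. A minimal, self-contained sketch of the idea, using simplified stand-in types rather than the real Account/exportAuth:

package main

import (
	"fmt"
	"reflect"
)

// Simplified stand-ins for the real Account and exportAuth types.
type account struct {
	Name    string
	clients map[int]struct{} // runtime-only state, irrelevant to config equality
}

type exportAuth struct {
	approved map[string]*account
}

// equalByName mirrors the shape of isExportAuthEqual above: compare only the
// configuration-relevant identity (the account name), not the whole value.
func equalByName(a, b *exportAuth) bool {
	if len(a.approved) != len(b.approved) {
		return false
	}
	for k, av := range a.approved {
		if bv, ok := b.approved[k]; !ok || av.Name != bv.Name {
			return false
		}
	}
	return true
}

func main() {
	before := &exportAuth{approved: map[string]*account{
		"A": {Name: "A", clients: map[int]struct{}{1: {}}}, // has a connected client
	}}
	after := &exportAuth{approved: map[string]*account{
		"A": {Name: "A"}, // same config, fresh instance after reload
	}}
	fmt.Println(reflect.DeepEqual(before, after)) // false: recurses into runtime state
	fmt.Println(equalByName(before, after))       // true: config is unchanged
}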
-func TestClientOutboundQueueMemory(t *testing.T) { - opts := DefaultOptions() - s := RunServer(opts) - defer s.Shutdown() - - var before runtime.MemStats - var after runtime.MemStats - - var err error - clients := make([]*nats.Conn, 50000) - wait := &sync.WaitGroup{} - wait.Add(len(clients)) - - for i := 0; i < len(clients); i++ { - clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) - if err != nil { - t.Fatalf("Error on connect: %v", err) - } - defer clients[i].Close() - - clients[i].Subscribe("test", func(m *nats.Msg) { - wait.Done() - }) - } - - runtime.GC() - runtime.ReadMemStats(&before) - - nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) - if err != nil { - t.Fatalf("Error on connect: %v", err) - } - defer nc.Close() - - var m [48000]byte - if err = nc.Publish("test", m[:]); err != nil { - t.Fatal(err) - } - - wait.Wait() - - runtime.GC() - runtime.ReadMemStats(&after) - - hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc) - ms := float64(len(m)) - diff := float64(ha) - float64(hb) - inc := (diff / float64(hb)) * 100 - - fmt.Printf("Message size: %.1fKB\n", ms/1024) - fmt.Printf("Subscribed clients: %d\n", len(clients)) - fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024) - fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024) - fmt.Printf("Heap allocs delta: %.1f%%\n", inc) - - // TODO: What threshold makes sense here for a failure? - /* - if inc > 10 { - t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc) - } - */ -} - func TestClientTraceRace(t *testing.T) { opts := DefaultOptions() s := RunServer(opts) diff --git a/server/events.go b/server/events.go index 90185b5c15..4f7e8a7074 100644 --- a/server/events.go +++ b/server/events.go @@ -1717,7 +1717,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, sub // This will import any system level exports. func (s *Server) registerSystemImports(a *Account) { - if a == nil || !s.eventsEnabled() { + if a == nil || !s.EventsEnabled() { return } sacc := s.SystemAccount() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 693d8bf964..683b39401d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -725,7 +725,7 @@ func (js *jetStream) setupMetaGroup() error { // If we are soliciting leafnode connections and we are sharing a system account and do not disable it with a hint, // we want to move to observer mode so that we extend the solicited cluster or supercluster but do not form our own. 
- cfg.Observer = s.canExtendOtherDomain() && s.opts.JetStreamExtHint != jsNoExtend + cfg.Observer = s.canExtendOtherDomain() && s.getOpts().JetStreamExtHint != jsNoExtend var bootstrap bool if ps, err := readPeerState(storeDir); err != nil { diff --git a/server/leafnode.go b/server/leafnode.go index 24dcde9a17..407f225d4c 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -697,7 +697,7 @@ func (s *Server) startLeafNodeAcceptLoop() { s.leafNodeInfo = info // Possibly override Host/Port and set IP based on Cluster.Advertise if err := s.setLeafNodeInfoHostPortAndIP(); err != nil { - s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", s.opts.LeafNode.Advertise, err) + s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err) l.Close() s.mu.Unlock() return diff --git a/server/nkey.go b/server/nkey.go index 8604c4413c..5b45edf756 100644 --- a/server/nkey.go +++ b/server/nkey.go @@ -34,7 +34,7 @@ func (s *Server) NonceRequired() bool { // nonceRequired tells us if we should send a nonce. // Lock should be held on entry. func (s *Server) nonceRequired() bool { - return s.opts.AlwaysEnableNonce || len(s.nkeys) > 0 || s.trustedKeys != nil + return s.getOpts().AlwaysEnableNonce || len(s.nkeys) > 0 || s.trustedKeys != nil } // Generate a nonce for INFO challenge. diff --git a/server/norace_test.go b/server/norace_test.go index 041738ea24..e3456440e6 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -5839,7 +5839,7 @@ func TestNoRaceEncodeConsumerStateBug(t *testing.T) { } // Performance impact on stream ingress with large number of consumers. -func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) { +func TestNoRaceJetStreamLargeNumConsumersPerfImpact(t *testing.T) { skip(t) s := RunBasicJetStreamServer(t) @@ -5931,7 +5931,7 @@ func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) { } // Performance impact on large number of consumers but sparse delivery. -func TestJetStreamLargeNumConsumersSparseDelivery(t *testing.T) { +func TestNoRaceJetStreamLargeNumConsumersSparseDelivery(t *testing.T) { skip(t) s := RunBasicJetStreamServer(t) @@ -7864,3 +7864,68 @@ func TestNoRaceJetStreamClusterLeafnodeConnectPerf(t *testing.T) { nc.Close() } } + +// This test ensures that outbound queues don't cause a run on +// memory when sending something to lots of clients. 
+func TestNoRaceClientOutboundQueueMemory(t *testing.T) { + opts := DefaultOptions() + s := RunServer(opts) + defer s.Shutdown() + + var before runtime.MemStats + var after runtime.MemStats + + var err error + clients := make([]*nats.Conn, 50000) + wait := &sync.WaitGroup{} + wait.Add(len(clients)) + + for i := 0; i < len(clients); i++ { + clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer clients[i].Close() + + clients[i].Subscribe("test", func(m *nats.Msg) { + wait.Done() + }) + } + + runtime.GC() + runtime.ReadMemStats(&before) + + nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + var m [48000]byte + if err = nc.Publish("test", m[:]); err != nil { + t.Fatal(err) + } + + wait.Wait() + + runtime.GC() + runtime.ReadMemStats(&after) + + hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc) + ms := float64(len(m)) + diff := float64(ha) - float64(hb) + inc := (diff / float64(hb)) * 100 + + fmt.Printf("Message size: %.1fKB\n", ms/1024) + fmt.Printf("Subscribed clients: %d\n", len(clients)) + fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024) + fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024) + fmt.Printf("Heap allocs delta: %.1f%%\n", inc) + + // TODO: What threshold makes sense here for a failure? + /* + if inc > 10 { + t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc) + } + */ +} diff --git a/server/reload.go b/server/reload.go index e498c6ae82..a64f8286a5 100644 --- a/server/reload.go +++ b/server/reload.go @@ -788,13 +788,6 @@ func (s *Server) Reload() error { func (s *Server) ReloadOptions(newOpts *Options) error { s.mu.Lock() - s.reloading = true - defer func() { - s.mu.Lock() - s.reloading = false - s.mu.Unlock() - }() - curOpts := s.getOpts() // Wipe trusted keys if needed when we have an operator. @@ -1563,98 +1556,44 @@ func (s *Server) reloadClientTraceLevel() { func (s *Server) reloadAuthorization() { // This map will contain the names of accounts that have their streams // import configuration changed. - awcsti := make(map[string]struct{}) + var awcsti map[string]struct{} checkJetStream := false + opts := s.getOpts() s.mu.Lock() + deletedAccounts := make(map[string]*Account) + // This can not be changed for now so ok to check server's trustedKeys unlocked. // If plain configured accounts, process here. if s.trustedKeys == nil { - // We need to drain the old accounts here since we have something - // new configured. We do not want s.accounts to change since that would - // mean adding a lock to lookupAccount which is what we are trying to - // optimize for with the change from a map to a sync.Map. - oldAccounts := make(map[string]*Account) - s.accounts.Range(func(k, v interface{}) bool { - acc := v.(*Account) - acc.mu.Lock() - oldAccounts[acc.Name] = acc - // Need to clear out eventing timers since they close over this account and not the new one. - clearTimer(&acc.etmr) - clearTimer(&acc.ctmr) - acc.mu.Unlock() - s.accounts.Delete(k) - return true - }) - s.gacc = nil - s.configureAccounts() - s.configureAuthorization() - s.mu.Unlock() - + // Make a map of the configured account names so we figure out the accounts + // that should be removed later on. 
+ configAccs := make(map[string]struct{}, len(opts.Accounts)) + for _, acc := range opts.Accounts { + configAccs[acc.GetName()] = struct{}{} + } + // Now range over existing accounts and keep track of the ones deleted + // so some cleanup can be made after releasing the server lock. s.accounts.Range(func(k, v interface{}) bool { - newAcc := v.(*Account) - if acc, ok := oldAccounts[newAcc.Name]; ok { - // If account exist in latest config, "transfer" the account's - // sublist and client map to the new account. - acc.mu.RLock() - newAcc.mu.Lock() - if len(acc.clients) > 0 { - newAcc.clients = make(map[*client]struct{}, len(acc.clients)) - for c := range acc.clients { - newAcc.clients[c] = struct{}{} - } - } - // Same for leafnodes - newAcc.lleafs = append([]*client(nil), acc.lleafs...) - - newAcc.sl = acc.sl - if acc.rm != nil { - newAcc.rm = make(map[string]int32) - } - for k, v := range acc.rm { - newAcc.rm[k] = v - } - // Transfer internal client state. The configureAccounts call from above may have set up a new one. - // We need to use the old one, and the isid to not confuse internal subs. - newAcc.ic, newAcc.isid = acc.ic, acc.isid - // Transfer any JetStream state. - newAcc.js = acc.js - // Also transfer any internal accounting on different client types. We copy over all clients - // so need to copy this as well for proper accounting going forward. - newAcc.nrclients = acc.nrclients - newAcc.sysclients = acc.sysclients - newAcc.nleafs = acc.nleafs - newAcc.nrleafs = acc.nrleafs - // Process any reverse map entries. - if len(acc.imports.rrMap) > 0 { - newAcc.imports.rrMap = make(map[string][]*serviceRespEntry) - for k, v := range acc.imports.rrMap { - newAcc.imports.rrMap[k] = v - } - } - newAcc.mu.Unlock() - acc.mu.RUnlock() - - // Check if current and new config of this account are same - // in term of stream imports. - if !acc.checkStreamImportsEqual(newAcc) { - awcsti[newAcc.Name] = struct{}{} - } - - // We need to remove all old service import subs. - acc.removeAllServiceImportSubs() - newAcc.addAllServiceImportSubs() + an, acc := k.(string), v.(*Account) + // Exclude default and system account from this test since those + // may not actually be in opts.Accounts. + if an == DEFAULT_GLOBAL_ACCOUNT || an == DEFAULT_SYSTEM_ACCOUNT { + return true + } + // Check if the existing account is still in opts.Accounts. + if _, ok := configAccs[an]; !ok { + deletedAccounts[an] = acc + s.accounts.Delete(k) } return true }) - s.mu.Lock() - // Check if we had a default system account. - if s.sys != nil && s.sys.account != nil && !s.opts.NoSystemAccount { - s.accounts.Store(s.sys.account.Name, s.sys.account) - } + // This will update existing accounts and add new ones. + awcsti, _ = s.configureAccounts(true) + s.configureAuthorization() // Double check any JetStream configs. checkJetStream = s.js != nil - } else if s.opts.AccountResolver != nil { + } else if opts.AccountResolver != nil { s.configureResolver() if _, ok := s.accResolver.(*MemAccResolver); ok { // Check preloads so we can issue warnings etc if needed. @@ -1710,7 +1649,7 @@ func (s *Server) reloadAuthorization() { routes = append(routes, route) } // Check here for any system/internal clients which will not be in the servers map of normal clients.
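One subtlety in the deletion pass above: it calls s.accounts.Delete from inside s.accounts.Range. That is safe with sync.Map, whose documentation permits modification during Range (the traversal simply may or may not reflect it). A standalone sketch of the same shape, with hypothetical names:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var accounts sync.Map
	for _, name := range []string{"A", "B", "C"} {
		accounts.Store(name, struct{}{})
	}
	// Only "A" survives in the new configuration.
	configured := map[string]struct{}{"A": {}}
	accounts.Range(func(k, v interface{}) bool {
		if _, ok := configured[k.(string)]; !ok {
			accounts.Delete(k) // deleting during Range is allowed for sync.Map
		}
		return true
	})
	accounts.Range(func(k, v interface{}) bool {
		fmt.Println("kept:", k) // kept: A
		return true
	})
}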
- if s.sys != nil && s.sys.account != nil && !s.opts.NoSystemAccount { + if s.sys != nil && s.sys.account != nil && !opts.NoSystemAccount { s.accounts.Store(s.sys.account.Name, s.sys.account) } @@ -1736,6 +1675,18 @@ func (s *Server) reloadAuthorization() { } s.mu.Unlock() + // Clear some timers and remove service import subs for deleted accounts. + for _, acc := range deletedAccounts { + acc.mu.Lock() + clearTimer(&acc.etmr) + clearTimer(&acc.ctmr) + for _, se := range acc.exports.services { + se.clearResponseThresholdTimer() + } + acc.mu.Unlock() + acc.removeAllServiceImportSubs() + } + if resetCh != nil { resetCh <- struct{}{} } diff --git a/server/reload_test.go b/server/reload_test.go index 9f334db1d6..062d4248ca 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -2657,8 +2657,11 @@ func TestConfigReloadAccountUsers(t *testing.T) { fooMatch := gAcc.sl.Match("foo") bazMatch := gAcc.sl.Match("baz") gAcc.mu.RUnlock() - if n != 2 { - return fmt.Errorf("Global account should have 2 subs, got %v", n) + // The number of subscriptions should be 3 ($SYS.REQ.ACCOUNT.PING.CONNZ, + // $SYS.REQ.ACCOUNT.PING.STATZ, $SYS.REQ.SERVER.PING.CONNZ) + // + 2 (foo and baz) + if n != 5 { + return fmt.Errorf("Global account should have 5 subs, got %v", n) } if len(fooMatch.psubs) != 1 { return fmt.Errorf("Global account should have foo sub") @@ -3897,7 +3900,7 @@ func TestConfigReloadConnectErrReports(t *testing.T) { } } -func TestAuthReloadDoesNotBreakRouteInterest(t *testing.T) { +func TestConfigReloadAuthDoesNotBreakRouteInterest(t *testing.T) { s, opts := RunServerWithConfig("./configs/seed_tls.conf") defer s.Shutdown() @@ -4047,7 +4050,7 @@ func TestConfigReloadAccountResolverTLSConfig(t *testing.T) { } } -func TestLoggingReload(t *testing.T) { +func TestConfigReloadLogging(t *testing.T) { // This test basically starts a server and causes it's configuration to be reloaded 3 times. // Each time, a new log file is created and trace levels are turned, off - on - off. 
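The deleted-accounts cleanup above, which clears timers and removes service import subs only after s.mu is released, follows a pattern worth naming: collect doomed resources under the lock, then tear them down outside it, because teardown can itself acquire other locks and invert lock order. A generic sketch with hypothetical types:

package main

import "sync"

type resource struct{}

// teardown stands in for clearTimer/removeAllServiceImportSubs above; it may
// acquire other locks, so it must not run while the server lock is held.
func (r *resource) teardown() {}

type server struct {
	mu        sync.Mutex
	resources map[string]*resource
}

func (s *server) reload(configured map[string]struct{}) {
	var doomed []*resource
	s.mu.Lock()
	for name, r := range s.resources {
		if _, ok := configured[name]; !ok {
			doomed = append(doomed, r)
			delete(s.resources, name)
		}
	}
	s.mu.Unlock()
	// Teardown runs outside s.mu, avoiding lock-order inversions.
	for _, r := range doomed {
		r.teardown()
	}
}

func main() {
	s := &server{resources: map[string]*resource{"A": {}, "B": {}}}
	s.reload(map[string]struct{}{"A": {}})
}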
@@ -4174,7 +4177,7 @@ func TestLoggingReload(t *testing.T) { check("off-post.log", tracingAbsent) } -func TestReloadValidate(t *testing.T) { +func TestConfigReloadValidate(t *testing.T) { confFileName := createConfFile(t, []byte(` listen: "127.0.0.1:-1" no_auth_user: a @@ -4234,12 +4237,14 @@ func TestConfigReloadAccounts(t *testing.T) { urlSys := fmt.Sprintf("nats://sys:pwd@%s:%d", o.Host, o.Port) urlUsr := fmt.Sprintf("nats://usr:pwd@%s:%d", o.Host, o.Port) - oldAcc, ok := s.accounts.Load("SYS") + oldAcci, ok := s.accounts.Load("SYS") if !ok { t.Fatal("No SYS account") } + oldAcc := oldAcci.(*Account) - testSrvState := func(oldAcc interface{}) { + testSrvState := func(oldAcc *Account) { + t.Helper() sysAcc := s.SystemAccount() s.mu.Lock() defer s.mu.Unlock() @@ -4252,11 +4257,13 @@ func TestConfigReloadAccounts(t *testing.T) { if s.opts.SystemAccount != "SYS" { t.Fatal("Found wrong sys.account") } - // This will fail prior to system account reload - if acc, ok := s.accounts.Load(s.opts.SystemAccount); !ok { - t.Fatal("Found different sys.account pointer") - } else if acc == oldAcc { - t.Fatal("System account is unaltered") + ai, ok := s.accounts.Load(s.opts.SystemAccount) + if !ok { + t.Fatalf("System account %q not found in s.accounts map", s.opts.SystemAccount) + } + acc := ai.(*Account) + if acc != oldAcc { + t.Fatalf("System account pointer was changed during reload, was %p now %p", oldAcc, acc) } if s.sys.client == nil { t.Fatal("Expected sys.client to be non-nil") @@ -4324,7 +4331,7 @@ func TestConfigReloadAccounts(t *testing.T) { } } - testSrvState(nil) + testSrvState(oldAcc) c1, s1C, s1D := subscribe("SYS1") defer c1.Close() defer s1C.Unsubscribe() @@ -4508,3 +4515,73 @@ func TestConfigReloadWithSysAccountOnly(t *testing.T) { // ok } } + +func TestConfigReloadGlobalAccountWithMappingAndJetStream(t *testing.T) { + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + mappings { + subj.orig: subj.mapped.before.reload + } + + leaf { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + # For access to system account. + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" 
} ] } } + ` + c := createJetStreamClusterWithTemplate(t, tmpl, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Verify that mapping works + checkMapping := func(expectedSubj string) { + t.Helper() + sub := natsSubSync(t, nc, "subj.>") + defer sub.Unsubscribe() + natsPub(t, nc, "subj.orig", nil) + msg := natsNexMsg(t, sub, time.Second) + if msg.Subject != expectedSubj { + t.Fatalf("Expected subject to have been mapped to %q, got %q", expectedSubj, msg.Subject) + } + } + checkMapping("subj.mapped.before.reload") + + // Create a stream and check that we can get the INFO + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Replicas: 3, + Subjects: []string{"foo"}, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + c.waitOnStreamLeader(globalAccountName, "TEST") + + _, err = js.StreamInfo("TEST") + require_NoError(t, err) + + // Change mapping on all servers and issue reload + for i, s := range c.servers { + opts := c.opts[i] + content, err := os.ReadFile(opts.ConfigFile) + require_NoError(t, err) + reloadUpdateConfig(t, s, opts.ConfigFile, strings.Replace(string(content), "subj.mapped.before.reload", "subj.mapped.after.reload", 1)) + } + // Make sure the cluster is still formed + checkClusterFormed(t, c.servers...) + // Now repeat the test for the subject mapping and stream info + checkMapping("subj.mapped.after.reload") + _, err = js.StreamInfo("TEST") + require_NoError(t, err) +} diff --git a/server/route.go b/server/route.go index 4047cd2c07..b135ce500e 100644 --- a/server/route.go +++ b/server/route.go @@ -1732,7 +1732,7 @@ func (s *Server) startRouteAcceptLoop() { s.routeInfo = info // Possibly override Host/Port and set IP based on Cluster.Advertise if err := s.setRouteInfoHostPortAndIP(); err != nil { - s.Fatalf("Error setting route INFO with Cluster.Advertise value of %s, err=%v", s.opts.Cluster.Advertise, err) + s.Fatalf("Error setting route INFO with Cluster.Advertise value of %s, err=%v", opts.Cluster.Advertise, err) l.Close() s.mu.Unlock() return @@ -1772,8 +1772,9 @@ func (s *Server) startRouteAcceptLoop() { // Similar to setInfoHostPortAndGenerateJSON, but for routeInfo. 
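The hunk below, like many others in this patch (auth.go, nkey.go, jetstream_cluster.go, leafnode.go), replaces direct s.opts field reads with s.getOpts(), which returns the options pointer under the options read lock so a concurrent reload swapping the pointer cannot race the reader. A simplified sketch of that accessor pattern, assuming the server guards *Options with an RWMutex as the real code does:

package main

import (
	"fmt"
	"sync"
)

type Options struct {
	Host string
	Port int
}

type Server struct {
	optsMu sync.RWMutex
	opts   *Options
}

// getOpts returns a consistent snapshot; callers reuse the returned pointer
// instead of re-reading s.opts, which a config reload may swap underneath them.
func (s *Server) getOpts() *Options {
	s.optsMu.RLock()
	opts := s.opts
	s.optsMu.RUnlock()
	return opts
}

func (s *Server) setOpts(o *Options) {
	s.optsMu.Lock()
	s.opts = o
	s.optsMu.Unlock()
}

func main() {
	s := &Server{opts: &Options{Host: "127.0.0.1", Port: 4222}}
	opts := s.getOpts() // one snapshot for the whole operation
	fmt.Printf("nats://%s:%d\n", opts.Host, opts.Port)
}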
func (s *Server) setRouteInfoHostPortAndIP() error { - if s.opts.Cluster.Advertise != "" { - advHost, advPort, err := parseHostPort(s.opts.Cluster.Advertise, s.opts.Cluster.Port) + opts := s.getOpts() + if opts.Cluster.Advertise != _EMPTY_ { + advHost, advPort, err := parseHostPort(opts.Cluster.Advertise, opts.Cluster.Port) if err != nil { return err } @@ -1781,8 +1782,8 @@ func (s *Server) setRouteInfoHostPortAndIP() error { s.routeInfo.Port = advPort s.routeInfo.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(advHost, strconv.Itoa(advPort))) } else { - s.routeInfo.Host = s.opts.Cluster.Host - s.routeInfo.Port = s.opts.Cluster.Port + s.routeInfo.Host = opts.Cluster.Host + s.routeInfo.Port = opts.Cluster.Port s.routeInfo.IP = "" } // (re)generate the routeInfoJSON byte array diff --git a/server/server.go b/server/server.go index 6022e1b301..7754d6343d 100644 --- a/server/server.go +++ b/server/server.go @@ -120,7 +120,6 @@ type Server struct { opts *Options running bool shutdown bool - reloading bool listener net.Listener listenerErr error gacc *Account @@ -531,22 +530,22 @@ func NewServer(opts *Options) (*Server, error) { s.mu.Unlock() var a *Account // perform direct lookup to avoid warning trace - if _, err := fetchAccount(ar, s.opts.SystemAccount); err == nil { - a, _ = s.lookupAccount(s.opts.SystemAccount) + if _, err := fetchAccount(ar, opts.SystemAccount); err == nil { + a, _ = s.lookupAccount(opts.SystemAccount) } s.mu.Lock() if a == nil { - sac := NewAccount(s.opts.SystemAccount) + sac := NewAccount(opts.SystemAccount) sac.Issuer = opts.TrustedOperators[0].Issuer sac.signingKeys = map[string]jwt.Scope{} - sac.signingKeys[s.opts.SystemAccount] = nil + sac.signingKeys[opts.SystemAccount] = nil s.registerAccountNoLock(sac) } } } // For tracking accounts - if err := s.configureAccounts(); err != nil { + if _, err := s.configureAccounts(false); err != nil { return nil, err } @@ -739,40 +738,85 @@ func (s *Server) globalAccount() *Account { return gacc } -// Used to setup Accounts. -// Lock is held upon entry. -func (s *Server) configureAccounts() error { +// Used to set up or update Accounts. +// Returns a map that indicates which accounts have had their stream imports +// changed (in case of an update in configuration reload). +// Lock is held upon entry, but will be released/reacquired in this function. +func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) { + awcsti := make(map[string]struct{}) + // Create the global account. if s.gacc == nil { s.gacc = NewAccount(globalAccountName) s.registerAccountNoLock(s.gacc) } - opts := s.opts + opts := s.getOpts() // Check opts and walk through them. We need to copy them here // so that we do not keep a real one sitting in the options. - for _, acc := range s.opts.Accounts { + for _, acc := range opts.Accounts { var a *Account - if acc.Name == globalAccountName { - a = s.gacc - } else { - a = acc.shallowCopy() + create := true + // For the global account, we want to skip the reload process + // and fall through to the "create" case, which is then really + // just an update (shallowCopy will make sure + // that mappings are copied over). + if reloading && acc.Name != globalAccountName { + if ai, ok := s.accounts.Load(acc.Name); ok { + a = ai.(*Account) + a.mu.Lock() + // Before updating the account, check if stream imports have changed.
+ if !a.checkStreamImportsEqual(acc) { + awcsti[acc.Name] = struct{}{} + } + // Collect the sids for the service imports since we are going to + // replace them with new ones. + var sids [][]byte + c := a.ic + for _, si := range a.imports.services { + if c != nil && si.sid != nil { + sids = append(sids, si.sid) + } + } + // Now reset all export/import fields since they are going to be + // filled in by shallowCopy(). + a.imports.streams, a.imports.services = nil, nil + a.exports.streams, a.exports.services = nil, nil + // We call shallowCopy from the account `acc` (the one in Options) + // and pass `a` (our existing account) to get it updated. + acc.shallowCopy(a) + a.mu.Unlock() + // Need to release the lock for this. + s.mu.Unlock() + for _, sid := range sids { + c.processUnsub(sid) + } + // Add subscriptions for existing service imports. + a.addAllServiceImportSubs() + s.mu.Lock() + create = false + } } - if acc.hasMappings() { - // For now just move and wipe from opts.Accounts version. - a.mappings = acc.mappings - acc.mappings = nil - // We use this for selecting between multiple weighted destinations. - a.prand = rand.New(rand.NewSource(time.Now().UnixNano())) + if create { + if acc.Name == globalAccountName { + a = s.gacc + } else { + a = NewAccount(acc.Name) + } + // Locking matters in the case of an update of the global account. + a.mu.Lock() + acc.shallowCopy(a) + a.mu.Unlock() + // Will be a no-op in case of the global account since it is already registered. + s.registerAccountNoLock(a) } - acc.sl = nil - acc.clients = nil - s.registerAccountNoLock(a) + // The `acc` account is stored in options, not in the server, and these can be cleared. + acc.sl, acc.clients, acc.mappings = nil, nil, nil // If we see an account defined using $SYS we will make sure that is set as system account. if acc.Name == DEFAULT_SYSTEM_ACCOUNT && opts.SystemAccount == _EMPTY_ { - s.opts.SystemAccount = DEFAULT_SYSTEM_ACCOUNT + opts.SystemAccount = DEFAULT_SYSTEM_ACCOUNT } } @@ -791,6 +835,7 @@ func (s *Server) configureAccounts() error { s.accounts.Range(func(k, v interface{}) bool { numAccounts++ acc := v.(*Account) + acc.mu.Lock() // Exports for _, se := range acc.exports.streams { if se != nil { @@ -817,16 +862,30 @@ func (s *Server) configureAccounts() error { for _, si := range acc.imports.services { if v, ok := s.accounts.Load(si.acc.Name); ok { si.acc = v.(*Account) + // It is possible to allow for latency tracking inside your + // own account, so lock only when not the same account. + if si.acc == acc { + si.se = si.acc.getServiceExport(si.to) + continue + } + si.acc.mu.RLock() si.se = si.acc.getServiceExport(si.to) + si.acc.mu.RUnlock() } } // Make sure the subs are running, but only if not reloading. - if len(acc.imports.services) > 0 && acc.ic == nil && !s.reloading { + if len(acc.imports.services) > 0 && acc.ic == nil && !reloading { acc.ic = s.createInternalAccountClient() acc.ic.acc = acc + // Need to release locks to invoke this function. + acc.mu.Unlock() + s.mu.Unlock() acc.addAllServiceImportSubs() + s.mu.Lock() + acc.mu.Lock() } acc.updated = time.Now().UTC() + acc.mu.Unlock() return true }) @@ -846,14 +905,14 @@ func (s *Server) configureAccounts() error { s.mu.Lock() } if err != nil { - return fmt.Errorf("error resolving system account: %v", err) + return awcsti, fmt.Errorf("error resolving system account: %v", err) } // If we have defined a system account here check to see if its just us and the $G account. // We would do this to add user/pass to the system account.
If this is the case add in // no-auth-user for $G. // Only do this if non-operator mode. - if len(opts.TrustedOperators) == 0 && numAccounts == 2 && s.opts.NoAuthUser == _EMPTY_ { + if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ { // If we come here from config reload, let's not recreate the fake user name otherwise // it will cause currently clients to be disconnected. uname := s.sysAccOnlyNoAuthUser @@ -868,12 +927,12 @@ func (s *Server) configureAccounts() error { uname = fmt.Sprintf("nats-%s", b[:]) s.sysAccOnlyNoAuthUser = uname } - s.opts.Users = append(s.opts.Users, &User{Username: uname, Password: uname[6:], Account: s.gacc}) - s.opts.NoAuthUser = uname + opts.Users = append(opts.Users, &User{Username: uname, Password: uname[6:], Account: s.gacc}) + opts.NoAuthUser = uname } } - return nil + return awcsti, nil } // Setup the account resolver. For memory resolver, make sure the JWTs are @@ -1002,16 +1061,17 @@ func (s *Server) isTrustedIssuer(issuer string) bool { // options-based trusted nkeys. Returns success. func (s *Server) processTrustedKeys() bool { s.strictSigningKeyUsage = map[string]struct{}{} + opts := s.getOpts() if trustedKeys != _EMPTY_ && !s.initStampedTrustedKeys() { return false - } else if s.opts.TrustedKeys != nil { - for _, key := range s.opts.TrustedKeys { + } else if opts.TrustedKeys != nil { + for _, key := range opts.TrustedKeys { if !nkeys.IsValidPublicOperatorKey(key) { return false } } - s.trustedKeys = append([]string(nil), s.opts.TrustedKeys...) - for _, claim := range s.opts.TrustedOperators { + s.trustedKeys = append([]string(nil), opts.TrustedKeys...) + for _, claim := range opts.TrustedOperators { if !claim.StrictSigningKeyUsage { continue } @@ -1044,7 +1104,7 @@ func checkTrustedKeyString(keys string) []string { // it succeeded or not. func (s *Server) initStampedTrustedKeys() bool { // Check to see if we have an override in options, which will cause us to fail. - if len(s.opts.TrustedKeys) > 0 { + if len(s.getOpts().TrustedKeys) > 0 { return false } tks := checkTrustedKeyString(trustedKeys) @@ -1336,7 +1396,8 @@ func (s *Server) createInternalClient(kind int) *client { // efficient propagation. // Lock should be held on entry. func (s *Server) shouldTrackSubscriptions() bool { - return (s.opts.Cluster.Port != 0 || s.opts.Gateway.Port != 0) + opts := s.getOpts() + return (opts.Cluster.Port != 0 || opts.Gateway.Port != 0) } // Invokes registerAccountNoLock under the protection of the server lock. @@ -1740,8 +1801,9 @@ func (s *Server) Start() { // In operator mode, when the account resolver depends on an external system and // the system account is the bootstrapping account, start fetching it. 
if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT { + opts := s.getOpts() _, isMemResolver := ar.(*MemAccResolver) - if v, ok := s.accounts.Load(s.opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == "" { + if v, ok := s.accounts.Load(opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == _EMPTY_ { s.Noticef("Using bootstrapping system account") s.startGoRoutine(func() { defer s.grWG.Done() @@ -1753,7 +1815,7 @@ func (s *Server) Start() { return case <-t.C: sacc := s.SystemAccount() - if claimJWT, err := fetchAccount(ar, s.opts.SystemAccount); err != nil { + if claimJWT, err := fetchAccount(ar, opts.SystemAccount); err != nil { continue } else if err = s.updateAccountWithClaimJWT(sacc, claimJWT); err != nil { continue @@ -2121,7 +2183,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) { // server's info Host/Port with either values from Options or // ClientAdvertise. if err := s.setInfoHostPort(); err != nil { - s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", s.opts.ClientAdvertise, err) + s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err) l.Close() s.mu.Unlock() return @@ -2199,16 +2261,17 @@ func (s *Server) setInfoHostPort() error { // When this function is called, opts.Port is set to the actual listen // port (if option was originally set to RANDOM), even during a config // reload. So use of s.opts.Port is safe. - if s.opts.ClientAdvertise != _EMPTY_ { - h, p, err := parseHostPort(s.opts.ClientAdvertise, s.opts.Port) + opts := s.getOpts() + if opts.ClientAdvertise != _EMPTY_ { + h, p, err := parseHostPort(opts.ClientAdvertise, opts.Port) if err != nil { return err } s.info.Host = h s.info.Port = p } else { - s.info.Host = s.opts.Host - s.info.Port = s.opts.Port + s.info.Host = opts.Host + s.info.Port = opts.Port } return nil } diff --git a/server/sublist.go b/server/sublist.go index bcf271d8e2..47d45999fa 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -820,9 +820,13 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error { // Turn off our cache if enabled. wasEnabled := s.cache != nil s.cache = nil + // We will try to remove all subscriptions but will report the first that caused + // an error. In other words, we don't bail out at the first error which would + // possibly leave a bunch of subscriptions that could have been removed. + var err error for _, sub := range subs { - if err := s.remove(sub, false, false); err != nil { - return err + if lerr := s.remove(sub, false, false); lerr != nil && err == nil { + err = lerr } } // Turn caching back on here. @@ -830,7 +834,7 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error { if wasEnabled { s.cache = make(map[string]*SublistResult) } - return nil + return err } // pruneNode is used to prune an empty node from the tree. diff --git a/server/sublist_test.go b/server/sublist_test.go index b9e12a2646..4d3e913f07 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -370,6 +370,30 @@ func TestSublistNoCacheRemoveBatch(t *testing.T) { } } +func TestSublistRemoveBatchWithError(t *testing.T) { + s := NewSublistNoCache() + sub1 := newSub("foo") + sub2 := newSub("bar") + sub3 := newSub("baz") + s.Insert(sub1) + s.Insert(sub2) + s.Insert(sub3) + subNotPresent := newSub("not.inserted") + // Try to remove all subs, but include the sub that has not been inserted. 
+ err := s.RemoveBatch([]*subscription{subNotPresent, sub1, sub3}) + // We expect an error to be returned, but sub1 and sub3 to still have been removed. + require_Error(t, err, ErrNotFound) + // Make sure that only sub2 is still present. + verifyCount(s, 1, t) + r := s.Match("bar") + verifyLen(r.psubs, 1, t) + verifyMember(r.psubs, sub2, t) + r = s.Match("foo") + verifyLen(r.psubs, 0, t) + r = s.Match("baz") + verifyLen(r.psubs, 0, t) +} + func testSublistInvalidSubjectsInsert(t *testing.T, s *Sublist) { // Insert, or subscriptions, can have wildcards, but not empty tokens, // and can not have a FWC that is not the terminal token.
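The RemoveBatch change earlier in this diff adopts a small but useful error-handling pattern that the new test exercises: attempt every element, remember only the first error, and return it at the end, so one bad entry no longer strands the rest of the batch. Distilled into a standalone sketch (hypothetical helper, not the server's API):

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

// removeAll applies remove to every item and reports only the first failure,
// mirroring the RemoveBatch behavior above.
func removeAll(items []string, remove func(string) error) error {
	var firstErr error
	for _, it := range items {
		if err := remove(it); err != nil && firstErr == nil {
			firstErr = err // remember, but keep going
		}
	}
	return firstErr
}

func main() {
	present := map[string]bool{"foo": true, "baz": true}
	err := removeAll([]string{"missing", "foo", "baz"}, func(s string) error {
		if !present[s] {
			return errNotFound
		}
		delete(present, s)
		return nil
	})
	fmt.Println(err, len(present)) // "not found 0": error reported, removals still applied
}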