From 840c264f450dbc1a100684f8242fa4ee2518a716 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 20 Mar 2023 17:45:48 -0600 Subject: [PATCH 1/4] Cleanup use of s.opts and fixed some lock (deadlock/inversion) issues One should not access s.opts directly but instead use s.getOpts(). Also, server lock needs to be released when performing an account lookup (since this may result in server lock being acquired). A function was calling s.LookupAccount under the client lock, which technically creates a lock inversion situation. Signed-off-by: Ivan Kozlovic --- server/auth.go | 5 ++-- server/client.go | 15 ++++++++--- server/jetstream_cluster.go | 2 +- server/leafnode.go | 2 +- server/nkey.go | 2 +- server/reload.go | 7 +++--- server/route.go | 11 ++++---- server/server.go | 50 ++++++++++++++++++++----------------- 8 files changed, 54 insertions(+), 40 deletions(-) diff --git a/server/auth.go b/server/auth.go index 5045858dc08..ff63b323b02 100644 --- a/server/auth.go +++ b/server/auth.go @@ -171,13 +171,14 @@ func (p *Permissions) clone() *Permissions { // Lock is assumed held. func (s *Server) checkAuthforWarnings() { warn := false - if s.opts.Password != _EMPTY_ && !isBcrypt(s.opts.Password) { + opts := s.getOpts() + if opts.Password != _EMPTY_ && !isBcrypt(opts.Password) { warn = true } for _, u := range s.users { // Skip warn if using TLS certs based auth // unless a password has been left in the config. - if u.Password == _EMPTY_ && s.opts.TLSMap { + if u.Password == _EMPTY_ && opts.TLSMap { continue } // Check if this is our internal sys client created on the fly. diff --git a/server/client.go b/server/client.go index 1852f021e80..662dc7df850 100644 --- a/server/client.go +++ b/server/client.go @@ -4732,12 +4732,19 @@ func (c *client) kindString() string { // an older one. func (c *client) swapAccountAfterReload() { c.mu.Lock() - defer c.mu.Unlock() - if c.srv == nil { + srv := c.srv + an := c.acc.GetName() + c.mu.Unlock() + if srv == nil { return } - acc, _ := c.srv.LookupAccount(c.acc.Name) - c.acc = acc + if acc, _ := srv.LookupAccount(an); acc != nil { + c.mu.Lock() + if c.acc != acc { + c.acc = acc + } + c.mu.Unlock() + } } // processSubsOnConfigReload removes any subscriptions the client has that are no diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 693d8bf964e..683b39401d7 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -725,7 +725,7 @@ func (js *jetStream) setupMetaGroup() error { // If we are soliciting leafnode connections and we are sharing a system account and do not disable it with a hint, // we want to move to observer mode so that we extend the solicited cluster or supercluster but do not form our own. - cfg.Observer = s.canExtendOtherDomain() && s.opts.JetStreamExtHint != jsNoExtend + cfg.Observer = s.canExtendOtherDomain() && s.getOpts().JetStreamExtHint != jsNoExtend var bootstrap bool if ps, err := readPeerState(storeDir); err != nil { diff --git a/server/leafnode.go b/server/leafnode.go index 24dcde9a17f..407f225d4c1 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -697,7 +697,7 @@ func (s *Server) startLeafNodeAcceptLoop() { s.leafNodeInfo = info // Possibly override Host/Port and set IP based on Cluster.Advertise if err := s.setLeafNodeInfoHostPortAndIP(); err != nil { - s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", s.opts.LeafNode.Advertise, err) + s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err) l.Close() s.mu.Unlock() return diff --git a/server/nkey.go b/server/nkey.go index 8604c4413c8..5b45edf7566 100644 --- a/server/nkey.go +++ b/server/nkey.go @@ -34,7 +34,7 @@ func (s *Server) NonceRequired() bool { // nonceRequired tells us if we should send a nonce. // Lock should be held on entry. func (s *Server) nonceRequired() bool { - return s.opts.AlwaysEnableNonce || len(s.nkeys) > 0 || s.trustedKeys != nil + return s.getOpts().AlwaysEnableNonce || len(s.nkeys) > 0 || s.trustedKeys != nil } // Generate a nonce for INFO challenge. diff --git a/server/reload.go b/server/reload.go index e498c6ae823..02cb326c05f 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1565,6 +1565,7 @@ func (s *Server) reloadAuthorization() { // import configuration changed. awcsti := make(map[string]struct{}) checkJetStream := false + opts := s.getOpts() s.mu.Lock() // This can not be changed for now so ok to check server's trustedKeys unlocked. @@ -1649,12 +1650,12 @@ func (s *Server) reloadAuthorization() { }) s.mu.Lock() // Check if we had a default system account. - if s.sys != nil && s.sys.account != nil && !s.opts.NoSystemAccount { + if s.sys != nil && s.sys.account != nil && !opts.NoSystemAccount { s.accounts.Store(s.sys.account.Name, s.sys.account) } // Double check any JetStream configs. checkJetStream = s.js != nil - } else if s.opts.AccountResolver != nil { + } else if opts.AccountResolver != nil { s.configureResolver() if _, ok := s.accResolver.(*MemAccResolver); ok { // Check preloads so we can issue warnings etc if needed. @@ -1710,7 +1711,7 @@ func (s *Server) reloadAuthorization() { routes = append(routes, route) } // Check here for any system/internal clients which will not be in the servers map of normal clients. - if s.sys != nil && s.sys.account != nil && !s.opts.NoSystemAccount { + if s.sys != nil && s.sys.account != nil && !opts.NoSystemAccount { s.accounts.Store(s.sys.account.Name, s.sys.account) } diff --git a/server/route.go b/server/route.go index 4047cd2c07a..ccbcb5361cb 100644 --- a/server/route.go +++ b/server/route.go @@ -1732,7 +1732,7 @@ func (s *Server) startRouteAcceptLoop() { s.routeInfo = info // Possibly override Host/Port and set IP based on Cluster.Advertise if err := s.setRouteInfoHostPortAndIP(); err != nil { - s.Fatalf("Error setting route INFO with Cluster.Advertise value of %s, err=%v", s.opts.Cluster.Advertise, err) + s.Fatalf("Error setting route INFO with Cluster.Advertise value of %s, err=%v", opts.Cluster.Advertise, err) l.Close() s.mu.Unlock() return @@ -1772,8 +1772,9 @@ func (s *Server) startRouteAcceptLoop() { // Similar to setInfoHostPortAndGenerateJSON, but for routeInfo. func (s *Server) setRouteInfoHostPortAndIP() error { - if s.opts.Cluster.Advertise != "" { - advHost, advPort, err := parseHostPort(s.opts.Cluster.Advertise, s.opts.Cluster.Port) + opts := s.getOpts() + if opts.Cluster.Advertise != "" { + advHost, advPort, err := parseHostPort(opts.Cluster.Advertise, opts.Cluster.Port) if err != nil { return err } @@ -1781,8 +1782,8 @@ func (s *Server) setRouteInfoHostPortAndIP() error { s.routeInfo.Port = advPort s.routeInfo.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(advHost, strconv.Itoa(advPort))) } else { - s.routeInfo.Host = s.opts.Cluster.Host - s.routeInfo.Port = s.opts.Cluster.Port + s.routeInfo.Host = opts.Cluster.Host + s.routeInfo.Port = opts.Cluster.Port s.routeInfo.IP = "" } // (re)generate the routeInfoJSON byte array diff --git a/server/server.go b/server/server.go index 6022e1b301b..10be531284c 100644 --- a/server/server.go +++ b/server/server.go @@ -531,15 +531,15 @@ func NewServer(opts *Options) (*Server, error) { s.mu.Unlock() var a *Account // perform direct lookup to avoid warning trace - if _, err := fetchAccount(ar, s.opts.SystemAccount); err == nil { - a, _ = s.lookupAccount(s.opts.SystemAccount) + if _, err := fetchAccount(ar, opts.SystemAccount); err == nil { + a, _ = s.lookupAccount(opts.SystemAccount) } s.mu.Lock() if a == nil { - sac := NewAccount(s.opts.SystemAccount) + sac := NewAccount(opts.SystemAccount) sac.Issuer = opts.TrustedOperators[0].Issuer sac.signingKeys = map[string]jwt.Scope{} - sac.signingKeys[s.opts.SystemAccount] = nil + sac.signingKeys[opts.SystemAccount] = nil s.registerAccountNoLock(sac) } } @@ -748,11 +748,11 @@ func (s *Server) configureAccounts() error { s.registerAccountNoLock(s.gacc) } - opts := s.opts + opts := s.getOpts() // Check opts and walk through them. We need to copy them here // so that we do not keep a real one sitting in the options. - for _, acc := range s.opts.Accounts { + for _, acc := range opts.Accounts { var a *Account if acc.Name == globalAccountName { a = s.gacc @@ -772,7 +772,7 @@ func (s *Server) configureAccounts() error { // If we see an account defined using $SYS we will make sure that is set as system account. if acc.Name == DEFAULT_SYSTEM_ACCOUNT && opts.SystemAccount == _EMPTY_ { - s.opts.SystemAccount = DEFAULT_SYSTEM_ACCOUNT + opts.SystemAccount = DEFAULT_SYSTEM_ACCOUNT } } @@ -853,7 +853,7 @@ func (s *Server) configureAccounts() error { // We would do this to add user/pass to the system account. If this is the case add in // no-auth-user for $G. // Only do this if non-operator mode. - if len(opts.TrustedOperators) == 0 && numAccounts == 2 && s.opts.NoAuthUser == _EMPTY_ { + if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ { // If we come here from config reload, let's not recreate the fake user name otherwise // it will cause currently clients to be disconnected. uname := s.sysAccOnlyNoAuthUser @@ -868,8 +868,8 @@ func (s *Server) configureAccounts() error { uname = fmt.Sprintf("nats-%s", b[:]) s.sysAccOnlyNoAuthUser = uname } - s.opts.Users = append(s.opts.Users, &User{Username: uname, Password: uname[6:], Account: s.gacc}) - s.opts.NoAuthUser = uname + opts.Users = append(opts.Users, &User{Username: uname, Password: uname[6:], Account: s.gacc}) + opts.NoAuthUser = uname } } @@ -1002,16 +1002,17 @@ func (s *Server) isTrustedIssuer(issuer string) bool { // options-based trusted nkeys. Returns success. func (s *Server) processTrustedKeys() bool { s.strictSigningKeyUsage = map[string]struct{}{} + opts := s.getOpts() if trustedKeys != _EMPTY_ && !s.initStampedTrustedKeys() { return false - } else if s.opts.TrustedKeys != nil { - for _, key := range s.opts.TrustedKeys { + } else if opts.TrustedKeys != nil { + for _, key := range opts.TrustedKeys { if !nkeys.IsValidPublicOperatorKey(key) { return false } } - s.trustedKeys = append([]string(nil), s.opts.TrustedKeys...) - for _, claim := range s.opts.TrustedOperators { + s.trustedKeys = append([]string(nil), opts.TrustedKeys...) + for _, claim := range opts.TrustedOperators { if !claim.StrictSigningKeyUsage { continue } @@ -1044,7 +1045,7 @@ func checkTrustedKeyString(keys string) []string { // it succeeded or not. func (s *Server) initStampedTrustedKeys() bool { // Check to see if we have an override in options, which will cause us to fail. - if len(s.opts.TrustedKeys) > 0 { + if len(s.getOpts().TrustedKeys) > 0 { return false } tks := checkTrustedKeyString(trustedKeys) @@ -1336,7 +1337,8 @@ func (s *Server) createInternalClient(kind int) *client { // efficient propagation. // Lock should be held on entry. func (s *Server) shouldTrackSubscriptions() bool { - return (s.opts.Cluster.Port != 0 || s.opts.Gateway.Port != 0) + opts := s.getOpts() + return (opts.Cluster.Port != 0 || opts.Gateway.Port != 0) } // Invokes registerAccountNoLock under the protection of the server lock. @@ -1740,8 +1742,9 @@ func (s *Server) Start() { // In operator mode, when the account resolver depends on an external system and // the system account is the bootstrapping account, start fetching it. if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT { + opts := s.getOpts() _, isMemResolver := ar.(*MemAccResolver) - if v, ok := s.accounts.Load(s.opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == "" { + if v, ok := s.accounts.Load(opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == "" { s.Noticef("Using bootstrapping system account") s.startGoRoutine(func() { defer s.grWG.Done() @@ -1753,7 +1756,7 @@ func (s *Server) Start() { return case <-t.C: sacc := s.SystemAccount() - if claimJWT, err := fetchAccount(ar, s.opts.SystemAccount); err != nil { + if claimJWT, err := fetchAccount(ar, opts.SystemAccount); err != nil { continue } else if err = s.updateAccountWithClaimJWT(sacc, claimJWT); err != nil { continue @@ -2121,7 +2124,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) { // server's info Host/Port with either values from Options or // ClientAdvertise. if err := s.setInfoHostPort(); err != nil { - s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", s.opts.ClientAdvertise, err) + s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err) l.Close() s.mu.Unlock() return @@ -2199,16 +2202,17 @@ func (s *Server) setInfoHostPort() error { // When this function is called, opts.Port is set to the actual listen // port (if option was originally set to RANDOM), even during a config // reload. So use of s.opts.Port is safe. - if s.opts.ClientAdvertise != _EMPTY_ { - h, p, err := parseHostPort(s.opts.ClientAdvertise, s.opts.Port) + opts := s.getOpts() + if opts.ClientAdvertise != _EMPTY_ { + h, p, err := parseHostPort(opts.ClientAdvertise, opts.Port) if err != nil { return err } s.info.Host = h s.info.Port = p } else { - s.info.Host = s.opts.Host - s.info.Port = s.opts.Port + s.info.Host = opts.Host + s.info.Port = opts.Port } return nil } From 95e4f2dfe1cef7c1f98d3ff1aa0f5c34338e5abf Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 24 Mar 2023 13:15:20 -0600 Subject: [PATCH 2/4] Fixed accounts configuration reload Issues could manifest with subscription interest not properly propagated. Signed-off-by: Ivan Kozlovic --- server/accounts.go | 92 ++++++++++++++++++++++++++++--- server/client_test.go | 65 ---------------------- server/events.go | 2 +- server/norace_test.go | 69 +++++++++++++++++++++++- server/reload.go | 122 +++++++++++++----------------------------- server/reload_test.go | 103 ++++++++++++++++++++++++++++++----- server/server.go | 103 +++++++++++++++++++++++++++-------- 7 files changed, 362 insertions(+), 194 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index c739d0ea0fd..5e7520c6ddf 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -262,8 +262,7 @@ func (a *Account) String() string { // Used to create shallow copies of accounts for transfer // from opts to real accounts in server struct. -func (a *Account) shallowCopy() *Account { - na := NewAccount(a.Name) +func (a *Account) shallowCopy(na *Account) { na.Nkey = a.Nkey na.Issuer = a.Issuer @@ -303,12 +302,14 @@ func (a *Account) shallowCopy() *Account { } } } + na.mappings = a.mappings + if len(na.mappings) > 0 && na.prand == nil { + na.prand = rand.New(rand.NewSource(time.Now().UnixNano())) + } // JetStream na.jsLimits = a.jsLimits // Server config account limits. na.limits = a.limits - - return na } // nextEventID uses its own lock for better concurrency. @@ -2834,7 +2835,12 @@ func (a *Account) checkStreamImportsEqual(b *Account) bool { return true } +// Returns true if `a` and `b` stream exports are the same. +// Acquires `a` read lock, but `b` is assumed to not be accessed +// by anyone but the caller (`b` is not registered anywhere). func (a *Account) checkStreamExportsEqual(b *Account) bool { + a.mu.RLock() + defer a.mu.RUnlock() if len(a.exports.streams) != len(b.exports.streams) { return false } @@ -2843,14 +2849,29 @@ func (a *Account) checkStreamExportsEqual(b *Account) bool { if !ok { return false } - if !reflect.DeepEqual(aea, bea) { + if !isStreamExportEqual(aea, bea) { return false } } return true } +func isStreamExportEqual(a, b *streamExport) bool { + if a == nil && b == nil { + return true + } + if (a == nil && b != nil) || (a != nil && b == nil) { + return false + } + return isExportAuthEqual(&a.exportAuth, &b.exportAuth) +} + +// Returns true if `a` and `b` service exports are the same. +// Acquires `a` read lock, but `b` is assumed to not be accessed +// by anyone but the caller (`b` is not registered anywhere). func (a *Account) checkServiceExportsEqual(b *Account) bool { + a.mu.RLock() + defer a.mu.RUnlock() if len(a.exports.services) != len(b.exports.services) { return false } @@ -2859,7 +2880,66 @@ func (a *Account) checkServiceExportsEqual(b *Account) bool { if !ok { return false } - if !reflect.DeepEqual(aea, bea) { + if !isServiceExportEqual(aea, bea) { + return false + } + } + return true +} + +func isServiceExportEqual(a, b *serviceExport) bool { + if a == nil && b == nil { + return true + } + if (a == nil && b != nil) || (a != nil && b == nil) { + return false + } + if !isExportAuthEqual(&a.exportAuth, &b.exportAuth) { + return false + } + if a.acc.Name != b.acc.Name { + return false + } + if a.respType != b.respType { + return false + } + if a.latency != nil || b.latency != nil { + if (a.latency != nil && b.latency == nil) || (a.latency == nil && b.latency != nil) { + return false + } + if a.latency.sampling != b.latency.sampling { + return false + } + if a.latency.subject != b.latency.subject { + return false + } + } + return true +} + +// Returns true if `a` and `b` exportAuth structures are equal. +// Both `a` and `b` are guaranteed to be non-nil. +// Locking is handled by the caller. +func isExportAuthEqual(a, b *exportAuth) bool { + if a.tokenReq != b.tokenReq { + return false + } + if a.accountPos != b.accountPos { + return false + } + if len(a.approved) != len(b.approved) { + return false + } + for ak, av := range a.approved { + if bv, ok := b.approved[ak]; !ok || av.Name != bv.Name { + return false + } + } + if len(a.actsRevoked) != len(b.actsRevoked) { + return false + } + for ak, av := range a.actsRevoked { + if bv, ok := b.actsRevoked[ak]; !ok || av != bv { return false } } diff --git a/server/client_test.go b/server/client_test.go index bc1a9df959b..ebbc036bc85 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1543,71 +1543,6 @@ func TestClientOutboundQueueCoalesce(t *testing.T) { } } -// This test ensures that outbound queues don't cause a run on -// memory when sending something to lots of clients. -func TestClientOutboundQueueMemory(t *testing.T) { - opts := DefaultOptions() - s := RunServer(opts) - defer s.Shutdown() - - var before runtime.MemStats - var after runtime.MemStats - - var err error - clients := make([]*nats.Conn, 50000) - wait := &sync.WaitGroup{} - wait.Add(len(clients)) - - for i := 0; i < len(clients); i++ { - clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) - if err != nil { - t.Fatalf("Error on connect: %v", err) - } - defer clients[i].Close() - - clients[i].Subscribe("test", func(m *nats.Msg) { - wait.Done() - }) - } - - runtime.GC() - runtime.ReadMemStats(&before) - - nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) - if err != nil { - t.Fatalf("Error on connect: %v", err) - } - defer nc.Close() - - var m [48000]byte - if err = nc.Publish("test", m[:]); err != nil { - t.Fatal(err) - } - - wait.Wait() - - runtime.GC() - runtime.ReadMemStats(&after) - - hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc) - ms := float64(len(m)) - diff := float64(ha) - float64(hb) - inc := (diff / float64(hb)) * 100 - - fmt.Printf("Message size: %.1fKB\n", ms/1024) - fmt.Printf("Subscribed clients: %d\n", len(clients)) - fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024) - fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024) - fmt.Printf("Heap allocs delta: %.1f%%\n", inc) - - // TODO: What threshold makes sense here for a failure? - /* - if inc > 10 { - t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc) - } - */ -} - func TestClientTraceRace(t *testing.T) { opts := DefaultOptions() s := RunServer(opts) diff --git a/server/events.go b/server/events.go index 90185b5c156..4f7e8a7074c 100644 --- a/server/events.go +++ b/server/events.go @@ -1717,7 +1717,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, sub // This will import any system level exports. func (s *Server) registerSystemImports(a *Account) { - if a == nil || !s.eventsEnabled() { + if a == nil || !s.EventsEnabled() { return } sacc := s.SystemAccount() diff --git a/server/norace_test.go b/server/norace_test.go index 041738ea243..e3456440e60 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -5839,7 +5839,7 @@ func TestNoRaceEncodeConsumerStateBug(t *testing.T) { } // Performance impact on stream ingress with large number of consumers. -func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) { +func TestNoRaceJetStreamLargeNumConsumersPerfImpact(t *testing.T) { skip(t) s := RunBasicJetStreamServer(t) @@ -5931,7 +5931,7 @@ func TestJetStreamLargeNumConsumersPerfImpact(t *testing.T) { } // Performance impact on large number of consumers but sparse delivery. -func TestJetStreamLargeNumConsumersSparseDelivery(t *testing.T) { +func TestNoRaceJetStreamLargeNumConsumersSparseDelivery(t *testing.T) { skip(t) s := RunBasicJetStreamServer(t) @@ -7864,3 +7864,68 @@ func TestNoRaceJetStreamClusterLeafnodeConnectPerf(t *testing.T) { nc.Close() } } + +// This test ensures that outbound queues don't cause a run on +// memory when sending something to lots of clients. +func TestNoRaceClientOutboundQueueMemory(t *testing.T) { + opts := DefaultOptions() + s := RunServer(opts) + defer s.Shutdown() + + var before runtime.MemStats + var after runtime.MemStats + + var err error + clients := make([]*nats.Conn, 50000) + wait := &sync.WaitGroup{} + wait.Add(len(clients)) + + for i := 0; i < len(clients); i++ { + clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer clients[i].Close() + + clients[i].Subscribe("test", func(m *nats.Msg) { + wait.Done() + }) + } + + runtime.GC() + runtime.ReadMemStats(&before) + + nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + var m [48000]byte + if err = nc.Publish("test", m[:]); err != nil { + t.Fatal(err) + } + + wait.Wait() + + runtime.GC() + runtime.ReadMemStats(&after) + + hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc) + ms := float64(len(m)) + diff := float64(ha) - float64(hb) + inc := (diff / float64(hb)) * 100 + + fmt.Printf("Message size: %.1fKB\n", ms/1024) + fmt.Printf("Subscribed clients: %d\n", len(clients)) + fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024) + fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024) + fmt.Printf("Heap allocs delta: %.1f%%\n", inc) + + // TODO: What threshold makes sense here for a failure? + /* + if inc > 10 { + t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc) + } + */ +} diff --git a/server/reload.go b/server/reload.go index 02cb326c05f..a64f8286a5e 100644 --- a/server/reload.go +++ b/server/reload.go @@ -788,13 +788,6 @@ func (s *Server) Reload() error { func (s *Server) ReloadOptions(newOpts *Options) error { s.mu.Lock() - s.reloading = true - defer func() { - s.mu.Lock() - s.reloading = false - s.mu.Unlock() - }() - curOpts := s.getOpts() // Wipe trusted keys if needed when we have an operator. @@ -1563,96 +1556,41 @@ func (s *Server) reloadClientTraceLevel() { func (s *Server) reloadAuthorization() { // This map will contain the names of accounts that have their streams // import configuration changed. - awcsti := make(map[string]struct{}) + var awcsti map[string]struct{} checkJetStream := false opts := s.getOpts() s.mu.Lock() + deletedAccounts := make(map[string]*Account) + // This can not be changed for now so ok to check server's trustedKeys unlocked. // If plain configured accounts, process here. if s.trustedKeys == nil { - // We need to drain the old accounts here since we have something - // new configured. We do not want s.accounts to change since that would - // mean adding a lock to lookupAccount which is what we are trying to - // optimize for with the change from a map to a sync.Map. - oldAccounts := make(map[string]*Account) - s.accounts.Range(func(k, v interface{}) bool { - acc := v.(*Account) - acc.mu.Lock() - oldAccounts[acc.Name] = acc - // Need to clear out eventing timers since they close over this account and not the new one. - clearTimer(&acc.etmr) - clearTimer(&acc.ctmr) - acc.mu.Unlock() - s.accounts.Delete(k) - return true - }) - s.gacc = nil - s.configureAccounts() - s.configureAuthorization() - s.mu.Unlock() - + // Make a map of the configured account names so we figure out the accounts + // that should be removed later on. + configAccs := make(map[string]struct{}, len(opts.Accounts)) + for _, acc := range opts.Accounts { + configAccs[acc.GetName()] = struct{}{} + } + // Now range over existing accounts and keep track of the ones deleted + // so some cleanup can be made after releasing the server lock. s.accounts.Range(func(k, v interface{}) bool { - newAcc := v.(*Account) - if acc, ok := oldAccounts[newAcc.Name]; ok { - // If account exist in latest config, "transfer" the account's - // sublist and client map to the new account. - acc.mu.RLock() - newAcc.mu.Lock() - if len(acc.clients) > 0 { - newAcc.clients = make(map[*client]struct{}, len(acc.clients)) - for c := range acc.clients { - newAcc.clients[c] = struct{}{} - } - } - // Same for leafnodes - newAcc.lleafs = append([]*client(nil), acc.lleafs...) - - newAcc.sl = acc.sl - if acc.rm != nil { - newAcc.rm = make(map[string]int32) - } - for k, v := range acc.rm { - newAcc.rm[k] = v - } - // Transfer internal client state. The configureAccounts call from above may have set up a new one. - // We need to use the old one, and the isid to not confuse internal subs. - newAcc.ic, newAcc.isid = acc.ic, acc.isid - // Transfer any JetStream state. - newAcc.js = acc.js - // Also transfer any internal accounting on different client types. We copy over all clients - // so need to copy this as well for proper accounting going forward. - newAcc.nrclients = acc.nrclients - newAcc.sysclients = acc.sysclients - newAcc.nleafs = acc.nleafs - newAcc.nrleafs = acc.nrleafs - // Process any reverse map entries. - if len(acc.imports.rrMap) > 0 { - newAcc.imports.rrMap = make(map[string][]*serviceRespEntry) - for k, v := range acc.imports.rrMap { - newAcc.imports.rrMap[k] = v - } - } - newAcc.mu.Unlock() - acc.mu.RUnlock() - - // Check if current and new config of this account are same - // in term of stream imports. - if !acc.checkStreamImportsEqual(newAcc) { - awcsti[newAcc.Name] = struct{}{} - } - - // We need to remove all old service import subs. - acc.removeAllServiceImportSubs() - newAcc.addAllServiceImportSubs() + an, acc := k.(string), v.(*Account) + // Exclude default and system account from this test since those + // may not actually be in opts.Accounts. + if an == DEFAULT_GLOBAL_ACCOUNT || an == DEFAULT_SYSTEM_ACCOUNT { + return true + } + // Check check if existing account is still in opts.Accounts. + if _, ok := configAccs[an]; !ok { + deletedAccounts[an] = acc + s.accounts.Delete(k) } return true }) - s.mu.Lock() - // Check if we had a default system account. - if s.sys != nil && s.sys.account != nil && !opts.NoSystemAccount { - s.accounts.Store(s.sys.account.Name, s.sys.account) - } + // This will update existing and add new ones. + awcsti, _ = s.configureAccounts(true) + s.configureAuthorization() // Double check any JetStream configs. checkJetStream = s.js != nil } else if opts.AccountResolver != nil { @@ -1737,6 +1675,18 @@ func (s *Server) reloadAuthorization() { } s.mu.Unlock() + // Clear some timers and remove service import subs for deleted accounts. + for _, acc := range deletedAccounts { + acc.mu.Lock() + clearTimer(&acc.etmr) + clearTimer(&acc.ctmr) + for _, se := range acc.exports.services { + se.clearResponseThresholdTimer() + } + acc.mu.Unlock() + acc.removeAllServiceImportSubs() + } + if resetCh != nil { resetCh <- struct{}{} } diff --git a/server/reload_test.go b/server/reload_test.go index 9f334db1d6e..062d4248cac 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -2657,8 +2657,11 @@ func TestConfigReloadAccountUsers(t *testing.T) { fooMatch := gAcc.sl.Match("foo") bazMatch := gAcc.sl.Match("baz") gAcc.mu.RUnlock() - if n != 2 { - return fmt.Errorf("Global account should have 2 subs, got %v", n) + // The number of subscriptions should be 3 ($SYS.REQ.ACCOUNT.PING.CONNZ, + // $SYS.REQ.ACCOUNT.PING.STATZ, $SYS.REQ.SERVER.PING.CONNZ) + // + 2 (foo and baz) + if n != 5 { + return fmt.Errorf("Global account should have 5 subs, got %v", n) } if len(fooMatch.psubs) != 1 { return fmt.Errorf("Global account should have foo sub") @@ -3897,7 +3900,7 @@ func TestConfigReloadConnectErrReports(t *testing.T) { } } -func TestAuthReloadDoesNotBreakRouteInterest(t *testing.T) { +func TestConfigReloadAuthDoesNotBreakRouteInterest(t *testing.T) { s, opts := RunServerWithConfig("./configs/seed_tls.conf") defer s.Shutdown() @@ -4047,7 +4050,7 @@ func TestConfigReloadAccountResolverTLSConfig(t *testing.T) { } } -func TestLoggingReload(t *testing.T) { +func TestConfigReloadLogging(t *testing.T) { // This test basically starts a server and causes it's configuration to be reloaded 3 times. // Each time, a new log file is created and trace levels are turned, off - on - off. @@ -4174,7 +4177,7 @@ func TestLoggingReload(t *testing.T) { check("off-post.log", tracingAbsent) } -func TestReloadValidate(t *testing.T) { +func TestConfigReloadValidate(t *testing.T) { confFileName := createConfFile(t, []byte(` listen: "127.0.0.1:-1" no_auth_user: a @@ -4234,12 +4237,14 @@ func TestConfigReloadAccounts(t *testing.T) { urlSys := fmt.Sprintf("nats://sys:pwd@%s:%d", o.Host, o.Port) urlUsr := fmt.Sprintf("nats://usr:pwd@%s:%d", o.Host, o.Port) - oldAcc, ok := s.accounts.Load("SYS") + oldAcci, ok := s.accounts.Load("SYS") if !ok { t.Fatal("No SYS account") } + oldAcc := oldAcci.(*Account) - testSrvState := func(oldAcc interface{}) { + testSrvState := func(oldAcc *Account) { + t.Helper() sysAcc := s.SystemAccount() s.mu.Lock() defer s.mu.Unlock() @@ -4252,11 +4257,13 @@ func TestConfigReloadAccounts(t *testing.T) { if s.opts.SystemAccount != "SYS" { t.Fatal("Found wrong sys.account") } - // This will fail prior to system account reload - if acc, ok := s.accounts.Load(s.opts.SystemAccount); !ok { - t.Fatal("Found different sys.account pointer") - } else if acc == oldAcc { - t.Fatal("System account is unaltered") + ai, ok := s.accounts.Load(s.opts.SystemAccount) + if !ok { + t.Fatalf("System account %q not found in s.accounts map", s.opts.SystemAccount) + } + acc := ai.(*Account) + if acc != oldAcc { + t.Fatalf("System account pointer was changed during reload, was %p now %p", oldAcc, acc) } if s.sys.client == nil { t.Fatal("Expected sys.client to be non-nil") @@ -4324,7 +4331,7 @@ func TestConfigReloadAccounts(t *testing.T) { } } - testSrvState(nil) + testSrvState(oldAcc) c1, s1C, s1D := subscribe("SYS1") defer c1.Close() defer s1C.Unsubscribe() @@ -4508,3 +4515,73 @@ func TestConfigReloadWithSysAccountOnly(t *testing.T) { // ok } } + +func TestConfigReloadGlobalAccountWithMappingAndJetStream(t *testing.T) { + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + mappings { + subj.orig: subj.mapped.before.reload + } + + leaf { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + # For access to system account. + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } + ` + c := createJetStreamClusterWithTemplate(t, tmpl, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Verify that mapping works + checkMapping := func(expectedSubj string) { + t.Helper() + sub := natsSubSync(t, nc, "subj.>") + defer sub.Unsubscribe() + natsPub(t, nc, "subj.orig", nil) + msg := natsNexMsg(t, sub, time.Second) + if msg.Subject != expectedSubj { + t.Fatalf("Expected subject to have been mapped to %q, got %q", expectedSubj, msg.Subject) + } + } + checkMapping("subj.mapped.before.reload") + + // Create a stream and check that we can get the INFO + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Replicas: 3, + Subjects: []string{"foo"}, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + c.waitOnStreamLeader(globalAccountName, "TEST") + + _, err = js.StreamInfo("TEST") + require_NoError(t, err) + + // Change mapping on all servers and issue reload + for i, s := range c.servers { + opts := c.opts[i] + content, err := os.ReadFile(opts.ConfigFile) + require_NoError(t, err) + reloadUpdateConfig(t, s, opts.ConfigFile, strings.Replace(string(content), "subj.mapped.before.reload", "subj.mapped.after.reload", 1)) + } + // Make sure the cluster is still formed + checkClusterFormed(t, c.servers...) + // Now repeat the test for the subject mapping and stream info + checkMapping("subj.mapped.after.reload") + _, err = js.StreamInfo("TEST") + require_NoError(t, err) +} diff --git a/server/server.go b/server/server.go index 10be531284c..f6c1b6a73d4 100644 --- a/server/server.go +++ b/server/server.go @@ -120,7 +120,6 @@ type Server struct { opts *Options running bool shutdown bool - reloading bool listener net.Listener listenerErr error gacc *Account @@ -546,7 +545,7 @@ func NewServer(opts *Options) (*Server, error) { } // For tracking accounts - if err := s.configureAccounts(); err != nil { + if _, err := s.configureAccounts(false); err != nil { return nil, err } @@ -739,9 +738,13 @@ func (s *Server) globalAccount() *Account { return gacc } -// Used to setup Accounts. -// Lock is held upon entry. -func (s *Server) configureAccounts() error { +// Used to setup or update Accounts. +// Returns a map that indicates which accounts have had their stream imports +// changed (in case of an update in configuration reload). +// Lock is held upon entry, but will be released/reacquired in this function. +func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) { + awcsti := make(map[string]struct{}) + // Create the global account. if s.gacc == nil { s.gacc = NewAccount(globalAccountName) @@ -754,21 +757,62 @@ func (s *Server) configureAccounts() error { // so that we do not keep a real one sitting in the options. for _, acc := range opts.Accounts { var a *Account - if acc.Name == globalAccountName { - a = s.gacc - } else { - a = acc.shallowCopy() + create := true + // For the global account, we want to skip the reload process + // and fall back into the "create" case which will in that + // case really be just an update (shallowCopy will make sure + // that mappings are copied over). + if reloading && acc.Name != globalAccountName { + if ai, ok := s.accounts.Load(acc.Name); ok { + a = ai.(*Account) + a.mu.Lock() + // Before updating the account, check if stream imports have changed. + if !a.checkStreamImportsEqual(acc) { + awcsti[acc.Name] = struct{}{} + } + // Collect the sids for the service imports since we are going to + // replace with new ones. + var sids [][]byte + c := a.ic + for _, si := range a.imports.services { + if c != nil && si.sid != nil { + sids = append(sids, si.sid) + } + } + // Now reset all export/imports fields since they are going to be + // filled in shallowCopy() + a.imports.streams, a.imports.services = nil, nil + a.exports.streams, a.exports.services = nil, nil + // We call shallowCopy from the account `acc` (the one in Options) + // and pass `a` (our existing account) to get it updated. + acc.shallowCopy(a) + a.mu.Unlock() + // Need to release the lock for this. + s.mu.Unlock() + for _, sid := range sids { + c.processUnsub(sid) + } + // Add subscriptions for existing service imports. + a.addAllServiceImportSubs() + s.mu.Lock() + create = false + } } - if acc.hasMappings() { - // For now just move and wipe from opts.Accounts version. - a.mappings = acc.mappings - acc.mappings = nil - // We use this for selecting between multiple weighted destinations. - a.prand = rand.New(rand.NewSource(time.Now().UnixNano())) + if create { + if acc.Name == globalAccountName { + a = s.gacc + } else { + a = NewAccount(acc.Name) + } + // Locking matters in the case of an update of the global account + a.mu.Lock() + acc.shallowCopy(a) + a.mu.Unlock() + // Will be a no-op in case of the global account since it is alrady registered. + s.registerAccountNoLock(a) } - acc.sl = nil - acc.clients = nil - s.registerAccountNoLock(a) + // The `acc` account is stored in options, not in the server, and these can be cleared. + acc.sl, acc.clients, acc.mappings = nil, nil, nil // If we see an account defined using $SYS we will make sure that is set as system account. if acc.Name == DEFAULT_SYSTEM_ACCOUNT && opts.SystemAccount == _EMPTY_ { @@ -791,6 +835,7 @@ func (s *Server) configureAccounts() error { s.accounts.Range(func(k, v interface{}) bool { numAccounts++ acc := v.(*Account) + acc.mu.Lock() // Exports for _, se := range acc.exports.streams { if se != nil { @@ -817,16 +862,32 @@ func (s *Server) configureAccounts() error { for _, si := range acc.imports.services { if v, ok := s.accounts.Load(si.acc.Name); ok { si.acc = v.(*Account) + // TODO: Not sure if it is possible for an account to have a + // service import from itself, but if that is the case, + // we are already lock, otherwise use locking to protect + // the call to si.acc.getServiceExport(). + if si.acc == acc { + si.se = si.acc.getServiceExport(si.to) + continue + } + si.acc.mu.RLock() si.se = si.acc.getServiceExport(si.to) + si.acc.mu.RUnlock() } } // Make sure the subs are running, but only if not reloading. - if len(acc.imports.services) > 0 && acc.ic == nil && !s.reloading { + if len(acc.imports.services) > 0 && acc.ic == nil && !reloading { acc.ic = s.createInternalAccountClient() acc.ic.acc = acc + // Need to release locks to invoke this function. + acc.mu.Unlock() + s.mu.Unlock() acc.addAllServiceImportSubs() + s.mu.Lock() + acc.mu.Lock() } acc.updated = time.Now().UTC() + acc.mu.Unlock() return true }) @@ -846,7 +907,7 @@ func (s *Server) configureAccounts() error { s.mu.Lock() } if err != nil { - return fmt.Errorf("error resolving system account: %v", err) + return awcsti, fmt.Errorf("error resolving system account: %v", err) } // If we have defined a system account here check to see if its just us and the $G account. @@ -873,7 +934,7 @@ func (s *Server) configureAccounts() error { } } - return nil + return awcsti, nil } // Setup the account resolver. For memory resolver, make sure the JWTs are From 7afe76caf8ae4c640f39d3950e06ef7abafc4ceb Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 3 May 2023 15:17:08 -0600 Subject: [PATCH 3/4] Fixed Sublist.RemoveBatch to remove subs present, even if one isn't I have seen cases, maybe due to previous issue with configuration reload that would miss subscriptions in the sublist because of the sublist swap, where we would attempt to remove subscriptions by batch but some were not present. I would have expected that all present subscriptions would still be removed, even if the call overall returned an error. This is now fixed and a test has been added demonstrating that even on error, we remove all subscriptions that were present. Signed-off-by: Ivan Kozlovic --- server/sublist.go | 10 +++++++--- server/sublist_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/server/sublist.go b/server/sublist.go index bcf271d8e2f..47d45999fab 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -820,9 +820,13 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error { // Turn off our cache if enabled. wasEnabled := s.cache != nil s.cache = nil + // We will try to remove all subscriptions but will report the first that caused + // an error. In other words, we don't bail out at the first error which would + // possibly leave a bunch of subscriptions that could have been removed. + var err error for _, sub := range subs { - if err := s.remove(sub, false, false); err != nil { - return err + if lerr := s.remove(sub, false, false); lerr != nil && err == nil { + err = lerr } } // Turn caching back on here. @@ -830,7 +834,7 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error { if wasEnabled { s.cache = make(map[string]*SublistResult) } - return nil + return err } // pruneNode is used to prune an empty node from the tree. diff --git a/server/sublist_test.go b/server/sublist_test.go index b9e12a2646d..4d3e913f077 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -370,6 +370,30 @@ func TestSublistNoCacheRemoveBatch(t *testing.T) { } } +func TestSublistRemoveBatchWithError(t *testing.T) { + s := NewSublistNoCache() + sub1 := newSub("foo") + sub2 := newSub("bar") + sub3 := newSub("baz") + s.Insert(sub1) + s.Insert(sub2) + s.Insert(sub3) + subNotPresent := newSub("not.inserted") + // Try to remove all subs, but include the sub that has not been inserted. + err := s.RemoveBatch([]*subscription{subNotPresent, sub1, sub3}) + // We expect an error to be returned, but sub1,2 and 3 to have been removed. + require_Error(t, err, ErrNotFound) + // Make sure that we have only sub2 present + verifyCount(s, 1, t) + r := s.Match("bar") + verifyLen(r.psubs, 1, t) + verifyMember(r.psubs, sub2, t) + r = s.Match("foo") + verifyLen(r.psubs, 0, t) + r = s.Match("baz") + verifyLen(r.psubs, 0, t) +} + func testSublistInvalidSubjectsInsert(t *testing.T, s *Sublist) { // Insert, or subscriptions, can have wildcards, but not empty tokens, // and can not have a FWC that is not the terminal token. From 8a4ead22bc33d41c2511647e073a0502a55c3723 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 3 May 2023 16:14:51 -0600 Subject: [PATCH 4/4] Updates based on code review Signed-off-by: Ivan Kozlovic --- server/route.go | 2 +- server/server.go | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/server/route.go b/server/route.go index ccbcb5361cb..b135ce500e5 100644 --- a/server/route.go +++ b/server/route.go @@ -1773,7 +1773,7 @@ func (s *Server) startRouteAcceptLoop() { // Similar to setInfoHostPortAndGenerateJSON, but for routeInfo. func (s *Server) setRouteInfoHostPortAndIP() error { opts := s.getOpts() - if opts.Cluster.Advertise != "" { + if opts.Cluster.Advertise != _EMPTY_ { advHost, advPort, err := parseHostPort(opts.Cluster.Advertise, opts.Cluster.Port) if err != nil { return err diff --git a/server/server.go b/server/server.go index f6c1b6a73d4..7754d6343df 100644 --- a/server/server.go +++ b/server/server.go @@ -862,10 +862,8 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) for _, si := range acc.imports.services { if v, ok := s.accounts.Load(si.acc.Name); ok { si.acc = v.(*Account) - // TODO: Not sure if it is possible for an account to have a - // service import from itself, but if that is the case, - // we are already lock, otherwise use locking to protect - // the call to si.acc.getServiceExport(). + // It is possible to allow for latency tracking inside your + // own account, so lock only when not the same account. if si.acc == acc { si.se = si.acc.getServiceExport(si.to) continue @@ -1805,7 +1803,7 @@ func (s *Server) Start() { if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT { opts := s.getOpts() _, isMemResolver := ar.(*MemAccResolver) - if v, ok := s.accounts.Load(opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == "" { + if v, ok := s.accounts.Load(opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == _EMPTY_ { s.Noticef("Using bootstrapping system account") s.startGoRoutine(func() { defer s.grWG.Done()