Increasing memory consumption #4822

Closed

mbrekhov opened this issue Nov 27, 2023 · 2 comments
Labels
defect (Suspected defect such as a bug or regression), stale (This issue has had no activity in a while)

Comments

@mbrekhov

Observed behavior

Hi!
Can you please help me identify what could be causing the constantly increasing / leaking memory consumption of our NATS cluster?

I've collected some statistics:

File: nats-server
Type: inuse_space
Time: Nov 27, 2023 at 1:28pm (UTC)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top15
Showing nodes accounting for 329.56MB, 97.48% of 338.09MB total
Dropped 28 nodes (cum <= 1.69MB)
Showing top 15 nodes out of 45
      flat  flat%   sum%        cum   cum%
  134.83MB 39.88% 39.88%   146.33MB 43.28%  github.com/nats-io/nats-server/v2/server.(*Sublist).match
   96.51MB 28.54% 68.42%   291.11MB 86.10%  github.com/nats-io/nats-server/v2/server.(*client).processInboundClientMsg
   17.59MB  5.20% 73.63%    17.59MB  5.20%  math/rand.newSource (inline)
   16.14MB  4.77% 78.40%    16.14MB  4.77%  github.com/nats-io/nats-server/v2/server.(*Server).createClientEx
   16.13MB  4.77% 83.17%    16.13MB  4.77%  sync.(*Map).dirtyLocked
   13.56MB  4.01% 87.18%    29.68MB  8.78%  sync.(*Map).Swap
   11.50MB  3.40% 90.58%    11.50MB  3.40%  github.com/nats-io/nats-server/v2/server.addNodeToResults
    9.50MB  2.81% 93.39%       11MB  3.25%  github.com/nats-io/nats-server/v2/server.(*Server).saveClosedClient
    3.02MB  0.89% 94.29%     3.02MB  0.89%  github.com/nats-io/nats-server/v2/server.(*Server).createWSClient
    2.50MB  0.74% 95.03%     2.50MB  0.74%  runtime.malg
    2.50MB  0.74% 95.77%     2.50MB  0.74%  github.com/nats-io/nats-server/v2/server.newLevel (inline)
       2MB  0.59% 96.36%        2MB  0.59%  github.com/nats-io/nats-server/v2/server.(*client).addSubToRouteTargets
    1.50MB  0.44% 96.80%     6.50MB  1.92%  github.com/nats-io/nats-server/v2/server.(*client).setPermissions
    1.23MB  0.37% 97.17%     2.73MB  0.81%  github.com/nats-io/nats-server/v2/server.(*client).getAccAndResultFromCache
    1.06MB  0.31% 97.48%     4.56MB  1.35%  github.com/nats-io/nats-server/v2/server.(*Sublist).Insert

(pprof) list .match

Total: 338.09MB
ROUTINE ======================== github.com/nats-io/nats-server/v2/server.(*Sublist).match in server/sublist.go
  134.83MB   146.33MB (flat, cum) 43.28% of Total
         .          .    536:func (s *Sublist) match(subject string, doLock bool) *SublistResult {
         .          .    537:	atomic.AddUint64(&s.matches, 1)
         .          .    538:
         .          .    539:	// Check cache first.
         .          .    540:	if doLock {
         .          .    541:		s.RLock()
         .          .    542:	}
         .          .    543:	cacheEnabled := s.cache != nil
         .          .    544:	r, ok := s.cache[subject]
         .          .    545:	if doLock {
         .          .    546:		s.RUnlock()
         .          .    547:	}
         .          .    548:	if ok {
         .          .    549:		atomic.AddUint64(&s.cacheHits, 1)
         .          .    550:		return r
         .          .    551:	}
         .          .    552:
         .          .    553:	tsa := [32]string{}
         .          .    554:	tokens := tsa[:0]
         .          .    555:	start := 0
         .          .    556:	for i := 0; i < len(subject); i++ {
         .          .    557:		if subject[i] == btsep {
         .          .    558:			if i-start == 0 {
         .          .    559:				return emptyResult
         .          .    560:			}
         .          .    561:			tokens = append(tokens, subject[start:i])
         .          .    562:			start = i + 1
         .          .    563:		}
         .          .    564:	}
         .          .    565:	if start >= len(subject) {
         .          .    566:		return emptyResult
         .          .    567:	}
         .          .    568:	tokens = append(tokens, subject[start:])
         .          .    569:
      61MB       61MB    570:	// FIXME(dlc) - Make shared pool between sublist and client readLoop?
         .          .    571:	result := &SublistResult{}
         .          .    572:
         .          .    573:	// Get result from the main structure and place into the shared cache.
         .          .    574:	// Hold the read lock to avoid race between match and store.
         .          .    575:	var n int
         .          .    576:
         .          .    577:	if doLock {
         .          .    578:		if cacheEnabled {
         .          .    579:			s.Lock()
         .    11.50MB    580:		} else {
         .          .    581:			s.RLock()
         .          .    582:		}
         .          .    583:	}
         .          .    584:
         .          .    585:	matchLevel(s.root, tokens, result)
   73.83MB    73.83MB    586:	// Check for empty result.
         .          .    587:	if len(result.psubs) == 0 && len(result.qsubs) == 0 {
         .          .    588:		result = emptyResult
         .          .    589:	}
         .          .    590:	if cacheEnabled {
         .          .    591:		s.cache[subject] = result
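
For context, the flat allocations attributed above sit on the cache-miss path: on a miss, match tokenizes the subject, allocates a fresh SublistResult, and stores it in the cache keyed by that subject. A deliberately simplified sketch of that miss-then-cache pattern is shown below (an illustration only, not the server's actual code; the real server also bounds and prunes this cache):

// Simplified illustration of the miss-then-cache pattern seen in
// Sublist.match above. On a hit the cached result is returned with no new
// allocation; on a miss a fresh result is allocated and retained in the
// cache keyed by the subject.
package main

import "fmt"

type result struct{ subs []string }

type sublist struct {
	cache map[string]*result
}

func (s *sublist) match(subject string) *result {
	if r, ok := s.cache[subject]; ok {
		return r // cache hit: no new allocation
	}
	r := &result{} // cache miss: allocate a fresh result...
	// ...normally populated by walking the subscription trie (elided here)...
	s.cache[subject] = r // ...and keep it in the cache for next time
	return r
}

func main() {
	s := &sublist{cache: make(map[string]*result)}
	// Publishing to many distinct subjects with no subscribers still takes
	// the miss path each time, so each unique subject adds a cache entry.
	for i := 0; i < 5; i++ {
		s.match(fmt.Sprintf("orders.%d", i))
	}
	fmt.Println("cache entries:", len(s.cache)) // 5
}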

list .processInboundClientMsg

         (pprof) list .processInboundClientMsg
         Total: 338.09MB
         ROUTINE ======================== github.com/nats-io/nats-server/v2/server.(*client).processInboundClientMsg in server/client.go
            96.51MB   291.11MB (flat, cum) 86.10% of Total
                  .          .   3568:// Lock should be held upon entry.
                  .          .   3569:func (c *client) pruneRemoteTracking() {
                  .          .   3570:	c.mu.Lock()
                  .          .   3571:	if c.rrTracking == nil {
                  .          .   3572:		c.mu.Unlock()
                  .          .   3573:		return
                  .          .   3574:	}
                  .          .   3575:	now := time.Now()
                  .          .   3576:	for subject, rl := range c.rrTracking.rmap {
                  .          .   3577:		if now.After(rl.M2.RequestStart.Add(rl.respThresh)) {
                  .          .   3578:			delete(c.rrTracking.rmap, subject)
                  .          .   3579:		}
                  .          .   3580:	}
                  .          .   3581:	if len(c.rrTracking.rmap) > 0 {
                  .          .   3582:		t := c.rrTracking.ptmr
                  .          .   3583:		t.Stop()
                  .          .   3584:		t.Reset(c.rrTracking.lrt)
                  .          .   3585:	} else {
                  .          .   3586:		c.rrTracking.ptmr.Stop()
                  .          .   3587:		c.rrTracking = nil
                  .          .   3588:	}
                  .          .   3589:	c.mu.Unlock()
            96.01MB   271.02MB   3590:}
                  .          .   3591:
                  .          .   3592:// pruneReplyPerms will remove any stale or expired entries
                  .          .   3593:// in our reply cache. We make sure to not check too often.
                  .          .   3594:func (c *client) pruneReplyPerms() {
                  .          .   3595:	// Make sure we do not check too often.
                  .          .   3596:	if c.perms.resp == nil {
                  .          .   3597:		return
                  .          .   3598:	}
                  .          .   3599:
                  .          .   3600:	mm := c.perms.resp.MaxMsgs
                  .          .   3601:	ttl := c.perms.resp.Expires
                  .          .   3602:	now := time.Now()
                  .          .   3603:
                  .          .   3604:	for k, resp := range c.replies {
                  .          .   3605:		if mm > 0 && resp.n >= mm {
                  .          .   3606:			delete(c.replies, k)
                  .          .   3607:		} else if ttl > 0 && now.Sub(resp.t) > ttl {
                  .          .   3608:			delete(c.replies, k)
                  .          .   3609:		}
                  .          .   3610:	}
                  .          .   3611:}
                  .          .   3612:
                  .          .   3613:// pruneDenyCache will prune the deny cache via randomly
                  .          .   3614:// deleting items. Doing so pruneSize items at a time.
                  .          .   3615:// Lock must be held for this one since it is shared under
                  .          .   3616:// deliverMsg.
                  .          .   3617:func (c *client) pruneDenyCache() {
                  .          .   3618:	r := 0
                  .          .   3619:	for subject := range c.mperms.dcache {
                  .          .   3620:		delete(c.mperms.dcache, subject)
                  .          .   3621:		if r++; r > pruneSize {
                  .          .   3622:			break
                  .          .   3623:		}
                  .          .   3624:	}
                  .          .   3625:}
                  .          .   3626:
                  .          .   3627:// prunePubPermsCache will prune the cache via randomly
                  .          .   3628:// deleting items. Doing so pruneSize items at a time.
                  .          .   3629:func (c *client) prunePubPermsCache() {
                  .          .   3630:	// There is a case where we can invoke this from multiple go routines,
                  .          .   3631:	// (in deliverMsg() if sub.client is a LEAF), so we make sure to prune
                  .          .   3632:	// from only one go routine at a time.
                  .          .   3633:	if !atomic.CompareAndSwapInt32(&c.perms.prun, 0, 1) {
                  .          .   3634:		return
                  .          .   3635:	}
                  .          .   3636:	const maxPruneAtOnce = 1000
                  .          .   3637:	r := 0
                  .          .   3638:	c.perms.pcache.Range(func(k, _ interface{}) bool {
                  .          .   3639:		c.perms.pcache.Delete(k)
                  .          .   3640:		if r++; (r > pruneSize && atomic.LoadInt32(&c.perms.pcsz) < int32(maxPermCacheSize)) ||
                  .          .   3641:			(r > maxPruneAtOnce) {
                  .          .   3642:			return false
                  .          .   3643:		}
                  .          .   3644:		return true
                  .          .   3645:	})
                  .          .   3646:	atomic.AddInt32(&c.perms.pcsz, -int32(r))
                  .          .   3647:	atomic.StoreInt32(&c.perms.prun, 0)
                  .          .   3648:}
                  .          .   3649:
                  .          .   3650:// pubAllowed checks on publish permissioning.
                  .          .   3651:// Lock should not be held.
                  .          .   3652:func (c *client) pubAllowed(subject string) bool {
                  .          .   3653:	return c.pubAllowedFullCheck(subject, true, false)
                  .          .   3654:}
                  .          .   3655:
                  .          .   3656:// pubAllowedFullCheck checks on all publish permissioning depending
                  .          .   3657:// on the flag for dynamic reply permissions.
                  .          .   3658:func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bool {
                  .          .   3659:	if c.perms == nil || (c.perms.pub.allow == nil && c.perms.pub.deny == nil) {
                  .          .   3660:		return true
                  .          .   3661:	}
                  .          .   3662:	// Check if published subject is allowed if we have permissions in place.
                  .          .   3663:	v, ok := c.perms.pcache.Load(subject)
                  .          .   3664:	if ok {
                  .          .   3665:		return v.(bool)
                  .          .   3666:	}
                  .          .   3667:	allowed := true
                  .          .   3668:	// Cache miss, check allow then deny as needed.
                  .          .   3669:	if c.perms.pub.allow != nil {
                  .          .   3670:		r := c.perms.pub.allow.Match(subject)
                  .          .   3671:		allowed = len(r.psubs) != 0
                  .          .   3672:	}
                  .          .   3673:	// If we have a deny list and are currently allowed, check that as well.
                  .          .   3674:	if allowed && c.perms.pub.deny != nil {
           512.10kB   512.10kB   3675:		r := c.perms.pub.deny.Match(subject)
                  .          .   3676:		allowed = len(r.psubs) == 0
                  .          .   3677:	}
                  .          .   3678:
                  .          .   3679:	// If we are currently not allowed but we are tracking reply subjects
                  .          .   3680:	// dynamically, check to see if we are allowed here but avoid pcache.
                  .          .   3681:	// We need to acquire the lock though.
                  .          .   3682:	if !allowed && fullCheck && c.perms.resp != nil {
                  .          .   3683:		if !hasLock {
                  .          .   3684:			c.mu.Lock()
                  .          .   3685:		}
                  .          .   3686:		if resp := c.replies[subject]; resp != nil {
                  .          .   3687:			resp.n++
                  .          .   3688:			// Check if we have sent too many responses.
                  .          .   3689:			if c.perms.resp.MaxMsgs > 0 && resp.n > c.perms.resp.MaxMsgs {
                  .          .   3690:				delete(c.replies, subject)
                  .          .   3691:			} else if c.perms.resp.Expires > 0 && time.Since(resp.t) > c.perms.resp.Expires {
                  .          .   3692:				delete(c.replies, subject)
                  .          .   3693:			} else {
                  .          .   3694:				allowed = true
                  .          .   3695:			}
                  .          .   3696:		}
                  .          .   3697:		if !hasLock {
                  .          .   3698:			c.mu.Unlock()
                  .          .   3699:		}
                  .          .   3700:	} else {
                  .          .   3701:		// Update our cache here.
                  .          .   3702:		c.perms.pcache.Store(string(subject), allowed)
                  .          .   3703:		if n := atomic.AddInt32(&c.perms.pcsz, 1); n > maxPermCacheSize {
                  .          .   3704:			c.prunePubPermsCache()
                  .          .   3705:		}
                  .    19.59MB   3706:	}
                  .          .   3707:	return allowed
                  .          .   3708:}
                  .          .   3709:
                  .          .   3710:// Test whether a reply subject is a service import reply.
                  .          .   3711:func isServiceReply(reply []byte) bool {
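
For reference, heap snapshots like the ones above can be pulled from the server's profiling port when prof_port is enabled in the server config (it exposes Go's standard /debug/pprof handlers). A minimal helper is sketched below; the host and port are placeholders for this environment:

// heapsnap.go: fetch a heap profile from a nats-server whose profiling port
// is enabled (prof_port in the config). Host and port are placeholders;
// adjust them to your deployment.
package main

import (
	"io"
	"log"
	"net/http"
	"os"
)

func main() {
	resp, err := http.Get("http://127.0.0.1:65432/debug/pprof/heap")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	f, err := os.Create("heap.pprof")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	if _, err := io.Copy(f, resp.Body); err != nil {
		log.Fatal(err)
	}
	log.Println("wrote heap.pprof; inspect with: go tool pprof nats-server heap.pprof")
}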

/varz

{
  "server_id": "NBT7JXKUKCU7XEAMBQUSSVVRLPQEDU5EVJ7PTYCBTYUZF7SKJXB4O6I7",
  "server_name": "nats-1",
  "version": "2.9.23",
  "proto": 1,
  "git_commit": "45436e1",
  "go": "go1.20.10",
  "host": "0.0.0.0",
  "port": 4224,
  "auth_required": true,
  "connect_urls": [
    "172.27.20.240:4224",
    "172.27.22.164:4224",
    "172.27.21.135:4224"
  ],
  "ws_connect_urls": [
    "172.27.20.240:8091",
    "172.27.22.164:8091",
    "172.27.21.135:8091"
  ],
  "max_connections": 65536,
  "ping_interval": 120000000000,
  "ping_max": 2,
  "http_host": "0.0.0.0",
  "http_port": 8224,
  "http_base_path": "",
  "https_port": 0,
  "auth_timeout": 2,
  "max_control_line": 4096,
  "max_payload": 1048576,
  "max_pending": 67108864,
  "cluster": {
    "name": "tournaments",
    "addr": "0.0.0.0",
    "cluster_port": 4246,
    "auth_timeout": 2,
    "urls": [
      "<deleted>.com:4246",
      "<deleted>.com:4246",
      "<deleted>.com:4246"
    ],
    "tls_timeout": 2
  },
  "gateway": {},
  "leaf": {},
  "mqtt": {},
  "websocket": {
    "host": "0.0.0.0",
    "port": 8091,
    "no_tls": true
  },
  "jetstream": {},
  "tls_timeout": 2,
  "write_deadline": 10000000000,
  "start": "2023-11-23T18:18:35.194826761Z",
  "now": "2023-11-27T13:33:16.139861719Z",
  "uptime": "3d19h14m40s",
  "mem": 587067392,
  "cores": 2,
  "gomaxprocs": 2,
  "cpu": 4,
  "connections": 2939,
  "total_connections": 502739,
  "routes": 2,
  "remotes": 2,
  "leafnodes": 0,
  "in_msgs": 16791326,
  "out_msgs": 16802883,
  "in_bytes": 2690254810,
  "out_bytes": 2686343929,
  "slow_consumers": 0,
  "subscriptions": 3563,
  "http_req_stats": {
    "/accountz": 1,
    "/accstatz": 1,
    "/connz": 3,
    "/leafz": 1,
    "/subsz": 11,
    "/varz": 5481
  },
  "config_load_time": "2023-11-23T18:18:35.194826761Z",
  "system_account": "$SYS",
  "ocsp_peer_cache": {}
}
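
As an aside, the trend itself is easy to track from the same monitoring endpoint: the mem field in /varz is the server's process memory in bytes. A minimal sketch of a poller, assuming the http_port (8224) shown above:

// varzmem.go: sample the "mem" field from the server's /varz monitoring
// endpoint once a minute to watch the trend over time. The URL assumes the
// http_port from the /varz output above (8224); adjust host and port as
// needed.
package main

import (
	"encoding/json"
	"log"
	"net/http"
	"time"
)

func main() {
	const url = "http://127.0.0.1:8224/varz"
	for {
		resp, err := http.Get(url)
		if err != nil {
			log.Printf("varz: %v", err)
		} else {
			var v struct {
				Mem         int64 `json:"mem"`
				Connections int   `json:"connections"`
			}
			if err := json.NewDecoder(resp.Body).Decode(&v); err != nil {
				log.Printf("decode: %v", err)
			} else {
				log.Printf("mem=%d bytes connections=%d", v.Mem, v.Connections)
			}
			resp.Body.Close()
		}
		time.Sleep(time.Minute)
	}
}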

excerpt from /subsz

  "server_id": "NBT7JXKUKCU7XEAMBQUSSVVRLPQEDU5EVJ7PTYCBTYUZF7SKJXB4O6I7",
  "now": "2023-11-27T13:33:55.166875431Z",
  "num_subscriptions": 3588,
  "num_cache": 13,
  "num_inserts": 2776812,
  "num_removes": 2773224,
  "num_matches": 15432793,
  "cache_hit_rate": 0.30323616729648356,
  "max_fanout": 3,
  "avg_fanout": 1.8461538461538463,
  "total": 1296,
  "offset": 0,
  "limit": 10000,

excerpt from /connz

  "server_id": "NBT7JXKUKCU7XEAMBQUSSVVRLPQEDU5EVJ7PTYCBTYUZF7SKJXB4O6I7",
  "now": "2023-11-27T13:35:07.517826048Z",
  "num_connections": 2955,
  "total": 2955,
  "offset": 0,
  "limit": 10000,

Expected behavior

No memory leak.

Server and client version

$ nats-server --version
nats-server: v2.9.23

Host environment

No response

Steps to reproduce

No response

mbrekhov added the defect label (Suspected defect such as a bug or regression) on Nov 27, 2023
@derekcollison
Member

Do you expect to have 3k subscriptions?

If the answer is yes, do you publish a lot of messages that have no matching subscriptions? If that is the case, you could be growing the sublist cache with entries for those misses. You can turn off sublist caching in the server config to experiment.
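
One way to test that hypothesis before changing the config is to watch the sublist statistics on /subsz (num_cache, cache_hit_rate, num_matches in the excerpt above): if misses dominate, the hit rate stays low while matches keep climbing. A minimal one-shot check, reusing the monitoring port from the posted /varz output (8224):

// subszcheck.go: one-shot look at the sublist statistics exposed on /subsz
// (the fields are visible in the excerpt above). Host and port are
// assumptions based on the posted /varz; adjust for your deployment.
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

func main() {
	resp, err := http.Get("http://127.0.0.1:8224/subsz")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var s struct {
		NumSubscriptions int     `json:"num_subscriptions"`
		NumCache         int     `json:"num_cache"`
		NumMatches       uint64  `json:"num_matches"`
		CacheHitRate     float64 `json:"cache_hit_rate"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
		log.Fatal(err)
	}
	log.Printf("subs=%d cache=%d matches=%d hit_rate=%.2f",
		s.NumSubscriptions, s.NumCache, s.NumMatches, s.CacheHitRate)
}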

@derekcollison
Member

Any updates on this?

derekcollison added the stale label (This issue has had no activity in a while) on Jan 10, 2024