diff --git a/server/sublist.go b/server/sublist.go index d7ffac0c92..b5461dc31f 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -47,8 +47,6 @@ var ( const ( // cacheMax is used to bound limit the frontend cache slCacheMax = 1024 - // If we run a sweeper we will drain to this count. - slCacheSweep = 256 // plistMin is our lower bounds to create a fast plist for Match. plistMin = 256 ) @@ -68,8 +66,7 @@ type Sublist struct { inserts uint64 removes uint64 root *level - cache map[string]*SublistResult - ccSweep int32 + cache *Cache notify *notifyMaps count uint32 } @@ -114,7 +111,7 @@ func newLevel() *level { // NewSublist will create a default sublist with caching enabled per the flag. func NewSublist(enableCache bool) *Sublist { if enableCache { - return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)} + return &Sublist{root: newLevel(), cache: NewCache()} } return &Sublist{root: newLevel()} } @@ -489,16 +486,17 @@ func (s *Sublist) addToCache(subject string, sub *subscription) { } // If literal we can direct match. if subjectIsLiteral(subject) { - if r := s.cache[subject]; r != nil { - s.cache[subject] = r.addSubToResult(sub) + if r, _ := s.cache.Get(subject); r != nil { + s.cache.Set(subject, r.addSubToResult(sub)) } return } - for key, r := range s.cache { + + s.cache.Iterate(func(key string, value *SublistResult) { if matchLiteral(key, subject) { - s.cache[key] = r.addSubToResult(sub) + s.cache.Set(key, value.addSubToResult(sub)) } - } + }) } // removeFromCache will remove the sub from any active cache entries. @@ -509,15 +507,15 @@ func (s *Sublist) removeFromCache(subject string, sub *subscription) { } // If literal we can direct match. if subjectIsLiteral(subject) { - delete(s.cache, subject) + s.cache.Delete(subject) return } // Wildcard here. - for key := range s.cache { + s.cache.Iterate(func(key string, value *SublistResult) { if matchLiteral(key, subject) { - delete(s.cache, key) + s.cache.Delete(key) } - } + }) } // a place holder for an empty result. @@ -537,13 +535,9 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { atomic.AddUint64(&s.matches, 1) // Check cache first. - if doLock { - s.RLock() - } - r, ok := s.cache[subject] - if doLock { - s.RUnlock() - } + + r, ok := s.cache.Get(subject) + if ok { atomic.AddUint64(&s.cacheHits, 1) return r @@ -571,8 +565,6 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { // Get result from the main structure and place into the shared cache. // Hold the read lock to avoid race between match and store. - var n int - if doLock { s.Lock() } @@ -583,36 +575,15 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { result = emptyResult } if s.cache != nil { - s.cache[subject] = result - n = len(s.cache) + s.cache.Set(subject, result) } if doLock { s.Unlock() } - // Reduce the cache count if we have exceeded our set maximum. - if n > slCacheMax && atomic.CompareAndSwapInt32(&s.ccSweep, 0, 1) { - go s.reduceCacheCount() - } - return result } -// Remove entries in the cache until we are under the maximum. -// TODO(dlc) this could be smarter now that its not inline. -func (s *Sublist) reduceCacheCount() { - defer atomic.StoreInt32(&s.ccSweep, 0) - // If we are over the cache limit randomly drop until under the limit. - s.Lock() - for key := range s.cache { - delete(s.cache, key) - if len(s.cache) <= slCacheSweep { - break - } - } - s.Unlock() -} - // Helper function for auto-expanding remote qsubs. func isRemoteQSub(sub *subscription) bool { return sub != nil && sub.queue != nil && sub.client != nil && (sub.client.kind == ROUTER || sub.client.kind == LEAF) @@ -832,7 +803,7 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error { // Turn caching back on here. atomic.AddUint64(&s.genid, 1) if wasEnabled { - s.cache = make(map[string]*SublistResult) + s.cache = NewCache() } return err } @@ -914,7 +885,7 @@ func (s *Sublist) Count() uint32 { // CacheCount returns the number of result sets in the cache. func (s *Sublist) CacheCount() int { s.RLock() - cc := len(s.cache) + cc := s.cache.Len() s.RUnlock() return cc } @@ -963,7 +934,7 @@ func (s *Sublist) Stats() *SublistStats { s.RLock() cache := s.cache - cc := len(s.cache) + cc := s.CacheCount() st.NumSubs = s.count st.NumInserts = s.inserts st.NumRemoves = s.removes @@ -981,14 +952,14 @@ func (s *Sublist) Stats() *SublistStats { if cache != nil { tot, max, clen := 0, 0, 0 s.RLock() - for _, r := range s.cache { + cache.Iterate(func(key string, r *SublistResult) { clen++ l := len(r.psubs) + len(r.qsubs) tot += l if l > max { max = l } - } + }) s.RUnlock() st.totFanout = tot st.cacheCnt = clen diff --git a/server/sublist_cache.go b/server/sublist_cache.go new file mode 100755 index 0000000000..428120d3b6 --- /dev/null +++ b/server/sublist_cache.go @@ -0,0 +1,91 @@ +// Copyright 2016-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "hash/maphash" + "sync/atomic" +) + +type Cache struct { + data [slCacheMax]atomic.Pointer[cacheEntry] + hashFunc func(string) int +} + +type cacheEntry struct { + key string + value *SublistResult +} + +func NewCache() *Cache { + seed := maphash.MakeSeed() + hfunc := func(str string) int { + return int(maphash.String(seed, str) % slCacheMax) + } + + return &Cache{ + hashFunc: hfunc, + } +} + +func (c *Cache) Set(key string, value *SublistResult) { + if c == nil { + return + } + + index := c.hashFunc(key) + entry := &cacheEntry{key: key, value: value} + c.data[index].Store(entry) +} + +func (c *Cache) Get(key string) (*SublistResult, bool) { + if c == nil { + return nil, false + } + + index := c.hashFunc(key) + entry := c.data[index].Load() + if entry != nil { + if entry.key == key { + return entry.value, true + } + } + return nil, false +} + +func (c *Cache) Delete(key string) { + index := c.hashFunc(key) + c.data[index].Store(nil) +} + +func (c *Cache) Len() int { + var cc int + c.Iterate(func(key string, value *SublistResult) { + cc++ + }) + return cc +} + +func (c *Cache) Iterate(cf func(key string, value *SublistResult)) { + if c == nil { + return + } + + for i := range c.data { + p := c.data[i].Load() + if p != nil { + cf(p.key, p.value) + } + } +}