From a60872188a96256e6f1ed17664389b0d75de10fa Mon Sep 17 00:00:00 2001 From: Yujie Li <120415427+yujieli-temporal@users.noreply.github.com> Date: Tue, 8 Aug 2023 01:31:54 -0700 Subject: [PATCH] remove the event cache initial size and refactor putInternal (#4698) **What changed?** remove the cache initial size from lru cache and related configs. also refactor putInternal to better support different scenarios. and added more unittests for cache size. **Why?** in https://github.com/temporalio/temporal/pull/4621 I change the history cache to use item bytes instead item count. However it accidentally touched default number of key when cache initialized **How did you test it?** unittest **Potential risks** Memory usage increase and pined mutable state will never leave the cache, which cause block workflow. **Is hotfix candidate?** --- common/cache/cache.go | 6 - common/cache/lru.go | 131 ++++++---- common/cache/lru_test.go | 223 ++++++++++++++++++ common/cache/simple.go | 2 +- common/dynamicconfig/constants.go | 2 - common/namespace/registry.go | 11 +- common/persistence/xdc_cache.go | 5 +- service/history/configs/config.go | 10 +- service/history/events/cache.go | 2 - service/history/events/cache_test.go | 1 - service/history/history_engine_test.go | 1 - .../history/replication/ack_manager_test.go | 4 - service/history/shard/context_impl.go | 1 - .../timer_queue_active_task_executor_test.go | 1 - .../timer_queue_standby_task_executor_test.go | 1 - ...ransfer_queue_active_task_executor_test.go | 1 - ...ansfer_queue_standby_task_executor_test.go | 1 - .../visibility_queue_task_executor_test.go | 1 - service/history/workflow/cache/cache.go | 1 - service/matching/poller_history.go | 6 +- 20 files changed, 318 insertions(+), 93 deletions(-) diff --git a/common/cache/cache.go b/common/cache/cache.go index 044f951bd34..0893446f36e 100644 --- a/common/cache/cache.go +++ b/common/cache/cache.go @@ -64,9 +64,6 @@ type Options struct { // are older than the TTL will not be returned. TTL time.Duration - // InitialCapacity controls the initial capacity of the cache - InitialCapacity int - // Pin prevents in-use objects from getting evicted. Pin bool @@ -76,9 +73,6 @@ type Options struct { // SimpleOptions provides options that can be used to configure SimpleCache type SimpleOptions struct { - // InitialCapacity controls the initial capacity of the cache - InitialCapacity int - // RemovedFunc is an optional function called when an element // is scheduled for deletion RemovedFunc RemovedFunc diff --git a/common/cache/lru.go b/common/cache/lru.go index 6ce1b13fbf5..ad61c6eb94a 100644 --- a/common/cache/lru.go +++ b/common/cache/lru.go @@ -40,6 +40,8 @@ var ( ErrCacheItemTooLarge = errors.New("cache item size is larger than max cache capacity") ) +const emptyEntrySize = 0 + // lru is a concurrent fixed size cache that evicts elements in lru order type ( lru struct { @@ -152,7 +154,7 @@ func New(maxSize int, opts *Options) Cache { return &lru{ byAccess: list.New(), - byKey: make(map[interface{}]*list.Element, opts.InitialCapacity), + byKey: make(map[interface{}]*list.Element), ttl: opts.TTL, maxSize: maxSize, currSize: 0, @@ -167,14 +169,6 @@ func NewLRU(maxSize int) Cache { return New(maxSize, nil) } -// NewLRUWithInitialCapacity creates a new LRU cache with an initial capacity -// and a max size -func NewLRUWithInitialCapacity(initialCapacity, maxSize int) Cache { - return New(maxSize, &Options{ - InitialCapacity: initialCapacity, - }) -} - // Get retrieves the value stored under the given key func (c *lru) Get(key interface{}) interface{} { if c.maxSize == 0 { // @@ -274,85 +268,124 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) if c.maxSize == 0 { return nil, nil } - entrySize := getSize(value) - if entrySize > c.maxSize { + newEntrySize := getSize(value) + if newEntrySize > c.maxSize { return nil, ErrCacheItemTooLarge } c.mut.Lock() defer c.mut.Unlock() - c.currSize += entrySize - c.tryEvictUntilEnoughSpace() - // If there is still not enough space, remove the new entry size from the current size and return an error - if c.currSize > c.maxSize { - c.currSize -= entrySize - return nil, ErrCacheFull - } - elt := c.byKey[key] + // If the entry exists, check if it has expired or update the value if elt != nil { - entry := elt.Value.(*entryImpl) - if c.isEntryExpired(entry, time.Now().UTC()) { - // Entry has expired - c.deleteInternal(elt) - } else { - existing := entry.value + existingEntry := elt.Value.(*entryImpl) + if !c.isEntryExpired(existingEntry, time.Now().UTC()) { + existingVal := existingEntry.value if allowUpdate { - entry.value = value - if c.ttl != 0 { - entry.createTime = time.Now().UTC() + newCacheSize := c.calculateNewCacheSize(newEntrySize, existingEntry.Size()) + if newCacheSize > c.maxSize { + c.tryEvictUntilEnoughSpaceWithSkipEntry(newEntrySize, existingEntry) + // calculate again after eviction + newCacheSize = c.calculateNewCacheSize(newEntrySize, existingEntry.Size()) + if newCacheSize > c.maxSize { + // This should never happen since allowUpdate is always **true** for non-pinned cache, + // and if all entries are not pinned(ref==0), then the cache should never be full as long as + // new entry's size is less than max size. + // However, to prevent any unexpected behavior, it checks the cache size again. + return nil, ErrCacheFull + } } + existingEntry.value = value + existingEntry.size = newEntrySize + c.currSize = newCacheSize + c.updateEntryTTL(existingEntry) } + c.updateEntryRefCount(existingEntry) c.byAccess.MoveToFront(elt) - if c.pin { - entry.refCount++ - } - return existing, nil + return existingVal, nil } - } - entry := &entryImpl{ - key: key, - value: value, - size: entrySize, + // Entry has expired + c.deleteInternal(elt) } - if c.pin { - entry.refCount++ + c.tryEvictUntilEnoughSpaceWithSkipEntry(newEntrySize, nil) + + // check if the new entry can fit in the cache + newCacheSize := c.calculateNewCacheSize(newEntrySize, emptyEntrySize) + if newCacheSize > c.maxSize { + return nil, ErrCacheFull } - if c.ttl != 0 { - entry.createTime = c.timeSource.Now().UTC() + entry := &entryImpl{ + key: key, + value: value, + size: newEntrySize, } + c.updateEntryTTL(entry) + c.updateEntryRefCount(entry) element := c.byAccess.PushFront(entry) c.byKey[key] = element + c.currSize = newCacheSize return nil, nil } +func (c *lru) calculateNewCacheSize(newEntrySize int, existingEntrySize int) int { + return c.currSize - existingEntrySize + newEntrySize +} + func (c *lru) deleteInternal(element *list.Element) { entry := c.byAccess.Remove(element).(*entryImpl) c.currSize -= entry.Size() delete(c.byKey, entry.key) } -// tryEvictUntilEnoughSpace try to evict entries until there is enough space for the new entry -func (c *lru) tryEvictUntilEnoughSpace() { +// tryEvictUntilEnoughSpace try to evict entries until there is enough space for the new entry without +// evicting the existing entry. the existing entry is skipped because it is being updated. +func (c *lru) tryEvictUntilEnoughSpaceWithSkipEntry(newEntrySize int, existingEntry *entryImpl) { element := c.byAccess.Back() - for c.currSize > c.maxSize && element != nil { + existingEntrySize := 0 + if existingEntry != nil { + existingEntrySize = existingEntry.Size() + } + + for c.calculateNewCacheSize(newEntrySize, existingEntrySize) > c.maxSize && element != nil { entry := element.Value.(*entryImpl) - if entry.refCount == 0 { - c.deleteInternal(element) + if existingEntry != nil && entry.key == existingEntry.key { + element = element.Prev() + continue } + element = c.tryEvictAndGetPreviousElement(entry, element) + } +} - // entry.refCount > 0 - // skip, entry still being referenced - element = element.Prev() +func (c *lru) tryEvictAndGetPreviousElement(entry *entryImpl, element *list.Element) *list.Element { + if entry.refCount == 0 { + elementPrev := element.Prev() + // currSize will be updated within deleteInternal + c.deleteInternal(element) + return elementPrev } + // entry.refCount > 0 + // skip, entry still being referenced + return element.Prev() } func (c *lru) isEntryExpired(entry *entryImpl, currentTime time.Time) bool { return entry.refCount == 0 && !entry.createTime.IsZero() && currentTime.After(entry.createTime.Add(c.ttl)) } + +func (c *lru) updateEntryTTL(entry *entryImpl) { + if c.ttl != 0 { + entry.createTime = c.timeSource.Now().UTC() + } +} + +func (c *lru) updateEntryRefCount(entry *entryImpl) { + if c.pin { + entry.refCount++ + } +} diff --git a/common/cache/lru_test.go b/common/cache/lru_test.go index 468e48e79d1..5649cece434 100644 --- a/common/cache/lru_test.go +++ b/common/cache/lru_test.go @@ -71,19 +71,23 @@ func TestLRU(t *testing.T) { cache.Put("A", "Foo2") assert.Equal(t, "Foo2", cache.Get("A")) + assert.Equal(t, 4, cache.Size()) cache.Put("E", "Epsi") assert.Equal(t, "Epsi", cache.Get("E")) assert.Equal(t, "Foo2", cache.Get("A")) assert.Nil(t, cache.Get("B")) // Oldest, should be evicted + assert.Equal(t, 4, cache.Size()) // Access C, D is now LRU cache.Get("C") cache.Put("F", "Felp") assert.Nil(t, cache.Get("D")) + assert.Equal(t, 4, cache.Size()) cache.Delete("A") assert.Nil(t, cache.Get("A")) + assert.Equal(t, 3, cache.Size()) } func TestGenerics(t *testing.T) { @@ -107,6 +111,11 @@ func TestGenerics(t *testing.T) { dummyString: "some other random key", dummyInt: 56, })) + assert.Equal(t, 1, cache.Size()) + + cache.Put(key, "some other random value") + assert.Equal(t, "some other random value", cache.Get(key)) + assert.Equal(t, 1, cache.Size()) } func TestLRUWithTTL(t *testing.T) { @@ -205,13 +214,16 @@ func TestTTLWithPin(t *testing.T) { _, err := cache.PutIfNotExist("A", t) assert.NoError(t, err) assert.Equal(t, t, cache.Get("A")) + assert.Equal(t, 1, cache.Size()) timeSource.Advance(time.Millisecond * 100) assert.Equal(t, t, cache.Get("A")) + assert.Equal(t, 1, cache.Size()) // release 3 time since put if not exist also increase the counter cache.Release("A") cache.Release("A") cache.Release("A") assert.Nil(t, cache.Get("A")) + assert.Equal(t, 0, cache.Size()) } func TestMaxSizeWithPin_MidItem(t *testing.T) { @@ -226,31 +238,38 @@ func TestMaxSizeWithPin_MidItem(t *testing.T) { _, err := cache.PutIfNotExist("A", t) assert.NoError(t, err) + assert.Equal(t, 1, cache.Size()) _, err = cache.PutIfNotExist("B", t) assert.NoError(t, err) + assert.Equal(t, 2, cache.Size()) _, err = cache.PutIfNotExist("C", t) assert.Error(t, err) + assert.Equal(t, 2, cache.Size()) assert.Equal(t, t, cache.Get("A")) cache.Release("A") // get will also increase the ref count assert.Equal(t, t, cache.Get("B")) cache.Release("B") // get will also increase the ref count + assert.Equal(t, 2, cache.Size()) cache.Release("B") // B's ref count is 0 _, err = cache.PutIfNotExist("C", t) assert.NoError(t, err) assert.Equal(t, t, cache.Get("C")) cache.Release("C") // get will also increase the ref count + assert.Equal(t, 2, cache.Size()) cache.Release("A") // A's ref count is 0 cache.Release("C") // C's ref count is 0 + assert.Equal(t, 2, cache.Size()) timeSource.Advance(time.Millisecond * 100) assert.Nil(t, cache.Get("A")) assert.Nil(t, cache.Get("B")) assert.Nil(t, cache.Get("C")) + assert.Equal(t, 0, cache.Size()) } func TestMaxSizeWithPin_LastItem(t *testing.T) { @@ -265,31 +284,38 @@ func TestMaxSizeWithPin_LastItem(t *testing.T) { _, err := cache.PutIfNotExist("A", t) assert.NoError(t, err) + assert.Equal(t, 1, cache.Size()) _, err = cache.PutIfNotExist("B", t) assert.NoError(t, err) + assert.Equal(t, 2, cache.Size()) _, err = cache.PutIfNotExist("C", t) assert.Error(t, err) + assert.Equal(t, 2, cache.Size()) assert.Equal(t, t, cache.Get("A")) cache.Release("A") // get will also increase the ref count assert.Equal(t, t, cache.Get("B")) cache.Release("B") // get will also increase the ref count + assert.Equal(t, 2, cache.Size()) cache.Release("A") // A's ref count is 0 _, err = cache.PutIfNotExist("C", t) assert.NoError(t, err) assert.Equal(t, t, cache.Get("C")) cache.Release("C") // get will also increase the ref count + assert.Equal(t, 2, cache.Size()) cache.Release("B") // B's ref count is 0 cache.Release("C") // C's ref count is 0 + assert.Equal(t, 2, cache.Size()) timeSource.Advance(time.Millisecond * 100) assert.Nil(t, cache.Get("A")) assert.Nil(t, cache.Get("B")) assert.Nil(t, cache.Get("C")) + assert.Equal(t, 0, cache.Size()) } func TestIterator(t *testing.T) { @@ -354,10 +380,12 @@ func TestCache_ItemSizeTooLarge(t *testing.T) { res := cache.Put(uuid.New(), &testEntryWithCacheSize{maxTotalBytes}) assert.Equal(t, res, nil) + assert.Equal(t, 10, cache.Size()) res, err := cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{maxTotalBytes + 1}) assert.Equal(t, err, ErrCacheItemTooLarge) assert.Equal(t, res, nil) + assert.Equal(t, 10, cache.Size()) } @@ -392,3 +420,198 @@ func TestCache_ItemHasCacheSizeDefined(t *testing.T) { endWG.Wait() } + +func TestCache_ItemHasCacheSizeDefined_PutWithNewKeys(t *testing.T) { + t.Parallel() + + maxTotalBytes := 10 + cache := NewLRU(maxTotalBytes) + + // Put with new key and value size greater than cache size, should not be added to cache + cache.Put(uuid.New(), &testEntryWithCacheSize{15}) + assert.Equal(t, 0, cache.Size()) + + // Put with new key and value size less than cache size, should be added to cache + cache.Put(uuid.New(), &testEntryWithCacheSize{5}) + assert.Equal(t, 5, cache.Size()) + + // Put with new key and value size less than cache size, should evict 0 ref items and added to cache + cache.Put(uuid.New(), &testEntryWithCacheSize{10}) + assert.Equal(t, 10, cache.Size()) + + // Put with new key and value size less than cache size, should evict 0 ref items until enough spaces and added to cache + cache.Put(uuid.New(), &testEntryWithCacheSize{3}) + assert.Equal(t, 3, cache.Size()) + cache.Put(uuid.New(), &testEntryWithCacheSize{7}) + assert.Equal(t, 10, cache.Size()) +} + +func TestCache_ItemHasCacheSizeDefined_PutWithSameKeyAndDifferentSizes(t *testing.T) { + t.Parallel() + + maxTotalBytes := 10 + cache := NewLRU(maxTotalBytes) + + key1 := "A" + cache.Put(key1, &testEntryWithCacheSize{4}) + assert.Equal(t, 4, cache.Size()) + + key2 := "B" + cache.Put(key2, &testEntryWithCacheSize{4}) + // 4 + 4 = 8 < 10 should not evict any items + assert.Equal(t, 8, cache.Size()) + // put same key with smaller size, should not evict any items + cache.Put(key2, &testEntryWithCacheSize{3}) + assert.Equal(t, cache.Get(key1), &testEntryWithCacheSize{4}) + // 8 - 4 + 3 = 7 < 10, should not evict any items + assert.Equal(t, 7, cache.Size()) + + // put same key with larger size, but below cache size, should not evict any items + cache.Put(key2, &testEntryWithCacheSize{6}) + // 7 - 3 + 6 = 10 =< 10, should not evict any items + assert.Equal(t, 10, cache.Size()) + // get key1 after to make it the most recently used + assert.Equal(t, cache.Get(key2), &testEntryWithCacheSize{6}) + assert.Equal(t, cache.Get(key1), &testEntryWithCacheSize{4}) + + // put same key with larger size, but take all cache size, should evict all items + cache.Put(key2, &testEntryWithCacheSize{10}) + // 10 - 4 - 6 + 10 = 10 =< 10, should evict all items + assert.Equal(t, 10, cache.Size()) + assert.Equal(t, cache.Get(key1), nil) + assert.Equal(t, cache.Get(key2), &testEntryWithCacheSize{10}) +} + +func TestCache_ItemHasCacheSizeDefined_PutWithSameKey(t *testing.T) { + t.Parallel() + + maxTotalBytes := 10 + cache := NewLRU(maxTotalBytes) + + key := uuid.New() + + // Put with same key and value size greater than cache size, should not be added to cache + cache.Put(key, &testEntryWithCacheSize{15}) + assert.Equal(t, 0, cache.Size()) + + // Put with same key and value size less than cache size, should be added to cache + cache.Put(key, &testEntryWithCacheSize{5}) + assert.Equal(t, 5, cache.Size()) + + // Put with same key and value size less than cache size, should be evicted until enough space and added to cache + cache.Put(key, &testEntryWithCacheSize{10}) + assert.Equal(t, 10, cache.Size()) + + // Put with same key and value size less than cache size, should be evicted until enough space and added to cache + cache.Put(key, &testEntryWithCacheSize{3}) + assert.Equal(t, 3, cache.Size()) + cache.Put(key, &testEntryWithCacheSize{7}) + assert.Equal(t, 7, cache.Size()) +} + +func TestCache_ItemHasCacheSizeDefined_PutIfNotExistWithNewKeys(t *testing.T) { + t.Parallel() + + maxTotalBytes := 10 + cache := NewLRU(maxTotalBytes) + + // PutIfNotExist with new keys with size greater than cache size, should return error and not add to cache + val, err := cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{15}) + assert.Equal(t, ErrCacheItemTooLarge, err) + assert.Nil(t, val) + assert.Equal(t, 0, cache.Size()) + + // PutIfNotExist with new keys with size less than cache size, should add to cache + val, err = cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{5}) + assert.NoError(t, err) + assert.Equal(t, &testEntryWithCacheSize{5}, val) + assert.Equal(t, 5, cache.Size()) + + // PutIfNotExist with new keys with size less than cache size, should evict item and add to cache + val, err = cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{10}) + assert.NoError(t, err) + assert.Equal(t, &testEntryWithCacheSize{10}, val) + assert.Equal(t, 10, cache.Size()) + + // PutIfNotExist with new keys with size less than cache size, should evict item and add to cache + val, err = cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{5}) + assert.NoError(t, err) + assert.Equal(t, &testEntryWithCacheSize{5}, val) + assert.Equal(t, 5, cache.Size()) +} + +func TestCache_ItemHasCacheSizeDefined_PutIfNotExistWithSameKey(t *testing.T) { + t.Parallel() + + maxTotalBytes := 10 + cache := NewLRU(maxTotalBytes) + key := uuid.New().String() + + // PutIfNotExist with new keys with size greater than cache size, should return error and not add to cache + val, err := cache.PutIfNotExist(key, &testEntryWithCacheSize{15}) + assert.Equal(t, ErrCacheItemTooLarge, err) + assert.Nil(t, val) + assert.Equal(t, 0, cache.Size()) + + // PutIfNotExist with new keys with size less than cache size, should add to cache + val, err = cache.PutIfNotExist(key, &testEntryWithCacheSize{5}) + assert.NoError(t, err) + assert.Equal(t, &testEntryWithCacheSize{5}, val) + assert.Equal(t, 5, cache.Size()) + + // PutIfNotExist with same keys with size less than cache size, should not be added to cache + val, err = cache.PutIfNotExist(key, &testEntryWithCacheSize{10}) + assert.NoError(t, err) + assert.Equal(t, &testEntryWithCacheSize{5}, val) + assert.Equal(t, 5, cache.Size()) +} + +func TestCache_PutIfNotExistWithNewKeys_Pin(t *testing.T) { + t.Parallel() + + maxTotalBytes := 10 + cache := New(maxTotalBytes, &Options{Pin: true}) + + val, err := cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{15}) + assert.Equal(t, ErrCacheItemTooLarge, err) + assert.Nil(t, val) + assert.Equal(t, 0, cache.Size()) + + val, err = cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{3}) + assert.NoError(t, err) + assert.Equal(t, &testEntryWithCacheSize{3}, val) + assert.Equal(t, 3, cache.Size()) + + val, err = cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{7}) + assert.NoError(t, err) + assert.Equal(t, &testEntryWithCacheSize{7}, val) + assert.Equal(t, 10, cache.Size()) + + val, err = cache.PutIfNotExist(uuid.New(), &testEntryWithCacheSize{8}) + assert.Equal(t, ErrCacheFull, err) + assert.Nil(t, val) + assert.Equal(t, 10, cache.Size()) +} + +func TestCache_PutIfNotExistWithSameKeys_Pin(t *testing.T) { + t.Parallel() + + maxTotalBytes := 10 + cache := New(maxTotalBytes, &Options{Pin: true}) + + key := uuid.New() + val, err := cache.PutIfNotExist(key, &testEntryWithCacheSize{15}) + assert.Equal(t, ErrCacheItemTooLarge, err) + assert.Nil(t, val) + assert.Equal(t, 0, cache.Size()) + + val, err = cache.PutIfNotExist(key, &testEntryWithCacheSize{3}) + assert.NoError(t, err) + assert.Equal(t, &testEntryWithCacheSize{3}, val) + assert.Equal(t, 3, cache.Size()) + + val, err = cache.PutIfNotExist(key, &testEntryWithCacheSize{7}) + assert.NoError(t, err) + assert.Equal(t, &testEntryWithCacheSize{3}, val) + assert.Equal(t, 3, cache.Size()) +} diff --git a/common/cache/simple.go b/common/cache/simple.go index 4da7c34ef0d..dba8cc29c29 100644 --- a/common/cache/simple.go +++ b/common/cache/simple.go @@ -102,7 +102,7 @@ func NewSimple(opts *SimpleOptions) Cache { } return &simple{ iterateList: list.New(), - accessMap: make(map[interface{}]*list.Element, opts.InitialCapacity), + accessMap: make(map[interface{}]*list.Element), rmFunc: opts.RemovedFunc, } } diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index d8bd4b98b67..26525389e12 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -493,8 +493,6 @@ const ( HistoryShutdownDrainDuration = "history.shutdownDrainDuration" // XDCCacheMaxSizeBytes is max size of events cache in bytes XDCCacheMaxSizeBytes = "history.xdcCacheMaxSizeBytes" - // EventsCacheInitialSizeBytes is initial size of events cache in bytes - EventsCacheInitialSizeBytes = "history.eventsCacheInitialSizeBytes" // EventsCacheMaxSizeBytes is max size of events cache in bytes EventsCacheMaxSizeBytes = "history.eventsCacheMaxSizeBytes" // EventsCacheTTL is TTL of events cache diff --git a/common/namespace/registry.go b/common/namespace/registry.go index 213a5fbd855..675c0ec380d 100644 --- a/common/namespace/registry.go +++ b/common/namespace/registry.go @@ -58,9 +58,8 @@ const ( ) const ( - cacheInitialSize = 10 * 1024 - cacheMaxSize = 64 * 1024 - cacheTTL = 0 // 0 means infinity + cacheMaxSize = 64 * 1024 + cacheTTL = 0 // 0 means infinity // CacheRefreshFailureRetryInterval is the wait time // if refreshment encounters error CacheRefreshFailureRetryInterval = 1 * time.Second @@ -78,12 +77,10 @@ const ( var ( cacheOpts = cache.Options{ - InitialCapacity: cacheInitialSize, - TTL: cacheTTL, + TTL: cacheTTL, } readthroughNotFoundCacheOpts = cache.Options{ - InitialCapacity: cacheInitialSize, - TTL: readthroughCacheTTL, + TTL: readthroughCacheTTL, } ) diff --git a/common/persistence/xdc_cache.go b/common/persistence/xdc_cache.go index e6747a2491a..06cbc524985 100644 --- a/common/persistence/xdc_cache.go +++ b/common/persistence/xdc_cache.go @@ -108,9 +108,8 @@ func NewEventsBlobCache( ) *XDCCacheImpl { return &XDCCacheImpl{ cache: cache.New(util.Max(xdcMinCacheSize, maxBytes), &cache.Options{ - InitialCapacity: xdcMinCacheSize, - TTL: ttl, - Pin: false, + TTL: ttl, + Pin: false, }), } } diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 93597e22fed..64e38881b8f 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -74,9 +74,8 @@ type Config struct { // EventsCache settings // Change of these configs require shard restart - EventsCacheInitialSizeBytes dynamicconfig.IntPropertyFn - EventsCacheMaxSizeBytes dynamicconfig.IntPropertyFn - EventsCacheTTL dynamicconfig.DurationPropertyFn + EventsCacheMaxSizeBytes dynamicconfig.IntPropertyFn + EventsCacheTTL dynamicconfig.DurationPropertyFn // ShardController settings RangeSizeBits uint @@ -359,9 +358,8 @@ func NewConfig( HistoryCacheTTL: dc.GetDurationProperty(dynamicconfig.HistoryCacheTTL, time.Hour), HistoryCacheNonUserContextLockTimeout: dc.GetDurationProperty(dynamicconfig.HistoryCacheNonUserContextLockTimeout, 500*time.Millisecond), - EventsCacheInitialSizeBytes: dc.GetIntProperty(dynamicconfig.EventsCacheInitialSizeBytes, 128*1024), // 128KB - EventsCacheMaxSizeBytes: dc.GetIntProperty(dynamicconfig.EventsCacheMaxSizeBytes, 512*1024), // 512KB - EventsCacheTTL: dc.GetDurationProperty(dynamicconfig.EventsCacheTTL, time.Hour), + EventsCacheMaxSizeBytes: dc.GetIntProperty(dynamicconfig.EventsCacheMaxSizeBytes, 512*1024), // 512KB + EventsCacheTTL: dc.GetDurationProperty(dynamicconfig.EventsCacheTTL, time.Hour), RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute), diff --git a/service/history/events/cache.go b/service/history/events/cache.go index 685c4282c55..dfb0ae3df19 100644 --- a/service/history/events/cache.go +++ b/service/history/events/cache.go @@ -79,7 +79,6 @@ var _ Cache = (*CacheImpl)(nil) func NewEventsCache( shardID int32, - initialCount int, maxCount int, ttl time.Duration, eventsMgr persistence.ExecutionManager, @@ -88,7 +87,6 @@ func NewEventsCache( metricsHandler metrics.Handler, ) *CacheImpl { opts := &cache.Options{} - opts.InitialCapacity = initialCount opts.TTL = ttl return &CacheImpl{ diff --git a/service/history/events/cache_test.go b/service/history/events/cache_test.go index 0353078c979..8387908a56e 100644 --- a/service/history/events/cache_test.go +++ b/service/history/events/cache_test.go @@ -89,7 +89,6 @@ func (s *eventsCacheSuite) newTestEventsCache() *CacheImpl { shardId := int32(10) return NewEventsCache( shardId, - 16, 32, time.Minute, s.mockExecutionManager, diff --git a/service/history/history_engine_test.go b/service/history/history_engine_test.go index 45b41a1daec..a9248e8e67e 100644 --- a/service/history/history_engine_test.go +++ b/service/history/history_engine_test.go @@ -161,7 +161,6 @@ func (s *engineSuite) SetupTest() { s.eventsCache = events.NewEventsCache( s.mockShard.GetShardID(), - s.mockShard.GetConfig().EventsCacheInitialSizeBytes(), s.mockShard.GetConfig().EventsCacheMaxSizeBytes(), s.mockShard.GetConfig().EventsCacheTTL(), s.mockShard.GetExecutionManager(), diff --git a/service/history/replication/ack_manager_test.go b/service/history/replication/ack_manager_test.go index d46870e1d81..cb3ce030e4c 100644 --- a/service/history/replication/ack_manager_test.go +++ b/service/history/replication/ack_manager_test.go @@ -307,7 +307,6 @@ func (s *ackManagerSuite) TestGetTasks_SecondPersistenceErrorReturnsPartialResul eventsCache := events.NewEventsCache( s.mockShard.GetShardID(), - s.mockShard.GetConfig().EventsCacheInitialSizeBytes(), s.mockShard.GetConfig().EventsCacheMaxSizeBytes(), s.mockShard.GetConfig().EventsCacheTTL(), s.mockShard.GetExecutionManager(), @@ -359,7 +358,6 @@ func (s *ackManagerSuite) TestGetTasks_FullPage() { eventsCache := events.NewEventsCache( s.mockShard.GetShardID(), - s.mockShard.GetConfig().EventsCacheInitialSizeBytes(), s.mockShard.GetConfig().EventsCacheMaxSizeBytes(), s.mockShard.GetConfig().EventsCacheTTL(), s.mockShard.GetExecutionManager(), @@ -411,7 +409,6 @@ func (s *ackManagerSuite) TestGetTasks_PartialPage() { eventsCache := events.NewEventsCache( s.mockShard.GetShardID(), - s.mockShard.GetConfig().EventsCacheInitialSizeBytes(), s.mockShard.GetConfig().EventsCacheMaxSizeBytes(), s.mockShard.GetConfig().EventsCacheTTL(), s.mockShard.GetExecutionManager(), @@ -500,7 +497,6 @@ func (s *ackManagerSuite) TestGetTasks_FilterNamespace() { eventsCache := events.NewEventsCache( s.mockShard.GetShardID(), - s.mockShard.GetConfig().EventsCacheInitialSizeBytes(), s.mockShard.GetConfig().EventsCacheMaxSizeBytes(), s.mockShard.GetConfig().EventsCacheTTL(), s.mockShard.GetExecutionManager(), diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 7894f103bab..12b3bff69e5 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -1957,7 +1957,6 @@ func newContext( } shardContext.eventsCache = events.NewEventsCache( shardContext.GetShardID(), - shardContext.GetConfig().EventsCacheInitialSizeBytes(), shardContext.GetConfig().EventsCacheMaxSizeBytes(), shardContext.GetConfig().EventsCacheTTL(), shardContext.GetExecutionManager(), diff --git a/service/history/timer_queue_active_task_executor_test.go b/service/history/timer_queue_active_task_executor_test.go index 730e4604c32..311108c083c 100644 --- a/service/history/timer_queue_active_task_executor_test.go +++ b/service/history/timer_queue_active_task_executor_test.go @@ -140,7 +140,6 @@ func (s *timerQueueActiveTaskExecutorSuite) SetupTest() { ) s.mockShard.SetEventsCacheForTesting(events.NewEventsCache( s.mockShard.GetShardID(), - s.mockShard.GetConfig().EventsCacheInitialSizeBytes(), s.mockShard.GetConfig().EventsCacheMaxSizeBytes(), s.mockShard.GetConfig().EventsCacheTTL(), s.mockShard.GetExecutionManager(), diff --git a/service/history/timer_queue_standby_task_executor_test.go b/service/history/timer_queue_standby_task_executor_test.go index 80456e8a8c8..1aa0b862fbb 100644 --- a/service/history/timer_queue_standby_task_executor_test.go +++ b/service/history/timer_queue_standby_task_executor_test.go @@ -142,7 +142,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() { ) s.mockShard.SetEventsCacheForTesting(events.NewEventsCache( s.mockShard.GetShardID(), - s.mockShard.GetConfig().EventsCacheInitialSizeBytes(), s.mockShard.GetConfig().EventsCacheMaxSizeBytes(), s.mockShard.GetConfig().EventsCacheTTL(), s.mockShard.GetExecutionManager(), diff --git a/service/history/transfer_queue_active_task_executor_test.go b/service/history/transfer_queue_active_task_executor_test.go index 50b325563ed..70006d5a8d0 100644 --- a/service/history/transfer_queue_active_task_executor_test.go +++ b/service/history/transfer_queue_active_task_executor_test.go @@ -175,7 +175,6 @@ func (s *transferQueueActiveTaskExecutorSuite) SetupTest() { ) s.mockShard.SetEventsCacheForTesting(events.NewEventsCache( s.mockShard.GetShardID(), - s.mockShard.GetConfig().EventsCacheInitialSizeBytes(), s.mockShard.GetConfig().EventsCacheMaxSizeBytes(), s.mockShard.GetConfig().EventsCacheTTL(), s.mockShard.GetExecutionManager(), diff --git a/service/history/transfer_queue_standby_task_executor_test.go b/service/history/transfer_queue_standby_task_executor_test.go index 4bd1992d35c..1a3528f76ff 100644 --- a/service/history/transfer_queue_standby_task_executor_test.go +++ b/service/history/transfer_queue_standby_task_executor_test.go @@ -149,7 +149,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) SetupTest() { ) s.mockShard.SetEventsCacheForTesting(events.NewEventsCache( s.mockShard.GetShardID(), - s.mockShard.GetConfig().EventsCacheInitialSizeBytes(), s.mockShard.GetConfig().EventsCacheMaxSizeBytes(), s.mockShard.GetConfig().EventsCacheTTL(), s.mockShard.GetExecutionManager(), diff --git a/service/history/visibility_queue_task_executor_test.go b/service/history/visibility_queue_task_executor_test.go index 661fb2943ba..2f642030c34 100644 --- a/service/history/visibility_queue_task_executor_test.go +++ b/service/history/visibility_queue_task_executor_test.go @@ -121,7 +121,6 @@ func (s *visibilityQueueTaskExecutorSuite) SetupTest() { ) s.mockShard.SetEventsCacheForTesting(events.NewEventsCache( s.mockShard.GetShardID(), - s.mockShard.GetConfig().EventsCacheInitialSizeBytes(), s.mockShard.GetConfig().EventsCacheMaxSizeBytes(), s.mockShard.GetConfig().EventsCacheTTL(), s.mockShard.GetExecutionManager(), diff --git a/service/history/workflow/cache/cache.go b/service/history/workflow/cache/cache.go index b4dc87253dc..7dc65675d8b 100644 --- a/service/history/workflow/cache/cache.go +++ b/service/history/workflow/cache/cache.go @@ -98,7 +98,6 @@ const ( func NewCache(shard shard.Context) Cache { opts := &cache.Options{} config := shard.GetConfig() - opts.InitialCapacity = config.HistoryCacheInitialSize() opts.TTL = config.HistoryCacheTTL() opts.Pin = true diff --git a/service/matching/poller_history.go b/service/matching/poller_history.go index e0a7b9bcf26..d199c5c54c0 100644 --- a/service/matching/poller_history.go +++ b/service/matching/poller_history.go @@ -33,7 +33,6 @@ import ( ) const ( - pollerHistoryInitSize = 0 pollerHistoryInitMaxSize = 1000 pollerHistoryTTL = 5 * time.Minute ) @@ -54,9 +53,8 @@ type pollerHistory struct { func newPollerHistory() *pollerHistory { opts := &cache.Options{ - InitialCapacity: pollerHistoryInitSize, - TTL: pollerHistoryTTL, - Pin: false, + TTL: pollerHistoryTTL, + Pin: false, } return &pollerHistory{