Skip to content

Commit

Permalink
remove the event cache initial size and refactor putInternal (#4698)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Removed the initial cache size from the LRU cache and its related configs, and
refactored putInternal to better support different scenarios.

Also added more unit tests for cache size.

<!-- Tell your future self why have you made these changes -->
**Why?**
In #4621 the history
cache was changed to use item bytes instead of item count. However, that change
accidentally altered the default number of keys allocated when the cache is initialized.


<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
unittest

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**
Memory usage may increase, and pinned mutable state could never leave the cache, which can block workflows.

<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
  • Loading branch information
yujieli-temporal committed Aug 8, 2023
1 parent a3a4ac5 commit a608721
Show file tree
Hide file tree
Showing 20 changed files with 318 additions and 93 deletions.
6 changes: 0 additions & 6 deletions common/cache/cache.go
Expand Up @@ -64,9 +64,6 @@ type Options struct {
// are older than the TTL will not be returned.
TTL time.Duration

// InitialCapacity controls the initial capacity of the cache
InitialCapacity int

// Pin prevents in-use objects from getting evicted.
Pin bool

Expand All @@ -76,9 +73,6 @@ type Options struct {

// SimpleOptions provides options that can be used to configure SimpleCache
type SimpleOptions struct {
// InitialCapacity controls the initial capacity of the cache
InitialCapacity int

// RemovedFunc is an optional function called when an element
// is scheduled for deletion
RemovedFunc RemovedFunc
Expand Down
131 changes: 82 additions & 49 deletions common/cache/lru.go
Expand Up @@ -40,6 +40,8 @@ var (
ErrCacheItemTooLarge = errors.New("cache item size is larger than max cache capacity")
)

const emptyEntrySize = 0

// lru is a concurrent fixed size cache that evicts elements in lru order
type (
lru struct {
Expand Down Expand Up @@ -152,7 +154,7 @@ func New(maxSize int, opts *Options) Cache {

return &lru{
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
byKey: make(map[interface{}]*list.Element),
ttl: opts.TTL,
maxSize: maxSize,
currSize: 0,
Expand All @@ -167,14 +169,6 @@ func NewLRU(maxSize int) Cache {
return New(maxSize, nil)
}

// NewLRUWithInitialCapacity creates a new LRU cache with an initial capacity
// and a max size
func NewLRUWithInitialCapacity(initialCapacity, maxSize int) Cache {
return New(maxSize, &Options{
InitialCapacity: initialCapacity,
})
}

// Get retrieves the value stored under the given key
func (c *lru) Get(key interface{}) interface{} {
if c.maxSize == 0 { //
Expand Down Expand Up @@ -274,85 +268,124 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
if c.maxSize == 0 {
return nil, nil
}
entrySize := getSize(value)
if entrySize > c.maxSize {
newEntrySize := getSize(value)
if newEntrySize > c.maxSize {
return nil, ErrCacheItemTooLarge
}

c.mut.Lock()
defer c.mut.Unlock()

c.currSize += entrySize
c.tryEvictUntilEnoughSpace()
// If there is still not enough space, remove the new entry size from the current size and return an error
if c.currSize > c.maxSize {
c.currSize -= entrySize
return nil, ErrCacheFull
}

elt := c.byKey[key]
// If the entry exists, check if it has expired or update the value
if elt != nil {
entry := elt.Value.(*entryImpl)
if c.isEntryExpired(entry, time.Now().UTC()) {
// Entry has expired
c.deleteInternal(elt)
} else {
existing := entry.value
existingEntry := elt.Value.(*entryImpl)
if !c.isEntryExpired(existingEntry, time.Now().UTC()) {
existingVal := existingEntry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = time.Now().UTC()
newCacheSize := c.calculateNewCacheSize(newEntrySize, existingEntry.Size())
if newCacheSize > c.maxSize {
c.tryEvictUntilEnoughSpaceWithSkipEntry(newEntrySize, existingEntry)
// calculate again after eviction
newCacheSize = c.calculateNewCacheSize(newEntrySize, existingEntry.Size())
if newCacheSize > c.maxSize {
// This should never happen since allowUpdate is always **true** for non-pinned cache,
// and if all entries are not pinned(ref==0), then the cache should never be full as long as
// new entry's size is less than max size.
// However, to prevent any unexpected behavior, it checks the cache size again.
return nil, ErrCacheFull
}
}
existingEntry.value = value
existingEntry.size = newEntrySize
c.currSize = newCacheSize
c.updateEntryTTL(existingEntry)
}

c.updateEntryRefCount(existingEntry)
c.byAccess.MoveToFront(elt)
if c.pin {
entry.refCount++
}
return existing, nil
return existingVal, nil
}
}

entry := &entryImpl{
key: key,
value: value,
size: entrySize,
// Entry has expired
c.deleteInternal(elt)
}

if c.pin {
entry.refCount++
c.tryEvictUntilEnoughSpaceWithSkipEntry(newEntrySize, nil)

// check if the new entry can fit in the cache
newCacheSize := c.calculateNewCacheSize(newEntrySize, emptyEntrySize)
if newCacheSize > c.maxSize {
return nil, ErrCacheFull
}

if c.ttl != 0 {
entry.createTime = c.timeSource.Now().UTC()
entry := &entryImpl{
key: key,
value: value,
size: newEntrySize,
}

c.updateEntryTTL(entry)
c.updateEntryRefCount(entry)
element := c.byAccess.PushFront(entry)
c.byKey[key] = element
c.currSize = newCacheSize
return nil, nil
}

// calculateNewCacheSize returns what the cache's total byte size would become
// if an entry of existingEntrySize were replaced by one of newEntrySize. For a
// brand-new entry the caller passes emptyEntrySize (0) as existingEntrySize.
func (c *lru) calculateNewCacheSize(newEntrySize int, existingEntrySize int) int {
	sizeDelta := newEntrySize - existingEntrySize
	return c.currSize + sizeDelta
}

// deleteInternal removes the given element from both the access-order list and
// the key index, subtracting its size from the cache's running total. Callers
// must hold c.mut.
func (c *lru) deleteInternal(element *list.Element) {
	removed := c.byAccess.Remove(element).(*entryImpl)
	delete(c.byKey, removed.key)
	c.currSize -= removed.Size()
}

// tryEvictUntilEnoughSpace try to evict entries until there is enough space for the new entry
func (c *lru) tryEvictUntilEnoughSpace() {
// tryEvictUntilEnoughSpace try to evict entries until there is enough space for the new entry without
// evicting the existing entry. the existing entry is skipped because it is being updated.
func (c *lru) tryEvictUntilEnoughSpaceWithSkipEntry(newEntrySize int, existingEntry *entryImpl) {
element := c.byAccess.Back()
for c.currSize > c.maxSize && element != nil {
existingEntrySize := 0
if existingEntry != nil {
existingEntrySize = existingEntry.Size()
}

for c.calculateNewCacheSize(newEntrySize, existingEntrySize) > c.maxSize && element != nil {
entry := element.Value.(*entryImpl)
if entry.refCount == 0 {
c.deleteInternal(element)
if existingEntry != nil && entry.key == existingEntry.key {
element = element.Prev()
continue
}
element = c.tryEvictAndGetPreviousElement(entry, element)
}
}

// entry.refCount > 0
// skip, entry still being referenced
element = element.Prev()
// tryEvictAndGetPreviousElement evicts the given entry when nothing pins it
// (refCount == 0) and returns the element that precedes it in access order.
// Entries that are still referenced are left in place.
func (c *lru) tryEvictAndGetPreviousElement(entry *entryImpl, element *list.Element) *list.Element {
	// Capture the predecessor before any removal so iteration can continue.
	prev := element.Prev()
	if entry.refCount == 0 {
		// currSize is adjusted inside deleteInternal.
		c.deleteInternal(element)
	}
	// refCount > 0 means the entry is still referenced, so it is skipped.
	return prev
}

// isEntryExpired reports whether the entry's TTL has elapsed as of currentTime.
// Pinned entries (refCount > 0) and entries with no create time (TTL tracking
// disabled) never expire.
func (c *lru) isEntryExpired(entry *entryImpl, currentTime time.Time) bool {
	if entry.refCount > 0 {
		return false
	}
	if entry.createTime.IsZero() {
		return false
	}
	return currentTime.After(entry.createTime.Add(c.ttl))
}

// updateEntryTTL stamps the entry with the current UTC time when TTL tracking
// is enabled; when ttl == 0 the create time stays zero and the entry never
// expires.
func (c *lru) updateEntryTTL(entry *entryImpl) {
	if c.ttl == 0 {
		return
	}
	entry.createTime = c.timeSource.Now().UTC()
}

// updateEntryRefCount pins the entry by bumping its reference count when the
// cache operates in pin mode; otherwise it is a no-op.
func (c *lru) updateEntryRefCount(entry *entryImpl) {
	if !c.pin {
		return
	}
	entry.refCount++
}

0 comments on commit a608721

Please sign in to comment.