Skip to content

Commit

Permalink
fix the event cache initialize size and rename the config
Browse files Browse the repository at this point in the history
  • Loading branch information
yujieli-temporal committed Aug 3, 2023
1 parent 9de9ccf commit 4406ef3
Show file tree
Hide file tree
Showing 20 changed files with 277 additions and 90 deletions.
6 changes: 0 additions & 6 deletions common/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ type Options struct {
// are older than the TTL will not be returned.
TTL time.Duration

// InitialCapacity controls the initial capacity of the cache
InitialCapacity int

// Pin prevents in-use objects from getting evicted.
Pin bool

Expand All @@ -76,9 +73,6 @@ type Options struct {

// SimpleOptions provides options that can be used to configure SimpleCache
type SimpleOptions struct {
// InitialCapacity controls the initial capacity of the cache
InitialCapacity int

// RemovedFunc is an optional function called when an element
// is scheduled for deletion
RemovedFunc RemovedFunc
Expand Down
120 changes: 75 additions & 45 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func New(maxSize int, opts *Options) Cache {

return &lru{
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
byKey: make(map[interface{}]*list.Element),
ttl: opts.TTL,
maxSize: maxSize,
currSize: 0,
Expand All @@ -167,14 +167,6 @@ func NewLRU(maxSize int) Cache {
return New(maxSize, nil)
}

// NewLRUWithInitialCapacity creates a new LRU cache with an initial capacity
// and a max size
func NewLRUWithInitialCapacity(initialCapacity, maxSize int) Cache {
return New(maxSize, &Options{
InitialCapacity: initialCapacity,
})
}

// Get retrieves the value stored under the given key
func (c *lru) Get(key interface{}) interface{} {
if c.maxSize == 0 { //
Expand Down Expand Up @@ -274,61 +266,66 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
if c.maxSize == 0 {
return nil, nil
}
entrySize := getSize(value)
if entrySize > c.maxSize {
newEntrySize := getSize(value)
if newEntrySize > c.maxSize {
return nil, ErrCacheItemTooLarge
}

c.mut.Lock()
defer c.mut.Unlock()

c.currSize += entrySize
c.tryEvictUntilEnoughSpace()
// If there is still not enough space, remove the new entry size from the current size and return an error
if c.currSize > c.maxSize {
c.currSize -= entrySize
return nil, ErrCacheFull
if allowUpdate {
c.tryEvictUntilEnoughSpace(newEntrySize)
} else {
c.tryEvictUntilEnoughSpaceWithSkipKey(newEntrySize, key)
}

elt := c.byKey[key]
// If the entry exists, check if it has expired or update the value
if elt != nil {
entry := elt.Value.(*entryImpl)
if c.isEntryExpired(entry, time.Now().UTC()) {
// Entry has expired
c.deleteInternal(elt)
} else {
existing := entry.value
existingEntry := elt.Value.(*entryImpl)
if !c.isEntryExpired(existingEntry, time.Now().UTC()) {
existing := existingEntry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = time.Now().UTC()
newCacheSize := c.currSize - existingEntry.Size() + newEntrySize
if newCacheSize > c.maxSize {
// This should never happen since allowUpdate is always **true** for non-pinned cache,
// and if all entries are not pinned, then the cache should never be full as long as
// new entry's size is less than max size.
// However, to prevent any unexpected behavior, it checks the cache size again.
return nil, ErrCacheFull
}
c.currSize -= existingEntry.Size()
existingEntry.value = value
c.currSize += newEntrySize
c.updateEntryTTL(existingEntry)
}

c.updateEntryRefCount(existingEntry)
c.byAccess.MoveToFront(elt)
if c.pin {
entry.refCount++
}
return existing, nil
}
}

entry := &entryImpl{
key: key,
value: value,
size: entrySize,
// Entry has expired
c.deleteInternal(elt)
}

if c.pin {
entry.refCount++
// check if the new entry can fit in the cache
if c.currSize+newEntrySize > c.maxSize {
return nil, ErrCacheFull
}

if c.ttl != 0 {
entry.createTime = c.timeSource.Now().UTC()
entry := &entryImpl{
key: key,
value: value,
size: newEntrySize,
}

c.updateEntryTTL(entry)
c.updateEntryRefCount(entry)
element := c.byAccess.PushFront(entry)
c.byKey[key] = element
c.currSize += newEntrySize
return nil, nil
}

Expand All @@ -339,20 +336,53 @@ func (c *lru) deleteInternal(element *list.Element) {
}

// tryEvictUntilEnoughSpace try to evict entries until there is enough space for the new entry
func (c *lru) tryEvictUntilEnoughSpace() {
func (c *lru) tryEvictUntilEnoughSpace(newEntrySize int) {
element := c.byAccess.Back()
for c.currSize > c.maxSize && element != nil {
// currSize will be updated within deleteInternal
for c.currSize+newEntrySize > c.maxSize && element != nil {
entry := element.Value.(*entryImpl)
if entry.refCount == 0 {
c.deleteInternal(element)
element = c.tryEvictAndGetPreviousElement(entry, element)
}
}

// tryEvictUntilEnoughSpace try to evict entries until there is enough space for the new entry
func (c *lru) tryEvictUntilEnoughSpaceWithSkipKey(newEntrySize int, key interface{}) {
element := c.byAccess.Back()
// currSize will be updated within deleteInternal
for c.currSize+newEntrySize > c.maxSize && element != nil {
entry := element.Value.(*entryImpl)
// do not delete the entry that the key request to be updated but not allowed
if entry.key == key {
element = element.Prev()
continue
}
element = c.tryEvictAndGetPreviousElement(entry, element)
}
}

// entry.refCount > 0
// skip, entry still being referenced
element = element.Prev()
func (c *lru) tryEvictAndGetPreviousElement(entry *entryImpl, element *list.Element) *list.Element {
if entry.refCount == 0 {
elementPrev := element.Prev()
c.deleteInternal(element)
return elementPrev
}
// entry.refCount > 0
// skip, entry still being referenced
return element.Prev()
}

func (c *lru) isEntryExpired(entry *entryImpl, currentTime time.Time) bool {
return entry.refCount == 0 && !entry.createTime.IsZero() && currentTime.After(entry.createTime.Add(c.ttl))
}

func (c *lru) updateEntryTTL(entry *entryImpl) {
if c.ttl != 0 {
entry.createTime = c.timeSource.Now().UTC()
}
}

func (c *lru) updateEntryRefCount(entry *entryImpl) {
if c.pin {
entry.refCount++
}
}
Loading

0 comments on commit 4406ef3

Please sign in to comment.