Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove the event cache initial size and refactor putInternal #4698

Merged
merged 8 commits into from Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 0 additions & 6 deletions common/cache/cache.go
Expand Up @@ -64,9 +64,6 @@ type Options struct {
// are older than the TTL will not be returned.
TTL time.Duration

// InitialCapacity controls the initial capacity of the cache
InitialCapacity int

// Pin prevents in-use objects from getting evicted.
Pin bool

Expand All @@ -76,9 +73,6 @@ type Options struct {

// SimpleOptions provides options that can be used to configure SimpleCache
type SimpleOptions struct {
// InitialCapacity controls the initial capacity of the cache
InitialCapacity int

// RemovedFunc is an optional function called when an element
// is scheduled for deletion
RemovedFunc RemovedFunc
Expand Down
131 changes: 82 additions & 49 deletions common/cache/lru.go
Expand Up @@ -40,6 +40,8 @@ var (
ErrCacheItemTooLarge = errors.New("cache item size is larger than max cache capacity")
)

const emptyEntrySize = 0

// lru is a concurrent fixed size cache that evicts elements in lru order
type (
lru struct {
Expand Down Expand Up @@ -152,7 +154,7 @@ func New(maxSize int, opts *Options) Cache {

return &lru{
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
byKey: make(map[interface{}]*list.Element),
ttl: opts.TTL,
maxSize: maxSize,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not for this PR, but if I were writing code using this, I'd want to control both max total size and max element size separately, so that I can prevent one element from using the entire cache space. just an idea for the future

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — I think that is a future improvement for the cache.

currSize: 0,
Expand All @@ -167,14 +169,6 @@ func NewLRU(maxSize int) Cache {
return New(maxSize, nil)
}

// NewLRUWithInitialCapacity creates a new LRU cache with an initial capacity
// and a max size
func NewLRUWithInitialCapacity(initialCapacity, maxSize int) Cache {
return New(maxSize, &Options{
InitialCapacity: initialCapacity,
})
}

// Get retrieves the value stored under the given key
func (c *lru) Get(key interface{}) interface{} {
if c.maxSize == 0 { //
Expand Down Expand Up @@ -274,85 +268,124 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
if c.maxSize == 0 {
return nil, nil
}
entrySize := getSize(value)
if entrySize > c.maxSize {
newEntrySize := getSize(value)
if newEntrySize > c.maxSize {
return nil, ErrCacheItemTooLarge
}

c.mut.Lock()
defer c.mut.Unlock()

c.currSize += entrySize
c.tryEvictUntilEnoughSpace()
// If there is still not enough space, remove the new entry size from the current size and return an error
if c.currSize > c.maxSize {
c.currSize -= entrySize
return nil, ErrCacheFull
}

elt := c.byKey[key]
// If the entry exists, check if it has expired or update the value
if elt != nil {
entry := elt.Value.(*entryImpl)
if c.isEntryExpired(entry, time.Now().UTC()) {
// Entry has expired
c.deleteInternal(elt)
} else {
existing := entry.value
existingEntry := elt.Value.(*entryImpl)
if !c.isEntryExpired(existingEntry, time.Now().UTC()) {
existingVal := existingEntry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = time.Now().UTC()
newCacheSize := c.calculateNewCacheSize(newEntrySize, existingEntry.Size())
if newCacheSize > c.maxSize {
c.tryEvictUntilEnoughSpaceWithSkipEntry(newEntrySize, existingEntry)
// calculate again after eviction
newCacheSize = c.calculateNewCacheSize(newEntrySize, existingEntry.Size())
if newCacheSize > c.maxSize {
// This should never happen since allowUpdate is always **true** for non-pinned cache,
// and if all entries are not pinned(ref==0), then the cache should never be full as long as
// new entry's size is less than max size.
// However, to prevent any unexpected behavior, it checks the cache size again.
return nil, ErrCacheFull
}
}
existingEntry.value = value
existingEntry.size = newEntrySize
c.currSize = newCacheSize
c.updateEntryTTL(existingEntry)
}
yujieli-temporal marked this conversation as resolved.
Show resolved Hide resolved

c.updateEntryRefCount(existingEntry)
c.byAccess.MoveToFront(elt)
if c.pin {
entry.refCount++
}
return existing, nil
return existingVal, nil
}
}

entry := &entryImpl{
key: key,
value: value,
size: entrySize,
// Entry has expired
c.deleteInternal(elt)
}

if c.pin {
entry.refCount++
c.tryEvictUntilEnoughSpaceWithSkipEntry(newEntrySize, nil)

// check if the new entry can fit in the cache
newCacheSize := c.calculateNewCacheSize(newEntrySize, emptyEntrySize)
if newCacheSize > c.maxSize {
return nil, ErrCacheFull
}

if c.ttl != 0 {
entry.createTime = c.timeSource.Now().UTC()
entry := &entryImpl{
key: key,
value: value,
size: newEntrySize,
}

c.updateEntryTTL(entry)
c.updateEntryRefCount(entry)
element := c.byAccess.PushFront(entry)
c.byKey[key] = element
c.currSize = newCacheSize
return nil, nil
}

// calculateNewCacheSize returns what the total cache size would become if an
// entry of existingEntrySize were replaced by one of newEntrySize.
func (c *lru) calculateNewCacheSize(newEntrySize int, existingEntrySize int) int {
	sizeWithoutExisting := c.currSize - existingEntrySize
	return sizeWithoutExisting + newEntrySize
}

// deleteInternal removes the element from both the access-order list and the
// key map, and subtracts the removed entry's size from the current cache size.
func (c *lru) deleteInternal(element *list.Element) {
	removed := c.byAccess.Remove(element).(*entryImpl)
	delete(c.byKey, removed.key)
	c.currSize -= removed.Size()
}

// tryEvictUntilEnoughSpace try to evict entries until there is enough space for the new entry
func (c *lru) tryEvictUntilEnoughSpace() {
// tryEvictUntilEnoughSpace try to evict entries until there is enough space for the new entry without
// evicting the existing entry. the existing entry is skipped because it is being updated.
func (c *lru) tryEvictUntilEnoughSpaceWithSkipEntry(newEntrySize int, existingEntry *entryImpl) {
element := c.byAccess.Back()
for c.currSize > c.maxSize && element != nil {
existingEntrySize := 0
if existingEntry != nil {
existingEntrySize = existingEntry.Size()
}

for c.calculateNewCacheSize(newEntrySize, existingEntrySize) > c.maxSize && element != nil {
entry := element.Value.(*entryImpl)
if entry.refCount == 0 {
c.deleteInternal(element)
if existingEntry != nil && entry.key == existingEntry.key {
element = element.Prev()
continue
}
element = c.tryEvictAndGetPreviousElement(entry, element)
}
}

// entry.refCount > 0
// skip, entry still being referenced
element = element.Prev()
// tryEvictAndGetPreviousElement removes element from the cache if its entry is
// unreferenced, and returns the element preceding it in the access list so the
// caller can continue iterating toward more recently used entries.
func (c *lru) tryEvictAndGetPreviousElement(entry *entryImpl, element *list.Element) *list.Element {
	if entry.refCount > 0 {
		// Entry is still being referenced; leave it in place.
		return element.Prev()
	}
	prev := element.Prev()
	// currSize is adjusted inside deleteInternal.
	c.deleteInternal(element)
	return prev
}

// isEntryExpired reports whether entry has outlived the cache TTL as of
// currentTime. A referenced entry or one with a zero create time never expires.
func (c *lru) isEntryExpired(entry *entryImpl, currentTime time.Time) bool {
	if entry.refCount != 0 {
		return false
	}
	if entry.createTime.IsZero() {
		return false
	}
	deadline := entry.createTime.Add(c.ttl)
	return currentTime.After(deadline)
}

// updateEntryTTL refreshes the entry's create time when the cache has a TTL
// configured; with no TTL the create time is left untouched.
func (c *lru) updateEntryTTL(entry *entryImpl) {
	if c.ttl == 0 {
		return
	}
	entry.createTime = c.timeSource.Now().UTC()
}

// updateEntryRefCount bumps the entry's reference count when the cache is
// configured to pin in-use entries; otherwise it is a no-op.
func (c *lru) updateEntryRefCount(entry *entryImpl) {
	if !c.pin {
		return
	}
	entry.refCount++
}