Skip to content

Commit

Permalink
enhance: The LRU cache evicts items and retries loading if the disk l…
Browse files Browse the repository at this point in the history
…imit is reached. (#32819)

Signed-off-by: sunby <sunbingyi1992@gmail.com>
  • Loading branch information
sunby committed May 8, 2024
1 parent a8db16a commit 17a79f4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
4 changes: 2 additions & 2 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1539,11 +1539,11 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
}

if predictDiskUsage > uint64(float64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64())*paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()) {
return 0, 0, fmt.Errorf("load segment failed, disk space is not enough, diskUsage = %v MB, predictDiskUsage = %v MB, totalDisk = %v MB, thresholdFactor = %f",
return 0, 0, merr.WrapErrServiceDiskLimitExceeded(float32(predictDiskUsage), float32(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64()), fmt.Sprintf("load segment failed, disk space is not enough, diskUsage = %v MB, predictDiskUsage = %v MB, totalDisk = %v MB, thresholdFactor = %f",
toMB(diskUsage),
toMB(predictDiskUsage),
toMB(uint64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64())),
paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat())
paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()))
}

return predictMemUsage - memUsage, predictDiskUsage - diskUsage, nil
Expand Down
24 changes: 24 additions & 0 deletions pkg/util/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ func (c *lruCache[K, V]) getAndPin(ctx context.Context, key K) (*cacheItem[K, V]
timer := time.Now()
value, err := c.loader(ctx, key)
c.stats.TotalLoadTimeMs.Add(uint64(time.Since(timer).Milliseconds()))
// Try to evict one item if there is not enough disk space, then retry.
if merr.ErrServiceDiskLimitExceeded.Is(err) {
c.evictItems(ctx, 1)
value, err = c.loader(ctx, key)
}
if err != nil {
c.stats.LoadFailCount.Inc()
log.Debug("loader failed for key", zap.Any("key", key))
Expand Down Expand Up @@ -459,6 +464,25 @@ func (c *lruCache[K, V]) evict(ctx context.Context, key K) {
}
}

func (c *lruCache[K, V]) evictItems(ctx context.Context, n int) {
c.rwlock.Lock()
defer c.rwlock.Unlock()

toEvict := make([]K, 0)
for p := c.accessList.Back(); p != nil && n > 0; p = p.Prev() {
evictItem := p.Value.(*cacheItem[K, V])
if evictItem.pinCount.Load() > 0 {
continue
}
toEvict = append(toEvict, evictItem.key)
n--
}

for _, key := range toEvict {
c.evict(ctx, key)
}
}

func (c *lruCache[K, V]) MarkItemNeedReload(ctx context.Context, key K) bool {
c.rwlock.Lock()
defer c.rwlock.Unlock()
Expand Down

0 comments on commit 17a79f4

Please sign in to comment.