From 17a79f4ca90749472899318ffbb6c4684fc7f169 Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Wed, 8 May 2024 14:45:30 +0800 Subject: [PATCH] enhance: The LRU cache evicts items and retries loading if the disk limit is reached. (#32819) Signed-off-by: sunby --- .../querynodev2/segments/segment_loader.go | 4 ++-- pkg/util/cache/cache.go | 24 +++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index a9b077d7e5afa..4d0a0eb37cbf1 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -1539,11 +1539,11 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn } if predictDiskUsage > uint64(float64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64())*paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()) { - return 0, 0, fmt.Errorf("load segment failed, disk space is not enough, diskUsage = %v MB, predictDiskUsage = %v MB, totalDisk = %v MB, thresholdFactor = %f", + return 0, 0, merr.WrapErrServiceDiskLimitExceeded(float32(predictDiskUsage), float32(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64()), fmt.Sprintf("load segment failed, disk space is not enough, diskUsage = %v MB, predictDiskUsage = %v MB, totalDisk = %v MB, thresholdFactor = %f", toMB(diskUsage), toMB(predictDiskUsage), toMB(uint64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64())), - paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()) + paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat())) } return predictMemUsage - memUsage, predictDiskUsage - diskUsage, nil diff --git a/pkg/util/cache/cache.go b/pkg/util/cache/cache.go index 81fcaa91a9ab5..a4cb10683bae0 100644 --- a/pkg/util/cache/cache.go +++ b/pkg/util/cache/cache.go @@ -349,6 +349,11 @@ func (c *lruCache[K, V]) getAndPin(ctx context.Context, key K) (*cacheItem[K, V] timer := time.Now() value, err := c.loader(ctx, key) c.stats.TotalLoadTimeMs.Add(uint64(time.Since(timer).Milliseconds())) + // Try to evict one item if there is not enough disk space, then retry. + if merr.ErrServiceDiskLimitExceeded.Is(err) { + c.evictItems(ctx, 1) + value, err = c.loader(ctx, key) + } if err != nil { c.stats.LoadFailCount.Inc() log.Debug("loader failed for key", zap.Any("key", key)) @@ -459,6 +464,25 @@ func (c *lruCache[K, V]) evict(ctx context.Context, key K) { } } +func (c *lruCache[K, V]) evictItems(ctx context.Context, n int) { + c.rwlock.Lock() + defer c.rwlock.Unlock() + + toEvict := make([]K, 0) + for p := c.accessList.Back(); p != nil && n > 0; p = p.Prev() { + evictItem := p.Value.(*cacheItem[K, V]) + if evictItem.pinCount.Load() > 0 { + continue + } + toEvict = append(toEvict, evictItem.key) + n-- + } + + for _, key := range toEvict { + c.evict(ctx, key) + } +} + func (c *lruCache[K, V]) MarkItemNeedReload(ctx context.Context, key K) bool { c.rwlock.Lock() defer c.rwlock.Unlock()