Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 180 additions & 28 deletions lib/uffdpager/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package uffdpager
import (
"container/list"
"sync"
"sync/atomic"
"time"
)

type pageKey struct {
Expand All @@ -17,74 +19,163 @@ type cacheEntry struct {
}

type PageCache struct {
mu sync.Mutex
shards []*pageCacheShard
}

type pageCacheShard struct {
mu sync.RWMutex
maxBytes int64
bytes int64
items map[pageKey]*list.Element
lru *list.List
hits int64
misses int64

hits atomic.Int64
misses atomic.Int64

lookupNanos atomic.Int64
lookupMaxNanos atomic.Int64
addNanos atomic.Int64
addMaxNanos atomic.Int64
}

func NewPageCache(maxBytes int64) *PageCache {
maxBytes = normalizeCacheMaxBytes(maxBytes)
shardCount := pageCacheShardCount(maxBytes)
shards := make([]*pageCacheShard, shardCount)
shardMax := maxBytes / int64(shardCount)
if shardMax <= 0 {
shardMax = maxBytes
}
for i := range shards {
shards[i] = &pageCacheShard{
maxBytes: shardMax,
items: make(map[pageKey]*list.Element),
lru: list.New(),
}
}
return &PageCache{
maxBytes: normalizeCacheMaxBytes(maxBytes),
items: make(map[pageKey]*list.Element),
lru: list.New(),
shards: shards,
}
}

func (c *PageCache) Get(cacheKey string, offset int64, size int) ([]byte, bool) {
key := pageKey{cacheKey: cacheKey, offset: offset, size: size}
data, ok := c.lookup(key, true)
if !ok {
return nil, false
}
return append([]byte(nil), data...), true
}

c.mu.Lock()
defer c.mu.Unlock()
// Borrow returns the immutable cached page without copying or touching LRU state.
func (c *PageCache) Borrow(cacheKey string, offset int64, size int) ([]byte, bool) {
return c.lookup(pageKey{cacheKey: cacheKey, offset: offset, size: size}, false)
}

func (c *PageCache) lookup(key pageKey, touch bool) ([]byte, bool) {
start := time.Now()
shard := c.shardFor(key)

elem, ok := c.items[key]
if !touch {
shard.mu.RLock()
elem, ok := shard.items[key]
if !ok {
shard.mu.RUnlock()
shard.misses.Add(1)
shard.recordLookupDuration(time.Since(start))
return nil, false
}
data := elem.Value.(*cacheEntry).data
shard.mu.RUnlock()
shard.hits.Add(1)
shard.recordLookupDuration(time.Since(start))
return data, true
}

shard.mu.Lock()
elem, ok := shard.items[key]
if !ok {
c.misses++
shard.mu.Unlock()
shard.misses.Add(1)
shard.recordLookupDuration(time.Since(start))
return nil, false
}
c.hits++
c.lru.MoveToFront(elem)
shard.lru.MoveToFront(elem)
entry := elem.Value.(*cacheEntry)
data := append([]byte(nil), entry.data...)
data := entry.data
shard.mu.Unlock()
shard.hits.Add(1)
shard.recordLookupDuration(time.Since(start))
return data, true
}

func (c *PageCache) Add(cacheKey string, offset int64, data []byte) {
if len(data) == 0 {
return
}
start := time.Now()
key := pageKey{cacheKey: cacheKey, offset: offset, size: len(data)}
value := append([]byte(nil), data...)
shard := c.shardFor(key)

c.mu.Lock()
defer c.mu.Unlock()
shard.mu.Lock()
defer shard.mu.Unlock()
defer func() {
shard.recordAddDuration(time.Since(start))
}()

if elem, ok := c.items[key]; ok {
if elem, ok := shard.items[key]; ok {
entry := elem.Value.(*cacheEntry)
c.bytes -= int64(len(entry.data))
shard.bytes -= int64(len(entry.data))
entry.data = value
c.bytes += int64(len(entry.data))
c.lru.MoveToFront(elem)
c.evictLocked()
shard.bytes += int64(len(entry.data))
shard.lru.MoveToFront(elem)
shard.evictLocked()
return
}

elem := c.lru.PushFront(&cacheEntry{key: key, data: value})
c.items[key] = elem
c.bytes += int64(len(value))
c.evictLocked()
elem := shard.lru.PushFront(&cacheEntry{key: key, data: value})
shard.items[key] = elem
shard.bytes += int64(len(value))
shard.evictLocked()
}

func (c *PageCache) SnapshotStats() (bytes, maxBytes int64, items int, hits, misses int64) {
c.mu.Lock()
defer c.mu.Unlock()
return c.bytes, c.maxBytes, len(c.items), c.hits, c.misses
for _, shard := range c.shards {
shard.mu.Lock()
bytes += shard.bytes
maxBytes += shard.maxBytes
items += len(shard.items)
shard.mu.Unlock()
hits += shard.hits.Load()
misses += shard.misses.Load()
}
return bytes, maxBytes, items, hits, misses
}

func (c *PageCache) SnapshotTimingStats() (shards int, lookupNanos, lookupMaxNanos, addNanos, addMaxNanos int64) {
for _, shard := range c.shards {
shards++
lookupNanos += shard.lookupNanos.Load()
if max := shard.lookupMaxNanos.Load(); max > lookupMaxNanos {
lookupMaxNanos = max
}
addNanos += shard.addNanos.Load()
if max := shard.addMaxNanos.Load(); max > addMaxNanos {
addMaxNanos = max
}
}
return shards, lookupNanos, lookupMaxNanos, addNanos, addMaxNanos
}

func (c *PageCache) evictLocked() {
func (c *PageCache) shardFor(key pageKey) *pageCacheShard {
if len(c.shards) == 1 {
return c.shards[0]
}
return c.shards[int(hashPageKey(key)%uint64(len(c.shards)))]
}

func (c *pageCacheShard) evictLocked() {
for c.bytes > c.maxBytes && c.lru.Len() > 0 {
elem := c.lru.Back()
entry := elem.Value.(*cacheEntry)
Expand All @@ -93,3 +184,64 @@ func (c *PageCache) evictLocked() {
c.lru.Remove(elem)
}
}

func (c *pageCacheShard) recordLookupDuration(duration time.Duration) {
nanos := duration.Nanoseconds()
c.lookupNanos.Add(nanos)
atomicMaxCacheInt64(&c.lookupMaxNanos, nanos)
}

func (c *pageCacheShard) recordAddDuration(duration time.Duration) {
nanos := duration.Nanoseconds()
c.addNanos.Add(nanos)
atomicMaxCacheInt64(&c.addMaxNanos, nanos)
}

func pageCacheShardCount(maxBytes int64) int {
const targetShardBytes = int64(64 << 20)
count := int(maxBytes / targetShardBytes)
if count < 1 {
return 1
}
if count > 64 {
return 64
}
return count
}

func hashPageKey(key pageKey) uint64 {
const (
offset64 = 14695981039346656037
prime64 = 1099511628211
)
hash := uint64(offset64)
for i := 0; i < len(key.cacheKey); i++ {
hash ^= uint64(key.cacheKey[i])
hash *= prime64
}
hash ^= uint64(key.offset)
hash *= prime64
hash ^= uint64(key.offset >> 32)
hash *= prime64
hash ^= uint64(key.size)
hash *= prime64
return mixPageHash(hash)
}

func mixPageHash(hash uint64) uint64 {
hash ^= hash >> 33
hash *= 0xff51afd7ed558ccd
hash ^= hash >> 33
hash *= 0xc4ceb9fe1a85ec53
hash ^= hash >> 33
return hash
}

func atomicMaxCacheInt64(target *atomic.Int64, candidate int64) {
for {
current := target.Load()
if candidate <= current || target.CompareAndSwap(current, candidate) {
return
}
}
}
21 changes: 21 additions & 0 deletions lib/uffdpager/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,24 @@ func TestPageCacheEvictsLRUWhenBounded(t *testing.T) {
t.Fatalf("expected recently used page to remain")
}
}

func TestPageCacheDistributesAlignedPagesAcrossShards(t *testing.T) {
cache := NewPageCache(4 << 30)
page := bytes.Repeat([]byte{1}, 4096)

for i := range 4096 {
cache.Add("snapshot-a", int64(i*4096), page)
}

usedShards := 0
for _, shard := range cache.shards {
shard.mu.Lock()
if len(shard.items) > 0 {
usedShards++
}
shard.mu.Unlock()
}
if usedShards < len(cache.shards)/2 {
t.Fatalf("expected aligned pages to spread across shards, used %d of %d", usedShards, len(cache.shards))
}
}
Loading
Loading