From 94df87bcfe32db6d383d378bf4b3f874437dabff Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Mon, 1 Jun 2026 06:46:09 -0400 Subject: [PATCH] Optimize UFFD pager cache concurrency --- lib/uffdpager/cache.go | 208 +++++++++++++++++++++++++++++----- lib/uffdpager/cache_test.go | 21 ++++ lib/uffdpager/server_linux.go | 94 ++++++++++++--- lib/uffdpager/types.go | 26 ++++- 4 files changed, 301 insertions(+), 48 deletions(-) diff --git a/lib/uffdpager/cache.go b/lib/uffdpager/cache.go index 01aedc19..91b0345d 100644 --- a/lib/uffdpager/cache.go +++ b/lib/uffdpager/cache.go @@ -3,6 +3,8 @@ package uffdpager import ( "container/list" "sync" + "sync/atomic" + "time" ) type pageKey struct { @@ -17,38 +19,93 @@ type cacheEntry struct { } type PageCache struct { - mu sync.Mutex + shards []*pageCacheShard +} + +type pageCacheShard struct { + mu sync.RWMutex maxBytes int64 bytes int64 items map[pageKey]*list.Element lru *list.List - hits int64 - misses int64 + + hits atomic.Int64 + misses atomic.Int64 + + lookupNanos atomic.Int64 + lookupMaxNanos atomic.Int64 + addNanos atomic.Int64 + addMaxNanos atomic.Int64 } func NewPageCache(maxBytes int64) *PageCache { + maxBytes = normalizeCacheMaxBytes(maxBytes) + shardCount := pageCacheShardCount(maxBytes) + shards := make([]*pageCacheShard, shardCount) + shardMax := maxBytes / int64(shardCount) + if shardMax <= 0 { + shardMax = maxBytes + } + for i := range shards { + shards[i] = &pageCacheShard{ + maxBytes: shardMax, + items: make(map[pageKey]*list.Element), + lru: list.New(), + } + } return &PageCache{ - maxBytes: normalizeCacheMaxBytes(maxBytes), - items: make(map[pageKey]*list.Element), - lru: list.New(), + shards: shards, } } func (c *PageCache) Get(cacheKey string, offset int64, size int) ([]byte, bool) { key := pageKey{cacheKey: cacheKey, offset: offset, size: size} + data, ok := c.lookup(key, true) + if !ok { + return nil, false + } + return append([]byte(nil), data...), true +} - c.mu.Lock() - defer c.mu.Unlock() +// Borrow returns the immutable cached page without copying or touching LRU state. +func (c *PageCache) Borrow(cacheKey string, offset int64, size int) ([]byte, bool) { + return c.lookup(pageKey{cacheKey: cacheKey, offset: offset, size: size}, false) +} + +func (c *PageCache) lookup(key pageKey, touch bool) ([]byte, bool) { + start := time.Now() + shard := c.shardFor(key) - elem, ok := c.items[key] + if !touch { + shard.mu.RLock() + elem, ok := shard.items[key] + if !ok { + shard.mu.RUnlock() + shard.misses.Add(1) + shard.recordLookupDuration(time.Since(start)) + return nil, false + } + data := elem.Value.(*cacheEntry).data + shard.mu.RUnlock() + shard.hits.Add(1) + shard.recordLookupDuration(time.Since(start)) + return data, true + } + + shard.mu.Lock() + elem, ok := shard.items[key] if !ok { - c.misses++ + shard.mu.Unlock() + shard.misses.Add(1) + shard.recordLookupDuration(time.Since(start)) return nil, false } - c.hits++ - c.lru.MoveToFront(elem) + shard.lru.MoveToFront(elem) entry := elem.Value.(*cacheEntry) - data := append([]byte(nil), entry.data...) + data := entry.data + shard.mu.Unlock() + shard.hits.Add(1) + shard.recordLookupDuration(time.Since(start)) return data, true } @@ -56,35 +113,69 @@ func (c *PageCache) Add(cacheKey string, offset int64, data []byte) { if len(data) == 0 { return } + start := time.Now() key := pageKey{cacheKey: cacheKey, offset: offset, size: len(data)} value := append([]byte(nil), data...) + shard := c.shardFor(key) - c.mu.Lock() - defer c.mu.Unlock() + shard.mu.Lock() + defer shard.mu.Unlock() + defer func() { + shard.recordAddDuration(time.Since(start)) + }() - if elem, ok := c.items[key]; ok { + if elem, ok := shard.items[key]; ok { entry := elem.Value.(*cacheEntry) - c.bytes -= int64(len(entry.data)) + shard.bytes -= int64(len(entry.data)) entry.data = value - c.bytes += int64(len(entry.data)) - c.lru.MoveToFront(elem) - c.evictLocked() + shard.bytes += int64(len(entry.data)) + shard.lru.MoveToFront(elem) + shard.evictLocked() return } - elem := c.lru.PushFront(&cacheEntry{key: key, data: value}) - c.items[key] = elem - c.bytes += int64(len(value)) - c.evictLocked() + elem := shard.lru.PushFront(&cacheEntry{key: key, data: value}) + shard.items[key] = elem + shard.bytes += int64(len(value)) + shard.evictLocked() } func (c *PageCache) SnapshotStats() (bytes, maxBytes int64, items int, hits, misses int64) { - c.mu.Lock() - defer c.mu.Unlock() - return c.bytes, c.maxBytes, len(c.items), c.hits, c.misses + for _, shard := range c.shards { + shard.mu.Lock() + bytes += shard.bytes + maxBytes += shard.maxBytes + items += len(shard.items) + shard.mu.Unlock() + hits += shard.hits.Load() + misses += shard.misses.Load() + } + return bytes, maxBytes, items, hits, misses +} + +func (c *PageCache) SnapshotTimingStats() (shards int, lookupNanos, lookupMaxNanos, addNanos, addMaxNanos int64) { + for _, shard := range c.shards { + shards++ + lookupNanos += shard.lookupNanos.Load() + if max := shard.lookupMaxNanos.Load(); max > lookupMaxNanos { + lookupMaxNanos = max + } + addNanos += shard.addNanos.Load() + if max := shard.addMaxNanos.Load(); max > addMaxNanos { + addMaxNanos = max + } + } + return shards, lookupNanos, lookupMaxNanos, addNanos, addMaxNanos } -func (c *PageCache) evictLocked() { +func (c *PageCache) shardFor(key pageKey) *pageCacheShard { + if len(c.shards) == 1 { + return c.shards[0] + } + return c.shards[int(hashPageKey(key)%uint64(len(c.shards)))] +} + +func (c *pageCacheShard) evictLocked() { for c.bytes > c.maxBytes && c.lru.Len() > 0 { elem := c.lru.Back() entry := elem.Value.(*cacheEntry) @@ -93,3 +184,64 @@ func (c *PageCache) evictLocked() { c.lru.Remove(elem) } } + +func (c *pageCacheShard) recordLookupDuration(duration time.Duration) { + nanos := duration.Nanoseconds() + c.lookupNanos.Add(nanos) + atomicMaxCacheInt64(&c.lookupMaxNanos, nanos) +} + +func (c *pageCacheShard) recordAddDuration(duration time.Duration) { + nanos := duration.Nanoseconds() + c.addNanos.Add(nanos) + atomicMaxCacheInt64(&c.addMaxNanos, nanos) +} + +func pageCacheShardCount(maxBytes int64) int { + const targetShardBytes = int64(64 << 20) + count := int(maxBytes / targetShardBytes) + if count < 1 { + return 1 + } + if count > 64 { + return 64 + } + return count +} + +func hashPageKey(key pageKey) uint64 { + const ( + offset64 = 14695981039346656037 + prime64 = 1099511628211 + ) + hash := uint64(offset64) + for i := 0; i < len(key.cacheKey); i++ { + hash ^= uint64(key.cacheKey[i]) + hash *= prime64 + } + hash ^= uint64(key.offset) + hash *= prime64 + hash ^= uint64(key.offset >> 32) + hash *= prime64 + hash ^= uint64(key.size) + hash *= prime64 + return mixPageHash(hash) +} + +func mixPageHash(hash uint64) uint64 { + hash ^= hash >> 33 + hash *= 0xff51afd7ed558ccd + hash ^= hash >> 33 + hash *= 0xc4ceb9fe1a85ec53 + hash ^= hash >> 33 + return hash +} + +func atomicMaxCacheInt64(target *atomic.Int64, candidate int64) { + for { + current := target.Load() + if candidate <= current || target.CompareAndSwap(current, candidate) { + return + } + } +} diff --git a/lib/uffdpager/cache_test.go b/lib/uffdpager/cache_test.go index f360b8e3..80fdc4ed 100644 --- a/lib/uffdpager/cache_test.go +++ b/lib/uffdpager/cache_test.go @@ -44,3 +44,24 @@ func TestPageCacheEvictsLRUWhenBounded(t *testing.T) { t.Fatalf("expected recently used page to remain") } } + +func TestPageCacheDistributesAlignedPagesAcrossShards(t *testing.T) { + cache := NewPageCache(4 << 30) + page := bytes.Repeat([]byte{1}, 4096) + + for i := range 4096 { + cache.Add("snapshot-a", int64(i*4096), page) + } + + usedShards := 0 + for _, shard := range cache.shards { + shard.mu.Lock() + if len(shard.items) > 0 { + usedShards++ + } + shard.mu.Unlock() + } + if usedShards < len(cache.shards)/2 { + t.Fatalf("expected aligned pages to spread across shards, used %d of %d", usedShards, len(cache.shards)) + } +} diff --git a/lib/uffdpager/server_linux.go b/lib/uffdpager/server_linux.go index e39e40a9..ccdc3cc4 100644 --- a/lib/uffdpager/server_linux.go +++ b/lib/uffdpager/server_linux.go @@ -67,6 +67,17 @@ type server struct { backingBytesRead atomic.Int64 copies atomic.Int64 copyErrors atomic.Int64 + + activeFaults atomic.Int64 + maxConcurrentFaults atomic.Int64 + faultNanos atomic.Int64 + faultMaxNanos atomic.Int64 + readPageNanos atomic.Int64 + readPageMaxNanos atomic.Int64 + backingReadNanos atomic.Int64 + backingReadMaxNanos atomic.Int64 + copyNanos atomic.Int64 + copyMaxNanos atomic.Int64 } type session struct { @@ -159,20 +170,36 @@ func (s *server) handleHealth(w http.ResponseWriter, r *http.Request) { func (s *server) handleStats(w http.ResponseWriter, r *http.Request) { cacheBytes, cacheMax, cacheItems, hits, misses := s.cache.SnapshotStats() + cacheShards, cacheLookupNanos, cacheLookupMaxNanos, cacheAddNanos, cacheAddMaxNanos := s.cache.SnapshotTimingStats() s.writeJSON(w, http.StatusOK, Stats{ - Version: s.versionKey, - Draining: s.isDraining(), - ActiveSessions: s.activeSessions(), - CacheBytes: cacheBytes, - CacheMax: cacheMax, - CacheItems: cacheItems, - CacheHits: hits, - CacheMisses: misses, - Faults: s.faults.Load(), - OverlayFaults: s.overlayFaults.Load(), - BackingBytesRead: s.backingBytesRead.Load(), - Copies: s.copies.Load(), - CopyErrors: s.copyErrors.Load(), + Version: s.versionKey, + Draining: s.isDraining(), + ActiveSessions: s.activeSessions(), + CacheBytes: cacheBytes, + CacheMax: cacheMax, + CacheItems: cacheItems, + CacheHits: hits, + CacheMisses: misses, + CacheShards: cacheShards, + CacheLookupNanos: cacheLookupNanos, + CacheLookupMaxNanos: cacheLookupMaxNanos, + CacheAddNanos: cacheAddNanos, + CacheAddMaxNanos: cacheAddMaxNanos, + Faults: s.faults.Load(), + OverlayFaults: s.overlayFaults.Load(), + BackingBytesRead: s.backingBytesRead.Load(), + Copies: s.copies.Load(), + CopyErrors: s.copyErrors.Load(), + ActiveFaults: s.activeFaults.Load(), + MaxConcurrentFaults: s.maxConcurrentFaults.Load(), + FaultNanos: s.faultNanos.Load(), + FaultMaxNanos: s.faultMaxNanos.Load(), + ReadPageNanos: s.readPageNanos.Load(), + ReadPageMaxNanos: s.readPageMaxNanos.Load(), + BackingReadNanos: s.backingReadNanos.Load(), + BackingReadMaxNanos: s.backingReadMaxNanos.Load(), + CopyNanos: s.copyNanos.Load(), + CopyMaxNanos: s.copyMaxNanos.Load(), }) } @@ -457,6 +484,16 @@ func readUFFDEvent(fd int, buf []byte) (uffdEvent, bool, error) { } func (s *session) servePageFault(mappings []guestRegionUffdMapping, faultAddr int64) error { + start := time.Now() + active := s.server.activeFaults.Add(1) + atomicMaxInt64(&s.server.maxConcurrentFaults, active) + defer func() { + s.server.activeFaults.Add(-1) + nanos := time.Since(start).Nanoseconds() + s.server.faultNanos.Add(nanos) + atomicMaxInt64(&s.server.faultMaxNanos, nanos) + }() + mapping, pageAddr, pageOffset, pageSize, ok := findMapping(mappings, faultAddr) if !ok { return fmt.Errorf("fault address %#x outside guest mappings", faultAddr) @@ -471,26 +508,44 @@ func (s *session) servePageFault(mappings []guestRegionUffdMapping, faultAddr in if overlay { s.server.overlayFaults.Add(1) } + copyStart := time.Now() if err := uffdCopy(s.uffdFD, uint64(pageAddr), page); err != nil { + nanos := time.Since(copyStart).Nanoseconds() + s.server.copyNanos.Add(nanos) + atomicMaxInt64(&s.server.copyMaxNanos, nanos) return err } + nanos := time.Since(copyStart).Nanoseconds() + s.server.copyNanos.Add(nanos) + atomicMaxInt64(&s.server.copyMaxNanos, nanos) s.server.copies.Add(1) return nil } func (s *session) readPage(offset int64, size int) ([]byte, bool, error) { + start := time.Now() + defer func() { + nanos := time.Since(start).Nanoseconds() + s.server.readPageNanos.Add(nanos) + atomicMaxInt64(&s.server.readPageMaxNanos, nanos) + }() + if page, ok := s.overlays[offset]; ok { if len(page) != size { return nil, true, fmt.Errorf("overlay page at offset %d has size %d, expected %d", offset, len(page), size) } - return append([]byte(nil), page...), true, nil + return page, true, nil } - if page, ok := s.server.cache.Get(s.cacheKey, offset, size); ok { + if page, ok := s.server.cache.Borrow(s.cacheKey, offset, size); ok { return page, false, nil } page := make([]byte, size) + readStart := time.Now() n, err := s.backingFile.ReadAt(page, offset) + readNanos := time.Since(readStart).Nanoseconds() + s.server.backingReadNanos.Add(readNanos) + atomicMaxInt64(&s.server.backingReadMaxNanos, readNanos) if err != nil && !errors.Is(err, io.EOF) { return nil, false, fmt.Errorf("read backing page at %d: %w", offset, err) } @@ -625,6 +680,15 @@ func uffdCopy(fd int, dst uint64, page []byte) error { return errno } +func atomicMaxInt64(target *atomic.Int64, candidate int64) { + for { + current := target.Load() + if candidate <= current || target.CompareAndSwap(current, candidate) { + return + } + } +} + func sanitizeSessionID(id string) string { id = strings.TrimSpace(id) if id == "" { diff --git a/lib/uffdpager/types.go b/lib/uffdpager/types.go index 12dcc49a..4f49e5b4 100644 --- a/lib/uffdpager/types.go +++ b/lib/uffdpager/types.go @@ -41,17 +41,33 @@ type Stats struct { Draining bool `json:"draining"` ActiveSessions int `json:"active_sessions"` - CacheBytes int64 `json:"cache_bytes"` - CacheMax int64 `json:"cache_max"` - CacheItems int `json:"cache_items"` - CacheHits int64 `json:"cache_hits"` - CacheMisses int64 `json:"cache_misses"` + CacheBytes int64 `json:"cache_bytes"` + CacheMax int64 `json:"cache_max"` + CacheItems int `json:"cache_items"` + CacheHits int64 `json:"cache_hits"` + CacheMisses int64 `json:"cache_misses"` + CacheShards int `json:"cache_shards"` + CacheLookupNanos int64 `json:"cache_lookup_nanos"` + CacheLookupMaxNanos int64 `json:"cache_lookup_max_nanos"` + CacheAddNanos int64 `json:"cache_add_nanos"` + CacheAddMaxNanos int64 `json:"cache_add_max_nanos"` Faults int64 `json:"faults"` OverlayFaults int64 `json:"overlay_faults"` BackingBytesRead int64 `json:"backing_bytes_read"` Copies int64 `json:"copies"` CopyErrors int64 `json:"copy_errors"` + + ActiveFaults int64 `json:"active_faults"` + MaxConcurrentFaults int64 `json:"max_concurrent_faults"` + FaultNanos int64 `json:"fault_nanos"` + FaultMaxNanos int64 `json:"fault_max_nanos"` + ReadPageNanos int64 `json:"read_page_nanos"` + ReadPageMaxNanos int64 `json:"read_page_max_nanos"` + BackingReadNanos int64 `json:"backing_read_nanos"` + BackingReadMaxNanos int64 `json:"backing_read_max_nanos"` + CopyNanos int64 `json:"copy_nanos"` + CopyMaxNanos int64 `json:"copy_max_nanos"` } func normalizeCacheMaxBytes(v int64) int64 {