diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 154def113..8f3269df5 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -308,6 +308,11 @@ func (r *Region) compareAndSwapStore(oldStore, newStore *regionStore) bool {
 	return atomic.CompareAndSwapPointer(&r.store, unsafe.Pointer(oldStore), unsafe.Pointer(newStore))
 }
 
+func (r *Region) isCacheTTLExpired(ts int64) bool {
+	lastAccess := atomic.LoadInt64(&r.lastAccess)
+	return ts-lastAccess > regionCacheTTLSec
+}
+
 func (r *Region) checkRegionCacheTTL(ts int64) bool {
 	// Only consider use percentage on this failpoint, for example, "2%return"
 	if _, err := util.EvalFailpoint("invalidateRegionCache"); err == nil {
@@ -417,6 +422,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 	c.ctx, c.cancelFunc = context.WithCancel(context.Background())
 	interval := config.GetGlobalConfig().StoresRefreshInterval
 	go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
+	go c.cacheGC()
 	c.enableForwarding = config.GetGlobalConfig().EnableForwarding
 	return c
 }
@@ -1906,6 +1912,62 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
 	}
 }
 
+const cleanCacheInterval = time.Second
+const cleanRegionNumPerRound = 50
+
+// cacheGC is expected to run in a background goroutine. It keeps iterating
+// over the whole region cache, searching for stale region info. Each pass is
+// triggered by cleanCacheInterval and checks at most cleanRegionNumPerRound
+// regions, so the impact of this background goroutine on foreground requests
+// should be negligible.
+func (c *RegionCache) cacheGC() {
+	ticker := time.NewTicker(cleanCacheInterval)
+	defer ticker.Stop()
+
+	beginning := newBtreeSearchItem([]byte(""))
+	iterItem := beginning
+	expired := make([]*btreeItem, 0, cleanRegionNumPerRound)
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case <-ticker.C:
+			count := 0
+			expired = expired[:0]
+
+			// Only RLock when checking TTL to avoid blocking other readers
+			c.mu.RLock()
+			ts := time.Now().Unix()
+			c.mu.sorted.b.AscendGreaterOrEqual(iterItem, func(item *btreeItem) bool {
+				if count > cleanRegionNumPerRound {
+					iterItem = item
+					return false
+				}
+				count++
+				if item.cachedRegion.isCacheTTLExpired(ts) {
+					expired = append(expired, item)
+				}
+				return true
+			})
+			c.mu.RUnlock()
+
+			// Reached the end of the region cache; restart from the beginning
+			if count <= cleanRegionNumPerRound {
+				iterItem = beginning
+			}
+
+			if len(expired) > 0 {
+				c.mu.Lock()
+				for _, item := range expired {
+					c.mu.sorted.b.Delete(item)
+					c.removeVersionFromCache(item.cachedRegion.VerID(), item.cachedRegion.GetID())
+				}
+				c.mu.Unlock()
+			}
+		}
+	}
+}
+
 // btreeItem is BTree's Item that uses []byte to compare.
 type btreeItem struct {
 	key []byte
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index 3e92ac94e..423b8ca5b 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -40,6 +40,7 @@ import (
 	"fmt"
 	"math/rand"
 	"reflect"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -1608,3 +1609,61 @@ func (s *testRegionCacheSuite) TestShouldNotRetryFlashback() {
 	s.Error(err)
 	s.False(shouldRetry)
 }
+
+func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
+	// Prepare 100 regions
+	regionCnt := 100
+	regions := s.cluster.AllocIDs(regionCnt)
+	regions = append([]uint64{s.region1}, regions...)
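+	// peers[0] reuses region1's existing peers; each split region below gets
+	// two freshly allocated peers, the first of which becomes the leader.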
+	peers := [][]uint64{{s.peer1, s.peer2}}
+	for i := 0; i < regionCnt; i++ {
+		peers = append(peers, s.cluster.AllocIDs(2))
+	}
+	for i := 0; i < regionCnt; i++ {
+		s.cluster.Split(regions[i], regions[i+1], []byte(fmt.Sprintf(regionSplitKeyFormat, i)), peers[i+1], peers[i+1][0])
+	}
+	loadRegionsToCache(s.cache, regionCnt)
+	s.checkCache(regionCnt)
+
+	// Make a part of the regions stale
+	remaining := 0
+	s.cache.mu.Lock()
+	now := time.Now().Unix()
+	for verID, r := range s.cache.mu.regions {
+		if verID.id%3 == 0 {
+			atomic.StoreInt64(&r.lastAccess, now-regionCacheTTLSec-10)
+		} else {
+			remaining++
+		}
+	}
+	s.cache.mu.Unlock()
+
+	s.Eventually(func() bool {
+		s.cache.mu.RLock()
+		defer s.cache.mu.RUnlock()
+		return len(s.cache.mu.regions) == remaining
+	}, 3*time.Second, 200*time.Millisecond)
+	s.checkCache(remaining)
+
+	// Make another part of the regions stale
+	remaining = 0
+	s.cache.mu.Lock()
+	now = time.Now().Unix()
+	for verID, r := range s.cache.mu.regions {
+		if verID.id%3 == 1 {
+			atomic.StoreInt64(&r.lastAccess, now-regionCacheTTLSec-10)
+		} else {
+			remaining++
+		}
+	}
+	s.cache.mu.Unlock()
+
+	s.Eventually(func() bool {
+		s.cache.mu.RLock()
+		defer s.cache.mu.RUnlock()
+		return len(s.cache.mu.regions) == remaining
+	}, 3*time.Second, 200*time.Millisecond)
+	s.checkCache(remaining)
+}
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index a9f495bf5..c0f579ab6 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -304,7 +304,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	cache := NewRegionCache(s.cache.pdClient)
 	defer cache.Close()
 
+	cache.mu.Lock()
 	cache.insertRegionToCache(region)
+	cache.mu.Unlock()
 
 	// Verify creating the replicaSelector.
 	replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
@@ -537,7 +539,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	s.Nil(err)
 
 	// Test accessFollower state filtering epoch-stale stores.
-	region.lastAccess = time.Now().Unix()
+	atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
 	refreshEpochs(regionStore)
 	// Mark all followers as stale.
 	tiKVNum := regionStore.accessStoreNum(tiKVOnly)
diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go
index a8bb345db..4727270a6 100644
--- a/txnkv/transaction/txn.go
+++ b/txnkv/transaction/txn.go
@@ -832,6 +832,20 @@ func (txn *KVTxn) filterAggressiveLockedKeys(lockCtx *tikv.LockCtx, allKeys [][]
 // LockKeys tries to lock the entries with the keys in KV store.
 // lockCtx is the context for lock, lockCtx.lockWaitTime in ms
 func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput ...[]byte) error {
+	return txn.lockKeys(ctx, lockCtx, nil, keysInput...)
+}
+
+// LockKeysFunc tries to lock the entries with the keys in KV store.
+// lockCtx is the context for lock, lockCtx.lockWaitTime in ms
+// fn is a function that runs before the lock is released.
+func (txn *KVTxn) LockKeysFunc(ctx context.Context, lockCtx *tikv.LockCtx, fn func(), keysInput ...[]byte) error {
+	return txn.lockKeys(ctx, lockCtx, fn, keysInput...)
+}
+
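+// lockKeys is the shared implementation behind LockKeys and LockKeysFunc. The
+// callback fn is invoked via defer, so it runs on every return path of
+// lockKeys, whether the locking succeeded or failed.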
+func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func(), keysInput ...[]byte) error {
 	if txn.interceptor != nil {
 		// User has called txn.SetRPCInterceptor() to explicitly set an interceptor, we
 		// need to bind it to ctx so that the internal client can perceive and execute
 		// it before initiating an RPC request.
@@ -869,6 +883,11 @@
 			}
 		}
 	}()
+	defer func() {
+		if fn != nil {
+			fn()
+		}
+	}()
 
 	if !txn.IsPessimistic() && txn.aggressiveLockingContext != nil {
 		return errors.New("trying to perform aggressive locking in optimistic transaction")
diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go
index 6a75db0fc..8ba0888d4 100644
--- a/txnkv/txnsnapshot/snapshot.go
+++ b/txnkv/txnsnapshot/snapshot.go
@@ -952,6 +952,19 @@ func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats {
 			newRs.backoffTimes[k] += v
 		}
 	}
+
+	if rs.scanDetail != nil {
+		newRs.scanDetail = rs.scanDetail
+	}
+
+	if rs.timeDetail != nil {
+		newRs.timeDetail = rs.timeDetail
+	}
+
+	if rs.resolveLockDetail != nil {
+		newRs.resolveLockDetail = rs.resolveLockDetail
+	}
+
 	return &newRs
 }
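
Usage sketch (editor's note, not part of the patch): the new LockKeysFunc lets a
caller attach a callback that lockKeys runs via defer, i.e. on every return
path, success or error. The helper below is hypothetical; it assumes the `tikv`
alias txn.go uses for github.com/tikv/client-go/v2/kv and that package's
NewLockCtx constructor, and picks placeholder forUpdateTS/lockWaitTime values.

	import (
		"context"
		"time"

		tikv "github.com/tikv/client-go/v2/kv"
		"github.com/tikv/client-go/v2/txnkv/transaction"
	)

	// lockWithCleanup locks keys pessimistically and guarantees cleanup runs
	// once the lock attempt finishes, whether it succeeded or failed.
	func lockWithCleanup(ctx context.Context, txn *transaction.KVTxn, cleanup func(), keys ...[]byte) error {
		// forUpdateTS = txn.StartTS() and lockWaitTime = 0 are illustrative
		// placeholders, not recommended values.
		lockCtx := tikv.NewLockCtx(txn.StartTS(), 0, time.Now())
		return txn.LockKeysFunc(ctx, lockCtx, cleanup, keys...)
	}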