Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: cached thresholds in hot peer cache #5728

Merged
merged 9 commits into from Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion server/schedulers/hot_region_test.go
Expand Up @@ -1450,6 +1450,10 @@ func TestHotCacheKeyThresholds(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := config.NewTestOptions()
statistics.DefaultThresholdsUpdateInterval = 0
defer func() {
statistics.DefaultThresholdsUpdateInterval = statistics.StoreHeartBeatReportInterval
}()
{ // only a few regions
tc := mockcluster.NewCluster(ctx, opt)
tc.SetHotRegionCacheHitsThreshold(0)
Expand Down Expand Up @@ -1726,7 +1730,7 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) {
}
}
items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1)
re.Equal(float64(rate)*statistics.HotThresholdRatio, items[0].GetThresholds()[0])
re.Equal(float64(rate)*statistics.HotThresholdRatio, tc.HotCache.GetThresholds(statistics.Write, items[0].StoreID)[0])
// Threshold of store 1,2,3 is 409.6 units.KiB and others are 1 units.KiB
// Make the hot threshold of some stores high and the others low
rate = 10 * units.KiB
Expand Down
12 changes: 12 additions & 0 deletions server/statistics/hot_cache.go
Expand Up @@ -202,3 +202,15 @@ func (w *HotCache) GetFilledPeriod(kind RWType) int {
}
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(reportIntervalSecs)*time.Second).GetFilledPeriod()
}

// GetThresholds returns thresholds.
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
// This is used for testing purposes.
func (w *HotCache) GetThresholds(kind RWType, storeID uint64) []float64 {
	// Dispatch to the cache that matches the requested flow kind.
	if kind == Write {
		return w.writeCache.calcHotThresholds(storeID)
	}
	if kind == Read {
		return w.readCache.calcHotThresholds(storeID)
	}
	// Any other kind has no matching cache here, so there is nothing to report.
	return nil
}
50 changes: 36 additions & 14 deletions server/statistics/hot_peer_cache.go
Expand Up @@ -60,6 +60,15 @@ var MinHotThresholds = [RegionStatCount]float64{
RegionWriteQueryNum: 32,
}

// DefaultThresholdsUpdateInterval is the default interval to update thresholds.
// The refresh interval should be shorter than the store heartbeat interval so that the next calculation always uses the latest thresholds.
var DefaultThresholdsUpdateInterval = StoreHeartBeatReportInterval * 0.8 * time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about putting them into the same place with StoreHeartBeatReportInterval and 8s directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It cannot be a const, because the tests need to modify it.


// thresholdWithTime pairs a set of threshold rates with the time they were computed.
type thresholdWithTime struct {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
// updatedTime is the moment rates was computed; calcHotThresholds compares its age
// against DefaultThresholdsUpdateInterval to decide whether rates can be reused.
updatedTime time.Time
// rates holds the thresholds, one entry per dimension (allocated with length DimLen).
rates []float64
}

// hotPeerCache saves the hot peer's statistics.
type hotPeerCache struct {
kind RWType
Expand All @@ -69,16 +78,25 @@ type hotPeerCache struct {
topNTTL time.Duration
reportIntervalSecs int
taskQueue chan FlowItemTask
defaultThresholds []float64
thresholdsOfStore map[uint64]*thresholdWithTime // storeID -> thresholds
}

// NewHotPeerCache creates a hotPeerCache
func NewHotPeerCache(kind RWType) *hotPeerCache {
statKinds := kind.RegionStats()
defaultThresholds := make([]float64, DimLen)
for dim, kind := range statKinds {
defaultThresholds[dim] = MinHotThresholds[kind]
}
c := &hotPeerCache{
kind: kind,
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
regionsOfStore: make(map[uint64]map[uint64]struct{}),
taskQueue: make(chan FlowItemTask, queueCap),
kind: kind,
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
regionsOfStore: make(map[uint64]map[uint64]struct{}),
taskQueue: make(chan FlowItemTask, queueCap),
defaultThresholds: defaultThresholds,
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
thresholdsOfStore: make(map[uint64]*thresholdWithTime),
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
}
if kind == Write {
c.reportIntervalSecs = WriteReportInterval
Expand Down Expand Up @@ -294,19 +312,23 @@ func (f *hotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat
}

// calcHotThresholds returns the hot thresholds for the given store, one value per dimension.
// NOTE(review): this span is a rendered diff hunk, so removed and added lines appear together
// (e.g. the two consecutive returns below) and a review thread is interleaved with the code.
func (f *hotPeerCache) calcHotThresholds(storeID uint64) []float64 {
statKinds := f.kind.RegionStats()
ret := make([]float64, DimLen)
for dim, kind := range statKinds {
ret[dim] = MinHotThresholds[kind]
}
tn, ok := f.peersOfStore[storeID]
// Use the default thresholds when the store has no TopN entry or it holds fewer than TopNN peers.
if !ok || tn.Len() < TopNN {
return ret
return f.defaultThresholds
}
for i := range ret {
ret[i] = math.Max(tn.GetTopNMin(i).(*HotPeerStat).GetLoad(i)*HotThresholdRatio, ret[i])
// Reuse the cached thresholds while their age does not exceed DefaultThresholdsUpdateInterval.
thresholds, ok := f.thresholdsOfStore[storeID]
if ok && time.Since(thresholds.updatedTime) <= DefaultThresholdsUpdateInterval {
Copy link
Member

@rleungx rleungx Nov 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it affect the write threshold?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but less interval means update more frequently, so it will only make write cache more accurate

return thresholds.rates
}
return ret
// Otherwise recompute each dimension from the TopN minimum load scaled by HotThresholdRatio,
// floored at the default threshold, and cache the result stamped with the current time.
thresholds = &thresholdWithTime{
updatedTime: time.Now(),
rates: make([]float64, DimLen),
}
for i := range thresholds.rates {
thresholds.rates[i] = math.Max(tn.GetTopNMin(i).(*HotPeerStat).GetLoad(i)*HotThresholdRatio, f.defaultThresholds[i])
}
f.thresholdsOfStore[storeID] = thresholds
return thresholds.rates
}

// gets the storeIDs, including old region and new region
Expand Down
41 changes: 27 additions & 14 deletions server/statistics/hot_peer_cache_test.go
Expand Up @@ -316,28 +316,28 @@ func TestUpdateHotPeerStat(t *testing.T) {
// skip interval=0
interval := 0
deltaLoads := []float64{0.0, 0.0, 0.0}
MinHotThresholds[RegionReadBytes] = 0.0
MinHotThresholds[RegionReadKeys] = 0.0
MinHotThresholds[RegionReadQueryNum] = 0.0
cache.defaultThresholds[RegionReadBytes] = 0.0
cache.defaultThresholds[RegionReadKeys] = 0.0
cache.defaultThresholds[RegionReadQueryNum] = 0.0

newItem := cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region)
re.Nil(newItem)

// new peer, interval is larger than report interval, but no hot
interval = 10
deltaLoads = []float64{0.0, 0.0, 0.0}
MinHotThresholds[RegionReadBytes] = 1.0
MinHotThresholds[RegionReadKeys] = 1.0
MinHotThresholds[RegionReadQueryNum] = 1.0
cache.defaultThresholds[RegionReadBytes] = 1.0
cache.defaultThresholds[RegionReadKeys] = 1.0
cache.defaultThresholds[RegionReadQueryNum] = 1.0
newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region)
re.Nil(newItem)

// new peer, interval is less than report interval
interval = 4
deltaLoads = []float64{60.0, 60.0, 60.0}
MinHotThresholds[RegionReadBytes] = 0.0
MinHotThresholds[RegionReadKeys] = 0.0
MinHotThresholds[RegionReadQueryNum] = 0.0
cache.defaultThresholds[RegionReadBytes] = 0.0
cache.defaultThresholds[RegionReadKeys] = 0.0
cache.defaultThresholds[RegionReadQueryNum] = 0.0
newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region)
re.NotNil(newItem)
re.Equal(0, newItem.HotDegree)
Expand Down Expand Up @@ -367,9 +367,9 @@ func TestUpdateHotPeerStat(t *testing.T) {
re.Equal(2, newItem.HotDegree)
re.Equal(2*m, newItem.AntiCount)
// sum of interval is larger than report interval, and cold
MinHotThresholds[RegionReadBytes] = 10.0
MinHotThresholds[RegionReadKeys] = 10.0
MinHotThresholds[RegionReadQueryNum] = 10.0
cache.defaultThresholds[RegionReadBytes] = 10.0
cache.defaultThresholds[RegionReadKeys] = 10.0
cache.defaultThresholds[RegionReadQueryNum] = 10.0
cache.updateStat(newItem)
newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region)
re.Equal(1, newItem.HotDegree)
Expand Down Expand Up @@ -399,6 +399,10 @@ func testMetrics(re *require.Assertions, interval, byteRate, expectThreshold flo
cache := NewHotPeerCache(Read)
storeID := uint64(1)
re.GreaterOrEqual(byteRate, MinHotThresholds[RegionReadBytes])
DefaultThresholdsUpdateInterval = 0
defer func() {
DefaultThresholdsUpdateInterval = StoreHeartBeatReportInterval * time.Second
}()
for i := uint64(1); i < TopNN+10; i++ {
var oldItem *HotPeerStat
var item *HotPeerStat
Expand Down Expand Up @@ -428,7 +432,7 @@ func testMetrics(re *require.Assertions, interval, byteRate, expectThreshold flo
}
thresholds := cache.calcHotThresholds(storeID)
if i < TopNN {
re.Equal(MinHotThresholds[RegionReadBytes], thresholds[ByteDim])
re.Equal(cache.defaultThresholds[RegionReadBytes], thresholds[ByteDim])
} else {
re.Equal(expectThreshold, thresholds[ByteDim])
}
Expand Down Expand Up @@ -668,7 +672,7 @@ func TestHotPeerCacheTopN(t *testing.T) {

cache := NewHotPeerCache(Write)
now := time.Now()
for id := uint64(99); id > 0; id-- {
for id := uint64(0); id < 100; id++ {
meta := &metapb.Region{
Id: id,
Peers: []*metapb.Peer{{Id: id, StoreId: 1}},
Expand All @@ -687,10 +691,19 @@ func TestHotPeerCacheTopN(t *testing.T) {
cache.updateStat(stat)
}
}
if id < 60 {
re.Equal(MinHotThresholds[RegionWriteKeys], cache.calcHotThresholds(1)[KeyDim]) // num<topN, threshold still be default
}
}

re.Contains(cache.peersOfStore, uint64(1))
re.True(typeutil.Float64Equal(4000, cache.peersOfStore[1].GetTopNMin(ByteDim).(*HotPeerStat).GetLoad(ByteDim)))
re.Equal(80.0, cache.calcHotThresholds(1)[KeyDim]) // no update, threshold still be the value at first times.
DefaultThresholdsUpdateInterval = 0
defer func() {
DefaultThresholdsUpdateInterval = StoreHeartBeatReportInterval * time.Second
}()
re.Equal(3200.0, cache.calcHotThresholds(1)[KeyDim])
}

func BenchmarkCheckRegionFlow(b *testing.B) {
Expand Down