Skip to content

Commit

Permalink
statistics: cached thresholds in hot peer cache (#5728)
Browse files Browse the repository at this point in the history
ref #5692, ref #5721

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lhy1024 and ti-chi-bot committed Nov 25, 2022
1 parent 7593af1 commit cdcce2d
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 31 deletions.
14 changes: 13 additions & 1 deletion server/schedulers/hot_region_test.go
Expand Up @@ -1450,6 +1450,10 @@ func TestHotCacheKeyThresholds(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := config.NewTestOptions()
statistics.ThresholdsUpdateInterval = 0
defer func() {
statistics.ThresholdsUpdateInterval = 8 * time.Second
}()
{ // only a few regions
tc := mockcluster.NewCluster(ctx, opt)
tc.SetHotRegionCacheHitsThreshold(0)
Expand Down Expand Up @@ -1526,6 +1530,10 @@ func TestHotCacheByteAndKey(t *testing.T) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
tc.SetHotRegionCacheHitsThreshold(0)
statistics.ThresholdsUpdateInterval = 0
defer func() {
statistics.ThresholdsUpdateInterval = 8 * time.Second
}()
regions := []testRegionInfo{}
for i := 1; i <= 500; i++ {
regions = append(regions, testRegionInfo{
Expand Down Expand Up @@ -1717,6 +1725,10 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) {
tc.SetMaxReplicas(3)
tc.SetLocationLabels([]string{"zone", "host"})
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
statistics.ThresholdsUpdateInterval = 0
defer func() {
statistics.ThresholdsUpdateInterval = statistics.StoreHeartBeatReportInterval
}()
// some peers are hot, and some are cold #3198

rate := uint64(512 * units.KiB)
Expand All @@ -1726,7 +1738,7 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) {
}
}
items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1)
re.Equal(float64(rate)*statistics.HotThresholdRatio, items[0].GetThresholds()[0])
re.Equal(float64(rate)*statistics.HotThresholdRatio, tc.HotCache.GetThresholds(statistics.Write, items[0].StoreID)[0])
// Threshold of store 1,2,3 is 409.6 units.KiB and others are 1 units.KiB
// Make the hot threshold of some store is high and the others are low
rate = 10 * units.KiB
Expand Down
16 changes: 14 additions & 2 deletions server/statistics/hot_cache.go
Expand Up @@ -196,9 +196,21 @@ func (w *HotCache) GetFilledPeriod(kind RWType) int {
var reportIntervalSecs int
switch kind {
case Write:
reportIntervalSecs = w.writeCache.reportIntervalSecs
reportIntervalSecs = w.writeCache.kind.ReportInterval()
case Read:
reportIntervalSecs = w.readCache.reportIntervalSecs
reportIntervalSecs = w.readCache.kind.ReportInterval()
}
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(reportIntervalSecs)*time.Second).GetFilledPeriod()
}

// GetThresholds returns the hot thresholds of the given store for the
// specified read/write kind, or nil for an unknown kind.
// This is exposed for test purposes only.
func (w *HotCache) GetThresholds(kind RWType, storeID uint64) []float64 {
	if kind == Write {
		return w.writeCache.calcHotThresholds(storeID)
	}
	if kind == Read {
		return w.readCache.calcHotThresholds(storeID)
	}
	return nil
}
67 changes: 40 additions & 27 deletions server/statistics/hot_peer_cache.go
Expand Up @@ -46,6 +46,10 @@ const (
queueCap = 20000
)

// ThresholdsUpdateInterval is the default interval at which a store's cached
// hot thresholds are recalculated. It should be shorter than the store
// heartbeat interval so that the next calculation always uses the latest
// thresholds.
var ThresholdsUpdateInterval = 8 * time.Second

// Denoising is an option to calculate flow base on the real heartbeats. Should
// only turn off by the simulator and the test.
var Denoising = true
Expand All @@ -60,33 +64,34 @@ var MinHotThresholds = [RegionStatCount]float64{
RegionWriteQueryNum: 32,
}

// thresholds caches the hot thresholds calculated for one store together with
// the time they were last refreshed, so they can be reused until
// ThresholdsUpdateInterval elapses.
type thresholds struct {
	updatedTime time.Time // when rates was last recalculated
	rates       []float64 // one threshold per dimension
}

// hotPeerCache saves the hot peer's statistics.
type hotPeerCache struct {
	kind              RWType                         // read or write statistics
	peersOfStore      map[uint64]*TopN               // storeID -> hot peers
	storesOfRegion    map[uint64]map[uint64]struct{} // regionID -> storeIDs
	regionsOfStore    map[uint64]map[uint64]struct{} // storeID -> regionIDs
	topNTTL           time.Duration                  // how long TopN items are kept before expiring
	taskQueue         chan FlowItemTask
	thresholdsOfStore map[uint64]*thresholds // storeID -> cached hot thresholds
	// TODO: consider to remove store info when store is offline.
}

// NewHotPeerCache creates a hotPeerCache
func NewHotPeerCache(kind RWType) *hotPeerCache {
c := &hotPeerCache{
kind: kind,
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
regionsOfStore: make(map[uint64]map[uint64]struct{}),
taskQueue: make(chan FlowItemTask, queueCap),
}
if kind == Write {
c.reportIntervalSecs = WriteReportInterval
} else {
c.reportIntervalSecs = ReadReportInterval
return &hotPeerCache{
kind: kind,
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
regionsOfStore: make(map[uint64]map[uint64]struct{}),
taskQueue: make(chan FlowItemTask, queueCap),
thresholdsOfStore: make(map[uint64]*thresholds),
topNTTL: time.Duration(3*kind.ReportInterval()) * time.Second,
}
c.topNTTL = time.Duration(3*c.reportIntervalSecs) * time.Second
return c
}

// TODO: rename RegionStats as PeerStats
Expand Down Expand Up @@ -292,19 +297,27 @@ func (f *hotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat
}

func (f *hotPeerCache) calcHotThresholds(storeID uint64) []float64 {
t, ok := f.thresholdsOfStore[storeID]
if ok && time.Since(t.updatedTime) <= ThresholdsUpdateInterval {
return t.rates
}
t = &thresholds{
updatedTime: time.Now(),
rates: make([]float64, DimLen),
}
f.thresholdsOfStore[storeID] = t
statKinds := f.kind.RegionStats()
ret := make([]float64, DimLen)
for dim, kind := range statKinds {
ret[dim] = MinHotThresholds[kind]
t.rates[dim] = MinHotThresholds[kind]
}
tn, ok := f.peersOfStore[storeID]
if !ok || tn.Len() < TopNN {
return ret
return t.rates
}
for i := range ret {
ret[i] = math.Max(tn.GetTopNMin(i).(*HotPeerStat).GetLoad(i)*HotThresholdRatio, ret[i])
for i := range t.rates {
t.rates[i] = math.Max(tn.GetTopNMin(i).(*HotPeerStat).GetLoad(i)*HotThresholdRatio, t.rates[i])
}
return ret
return t.rates
}

// gets the storeIDs, including old region and new region
Expand Down Expand Up @@ -465,7 +478,7 @@ func (f *hotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldIt
func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat {
regionStats := f.kind.RegionStats()
// interval is not 0 which is guaranteed by the caller.
if interval.Seconds() >= float64(f.reportIntervalSecs) {
if interval.Seconds() >= float64(f.kind.ReportInterval()) {
f.initItem(newItem)
}
newItem.actionType = Add
Expand Down
19 changes: 18 additions & 1 deletion server/statistics/hot_peer_cache_test.go
Expand Up @@ -312,6 +312,10 @@ func TestUpdateHotPeerStat(t *testing.T) {
region := core.NewRegionInfo(&metapb.Region{Id: regionID, Peers: []*metapb.Peer{peer}}, peer)
// we statistic read peer info from store heartbeat rather than region heartbeat
m := RegionHeartBeatReportInterval / StoreHeartBeatReportInterval
ThresholdsUpdateInterval = 0
defer func() {
ThresholdsUpdateInterval = 8 * time.Second
}()

// skip interval=0
interval := 0
Expand Down Expand Up @@ -399,6 +403,10 @@ func testMetrics(re *require.Assertions, interval, byteRate, expectThreshold flo
cache := NewHotPeerCache(Read)
storeID := uint64(1)
re.GreaterOrEqual(byteRate, MinHotThresholds[RegionReadBytes])
ThresholdsUpdateInterval = 0
defer func() {
ThresholdsUpdateInterval = 8 * time.Second
}()
for i := uint64(1); i < TopNN+10; i++ {
var oldItem *HotPeerStat
var item *HotPeerStat
Expand Down Expand Up @@ -667,7 +675,7 @@ func TestHotPeerCacheTopN(t *testing.T) {

cache := NewHotPeerCache(Write)
now := time.Now()
for id := uint64(99); id > 0; id-- {
for id := uint64(0); id < 100; id++ {
meta := &metapb.Region{
Id: id,
Peers: []*metapb.Peer{{Id: id, StoreId: 1}},
Expand All @@ -686,10 +694,19 @@ func TestHotPeerCacheTopN(t *testing.T) {
cache.updateStat(stat)
}
}
if id < 60 {
re.Equal(MinHotThresholds[RegionWriteKeys], cache.calcHotThresholds(1)[KeyDim]) // num<topN, threshold still be default
}
}

re.Contains(cache.peersOfStore, uint64(1))
re.True(typeutil.Float64Equal(4000, cache.peersOfStore[1].GetTopNMin(ByteDim).(*HotPeerStat).GetLoad(ByteDim)))
re.Equal(32.0, cache.calcHotThresholds(1)[KeyDim]) // no update, threshold still be the value at first times.
ThresholdsUpdateInterval = 0
defer func() {
ThresholdsUpdateInterval = 8 * time.Second
}()
re.Equal(3200.0, cache.calcHotThresholds(1)[KeyDim])
}

func BenchmarkCheckRegionFlow(b *testing.B) {
Expand Down

0 comments on commit cdcce2d

Please sign in to comment.