Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: cached thresholds in hot peer cache #5728

Merged
merged 9 commits into from Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 13 additions & 1 deletion server/schedulers/hot_region_test.go
Expand Up @@ -1450,6 +1450,10 @@ func TestHotCacheKeyThresholds(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := config.NewTestOptions()
statistics.ThresholdsUpdateInterval = 0
defer func() {
statistics.ThresholdsUpdateInterval = 8 * time.Second
}()
{ // only a few regions
tc := mockcluster.NewCluster(ctx, opt)
tc.SetHotRegionCacheHitsThreshold(0)
Expand Down Expand Up @@ -1526,6 +1530,10 @@ func TestHotCacheByteAndKey(t *testing.T) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
tc.SetHotRegionCacheHitsThreshold(0)
statistics.ThresholdsUpdateInterval = 0
defer func() {
statistics.ThresholdsUpdateInterval = 8 * time.Second
}()
regions := []testRegionInfo{}
for i := 1; i <= 500; i++ {
regions = append(regions, testRegionInfo{
Expand Down Expand Up @@ -1717,6 +1725,10 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) {
tc.SetMaxReplicas(3)
tc.SetLocationLabels([]string{"zone", "host"})
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
statistics.ThresholdsUpdateInterval = 0
defer func() {
statistics.ThresholdsUpdateInterval = statistics.StoreHeartBeatReportInterval
}()
// some peers are hot, and some are cold #3198

rate := uint64(512 * units.KiB)
Expand All @@ -1726,7 +1738,7 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) {
}
}
items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1)
re.Equal(float64(rate)*statistics.HotThresholdRatio, items[0].GetThresholds()[0])
re.Equal(float64(rate)*statistics.HotThresholdRatio, tc.HotCache.GetThresholds(statistics.Write, items[0].StoreID)[0])
// Threshold of store 1,2,3 is 409.6 units.KiB and others are 1 units.KiB
// Make the hot threshold of some store is high and the others are low
rate = 10 * units.KiB
Expand Down
16 changes: 14 additions & 2 deletions server/statistics/hot_cache.go
Expand Up @@ -196,9 +196,21 @@ func (w *HotCache) GetFilledPeriod(kind RWType) int {
var reportIntervalSecs int
switch kind {
case Write:
reportIntervalSecs = w.writeCache.reportIntervalSecs
reportIntervalSecs = w.writeCache.kind.ReportInterval()
case Read:
reportIntervalSecs = w.readCache.reportIntervalSecs
reportIntervalSecs = w.readCache.kind.ReportInterval()
}
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(reportIntervalSecs)*time.Second).GetFilledPeriod()
}

// GetThresholds returns thresholds.
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
// This is used for test purposes.
func (w *HotCache) GetThresholds(kind RWType, storeID uint64) []float64 {
	if kind == Write {
		return w.writeCache.calcHotThresholds(storeID)
	}
	if kind == Read {
		return w.readCache.calcHotThresholds(storeID)
	}
	// Unknown kind: nothing to report.
	return nil
}
67 changes: 40 additions & 27 deletions server/statistics/hot_peer_cache.go
Expand Up @@ -46,6 +46,10 @@ const (
queueCap = 20000
)

// ThresholdsUpdateInterval is the default interval to update thresholds.
// The refresh interval should be shorter than the store heartbeat interval so that
// the next calculation is guaranteed to use the latest thresholds.
var ThresholdsUpdateInterval = 8 * time.Second

// Denoising is an option to calculate flow base on the real heartbeats. Should
// only turn off by the simulator and the test.
var Denoising = true
Expand All @@ -60,33 +64,34 @@ var MinHotThresholds = [RegionStatCount]float64{
RegionWriteQueryNum: 32,
}

// thresholds caches the hot thresholds calculated for a single store, so that
// calcHotThresholds can reuse them instead of recomputing on every call.
type thresholds struct {
// updatedTime records when rates was last recalculated; cached values older
// than ThresholdsUpdateInterval are refreshed.
updatedTime time.Time
// rates holds one threshold value per statistics dimension (DimLen entries).
rates []float64
}

// hotPeerCache saves the hot peer's statistics.
type hotPeerCache struct {
kind RWType
peersOfStore map[uint64]*TopN // storeID -> hot peers
storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs
topNTTL time.Duration
reportIntervalSecs int
taskQueue chan FlowItemTask
kind RWType
peersOfStore map[uint64]*TopN // storeID -> hot peers
storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs
topNTTL time.Duration
taskQueue chan FlowItemTask
thresholdsOfStore map[uint64]*thresholds // storeID -> thresholds
// TODO: consider to remove store info when store is offline.
}

// NewHotPeerCache creates a hotPeerCache
func NewHotPeerCache(kind RWType) *hotPeerCache {
c := &hotPeerCache{
kind: kind,
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
regionsOfStore: make(map[uint64]map[uint64]struct{}),
taskQueue: make(chan FlowItemTask, queueCap),
}
if kind == Write {
c.reportIntervalSecs = WriteReportInterval
} else {
c.reportIntervalSecs = ReadReportInterval
return &hotPeerCache{
kind: kind,
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
regionsOfStore: make(map[uint64]map[uint64]struct{}),
taskQueue: make(chan FlowItemTask, queueCap),
thresholdsOfStore: make(map[uint64]*thresholds),
topNTTL: time.Duration(3*kind.ReportInterval()) * time.Second,
}
c.topNTTL = time.Duration(3*c.reportIntervalSecs) * time.Second
return c
}

// TODO: rename RegionStats as PeerStats
Expand Down Expand Up @@ -292,19 +297,27 @@ func (f *hotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat
}

func (f *hotPeerCache) calcHotThresholds(storeID uint64) []float64 {
t, ok := f.thresholdsOfStore[storeID]
if ok && time.Since(t.updatedTime) <= ThresholdsUpdateInterval {
return t.rates
}
t = &thresholds{
updatedTime: time.Now(),
rates: make([]float64, DimLen),
}
f.thresholdsOfStore[storeID] = t
statKinds := f.kind.RegionStats()
ret := make([]float64, DimLen)
for dim, kind := range statKinds {
ret[dim] = MinHotThresholds[kind]
t.rates[dim] = MinHotThresholds[kind]
}
tn, ok := f.peersOfStore[storeID]
if !ok || tn.Len() < TopNN {
return ret
return t.rates
}
for i := range ret {
ret[i] = math.Max(tn.GetTopNMin(i).(*HotPeerStat).GetLoad(i)*HotThresholdRatio, ret[i])
for i := range t.rates {
t.rates[i] = math.Max(tn.GetTopNMin(i).(*HotPeerStat).GetLoad(i)*HotThresholdRatio, t.rates[i])
}
return ret
return t.rates
}

// gets the storeIDs, including old region and new region
Expand Down Expand Up @@ -465,7 +478,7 @@ func (f *hotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldIt
func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat {
regionStats := f.kind.RegionStats()
// interval is not 0 which is guaranteed by the caller.
if interval.Seconds() >= float64(f.reportIntervalSecs) {
if interval.Seconds() >= float64(f.kind.ReportInterval()) {
f.initItem(newItem)
}
newItem.actionType = Add
Expand Down
19 changes: 18 additions & 1 deletion server/statistics/hot_peer_cache_test.go
Expand Up @@ -312,6 +312,10 @@ func TestUpdateHotPeerStat(t *testing.T) {
region := core.NewRegionInfo(&metapb.Region{Id: regionID, Peers: []*metapb.Peer{peer}}, peer)
// we statistic read peer info from store heartbeat rather than region heartbeat
m := RegionHeartBeatReportInterval / StoreHeartBeatReportInterval
ThresholdsUpdateInterval = 0
defer func() {
ThresholdsUpdateInterval = 8 * time.Second
}()

// skip interval=0
interval := 0
Expand Down Expand Up @@ -399,6 +403,10 @@ func testMetrics(re *require.Assertions, interval, byteRate, expectThreshold flo
cache := NewHotPeerCache(Read)
storeID := uint64(1)
re.GreaterOrEqual(byteRate, MinHotThresholds[RegionReadBytes])
ThresholdsUpdateInterval = 0
defer func() {
ThresholdsUpdateInterval = 8 * time.Second
}()
for i := uint64(1); i < TopNN+10; i++ {
var oldItem *HotPeerStat
var item *HotPeerStat
Expand Down Expand Up @@ -667,7 +675,7 @@ func TestHotPeerCacheTopN(t *testing.T) {

cache := NewHotPeerCache(Write)
now := time.Now()
for id := uint64(99); id > 0; id-- {
for id := uint64(0); id < 100; id++ {
meta := &metapb.Region{
Id: id,
Peers: []*metapb.Peer{{Id: id, StoreId: 1}},
Expand All @@ -686,10 +694,19 @@ func TestHotPeerCacheTopN(t *testing.T) {
cache.updateStat(stat)
}
}
if id < 60 {
re.Equal(MinHotThresholds[RegionWriteKeys], cache.calcHotThresholds(1)[KeyDim]) // num<topN, threshold still be default
}
}

re.Contains(cache.peersOfStore, uint64(1))
re.True(typeutil.Float64Equal(4000, cache.peersOfStore[1].GetTopNMin(ByteDim).(*HotPeerStat).GetLoad(ByteDim)))
re.Equal(32.0, cache.calcHotThresholds(1)[KeyDim]) // no update, threshold still be the value at first times.
ThresholdsUpdateInterval = 0
defer func() {
ThresholdsUpdateInterval = 8 * time.Second
}()
re.Equal(3200.0, cache.calcHotThresholds(1)[KeyDim])
}

func BenchmarkCheckRegionFlow(b *testing.B) {
Expand Down