scheduler: cache history loads in hot region scheduler (#6314) (#6375)
close #6297, ref #6314, ref #6328, ref tikv/tikv#14458

Signed-off-by: bufferflies <1045931706@qq.com>

Co-authored-by: bufferflies <1045931706@qq.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
3 people committed Apr 27, 2023
1 parent 387d8bb commit 67fedd8
Showing 12 changed files with 443 additions and 67 deletions.
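At a high level, the change keeps a bounded window of recent per-dimension load samples for each store and only treats a store as a hot source or destination when every sample clears the tolerance-scaled expectation. The Go sketch below is a minimal, self-contained illustration of that rule rather than the PD implementation; the names historyLoads, srcStillHot, and dimLen are invented for the example.

package main

import "fmt"

// dimLen stands in for the byte/key/query dimensions the scheduler tracks;
// the constant name is illustrative, not the PD identifier.
const dimLen = 3

// historyLoads keeps a bounded window of per-dimension load samples for one
// store, similar in spirit to the StoreHistoryLoads cache this commit adds.
type historyLoads struct {
	size    int
	samples [dimLen][]float64
}

// add appends one sample per dimension, evicting the oldest once the window is full.
func (h *historyLoads) add(loads [dimLen]float64) {
	for i, v := range loads {
		h.samples[i] = append(h.samples[i], v)
		if len(h.samples[i]) > h.size {
			h.samples[i] = h.samples[i][1:]
		}
	}
}

// srcStillHot reports whether every recorded sample of the given dimension
// stayed above expect*ratio, mirroring the "all history loads must exceed the
// expectation" rule used when filtering source stores.
func (h *historyLoads) srcStillHot(dim int, expect, ratio float64) bool {
	if len(h.samples[dim]) == 0 {
		return true // no history yet: fall back to the instantaneous check
	}
	for _, v := range h.samples[dim] {
		if v <= ratio*expect {
			return false
		}
	}
	return true
}

func main() {
	h := &historyLoads{size: 4}
	for _, byteRate := range []float64{120, 130, 125, 90} { // MB/s samples
		h.add([dimLen]float64{byteRate, 0, 0})
	}
	// With an expectation of 100 MB/s and a tolerance ratio of 1.0, the dip
	// to 90 MB/s disqualifies the store as a balancing source.
	fmt.Println(h.srcStillHot(0, 100, 1.0)) // false
}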
3 changes: 3 additions & 0 deletions pkg/core/constant/kind.go
@@ -66,6 +66,9 @@ const (
RegionKind
// WitnessKind indicates the witness kind resource
WitnessKind

// ResourceKindLen represents the ResourceKind count
ResourceKindLen
)

func (k ResourceKind) String() string {
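The ResourceKindLen sentinel added above follows a common Go idiom: a trailing constant in an iota block lets callers size fixed arrays by the number of kinds, much as hot_region.go indexes stLoadInfos with resourceTypeLen. A minimal illustration, in which LeaderKind and the counter array are assumptions rather than part of this diff:

package main

import "fmt"

type ResourceKind int

const (
	LeaderKind ResourceKind = iota // assumed first kind; only RegionKind/WitnessKind appear in the hunk above
	RegionKind
	WitnessKind
	// ResourceKindLen represents the ResourceKind count
	ResourceKindLen
)

func main() {
	// One slot per kind, sized by the sentinel constant instead of a magic number.
	var opCounts [ResourceKindLen]int
	opCounts[RegionKind]++
	fmt.Println(opCounts)
}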
1 change: 1 addition & 0 deletions pkg/schedule/config/config.go
@@ -97,6 +97,7 @@ type StoreConfig interface {
CheckRegionSize(uint64, uint64) error
CheckRegionKeys(uint64, uint64) error
IsEnableRegionBucket() bool
IsRaftKV2() bool
// for test purpose
SetRegionBucketEnabled(bool)
}
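IsRaftKV2() is the switch the hot-region scheduler consults (as bs.GetStoreConfig().IsRaftKV2() in hot_region.go below) to decide whether the extra history-loads check applies. The stub below is a hypothetical test double against a trimmed-down copy of the interface; neither trimmedStoreConfig nor fakeStoreConfig appears in this diff.

package main

import "fmt"

// trimmedStoreConfig is a cut-down stand-in for the StoreConfig interface above
// (the real interface has more methods); it only shows where IsRaftKV2 fits.
type trimmedStoreConfig interface {
	IsEnableRegionBucket() bool
	IsRaftKV2() bool
}

// fakeStoreConfig is an illustrative test double: flipping raftKV2 decides
// whether the hot-region scheduler also consults cached history loads.
type fakeStoreConfig struct {
	regionBucket bool
	raftKV2      bool
}

func (c *fakeStoreConfig) IsEnableRegionBucket() bool { return c.regionBucket }
func (c *fakeStoreConfig) IsRaftKV2() bool            { return c.raftKV2 }

func main() {
	var cfg trimmedStoreConfig = &fakeStoreConfig{raftKV2: true}
	fmt.Println("check history loads:", cfg.IsRaftKV2())
}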
121 changes: 94 additions & 27 deletions pkg/schedule/schedulers/hot_region.go
@@ -86,7 +86,8 @@ type baseHotScheduler struct {
stInfos map[uint64]*statistics.StoreSummaryInfo
// temporary states but exported to API or metrics
// Recalculated on every `Schedule()` call.
stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail
stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail
stHistoryLoads *statistics.StoreHistoryLoads
// temporary states
// Recalculated on every `Schedule()` call.
storesLoads map[uint64][]float64
@@ -106,6 +107,7 @@ func newBaseHotScheduler(opController *schedule.OperatorController) *baseHotSche
BaseScheduler: base,
types: []statistics.RWType{statistics.Write, statistics.Read},
regionPendings: make(map[uint64]*pendingInfluence),
stHistoryLoads: statistics.NewStoreHistoryLoads(statistics.DimLen),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
@@ -127,6 +129,7 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sched
h.stLoadInfos[ty] = statistics.SummaryStoresLoad(
h.stInfos,
h.storesLoads,
h.stHistoryLoads,
regionStats,
isTraceRegionFlow,
rw, resource)
@@ -277,7 +280,6 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster)
if h.conf.IsForbidRWType(typ) {
return nil
}

switch typ {
case statistics.Read:
return h.balanceHotReadRegions(cluster)
@@ -461,6 +463,8 @@ type balanceSolver struct {
minorDecRatio float64
maxPeerNum int
minHotDegree int
// TODO: remove this after testing more scenarios with single RocksDB
isRaftKV2 bool

firstPriorityV2Ratios *rankV2Ratios
secondPriorityV2Ratios *rankV2Ratios
@@ -474,6 +478,7 @@ type balanceSolver struct {
betterThan func(*solution) bool
rankToDimString func() string
checkByPriorityAndTolerance func(loads []float64, f func(int) bool) bool
checkHistoryLoadsByPriority func(loads [][]float64, f func(int) bool) bool
}

func (bs *balanceSolver) init() {
@@ -514,6 +519,7 @@ func (bs *balanceSolver) init() {
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetOpts().GetHotRegionCacheHitsThreshold()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()

switch bs.sche.conf.GetRankFormulaVersion() {
case "v1":
@@ -538,10 +544,13 @@ func (bs *balanceSolver) pickCheckPolicyV1() {
switch {
case bs.resourceTy == writeLeader:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly
bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceFirstOnly
case bs.sche.conf.IsStrictPickingStoreEnabled():
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAllOf
bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceAllOf
default:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly
bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceFirstOnly
}
}

@@ -610,7 +619,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
if !bs.isValid() {
return nil
}

bs.cur = &solution{}
tryUpdateBestSolution := func() {
if label, ok := bs.filterUniformStore(); ok {
@@ -789,12 +797,20 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai
continue
}

if bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc()
if !bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("src-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}
// only raftkv2 needs to check the history loads.
if bs.isRaftKV2 {
if !bs.checkSrcHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, srcToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}
}

ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
}
return ret
}
@@ -805,6 +821,17 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta
})
}

func (bs *balanceSolver) checkSrcHistoryLoadsByPriorityAndTolerance(current, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
if len(current.HistoryLoads) == 0 {
return true
}
return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool {
return slice.AllOf(current.HistoryLoads[i], func(j int) bool {
return current.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j]
})
})
}

// filterHotPeers filters hot peers from statistics.HotPeerStat and deletes a peer if its region is in pending status.
// The returned hotPeer count is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) {
@@ -1003,12 +1030,20 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st
}
if filter.Target(bs.GetOpts(), store, filters) {
id := store.GetID()
if bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
hotSchedulerResultCounter.WithLabelValues("dst-store-failed", strconv.FormatUint(id, 10)).Inc()
if !bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("dst-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}
// only raftkv2 needs to check history loads
if bs.isRaftKV2 {
if !bs.checkDstHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, dstToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("dst-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}
}

hotSchedulerResultCounter.WithLabelValues("dst-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
ret[id] = detail
}
}
return ret
@@ -1020,6 +1055,17 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist
})
}

func (bs *balanceSolver) checkDstHistoryLoadsByPriorityAndTolerance(current, expect *statistics.StoreLoad, toleranceRatio float64) bool {
if len(current.HistoryLoads) == 0 {
return true
}
return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool {
return slice.AllOf(current.HistoryLoads[i], func(j int) bool {
return current.HistoryLoads[i][j]*toleranceRatio < expect.HistoryLoads[i][j]
})
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f func(int) bool) bool {
return slice.AllOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
@@ -1029,6 +1075,15 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f fun
})
}

func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceAllOf(loads [][]float64, f func(int) bool) bool {
return slice.AllOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return f(i)
}
return true
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f func(int) bool) bool {
return slice.AnyOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
@@ -1038,10 +1093,23 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f fun
})
}

func (bs *balanceSolver) checkHistoryByPriorityAndToleranceAnyOf(loads [][]float64, f func(int) bool) bool {
return slice.AnyOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return f(i)
}
return false
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceFirstOnly(loads []float64, f func(int) bool) bool {
return f(bs.firstPriority)
}

func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceFirstOnly(_ [][]float64, f func(int) bool) bool {
return f(bs.firstPriority)
}

func (bs *balanceSolver) enableExpectation() bool {
return bs.sche.conf.GetDstToleranceRatio() > 0 && bs.sche.conf.GetSrcToleranceRatio() > 0
}
@@ -1624,30 +1692,29 @@ func (ty opType) String() string {

type resourceType int

const (
writePeer resourceType = iota
writeLeader
readPeer
readLeader
resourceTypeLen
)

// String implements fmt.Stringer interface.
func (ty resourceType) String() string {
switch ty {
func (rt resourceType) String() string {
switch rt {
case writePeer:
return "write-peer"
case writeLeader:
return "write-leader"
case readPeer:
return "read-peer"
case readLeader:
return "read-leader"
case readPeer:
return "read-peer"
default:
return ""
return "unknown"
}
}

const (
writePeer resourceType = iota
writeLeader
readPeer
readLeader
resourceTypeLen
)

func toResourceType(rwTy statistics.RWType, opTy opType) resourceType {
switch rwTy {
case statistics.Write:
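The two new checks added earlier in hot_region.go are asymmetric: a candidate source must keep every recorded history load of the selected dimensions above toleranceRatio*expect, while a candidate destination must keep load*toleranceRatio below the expectation, and the bound checkHistoryLoadsByPriority policy decides whether all selected dimensions or only the first priority are consulted. The sketch below illustrates those two policies with made-up data; checkAllOf, checkFirstOnly, and the sample values are stand-ins, not PD code.

package main

import "fmt"

// checkAllOf mimics the strict-picking policy: every selected dimension must
// satisfy the predicate for the store to pass.
func checkAllOf(loads [][]float64, selected []bool, f func(int) bool) bool {
	for i := range loads {
		if selected[i] && !f(i) {
			return false
		}
	}
	return true
}

// checkFirstOnly mimics the write-leader / non-strict policy: only the
// first-priority dimension is consulted.
func checkFirstOnly(_ [][]float64, firstPriority int, f func(int) bool) bool {
	return f(firstPriority)
}

func main() {
	history := [][]float64{{120, 130}, {40, 45}} // dim 0: byte rate, dim 1: key rate
	expect := []float64{100, 60}
	ratio := 1.0
	// aboveExpect encodes the source-side rule: all samples of a dimension
	// must exceed the tolerance-scaled expectation.
	aboveExpect := func(i int) bool {
		for _, v := range history[i] {
			if v <= ratio*expect[i] {
				return false
			}
		}
		return true
	}
	// Strict picking: the key dimension (40, 45 below 60) fails, so the store is rejected.
	fmt.Println(checkAllOf(history, []bool{true, true}, aboveExpect)) // false
	// First-priority only: the byte dimension alone decides, so the store passes.
	fmt.Println(checkFirstOnly(history, 0, aboveExpect)) // true
}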
