schedulers: region should be split if region is too hot. #6618

Merged
merged 23 commits on Jul 11, 2023

Changes from all commits
4 changes: 2 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
@@ -808,8 +808,8 @@ func (mc *Cluster) MockRegionInfo(regionID uint64, leaderStoreID uint64,
followerStoreIDs, learnerStoreIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {
region := &metapb.Region{
Id: regionID,
StartKey: []byte(fmt.Sprintf("%20d", regionID)),
Member
why change it?

Contributor Author
Previously, region keys were ordered as [1, 2, 3, 4, 5], which left no key range for buckets.
With this PR, region keys are ordered as [10, 20, 30, 40, 50], so several buckets can be placed within one region.
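A quick standalone sketch of the difference (the `%21d` bucket-key format is an assumption for illustration, not necessarily what the tests use):

```go
package main

import "fmt"

func main() {
	// Old format: region N covers ["%20d" of N, "%20d" of N+1).
	// Consecutive integers are adjacent, so no same-width key fits between them.
	fmt.Printf("old: [%q, %q)\n", fmt.Sprintf("%20d", 1), fmt.Sprintf("%20d", 2))

	// New format: region N covers [N*10, (N+1)*10) once the trailing "0" is
	// appended, e.g. [10, 20) for N=1, so same-width keys such as 15
	// (hypothetical "%21d" bucket format) fall strictly inside the region.
	start := fmt.Sprintf("%20d0", 1) // "...10"
	end := fmt.Sprintf("%20d0", 2)   // "...20"
	bucket := fmt.Sprintf("%21d", 15)
	fmt.Printf("new: [%q, %q), bucket inside: %v\n",
		start, end, start <= bucket && bucket < end)
}
```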

EndKey: []byte(fmt.Sprintf("%20d", regionID+1)),
StartKey: []byte(fmt.Sprintf("%20d0", regionID)),
EndKey: []byte(fmt.Sprintf("%20d0", regionID+1)),
RegionEpoch: epoch,
}
var leader *metapb.Peer
Expand Down
111 changes: 82 additions & 29 deletions pkg/schedule/schedulers/hot_region.go
@@ -58,10 +58,12 @@ var (
hotSchedulerSnapshotSenderLimitCounter = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit")

// counters related to the split region
hotSchedulerNotFoundSplitKeysCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
hotSchedulerNotFoundSplitKeysCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
hotSchedulerRegionBucketsSingleHotSpotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_single_hot_spot")
hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
hotSchedulerRegionTooHotNeedSplitCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_is_too_hot_need_split")

hotSchedulerMoveLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, moveLeader.String())
hotSchedulerMovePeerCounter = schedulerCounter.WithLabelValues(HotRegionName, movePeer.String())
@@ -159,21 +161,23 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sche.
// It makes each dim's rate or count become `weight` times the original value.
func (h *baseHotScheduler) summaryPendingInfluence() {
for id, p := range h.regionPendings {
from := h.stInfos[p.from]
to := h.stInfos[p.to]
maxZombieDur := p.maxZombieDuration
weight, needGC := calcPendingInfluence(p.op, maxZombieDur)

if needGC {
delete(h.regionPendings, id)
continue
}
for _, from := range p.froms {
from := h.stInfos[from]
to := h.stInfos[p.to]
maxZombieDur := p.maxZombieDuration
weight, needGC := calcPendingInfluence(p.op, maxZombieDur)

if needGC {
delete(h.regionPendings, id)
continue
}

if from != nil && weight > 0 {
from.AddInfluence(&p.origin, -weight)
}
if to != nil && weight > 0 {
to.AddInfluence(&p.origin, weight)
if from != nil && weight > 0 {
from.AddInfluence(&p.origin, -weight)
}
if to != nil && weight > 0 {
to.AddInfluence(&p.origin, weight)
}
}
}
for storeID, info := range h.stInfos {
@@ -214,7 +218,8 @@ var (
// as it implies that this dimension is sufficiently uniform.
stddevThreshold = 0.1

splitBucket = "split-hot-region"
splitBucket = "split-hot-region"
splitProgressiveRank = int64(-5)
)

type hotScheduler struct {
@@ -294,7 +299,7 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster sche.ScheduleClus
return nil
}

func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore []uint64, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
if ok {
@@ -651,6 +656,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
}
snapshotFilter := filter.NewSnapshotSendFilter(bs.GetStores(), constant.Medium)
splitThresholds := bs.sche.conf.getSplitThresholds()
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore
srcStoreID := srcStore.GetID()
@@ -664,6 +670,16 @@
}
}
bs.cur.mainPeerStat = mainPeerStat
if tooHotNeedSplit(srcStore, mainPeerStat, splitThresholds) && bs.GetStoreConfig().IsEnableRegionBucket() {
hotSchedulerRegionTooHotNeedSplitCounter.Inc()
ops := bs.createSplitOperator([]*core.RegionInfo{bs.cur.region}, true /* too hot, needs split */)
if len(ops) > 0 {
bs.ops = ops
bs.cur.calcPeersRate(bs.firstPriority, bs.secondPriority)
bs.best = bs.cur
return ops
}
}

for _, dstStore := range bs.filterDstStores() {
bs.cur.dstStore = dstStore
@@ -723,24 +739,38 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
if bs.best == nil || len(bs.ops) == 0 {
return false
}
if bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
isSplit := bs.ops[0].Kind() == operator.OpSplit
if !isSplit && bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
hotSchedulerNotSameEngineCounter.Inc()
return false
}
maxZombieDur := bs.calcMaxZombieDur()

// TODO: Process operators atomically.
// main peer
srcStoreID := bs.best.srcStore.GetID()
dstStoreID := bs.best.dstStore.GetID()

srcStoreIDs := make([]uint64, 0)
dstStoreID := uint64(0)
if isSplit {
region := bs.GetRegion(bs.ops[0].RegionID())
for id := range region.GetStoreIDs() {
srcStoreIDs = append(srcStoreIDs, id)
}
} else {
srcStoreIDs = append(srcStoreIDs, bs.best.srcStore.GetID())
dstStoreID = bs.best.dstStore.GetID()
}
infl := bs.collectPendingInfluence(bs.best.mainPeerStat)
if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreID, dstStoreID, infl, maxZombieDur) {
if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreIDs, dstStoreID, infl, maxZombieDur) {
return false
}
if isSplit {
return true
}
// revert peers
if bs.best.revertPeerStat != nil && len(bs.ops) > 1 {
infl := bs.collectPendingInfluence(bs.best.revertPeerStat)
if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) {
if !bs.sche.tryAddPendingInfluence(bs.ops[1], srcStoreIDs, dstStoreID, infl, maxZombieDur) {
return false
}
}
@@ -1243,7 +1273,7 @@ func (bs *balanceSolver) getMinRate(dim int) float64 {

// betterThan checks if `bs.cur` is a better solution than `old`.
func (bs *balanceSolver) betterThanV1(old *solution) bool {
if old == nil {
if old == nil || bs.cur.progressiveRank <= splitProgressiveRank {
return true
}
if bs.cur.progressiveRank != old.progressiveRank {
@@ -1435,7 +1465,7 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
}
}
if len(splitRegions) > 0 {
return bs.createSplitOperator(splitRegions)
return bs.createSplitOperator(splitRegions, false /* region is too big, needs split before move */)
}

srcStoreID := bs.cur.srcStore.GetID()
@@ -1475,7 +1505,8 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
}

// createSplitOperator creates split operators for the given regions.
func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*operator.Operator {
// isTooHot, when true, indicates that the region is too hot and needs to be split.
func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo, isTooHot bool) []*operator.Operator {
if len(regions) == 0 {
return nil
}
@@ -1492,6 +1523,11 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
hotSchedulerRegionBucketsNotHotCounter.Inc()
return
}
// Skip if only one hot bucket exists in this region.
if len(stats) <= 1 && isTooHot {
hotSchedulerRegionBucketsSingleHotSpotCounter.Inc()
return
}
startKey, endKey := region.GetStartKey(), region.GetEndKey()
splitKey := make([][]byte, 0)
for _, stat := range stats {
@@ -1505,7 +1541,13 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
// Otherwise, we should append the current start key and end key.
// E.g. [a, b), [c, d) -> [a, b), [c, d) split keys is [a,b,c,d]
if bytes.Equal(stat.StartKey, splitKey[len(splitKey)-1]) {
splitKey[len(splitKey)-1] = stat.EndKey
// If the region is too hot, split at every bucket boundary to balance the load;
// otherwise, merge adjacent hot buckets into a single split range.
if isTooHot {
splitKey = append(splitKey, stat.EndKey)
} else {
splitKey[len(splitKey)-1] = stat.EndKey
}
} else {
splitKey = append(splitKey, stat.StartKey, stat.EndKey)
}
@@ -1529,6 +1571,10 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
for _, region := range regions {
createFunc(region)
}
// The split operator has the highest priority.
if len(operators) > 0 {
bs.cur.progressiveRank = splitProgressiveRank
}
return operators
}
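The two accumulation rules above are easiest to see on concrete bucket ranges. Below is a minimal standalone sketch of just that split-key logic (the stat type is simplified to plain byte slices, and the region-boundary checks are omitted; this is an illustration, not the scheduler's actual code):

```go
package main

import (
	"bytes"
	"fmt"
)

type bucketStat struct{ startKey, endKey []byte }

// collectSplitKeys mirrors the accumulation rules in createSplitOperator:
// an adjacent bucket either extends the previous range (normal case) or
// keeps every boundary (isTooHot); a non-adjacent bucket adds both keys.
func collectSplitKeys(stats []bucketStat, isTooHot bool) [][]byte {
	splitKey := make([][]byte, 0)
	for _, stat := range stats {
		if len(splitKey) == 0 {
			splitKey = append(splitKey, stat.startKey, stat.endKey)
			continue
		}
		if bytes.Equal(stat.startKey, splitKey[len(splitKey)-1]) {
			if isTooHot {
				// Keep every bucket boundary to spread the load.
				splitKey = append(splitKey, stat.endKey)
			} else {
				// Merge adjacent hot buckets into one split range.
				splitKey[len(splitKey)-1] = stat.endKey
			}
		} else {
			splitKey = append(splitKey, stat.startKey, stat.endKey)
		}
	}
	return splitKey
}

func main() {
	stats := []bucketStat{
		{[]byte("a"), []byte("b")},
		{[]byte("b"), []byte("c")}, // adjacent to [a, b)
		{[]byte("d"), []byte("e")}, // not adjacent
	}
	fmt.Printf("%q\n", collectSplitKeys(stats, false)) // ["a" "c" "d" "e"]
	fmt.Printf("%q\n", collectSplitKeys(stats, true))  // ["a" "b" "c" "d" "e"]
}
```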

@@ -1789,3 +1835,10 @@ func dimToString(dim int) string {
func prioritiesToDim(priorities []string) (firstPriority int, secondPriority int) {
return stringToDim(priorities[0]), stringToDim(priorities[1])
}

// tooHotNeedSplit returns true if, on any dimension, the hot region's load exceeds the store's load times splitThresholds.
func tooHotNeedSplit(store *statistics.StoreLoadDetail, region *statistics.HotPeerStat, splitThresholds float64) bool {
return slice.AnyOf(store.LoadPred.Current.Loads, func(i int) bool {
return region.Loads[i] > store.LoadPred.Current.Loads[i]*splitThresholds
})
}
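For intuition: with the default split-thresholds of 0.2, a region is considered too hot as soon as it carries more than 20% of its store's current load on any single dimension. A rough standalone sketch of the arithmetic (load values are made up, and the loop is a simplified mirror of the `slice.AnyOf` call above):

```go
package main

import "fmt"

// anyDimTooHot reports whether the region's load on any dimension
// exceeds the store's load on that dimension times splitThresholds.
func anyDimTooHot(storeLoads, regionLoads []float64, splitThresholds float64) bool {
	for i := range storeLoads {
		if regionLoads[i] > storeLoads[i]*splitThresholds {
			return true
		}
	}
	return false
}

func main() {
	storeLoads := []float64{1000, 500} // e.g. byte rate, key rate (made-up values)
	regionLoads := []float64{250, 40}  // this region's share of each dimension
	// 250 > 1000*0.2 on the first dimension, so the region is too hot.
	fmt.Println(anyDimTooHot(storeLoads, regionLoads, 0.2)) // true
}
```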
13 changes: 13 additions & 0 deletions pkg/schedule/schedulers/hot_region_config.go
@@ -75,6 +75,7 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
EnableForTiFlash: true,
RankFormulaVersion: "v2",
ForbidRWType: "none",
SplitThresholds: 0.2,
}
cfg.applyPrioritiesConfig(defaultPrioritiesConfig)
return cfg
@@ -102,6 +103,7 @@ func (conf *hotRegionSchedulerConfig) getValidConf() *hotRegionSchedulerConfig {
EnableForTiFlash: conf.EnableForTiFlash,
RankFormulaVersion: conf.getRankFormulaVersionLocked(),
ForbidRWType: conf.getForbidRWTypeLocked(),
SplitThresholds: conf.SplitThresholds,
}
}

@@ -143,6 +145,8 @@ type hotRegionSchedulerConfig struct {
RankFormulaVersion string `json:"rank-formula-version"`
// forbid read or write scheduler, only for test
ForbidRWType string `json:"forbid-rw-type,omitempty"`
// SplitThresholds is the threshold for splitting a hot region: the region is split if its flow exceeds the store's flow multiplied by SplitThresholds.
SplitThresholds float64 `json:"split-thresholds"`
}

func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
@@ -316,6 +320,12 @@ func (conf *hotRegionSchedulerConfig) IsForbidRWType(rw statistics.RWType) bool
return rw.String() == conf.ForbidRWType
}

func (conf *hotRegionSchedulerConfig) getSplitThresholds() float64 {
conf.RLock()
defer conf.RUnlock()
return conf.SplitThresholds
}

func (conf *hotRegionSchedulerConfig) getForbidRWTypeLocked() string {
switch conf.ForbidRWType {
case statistics.Read.String(), statistics.Write.String():
@@ -377,6 +387,9 @@ func (conf *hotRegionSchedulerConfig) valid() error {
conf.ForbidRWType != "none" && conf.ForbidRWType != "" {
return errs.ErrSchedulerConfig.FastGenByArgs("invalid forbid-rw-type")
}
if conf.SplitThresholds < 0.01 || conf.SplitThresholds > 1.0 {
return errs.ErrSchedulerConfig.FastGenByArgs("invalid split-thresholds, should be in range [0.01, 1.0]")
}
return nil
}
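For reference, the new field travels in the scheduler's JSON config under the `split-thresholds` key, defaulting to 0.2 and validated against [0.01, 1.0]. A stripped-down sketch of just the added field and its accepted range (the struct here is a stand-in for illustration, not the real hotRegionSchedulerConfig):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// splitConfig keeps only the field added in this PR.
type splitConfig struct {
	SplitThresholds float64 `json:"split-thresholds"`
}

func main() {
	raw := []byte(`{"split-thresholds": 0.2}`) // the shipped default
	var cfg splitConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	// Mirrors the range check in valid().
	ok := cfg.SplitThresholds >= 0.01 && cfg.SplitThresholds <= 1.0
	fmt.Println(cfg.SplitThresholds, ok) // 0.2 true
}
```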
