schedulers: region should be split if region is too hot. (tikv#6618)
close tikv#6619

Signed-off-by: bufferflies <1045931706@qq.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
2 people authored and rleungx committed Dec 1, 2023
1 parent 60f3364 commit aa46114
Showing 8 changed files with 90 additions and 105 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
@@ -209,7 +209,7 @@ func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.Scat
return &schedulingpb.ScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}

opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit())
opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()))
if err != nil {
header := s.errorHeader(&schedulingpb.Error{
Type: schedulingpb.ErrorType_UNKNOWN,
4 changes: 2 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
@@ -822,8 +822,8 @@ func (mc *Cluster) MockRegionInfo(regionID uint64, leaderStoreID uint64,
followerStoreIDs, learnerStoreIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {
region := &metapb.Region{
Id: regionID,
StartKey: []byte(fmt.Sprintf("%20d", regionID)),
EndKey: []byte(fmt.Sprintf("%20d", regionID+1)),
StartKey: []byte(fmt.Sprintf("%20d0", regionID)),
EndKey: []byte(fmt.Sprintf("%20d0", regionID+1)),
RegionEpoch: epoch,
}
var leader *metapb.Peer
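An aside on the mockcluster change above (not part of the commit): with the extra trailing "0", keys formatted with the plain %20d pattern now fall strictly inside a mock region rather than exactly on a region boundary. The rationale is an assumption on my part (presumably it gives the new split-related tests bucket boundaries that a region can actually be split at), but the key ordering itself is easy to verify with a standalone snippet:

    package main

    import (
        "bytes"
        "fmt"
    )

    func main() {
        // Region 1 under the new layout produced by MockRegionInfo.
        start := []byte(fmt.Sprintf("%20d0", 1))
        end := []byte(fmt.Sprintf("%20d0", 2))
        // An old-style boundary key ("%20d" of 2) is now an interior key of region 1.
        k := []byte(fmt.Sprintf("%20d", 2))
        fmt.Println(bytes.Compare(k, start) > 0 && bytes.Compare(k, end) < 0) // true
    }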
86 changes: 63 additions & 23 deletions pkg/schedule/schedulers/hot_region.go
@@ -59,10 +59,12 @@ var (
hotSchedulerSnapshotSenderLimitCounter = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit")

// counter related with the split region
hotSchedulerNotFoundSplitKeysCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
hotSchedulerNotFoundSplitKeysCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
hotSchedulerRegionBucketsSingleHotSpotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_single_hot_spot")
hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
hotSchedulerRegionTooHotNeedSplitCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_is_too_hot_need_split")

hotSchedulerMoveLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, moveLeader.String())
hotSchedulerMovePeerCounter = schedulerCounter.WithLabelValues(HotRegionName, movePeer.String())
@@ -159,21 +161,23 @@ func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.Sched
// It makes each dim rate or count become `weight` times to the origin value.
func (h *baseHotScheduler) summaryPendingInfluence() {
for id, p := range h.regionPendings {
from := h.stInfos[p.from]
to := h.stInfos[p.to]
maxZombieDur := p.maxZombieDuration
weight, needGC := calcPendingInfluence(p.op, maxZombieDur)

if needGC {
delete(h.regionPendings, id)
continue
}
for _, from := range p.froms {
from := h.stInfos[from]
to := h.stInfos[p.to]
maxZombieDur := p.maxZombieDuration
weight, needGC := calcPendingInfluence(p.op, maxZombieDur)

if needGC {
delete(h.regionPendings, id)
continue
}

if from != nil && weight > 0 {
from.AddInfluence(&p.origin, -weight)
}
if to != nil && weight > 0 {
to.AddInfluence(&p.origin, weight)
if from != nil && weight > 0 {
from.AddInfluence(&p.origin, -weight)
}
if to != nil && weight > 0 {
to.AddInfluence(&p.origin, weight)
}
}
}
for storeID, info := range h.stInfos {
@@ -214,7 +218,8 @@ var (
// as it implies that this dimension is sufficiently uniform.
stddevThreshold = 0.1

splitBucket = "split-hot-region"
splitBucket = "split-hot-region"
splitProgressiveRank = int64(-5)
)

type hotScheduler struct {
@@ -293,6 +298,7 @@ func (h *hotScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster)
}
return nil
}

func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore []uint64, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
@@ -650,6 +656,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
}
snapshotFilter := filter.NewSnapshotSendFilter(bs.GetStores(), constant.Medium)
splitThresholds := bs.sche.conf.getSplitThresholds()
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore
srcStoreID := srcStore.GetID()
@@ -663,6 +670,16 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
}
bs.cur.mainPeerStat = mainPeerStat
if tooHotNeedSplit(srcStore, mainPeerStat, splitThresholds) && bs.GetStoreConfig().IsEnableRegionBucket() {
hotSchedulerRegionTooHotNeedSplitCounter.Inc()
ops := bs.createSplitOperator([]*core.RegionInfo{bs.cur.region}, true /*too hot need to split*/)
if len(ops) > 0 {
bs.ops = ops
bs.cur.calcPeersRate(bs.firstPriority, bs.secondPriority)
bs.best = bs.cur
return ops
}
}

for _, dstStore := range bs.filterDstStores() {
bs.cur.dstStore = dstStore
@@ -1259,7 +1276,7 @@ func (bs *balanceSolver) getMinRate(dim int) float64 {

// betterThan checks if `bs.cur` is a better solution than `old`.
func (bs *balanceSolver) betterThanV1(old *solution) bool {
if old == nil {
if old == nil || bs.cur.progressiveRank <= splitProgressiveRank {
return true
}
if bs.cur.progressiveRank != old.progressiveRank {
@@ -1451,7 +1468,7 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
}
}
if len(splitRegions) > 0 {
return bs.createSplitOperator(splitRegions)
return bs.createSplitOperator(splitRegions, false /* region is too big need split before move */)
}

srcStoreID := bs.cur.srcStore.GetID()
@@ -1491,7 +1508,8 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
}

// createSplitOperator creates split operators for the given regions.
func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*operator.Operator {
// isTooHot true indicates that the region is too hot and needs split.
func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo, isTooHot bool) []*operator.Operator {
if len(regions) == 0 {
return nil
}
@@ -1508,6 +1526,11 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
hotSchedulerRegionBucketsNotHotCounter.Inc()
return
}
// skip if only one hot bucket exists on this region.
if len(stats) <= 1 && isTooHot {
hotSchedulerRegionBucketsSingleHotSpotCounter.Inc()
return
}
startKey, endKey := region.GetStartKey(), region.GetEndKey()
splitKey := make([][]byte, 0)
for _, stat := range stats {
@@ -1521,7 +1544,13 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
// Otherwise, we should append the current start key and end key.
// E.g. [a, b), [c, d) -> [a, b), [c, d) split keys is [a,b,c,d]
if bytes.Equal(stat.StartKey, splitKey[len(splitKey)-1]) {
splitKey[len(splitKey)-1] = stat.EndKey
// If the region is too hot, we should split all the buckets to balance the load.
// otherwise, we should split the buckets that are too hot.
if isTooHot {
splitKey = append(splitKey, stat.EndKey)
} else {
splitKey[len(splitKey)-1] = stat.EndKey
}
} else {
splitKey = append(splitKey, stat.StartKey, stat.EndKey)
}
@@ -1545,6 +1574,10 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
for _, region := range regions {
createFunc(region)
}
// the split bucket's priority is highest
if len(operators) > 0 {
bs.cur.progressiveRank = splitProgressiveRank
}
return operators
}
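To make the new isTooHot branch concrete, here is a self-contained sketch of just the split-key accumulation above (bucket filtering and operator creation are omitted, and the handling of the first bucket, which the diff elides, is assumed to append its start and end keys):

    package main

    import (
        "bytes"
        "fmt"
    )

    type bucket struct{ StartKey, EndKey []byte }

    // collectSplitKeys mirrors the loop in createSplitOperator: when a bucket is
    // adjacent to the last collected key, a too-hot region keeps every boundary,
    // while the ordinary case merges adjacent hot buckets into one split range.
    func collectSplitKeys(stats []bucket, isTooHot bool) [][]byte {
        splitKey := make([][]byte, 0)
        for _, stat := range stats {
            if len(splitKey) == 0 {
                splitKey = append(splitKey, stat.StartKey, stat.EndKey)
                continue
            }
            if bytes.Equal(stat.StartKey, splitKey[len(splitKey)-1]) {
                if isTooHot {
                    splitKey = append(splitKey, stat.EndKey)
                } else {
                    splitKey[len(splitKey)-1] = stat.EndKey
                }
            } else {
                splitKey = append(splitKey, stat.StartKey, stat.EndKey)
            }
        }
        return splitKey
    }

    func main() {
        stats := []bucket{
            {[]byte("a"), []byte("b")},
            {[]byte("b"), []byte("c")}, // adjacent to the previous bucket
            {[]byte("d"), []byte("e")},
        }
        fmt.Printf("%q\n", collectSplitKeys(stats, false)) // ["a" "c" "d" "e"]
        fmt.Printf("%q\n", collectSplitKeys(stats, true))  // ["a" "b" "c" "d" "e"]
    }

With isTooHot set, every hot bucket boundary is kept, so the region is cut into one piece per hot bucket instead of one piece per contiguous hot range.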

@@ -1804,3 +1837,10 @@ func dimToString(dim int) string {
func prioritiesToDim(priorities []string) (firstPriority int, secondPriority int) {
return stringToDim(priorities[0]), stringToDim(priorities[1])
}

// tooHotNeedSplit returns true if any dim of the hot region is greater than the store threshold.
func tooHotNeedSplit(store *statistics.StoreLoadDetail, region *statistics.HotPeerStat, splitThresholds float64) bool {
return slice.AnyOf(store.LoadPred.Current.Loads, func(i int) bool {
return region.Loads[i] > store.LoadPred.Current.Loads[i]*splitThresholds
})
}
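A worked example of the threshold check above, with made-up numbers (the real code compares region.Loads against store.LoadPred.Current.Loads via slice.AnyOf; the two-dimension load vectors below are purely illustrative):

    package main

    import "fmt"

    // tooHotNeedSplit restated over plain slices: the region counts as too hot
    // as soon as any dimension exceeds the store's load times split-thresholds.
    func tooHotNeedSplit(storeLoads, regionLoads []float64, splitThresholds float64) bool {
        for i := range storeLoads {
            if regionLoads[i] > storeLoads[i]*splitThresholds {
                return true
            }
        }
        return false
    }

    func main() {
        storeLoads := []float64{100 << 20, 10000} // store totals: 100 MiB/s of bytes, 10k keys/s
        regionLoads := []float64{30 << 20, 1200}  // one region: 30 MiB/s of bytes, 1.2k keys/s
        // With the default split-thresholds of 0.2, 30 MiB/s exceeds 0.2 * 100 MiB/s,
        // so this region is split before any peer or leader movement is attempted.
        fmt.Println(tooHotNeedSplit(storeLoads, regionLoads, 0.2)) // true
    }

Note that in solve() the check is additionally gated on IsEnableRegionBucket(), since the split keys come from bucket statistics.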
13 changes: 13 additions & 0 deletions pkg/schedule/schedulers/hot_region_config.go
@@ -75,6 +75,7 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
EnableForTiFlash: true,
RankFormulaVersion: "v2",
ForbidRWType: "none",
SplitThresholds: 0.2,
}
cfg.applyPrioritiesConfig(defaultPrioritiesConfig)
return cfg
@@ -102,6 +103,7 @@ func (conf *hotRegionSchedulerConfig) getValidConf() *hotRegionSchedulerConfig {
EnableForTiFlash: conf.EnableForTiFlash,
RankFormulaVersion: conf.getRankFormulaVersionLocked(),
ForbidRWType: conf.getForbidRWTypeLocked(),
SplitThresholds: conf.SplitThresholds,
}
}

@@ -143,6 +145,8 @@ type hotRegionSchedulerConfig struct {
RankFormulaVersion string `json:"rank-formula-version"`
// forbid read or write scheduler, only for test
ForbidRWType string `json:"forbid-rw-type,omitempty"`
// SplitThresholds is the threshold to split a hot region if the flow of one hot region exceeds it.
SplitThresholds float64 `json:"split-thresholds"`
}

func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
@@ -316,6 +320,12 @@ func (conf *hotRegionSchedulerConfig) IsForbidRWType(rw utils.RWType) bool {
return rw.String() == conf.ForbidRWType
}

func (conf *hotRegionSchedulerConfig) getSplitThresholds() float64 {
conf.RLock()
defer conf.RUnlock()
return conf.SplitThresholds
}

func (conf *hotRegionSchedulerConfig) getForbidRWTypeLocked() string {
switch conf.ForbidRWType {
case utils.Read.String(), utils.Write.String():
@@ -377,6 +387,9 @@ func (conf *hotRegionSchedulerConfig) valid() error {
conf.ForbidRWType != "none" && conf.ForbidRWType != "" {
return errs.ErrSchedulerConfig.FastGenByArgs("invalid forbid-rw-type")
}
if conf.SplitThresholds < 0.01 || conf.SplitThresholds > 1.0 {
return errs.ErrSchedulerConfig.FastGenByArgs("invalid split-thresholds, should be in range [0.01, 1.0]")
}
return nil
}

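Restating the new knob's contract from the defaults and valid() above (a sketch, not PD's actual API): split-thresholds defaults to 0.2 and must lie in [0.01, 1.0].

    package main

    import (
        "errors"
        "fmt"
    )

    const defaultSplitThresholds = 0.2

    // validateSplitThresholds mirrors the range check added to valid().
    func validateSplitThresholds(v float64) error {
        if v < 0.01 || v > 1.0 {
            return errors.New("invalid split-thresholds, should be in range [0.01, 1.0]")
        }
        return nil
    }

    func main() {
        fmt.Println(validateSplitThresholds(defaultSplitThresholds)) // <nil>
        fmt.Println(validateSplitThresholds(0))                      // rejected, as in the new test case
        fmt.Println(validateSplitThresholds(1.1))                    // rejected, as in the new test case
    }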
10 changes: 9 additions & 1 deletion pkg/schedule/schedulers/hot_region_test.go
@@ -140,7 +140,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
op.Start()
op.SetStatusReachTime(operator.CREATED, time.Now().Add(-5*utils.StoreHeartBeatReportInterval*time.Second))
op.SetStatusReachTime(operator.STARTED, time.Now().Add((-5*utils.StoreHeartBeatReportInterval+1)*time.Second))
return newPendingInfluence(op, 2, 4, statistics.Influence{}, hb.conf.GetStoreStatZombieDuration())
return newPendingInfluence(op, []uint64{2}, 4, statistics.Influence{}, hb.conf.GetStoreStatZombieDuration())
}
justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
infl := notDoneOpInfluence(region, ty)
@@ -2372,6 +2372,14 @@ func TestConfigValidation(t *testing.T) {
hc.ForbidRWType = "test"
err = hc.valid()
re.Error(err)

hc.SplitThresholds = 0
err = hc.valid()
re.Error(err)

hc.SplitThresholds = 1.1
err = hc.valid()
re.Error(err)
}

type maxZombieDurTestCase struct {
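The checkGCPendingOpInfos change above reflects the new newPendingInfluence signature: a pending operator now records a slice of source stores rather than a single one. A minimal stand-in for the bookkeeping done by summaryPendingInfluence (shown earlier in hot_region.go), using a plain map instead of PD's store-info types:

    package main

    import "fmt"

    func main() {
        const origin = 8.0 // pending influence of one operator, e.g. a byte rate
        const weight = 0.5 // decayed weight returned by calcPendingInfluence

        froms := []uint64{2, 3} // source stores recorded by newPendingInfluence
        to := uint64(4)         // destination store

        loads := map[uint64]float64{2: 100, 3: 90, 4: 60}
        // As in the loop above, both updates happen once per entry in froms.
        for _, from := range froms {
            loads[from] -= origin * weight
            loads[to] += origin * weight
        }
        fmt.Println(loads) // map[2:96 3:86 4:68]
    }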
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/hot_region_v2.go
@@ -292,7 +292,7 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int {

// betterThan checks if `bs.cur` is a better solution than `old`.
func (bs *balanceSolver) betterThanV2(old *solution) bool {
if old == nil {
if old == nil || bs.cur.progressiveRank <= splitProgressiveRank {
return true
}
if bs.cur.progressiveRank != old.progressiveRank {
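Both betterThanV1 and betterThanV2 now short-circuit on splitProgressiveRank (-5, defined in hot_region.go above), which createSplitOperator assigns to the current solution whenever it emits split operators. A stripped-down sketch of the effect (simplified signature, not PD's actual method):

    package main

    import "fmt"

    const splitProgressiveRank = int64(-5)

    // betterThan stands in for betterThanV1/V2: a solution carrying split
    // operators always wins, so splitting a too-hot region preempts ordinary
    // move-peer and transfer-leader candidates.
    func betterThan(curRank int64, old *int64) bool {
        if old == nil || curRank <= splitProgressiveRank {
            return true
        }
        // ... the regular comparison would follow here; a lower (more negative)
        // progressive rank is treated as better.
        return curRank < *old
    }

    func main() {
        oldRank := int64(-3)
        fmt.Println(betterThan(splitProgressiveRank, &oldRank)) // true: the split solution wins outright
        fmt.Println(betterThan(-1, &oldRank))                   // false: loses the regular comparison here
    }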
6 changes: 1 addition & 5 deletions server/config/persist_options.go
@@ -644,11 +644,7 @@ func (o *PersistOptions) IsTikvRegionSplitEnabled() bool {

// GetMaxMovableHotPeerSize returns the max movable hot peer size.
func (o *PersistOptions) GetMaxMovableHotPeerSize() int64 {
size := o.GetScheduleConfig().MaxMovableHotPeerSize
if size <= 0 {
size = defaultMaxMovableHotPeerSize
}
return size
return o.GetScheduleConfig().MaxMovableHotPeerSize
}

// IsDebugMetricsEnabled returns if debug metrics is enabled.
72 changes: 0 additions & 72 deletions tests/pdctl/hot/hot_test.go
@@ -494,75 +494,3 @@ func TestHistoryHotRegions(t *testing.T) {
re.NoError(err)
re.Error(json.Unmarshal(output, &hotRegions))
}

func TestBuckets(t *testing.T) {
// TODO: support forward bucket request in scheduling server in the future.
re := require.New(t)
statistics.Denoising = false
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 1, func(cfg *config.Config, serverName string) { cfg.Schedule.HotRegionCacheHitsThreshold = 0 })
re.NoError(err)
err = cluster.RunInitialServers()
re.NoError(err)
cluster.WaitLeader()
pdAddr := cluster.GetConfig().GetClientURL()
cmd := pdctlCmd.GetRootCmd()

stores := []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 2,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
}

leaderServer := cluster.GetLeaderServer()
re.NoError(leaderServer.BootstrapCluster())
for _, store := range stores {
tests.MustPutStore(re, cluster, store)
}
defer cluster.Destroy()

tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval))
tests.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval))
tests.MustPutRegion(re, cluster, 3, 1, []byte("e"), []byte("f"), core.SetWrittenBytes(9000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval))

stats := &metapb.BucketStats{
ReadBytes: []uint64{10 * units.MiB},
ReadKeys: []uint64{11 * units.MiB},
ReadQps: []uint64{0},
WriteKeys: []uint64{12 * units.MiB},
WriteBytes: []uint64{13 * units.MiB},
WriteQps: []uint64{0},
}
buckets := tests.MustReportBuckets(re, cluster, 1, []byte("a"), []byte("b"), stats)
args := []string{"-u", pdAddr, "hot", "buckets", "1"}
output, err := pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
hotBuckets := handler.HotBucketsResponse{}
re.NoError(json.Unmarshal(output, &hotBuckets))
re.Len(hotBuckets, 1)
re.Len(hotBuckets[1], 1)
item := hotBuckets[1][0]
re.Equal(core.HexRegionKeyStr(buckets.GetKeys()[0]), item.StartKey)
re.Equal(core.HexRegionKeyStr(buckets.GetKeys()[1]), item.EndKey)
re.Equal(1, item.HotDegree)
interval := buckets.GetPeriodInMs() / 1000
re.Equal(buckets.GetStats().ReadBytes[0]/interval, item.ReadBytes)
re.Equal(buckets.GetStats().ReadKeys[0]/interval, item.ReadKeys)
re.Equal(buckets.GetStats().WriteBytes[0]/interval, item.WriteBytes)
re.Equal(buckets.GetStats().WriteKeys[0]/interval, item.WriteKeys)

args = []string{"-u", pdAddr, "hot", "buckets", "2"}
output, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
hotBuckets = handler.HotBucketsResponse{}
re.NoError(json.Unmarshal(output, &hotBuckets))
re.Nil(hotBuckets[2])
}
