schedulers,test: avoid some test branches not being reached and remove schedulePeerPr #8087

Open · wants to merge 4 commits into master
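At a high level: hot write scheduling used to choose between moving peers and transferring leaders behind a probability gate (schedulePeerPr = 0.66), so a test run could finish without ever reaching one of the branches. After this change every resourceType (readLeader, readPeer, writeLeader, writePeer) is registered once and drawn uniformly, and writePeer/writeLeader get their own scheduling paths. The standalone Go sketch below contrasts the two selection schemes; the enum names follow the diff, while the toy main and fixed seed are illustrative assumptions only.

package main

import (
	"fmt"
	"math/rand"
)

// resourceType mirrors the enum used in the diff; the exact value layout
// here is an assumption, kept only for illustration.
type resourceType int

const (
	writeLeader resourceType = iota
	writePeer
	readLeader
	readPeer
	resourceTypeLen
)

func main() {
	r := rand.New(rand.NewSource(1)) // fixed seed, illustration only

	// Before: writes chose peer vs. leader behind a probability gate,
	// so one branch could stay unvisited in a short test run.
	schedulePeerPr := 0.66
	if r.Intn(100) < int(schedulePeerPr*100) {
		fmt.Println("write: balance by moving peers")
	} else {
		fmt.Println("write: balance by transferring leaders")
	}

	// After: all resource types are collected once and drawn uniformly,
	// the same loop shape as newBaseHotScheduler in the diff.
	types := make([]resourceType, 0, resourceTypeLen)
	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
		types = append(types, ty)
	}
	typ := types[r.Int()%len(types)] // randomType() in the diff
	fmt.Println("picked resource type:", typ)
}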
10 changes: 5 additions & 5 deletions pkg/schedule/schedulers/grant_hot_region.go
@@ -258,13 +258,13 @@ func newGrantHotRegionHandler(config *grantHotRegionSchedulerConfig) http.Handle

func (s *grantHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
grantHotRegionCounter.Inc()
rw := s.randomRWType()
s.prepareForBalance(rw, cluster)
return s.dispatch(rw, cluster), nil
typ := s.randomType()
s.prepareForBalance(typ, cluster)
return s.dispatch(typ, cluster), nil
}

func (s *grantHotRegionScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) []*operator.Operator {
stLoadInfos := s.stLoadInfos[buildResourceType(typ, constant.RegionKind)]
func (s *grantHotRegionScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator {
stLoadInfos := s.stLoadInfos[typ]
infos := make([]*statistics.StoreLoadDetail, len(stLoadInfos))
index := 0
for _, info := range stLoadInfos {
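The change in this file is mechanical: dispatch previously rebuilt a map key with buildResourceType(typ, constant.RegionKind) from a utils.RWType, whereas now the caller already holds a resourceType and indexes stLoadInfos directly. A minimal sketch of that direct lookup, with stand-in types that are assumptions rather than the scheduler's real definitions:

package main

import "fmt"

// Stand-ins for the scheduler's types; names follow the diff, shapes are assumed.
type resourceType int

const (
	writeLeader resourceType = iota
	writePeer
	readLeader
	readPeer
	resourceTypeLen
)

type storeLoadDetail struct{ storeID uint64 }

func main() {
	// One load map per resource type, as in baseHotScheduler.
	var stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail
	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
		stLoadInfos[ty] = map[uint64]*storeLoadDetail{}
	}
	stLoadInfos[readPeer][1] = &storeLoadDetail{storeID: 1}

	// After the change, the caller passes the resourceType straight through:
	typ := readPeer
	fmt.Println(len(stLoadInfos[typ])) // no buildResourceType round trip
}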
96 changes: 52 additions & 44 deletions pkg/schedule/schedulers/hot_region.go
@@ -47,25 +47,26 @@ const (
// HotRegionName is balance hot region scheduler name.
HotRegionName = "balance-hot-region-scheduler"
// HotRegionType is balance hot region scheduler type.
HotRegionType = "hot-region"
splitHotReadBuckets = "split-hot-read-region"
splitHotWriteBuckets = "split-hot-write-region"
splitProgressiveRank = int64(-5)
minHotScheduleInterval = time.Second
maxHotScheduleInterval = 20 * time.Second
HotRegionType = "hot-region"
splitHotReadBuckets = "split-hot-read-region"
splitHotWriteBuckets = "split-hot-write-region"
splitProgressiveRank = int64(-5)
minHotScheduleInterval = time.Second
maxHotScheduleInterval = 20 * time.Second
defaultPendingAmpFactor = 2.0
defaultStddevThreshold = 0.1
defaultTopnPosition = 10
)

var (
// schedulePeerPr the probability of schedule the hot peer.
schedulePeerPr = 0.66
// pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together
pendingAmpFactor = 2.0
pendingAmpFactor = defaultPendingAmpFactor
// If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension,
// as it implies that this dimension is sufficiently uniform.
stddevThreshold = 0.1
stddevThreshold = defaultStddevThreshold
// topnPosition is the position of the topn peer in the hot peer list.
// We use it to judge whether to schedule the hot peer in some cases.
topnPosition = 10
topnPosition = defaultTopnPosition
// statisticsInterval is the interval to update statistics information.
statisticsInterval = time.Second
)
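The new default* constants give the mutable package-level knobs (pendingAmpFactor, stddevThreshold, topnPosition) a single named source of truth, so a test that mutates one of them can restore it without re-typing the magic number. A minimal sketch of that pattern; the test shown is hypothetical:

package main

import "fmt"

const (
	defaultPendingAmpFactor = 2.0
	defaultStddevThreshold  = 0.1
	defaultTopnPosition     = 10
)

// Package-level tunables, initialized from the named defaults as in the diff.
var (
	pendingAmpFactor = defaultPendingAmpFactor
	stddevThreshold  = defaultStddevThreshold
	topnPosition     = defaultTopnPosition
)

func main() {
	// A test can tighten a knob...
	topnPosition = 1
	fmt.Println("during test:", topnPosition)

	// ...and restore it without hard-coding the magic number again.
	topnPosition = defaultTopnPosition
	fmt.Println("restored:", pendingAmpFactor, stddevThreshold, topnPosition)
}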
@@ -121,7 +122,7 @@ type baseHotScheduler struct {
// this records the regionIDs which have a pending Operator, keyed by operation type. During filterHotPeers, a hot peer won't
// be selected if its owner region is tracked in this attribute.
regionPendings map[uint64]*pendingInfluence
types []utils.RWType
types []resourceType
r *rand.Rand
updateReadTime time.Time
updateWriteTime time.Time
@@ -131,26 +132,26 @@ func newBaseHotScheduler(opController *operator.Controller, sampleDuration time.
base := NewBaseScheduler(opController)
ret := &baseHotScheduler{
BaseScheduler: base,
types: []utils.RWType{utils.Write, utils.Read},
regionPendings: make(map[uint64]*pendingInfluence),
stHistoryLoads: statistics.NewStoreHistoryLoads(utils.DimLen, sampleDuration, sampleInterval),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
ret.types = append(ret.types, ty)
ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{}
}
return ret
}

// prepareForBalance calculates the summary of pending influence for each store and prepares the load detail for
// each store; it only updates the read or write load detail
func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.SchedulerCluster) {
func (h *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.SchedulerCluster) {
storeInfos := statistics.SummaryStoreInfos(cluster.GetStores())
h.summaryPendingInfluence(storeInfos)
storesLoads := cluster.GetStoresLoads()
isTraceRegionFlow := cluster.GetSchedulerConfig().IsTraceRegionFlow()

prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, resource constant.ResourceKind) {
prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, rw utils.RWType, resource constant.ResourceKind) {
ty := buildResourceType(rw, resource)
h.stLoadInfos[ty] = statistics.SummaryStoresLoad(
storeInfos,
@@ -160,23 +161,25 @@ func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.Sched
isTraceRegionFlow,
rw, resource)
}
switch rw {
case utils.Read:
switch typ {
case readLeader, readPeer:
// update read statistics
if time.Since(h.updateReadTime) >= statisticsInterval {
regionRead := cluster.RegionReadStats()
prepare(regionRead, constant.LeaderKind)
prepare(regionRead, constant.RegionKind)
prepare(regionRead, utils.Read, constant.LeaderKind)
prepare(regionRead, utils.Read, constant.RegionKind)
h.updateReadTime = time.Now()
}
case utils.Write:
case writeLeader, writePeer:
// update write statistics
if time.Since(h.updateWriteTime) >= statisticsInterval {
regionWrite := cluster.RegionWriteStats()
prepare(regionWrite, constant.LeaderKind)
prepare(regionWrite, constant.RegionKind)
prepare(regionWrite, utils.Write, constant.LeaderKind)
prepare(regionWrite, utils.Write, constant.RegionKind)
h.updateWriteTime = time.Now()
}
default:
log.Error("invalid resource type", zap.String("type", typ.String()))
}
}
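Note why prepare gained an explicit rw parameter: a resourceType bundles both the read/write dimension and the resource kind, so the switch has to fan readLeader/readPeer and writeLeader/writePeer back out to utils.Read/utils.Write when calling into statistics. The helper below is hypothetical and not in the diff; it only spells out which (rw, kind) pair each enum value stands for:

package main

import "fmt"

type resourceType int

const (
	writeLeader resourceType = iota
	writePeer
	readLeader
	readPeer
)

// splitResourceType is a hypothetical inverse of buildResourceType, shown
// only to document the correspondence assumed by the switch in the diff.
func splitResourceType(ty resourceType) (rw, kind string) {
	switch ty {
	case readLeader:
		return "read", "leader"
	case readPeer:
		return "read", "peer"
	case writeLeader:
		return "write", "leader"
	case writePeer:
		return "write", "peer"
	}
	return "unknown", "unknown"
}

func main() {
	for ty := writeLeader; ty <= readPeer; ty++ {
		rw, kind := splitResourceType(ty)
		fmt.Printf("%d -> rw=%s kind=%s\n", ty, rw, kind)
	}
}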

@@ -223,7 +226,7 @@ func setHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64) {
HotPendingSum.WithLabelValues(storeLabel, rwTy, dim).Set(load)
}

func (h *baseHotScheduler) randomRWType() utils.RWType {
func (h *baseHotScheduler) randomType() resourceType {
return h.types[h.r.Int()%len(h.types)]
}

@@ -324,24 +327,32 @@ func (h *hotScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {

func (h *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
hotSchedulerCounter.Inc()
rw := h.randomRWType()
return h.dispatch(rw, cluster), nil
typ := h.randomType()
return h.dispatch(typ, cluster), nil
}

func (h *hotScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) []*operator.Operator {
func (h *hotScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator {
h.Lock()
defer h.Unlock()
h.updateHistoryLoadConfig(h.conf.GetHistorySampleDuration(), h.conf.GetHistorySampleInterval())
h.prepareForBalance(typ, cluster)
// it can not move earlier to support to use api and metrics.
if h.conf.IsForbidRWType(typ) {
return nil
}
// The IsForbidRWType check can not be moved earlier, so that the API and metrics can still be used.
switch typ {
case utils.Read:
case readLeader, readPeer:
if h.conf.IsForbidRWType(utils.Read) {
return nil
}
return h.balanceHotReadRegions(cluster)
case utils.Write:
return h.balanceHotWriteRegions(cluster)
case writePeer:
if h.conf.IsForbidRWType(utils.Write) {
return nil
}
return h.balanceHotWritePeers(cluster)
case writeLeader:
if h.conf.IsForbidRWType(utils.Write) {
return nil
}
return h.balanceHotWriteLeaders(cluster)
}
return nil
}
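This dispatch is where the branch-reachability fix lands: reads still funnel into balanceHotReadRegions, but writePeer and writeLeader now reach separate functions, so a test that pins the drawn type exercises exactly one branch instead of gambling on schedulePeerPr. Below is a toy model of that pinning; the real tests presumably reach into the scheduler's fields, and every name here is a stand-in:

package main

import "fmt"

type resourceType int

const (
	writeLeader resourceType = iota
	writePeer
	readLeader
	readPeer
)

// toyScheduler stands in for hotScheduler; only the dispatch shape is real.
type toyScheduler struct {
	types []resourceType
}

func (s *toyScheduler) dispatch(typ resourceType) string {
	switch typ {
	case readLeader, readPeer:
		return "balanceHotReadRegions"
	case writePeer:
		return "balanceHotWritePeers"
	case writeLeader:
		return "balanceHotWriteLeaders"
	}
	return "none"
}

func main() {
	// Pinning types to a single element makes every run deterministic,
	// so each dispatch branch can be covered by its own test case.
	for _, want := range []resourceType{writePeer, writeLeader, readPeer} {
		s := &toyScheduler{types: []resourceType{want}}
		fmt.Println(s.dispatch(s.types[0]))
	}
}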
@@ -406,19 +417,16 @@ func (h *hotScheduler) balanceHotReadRegions(cluster sche.SchedulerCluster) []*o
return nil
}

func (h *hotScheduler) balanceHotWriteRegions(cluster sche.SchedulerCluster) []*operator.Operator {
// prefer to balance by peer
s := h.r.Intn(100)
switch {
case s < int(schedulePeerPr*100):
peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer)
ops := peerSolver.solve()
if len(ops) > 0 && peerSolver.tryAddPendingInfluence() {
return ops
}
default:
func (h *hotScheduler) balanceHotWritePeers(cluster sche.SchedulerCluster) []*operator.Operator {
peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer)
ops := peerSolver.solve()
if len(ops) > 0 && peerSolver.tryAddPendingInfluence() {
return ops
}
return nil
}

func (h *hotScheduler) balanceHotWriteLeaders(cluster sche.SchedulerCluster) []*operator.Operator {
leaderSolver := newBalanceSolver(h, cluster, utils.Write, transferLeader)
ops := leaderSolver.solve()
if len(ops) > 0 && leaderSolver.tryAddPendingInfluence() {