schedulers: add balance witness scheduler
close #5762

Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>
ethercflow committed Dec 9, 2022
1 parent 776003b commit 0a352b3
Showing 27 changed files with 819 additions and 37 deletions.
34 changes: 34 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -299,6 +299,23 @@ func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) {
mc.PutStore(store)
}

// AddWitnessStore adds a store with the specified count of witness peers.
func (mc *Cluster) AddWitnessStore(storeID uint64, witnessCount int) {
stats := &pdpb.StoreStats{}
stats.Capacity = defaultStoreCapacity
stats.UsedSize = 0
stats.Available = stats.Capacity
store := core.NewStoreInfo(
&metapb.Store{Id: storeID},
core.SetStoreStats(stats),
core.SetWitnessCount(witnessCount),
core.SetLastHeartbeatTS(time.Now()),
)
mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
mc.PutStore(store)
}

// AddRegionStoreWithLeader adds store with specified count of region and leader.
func (mc *Cluster) AddRegionStoreWithLeader(storeID uint64, regionCount int, leaderCounts ...int) {
leaderCount := regionCount
@@ -345,6 +362,14 @@ func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderStoreID uint64, otherP
return region
}

// AddLeaderRegionWithWitness adds a region with the specified leader, followers, and witness.
func (mc *Cluster) AddLeaderRegionWithWitness(regionID uint64, leaderStoreID uint64, otherPeerStoreIDs []uint64, witnessStoreID uint64) *core.RegionInfo {
origin := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
region := origin.Clone(core.SetApproximateSize(defaultRegionSize/units.MiB), core.SetApproximateKeys(10), core.WithWitness(origin.GetStorePeer(witnessStoreID).Id))
mc.PutRegion(region)
return region
}

// AddLightWeightLeaderRegion adds a light-weight region with specified leader and followers.
func (mc *Cluster) AddLightWeightLeaderRegion(regionID uint64, leaderStoreID uint64, otherPeerStoreIDs ...uint64) *core.RegionInfo {
region := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
@@ -564,6 +589,15 @@ func (mc *Cluster) UpdatePendingPeerCount(storeID uint64, pendingPeerCount int)
mc.PutStore(newStore)
}

// UpdateWitnessCount updates store witness count.
func (mc *Cluster) UpdateWitnessCount(storeID uint64, witnessCount int) {
store := mc.GetStore(storeID)
newStore := store.Clone(
core.SetWitnessCount(witnessCount),
)
mc.PutStore(newStore)
}

// UpdateStorageRatio updates store storage ratio count.
func (mc *Cluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio float64) {
mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
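
The new mock helpers above are enough to stand up a witness-aware test cluster. A minimal sketch of how they might be used, assuming the mockcluster.NewCluster(ctx, config.NewTestOptions()) constructor that PD's existing tests rely on; only AddRegionStore, AddWitnessStore, AddLeaderRegionWithWitness, UpdateWitnessCount, and GetWitnessCount are taken from this change or the surrounding core package:

```go
package main

import (
	"context"
	"fmt"

	"github.com/tikv/pd/pkg/mock/mockcluster"
	"github.com/tikv/pd/server/config"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Assumed constructor: NewCluster(ctx, config.NewTestOptions()) as in existing PD tests.
	mc := mockcluster.NewCluster(ctx, config.NewTestOptions())

	// Three regular stores with data, plus a store that already holds 3 witnesses.
	mc.AddRegionStore(1, 10)
	mc.AddRegionStore(2, 10)
	mc.AddRegionStore(3, 10)
	mc.AddWitnessStore(4, 3)

	// Region 2: leader on store 1, other peers on stores 2 and 4,
	// with the peer on store 4 marked as a witness.
	mc.AddLeaderRegionWithWitness(2, 1, []uint64{2, 4}, 4)

	// The mock store's witness count can be adjusted later on.
	mc.UpdateWitnessCount(4, 5)
	fmt.Println(mc.GetStore(4).GetWitnessCount()) // 5
}
```
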
5 changes: 5 additions & 0 deletions server/api/scheduler.go
@@ -155,6 +155,11 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case schedulers.BalanceWitnessName:
if err := h.AddBalanceWitnessScheduler(); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case schedulers.HotRegionName:
if err := h.AddBalanceHotRegionScheduler(); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
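
With the new case wired into CreateScheduler, the scheduler can presumably be created through PD's usual schedulers endpoint. A hedged sketch; the path /pd/api/v1/schedulers and the name "balance-witness-scheduler" are assumptions based on the existing balance-leader-scheduler conventions, not values confirmed by this diff:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Assumed endpoint and scheduler name; both follow the pattern used for
	// balance-leader-scheduler rather than anything shown in this diff.
	body := bytes.NewBufferString(`{"name": "balance-witness-scheduler"}`)
	resp, err := http.Post("http://127.0.0.1:2379/pd/api/v1/schedulers",
		"application/json", body)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status) // expect 200 OK if the scheduler was added
}
```
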
10 changes: 10 additions & 0 deletions server/cluster/cluster.go
@@ -1003,11 +1003,21 @@ func (c *RaftCluster) RandLearnerRegions(storeID uint64, ranges []core.KeyRange)
return c.core.RandLearnerRegions(storeID, ranges)
}

// RandWitnessRegions returns some random regions that have a witness peer on the store.
func (c *RaftCluster) RandWitnessRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo {
return c.core.RandWitnessRegions(storeID, ranges)
}

// GetLeaderStore returns all stores that contains the region's leader peer.
func (c *RaftCluster) GetLeaderStore(region *core.RegionInfo) *core.StoreInfo {
return c.core.GetLeaderStore(region)
}

// GetNonWitnessVoterStores returns all stores that contain the region's non-witness voter peers.
func (c *RaftCluster) GetNonWitnessVoterStores(region *core.RegionInfo) []*core.StoreInfo {
return c.core.GetNonWitnessVoterStores(region)
}

// GetFollowerStores returns all stores that contains the region's follower peer.
func (c *RaftCluster) GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo {
return c.core.GetFollowerStores(region)
7 changes: 7 additions & 0 deletions server/config/config.go
@@ -672,6 +672,8 @@ type ScheduleConfig struct {
LeaderSchedulePolicy string `toml:"leader-schedule-policy" json:"leader-schedule-policy"`
// RegionScheduleLimit is the max coexist region schedules.
RegionScheduleLimit uint64 `toml:"region-schedule-limit" json:"region-schedule-limit"`
// WitnessScheduleLimit is the max coexist witness schedules.
WitnessScheduleLimit uint64 `toml:"witness-schedule-limit" json:"witness-schedule-limit"`
// ReplicaScheduleLimit is the max coexist replica schedules.
ReplicaScheduleLimit uint64 `toml:"replica-schedule-limit" json:"replica-schedule-limit"`
// MergeScheduleLimit is the max coexist merge schedules.
@@ -807,6 +809,7 @@ const (
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 2048
defaultWitnessScheduleLimit = 2048
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 4
@@ -851,6 +854,9 @@ func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
if !meta.IsDefined("region-schedule-limit") {
adjustUint64(&c.RegionScheduleLimit, defaultRegionScheduleLimit)
}
if !meta.IsDefined("witness-schedule-limit") {
adjustUint64(&c.WitnessScheduleLimit, defaultWitnessScheduleLimit)
}
if !meta.IsDefined("replica-schedule-limit") {
adjustUint64(&c.ReplicaScheduleLimit, defaultReplicaScheduleLimit)
}
@@ -1052,6 +1058,7 @@ type SchedulerConfig struct {
var DefaultSchedulers = SchedulerConfigs{
{Type: "balance-region"},
{Type: "balance-leader"},
{Type: "balance-witness-scheduler"},
{Type: "hot-region"},
{Type: "split-bucket"},
}
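
The new witness-schedule-limit knob follows the same pattern as region-schedule-limit: a TOML key, a default of 2048, and an adjust step that fills the default when the key is not set. A small decoding sketch, assuming github.com/BurntSushi/toml (which PD's configuration loading already uses) and only the keys visible in the struct tags above:

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
	"github.com/tikv/pd/server/config"
)

func main() {
	// Only the keys below are set; everything else stays at its zero value
	// until ScheduleConfig.adjust fills in defaults.
	data := `
leader-schedule-limit  = 4
region-schedule-limit  = 2048
witness-schedule-limit = 8
`
	var cfg config.ScheduleConfig
	if _, err := toml.Decode(data, &cfg); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println("witness schedule limit:", cfg.WitnessScheduleLimit) // 8
}
```

If witness-schedule-limit were omitted from the input, the adjust step shown above would fall back to defaultWitnessScheduleLimit (2048).
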
6 changes: 6 additions & 0 deletions server/config/persist_options.go
@@ -195,6 +195,7 @@ const (
maxMergeRegionKeysKey = "schedule.max-merge-region-keys"
leaderScheduleLimitKey = "schedule.leader-schedule-limit"
regionScheduleLimitKey = "schedule.region-schedule-limit"
witnessScheduleLimitKey = "schedule.witness-schedule-limit"
replicaRescheduleLimitKey = "schedule.replica-schedule-limit"
mergeScheduleLimitKey = "schedule.merge-schedule-limit"
hotRegionScheduleLimitKey = "schedule.hot-region-schedule-limit"
@@ -396,6 +397,11 @@ func (o *PersistOptions) GetRegionScheduleLimit() uint64 {
return o.getTTLUintOr(regionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit)
}

// GetWitnessScheduleLimit returns the limit for witness schedule.
func (o *PersistOptions) GetWitnessScheduleLimit() uint64 {
return o.getTTLUintOr(witnessScheduleLimitKey, o.GetScheduleConfig().WitnessScheduleLimit)
}

// GetReplicaScheduleLimit returns the limit for replica schedule.
func (o *PersistOptions) GetReplicaScheduleLimit() uint64 {
return o.getTTLUintOr(replicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit)
15 changes: 15 additions & 0 deletions server/core/basic_cluster.go
@@ -78,6 +78,19 @@ func (bc *BasicCluster) GetRegionStores(region *RegionInfo) []*StoreInfo {
return Stores
}

// GetNonWitnessVoterStores returns all Stores that contain the region's non-witness voter peers.
func (bc *BasicCluster) GetNonWitnessVoterStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
var Stores []*StoreInfo
for id := range region.GetNonWitnessVoters() {
if store := bc.Stores.GetStore(id); store != nil {
Stores = append(Stores, store)
}
}
return Stores
}

// GetFollowerStores returns all Stores that contains the region's follower peer.
func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
@@ -224,6 +237,7 @@ type RegionSetInformer interface {
RandFollowerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
RandLeaderRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
RandLearnerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
RandWitnessRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
RandPendingRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
GetAverageRegionSize() int64
GetStoreRegionCount(storeID uint64) int
@@ -239,6 +253,7 @@ type StoreSetInformer interface {
GetStore(id uint64) *StoreInfo

GetRegionStores(region *RegionInfo) []*StoreInfo
GetNonWitnessVoterStores(region *RegionInfo) []*StoreInfo
GetFollowerStores(region *RegionInfo) []*StoreInfo
GetLeaderStore(region *RegionInfo) *StoreInfo
}
4 changes: 4 additions & 0 deletions server/core/kind.go
@@ -47,6 +47,8 @@ const (
LeaderKind ResourceKind = iota
// RegionKind indicates the region kind resource
RegionKind
// WitnessKind indicates the witness kind resource
WitnessKind
)

func (k ResourceKind) String() string {
@@ -55,6 +57,8 @@ func (k ResourceKind) String() string {
return "leader"
case RegionKind:
return "region"
case WitnessKind:
return "witness"
default:
return "unknown"
}
49 changes: 46 additions & 3 deletions server/core/region.go
@@ -405,6 +405,18 @@ func (r *RegionInfo) GetFollower() *metapb.Peer {
return nil
}

// GetNonWitnessVoters returns a map indicating how the region's non-witness voter peers are distributed across stores.
func (r *RegionInfo) GetNonWitnessVoters() map[uint64]*metapb.Peer {
peers := r.GetVoters()
nonWitnesses := make(map[uint64]*metapb.Peer, len(peers))
for _, peer := range peers {
if !peer.IsWitness {
nonWitnesses[peer.GetStoreId()] = peer
}
}
return nonWitnesses
}

// GetDiffFollowers returns the followers that are not located in the same
// store as any follower of the other specified region.
func (r *RegionInfo) GetDiffFollowers(other *RegionInfo) []*metapb.Peer {
@@ -474,11 +486,27 @@ func (r *RegionInfo) GetBuckets() *metapb.Buckets {
return (*metapb.Buckets)(buckets)
}

// GetStorePeerApproximateSize returns the approximate size of the peer on the specified store.
func (r *RegionInfo) GetStorePeerApproximateSize(storeID uint64) int64 {
if storeID != 0 && r.GetStorePeer(storeID).IsWitness {
return 0
}
return r.approximateSize
}

// GetApproximateSize returns the approximate size of the region.
func (r *RegionInfo) GetApproximateSize() int64 {
return r.approximateSize
}

// GetStorePeerApproximateKeys returns the approximate keys of the peer on the specified store.
func (r *RegionInfo) GetStorePeerApproximateKeys(storeID uint64) int64 {
if storeID != 0 && r.GetStorePeer(storeID).IsWitness {
return 0
}
return r.approximateKeys
}

// GetApproximateKeys returns the approximate keys of the region.
func (r *RegionInfo) GetApproximateKeys() int64 {
return r.approximateKeys
@@ -914,7 +942,7 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
}
}

item := &regionItem{region}
item := &regionItem{region, 0}
r.subRegions[region.GetID()] = item
// It has been removed and all information needs to be updated again.
// Set peers then.
@@ -924,6 +952,7 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
store = newRegionTree()
peersMap[storeID] = store
}
item.storeID = storeID
store.update(item, false)
}

@@ -1335,6 +1364,20 @@ func (r *RegionsInfo) RandLearnerRegions(storeID uint64, ranges []KeyRange) []*R
return r.learners[storeID].RandomRegions(randomRegionMaxRetry, ranges)
}

// RandWitnessRegion randomly gets a store's witness region.
func (r *RegionsInfo) RandWitnessRegion(storeID uint64, ranges []KeyRange) *RegionInfo {
r.st.RLock()
defer r.st.RUnlock()
return r.witnesses[storeID].RandomRegion(ranges)
}

// RandWitnessRegions randomly gets a store's n witness regions.
func (r *RegionsInfo) RandWitnessRegions(storeID uint64, ranges []KeyRange) []*RegionInfo {
r.st.RLock()
defer r.st.RUnlock()
return r.witnesses[storeID].RandomRegions(randomRegionMaxRetry, ranges)
}

// GetLeader returns leader RegionInfo by storeID and regionID (now only used in test)
func (r *RegionsInfo) GetLeader(storeID uint64, region *RegionInfo) *RegionInfo {
r.st.RLock()
@@ -1410,8 +1453,8 @@ func (r *RegionInfo) GetWriteLoads() []float64 {
func (r *RegionsInfo) GetRangeCount(startKey, endKey []byte) int {
r.t.RLock()
defer r.t.RUnlock()
start := &regionItem{&RegionInfo{meta: &metapb.Region{StartKey: startKey}}}
end := &regionItem{&RegionInfo{meta: &metapb.Region{StartKey: endKey}}}
start := &regionItem{&RegionInfo{meta: &metapb.Region{StartKey: startKey}}, 0}
end := &regionItem{&RegionInfo{meta: &metapb.Region{StartKey: endKey}}, 0}
// it returns 0 if startKey is nil.
_, startIndex := r.tree.tree.GetWithIndex(start)
var endIndex int
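
The per-store accounting added here treats a witness peer as holding no data: GetStorePeerApproximateSize and GetStorePeerApproximateKeys return 0 for the store that hosts the witness, while other stores still see the full region size. A small self-contained sketch using options that appear in this diff (WithWitness, SetApproximateSize, SetApproximateKeys) plus the long-standing core.NewRegionInfo constructor:

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/server/core"
)

func main() {
	meta := &metapb.Region{
		Id: 1,
		Peers: []*metapb.Peer{
			{Id: 101, StoreId: 1},
			{Id: 102, StoreId: 2},
			{Id: 103, StoreId: 3},
		},
	}
	region := core.NewRegionInfo(meta, meta.Peers[0],
		core.SetApproximateSize(96),
		core.SetApproximateKeys(100000),
		core.WithWitness(103), // the peer on store 3 becomes a witness
	)

	fmt.Println(region.GetStorePeerApproximateSize(1)) // 96: regular voter
	fmt.Println(region.GetStorePeerApproximateSize(3)) // 0: witness holds no data
	fmt.Println(region.GetStorePeerApproximateKeys(3)) // 0
	fmt.Println(len(region.GetNonWitnessVoters()))     // 2: stores 1 and 2
}
```
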
11 changes: 11 additions & 0 deletions server/core/region_option.go
@@ -50,6 +50,17 @@ func WithPendingPeers(pendingPeers []*metapb.Peer) RegionCreateOption {
}
}

// WithWitness sets the witness for the region.
func WithWitness(peerID uint64) RegionCreateOption {
return func(region *RegionInfo) {
for _, p := range region.GetPeers() {
if p.GetId() == peerID {
p.IsWitness = true
}
}
}
}

// WithWitnesses sets the witnesses for the region.
func WithWitnesses(witnesses []*metapb.Peer) RegionCreateOption {
return func(region *RegionInfo) {
12 changes: 7 additions & 5 deletions server/core/region_tree.go
@@ -28,6 +28,8 @@ import (

type regionItem struct {
*RegionInfo
// it may be 0 if the item is not in a store's dedicated region tree
storeID uint64
}

// GetStartKey returns the start key of the region.
@@ -109,7 +111,7 @@ func (t *regionTree) overlaps(item *regionItem) []*regionItem {
// insert the region.
func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*regionItem) []*RegionInfo {
region := item.RegionInfo
t.totalSize += region.approximateSize
t.totalSize += region.GetStorePeerApproximateSize(item.storeID)
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate += regionWriteBytesRate
t.totalWriteKeysRate += regionWriteKeysRate
@@ -130,7 +132,7 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
zap.Uint64("region-id", old.GetID()),
logutil.ZapRedactStringer("delete-region", RegionToHexMeta(old.GetMeta())),
logutil.ZapRedactStringer("update-region", RegionToHexMeta(region.GetMeta())))
t.totalSize -= old.approximateSize
t.totalSize -= old.GetStorePeerApproximateSize(overlap.storeID)
regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
@@ -141,12 +143,12 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re

// updateStat is used to update statistics when regionItem.RegionInfo is directly replaced.
func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
t.totalSize += region.approximateSize
t.totalSize += region.GetApproximateSize()
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate += regionWriteBytesRate
t.totalWriteKeysRate += regionWriteKeysRate

t.totalSize -= origin.approximateSize
t.totalSize -= origin.GetApproximateSize()
regionWriteBytesRate, regionWriteKeysRate = origin.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
@@ -165,7 +167,7 @@ func (t *regionTree) remove(region *RegionInfo) {
return
}

t.totalSize -= result.GetApproximateSize()
t.totalSize -= result.GetStorePeerApproximateSize(result.storeID)
regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
5 changes: 5 additions & 0 deletions server/core/store.go
@@ -154,6 +154,11 @@ func (s *StoreInfo) GetSlowScore() uint64 {
return s.rawStats.GetSlowScore()
}

// WitnessScore returns the store's witness score.
func (s *StoreInfo) WitnessScore(delta int64) float64 {
return float64(int64(s.GetWitnessCount()) + delta)
}

// IsSlow checks if the slow score reaches the threshold.
func (s *StoreInfo) IsSlow() bool {
s.mu.RLock()
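
WitnessScore is simply the store's witness count plus a delta, returned as a float64; the delta presumably lets the balance-witness scheduler compare stores as though a witness had already been moved, although the scheduler itself is not shown in this excerpt. A tiny sketch:

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/server/core"
)

func main() {
	store := core.NewStoreInfo(&metapb.Store{Id: 1}, core.SetWitnessCount(7))

	fmt.Println(store.WitnessScore(0))  // 7: current witness count
	fmt.Println(store.WitnessScore(1))  // 8: as if one more witness moved in
	fmt.Println(store.WitnessScore(-1)) // 6: as if one witness moved out
}
```
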
5 changes: 5 additions & 0 deletions server/handler.go
@@ -299,6 +299,11 @@ func (h *Handler) AddBalanceLeaderScheduler() error {
return h.AddScheduler(schedulers.BalanceLeaderType)
}

// AddBalanceWitnessScheduler adds a balance-witness-scheduler.
func (h *Handler) AddBalanceWitnessScheduler() error {
return h.AddScheduler(schedulers.BalanceWitnessType)
}

// AddBalanceRegionScheduler adds a balance-region-scheduler.
func (h *Handler) AddBalanceRegionScheduler() error {
return h.AddScheduler(schedulers.BalanceRegionType)
