schedulers: add balance witness scheduler #5763

Merged
merged 9 commits on Dec 30, 2022
34 changes: 34 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -299,6 +299,23 @@ func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) {
mc.PutStore(store)
}

// AddWitnessStore adds a store with the specified witness count.
func (mc *Cluster) AddWitnessStore(storeID uint64, witnessCount int) {
stats := &pdpb.StoreStats{}
stats.Capacity = defaultStoreCapacity
stats.UsedSize = 0
stats.Available = stats.Capacity
store := core.NewStoreInfo(
&metapb.Store{Id: storeID},
core.SetStoreStats(stats),
core.SetWitnessCount(witnessCount),
core.SetLastHeartbeatTS(time.Now()),
)
mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
mc.PutStore(store)
}

// AddRegionStoreWithLeader adds store with specified count of region and leader.
func (mc *Cluster) AddRegionStoreWithLeader(storeID uint64, regionCount int, leaderCounts ...int) {
leaderCount := regionCount
@@ -345,6 +362,14 @@ func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderStoreID uint64, otherP
return region
}

// AddLeaderRegionWithWitness adds a region with the specified leader, followers, and witness.
func (mc *Cluster) AddLeaderRegionWithWitness(regionID uint64, leaderStoreID uint64, otherPeerStoreIDs []uint64, witnessStoreID uint64) *core.RegionInfo {
origin := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
region := origin.Clone(core.SetApproximateSize(defaultRegionSize/units.MiB), core.SetApproximateKeys(10), core.WithWitness(origin.GetStorePeer(witnessStoreID).Id))
mc.PutRegion(region)
return region
}

// AddLightWeightLeaderRegion adds a light-weight region with specified leader and followers.
func (mc *Cluster) AddLightWeightLeaderRegion(regionID uint64, leaderStoreID uint64, otherPeerStoreIDs ...uint64) *core.RegionInfo {
region := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
@@ -564,6 +589,15 @@ func (mc *Cluster) UpdatePendingPeerCount(storeID uint64, pendingPeerCount int)
mc.PutStore(newStore)
}

// UpdateWitnessCount updates store witness count.
func (mc *Cluster) UpdateWitnessCount(storeID uint64, witnessCount int) {
store := mc.GetStore(storeID)
newStore := store.Clone(
core.SetWitnessCount(witnessCount),
)
mc.PutStore(newStore)
}

// UpdateStorageRatio updates the store storage ratio.
func (mc *Cluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio float64) {
mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
5 changes: 5 additions & 0 deletions server/api/scheduler.go
@@ -155,6 +155,11 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case schedulers.BalanceWitnessName:
if err := h.AddBalanceWitnessScheduler(); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case schedulers.HotRegionName:
if err := h.AddBalanceHotRegionScheduler(); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
10 changes: 10 additions & 0 deletions server/cluster/cluster.go
@@ -1003,11 +1003,21 @@ func (c *RaftCluster) RandLearnerRegions(storeID uint64, ranges []core.KeyRange)
return c.core.RandLearnerRegions(storeID, ranges)
}

// RandWitnessRegions returns some random regions that have a witness peer on the store.
func (c *RaftCluster) RandWitnessRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo {
return c.core.RandWitnessRegions(storeID, ranges)
}

// GetLeaderStore returns the store that contains the region's leader peer.
func (c *RaftCluster) GetLeaderStore(region *core.RegionInfo) *core.StoreInfo {
return c.core.GetLeaderStore(region)
}

// GetNonWitnessVoterStores returns all stores that contain the region's non-witness voter peers.
func (c *RaftCluster) GetNonWitnessVoterStores(region *core.RegionInfo) []*core.StoreInfo {
return c.core.GetNonWitnessVoterStores(region)
}
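
To illustrate how these two accessors could compose (this is not the scheduler implementation from this PR): a balance-witness-style scheduler would typically pick a witness region on an overloaded store and consider moving the witness to a store that already holds a non-witness voter of the same region. A rough sketch, with the function name and selection logic made up for illustration:

package example

import (
	"github.com/tikv/pd/server/cluster"
	"github.com/tikv/pd/server/core"
)

// pickWitnessTransfer is a hypothetical helper, not code from this PR.
// It returns one witness region on sourceStoreID plus a candidate target
// store that already hosts a non-witness voter of that region.
func pickWitnessTransfer(c *cluster.RaftCluster, sourceStoreID uint64, ranges []core.KeyRange) (*core.RegionInfo, *core.StoreInfo) {
	for _, region := range c.RandWitnessRegions(sourceStoreID, ranges) {
		for _, store := range c.GetNonWitnessVoterStores(region) {
			if store.GetID() != sourceStoreID {
				return region, store
			}
		}
	}
	return nil, nil
}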

// GetFollowerStores returns all stores that contains the region's follower peer.
func (c *RaftCluster) GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo {
return c.core.GetFollowerStores(region)
25 changes: 14 additions & 11 deletions server/cluster/coordinator_test.go
@@ -713,6 +713,7 @@ func TestAddScheduler(t *testing.T) {
re.NoError(co.removeScheduler(schedulers.BalanceRegionName))
re.NoError(co.removeScheduler(schedulers.HotRegionName))
re.NoError(co.removeScheduler(schedulers.SplitBucketName))
re.NoError(co.removeScheduler(schedulers.BalanceWitnessName))
re.Empty(co.schedulers)

stream := mockhbstream.NewHeartbeatStream()
@@ -788,7 +789,7 @@ func TestPersistScheduler(t *testing.T) {
re.NoError(tc.addLeaderStore(1, 1))
re.NoError(tc.addLeaderStore(2, 1))

re.Len(co.schedulers, 4)
re.Len(co.schedulers, 5)
oc := co.opController
storage := tc.RaftCluster.storage

@@ -798,14 +799,15 @@
evict, err := schedule.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}))
re.NoError(err)
re.NoError(co.addScheduler(evict, "2"))
re.Len(co.schedulers, 6)
re.Len(co.schedulers, 7)
sches, _, err := storage.LoadAllScheduleConfig()
re.NoError(err)
re.Len(sches, 6)
re.Len(sches, 7)
re.NoError(co.removeScheduler(schedulers.BalanceLeaderName))
re.NoError(co.removeScheduler(schedulers.BalanceRegionName))
re.NoError(co.removeScheduler(schedulers.HotRegionName))
re.NoError(co.removeScheduler(schedulers.SplitBucketName))
re.NoError(co.removeScheduler(schedulers.BalanceWitnessName))
re.Len(co.schedulers, 2)
re.NoError(co.cluster.opt.Persist(storage))
co.stop()
@@ -821,15 +823,15 @@
defer func() {
config.DefaultSchedulers = config.DefaultSchedulers[:len(config.DefaultSchedulers)-1]
}()
re.Len(newOpt.GetSchedulers(), 4)
re.Len(newOpt.GetSchedulers(), 5)
re.NoError(newOpt.Reload(storage))
// only 3 items with independent config remain.
sches, _, err = storage.LoadAllScheduleConfig()
re.NoError(err)
re.Len(sches, 3)

// the option has 8 items because the default schedulers are not removed.
re.Len(newOpt.GetSchedulers(), 7)
re.Len(newOpt.GetSchedulers(), 8)
re.NoError(newOpt.Persist(storage))
tc.RaftCluster.opt = newOpt

@@ -856,10 +858,10 @@

// the scheduler option should contain 8 items
// the `hot scheduler` is disabled
re.Len(co.cluster.opt.GetSchedulers(), 7)
re.Len(co.cluster.opt.GetSchedulers(), 8)
re.NoError(co.removeScheduler(schedulers.GrantLeaderName))
// the scheduler that is not enabled by default will be completely deleted
re.Len(co.cluster.opt.GetSchedulers(), 6)
re.Len(co.cluster.opt.GetSchedulers(), 7)
re.Len(co.schedulers, 4)
re.NoError(co.cluster.opt.Persist(co.cluster.storage))
co.stop()
@@ -891,24 +893,25 @@ func TestRemoveScheduler(t *testing.T) {
re.NoError(tc.addLeaderStore(1, 1))
re.NoError(tc.addLeaderStore(2, 1))

re.Len(co.schedulers, 4)
re.Len(co.schedulers, 5)
oc := co.opController
storage := tc.RaftCluster.storage

gls1, err := schedule.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}))
re.NoError(err)
re.NoError(co.addScheduler(gls1, "1"))
re.Len(co.schedulers, 5)
re.Len(co.schedulers, 6)
sches, _, err := storage.LoadAllScheduleConfig()
re.NoError(err)
re.Len(sches, 5)
re.Len(sches, 6)

// remove all schedulers
re.NoError(co.removeScheduler(schedulers.BalanceLeaderName))
re.NoError(co.removeScheduler(schedulers.BalanceRegionName))
re.NoError(co.removeScheduler(schedulers.HotRegionName))
re.NoError(co.removeScheduler(schedulers.GrantLeaderName))
re.NoError(co.removeScheduler(schedulers.SplitBucketName))
re.NoError(co.removeScheduler(schedulers.BalanceWitnessName))
// all removed
sches, _, err = storage.LoadAllScheduleConfig()
re.NoError(err)
@@ -927,7 +930,7 @@
co.run()
re.Empty(co.schedulers)
// the option retains the default schedulers
re.Len(co.cluster.opt.GetSchedulers(), 4)
re.Len(co.cluster.opt.GetSchedulers(), 5)
co.stop()
co.wg.Wait()
}
7 changes: 7 additions & 0 deletions server/config/config.go
@@ -672,6 +672,8 @@ type ScheduleConfig struct {
LeaderSchedulePolicy string `toml:"leader-schedule-policy" json:"leader-schedule-policy"`
// RegionScheduleLimit is the max coexist region schedules.
RegionScheduleLimit uint64 `toml:"region-schedule-limit" json:"region-schedule-limit"`
// WitnessScheduleLimit is the max coexist witness schedules.
WitnessScheduleLimit uint64 `toml:"witness-schedule-limit" json:"witness-schedule-limit"`
// ReplicaScheduleLimit is the max coexist replica schedules.
ReplicaScheduleLimit uint64 `toml:"replica-schedule-limit" json:"replica-schedule-limit"`
// MergeScheduleLimit is the max coexist merge schedules.
@@ -807,6 +809,7 @@ const (
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 2048
defaultWitnessScheduleLimit = 2048
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 4
@@ -851,6 +854,9 @@ func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
if !meta.IsDefined("region-schedule-limit") {
adjustUint64(&c.RegionScheduleLimit, defaultRegionScheduleLimit)
}
if !meta.IsDefined("witness-schedule-limit") {
adjustUint64(&c.WitnessScheduleLimit, defaultWitnessScheduleLimit)
}
if !meta.IsDefined("replica-schedule-limit") {
adjustUint64(&c.ReplicaScheduleLimit, defaultReplicaScheduleLimit)
}
@@ -1052,6 +1058,7 @@ type SchedulerConfig struct {
var DefaultSchedulers = SchedulerConfigs{
{Type: "balance-region"},
{Type: "balance-leader"},
{Type: "balance-witness"},
{Type: "hot-region"},
{Type: "split-bucket"},
}
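
As a configuration-level sketch (not part of this diff): the new knob should be settable in the PD config TOML like the other schedule limits. The test below shows the assumed round trip; the Config.Adjust signature and the TOML decoding pattern are taken from existing PD config tests and may differ slightly.

package config_test

import (
	"testing"

	"github.com/BurntSushi/toml"
	"github.com/stretchr/testify/require"
	"github.com/tikv/pd/server/config"
)

// Illustrative only: the witness-schedule-limit key should round-trip into
// ScheduleConfig.WitnessScheduleLimit; unset values fall back to 2048.
func TestWitnessScheduleLimitSketch(t *testing.T) {
	re := require.New(t)
	cfgData := `
[schedule]
witness-schedule-limit = 16
`
	cfg := config.NewConfig()
	meta, err := toml.Decode(cfgData, cfg)
	re.NoError(err)
	re.NoError(cfg.Adjust(&meta, false))
	re.Equal(uint64(16), cfg.Schedule.WitnessScheduleLimit)
}
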
6 changes: 6 additions & 0 deletions server/config/persist_options.go
@@ -195,6 +195,7 @@ const (
maxMergeRegionKeysKey = "schedule.max-merge-region-keys"
leaderScheduleLimitKey = "schedule.leader-schedule-limit"
regionScheduleLimitKey = "schedule.region-schedule-limit"
witnessScheduleLimitKey = "schedule.witness-schedule-limit"
replicaRescheduleLimitKey = "schedule.replica-schedule-limit"
mergeScheduleLimitKey = "schedule.merge-schedule-limit"
hotRegionScheduleLimitKey = "schedule.hot-region-schedule-limit"
@@ -396,6 +397,11 @@ func (o *PersistOptions) GetRegionScheduleLimit() uint64 {
return o.getTTLUintOr(regionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit)
}

// GetWitnessScheduleLimit returns the limit for witness schedule.
func (o *PersistOptions) GetWitnessScheduleLimit() uint64 {
return o.getTTLUintOr(witnessScheduleLimitKey, o.GetScheduleConfig().WitnessScheduleLimit)
}

// GetReplicaScheduleLimit returns the limit for replica schedule.
func (o *PersistOptions) GetReplicaScheduleLimit() uint64 {
return o.getTTLUintOr(replicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit)
15 changes: 15 additions & 0 deletions server/core/basic_cluster.go
@@ -78,6 +78,19 @@ func (bc *BasicCluster) GetRegionStores(region *RegionInfo) []*StoreInfo {
return Stores
}

// GetNonWitnessVoterStores returns all Stores that contain the region's non-witness voter peers.
func (bc *BasicCluster) GetNonWitnessVoterStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
defer bc.Stores.mu.RUnlock()
var Stores []*StoreInfo
for id := range region.GetNonWitnessVoters() {
if store := bc.Stores.GetStore(id); store != nil {
Stores = append(Stores, store)
}
}
return Stores
}

// GetFollowerStores returns all Stores that contains the region's follower peer.
func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo {
bc.Stores.mu.RLock()
@@ -224,6 +237,7 @@ type RegionSetInformer interface {
RandFollowerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
RandLeaderRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
RandLearnerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
RandWitnessRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
RandPendingRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
GetAverageRegionSize() int64
GetStoreRegionCount(storeID uint64) int
@@ -239,6 +253,7 @@ type StoreSetInformer interface {
GetStore(id uint64) *StoreInfo

GetRegionStores(region *RegionInfo) []*StoreInfo
GetNonWitnessVoterStores(region *RegionInfo) []*StoreInfo
GetFollowerStores(region *RegionInfo) []*StoreInfo
GetLeaderStore(region *RegionInfo) *StoreInfo
}
4 changes: 4 additions & 0 deletions server/core/kind.go
@@ -47,6 +47,8 @@ const (
LeaderKind ResourceKind = iota
// RegionKind indicates the region kind resource
RegionKind
// WitnessKind indicates the witness kind resource
WitnessKind
)

func (k ResourceKind) String() string {
@@ -55,6 +57,8 @@ func (k ResourceKind) String() string {
return "leader"
case RegionKind:
return "region"
case WitnessKind:
return "witness"
default:
return "unknown"
}
44 changes: 44 additions & 0 deletions server/core/region.go
@@ -405,6 +405,18 @@ func (r *RegionInfo) GetFollower() *metapb.Peer {
return nil
}

// GetNonWitnessVoters returns a map, keyed by store ID, of the region's non-witness voter peers.
func (r *RegionInfo) GetNonWitnessVoters() map[uint64]*metapb.Peer {
peers := r.GetVoters()
nonWitnesses := make(map[uint64]*metapb.Peer, len(peers))
for _, peer := range peers {
if !peer.IsWitness {
nonWitnesses[peer.GetStoreId()] = peer
}
}
return nonWitnesses
}

// GetDiffFollowers returns the followers which is not located in the same
// store as any other followers of the another specified region.
func (r *RegionInfo) GetDiffFollowers(other *RegionInfo) []*metapb.Peer {
@@ -474,11 +486,29 @@ func (r *RegionInfo) GetBuckets() *metapb.Buckets {
return (*metapb.Buckets)(buckets)
}

// GetStorePeerApproximateSize returns the approximate size of the peer on the specified store.
func (r *RegionInfo) GetStorePeerApproximateSize(storeID uint64) int64 {
peer := r.GetStorePeer(storeID)
if storeID != 0 && peer != nil && peer.IsWitness {
return 0
}
return r.approximateSize
}

// GetApproximateSize returns the approximate size of the region.
func (r *RegionInfo) GetApproximateSize() int64 {
return r.approximateSize
}

// GetStorePeerApproximateKeys returns the approximate keys of the peer on the specified store.
func (r *RegionInfo) GetStorePeerApproximateKeys(storeID uint64) int64 {
peer := r.GetStorePeer(storeID)
if storeID != 0 && peer != nil && peer.IsWitness {
return 0
}
return r.approximateKeys
}
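
To make the intended semantics concrete, here is a small self-contained sketch (not part of this diff; region, peer, and size values are invented): a witness peer carries no data, so its per-store approximate size and keys read as zero even though the region-level values are unchanged.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/server/core"
)

func main() {
	peers := []*metapb.Peer{
		{Id: 101, StoreId: 1},
		{Id: 102, StoreId: 2},
		{Id: 103, StoreId: 3},
	}
	meta := &metapb.Region{Id: 1, Peers: peers}
	// Peer 103 on store 3 is marked as a witness; sizes and keys are illustrative.
	region := core.NewRegionInfo(meta, peers[0],
		core.SetApproximateSize(96),
		core.SetApproximateKeys(100000),
		core.WithWitness(103),
	)

	fmt.Println(region.GetApproximateSize())           // 96: region-level size is unchanged
	fmt.Println(region.GetStorePeerApproximateSize(3)) // 0: the witness peer holds no data
	fmt.Println(region.GetStorePeerApproximateKeys(1)) // 100000: store 1 has a full peer
}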

// GetApproximateKeys returns the approximate keys of the region.
func (r *RegionInfo) GetApproximateKeys() int64 {
return r.approximateKeys
@@ -1335,6 +1365,20 @@ func (r *RegionsInfo) RandLearnerRegions(storeID uint64, ranges []KeyRange) []*R
return r.learners[storeID].RandomRegions(randomRegionMaxRetry, ranges)
}

// RandWitnessRegion randomly gets a store's witness region.
func (r *RegionsInfo) RandWitnessRegion(storeID uint64, ranges []KeyRange) *RegionInfo {
r.st.RLock()
defer r.st.RUnlock()
return r.witnesses[storeID].RandomRegion(ranges)
}

// RandWitnessRegions randomly gets a store's n witness regions.
func (r *RegionsInfo) RandWitnessRegions(storeID uint64, ranges []KeyRange) []*RegionInfo {
r.st.RLock()
defer r.st.RUnlock()
return r.witnesses[storeID].RandomRegions(randomRegionMaxRetry, ranges)
}

// GetLeader returns leader RegionInfo by storeID and regionID (now only used in test)
func (r *RegionsInfo) GetLeader(storeID uint64, region *RegionInfo) *RegionInfo {
r.st.RLock()
11 changes: 11 additions & 0 deletions server/core/region_option.go
@@ -50,6 +50,17 @@ func WithPendingPeers(pendingPeers []*metapb.Peer) RegionCreateOption {
}
}

// WithWitness sets the witness for the region.
func WithWitness(peerID uint64) RegionCreateOption {
return func(region *RegionInfo) {
for _, p := range region.GetPeers() {
if p.GetId() == peerID {
p.IsWitness = true
}
}
}
}

// WithWitnesses sets the witnesses for the region.
func WithWitnesses(witnesses []*metapb.Peer) RegionCreateOption {
return func(region *RegionInfo) {