diff --git a/conf/config.toml b/conf/config.toml index b4617aa0981..8f792254f15 100644 --- a/conf/config.toml +++ b/conf/config.toml @@ -61,7 +61,9 @@ leader-schedule-limit = 4 region-schedule-limit = 4 replica-schedule-limit = 8 merge-schedule-limit = 8 -tolerant-size-ratio = 2.5 +tolerant-size-ratio = 5 +high-space-ratio = 0.6 +low-space-ratio = 0.8 # customized schedulers, the format is as below # if empty, it will use balance-leader, balance-region, hot-region as default diff --git a/server/api/label.go b/server/api/label.go index 12fcf05a312..5586db236a9 100644 --- a/server/api/label.go +++ b/server/api/label.go @@ -72,8 +72,6 @@ func (h *labelsHandler) GetStores(w http.ResponseWriter, r *http.Request) { return } - maxDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration - stores := cluster.GetStores() storesInfo := &StoresInfo{ Stores: make([]*StoreInfo, 0, len(stores)), @@ -87,7 +85,7 @@ func (h *labelsHandler) GetStores(w http.ResponseWriter, r *http.Request) { return } - storeInfo := newStoreInfo(store, maxDownTime) + storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store) storesInfo.Stores = append(storesInfo.Stores, storeInfo) } storesInfo.Count = len(storesInfo.Stores) diff --git a/server/api/store.go b/server/api/store.go index b7a403f9120..48db0d3f2af 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -66,7 +66,7 @@ const ( downStateName = "Down" ) -func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreInfo { +func newStoreInfo(opt *server.ScheduleConfig, store *core.StoreInfo) *StoreInfo { s := &StoreInfo{ Store: &MetaStore{ Store: store.Store, @@ -77,11 +77,11 @@ func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreI Available: typeutil.ByteSize(store.Stats.GetAvailable()), LeaderCount: store.LeaderCount, LeaderWeight: store.LeaderWeight, - LeaderScore: store.LeaderScore(), + LeaderScore: store.LeaderScore(0), LeaderSize: store.LeaderSize, RegionCount: store.RegionCount, RegionWeight: store.RegionWeight, - RegionScore: store.RegionScore(), + RegionScore: store.RegionScore(opt.HighSpaceRatio, opt.LowSpaceRatio, 0), RegionSize: store.RegionSize, SendingSnapCount: store.Stats.GetSendingSnapCount(), ReceivingSnapCount: store.Stats.GetReceivingSnapCount(), @@ -103,7 +103,7 @@ func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreI } if store.State == metapb.StoreState_Up { - if store.DownTime() > maxStoreDownTime { + if store.DownTime() > opt.MaxStoreDownTime.Duration { s.Store.StateName = downStateName } else if store.IsDisconnected() { s.Store.StateName = disconnectedName @@ -137,8 +137,6 @@ func (h *storeHandler) Get(w http.ResponseWriter, r *http.Request) { return } - maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration - vars := mux.Vars(r) storeIDStr := vars["id"] storeID, err := strconv.ParseUint(storeIDStr, 10, 64) @@ -153,7 +151,7 @@ func (h *storeHandler) Get(w http.ResponseWriter, r *http.Request) { return } - storeInfo := newStoreInfo(store, maxStoreDownTime) + storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store) h.rd.JSON(w, http.StatusOK, storeInfo) } @@ -324,8 +322,6 @@ func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration - stores := cluster.GetStores() StoresInfo := &StoresInfo{ Stores: make([]*StoreInfo, 0, len(stores)), @@ -345,7 +341,7 @@ func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } -
storeInfo := newStoreInfo(store, maxStoreDownTime) + storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store) StoresInfo.Stores = append(StoresInfo.Stores, storeInfo) } StoresInfo.Count = len(StoresInfo.Stores) diff --git a/server/api/store_test.go b/server/api/store_test.go index 10732c8e675..d2a8eaf58e5 100644 --- a/server/api/store_test.go +++ b/server/api/store_test.go @@ -270,14 +270,14 @@ func (s *testStoreSuite) TestDownState(c *C) { Stats: &pdpb.StoreStats{}, LastHeartbeatTS: time.Now(), } - storeInfo := newStoreInfo(store, time.Hour) + storeInfo := newStoreInfo(s.svr.GetScheduleConfig(), store) c.Assert(storeInfo.Store.StateName, Equals, metapb.StoreState_Up.String()) store.LastHeartbeatTS = time.Now().Add(-time.Minute * 2) - storeInfo = newStoreInfo(store, time.Hour) + storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), store) c.Assert(storeInfo.Store.StateName, Equals, disconnectedName) store.LastHeartbeatTS = time.Now().Add(-time.Hour * 2) - storeInfo = newStoreInfo(store, time.Hour) + storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), store) c.Assert(storeInfo.Store.StateName, Equals, downStateName) } diff --git a/server/api/trend.go b/server/api/trend.go index 71cd1883767..eb8cd2a0951 100644 --- a/server/api/trend.go +++ b/server/api/trend.go @@ -107,8 +107,6 @@ func (h *trendHandler) Handle(w http.ResponseWriter, r *http.Request) { } func (h *trendHandler) getTrendStores() ([]trendStore, error) { - maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration - var readStats, writeStats core.StoreHotRegionsStat if hotRead := h.GetHotReadRegions(); hotRead != nil { readStats = hotRead.AsLeader @@ -123,7 +121,7 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) { trendStores := make([]trendStore, 0, len(stores)) for _, store := range stores { - info := newStoreInfo(store, maxStoreDownTime) + info := newStoreInfo(h.svr.GetScheduleConfig(), store) s := trendStore{ ID: info.Store.GetId(), Address: info.Store.GetAddress(), diff --git a/server/cache.go b/server/cache.go index 5917981dcd6..e8a721ccb7f 100644 --- a/server/cache.go +++ b/server/cache.go @@ -167,28 +167,6 @@ func (c *clusterInfo) GetStores() []*core.StoreInfo { return c.BasicCluster.GetStores() } -// GetStoresAverageScore returns the total resource score of all unfiltered stores. 
-func (c *clusterInfo) GetStoresAverageScore(kind core.ResourceKind, filters ...schedule.Filter) float64 { - c.RLock() - defer c.RUnlock() - - var totalResourceSize int64 - var totalResourceWeight float64 - for _, s := range c.BasicCluster.GetStores() { - if schedule.FilterSource(c, s, filters) { - continue - } - - totalResourceWeight += s.ResourceWeight(kind) - totalResourceSize += s.ResourceSize(kind) - } - - if totalResourceWeight == 0 { - return 0 - } - return float64(totalResourceSize) / totalResourceWeight -} - func (c *clusterInfo) getMetaStores() []*metapb.Store { c.RLock() defer c.RUnlock() @@ -564,6 +542,14 @@ func (c *clusterInfo) GetTolerantSizeRatio() float64 { return c.opt.GetTolerantSizeRatio() } +func (c *clusterInfo) GetLowSpaceRatio() float64 { + return c.opt.GetLowSpaceRatio() +} + +func (c *clusterInfo) GetHighSpaceRatio() float64 { + return c.opt.GetHighSpaceRatio() +} + func (c *clusterInfo) GetMaxSnapshotCount() uint64 { return c.opt.GetMaxSnapshotCount() } diff --git a/server/config.go b/server/config.go index e4fdd44a5ad..54085af6f0f 100644 --- a/server/config.go +++ b/server/config.go @@ -352,6 +352,18 @@ type ScheduleConfig struct { MergeScheduleLimit uint64 `toml:"merge-schedule-limit,omitempty" json:"merge-schedule-limit"` // TolerantSizeRatio is the ratio of buffer size for balance scheduler. TolerantSizeRatio float64 `toml:"tolerant-size-ratio,omitempty" json:"tolerant-size-ratio"` + // + // high space stage transition stage low space stage + // |--------------------|-----------------------------|-------------------------| + // ^ ^ ^ ^ + // 0 HighSpaceRatio * capacity LowSpaceRatio * capacity capacity + // + // LowSpaceRatio is the lowest usage ratio of a store that is regarded as low space. + // When in low space, the store's region score becomes very large and varies inversely with its available size. + LowSpaceRatio float64 `toml:"low-space-ratio,omitempty" json:"low-space-ratio"` + // HighSpaceRatio is the highest usage ratio of a store that is regarded as high space. + // High space means there is a lot of spare capacity, and the store's region score varies directly with its used size.
+ HighSpaceRatio float64 `toml:"high-space-ratio,omitempty" json:"high-space-ratio"` // EnableRaftLearner is the option for using AddLearnerNode instead of AddNode EnableRaftLearner bool `toml:"enable-raft-learner" json:"enable-raft-learner,string"` // Schedulers support for loding customized schedulers @@ -371,6 +383,8 @@ func (c *ScheduleConfig) clone() *ScheduleConfig { ReplicaScheduleLimit: c.ReplicaScheduleLimit, MergeScheduleLimit: c.MergeScheduleLimit, TolerantSizeRatio: c.TolerantSizeRatio, + LowSpaceRatio: c.LowSpaceRatio, + HighSpaceRatio: c.HighSpaceRatio, EnableRaftLearner: c.EnableRaftLearner, Schedulers: schedulers, } @@ -396,7 +410,9 @@ const ( defaultRegionScheduleLimit = 4 defaultReplicaScheduleLimit = 8 defaultMergeScheduleLimit = 8 - defaultTolerantSizeRatio = 2.5 + defaultTolerantSizeRatio = 5 + defaultLowSpaceRatio = 0.8 + defaultHighSpaceRatio = 0.6 ) var defaultSchedulers = SchedulerConfigs{ @@ -416,6 +432,8 @@ func (c *ScheduleConfig) adjust() { adjustUint64(&c.ReplicaScheduleLimit, defaultReplicaScheduleLimit) adjustUint64(&c.MergeScheduleLimit, defaultMergeScheduleLimit) adjustFloat64(&c.TolerantSizeRatio, defaultTolerantSizeRatio) + adjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio) + adjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio) adjustSchedulers(&c.Schedulers, defaultSchedulers) } diff --git a/server/coordinator.go b/server/coordinator.go index d80798b5b0f..897a5f71255 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -436,6 +436,7 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool { if old, ok := c.operators[regionID]; ok { if !isHigherPriorityOperator(op, old) { log.Infof("[region %v] cancel add operator, old: %s", regionID, old) + operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc() return false } log.Infof("[region %v] replace old operator: %s", regionID, old) @@ -470,6 +471,7 @@ func (c *coordinator) addOperators(ops ...*schedule.Operator) bool { for _, op := range ops { if old := c.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) { log.Infof("[region %v] cancel add operators, old: %s", op.RegionID(), old) + operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc() return false } } diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 16da80d53a1..52e0c8b9e8f 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -65,8 +65,8 @@ func (c *testClusterInfo) addRegionStore(storeID uint64, regionCount int) { store.LastHeartbeatTS = time.Now() store.RegionCount = regionCount store.RegionSize = int64(regionCount) * 10 - store.Stats.Capacity = uint64(1024) - store.Stats.Available = store.Stats.Capacity + store.Stats.Capacity = 1000 * (1 << 20) + store.Stats.Available = store.Stats.Capacity - uint64(store.RegionSize) c.putStore(store) } diff --git a/server/core/store.go b/server/core/store.go index afd85e5bfc4..b40005b2368 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -45,6 +45,7 @@ type StoreInfo struct { func NewStoreInfo(store *metapb.Store) *StoreInfo { return &StoreInfo{ Store: store, + Stats: &pdpb.StoreStats{}, LeaderWeight: 1.0, RegionWeight: 1.0, } @@ -103,15 +104,46 @@ func (s *StoreInfo) DownTime() time.Duration { } const minWeight = 1e-6 +const maxScore = 1024 * 1024 * 1024 -// LeaderScore returns the store's leader score: leaderCount / leaderWeight. 
-func (s *StoreInfo) LeaderScore() float64 { - return float64(s.LeaderSize) / math.Max(s.LeaderWeight, minWeight) +// LeaderScore returns the store's leader score: leaderSize / leaderWeight. +func (s *StoreInfo) LeaderScore(delta int64) float64 { + return float64(s.LeaderSize+delta) / math.Max(s.LeaderWeight, minWeight) } -// RegionScore returns the store's region score: regionSize / regionWeight. -func (s *StoreInfo) RegionScore() float64 { - return float64(s.RegionSize) / math.Max(s.RegionWeight, minWeight) +// RegionScore returns the store's region score. +func (s *StoreInfo) RegionScore(highSpaceRatio, lowSpaceRatio float64, delta int64) float64 { + if s.RegionSize == 0 { + return float64(delta) + } + + capacity := float64(s.Stats.GetCapacity()) / (1 << 20) + available := float64(s.Stats.GetAvailable()) / (1 << 20) + + var score float64 + + // because of RocksDB compression, the region size is larger than the actual used size + amplification := float64(s.RegionSize) / (float64(s.Stats.GetUsedSize()) / (1 << 20)) + + if available-float64(delta)/amplification >= (1-highSpaceRatio)*capacity { + score = float64(s.RegionSize + delta) + } else if available-float64(delta)/amplification <= (1-lowSpaceRatio)*capacity { + score = maxScore - (available - float64(delta)/amplification) + } else { + // to make the score function continuous, we use the linear function y = k * x + b for the transition period + // from the above we know two points that must lie on this line: + // p1(highSpaceRatio*capacity*amplification, highSpaceRatio*capacity*amplification) and + // p2(lowSpaceRatio*capacity*amplification, maxScore-(1-lowSpaceRatio)*capacity) + // so k = (y2 - y1) / (x2 - x1) + x1, y1 := highSpaceRatio*capacity*amplification, highSpaceRatio*capacity*amplification + x2, y2 := lowSpaceRatio*capacity*amplification, maxScore-(1-lowSpaceRatio)*capacity + + k := (y2 - y1) / (x2 - x1) + b := y1 - k*x1 + score = k*float64(s.RegionSize+delta) + b + } + + return score / math.Max(s.RegionWeight, minWeight) } // StorageSize returns store's used storage size reported from tikv. @@ -127,11 +159,9 @@ func (s *StoreInfo) AvailableRatio() float64 { return float64(s.Stats.GetAvailable()) / float64(s.Stats.GetCapacity()) } -const storeLowSpaceThreshold = 0.2 - // IsLowSpace checks if the store is lack of space. -func (s *StoreInfo) IsLowSpace() bool { - return s.Stats != nil && s.AvailableRatio() < storeLowSpaceThreshold +func (s *StoreInfo) IsLowSpace(lowSpaceRatio float64) bool { + return s.Stats != nil && s.AvailableRatio() < 1-lowSpaceRatio } // ResourceCount reutrns count of leader/region in the store. @@ -159,12 +189,12 @@ func (s *StoreInfo) ResourceSize(kind ResourceKind) int64 { } // ResourceScore reutrns score of leader/region in the store.
-func (s *StoreInfo) ResourceScore(kind ResourceKind) float64 { +func (s *StoreInfo) ResourceScore(kind ResourceKind, highSpaceRatio, lowSpaceRatio float64, delta int64) float64 { switch kind { case LeaderKind: - return s.LeaderScore() + return s.LeaderScore(delta) case RegionKind: - return s.RegionScore() + return s.RegionScore(highSpaceRatio, lowSpaceRatio, delta) default: return 0 } diff --git a/server/option.go b/server/option.go index 8e9ec36b20a..c49f6c497c0 100644 --- a/server/option.go +++ b/server/option.go @@ -120,6 +120,14 @@ func (o *scheduleOption) GetTolerantSizeRatio() float64 { return o.load().TolerantSizeRatio } +func (o *scheduleOption) GetLowSpaceRatio() float64 { + return o.load().LowSpaceRatio +} + +func (o *scheduleOption) GetHighSpaceRatio() float64 { + return o.load().HighSpaceRatio +} + func (o *scheduleOption) IsRaftLearnerEnabled() bool { return o.load().EnableRaftLearner } diff --git a/server/region_statistics.go b/server/region_statistics.go index eac83ededa5..34f214ca146 100644 --- a/server/region_statistics.go +++ b/server/region_statistics.go @@ -163,7 +163,7 @@ func (l *labelLevelStatistics) Observe(region *core.RegionInfo, stores []*core.S func (l *labelLevelStatistics) Collect() { for level, count := range l.labelLevelCounter { typ := fmt.Sprintf("level_%d", level) - regionStatusGauge.WithLabelValues(typ).Set(float64(count)) + regionLabelLevelGauge.WithLabelValues(typ).Set(float64(count)) } } diff --git a/server/schedule/basic_cluster.go b/server/schedule/basic_cluster.go index 87da5ea7568..dd8814ce847 100644 --- a/server/schedule/basic_cluster.go +++ b/server/schedule/basic_cluster.go @@ -73,10 +73,22 @@ func (m OpInfluence) GetStoreInfluence(id uint64) *StoreInfluence { // StoreInfluence records influences that pending operators will make. type StoreInfluence struct { - RegionSize int - RegionCount int - LeaderSize int - LeaderCount int + RegionSize int64 + RegionCount int64 + LeaderSize int64 + LeaderCount int64 +} + +// ResourceSize returns delta size of leader/region by influence. +func (s StoreInfluence) ResourceSize(kind core.ResourceKind) int64 { + switch kind { + case core.LeaderKind: + return s.LeaderSize + case core.RegionKind: + return s.RegionSize + default: + return 0 + } } // NewBasicCluster creates a BasicCluster. diff --git a/server/schedule/filters.go b/server/schedule/filters.go index 5c52b040fff..b105deb9215 100644 --- a/server/schedule/filters.go +++ b/server/schedule/filters.go @@ -197,7 +197,7 @@ func (f *storageThresholdFilter) FilterSource(opt Options, store *core.StoreInfo } func (f *storageThresholdFilter) FilterTarget(opt Options, store *core.StoreInfo) bool { - return store.IsLowSpace() + return store.IsLowSpace(opt.GetLowSpaceRatio()) } // distinctScoreFilter ensures that distinct score will not decrease. diff --git a/server/schedulers/mockcluster.go b/server/schedule/mockcluster.go similarity index 66% rename from server/schedulers/mockcluster.go rename to server/schedule/mockcluster.go index b03400e6a45..36b632d63ed 100644 --- a/server/schedulers/mockcluster.go +++ b/server/schedule/mockcluster.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package schedulers +package schedule import ( "fmt" @@ -22,60 +22,49 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" - "github.com/pingcap/pd/server/schedule" log "github.com/sirupsen/logrus" ) -type mockCluster struct { - *schedule.BasicCluster +// MockCluster is used to mock clusterInfo for test use +type MockCluster struct { + *BasicCluster id *core.MockIDAllocator *MockSchedulerOptions } -// NewMockCluster creates a new mockCluster -func newMockCluster(opt *MockSchedulerOptions) *mockCluster { - return &mockCluster{ - BasicCluster: schedule.NewBasicCluster(), +// NewMockCluster creates a new MockCluster +func NewMockCluster(opt *MockSchedulerOptions) *MockCluster { + return &MockCluster{ + BasicCluster: NewBasicCluster(), id: core.NewMockIDAllocator(), MockSchedulerOptions: opt, } } -func (mc *mockCluster) allocID() (uint64, error) { +func (mc *MockCluster) allocID() (uint64, error) { return mc.id.Alloc() } // ScanRegions scan region with start key, until number greater than limit. -func (mc *mockCluster) ScanRegions(startKey []byte, limit int) []*core.RegionInfo { +func (mc *MockCluster) ScanRegions(startKey []byte, limit int) []*core.RegionInfo { return mc.Regions.ScanRange(startKey, limit) } -// GetStoresAverageScore returns the total resource score of all unfiltered stores. -func (mc *mockCluster) GetStoresAverageScore(kind core.ResourceKind, filters ...schedule.Filter) float64 { - var totalResourceSize int64 - var totalResourceWeight float64 - for _, s := range mc.BasicCluster.GetStores() { - if schedule.FilterSource(mc, s, filters) { - continue - } - - totalResourceWeight += s.ResourceWeight(kind) - totalResourceSize += s.ResourceSize(kind) - } - - if totalResourceWeight == 0 { - return 0 - } - return float64(totalResourceSize) / totalResourceWeight +// LoadRegion put region info without leader +func (mc *MockCluster) LoadRegion(regionID uint64, followerIds ...uint64) { + // regions load from etcd will have no leader + r := mc.newMockRegionInfo(regionID, 0, followerIds...) + r.Leader = nil + mc.PutRegion(r) } // IsRegionHot checks if the region is hot -func (mc *mockCluster) IsRegionHot(id uint64) bool { +func (mc *MockCluster) IsRegionHot(id uint64) bool { return mc.BasicCluster.IsRegionHot(id, mc.GetHotRegionLowThreshold()) } // RandHotRegionFromStore random picks a hot region in specify store. -func (mc *mockCluster) RandHotRegionFromStore(store uint64, kind schedule.FlowKind) *core.RegionInfo { +func (mc *MockCluster) RandHotRegionFromStore(store uint64, kind FlowKind) *core.RegionInfo { r := mc.HotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionLowThreshold()) if r == nil { return nil @@ -84,7 +73,7 @@ func (mc *mockCluster) RandHotRegionFromStore(store uint64, kind schedule.FlowKi } // AllocPeer allocs a new peer on a store. -func (mc *mockCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { +func (mc *MockCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { peerID, err := mc.allocID() if err != nil { log.Errorf("failed to alloc peer: %v", err) @@ -97,81 +86,64 @@ func (mc *mockCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { return peer, nil } -func (mc *mockCluster) setStoreUp(storeID uint64) { +// SetStoreUp sets store state to be up. 
+func (mc *MockCluster) SetStoreUp(storeID uint64) { store := mc.GetStore(storeID) store.State = metapb.StoreState_Up store.LastHeartbeatTS = time.Now() mc.PutStore(store) } -func (mc *mockCluster) setStoreDown(storeID uint64) { +// SetStoreDown sets store down. +func (mc *MockCluster) SetStoreDown(storeID uint64) { store := mc.GetStore(storeID) store.State = metapb.StoreState_Up store.LastHeartbeatTS = time.Time{} mc.PutStore(store) } -func (mc *mockCluster) setStoreOffline(storeID uint64) { +// SetStoreOffline sets store state to be offline. +func (mc *MockCluster) SetStoreOffline(storeID uint64) { store := mc.GetStore(storeID) store.State = metapb.StoreState_Offline mc.PutStore(store) } -func (mc *mockCluster) setStoreBusy(storeID uint64, busy bool) { +// SetStoreBusy sets store busy. +func (mc *MockCluster) SetStoreBusy(storeID uint64, busy bool) { store := mc.GetStore(storeID) store.Stats.IsBusy = busy store.LastHeartbeatTS = time.Now() mc.PutStore(store) } -func (mc *mockCluster) addLeaderStore(storeID uint64, leaderCount int) { +// AddLeaderStore adds store with specified count of leader. +func (mc *MockCluster) AddLeaderStore(storeID uint64, leaderCount int) { store := core.NewStoreInfo(&metapb.Store{Id: storeID}) store.Stats = &pdpb.StoreStats{} store.LastHeartbeatTS = time.Now() store.LeaderCount = leaderCount - store.Stats.Capacity = uint64(1024) - store.Stats.Available = store.Stats.Capacity store.LeaderSize = int64(leaderCount) * 10 + store.Stats.Capacity = 1000 * (1 << 20) + store.Stats.Available = store.Stats.Capacity - uint64(store.LeaderSize) mc.PutStore(store) } -func (mc *mockCluster) addRegionStore(storeID uint64, regionCount int) { +// AddRegionStore adds store with specified count of region. +func (mc *MockCluster) AddRegionStore(storeID uint64, regionCount int) { store := core.NewStoreInfo(&metapb.Store{Id: storeID}) store.Stats = &pdpb.StoreStats{} store.LastHeartbeatTS = time.Now() store.RegionCount = regionCount store.RegionSize = int64(regionCount) * 10 - store.Stats.Capacity = uint64(1024) - store.Stats.Available = store.Stats.Capacity + store.Stats.Capacity = 1000 * (1 << 20) + store.Stats.Available = store.Stats.Capacity - uint64(store.RegionSize) mc.PutStore(store) } -func (mc *mockCluster) updateStoreLeaderWeight(storeID uint64, weight float64) { - store := mc.GetStore(storeID) - store.LeaderWeight = weight - mc.PutStore(store) -} - -func (mc *mockCluster) updateStoreRegionWeight(storeID uint64, weight float64) { - store := mc.GetStore(storeID) - store.RegionWeight = weight - mc.PutStore(store) -} - -func (mc *mockCluster) updateStoreLeaderSize(storeID uint64, size int64) { - store := mc.GetStore(storeID) - store.LeaderSize = size - mc.PutStore(store) -} - -func (mc *mockCluster) updateStoreRegionSize(storeID uint64, size int64) { - store := mc.GetStore(storeID) - store.RegionSize = size - mc.PutStore(store) -} - -func (mc *mockCluster) addLabelsStore(storeID uint64, regionCount int, labels map[string]string) { - mc.addRegionStore(storeID, regionCount) +// AddLabelsStore adds store with specified count of region and labels. 
+func (mc *MockCluster) AddLabelsStore(storeID uint64, regionCount int, labels map[string]string) { + mc.AddRegionStore(storeID, regionCount) store := mc.GetStore(storeID) for k, v := range labels { store.Labels = append(store.Labels, &metapb.StoreLabel{Key: k, Value: v}) @@ -179,92 +151,127 @@ func (mc *mockCluster) addLabelsStore(storeID uint64, regionCount int, labels ma mc.PutStore(store) } -func (mc *mockCluster) addLeaderRegion(regionID uint64, leaderID uint64, followerIds ...uint64) { +// AddLeaderRegion adds region with specified leader and followers. +func (mc *MockCluster) AddLeaderRegion(regionID uint64, leaderID uint64, followerIds ...uint64) { regionInfo := mc.newMockRegionInfo(regionID, leaderID, followerIds...) regionInfo.ApproximateSize = 10 mc.PutRegion(regionInfo) } -func (mc *mockCluster) addLeaderRegionWithRange(regionID uint64, startKey string, endKey string, leaderID uint64, followerIds ...uint64) { +// AddLeaderRegionWithRange adds region with specified leader, followers and key range. +func (mc *MockCluster) AddLeaderRegionWithRange(regionID uint64, startKey string, endKey string, leaderID uint64, followerIds ...uint64) { r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) r.StartKey = []byte(startKey) r.EndKey = []byte(endKey) mc.PutRegion(r) } -func (mc *mockCluster) LoadRegion(regionID uint64, followerIds ...uint64) { - // regions load from etcd will have no leader - r := mc.newMockRegionInfo(regionID, 0, followerIds...) - r.Leader = nil +// AddLeaderRegionWithReadInfo adds region with specified leader, followers and read info. +func (mc *MockCluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, readBytes uint64, followerIds ...uint64) { + r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) + r.ReadBytes = readBytes + isUpdate, item := mc.BasicCluster.CheckReadStatus(r) + if isUpdate { + mc.HotCache.Update(regionID, item, ReadFlow) + } mc.PutRegion(r) } -func (mc *mockCluster) addLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64, writtenBytes uint64, followerIds ...uint64) { +// AddLeaderRegionWithWriteInfo adds region with specified leader, followers and write info. +func (mc *MockCluster) AddLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64, writtenBytes uint64, followerIds ...uint64) { r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) r.WrittenBytes = writtenBytes isUpdate, item := mc.BasicCluster.CheckWriteStatus(r) if isUpdate { - mc.HotCache.Update(regionID, item, schedule.WriteFlow) + mc.HotCache.Update(regionID, item, WriteFlow) } mc.PutRegion(r) } -func (mc *mockCluster) updateLeaderCount(storeID uint64, leaderCount int) { +// UpdateStoreLeaderWeight updates store leader weight. +func (mc *MockCluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) { + store := mc.GetStore(storeID) + store.LeaderWeight = weight + mc.PutStore(store) +} + +// UpdateStoreRegionWeight updates store region weight. +func (mc *MockCluster) UpdateStoreRegionWeight(storeID uint64, weight float64) { + store := mc.GetStore(storeID) + store.RegionWeight = weight + mc.PutStore(store) +} + +// UpdateStoreLeaderSize updates store leader size. +func (mc *MockCluster) UpdateStoreLeaderSize(storeID uint64, size int64) { + store := mc.GetStore(storeID) + store.LeaderSize = size + store.Stats.Available = store.Stats.Capacity - uint64(store.LeaderSize) + mc.PutStore(store) +} + +// UpdateStoreRegionSize updates store region size. 
+func (mc *MockCluster) UpdateStoreRegionSize(storeID uint64, size int64) { + store := mc.GetStore(storeID) + store.RegionSize = size + store.Stats.Available = store.Stats.Capacity - uint64(store.RegionSize) + mc.PutStore(store) +} + +// UpdateLeaderCount updates store leader count. +func (mc *MockCluster) UpdateLeaderCount(storeID uint64, leaderCount int) { store := mc.GetStore(storeID) store.LeaderCount = leaderCount store.LeaderSize = int64(leaderCount) * 10 mc.PutStore(store) } -func (mc *mockCluster) updateRegionCount(storeID uint64, regionCount int) { +// UpdateRegionCount updates store region count. +func (mc *MockCluster) UpdateRegionCount(storeID uint64, regionCount int) { store := mc.GetStore(storeID) store.RegionCount = regionCount store.RegionSize = int64(regionCount) * 10 mc.PutStore(store) } -func (mc *mockCluster) updateSnapshotCount(storeID uint64, snapshotCount int) { +// UpdateSnapshotCount updates store snapshot count. +func (mc *MockCluster) UpdateSnapshotCount(storeID uint64, snapshotCount int) { store := mc.GetStore(storeID) store.Stats.ApplyingSnapCount = uint32(snapshotCount) mc.PutStore(store) } -func (mc *mockCluster) updatePendingPeerCount(storeID uint64, pendingPeerCount int) { +// UpdatePendingPeerCount updates store pending peer count. +func (mc *MockCluster) UpdatePendingPeerCount(storeID uint64, pendingPeerCount int) { store := mc.GetStore(storeID) store.PendingPeerCount = pendingPeerCount mc.PutStore(store) } -func (mc *mockCluster) updateStorageRatio(storeID uint64, usedRatio, availableRatio float64) { +// UpdateStorageRatio updates store storage ratio count. +func (mc *MockCluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio float64) { store := mc.GetStore(storeID) - store.Stats.Capacity = uint64(1024) + store.Stats.Capacity = 1000 * (1 << 20) store.Stats.UsedSize = uint64(float64(store.Stats.Capacity) * usedRatio) store.Stats.Available = uint64(float64(store.Stats.Capacity) * availableRatio) mc.PutStore(store) } -func (mc *mockCluster) updateStorageWrittenBytes(storeID uint64, BytesWritten uint64) { +// UpdateStorageWrittenBytes updates store written bytes. +func (mc *MockCluster) UpdateStorageWrittenBytes(storeID uint64, BytesWritten uint64) { store := mc.GetStore(storeID) store.Stats.BytesWritten = BytesWritten mc.PutStore(store) } -func (mc *mockCluster) updateStorageReadBytes(storeID uint64, BytesRead uint64) { + +// UpdateStorageReadBytes updates store read bytes. +func (mc *MockCluster) UpdateStorageReadBytes(storeID uint64, BytesRead uint64) { store := mc.GetStore(storeID) store.Stats.BytesRead = BytesRead mc.PutStore(store) } -func (mc *mockCluster) addLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, readBytes uint64, followerIds ...uint64) { - r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) 
- r.ReadBytes = readBytes - isUpdate, item := mc.BasicCluster.CheckReadStatus(r) - if isUpdate { - mc.HotCache.Update(regionID, item, schedule.ReadFlow) - } - mc.PutRegion(r) -} - -func (mc *mockCluster) newMockRegionInfo(regionID uint64, leaderID uint64, followerIds ...uint64) *core.RegionInfo { +func (mc *MockCluster) newMockRegionInfo(regionID uint64, leaderID uint64, followerIds ...uint64) *core.RegionInfo { region := &metapb.Region{ Id: regionID, StartKey: []byte(fmt.Sprintf("%20d", regionID)), @@ -280,14 +287,15 @@ func (mc *mockCluster) newMockRegionInfo(regionID uint64, leaderID uint64, follo return core.NewRegionInfo(region, leader) } -func (mc *mockCluster) applyOperator(op *schedule.Operator) { +// ApplyOperator mocks apply oeprator. +func (mc *MockCluster) ApplyOperator(op *Operator) { region := mc.GetRegion(op.RegionID()) for !op.IsFinish() { if step := op.Check(region); step != nil { switch s := step.(type) { - case schedule.TransferLeader: + case TransferLeader: region.Leader = region.GetStorePeer(s.ToStore) - case schedule.AddPeer: + case AddPeer: if region.GetStorePeer(s.ToStore) != nil { panic("Add peer that exists") } @@ -296,12 +304,12 @@ func (mc *mockCluster) applyOperator(op *schedule.Operator) { StoreId: s.ToStore, } region.Peers = append(region.Peers, peer) - case schedule.RemovePeer: + case RemovePeer: if region.GetStorePeer(s.FromStore) == nil { panic("Remove peer that doesn't exist") } region.RemoveStorePeer(s.FromStore) - case schedule.AddLearner: + case AddLearner: if region.GetStorePeer(s.ToStore) != nil { panic("Add learner that exists") } @@ -319,31 +327,38 @@ func (mc *mockCluster) applyOperator(op *schedule.Operator) { mc.PutRegion(region) } -func (mc *mockCluster) GetOpt() schedule.NamespaceOptions { +// GetOpt mocks method. +func (mc *MockCluster) GetOpt() NamespaceOptions { return mc.MockSchedulerOptions } -func (mc *mockCluster) GetLeaderScheduleLimit() uint64 { +// GetLeaderScheduleLimit mocks method. +func (mc *MockCluster) GetLeaderScheduleLimit() uint64 { return mc.MockSchedulerOptions.GetLeaderScheduleLimit(namespace.DefaultNamespace) } -func (mc *mockCluster) GetRegionScheduleLimit() uint64 { +// GetRegionScheduleLimit mocks method. +func (mc *MockCluster) GetRegionScheduleLimit() uint64 { return mc.MockSchedulerOptions.GetRegionScheduleLimit(namespace.DefaultNamespace) } -func (mc *mockCluster) GetReplicaScheduleLimit() uint64 { +// GetReplicaScheduleLimit mocks method. +func (mc *MockCluster) GetReplicaScheduleLimit() uint64 { return mc.MockSchedulerOptions.GetReplicaScheduleLimit(namespace.DefaultNamespace) } -func (mc *mockCluster) GetMergeScheduleLimit() uint64 { +// GetMergeScheduleLimit mocks method. +func (mc *MockCluster) GetMergeScheduleLimit() uint64 { return mc.MockSchedulerOptions.GetMergeScheduleLimit(namespace.DefaultNamespace) } -func (mc *mockCluster) GetMaxReplicas() int { +// GetMaxReplicas mocks method. +func (mc *MockCluster) GetMaxReplicas() int { return mc.MockSchedulerOptions.GetMaxReplicas(namespace.DefaultNamespace) } -func (mc *mockCluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool { +// CheckLabelProperty checks label property. 
+func (mc *MockCluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool { for _, cfg := range mc.LabelProperties[typ] { for _, l := range labels { if l.Key == cfg.Key && l.Value == cfg.Value { @@ -365,6 +380,8 @@ const ( defaultReplicaScheduleLimit = 8 defaultMergeScheduleLimit = 8 defaultTolerantSizeRatio = 2.5 + defaultLowSpaceRatio = 0.8 + defaultHighSpaceRatio = 0.6 ) // MockSchedulerOptions is a mock of SchedulerOptions @@ -382,11 +399,14 @@ type MockSchedulerOptions struct { LocationLabels []string HotRegionLowThreshold int TolerantSizeRatio float64 + LowSpaceRatio float64 + HighSpaceRatio float64 EnableRaftLearner bool LabelProperties map[string][]*metapb.StoreLabel } -func newMockSchedulerOptions() *MockSchedulerOptions { +// NewMockSchedulerOptions creates a mock schedule option. +func NewMockSchedulerOptions() *MockSchedulerOptions { mso := &MockSchedulerOptions{} mso.RegionScheduleLimit = defaultRegionScheduleLimit mso.LeaderScheduleLimit = defaultLeaderScheduleLimit @@ -395,10 +415,12 @@ func newMockSchedulerOptions() *MockSchedulerOptions { mso.MaxSnapshotCount = defaultMaxSnapshotCount mso.MaxStoreDownTime = defaultMaxStoreDownTime mso.MaxReplicas = defaultMaxReplicas - mso.HotRegionLowThreshold = schedule.HotRegionLowThreshold + mso.HotRegionLowThreshold = HotRegionLowThreshold mso.MaxPendingPeerCount = defaultMaxPendingPeerCount mso.MaxMergeRegionSize = defaultMaxMergeRegionSize mso.TolerantSizeRatio = defaultTolerantSizeRatio + mso.LowSpaceRatio = defaultLowSpaceRatio + mso.HighSpaceRatio = defaultHighSpaceRatio return mso } @@ -462,6 +484,16 @@ func (mso *MockSchedulerOptions) GetTolerantSizeRatio() float64 { return mso.TolerantSizeRatio } +// GetLowSpaceRatio mock method +func (mso *MockSchedulerOptions) GetLowSpaceRatio() float64 { + return mso.LowSpaceRatio +} + +// GetHighSpaceRatio mock method +func (mso *MockSchedulerOptions) GetHighSpaceRatio() float64 { + return mso.HighSpaceRatio +} + // SetMaxReplicas mock method func (mso *MockSchedulerOptions) SetMaxReplicas(replicas int) { mso.MaxReplicas = replicas diff --git a/server/schedule/namespace_checker.go b/server/schedule/namespace_checker.go index 864953cd660..83e07e9d01d 100644 --- a/server/schedule/namespace_checker.go +++ b/server/schedule/namespace_checker.go @@ -94,7 +94,6 @@ func (n *NamespaceChecker) SelectBestPeerToRelocate(region *core.RegionInfo, tar func (n *NamespaceChecker) SelectBestStoreToRelocate(region *core.RegionInfo, targets []*core.StoreInfo, filters ...Filter) uint64 { newFilters := []Filter{ NewStateFilter(), - NewStorageThresholdFilter(), NewExcludedFilter(nil, region.GetStoreIds()), } filters = append(filters, newFilters...) 
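The piecewise region score added to core/store.go above is the heart of this change, so a small self-contained sketch may help. It is not part of the patch; the names and constants below are illustrative only, sizes are assumed to be in MB, and amplification stands in for the RocksDB size amplification that StoreInfo.RegionScore derives from the reported used size.

package main

import "fmt"

// regionScore mimics the staged curve of StoreInfo.RegionScore in this diff:
// it tracks the region size while the store still has plenty of free space,
// switches to maxScore-available once the store is nearly full, and bridges
// the two stages with a straight line so the score stays continuous at both
// boundaries. All sizes are in MB.
func regionScore(regionSize, capacity, available, amplification, highRatio, lowRatio float64) float64 {
	const maxScore = 1024 * 1024 * 1024

	switch {
	case available >= (1-highRatio)*capacity:
		// high space stage: score varies directly with the region size
		return regionSize
	case available <= (1-lowRatio)*capacity:
		// low space stage: score varies inversely with the remaining space
		return maxScore - available
	default:
		// transition stage: the line through
		// p1 = (highRatio*capacity*amplification, highRatio*capacity*amplification) and
		// p2 = (lowRatio*capacity*amplification, maxScore-(1-lowRatio)*capacity)
		x1, y1 := highRatio*capacity*amplification, highRatio*capacity*amplification
		x2, y2 := lowRatio*capacity*amplification, maxScore-(1-lowRatio)*capacity
		k := (y2 - y1) / (x2 - x1)
		return k*(regionSize-x1) + y1
	}
}

func main() {
	capacity := 1000.0 // MB, the capacity used by the mock stores in this diff
	for _, used := range []float64{100, 650, 900} {
		score := regionScore(used, capacity, capacity-used, 1, 0.6, 0.8)
		fmt.Printf("used %4.0fMB -> score %.0f\n", used, score)
	}
}

With the defaults from this diff (high-space-ratio = 0.6, low-space-ratio = 0.8, 1000MB capacity as in the mock stores), a 100MB store scores 100 while a 900MB store scores close to maxScore, so a nearly full store is effectively never chosen as a balance target; this is presumably why several schedulers in this diff stop adding the hard NewStorageThresholdFilter.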
diff --git a/server/schedule/operator.go b/server/schedule/operator.go index 8b8887db981..937070659d3 100644 --- a/server/schedule/operator.go +++ b/server/schedule/operator.go @@ -57,9 +57,9 @@ func (tl TransferLeader) Influence(opInfluence OpInfluence, region *core.RegionI from := opInfluence.GetStoreInfluence(tl.FromStore) to := opInfluence.GetStoreInfluence(tl.ToStore) - from.LeaderSize -= int(region.ApproximateSize) + from.LeaderSize -= region.ApproximateSize from.LeaderCount-- - to.LeaderSize += int(region.ApproximateSize) + to.LeaderSize += region.ApproximateSize to.LeaderCount++ } @@ -88,7 +88,7 @@ func (ap AddPeer) IsFinish(region *core.RegionInfo) bool { func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to := opInfluence.GetStoreInfluence(ap.ToStore) - to.RegionSize += int(region.ApproximateSize) + to.RegionSize += region.ApproximateSize to.RegionCount++ } @@ -117,7 +117,7 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool { func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to := opInfluence.GetStoreInfluence(al.ToStore) - to.RegionSize += int(region.ApproximateSize) + to.RegionSize += region.ApproximateSize to.RegionCount++ } @@ -162,7 +162,7 @@ func (rp RemovePeer) IsFinish(region *core.RegionInfo) bool { func (rp RemovePeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { from := opInfluence.GetStoreInfluence(rp.FromStore) - from.RegionSize -= int(region.ApproximateSize) + from.RegionSize -= region.ApproximateSize from.RegionCount-- } @@ -194,9 +194,12 @@ func (mr MergeRegion) IsFinish(region *core.RegionInfo) bool { // Influence calculates the store difference that current step make func (mr MergeRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo) { if mr.IsPassive { - for _, peer := range region.GetPeers() { - o := opInfluence.GetStoreInfluence(peer.GetStoreId()) + for _, p := range region.GetPeers() { + o := opInfluence.GetStoreInfluence(p.GetStoreId()) o.RegionCount-- + if region.Leader.GetId() == p.GetId() { + o.LeaderCount-- + } } } } diff --git a/server/schedule/operator_test.go b/server/schedule/operator_test.go index d49f67c2cca..82c38b5dc35 100644 --- a/server/schedule/operator_test.go +++ b/server/schedule/operator_test.go @@ -160,7 +160,7 @@ func (s *testOperatorSuite) TestInfluence(c *C) { MergeRegion{IsPassive: true}.Influence(opInfluence, region) c.Assert(*opInfluence[1], DeepEquals, StoreInfluence{ LeaderSize: -10, - LeaderCount: -1, + LeaderCount: -2, RegionSize: -10, RegionCount: -2, }) diff --git a/server/schedule/opts.go b/server/schedule/opts.go index 400774f2e56..a5977870d17 100644 --- a/server/schedule/opts.go +++ b/server/schedule/opts.go @@ -40,6 +40,8 @@ type Options interface { GetHotRegionLowThreshold() int GetTolerantSizeRatio() float64 + GetLowSpaceRatio() float64 + GetHighSpaceRatio() float64 IsRaftLearnerEnabled() bool CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go index b179b8fed85..318e476489c 100644 --- a/server/schedule/region_scatterer.go +++ b/server/schedule/region_scatterer.go @@ -72,7 +72,6 @@ func NewRegionScatterer(cluster Cluster, classifier namespace.Classifier) *Regio filters := []Filter{ NewStateFilter(), NewHealthFilter(), - NewStorageThresholdFilter(), } return &RegionScatterer{ diff --git a/server/schedule/replica.go b/server/schedule/replica.go index 5f7e2094e71..cf42422e3fd 100644 --- a/server/schedule/replica.go +++ 
b/server/schedule/replica.go @@ -40,7 +40,7 @@ func DistinctScore(labels []string, stores []*core.StoreInfo, other *core.StoreI // Returns 0 if store A is as good as store B. // Returns 1 if store A is better than store B. // Returns -1 if store B is better than store A. -func compareStoreScore(storeA *core.StoreInfo, scoreA float64, storeB *core.StoreInfo, scoreB float64) int { +func compareStoreScore(opt Options, storeA *core.StoreInfo, scoreA float64, storeB *core.StoreInfo, scoreB float64) int { // The store with higher score is better. if scoreA > scoreB { return 1 @@ -49,10 +49,12 @@ func compareStoreScore(storeA *core.StoreInfo, scoreA float64, storeB *core.Stor return -1 } // The store with lower region score is better. - if storeA.RegionScore() < storeB.RegionScore() { + if storeA.RegionScore(opt.GetHighSpaceRatio(), opt.GetLowSpaceRatio(), 0) < + storeB.RegionScore(opt.GetHighSpaceRatio(), opt.GetLowSpaceRatio(), 0) { return 1 } - if storeA.RegionScore() > storeB.RegionScore() { + if storeA.RegionScore(opt.GetHighSpaceRatio(), opt.GetLowSpaceRatio(), 0) > + storeB.RegionScore(opt.GetHighSpaceRatio(), opt.GetLowSpaceRatio(), 0) { return -1 } return 0 diff --git a/server/schedule/replica_checker.go b/server/schedule/replica_checker.go index 732ce5c6c10..e5db0b4b2af 100644 --- a/server/schedule/replica_checker.go +++ b/server/schedule/replica_checker.go @@ -120,7 +120,6 @@ func (r *ReplicaChecker) selectBestStoreToAddReplica(region *core.RegionInfo, fi // Add some must have filters. newFilters := []Filter{ NewStateFilter(), - NewStorageThresholdFilter(), NewPendingPeerCountFilter(), NewExcludedFilter(nil, region.GetStoreIds()), } diff --git a/server/schedule/replica_test.go b/server/schedule/replica_test.go index f4e34f46bda..081a304d8fb 100644 --- a/server/schedule/replica_test.go +++ b/server/schedule/replica_test.go @@ -18,6 +18,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server/core" ) @@ -27,7 +28,14 @@ func TestSchedule(t *testing.T) { var _ = Suite(&testReplicationSuite{}) -type testReplicationSuite struct{} +type testReplicationSuite struct { + tc *MockCluster +} + +func (s *testReplicationSuite) SetUpSuite(c *C) { + opt := NewMockSchedulerOptions() + s.tc = NewMockCluster(opt) +} func (s *testReplicationSuite) TestDistinctScore(c *C) { labels := []string{"zone", "rack", "host"} @@ -68,13 +76,13 @@ func (s *testReplicationSuite) TestCompareStoreScore(c *C) { store2 := s.newStoreInfo(2, 1, nil) store3 := s.newStoreInfo(3, 3, nil) - c.Assert(compareStoreScore(store1, 2, store2, 1), Equals, 1) - c.Assert(compareStoreScore(store1, 1, store2, 1), Equals, 0) - c.Assert(compareStoreScore(store1, 1, store2, 2), Equals, -1) + c.Assert(compareStoreScore(s.tc, store1, 2, store2, 1), Equals, 1) + c.Assert(compareStoreScore(s.tc, store1, 1, store2, 1), Equals, 0) + c.Assert(compareStoreScore(s.tc, store1, 1, store2, 2), Equals, -1) - c.Assert(compareStoreScore(store1, 2, store3, 1), Equals, 1) - c.Assert(compareStoreScore(store1, 1, store3, 1), Equals, 1) - c.Assert(compareStoreScore(store1, 1, store3, 2), Equals, -1) + c.Assert(compareStoreScore(s.tc, store1, 2, store3, 1), Equals, 1) + c.Assert(compareStoreScore(s.tc, store1, 1, store3, 1), Equals, 1) + c.Assert(compareStoreScore(s.tc, store1, 1, store3, 2), Equals, -1) } func (s *testReplicationSuite) newStoreInfo(id uint64, regionCount int, labels map[string]string) *core.StoreInfo { @@ -91,5 +99,8 @@ func (s *testReplicationSuite) newStoreInfo(id uint64, regionCount int, labels m }) store.RegionCount = regionCount store.RegionSize = int64(regionCount) * 10 + store.Stats = &pdpb.StoreStats{} + store.Stats.Capacity = uint64(1024) + store.Stats.Available = store.Stats.Capacity return store } diff --git a/server/schedule/scheduler.go b/server/schedule/scheduler.go index 24670b13aa7..f08b3f6e022 100644 --- a/server/schedule/scheduler.go +++ b/server/schedule/scheduler.go @@ -35,7 +35,6 @@ type Cluster interface { GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo GetLeaderStore(region *core.RegionInfo) *core.StoreInfo GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo) - GetStoresAverageScore(kind core.ResourceKind, filters ...Filter) float64 ScanRegions(startKey []byte, limit int) []*core.RegionInfo BlockStore(id uint64) error diff --git a/server/schedule/selector.go b/server/schedule/selector.go index e2e9c76fe19..2bad351a40c 100644 --- a/server/schedule/selector.go +++ b/server/schedule/selector.go @@ -48,7 +48,9 @@ func (s *balanceSelector) SelectSource(opt Options, stores []*core.StoreInfo, fi if FilterSource(opt, store, filters) { continue } - if result == nil || result.ResourceScore(s.kind) < store.ResourceScore(s.kind) { + if result == nil || + result.ResourceScore(s.kind, opt.GetHighSpaceRatio(), opt.GetLowSpaceRatio(), 0) < + store.ResourceScore(s.kind, opt.GetHighSpaceRatio(), opt.GetLowSpaceRatio(), 0) { result = store } } @@ -63,7 +65,9 @@ func (s *balanceSelector) SelectTarget(opt Options, stores []*core.StoreInfo, fi if FilterTarget(opt, store, filters) { continue } - if result == nil || result.ResourceScore(s.kind) > store.ResourceScore(s.kind) { + if result == nil || + result.ResourceScore(s.kind, opt.GetHighSpaceRatio(), opt.GetLowSpaceRatio(), 0) > + store.ResourceScore(s.kind, opt.GetHighSpaceRatio(), opt.GetLowSpaceRatio(), 0) { 
result = store } } @@ -100,7 +104,7 @@ func (s *replicaSelector) SelectSource(opt Options, stores []*core.StoreInfo, fi continue } score := DistinctScore(s.labels, s.regionStores, store) - if best == nil || compareStoreScore(store, score, best, bestScore) < 0 { + if best == nil || compareStoreScore(opt, store, score, best, bestScore) < 0 { best, bestScore = store, score } } @@ -120,7 +124,7 @@ func (s *replicaSelector) SelectTarget(opt Options, stores []*core.StoreInfo, fi continue } score := DistinctScore(s.labels, s.regionStores, store) - if best == nil || compareStoreScore(store, score, best, bestScore) > 0 { + if best == nil || compareStoreScore(opt, store, score, best, bestScore) > 0 { best, bestScore = store, score } } diff --git a/server/schedulers/adjacent_region.go b/server/schedulers/adjacent_region.go index e20c378d13f..2032ac4f1e4 100644 --- a/server/schedulers/adjacent_region.go +++ b/server/schedulers/adjacent_region.go @@ -88,7 +88,6 @@ func newBalanceAdjacentRegionScheduler(limiter *schedule.Limiter, args ...uint64 schedule.NewStateFilter(), schedule.NewHealthFilter(), schedule.NewSnapshotCountFilter(), - schedule.NewStorageThresholdFilter(), schedule.NewPendingPeerCountFilter(), schedule.NewRejectLeaderFilter(), } diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go index 1c8cadce579..8d7963913ff 100644 --- a/server/schedulers/balance_leader.go +++ b/server/schedulers/balance_leader.go @@ -14,6 +14,7 @@ package schedulers import ( + "fmt" "strconv" "github.com/pingcap/pd/server/cache" @@ -148,23 +149,25 @@ func (l *balanceLeaderScheduler) transferLeaderIn(target *core.StoreInfo, cluste } func (l *balanceLeaderScheduler) createOperator(region *core.RegionInfo, source, target *core.StoreInfo, cluster schedule.Cluster, opInfluence schedule.OpInfluence) []*schedule.Operator { - log.Debugf("[%s] verify balance region %d, from: %d, to: %d", l.GetName(), region.GetId(), source.GetId(), target.GetId()) if cluster.IsRegionHot(region.GetId()) { log.Debugf("[%s] region %d is hot region, ignore it", l.GetName(), region.GetId()) schedulerCounter.WithLabelValues(l.GetName(), "region_hot").Inc() return nil } - sourceSize := source.LeaderSize + int64(opInfluence.GetStoreInfluence(source.GetId()).LeaderSize) - targetSize := target.LeaderSize + int64(opInfluence.GetStoreInfluence(target.GetId()).LeaderSize) - regionSize := float64(region.ApproximateSize) * cluster.GetTolerantSizeRatio() - if !shouldBalance(sourceSize, source.LeaderWeight, targetSize, target.LeaderWeight, regionSize) { - log.Debugf("[%s] skip balance region%d, source size: %v, source weight: %v, target size: %v, target weight: %v, region size: %v", l.GetName(), region.GetId(), sourceSize, source.LeaderWeight, targetSize, target.LeaderWeight, region.ApproximateSize) + + if !shouldBalance(cluster, source, target, core.LeaderKind, region, opInfluence) { + log.Debugf(`[%s] skip balance region%d, source size: %v, source score: %v, source influence: %v, + target size: %v, target score: %v, target influence: %v, region size: %v`, l.GetName(), region.GetId(), + source.LeaderSize, source.LeaderScore(0), opInfluence.GetStoreInfluence(source.GetId()).ResourceSize(core.LeaderKind), + target.LeaderSize, target.LeaderScore(0), opInfluence.GetStoreInfluence(target.GetId()).ResourceSize(core.LeaderKind), + region.ApproximateSize) schedulerCounter.WithLabelValues(l.GetName(), "skip").Inc() return nil } + schedulerCounter.WithLabelValues(l.GetName(), "new_operator").Inc() + 
balanceLeaderCounter.WithLabelValues("move_leader", fmt.Sprintf("store%d-to-store%d", source.GetId(), target.GetId())).Inc() step := schedule.TransferLeader{FromStore: region.Leader.GetStoreId(), ToStore: target.GetId()} - log.Debugf("[%s] start balance region %d, from: %d, to: %d", l.GetName(), region.GetId(), source.GetId(), target.GetId()) - op := schedule.NewOperator("balanceLeader", region.GetId(), schedule.OpBalance|schedule.OpLeader, step) + op := schedule.NewOperator("balance-leader", region.GetId(), schedule.OpBalance|schedule.OpLeader, step) return []*schedule.Operator{op} } diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go index c903ef39bd9..3f3fdca73e5 100644 --- a/server/schedulers/balance_region.go +++ b/server/schedulers/balance_region.go @@ -14,6 +14,7 @@ package schedulers import ( + "fmt" "strconv" "github.com/pingcap/kvproto/pkg/metapb" @@ -47,7 +48,6 @@ func newBalanceRegionScheduler(limiter *schedule.Limiter) schedule.Scheduler { schedule.NewStateFilter(), schedule.NewHealthFilter(), schedule.NewSnapshotCountFilter(), - schedule.NewStorageThresholdFilter(), schedule.NewPendingPeerCountFilter(), } base := newBaseScheduler(limiter) @@ -137,26 +137,30 @@ func (s *balanceRegionScheduler) transferPeer(cluster schedule.Cluster, region * checker := schedule.NewReplicaChecker(cluster, nil) storeID, _ := checker.SelectBestReplacementStore(region, oldPeer, scoreGuard) if storeID == 0 { - schedulerCounter.WithLabelValues(s.GetName(), "no_store").Inc() + schedulerCounter.WithLabelValues(s.GetName(), "no_replacement").Inc() return nil } target := cluster.GetStore(storeID) log.Debugf("[region %d] source store id is %v, target store id is %v", region.GetId(), source.GetId(), target.GetId()) - sourceSize := source.RegionSize + int64(opInfluence.GetStoreInfluence(source.GetId()).RegionSize) - targetSize := target.RegionSize + int64(opInfluence.GetStoreInfluence(target.GetId()).RegionSize) - regionSize := float64(region.ApproximateSize) * cluster.GetTolerantSizeRatio() - if !shouldBalance(sourceSize, source.RegionWeight, targetSize, target.RegionWeight, regionSize) { - log.Debugf("[%s] skip balance region%d, source size: %v, source weight: %v, target size: %v, target weight: %v, region size: %v", s.GetName(), region.GetId(), sourceSize, source.RegionWeight, targetSize, target.RegionWeight, region.ApproximateSize) + if !shouldBalance(cluster, source, target, core.RegionKind, region, opInfluence) { + log.Debugf(`[%s] skip balance region%d, source size: %v, source score: %v, source influence: %v, + target size: %v, target score: %v, target influence: %v, region size: %v`, s.GetName(), region.GetId(), + source.RegionSize, source.RegionScore(cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), 0), + opInfluence.GetStoreInfluence(source.GetId()).ResourceSize(core.RegionKind), + target.RegionSize, target.RegionScore(cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), 0), + opInfluence.GetStoreInfluence(target.GetId()).ResourceSize(core.RegionKind), + region.ApproximateSize) schedulerCounter.WithLabelValues(s.GetName(), "skip").Inc() return nil } + newPeer, err := cluster.AllocPeer(storeID) if err != nil { schedulerCounter.WithLabelValues(s.GetName(), "no_peer").Inc() return nil } - + balanceRegionCounter.WithLabelValues("move_peer", fmt.Sprintf("store%d-to-store%d", source.GetId(), target.GetId())).Inc() return schedule.CreateMovePeerOperator("balance-region", cluster, region, schedule.OpBalance, oldPeer.GetStoreId(), newPeer.GetStoreId(), 
newPeer.GetId()) } diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index f02439f2fbf..3c1cc80d783 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -24,12 +24,7 @@ import ( "github.com/pingcap/pd/server/schedule" ) -func newTestScheduleConfig() *MockSchedulerOptions { - mso := newMockSchedulerOptions() - return mso -} - -func newTestReplication(mso *MockSchedulerOptions, maxReplicas int, locationLabels ...string) { +func newTestReplication(mso *schedule.MockSchedulerOptions, maxReplicas int, locationLabels ...string) { mso.MaxReplicas = maxReplicas mso.LocationLabels = locationLabels } @@ -41,48 +36,72 @@ type testBalanceSpeedSuite struct{} type testBalanceSpeedCase struct { sourceCount uint64 targetCount uint64 - avgScore float64 regionSize int64 - diff int expectedResult bool } func (s *testBalanceSpeedSuite) TestShouldBalance(c *C) { - testCases := []struct { - sourceSize int64 - sourceWeight float64 - targetSize int64 - targetWeight float64 - moveSize float64 - result bool - }{ - {100, 1, 80, 1, 5, true}, - {100, 1, 80, 1, 15, false}, - {100, 1, 120, 2, 10, true}, - {100, 1, 180, 2, 10, false}, - {100, 0.5, 180, 1, 10, false}, - {100, 0.5, 180, 1, 5, true}, - {100, 1, 10, 0, 10, false}, // targetWeight=0 - {100, 0, 10, 0, 10, false}, - {100, 0, 500, 1, 50, true}, // sourceWeight=0 + tests := []testBalanceSpeedCase{ + // all store capacity is 1024MB + // size = count * 10 + + // target size is zero + {2, 0, 1, true}, + {2, 0, 10, false}, + // all in high space stage + {10, 5, 1, true}, + {10, 5, 20, false}, + {10, 10, 1, false}, + {10, 10, 20, false}, + // all in transition stage + {70, 50, 1, true}, + {70, 50, 50, false}, + {70, 70, 1, false}, + // all in low space stage + {90, 80, 1, true}, + {90, 80, 50, false}, + {90, 90, 1, false}, + // one in high space stage, other in transition stage + {65, 55, 5, true}, + {65, 50, 50, false}, + // one in transition space stage, other in low space stage + {80, 70, 5, true}, + {80, 70, 50, false}, + } + + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) + + for _, t := range tests { + tc.AddLeaderStore(1, int(t.sourceCount)) + tc.AddLeaderStore(2, int(t.targetCount)) + source := tc.GetStore(1) + target := tc.GetStore(2) + region := &core.RegionInfo{ApproximateSize: t.regionSize} + c.Assert(shouldBalance(tc, source, target, core.LeaderKind, region, schedule.NewOpInfluence(nil, tc)), Equals, t.expectedResult) } - for _, t := range testCases { - c.Assert(shouldBalance(t.sourceSize, t.sourceWeight, t.targetSize, t.targetWeight, t.moveSize), Equals, t.result) + for _, t := range tests { + tc.AddRegionStore(1, int(t.sourceCount)) + tc.AddRegionStore(2, int(t.targetCount)) + source := tc.GetStore(1) + target := tc.GetStore(2) + region := &core.RegionInfo{ApproximateSize: t.regionSize} + c.Assert(shouldBalance(tc, source, target, core.RegionKind, region, schedule.NewOpInfluence(nil, tc)), Equals, t.expectedResult) } } func (s *testBalanceSpeedSuite) TestBalanceLimit(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) - tc.addLeaderStore(1, 10) - tc.addLeaderStore(2, 20) - tc.addLeaderStore(3, 30) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) + tc.AddLeaderStore(1, 10) + tc.AddLeaderStore(2, 20) + tc.AddLeaderStore(3, 30) // StandDeviation is sqrt((10^2+0+10^2)/3). 
c.Assert(adjustBalanceLimit(tc, core.LeaderKind), Equals, uint64(math.Sqrt(200.0/3.0))) - tc.setStoreOffline(1) + tc.SetStoreOffline(1) // StandDeviation is sqrt((5^2+5^2)/2). c.Assert(adjustBalanceLimit(tc, core.LeaderKind), Equals, uint64(math.Sqrt(50.0/2.0))) } @@ -90,13 +109,13 @@ func (s *testBalanceSpeedSuite) TestBalanceLimit(c *C) { var _ = Suite(&testBalanceLeaderSchedulerSuite{}) type testBalanceLeaderSchedulerSuite struct { - tc *mockCluster + tc *schedule.MockCluster lb schedule.Scheduler } func (s *testBalanceLeaderSchedulerSuite) SetUpTest(c *C) { - opt := newTestScheduleConfig() - s.tc = newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + s.tc = schedule.NewMockCluster(opt) lb, err := schedule.CreateScheduler("balance-leader", schedule.NewLimiter()) c.Assert(err, IsNil) s.lb = lb @@ -110,33 +129,33 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceLimit(c *C) { // Stores: 1 2 3 4 // Leaders: 1 0 0 0 // Region1: L F F F - s.tc.addLeaderStore(1, 1) - s.tc.addLeaderStore(2, 0) - s.tc.addLeaderStore(3, 0) - s.tc.addLeaderStore(4, 0) - s.tc.addLeaderRegion(1, 1, 2, 3, 4) + s.tc.AddLeaderStore(1, 1) + s.tc.AddLeaderStore(2, 0) + s.tc.AddLeaderStore(3, 0) + s.tc.AddLeaderStore(4, 0) + s.tc.AddLeaderRegion(1, 1, 2, 3, 4) c.Check(s.schedule(nil), IsNil) // Stores: 1 2 3 4 // Leaders: 16 0 0 0 // Region1: L F F F - s.tc.updateLeaderCount(1, 16) + s.tc.UpdateLeaderCount(1, 16) c.Check(s.schedule(nil), NotNil) // Stores: 1 2 3 4 // Leaders: 7 8 9 10 // Region1: F F F L - s.tc.updateLeaderCount(1, 7) - s.tc.updateLeaderCount(2, 8) - s.tc.updateLeaderCount(3, 9) - s.tc.updateLeaderCount(4, 10) - s.tc.addLeaderRegion(1, 4, 1, 2, 3) + s.tc.UpdateLeaderCount(1, 7) + s.tc.UpdateLeaderCount(2, 8) + s.tc.UpdateLeaderCount(3, 9) + s.tc.UpdateLeaderCount(4, 10) + s.tc.AddLeaderRegion(1, 4, 1, 2, 3) c.Check(s.schedule(nil), IsNil) // Stores: 1 2 3 4 // Leaders: 7 8 9 16 // Region1: F F F L - s.tc.updateLeaderCount(4, 16) + s.tc.UpdateLeaderCount(4, 16) c.Check(s.schedule(nil), NotNil) } @@ -144,11 +163,11 @@ func (s *testBalanceLeaderSchedulerSuite) TestScheduleWithOpInfluence(c *C) { // Stores: 1 2 3 4 // Leaders: 7 8 9 14 // Region1: F F F L - s.tc.addLeaderStore(1, 7) - s.tc.addLeaderStore(2, 8) - s.tc.addLeaderStore(3, 9) - s.tc.addLeaderStore(4, 14) - s.tc.addLeaderRegion(1, 4, 1, 2, 3) + s.tc.AddLeaderStore(1, 7) + s.tc.AddLeaderStore(2, 8) + s.tc.AddLeaderStore(3, 9) + s.tc.AddLeaderStore(4, 14) + s.tc.AddLeaderRegion(1, 4, 1, 2, 3) op := s.schedule(nil)[0] c.Check(op, NotNil) // After considering the scheduled operator, leaders of store1 and store4 are 8 @@ -159,11 +178,11 @@ func (s *testBalanceLeaderSchedulerSuite) TestScheduleWithOpInfluence(c *C) { // Stores: 1 2 3 4 // Leaders: 8 8 9 13 // Region1: F F F L - s.tc.updateLeaderCount(1, 8) - s.tc.updateLeaderCount(2, 8) - s.tc.updateLeaderCount(3, 9) - s.tc.updateLeaderCount(4, 13) - s.tc.addLeaderRegion(1, 4, 1, 2, 3) + s.tc.UpdateLeaderCount(1, 8) + s.tc.UpdateLeaderCount(2, 8) + s.tc.UpdateLeaderCount(3, 9) + s.tc.UpdateLeaderCount(4, 13) + s.tc.AddLeaderRegion(1, 4, 1, 2, 3) c.Check(s.schedule(nil), IsNil) } @@ -171,27 +190,27 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceFilter(c *C) { // Stores: 1 2 3 4 // Leaders: 1 2 3 16 // Region1: F F F L - s.tc.addLeaderStore(1, 1) - s.tc.addLeaderStore(2, 2) - s.tc.addLeaderStore(3, 3) - s.tc.addLeaderStore(4, 16) - s.tc.addLeaderRegion(1, 4, 1, 2, 3) + s.tc.AddLeaderStore(1, 1) + s.tc.AddLeaderStore(2, 2) + s.tc.AddLeaderStore(3, 3) + s.tc.AddLeaderStore(4, 16) 
+ s.tc.AddLeaderRegion(1, 4, 1, 2, 3) CheckTransferLeader(c, s.schedule(nil)[0], schedule.OpBalance, 4, 1) // Test stateFilter. // if store 4 is offline, we schould consider it // because it still provides services - s.tc.setStoreOffline(4) + s.tc.SetStoreOffline(4) CheckTransferLeader(c, s.schedule(nil)[0], schedule.OpBalance, 4, 1) // If store 1 is down, it will be filtered, // store 2 becomes the store with least leaders. - s.tc.setStoreDown(1) + s.tc.SetStoreDown(1) CheckTransferLeader(c, s.schedule(nil)[0], schedule.OpBalance, 4, 2) // Test healthFilter. // If store 2 is busy, it will be filtered, // store 3 becomes the store with least leaders. - s.tc.setStoreBusy(2, true) + s.tc.SetStoreBusy(2, true) CheckTransferLeader(c, s.schedule(nil)[0], schedule.OpBalance, 4, 3) } @@ -201,17 +220,17 @@ func (s *testBalanceLeaderSchedulerSuite) TestLeaderWeight(c *C) { // Weight: 0.5 0.9 1 2 // Region1: L F F F - s.tc.addLeaderStore(1, 10) - s.tc.addLeaderStore(2, 10) - s.tc.addLeaderStore(3, 10) - s.tc.addLeaderStore(4, 10) - s.tc.updateStoreLeaderWeight(1, 0.5) - s.tc.updateStoreLeaderWeight(2, 0.9) - s.tc.updateStoreLeaderWeight(3, 1) - s.tc.updateStoreLeaderWeight(4, 2) - s.tc.addLeaderRegion(1, 1, 2, 3, 4) + s.tc.AddLeaderStore(1, 10) + s.tc.AddLeaderStore(2, 10) + s.tc.AddLeaderStore(3, 10) + s.tc.AddLeaderStore(4, 10) + s.tc.UpdateStoreLeaderWeight(1, 0.5) + s.tc.UpdateStoreLeaderWeight(2, 0.9) + s.tc.UpdateStoreLeaderWeight(3, 1) + s.tc.UpdateStoreLeaderWeight(4, 2) + s.tc.AddLeaderRegion(1, 1, 2, 3, 4) CheckTransferLeader(c, s.schedule(nil)[0], schedule.OpBalance, 1, 4) - s.tc.updateLeaderCount(4, 30) + s.tc.UpdateLeaderCount(4, 30) CheckTransferLeader(c, s.schedule(nil)[0], schedule.OpBalance, 1, 3) } @@ -220,12 +239,12 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) { // Leaders: 1 2 3 16 // Region1: - F F L // Region2: F F L - - s.tc.addLeaderStore(1, 1) - s.tc.addLeaderStore(2, 2) - s.tc.addLeaderStore(3, 3) - s.tc.addLeaderStore(4, 16) - s.tc.addLeaderRegion(1, 4, 2, 3) - s.tc.addLeaderRegion(2, 3, 1, 2) + s.tc.AddLeaderStore(1, 1) + s.tc.AddLeaderStore(2, 2) + s.tc.AddLeaderStore(3, 3) + s.tc.AddLeaderStore(4, 16) + s.tc.AddLeaderRegion(1, 4, 2, 3) + s.tc.AddLeaderRegion(2, 3, 1, 2) // store4 has max leader score, store1 has min leader score. // The scheduler try to move a leader out of 16 first. CheckTransferLeader(c, s.schedule(nil)[0], schedule.OpBalance, 4, 2) @@ -234,8 +253,8 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) { // Leaders: 1 14 15 16 // Region1: - F F L // Region2: F F L - - s.tc.updateLeaderCount(2, 14) - s.tc.updateLeaderCount(3, 15) + s.tc.UpdateLeaderCount(2, 14) + s.tc.UpdateLeaderCount(3, 15) // Cannot move leader out of store4, move a leader into store1. CheckTransferLeader(c, s.schedule(nil)[0], schedule.OpBalance, 3, 1) @@ -243,9 +262,9 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) { // Leaders: 1 2 15 16 // Region1: - F L F // Region2: L F F - - s.tc.addLeaderStore(2, 2) - s.tc.addLeaderRegion(1, 3, 2, 4) - s.tc.addLeaderRegion(2, 1, 2, 3) + s.tc.AddLeaderStore(2, 2) + s.tc.AddLeaderRegion(1, 3, 2, 4) + s.tc.AddLeaderRegion(2, 1, 2, 3) // No leader in store16, no follower in store1. No operator is created. c.Assert(s.schedule(nil), IsNil) // store4 and store1 are marked taint. 
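Aside: the leader-balance cases above (TestScheduleWithOpInfluence and TestLeaderWeight) turn on two pieces of arithmetic. A store's leader score is its leader size (plain leader counts in these mock tests) divided by its leader weight, and pending operators are folded in as an influence delta so the scheduler does not emit a second operator for work already in flight. The sketch below replays those numbers; leaderScore is an illustrative helper for this note, not the core.StoreInfo.LeaderScore implementation that the rest of this diff calls with a delta argument.

package main

import "fmt"

// leaderScore sketches the idea behind core.StoreInfo.LeaderScore(delta):
// the store's leader size, adjusted by a pending-operator delta, divided by
// the store's leader weight. Illustrative only, not the real implementation.
func leaderScore(leaders int64, weight float64, delta int64) float64 {
	return float64(leaders+delta) / weight
}

func main() {
	// TestLeaderWeight: 10 leaders on every store, weights 0.5, 0.9, 1, 2.
	// Scores come out as 20, 11.1, 10 and 5, so a leader is moved from
	// store1 (highest score) to store4 (lowest score).
	for i, w := range []float64{0.5, 0.9, 1, 2} {
		fmt.Printf("store%d: %.1f\n", i+1, leaderScore(10, w, 0))
	}

	// TestScheduleWithOpInfluence: leader counts 7, 8, 9, 14. A pending
	// transfer from store4 to store1 counts as +1 influence on store1 and
	// -1 on store4, so the scheduler effectively sees 8 and 13; the
	// follow-up case with literal counts 8, 8, 9, 13 shows that this
	// distribution needs no further operator.
	fmt.Println(leaderScore(7, 1, +1), leaderScore(14, 1, -1)) // 8 13
}

Run as-is it prints 20, 11.1, 10 and 5 for the weighted case, matching the expected 1 -> 4 transfer above.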
@@ -256,12 +275,12 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) { // Leaders: 9 10 10 11 // Region1: - F F L // Region2: L F F - - s.tc.addLeaderStore(1, 10) - s.tc.addLeaderStore(2, 10) - s.tc.addLeaderStore(3, 10) - s.tc.addLeaderStore(4, 10) - s.tc.addLeaderRegion(1, 4, 2, 3) - s.tc.addLeaderRegion(2, 1, 2, 3) + s.tc.AddLeaderStore(1, 10) + s.tc.AddLeaderStore(2, 10) + s.tc.AddLeaderStore(3, 10) + s.tc.AddLeaderStore(4, 10) + s.tc.AddLeaderRegion(1, 4, 2, 3) + s.tc.AddLeaderRegion(2, 1, 2, 3) // The cluster is balanced. c.Assert(s.schedule(nil), IsNil) // store1, store4 are marked taint. c.Assert(s.schedule(nil), IsNil) // store2, store3 are marked taint. @@ -271,10 +290,10 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) { // Leaders: 11 13 0 16 // Region1: - F F L // Region2: L F F - - s.tc.addLeaderStore(1, 11) - s.tc.addLeaderStore(2, 13) - s.tc.addLeaderStore(3, 0) - s.tc.addLeaderStore(4, 16) + s.tc.AddLeaderStore(1, 11) + s.tc.AddLeaderStore(2, 13) + s.tc.AddLeaderStore(3, 0) + s.tc.AddLeaderStore(4, 16) c.Assert(s.schedule(nil), IsNil) // All stores are marked taint. CheckTransferLeader(c, s.schedule(nil)[0], schedule.OpBalance, 4, 3) // The taint store will be clear. } @@ -284,8 +303,8 @@ var _ = Suite(&testBalanceRegionSchedulerSuite{}) type testBalanceRegionSchedulerSuite struct{} func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) sb, err := schedule.CreateScheduler("balance-region", schedule.NewLimiter()) c.Assert(err, IsNil) @@ -294,17 +313,17 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) { opt.SetMaxReplicas(1) // Add stores 1,2,3,4. - tc.addRegionStore(1, 6) - tc.addRegionStore(2, 8) - tc.addRegionStore(3, 8) - tc.addRegionStore(4, 16) + tc.AddRegionStore(1, 6) + tc.AddRegionStore(2, 8) + tc.AddRegionStore(3, 8) + tc.AddRegionStore(4, 16) // Add region 1 with leader in store 4. - tc.addLeaderRegion(1, 4) + tc.AddLeaderRegion(1, 4) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 4, 1) // Test stateFilter. - tc.setStoreOffline(1) - tc.updateRegionCount(2, 6) + tc.SetStoreOffline(1) + tc.UpdateRegionCount(2, 6) cache.Remove(4) // When store 1 is offline, it will be filtered, // store 2 becomes the store with least regions. @@ -318,8 +337,8 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) { } func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) newTestReplication(opt, 3, "zone", "rack", "host") @@ -328,113 +347,113 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) { cache := sb.(*balanceRegionScheduler).taintStores // Store 1 has the largest region score, so the balancer try to replace peer in store 1. 
- tc.addLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) - tc.addLabelsStore(2, 15, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) - tc.addLabelsStore(3, 14, map[string]string{"zone": "z1", "rack": "r2", "host": "h2"}) + tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(2, 15, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) + tc.AddLabelsStore(3, 14, map[string]string{"zone": "z1", "rack": "r2", "host": "h2"}) - tc.addLeaderRegion(1, 1, 2, 3) + tc.AddLeaderRegion(1, 1, 2, 3) // This schedule try to replace peer in store 1, but we have no other stores, // so store 1 will be set in the cache and skipped next schedule. c.Assert(sb.Schedule(tc, schedule.NewOpInfluence(nil, tc)), IsNil) c.Assert(cache.Exists(1), IsTrue) // Store 4 has smaller region score than store 2. - tc.addLabelsStore(4, 2, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) + tc.AddLabelsStore(4, 2, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 2, 4) // Store 5 has smaller region score than store 1. - tc.addLabelsStore(5, 2, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(5, 2, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) cache.Remove(1) // Delete store 1 from cache, or it will be skipped. CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 1, 5) // Store 6 has smaller region score than store 5. - tc.addLabelsStore(6, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(6, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 1, 6) // Store 7 has smaller region score with store 6. - tc.addLabelsStore(7, 0, map[string]string{"zone": "z1", "rack": "r1", "host": "h2"}) + tc.AddLabelsStore(7, 0, map[string]string{"zone": "z1", "rack": "r1", "host": "h2"}) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 1, 7) // If store 7 is not available, will choose store 6. - tc.setStoreDown(7) + tc.SetStoreDown(7) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 1, 6) // Store 8 has smaller region score than store 7, but the distinct score decrease. - tc.addLabelsStore(8, 1, map[string]string{"zone": "z1", "rack": "r2", "host": "h3"}) + tc.AddLabelsStore(8, 1, map[string]string{"zone": "z1", "rack": "r2", "host": "h3"}) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 1, 6) // Take down 4,5,6,7 - tc.setStoreDown(4) - tc.setStoreDown(5) - tc.setStoreDown(6) - tc.setStoreDown(7) + tc.SetStoreDown(4) + tc.SetStoreDown(5) + tc.SetStoreDown(6) + tc.SetStoreDown(7) c.Assert(sb.Schedule(tc, schedule.NewOpInfluence(nil, tc)), IsNil) c.Assert(cache.Exists(1), IsTrue) cache.Remove(1) // Store 9 has different zone with other stores but larger region score than store 1. 
- tc.addLabelsStore(9, 20, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(9, 20, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) c.Assert(sb.Schedule(tc, schedule.NewOpInfluence(nil, tc)), IsNil) } func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) newTestReplication(opt, 5, "zone", "rack", "host") sb, err := schedule.CreateScheduler("balance-region", schedule.NewLimiter()) c.Assert(err, IsNil) - tc.addLabelsStore(1, 4, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) - tc.addLabelsStore(2, 5, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) - tc.addLabelsStore(3, 6, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"}) - tc.addLabelsStore(4, 7, map[string]string{"zone": "z4", "rack": "r1", "host": "h1"}) - tc.addLabelsStore(5, 28, map[string]string{"zone": "z5", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(1, 4, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(2, 5, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(3, 6, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(4, 7, map[string]string{"zone": "z4", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(5, 28, map[string]string{"zone": "z5", "rack": "r1", "host": "h1"}) - tc.addLeaderRegion(1, 1, 2, 3, 4, 5) + tc.AddLeaderRegion(1, 1, 2, 3, 4, 5) // Store 6 has smaller region score. - tc.addLabelsStore(6, 1, map[string]string{"zone": "z5", "rack": "r2", "host": "h1"}) + tc.AddLabelsStore(6, 1, map[string]string{"zone": "z5", "rack": "r2", "host": "h1"}) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 5, 6) // Store 7 has larger region score and same distinct score with store 6. - tc.addLabelsStore(7, 5, map[string]string{"zone": "z6", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(7, 5, map[string]string{"zone": "z6", "rack": "r1", "host": "h1"}) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 5, 6) // Store 1 has smaller region score and higher distinct score. - tc.addLeaderRegion(1, 2, 3, 4, 5, 6) + tc.AddLeaderRegion(1, 2, 3, 4, 5, 6) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 5, 1) // Store 6 has smaller region score and higher distinct score. 
- tc.addLabelsStore(11, 29, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) - tc.addLabelsStore(12, 8, map[string]string{"zone": "z2", "rack": "r2", "host": "h1"}) - tc.addLabelsStore(13, 7, map[string]string{"zone": "z3", "rack": "r2", "host": "h1"}) - tc.addLeaderRegion(1, 2, 3, 11, 12, 13) + tc.AddLabelsStore(11, 29, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) + tc.AddLabelsStore(12, 8, map[string]string{"zone": "z2", "rack": "r2", "host": "h1"}) + tc.AddLabelsStore(13, 7, map[string]string{"zone": "z3", "rack": "r2", "host": "h1"}) + tc.AddLeaderRegion(1, 2, 3, 11, 12, 13) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 11, 6) } func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) sb, err := schedule.CreateScheduler("balance-region", schedule.NewLimiter()) c.Assert(err, IsNil) opt.SetMaxReplicas(1) - tc.addRegionStore(1, 10) - tc.addRegionStore(2, 10) - tc.addRegionStore(3, 10) - tc.addRegionStore(4, 10) - tc.updateStoreRegionWeight(1, 0.5) - tc.updateStoreRegionWeight(2, 0.9) - tc.updateStoreRegionWeight(3, 1.0) - tc.updateStoreRegionWeight(4, 2.0) + tc.AddRegionStore(1, 10) + tc.AddRegionStore(2, 10) + tc.AddRegionStore(3, 10) + tc.AddRegionStore(4, 10) + tc.UpdateStoreRegionWeight(1, 0.5) + tc.UpdateStoreRegionWeight(2, 0.9) + tc.UpdateStoreRegionWeight(3, 1.0) + tc.UpdateStoreRegionWeight(4, 2.0) - tc.addLeaderRegion(1, 1) + tc.AddLeaderRegion(1, 1) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 1, 4) - tc.updateRegionCount(4, 30) + tc.UpdateRegionCount(4, 30) CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 1, 3) } @@ -443,20 +462,20 @@ var _ = Suite(&testReplicaCheckerSuite{}) type testReplicaCheckerSuite struct{} func (s *testReplicaCheckerSuite) TestBasic(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) rc := schedule.NewReplicaChecker(tc, namespace.DefaultClassifier) opt.MaxSnapshotCount = 2 // Add stores 1,2,3,4. - tc.addRegionStore(1, 4) - tc.addRegionStore(2, 3) - tc.addRegionStore(3, 2) - tc.addRegionStore(4, 1) + tc.AddRegionStore(1, 4) + tc.AddRegionStore(2, 3) + tc.AddRegionStore(3, 2) + tc.AddRegionStore(4, 1) // Add region 1 with leader in store 1 and follower in store 2. - tc.addLeaderRegion(1, 1, 2) + tc.AddLeaderRegion(1, 1, 2) // Region has 2 peers, we need to add a new peer. region := tc.GetRegion(1) @@ -464,27 +483,17 @@ func (s *testReplicaCheckerSuite) TestBasic(c *C) { // Test healthFilter. // If store 4 is down, we add to store 3. - tc.setStoreDown(4) + tc.SetStoreDown(4) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 3) - tc.setStoreUp(4) + tc.SetStoreUp(4) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 4) // Test snapshotCountFilter. // If snapshotCount > MaxSnapshotCount, we add to store 3. - tc.updateSnapshotCount(4, 3) + tc.UpdateSnapshotCount(4, 3) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 3) // If snapshotCount < MaxSnapshotCount, we can add peer again. - tc.updateSnapshotCount(4, 1) - CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 4) - - // Test storageThresholdFilter. - // If availableRatio < storageAvailableRatioThreshold(0.2), we can not add peer. 
- tc.updateStorageRatio(4, 0.9, 0.1) - CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 3) - tc.updateStorageRatio(4, 0.5, 0.1) - CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 3) - // If availableRatio > storageAvailableRatioThreshold(0.2), we can add peer again. - tc.updateStorageRatio(4, 0.7, 0.3) + tc.UpdateSnapshotCount(4, 1) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 4) // Add peer in store 4, and we have enough replicas. @@ -499,7 +508,7 @@ func (s *testReplicaCheckerSuite) TestBasic(c *C) { region.RemoveStorePeer(1) // Peer in store 2 is down, remove it. - tc.setStoreDown(2) + tc.SetStoreDown(2) downPeer := &pdpb.PeerStats{ Peer: region.GetStorePeer(2), DownSeconds: 24 * 60 * 60, @@ -510,42 +519,42 @@ func (s *testReplicaCheckerSuite) TestBasic(c *C) { c.Assert(rc.Check(region), IsNil) // Peer in store 3 is offline, transfer peer to store 1. - tc.setStoreOffline(3) + tc.SetStoreOffline(3) CheckTransferPeer(c, rc.Check(region), schedule.OpReplica, 3, 1) } func (s *testReplicaCheckerSuite) TestLostStore(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) - tc.addRegionStore(1, 1) - tc.addRegionStore(2, 1) + tc.AddRegionStore(1, 1) + tc.AddRegionStore(2, 1) rc := schedule.NewReplicaChecker(tc, namespace.DefaultClassifier) // now region peer in store 1,2,3.but we just have store 1,2 // This happens only in recovering the PD tc // should not panic - tc.addLeaderRegion(1, 1, 2, 3) + tc.AddLeaderRegion(1, 1, 2, 3) region := tc.GetRegion(1) op := rc.Check(region) c.Assert(op, IsNil) } func (s *testReplicaCheckerSuite) TestOffline(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) newTestReplication(opt, 3, "zone", "rack", "host") rc := schedule.NewReplicaChecker(tc, namespace.DefaultClassifier) - tc.addLabelsStore(1, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) - tc.addLabelsStore(2, 2, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) - tc.addLabelsStore(3, 3, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"}) - tc.addLabelsStore(4, 4, map[string]string{"zone": "z3", "rack": "r2", "host": "h1"}) + tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(2, 2, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(3, 3, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(4, 4, map[string]string{"zone": "z3", "rack": "r2", "host": "h1"}) - tc.addLeaderRegion(1, 1) + tc.AddLeaderRegion(1, 1) region := tc.GetRegion(1) // Store 2 has different zone and smallest region score. @@ -564,15 +573,15 @@ func (s *testReplicaCheckerSuite) TestOffline(c *C) { checkRemovePeer(c, rc.Check(region), 4) // Test healthFilter. 
- tc.setStoreBusy(4, true) + tc.SetStoreBusy(4, true) c.Assert(rc.Check(region), IsNil) - tc.setStoreBusy(4, false) + tc.SetStoreBusy(4, false) checkRemovePeer(c, rc.Check(region), 4) // Test offline // the number of region peers more than the maxReplicas // remove the peer - tc.setStoreOffline(3) + tc.SetStoreOffline(3) checkRemovePeer(c, rc.Check(region), 3) region.RemoveStorePeer(4) // the number of region peers equals the maxReplicas @@ -580,57 +589,57 @@ func (s *testReplicaCheckerSuite) TestOffline(c *C) { CheckTransferPeer(c, rc.Check(region), schedule.OpReplica, 3, 4) // Store 5 has a same label score with store 4,but the region score smaller than store 4, we will choose store 5. - tc.addLabelsStore(5, 3, map[string]string{"zone": "z4", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(5, 3, map[string]string{"zone": "z4", "rack": "r1", "host": "h1"}) CheckTransferPeer(c, rc.Check(region), schedule.OpReplica, 3, 5) // Store 5 has too many snapshots, choose store 4 - tc.updateSnapshotCount(5, 10) + tc.UpdateSnapshotCount(5, 10) CheckTransferPeer(c, rc.Check(region), schedule.OpReplica, 3, 4) - tc.updatePendingPeerCount(4, 30) + tc.UpdatePendingPeerCount(4, 30) c.Assert(rc.Check(region), IsNil) } func (s *testReplicaCheckerSuite) TestDistinctScore(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) newTestReplication(opt, 3, "zone", "rack", "host") rc := schedule.NewReplicaChecker(tc, namespace.DefaultClassifier) - tc.addLabelsStore(1, 9, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) - tc.addLabelsStore(2, 8, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(1, 9, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(2, 8, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) // We need 3 replicas. - tc.addLeaderRegion(1, 1) + tc.AddLeaderRegion(1, 1) region := tc.GetRegion(1) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 2) peer2, _ := tc.AllocPeer(2) region.AddPeer(peer2) // Store 1,2,3 have the same zone, rack, and host. - tc.addLabelsStore(3, 5, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(3, 5, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 3) // Store 4 has smaller region score. - tc.addLabelsStore(4, 4, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(4, 4, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 4) // Store 5 has a different host. - tc.addLabelsStore(5, 5, map[string]string{"zone": "z1", "rack": "r1", "host": "h2"}) + tc.AddLabelsStore(5, 5, map[string]string{"zone": "z1", "rack": "r1", "host": "h2"}) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 5) // Store 6 has a different rack. - tc.addLabelsStore(6, 6, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) + tc.AddLabelsStore(6, 6, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 6) // Store 7 has a different zone. - tc.addLabelsStore(7, 7, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(7, 7, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 7) // Test stateFilter. 
- tc.setStoreOffline(7) + tc.SetStoreOffline(7) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 6) - tc.setStoreUp(7) + tc.SetStoreUp(7) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 7) // Add peer to store 7. @@ -648,18 +657,13 @@ func (s *testReplicaCheckerSuite) TestDistinctScore(c *C) { // Store 8 has the same zone and different rack with store 7. // Store 1 has the same zone and different rack with store 6. // So store 8 and store 1 are equivalent. - tc.addLabelsStore(8, 1, map[string]string{"zone": "z2", "rack": "r2", "host": "h1"}) - c.Assert(rc.Check(region), IsNil) - - // Store 9 has a different zone, but it is almost full. - tc.addLabelsStore(9, 1, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"}) - tc.updateStorageRatio(9, 0.9, 0.1) + tc.AddLabelsStore(8, 1, map[string]string{"zone": "z2", "rack": "r2", "host": "h1"}) c.Assert(rc.Check(region), IsNil) // Store 10 has a different zone. // Store 2 and 6 have the same distinct score, but store 2 has larger region score. // So replace peer in store 2 with store 10. - tc.addLabelsStore(10, 1, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"}) + tc.AddLabelsStore(10, 1, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"}) CheckTransferPeer(c, rc.Check(region), schedule.OpReplica, 2, 10) peer10, _ := tc.AllocPeer(10) region.AddPeer(peer10) @@ -669,21 +673,21 @@ func (s *testReplicaCheckerSuite) TestDistinctScore(c *C) { } func (s *testReplicaCheckerSuite) TestDistinctScore2(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) newTestReplication(opt, 5, "zone", "host") rc := schedule.NewReplicaChecker(tc, namespace.DefaultClassifier) - tc.addLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"}) - tc.addLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h2"}) - tc.addLabelsStore(3, 1, map[string]string{"zone": "z1", "host": "h3"}) - tc.addLabelsStore(4, 1, map[string]string{"zone": "z2", "host": "h1"}) - tc.addLabelsStore(5, 1, map[string]string{"zone": "z2", "host": "h2"}) - tc.addLabelsStore(6, 1, map[string]string{"zone": "z3", "host": "h1"}) + tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"}) + tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h2"}) + tc.AddLabelsStore(3, 1, map[string]string{"zone": "z1", "host": "h3"}) + tc.AddLabelsStore(4, 1, map[string]string{"zone": "z2", "host": "h1"}) + tc.AddLabelsStore(5, 1, map[string]string{"zone": "z2", "host": "h2"}) + tc.AddLabelsStore(6, 1, map[string]string{"zone": "z3", "host": "h1"}) - tc.addLeaderRegion(1, 1, 2, 4) + tc.AddLeaderRegion(1, 1, 2, 4) region := tc.GetRegion(1) CheckAddPeer(c, rc.Check(region), schedule.OpReplica, 6) @@ -700,15 +704,15 @@ func (s *testReplicaCheckerSuite) TestDistinctScore2(c *C) { var _ = Suite(&testMergeCheckerSuite{}) type testMergeCheckerSuite struct { - cluster *mockCluster + cluster *schedule.MockCluster mc *schedule.MergeChecker regions []*core.RegionInfo } func (s *testMergeCheckerSuite) SetUpSuite(c *C) { - cfg := newTestScheduleConfig() + cfg := schedule.NewMockSchedulerOptions() cfg.MaxMergeRegionSize = 2 - s.cluster = newMockCluster(cfg) + s.cluster = schedule.NewMockCluster(cfg) s.regions = []*core.RegionInfo{ { Region: &metapb.Region{ @@ -868,30 +872,30 @@ var _ = Suite(&testBalanceHotWriteRegionSchedulerSuite{}) type testBalanceHotWriteRegionSchedulerSuite struct{} func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) { - opt := 
newTestScheduleConfig() + opt := schedule.NewMockSchedulerOptions() newTestReplication(opt, 3, "zone", "host") - tc := newMockCluster(opt) + tc := schedule.NewMockCluster(opt) hb, err := schedule.CreateScheduler("hot-write-region", schedule.NewLimiter()) c.Assert(err, IsNil) // Add stores 1, 2, 3, 4, 5, 6 with region counts 3, 2, 2, 2, 0, 0. - tc.addLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"}) - tc.addLabelsStore(2, 2, map[string]string{"zone": "z2", "host": "h2"}) - tc.addLabelsStore(3, 2, map[string]string{"zone": "z3", "host": "h3"}) - tc.addLabelsStore(4, 2, map[string]string{"zone": "z4", "host": "h4"}) - tc.addLabelsStore(5, 0, map[string]string{"zone": "z2", "host": "h5"}) - tc.addLabelsStore(6, 0, map[string]string{"zone": "z5", "host": "h6"}) - tc.addLabelsStore(7, 0, map[string]string{"zone": "z5", "host": "h7"}) - tc.setStoreDown(7) + tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"}) + tc.AddLabelsStore(2, 2, map[string]string{"zone": "z2", "host": "h2"}) + tc.AddLabelsStore(3, 2, map[string]string{"zone": "z3", "host": "h3"}) + tc.AddLabelsStore(4, 2, map[string]string{"zone": "z4", "host": "h4"}) + tc.AddLabelsStore(5, 0, map[string]string{"zone": "z2", "host": "h5"}) + tc.AddLabelsStore(6, 0, map[string]string{"zone": "z5", "host": "h6"}) + tc.AddLabelsStore(7, 0, map[string]string{"zone": "z5", "host": "h7"}) + tc.SetStoreDown(7) // Report store written bytes. - tc.updateStorageWrittenBytes(1, 75*1024*1024) - tc.updateStorageWrittenBytes(2, 45*1024*1024) - tc.updateStorageWrittenBytes(3, 45*1024*1024) - tc.updateStorageWrittenBytes(4, 60*1024*1024) - tc.updateStorageWrittenBytes(5, 0) - tc.updateStorageWrittenBytes(6, 0) + tc.UpdateStorageWrittenBytes(1, 75*1024*1024) + tc.UpdateStorageWrittenBytes(2, 45*1024*1024) + tc.UpdateStorageWrittenBytes(3, 45*1024*1024) + tc.UpdateStorageWrittenBytes(4, 60*1024*1024) + tc.UpdateStorageWrittenBytes(5, 0) + tc.UpdateStorageWrittenBytes(6, 0) // Region 1, 2 and 3 are hot regions. 
//| region_id | leader_sotre | follower_store | follower_store | written_bytes | @@ -899,9 +903,9 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) { //| 1 | 1 | 2 | 3 | 512KB | //| 2 | 1 | 3 | 4 | 512KB | //| 3 | 1 | 2 | 4 | 512KB | - tc.addLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) - tc.addLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 3, 4) - tc.addLeaderRegionWithWriteInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 4) + tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 3, 4) + tc.AddLeaderRegionWithWriteInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 4) opt.HotRegionLowThreshold = 0 // Will transfer a hot region from store 1, because the total count of peers @@ -930,17 +934,17 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) { //| 3 | 6 | 2 | 4 | 512KB | //| 4 | 5 | 6 | 1 | 512KB | //| 5 | 3 | 4 | 5 | 512KB | - tc.updateStorageWrittenBytes(1, 60*1024*1024) - tc.updateStorageWrittenBytes(2, 30*1024*1024) - tc.updateStorageWrittenBytes(3, 60*1024*1024) - tc.updateStorageWrittenBytes(4, 30*1024*1024) - tc.updateStorageWrittenBytes(5, 0*1024*1024) - tc.updateStorageWrittenBytes(6, 30*1024*1024) - tc.addLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) - tc.addLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) - tc.addLeaderRegionWithWriteInfo(3, 6, 512*1024*schedule.RegionHeartBeatReportInterval, 1, 4) - tc.addLeaderRegionWithWriteInfo(4, 5, 512*1024*schedule.RegionHeartBeatReportInterval, 6, 4) - tc.addLeaderRegionWithWriteInfo(5, 3, 512*1024*schedule.RegionHeartBeatReportInterval, 4, 5) + tc.UpdateStorageWrittenBytes(1, 60*1024*1024) + tc.UpdateStorageWrittenBytes(2, 30*1024*1024) + tc.UpdateStorageWrittenBytes(3, 60*1024*1024) + tc.UpdateStorageWrittenBytes(4, 30*1024*1024) + tc.UpdateStorageWrittenBytes(5, 0*1024*1024) + tc.UpdateStorageWrittenBytes(6, 30*1024*1024) + tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithWriteInfo(3, 6, 512*1024*schedule.RegionHeartBeatReportInterval, 1, 4) + tc.AddLeaderRegionWithWriteInfo(4, 5, 512*1024*schedule.RegionHeartBeatReportInterval, 6, 4) + tc.AddLeaderRegionWithWriteInfo(5, 3, 512*1024*schedule.RegionHeartBeatReportInterval, 4, 5) // We can find that the leader of all hot regions are on store 1, // so one of the leader will transfer to another store. checkTransferLeaderFrom(c, hb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpHotRegion, 1) @@ -957,24 +961,24 @@ var _ = Suite(&testBalanceHotReadRegionSchedulerSuite{}) type testBalanceHotReadRegionSchedulerSuite struct{} func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) hb, err := schedule.CreateScheduler("hot-read-region", schedule.NewLimiter()) c.Assert(err, IsNil) // Add stores 1, 2, 3, 4, 5 with region counts 3, 2, 2, 2, 0. 
- tc.addRegionStore(1, 3) - tc.addRegionStore(2, 2) - tc.addRegionStore(3, 2) - tc.addRegionStore(4, 2) - tc.addRegionStore(5, 0) + tc.AddRegionStore(1, 3) + tc.AddRegionStore(2, 2) + tc.AddRegionStore(3, 2) + tc.AddRegionStore(4, 2) + tc.AddRegionStore(5, 0) // Report store read bytes. - tc.updateStorageReadBytes(1, 75*1024*1024) - tc.updateStorageReadBytes(2, 45*1024*1024) - tc.updateStorageReadBytes(3, 45*1024*1024) - tc.updateStorageReadBytes(4, 60*1024*1024) - tc.updateStorageReadBytes(5, 0) + tc.UpdateStorageReadBytes(1, 75*1024*1024) + tc.UpdateStorageReadBytes(2, 45*1024*1024) + tc.UpdateStorageReadBytes(3, 45*1024*1024) + tc.UpdateStorageReadBytes(4, 60*1024*1024) + tc.UpdateStorageReadBytes(5, 0) // Region 1, 2 and 3 are hot regions. //| region_id | leader_sotre | follower_store | follower_store | read_bytes | @@ -982,11 +986,11 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) { //| 1 | 1 | 2 | 3 | 512KB | //| 2 | 2 | 1 | 3 | 512KB | //| 3 | 1 | 2 | 3 | 512KB | - tc.addLeaderRegionWithReadInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) - tc.addLeaderRegionWithReadInfo(2, 2, 512*1024*schedule.RegionHeartBeatReportInterval, 1, 3) - tc.addLeaderRegionWithReadInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(2, 2, 512*1024*schedule.RegionHeartBeatReportInterval, 1, 3) + tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) // lower than hot read flow rate, but higher than write flow rate - tc.addLeaderRegionWithReadInfo(11, 1, 24*1024*schedule.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(11, 1, 24*1024*schedule.RegionHeartBeatReportInterval, 2, 3) opt.HotRegionLowThreshold = 0 c.Assert(tc.IsRegionHot(1), IsTrue) c.Assert(tc.IsRegionHot(11), IsFalse) @@ -1005,17 +1009,17 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) { // which is hot for store 1 is more larger than other stores. 
CheckTransferLeader(c, hb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpHotRegion, 1, 3) // assume handle the operator - tc.addLeaderRegionWithReadInfo(3, 3, 512*1024*schedule.RegionHeartBeatReportInterval, 1, 2) + tc.AddLeaderRegionWithReadInfo(3, 3, 512*1024*schedule.RegionHeartBeatReportInterval, 1, 2) // After transfer a hot region leader from store 1 to store 3 // the tree region leader will be evenly distributed in three stores - tc.updateStorageReadBytes(1, 60*1024*1024) - tc.updateStorageReadBytes(2, 30*1024*1024) - tc.updateStorageReadBytes(3, 60*1024*1024) - tc.updateStorageReadBytes(4, 30*1024*1024) - tc.updateStorageReadBytes(5, 30*1024*1024) - tc.addLeaderRegionWithReadInfo(4, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) - tc.addLeaderRegionWithReadInfo(5, 4, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 5) + tc.UpdateStorageReadBytes(1, 60*1024*1024) + tc.UpdateStorageReadBytes(2, 30*1024*1024) + tc.UpdateStorageReadBytes(3, 60*1024*1024) + tc.UpdateStorageReadBytes(4, 30*1024*1024) + tc.UpdateStorageReadBytes(5, 30*1024*1024) + tc.AddLeaderRegionWithReadInfo(4, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(5, 4, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 5) // Now appear two read hot region in store 1 and 4 // We will Transfer peer from 1 to 5 diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 9955f267c0d..22e6be4c0d0 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -261,7 +261,6 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto schedule.NewSnapshotCountFilter(), schedule.NewExcludedFilter(srcRegion.GetStoreIds(), srcRegion.GetStoreIds()), schedule.NewDistinctScoreFilter(cluster.GetLocationLabels(), cluster.GetRegionStores(srcRegion), srcStore), - schedule.NewStorageThresholdFilter(), } destStoreIDs := make([]uint64, 0, len(stores)) for _, store := range stores { diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index 7b14d00acc3..1de6eae94dc 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -26,23 +26,23 @@ var _ = Suite(&testShuffleLeaderSuite{}) type testShuffleLeaderSuite struct{} func (s *testShuffleLeaderSuite) TestShuffle(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) sl, err := schedule.CreateScheduler("shuffle-leader", schedule.NewLimiter()) c.Assert(err, IsNil) c.Assert(sl.Schedule(tc, schedule.OpInfluence{}), IsNil) // Add stores 1,2,3,4 - tc.addLeaderStore(1, 6) - tc.addLeaderStore(2, 7) - tc.addLeaderStore(3, 8) - tc.addLeaderStore(4, 9) + tc.AddLeaderStore(1, 6) + tc.AddLeaderStore(2, 7) + tc.AddLeaderStore(3, 8) + tc.AddLeaderStore(4, 9) // Add regions 1,2,3,4 with leaders in stores 1,2,3,4 - tc.addLeaderRegion(1, 1, 2, 3, 4) - tc.addLeaderRegion(2, 2, 3, 4, 1) - tc.addLeaderRegion(3, 3, 4, 1, 2) - tc.addLeaderRegion(4, 4, 1, 2, 3) + tc.AddLeaderRegion(1, 1, 2, 3, 4) + tc.AddLeaderRegion(2, 2, 3, 4, 1) + tc.AddLeaderRegion(3, 3, 4, 1, 2) + tc.AddLeaderRegion(4, 4, 1, 2, 3) for i := 0; i < 4; i++ { op := sl.Schedule(tc, schedule.NewOpInfluence(nil, tc)) @@ -56,26 +56,26 @@ var _ = Suite(&testBalanceAdjacentRegionSuite{}) type testBalanceAdjacentRegionSuite struct{} func (s *testBalanceAdjacentRegionSuite) TestBalance(c *C) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt 
:= schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) sc, err := schedule.CreateScheduler("adjacent-region", schedule.NewLimiter()) c.Assert(err, IsNil) c.Assert(sc.Schedule(tc, schedule.NewOpInfluence(nil, tc)), IsNil) // Add stores 1,2,3,4 - tc.addLeaderStore(1, 5) - tc.addLeaderStore(2, 0) - tc.addLeaderStore(3, 0) - tc.addLeaderStore(4, 0) + tc.AddLeaderStore(1, 5) + tc.AddLeaderStore(2, 0) + tc.AddLeaderStore(3, 0) + tc.AddLeaderStore(4, 0) // Add regions - tc.addLeaderRegionWithRange(1, "", "a", 1, 2, 3) - tc.addLeaderRegionWithRange(2, "a", "b", 1, 2, 3) - tc.addLeaderRegionWithRange(3, "b", "c", 1, 3, 4) - tc.addLeaderRegionWithRange(4, "c", "d", 1, 2, 3) - tc.addLeaderRegionWithRange(5, "e", "f", 1, 2, 3) - tc.addLeaderRegionWithRange(6, "f", "g", 1, 2, 3) - tc.addLeaderRegionWithRange(7, "z", "", 1, 2, 3) + tc.AddLeaderRegionWithRange(1, "", "a", 1, 2, 3) + tc.AddLeaderRegionWithRange(2, "a", "b", 1, 2, 3) + tc.AddLeaderRegionWithRange(3, "b", "c", 1, 3, 4) + tc.AddLeaderRegionWithRange(4, "c", "d", 1, 2, 3) + tc.AddLeaderRegionWithRange(5, "e", "f", 1, 2, 3) + tc.AddLeaderRegionWithRange(6, "f", "g", 1, 2, 3) + tc.AddLeaderRegionWithRange(7, "z", "", 1, 2, 3) // check and do operator // transfer peer from store 1 to 4 for region 1 because the distribution of @@ -83,26 +83,26 @@ func (s *testBalanceAdjacentRegionSuite) TestBalance(c *C) { // to a new store checkTransferPeerWithLeaderTransfer(c, sc.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpAdjacent, 1, 4) // suppose we add peer in store 4, transfer leader to store 2, remove peer in store 1 - tc.addLeaderRegionWithRange(1, "", "a", 2, 3, 4) + tc.AddLeaderRegionWithRange(1, "", "a", 2, 3, 4) // transfer leader from store 1 to store 2 for region 2 because we have a different peer location, // we can directly transfer leader to peer 2. we priority to transfer leader because less overhead CheckTransferLeader(c, sc.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpAdjacent, 1, 2) - tc.addLeaderRegionWithRange(2, "a", "b", 2, 1, 3) + tc.AddLeaderRegionWithRange(2, "a", "b", 2, 1, 3) // transfer leader from store 1 to store 2 for region 3 CheckTransferLeader(c, sc.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpAdjacent, 1, 4) - tc.addLeaderRegionWithRange(3, "b", "c", 4, 1, 3) + tc.AddLeaderRegionWithRange(3, "b", "c", 4, 1, 3) // transfer peer from store 1 to store 4 for region 5 // the region 5 just adjacent the region 6 checkTransferPeerWithLeaderTransfer(c, sc.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpAdjacent, 1, 4) - tc.addLeaderRegionWithRange(5, "e", "f", 2, 3, 4) + tc.AddLeaderRegionWithRange(5, "e", "f", 2, 3, 4) c.Assert(sc.Schedule(tc, schedule.NewOpInfluence(nil, tc)), IsNil) c.Assert(sc.Schedule(tc, schedule.NewOpInfluence(nil, tc)), IsNil) CheckTransferLeader(c, sc.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpAdjacent, 2, 4) - tc.addLeaderRegionWithRange(1, "", "a", 4, 2, 3) + tc.AddLeaderRegionWithRange(1, "", "a", 4, 2, 3) for i := 0; i < 10; i++ { c.Assert(sc.Schedule(tc, schedule.NewOpInfluence(nil, tc)), IsNil) } @@ -141,18 +141,18 @@ func (s *testScatterRegionSuite) TestFiveStores(c *C) { } func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { - opt := newTestScheduleConfig() - tc := newMockCluster(opt) + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) // Add stores 1~6. 
for i := uint64(1); i <= numStores; i++ { - tc.addRegionStore(i, 0) + tc.AddRegionStore(i, 0) } // Add regions 1~4. seq := newSequencer(numStores) for i := uint64(1); i <= numRegions; i++ { - tc.addLeaderRegion(i, seq.next(), seq.next(), seq.next()) + tc.AddLeaderRegion(i, seq.next(), seq.next(), seq.next()) } scatterer := schedule.NewRegionScatterer(tc, namespace.DefaultClassifier) @@ -161,7 +161,7 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { region := tc.GetRegion(i) if op := scatterer.Scatter(region); op != nil { log.Info(op) - tc.applyOperator(op) + tc.ApplyOperator(op) } } @@ -184,19 +184,19 @@ var _ = Suite(&testRejectLeaderSuite{}) type testRejectLeaderSuite struct{} func (s *testRejectLeaderSuite) TestRejectLeader(c *C) { - opt := newTestScheduleConfig() + opt := schedule.NewMockSchedulerOptions() opt.LabelProperties = map[string][]*metapb.StoreLabel{ schedule.RejectLeader: {{Key: "noleader", Value: "true"}}, } - tc := newMockCluster(opt) + tc := schedule.NewMockCluster(opt) // Add 2 stores 1,2. - tc.addLabelsStore(1, 1, map[string]string{"noleader": "true"}) - tc.updateLeaderCount(1, 1) - tc.addLeaderStore(2, 10) + tc.AddLabelsStore(1, 1, map[string]string{"noleader": "true"}) + tc.UpdateLeaderCount(1, 1) + tc.AddLeaderStore(2, 10) // Add 2 regions with leader on 1 and 2. - tc.addLeaderRegion(1, 1, 2) - tc.addLeaderRegion(2, 2, 1) + tc.AddLeaderRegion(1, 1, 2) + tc.AddLeaderRegion(2, 2, 1) // The label scheduler transfers leader out of store1. sl, err := schedule.CreateScheduler("label", schedule.NewLimiter()) diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index e8aff20d6d8..cbfec749af8 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -85,15 +85,14 @@ func minDuration(a, b time.Duration) time.Duration { return b } -func shouldBalance(sourceSize int64, sourceWeight float64, targetSize int64, targetWeight float64, moveSize float64) bool { - if targetWeight == 0 { - return false - } - if sourceWeight == 0 { - return true - } +func shouldBalance(cluster schedule.Cluster, source, target *core.StoreInfo, kind core.ResourceKind, region *core.RegionInfo, opInfluence schedule.OpInfluence) bool { + regionSize := int64(float64(region.ApproximateSize) * cluster.GetTolerantSizeRatio()) + sourceDelta := opInfluence.GetStoreInfluence(source.GetId()).ResourceSize(kind) - regionSize + targetDelta := opInfluence.GetStoreInfluence(target.GetId()).ResourceSize(kind) + regionSize + // Make sure after move, source score is still greater than target score. 
- return (float64(sourceSize)-moveSize)/sourceWeight > (float64(targetSize)+moveSize)/targetWeight + return source.ResourceScore(kind, cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), sourceDelta) > + target.ResourceScore(kind, cluster.GetHighSpaceRatio(), cluster.GetLowSpaceRatio(), targetDelta) } func adjustBalanceLimit(cluster schedule.Cluster, kind core.ResourceKind) uint64 { diff --git a/server/store_statistics.go b/server/store_statistics.go index b4efb9dc46a..ce5e3c7409f 100644 --- a/server/store_statistics.go +++ b/server/store_statistics.go @@ -63,7 +63,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) { s.Tombstone++ return } - if store.IsLowSpace() { + if store.IsLowSpace(s.opt.GetLowSpaceRatio()) { s.LowSpace++ } @@ -74,12 +74,16 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) { s.LeaderCount += store.LeaderCount id := strconv.FormatUint(store.GetId(), 10) - storeStatusGauge.WithLabelValues(s.namespace, id, "region_score").Set(store.RegionScore()) - storeStatusGauge.WithLabelValues(s.namespace, id, "leader_score").Set(store.LeaderScore()) + storeStatusGauge.WithLabelValues(s.namespace, id, "region_score").Set(store.RegionScore(s.opt.GetHighSpaceRatio(), s.opt.GetLowSpaceRatio(), 0)) + storeStatusGauge.WithLabelValues(s.namespace, id, "leader_score").Set(store.LeaderScore(0)) storeStatusGauge.WithLabelValues(s.namespace, id, "region_size").Set(float64(store.RegionSize)) storeStatusGauge.WithLabelValues(s.namespace, id, "region_count").Set(float64(store.RegionCount)) storeStatusGauge.WithLabelValues(s.namespace, id, "leader_size").Set(float64(store.LeaderSize)) storeStatusGauge.WithLabelValues(s.namespace, id, "leader_count").Set(float64(store.LeaderCount)) + storeStatusGauge.WithLabelValues(s.namespace, id, "storage_available").Set(float64(store.Stats.GetAvailable())) + storeStatusGauge.WithLabelValues(s.namespace, id, "storage_capacity").Set(float64(store.Stats.GetCapacity())) + storeStatusGauge.WithLabelValues(s.namespace, id, "storage_used").Set(float64(store.Stats.GetUsedSize())) + storeStatusGauge.WithLabelValues(s.namespace, id, "score_amplify").Set(float64(store.RegionSize) / float64(store.Stats.GetUsedSize()) * (1 << 20)) } func (s *storeStatistics) Collect() {
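The rewritten shouldBalance above is the heart of this change: instead of comparing raw (size ± moved)/weight ratios, it pads the candidate region by the cluster's tolerant-size ratio, folds in-flight operators in through OpInfluence, and allows the move only if the source store still out-scores the target afterwards. Below is a minimal sketch of that decision under simplifying assumptions: the local store type, resourceScore, and this shouldBalance signature are stand-ins invented for the sketch, and resourceScore is reduced to plain size/weight, whereas the real ResourceScore also takes the resource kind plus GetHighSpaceRatio/GetLowSpaceRatio and presumably adjusts the score as a store fills up (that behaviour is not shown in this diff).

package main

import "fmt"

// Illustrative stand-ins for core.StoreInfo and schedule.OpInfluence; the
// field and function names below are assumptions made for this sketch.
type store struct {
	size   int64   // leader or region size, depending on the resource kind
	weight float64 // the matching leader or region weight
}

// resourceScore is a placeholder for core.StoreInfo.ResourceScore: just
// (size + delta) / weight here; the real function also takes the resource
// kind and the high/low space ratios, whose effect is not reproduced.
func (s store) resourceScore(delta int64) float64 {
	return float64(s.size+delta) / s.weight
}

// shouldBalance mirrors the structure of the refactored helper in
// server/schedulers/utils.go: pad the moved region by the tolerant size
// ratio, apply op influence on both sides, and keep the move only if the
// source still scores higher than the target afterwards.
func shouldBalance(source, target store, regionSize int64, tolerantRatio float64,
	sourceInfluence, targetInfluence int64) bool {
	padded := int64(float64(regionSize) * tolerantRatio)
	sourceDelta := sourceInfluence - padded
	targetDelta := targetInfluence + padded
	return source.resourceScore(sourceDelta) > target.resourceScore(targetDelta)
}

func main() {
	src := store{size: 16, weight: 1}
	dst := store{size: 7, weight: 1}
	// With an illustrative tolerant ratio of 5 and a region of size 1, the
	// padding turns the comparison into 16-5 > 7+5, so the move is rejected
	// even though the raw sizes differ by 9.
	fmt.Println(shouldBalance(src, dst, 1, 5, 0, 0)) // false
}

The new score_amplify gauge at the end of the diff reports a related quantity: RegionSize divided by the store's used size, scaled by 1 << 20. Assuming RegionSize is tracked in MiB and Stats.GetUsedSize() returns bytes (which the 1 << 20 factor suggests), it reads as the ratio of the scheduler's logical data size to the bytes actually occupied on disk.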