*: new store region score function for balance (#1014)
Connor1996 committed Apr 19, 2018
1 parent 5bc920f commit e94ccf5
Showing 33 changed files with 654 additions and 542 deletions.
4 changes: 3 additions & 1 deletion conf/config.toml
@@ -61,7 +61,9 @@ leader-schedule-limit = 4
region-schedule-limit = 4
replica-schedule-limit = 8
merge-schedule-limit = 8
tolerant-size-ratio = 2.5
tolerant-size-ratio = 5
high-space-ratio = 0.6
low-space-ratio = 0.8

# customized schedulers, the format is as below
# if empty, it will use balance-leader, balance-region, hot-region as default
4 changes: 1 addition & 3 deletions server/api/label.go
@@ -72,8 +72,6 @@ func (h *labelsHandler) GetStores(w http.ResponseWriter, r *http.Request) {
return
}

maxDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

stores := cluster.GetStores()
storesInfo := &StoresInfo{
Stores: make([]*StoreInfo, 0, len(stores)),
@@ -87,7 +85,7 @@ func (h *labelsHandler) GetStores(w http.ResponseWriter, r *http.Request) {
return
}

storeInfo := newStoreInfo(store, maxDownTime)
storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
storesInfo.Stores = append(storesInfo.Stores, storeInfo)
}
storesInfo.Count = len(storesInfo.Stores)
16 changes: 6 additions & 10 deletions server/api/store.go
@@ -66,7 +66,7 @@ const (
downStateName = "Down"
)

func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreInfo {
func newStoreInfo(opt *server.ScheduleConfig, store *core.StoreInfo) *StoreInfo {
s := &StoreInfo{
Store: &MetaStore{
Store: store.Store,
@@ -77,11 +77,11 @@ func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreI
Available: typeutil.ByteSize(store.Stats.GetAvailable()),
LeaderCount: store.LeaderCount,
LeaderWeight: store.LeaderWeight,
LeaderScore: store.LeaderScore(),
LeaderScore: store.LeaderScore(0),
LeaderSize: store.LeaderSize,
RegionCount: store.RegionCount,
RegionWeight: store.RegionWeight,
RegionScore: store.RegionScore(),
RegionScore: store.RegionScore(opt.HighSpaceRatio, opt.LowSpaceRatio, 0),
RegionSize: store.RegionSize,
SendingSnapCount: store.Stats.GetSendingSnapCount(),
ReceivingSnapCount: store.Stats.GetReceivingSnapCount(),
@@ -103,7 +103,7 @@ func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreI
}

if store.State == metapb.StoreState_Up {
if store.DownTime() > maxStoreDownTime {
if store.DownTime() > opt.MaxStoreDownTime.Duration {
s.Store.StateName = downStateName
} else if store.IsDisconnected() {
s.Store.StateName = disconnectedName
@@ -137,8 +137,6 @@ func (h *storeHandler) Get(w http.ResponseWriter, r *http.Request) {
return
}

maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

vars := mux.Vars(r)
storeIDStr := vars["id"]
storeID, err := strconv.ParseUint(storeIDStr, 10, 64)
@@ -153,7 +151,7 @@ func (h *storeHandler) Get(w http.ResponseWriter, r *http.Request) {
return
}

storeInfo := newStoreInfo(store, maxStoreDownTime)
storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
h.rd.JSON(w, http.StatusOK, storeInfo)
}

@@ -324,8 +322,6 @@ func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

stores := cluster.GetStores()
StoresInfo := &StoresInfo{
Stores: make([]*StoreInfo, 0, len(stores)),
@@ -345,7 +341,7 @@ func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

storeInfo := newStoreInfo(store, maxStoreDownTime)
storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
StoresInfo.Stores = append(StoresInfo.Stores, storeInfo)
}
StoresInfo.Count = len(StoresInfo.Stores)
6 changes: 3 additions & 3 deletions server/api/store_test.go
@@ -270,14 +270,14 @@ func (s *testStoreSuite) TestDownState(c *C) {
Stats: &pdpb.StoreStats{},
LastHeartbeatTS: time.Now(),
}
storeInfo := newStoreInfo(store, time.Hour)
storeInfo := newStoreInfo(s.svr.GetScheduleConfig(), store)
c.Assert(storeInfo.Store.StateName, Equals, metapb.StoreState_Up.String())

store.LastHeartbeatTS = time.Now().Add(-time.Minute * 2)
storeInfo = newStoreInfo(store, time.Hour)
storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), store)
c.Assert(storeInfo.Store.StateName, Equals, disconnectedName)

store.LastHeartbeatTS = time.Now().Add(-time.Hour * 2)
storeInfo = newStoreInfo(store, time.Hour)
storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), store)
c.Assert(storeInfo.Store.StateName, Equals, downStateName)
}
4 changes: 1 addition & 3 deletions server/api/trend.go
@@ -107,8 +107,6 @@ func (h *trendHandler) Handle(w http.ResponseWriter, r *http.Request) {
}

func (h *trendHandler) getTrendStores() ([]trendStore, error) {
maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

var readStats, writeStats core.StoreHotRegionsStat
if hotRead := h.GetHotReadRegions(); hotRead != nil {
readStats = hotRead.AsLeader
@@ -123,7 +121,7 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) {

trendStores := make([]trendStore, 0, len(stores))
for _, store := range stores {
info := newStoreInfo(store, maxStoreDownTime)
info := newStoreInfo(h.svr.GetScheduleConfig(), store)
s := trendStore{
ID: info.Store.GetId(),
Address: info.Store.GetAddress(),
30 changes: 8 additions & 22 deletions server/cache.go
@@ -167,28 +167,6 @@ func (c *clusterInfo) GetStores() []*core.StoreInfo {
return c.BasicCluster.GetStores()
}

// GetStoresAverageScore returns the total resource score of all unfiltered stores.
func (c *clusterInfo) GetStoresAverageScore(kind core.ResourceKind, filters ...schedule.Filter) float64 {
c.RLock()
defer c.RUnlock()

var totalResourceSize int64
var totalResourceWeight float64
for _, s := range c.BasicCluster.GetStores() {
if schedule.FilterSource(c, s, filters) {
continue
}

totalResourceWeight += s.ResourceWeight(kind)
totalResourceSize += s.ResourceSize(kind)
}

if totalResourceWeight == 0 {
return 0
}
return float64(totalResourceSize) / totalResourceWeight
}

func (c *clusterInfo) getMetaStores() []*metapb.Store {
c.RLock()
defer c.RUnlock()
@@ -564,6 +542,14 @@ func (c *clusterInfo) GetTolerantSizeRatio() float64 {
return c.opt.GetTolerantSizeRatio()
}

func (c *clusterInfo) GetLowSpaceRatio() float64 {
return c.opt.GetLowSpaceRatio()
}

func (c *clusterInfo) GetHighSpaceRatio() float64 {
return c.opt.GetHighSpaceRatio()
}

func (c *clusterInfo) GetMaxSnapshotCount() uint64 {
return c.opt.GetMaxSnapshotCount()
}
20 changes: 19 additions & 1 deletion server/config.go
@@ -352,6 +352,18 @@ type ScheduleConfig struct {
MergeScheduleLimit uint64 `toml:"merge-schedule-limit,omitempty" json:"merge-schedule-limit"`
// TolerantSizeRatio is the ratio of buffer size for balance scheduler.
TolerantSizeRatio float64 `toml:"tolerant-size-ratio,omitempty" json:"tolerant-size-ratio"`
//
//      high space stage          transition stage            low space stage
//   |--------------------|-----------------------------|-------------------------|
//   ^                    ^                             ^                         ^
//   0        HighSpaceRatio * capacity      LowSpaceRatio * capacity          capacity
//
// LowSpaceRatio is the lowest usage ratio of a store that is regarded as low space.
// When a store is in low space, its region score becomes very large and varies inversely with its available size.
LowSpaceRatio float64 `toml:"low-space-ratio,omitempty" json:"low-space-ratio"`
// HighSpaceRatio is the highest usage ratio of a store that is regarded as high space.
// High space means there is plenty of spare capacity, and the store's region score varies directly with its used size.
HighSpaceRatio float64 `toml:"high-space-ratio,omitempty" json:"high-space-ratio"`
// EnableRaftLearner is the option for using AddLearnerNode instead of AddNode
EnableRaftLearner bool `toml:"enable-raft-learner" json:"enable-raft-learner,string"`
// Schedulers supports loading customized schedulers
@@ -371,6 +383,8 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
ReplicaScheduleLimit: c.ReplicaScheduleLimit,
MergeScheduleLimit: c.MergeScheduleLimit,
TolerantSizeRatio: c.TolerantSizeRatio,
LowSpaceRatio: c.LowSpaceRatio,
HighSpaceRatio: c.HighSpaceRatio,
EnableRaftLearner: c.EnableRaftLearner,
Schedulers: schedulers,
}
@@ -396,7 +410,9 @@ const (
defaultRegionScheduleLimit = 4
defaultReplicaScheduleLimit = 8
defaultMergeScheduleLimit = 8
defaultTolerantSizeRatio = 2.5
defaultTolerantSizeRatio = 5
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
)

var defaultSchedulers = SchedulerConfigs{
@@ -416,6 +432,8 @@ func (c *ScheduleConfig) adjust() {
adjustUint64(&c.ReplicaScheduleLimit, defaultReplicaScheduleLimit)
adjustUint64(&c.MergeScheduleLimit, defaultMergeScheduleLimit)
adjustFloat64(&c.TolerantSizeRatio, defaultTolerantSizeRatio)
adjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio)
adjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio)
adjustSchedulers(&c.Schedulers, defaultSchedulers)
}

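The staged model in the ScheduleConfig comment is the contract the new score function relies on: usage below HighSpaceRatio*capacity counts as high space, usage above LowSpaceRatio*capacity counts as low space, and everything in between is the transition stage. Below is a minimal, self-contained sketch of that classification; the helper name and store numbers are made up for illustration and are not part of PD.

package main

import "fmt"

// spaceStage classifies a store by its used-space ratio, mirroring the three
// stages described in the ScheduleConfig comment. Hypothetical helper, for
// illustration only.
func spaceStage(used, capacity, highSpaceRatio, lowSpaceRatio float64) string {
	switch {
	case used < highSpaceRatio*capacity:
		return "high space"
	case used < lowSpaceRatio*capacity:
		return "transition"
	default:
		return "low space"
	}
}

func main() {
	const capacity = 1000.0 // MB, made-up store size
	// With the defaults HighSpaceRatio=0.6 and LowSpaceRatio=0.8 the stage
	// boundaries fall at 600 MB and 800 MB of used space.
	for _, used := range []float64{100, 650, 900} {
		fmt.Printf("used=%4.0fMB -> %s\n", used, spaceStage(used, capacity, 0.6, 0.8))
	}
}

IsLowSpace in server/core/store.go expresses the same boundary from the other side: a store is low on space once its available ratio drops below 1 - LowSpaceRatio.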
2 changes: 2 additions & 0 deletions server/coordinator.go
@@ -436,6 +436,7 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool {
if old, ok := c.operators[regionID]; ok {
if !isHigherPriorityOperator(op, old) {
log.Infof("[region %v] cancel add operator, old: %s", regionID, old)
operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc()
return false
}
log.Infof("[region %v] replace old operator: %s", regionID, old)
@@ -470,6 +471,7 @@ func (c *coordinator) addOperators(ops ...*schedule.Operator) bool {
for _, op := range ops {
if old := c.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
log.Infof("[region %v] cancel add operators, old: %s", op.RegionID(), old)
operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc()
return false
}
}
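The two new operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc() calls record operators that are dropped in favor of an existing higher-priority one. operatorCounter itself is defined elsewhere in PD and is not shown in this diff; a plausible stand-in, assuming a standard Prometheus counter vector keyed by operator type and lifecycle event (the namespace, subsystem, and metric name below are guesses, not PD's actual values), would be:

package main

import "github.com/prometheus/client_golang/prometheus"

// operatorCounter is a stand-in for PD's real metric (not part of this diff):
// a counter vector labeled by operator description and lifecycle event.
var operatorCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd",              // assumed namespace
		Subsystem: "schedule",        // assumed subsystem
		Name:      "operators_count", // assumed metric name
		Help:      "Counter of schedule operators by type and event.",
	}, []string{"type", "event"})

func init() {
	prometheus.MustRegister(operatorCounter)
}

func main() {
	// Equivalent in shape to the calls added in coordinator.go when an
	// operator is rejected.
	operatorCounter.WithLabelValues("balance-region", "canceled").Inc()
}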
4 changes: 2 additions & 2 deletions server/coordinator_test.go
@@ -65,8 +65,8 @@ func (c *testClusterInfo) addRegionStore(storeID uint64, regionCount int) {
store.LastHeartbeatTS = time.Now()
store.RegionCount = regionCount
store.RegionSize = int64(regionCount) * 10
store.Stats.Capacity = uint64(1024)
store.Stats.Available = store.Stats.Capacity
store.Stats.Capacity = 1000 * (1 << 20)
store.Stats.Available = store.Stats.Capacity - uint64(store.RegionSize)
c.putStore(store)
}

56 changes: 43 additions & 13 deletions server/core/store.go
@@ -45,6 +45,7 @@ type StoreInfo struct {
func NewStoreInfo(store *metapb.Store) *StoreInfo {
return &StoreInfo{
Store: store,
Stats: &pdpb.StoreStats{},
LeaderWeight: 1.0,
RegionWeight: 1.0,
}
@@ -103,15 +104,46 @@ func (s *StoreInfo) DownTime() time.Duration {
}

const minWeight = 1e-6
const maxScore = 1024 * 1024 * 1024

// LeaderScore returns the store's leader score: leaderCount / leaderWeight.
func (s *StoreInfo) LeaderScore() float64 {
return float64(s.LeaderSize) / math.Max(s.LeaderWeight, minWeight)
// LeaderScore returns the store's leader score: leaderSize / leaderWeight.
func (s *StoreInfo) LeaderScore(delta int64) float64 {
return float64(s.LeaderSize+delta) / math.Max(s.LeaderWeight, minWeight)
}

// RegionScore returns the store's region score: regionSize / regionWeight.
func (s *StoreInfo) RegionScore() float64 {
return float64(s.RegionSize) / math.Max(s.RegionWeight, minWeight)
// RegionScore returns the store's region score.
func (s *StoreInfo) RegionScore(highSpaceRatio, lowSpaceRatio float64, delta int64) float64 {
if s.RegionSize == 0 {
return float64(delta)
}

capacity := float64(s.Stats.GetCapacity()) / (1 << 20)
available := float64(s.Stats.GetAvailable()) / (1 << 20)

var score float64

// Because of RocksDB compression, the region size is larger than the actual used size.
amplification := float64(s.RegionSize) / (float64(s.Stats.GetUsedSize()) / (1 << 20))

if available-float64(delta)/amplification >= (1-highSpaceRatio)*capacity {
score = float64(s.RegionSize + delta)
} else if available-float64(delta)/amplification <= (1-lowSpaceRatio)*capacity {
score = maxScore - (available - float64(delta)/amplification)
} else {
// To make the score function continuous, we use the linear function y = k*x + b for the transition stage.
// From the stages above we know two points that must lie on this line:
// p1(highSpaceRatio*capacity*amplification, highSpaceRatio*capacity*amplification) and
// p2(lowSpaceRatio*capacity*amplification, maxScore-(1-lowSpaceRatio)*capacity),
// so k = (y2 - y1) / (x2 - x1).
x1, y1 := highSpaceRatio*capacity*amplification, highSpaceRatio*capacity*amplification
x2, y2 := lowSpaceRatio*capacity*amplification, maxScore-(1-lowSpaceRatio)*capacity

k := (y2 - y1) / (x2 - x1)
b := y1 - k*x1
score = k*float64(s.RegionSize+delta) + b
}

return score / math.Max(s.RegionWeight, minWeight)
}

// StorageSize returns store's used storage size reported from tikv.
@@ -127,11 +159,9 @@ func (s *StoreInfo) AvailableRatio() float64 {
return float64(s.Stats.GetAvailable()) / float64(s.Stats.GetCapacity())
}

const storeLowSpaceThreshold = 0.2

// IsLowSpace checks if the store is lack of space.
func (s *StoreInfo) IsLowSpace() bool {
return s.Stats != nil && s.AvailableRatio() < storeLowSpaceThreshold
func (s *StoreInfo) IsLowSpace(lowSpaceRatio float64) bool {
return s.Stats != nil && s.AvailableRatio() < 1-lowSpaceRatio
}

// ResourceCount returns the count of leaders/regions in the store.
@@ -159,12 +189,12 @@ func (s *StoreInfo) ResourceSize(kind ResourceKind) int64 {
}

// ResourceScore returns the score of leaders/regions in the store.
func (s *StoreInfo) ResourceScore(kind ResourceKind) float64 {
func (s *StoreInfo) ResourceScore(kind ResourceKind, highSpaceRatio, lowSpaceRatio float64, delta int64) float64 {
switch kind {
case LeaderKind:
return s.LeaderScore()
return s.LeaderScore(delta)
case RegionKind:
return s.RegionScore()
return s.RegionScore(highSpaceRatio, lowSpaceRatio, delta)
default:
return 0
}
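RegionScore is the heart of this change: in the high-space stage the score simply follows the (weighted) region size, in the low-space stage it jumps toward maxScore and grows as available space shrinks, and the transition stage is a straight line that keeps the function continuous at both boundaries. The sketch below reproduces the same piecewise shape for intuition only: it fixes amplification and RegionWeight at 1.0, omits the delta term, and uses made-up store numbers rather than real StoreInfo fields.

package main

import "fmt"

const maxScore = 1024 * 1024 * 1024

// regionScore mirrors the piecewise function in server/core/store.go for a
// simplified store (amplification 1.0, regionWeight 1.0, sizes in MB).
// A sketch for intuition, not the PD implementation itself.
func regionScore(regionSize, capacity, available, highSpaceRatio, lowSpaceRatio float64) float64 {
	switch {
	case available >= (1-highSpaceRatio)*capacity:
		// High-space stage: score follows region size directly.
		return regionSize
	case available <= (1-lowSpaceRatio)*capacity:
		// Low-space stage: score is huge and keeps growing as space runs out.
		return maxScore - available
	default:
		// Transition stage: the line through the two boundary points.
		x1, y1 := highSpaceRatio*capacity, highSpaceRatio*capacity
		x2, y2 := lowSpaceRatio*capacity, maxScore-(1-lowSpaceRatio)*capacity
		k := (y2 - y1) / (x2 - x1)
		b := y1 - k*x1
		return k*regionSize + b
	}
}

func main() {
	const capacity = 1000.0 // MB, hypothetical store
	// With HighSpaceRatio=0.6 and LowSpaceRatio=0.8 the stages switch at
	// 600 MB and 800 MB of data (here available = capacity - regionSize).
	for _, size := range []float64{100, 600, 700, 800, 950} {
		fmt.Printf("regionSize=%4.0fMB score=%.0f\n",
			size, regionScore(size, capacity, capacity-size, 0.6, 0.8))
	}
}

The real function additionally divides by RegionWeight, folds in a delta from pending operators, and derives an amplification factor from the reported used size so that logical region size and on-disk usage line up.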
8 changes: 8 additions & 0 deletions server/option.go
@@ -120,6 +120,14 @@ func (o *scheduleOption) GetTolerantSizeRatio() float64 {
return o.load().TolerantSizeRatio
}

func (o *scheduleOption) GetLowSpaceRatio() float64 {
return o.load().LowSpaceRatio
}

func (o *scheduleOption) GetHighSpaceRatio() float64 {
return o.load().HighSpaceRatio
}

func (o *scheduleOption) IsRaftLearnerEnabled() bool {
return o.load().EnableRaftLearner
}
2 changes: 1 addition & 1 deletion server/region_statistics.go
@@ -163,7 +163,7 @@ func (l *labelLevelStatistics) Observe(region *core.RegionInfo, stores []*core.S
func (l *labelLevelStatistics) Collect() {
for level, count := range l.labelLevelCounter {
typ := fmt.Sprintf("level_%d", level)
regionStatusGauge.WithLabelValues(typ).Set(float64(count))
regionLabelLevelGauge.WithLabelValues(typ).Set(float64(count))
}
}

20 changes: 16 additions & 4 deletions server/schedule/basic_cluster.go
@@ -73,10 +73,22 @@ func (m OpInfluence) GetStoreInfluence(id uint64) *StoreInfluence {

// StoreInfluence records influences that pending operators will make.
type StoreInfluence struct {
RegionSize int
RegionCount int
LeaderSize int
LeaderCount int
RegionSize int64
RegionCount int64
LeaderSize int64
LeaderCount int64
}

// ResourceSize returns the delta size of leaders/regions contributed by the influence.
func (s StoreInfluence) ResourceSize(kind core.ResourceKind) int64 {
switch kind {
case core.LeaderKind:
return s.LeaderSize
case core.RegionKind:
return s.RegionSize
default:
return 0
}
}

// NewBasicCluster creates a BasicCluster.
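StoreInfluence now uses int64 sizes and gains ResourceSize, so the pending effect of in-flight operators can be fed straight into the new score functions as the delta parameter added to LeaderScore, RegionScore, and ResourceScore above. The sketch below shows that flow; the StoreInfluence type mirrors the diff, but the score function and store numbers are simplified stand-ins, not PD's own API.

package main

import "fmt"

// ResourceKind distinguishes leader and region resources, as in server/core.
type ResourceKind int

const (
	LeaderKind ResourceKind = iota
	RegionKind
)

// StoreInfluence mirrors the struct in server/schedule/basic_cluster.go:
// the size/count deltas that pending operators will apply to a store.
type StoreInfluence struct {
	RegionSize  int64
	RegionCount int64
	LeaderSize  int64
	LeaderCount int64
}

// ResourceSize returns the pending delta for the given resource kind.
func (s StoreInfluence) ResourceSize(kind ResourceKind) int64 {
	switch kind {
	case LeaderKind:
		return s.LeaderSize
	case RegionKind:
		return s.RegionSize
	default:
		return 0
	}
}

// leaderScore is a simplified stand-in for StoreInfo.LeaderScore(delta):
// the current leader size plus the pending delta, with weight 1.0.
func leaderScore(leaderSize, delta int64) float64 {
	return float64(leaderSize + delta)
}

func main() {
	// A hypothetical store with 500 MB of leaders and pending operators
	// that will move another 20 MB of leaders onto it.
	inf := StoreInfluence{LeaderSize: 20, LeaderCount: 2}
	fmt.Println(leaderScore(500, inf.ResourceSize(LeaderKind))) // 520
}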
