Skip to content

Commit

Permalink
schedule: fix store maybe always overloaded (#1586)
Browse files Browse the repository at this point in the history
* fix store maybe always overloaded

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored and nolouch committed Jun 20, 2019
1 parent 98a10f9 commit 6965066
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 71 deletions.
13 changes: 3 additions & 10 deletions server/cluster_info.go
Expand Up @@ -240,18 +240,11 @@ func (c *clusterInfo) UnblockStore(storeID uint64) {
c.core.UnblockStore(storeID)
}

// SetStoreOverload stops balancer from selecting the store.
func (c *clusterInfo) SetStoreOverload(storeID uint64) {
// AttachOverloadStatus attaches the overload status to a store.
func (c *clusterInfo) AttachOverloadStatus(storeID uint64, f func() bool) {
c.Lock()
defer c.Unlock()
c.core.SetStoreOverload(storeID)
}

// ResetStoreOverload allows balancer to select the store.
func (c *clusterInfo) ResetStoreOverload(storeID uint64) {
c.Lock()
defer c.Unlock()
c.core.ResetStoreOverload(storeID)
c.core.AttachOverloadStatus(storeID, f)
}

// GetStores returns all stores in the cluster.
Expand Down
60 changes: 58 additions & 2 deletions server/coordinator_test.go
Expand Up @@ -32,8 +32,8 @@ import (
"github.com/pingcap/pd/server/schedulers"
)

func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind schedule.OperatorKind) *schedule.Operator {
return schedule.NewOperator("test", regionID, regionEpoch, kind)
func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind schedule.OperatorKind, steps ...schedule.OperatorStep) *schedule.Operator {
return schedule.NewOperator("test", regionID, regionEpoch, kind, steps...)
}

func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption, error) {
Expand Down Expand Up @@ -765,6 +765,62 @@ func (s *testOperatorControllerSuite) TestOperatorCount(c *C) {
c.Assert(oc.OperatorCount(schedule.OpLeader), Equals, uint64(0))
}

// TestStoreOverloaded exercises the balance-region scheduler together with the
// operator controller's per-store limit.
//
// Flow of the test:
//  1. Four stores are registered and one region is added; the scheduler is
//     expected to produce an operator, which is handed to the controller.
//  2. While that operator is held, ten further Schedule calls must all return
//     nil (presumably because the involved store is reported as overloaded —
//     confirm against the scheduler's filters).
//  3. After the operator is removed and one second has passed, Schedule must
//     return a non-nil result on each of 100 consecutive calls, i.e. the store
//     must not stay overloaded forever.
func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
	_, opt, err := newTestScheduleConfig()
	c.Assert(err, IsNil)
	tc := newTestClusterInfo(opt)
	hbStreams := getHeartBeatStreams(c, tc)
	defer hbStreams.Close()
	oc := schedule.NewOperatorController(tc.clusterInfo, hbStreams)
	lb, err := schedule.CreateScheduler("balance-region", oc)
	c.Assert(err, IsNil)

	c.Assert(tc.addRegionStore(4, 40), IsNil)
	c.Assert(tc.addRegionStore(3, 40), IsNil)
	c.Assert(tc.addRegionStore(2, 40), IsNil)
	c.Assert(tc.addRegionStore(1, 10), IsNil)
	c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil)

	// Verify the scheduler actually returned something before indexing, so a
	// scheduling regression shows up as a failed assertion rather than an
	// index-out-of-range panic.
	ops := lb.Schedule(tc)
	c.Assert(len(ops) > 0, IsTrue)
	op1 := ops[0]
	c.Assert(op1, NotNil)
	c.Assert(oc.AddOperator(op1), IsTrue)
	for i := 0; i < 10; i++ {
		c.Assert(lb.Schedule(tc), IsNil)
	}
	oc.RemoveOperator(op1)
	// Wait before scheduling again; presumably this lets the store limit
	// bucket refill — confirm against the configured balance rate.
	time.Sleep(1 * time.Second)
	for i := 0; i < 100; i++ {
		c.Assert(lb.Schedule(tc), NotNil)
	}
}

// TestStoreOverloadedWithReplace covers the store limit when an operator is
// added for a region that already has one.
//
// op1 and op2 are both built for region ID 1; op2 is given HighPriority and
// both AddOperator calls are expected to succeed (presumably op2 replaces
// op1, as the test name suggests — confirm in AddOperator). op3 is then
// expected to be rejected, and the scheduler is expected to return nil until
// one second has passed, after which it must return a non-nil result.
func (s *testOperatorControllerSuite) TestStoreOverloadedWithReplace(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()
oc := schedule.NewOperatorController(tc.clusterInfo, hbStreams)
lb, err := schedule.CreateScheduler("balance-region", oc)
c.Assert(err, IsNil)

c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 40), IsNil)
c.Assert(tc.addRegionStore(2, 40), IsNil)
c.Assert(tc.addRegionStore(1, 10), IsNil)
c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil)
c.Assert(tc.addLeaderRegion(2, 1, 3, 4), IsNil)
// First operator on region 1: add a peer on store 1.
op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpRegion, schedule.AddPeer{ToStore: 1, PeerID: 1})
c.Assert(oc.AddOperator(op1), IsTrue)
// Second operator on the same region ID, raised to high priority; it must
// still be accepted.
op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpRegion, schedule.AddPeer{ToStore: 2, PeerID: 2})
op2.SetPriorityLevel(core.HighPriority)
c.Assert(oc.AddOperator(op2), IsTrue)
// NOTE(review): op3 is created with region ID 1 but takes its epoch from
// region 2 — confirm whether region ID 2 was intended here.
op3 := newTestOperator(1, tc.GetRegion(2).GetRegionEpoch(), schedule.OpRegion, schedule.AddPeer{ToStore: 1, PeerID: 3})
c.Assert(oc.AddOperator(op3), IsFalse)
// No operator may be scheduled right away ...
c.Assert(lb.Schedule(tc), IsNil)
// ... but scheduling must succeed again after waiting (presumably once the
// store limit has refilled — confirm against the configured balance rate).
time.Sleep(1 * time.Second)
c.Assert(lb.Schedule(tc), NotNil)
}

var _ = Suite(&testScheduleControllerSuite{})

type testScheduleControllerSuite struct{}
Expand Down
11 changes: 3 additions & 8 deletions server/core/basic_cluster.go
Expand Up @@ -84,14 +84,9 @@ func (bc *BasicCluster) UnblockStore(storeID uint64) {
bc.Stores.UnblockStore(storeID)
}

// SetStoreOverload stops balancer from selecting the store.
func (bc *BasicCluster) SetStoreOverload(storeID uint64) {
bc.Stores.SetStoreOverload(storeID)
}

// ResetStoreOverload allows balancer to select the store.
func (bc *BasicCluster) ResetStoreOverload(storeID uint64) {
bc.Stores.ResetStoreOverload(storeID)
// AttachOverloadStatus attaches the overload status to a store.
// It forwards storeID and the status callback f unchanged to
// bc.Stores.AttachOverloadStatus and does no locking of its own.
// NOTE(review): presumably callers serialize access themselves — confirm
// against call sites.
func (bc *BasicCluster) AttachOverloadStatus(storeID uint64, f func() bool) {
bc.Stores.AttachOverloadStatus(storeID, f)
}

// RandFollowerRegion returns a random region that has a follower on the store.
Expand Down
28 changes: 9 additions & 19 deletions server/core/store.go
Expand Up @@ -40,7 +40,7 @@ type StoreInfo struct {
lastHeartbeatTS time.Time
leaderWeight float64
regionWeight float64
overloaded bool
overloaded func() bool
}

// NewStoreInfo creates StoreInfo with meta data.
Expand Down Expand Up @@ -87,7 +87,10 @@ func (s *StoreInfo) IsBlocked() bool {

// IsOverloaded returns if the store is overloaded.
func (s *StoreInfo) IsOverloaded() bool {
return s.overloaded
if s.overloaded == nil {
return false
}
return s.overloaded()
}

// IsUp checks if the store's state is Up.
Expand Down Expand Up @@ -520,24 +523,11 @@ func (s *StoresInfo) UnblockStore(storeID uint64) {
s.stores[storeID] = store.Clone(SetStoreUnBlock())
}

// SetStoreOverload set a StoreInfo with storeID overload.
func (s *StoresInfo) SetStoreOverload(storeID uint64) {
store, ok := s.stores[storeID]
if !ok {
log.Fatal("store is overloaded, but it is not found",
zap.Uint64("store-id", storeID))
}
s.stores[storeID] = store.Clone(SetStoreOverload())
}

// ResetStoreOverload reset a StoreInfo with storeID overload.
func (s *StoresInfo) ResetStoreOverload(storeID uint64) {
store, ok := s.stores[storeID]
if !ok {
log.Fatal("store is not overloaded anymore, but it is not found",
zap.Uint64("store-id", storeID))
// AttachOverloadStatus attaches the overload status to a store.
func (s *StoresInfo) AttachOverloadStatus(storeID uint64, f func() bool) {
if store, ok := s.stores[storeID]; ok {
s.stores[storeID] = store.Clone(SetOverloadStatus(f))
}
s.stores[storeID] = store.Clone(ResetStoreOverload())
}

// GetStores gets a complete set of StoreInfo.
Expand Down
21 changes: 7 additions & 14 deletions server/core/store_option.go
Expand Up @@ -65,20 +65,6 @@ func SetStoreUnBlock() StoreCreateOption {
}
}

// SetStoreOverload stops balancer from selecting the store.
func SetStoreOverload() StoreCreateOption {
return func(store *StoreInfo) {
store.overloaded = true
}
}

// ResetStoreOverload allows balancer to select the store.
func ResetStoreOverload() StoreCreateOption {
return func(store *StoreInfo) {
store.overloaded = false
}
}

// SetLeaderCount sets the leader count for the store.
func SetLeaderCount(leaderCount int) StoreCreateOption {
return func(store *StoreInfo) {
Expand Down Expand Up @@ -141,3 +127,10 @@ func SetStoreStats(stats *pdpb.StoreStats) StoreCreateOption {
store.stats = stats
}
}

// SetOverloadStatus returns a store option that installs f as the store's
// overload status callback. The option only assigns f to the store's
// overloaded field; f is stored as given, including when it is nil.
func SetOverloadStatus(f func() bool) StoreCreateOption {
	apply := func(info *StoreInfo) {
		info.overloaded = f
	}
	return apply
}
32 changes: 16 additions & 16 deletions server/schedule/operator_controller.go
Expand Up @@ -278,16 +278,6 @@ func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus {
func (oc *OperatorController) removeOperatorLocked(op *Operator) {
regionID := op.RegionID()
delete(oc.operators, regionID)
opInfluence := NewTotalOpInfluence([]*Operator{op}, oc.cluster)
for storeID := range opInfluence.storesInfluence {
if opInfluence.GetStoreInfluence(storeID).StepCost == 0 {
continue
}
if oc.cluster.GetStore(storeID).IsOverloaded() &&
oc.storesLimit[storeID].Available() >= RegionInfluence {
oc.cluster.ResetStoreOverload(storeID)
}
}
oc.updateCounts(oc.operators)
operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
}
Expand Down Expand Up @@ -627,18 +617,14 @@ func (o *OperatorRecords) Put(op *Operator, status pdpb.OperatorStatus) {
func (oc *OperatorController) exceedStoreLimit(ops ...*Operator) bool {
opInfluence := NewTotalOpInfluence(ops, oc.cluster)
for storeID := range opInfluence.storesInfluence {
if oc.storesLimit[storeID] == nil {
rate := oc.cluster.GetStoreBalanceRate()
oc.newStoreLimit(storeID, rate)
}
stepCost := opInfluence.GetStoreInfluence(storeID).StepCost
if stepCost == 0 {
continue
}
available := oc.storesLimit[storeID].Available()

available := oc.getOrCreateStoreLimit(storeID).Available()
storeLimit.WithLabelValues(strconv.FormatUint(storeID, 10), "available").Set(float64(available) / float64(RegionInfluence))
if available < stepCost {
oc.cluster.SetStoreOverload(storeID)
return true
}
}
Expand Down Expand Up @@ -671,6 +657,20 @@ func (oc *OperatorController) newStoreLimit(storeID uint64, rate float64) {
oc.storesLimit[storeID] = ratelimit.NewBucketWithRate(rate, capacity)
}

// getOrCreateStoreLimit returns the limit bucket registered for storeID,
// creating it first when the store has none yet.
//
// On creation the bucket is built by newStoreLimit using the cluster's current
// store balance rate, and an overload callback is attached to the store via
// the cluster. The callback reports the store as overloaded whenever the
// bucket's Available() value is below RegionInfluence.
//
// NOTE(review): oc.storesLimit is read and written here without taking oc's
// lock, so presumably the caller already holds it — confirm against call
// sites.
func (oc *OperatorController) getOrCreateStoreLimit(storeID uint64) *ratelimit.Bucket {
	if bucket := oc.storesLimit[storeID]; bucket != nil {
		return bucket
	}

	oc.newStoreLimit(storeID, oc.cluster.GetStoreBalanceRate())

	// The callback looks the bucket up again on every invocation, under the
	// read lock, instead of capturing the bucket created above.
	overloaded := func() bool {
		oc.RLock()
		defer oc.RUnlock()
		return oc.storesLimit[storeID].Available() < RegionInfluence
	}
	oc.cluster.AttachOverloadStatus(storeID, overloaded)

	return oc.storesLimit[storeID]
}

// GetAllStoresLimit is used to get limit of all stores.
func (oc *OperatorController) GetAllStoresLimit() map[uint64]float64 {
oc.RLock()
Expand Down
3 changes: 1 addition & 2 deletions server/schedule/scheduler.go
Expand Up @@ -44,8 +44,7 @@ type Cluster interface {
BlockStore(id uint64) error
UnblockStore(id uint64)

SetStoreOverload(id uint64)
ResetStoreOverload(id uint64)
AttachOverloadStatus(id uint64, f func() bool)

IsRegionHot(id uint64) bool
RegionWriteStats() []*statistics.RegionStat
Expand Down

0 comments on commit 6965066

Please sign in to comment.