Skip to content

Commit

Permalink
Merge af6f001 into 049a81e
Browse files Browse the repository at this point in the history
  • Loading branch information
huachaohuang committed Nov 18, 2016
2 parents 049a81e + af6f001 commit 3f75b90
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 56 deletions.
67 changes: 11 additions & 56 deletions server/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,78 +201,33 @@ func (cb *capacityBalancer) Balance(cluster *clusterInfo) (*score, *balanceOpera
}

type leaderBalancer struct {
filters []Filter
st scoreType

cfg *BalanceConfig
cfg *BalanceConfig
st scoreType
selector Selector
}

func newLeaderBalancer(cfg *BalanceConfig) *leaderBalancer {
var filters []Filter
filters = append(filters, newStateFilter(cfg))
filters = append(filters, newLeaderCountFilter(cfg))

lb := &leaderBalancer{cfg: cfg, st: leaderScore}
lb.filters = append(lb.filters, newStateFilter(cfg))
lb.filters = append(lb.filters, newLeaderCountFilter(cfg))
lb.selector = newBalanceSelector(newScorer(lb.st), filters)
return lb
}

func (lb *leaderBalancer) ScoreType() scoreType {
return lb.st
}

// selectBalanceRegion tries to select a store leader region to do balance.
func (lb *leaderBalancer) selectBalanceRegion(cluster *clusterInfo, stores []*storeInfo) (*regionInfo, *metapb.Peer) {
store := selectFromStore(stores, nil, lb.filters, lb.st)
if store == nil {
return nil, nil
}

// Random select one leader region from store.
region := cluster.randLeaderRegion(store.GetId())
if region == nil {
return nil, nil
}

newLeader := lb.selectNewLeaderPeer(cluster, region.GetFollowers())
if newLeader == nil {
return nil, nil
}

return region, newLeader
}

func (lb *leaderBalancer) selectNewLeaderPeer(cluster *clusterInfo, peers map[uint64]*metapb.Peer) *metapb.Peer {
stores := make([]*storeInfo, 0, len(peers))
for storeID := range peers {
stores = append(stores, cluster.getStore(storeID))
}

store := selectToStore(stores, nil, nil, lb.st)
if store == nil {
return nil
}

storeID := store.GetId()
return peers[storeID]
}

// Balance tries to select a store region to do balance.
// The balance type is leader transfer.
func (lb *leaderBalancer) Balance(cluster *clusterInfo) (*score, *balanceOperator, error) {
// If cluster max peer count config is 1, we cannot do leader transfer,
meta := cluster.getMeta()
if meta.GetMaxPeerCount() == 1 {
return nil, nil, nil
}

stores := cluster.getStores()
region, newLeader := lb.selectBalanceRegion(cluster, stores)
if region == nil || newLeader == nil {
return nil, nil, nil
}

// If region peer count is not equal to max peer count, no need to do leader transfer.
if len(region.GetPeers()) != int(cluster.getMeta().GetMaxPeerCount()) {
region, _, target := scheduleLeader(cluster, lb.selector)
if region == nil {
return nil, nil, nil
}
newLeader := region.GetStorePeer(target.GetId())

score, ok := checkAndGetDiffScore(cluster, region.Leader, newLeader, lb.st, lb.cfg)
if !ok {
Expand Down
12 changes: 12 additions & 0 deletions server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,18 @@ func (c *clusterInfo) randFollowerRegion(storeID uint64) *regionInfo {
return c.regions.randFollowerRegion(storeID)
}

func (c *clusterInfo) getRegionStores(region *regionInfo) []*storeInfo {
c.RLock()
defer c.RUnlock()
var stores []*storeInfo
for id := range region.GetStoreIds() {
if store := c.stores.getStore(id); store != nil {
stores = append(stores, store)
}
}
return stores
}

// handleStoreHeartbeat updates the store status.
func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error {
c.Lock()
Expand Down
6 changes: 6 additions & 0 deletions server/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,12 @@ func (s *testClusterInfoSuite) testRegionHeartbeat(c *C, cache *clusterInfo) {
c.Assert(region, DeepEquals, regions[region.GetId()].Region)
}

for _, region := range regions {
for _, store := range cache.getRegionStores(region) {
c.Assert(region.GetStorePeer(store.GetId()), NotNil)
}
}

// Test with kv.
if kv := cache.kv; kv != nil {
for _, region := range regions {
Expand Down
37 changes: 37 additions & 0 deletions server/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package server

func scheduleLeader(cluster *clusterInfo, s Selector) (*regionInfo, *storeInfo, *storeInfo) {
sourceStores := cluster.getStores()

source := s.SelectSource(sourceStores)
if source == nil {
return nil, nil, nil
}

region := cluster.randLeaderRegion(source.GetId())
if region == nil {
return nil, nil, nil
}

targetStores := cluster.getRegionStores(region)

target := s.SelectTarget(targetStores)
if target == nil || target.GetId() == source.GetId() {
return nil, nil, nil
}

return region, source, target
}
62 changes: 62 additions & 0 deletions server/selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package server

// Selector is an interface to select source and target store to schedule.
type Selector interface {
SelectSource(stores []*storeInfo, filters ...Filter) *storeInfo
SelectTarget(stores []*storeInfo, filters ...Filter) *storeInfo
}

type balanceSelector struct {
scorer Scorer
filters []Filter
}

func newBalanceSelector(scorer Scorer, filters []Filter) *balanceSelector {
return &balanceSelector{
scorer: scorer,
filters: filters,
}
}

func (s *balanceSelector) SelectSource(stores []*storeInfo, filters ...Filter) *storeInfo {
filters = append(s.filters, filters...)

var result *storeInfo
for _, store := range stores {
if filterSource(store, filters) {
continue
}
if result == nil || s.scorer.Score(result) < s.scorer.Score(store) {
result = store
}
}
return result
}

func (s *balanceSelector) SelectTarget(stores []*storeInfo, filters ...Filter) *storeInfo {
filters = append(s.filters, filters...)

var result *storeInfo
for _, store := range stores {
if filterTarget(store, filters) {
continue
}
if result == nil || s.scorer.Score(result) > s.scorer.Score(store) {
result = store
}
}
return result
}

0 comments on commit 3f75b90

Please sign in to comment.