Skip to content

Commit

Permalink
Merge dd62393 into 9e7cb96
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Jul 5, 2017
2 parents 9e7cb96 + dd62393 commit d6de88b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
46 changes: 29 additions & 17 deletions server/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (s *balanceRegionScheduler) transferPeer(cluster *clusterInfo, region *Regi
scoreGuard := newDistinctScoreFilter(s.rep, stores, source)

checker := newReplicaChecker(s.opt, cluster)
newPeer, _ := checker.selectBestPeer(region, scoreGuard)
newPeer := checker.SelectBestPeerToAddReplica(region, scoreGuard)
if newPeer == nil {
return nil
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func (r *replicaChecker) Check(region *RegionInfo) Operator {
}

if len(region.GetPeers()) < r.rep.GetMaxReplicas() {
newPeer, _ := r.selectBestPeer(region, r.filters...)
newPeer := r.SelectBestPeerToAddReplica(region, r.filters...)
if newPeer == nil {
return nil
}
Expand All @@ -257,8 +257,21 @@ func (r *replicaChecker) Check(region *RegionInfo) Operator {
return r.checkBestReplacement(region)
}

// selectBestPeer returns the best peer in other stores.
func (r *replicaChecker) selectBestPeer(region *RegionInfo, filters ...Filter) (*metapb.Peer, float64) {
// SelectBestPeerToAddReplica returns a new peer that to be used to add a replica.
func (r *replicaChecker) SelectBestPeerToAddReplica(region *RegionInfo, filters ...Filter) *metapb.Peer {
storeID, _ := r.SelectBestStoreToAddReplica(region, filters...)
if storeID == 0 {
return nil
}
newPeer, err := r.cluster.allocPeer(storeID)
if err != nil {
return nil
}
return newPeer
}

// SelectBestStoreToAddReplica returns the store to add a replica.
func (r *replicaChecker) SelectBestStoreToAddReplica(region *RegionInfo, filters ...Filter) (uint64, float64) {
// Add some must have filters.
filters = append(filters, newStateFilter(r.opt))
filters = append(filters, newStorageThresholdFilter(r.opt))
Expand All @@ -284,15 +297,10 @@ func (r *replicaChecker) selectBestPeer(region *RegionInfo, filters ...Filter) (
}

if bestStore == nil || filterTarget(bestStore, r.filters) {
return nil, 0
return 0, 0
}

newPeer, err := r.cluster.allocPeer(bestStore.GetId())
if err != nil {
log.Errorf("failed to allocate peer: %v", err)
return nil, 0
}
return newPeer, bestScore
return bestStore.GetId(), bestScore
}

// selectWorstPeer returns the worst peer in the region.
Expand Down Expand Up @@ -322,12 +330,12 @@ func (r *replicaChecker) selectWorstPeer(region *RegionInfo, filters ...Filter)
return region.GetStorePeer(worstStore.GetId()), worstScore
}

// selectBestReplacement returns the best peer to replace the region peer.
func (r *replicaChecker) selectBestReplacement(region *RegionInfo, peer *metapb.Peer) (*metapb.Peer, float64) {
// selectBestReplacement returns the best store to replace the region peer.
func (r *replicaChecker) selectBestReplacement(region *RegionInfo, peer *metapb.Peer) (uint64, float64) {
// Get a new region without the peer we are going to replace.
newRegion := region.clone()
newRegion.RemoveStorePeer(peer.GetStoreId())
return r.selectBestPeer(newRegion, newExcludedFilter(nil, region.GetStoreIds()))
return r.SelectBestStoreToAddReplica(newRegion, newExcludedFilter(nil, region.GetStoreIds()))
}

func (r *replicaChecker) checkDownPeer(region *RegionInfo) Operator {
Expand Down Expand Up @@ -368,7 +376,7 @@ func (r *replicaChecker) checkOfflinePeer(region *RegionInfo) Operator {
return newRemovePeer(region, peer)
}

newPeer, _ := r.selectBestPeer(region)
newPeer := r.SelectBestPeerToAddReplica(region)
if newPeer == nil {
return nil
}
Expand All @@ -383,14 +391,18 @@ func (r *replicaChecker) checkBestReplacement(region *RegionInfo) Operator {
if oldPeer == nil {
return nil
}
newPeer, newScore := r.selectBestReplacement(region, oldPeer)
if newPeer == nil {
storeID, newScore := r.selectBestReplacement(region, oldPeer)
if storeID == 0 {
return nil
}
// Make sure the new peer is better than the old peer.
if newScore <= oldScore {
return nil
}
newPeer, err := r.cluster.allocPeer(storeID)
if err != nil {
return nil
}
return newTransferPeer(region, RegionKind, oldPeer, newPeer)
}

Expand Down
1 change: 1 addition & 0 deletions server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ func (c *clusterInfo) allocID() (uint64, error) {
func (c *clusterInfo) allocPeer(storeID uint64) (*metapb.Peer, error) {
peerID, err := c.allocID()
if err != nil {
log.Errorf("failed to alloc peer: %v", err)
return nil, errors.Trace(err)
}
peer := &metapb.Peer{
Expand Down

0 comments on commit d6de88b

Please sign in to comment.