Skip to content

Commit

Permalink
server: add regionInfo
Browse files Browse the repository at this point in the history
Wrap region information into regionInfo. regionInfo will be cached after
optimizating the cache, so that we can access different information
about a region more easily.

This commit is tedious and breaks a lot of trivial code.
  • Loading branch information
huachaohuang committed Oct 21, 2016
1 parent 4f67c48 commit 9b99e96
Show file tree
Hide file tree
Showing 13 changed files with 284 additions and 194 deletions.
79 changes: 30 additions & 49 deletions server/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
)

var (
Expand Down Expand Up @@ -127,26 +126,21 @@ func (cb *capacityBalancer) ScoreType() scoreType {
return cb.st
}

func (cb *capacityBalancer) selectBalanceRegion(cluster *clusterInfo, stores []*storeInfo) (*metapb.Region, *metapb.Peer, *metapb.Peer) {
func (cb *capacityBalancer) selectBalanceRegion(cluster *clusterInfo, stores []*storeInfo) (*regionInfo, *metapb.Peer) {
store := selectFromStore(stores, nil, cb.filters, cb.st)
if store == nil {
return nil, nil, nil
return nil, nil
}

storeID := store.store.GetId()
meta := cluster.getMeta()
if meta.GetMaxPeerCount() == 1 {
region := cluster.regions.randLeaderRegion(storeID)
if region == nil {
return nil, nil, nil
}

leader := leaderPeer(region, storeID)
return region, leader, leader
region := cluster.randFollowerRegion(storeID)
if region == nil {
region = cluster.randLeaderRegion(storeID)
}

// Random select one follower region from store.
return cluster.regions.randRegion(storeID)
if region == nil {
return nil, nil
}
return region, region.GetStorePeer(storeID)
}

func (cb *capacityBalancer) selectAddPeer(cluster *clusterInfo, stores []*storeInfo, excluded map[uint64]struct{}) (*metapb.Peer, error) {
Expand Down Expand Up @@ -187,8 +181,8 @@ func (cb *capacityBalancer) selectRemovePeer(cluster *clusterInfo, peers map[uin
// The balance type is follower balance.
func (cb *capacityBalancer) Balance(cluster *clusterInfo) (*score, *balanceOperator, error) {
stores := cluster.getStores()
region, leader, peer := cb.selectBalanceRegion(cluster, stores)
if region == nil || leader == nil || peer == nil {
region, peer := cb.selectBalanceRegion(cluster, stores)
if region == nil || peer == nil {
return nil, nil, nil
}

Expand All @@ -198,8 +192,7 @@ func (cb *capacityBalancer) Balance(cluster *clusterInfo) (*score, *balanceOpera
}

// Select one store to add new peer.
excluded := getExcludedStores(region)
newPeer, err := cb.selectAddPeer(cluster, stores, excluded)
newPeer, err := cb.selectAddPeer(cluster, stores, region.GetStoreIds())
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down Expand Up @@ -237,31 +230,25 @@ func (lb *leaderBalancer) ScoreType() scoreType {
}

// selectBalanceRegion tries to select a store leader region to do balance.
func (lb *leaderBalancer) selectBalanceRegion(cluster *clusterInfo, stores []*storeInfo) (*metapb.Region, *metapb.Peer, *metapb.Peer) {
func (lb *leaderBalancer) selectBalanceRegion(cluster *clusterInfo, stores []*storeInfo) (*regionInfo, *metapb.Peer) {
store := selectFromStore(stores, nil, lb.filters, lb.st)
if store == nil {
return nil, nil, nil
return nil, nil
}

// Random select one leader region from store.
storeID := store.store.GetId()
region := cluster.regions.randLeaderRegion(storeID)
region := cluster.randLeaderRegion(storeID)
if region == nil {
return nil, nil, nil
}

leader := leaderPeer(region, storeID)
if leader == nil {
return nil, nil, nil
return nil, nil
}

followers := getFollowerPeers(region, leader)
newLeader := lb.selectNewLeaderPeer(cluster, followers)
newLeader := lb.selectNewLeaderPeer(cluster, region.GetFollowers())
if newLeader == nil {
return nil, nil, nil
return nil, nil
}

return region, leader, newLeader
return region, newLeader
}

func (lb *leaderBalancer) selectNewLeaderPeer(cluster *clusterInfo, peers map[uint64]*metapb.Peer) *metapb.Peer {
Expand Down Expand Up @@ -289,8 +276,8 @@ func (lb *leaderBalancer) Balance(cluster *clusterInfo) (*score, *balanceOperato
}

stores := cluster.getStores()
region, leader, newLeader := lb.selectBalanceRegion(cluster, stores)
if region == nil || leader == nil || newLeader == nil {
region, newLeader := lb.selectBalanceRegion(cluster, stores)
if region == nil || newLeader == nil {
return nil, nil, nil
}

Expand All @@ -299,39 +286,34 @@ func (lb *leaderBalancer) Balance(cluster *clusterInfo) (*score, *balanceOperato
return nil, nil, nil
}

score, ok := checkAndGetDiffScore(cluster, leader, newLeader, lb.st, lb.cfg)
score, ok := checkAndGetDiffScore(cluster, region.Leader, newLeader, lb.st, lb.cfg)
if !ok {
return nil, nil, nil
}

regionID := region.GetId()
transferLeaderOperator := newTransferLeaderOperator(regionID, leader, newLeader, lb.cfg)
transferLeaderOperator := newTransferLeaderOperator(regionID, region.Leader, newLeader, lb.cfg)
return score, newBalanceOperator(region, balanceOP, transferLeaderOperator), nil
}

// replicaBalancer is used to balance active replica count.
type replicaBalancer struct {
*capacityBalancer
cfg *BalanceConfig
region *metapb.Region
leader *metapb.Peer
downPeers []*pdpb.PeerStats
cfg *BalanceConfig
region *regionInfo
}

func newReplicaBalancer(region *metapb.Region, leader *metapb.Peer, downPeers []*pdpb.PeerStats, cfg *BalanceConfig) *replicaBalancer {
func newReplicaBalancer(region *regionInfo, cfg *BalanceConfig) *replicaBalancer {
return &replicaBalancer{
cfg: cfg,
region: region,
leader: leader,
downPeers: downPeers,
capacityBalancer: newCapacityBalancer(cfg),
}
}

func (rb *replicaBalancer) addPeer(cluster *clusterInfo) (*balanceOperator, error) {
stores := cluster.getStores()
excluded := getExcludedStores(rb.region)
peer, err := rb.selectAddPeer(cluster, stores, excluded)
peer, err := rb.selectAddPeer(cluster, stores, rb.region.GetStoreIds())
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -350,8 +332,7 @@ func (rb *replicaBalancer) removePeer(cluster *clusterInfo, badPeers []*metapb.P
peer = badPeers[0]
} else {
var err error
followers := getFollowerPeers(rb.region, rb.leader)
peer, err = rb.selectRemovePeer(cluster, followers)
peer, err = rb.selectRemovePeer(cluster, rb.region.GetFollowers())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -386,8 +367,8 @@ func (rb *replicaBalancer) collectBadPeers(cluster *clusterInfo) []*metapb.Peer
}

func (rb *replicaBalancer) collectDownPeers(cluster *clusterInfo) []*metapb.Peer {
downPeers := make([]*metapb.Peer, 0, len(rb.downPeers))
for _, stats := range rb.downPeers {
downPeers := make([]*metapb.Peer, 0, len(rb.region.DownPeers))
for _, stats := range rb.region.DownPeers {
peer := stats.GetPeer()
if peer == nil {
continue
Expand Down
Loading

0 comments on commit 9b99e96

Please sign in to comment.