diff --git a/server/balancer.go b/server/balancer.go index b8db15c79665..a0055cc8a77f 100644 --- a/server/balancer.go +++ b/server/balancer.go @@ -201,16 +201,18 @@ func (cb *capacityBalancer) Balance(cluster *clusterInfo) (*score, *balanceOpera } type leaderBalancer struct { - filters []Filter - st scoreType - - cfg *BalanceConfig + cfg *BalanceConfig + st scoreType + selector Selector } func newLeaderBalancer(cfg *BalanceConfig) *leaderBalancer { + var filters []Filter + filters = append(filters, newStateFilter(cfg)) + filters = append(filters, newLeaderCountFilter(cfg)) + lb := &leaderBalancer{cfg: cfg, st: leaderScore} - lb.filters = append(lb.filters, newStateFilter(cfg)) - lb.filters = append(lb.filters, newLeaderCountFilter(cfg)) + lb.selector = newBalanceSelector(newScorer(lb.st), filters) return lb } @@ -218,61 +220,14 @@ func (lb *leaderBalancer) ScoreType() scoreType { return lb.st } -// selectBalanceRegion tries to select a store leader region to do balance. -func (lb *leaderBalancer) selectBalanceRegion(cluster *clusterInfo, stores []*storeInfo) (*regionInfo, *metapb.Peer) { - store := selectFromStore(stores, nil, lb.filters, lb.st) - if store == nil { - return nil, nil - } - - // Random select one leader region from store. - region := cluster.randLeaderRegion(store.GetId()) - if region == nil { - return nil, nil - } - - newLeader := lb.selectNewLeaderPeer(cluster, region.GetFollowers()) - if newLeader == nil { - return nil, nil - } - - return region, newLeader -} - -func (lb *leaderBalancer) selectNewLeaderPeer(cluster *clusterInfo, peers map[uint64]*metapb.Peer) *metapb.Peer { - stores := make([]*storeInfo, 0, len(peers)) - for storeID := range peers { - stores = append(stores, cluster.getStore(storeID)) - } - - store := selectToStore(stores, nil, nil, lb.st) - if store == nil { - return nil - } - - storeID := store.GetId() - return peers[storeID] -} - // Balance tries to select a store region to do balance. // The balance type is leader transfer. func (lb *leaderBalancer) Balance(cluster *clusterInfo) (*score, *balanceOperator, error) { - // If cluster max peer count config is 1, we cannot do leader transfer, - meta := cluster.getMeta() - if meta.GetMaxPeerCount() == 1 { - return nil, nil, nil - } - - stores := cluster.getStores() - region, newLeader := lb.selectBalanceRegion(cluster, stores) - if region == nil || newLeader == nil { - return nil, nil, nil - } - - // If region peer count is not equal to max peer count, no need to do leader transfer. - if len(region.GetPeers()) != int(cluster.getMeta().GetMaxPeerCount()) { + region, _, target := scheduleLeader(cluster, lb.selector) + if region == nil { return nil, nil, nil } + newLeader := region.GetStorePeer(target.GetId()) score, ok := checkAndGetDiffScore(cluster, region.Leader, newLeader, lb.st, lb.cfg) if !ok { diff --git a/server/cache.go b/server/cache.go index 2bb688f0dd6a..9f913ba3c1b7 100644 --- a/server/cache.go +++ b/server/cache.go @@ -420,6 +420,18 @@ func (c *clusterInfo) randFollowerRegion(storeID uint64) *regionInfo { return c.regions.randFollowerRegion(storeID) } +func (c *clusterInfo) getRegionStores(region *regionInfo) []*storeInfo { + c.RLock() + defer c.RUnlock() + var stores []*storeInfo + for id := range region.GetStoreIds() { + if store := c.stores.getStore(id); store != nil { + stores = append(stores, store) + } + } + return stores +} + // handleStoreHeartbeat updates the store status. func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error { c.Lock() diff --git a/server/cache_test.go b/server/cache_test.go index d5a1c14a1bb8..85e5a7656202 100644 --- a/server/cache_test.go +++ b/server/cache_test.go @@ -353,6 +353,12 @@ func (s *testClusterInfoSuite) testRegionHeartbeat(c *C, cache *clusterInfo) { c.Assert(region, DeepEquals, regions[region.GetId()].Region) } + for _, region := range regions { + for _, store := range cache.getRegionStores(region) { + c.Assert(region.GetStorePeer(store.GetId()), NotNil) + } + } + // Test with kv. if kv := cache.kv; kv != nil { for _, region := range regions { diff --git a/server/scheduler.go b/server/scheduler.go new file mode 100644 index 000000000000..eb1678275771 --- /dev/null +++ b/server/scheduler.go @@ -0,0 +1,37 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +func scheduleLeader(cluster *clusterInfo, s Selector) (*regionInfo, *storeInfo, *storeInfo) { + sourceStores := cluster.getStores() + + source := s.SelectSource(sourceStores) + if source == nil { + return nil, nil, nil + } + + region := cluster.randLeaderRegion(source.GetId()) + if region == nil { + return nil, nil, nil + } + + targetStores := cluster.getRegionStores(region) + + target := s.SelectTarget(targetStores) + if target == nil || target.GetId() == source.GetId() { + return nil, nil, nil + } + + return region, source, target +} diff --git a/server/selector.go b/server/selector.go new file mode 100644 index 000000000000..732871315a6a --- /dev/null +++ b/server/selector.go @@ -0,0 +1,62 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +// Selector is an interface to select source and target store to schedule. +type Selector interface { + SelectSource(stores []*storeInfo, filters ...Filter) *storeInfo + SelectTarget(stores []*storeInfo, filters ...Filter) *storeInfo +} + +type balanceSelector struct { + scorer Scorer + filters []Filter +} + +func newBalanceSelector(scorer Scorer, filters []Filter) *balanceSelector { + return &balanceSelector{ + scorer: scorer, + filters: filters, + } +} + +func (s *balanceSelector) SelectSource(stores []*storeInfo, filters ...Filter) *storeInfo { + filters = append(s.filters, filters...) + + var result *storeInfo + for _, store := range stores { + if filterSource(store, filters) { + continue + } + if result == nil || s.scorer.Score(result) < s.scorer.Score(store) { + result = store + } + } + return result +} + +func (s *balanceSelector) SelectTarget(stores []*storeInfo, filters ...Filter) *storeInfo { + filters = append(s.filters, filters...) + + var result *storeInfo + for _, store := range stores { + if filterTarget(store, filters) { + continue + } + if result == nil || s.scorer.Score(result) > s.scorer.Score(store) { + result = store + } + } + return result +}