Skip to content

Commit

Permalink
server/coordinator: replace balancer worker with coordinator (#398)
Browse files Browse the repository at this point in the history
* server/coordinator: replace balancer worker with coordinator

Coordinator can control the speed of different schedulers.

Every scheduler has a unique name, we can add API to run or stop any
schedulers dynamically, and different schedulers can run concurrently.
  • Loading branch information
huachaohuang committed Dec 7, 2016
1 parent e4ac0e0 commit 8a2fc58
Show file tree
Hide file tree
Showing 11 changed files with 554 additions and 662 deletions.
8 changes: 4 additions & 4 deletions conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ min-leader-count = 10
max-snapshot-count = 3
min-balance-diff-ratio = 0.01
max-store-down-duration = "30m"
balance-interval = "30s"
max-balance-count = 4
max-balance-retry-per-loop = 4
max-balance-count-per-loop = 1
leader-schedule-limit = 4
leader-schedule-interval = "30s"
storage-schedule-limit = 4
storage-schedule-interval = "30s"

[metric]
# prometheus client push interval, set "0s" to disable prometheus.
Expand Down
102 changes: 1 addition & 101 deletions server/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package server

import (
"fmt"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -82,6 +81,7 @@ func (c *testClusterInfo) addLeaderRegion(regionID uint64, leaderID uint64, foll

// updateLeaderCount overwrites the leader/region counters in the given
// store's stats and writes the store back into the test cluster.
func (c *testClusterInfo) updateLeaderCount(storeID uint64, leaderCount, regionCount int) {
	s := c.getStore(storeID)
	s.stats.LeaderRegionCount = leaderCount
	s.stats.TotalRegionCount = regionCount
	c.putStore(s)
}
Expand Down Expand Up @@ -285,103 +285,3 @@ func checkTransferLeader(c *C, bop *balanceOperator, sourceID, targetID uint64)
c.Assert(op.OldLeader.GetStoreId(), Equals, sourceID)
c.Assert(op.NewLeader.GetStoreId(), Equals, targetID)
}

// Register the balancer suite with gocheck so `go test` runs its methods.
var _ = Suite(&testBalancerSuite{})

// testBalancerSuite exercises balancer behavior against a mock cluster.
type testBalancerSuite struct {
	testClusterBaseSuite

	cfg *ScheduleConfig // schedule configuration under test, built in SetUpSuite
	opt *scheduleOption // option wrapper derived from cfg in SetUpSuite
}

// getRootPath returns the root path that isolates this suite's test data.
func (s *testBalancerSuite) getRootPath() string {
	const root = "test_balancer"
	return root
}

// SetUpSuite builds the schedule config and option shared by every test
// in the suite.
func (s *testBalancerSuite) SetUpSuite(c *C) {
	cfg := newScheduleConfig()
	cfg.adjust()
	s.cfg = cfg
	s.opt = newScheduleOption(cfg)
}

// newClusterInfo builds a mock cluster containing 4 stores (ids 1-4),
// one peer (id 5) on store 1, and one region (id 6) covering the whole
// key space with that peer as leader.
func (s *testBalancerSuite) newClusterInfo(c *C) *clusterInfo {
	cluster := newClusterInfo(newMockIDAllocator())

	// Cluster metadata: every region is replicated on 3 peers.
	cluster.putMeta(&metapb.Cluster{
		Id:           0,
		MaxPeerCount: 3,
	})

	// The allocator hands out ids 1..4 for the stores.
	for i := 1; i < 5; i++ {
		storeID, err := cluster.allocID()
		c.Assert(err, IsNil)
		store := s.newStore(c, storeID, fmt.Sprintf("127.0.0.1:%d", i))
		cluster.putStore(newStoreInfo(store))
	}

	// One peer on store 1; it receives id 5.
	peerID, err := cluster.allocID()
	c.Assert(err, IsNil)
	leader := s.newPeer(c, 1, peerID)

	// One region over the full key range; it receives id 6.
	regionID, err := cluster.allocID()
	c.Assert(err, IsNil)
	region := s.newRegion(c, regionID, []byte{}, []byte{}, []*metapb.Peer{leader}, nil)
	cluster.putRegion(newRegionInfo(region, leader))

	c.Assert(cluster.getStores(), HasLen, 4)

	return cluster
}

// updateStore feeds a store heartbeat carrying the given capacity and
// snapshot statistics into the cluster and asserts it is accepted.
func (s *testBalancerSuite) updateStore(c *C, clusterInfo *clusterInfo, storeID uint64, capacity uint64, available uint64,
	sendingSnapCount uint32, receivingSnapCount uint32, applyingSnapCount uint32) {
	heartbeat := &pdpb.StoreStats{
		StoreId:            storeID,
		Capacity:           capacity,
		Available:          available,
		SendingSnapCount:   sendingSnapCount,
		ReceivingSnapCount: receivingSnapCount,
		ApplyingSnapCount:  applyingSnapCount,
	}

	c.Assert(clusterInfo.handleStoreHeartbeat(heartbeat), IsNil)
}

// updateStoreState overwrites the state of one store in the mock cluster.
func (s *testBalancerSuite) updateStoreState(c *C, clusterInfo *clusterInfo, storeID uint64, state metapb.StoreState) {
	target := clusterInfo.getStore(storeID)
	target.State = state
	clusterInfo.putStore(target)
}

// addRegionPeer runs the replica checker against the region, asserts that
// it proposes adding a peer on the expected store, then applies that peer
// to the region and writes the region back to the cluster.
func (s *testBalancerSuite) addRegionPeer(c *C, clusterInfo *clusterInfo, storeID uint64, region *regionInfo) {
	checker := newReplicaChecker(clusterInfo, s.opt)
	bop := checker.Check(region)
	c.Assert(bop, NotNil)

	// The first operation must be an AddNode change-peer wrapped in a
	// once-operator.
	change, ok := bop.Ops[0].(*onceOperator).Op.(*changePeerOperator)
	c.Assert(ok, IsTrue)
	c.Assert(change.ChangePeer.GetChangeType(), Equals, raftpb.ConfChangeType_AddNode)

	added := change.ChangePeer.GetPeer()
	c.Assert(added.GetStoreId(), Equals, storeID)

	addRegionPeer(c, region.Region, added)

	clusterInfo.putRegion(region)
}
Loading

0 comments on commit 8a2fc58

Please sign in to comment.