
Commit

Merge 20dc513 into 9ca675f
huachaohuang committed Dec 1, 2016
2 parents 9ca675f + 20dc513 commit 9db6e15
Showing 4 changed files with 130 additions and 15 deletions.
server/balancer.go: 12 changes (3 additions, 9 deletions)
@@ -35,7 +35,7 @@ func newLeaderBalancer(opt *scheduleOption) *leaderBalancer {
}

func (l *leaderBalancer) GetName() string {
return "leader_balancer"
return "leader-balancer"
}

func (l *leaderBalancer) GetResourceKind() ResourceKind {
@@ -53,13 +53,7 @@ func (l *leaderBalancer) Schedule(cluster *clusterInfo) *balanceOperator {
return nil
}

- newLeader := region.GetStorePeer(target.GetId())
- if newLeader == nil {
- return nil
- }
-
- transferLeader := newTransferLeaderOperator(region.GetId(), region.Leader, newLeader)
- return newBalanceOperator(region, balanceOP, transferLeader)
+ return transferLeader(region, target.GetId())
}

type storageBalancer struct {
@@ -80,7 +74,7 @@ func newStorageBalancer(opt *scheduleOption) *storageBalancer {
}

func (s *storageBalancer) GetName() string {
return "storage_balancer"
return "storage-balancer"
}

func (s *storageBalancer) GetResourceKind() ResourceKind {
server/coordinator.go: 54 changes (49 additions, 5 deletions)
@@ -39,6 +39,9 @@ type coordinator struct {
opt *scheduleOption
checker *replicaChecker

schedulers map[string]Scheduler
controllers map[string]Controller

operators map[ResourceKind]map[uint64]*balanceOperator

regionCache *expireRegionCache
@@ -51,6 +54,8 @@ func newCoordinator(cluster *clusterInfo, opt *scheduleOption) *coordinator {
cluster: cluster,
opt: opt,
checker: newReplicaChecker(cluster, opt),
schedulers: make(map[string]Scheduler),
controllers: make(map[string]Controller),
regionCache: newExpireRegionCache(regionCacheTTL, regionCacheTTL),
histories: newLRUCache(historiesCacheSize),
events: newFifoCache(eventsCacheSize),
@@ -86,18 +91,57 @@ func (c *coordinator) dispatch(region *regionInfo) *pdpb.RegionHeartbeatResponse
}

func (c *coordinator) run() {
- c.wg.Add(1)
- go c.runScheduler(newLeaderBalancer(c.opt), newLeaderController(c))
-
- c.wg.Add(1)
- go c.runScheduler(newStorageBalancer(c.opt), newStorageController(c))
+ c.addScheduler(newLeaderBalancer(c.opt), newLeaderController(c))
+ c.addScheduler(newStorageBalancer(c.opt), newStorageController(c))
}

func (c *coordinator) stop() {
c.cancel()
c.wg.Wait()
}

func (c *coordinator) getSchedulers() []string {
c.RLock()
defer c.RUnlock()

var names []string
for name := range c.schedulers {
names = append(names, name)
}
return names
}

func (c *coordinator) addScheduler(s Scheduler, ctrl Controller) bool {
c.Lock()
defer c.Unlock()

if _, ok := c.schedulers[s.GetName()]; ok {
return false
}

c.wg.Add(1)
go c.runScheduler(s, ctrl)

c.schedulers[s.GetName()] = s
c.controllers[s.GetName()] = ctrl
return true
}

func (c *coordinator) removeScheduler(name string) bool {
c.Lock()
defer c.Unlock()

ctrl, ok := c.controllers[name]
if !ok {
return false
}

ctrl.Stop()
delete(c.schedulers, name)
delete(c.controllers, name)
return true
}

func (c *coordinator) runScheduler(s Scheduler, ctrl Controller) {
defer c.wg.Done()

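The block below is an illustrative sketch, not part of this commit: it shows how the coordinator methods added above (addScheduler, removeScheduler, getSchedulers) might be used together, assuming the server package context of this repository. grantLeaderScheduler and newLeaderController come from the other files in this diff; the helper name exampleDynamicScheduler is invented.

// exampleDynamicScheduler is a hypothetical helper (not in this commit)
// that registers an extra scheduler at runtime and later removes it.
func exampleDynamicScheduler(co *coordinator) []string {
	// addScheduler records the scheduler and its controller under the
	// scheduler's name and starts runScheduler in its own goroutine;
	// it returns false if that name is already registered.
	gls := newGrantLeaderScheduler(1)
	if !co.addScheduler(gls, newLeaderController(co)) {
		return co.getSchedulers()
	}

	// removeScheduler stops the controller and deletes the entry from
	// both the schedulers and controllers maps.
	co.removeScheduler(gls.GetName())

	// getSchedulers returns the names of the schedulers still registered.
	return co.getSchedulers()
}

Keeping the goroutine start inside addScheduler means callers never touch the WaitGroup directly, which is what lets run() shrink to two addScheduler calls.
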
server/coordinator_test.go: 44 changes (43 additions, 1 deletion)
@@ -69,7 +69,6 @@ func (s *testCoordinatorSuite) TestSchedule(c *C) {
cfg.MinBalanceDiffRatio = 0.1
cfg.LeaderScheduleInterval.Duration = 100 * time.Millisecond
cfg.StorageScheduleInterval.Duration = 100 * time.Millisecond
- cluster.putMeta(&metapb.Cluster{Id: 1, MaxPeerCount: 3})

co := newCoordinator(cluster, opt)
co.run()
@@ -141,6 +140,49 @@
checkRemovePeerResp(c, resp, 4)
}

func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

cfg, opt := newTestScheduleConfig()
cfg.LeaderScheduleInterval.Duration = 10 * time.Millisecond

co := newCoordinator(cluster, opt)
co.run()
defer co.stop()

c.Assert(co.schedulers, HasLen, 2)
c.Assert(co.removeScheduler("leader-balancer"), IsTrue)
c.Assert(co.removeScheduler("storage-balancer"), IsTrue)
c.Assert(co.schedulers, HasLen, 0)

// Add stores 1,2,3
tc.addLeaderStore(1, 1, 1)
tc.addLeaderStore(2, 1, 1)
tc.addLeaderStore(3, 1, 1)
// Add region 1 with leader in store 1 and followers in stores 2,3
tc.addLeaderRegion(1, 1, 2, 3)
// Add region 2 with leader in store 2 and followers in stores 1,3
tc.addLeaderRegion(2, 2, 1, 3)
// Add region 3 with leader in store 3 and followers in stores 1,2
tc.addLeaderRegion(3, 3, 1, 2)

gls := newGrantLeaderScheduler(1)
c.Assert(co.removeScheduler(gls.GetName()), IsFalse)
c.Assert(co.addScheduler(gls, newLeaderController(co)), IsTrue)

// Transfer all leaders to store 1.
time.Sleep(100 * time.Millisecond)
checkTransferLeaderResp(c, co.dispatch(cluster.getRegion(2)), 1)
tc.addLeaderRegion(2, 1, 2, 3)
time.Sleep(100 * time.Millisecond)
checkTransferLeaderResp(c, co.dispatch(cluster.getRegion(3)), 1)
tc.addLeaderRegion(3, 1, 2, 3)
time.Sleep(100 * time.Millisecond)
c.Assert(co.dispatch(cluster.getRegion(2)), IsNil)
c.Assert(co.dispatch(cluster.getRegion(3)), IsNil)
}

var _ = Suite(&testControllerSuite{})

type testControllerSuite struct{}
server/scheduler.go: 35 changes (35 additions, 0 deletions)
@@ -20,6 +20,41 @@ type Scheduler interface {
Schedule(cluster *clusterInfo) *balanceOperator
}

// grantLeaderScheduler transfers all leaders to peers in the store.
type grantLeaderScheduler struct {
StoreID uint64 `json:"store_id"`
}

func newGrantLeaderScheduler(storeID uint64) *grantLeaderScheduler {
return &grantLeaderScheduler{StoreID: storeID}
}

func (s *grantLeaderScheduler) GetName() string {
return "grant-leader-scheduler"
}

func (s *grantLeaderScheduler) GetResourceKind() ResourceKind {
return leaderKind
}

func (s *grantLeaderScheduler) Schedule(cluster *clusterInfo) *balanceOperator {
region := cluster.randFollowerRegion(s.StoreID)
if region == nil {
return nil
}
return transferLeader(region, s.StoreID)
}

// transferLeader returns an operator to transfer leader to the store.
func transferLeader(region *regionInfo, storeID uint64) *balanceOperator {
newLeader := region.GetStorePeer(storeID)
if newLeader == nil {
return nil
}
transferLeader := newTransferLeaderOperator(region.GetId(), region.Leader, newLeader)
return newBalanceOperator(region, balanceOP, transferLeader)
}

// scheduleLeader schedules a region to transfer leader from the source store to the target store.
func scheduleLeader(cluster *clusterInfo, s Selector) (*regionInfo, *storeInfo, *storeInfo) {
sourceStores := cluster.getStores()
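As a final illustration, also not part of this commit: every scheduler in this diff provides GetName, GetResourceKind, and Schedule, so a new scheduler can be written against the same contract and reuse the transferLeader helper above. The type pinLeaderScheduler and its name below are invented for the example; randFollowerRegion, leaderKind, and transferLeader are taken from this diff.

// pinLeaderScheduler is a hypothetical example, not part of this commit.
// Like grantLeaderScheduler, it keeps granting leadership to one store;
// it exists only to show the shape a pluggable scheduler takes.
type pinLeaderScheduler struct {
	StoreID uint64 `json:"store_id"`
}

func (s *pinLeaderScheduler) GetName() string {
	return "pin-leader-scheduler"
}

func (s *pinLeaderScheduler) GetResourceKind() ResourceKind {
	return leaderKind
}

func (s *pinLeaderScheduler) Schedule(cluster *clusterInfo) *balanceOperator {
	// Pick a region that has a follower on the pinned store; transferLeader
	// returns nil when the region has no peer on that store.
	region := cluster.randFollowerRegion(s.StoreID)
	if region == nil {
		return nil
	}
	return transferLeader(region, s.StoreID)
}

Such a scheduler would then be registered with co.addScheduler(&pinLeaderScheduler{StoreID: 1}, newLeaderController(co)), mirroring how TestAddScheduler above registers grantLeaderScheduler.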
