server/scheduler: add grant-leader-scheduler (#406)
* server/coordinator: combine scheduler and controller
huachaohuang committed Dec 8, 2016
1 parent a691ee9 commit 3cb0604
Showing 4 changed files with 152 additions and 22 deletions.
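
The heart of the change is a small composition: a Scheduler proposes balance operators, a Controller decides how often and whether scheduling may run, and ScheduleController combines the two by struct embedding so schedulers can be registered and removed on a running coordinator. The sketch below is a simplified, self-contained illustration of that pattern, not the PD code itself: clusterInfo and balanceOperator are stand-in types, and grantLeader and fixedController are hypothetical toy implementations.

package main

import (
    "context"
    "fmt"
    "time"
)

type clusterInfo struct{}                  // stand-in for PD's cluster state
type balanceOperator struct{ desc string } // stand-in for a scheduling operator

// Scheduler proposes an operator for the cluster (mirrors the shape of the interface in this diff).
type Scheduler interface {
    GetName() string
    Schedule(cluster *clusterInfo) *balanceOperator
}

// Controller paces scheduling: interval, admission, and cancellation.
type Controller interface {
    GetInterval() time.Duration
    AllowSchedule() bool
    Ctx() context.Context
    Stop()
}

// ScheduleController combines Scheduler with Controller via embedding,
// so the coordinator's run loop only needs a single value.
type ScheduleController struct {
    Scheduler
    Controller
}

// grantLeader is a toy scheduler that always proposes moving the leader to one store.
type grantLeader struct{ storeID uint64 }

func (g grantLeader) GetName() string { return "grant-leader-scheduler" }
func (g grantLeader) Schedule(*clusterInfo) *balanceOperator {
    return &balanceOperator{desc: fmt.Sprintf("transfer leader to store %d", g.storeID)}
}

// fixedController always allows scheduling at a fixed interval until stopped.
type fixedController struct {
    ctx    context.Context
    cancel context.CancelFunc
}

func newFixedController() *fixedController {
    ctx, cancel := context.WithCancel(context.Background())
    return &fixedController{ctx: ctx, cancel: cancel}
}

func (c *fixedController) GetInterval() time.Duration { return 10 * time.Millisecond }
func (c *fixedController) AllowSchedule() bool        { return true }
func (c *fixedController) Ctx() context.Context       { return c.ctx }
func (c *fixedController) Stop()                      { c.cancel() }

func main() {
    s := &ScheduleController{Scheduler: grantLeader{storeID: 1}, Controller: newFixedController()}
    defer s.Stop()

    // The run loop reduces to: wait for the tick, ask the controller for
    // permission, then let the scheduler propose an operator.
    timer := time.NewTimer(s.GetInterval())
    defer timer.Stop()
    for i := 0; i < 3; i++ {
        select {
        case <-timer.C:
            timer.Reset(s.GetInterval())
            if !s.AllowSchedule() {
                continue
            }
            if op := s.Schedule(&clusterInfo{}); op != nil {
                fmt.Println(s.GetName(), "->", op.desc)
            }
        case <-s.Ctx().Done():
            return
        }
    }
}

Because the run loop only sees the combined value, pacing and scheduling stay decoupled, which is what lets the TestAddScheduler case below swap a grant-leader scheduler onto a live coordinator.
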
12 changes: 3 additions & 9 deletions server/balancer.go
@@ -35,7 +35,7 @@ func newLeaderBalancer(opt *scheduleOption) *leaderBalancer {
 }
 
 func (l *leaderBalancer) GetName() string {
-    return "leader_balancer"
+    return "leader-balancer"
 }
 
 func (l *leaderBalancer) GetResourceKind() ResourceKind {
@@ -53,13 +53,7 @@ func (l *leaderBalancer) Schedule(cluster *clusterInfo) *balanceOperator {
         return nil
     }
 
-    newLeader := region.GetStorePeer(target.GetId())
-    if newLeader == nil {
-        return nil
-    }
-
-    transferLeader := newTransferLeaderOperator(region.GetId(), region.Leader, newLeader)
-    return newBalanceOperator(region, balanceOP, transferLeader)
+    return transferLeader(region, target.GetId())
 }
 
 type storageBalancer struct {
@@ -80,7 +74,7 @@ func newStorageBalancer(opt *scheduleOption) *storageBalancer {
 }
 
 func (s *storageBalancer) GetName() string {
-    return "storage_balancer"
+    return "storage-balancer"
 }
 
 func (s *storageBalancer) GetResourceKind() ResourceKind {
83 changes: 71 additions & 12 deletions server/coordinator.go
@@ -39,7 +39,8 @@ type coordinator struct {
     opt *scheduleOption
     checker *replicaChecker
 
-    operators map[ResourceKind]map[uint64]*balanceOperator
+    schedulers map[string]*ScheduleController
+    operators map[ResourceKind]map[uint64]*balanceOperator
 
     regionCache *expireRegionCache
     histories *lruCache
@@ -51,6 +52,7 @@ func newCoordinator(cluster *clusterInfo, opt *scheduleOption) *coordinator {
         cluster: cluster,
         opt: opt,
         checker: newReplicaChecker(cluster, opt),
+        schedulers: make(map[string]*ScheduleController),
         regionCache: newExpireRegionCache(regionCacheTTL, regionCacheTTL),
         histories: newLRUCache(historiesCacheSize),
         events: newFifoCache(eventsCacheSize),
@@ -86,36 +88,73 @@ func (c *coordinator) dispatch(region *regionInfo) *pdpb.RegionHeartbeatResponse {
 }
 
 func (c *coordinator) run() {
-    c.wg.Add(1)
-    go c.runScheduler(newLeaderBalancer(c.opt), newLeaderController(c))
-
-    c.wg.Add(1)
-    go c.runScheduler(newStorageBalancer(c.opt), newStorageController(c))
+    c.addScheduler(newLeaderScheduleController(c, newLeaderBalancer(c.opt)))
+    c.addScheduler(newStorageScheduleController(c, newStorageBalancer(c.opt)))
 }
 
 func (c *coordinator) stop() {
     c.cancel()
     c.wg.Wait()
 }
 
-func (c *coordinator) runScheduler(s Scheduler, ctrl Controller) {
+func (c *coordinator) getSchedulers() []string {
+    c.RLock()
+    defer c.RUnlock()
+
+    var names []string
+    for name := range c.schedulers {
+        names = append(names, name)
+    }
+    return names
+}
+
+func (c *coordinator) addScheduler(s *ScheduleController) bool {
+    c.Lock()
+    defer c.Unlock()
+
+    if _, ok := c.schedulers[s.GetName()]; ok {
+        return false
+    }
+
+    c.wg.Add(1)
+    go c.runScheduler(s)
+
+    c.schedulers[s.GetName()] = s
+    return true
+}
+
+func (c *coordinator) removeScheduler(name string) bool {
+    c.Lock()
+    defer c.Unlock()
+
+    s, ok := c.schedulers[name]
+    if !ok {
+        return false
+    }
+
+    s.Stop()
+    delete(c.schedulers, name)
+    return true
+}
+
+func (c *coordinator) runScheduler(s *ScheduleController) {
     defer c.wg.Done()
 
-    timer := time.NewTimer(ctrl.GetInterval())
+    timer := time.NewTimer(s.GetInterval())
     defer timer.Stop()
 
     for {
         select {
         case <-timer.C:
-            timer.Reset(ctrl.GetInterval())
-            if !ctrl.AllowSchedule() {
+            timer.Reset(s.GetInterval())
+            if !s.AllowSchedule() {
                 continue
             }
             if op := s.Schedule(c.cluster); op != nil {
                 c.addOperator(s.GetResourceKind(), op)
             }
-        case <-ctrl.Ctx().Done():
-            log.Infof("%v stopped: %v", s.GetName(), ctrl.Ctx().Err())
+        case <-s.Ctx().Done():
+            log.Infof("%v stopped: %v", s.GetName(), s.Ctx().Err())
             return
         }
     }
@@ -269,6 +308,26 @@ func (s *storageController) AllowSchedule() bool {
     return s.c.getOperatorCount(storageKind) < int(s.c.opt.GetStorageScheduleLimit())
 }
 
+// ScheduleController combines Scheduler with Controller.
+type ScheduleController struct {
+    Scheduler
+    Controller
+}
+
+func newLeaderScheduleController(c *coordinator, s Scheduler) *ScheduleController {
+    return &ScheduleController{
+        Scheduler: s,
+        Controller: newLeaderController(c),
+    }
+}
+
+func newStorageScheduleController(c *coordinator, s Scheduler) *ScheduleController {
+    return &ScheduleController{
+        Scheduler: s,
+        Controller: newStorageController(c),
+    }
+}
+
 func collectOperatorCounterMetrics(bop *balanceOperator) {
     metrics := make(map[string]uint64)
     prefix := ""
44 changes: 43 additions & 1 deletion server/coordinator_test.go
@@ -69,7 +69,6 @@ func (s *testCoordinatorSuite) TestSchedule(c *C) {
     cfg.MinBalanceDiffRatio = 0.1
     cfg.LeaderScheduleInterval.Duration = 100 * time.Millisecond
     cfg.StorageScheduleInterval.Duration = 100 * time.Millisecond
-    cluster.putMeta(&metapb.Cluster{Id: 1, MaxPeerCount: 3})
 
     co := newCoordinator(cluster, opt)
     co.run()
@@ -141,6 +140,49 @@ func (s *testCoordinatorSuite) TestSchedule(c *C) {
     checkRemovePeerResp(c, resp, 4)
 }
 
+func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
+    cluster := newClusterInfo(newMockIDAllocator())
+    tc := newTestClusterInfo(cluster)
+
+    cfg, opt := newTestScheduleConfig()
+    cfg.LeaderScheduleInterval.Duration = 10 * time.Millisecond
+
+    co := newCoordinator(cluster, opt)
+    co.run()
+    defer co.stop()
+
+    c.Assert(co.schedulers, HasLen, 2)
+    c.Assert(co.removeScheduler("leader-balancer"), IsTrue)
+    c.Assert(co.removeScheduler("storage-balancer"), IsTrue)
+    c.Assert(co.schedulers, HasLen, 0)
+
+    // Add stores 1, 2, 3.
+    tc.addLeaderStore(1, 1, 1)
+    tc.addLeaderStore(2, 1, 1)
+    tc.addLeaderStore(3, 1, 1)
+    // Add region 1 with leader in store 1 and followers in stores 2, 3.
+    tc.addLeaderRegion(1, 1, 2, 3)
+    // Add region 2 with leader in store 2 and followers in stores 1, 3.
+    tc.addLeaderRegion(2, 2, 1, 3)
+    // Add region 3 with leader in store 3 and followers in stores 1, 2.
+    tc.addLeaderRegion(3, 3, 1, 2)
+
+    gls := newGrantLeaderScheduler(1)
+    c.Assert(co.removeScheduler(gls.GetName()), IsFalse)
+    c.Assert(co.addScheduler(newLeaderScheduleController(co, gls)), IsTrue)
+
+    // Transfer all leaders to store 1.
+    time.Sleep(100 * time.Millisecond)
+    checkTransferLeaderResp(c, co.dispatch(cluster.getRegion(2)), 1)
+    tc.addLeaderRegion(2, 1, 2, 3)
+    time.Sleep(100 * time.Millisecond)
+    checkTransferLeaderResp(c, co.dispatch(cluster.getRegion(3)), 1)
+    tc.addLeaderRegion(3, 1, 2, 3)
+    time.Sleep(100 * time.Millisecond)
+    c.Assert(co.dispatch(cluster.getRegion(2)), IsNil)
+    c.Assert(co.dispatch(cluster.getRegion(3)), IsNil)
+}
+
 var _ = Suite(&testControllerSuite{})
 
 type testControllerSuite struct{}
35 changes: 35 additions & 0 deletions server/scheduler.go
@@ -20,6 +20,41 @@ type Scheduler interface {
     Schedule(cluster *clusterInfo) *balanceOperator
 }
 
+// grantLeaderScheduler transfers all leaders to peers in the store.
+type grantLeaderScheduler struct {
+    StoreID uint64 `json:"store_id"`
+}
+
+func newGrantLeaderScheduler(storeID uint64) *grantLeaderScheduler {
+    return &grantLeaderScheduler{StoreID: storeID}
+}
+
+func (s *grantLeaderScheduler) GetName() string {
+    return "grant-leader-scheduler"
+}
+
+func (s *grantLeaderScheduler) GetResourceKind() ResourceKind {
+    return leaderKind
+}
+
+func (s *grantLeaderScheduler) Schedule(cluster *clusterInfo) *balanceOperator {
+    region := cluster.randFollowerRegion(s.StoreID)
+    if region == nil {
+        return nil
+    }
+    return transferLeader(region, s.StoreID)
+}
+
+// transferLeader returns an operator to transfer leader to the store.
+func transferLeader(region *regionInfo, storeID uint64) *balanceOperator {
+    newLeader := region.GetStorePeer(storeID)
+    if newLeader == nil {
+        return nil
+    }
+    transferLeader := newTransferLeaderOperator(region.GetId(), region.Leader, newLeader)
+    return newBalanceOperator(region, balanceOP, transferLeader)
+}
+
 // scheduleLeader schedules a region to transfer leader from the source store to the target store.
 func scheduleLeader(cluster *clusterInfo, s Selector) (*regionInfo, *storeInfo, *storeInfo) {
     sourceStores := cluster.getStores()
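
The json:"store_id" tag on grantLeaderScheduler suggests its configuration is meant to round-trip through JSON. A minimal, hypothetical illustration of what that tag buys, using a local stand-in type rather than the PD one:

package main

import (
    "encoding/json"
    "fmt"
)

// Stand-in mirroring the field and tag from server/scheduler.go; illustration only.
type grantLeaderScheduler struct {
    StoreID uint64 `json:"store_id"`
}

func main() {
    // Marshal: the tag maps the Go field StoreID to the JSON key "store_id".
    b, err := json.Marshal(&grantLeaderScheduler{StoreID: 1})
    if err != nil {
        panic(err)
    }
    fmt.Println(string(b)) // {"store_id":1}

    // Unmarshal: the same key fills the field back in.
    var s grantLeaderScheduler
    if err := json.Unmarshal([]byte(`{"store_id":4}`), &s); err != nil {
        panic(err)
    }
    fmt.Println(s.StoreID) // 4
}
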
