Skip to content

Commit

Permalink
Merge 67179a5 into 9093d4a
Browse files Browse the repository at this point in the history
  • Loading branch information
huachaohuang committed Dec 7, 2016
2 parents 9093d4a + 67179a5 commit 8ce2655
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 0 deletions.
5 changes: 5 additions & 0 deletions server/api/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func (h *schedulerHandler) Post(w http.ResponseWriter, r *http.Request) {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case "shuffle-leader-scheduler":
if err := h.AddShuffleLeaderScheduler(); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}

h.r.JSON(w, http.StatusOK, nil)
Expand Down
5 changes: 5 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ func (h *Handler) AddLeaderBalancer() error {
func (h *Handler) AddGrantLeaderScheduler(storeID uint64) error {
return h.AddLeaderScheduler(newGrantLeaderScheduler(storeID))
}

// AddShuffleLeaderScheduler adds a shuffle-leader-scheduler.
func (h *Handler) AddShuffleLeaderScheduler() error {
return h.AddLeaderScheduler(newShuffleLeaderScheduler())
}
48 changes: 48 additions & 0 deletions server/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,54 @@ func (s *grantLeaderScheduler) Schedule(cluster *clusterInfo) *balanceOperator {
return transferLeader(region, s.StoreID)
}

type shuffleLeaderScheduler struct {
selector Selector
source *storeInfo
}

func newShuffleLeaderScheduler() *shuffleLeaderScheduler {
return &shuffleLeaderScheduler{
selector: newRandomSelector(),
}
}

func (s *shuffleLeaderScheduler) GetName() string {
return "shuffle-leader-scheduler"
}

func (s *shuffleLeaderScheduler) GetResourceKind() ResourceKind {
return leaderKind
}

func (s *shuffleLeaderScheduler) Schedule(cluster *clusterInfo) *balanceOperator {
// We shuffle leaders between stores:
// 1. select a store as a source store randomly.
// 2. transfer a leader from the store to another store.
// 3. transfer a leader to the store from another store.
// These will not change store's leader count, but swap leaders between stores.

// Select a source store and transfer a leader from it.
if s.source == nil {
region, source, target := scheduleLeader(cluster, s.selector)
if region == nil {
return nil
}
s.source = source // Mark the source store.
return transferLeader(region, target.GetId())
}

// Reset the source store.
source := s.source
s.source = nil

// Transfer a leader to the source store.
region := cluster.randFollowerRegion(source.GetId())
if region == nil {
return nil
}
return transferLeader(region, source.GetId())
}

// transferLeader returns an operator to transfer leader to the store.
func transferLeader(region *regionInfo, storeID uint64) *balanceOperator {
newLeader := region.GetStorePeer(storeID)
Expand Down
54 changes: 54 additions & 0 deletions server/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import . "github.com/pingcap/check"

var _ = Suite(&testShuffleLeaderSuite{})

type testShuffleLeaderSuite struct{}

func (s *testShuffleLeaderSuite) Test(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

sl := newShuffleLeaderScheduler()
c.Assert(sl.Schedule(cluster), IsNil)

// Add stores 1,2,3,4
tc.addLeaderStore(1, 6, 30)
tc.addLeaderStore(2, 7, 30)
tc.addLeaderStore(3, 8, 30)
tc.addLeaderStore(4, 9, 30)
// Add regions 1,2,3,4 with leaders in stores 1,2,3,4
tc.addLeaderRegion(1, 1, 2, 3, 4)
tc.addLeaderRegion(1, 2, 3, 4, 1)
tc.addLeaderRegion(2, 2, 3, 4, 1)
tc.addLeaderRegion(2, 3, 4, 1, 2)
tc.addLeaderRegion(3, 3, 4, 1, 2)
tc.addLeaderRegion(3, 4, 1, 2, 3)
tc.addLeaderRegion(4, 4, 1, 2, 3)
tc.addLeaderRegion(4, 1, 2, 3, 4)

for i := 0; i < 4; i++ {
bop := sl.Schedule(cluster)
op := bop.Ops[0].(*transferLeaderOperator)

sourceID := op.OldLeader.GetStoreId()

bop = sl.Schedule(cluster)
op = bop.Ops[0].(*transferLeaderOperator)
c.Assert(op.NewLeader.GetStoreId(), Equals, sourceID)
}
}
37 changes: 37 additions & 0 deletions server/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package server

import "math/rand"

// Selector is an interface to select source and target store to schedule.
type Selector interface {
SelectSource(stores []*storeInfo, filters ...Filter) *storeInfo
Expand Down Expand Up @@ -60,3 +62,38 @@ func (s *balanceSelector) SelectTarget(stores []*storeInfo, filters ...Filter) *
}
return result
}

type randomSelector struct{}

func newRandomSelector() *randomSelector {
return &randomSelector{}
}

func (s *randomSelector) Select(stores []*storeInfo) *storeInfo {
if len(stores) == 0 {
return nil
}
return stores[rand.Int()%len(stores)]
}

func (s *randomSelector) SelectSource(stores []*storeInfo, filters ...Filter) *storeInfo {
var candidates []*storeInfo
for _, store := range stores {
if filterSource(store, filters) {
continue
}
candidates = append(candidates, store)
}
return s.Select(candidates)
}

func (s *randomSelector) SelectTarget(stores []*storeInfo, filters ...Filter) *storeInfo {
var candidates []*storeInfo
for _, store := range stores {
if filterTarget(store, filters) {
continue
}
candidates = append(candidates, store)
}
return s.Select(candidates)
}
2 changes: 2 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server

import (
"math/rand"
"net/http"
"path"
"strconv"
Expand Down Expand Up @@ -98,6 +99,7 @@ func NewServer(cfg *Config) (*Server, error) {
// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(cfg *Config) (*Server, error) {
log.Infof("PD config - %v", cfg)
rand.Seed(time.Now().UnixNano())

s := &Server{
cfg: cfg,
Expand Down

0 comments on commit 8ce2655

Please sign in to comment.