Skip to content

Commit

Permalink
Merge 441b8e1 into 4acee1d
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Dec 27, 2018
2 parents 4acee1d + 441b8e1 commit 618adc9
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 5 deletions.
5 changes: 5 additions & 0 deletions server/api/api.raml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ types:
ShuffleRegionScheduler:
type: Scheduler
discriminatorValue: shuffle-region-scheduler
ShuffleHotRegionScheduler:
type: Scheduler
discriminatorValue: shuffle-hot-region-scheduler
properties:
limit: integer
RandomMergeScheduler:
type: Scheduler
discriminatorValue: random-merge-scheduler
Expand Down
10 changes: 10 additions & 0 deletions server/api/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ func (h *schedulerHandler) Post(w http.ResponseWriter, r *http.Request) {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case "shuffle-hot-region-scheduler":
limit := uint64(1)
l, ok := input["limit"].(float64)
if ok {
limit = uint64(l)
}
if err := h.AddShuffleHotRegionScheduler(limit); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
default:
h.r.JSON(w, http.StatusBadRequest, "unknown scheduler")
return
Expand Down
5 changes: 5 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ func (h *Handler) AddShuffleRegionScheduler() error {
return h.AddScheduler("shuffle-region")
}

// AddShuffleHotRegionScheduler adds a shuffle-hot-region-scheduler.
func (h *Handler) AddShuffleHotRegionScheduler(limit uint64) error {
return h.AddScheduler("shuffle-hot-region", strconv.FormatUint(limit, 10))
}

// AddRandomMergeScheduler adds a random-merge-scheduler.
func (h *Handler) AddRandomMergeScheduler() error {
return h.AddScheduler("random-merge")
Expand Down
9 changes: 4 additions & 5 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ func (h *balanceHotRegionsScheduler) dispatch(typ BalanceType, cluster schedule.
defer h.Unlock()
switch typ {
case hotReadRegionBalance:
h.stats.readStatAsLeader = h.calcScore(cluster.RegionReadStats(), cluster, core.LeaderKind)
h.stats.readStatAsLeader = calcScore(cluster.RegionReadStats(), cluster, core.LeaderKind)
return h.balanceHotReadRegions(cluster)
case hotWriteRegionBalance:
h.stats.writeStatAsLeader = h.calcScore(cluster.RegionWriteStats(), cluster, core.LeaderKind)
h.stats.writeStatAsPeer = h.calcScore(cluster.RegionWriteStats(), cluster, core.RegionKind)
h.stats.writeStatAsLeader = calcScore(cluster.RegionWriteStats(), cluster, core.LeaderKind)
h.stats.writeStatAsPeer = calcScore(cluster.RegionWriteStats(), cluster, core.RegionKind)
return h.balanceHotWriteRegions(cluster)
}
return nil
Expand Down Expand Up @@ -199,8 +199,7 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu
return nil
}

// calcSocre calculates the statistics of hotspots through the hot cache.
func (h *balanceHotRegionsScheduler) calcScore(items []*core.RegionStat, cluster schedule.Cluster, kind core.ResourceKind) core.StoreHotRegionsStat {
func calcScore(items []*core.RegionStat, cluster schedule.Cluster, kind core.ResourceKind) core.StoreHotRegionsStat {
stats := make(core.StoreHotRegionsStat)
for _, r := range items {
// HotDegree is the update times on the hot cache. If the heartbeat report
Expand Down
51 changes: 51 additions & 0 deletions server/schedulers/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,54 @@ func (s *testRejectLeaderSuite) TestRejectLeader(c *C) {
op = sl.Schedule(tc)
testutil.CheckTransferLeader(c, op[0], schedule.OpLeader, 1, 2)
}

var _ = Suite(&testShuffleHotRegionSchedulerSuite{})

type testShuffleHotRegionSchedulerSuite struct{}

func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) {
opt := schedule.NewMockSchedulerOptions()
newTestReplication(opt, 3, "zone", "host")
tc := schedule.NewMockCluster(opt)
hb, err := schedule.CreateScheduler("shuffle-hot-region", schedule.NewOperatorController(nil, nil))
c.Assert(err, IsNil)

// Add stores 1, 2, 3, 4, 5, 6 with hot peer counts 3, 2, 2, 2, 0, 0.
tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 2, map[string]string{"zone": "z2", "host": "h2"})
tc.AddLabelsStore(3, 2, map[string]string{"zone": "z3", "host": "h3"})
tc.AddLabelsStore(4, 2, map[string]string{"zone": "z4", "host": "h4"})
tc.AddLabelsStore(5, 0, map[string]string{"zone": "z5", "host": "h5"})
tc.AddLabelsStore(6, 0, map[string]string{"zone": "z4", "host": "h6"})

// Report store written bytes.
tc.UpdateStorageWrittenBytes(1, 75*1024*1024)
tc.UpdateStorageWrittenBytes(2, 45*1024*1024)
tc.UpdateStorageWrittenBytes(3, 45*1024*1024)
tc.UpdateStorageWrittenBytes(4, 60*1024*1024)
tc.UpdateStorageWrittenBytes(5, 0)
tc.UpdateStorageWrittenBytes(6, 0)

// Region 1, 2 and 3 are hot regions.
//| region_id | leader_store | follower_store | follower_store | written_bytes |
//|-----------|--------------|----------------|----------------|---------------|
//| 1 | 1 | 2 | 3 | 512KB |
//| 2 | 1 | 3 | 4 | 512KB |
//| 3 | 1 | 2 | 4 | 512KB |
tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 3, 4)
tc.AddLeaderRegionWithWriteInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 4)
opt.HotRegionLowThreshold = 0

// try to get an operator
var op []*schedule.Operator
for i := 0; i < 100; i++ {
op = hb.Schedule(tc)
if op != nil {
break
}
}
c.Assert(op, NotNil)
c.Assert(op[0].Step(1).(schedule.PromoteLearner).ToStore, Equals, op[0].Step(2).(schedule.TransferLeader).ToStore)
c.Assert(op[0].Step(1).(schedule.PromoteLearner).ToStore, Not(Equals), 6)
}
149 changes: 149 additions & 0 deletions server/schedulers/shuffle_hot_region.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"math/rand"
"strconv"
"time"

"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

func init() {
schedule.RegisterScheduler("shuffle-hot-region", func(opController *schedule.OperatorController, args []string) (schedule.Scheduler, error) {
limit := uint64(1)
if len(args) == 1 {
l, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return nil, errors.WithStack(err)
}
limit = l
}
return newShuffleHotRegionScheduler(opController, limit), nil
})
}

// ShuffleHotRegionScheduler mainly used to test.
// It will randomly pick a hot peer, and move the peer
// to a random store, and then transfer the leader to
// the hot peer.
type shuffleHotRegionScheduler struct {
*baseScheduler
stats *storeStatistics
r *rand.Rand
limit uint64
types []BalanceType
}

// newShuffleHotRegionScheduler creates an admin scheduler that random balance hot regions
func newShuffleHotRegionScheduler(opController *schedule.OperatorController, limit uint64) schedule.Scheduler {
base := newBaseScheduler(opController)
return &shuffleHotRegionScheduler{
baseScheduler: base,
limit: limit,
stats: newStoreStaticstics(),
types: []BalanceType{hotReadRegionBalance, hotWriteRegionBalance},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

func (s *shuffleHotRegionScheduler) GetName() string {
return "shuffle-hot-region-scheduler"
}

func (s *shuffleHotRegionScheduler) GetType() string {
return "shuffle-hot-region"
}

func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
return s.opController.OperatorCount(schedule.OpHotRegion) < s.limit &&
s.opController.OperatorCount(schedule.OpRegion) < cluster.GetRegionScheduleLimit() &&
s.opController.OperatorCount(schedule.OpLeader) < cluster.GetLeaderScheduleLimit()
}

func (s *shuffleHotRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.Operator {
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
i := s.r.Int() % len(s.types)
return s.dispatch(s.types[i], cluster)
}

func (s *shuffleHotRegionScheduler) dispatch(typ BalanceType, cluster schedule.Cluster) []*schedule.Operator {
switch typ {
case hotReadRegionBalance:
s.stats.readStatAsLeader = calcScore(cluster.RegionReadStats(), cluster, core.LeaderKind)
return s.randomSchedule(cluster, s.stats.readStatAsLeader)
case hotWriteRegionBalance:
s.stats.writeStatAsLeader = calcScore(cluster.RegionWriteStats(), cluster, core.LeaderKind)
return s.randomSchedule(cluster, s.stats.writeStatAsLeader)
}
return nil
}

func (s *shuffleHotRegionScheduler) randomSchedule(cluster schedule.Cluster, storeStats core.StoreHotRegionsStat) []*schedule.Operator {
for _, stats := range storeStats {
i := s.r.Intn(stats.RegionsStat.Len())
r := stats.RegionsStat[i]
// select src region
srcRegion := cluster.GetRegion(r.RegionID)
if srcRegion == nil || len(srcRegion.GetDownPeers()) != 0 || len(srcRegion.GetPendingPeers()) != 0 {
continue
}
srcStoreID := srcRegion.GetLeader().GetStoreId()
srcStore := cluster.GetStore(srcStoreID)
filters := []schedule.Filter{
schedule.StoreStateFilter{MoveRegion: true},
schedule.NewExcludedFilter(srcRegion.GetStoreIds(), srcRegion.GetStoreIds()),
schedule.NewDistinctScoreFilter(cluster.GetLocationLabels(), cluster.GetRegionStores(srcRegion), srcStore),
}
stores := cluster.GetStores()
destStoreIDs := make([]uint64, 0, len(stores))
for _, store := range stores {
if schedule.FilterTarget(cluster, store, filters) {
continue
}
destStoreIDs = append(destStoreIDs, store.GetId())
}
if len(destStoreIDs) == 0 {
return nil
}
// random pick a dest store
destStoreID := destStoreIDs[s.r.Intn(len(destStoreIDs))]
if destStoreID == 0 {
return nil
}
srcPeer := srcRegion.GetStorePeer(srcStoreID)
if srcPeer == nil {
return nil
}
destPeer, err := cluster.AllocPeer(destStoreID)
if err != nil {
log.Errorf("failed to allocate peer: %v", err)
return nil
}
schedulerCounter.WithLabelValues(s.GetName(), "create_operator").Inc()
st := []schedule.OperatorStep{
schedule.AddLearner{ToStore: destStoreID, PeerID: destPeer.GetId()},
schedule.PromoteLearner{ToStore: destStoreID, PeerID: destPeer.GetId()},
schedule.TransferLeader{ToStore: destStoreID, FromStore: srcStoreID},
schedule.RemovePeer{FromStore: srcRegion.GetLeader().GetStoreId()},
}
return []*schedule.Operator{schedule.NewOperator("randomMoveHotRegion", srcRegion.GetID(), srcRegion.GetRegionEpoch(), schedule.OpRegion|schedule.OpLeader, st...)}
}
schedulerCounter.WithLabelValues(s.GetName(), "skip").Inc()
return nil
}
31 changes: 31 additions & 0 deletions tools/pd-ctl/pdctl/command/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func NewAddSchedulerCommand() *cobra.Command {
c.AddCommand(NewEvictLeaderSchedulerCommand())
c.AddCommand(NewShuffleLeaderSchedulerCommand())
c.AddCommand(NewShuffleRegionSchedulerCommand())
c.AddCommand(NewShuffleHotRegionSchedulerCommand())
c.AddCommand(NewScatterRangeSchedulerCommand())
c.AddCommand(NewBalanceLeaderSchedulerCommand())
c.AddCommand(NewBalanceRegionSchedulerCommand())
Expand Down Expand Up @@ -139,6 +140,36 @@ func NewShuffleRegionSchedulerCommand() *cobra.Command {
return c
}

// NewShuffleHotRegionSchedulerCommand returns a command to add a shuffle-hot-region-scheduler.
func NewShuffleHotRegionSchedulerCommand() *cobra.Command {
c := &cobra.Command{
Use: "shuffle-hot-region-scheduler [limit]",
Short: "add a scheduler to shuffle hot regions",
Run: addSchedulerForShuffleHotRegionCommandFunc,
}
return c
}

func addSchedulerForShuffleHotRegionCommandFunc(cmd *cobra.Command, args []string) {
if len(args) > 1 {
cmd.Println(cmd.UsageString())
return
}
limit := uint64(1)
if len(args) == 1 {
l, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
cmd.Println("Error: ", err)
return
}
limit = l
}
input := make(map[string]interface{})
input["name"] = cmd.Name()
input["limit"] = limit
postJSON(cmd, schedulersPrefix, input)
}

// NewBalanceLeaderSchedulerCommand returns a command to add a balance-leader-scheduler.
func NewBalanceLeaderSchedulerCommand() *cobra.Command {
c := &cobra.Command{
Expand Down

0 comments on commit 618adc9

Please sign in to comment.