From a5733642e029d09a50651f2881c35737dcfc7512 Mon Sep 17 00:00:00 2001
From: ShuNing
Date: Thu, 27 Dec 2018 22:21:17 +0800
Subject: [PATCH] *: Add shuffle hot region scheduler (#1361)

* add shuffle hot region scheduler

Signed-off-by: nolouch
---
 server/api/api.raml                     |   5 +
 server/api/scheduler.go                 |  10 ++
 server/handler.go                       |   5 +
 server/schedulers/hot_region.go         |  10 +-
 server/schedulers/scheduler_test.go     |  53 ++++++++
 server/schedulers/shuffle_hot_region.go | 160 ++++++++++++++++++++++++
 tools/pd-ctl/pdctl/command/scheduler.go |  32 +++++
 7 files changed, 270 insertions(+), 5 deletions(-)
 create mode 100644 server/schedulers/shuffle_hot_region.go

diff --git a/server/api/api.raml b/server/api/api.raml
index acca7a36a13..0924348eb54 100644
--- a/server/api/api.raml
+++ b/server/api/api.raml
@@ -237,6 +237,11 @@ types:
   ShuffleRegionScheduler:
     type: Scheduler
     discriminatorValue: shuffle-region-scheduler
+  ShuffleHotRegionScheduler:
+    type: Scheduler
+    discriminatorValue: shuffle-hot-region-scheduler
+    properties:
+      limit: integer
   RandomMergeScheduler:
     type: Scheduler
     discriminatorValue: random-merge-scheduler
diff --git a/server/api/scheduler.go b/server/api/scheduler.go
index ead34a0ec4b..3ca0578f04d 100644
--- a/server/api/scheduler.go
+++ b/server/api/scheduler.go
@@ -144,6 +144,16 @@ func (h *schedulerHandler) Post(w http.ResponseWriter, r *http.Request) {
 			h.r.JSON(w, http.StatusInternalServerError, err.Error())
 			return
 		}
+	case "shuffle-hot-region-scheduler":
+		limit := uint64(1)
+		l, ok := input["limit"].(float64)
+		if ok {
+			limit = uint64(l)
+		}
+		if err := h.AddShuffleHotRegionScheduler(limit); err != nil {
+			h.r.JSON(w, http.StatusInternalServerError, err.Error())
+			return
+		}
 	default:
 		h.r.JSON(w, http.StatusBadRequest, "unknown scheduler")
 		return
diff --git a/server/handler.go b/server/handler.go
index 5358f203ce4..e5518d31165 100644
--- a/server/handler.go
+++ b/server/handler.go
@@ -243,6 +243,11 @@ func (h *Handler) AddShuffleRegionScheduler() error {
 	return h.AddScheduler("shuffle-region")
 }
 
+// AddShuffleHotRegionScheduler adds a shuffle-hot-region-scheduler.
+func (h *Handler) AddShuffleHotRegionScheduler(limit uint64) error {
+	return h.AddScheduler("shuffle-hot-region", strconv.FormatUint(limit, 10))
+}
+
 // AddRandomMergeScheduler adds a random-merge-scheduler.
 func (h *Handler) AddRandomMergeScheduler() error {
 	return h.AddScheduler("random-merge")
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index 692cfd762e8..cfbe0f9be9c 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -142,11 +142,11 @@ func (h *balanceHotRegionsScheduler) dispatch(typ BalanceType, cluster schedule.
 	defer h.Unlock()
 	switch typ {
 	case hotReadRegionBalance:
-		h.stats.readStatAsLeader = h.calcScore(cluster.RegionReadStats(), cluster, core.LeaderKind)
+		h.stats.readStatAsLeader = calcScore(cluster.RegionReadStats(), cluster, core.LeaderKind)
 		return h.balanceHotReadRegions(cluster)
 	case hotWriteRegionBalance:
-		h.stats.writeStatAsLeader = h.calcScore(cluster.RegionWriteStats(), cluster, core.LeaderKind)
-		h.stats.writeStatAsPeer = h.calcScore(cluster.RegionWriteStats(), cluster, core.RegionKind)
+		h.stats.writeStatAsLeader = calcScore(cluster.RegionWriteStats(), cluster, core.LeaderKind)
+		h.stats.writeStatAsPeer = calcScore(cluster.RegionWriteStats(), cluster, core.RegionKind)
 		return h.balanceHotWriteRegions(cluster)
 	}
 	return nil
@@ -199,8 +199,8 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu
 	return nil
 }
 
-// calcSocre calculates the statistics of hotspots through the hot cache.
-func (h *balanceHotRegionsScheduler) calcScore(items []*core.RegionStat, cluster schedule.Cluster, kind core.ResourceKind) core.StoreHotRegionsStat {
+// calcScore calculates the statistics of hotspots through the hot cache.
+func calcScore(items []*core.RegionStat, cluster schedule.Cluster, kind core.ResourceKind) core.StoreHotRegionsStat {
 	stats := make(core.StoreHotRegionsStat)
 	for _, r := range items {
 		// HotDegree is the update times on the hot cache. If the heartbeat report
diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go
index d164caac536..450c9469cbd 100644
--- a/server/schedulers/scheduler_test.go
+++ b/server/schedulers/scheduler_test.go
@@ -275,6 +275,59 @@ func (s *testRejectLeaderSuite) TestRejectLeader(c *C) {
 	testutil.CheckTransferLeader(c, op[0], schedule.OpLeader, 1, 2)
 }
 
+var _ = Suite(&testShuffleHotRegionSchedulerSuite{})
+
+type testShuffleHotRegionSchedulerSuite struct{}
+
+func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) {
+	opt := schedule.NewMockSchedulerOptions()
+	newTestReplication(opt, 3, "zone", "host")
+	tc := schedule.NewMockCluster(opt)
+	hb, err := schedule.CreateScheduler("shuffle-hot-region", schedule.NewOperatorController(nil, nil))
+	c.Assert(err, IsNil)
+
+	// Add stores 1, 2, 3, 4, 5, 6 with hot peer counts 3, 2, 2, 2, 0, 0.
+	tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"})
+	tc.AddLabelsStore(2, 2, map[string]string{"zone": "z2", "host": "h2"})
+	tc.AddLabelsStore(3, 2, map[string]string{"zone": "z3", "host": "h3"})
+	tc.AddLabelsStore(4, 2, map[string]string{"zone": "z4", "host": "h4"})
+	tc.AddLabelsStore(5, 0, map[string]string{"zone": "z5", "host": "h5"})
+	tc.AddLabelsStore(6, 0, map[string]string{"zone": "z4", "host": "h6"})
+
+	// Report store written bytes.
+	tc.UpdateStorageWrittenBytes(1, 75*1024*1024)
+	tc.UpdateStorageWrittenBytes(2, 45*1024*1024)
+	tc.UpdateStorageWrittenBytes(3, 45*1024*1024)
+	tc.UpdateStorageWrittenBytes(4, 60*1024*1024)
+	tc.UpdateStorageWrittenBytes(5, 0)
+	tc.UpdateStorageWrittenBytes(6, 0)
+
+	// Region 1, 2 and 3 are hot regions.
+	//| region_id | leader_store | follower_store | follower_store | written_bytes |
+	//|-----------|--------------|----------------|----------------|---------------|
+	//|     1     |      1       |       2        |       3        |     512KB     |
+	//|     2     |      1       |       3        |       4        |     512KB     |
+	//|     3     |      1       |       2        |       4        |     512KB     |
+	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
+	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 3, 4)
+	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 4)
+	opt.HotRegionLowThreshold = 0
+
+	// try to get an operator
+	var op []*schedule.Operator
+	for i := 0; i < 100; i++ {
+		op = hb.Schedule(tc)
+		if op != nil {
+			break
+		}
+	}
+	c.Assert(op, NotNil)
+	c.Assert(op[0].Step(1).(schedule.PromoteLearner).ToStore, Equals, op[0].Step(2).(schedule.TransferLeader).ToStore)
+	// NOTE(review): ToStore is a uint64 while 6 is an untyped int, so this check
+	// can presumably never fail under gocheck's Equals — confirm the intent.
+	c.Assert(op[0].Step(1).(schedule.PromoteLearner).ToStore, Not(Equals), 6)
+}
+
 var _ = Suite(&testEvictLeaderSuite{})
 
 type testEvictLeaderSuite struct{}
diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go
new file mode 100644
index 00000000000..bf9ae71b870
--- /dev/null
+++ b/server/schedulers/shuffle_hot_region.go
@@ -0,0 +1,160 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedulers
+
+import (
+	"math/rand"
+	"strconv"
+	"time"
+
+	"github.com/pingcap/pd/server/core"
+	"github.com/pingcap/pd/server/schedule"
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+)
+
+func init() {
+	schedule.RegisterScheduler("shuffle-hot-region", func(opController *schedule.OperatorController, args []string) (schedule.Scheduler, error) {
+		limit := uint64(1)
+		if len(args) == 1 {
+			l, err := strconv.ParseUint(args[0], 10, 64)
+			if err != nil {
+				return nil, errors.WithStack(err)
+			}
+			limit = l
+		}
+		return newShuffleHotRegionScheduler(opController, limit), nil
+	})
+}
+
+// shuffleHotRegionScheduler is mainly used for testing.
+// It randomly picks a hot peer, moves the peer
+// to a random store, and then transfers the leader to
+// the hot peer.
+type shuffleHotRegionScheduler struct {
+	*baseScheduler
+	stats *storeStatistics
+	r     *rand.Rand
+	limit uint64
+	types []BalanceType
+}
+
+// newShuffleHotRegionScheduler creates an admin scheduler that randomly shuffles hot regions.
+func newShuffleHotRegionScheduler(opController *schedule.OperatorController, limit uint64) schedule.Scheduler {
+	base := newBaseScheduler(opController)
+	return &shuffleHotRegionScheduler{
+		baseScheduler: base,
+		limit:         limit,
+		stats:         newStoreStaticstics(),
+		types:         []BalanceType{hotReadRegionBalance, hotWriteRegionBalance},
+		r:             rand.New(rand.NewSource(time.Now().UnixNano())),
+	}
+}
+
+// GetName returns the name of the scheduler.
+func (s *shuffleHotRegionScheduler) GetName() string {
+	return "shuffle-hot-region-scheduler"
+}
+
+// GetType returns the type of the scheduler, matching the name it is registered under.
+func (s *shuffleHotRegionScheduler) GetType() string {
+	return "shuffle-hot-region"
+}
+
+// IsScheduleAllowed reports whether the hot-region, region and leader operator
+// counts are all below their respective limits.
+func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
+	return s.opController.OperatorCount(schedule.OpHotRegion) < s.limit &&
+		s.opController.OperatorCount(schedule.OpRegion) < cluster.GetRegionScheduleLimit() &&
+		s.opController.OperatorCount(schedule.OpLeader) < cluster.GetLeaderScheduleLimit()
+}
+
+// Schedule randomly picks the read or write balance type and dispatches to it.
+func (s *shuffleHotRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.Operator {
+	schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
+	i := s.r.Int() % len(s.types)
+	return s.dispatch(s.types[i], cluster)
+}
+
+// dispatch recomputes the leader hot-region statistics for the given balance
+// type and builds a shuffle operator from them.
+func (s *shuffleHotRegionScheduler) dispatch(typ BalanceType, cluster schedule.Cluster) []*schedule.Operator {
+	switch typ {
+	case hotReadRegionBalance:
+		s.stats.readStatAsLeader = calcScore(cluster.RegionReadStats(), cluster, core.LeaderKind)
+		return s.randomSchedule(cluster, s.stats.readStatAsLeader)
+	case hotWriteRegionBalance:
+		s.stats.writeStatAsLeader = calcScore(cluster.RegionWriteStats(), cluster, core.LeaderKind)
+		return s.randomSchedule(cluster, s.stats.writeStatAsLeader)
+	}
+	return nil
+}
+
+// randomSchedule picks a random hot region from the given statistics and builds
+// an operator that adds a peer on a randomly chosen eligible store, transfers
+// the leader to it and removes the peer on the old leader store. It returns nil
+// when no operator can be created.
+func (s *shuffleHotRegionScheduler) randomSchedule(cluster schedule.Cluster, storeStats core.StoreHotRegionsStat) []*schedule.Operator {
+	for _, stats := range storeStats {
+		i := s.r.Intn(stats.RegionsStat.Len())
+		r := stats.RegionsStat[i]
+		// select src region
+		srcRegion := cluster.GetRegion(r.RegionID)
+		if srcRegion == nil || len(srcRegion.GetDownPeers()) != 0 || len(srcRegion.GetPendingPeers()) != 0 {
+			continue
+		}
+		srcStoreID := srcRegion.GetLeader().GetStoreId()
+		srcStore := cluster.GetStore(srcStoreID)
+		filters := []schedule.Filter{
+			schedule.StoreStateFilter{MoveRegion: true},
+			schedule.NewExcludedFilter(srcRegion.GetStoreIds(), srcRegion.GetStoreIds()),
+			schedule.NewDistinctScoreFilter(cluster.GetLocationLabels(), cluster.GetRegionStores(srcRegion), srcStore),
+		}
+		stores := cluster.GetStores()
+		destStoreIDs := make([]uint64, 0, len(stores))
+		for _, store := range stores {
+			if schedule.FilterTarget(cluster, store, filters) {
+				continue
+			}
+			destStoreIDs = append(destStoreIDs, store.GetId())
+		}
+		if len(destStoreIDs) == 0 {
+			return nil
+		}
+		// random pick a dest store
+		destStoreID := destStoreIDs[s.r.Intn(len(destStoreIDs))]
+		if destStoreID == 0 {
+			return nil
+		}
+		srcPeer := srcRegion.GetStorePeer(srcStoreID)
+		if srcPeer == nil {
+			return nil
+		}
+		destPeer, err := cluster.AllocPeer(destStoreID)
+		if err != nil {
+			log.Errorf("failed to allocate peer: %v", err)
+			return nil
+		}
+		schedulerCounter.WithLabelValues(s.GetName(), "create_operator").Inc()
+		st := []schedule.OperatorStep{
+			schedule.AddLearner{ToStore: destStoreID, PeerID: destPeer.GetId()},
+			schedule.PromoteLearner{ToStore: destStoreID, PeerID: destPeer.GetId()},
+			schedule.TransferLeader{ToStore: destStoreID, FromStore: srcStoreID},
+			schedule.RemovePeer{FromStore: srcRegion.GetLeader().GetStoreId()},
+		}
+		return []*schedule.Operator{schedule.NewOperator("randomMoveHotRegion", srcRegion.GetID(), srcRegion.GetRegionEpoch(), schedule.OpRegion|schedule.OpLeader, st...)}
+	}
+	schedulerCounter.WithLabelValues(s.GetName(), "skip").Inc()
+	return nil
+}
diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go
index 4b581805b99..8d4d27e2552 100644
--- a/tools/pd-ctl/pdctl/command/scheduler.go
+++ b/tools/pd-ctl/pdctl/command/scheduler.go
@@ -71,6 +71,7 @@ func NewAddSchedulerCommand() *cobra.Command {
 	c.AddCommand(NewEvictLeaderSchedulerCommand())
 	c.AddCommand(NewShuffleLeaderSchedulerCommand())
 	c.AddCommand(NewShuffleRegionSchedulerCommand())
+	c.AddCommand(NewShuffleHotRegionSchedulerCommand())
 	c.AddCommand(NewScatterRangeSchedulerCommand())
 	c.AddCommand(NewBalanceLeaderSchedulerCommand())
 	c.AddCommand(NewBalanceRegionSchedulerCommand())
@@ -139,6 +140,37 @@ func NewShuffleRegionSchedulerCommand() *cobra.Command {
 	return c
 }
 
+// NewShuffleHotRegionSchedulerCommand returns a command to add a shuffle-hot-region-scheduler.
+func NewShuffleHotRegionSchedulerCommand() *cobra.Command {
+	c := &cobra.Command{
+		Use:   "shuffle-hot-region-scheduler [limit]",
+		Short: "add a scheduler to shuffle hot regions",
+		Run:   addSchedulerForShuffleHotRegionCommandFunc,
+	}
+	return c
+}
+
+// addSchedulerForShuffleHotRegionCommandFunc posts a request to add the scheduler with an optional limit (default 1).
+func addSchedulerForShuffleHotRegionCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) > 1 {
+		cmd.Println(cmd.UsageString())
+		return
+	}
+	limit := uint64(1)
+	if len(args) == 1 {
+		l, err := strconv.ParseUint(args[0], 10, 64)
+		if err != nil {
+			cmd.Println("Error: ", err)
+			return
+		}
+		limit = l
+	}
+	input := make(map[string]interface{})
+	input["name"] = cmd.Name()
+	input["limit"] = limit
+	postJSON(cmd, schedulersPrefix, input)
+}
+
 // NewBalanceLeaderSchedulerCommand returns a command to add a balance-leader-scheduler.
 func NewBalanceLeaderSchedulerCommand() *cobra.Command {
 	c := &cobra.Command{