Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedulers: dependency inverse. #731

Merged
merged 2 commits into from
Sep 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"github.com/pingcap/pd/pkg/metricutil"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/api"

// Register schedulers.
_ "github.com/pingcap/pd/server/schedulers"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion server/api/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"net/url"
"strings"

"github.com/ngaut/log"
log "github.com/Sirupsen/logrus"
"github.com/pingcap/pd/server"
)

Expand Down
2 changes: 2 additions & 0 deletions server/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/pingcap/pd/server/core"
)

// TODO: remove this file.

const (
bootstrapBalanceCount = 10
bootstrapBalanceDiff = 2
Expand Down
23 changes: 16 additions & 7 deletions server/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/schedulers"
_ "github.com/pingcap/pd/server/schedulers" // Register schedulers for tests.
)

// TODO: move tests to schedulers directory.

type testClusterInfo struct {
*clusterInfo
}
Expand Down Expand Up @@ -271,7 +273,9 @@ func (s *testBalanceLeaderSchedulerSuite) SetUpTest(c *C) {
s.cluster = newClusterInfo(newMockIDAllocator())
s.tc = newTestClusterInfo(s.cluster)
_, opt := newTestScheduleConfig()
s.lb = schedulers.NewBalanceLeaderScheduler(opt)
lb, err := schedule.CreateScheduler("balanceLeader", opt)
c.Assert(err, IsNil)
s.lb = lb
}

func (s *testBalanceLeaderSchedulerSuite) schedule() schedule.Operator {
Expand Down Expand Up @@ -403,7 +407,8 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) {
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
sb := schedulers.NewBalanceRegionScheduler(opt)
sb, err := schedule.CreateScheduler("balanceRegion", opt)
c.Assert(err, IsNil)

opt.SetMaxReplicas(1)

Expand Down Expand Up @@ -441,7 +446,8 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) {
_, opt := newTestScheduleConfig()
opt.rep = newTestReplication(3, "zone", "rack", "host")

sb := schedulers.NewBalanceRegionScheduler(opt)
sb, err := schedule.CreateScheduler("balanceRegion", opt)
c.Assert(err, IsNil)

// Store 1 has the largest region score, so the balancer try to replace peer in store 1.
tc.addLabelsStore(1, 6, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
Expand Down Expand Up @@ -505,7 +511,8 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) {
_, opt := newTestScheduleConfig()
opt.rep = newTestReplication(5, "zone", "rack", "host")

sb := schedulers.NewBalanceRegionScheduler(opt)
sb, err := schedule.CreateScheduler("balanceRegion", opt)
c.Assert(err, IsNil)

tc.addLabelsStore(1, 4, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.addLabelsStore(2, 5, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"})
Expand Down Expand Up @@ -540,7 +547,8 @@ func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) {
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
sb := schedulers.NewBalanceRegionScheduler(opt)
sb, err := schedule.CreateScheduler("balanceRegion", opt)
c.Assert(err, IsNil)
opt.SetMaxReplicas(1)

tc.addRegionStore(1, 10)
Expand Down Expand Up @@ -907,7 +915,8 @@ func (s *testBalanceHotRegionSchedulerSuite) TestBalance(c *C) {
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
hb := schedulers.NewBalanceHotRegionScheduler(opt)
hb, err := schedule.CreateScheduler("hotRegion", opt)
c.Assert(err, IsNil)

// Add stores 1, 2, 3, 4, 5 with region counts 3, 2, 2, 2, 0.
tc.addRegionStore(1, 3)
Expand Down
10 changes: 6 additions & 4 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/schedulers"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -131,9 +130,12 @@ func (c *coordinator) run() {
}
}
log.Info("coordinator: Run scheduler")
c.addScheduler(schedulers.NewBalanceLeaderScheduler(c.opt), minScheduleInterval)
c.addScheduler(schedulers.NewBalanceRegionScheduler(c.opt), minScheduleInterval)
c.addScheduler(schedulers.NewBalanceHotRegionScheduler(c.opt), minSlowScheduleInterval)
s, _ := schedule.CreateScheduler("balanceLeader", c.opt)
c.addScheduler(s, minScheduleInterval)
s, _ = schedule.CreateScheduler("balanceRegion", c.opt)
c.addScheduler(s, minScheduleInterval)
s, _ = schedule.CreateScheduler("hotRegion", c.opt)
c.addScheduler(s, minSlowScheduleInterval)
}

func (c *coordinator) stop() {
Expand Down
14 changes: 9 additions & 5 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/pd/pkg/testutil"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/schedulers"
)

type testOperator struct {
Expand Down Expand Up @@ -342,11 +341,13 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
// Add regions 3 with leader in store 3 and followers in stores 1,2
tc.addLeaderRegion(3, 3, 1, 2)

gls := schedulers.NewGrantLeaderScheduler(opt, 0)
gls, err := schedule.CreateScheduler("grantLeader", opt, "0")
c.Assert(err, IsNil)
c.Assert(co.addScheduler(gls, minScheduleInterval), NotNil)
c.Assert(co.removeScheduler(gls.GetName()), NotNil)

gls = schedulers.NewGrantLeaderScheduler(opt, 1)
gls, err = schedule.CreateScheduler("grantLeader", opt, "1")
c.Assert(err, IsNil)
c.Assert(co.addScheduler(gls, minScheduleInterval), IsNil)

// Transfer all leaders to store 1.
Expand Down Expand Up @@ -457,8 +458,10 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
hbStreams := newHeartbeatStreams(cluster.getClusterID())
defer hbStreams.Close()
co := newCoordinator(cluster, opt, hbStreams)
scheduler, err := schedule.CreateScheduler("balanceLeader", opt)
c.Assert(err, IsNil)
lb := &mockLimitScheduler{
Scheduler: schedulers.NewBalanceLeaderScheduler(opt),
Scheduler: scheduler,
}
sc := newScheduleController(co, lb, minScheduleInterval)

Expand Down Expand Up @@ -512,7 +515,8 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
hbStreams := newHeartbeatStreams(cluster.getClusterID())
defer hbStreams.Close()
co := newCoordinator(cluster, opt, hbStreams)
lb := schedulers.NewBalanceLeaderScheduler(opt)
lb, err := schedule.CreateScheduler("balanceLeader", opt)
c.Assert(err, IsNil)
sc := newScheduleController(co, lb, minScheduleInterval)

// If no operator for x seconds, the next check should be in x/2 seconds.
Expand Down
33 changes: 27 additions & 6 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
package server

import (
"strconv"

"github.com/juju/errors"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/schedulers"
)

var (
Expand Down Expand Up @@ -86,27 +87,47 @@ func (h *Handler) RemoveScheduler(name string) error {

// AddBalanceLeaderScheduler adds a balance-leader-scheduler.
func (h *Handler) AddBalanceLeaderScheduler() error {
return h.AddScheduler(schedulers.NewBalanceLeaderScheduler(h.opt))
s, err := schedule.CreateScheduler("balanceLeader", h.opt)
if err != nil {
return errors.Trace(err)
}
return h.AddScheduler(s)
}

// AddGrantLeaderScheduler adds a grant-leader-scheduler.
func (h *Handler) AddGrantLeaderScheduler(storeID uint64) error {
return h.AddScheduler(schedulers.NewGrantLeaderScheduler(h.opt, storeID))
s, err := schedule.CreateScheduler("grantLeader", h.opt, strconv.FormatUint(storeID, 10))
if err != nil {
return errors.Trace(err)
}
return h.AddScheduler(s)
}

// AddEvictLeaderScheduler adds an evict-leader-scheduler.
func (h *Handler) AddEvictLeaderScheduler(storeID uint64) error {
return h.AddScheduler(schedulers.NewEvictLeaderScheduler(h.opt, storeID))
s, err := schedule.CreateScheduler("evictLeader", h.opt, strconv.FormatUint(storeID, 10))
if err != nil {
return errors.Trace(err)
}
return h.AddScheduler(s)
}

// AddShuffleLeaderScheduler adds a shuffle-leader-scheduler.
func (h *Handler) AddShuffleLeaderScheduler() error {
return h.AddScheduler(schedulers.NewShuffleLeaderScheduler(h.opt))
s, err := schedule.CreateScheduler("shuffleLeader", h.opt)
if err != nil {
return errors.Trace(err)
}
return h.AddScheduler(s)
}

// AddShuffleRegionScheduler adds a shuffle-region-scheduler.
func (h *Handler) AddShuffleRegionScheduler() error {
return h.AddScheduler(schedulers.NewShuffleRegionScheduler(h.opt))
s, err := schedule.CreateScheduler("shuffleRegion", h.opt)
if err != nil {
return errors.Trace(err)
}
return h.AddScheduler(s)
}

// GetOperator returns the region operator.
Expand Down
9 changes: 0 additions & 9 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ var (
Help: "Status of the scheduler.",
}, []string{"kind", "type"})

schedulerCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "scheduler",
Name: "event_count",
Help: "Counter of scheduler events.",
}, []string{"type", "name"})

regionHeartbeatCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Expand Down Expand Up @@ -105,7 +97,6 @@ func init() {
prometheus.MustRegister(clusterStatusGauge)
prometheus.MustRegister(timeJumpBackCounter)
prometheus.MustRegister(schedulerStatusGauge)
prometheus.MustRegister(schedulerCounter)
prometheus.MustRegister(regionHeartbeatCounter)
prometheus.MustRegister(hotSpotStatusGauge)
prometheus.MustRegister(tsoCounter)
Expand Down
File renamed without changes.
File renamed without changes.
26 changes: 26 additions & 0 deletions server/schedule/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package schedule

import (
"fmt"

"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/server/core"
)
Expand Down Expand Up @@ -50,3 +53,26 @@ type Scheduler interface {
Cleanup(cluster Cluster)
Schedule(cluster Cluster) Operator
}

// CreateSchedulerFunc is for creating scheudler.
type CreateSchedulerFunc func(opt Options, args []string) (Scheduler, error)

var schedulerMap = make(map[string]CreateSchedulerFunc)

// RegisterScheduler binds a scheduler creator. It should be called in init()
// func of a package.
func RegisterScheduler(name string, createFn CreateSchedulerFunc) {
if _, ok := schedulerMap[name]; ok {
panic(fmt.Sprintf("duplicated scheduler name: %v", name))
}
schedulerMap[name] = createFn
}

// CreateScheduler creates a scheduler with registered creator func.
func CreateScheduler(name string, opt Options, args ...string) (Scheduler, error) {
fn, ok := schedulerMap[name]
if !ok {
return nil, errors.Errorf("create func of %v is not registered", name)
}
return fn(opt, args)
}
Loading