Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule, server: persist scheduler list to etcd #769

Merged
merged 20 commits into from Sep 30, 2017
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 3 additions & 7 deletions conf/config.toml
Expand Up @@ -53,13 +53,9 @@ replica-schedule-limit = 24

# customized schedulers, the format is as below
# if empty, it will use balance-leader, balance-region, hot-region as default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use two PRs, one is to change the name, the other is to persist the config.

[schedule.schedulers]
# [schedule.schedulers.balance-leader]
#
# [schedule.schedulers.balance-region]
#
# [schedule.schedulers.hot-region]

# [[schedule.schedulers]]
# type = "evict-leader"
# args = ["1"]


[replication]
Expand Down
2 changes: 1 addition & 1 deletion server/cluster.go
Expand Up @@ -107,7 +107,7 @@ func (c *RaftCluster) start() error {
return nil
}
c.cachedCluster = cluster
c.coordinator = newCoordinator(c.cachedCluster, c.s.scheduleOpt, c.s.hbStreams)
c.coordinator = newCoordinator(c.cachedCluster, c.s.scheduleOpt, c.s.hbStreams, c.s.kv)
c.quit = make(chan struct{})

c.wg.Add(2)
Expand Down
72 changes: 59 additions & 13 deletions server/config.go
Expand Up @@ -17,6 +17,7 @@ import (
"flag"
"fmt"
"net/url"
"reflect"
"strings"
"sync/atomic"
"time"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/pingcap/pd/pkg/logutil"
"github.com/pingcap/pd/pkg/metricutil"
"github.com/pingcap/pd/pkg/typeutil"
"github.com/pingcap/pd/server/schedule"
)

// Config is the pd server configuration.
Expand Down Expand Up @@ -287,8 +289,7 @@ func (c *Config) String() string {

// configFromFile loads config from file.
func (c *Config) configFromFile(path string) error {
meta, err := toml.DecodeFile(path, c)
c.Schedule.Meta = meta
_, err := toml.DecodeFile(path, c)
return errors.Trace(err)
}

Expand All @@ -308,14 +309,29 @@ type ScheduleConfig struct {
ReplicaScheduleLimit uint64 `toml:"replica-schedule-limit,omitempty" json:"replica-schedule-limit"`
// Schedulers support for loding customized schedulers
Schedulers SchedulerConfigs `toml:"schedulers,omitempty" json:"schedulers"`
}

// Meta is meta information return from toml.Decode
// passing it to scheduler handler for supporting customized arguments
Meta toml.MetaData
func (c *ScheduleConfig) clone() *ScheduleConfig {
schedulers := make(SchedulerConfigs, len(c.Schedulers))
copy(schedulers, c.Schedulers)
return &ScheduleConfig{
MaxSnapshotCount: c.MaxSnapshotCount,
MaxStoreDownTime: c.MaxStoreDownTime,
LeaderScheduleLimit: c.LeaderScheduleLimit,
RegionScheduleLimit: c.RegionScheduleLimit,
ReplicaScheduleLimit: c.ReplicaScheduleLimit,
Schedulers: schedulers,
}
}

// SchedulerConfigs is a map of customized scheduler configuration.
type SchedulerConfigs map[string]toml.Primitive
// SchedulerConfigs is a slice of customized scheduler configuration.
type SchedulerConfigs []SchedulerConfig

// SchedulerConfig is customized scheduler configuration
type SchedulerConfig struct {
Type string `toml:"type" json:"type"`
Args []string `toml:"args,omitempty" json:"args"`
}

const (
defaultMaxReplicas = 3
Expand All @@ -327,9 +343,9 @@ const (
)

var defaultSchedulers = SchedulerConfigs{
"balance-region": toml.Primitive{},
"balance-leader": toml.Primitive{},
"hot-region": toml.Primitive{},
{Type: "balance-region"},
{Type: "balance-leader"},
{Type: "hot-region"},
}

func (c *ScheduleConfig) adjust() {
Expand All @@ -354,7 +370,7 @@ type ReplicationConfig struct {
}

func (c *ReplicationConfig) clone() *ReplicationConfig {
locationLabels := make(typeutil.StringSlice, 0, len(c.LocationLabels))
locationLabels := make(typeutil.StringSlice, len(c.LocationLabels))
copy(locationLabels, c.LocationLabels)
return &ReplicationConfig{
MaxReplicas: c.MaxReplicas,
Expand Down Expand Up @@ -427,8 +443,38 @@ func (o *scheduleOption) GetSchedulers() SchedulerConfigs {
return o.load().Schedulers
}

func (o *scheduleOption) GetMeta() toml.MetaData {
return o.load().Meta
func (o *scheduleOption) AddSchedulerCfg(tp string, args []string) bool {
c := o.load()
v := c.clone()
for _, schedulerCfg := range v.Schedulers {
// comparing args is to cover the case that there are schedulers in same type but not with same name
// such as two schedulers of type "evict-leader",
// one name is "evict-leader-scheduler-1" and the other is "evict-leader-scheduler-2"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just compare their names?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming that we are going to add two schedulers, "evict-leader-scheduler-1" and "evict-leader-scheduler-2", but when creating them we should pass "evit-leader" to CreateScheduler() not their name. And schedulers reloading will call CreateScheduler(), thus the name store in schedulerCfg is their type "evit-leader", not their name "evict-leader-scheduler-1" or "evict-leader-scheduler-2". And for here, "evit-leader" is not enough to distinguish "evict-leader-scheduler-1" with "evict-leader-scheduler-2"

if reflect.DeepEqual(schedulerCfg, SchedulerConfig{tp, args}) {
return false
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about add a Equals method to SchedulerConfig?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

}
v.Schedulers = append(v.Schedulers, SchedulerConfig{Type: tp, Args: args})
o.store(v)
return true
}

func (o *scheduleOption) RemoveSchedulerCfg(name string) (bool, error) {
c := o.load()
v := c.clone()
for i, schedulerCfg := range v.Schedulers {
// To create a temporary scheduler is just used to get scheduler's name
tmp, err := schedule.CreateScheduler(schedulerCfg.Type, o, schedulerCfg.Args...)
if err != nil {
return false, errors.Trace(err)
}
if tmp.GetName() == name {
v.Schedulers = append(v.Schedulers[:i], v.Schedulers[i+1:]...)
o.store(v)
return true, nil
}
}
return false, nil
}

func (o *scheduleOption) persist(kv *kv) error {
Expand Down
52 changes: 42 additions & 10 deletions server/coordinator.go
Expand Up @@ -68,9 +68,10 @@ type coordinator struct {
schedulers map[string]*scheduleController
histories cache.Cache
hbStreams *heartbeatStreams
kv *kv
}

func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartbeatStreams) *coordinator {
func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartbeatStreams, kv *kv) *coordinator {
ctx, cancel := context.WithCancel(context.Background())
return &coordinator{
ctx: ctx,
Expand All @@ -83,6 +84,7 @@ func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartb
schedulers: make(map[string]*scheduleController),
histories: cache.NewDefaultCache(historiesCacheSize),
hbStreams: hbStreams,
kv: kv,
}
}

Expand Down Expand Up @@ -133,18 +135,32 @@ func (c *coordinator) run() {
}
log.Info("coordinator: Run scheduler")

for name := range c.opt.GetSchedulers() {
// note: CreateScheduler just needs specific scheduler config
// so schedulers(a map of scheduler configs) wrapped by c.opt has redundant configs
s, err := schedule.CreateScheduler(name, c.opt)
k := 0
scheduleCfg := c.opt.load()
for _, schedulerCfg := range scheduleCfg.Schedulers {
s, err := schedule.CreateScheduler(schedulerCfg.Type, c.opt, schedulerCfg.Args...)
if err != nil {
log.Errorf("can not create scheduler %s: %v", name, err)
log.Errorf("can not create scheduler %s: %v", schedulerCfg.Type, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you can remove invalid persist schedulers in here according to the error types.

} else {
log.Infof("create scheduler %s", name)
if err := c.addScheduler(s, s.GetInterval()); err != nil {
log.Errorf("can not add scheduler %s: %v", name, err)
log.Infof("create scheduler %s", s.GetName())
if err = c.addScheduler(s, s.GetInterval(), schedulerCfg.Args...); err != nil {
log.Errorf("can not add scheduler %s: %v", s.GetName(), err)
}
}

// only record valid scheduler config
if err == nil {
scheduleCfg.Schedulers[k] = schedulerCfg
k++
}
}

// remove invalid scheduler config
if k != len(scheduleCfg.Schedulers) {
scheduleCfg.Schedulers = scheduleCfg.Schedulers[:k]
if err := c.opt.persist(c.kv); err != nil {
log.Errorf("can't persist schedule config: %v", err)
}
}
}

Expand Down Expand Up @@ -254,7 +270,7 @@ func (c *coordinator) shouldRun() bool {
return c.cluster.isPrepared()
}

func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.Duration) error {
func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.Duration, args ...string) error {
c.Lock()
defer c.Unlock()

Expand All @@ -270,6 +286,11 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.D
c.wg.Add(1)
go c.runScheduler(s)
c.schedulers[s.GetName()] = s
if c.opt.AddSchedulerCfg(s.GetType(), args) {
if err := c.opt.persist(c.kv); err != nil {
log.Errorf("can't persist when add scheduler %v: %v", s.GetName(), err)
}
}
return nil
}

Expand All @@ -284,6 +305,17 @@ func (c *coordinator) removeScheduler(name string) error {

s.Stop()
delete(c.schedulers, name)

ok, err := c.opt.RemoveSchedulerCfg(name)
if err != nil {
return errors.Trace(err)
}

if ok {
if err := c.opt.persist(c.kv); err != nil {
log.Errorf("can't persist when remove scheduler %v: %v", s.GetName(), err)
}
}
return nil
}

Expand Down