schedule, server: persist scheduler list to etcd #769

Merged
merged 20 commits on Sep 30, 2017

Changes from 14 commits
13 changes: 8 additions & 5 deletions conf/config.toml
@@ -53,13 +53,16 @@ replica-schedule-limit = 24

# customized schedulers, the format is as below
# if empty, it will use balance-leader, balance-region, hot-region as default
Contributor:

please use two PRs, one is to change the name, the other is to persist the config.

[schedule.schedulers]
# [schedule.schedulers.balance-leader]
# [[schedule.schedulers]]
# type = "balance-leader"
# args = ["foo", "bar"]
Contributor:

bad arg name example, what does foo or bar mean here?

Member Author:

nothing, just placeholder

Contributor:

Use more meaningful argument names if possible or remove these.
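
For reference, the only argument-taking scheduler this PR mentions elsewhere is evict-leader, so a self-describing entry could look like the standalone Go sketch below; treating the single argument as a store ID is an assumption on my part, not something stated in this diff:

package main

import "fmt"

// Mirrors the SchedulerConfig struct added in server/config.go below.
type SchedulerConfig struct {
    Type string   `toml:"type" json:"type"`
    Args []string `toml:"args,omitempty" json:"args"`
}

func main() {
    // More self-explanatory than ["foo", "bar"]: the single argument is
    // assumed to be the ID of the store to evict leaders from.
    cfg := SchedulerConfig{Type: "evict-leader", Args: []string{"1"}}
    fmt.Printf("%+v\n", cfg)
}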

#
# [schedule.schedulers.balance-region]
# [[schedule.schedulers]]
# type = "balance-region"
#
# [schedule.schedulers.hot-region]

# [[schedule.schedulers]]
# type = "hot-region"
#


[replication]
2 changes: 1 addition & 1 deletion server/cluster.go
@@ -107,7 +107,7 @@ func (c *RaftCluster) start() error {
return nil
}
c.cachedCluster = cluster
c.coordinator = newCoordinator(c.cachedCluster, c.s.scheduleOpt, c.s.hbStreams)
c.coordinator = newCoordinator(c.cachedCluster, c.s.scheduleOpt, c.s.hbStreams, c.s.kv)
c.quit = make(chan struct{})

c.wg.Add(2)
88 changes: 75 additions & 13 deletions server/config.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/pd/pkg/logutil"
"github.com/pingcap/pd/pkg/metricutil"
"github.com/pingcap/pd/pkg/typeutil"
"github.com/pingcap/pd/server/schedule"
)

// Config is the pd server configuration.
@@ -287,8 +288,7 @@ func (c *Config) String() string {

// configFromFile loads config from file.
func (c *Config) configFromFile(path string) error {
meta, err := toml.DecodeFile(path, c)
c.Schedule.Meta = meta
_, err := toml.DecodeFile(path, c)
return errors.Trace(err)
}

@@ -308,14 +308,29 @@ type ScheduleConfig struct {
ReplicaScheduleLimit uint64 `toml:"replica-schedule-limit,omitempty" json:"replica-schedule-limit"`
// Schedulers supports loading customized schedulers
Schedulers SchedulerConfigs `toml:"schedulers,omitempty" json:"schedulers"`
}

// Meta is meta information return from toml.Decode
// passing it to scheduler handler for supporting customized arguments
Meta toml.MetaData
func (c *ScheduleConfig) clone() *ScheduleConfig {
schedulers := make(SchedulerConfigs, len(c.Schedulers))
copy(schedulers, c.Schedulers)
return &ScheduleConfig{
MaxSnapshotCount: c.MaxSnapshotCount,
MaxStoreDownTime: c.MaxStoreDownTime,
LeaderScheduleLimit: c.LeaderScheduleLimit,
RegionScheduleLimit: c.RegionScheduleLimit,
ReplicaScheduleLimit: c.ReplicaScheduleLimit,
Schedulers: schedulers,
}
}

// SchedulerConfigs is a map of customized scheduler configuration.
type SchedulerConfigs map[string]toml.Primitive
// SchedulerConfigs is a slice of customized scheduler configuration.
type SchedulerConfigs []SchedulerConfig

// SchedulerConfig is customized scheduler configuration
type SchedulerConfig struct {
Type string `toml:"type" json:"type"`
Args []string `toml:"args,omitempty" json:"args"`
}

const (
defaultMaxReplicas = 3
@@ -327,9 +342,29 @@
)

var defaultSchedulers = SchedulerConfigs{
"balance-region": toml.Primitive{},
"balance-leader": toml.Primitive{},
"hot-region": toml.Primitive{},
SchedulerConfig{Type: "balance-region"},
Contributor:

seems you can use

SchedulerConfigs {
    {Type:"xxx"},
    {Type:"xxx"},
}
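
Spelled out against the types this PR introduces, that suggestion would look roughly like the sketch below (a standalone illustration, not the final code):

package main

import "fmt"

// Mirrors the types added in server/config.go in this PR.
type SchedulerConfig struct {
    Type string
    Args []string
}

type SchedulerConfigs []SchedulerConfig

// Since the literal already names the slice type, the element type can
// be omitted from each entry.
var defaultSchedulers = SchedulerConfigs{
    {Type: "balance-region"},
    {Type: "balance-leader"},
    {Type: "hot-region"},
}

func main() {
    fmt.Println(defaultSchedulers)
}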

SchedulerConfig{Type: "balance-leader"},
SchedulerConfig{Type: "hot-region"},
}

// Equals compares whether two SchedulerConfigs are the same
func (c *SchedulerConfig) Equals(s SchedulerConfig) bool {
stringSliceEqual := func(a, b []string) bool {
Contributor:

use reflect.DeepEqual
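
A minimal standalone sketch of that change, reusing the SchedulerConfig shape from this diff:

package main

import (
    "fmt"
    "reflect"
)

type SchedulerConfig struct {
    Type string
    Args []string
}

// Equals compares type and args; reflect.DeepEqual replaces the
// hand-written string-slice loop below.
func (c *SchedulerConfig) Equals(s SchedulerConfig) bool {
    return c.Type == s.Type && reflect.DeepEqual(c.Args, s.Args)
}

func main() {
    a := SchedulerConfig{Type: "evict-leader", Args: []string{"1"}}
    b := SchedulerConfig{Type: "evict-leader", Args: []string{"2"}}
    fmt.Println(a.Equals(b)) // false: same type, different args
}

One caveat: reflect.DeepEqual treats a nil slice and an empty slice as different, while the manual comparison below treats them as equal, so the swap is not strictly behavior-preserving for schedulers with no args.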

if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}

// Comparing args covers the case where schedulers share the same type but not the same name,
// such as two schedulers of type evictLeader,
// one named "evict-leader-scheduler-1" and the other "evict-leader-scheduler-2"
return c.Type == s.Type && stringSliceEqual(c.Args, s.Args)
}

func (c *ScheduleConfig) adjust() {
@@ -354,7 +389,7 @@ type ReplicationConfig struct {
}

func (c *ReplicationConfig) clone() *ReplicationConfig {
locationLabels := make(typeutil.StringSlice, 0, len(c.LocationLabels))
locationLabels := make(typeutil.StringSlice, len(c.LocationLabels))
copy(locationLabels, c.LocationLabels)
return &ReplicationConfig{
MaxReplicas: c.MaxReplicas,
@@ -427,8 +462,35 @@ func (o *scheduleOption) GetSchedulers() SchedulerConfigs {
return o.load().Schedulers
}

func (o *scheduleOption) GetMeta() toml.MetaData {
return o.load().Meta
func (o *scheduleOption) AddSchedulerCfg(tp string, args []string) bool {
c := o.load()
v := c.clone()
for _, schedulerCfg := range v.Schedulers {
if schedulerCfg.Equals(SchedulerConfig{tp, args}) {
return false
}
Contributor:

How about adding an Equals method to SchedulerConfig?

Member Author:

OK

}
v.Schedulers = append(v.Schedulers, SchedulerConfig{Type: tp, Args: args})
o.store(v)
return true
}

func (o *scheduleOption) RemoveSchedulerCfg(name string) (bool, error) {
c := o.load()
v := c.clone()
for i, schedulerCfg := range v.Schedulers {
// Create a temporary scheduler just to get the scheduler's name
tmp, err := schedule.CreateScheduler(schedulerCfg.Type, o, schedulerCfg.Args...)
if err != nil {
return false, errors.Trace(err)
}
if tmp.GetName() == name {
v.Schedulers = append(v.Schedulers[:i], v.Schedulers[i+1:]...)
o.store(v)
return true, nil
}
}
return false, nil
}

func (o *scheduleOption) persist(kv *kv) error {
36 changes: 26 additions & 10 deletions server/coordinator.go
@@ -68,9 +68,10 @@ type coordinator struct {
schedulers map[string]*scheduleController
histories cache.Cache
hbStreams *heartbeatStreams
kv *kv
}

func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartbeatStreams) *coordinator {
func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartbeatStreams, kv *kv) *coordinator {
ctx, cancel := context.WithCancel(context.Background())
return &coordinator{
ctx: ctx,
@@ -83,6 +84,7 @@ func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartb
schedulers: make(map[string]*scheduleController),
histories: cache.NewDefaultCache(historiesCacheSize),
hbStreams: hbStreams,
kv: kv,
}
}

@@ -133,16 +135,14 @@ func (c *coordinator) run() {
}
log.Info("coordinator: Run scheduler")

for name := range c.opt.GetSchedulers() {
// note: CreateScheduler just needs specific scheduler config
// so schedulers(a map of scheduler configs) wrapped by c.opt has redundant configs
s, err := schedule.CreateScheduler(name, c.opt)
for _, schedulerCfg := range c.opt.GetSchedulers() {
s, err := schedule.CreateScheduler(schedulerCfg.Type, c.opt, schedulerCfg.Args...)
if err != nil {
log.Errorf("can not create scheduler %s: %v", name, err)
log.Errorf("can not create scheduler %s: %v", schedulerCfg.Type, err)
Contributor:

maybe you can remove invalid persisted schedulers here according to the error types.
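
One possible shape of that cleanup, sketched as a standalone example; the sentinel error and the stand-in createScheduler are assumptions, since this diff does not show which error values schedule.CreateScheduler returns:

package main

import (
    "errors"
    "fmt"
)

// Assumed sentinel for "scheduler type is not registered"; the real
// error value is not visible in this diff.
var errUnknownScheduler = errors.New("unknown scheduler type")

type SchedulerConfig struct {
    Type string
    Args []string
}

// createScheduler stands in for schedule.CreateScheduler here.
func createScheduler(tp string) error {
    switch tp {
    case "balance-leader", "balance-region", "hot-region":
        return nil
    }
    return errUnknownScheduler
}

func main() {
    persisted := []SchedulerConfig{{Type: "balance-leader"}, {Type: "legacy-scheduler"}}
    var valid []SchedulerConfig
    for _, cfg := range persisted {
        if err := createScheduler(cfg.Type); err != nil {
            // Drop the stale entry instead of retrying it forever; the
            // cleaned-up list could then be persisted back to etcd.
            fmt.Printf("dropping invalid scheduler config %s: %v\n", cfg.Type, err)
            continue
        }
        valid = append(valid, cfg)
    }
    fmt.Println(valid)
}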

} else {
log.Infof("create scheduler %s", name)
if err := c.addScheduler(s, s.GetInterval()); err != nil {
log.Errorf("can not add scheduler %s: %v", name, err)
log.Infof("create scheduler %s", s.GetName())
if err := c.addScheduler(s, s.GetInterval(), schedulerCfg.Args...); err != nil {
log.Errorf("can not add scheduler %s: %v", s.GetName(), err)
}
}
}
@@ -254,7 +254,7 @@ func (c *coordinator) shouldRun() bool {
return c.cluster.isPrepared()
}

func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.Duration) error {
func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.Duration, args ...string) error {
c.Lock()
defer c.Unlock()

@@ -270,6 +270,11 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.D
c.wg.Add(1)
go c.runScheduler(s)
c.schedulers[s.GetName()] = s
if c.opt.AddSchedulerCfg(s.GetType(), args) {
if err := c.opt.persist(c.kv); err != nil {
log.Errorf("can't persist when add scheduler %v: %v", s.GetName(), err)
}
}
return nil
}

@@ -284,6 +289,17 @@ func (c *coordinator) removeScheduler(name string) error {

s.Stop()
delete(c.schedulers, name)

ok, err := c.opt.RemoveSchedulerCfg(name)
if err != nil {
return errors.Trace(err)
}

if ok {
if err := c.opt.persist(c.kv); err != nil {
log.Errorf("can't persist when remove scheduler %v: %v", s.GetName(), err)
}
}
return nil
}
