New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
schedule, server: persist scheduler list to etcd #769
Changes from 10 commits
afd7bd1
43a712e
c1e395b
d040b5a
305c92a
349d872
40f88ca
20445fc
26c78b7
99c1147
81902c9
9574357
e295ebe
a85bee5
dacd8c7
c56aa0e
79ab497
8bf0508
769dfc6
3bea025
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,13 +53,16 @@ replica-schedule-limit = 24 | |
|
||
# customized schedulers, the format is as below | ||
# if empty, it will use balance-leader, balance-region, hot-region as default | ||
[schedule.schedulers] | ||
# [schedule.schedulers.balance-leader] | ||
# [[schedule.schedulers]] | ||
# type = "balance-leader" | ||
# args = ["foo", "bar"] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bad arg name example, what does foo or bar mean here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nothing, just placeholder There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use more meaningful argument names if possible or remove these. |
||
# | ||
# [schedule.schedulers.balance-region] | ||
# [[schedule.schedulers]] | ||
# type = "balance-region" | ||
# | ||
# [schedule.schedulers.hot-region] | ||
|
||
# [[schedule.schedulers]] | ||
# type = "hot-region" | ||
# | ||
|
||
|
||
[replication] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ import ( | |
"github.com/pingcap/pd/pkg/logutil" | ||
"github.com/pingcap/pd/pkg/metricutil" | ||
"github.com/pingcap/pd/pkg/typeutil" | ||
"github.com/pingcap/pd/server/schedule" | ||
) | ||
|
||
// Config is the pd server configuration. | ||
|
@@ -287,8 +288,7 @@ func (c *Config) String() string { | |
|
||
// configFromFile loads config from file. | ||
func (c *Config) configFromFile(path string) error { | ||
meta, err := toml.DecodeFile(path, c) | ||
c.Schedule.Meta = meta | ||
_, err := toml.DecodeFile(path, c) | ||
return errors.Trace(err) | ||
} | ||
|
||
|
@@ -308,14 +308,29 @@ type ScheduleConfig struct { | |
ReplicaScheduleLimit uint64 `toml:"replica-schedule-limit,omitempty" json:"replica-schedule-limit"` | ||
// Schedulers support for loding customized schedulers | ||
Schedulers SchedulerConfigs `toml:"schedulers,omitempty" json:"schedulers"` | ||
} | ||
|
||
// Meta is meta information return from toml.Decode | ||
// passing it to scheduler handler for supporting customized arguments | ||
Meta toml.MetaData | ||
func (c *ScheduleConfig) clone() *ScheduleConfig { | ||
schedulers := make(SchedulerConfigs, len(c.Schedulers)) | ||
copy(schedulers, c.Schedulers) | ||
return &ScheduleConfig{ | ||
MaxSnapshotCount: c.MaxSnapshotCount, | ||
MaxStoreDownTime: c.MaxStoreDownTime, | ||
LeaderScheduleLimit: c.LeaderScheduleLimit, | ||
RegionScheduleLimit: c.RegionScheduleLimit, | ||
ReplicaScheduleLimit: c.ReplicaScheduleLimit, | ||
Schedulers: schedulers, | ||
} | ||
} | ||
|
||
// SchedulerConfigs is a map of customized scheduler configuration. | ||
type SchedulerConfigs map[string]toml.Primitive | ||
// SchedulerConfigs is a slice of customized scheduler configuration. | ||
type SchedulerConfigs []SchedulerConfig | ||
|
||
// SchedulerConfig is customized scheduler configuration | ||
type SchedulerConfig struct { | ||
Type string `toml:"type" json:"type"` | ||
Args []string `toml:"args,omitempty" json:"args"` | ||
} | ||
|
||
const ( | ||
defaultMaxReplicas = 3 | ||
|
@@ -327,9 +342,29 @@ const ( | |
) | ||
|
||
var defaultSchedulers = SchedulerConfigs{ | ||
"balance-region": toml.Primitive{}, | ||
"balance-leader": toml.Primitive{}, | ||
"hot-region": toml.Primitive{}, | ||
SchedulerConfig{Type: "balance-region"}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seem you can use SchedulerConfigs {
{Type:"xxx"},
{Type:"xxx"},
} |
||
SchedulerConfig{Type: "balance-leader"}, | ||
SchedulerConfig{Type: "hot-region"}, | ||
} | ||
|
||
// Equals is to compare whether two ScheduleConfig is same or not | ||
func (c *SchedulerConfig) Equals(s SchedulerConfig) bool { | ||
stringSliceEqual := func(a, b []string) bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use reflect.DeepEqual |
||
if len(a) != len(b) { | ||
return false | ||
} | ||
for i, v := range a { | ||
if v != b[i] { | ||
return false | ||
} | ||
} | ||
return true | ||
} | ||
|
||
// comparing args is to cover the case that there are schedulers in same type but not with same name | ||
// such as two schedulers of type evictLeader, | ||
// one name is "evict-leader-scheduler-1" and the other is "evict-leader-scheduler-2" | ||
return c.Type == s.Type && stringSliceEqual(c.Args, s.Args) | ||
} | ||
|
||
func (c *ScheduleConfig) adjust() { | ||
|
@@ -354,7 +389,7 @@ type ReplicationConfig struct { | |
} | ||
|
||
func (c *ReplicationConfig) clone() *ReplicationConfig { | ||
locationLabels := make(typeutil.StringSlice, 0, len(c.LocationLabels)) | ||
locationLabels := make(typeutil.StringSlice, len(c.LocationLabels)) | ||
copy(locationLabels, c.LocationLabels) | ||
return &ReplicationConfig{ | ||
MaxReplicas: c.MaxReplicas, | ||
|
@@ -427,8 +462,32 @@ func (o *scheduleOption) GetSchedulers() SchedulerConfigs { | |
return o.load().Schedulers | ||
} | ||
|
||
func (o *scheduleOption) GetMeta() toml.MetaData { | ||
return o.load().Meta | ||
func (o *scheduleOption) AddSchedulerCfg(tp string, args []string) bool { | ||
c := o.load() | ||
v := c.clone() | ||
for _, schedulerCfg := range v.Schedulers { | ||
if schedulerCfg.Equals(SchedulerConfig{tp, args}) { | ||
return false | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK |
||
} | ||
v.Schedulers = append(v.Schedulers, SchedulerConfig{Type: tp, Args: args}) | ||
o.store(v) | ||
return true | ||
} | ||
|
||
func (o *scheduleOption) RemoveSchedulerCfg(name string) bool { | ||
c := o.load() | ||
v := c.clone() | ||
for i, schedulerCfg := range v.Schedulers { | ||
// To create a temporary scheduler is just used to get scheduler's name | ||
tmp, _ := schedule.CreateScheduler(schedulerCfg.Type, o, schedulerCfg.Args...) | ||
if tmp.GetName() == name { | ||
v.Schedulers = append(v.Schedulers[:i], v.Schedulers[i+1:]...) | ||
o.store(v) | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
func (o *scheduleOption) persist(kv *kv) error { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,9 +68,10 @@ type coordinator struct { | |
schedulers map[string]*scheduleController | ||
histories cache.Cache | ||
hbStreams *heartbeatStreams | ||
kv *kv | ||
} | ||
|
||
func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartbeatStreams) *coordinator { | ||
func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartbeatStreams, kv *kv) *coordinator { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
return &coordinator{ | ||
ctx: ctx, | ||
|
@@ -83,6 +84,7 @@ func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartb | |
schedulers: make(map[string]*scheduleController), | ||
histories: cache.NewDefaultCache(historiesCacheSize), | ||
hbStreams: hbStreams, | ||
kv: kv, | ||
} | ||
} | ||
|
||
|
@@ -133,16 +135,14 @@ func (c *coordinator) run() { | |
} | ||
log.Info("coordinator: Run scheduler") | ||
|
||
for name := range c.opt.GetSchedulers() { | ||
// note: CreateScheduler just needs specific scheduler config | ||
// so schedulers(a map of scheduler configs) wrapped by c.opt has redundant configs | ||
s, err := schedule.CreateScheduler(name, c.opt) | ||
for _, schedulerCfg := range c.opt.GetSchedulers() { | ||
s, err := schedule.CreateScheduler(schedulerCfg.Type, c.opt, schedulerCfg.Args...) | ||
if err != nil { | ||
log.Errorf("can not create scheduler %s: %v", name, err) | ||
log.Errorf("can not create scheduler %s: %v", schedulerCfg.Type, err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe you can remove invalid persist schedulers in here according to the error types. |
||
} else { | ||
log.Infof("create scheduler %s", name) | ||
if err := c.addScheduler(s, s.GetInterval()); err != nil { | ||
log.Errorf("can not add scheduler %s: %v", name, err) | ||
log.Infof("create scheduler %s", s.GetName()) | ||
if err := c.addScheduler(s, s.GetInterval(), schedulerCfg.Args...); err != nil { | ||
log.Errorf("can not add scheduler %s: %v", s.GetName(), err) | ||
} | ||
} | ||
} | ||
|
@@ -254,7 +254,7 @@ func (c *coordinator) shouldRun() bool { | |
return c.cluster.isPrepared() | ||
} | ||
|
||
func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.Duration) error { | ||
func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.Duration, args ...string) error { | ||
c.Lock() | ||
defer c.Unlock() | ||
|
||
|
@@ -270,6 +270,9 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.D | |
c.wg.Add(1) | ||
go c.runScheduler(s) | ||
c.schedulers[s.GetName()] = s | ||
if c.opt.AddSchedulerCfg(s.GetType(), args) { | ||
c.opt.persist(c.kv) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. print a log if persist meet error |
||
} | ||
return nil | ||
} | ||
|
||
|
@@ -284,6 +287,10 @@ func (c *coordinator) removeScheduler(name string) error { | |
|
||
s.Stop() | ||
delete(c.schedulers, name) | ||
|
||
if c.opt.RemoveSchedulerCfg(name) { | ||
c.opt.persist(c.kv) | ||
} | ||
return nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please use two PRs, one is to change the name, the other is to persist the config.