Skip to content

Commit

Permalink
schedule, server: persist scheduler list to etcd (#769)
Browse files Browse the repository at this point in the history
* schedule, server: persist scheduler list to etcd

* server: fix bug where copy() was used incorrectly (destination slice created with length 0), leaving the copied slice empty in clone() of ReplicationConfig

* add test of persist scheduler

* fix deadlock in mustNewCluster

* abandon test of persist-scheduler, amend test of add scheduler

* add Equals method of SchedulerConfig

* clean verbose code

* add comment

* fix typo

* add log

* add test persist-scheduler

* use reflect.DeepEqual instead

* improve test of persist-scheduler and add log

* use meaningful argument in config

* auto delete invalid scheduler config

* avoid persisting too frequently
  • Loading branch information
Connor1996 committed Sep 30, 2017
1 parent 91e4257 commit 16e898c
Show file tree
Hide file tree
Showing 17 changed files with 244 additions and 68 deletions.
10 changes: 3 additions & 7 deletions conf/config.toml
Expand Up @@ -53,13 +53,9 @@ replica-schedule-limit = 24

# customized schedulers, the format is as below
# if empty, it will use balance-leader, balance-region, hot-region as default
[schedule.schedulers]
# [schedule.schedulers.balance-leader]
#
# [schedule.schedulers.balance-region]
#
# [schedule.schedulers.hot-region]

# [[schedule.schedulers]]
# type = "evict-leader"
# args = ["1"]


[replication]
Expand Down
2 changes: 1 addition & 1 deletion server/cluster.go
Expand Up @@ -107,7 +107,7 @@ func (c *RaftCluster) start() error {
return nil
}
c.cachedCluster = cluster
c.coordinator = newCoordinator(c.cachedCluster, c.s.scheduleOpt, c.s.hbStreams)
c.coordinator = newCoordinator(c.cachedCluster, c.s.scheduleOpt, c.s.hbStreams, c.s.kv)
c.quit = make(chan struct{})

c.wg.Add(2)
Expand Down
72 changes: 59 additions & 13 deletions server/config.go
Expand Up @@ -17,6 +17,7 @@ import (
"flag"
"fmt"
"net/url"
"reflect"
"strings"
"sync/atomic"
"time"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/pingcap/pd/pkg/logutil"
"github.com/pingcap/pd/pkg/metricutil"
"github.com/pingcap/pd/pkg/typeutil"
"github.com/pingcap/pd/server/schedule"
)

// Config is the pd server configuration.
Expand Down Expand Up @@ -287,8 +289,7 @@ func (c *Config) String() string {

// configFromFile loads config from file.
func (c *Config) configFromFile(path string) error {
meta, err := toml.DecodeFile(path, c)
c.Schedule.Meta = meta
_, err := toml.DecodeFile(path, c)
return errors.Trace(err)
}

Expand All @@ -308,14 +309,29 @@ type ScheduleConfig struct {
ReplicaScheduleLimit uint64 `toml:"replica-schedule-limit,omitempty" json:"replica-schedule-limit"`
// Schedulers supports loading customized schedulers
Schedulers SchedulerConfigs `toml:"schedulers,omitempty" json:"schedulers"`
}

// Meta is meta information return from toml.Decode
// passing it to scheduler handler for supporting customized arguments
Meta toml.MetaData
// clone returns a copy of the schedule configuration whose Schedulers slice
// has its own backing array, so appending to or removing entries from the
// copy does not touch the receiver's slice. The SchedulerConfig elements are
// copied by value; their Args slices are still shared with the original.
func (c *ScheduleConfig) clone() *ScheduleConfig {
	cloned := &ScheduleConfig{
		MaxSnapshotCount:     c.MaxSnapshotCount,
		MaxStoreDownTime:     c.MaxStoreDownTime,
		LeaderScheduleLimit:  c.LeaderScheduleLimit,
		RegionScheduleLimit:  c.RegionScheduleLimit,
		ReplicaScheduleLimit: c.ReplicaScheduleLimit,
		// Allocate with the full length (not just capacity) so that copy
		// below actually transfers every element.
		Schedulers: make(SchedulerConfigs, len(c.Schedulers)),
	}
	copy(cloned.Schedulers, c.Schedulers)
	return cloned
}

// SchedulerConfigs is a map of customized scheduler configuration.
type SchedulerConfigs map[string]toml.Primitive
// SchedulerConfigs is a slice of customized scheduler configuration.
type SchedulerConfigs []SchedulerConfig

// SchedulerConfig is the configuration of a single customized scheduler.
// Its Type and Args are the values handed to schedule.CreateScheduler when
// the scheduler is instantiated.
type SchedulerConfig struct {
// Type is the scheduler type, e.g. "evict-leader".
Type string `toml:"type" json:"type"`
// Args holds the string arguments for the scheduler; it is omitted from
// TOML output when empty.
Args []string `toml:"args,omitempty" json:"args"`
}

const (
defaultMaxReplicas = 3
Expand All @@ -327,9 +343,9 @@ const (
)

var defaultSchedulers = SchedulerConfigs{
"balance-region": toml.Primitive{},
"balance-leader": toml.Primitive{},
"hot-region": toml.Primitive{},
{Type: "balance-region"},
{Type: "balance-leader"},
{Type: "hot-region"},
}

func (c *ScheduleConfig) adjust() {
Expand All @@ -354,7 +370,7 @@ type ReplicationConfig struct {
}

func (c *ReplicationConfig) clone() *ReplicationConfig {
locationLabels := make(typeutil.StringSlice, 0, len(c.LocationLabels))
locationLabels := make(typeutil.StringSlice, len(c.LocationLabels))
copy(locationLabels, c.LocationLabels)
return &ReplicationConfig{
MaxReplicas: c.MaxReplicas,
Expand Down Expand Up @@ -427,8 +443,38 @@ func (o *scheduleOption) GetSchedulers() SchedulerConfigs {
return o.load().Schedulers
}

func (o *scheduleOption) GetMeta() toml.MetaData {
return o.load().Meta
// AddSchedulerCfg records a scheduler configuration of type tp with the given
// args in the schedule options. It works on a clone of the currently loaded
// config and stores the clone back only when a new entry was appended.
//
// An entry counts as already present when both its type and its args match.
// Args take part in the comparison because several schedulers of one type can
// coexist under different names, e.g. two "evict-leader" schedulers named
// "evict-leader-scheduler-1" and "evict-leader-scheduler-2".
//
// It currently always returns nil.
func (o *scheduleOption) AddSchedulerCfg(tp string, args []string) error {
	v := o.load().clone()
	for _, cfg := range v.Schedulers {
		if cfg.Type != tp {
			continue
		}
		// reflect.DeepEqual reports a nil slice and an empty non-nil slice as
		// different, which would let the same argument-less scheduler be
		// recorded twice; treat "no args" on both sides as equal first.
		noArgs := len(cfg.Args) == 0 && len(args) == 0
		if noArgs || reflect.DeepEqual(cfg.Args, args) {
			return nil
		}
	}
	v.Schedulers = append(v.Schedulers, SchedulerConfig{Type: tp, Args: args})
	o.store(v)
	return nil
}

// RemoveSchedulerCfg drops the first scheduler configuration whose scheduler
// reports the given name and stores the updated options. It works on a clone
// of the currently loaded config.
//
// The name is not part of the stored configuration, so for each entry a
// throwaway scheduler is built with schedule.CreateScheduler purely to ask it
// for its name. If building any entry visited before a match fails, that
// error is returned (traced) and nothing is stored. When no entry matches,
// the options are left unchanged and nil is returned.
func (o *scheduleOption) RemoveSchedulerCfg(name string) error {
	updated := o.load().clone()
	for idx := range updated.Schedulers {
		cfg := updated.Schedulers[idx]
		probe, err := schedule.CreateScheduler(cfg.Type, o, cfg.Args...)
		if err != nil {
			return errors.Trace(err)
		}
		if probe.GetName() != name {
			continue
		}
		// Found it: splice the entry out of the cloned slice and publish.
		updated.Schedulers = append(updated.Schedulers[:idx], updated.Schedulers[idx+1:]...)
		o.store(updated)
		return nil
	}
	return nil
}

func (o *scheduleOption) persist(kv *kv) error {
Expand Down
42 changes: 32 additions & 10 deletions server/coordinator.go
Expand Up @@ -68,9 +68,10 @@ type coordinator struct {
schedulers map[string]*scheduleController
histories cache.Cache
hbStreams *heartbeatStreams
kv *kv
}

func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartbeatStreams) *coordinator {
func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartbeatStreams, kv *kv) *coordinator {
ctx, cancel := context.WithCancel(context.Background())
return &coordinator{
ctx: ctx,
Expand All @@ -83,6 +84,7 @@ func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartb
schedulers: make(map[string]*scheduleController),
histories: cache.NewDefaultCache(historiesCacheSize),
hbStreams: hbStreams,
kv: kv,
}
}

Expand Down Expand Up @@ -133,19 +135,32 @@ func (c *coordinator) run() {
}
log.Info("coordinator: Run scheduler")

for name := range c.opt.GetSchedulers() {
// note: CreateScheduler just needs specific scheduler config
// so schedulers(a map of scheduler configs) wrapped by c.opt has redundant configs
s, err := schedule.CreateScheduler(name, c.opt)
k := 0
scheduleCfg := c.opt.load()
for _, schedulerCfg := range scheduleCfg.Schedulers {
s, err := schedule.CreateScheduler(schedulerCfg.Type, c.opt, schedulerCfg.Args...)
if err != nil {
log.Errorf("can not create scheduler %s: %v", name, err)
log.Errorf("can not create scheduler %s: %v", schedulerCfg.Type, err)
} else {
log.Infof("create scheduler %s", name)
if err := c.addScheduler(s, s.GetInterval()); err != nil {
log.Errorf("can not add scheduler %s: %v", name, err)
log.Infof("create scheduler %s", s.GetName())
if err = c.addScheduler(s, s.GetInterval(), schedulerCfg.Args...); err != nil {
log.Errorf("can not add scheduler %s: %v", s.GetName(), err)
}
}

// only record valid scheduler config
if err == nil {
scheduleCfg.Schedulers[k] = schedulerCfg
k++
}
}

// remove invalid scheduler config and persist
scheduleCfg.Schedulers = scheduleCfg.Schedulers[:k]
if err := c.opt.persist(c.kv); err != nil {
log.Errorf("can't persist schedule config: %v", err)
}

}

func (c *coordinator) stop() {
Expand Down Expand Up @@ -254,7 +269,7 @@ func (c *coordinator) shouldRun() bool {
return c.cluster.isPrepared()
}

func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.Duration) error {
func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.Duration, args ...string) error {
c.Lock()
defer c.Unlock()

Expand All @@ -270,6 +285,8 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, interval time.D
c.wg.Add(1)
go c.runScheduler(s)
c.schedulers[s.GetName()] = s
c.opt.AddSchedulerCfg(s.GetType(), args)

return nil
}

Expand All @@ -284,6 +301,11 @@ func (c *coordinator) removeScheduler(name string) error {

s.Stop()
delete(c.schedulers, name)

if err := c.opt.RemoveSchedulerCfg(name); err != nil {
return errors.Trace(err)
}

return nil
}

Expand Down

0 comments on commit 16e898c

Please sign in to comment.