Skip to content

Commit

Permalink
fix: add: enable-pull-scheduler-estimator-event flag
Browse files Browse the repository at this point in the history
Signed-off-by: prodan <pengshihaoren@gmail.com>
  • Loading branch information
prodanlabs committed Jun 27, 2022
1 parent 03cd025 commit e75e2fc
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 11 deletions.
3 changes: 3 additions & 0 deletions cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Options struct {

// EnableSchedulerEstimator represents whether the accurate scheduler estimator should be enabled.
EnableSchedulerEstimator bool
// EnablePullSchedulerEstimatorEvent represents whether the pull mode enabled the accurate scheduler estimator event.
EnablePullSchedulerEstimatorEvent bool
// SchedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
SchedulerEstimatorTimeout metav1.Duration
// SchedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
Expand Down Expand Up @@ -80,6 +82,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.BoolVar(&o.EnableSchedulerEstimator, "enable-scheduler-estimator", false, "Enable calling cluster scheduler estimator for adjusting replicas.")
fs.BoolVar(&o.EnablePullSchedulerEstimatorEvent, "enable-pull-scheduler-estimator-event", true, "Enable scheduler estimator event handler in pull mode.")
fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.")
fs.IntVar(&o.SchedulerEstimatorPort, "scheduler-estimator-port", defaultEstimatorPort, "The secure port on which to connect the accurate scheduler estimator.")
fs.BoolVar(&o.EnableEmptyWorkloadPropagation, "enable-empty-workload-propagation", false, "Enable workload with replicas 0 to be propagated to member clusters.")
Expand Down
2 changes: 1 addition & 1 deletion cmd/scheduler/app/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func run(opts *options.Options, stopChan <-chan struct{}, registryOptions ...Opt

sched, err := scheduler.NewScheduler(dynamicClientSet, karmadaClient, kubeClientSet,
scheduler.WithOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithEnableSchedulerEstimator(opts.EnableSchedulerEstimator),
scheduler.WithEnableSchedulerEstimator(opts.EnableSchedulerEstimator, opts.EnablePullSchedulerEstimatorEvent),
scheduler.WithSchedulerEstimatorPort(opts.SchedulerEstimatorPort),
scheduler.WithSchedulerEstimatorTimeout(opts.SchedulerEstimatorTimeout),
scheduler.WithEnableEmptyWorkloadPropagation(opts.EnableEmptyWorkloadPropagation),
Expand Down
14 changes: 11 additions & 3 deletions pkg/scheduler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (s *Scheduler) addCluster(obj interface{}) {
}
klog.V(3).Infof("Add event for cluster %s", cluster.Name)

if s.enableSchedulerEstimator {
if s.enableSchedulerEstimatorEvent(cluster.Spec.SyncMode) {
s.schedulerEstimatorWorker.Add(cluster.Name)
}
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) {
}
klog.V(3).Infof("Update event for cluster %s", newCluster.Name)

if s.enableSchedulerEstimator {
if s.enableSchedulerEstimatorEvent(newCluster.Spec.SyncMode) {
s.schedulerEstimatorWorker.Add(newCluster.Name)
}

Expand Down Expand Up @@ -283,7 +283,15 @@ func (s *Scheduler) deleteCluster(obj interface{}) {
s.enqueueAffectedBinding(cluster.Name)
s.enqueueAffectedClusterBinding(cluster.Name)

if s.enableSchedulerEstimator {
if s.enableSchedulerEstimatorEvent(cluster.Spec.SyncMode) {
s.schedulerEstimatorWorker.Add(cluster.Name)
}
}

func (s *Scheduler) enableSchedulerEstimatorEvent(clusterSyncMode clusterv1alpha1.ClusterSyncMode) bool {
if s.enableSchedulerEstimator && clusterSyncMode == clusterv1alpha1.Pull {
return s.enablePullSchedulerEstimatorEvent
}

return s.enableSchedulerEstimator
}
15 changes: 10 additions & 5 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,20 @@ type Scheduler struct {

eventRecorder record.EventRecorder

enableSchedulerEstimator bool
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorPort int
schedulerEstimatorWorker util.AsyncWorker
enableSchedulerEstimator bool
enablePullSchedulerEstimatorEvent bool
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorPort int
schedulerEstimatorWorker util.AsyncWorker

enableEmptyWorkloadPropagation bool
}

type schedulerOptions struct {
// enableSchedulerEstimator represents whether the accurate scheduler estimator should be enabled.
enableSchedulerEstimator bool
// enablePullSchedulerEstimatorEvent represents whether the pull mode enabled the accurate scheduler estimator event.
enablePullSchedulerEstimatorEvent bool
// schedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
schedulerEstimatorTimeout metav1.Duration
// schedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
Expand All @@ -105,9 +108,10 @@ type schedulerOptions struct {
type Option func(*schedulerOptions)

// WithEnableSchedulerEstimator sets the enableSchedulerEstimator for scheduler
func WithEnableSchedulerEstimator(enableSchedulerEstimator bool) Option {
func WithEnableSchedulerEstimator(enableSchedulerEstimator, enablePullSchedulerEstimatorEvent bool) Option {
return func(o *schedulerOptions) {
o.enableSchedulerEstimator = enableSchedulerEstimator
o.enablePullSchedulerEstimatorEvent = enablePullSchedulerEstimatorEvent
}
}

Expand Down Expand Up @@ -183,6 +187,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse

if options.enableSchedulerEstimator {
sched.enableSchedulerEstimator = options.enableSchedulerEstimator
sched.enablePullSchedulerEstimatorEvent = options.enablePullSchedulerEstimatorEvent
sched.schedulerEstimatorPort = options.schedulerEstimatorPort
sched.schedulerEstimatorCache = estimatorclient.NewSchedulerEstimatorCache()
schedulerEstimatorWorkerOptions := util.Options{
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestCreateScheduler(t *testing.T) {
{
name: "scheduler with enableSchedulerEstimator enabled",
opts: []Option{
WithEnableSchedulerEstimator(true),
WithEnableSchedulerEstimator(true, true),
WithSchedulerEstimatorPort(port),
},
enableSchedulerEstimator: true,
Expand All @@ -39,7 +39,7 @@ func TestCreateScheduler(t *testing.T) {
{
name: "scheduler with enableSchedulerEstimator disabled, WithSchedulerEstimatorPort enabled",
opts: []Option{
WithEnableSchedulerEstimator(false),
WithEnableSchedulerEstimator(false, false),
WithSchedulerEstimatorPort(port),
},
enableSchedulerEstimator: false,
Expand Down

0 comments on commit e75e2fc

Please sign in to comment.