Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create-a-new-scheduler-constructor #69057

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 27 additions & 7 deletions cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,34 @@ func run(cmd *cobra.Command, args []string, opts *options.Options) error {
return fmt.Errorf("unable to register configz: %s", err)
}

// Build a scheduler config from the provided algorithm source.
schedulerConfig, err := NewSchedulerConfig(cc)
if err != nil {
return err
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = c.InformerFactory.Storage().V1().StorageClasses()
}

// Create the scheduler.
sched := scheduler.NewFromConfig(schedulerConfig)
sched, err := scheduler.New(c.Client,
c.InformerFactory.Core().V1().Nodes(),
c.PodInformer,
c.InformerFactory.Core().V1().PersistentVolumes(),
c.InformerFactory.Core().V1().PersistentVolumeClaims(),
c.InformerFactory.Core().V1().ReplicationControllers(),
c.InformerFactory.Apps().V1().ReplicaSets(),
c.InformerFactory.Apps().V1().StatefulSets(),
c.InformerFactory.Core().V1().Services(),
c.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
storageClassInformer,
c.Recorder,
c.ComponentConfig.AlgorithmSource,
scheduler.WithName(c.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(c.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(c.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(c.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(c.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*c.ComponentConfig.BindTimeoutSeconds))
if err != nil {
return err
}

// Prepare the event broadcaster.
if cc.Broadcaster != nil && cc.EventClient != nil {
Expand Down Expand Up @@ -284,7 +304,7 @@ func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, s
}

// NewSchedulerConfig creates the scheduler configuration. This is exposed for use by tests.
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) {
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*factory.Config, error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked and this is still in use by test/integration. We'll have to clean it up (delete it) later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need I clean it up in this pr? or next?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in this PR

var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses()
Expand Down Expand Up @@ -312,7 +332,7 @@ func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Con
})

source := s.ComponentConfig.AlgorithmSource
var config *scheduler.Config
var config *factory.Config
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.
Expand Down
16 changes: 14 additions & 2 deletions pkg/scheduler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@ go_library(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/latest:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
Expand All @@ -42,7 +48,10 @@ go_test(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/testing:go_default_library",
Expand All @@ -53,8 +62,11 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
],
Expand Down
3 changes: 1 addition & 2 deletions pkg/scheduler/factory/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"//pkg/apis/core/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
Expand Down Expand Up @@ -50,6 +49,7 @@ go_library(
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
Expand All @@ -64,7 +64,6 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/api/testing:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/api:go_default_library",
Expand Down
110 changes: 101 additions & 9 deletions pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ import (
policylisters "k8s.io/client-go/listers/policy/v1beta1"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
Expand All @@ -78,6 +78,98 @@ var (
maxPDVolumeCountPredicateKeys = []string{predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred, predicates.MaxEBSVolumeCountPred}
)

// Binder knows how to write a binding.
type Binder interface {
Bind(binding *v1.Binding) error
}

// PodConditionUpdater updates the condition of a pod based on the passed
// PodCondition
type PodConditionUpdater interface {
Update(pod *v1.Pod, podCondition *v1.PodCondition) error
}

// Config is an implementation of the Scheduler's configured input data.
// TODO over time we should make this struct a hidden implementation detail of the scheduler.
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
Ecache *equivalence.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor

// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *v1.Pod

// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful, false if the controller should shutdown.
WaitForCacheSync func() bool

// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*v1.Pod, error)

// Recorder is the EventRecorder to use
Recorder record.EventRecorder

// Close this to shut down the scheduler.
StopEverything chan struct{}

// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder

// Disable pod preemption or not.
DisablePreemption bool
}

// PodPreemptor has methods needed to delete a pod and to update
// annotations of the preemptor pod.
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
RemoveNominatedNodeName(pod *v1.Pod) error
}

// Configurator defines I/O, caching, and other functionality needed to
// construct a new scheduler. An implementation of this can be seen in
// factory.go.
type Configurator interface {
// Exposed for testing
GetHardPodAffinitySymmetricWeight() int32
// Exposed for testing
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error)

// Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error)
GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)

// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeLister() corelisters.NodeLister
// Exposed for testing
GetClient() clientset.Interface
// Exposed for testing
GetScheduledPodLister() corelisters.PodLister

Create() (*Config, error)
CreateFromProvider(providerName string) (*Config, error)
CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
}

// configFactory is the default implementation of the scheduler.Configurator interface.
type configFactory struct {
client clientset.Interface
Expand Down Expand Up @@ -164,7 +256,7 @@ type ConfigFactoryArgs struct {

// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything := make(chan struct{})
schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)

Expand Down Expand Up @@ -992,12 +1084,12 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) {
}

// Create creates a scheduler with the default algorithm provider.
func (c *configFactory) Create() (*scheduler.Config, error) {
func (c *configFactory) Create() (*Config, error) {
return c.CreateFromProvider(DefaultProvider)
}

// Creates a scheduler from the name of a registered algorithm provider.
func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName)
if err != nil {
Expand All @@ -1008,7 +1100,7 @@ func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Conf
}

// Creates a scheduler from the configuration file
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) {
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
glog.V(2).Infof("Creating scheduler from configuration: %v", policy)

// validate the policy configuration
Expand Down Expand Up @@ -1079,7 +1171,7 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
}

// getBinderFunc returns an func which returns an extender that supports bind or a default binder based on the given pod.
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) scheduler.Binder {
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
var extenderBinder algorithm.SchedulerExtender
for i := range extenders {
if extenders[i].IsBinder() {
Expand All @@ -1088,7 +1180,7 @@ func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) f
}
}
defaultBinder := &binder{c.client}
return func(pod *v1.Pod) scheduler.Binder {
return func(pod *v1.Pod) Binder {
if extenderBinder != nil && extenderBinder.IsInterested(pod) {
return extenderBinder
}
Expand All @@ -1097,7 +1189,7 @@ func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) f
}

// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
Expand Down Expand Up @@ -1148,7 +1240,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
)

podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
return &Config{
SchedulerCache: c.schedulerCache,
Ecache: c.equivalencePodCache,
// The scheduler only needs to consider schedulable nodes.
Expand Down
3 changes: 1 addition & 2 deletions pkg/scheduler/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"k8s.io/client-go/tools/cache"
utiltesting "k8s.io/client-go/util/testing"
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
Expand Down Expand Up @@ -540,7 +539,7 @@ func TestSkipPodUpdate(t *testing.T) {
}
}

func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) scheduler.Configurator {
func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(&ConfigFactoryArgs{
v1.DefaultSchedulerName,
Expand Down