Skip to content

Commit

Permalink
create a new scheduler constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
denkensk committed Oct 10, 2018
1 parent d3fe0ea commit a74fd15
Show file tree
Hide file tree
Showing 13 changed files with 335 additions and 141 deletions.
24 changes: 17 additions & 7 deletions cmd/kube-scheduler/app/server.go
Expand Up @@ -141,14 +141,24 @@ func run(cmd *cobra.Command, args []string, opts *options.Options) error {
return fmt.Errorf("unable to register configz: %s", err)
}

// Build a scheduler config from the provided algorithm source.
schedulerConfig, err := NewSchedulerConfig(cc)
if err != nil {
return err
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = c.InformerFactory.Storage().V1().StorageClasses()
}

// Create the scheduler.
sched := scheduler.NewFromConfig(schedulerConfig)
sched, err := scheduler.New(c.Client, c.InformerFactory.Core().V1().Nodes(), c.PodInformer,
c.InformerFactory.Core().V1().PersistentVolumes(), c.InformerFactory.Core().V1().PersistentVolumeClaims(),
c.InformerFactory.Core().V1().ReplicationControllers(), c.InformerFactory.Apps().V1().ReplicaSets(),
c.InformerFactory.Apps().V1().StatefulSets(), c.InformerFactory.Core().V1().Services(),
c.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(), storageClassInformer, c.Recorder, c.ComponentConfig.AlgorithmSource,
scheduler.WithName(c.ComponentConfig.SchedulerName), scheduler.WithHardPodAffinitySymmetricWeight(c.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(c.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(c.ComponentConfig.DisablePreemption), scheduler.WithPercentageOfNodesToScore(c.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*c.ComponentConfig.BindTimeoutSeconds))
if err != nil {
return err
}

// Prepare the event broadcaster.
if cc.Broadcaster != nil && cc.EventClient != nil {
Expand Down Expand Up @@ -284,7 +294,7 @@ func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, s
}

// NewSchedulerConfig creates the scheduler configuration. This is exposed for use by tests.
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) {
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*factory.Config, error) {
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses()
Expand Down Expand Up @@ -312,7 +322,7 @@ func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Con
})

source := s.ComponentConfig.AlgorithmSource
var config *scheduler.Config
var config *factory.Config
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.
Expand Down
11 changes: 9 additions & 2 deletions pkg/scheduler/BUILD
Expand Up @@ -13,18 +13,24 @@ go_library(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/latest:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
Expand All @@ -43,6 +49,7 @@ go_test(
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/testing:go_default_library",
Expand Down
3 changes: 1 addition & 2 deletions pkg/scheduler/factory/BUILD
Expand Up @@ -16,7 +16,6 @@ go_library(
"//pkg/apis/core/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
Expand Down Expand Up @@ -50,6 +49,7 @@ go_library(
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
Expand All @@ -64,7 +64,6 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/api/testing:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/api:go_default_library",
Expand Down
110 changes: 101 additions & 9 deletions pkg/scheduler/factory/factory.go
Expand Up @@ -48,11 +48,11 @@ import (
policylisters "k8s.io/client-go/listers/policy/v1beta1"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
Expand All @@ -78,6 +78,98 @@ var (
maxPDVolumeCountPredicateKeys = []string{predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred, predicates.MaxEBSVolumeCountPred}
)

// Binder knows how to write a binding.
type Binder interface {
Bind(binding *v1.Binding) error
}

// PodConditionUpdater updates the condition of a pod based on the passed
// PodCondition
type PodConditionUpdater interface {
Update(pod *v1.Pod, podCondition *v1.PodCondition) error
}

// Config is an implementation of the Scheduler's configured input data.
// TODO over time we should make this struct a hidden implementation detail of the scheduler.
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
Ecache *equivalence.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor

// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *v1.Pod

// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful, false if the controller should shutdown.
WaitForCacheSync func() bool

// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*v1.Pod, error)

// Recorder is the EventRecorder to use
Recorder record.EventRecorder

// Close this to shut down the scheduler.
StopEverything chan struct{}

// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder

// Disable pod preemption or not.
DisablePreemption bool
}

// PodPreemptor has methods needed to delete a pod and to update
// annotations of the preemptor pod.
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
RemoveNominatedNodeName(pod *v1.Pod) error
}

// Configurator defines I/O, caching, and other functionality needed to
// construct a new scheduler. An implementation of this can be seen in
// factory.go.
type Configurator interface {
// Exposed for testing
GetHardPodAffinitySymmetricWeight() int32
// Exposed for testing
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error)

// Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error)
GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)

// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeLister() corelisters.NodeLister
// Exposed for testing
GetClient() clientset.Interface
// Exposed for testing
GetScheduledPodLister() corelisters.PodLister

Create() (*Config, error)
CreateFromProvider(providerName string) (*Config, error)
CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
}

// configFactory is the default implementation of the scheduler.Configurator interface.
type configFactory struct {
client clientset.Interface
Expand Down Expand Up @@ -164,7 +256,7 @@ type ConfigFactoryArgs struct {

// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything := make(chan struct{})
schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)

Expand Down Expand Up @@ -992,12 +1084,12 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) {
}

// Create creates a scheduler with the default algorithm provider.
func (c *configFactory) Create() (*scheduler.Config, error) {
func (c *configFactory) Create() (*Config, error) {
return c.CreateFromProvider(DefaultProvider)
}

// Creates a scheduler from the name of a registered algorithm provider.
func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName)
if err != nil {
Expand All @@ -1008,7 +1100,7 @@ func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Conf
}

// Creates a scheduler from the configuration file
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) {
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
glog.V(2).Infof("Creating scheduler from configuration: %v", policy)

// validate the policy configuration
Expand Down Expand Up @@ -1079,7 +1171,7 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
}

// getBinderFunc returns an func which returns an extender that supports bind or a default binder based on the given pod.
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) scheduler.Binder {
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
var extenderBinder algorithm.SchedulerExtender
for i := range extenders {
if extenders[i].IsBinder() {
Expand All @@ -1088,7 +1180,7 @@ func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) f
}
}
defaultBinder := &binder{c.client}
return func(pod *v1.Pod) scheduler.Binder {
return func(pod *v1.Pod) Binder {
if extenderBinder != nil && extenderBinder.IsInterested(pod) {
return extenderBinder
}
Expand All @@ -1097,7 +1189,7 @@ func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) f
}

// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
Expand Down Expand Up @@ -1148,7 +1240,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
)

podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
return &Config{
SchedulerCache: c.schedulerCache,
Ecache: c.equivalencePodCache,
// The scheduler only needs to consider schedulable nodes.
Expand Down
3 changes: 1 addition & 2 deletions pkg/scheduler/factory/factory_test.go
Expand Up @@ -36,7 +36,6 @@ import (
"k8s.io/client-go/tools/cache"
utiltesting "k8s.io/client-go/util/testing"
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
Expand Down Expand Up @@ -540,7 +539,7 @@ func TestSkipPodUpdate(t *testing.T) {
}
}

func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) scheduler.Configurator {
func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(&ConfigFactoryArgs{
v1.DefaultSchedulerName,
Expand Down

0 comments on commit a74fd15

Please sign in to comment.