From efb2bb71cd4dede007289634bfe3c51484625ba9 Mon Sep 17 00:00:00 2001 From: Dan Mace Date: Tue, 7 Nov 2017 09:41:39 -0500 Subject: [PATCH] Refactor scheduler config API Refactor the kube-scheduler configuration API, command setup, and server setup according to the guidelines established in #32215 and using the kube-proxy refactor (#34727) as a model of a well factored component adhering to said guidelines. * Config API: clarify meaning and use of algorithm source by replacing modality derived from bools and string emptiness checks with an explicit AlgorithmSource type hierarchy. * Config API: consolidate client connection config with common structs. * Config API: split and simplify healthz/metrics server configuration. * Config API: clarify leader election configuration. * Config API: improve defaulting. * CLI: deprecate all flags except `--config`. * CLI: port all flags to new config API. * CLI: refactor to match kube-proxy Cobra command style. * Server: refactor away configurator.go to clarify application wiring. * Server: refactor to more clearly separate wiring/setup from running. Fixes #52428. 
--- cmd/hyperkube/kube-scheduler.go | 28 +- cmd/kubeadm/app/preflight/checks.go | 6 +- pkg/apis/componentconfig/types.go | 115 ++- pkg/apis/componentconfig/v1alpha1/defaults.go | 72 +- pkg/apis/componentconfig/v1alpha1/types.go | 123 +++- .../componentconfig/zz_generated.deepcopy.go | 159 ++++ plugin/cmd/kube-scheduler/app/configurator.go | 196 ----- .../kube-scheduler/app/configurator_test.go | 31 - .../cmd/kube-scheduler/app/options/options.go | 97 --- .../app/options/options_test.go | 103 --- plugin/cmd/kube-scheduler/app/server.go | 693 +++++++++++++++--- plugin/cmd/kube-scheduler/scheduler.go | 32 +- plugin/pkg/scheduler/scheduler.go | 8 + test/integration/scheduler/scheduler_test.go | 139 ++-- 14 files changed, 1072 insertions(+), 730 deletions(-) delete mode 100644 plugin/cmd/kube-scheduler/app/configurator.go delete mode 100644 plugin/cmd/kube-scheduler/app/configurator_test.go delete mode 100644 plugin/cmd/kube-scheduler/app/options/options.go delete mode 100644 plugin/cmd/kube-scheduler/app/options/options_test.go diff --git a/cmd/hyperkube/kube-scheduler.go b/cmd/hyperkube/kube-scheduler.go index 5e5abb2c6308f..cfd68dc15dc93 100644 --- a/cmd/hyperkube/kube-scheduler.go +++ b/cmd/hyperkube/kube-scheduler.go @@ -17,24 +17,38 @@ limitations under the License. package main import ( + "flag" + + "k8s.io/apiserver/pkg/server/healthz" "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" - "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" ) // NewScheduler creates a new hyperkube Server object that includes the // description and flags. func NewScheduler() *Server { - s := options.NewSchedulerServer() + healthz.DefaultHealthz() + + command := app.NewSchedulerCommand() hks := Server{ name: "scheduler", AlternativeName: "kube-scheduler", SimpleUsage: "scheduler", - Long: "Implements a Kubernetes scheduler. 
This will assign pods to kubelets based on capacity and constraints.", - Run: func(_ *Server, _ []string, stopCh <-chan struct{}) error { - return app.Run(s) - }, + Long: command.Long, } - s.AddFlags(hks.Flags()) + + serverFlags := hks.Flags() + serverFlags.AddFlagSet(command.Flags()) + + // FIXME this is here because hyperkube does its own flag parsing, and we need + // the command to know about the go flag set. Remove this once hyperkube is + // refactored to use cobra throughout. + command.Flags().AddGoFlagSet(flag.CommandLine) + + hks.Run = func(_ *Server, args []string, stopCh <-chan struct{}) error { + command.SetArgs(args) + return command.Execute() + } + return &hks } diff --git a/cmd/kubeadm/app/preflight/checks.go b/cmd/kubeadm/app/preflight/checks.go index 66c31a726caac..6afd5b31b5416 100644 --- a/cmd/kubeadm/app/preflight/checks.go +++ b/cmd/kubeadm/app/preflight/checks.go @@ -52,7 +52,7 @@ import ( "k8s.io/kubernetes/pkg/util/initsystem" versionutil "k8s.io/kubernetes/pkg/util/version" kubeadmversion "k8s.io/kubernetes/pkg/version" - schoptions "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" + schedulerapp "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" "k8s.io/kubernetes/test/e2e_node/system" ) @@ -409,9 +409,9 @@ func (eac ExtraArgsCheck) Check() (warnings, errors []error) { warnings = append(warnings, argsCheck("kube-controller-manager", eac.ControllerManagerExtraArgs, flags)...) } if len(eac.SchedulerExtraArgs) > 0 { + command := schedulerapp.NewSchedulerCommand() flags := pflag.NewFlagSet("", pflag.ContinueOnError) - s := schoptions.NewSchedulerServer() - s.AddFlags(flags) + flags.AddFlagSet(command.Flags()) warnings = append(warnings, argsCheck("kube-scheduler", eac.SchedulerExtraArgs, flags)...) 
} return warnings, nil diff --git a/pkg/apis/componentconfig/types.go b/pkg/apis/componentconfig/types.go index e9d1b33a423cb..f4c218185b4a1 100644 --- a/pkg/apis/componentconfig/types.go +++ b/pkg/apis/componentconfig/types.go @@ -20,58 +20,107 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// ClientConnectionConfiguration contains details for constructing a client. +type ClientConnectionConfiguration struct { + // kubeConfigFile is the path to a kubeconfig file. + KubeConfigFile string + // acceptContentTypes defines the Accept header sent by clients when connecting to a server, overriding the + // default value of 'application/json'. This field will control all connections to the server used by a particular + // client. + AcceptContentTypes string + // contentType is the content type used when sending data to the server from this client. + ContentType string + // cps controls the number of queries per second allowed for this connection. + QPS float32 + // burst allows extra queries to accumulate when a client is exceeding its rate. + Burst int +} + +// SchedulerPolicyConfigMapKey defines the key of the element in the +// scheduler's policy ConfigMap that contains scheduler's policy config. +const SchedulerPolicyConfigMapKey string = "policy.cfg" + +// SchedulerPolicySource configures a means to obtain a scheduler Policy. One +// source field must be specified, and source fields are mutually exclusive. +type SchedulerPolicySource struct { + // File is a file policy source. + File *SchedulerPolicyFileSource + // ConfigMap is a config map policy source. + ConfigMap *SchedulerPolicyConfigMapSource +} + +// SchedulerPolicyFileSource is a policy serialized to disk and accessed via +// path. +type SchedulerPolicyFileSource struct { + // Path is the location of a serialized policy. + Path string +} + +// SchedulerPolicyConfigMapSource is a policy serialized into a config map value +// under the SchedulerPolicyConfigMapKey key. 
+type SchedulerPolicyConfigMapSource struct { + // Namespace is the namespace of the policy config map. + Namespace string + // Name is the name of the policy config map. + Name string +} + +// SchedulerAlgorithmSource is the source of a scheduler algorithm. One source +// field must be specified, and source fields are mutually exclusive. +type SchedulerAlgorithmSource struct { + // Policy is a policy based algorithm source. + Policy *SchedulerPolicySource + // Provider is the name of a scheduling algorithm provider to use. + Provider *string +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type KubeSchedulerConfiguration struct { metav1.TypeMeta - // port is the port that the scheduler's http service runs on. - Port int32 - // address is the IP address to serve on. - Address string - // algorithmProvider is the scheduling algorithm provider to use. - AlgorithmProvider string - // policyConfigFile is the filepath to the scheduler policy configuration. - PolicyConfigFile string - // enableProfiling enables profiling via web interface. - EnableProfiling bool - // enableContentionProfiling enables lock contention profiling, if enableProfiling is true. - EnableContentionProfiling bool - // contentType is contentType of requests sent to apiserver. - ContentType string - // kubeAPIQPS is the QPS to use while talking with kubernetes apiserver. - KubeAPIQPS float32 - // kubeAPIBurst is the QPS burst to use while talking with kubernetes apiserver. - KubeAPIBurst int32 // schedulerName is name of the scheduler, used to select which pods // will be processed by this scheduler, based on pod's "spec.SchedulerName". SchedulerName string + // AlgorithmSource specifies the scheduler algorithm source. + AlgorithmSource SchedulerAlgorithmSource // RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule // corresponding to every RequiredDuringScheduling affinity rule. 
// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100. HardPodAffinitySymmetricWeight int + + // LeaderElection defines the configuration of leader election client. + LeaderElection KubeSchedulerLeaderElectionConfiguration + + // ClientConnection specifies the kubeconfig file and client connection + // settings for the proxy server to use when communicating with the apiserver. + ClientConnection ClientConnectionConfiguration + // HealthzBindAddress is the IP address and port for the health check server to serve on, + // defaulting to 0.0.0.0:10251 + HealthzBindAddress string + // MetricsBindAddress is the IP address and port for the metrics server to + // serve on, defaulting to 0.0.0.0:10251. + MetricsBindAddress string + // EnableProfiling enables profiling via web interface on /debug/pprof + // handler. Profiling handlers will be handled by metrics server. + EnableProfiling bool + // EnableContentionProfiling enables lock contention profiling, if + // EnableProfiling is true. + EnableContentionProfiling bool + // Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity. // DEPRECATED: This is no longer used. FailureDomains string - // leaderElection defines the configuration of leader election client. - LeaderElection LeaderElectionConfiguration +} + +// KubeSchedulerLeaderElectionConfiguration expands LeaderElectionConfiguration +// to include scheduler specific configuration. +type KubeSchedulerLeaderElectionConfiguration struct { + LeaderElectionConfiguration // LockObjectNamespace defines the namespace of the lock object LockObjectNamespace string // LockObjectName defines the lock object name LockObjectName string - // PolicyConfigMapName is the name of the ConfigMap object that specifies - // the scheduler's policy config. If UseLegacyPolicyConfig is true, scheduler - // uses PolicyConfigFile. 
If UseLegacyPolicyConfig is false and - // PolicyConfigMapName is not empty, the ConfigMap object with this name must - // exist in PolicyConfigMapNamespace before scheduler initialization. - PolicyConfigMapName string - // PolicyConfigMapNamespace is the namespace where the above policy config map - // is located. If none is provided default system namespace ("kube-system") - // will be used. - PolicyConfigMapNamespace string - // UseLegacyPolicyConfig tells the scheduler to ignore Policy ConfigMap and - // to use PolicyConfigFile if available. - UseLegacyPolicyConfig bool } // LeaderElectionConfiguration defines the configuration of leader election diff --git a/pkg/apis/componentconfig/v1alpha1/defaults.go b/pkg/apis/componentconfig/v1alpha1/defaults.go index 59cadbe100a07..86d61c39e5a95 100644 --- a/pkg/apis/componentconfig/v1alpha1/defaults.go +++ b/pkg/apis/componentconfig/v1alpha1/defaults.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "net" + "strconv" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,41 +33,63 @@ func addDefaultingFuncs(scheme *kruntime.Scheme) error { } func SetDefaults_KubeSchedulerConfiguration(obj *KubeSchedulerConfiguration) { - if obj.Port == 0 { - obj.Port = ports.SchedulerPort + if len(obj.SchedulerName) == 0 { + obj.SchedulerName = api.DefaultSchedulerName } - if obj.Address == "" { - obj.Address = "0.0.0.0" + + if obj.HardPodAffinitySymmetricWeight == 0 { + obj.HardPodAffinitySymmetricWeight = api.DefaultHardPodAffinitySymmetricWeight } - if obj.AlgorithmProvider == "" { - obj.AlgorithmProvider = "DefaultProvider" + + if obj.AlgorithmSource.Policy == nil && + (obj.AlgorithmSource.Provider == nil || len(*obj.AlgorithmSource.Provider) == 0) { + val := SchedulerDefaultProviderName + obj.AlgorithmSource.Provider = &val } - if obj.ContentType == "" { - obj.ContentType = "application/vnd.kubernetes.protobuf" + + if policy := obj.AlgorithmSource.Policy; policy != nil { + if policy.ConfigMap != 
nil && len(policy.ConfigMap.Namespace) == 0 { + obj.AlgorithmSource.Policy.ConfigMap.Namespace = api.NamespaceSystem + } } - if obj.KubeAPIQPS == 0 { - obj.KubeAPIQPS = 50.0 + + if host, port, err := net.SplitHostPort(obj.HealthzBindAddress); err == nil { + if len(host) == 0 { + host = "0.0.0.0" + } + obj.HealthzBindAddress = net.JoinHostPort(host, port) + } else { + obj.HealthzBindAddress = net.JoinHostPort("0.0.0.0", strconv.Itoa(ports.SchedulerPort)) } - if obj.KubeAPIBurst == 0 { - obj.KubeAPIBurst = 100 + + if host, port, err := net.SplitHostPort(obj.MetricsBindAddress); err == nil { + if len(host) == 0 { + host = "0.0.0.0" + } + obj.MetricsBindAddress = net.JoinHostPort(host, port) + } else { + obj.MetricsBindAddress = net.JoinHostPort("0.0.0.0", strconv.Itoa(ports.SchedulerPort)) } - if obj.SchedulerName == "" { - obj.SchedulerName = api.DefaultSchedulerName + + if len(obj.ClientConnection.ContentType) == 0 { + obj.ClientConnection.ContentType = "application/vnd.kubernetes.protobuf" } - if obj.HardPodAffinitySymmetricWeight == 0 { - obj.HardPodAffinitySymmetricWeight = api.DefaultHardPodAffinitySymmetricWeight + if obj.ClientConnection.QPS == 0.0 { + obj.ClientConnection.QPS = 50.0 } - if obj.FailureDomains == "" { - obj.FailureDomains = kubeletapis.DefaultFailureDomains + if obj.ClientConnection.Burst == 0 { + obj.ClientConnection.Burst = 100 } - if obj.LockObjectNamespace == "" { - obj.LockObjectNamespace = SchedulerDefaultLockObjectNamespace + + if len(obj.LeaderElection.LockObjectNamespace) == 0 { + obj.LeaderElection.LockObjectNamespace = SchedulerDefaultLockObjectNamespace } - if obj.LockObjectName == "" { - obj.LockObjectName = SchedulerDefaultLockObjectName + if len(obj.LeaderElection.LockObjectName) == 0 { + obj.LeaderElection.LockObjectName = SchedulerDefaultLockObjectName } - if obj.PolicyConfigMapNamespace == "" { - obj.PolicyConfigMapNamespace = api.NamespaceSystem + + if len(obj.FailureDomains) == 0 { + obj.FailureDomains = 
kubeletapis.DefaultFailureDomains } } diff --git a/pkg/apis/componentconfig/v1alpha1/types.go b/pkg/apis/componentconfig/v1alpha1/types.go index 64dd5fda89f7a..4414d5b193d14 100644 --- a/pkg/apis/componentconfig/v1alpha1/types.go +++ b/pkg/apis/componentconfig/v1alpha1/types.go @@ -20,57 +20,92 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// ClientConnectionConfiguration contains details for constructing a client. +type ClientConnectionConfiguration struct { + // kubeConfigFile is the path to a kubeconfig file. + KubeConfigFile string `json:"kubeconfig"` + // acceptContentTypes defines the Accept header sent by clients when connecting to a server, overriding the + // default value of 'application/json'. This field will control all connections to the server used by a particular + // client. + AcceptContentTypes string `json:"acceptContentTypes"` + // contentType is the content type used when sending data to the server from this client. + ContentType string `json:"contentType"` + // cps controls the number of queries per second allowed for this connection. + QPS float32 `json:"qps"` + // burst allows extra queries to accumulate when a client is exceeding its rate. + Burst int `json:"burst"` +} + +// SchedulerPolicySource configures a means to obtain a scheduler Policy. One +// source field must be specified, and source fields are mutually exclusive. +type SchedulerPolicySource struct { + // File is a file policy source. + File *SchedulerPolicyFileSource `json:"file,omitempty"` + // ConfigMap is a config map policy source. + ConfigMap *SchedulerPolicyConfigMapSource `json:"configMap,omitempty"` +} + +// SchedulerPolicyFileSource is a policy serialized to disk and accessed via +// path. +type SchedulerPolicyFileSource struct { + // Path is the location of a serialized policy. + Path string `json:"path"` +} + +// SchedulerPolicyConfigMapSource is a policy serialized into a config map value +// under the SchedulerPolicyConfigMapKey key. 
+type SchedulerPolicyConfigMapSource struct { + // Namespace is the namespace of the policy config map. + Namespace string `json:"namespace"` + // Name is the name of the policy config map. + Name string `json:"name"` +} + +// SchedulerAlgorithmSource is the source of a scheduler algorithm. One source +// field must be specified, and source fields are mutually exclusive. +type SchedulerAlgorithmSource struct { + // Policy is a policy based algorithm source. + Policy *SchedulerPolicySource `json:"policy,omitempty"` + // Provider is the name of a scheduling algorithm provider to use. + Provider *string `json:"provider,omitempty"` +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type KubeSchedulerConfiguration struct { metav1.TypeMeta `json:",inline"` - // port is the port that the scheduler's http service runs on. - Port int `json:"port"` - // address is the IP address to serve on. - Address string `json:"address"` - // algorithmProvider is the scheduling algorithm provider to use. - AlgorithmProvider string `json:"algorithmProvider"` - // policyConfigFile is the filepath to the scheduler policy configuration. - PolicyConfigFile string `json:"policyConfigFile"` - // enableProfiling enables profiling via web interface. - EnableProfiling *bool `json:"enableProfiling"` - // enableContentionProfiling enables lock contention profiling, if enableProfiling is true. - EnableContentionProfiling bool `json:"enableContentionProfiling"` - // contentType is contentType of requests sent to apiserver. - ContentType string `json:"contentType"` - // kubeAPIQPS is the QPS to use while talking with kubernetes apiserver. - KubeAPIQPS float32 `json:"kubeAPIQPS"` - // kubeAPIBurst is the QPS burst to use while talking with kubernetes apiserver. 
- KubeAPIBurst int `json:"kubeAPIBurst"` - // schedulerName is name of the scheduler, used to select which pods + // SchedulerName is name of the scheduler, used to select which pods // will be processed by this scheduler, based on pod's "spec.SchedulerName". SchedulerName string `json:"schedulerName"` + // AlgorithmSource specifies the scheduler algorithm source. + AlgorithmSource SchedulerAlgorithmSource `json:"algorithmSource"` // RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule // corresponding to every RequiredDuringScheduling affinity rule. // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100. HardPodAffinitySymmetricWeight int `json:"hardPodAffinitySymmetricWeight"` + + // LeaderElection defines the configuration of leader election client. + LeaderElection KubeSchedulerLeaderElectionConfiguration `json:"leaderElection"` + + // ClientConnection specifies the kubeconfig file and client connection + // settings for the proxy server to use when communicating with the apiserver. + ClientConnection ClientConnectionConfiguration `json:"clientConnection"` + // HealthzBindAddress is the IP address and port for the health check server to serve on, + // defaulting to 0.0.0.0:10251 + HealthzBindAddress string `json:"healthzBindAddress"` + // MetricsBindAddress is the IP address and port for the metrics server to + // serve on, defaulting to 0.0.0.0:10251. + MetricsBindAddress string `json:"metricsBindAddress"` + // EnableProfiling enables profiling via web interface on /debug/pprof + // handler. Profiling handlers will be handled by metrics server. + EnableProfiling bool `json:"enableProfiling"` + // EnableContentionProfiling enables lock contention profiling, if + // EnableProfiling is true. 
+ EnableContentionProfiling bool `json:"enableContentionProfiling"` + // Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity. FailureDomains string `json:"failureDomains"` - // leaderElection defines the configuration of leader election client. - LeaderElection LeaderElectionConfiguration `json:"leaderElection"` - // LockObjectNamespace defines the namespace of the lock object - LockObjectNamespace string `json:"lockObjectNamespace"` - // LockObjectName defines the lock object name - LockObjectName string `json:"lockObjectName"` - // PolicyConfigMapName is the name of the ConfigMap object that specifies - // the scheduler's policy config. If UseLegacyPolicyConfig is true, scheduler - // uses PolicyConfigFile. If UseLegacyPolicyConfig is false and - // PolicyConfigMapName is not empty, the ConfigMap object with this name must - // exist in PolicyConfigMapNamespace before scheduler initialization. - PolicyConfigMapName string `json:"policyConfigMapName"` - // PolicyConfigMapNamespace is the namespace where the above policy config map - // is located. If none is provided default system namespace ("kube-system") - // will be used. - PolicyConfigMapNamespace string `json:"policyConfigMapNamespace"` - // UseLegacyPolicyConfig tells the scheduler to ignore Policy ConfigMap and - // to use PolicyConfigFile if available. - UseLegacyPolicyConfig bool `json:"useLegacyPolicyConfig"` } // LeaderElectionConfiguration defines the configuration of leader election @@ -101,10 +136,22 @@ type LeaderElectionConfiguration struct { ResourceLock string `json:"resourceLock"` } +// KubeSchedulerLeaderElectionConfiguration expands LeaderElectionConfiguration +// to include scheduler specific configuration. 
+type KubeSchedulerLeaderElectionConfiguration struct { + LeaderElectionConfiguration `json:",inline"` + // LockObjectNamespace defines the namespace of the lock object + LockObjectNamespace string `json:"lockObjectNamespace"` + // LockObjectName defines the lock object name + LockObjectName string `json:"lockObjectName"` +} + const ( // "kube-system" is the default scheduler lock object namespace SchedulerDefaultLockObjectNamespace string = "kube-system" // "kube-scheduler" is the default scheduler lock object name SchedulerDefaultLockObjectName = "kube-scheduler" + + SchedulerDefaultProviderName = "DefaultProvider" ) diff --git a/pkg/apis/componentconfig/zz_generated.deepcopy.go b/pkg/apis/componentconfig/zz_generated.deepcopy.go index 48c44ecad355a..51b29e460d6b2 100644 --- a/pkg/apis/componentconfig/zz_generated.deepcopy.go +++ b/pkg/apis/componentconfig/zz_generated.deepcopy.go @@ -36,6 +36,10 @@ func init() { // Deprecated: deepcopy registration will go away when static deepcopy is fully implemented. 
func RegisterDeepCopies(scheme *runtime.Scheme) error { return scheme.AddGeneratedDeepCopyFuncs( + conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { + in.(*ClientConnectionConfiguration).DeepCopyInto(out.(*ClientConnectionConfiguration)) + return nil + }, InType: reflect.TypeOf(&ClientConnectionConfiguration{})}, conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { in.(*GroupResource).DeepCopyInto(out.(*GroupResource)) return nil @@ -52,6 +56,10 @@ func RegisterDeepCopies(scheme *runtime.Scheme) error { in.(*KubeSchedulerConfiguration).DeepCopyInto(out.(*KubeSchedulerConfiguration)) return nil }, InType: reflect.TypeOf(&KubeSchedulerConfiguration{})}, + conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { + in.(*KubeSchedulerLeaderElectionConfiguration).DeepCopyInto(out.(*KubeSchedulerLeaderElectionConfiguration)) + return nil + }, InType: reflect.TypeOf(&KubeSchedulerLeaderElectionConfiguration{})}, conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { in.(*LeaderElectionConfiguration).DeepCopyInto(out.(*LeaderElectionConfiguration)) return nil @@ -64,6 +72,22 @@ func RegisterDeepCopies(scheme *runtime.Scheme) error { in.(*PortRangeVar).DeepCopyInto(out.(*PortRangeVar)) return nil }, InType: reflect.TypeOf(&PortRangeVar{})}, + conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { + in.(*SchedulerAlgorithmSource).DeepCopyInto(out.(*SchedulerAlgorithmSource)) + return nil + }, InType: reflect.TypeOf(&SchedulerAlgorithmSource{})}, + conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { + in.(*SchedulerPolicyConfigMapSource).DeepCopyInto(out.(*SchedulerPolicyConfigMapSource)) + return nil + }, InType: reflect.TypeOf(&SchedulerPolicyConfigMapSource{})}, + 
conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { + in.(*SchedulerPolicyFileSource).DeepCopyInto(out.(*SchedulerPolicyFileSource)) + return nil + }, InType: reflect.TypeOf(&SchedulerPolicyFileSource{})}, + conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { + in.(*SchedulerPolicySource).DeepCopyInto(out.(*SchedulerPolicySource)) + return nil + }, InType: reflect.TypeOf(&SchedulerPolicySource{})}, conversion.GeneratedDeepCopyFunc{Fn: func(in interface{}, out interface{}, c *conversion.Cloner) error { in.(*VolumeConfiguration).DeepCopyInto(out.(*VolumeConfiguration)) return nil @@ -71,6 +95,22 @@ func RegisterDeepCopies(scheme *runtime.Scheme) error { ) } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClientConnectionConfiguration) DeepCopyInto(out *ClientConnectionConfiguration) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClientConnectionConfiguration. +func (in *ClientConnectionConfiguration) DeepCopy() *ClientConnectionConfiguration { + if in == nil { + return nil + } + out := new(ClientConnectionConfiguration) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *GroupResource) DeepCopyInto(out *GroupResource) { *out = *in @@ -172,7 +212,9 @@ func (in *KubeControllerManagerConfiguration) DeepCopyObject() runtime.Object { func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfiguration) { *out = *in out.TypeMeta = in.TypeMeta + in.AlgorithmSource.DeepCopyInto(&out.AlgorithmSource) out.LeaderElection = in.LeaderElection + out.ClientConnection = in.ClientConnection return } @@ -195,6 +237,23 @@ func (in *KubeSchedulerConfiguration) DeepCopyObject() runtime.Object { } } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KubeSchedulerLeaderElectionConfiguration) DeepCopyInto(out *KubeSchedulerLeaderElectionConfiguration) { + *out = *in + out.LeaderElectionConfiguration = in.LeaderElectionConfiguration + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KubeSchedulerLeaderElectionConfiguration. +func (in *KubeSchedulerLeaderElectionConfiguration) DeepCopy() *KubeSchedulerLeaderElectionConfiguration { + if in == nil { + return nil + } + out := new(KubeSchedulerLeaderElectionConfiguration) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LeaderElectionConfiguration) DeepCopyInto(out *LeaderElectionConfiguration) { *out = *in @@ -255,6 +314,106 @@ func (in *PortRangeVar) DeepCopy() *PortRangeVar { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *SchedulerAlgorithmSource) DeepCopyInto(out *SchedulerAlgorithmSource) { + *out = *in + if in.Policy != nil { + in, out := &in.Policy, &out.Policy + if *in == nil { + *out = nil + } else { + *out = new(SchedulerPolicySource) + (*in).DeepCopyInto(*out) + } + } + if in.Provider != nil { + in, out := &in.Provider, &out.Provider + if *in == nil { + *out = nil + } else { + *out = new(string) + **out = **in + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulerAlgorithmSource. +func (in *SchedulerAlgorithmSource) DeepCopy() *SchedulerAlgorithmSource { + if in == nil { + return nil + } + out := new(SchedulerAlgorithmSource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchedulerPolicyConfigMapSource) DeepCopyInto(out *SchedulerPolicyConfigMapSource) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulerPolicyConfigMapSource. +func (in *SchedulerPolicyConfigMapSource) DeepCopy() *SchedulerPolicyConfigMapSource { + if in == nil { + return nil + } + out := new(SchedulerPolicyConfigMapSource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchedulerPolicyFileSource) DeepCopyInto(out *SchedulerPolicyFileSource) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulerPolicyFileSource. +func (in *SchedulerPolicyFileSource) DeepCopy() *SchedulerPolicyFileSource { + if in == nil { + return nil + } + out := new(SchedulerPolicyFileSource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *SchedulerPolicySource) DeepCopyInto(out *SchedulerPolicySource) { + *out = *in + if in.File != nil { + in, out := &in.File, &out.File + if *in == nil { + *out = nil + } else { + *out = new(SchedulerPolicyFileSource) + **out = **in + } + } + if in.ConfigMap != nil { + in, out := &in.ConfigMap, &out.ConfigMap + if *in == nil { + *out = nil + } else { + *out = new(SchedulerPolicyConfigMapSource) + **out = **in + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulerPolicySource. +func (in *SchedulerPolicySource) DeepCopy() *SchedulerPolicySource { + if in == nil { + return nil + } + out := new(SchedulerPolicySource) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VolumeConfiguration) DeepCopyInto(out *VolumeConfiguration) { *out = *in diff --git a/plugin/cmd/kube-scheduler/app/configurator.go b/plugin/cmd/kube-scheduler/app/configurator.go deleted file mode 100644 index 567a3bfd381fb..0000000000000 --- a/plugin/cmd/kube-scheduler/app/configurator.go +++ /dev/null @@ -1,196 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package app - -import ( - "fmt" - "io/ioutil" - "os" - - appsinformers "k8s.io/client-go/informers/apps/v1beta1" - coreinformers "k8s.io/client-go/informers/core/v1" - extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1" - policyinformers "k8s.io/client-go/informers/policy/v1beta1" - "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - utilfeature "k8s.io/apiserver/pkg/util/feature" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" - - clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/tools/record" - - "k8s.io/api/core/v1" - - "k8s.io/kubernetes/pkg/api/legacyscheme" - "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/plugin/pkg/scheduler" - _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" - schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" - latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" - "k8s.io/kubernetes/plugin/pkg/scheduler/factory" - - "github.com/golang/glog" -) - -func createRecorder(kubecli *clientset.Clientset, s *options.SchedulerServer) record.EventRecorder { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.CoreV1().RESTClient()).Events("")}) - return eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: s.SchedulerName}) -} - -func createClients(s *options.SchedulerServer) (*clientset.Clientset, *clientset.Clientset, error) { - kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig) - if err != nil { - return nil, nil, fmt.Errorf("unable to build config from flags: %v", err) - } - - kubeconfig.ContentType = s.ContentType - // Override kubeconfig qps/burst settings from flags - kubeconfig.QPS = s.KubeAPIQPS - kubeconfig.Burst = 
int(s.KubeAPIBurst) - kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "scheduler")) - if err != nil { - glog.Fatalf("Invalid API configuration: %v", err) - } - leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election")) - return kubeClient, leaderElectionClient, nil -} - -// CreateScheduler encapsulates the entire creation of a runnable scheduler. -func CreateScheduler( - s *options.SchedulerServer, - kubecli clientset.Interface, - nodeInformer coreinformers.NodeInformer, - podInformer coreinformers.PodInformer, - pvInformer coreinformers.PersistentVolumeInformer, - pvcInformer coreinformers.PersistentVolumeClaimInformer, - replicationControllerInformer coreinformers.ReplicationControllerInformer, - replicaSetInformer extensionsinformers.ReplicaSetInformer, - statefulSetInformer appsinformers.StatefulSetInformer, - serviceInformer coreinformers.ServiceInformer, - pdbInformer policyinformers.PodDisruptionBudgetInformer, - recorder record.EventRecorder, -) (*scheduler.Scheduler, error) { - configurator := factory.NewConfigFactory( - s.SchedulerName, - kubecli, - nodeInformer, - podInformer, - pvInformer, - pvcInformer, - replicationControllerInformer, - replicaSetInformer, - statefulSetInformer, - serviceInformer, - pdbInformer, - s.HardPodAffinitySymmetricWeight, - utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache), - ) - - // Rebuild the configurator with a default Create(...) method. - configurator = &schedulerConfigurator{ - configurator, - s.PolicyConfigFile, - s.AlgorithmProvider, - s.PolicyConfigMapName, - s.PolicyConfigMapNamespace, - s.UseLegacyPolicyConfig, - } - - return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) { - cfg.Recorder = recorder - }) -} - -// schedulerConfigurator is an interface wrapper that provides a way to create -// a scheduler from a user provided config file or ConfigMap object. 
-type schedulerConfigurator struct { - scheduler.Configurator - policyFile string - algorithmProvider string - policyConfigMap string - policyConfigMapNamespace string - useLegacyPolicyConfig bool -} - -// getSchedulerPolicyConfig finds and decodes scheduler's policy config. If no -// such policy is found, it returns nil, nil. -func (sc schedulerConfigurator) getSchedulerPolicyConfig() (*schedulerapi.Policy, error) { - var configData []byte - var policyConfigMapFound bool - var policy schedulerapi.Policy - - // If not in legacy mode, try to find policy ConfigMap. - if !sc.useLegacyPolicyConfig && len(sc.policyConfigMap) != 0 { - namespace := sc.policyConfigMapNamespace - policyConfigMap, err := sc.GetClient().CoreV1().ConfigMaps(namespace).Get(sc.policyConfigMap, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("Error getting scheduler policy ConfigMap: %v.", err) - } - if policyConfigMap != nil { - var configString string - configString, policyConfigMapFound = policyConfigMap.Data[options.SchedulerPolicyConfigMapKey] - if !policyConfigMapFound { - return nil, fmt.Errorf("No element with key = '%v' is found in the ConfigMap 'Data'.", options.SchedulerPolicyConfigMapKey) - } - glog.V(5).Infof("Scheduler policy ConfigMap: %v", configString) - configData = []byte(configString) - } - } - - // If we are in legacy mode or ConfigMap name is empty, try to use policy - // config file. - if !policyConfigMapFound { - if _, err := os.Stat(sc.policyFile); err != nil { - // No config file is found. 
- return nil, nil - } - var err error - configData, err = ioutil.ReadFile(sc.policyFile) - if err != nil { - return nil, fmt.Errorf("unable to read policy config: %v", err) - } - } - - if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil { - return nil, fmt.Errorf("invalid configuration: %v", err) - } - return &policy, nil -} - -// Create implements the interface for the Configurator, hence it is exported -// even though the struct is not. -func (sc schedulerConfigurator) Create() (*scheduler.Config, error) { - policy, err := sc.getSchedulerPolicyConfig() - if err != nil { - return nil, err - } - // If no policy is found, create scheduler from algorithm provider. - if policy == nil { - if sc.Configurator != nil { - return sc.Configurator.CreateFromProvider(sc.algorithmProvider) - } - return nil, fmt.Errorf("Configurator was nil") - } - - return sc.CreateFromConfig(*policy) -} diff --git a/plugin/cmd/kube-scheduler/app/configurator_test.go b/plugin/cmd/kube-scheduler/app/configurator_test.go deleted file mode 100644 index c4bc1bd142451..0000000000000 --- a/plugin/cmd/kube-scheduler/app/configurator_test.go +++ /dev/null @@ -1,31 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package app - -import ( - "testing" -) - -func TestSchedulerConfiguratorFailure(t *testing.T) { - sc := &schedulerConfigurator{ - // policyfile and algorithm are intentionally undefined. 
- } - _, err := sc.Create() - if err == nil { - t.Fatalf("Expected error message when creating with incomplete configurator.") - } -} diff --git a/plugin/cmd/kube-scheduler/app/options/options.go b/plugin/cmd/kube-scheduler/app/options/options.go deleted file mode 100644 index bac92c6d578de..0000000000000 --- a/plugin/cmd/kube-scheduler/app/options/options.go +++ /dev/null @@ -1,97 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package options provides the scheduler flags -package options - -import ( - "fmt" - - utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/legacyscheme" - "k8s.io/kubernetes/pkg/apis/componentconfig" - "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1" - "k8s.io/kubernetes/pkg/client/leaderelectionconfig" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - "k8s.io/kubernetes/plugin/pkg/scheduler/factory" - - // add the kubernetes feature gates - _ "k8s.io/kubernetes/pkg/features" - // install the componentconfig api so we get its defaulting and conversion functions - _ "k8s.io/kubernetes/pkg/apis/componentconfig/install" - - "github.com/spf13/pflag" -) - -// SchedulerPolicyConfigMapKey defines the key of the element in the -// scheduler's policy ConfigMap that contains scheduler's policy config. 
-const SchedulerPolicyConfigMapKey string = "policy.cfg" - -// SchedulerServer has all the context and params needed to run a Scheduler -type SchedulerServer struct { - componentconfig.KubeSchedulerConfiguration - // Master is the address of the Kubernetes API server (overrides any - // value in kubeconfig). - Master string - // Kubeconfig is Path to kubeconfig file with authorization and master - // location information. - Kubeconfig string - // Dynamic configuration for scheduler features. -} - -// NewSchedulerServer creates a new SchedulerServer with default parameters -func NewSchedulerServer() *SchedulerServer { - versioned := &v1alpha1.KubeSchedulerConfiguration{} - legacyscheme.Scheme.Default(versioned) - cfg := componentconfig.KubeSchedulerConfiguration{} - legacyscheme.Scheme.Convert(versioned, &cfg, nil) - cfg.LeaderElection.LeaderElect = true - s := SchedulerServer{ - KubeSchedulerConfiguration: cfg, - } - return &s -} - -// AddFlags adds flags for a specific SchedulerServer to the specified FlagSet -func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) { - fs.Int32Var(&s.Port, "port", s.Port, "The port that the scheduler's http service runs on") - fs.StringVar(&s.Address, "address", s.Address, "The IP address to serve on (set to 0.0.0.0 for all interfaces)") - fs.StringVar(&s.AlgorithmProvider, "algorithm-provider", s.AlgorithmProvider, "The scheduling algorithm provider to use, one of: "+factory.ListAlgorithmProviders()) - fs.StringVar(&s.PolicyConfigFile, "policy-config-file", s.PolicyConfigFile, "File with scheduler policy configuration. This file is used if policy ConfigMap is not provided or --use-legacy-policy-config==true") - usage := fmt.Sprintf("Name of the ConfigMap object that contains scheduler's policy configuration. It must exist in the system namespace before scheduler initialization if --use-legacy-policy-config==false. 
The config must be provided as the value of an element in 'Data' map with the key='%v'", SchedulerPolicyConfigMapKey) - fs.StringVar(&s.PolicyConfigMapName, "policy-configmap", s.PolicyConfigMapName, usage) - fs.StringVar(&s.PolicyConfigMapNamespace, "policy-configmap-namespace", s.PolicyConfigMapNamespace, "The namespace where policy ConfigMap is located. The system namespace will be used if this is not provided or is empty.") - fs.BoolVar(&s.UseLegacyPolicyConfig, "use-legacy-policy-config", false, "When set to true, scheduler will ignore policy ConfigMap and uses policy config file") - fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") - fs.BoolVar(&s.EnableContentionProfiling, "contention-profiling", false, "Enable lock contention profiling, if profiling is enabled") - fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") - fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.") - fs.StringVar(&s.ContentType, "kube-api-content-type", s.ContentType, "Content type of requests sent to apiserver.") - fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver") - fs.Int32Var(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver") - fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's \"spec.SchedulerName\".") - fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object.") - fs.StringVar(&s.LockObjectName, "lock-object-name", s.LockObjectName, "Define the name of the lock object.") - fs.IntVar(&s.HardPodAffinitySymmetricWeight, "hard-pod-affinity-symmetric-weight", 
api.DefaultHardPodAffinitySymmetricWeight, - "RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule corresponding "+ - "to every RequiredDuringScheduling affinity rule. --hard-pod-affinity-symmetric-weight represents the weight of implicit PreferredDuringScheduling affinity rule.") - fs.MarkDeprecated("hard-pod-affinity-symmetric-weight", "This option was moved to the policy configuration file") - fs.StringVar(&s.FailureDomains, "failure-domains", kubeletapis.DefaultFailureDomains, "Indicate the \"all topologies\" set for an empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.") - fs.MarkDeprecated("failure-domains", "Doesn't have any effect. Will be removed in future version.") - leaderelectionconfig.BindFlags(&s.LeaderElection, fs) - utilfeature.DefaultFeatureGate.AddFlag(fs) -} diff --git a/plugin/cmd/kube-scheduler/app/options/options_test.go b/plugin/cmd/kube-scheduler/app/options/options_test.go deleted file mode 100644 index bd6172ccb37d7..0000000000000 --- a/plugin/cmd/kube-scheduler/app/options/options_test.go +++ /dev/null @@ -1,103 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package options - -import ( - "reflect" - "testing" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/diff" - "k8s.io/kubernetes/pkg/apis/componentconfig" - - "github.com/spf13/pflag" -) - -func TestAddFlags(t *testing.T) { - f := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError) - s := NewSchedulerServer() - s.AddFlags(f) - - args := []string{ - "--address=192.168.4.20", - "--algorithm-provider=FooProvider", - "--contention-profiling=true", - "--failure-domains=kubernetes.io/hostname", - "--hard-pod-affinity-symmetric-weight=0", - "--kube-api-burst=80", - "--kube-api-content-type=application/vnd.kubernetes.protobuf", - "--kube-api-qps=40.0", - "--kubeconfig=/foo/bar/kubeconfig", - "--leader-elect=true", - "--leader-elect-lease-duration=20s", - "--leader-elect-renew-deadline=15s", - "--leader-elect-resource-lock=endpoints", - "--leader-elect-retry-period=3s", - "--lock-object-name=test-lock-object-name", - "--lock-object-namespace=test-lock-object-ns", - "--master=192.168.4.20", - "--policy-config-file=/foo/bar/policyconfig", - "--policy-configmap=test-policy-configmap", - "--policy-configmap-namespace=test-policy-configmap-ns", - "--port=10000", - "--profiling=false", - "--scheduler-name=test-scheduler-name", - "--use-legacy-policy-config=true", - } - - f.Parse(args) - - expected := &SchedulerServer{ - KubeSchedulerConfiguration: componentconfig.KubeSchedulerConfiguration{ - Port: 10000, - Address: "192.168.4.20", - AlgorithmProvider: "FooProvider", - PolicyConfigFile: "/foo/bar/policyconfig", - EnableContentionProfiling: true, - EnableProfiling: false, - - ContentType: "application/vnd.kubernetes.protobuf", - KubeAPIQPS: 40.0, - KubeAPIBurst: 80, - SchedulerName: "test-scheduler-name", - LeaderElection: componentconfig.LeaderElectionConfiguration{ - ResourceLock: "endpoints", - LeaderElect: true, - LeaseDuration: metav1.Duration{Duration: 20 * time.Second}, - RenewDeadline: metav1.Duration{Duration: 15 * 
time.Second}, - RetryPeriod: metav1.Duration{Duration: 3 * time.Second}, - }, - - LockObjectNamespace: "test-lock-object-ns", - LockObjectName: "test-lock-object-name", - - PolicyConfigMapName: "test-policy-configmap", - PolicyConfigMapNamespace: "test-policy-configmap-ns", - UseLegacyPolicyConfig: true, - - HardPodAffinitySymmetricWeight: 0, - FailureDomains: "kubernetes.io/hostname", - }, - Kubeconfig: "/foo/bar/kubeconfig", - Master: "192.168.4.20", - } - - if !reflect.DeepEqual(expected, s) { - t.Errorf("Got different run options than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(expected, s)) - } -} diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index 2805462795746..53b058b91a92f 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -18,37 +18,308 @@ limitations under the License. package app import ( + "errors" "fmt" + "io/ioutil" "net" "net/http" "net/http/pprof" "os" + "reflect" goruntime "runtime" "strconv" + "time" - "k8s.io/apiserver/pkg/server/healthz" - + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/server/healthz" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + clientgoclientset "k8s.io/client-go/kubernetes" + clientset "k8s.io/client-go/kubernetes" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/api/legacyscheme" + "k8s.io/kubernetes/pkg/apis/componentconfig" 
+	componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
+	"k8s.io/kubernetes/pkg/client/leaderelectionconfig"
 	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/features"
+	cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
+	"k8s.io/kubernetes/pkg/master/ports"
 	"k8s.io/kubernetes/pkg/util/configz"
 	"k8s.io/kubernetes/pkg/version"
-	"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
+	"k8s.io/kubernetes/pkg/version/verflag"
+	"k8s.io/kubernetes/plugin/pkg/scheduler"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
+	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
+	latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
 
 	"github.com/golang/glog"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
+
+	"github.com/prometheus/client_golang/prometheus"
 )
 
+// Options has all the context and params needed to run a Scheduler
+type Options struct {
+	// ConfigFile is the location of the scheduler server's configuration file.
+	ConfigFile string
+
+	// config is the scheduler server's configuration object.
+	config *componentconfig.KubeSchedulerConfiguration
+
+	scheme *runtime.Scheme
+	codecs serializer.CodecFactory
+
+	// The fields below here are placeholders for flags that can't be directly
+	// mapped into componentconfig.KubeSchedulerConfiguration.
+	//
+	// TODO remove these fields once the deprecated flags are removed.
+
+	// master is the address of the Kubernetes API server (overrides any
+	// value in kubeconfig).
+ master string + healthzAddress string + healthzPort int32 + policyConfigFile string + policyConfigMapName string + policyConfigMapNamespace string + useLegacyPolicyConfig bool + algorithmProvider string +} + +// AddFlags adds flags for a specific SchedulerServer to the specified FlagSet +func AddFlags(options *Options, fs *pflag.FlagSet) { + fs.StringVar(&options.ConfigFile, "config", options.ConfigFile, "The path to the configuration file.") + + // All flags below here are deprecated and will eventually be removed. + + fs.Int32Var(&options.healthzPort, "port", ports.SchedulerPort, "The port that the scheduler's http service runs on") + fs.StringVar(&options.healthzAddress, "address", options.healthzAddress, "The IP address to serve on (set to 0.0.0.0 for all interfaces)") + fs.StringVar(&options.algorithmProvider, "algorithm-provider", options.algorithmProvider, "The scheduling algorithm provider to use, one of: "+factory.ListAlgorithmProviders()) + fs.StringVar(&options.policyConfigFile, "policy-config-file", options.policyConfigFile, "File with scheduler policy configuration. This file is used if policy ConfigMap is not provided or --use-legacy-policy-config==true") + usage := fmt.Sprintf("Name of the ConfigMap object that contains scheduler's policy configuration. It must exist in the system namespace before scheduler initialization if --use-legacy-policy-config==false. The config must be provided as the value of an element in 'Data' map with the key='%v'", componentconfig.SchedulerPolicyConfigMapKey) + fs.StringVar(&options.policyConfigMapName, "policy-configmap", options.policyConfigMapName, usage) + fs.StringVar(&options.policyConfigMapNamespace, "policy-configmap-namespace", options.policyConfigMapNamespace, "The namespace where policy ConfigMap is located. 
The system namespace will be used if this is not provided or is empty.") + fs.BoolVar(&options.useLegacyPolicyConfig, "use-legacy-policy-config", false, "When set to true, scheduler will ignore policy ConfigMap and uses policy config file") + fs.BoolVar(&options.config.EnableProfiling, "profiling", options.config.EnableProfiling, "Enable profiling via web interface host:port/debug/pprof/") + fs.BoolVar(&options.config.EnableContentionProfiling, "contention-profiling", options.config.EnableContentionProfiling, "Enable lock contention profiling, if profiling is enabled") + fs.StringVar(&options.master, "master", options.master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") + fs.StringVar(&options.config.ClientConnection.KubeConfigFile, "kubeconfig", options.config.ClientConnection.KubeConfigFile, "Path to kubeconfig file with authorization and master location information.") + fs.StringVar(&options.config.ClientConnection.ContentType, "kube-api-content-type", options.config.ClientConnection.ContentType, "Content type of requests sent to apiserver.") + fs.Float32Var(&options.config.ClientConnection.QPS, "kube-api-qps", options.config.ClientConnection.QPS, "QPS to use while talking with kubernetes apiserver") + fs.IntVar(&options.config.ClientConnection.Burst, "kube-api-burst", options.config.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver") + fs.StringVar(&options.config.SchedulerName, "scheduler-name", options.config.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's \"spec.SchedulerName\".") + fs.StringVar(&options.config.LeaderElection.LockObjectNamespace, "lock-object-namespace", options.config.LeaderElection.LockObjectNamespace, "Define the namespace of the lock object.") + fs.StringVar(&options.config.LeaderElection.LockObjectName, "lock-object-name", options.config.LeaderElection.LockObjectName, "Define the name of the lock object.") 
+ fs.IntVar(&options.config.HardPodAffinitySymmetricWeight, "hard-pod-affinity-symmetric-weight", options.config.HardPodAffinitySymmetricWeight, + "RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule corresponding "+ + "to every RequiredDuringScheduling affinity rule. --hard-pod-affinity-symmetric-weight represents the weight of implicit PreferredDuringScheduling affinity rule.") + fs.MarkDeprecated("hard-pod-affinity-symmetric-weight", "This option was moved to the policy configuration file") + fs.StringVar(&options.config.FailureDomains, "failure-domains", options.config.FailureDomains, "Indicate the \"all topologies\" set for an empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.") + fs.MarkDeprecated("failure-domains", "Doesn't have any effect. Will be removed in future version.") + leaderelectionconfig.BindFlags(&options.config.LeaderElection.LeaderElectionConfiguration, fs) + utilfeature.DefaultFeatureGate.AddFlag(fs) +} + +func NewOptions() (*Options, error) { + o := &Options{ + config: new(componentconfig.KubeSchedulerConfiguration), + } + + o.scheme = runtime.NewScheme() + o.codecs = serializer.NewCodecFactory(o.scheme) + + if err := componentconfig.AddToScheme(o.scheme); err != nil { + return nil, err + } + if err := componentconfigv1alpha1.AddToScheme(o.scheme); err != nil { + return nil, err + } + + return o, nil +} + +func (o *Options) Complete() error { + if len(o.ConfigFile) == 0 { + glog.Warning("WARNING: all flags than --config are deprecated. Please begin using a config file ASAP.") + o.applyDeprecatedHealthzAddressToConfig() + o.applyDeprecatedHealthzPortToConfig() + o.applyDeprecatedAlgorithmSourceOptionsToConfig() + } + + return nil +} + +// applyDeprecatedHealthzAddressToConfig sets o.config.HealthzBindAddress and +// o.config.MetricsBindAddress from flags passed on the command line based on +// the following rules: +// +// 1. 
If --address is empty, leave the config as-is. +// 2. Otherwise, use the value of --address for the address portion of +// o.config.HealthzBindAddress +func (o *Options) applyDeprecatedHealthzAddressToConfig() { + if len(o.healthzAddress) == 0 { + return + } + + _, port, err := net.SplitHostPort(o.config.HealthzBindAddress) + if err != nil { + glog.Fatalf("invalid healthz bind address %q: %v", o.config.HealthzBindAddress, err) + } + o.config.HealthzBindAddress = net.JoinHostPort(o.healthzAddress, port) + o.config.MetricsBindAddress = net.JoinHostPort(o.healthzAddress, port) +} + +// applyDeprecatedHealthzPortToConfig sets o.config.HealthzBindAddress and +// o.config.MetricsBindAddress from flags passed on the command line based on +// the following rules: +// +// 1. If --port is -1, disable the healthz server. +// 2. Otherwise, use the value of --port for the port portion of +// o.config.HealthzBindAddress +func (o *Options) applyDeprecatedHealthzPortToConfig() { + if o.healthzPort == -1 { + o.config.HealthzBindAddress = "" + return + } + + host, _, err := net.SplitHostPort(o.config.HealthzBindAddress) + if err != nil { + glog.Fatalf("invalid healthz bind address %q: %v", o.config.HealthzBindAddress, err) + } + o.config.HealthzBindAddress = net.JoinHostPort(host, strconv.Itoa(int(o.healthzPort))) + o.config.MetricsBindAddress = net.JoinHostPort(host, strconv.Itoa(int(o.healthzPort))) +} + +// applyDeprecatedAlgorithmSourceOptionsToConfig sets o.config.AlgorithmSource from +// flags passed on the command line in the following precedence order: +// +// 1. --use-legacy-policy-config to use a policy file. +// 2. --policy-configmap to use a policy config map value. +// 3. --algorithm-provider to use a named algorithm provider. 
+func (o *Options) applyDeprecatedAlgorithmSourceOptionsToConfig() {
+	switch {
+	case o.useLegacyPolicyConfig:
+		o.config.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
+			Policy: &componentconfig.SchedulerPolicySource{
+				File: &componentconfig.SchedulerPolicyFileSource{
+					Path: o.policyConfigFile,
+				},
+			},
+		}
+	case len(o.policyConfigMapName) > 0:
+		o.config.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
+			Policy: &componentconfig.SchedulerPolicySource{
+				ConfigMap: &componentconfig.SchedulerPolicyConfigMapSource{
+					Name:      o.policyConfigMapName,
+					Namespace: o.policyConfigMapNamespace,
+				},
+			},
+		}
+	case len(o.algorithmProvider) > 0:
+		o.config.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
+			Provider: &o.algorithmProvider,
+		}
+	}
+}
+
+// Validate validates all the required options.
+func (o *Options) Validate(args []string) error {
+	if len(args) != 0 {
+		return errors.New("no arguments are supported")
+	}
+
+	return nil
+}
+
+// loadConfigFromFile loads the contents of file and decodes it as a
+// KubeSchedulerConfiguration object.
+func (o *Options) loadConfigFromFile(file string) (*componentconfig.KubeSchedulerConfiguration, error) {
+	data, err := ioutil.ReadFile(file)
+	if err != nil {
+		return nil, err
+	}
+
+	return o.loadConfig(data)
+}
+
+// loadConfig decodes data as a KubeSchedulerConfiguration object.
+func (o *Options) loadConfig(data []byte) (*componentconfig.KubeSchedulerConfiguration, error) { + configObj, gvk, err := o.codecs.UniversalDecoder().Decode(data, nil, nil) + if err != nil { + return nil, err + } + config, ok := configObj.(*componentconfig.KubeSchedulerConfiguration) + if !ok { + return nil, fmt.Errorf("got unexpected config type: %v", gvk) + } + return config, nil +} + +func (o *Options) ApplyDefaults(in *componentconfig.KubeSchedulerConfiguration) (*componentconfig.KubeSchedulerConfiguration, error) { + external, err := o.scheme.ConvertToVersion(in, componentconfigv1alpha1.SchemeGroupVersion) + if err != nil { + return nil, err + } + + o.scheme.Default(external) + + internal, err := o.scheme.ConvertToVersion(external, componentconfig.SchemeGroupVersion) + if err != nil { + return nil, err + } + + out := internal.(*componentconfig.KubeSchedulerConfiguration) + + return out, nil +} + +func (o *Options) Run() error { + config := o.config + + if len(o.ConfigFile) > 0 { + if c, err := o.loadConfigFromFile(o.ConfigFile); err != nil { + return err + } else { + config = c + } + } + + // Apply algorithms based on feature gates. + // TODO: make configurable? + algorithmprovider.ApplyFeatureGates() + + server, err := NewSchedulerServer(config, o.master) + if err != nil { + return err + } + + stop := make(chan struct{}) + return server.Run(stop) +} + // NewSchedulerCommand creates a *cobra.Command object with default parameters func NewSchedulerCommand() *cobra.Command { - s := options.NewSchedulerServer() - s.AddFlags(pflag.CommandLine) + opts, err := NewOptions() + if err != nil { + glog.Fatalf("unable to initialize command options: %v", err) + } + cmd := &cobra.Command{ Use: "kube-scheduler", Long: `The Kubernetes scheduler is a policy-rich, topology-aware, @@ -59,133 +330,373 @@ constraints, affinity and anti-affinity specifications, data locality, inter-wor interference, deadlines, and so on. 
Workload-specific requirements will be exposed through the API as necessary.`, Run: func(cmd *cobra.Command, args []string) { + verflag.PrintAndExitIfRequested() + cmdutil.CheckErr(opts.Complete()) + cmdutil.CheckErr(opts.Validate(args)) + cmdutil.CheckErr(opts.Run()) }, } + opts.config, err = opts.ApplyDefaults(opts.config) + if err != nil { + glog.Fatalf("unable to apply config defaults: %v", err) + } + + flags := cmd.Flags() + AddFlags(opts, flags) + + cmd.MarkFlagFilename("config", "yaml", "yml", "json") + return cmd } -// Run runs the specified SchedulerServer. This should never exit. -func Run(s *options.SchedulerServer) error { - // To help debugging, immediately log version - glog.Infof("Version: %+v", version.Get()) +// SchedulerServer represents all the parameters required to start the +// Kubernetes scheduler server. +type SchedulerServer struct { + SchedulerName string + Client clientset.Interface + InformerFactory informers.SharedInformerFactory + PodInformer coreinformers.PodInformer + AlgorithmSource componentconfig.SchedulerAlgorithmSource + HardPodAffinitySymmetricWeight int + EventClient v1core.EventsGetter + Recorder record.EventRecorder + Broadcaster record.EventBroadcaster + // LeaderElection is optional. + LeaderElection *leaderelection.LeaderElectionConfig + // HealthzServer is optional. + HealthzServer *http.Server + // MetricsServer is optional. + MetricsServer *http.Server +} - kubeClient, leaderElectionClient, err := createClients(s) - if err != nil { - return fmt.Errorf("unable to create kube client: %v", err) +// NewSchedulerServer creates a runnable SchedulerServer from configuration. 
+func NewSchedulerServer(config *componentconfig.KubeSchedulerConfiguration, master string) (*SchedulerServer, error) { + if config == nil { + return nil, errors.New("config is required") } - recorder := createRecorder(kubeClient, s) - - informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) - // cache only non-terminal pods - - podInformer := factory.NewPodInformer(kubeClient, 0, s.SchedulerName) - // Apply algorithms based on feature gates. - algorithmprovider.ApplyFeatureGates() + // Configz registration. + if c, err := configz.New("componentconfig"); err == nil { + c.Set(config) + } else { + return nil, fmt.Errorf("unable to register configz: %s", err) + } - sched, err := CreateScheduler( - s, - kubeClient, - informerFactory.Core().V1().Nodes(), - podInformer, - informerFactory.Core().V1().PersistentVolumes(), - informerFactory.Core().V1().PersistentVolumeClaims(), - informerFactory.Core().V1().ReplicationControllers(), - informerFactory.Extensions().V1beta1().ReplicaSets(), - informerFactory.Apps().V1beta1().StatefulSets(), - informerFactory.Core().V1().Services(), - informerFactory.Policy().V1beta1().PodDisruptionBudgets(), - recorder, - ) + // Prepare some Kube clients. + client, leaderElectionClient, eventClient, err := createClients(config.ClientConnection, master) if err != nil { - return fmt.Errorf("error creating scheduler: %v", err) + return nil, err } - if s.Port != -1 { - go startHTTP(s) - } + // Prepare event clients. + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: config.SchedulerName}) - stop := make(chan struct{}) - defer close(stop) - go podInformer.Informer().Run(stop) - informerFactory.Start(stop) - // Waiting for all cache to sync before scheduling. - informerFactory.WaitForCacheSync(stop) - controller.WaitForCacheSync("scheduler", stop, podInformer.Informer().HasSynced) + // Set up leader election if enabled. 
+ var leaderElectionConfig *leaderelection.LeaderElectionConfig + if config.LeaderElection.LeaderElect { + leaderElectionConfig, err = makeLeaderElectionConfig(config.LeaderElection, leaderElectionClient, recorder) + if err != nil { + return nil, err + } + } - run := func(stopCh <-chan struct{}) { - sched.Run() - <-stopCh + // Prepare a healthz server. If the metrics bind address is the same as the + // healthz bind address, consolidate the servers into one. + var healthzServer *http.Server + if len(config.HealthzBindAddress) != 0 { + healthzServer = makeHealthzServer(config) } - if !s.LeaderElection.LeaderElect { - run(stop) - return fmt.Errorf("finished without leader elect") + // Prepare a separate metrics server only if the bind address differs from the + // healthz bind address. + var metricsServer *http.Server + if len(config.MetricsBindAddress) > 0 && config.HealthzBindAddress != config.MetricsBindAddress { + metricsServer = makeMetricsServer(config) } - id, err := os.Hostname() + return &SchedulerServer{ + SchedulerName: config.SchedulerName, + Client: client, + InformerFactory: informers.NewSharedInformerFactory(client, 0), + PodInformer: factory.NewPodInformer(client, 0, config.SchedulerName), + AlgorithmSource: config.AlgorithmSource, + HardPodAffinitySymmetricWeight: config.HardPodAffinitySymmetricWeight, + EventClient: eventClient, + Recorder: recorder, + Broadcaster: eventBroadcaster, + LeaderElection: leaderElectionConfig, + HealthzServer: healthzServer, + MetricsServer: metricsServer, + }, nil +} + +// makeLeaderElectionConfig builds a leader election configuration. It will +// create a new resource lock associated with the configuration. 
+func makeLeaderElectionConfig(config componentconfig.KubeSchedulerLeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) { + hostname, err := os.Hostname() if err != nil { - return fmt.Errorf("unable to get hostname: %v", err) + return nil, fmt.Errorf("unable to get hostname: %v", err) } - rl, err := resourcelock.New(s.LeaderElection.ResourceLock, - s.LockObjectNamespace, - s.LockObjectName, - leaderElectionClient.CoreV1(), + rl, err := resourcelock.New(config.ResourceLock, + config.LockObjectNamespace, + config.LockObjectName, + client.CoreV1(), resourcelock.ResourceLockConfig{ - Identity: id, + Identity: hostname, EventRecorder: recorder, }) if err != nil { - return fmt.Errorf("error creating lock: %v", err) + return nil, fmt.Errorf("couldn't create resource lock: %v", err) } - leaderElector, err := leaderelection.NewLeaderElector( - leaderelection.LeaderElectionConfig{ - Lock: rl, - LeaseDuration: s.LeaderElection.LeaseDuration.Duration, - RenewDeadline: s.LeaderElection.RenewDeadline.Duration, - RetryPeriod: s.LeaderElection.RetryPeriod.Duration, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: run, - OnStoppedLeading: func() { - utilruntime.HandleError(fmt.Errorf("lost master")) - }, - }, - }) - if err != nil { - return err - } - - leaderElector.Run() - - return fmt.Errorf("lost lease") + return &leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: config.LeaseDuration.Duration, + RenewDeadline: config.RenewDeadline.Duration, + RetryPeriod: config.RetryPeriod.Duration, + }, nil } -func startHTTP(s *options.SchedulerServer) { +// makeHealthzServer creates a healthz server from the config, and will also +// embed the metrics handler if the healthz and metrics address configurations +// are the same. 
+func makeHealthzServer(config *componentconfig.KubeSchedulerConfiguration) *http.Server { mux := http.NewServeMux() healthz.InstallHandler(mux) - if s.EnableProfiling { + if config.HealthzBindAddress == config.MetricsBindAddress { + configz.InstallHandler(mux) + mux.Handle("/metrics", prometheus.Handler()) + } + if config.EnableProfiling { mux.HandleFunc("/debug/pprof/", pprof.Index) mux.HandleFunc("/debug/pprof/profile", pprof.Profile) mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - if s.EnableContentionProfiling { + if config.EnableContentionProfiling { goruntime.SetBlockProfileRate(1) } } - if c, err := configz.New("componentconfig"); err == nil { - c.Set(s.KubeSchedulerConfiguration) - } else { - glog.Errorf("unable to register configz: %s", err) + return &http.Server{ + Addr: config.HealthzBindAddress, + Handler: mux, } +} + +// makeMetricsServer builds a metrics server from the config. +func makeMetricsServer(config *componentconfig.KubeSchedulerConfiguration) *http.Server { + mux := http.NewServeMux() configz.InstallHandler(mux) mux.Handle("/metrics", prometheus.Handler()) - - server := &http.Server{ - Addr: net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))), + if config.EnableProfiling { + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + if config.EnableContentionProfiling { + goruntime.SetBlockProfileRate(1) + } + } + return &http.Server{ + Addr: config.MetricsBindAddress, Handler: mux, } - glog.Fatal(server.ListenAndServe()) +} + +// createClients creates a kube client and an event client from the given config and masterOverride. +// TODO remove masterOverride when CLI flags are removed. 
+func createClients(config componentconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, clientset.Interface, v1core.EventsGetter, error) { + if len(config.KubeConfigFile) == 0 && len(masterOverride) == 0 { + glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.") + } + + // This creates a client, first loading any specified kubeconfig + // file, and then overriding the Master flag, if non-empty. + kubeConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.KubeConfigFile}, + &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}).ClientConfig() + if err != nil { + return nil, nil, nil, err + } + + kubeConfig.AcceptContentTypes = config.AcceptContentTypes + kubeConfig.ContentType = config.ContentType + kubeConfig.QPS = config.QPS + //TODO make config struct use int instead of int32? + kubeConfig.Burst = int(config.Burst) + + client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "scheduler")) + if err != nil { + return nil, nil, nil, err + } + + leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "leader-election")) + if err != nil { + return nil, nil, nil, err + } + + eventClient, err := clientgoclientset.NewForConfig(kubeConfig) + if err != nil { + return nil, nil, nil, err + } + + return client, leaderElectionClient, eventClient.CoreV1(), nil +} + +// Run runs the SchedulerServer. This should never exit. +func (s *SchedulerServer) Run(stop chan struct{}) error { + // To help debugging, immediately log version + glog.Infof("Version: %+v", version.Get()) + + // Build a scheduler config from the provided algorithm source. + schedulerConfig, err := s.SchedulerConfig() + if err != nil { + return err + } + + // Create the scheduler. + sched := scheduler.NewFromConfig(schedulerConfig) + + // Prepare the event broadcaster. 
+ if !reflect.ValueOf(s.Broadcaster).IsNil() && !reflect.ValueOf(s.EventClient).IsNil() { + s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")}) + } + + // Start up the healthz server. + if s.HealthzServer != nil { + go wait.Until(func() { + glog.Infof("starting healthz server on %v", s.HealthzServer.Addr) + err := s.HealthzServer.ListenAndServe() + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to start healthz server: %v", err)) + } + }, 5*time.Second, stop) + } + + // Start up the metrics server. + if s.MetricsServer != nil { + go wait.Until(func() { + glog.Infof("starting metrics server on %v", s.MetricsServer.Addr) + err := s.MetricsServer.ListenAndServe() + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to start metrics server: %v", err)) + } + }, 5*time.Second, stop) + } + + // Start all informers. + go s.PodInformer.Informer().Run(stop) + s.InformerFactory.Start(stop) + + // Wait for all caches to sync before scheduling. + s.InformerFactory.WaitForCacheSync(stop) + controller.WaitForCacheSync("scheduler", stop, s.PodInformer.Informer().HasSynced) + + // Prepare a reusable run function. + run := func(stopCh <-chan struct{}) { + sched.Run() + <-stopCh + } + + // If leader election is enabled, run via LeaderElector until done and exit. + if s.LeaderElection != nil { + s.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{ + OnStartedLeading: run, + OnStoppedLeading: func() { + utilruntime.HandleError(fmt.Errorf("lost master")) + }, + } + leaderElector, err := leaderelection.NewLeaderElector(*s.LeaderElection) + if err != nil { + return fmt.Errorf("couldn't create leader elector: %v", err) + } + + leaderElector.Run() + + return fmt.Errorf("lost lease") + } + + // Leader election is disabled, so run inline until done. + run(stop) + return fmt.Errorf("finished without leader elect") +} + +// SchedulerConfig creates the scheduler configuration. This is exposed for use +// by tests. 
+func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) { + // Set up the configurator which can create schedulers from configs. + configurator := factory.NewConfigFactory( + s.SchedulerName, + s.Client, + s.InformerFactory.Core().V1().Nodes(), + s.PodInformer, + s.InformerFactory.Core().V1().PersistentVolumes(), + s.InformerFactory.Core().V1().PersistentVolumeClaims(), + s.InformerFactory.Core().V1().ReplicationControllers(), + s.InformerFactory.Extensions().V1beta1().ReplicaSets(), + s.InformerFactory.Apps().V1beta1().StatefulSets(), + s.InformerFactory.Core().V1().Services(), + s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(), + s.HardPodAffinitySymmetricWeight, + utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache), + ) + + source := s.AlgorithmSource + var config *scheduler.Config + switch { + case source.Provider != nil: + // Create the config from a named algorithm provider. + sc, err := configurator.CreateFromProvider(*source.Provider) + if err != nil { + return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err) + } + config = sc + case source.Policy != nil: + // Create the config from a user specified policy source. + policy := &schedulerapi.Policy{} + switch { + case source.Policy.File != nil: + // Use a policy serialized in a file. + policyFile := source.Policy.File.Path + _, err := os.Stat(policyFile) + if err != nil { + return nil, fmt.Errorf("missing policy config file %s", policyFile) + } + data, err := ioutil.ReadFile(policyFile) + if err != nil { + return nil, fmt.Errorf("couldn't read policy config: %v", err) + } + err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy) + if err != nil { + return nil, fmt.Errorf("invalid policy: %v", err) + } + case source.Policy.ConfigMap != nil: + // Use a policy serialized in a config map value. 
+ policyRef := source.Policy.ConfigMap + policyConfigMap, err := s.Client.CoreV1().ConfigMaps(policyRef.Namespace).Get(policyRef.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err) + } + data, found := policyConfigMap.Data[componentconfig.SchedulerPolicyConfigMapKey] + if !found { + return nil, fmt.Errorf("missing policy config map value at key %q", componentconfig.SchedulerPolicyConfigMapKey) + } + err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy) + if err != nil { + return nil, fmt.Errorf("invalid policy: %v", err) + } + } + sc, err := configurator.CreateFromConfig(*policy) + if err != nil { + return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err) + } + config = sc + default: + return nil, fmt.Errorf("unsupported algorithm source: %v", source) + } + // Additional tweaks to the config produced by the configurator. + config.Recorder = s.Recorder + return config, nil } diff --git a/plugin/cmd/kube-scheduler/scheduler.go b/plugin/cmd/kube-scheduler/scheduler.go index e2ea5b4b2964c..07ab2ca116853 100644 --- a/plugin/cmd/kube-scheduler/scheduler.go +++ b/plugin/cmd/kube-scheduler/scheduler.go @@ -17,27 +17,31 @@ limitations under the License. 
package main import ( - "k8s.io/apiserver/pkg/util/flag" - "k8s.io/apiserver/pkg/util/logs" - "k8s.io/kubernetes/pkg/version/verflag" - "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" - "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" + goflag "flag" + "os" - "github.com/golang/glog" "github.com/spf13/pflag" + + utilflag "k8s.io/apiserver/pkg/util/flag" + "k8s.io/apiserver/pkg/util/logs" + _ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration + _ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration + "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" ) func main() { - s := options.NewSchedulerServer() - s.AddFlags(pflag.CommandLine) - - flag.InitFlags() + command := app.NewSchedulerCommand() + + // TODO: once we switch everything over to Cobra commands, we can go back to calling + // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the + // normalize func and add the go flag set by hand. + pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc) + pflag.CommandLine.AddGoFlagSet(goflag.CommandLine) + // utilflag.InitFlags() logs.InitLogs() defer logs.FlushLogs() - verflag.PrintAndExitIfRequested() - - if err := app.Run(s); err != nil { - glog.Fatalf("scheduler app failed to run: %v", err) + if err := command.Execute(); err != nil { + os.Exit(1) } } diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 0470504787327..df18fba450c25 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -151,6 +151,14 @@ func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Schedul return s, nil } +// NewFromConfig returns a new scheduler using the provided Config. +func NewFromConfig(config *Config) *Scheduler { + metrics.Register() + return &Scheduler{ + config: config, + } +} + // Run begins watching and scheduling. 
It waits for cache to be synced, then starts a goroutine and returns immediately. func (sched *Scheduler) Run() { if !sched.config.WaitForCacheSync() { diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index b24d952d6b05d..9a6a8322c3373 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -43,9 +43,9 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/apis/componentconfig" "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" - "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" + schedulerapp "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" "k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" @@ -107,7 +107,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { policyConfigMap := v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: configPolicyName}, Data: map[string]string{ - options.SchedulerPolicyConfigMapKey: `{ + componentconfig.SchedulerPolicyConfigMapKey: `{ "kind" : "Policy", "apiVersion" : "v1", "predicates" : [ @@ -127,29 +127,34 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.CoreV1().RESTClient()).Events("")}) - ss := options.NewSchedulerServer() - ss.HardPodAffinitySymmetricWeight = v1.DefaultHardPodAffinitySymmetricWeight - ss.PolicyConfigMapName = configPolicyName - sched, err := app.CreateScheduler(ss, clientSet, - informerFactory.Core().V1().Nodes(), - informerFactory.Core().V1().Pods(), - informerFactory.Core().V1().PersistentVolumes(), - informerFactory.Core().V1().PersistentVolumeClaims(), - 
informerFactory.Core().V1().ReplicationControllers(), - informerFactory.Extensions().V1beta1().ReplicaSets(), - informerFactory.Apps().V1beta1().StatefulSets(), - informerFactory.Core().V1().Services(), - informerFactory.Policy().V1beta1().PodDisruptionBudgets(), - eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), - ) + + ss := &schedulerapp.SchedulerServer{ + SchedulerName: v1.DefaultSchedulerName, + AlgorithmSource: componentconfig.SchedulerAlgorithmSource{ + Policy: &componentconfig.SchedulerPolicySource{ + ConfigMap: &componentconfig.SchedulerPolicyConfigMapSource{ + Namespace: policyConfigMap.Namespace, + Name: policyConfigMap.Name, + }, + }, + }, + HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, + Client: clientSet, + InformerFactory: informerFactory, + PodInformer: factory.NewPodInformer(clientSet, 0, v1.DefaultSchedulerName), + EventClient: clientSet.CoreV1(), + Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), + Broadcaster: eventBroadcaster, + } + + config, err := ss.SchedulerConfig() if err != nil { - t.Fatalf("Error creating scheduler: %v", err) + t.Fatalf("couldn't make scheduler config: %v", err) } - defer close(sched.Config().StopEverything) // Verify that the config is applied correctly. - schedPredicates := sched.Config().Algorithm.Predicates() - schedPrioritizers := sched.Config().Algorithm.Prioritizers() + schedPredicates := config.Algorithm.Predicates() + schedPrioritizers := config.Algorithm.Prioritizers() // Includes one mandatory predicates. if len(schedPredicates) != 3 || len(schedPrioritizers) != 2 { t.Errorf("Unexpected number of predicates or priority functions. 
Number of predicates: %v, number of prioritizers: %v", len(schedPredicates), len(schedPrioritizers)) @@ -180,83 +185,31 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.CoreV1().RESTClient()).Events("")}) - ss := options.NewSchedulerServer() - ss.PolicyConfigMapName = "non-existent-config" - - _, err := app.CreateScheduler(ss, clientSet, - informerFactory.Core().V1().Nodes(), - informerFactory.Core().V1().Pods(), - informerFactory.Core().V1().PersistentVolumes(), - informerFactory.Core().V1().PersistentVolumeClaims(), - informerFactory.Core().V1().ReplicationControllers(), - informerFactory.Extensions().V1beta1().ReplicaSets(), - informerFactory.Apps().V1beta1().StatefulSets(), - informerFactory.Core().V1().Services(), - informerFactory.Policy().V1beta1().PodDisruptionBudgets(), - eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), - ) + ss := &schedulerapp.SchedulerServer{ + SchedulerName: v1.DefaultSchedulerName, + AlgorithmSource: componentconfig.SchedulerAlgorithmSource{ + Policy: &componentconfig.SchedulerPolicySource{ + ConfigMap: &componentconfig.SchedulerPolicyConfigMapSource{ + Namespace: "non-existent-config", + Name: "non-existent-config", + }, + }, + }, + HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, + Client: clientSet, + InformerFactory: informerFactory, + PodInformer: factory.NewPodInformer(clientSet, 0, v1.DefaultSchedulerName), + EventClient: clientSet.CoreV1(), + Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), + Broadcaster: eventBroadcaster, + } + _, err := ss.SchedulerConfig() if err == nil { t.Fatalf("Creation of scheduler didn't fail while the policy ConfigMap didn't exist.") } } -// TestSchedulerCreationInLegacyMode ensures that 
creation of the scheduler -// works fine when legacy mode is enabled. -func TestSchedulerCreationInLegacyMode(t *testing.T) { - _, s, closeFn := framework.RunAMaster(nil) - defer closeFn() - - ns := framework.CreateTestingNamespace("configmap", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) - - clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}}) - defer clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{}) - informerFactory := informers.NewSharedInformerFactory(clientSet, 0) - - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.CoreV1().RESTClient()).Events("")}) - - ss := options.NewSchedulerServer() - ss.HardPodAffinitySymmetricWeight = v1.DefaultHardPodAffinitySymmetricWeight - ss.PolicyConfigMapName = "non-existent-configmap" - ss.UseLegacyPolicyConfig = true - - sched, err := app.CreateScheduler(ss, clientSet, - informerFactory.Core().V1().Nodes(), - informerFactory.Core().V1().Pods(), - informerFactory.Core().V1().PersistentVolumes(), - informerFactory.Core().V1().PersistentVolumeClaims(), - informerFactory.Core().V1().ReplicationControllers(), - informerFactory.Extensions().V1beta1().ReplicaSets(), - informerFactory.Apps().V1beta1().StatefulSets(), - informerFactory.Core().V1().Services(), - informerFactory.Policy().V1beta1().PodDisruptionBudgets(), - eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), - ) - if err != nil { - t.Fatalf("Creation of scheduler in legacy mode failed: %v", err) - } - informerFactory.Start(sched.Config().StopEverything) - defer close(sched.Config().StopEverything) - sched.Run() - - _, err = createNode(clientSet, "test-node", nil) - if err != nil { - t.Fatalf("Failed to create node: %v", err) - } - pod, err := 
createPausePodWithResource(clientSet, "test-pod", "configmap", nil) - if err != nil { - t.Fatalf("Failed to create pod: %v", err) - } - err = waitForPodToSchedule(clientSet, pod) - if err != nil { - t.Errorf("Failed to schedule a pod: %v", err) - } else { - t.Logf("Pod got scheduled on a node.") - } -} - func TestUnschedulableNodes(t *testing.T) { context := initTest(t, "unschedulable-nodes") defer cleanupTest(t, context)