From acc66ede1afe36bb9a151b0f69bab71687ea916e Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Tue, 12 Nov 2019 14:27:28 -0500 Subject: [PATCH] Pass InformerFactory instead of indivisual informers in scheduler configuration logic --- pkg/scheduler/BUILD | 6 +- pkg/scheduler/algorithm_factory.go | 28 +-- .../defaults/register_predicates.go | 37 +++- .../defaults/register_priorities.go | 15 +- pkg/scheduler/factory.go | 160 +++--------------- pkg/scheduler/factory_test.go | 54 +++--- pkg/scheduler/scheduler.go | 84 +++++---- 7 files changed, 143 insertions(+), 241 deletions(-) diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 8055bc6f4dab..44473230159f 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -44,12 +44,8 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", - "//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library", - "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", - "//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", @@ -71,6 +67,7 @@ go_test( deps = [ "//pkg/api/testing:go_default_library", "//pkg/controller/volume/scheduling:go_default_library", + "//pkg/features:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/algorithm/priorities:go_default_library", @@ -99,6 +96,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", diff --git a/pkg/scheduler/algorithm_factory.go b/pkg/scheduler/algorithm_factory.go index 87b14f844943..0bab0aafbcaf 100644 --- a/pkg/scheduler/algorithm_factory.go +++ b/pkg/scheduler/algorithm_factory.go @@ -25,10 +25,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" - appslisters "k8s.io/client-go/listers/apps/v1" - corelisters "k8s.io/client-go/listers/core/v1" - policylisters "k8s.io/client-go/listers/policy/v1beta1" - storagelisters "k8s.io/client-go/listers/storage/v1" + "k8s.io/client-go/informers" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -45,17 +42,8 @@ import ( // PluginFactoryArgs are passed to all plugin factory functions. type PluginFactoryArgs struct { - NodeInfoLister schedulerlisters.NodeInfoLister - PodLister schedulerlisters.PodLister - ServiceLister corelisters.ServiceLister - ControllerLister corelisters.ReplicationControllerLister - ReplicaSetLister appslisters.ReplicaSetLister - StatefulSetLister appslisters.StatefulSetLister - PDBLister policylisters.PodDisruptionBudgetLister - CSINodeLister storagelisters.CSINodeLister - PVLister corelisters.PersistentVolumeLister - PVCLister corelisters.PersistentVolumeClaimLister - StorageClassLister storagelisters.StorageClassLister + SharedLister schedulerlisters.SharedLister + InformerFactory informers.SharedInformerFactory VolumeBinder *volumebinder.VolumeBinder HardPodAffinitySymmetricWeight int32 } @@ -280,9 +268,9 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy, pluginArgs predicateFactory = func(args PluginFactoryArgs) predicates.FitPredicate { predicate, precomputationFunction := predicates.NewServiceAffinityPredicate( - args.NodeInfoLister, - args.PodLister, - args.ServiceLister, + args.SharedLister.NodeInfos(), + args.SharedLister.Pods(), + args.InformerFactory.Core().V1().Services().Lister(), pluginArgs.ServiceAffinityArgs.AffinityLabels, ) @@ -404,8 +392,8 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy, configPr pcf = &PriorityConfigFactory{ MapReduceFunction: func(args PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) { return priorities.NewServiceAntiAffinityPriority( - args.PodLister, - args.ServiceLister, + args.SharedLister.Pods(), + args.InformerFactory.Core().V1().Services().Lister(), configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference, ) }, diff --git a/pkg/scheduler/algorithmprovider/defaults/register_predicates.go b/pkg/scheduler/algorithmprovider/defaults/register_predicates.go index f70dd8483c22..8e932a14dbb7 100644 --- a/pkg/scheduler/algorithmprovider/defaults/register_predicates.go +++ b/pkg/scheduler/algorithmprovider/defaults/register_predicates.go @@ -55,40 +55,63 @@ func init() { scheduler.RegisterFitPredicateFactory( predicates.NoVolumeZoneConflictPred, func(args scheduler.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewVolumeZonePredicate(args.PVLister, args.PVCLister, args.StorageClassLister) + pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister() + pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister() + storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister() + return predicates.NewVolumeZonePredicate(pvLister, pvcLister, storageClassLister) }, ) // Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node scheduler.RegisterFitPredicateFactory( predicates.MaxEBSVolumeCountPred, func(args scheduler.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.CSINodeLister, args.StorageClassLister, args.PVLister, args.PVCLister) + csiNodeLister := scheduler.GetCSINodeLister(args.InformerFactory) + pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister() + pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister() + storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister() + return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, csiNodeLister, storageClassLister, pvLister, pvcLister) }, ) // Fit is determined by whether or not there would be too many GCE PD volumes attached to the node scheduler.RegisterFitPredicateFactory( predicates.MaxGCEPDVolumeCountPred, func(args scheduler.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.CSINodeLister, args.StorageClassLister, args.PVLister, args.PVCLister) + csiNodeLister := scheduler.GetCSINodeLister(args.InformerFactory) + pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister() + pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister() + storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister() + return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, csiNodeLister, storageClassLister, pvLister, pvcLister) }, ) // Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node scheduler.RegisterFitPredicateFactory( predicates.MaxAzureDiskVolumeCountPred, func(args scheduler.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.CSINodeLister, args.StorageClassLister, args.PVLister, args.PVCLister) + csiNodeLister := scheduler.GetCSINodeLister(args.InformerFactory) + pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister() + pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister() + storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister() + return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, csiNodeLister, storageClassLister, pvLister, pvcLister) }, ) scheduler.RegisterFitPredicateFactory( predicates.MaxCSIVolumeCountPred, func(args scheduler.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewCSIMaxVolumeLimitPredicate(args.CSINodeLister, args.PVLister, args.PVCLister, args.StorageClassLister) + csiNodeLister := scheduler.GetCSINodeLister(args.InformerFactory) + pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister() + pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister() + storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister() + return predicates.NewCSIMaxVolumeLimitPredicate(csiNodeLister, pvLister, pvcLister, storageClassLister) }, ) scheduler.RegisterFitPredicateFactory( predicates.MaxCinderVolumeCountPred, func(args scheduler.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, args.CSINodeLister, args.StorageClassLister, args.PVLister, args.PVCLister) + csiNodeLister := scheduler.GetCSINodeLister(args.InformerFactory) + pvLister := args.InformerFactory.Core().V1().PersistentVolumes().Lister() + pvcLister := args.InformerFactory.Core().V1().PersistentVolumeClaims().Lister() + storageClassLister := args.InformerFactory.Storage().V1().StorageClasses().Lister() + return predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, csiNodeLister, storageClassLister, pvLister, pvcLister) }, ) @@ -96,7 +119,7 @@ func init() { scheduler.RegisterFitPredicateFactory( predicates.MatchInterPodAffinityPred, func(args scheduler.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewPodAffinityPredicate(args.NodeInfoLister, args.PodLister) + return predicates.NewPodAffinityPredicate(args.SharedLister.NodeInfos(), args.SharedLister.Pods()) }, ) diff --git a/pkg/scheduler/algorithmprovider/defaults/register_priorities.go b/pkg/scheduler/algorithmprovider/defaults/register_priorities.go index c2ff24ff2030..021367393c3e 100644 --- a/pkg/scheduler/algorithmprovider/defaults/register_priorities.go +++ b/pkg/scheduler/algorithmprovider/defaults/register_priorities.go @@ -26,7 +26,11 @@ func init() { // Register functions that extract metadata used by priorities computations. scheduler.RegisterPriorityMetadataProducerFactory( func(args scheduler.PluginFactoryArgs) priorities.MetadataProducer { - return priorities.NewMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister, args.HardPodAffinitySymmetricWeight) + serviceLister := args.InformerFactory.Core().V1().Services().Lister() + controllerLister := args.InformerFactory.Core().V1().ReplicationControllers().Lister() + replicaSetLister := args.InformerFactory.Apps().V1().ReplicaSets().Lister() + statefulSetLister := args.InformerFactory.Apps().V1().StatefulSets().Lister() + return priorities.NewMetadataFactory(serviceLister, controllerLister, replicaSetLister, statefulSetLister, args.HardPodAffinitySymmetricWeight) }) // ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing @@ -37,7 +41,8 @@ func init() { priorities.ServiceSpreadingPriority, scheduler.PriorityConfigFactory{ MapReduceFunction: func(args scheduler.PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) { - return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{}) + serviceLister := args.InformerFactory.Core().V1().Services().Lister() + return priorities.NewSelectorSpreadPriority(serviceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{}) }, Weight: 1, }, @@ -54,7 +59,11 @@ func init() { priorities.SelectorSpreadPriority, scheduler.PriorityConfigFactory{ MapReduceFunction: func(args scheduler.PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) { - return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister) + serviceLister := args.InformerFactory.Core().V1().Services().Lister() + controllerLister := args.InformerFactory.Core().V1().ReplicationControllers().Lister() + replicaSetLister := args.InformerFactory.Apps().V1().ReplicaSets().Lister() + statefulSetLister := args.InformerFactory.Apps().V1().StatefulSets().Lister() + return priorities.NewSelectorSpreadPriority(serviceLister, controllerLister, replicaSetLister, statefulSetLister) }, Weight: 1, }, diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 33b5d991d023..145d6f52db72 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -27,21 +27,16 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" - appsinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" - policyinformers "k8s.io/client-go/informers/policy/v1beta1" - storageinformersv1 "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" - appslisters "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" policylisters "k8s.io/client-go/listers/policy/v1beta1" - storagelistersv1 "k8s.io/client-go/listers/storage/v1" + storagelisters "k8s.io/client-go/listers/storage/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog" - "k8s.io/kubernetes/pkg/features" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" @@ -74,28 +69,7 @@ type Configurator struct { informerFactory informers.SharedInformerFactory - // a means to list all PersistentVolumes - pVLister corelisters.PersistentVolumeLister - // a means to list all PersistentVolumeClaims - pVCLister corelisters.PersistentVolumeClaimLister - // a means to list all services - serviceLister corelisters.ServiceLister - // a means to list all controllers - controllerLister corelisters.ReplicationControllerLister - // a means to list all replicasets - replicaSetLister appslisters.ReplicaSetLister - // a means to list all statefulsets - statefulSetLister appslisters.StatefulSetLister - // a means to list all PodDisruptionBudgets - pdbLister policylisters.PodDisruptionBudgetLister - // a means to list all StorageClasses - storageClassLister storagelistersv1.StorageClassLister - // a means to list all CSINodes - csiNodeLister storagelistersv1.CSINodeLister - // a means to list all Nodes - nodeLister corelisters.NodeLister - // a means to list all Pods - podLister corelisters.PodLister + podInformer coreinformers.PodInformer // Close this to stop all reflectors StopEverything <-chan struct{} @@ -138,110 +112,6 @@ type Configurator struct { configProducerArgs *plugins.ConfigProducerArgs } -// ConfigFactoryArgs is a set arguments passed to NewConfigFactory. -type ConfigFactoryArgs struct { - Client clientset.Interface - InformerFactory informers.SharedInformerFactory - NodeInformer coreinformers.NodeInformer - PodInformer coreinformers.PodInformer - PvInformer coreinformers.PersistentVolumeInformer - PvcInformer coreinformers.PersistentVolumeClaimInformer - ReplicationControllerInformer coreinformers.ReplicationControllerInformer - ReplicaSetInformer appsinformers.ReplicaSetInformer - StatefulSetInformer appsinformers.StatefulSetInformer - ServiceInformer coreinformers.ServiceInformer - PdbInformer policyinformers.PodDisruptionBudgetInformer - StorageClassInformer storageinformersv1.StorageClassInformer - CSINodeInformer storageinformersv1.CSINodeInformer - VolumeBinder *volumebinder.VolumeBinder - SchedulerCache internalcache.Cache - HardPodAffinitySymmetricWeight int32 - DisablePreemption bool - PercentageOfNodesToScore int32 - BindTimeoutSeconds int64 - PodInitialBackoffSeconds int64 - PodMaxBackoffSeconds int64 - StopCh <-chan struct{} - Registry framework.Registry - Plugins *schedulerapi.Plugins - PluginConfig []schedulerapi.PluginConfig - PluginConfigProducerRegistry *plugins.ConfigProducerRegistry -} - -// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only -// return the interface. -func NewConfigFactory(args *ConfigFactoryArgs) *Configurator { - stopEverything := args.StopCh - if stopEverything == nil { - stopEverything = wait.NeverStop - } - - // storageClassInformer is only enabled through VolumeScheduling feature gate - var storageClassLister storagelistersv1.StorageClassLister - if args.StorageClassInformer != nil { - storageClassLister = args.StorageClassInformer.Lister() - } - - var csiNodeLister storagelistersv1.CSINodeLister - if args.CSINodeInformer != nil { - csiNodeLister = args.CSINodeInformer.Lister() - } - - var pdbLister policylisters.PodDisruptionBudgetLister - if args.PdbInformer != nil { - pdbLister = args.PdbInformer.Lister() - } - - c := &Configurator{ - client: args.Client, - informerFactory: args.InformerFactory, - pVLister: args.PvInformer.Lister(), - pVCLister: args.PvcInformer.Lister(), - serviceLister: args.ServiceInformer.Lister(), - controllerLister: args.ReplicationControllerInformer.Lister(), - replicaSetLister: args.ReplicaSetInformer.Lister(), - statefulSetLister: args.StatefulSetInformer.Lister(), - pdbLister: pdbLister, - nodeLister: args.NodeInformer.Lister(), - podLister: args.PodInformer.Lister(), - storageClassLister: storageClassLister, - csiNodeLister: csiNodeLister, - volumeBinder: args.VolumeBinder, - schedulerCache: args.SchedulerCache, - StopEverything: stopEverything, - hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight, - disablePreemption: args.DisablePreemption, - percentageOfNodesToScore: args.PercentageOfNodesToScore, - bindTimeoutSeconds: args.BindTimeoutSeconds, - podInitialBackoffSeconds: args.PodInitialBackoffSeconds, - podMaxBackoffSeconds: args.PodMaxBackoffSeconds, - enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(features.NonPreemptingPriority), - registry: args.Registry, - plugins: args.Plugins, - pluginConfig: args.PluginConfig, - pluginConfigProducerRegistry: args.PluginConfigProducerRegistry, - nodeInfoSnapshot: nodeinfosnapshot.NewEmptySnapshot(), - } - c.factoryArgs = PluginFactoryArgs{ - NodeInfoLister: c.nodeInfoSnapshot.NodeInfos(), - PodLister: c.nodeInfoSnapshot.Pods(), - ServiceLister: c.serviceLister, - ControllerLister: c.controllerLister, - ReplicaSetLister: c.replicaSetLister, - StatefulSetLister: c.statefulSetLister, - PDBLister: c.pdbLister, - CSINodeLister: c.csiNodeLister, - PVLister: c.pVLister, - PVCLister: c.pVCLister, - StorageClassLister: c.storageClassLister, - VolumeBinder: c.volumeBinder, - HardPodAffinitySymmetricWeight: c.hardPodAffinitySymmetricWeight, - } - c.configProducerArgs = &plugins.ConfigProducerArgs{} - - return c -} - // GetHardPodAffinitySymmetricWeight is exposed for testing. func (c *Configurator) GetHardPodAffinitySymmetricWeight() int32 { return c.hardPodAffinitySymmetricWeight @@ -404,8 +274,8 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e // Setup cache debugger. debugger := cachedebugger.New( - c.nodeLister, - c.podLister, + c.informerFactory.Core().V1().Nodes().Lister(), + c.podInformer.Lister(), c.schedulerCache, podQueue, ) @@ -427,8 +297,8 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e framework, extenders, c.volumeBinder, - c.pVCLister, - c.pdbLister, + c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(), + GetPodDisruptionBudgetLister(c.informerFactory), c.alwaysCheckAllPredicates, c.disablePreemption, c.percentageOfNodesToScore, @@ -649,3 +519,19 @@ func (b *binder) Bind(binding *v1.Binding) error { klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) } + +// GetPodDisruptionBudgetLister returns pdb lister from the given informer factory. Returns nil if PodDisruptionBudget feature is disabled. +func GetPodDisruptionBudgetLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister { + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) { + return informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister() + } + return nil +} + +// GetCSINodeLister returns CSINode lister from the given informer factory. Returns nil if CSINodeInfo feature is disabled. +func GetCSINodeLister(informerFactory informers.SharedInformerFactory) storagelisters.CSINodeLister { + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CSINodeInfo) { + return informerFactory.Storage().V1().CSINodes().Lister() + } + return nil +} diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 729724273fe0..197092ad84ca 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" @@ -40,6 +41,7 @@ import ( clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" apitesting "k8s.io/kubernetes/pkg/api/testing" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -52,6 +54,7 @@ import ( internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" ) const ( @@ -539,32 +542,31 @@ func newConfigFactoryWithFrameworkRegistry( client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}, registry framework.Registry, pluginConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry) *Configurator { informerFactory := informers.NewSharedInformerFactory(client, 0) - return NewConfigFactory(&ConfigFactoryArgs{ - Client: client, - InformerFactory: informerFactory, - NodeInformer: informerFactory.Core().V1().Nodes(), - PodInformer: informerFactory.Core().V1().Pods(), - PvInformer: informerFactory.Core().V1().PersistentVolumes(), - PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(), - ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(), - ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(), - StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(), - ServiceInformer: informerFactory.Core().V1().Services(), - PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(), - StorageClassInformer: informerFactory.Storage().V1().StorageClasses(), - CSINodeInformer: informerFactory.Storage().V1().CSINodes(), - HardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight, - DisablePreemption: disablePodPreemption, - PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, - BindTimeoutSeconds: bindTimeoutSeconds, - PodInitialBackoffSeconds: podInitialBackoffDurationSeconds, - PodMaxBackoffSeconds: podMaxBackoffDurationSeconds, - StopCh: stopCh, - Registry: registry, - Plugins: nil, - PluginConfig: []schedulerapi.PluginConfig{}, - PluginConfigProducerRegistry: pluginConfigProducerRegistry, - }) + snapshot := nodeinfosnapshot.NewEmptySnapshot() + return &Configurator{ + client: client, + informerFactory: informerFactory, + podInformer: informerFactory.Core().V1().Pods(), + hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight, + disablePreemption: disablePodPreemption, + percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, + bindTimeoutSeconds: bindTimeoutSeconds, + podInitialBackoffSeconds: podInitialBackoffDurationSeconds, + podMaxBackoffSeconds: podMaxBackoffDurationSeconds, + StopEverything: stopCh, + enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority), + registry: registry, + plugins: nil, + pluginConfig: []schedulerapi.PluginConfig{}, + pluginConfigProducerRegistry: pluginConfigProducerRegistry, + nodeInfoSnapshot: snapshot, + factoryArgs: PluginFactoryArgs{ + SharedLister: snapshot, + InformerFactory: informerFactory, + HardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight, + }, + configProducerArgs: &frameworkplugins.ConfigProducerArgs{}, + } } func newConfigFactory( diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index ebc2833bcf57..a695edfa352b 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -32,8 +32,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" - policyv1beta1informers "k8s.io/client-go/informers/policy/v1beta1" - storageinformers "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" @@ -47,6 +45,7 @@ import ( internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -264,12 +263,17 @@ func New(client clientset.Interface, stopCh <-chan struct{}, opts ...Option) (*Scheduler, error) { + stopEverything := stopCh + if stopEverything == nil { + stopEverything = wait.NeverStop + } + options := defaultSchedulerOptions for _, opt := range opts { opt(&options) } - schedulerCache := internalcache.New(30*time.Second, stopCh) + schedulerCache := internalcache.New(30*time.Second, stopEverything) volumeBinder := volumebinder.NewVolumeBinder( client, informerFactory.Core().V1().Nodes(), @@ -287,44 +291,36 @@ func New(client clientset.Interface, } registry.Merge(options.frameworkOutOfTreeRegistry) - var pdbInformer policyv1beta1informers.PodDisruptionBudgetInformer - if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) { - pdbInformer = informerFactory.Policy().V1beta1().PodDisruptionBudgets() - } - - var csiNodeInformer storageinformers.CSINodeInformer - if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CSINodeInfo) { - csiNodeInformer = informerFactory.Storage().V1().CSINodes() - } - - // Set up the configurator which can create schedulers from configs. - configurator := NewConfigFactory(&ConfigFactoryArgs{ - Client: client, - InformerFactory: informerFactory, - PodInformer: podInformer, - NodeInformer: informerFactory.Core().V1().Nodes(), - PvInformer: informerFactory.Core().V1().PersistentVolumes(), - PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(), - ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(), - ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(), - StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(), - ServiceInformer: informerFactory.Core().V1().Services(), - PdbInformer: pdbInformer, - StorageClassInformer: informerFactory.Storage().V1().StorageClasses(), - CSINodeInformer: csiNodeInformer, - VolumeBinder: volumeBinder, - SchedulerCache: schedulerCache, - HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight, - DisablePreemption: options.disablePreemption, - PercentageOfNodesToScore: options.percentageOfNodesToScore, - BindTimeoutSeconds: options.bindTimeoutSeconds, - PodInitialBackoffSeconds: options.podInitialBackoffSeconds, - PodMaxBackoffSeconds: options.podMaxBackoffSeconds, - Registry: registry, - PluginConfigProducerRegistry: options.frameworkConfigProducerRegistry, - Plugins: options.frameworkPlugins, - PluginConfig: options.frameworkPluginConfig, - }) + snapshot := nodeinfosnapshot.NewEmptySnapshot() + + configurator := &Configurator{ + client: client, + informerFactory: informerFactory, + podInformer: podInformer, + volumeBinder: volumeBinder, + schedulerCache: schedulerCache, + StopEverything: stopEverything, + hardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight, + disablePreemption: options.disablePreemption, + percentageOfNodesToScore: options.percentageOfNodesToScore, + bindTimeoutSeconds: options.bindTimeoutSeconds, + podInitialBackoffSeconds: options.podInitialBackoffSeconds, + podMaxBackoffSeconds: options.podMaxBackoffSeconds, + enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority), + registry: registry, + plugins: options.frameworkPlugins, + pluginConfig: options.frameworkPluginConfig, + pluginConfigProducerRegistry: options.frameworkConfigProducerRegistry, + nodeInfoSnapshot: snapshot, + factoryArgs: PluginFactoryArgs{ + SharedLister: snapshot, + InformerFactory: informerFactory, + VolumeBinder: volumeBinder, + HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight, + }, + configProducerArgs: &frameworkplugins.ConfigProducerArgs{}, + } + var sched *Scheduler source := schedulerAlgorithmSource switch { @@ -360,7 +356,7 @@ func New(client clientset.Interface, // Additional tweaks to the config produced by the configurator. sched.Recorder = recorder sched.DisablePreemption = options.disablePreemption - sched.StopEverything = stopCh + sched.StopEverything = stopEverything sched.podConditionUpdater = &podConditionUpdaterImpl{client} sched.podPreemptor = &podPreemptorImpl{client} sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced @@ -581,11 +577,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { fwk := sched.Framework podInfo := sched.NextPod() - pod := podInfo.Pod // pod could be nil when schedulerQueue is closed - if pod == nil { + if podInfo == nil || podInfo.Pod == nil { return } + pod := podInfo.Pod if pod.DeletionTimestamp != nil { sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)