Skip to content

Commit

Permalink
Merge pull request #83663 from draveness/feature/add-shared-informers…
Browse files Browse the repository at this point in the history
…-factory

feat(scheduler): expose SharedInformerFactory to the framework handle
  • Loading branch information
k8s-ci-robot committed Oct 11, 2019
2 parents faad5d5 + ee4dec6 commit c980000
Show file tree
Hide file tree
Showing 19 changed files with 87 additions and 152 deletions.
11 changes: 1 addition & 10 deletions cmd/kube-scheduler/app/server.go
Expand Up @@ -167,17 +167,8 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis

// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(),
cc.InformerFactory,
cc.PodInformer,
cc.InformerFactory.Core().V1().PersistentVolumes(),
cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
cc.InformerFactory.Core().V1().ReplicationControllers(),
cc.InformerFactory.Apps().V1().ReplicaSets(),
cc.InformerFactory.Apps().V1().StatefulSets(),
cc.InformerFactory.Core().V1().Services(),
cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
cc.InformerFactory.Storage().V1().StorageClasses(),
cc.InformerFactory.Storage().V1beta1().CSINodes(),
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
Expand Down
5 changes: 1 addition & 4 deletions pkg/scheduler/BUILD
Expand Up @@ -30,11 +30,8 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
Expand Down
11 changes: 1 addition & 10 deletions pkg/scheduler/api/compatibility/compatibility_test.go
Expand Up @@ -1206,17 +1206,8 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {

sched, err := scheduler.New(
client,
informerFactory.Core().V1().Nodes(),
informerFactory,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
nil,
algorithmSrc,
make(chan struct{}),
Expand Down
22 changes: 8 additions & 14 deletions pkg/scheduler/eventhandlers.go
Expand Up @@ -27,9 +27,8 @@ import (
storagev1beta1 "k8s.io/api/storage/v1beta1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/features"
)
Expand Down Expand Up @@ -381,13 +380,8 @@ func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
func AddAllEventHandlers(
sched *Scheduler,
schedulerName string,
nodeInformer coreinformers.NodeInformer,
informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
serviceInformer coreinformers.ServiceInformer,
storageClassInformer storageinformersv1.StorageClassInformer,
csiNodeInformer storageinformersv1beta1.CSINodeInformer,
) {
// scheduled pod cache
podInformer.Informer().AddEventHandler(
Expand Down Expand Up @@ -440,7 +434,7 @@ func AddAllEventHandlers(
},
)

nodeInformer.Informer().AddEventHandler(
informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.addNodeToCache,
UpdateFunc: sched.updateNodeInCache,
Expand All @@ -449,7 +443,7 @@ func AddAllEventHandlers(
)

if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
csiNodeInformer.Informer().AddEventHandler(
informerFactory.Storage().V1beta1().CSINodes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onCSINodeAdd,
UpdateFunc: sched.onCSINodeUpdate,
Expand All @@ -460,7 +454,7 @@ func AddAllEventHandlers(

// On add and delete of PVs, it will affect equivalence cache items
// related to persistent volume
pvInformer.Informer().AddEventHandler(
informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
AddFunc: sched.onPvAdd,
Expand All @@ -469,7 +463,7 @@ func AddAllEventHandlers(
)

// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
pvcInformer.Informer().AddEventHandler(
informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onPvcAdd,
UpdateFunc: sched.onPvcUpdate,
Expand All @@ -479,15 +473,15 @@ func AddAllEventHandlers(
// This is for ServiceAffinity: affected by the selector of the service is updated.
// Also, if new service is added, equivalence cache will also become invalid since
// existing pods may be "captured" by this service and change this predicate result.
serviceInformer.Informer().AddEventHandler(
informerFactory.Core().V1().Services().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onServiceAdd,
UpdateFunc: sched.onServiceUpdate,
DeleteFunc: sched.onServiceDelete,
},
)

storageClassInformer.Informer().AddEventHandler(
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onStorageClassAdd,
},
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/factory/BUILD
Expand Up @@ -32,6 +32,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/factory/factory.go
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
Expand Down Expand Up @@ -131,6 +132,9 @@ type PodPreemptor interface {
// construct a new scheduler.
type Configurator struct {
client clientset.Interface

informerFactory informers.SharedInformerFactory

// a means to list all PersistentVolumes
pVLister corelisters.PersistentVolumeLister
// a means to list all PersistentVolumeClaims
Expand Down Expand Up @@ -196,6 +200,7 @@ type Configurator struct {
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
type ConfigFactoryArgs struct {
Client clientset.Interface
InformerFactory informers.SharedInformerFactory
NodeInformer coreinformers.NodeInformer
PodInformer coreinformers.PodInformer
PvInformer coreinformers.PersistentVolumeInformer
Expand Down Expand Up @@ -243,6 +248,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {

c := &Configurator{
client: args.Client,
informerFactory: args.InformerFactory,
pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(),
serviceLister: args.ServiceInformer.Lister(),
Expand Down Expand Up @@ -416,6 +422,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
&plugins,
pluginConfig,
framework.WithClientSet(c.client),
framework.WithInformerFactory(c.informerFactory),
)
if err != nil {
klog.Fatalf("error initializing the scheduling framework: %v", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/factory/factory_test.go
Expand Up @@ -482,6 +482,7 @@ func newConfigFactoryWithFrameworkRegistry(
informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(&ConfigFactoryArgs{
Client: client,
InformerFactory: informerFactory,
NodeInformer: informerFactory.Core().V1().Nodes(),
PodInformer: informerFactory.Core().V1().Pods(),
PvInformer: informerFactory.Core().V1().PersistentVolumes(),
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/framework/v1alpha1/BUILD
Expand Up @@ -20,6 +20,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
Expand Down
20 changes: 18 additions & 2 deletions pkg/scheduler/framework/v1alpha1/framework.go
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
Expand Down Expand Up @@ -58,7 +59,8 @@ type framework struct {
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin

clientSet clientset.Interface
clientSet clientset.Interface
informerFactory informers.SharedInformerFactory
}

// extensionPoint encapsulates desired and applied set of plugins at a specific extension
Expand Down Expand Up @@ -89,7 +91,8 @@ func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint
}

type frameworkOptions struct {
clientSet clientset.Interface
clientSet clientset.Interface
informerFactory informers.SharedInformerFactory
}

// Option for the framework.
Expand All @@ -102,6 +105,13 @@ func WithClientSet(clientSet clientset.Interface) Option {
}
}

// WithInformerFactory sets informer factory for the scheduling framework.
func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option {
return func(o *frameworkOptions) {
o.informerFactory = informerFactory
}
}

var defaultFrameworkOptions = frameworkOptions{}

var _ = Framework(&framework{})
Expand All @@ -119,6 +129,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
pluginNameToWeightMap: make(map[string]int),
waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet,
informerFactory: options.informerFactory,
}
if plugins == nil {
return f, nil
Expand Down Expand Up @@ -584,6 +595,11 @@ func (f *framework) ClientSet() clientset.Interface {
return f.clientSet
}

// SharedInformerFactory returns a shared informer factory.
func (f *framework) SharedInformerFactory() informers.SharedInformerFactory {
return f.informerFactory
}

func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
pgMap := make(map[string]config.Plugin)

Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/framework/v1alpha1/interface.go
Expand Up @@ -25,6 +25,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
Expand Down Expand Up @@ -452,4 +453,6 @@ type FrameworkHandle interface {

// ClientSet returns a kubernetes clientSet.
ClientSet() clientset.Interface

SharedInformerFactory() informers.SharedInformerFactory
}
1 change: 1 addition & 0 deletions pkg/scheduler/internal/queue/BUILD
Expand Up @@ -43,6 +43,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
],
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/internal/queue/scheduling_queue_test.go
Expand Up @@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/metrics/testutil"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
Expand Down Expand Up @@ -228,6 +229,10 @@ func (*fakeFramework) ClientSet() clientset.Interface {
return nil
}

func (*fakeFramework) SharedInformerFactory() informers.SharedInformerFactory {
return nil
}

func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
q := NewPriorityQueue(nil, &fakeFramework{})
if err := q.Add(&medPriorityPod); err != nil {
Expand Down

0 comments on commit c980000

Please sign in to comment.