Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement/update interfaces and skeleton for the scheduling framework #75848

Merged
merged 4 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/kube-scheduler/app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/util/configz:go_default_library",
"//pkg/util/flag:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/util/configz"
utilflag "k8s.io/kubernetes/pkg/util/flag"
Expand Down Expand Up @@ -174,6 +175,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
framework.NewRegistry(),
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ go_library(
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
Expand Down Expand Up @@ -58,6 +58,7 @@ go_test(
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
Expand Down Expand Up @@ -97,11 +98,11 @@ filegroup(
"//pkg/scheduler/apis/config:all-srcs",
"//pkg/scheduler/core:all-srcs",
"//pkg/scheduler/factory:all-srcs",
"//pkg/scheduler/framework:all-srcs",
"//pkg/scheduler/internal/cache:all-srcs",
"//pkg/scheduler/internal/queue:all-srcs",
"//pkg/scheduler/metrics:all-srcs",
"//pkg/scheduler/nodeinfo:all-srcs",
"//pkg/scheduler/plugins:all-srcs",
"//pkg/scheduler/testing:all-srcs",
"//pkg/scheduler/util:all-srcs",
"//pkg/scheduler/volumebinder:all-srcs",
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ go_library(
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
Expand Down Expand Up @@ -48,10 +48,10 @@ go_test(
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
Expand Down
4 changes: 3 additions & 1 deletion pkg/scheduler/core/extender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
Expand Down Expand Up @@ -531,6 +532,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
extenders = append(extenders, &test.extenders[ii])
}
cache := internalcache.New(time.Duration(0), wait.NeverStop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, name := range test.nodes {
cache.AddNode(createNode(name))
}
Expand All @@ -542,7 +544,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
predicates.EmptyPredicateMetadataProducer,
test.prioritizers,
priorities.EmptyPriorityMetadataProducer,
emptyPluginSet,
fwk,
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
Expand Down
14 changes: 7 additions & 7 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
utiltrace "k8s.io/utils/trace"
Expand Down Expand Up @@ -158,11 +158,11 @@ type genericScheduler struct {
priorityMetaProducer priorities.PriorityMetadataProducer
predicateMetaProducer predicates.PredicateMetadataProducer
prioritizers []priorities.PriorityConfig
pluginSet pluginsv1alpha1.PluginSet
framework framework.Framework
extenders []algorithm.SchedulerExtender
lastNodeIndex uint64
alwaysCheckAllPredicates bool
nodeInfoSnapshot internalcache.NodeInfoSnapshot
nodeInfoSnapshot *internalcache.NodeInfoSnapshot

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change this to a pointer? the snapshot wraps a map, which is another reference type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other functions that take a snapshot, take a pointer of the snapshot. I didn't want to change all of those.

volumeBinder *volumebinder.VolumeBinder
pvcLister corelisters.PersistentVolumeClaimLister
pdbLister algorithm.PDBLister
Expand All @@ -174,7 +174,7 @@ type genericScheduler struct {
// functions.
func (g *genericScheduler) snapshot() error {
// Used for all fit and priority funcs.
return g.cache.UpdateNodeInfoSnapshot(&g.nodeInfoSnapshot)
return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot)
}

// Schedule tries to schedule the given pod to one of the nodes in the node list.
Expand Down Expand Up @@ -1206,7 +1206,7 @@ func NewGenericScheduler(
predicateMetaProducer predicates.PredicateMetadataProducer,
prioritizers []priorities.PriorityConfig,
priorityMetaProducer priorities.PriorityMetadataProducer,
pluginSet pluginsv1alpha1.PluginSet,
framework framework.Framework,
extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder,
pvcLister corelisters.PersistentVolumeClaimLister,
Expand All @@ -1222,9 +1222,9 @@ func NewGenericScheduler(
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer,
pluginSet: pluginSet,
framework: framework,
extenders: extenders,
nodeInfoSnapshot: internalcache.NewNodeInfoSnapshot(),
nodeInfoSnapshot: framework.NodeInfoSnapshot(),
volumeBinder: volumeBinder,
pvcLister: pvcLister,
pdbLister: pdbLister,
Expand Down
37 changes: 11 additions & 26 deletions pkg/scheduler/core/generic_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
)

Expand Down Expand Up @@ -134,27 +134,9 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str
return nil
}

// EmptyPluginSet is a test plugin set used by the default scheduler.
type EmptyPluginSet struct{}

var _ plugins.PluginSet = EmptyPluginSet{}

// ReservePlugins returns a slice of default reserve plugins.
func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin {
return []plugins.ReservePlugin{}
}

// PrebindPlugins returns a slice of default prebind plugins.
func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return []plugins.PrebindPlugin{}
}

// Data returns a pointer to PluginData.
func (r EmptyPluginSet) Data() *plugins.PluginData {
return &plugins.PluginData{}
}

var emptyPluginSet = &EmptyPluginSet{}
// EmptyPluginRegistry is a test plugin set used by the default scheduler.
var EmptyPluginRegistry = framework.Registry{}
var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil)

func makeNodeList(nodeNames []string) []*v1.Node {
result := make([]*v1.Node, 0, len(nodeNames))
Expand Down Expand Up @@ -456,6 +438,7 @@ func TestGenericScheduler(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cache := internalcache.New(time.Duration(0), wait.NeverStop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, pod := range test.pods {
cache.AddPod(pod)
}
Expand All @@ -474,7 +457,7 @@ func TestGenericScheduler(t *testing.T) {
algorithmpredicates.EmptyPredicateMetadataProducer,
test.prioritizers,
priorities.EmptyPriorityMetadataProducer,
emptyPluginSet,
fwk,
[]algorithm.SchedulerExtender{},
nil,
pvcLister,
Expand All @@ -498,6 +481,7 @@ func TestGenericScheduler(t *testing.T) {
func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes []*v1.Node) *genericScheduler {
algorithmpredicates.SetPredicatesOrdering(order)
cache := internalcache.New(time.Duration(0), wait.NeverStop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, n := range nodes {
cache.AddNode(n)
}
Expand All @@ -510,10 +494,10 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes
algorithmpredicates.EmptyPredicateMetadataProducer,
prioritizers,
priorities.EmptyPriorityMetadataProducer,
emptyPluginSet,
fwk,
nil, nil, nil, nil, false, false,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateNodeInfoSnapshot(&s.(*genericScheduler).nodeInfoSnapshot)
cache.UpdateNodeInfoSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
return s.(*genericScheduler)

}
Expand Down Expand Up @@ -1483,6 +1467,7 @@ func TestPreempt(t *testing.T) {
t.Logf("===== Running test %v", t.Name())
stop := make(chan struct{})
cache := internalcache.New(time.Duration(0), stop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, pod := range test.pods {
cache.AddPod(pod)
}
Expand All @@ -1509,7 +1494,7 @@ func TestPreempt(t *testing.T) {
algorithmpredicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
priorities.EmptyPriorityMetadataProducer,
emptyPluginSet,
fwk,
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/factory/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ go_library(
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/validation:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/debugger:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/plugins:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
Expand Down Expand Up @@ -61,6 +60,7 @@ go_test(
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/latest:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
Expand Down
25 changes: 14 additions & 11 deletions pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/api/validation"
"k8s.io/kubernetes/pkg/scheduler/core"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/plugins"
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)

Expand Down Expand Up @@ -92,8 +91,8 @@ type Config struct {
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
// the preemptor pod.
PodPreemptor PodPreemptor
// PlugingSet has a set of plugins and data used to run them.
PluginSet pluginsv1alpha1.PluginSet
// Framework runs scheduler plugins at configured extension points.
Framework framework.Framework

// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
Expand Down Expand Up @@ -183,8 +182,8 @@ type configFactory struct {
pdbLister policylisters.PodDisruptionBudgetLister
// a means to list all StorageClasses
storageClassLister storagelisters.StorageClassLister
// pluginRunner has a set of plugins and the context used for running them.
pluginSet pluginsv1alpha1.PluginSet
// framework has a set of plugins and the context used for running them.
framework framework.Framework

// Close this to stop all reflectors
StopEverything <-chan struct{}
Expand Down Expand Up @@ -238,6 +237,7 @@ type ConfigFactoryArgs struct {
PercentageOfNodesToScore int32
BindTimeoutSeconds int64
StopCh <-chan struct{}
Registry framework.Registry
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may not find the other place using Registry except the initialization of framework . Is it better to just put the NewRegistry into the initialization of framework and remove Registry from ConfigFactoryArgs . WDYT? Maybe there is other reason I don't know.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for Registry being here is that we want custom scheduler to be able to pass their own registry to the scheduler and have their own set of plugins.

}

// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
Expand All @@ -248,6 +248,11 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything = wait.NeverStop
}
schedulerCache := internalcache.New(30*time.Second, stopEverything)
// TODO(bsalamat): config files should be passed to the framework.
framework, err := framework.NewFramework(args.Registry, nil)
if err != nil {
klog.Fatalf("error initializing the scheduling framework: %v", err)
}

// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelisters.StorageClassLister
Expand All @@ -267,6 +272,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
statefulSetLister: args.StatefulSetInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
storageClassLister: storageClassLister,
framework: framework,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
schedulerName: args.SchedulerName,
Expand Down Expand Up @@ -435,17 +441,14 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return nil, err
}

// TODO(bsalamat): the default registrar should be able to process config files.
c.pluginSet = plugins.NewDefaultPluginSet(pluginsv1alpha1.NewPluginContext(), &c.schedulerCache)

algo := core.NewGenericScheduler(
c.schedulerCache,
c.podQueue,
predicateFuncs,
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
c.pluginSet,
c.framework,
extenders,
c.volumeBinder,
c.pVCLister,
Expand All @@ -463,7 +466,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
GetBinder: getBinderFunc(c.client, extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
PluginSet: c.pluginSet,
Framework: c.framework,
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
Expand Down Expand Up @@ -494,6 +495,7 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight
schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds,
stopCh,
framework.NewRegistry(),
})
}

Expand Down