Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NormalizeScore extension point for scheduler framework. #80383

Merged
merged 1 commit into from Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/scheduler/core/generic_scheduler.go
Expand Up @@ -780,6 +780,18 @@ func PrioritizeNodes(
return schedulerapi.HostPriorityList{}, scoreStatus.AsError()
}

// Run the Normalize Score plugins.
status := framework.RunNormalizeScorePlugins(pluginContext, pod, scoresMap)
if !status.IsSuccess() {
return schedulerapi.HostPriorityList{}, status.AsError()
}

// Apply weights for scores.
status = framework.ApplyScoreWeights(pluginContext, pod, scoresMap)
if !status.IsSuccess() {
return schedulerapi.HostPriorityList{}, status.AsError()
}

// Summarize all scores.
result := make(schedulerapi.HostPriorityList, 0, len(nodes))

Expand Down
10 changes: 9 additions & 1 deletion pkg/scheduler/framework/v1alpha1/BUILD
Expand Up @@ -39,6 +39,14 @@ filegroup(

go_test(
name = "go_default_test",
srcs = ["interface_test.go"],
srcs = [
"framework_test.go",
"interface_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
100 changes: 82 additions & 18 deletions pkg/scheduler/framework/v1alpha1/framework.go
Expand Up @@ -34,20 +34,21 @@ import (
// framework is the component responsible for initializing and running scheduler
// plugins.
type framework struct {
registry Registry
nodeInfoSnapshot *cache.NodeInfoSnapshot
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
filterPlugins []FilterPlugin
scorePlugins []ScorePlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
bindPlugins []BindPlugin
postbindPlugins []PostbindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
registry Registry
nodeInfoSnapshot *cache.NodeInfoSnapshot
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
filterPlugins []FilterPlugin
scorePlugins []ScorePlugin
scoreWithNormalizePlugins []ScoreWithNormalizePlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
bindPlugins []BindPlugin
postbindPlugins []PostbindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
}

const (
Expand Down Expand Up @@ -131,6 +132,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.Score != nil {
for _, sc := range plugins.Score.Enabled {
if pg, ok := pluginsMap[sc.Name]; ok {
// First, make sure the plugin implements ScorePlugin interface.
p, ok := pg.(ScorePlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend score plugin", sc.Name)
Expand All @@ -139,6 +141,13 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
return nil, fmt.Errorf("score plugin %v is not configured with weight", p.Name())
}
f.scorePlugins = append(f.scorePlugins, p)

// Next, if the plugin also implements ScoreWithNormalizePlugin interface,
// add it to the normalizeScore plugin list.
np, ok := pg.(ScoreWithNormalizePlugin)
if ok {
f.scoreWithNormalizePlugins = append(f.scoreWithNormalizePlugins, np)
}
} else {
return nil, fmt.Errorf("score plugin %v does not exist", sc.Name)
}
Expand Down Expand Up @@ -317,14 +326,12 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
errCh := schedutil.NewErrorChannel()
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
// Score plugins' weight has been checked when they are initialized.
weight := f.pluginNameToWeightMap[pl.Name()]
score, status := pl.Score(pc, pod, nodes[index].Name)
if !status.IsSuccess() {
errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
return
}
pluginToNodeScoreMap[pl.Name()][index] = score * weight
pluginToNodeScoreMap[pl.Name()][index] = score
}
})

Expand All @@ -337,6 +344,64 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
return pluginToNodeScoreMap, nil
}

// RunNormalizeScorePlugins runs the NormalizeScore function of Score plugins.
// It should be called after RunScorePlugins with the PluginToNodeScoreMap result.
// It then modifies the map with normalized scores. It returns a non-success Status
// if any of the NormalizeScore functions returns a non-success status.
func (f *framework) RunNormalizeScorePlugins(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status {
liu-cong marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(context.Background())
errCh := schedutil.NewErrorChannel()
workqueue.ParallelizeUntil(ctx, 16, len(f.scoreWithNormalizePlugins), func(index int) {
pl := f.scoreWithNormalizePlugins[index]
nodeScoreList, ok := scores[pl.Name()]
liu-cong marked this conversation as resolved.
Show resolved Hide resolved
liu-cong marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
err := fmt.Errorf("normalize score plugin %v has no corresponding scores in the PluginToNodeScoreMap", pl.Name())
errCh.SendErrorWithCancel(err, cancel)
}
status := pl.NormalizeScore(pc, pod, nodeScoreList)
if !status.IsSuccess() {
err := fmt.Errorf("normalize score plugin %v failed with error %v", pl.Name(), status.Message())
errCh.SendErrorWithCancel(err, cancel)
}
})

if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while running normalize score plugin for pod %v: %v", pod.Name, err)
klog.Error(msg)
return NewStatus(Error, msg)
}

return nil
}

// ApplyScoreWeights applies weights to the score results. It should be called after
// RunNormalizeScorePlugins.
func (f *framework) ApplyScoreWeights(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status {
ctx, cancel := context.WithCancel(context.Background())
errCh := schedutil.NewErrorChannel()
workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
// Score plugins' weight has been checked when they are initialized.
weight := f.pluginNameToWeightMap[pl.Name()]
nodeScoreList, ok := scores[pl.Name()]
liu-cong marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
err := fmt.Errorf("score plugin %v has no corresponding scores in the PluginToNodeScoreMap", pl.Name())
errCh.SendErrorWithCancel(err, cancel)
}
for i := range nodeScoreList {
nodeScoreList[i] = nodeScoreList[i] * weight
}
})

if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while applying score weights for pod %v: %v", pod.Name, err)
klog.Error(msg)
return NewStatus(Error, msg)
}

return nil
}

// RunPrebindPlugins runs the set of configured prebind plugins. It returns a
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.
Expand Down Expand Up @@ -520,7 +585,6 @@ func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
find(plugins.Filter)
find(plugins.PostFilter)
find(plugins.Score)
find(plugins.NormalizeScore)
find(plugins.Reserve)
find(plugins.Permit)
find(plugins.PreBind)
Expand Down