Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move RunNormalizeScorePlugins and ApplyScoreWeights into RunScorePlugins; Also add unit tests for RunScorePlugins. #81614

Merged
merged 1 commit into from Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 0 additions & 12 deletions pkg/scheduler/core/generic_scheduler.go
Expand Up @@ -784,18 +784,6 @@ func PrioritizeNodes(
return schedulerapi.HostPriorityList{}, scoreStatus.AsError()
}

// Run the Normalize Score plugins.
status := framework.RunNormalizeScorePlugins(pluginContext, pod, scoresMap)
if !status.IsSuccess() {
return schedulerapi.HostPriorityList{}, status.AsError()
}

// Apply weights for scores.
status = framework.ApplyScoreWeights(pluginContext, pod, scoresMap)
if !status.IsSuccess() {
return schedulerapi.HostPriorityList{}, status.AsError()
}

// Summarize all scores.
result := make(schedulerapi.HostPriorityList, 0, len(nodes))

Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/framework/v1alpha1/BUILD
Expand Up @@ -48,6 +48,7 @@ go_test(
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
50 changes: 11 additions & 39 deletions pkg/scheduler/framework/v1alpha1/framework.go
Expand Up @@ -360,6 +360,8 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
}
ctx, cancel := context.WithCancel(context.Background())
errCh := schedutil.NewErrorChannel()

// Run Score method for each node in parallel.
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
nodeName := nodes[index].Name
Expand All @@ -374,83 +376,53 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
}
}
})

if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while running score plugin for pod %q: %v", pod.Name, err)
klog.Error(msg)
return nil, NewStatus(Error, msg)
}

return pluginToNodeScores, nil
}

// RunNormalizeScorePlugins runs the NormalizeScore function of Score plugins.
// It should be called after RunScorePlugins with the PluginToNodeScores result.
// It then modifies the list with normalized scores. It returns a non-success Status
// if any of the NormalizeScore functions returns a non-success status.
func (f *framework) RunNormalizeScorePlugins(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScores) *Status {
ctx, cancel := context.WithCancel(context.Background())
errCh := schedutil.NewErrorChannel()
// Run NormalizeScore method for each ScoreWithNormalizePlugin in parallel.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not keep the functions and just make them local (normalizeScores and applyScoreWeights)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just found the 3 steps are very similar (parallel execution and receive error) and it looks good to me to put them in the same function (saves a few lines of code). But no particular reason really.

If you like the separately, I can change it.

workqueue.ParallelizeUntil(ctx, 16, len(f.scoreWithNormalizePlugins), func(index int) {
pl := f.scoreWithNormalizePlugins[index]
nodeScoreList, ok := scores[pl.Name()]
if !ok {
err := fmt.Errorf("normalize score plugin %q has no corresponding scores in the PluginToNodeScores", pl.Name())
errCh.SendErrorWithCancel(err, cancel)
return
}
nodeScoreList := pluginToNodeScores[pl.Name()]
status := pl.NormalizeScore(pc, pod, nodeScoreList)
if !status.IsSuccess() {
err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
errCh.SendErrorWithCancel(err, cancel)
return
}
})

if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while running normalize score plugin for pod %q: %v", pod.Name, err)
klog.Error(msg)
return NewStatus(Error, msg)
return nil, NewStatus(Error, msg)
}

return nil
}

// ApplyScoreWeights applies weights to the score results. It should be called after
// RunNormalizeScorePlugins.
func (f *framework) ApplyScoreWeights(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScores) *Status {
ctx, cancel := context.WithCancel(context.Background())
errCh := schedutil.NewErrorChannel()
// Apply score defaultWeights for each ScorePlugin in parallel.
workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
// Score plugins' weight has been checked when they are initialized.
weight := f.pluginNameToWeightMap[pl.Name()]
nodeScoreList, ok := scores[pl.Name()]
if !ok {
err := fmt.Errorf("score plugin %q has no corresponding scores in the PluginToNodeScores", pl.Name())
errCh.SendErrorWithCancel(err, cancel)
return
}
nodeScoreList := pluginToNodeScores[pl.Name()]

for i, nodeScore := range nodeScoreList {
// return error if score plugin returns invalid score.
if nodeScore.Score > MaxNodeScore || nodeScore.Score < MinNodeScore {
err := fmt.Errorf("score plugin %q returns an invalid score %q, it should in the range of [MinNodeScore, MaxNodeScore] after normalizing", pl.Name(), nodeScore.Score)
err := fmt.Errorf("score plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, MinNodeScore, MaxNodeScore)
errCh.SendErrorWithCancel(err, cancel)
return
}

nodeScoreList[i].Score = nodeScore.Score * weight
}
})

if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while applying score weights for pod %q: %v", pod.Name, err)
msg := fmt.Sprintf("error while applying score defaultWeights for pod %q: %v", pod.Name, err)
klog.Error(msg)
return NewStatus(Error, msg)
return nil, NewStatus(Error, msg)
}

return nil
return pluginToNodeScores, nil
}

// RunPrebindPlugins runs the set of configured prebind plugins. It returns a
Expand Down