Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid copying PriorityConfig and SchedulerExtender structs for every node while running priority functions #71722

Merged
merged 3 commits into from
Dec 6, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 27 additions & 25 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,24 +657,26 @@ func PrioritizeNodes(

// DEPRECATED: we can remove this when all priorityConfigs implement the
// Map-Reduce pattern.
workqueue.ParallelizeUntil(context.TODO(), 16, len(priorityConfigs), func(i int) {
priorityConfig := priorityConfigs[i]
if priorityConfig.Function == nil {
for i := range priorityConfigs {
if priorityConfigs[i].Function != nil {
wg.Add(1)
go func(index int) {
defer wg.Done()
var err error
results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
if err != nil {
appendError(err)
}
}(i)
} else {
results[i] = make(schedulerapi.HostPriorityList, len(nodes))
return
}

var err error
results[i], err = priorityConfig.Function(pod, nodeNameToInfo, nodes)
if err != nil {
appendError(err)
}
})
}

workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := nodeNameToInfo[nodes[index].Name]
for i, priorityConfig := range priorityConfigs {
if priorityConfig.Function != nil {
for i := range priorityConfigs {
if priorityConfigs[i].Function != nil {
continue
}

Expand All @@ -687,22 +689,22 @@ func PrioritizeNodes(
}
})

for i, priorityConfig := range priorityConfigs {
if priorityConfig.Reduce == nil {
for i := range priorityConfigs {
if priorityConfigs[i].Reduce == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity. There will never be a situation when Function != nil and Reduce != nil, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right. The Map/Reduce functions are the new style for priority functions. The old-style was using "Function". No priority function is supposed to mix the two and none of our priority functions do that.

continue
}
wg.Add(1)
go func(index int, config algorithm.PriorityConfig) {
go func(index int) {
defer wg.Done()
if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
appendError(err)
}
if klog.V(10) {
for _, hostPriority := range results[index] {
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, config.Name, hostPriority.Score)
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
}
}
}(i, priorityConfig)
}(i)
}
// Wait for all computations to be finished.
wg.Wait()
Expand All @@ -722,14 +724,14 @@ func PrioritizeNodes(

if len(extenders) != 0 && nodes != nil {
combinedScores := make(map[string]int, len(nodeNameToInfo))
for _, extender := range extenders {
if !extender.IsInterested(pod) {
for i := range extenders {
if !extenders[i].IsInterested(pod) {
continue
}
wg.Add(1)
go func(ext algorithm.SchedulerExtender) {
go func(extIndex int) {
defer wg.Done()
prioritizedList, weight, err := ext.Prioritize(pod, nodes)
prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
if err != nil {
// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
return
Expand All @@ -738,12 +740,12 @@ func PrioritizeNodes(
for i := range *prioritizedList {
host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
if klog.V(10) {
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, ext.Name(), score)
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
}
combinedScores[host] += score * weight
}
mu.Unlock()
}(extender)
}(i)
}
// wait for all go routines to finish
wg.Wait()
Expand Down