Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scheduler): remove deprecated pattern in scheduler priority #84905

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/scheduler/BUILD
Expand Up @@ -85,7 +85,6 @@ go_test(
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
Expand Down
5 changes: 1 addition & 4 deletions pkg/scheduler/algorithm/priorities/types.go
Expand Up @@ -48,10 +48,7 @@ type PriorityConfig struct {
Name string
Map PriorityMapFunction
Reduce PriorityReduceFunction
// TODO: Remove it after migrating all functions to
// Map-Reduce pattern.
Function PriorityFunction
Weight int64
Weight int64
}

// EmptyPriorityMetadataProducer returns a no-op PriorityMetadataProducer type.
Expand Down
37 changes: 7 additions & 30 deletions pkg/scheduler/algorithm_factory.go
Expand Up @@ -81,7 +81,6 @@ type PriorityFunctionFactory2 func(PluginFactoryArgs) (priorities.PriorityMapFun

// PriorityConfigFactory produces a PriorityConfig from the given function and weight
type PriorityConfigFactory struct {
Function PriorityFunctionFactory
MapReduceFunction PriorityFunctionFactory2
Weight int64
}
Expand Down Expand Up @@ -341,19 +340,6 @@ func RegisterPredicateMetadataProducer(producer predicates.PredicateMetadataProd
predicateMetadataProducer = producer
}

// RegisterPriorityFunction registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
// DEPRECATED
// Use Map-Reduce pattern for priority functions.
func RegisterPriorityFunction(name string, function priorities.PriorityFunction, weight int) string {
return RegisterPriorityConfigFactory(name, PriorityConfigFactory{
Function: func(PluginFactoryArgs) priorities.PriorityFunction {
return function
},
Weight: int64(weight),
})
}

// RegisterPriorityMapReduceFunction registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
func RegisterPriorityMapReduceFunction(
Expand Down Expand Up @@ -449,7 +435,6 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy, args *pl
klog.V(2).Infof("Priority type %s already registered, reusing.", name)
// set/update the weight based on the policy
pcf = &PriorityConfigFactory{
Function: existingPcf.Function,
MapReduceFunction: existingPcf.MapReduceFunction,
Weight: policy.Weight,
}
Expand Down Expand Up @@ -579,21 +564,13 @@ func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]pr
if !ok {
return nil, fmt.Errorf("invalid priority name %s specified - no corresponding function found", name)
}
if factory.Function != nil {
configs = append(configs, priorities.PriorityConfig{
Name: name,
Function: factory.Function(args),
Weight: factory.Weight,
})
} else {
mapFunction, reduceFunction := factory.MapReduceFunction(args)
configs = append(configs, priorities.PriorityConfig{
Name: name,
Map: mapFunction,
Reduce: reduceFunction,
Weight: factory.Weight,
})
}
mapFunction, reduceFunction := factory.MapReduceFunction(args)
configs = append(configs, priorities.PriorityConfig{
Name: name,
Map: mapFunction,
Reduce: reduceFunction,
Weight: factory.Weight,
})
}
if err := validateSelectedConfigs(configs); err != nil {
return nil, err
Expand Down
24 changes: 12 additions & 12 deletions pkg/scheduler/core/extender_test.go
Expand Up @@ -18,6 +18,7 @@ package core

import (
"context"
"errors"
"fmt"
"reflect"
"sort"
Expand All @@ -40,7 +41,6 @@ import (
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/util"
)
Expand Down Expand Up @@ -107,16 +107,16 @@ func machine2PrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*framework.Node
return &result, nil
}

func machine2Prioritizer(_ *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
result := []framework.NodeScore{}
for _, node := range nodes {
score := 10
if node.Name == "machine2" {
score = 100
}
result = append(result, framework.NodeScore{Name: node.Name, Score: int64(score)})
func machine2Prioritizer(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, errors.New("node not found")
}
score := 10
if node.Name == "machine2" {
score = 100
}
return result, nil
return framework.NodeScore{Name: node.Name, Score: int64(score)}, nil
}

type FakeExtender struct {
Expand Down Expand Up @@ -457,7 +457,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: machine2Prioritizer, Weight: 20}},
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 20}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
Expand All @@ -482,7 +482,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
// because of the errors from errorPredicateExtender and/or
// errorPrioritizerExtender.
predicates: map[string]predicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: machine2Prioritizer, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 1}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{errorPredicateExtender},
Expand Down
24 changes: 1 addition & 23 deletions pkg/scheduler/core/generic_scheduler.go
Expand Up @@ -744,35 +744,13 @@ func (g *genericScheduler) prioritizeNodes(

results := make([]framework.NodeScoreList, len(g.prioritizers))

// DEPRECATED: we can remove this when all priorityConfigs implement the
// Map-Reduce pattern.
for i := range g.prioritizers {
if g.prioritizers[i].Function != nil {
wg.Add(1)
go func(index int) {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_legacy").Inc()
defer func() {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_legacy").Dec()
wg.Done()
}()
var err error
results[index], err = g.prioritizers[index].Function(pod, g.nodeInfoSnapshot, nodes)
if err != nil {
appendError(err)
}
}(i)
} else {
results[i] = make(framework.NodeScoreList, len(nodes))
}
results[i] = make(framework.NodeScoreList, len(nodes))
}

workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
for i := range g.prioritizers {
if g.prioritizers[i].Function != nil {
continue
}

var err error
results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
if err != nil {
Expand Down
67 changes: 35 additions & 32 deletions pkg/scheduler/core/generic_scheduler_test.go
Expand Up @@ -83,42 +83,35 @@ func hasNoPodsPredicate(pod *v1.Pod, meta algorithmpredicates.PredicateMetadata,
return false, []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}

func numericPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
result := []framework.NodeScore{}
for _, node := range nodes {
score, err := strconv.Atoi(node.Name)
if err != nil {
return nil, err
}
result = append(result, framework.NodeScore{
Name: node.Name,
Score: int64(score),
})
func numericMapPriority(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
score, err := strconv.Atoi(node.Name)
if err != nil {
return framework.NodeScore{}, err
}
return result, nil

return framework.NodeScore{
Name: node.Name,
Score: int64(score),
}, nil
}

func reverseNumericPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) {
func reverseNumericReducePriority(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
var maxScore float64
minScore := math.MaxFloat64
reverseResult := []framework.NodeScore{}
result, err := numericPriority(pod, sharedLister, nodes)
if err != nil {
return nil, err
}

for _, hostPriority := range result {
maxScore = math.Max(maxScore, float64(hostPriority.Score))
minScore = math.Min(minScore, float64(hostPriority.Score))
}
for _, hostPriority := range result {
reverseResult = append(reverseResult, framework.NodeScore{
for i, hostPriority := range result {
result[i] = framework.NodeScore{
Name: hostPriority.Name,
Score: int64(maxScore + minScore - float64(hostPriority.Score)),
})
}
}

return reverseResult, nil
return nil
}

func trueMapPriority(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
Expand Down Expand Up @@ -326,7 +319,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}},
expectedHosts: sets.NewString("3"),
Expand All @@ -335,16 +328,26 @@ func TestGenericScheduler(t *testing.T) {
},
{
predicates: map[string]algorithmpredicates.FitPredicate{"matches": matchesPredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
expectedHosts: sets.NewString("2"),
name: "test 5",
wErr: nil,
},
{
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}, {Function: reverseNumericPriority, Weight: 2}},
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{
{
Map: numericMapPriority,
Weight: 1,
},
{
Map: numericMapPriority,
Reduce: reverseNumericReducePriority,
Weight: 2,
},
},
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
expectedHosts: sets.NewString("1"),
Expand All @@ -353,7 +356,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate, "false": falsePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
name: "test 7",
Expand Down Expand Up @@ -385,7 +388,7 @@ func TestGenericScheduler(t *testing.T) {
},
},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"1", "2"},
name: "test 8",
wErr: &FitError{
Expand Down Expand Up @@ -591,7 +594,7 @@ func TestGenericScheduler(t *testing.T) {
{
name: "test with filter plugin returning Unschedulable status",
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
Expand All @@ -608,7 +611,7 @@ func TestGenericScheduler(t *testing.T) {
{
name: "test with filter plugin returning UnschedulableAndUnresolvable status",
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
Expand All @@ -625,7 +628,7 @@ func TestGenericScheduler(t *testing.T) {
{
name: "test with partial failed filter plugin",
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
Expand Down Expand Up @@ -2156,7 +2159,7 @@ func TestPreempt(t *testing.T) {
internalqueue.NewSchedulingQueue(nil, nil),
map[string]algorithmpredicates.FitPredicate{"matches": predicate},
predMetaProducer,
[]priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
[]priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
priorities.EmptyPriorityMetadataProducer,
emptySnapshot,
emptyFramework,
Expand Down