diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index e0a45e9ce16e..0dd1062eb81b 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -23,6 +23,7 @@ go_library( "//pkg/scheduler/core:go_default_library", "//pkg/scheduler/framework/plugins:go_default_library", "//pkg/scheduler/framework/plugins/nodelabel:go_default_library", + "//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/debugger:go_default_library", diff --git a/pkg/scheduler/algorithm_factory.go b/pkg/scheduler/algorithm_factory.go index 8666a631d244..85890523bb05 100644 --- a/pkg/scheduler/algorithm_factory.go +++ b/pkg/scheduler/algorithm_factory.go @@ -34,6 +34,7 @@ import ( schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/framework/plugins" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/volumebinder" @@ -372,8 +373,9 @@ func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) strin // RegisterCustomPriorityFunction registers a custom priority function with the algorithm registry. // Returns the name, with which the priority function was registered. -func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string { +func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy, args *plugins.ConfigProducerArgs) string { var pcf *PriorityConfigFactory + name := policy.Name validatePriorityOrDie(policy) @@ -401,17 +403,23 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string { Weight: policy.Weight, } } else if policy.Argument.RequestedToCapacityRatioArguments != nil { + scoringFunctionShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(policy.Argument.RequestedToCapacityRatioArguments) + args.RequestedToCapacityRatioArgs = &requestedtocapacityratio.Args{ + FunctionShape: scoringFunctionShape, + ResourceToWeightMap: resources, + } pcf = &PriorityConfigFactory{ MapReduceFunction: func(args PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) { - scoringFunctionShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(policy.Argument.RequestedToCapacityRatioArguments) p := priorities.RequestedToCapacityRatioResourceAllocationPriority(scoringFunctionShape, resources) return p.PriorityMap, nil }, Weight: policy.Weight, } + // We do not allow specifying the name for custom plugins, see #83472 + name = requestedtocapacityratio.Name } - } else if existingPcf, ok := priorityFunctionMap[policy.Name]; ok { - klog.V(2).Infof("Priority type %s already registered, reusing.", policy.Name) + } else if existingPcf, ok := priorityFunctionMap[name]; ok { + klog.V(2).Infof("Priority type %s already registered, reusing.", name) // set/update the weight based on the policy pcf = &PriorityConfigFactory{ Function: existingPcf.Function, @@ -421,10 +429,10 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string { } if pcf == nil { - klog.Fatalf("Invalid configuration: Priority type not found for %s", policy.Name) + klog.Fatalf("Invalid configuration: Priority type not found for %s", name) } - return RegisterPriorityConfigFactory(policy.Name, *pcf) + return RegisterPriorityConfigFactory(name, *pcf) } func buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(arguments *schedulerapi.RequestedToCapacityRatioArguments) (priorities.FunctionShape, priorities.ResourceToWeightMap) { diff --git a/pkg/scheduler/api/compatibility/compatibility_test.go b/pkg/scheduler/api/compatibility/compatibility_test.go index 09614a8170f2..2d2defcd662e 100644 --- a/pkg/scheduler/api/compatibility/compatibility_test.go +++ b/pkg/scheduler/api/compatibility/compatibility_test.go @@ -788,7 +788,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "EqualPriority", "SelectorSpreadPriority", "InterPodAffinityPriority", - "RequestedToCapacityRatioPriority", ), wantPlugins: map[string][]kubeschedulerconfig.Plugin{ "FilterPlugin": { @@ -814,6 +813,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesMostAllocated", Weight: 2}, {Name: "NodeAffinity", Weight: 2}, {Name: "NodePreferAvoidPods", Weight: 2}, + {Name: "RequestedToCapacityRatio", Weight: 2}, {Name: "TaintToleration", Weight: 2}, }, }, @@ -896,7 +896,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "EqualPriority", "SelectorSpreadPriority", "InterPodAffinityPriority", - "RequestedToCapacityRatioPriority", ), wantPlugins: map[string][]kubeschedulerconfig.Plugin{ "FilterPlugin": { @@ -923,6 +922,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesMostAllocated", Weight: 2}, {Name: "NodeAffinity", Weight: 2}, {Name: "NodePreferAvoidPods", Weight: 2}, + {Name: "RequestedToCapacityRatio", Weight: 2}, {Name: "TaintToleration", Weight: 2}, }, }, @@ -1004,7 +1004,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "EqualPriority", "SelectorSpreadPriority", "InterPodAffinityPriority", - "RequestedToCapacityRatioPriority", ), wantPlugins: map[string][]kubeschedulerconfig.Plugin{ "FilterPlugin": { @@ -1032,6 +1031,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesMostAllocated", Weight: 2}, {Name: "NodeAffinity", Weight: 2}, {Name: "NodePreferAvoidPods", Weight: 2}, + {Name: "RequestedToCapacityRatio", Weight: 2}, {Name: "TaintToleration", Weight: 2}, }, }, @@ -1117,7 +1117,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "EqualPriority", "SelectorSpreadPriority", "InterPodAffinityPriority", - "RequestedToCapacityRatioPriority", ), wantPlugins: map[string][]kubeschedulerconfig.Plugin{ "FilterPlugin": { @@ -1145,6 +1144,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "NodeResourcesMostAllocated", Weight: 2}, {Name: "NodeAffinity", Weight: 2}, {Name: "NodePreferAvoidPods", Weight: 2}, + {Name: "RequestedToCapacityRatio", Weight: 2}, {Name: "TaintToleration", Weight: 2}, }, }, @@ -1194,6 +1194,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "NodeResourcesLeastAllocated": "LeastRequestedPriority", "NodeResourcesBalancedAllocation": "BalancedResourceAllocation", "NodeResourcesMostAllocated": "MostRequestedPriority", + "RequestedToCapacityRatio": "RequestedToCapacityRatioPriority", } for v, tc := range schedulerFiles { diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index c28f4abb3bfc..623d4a57de66 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -341,7 +341,7 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, er } else { for _, priority := range policy.Priorities { klog.V(2).Infof("Registering priority: %s", priority.Name) - priorityKeys.Insert(RegisterCustomPriorityFunction(priority)) + priorityKeys.Insert(RegisterCustomPriorityFunction(priority, c.configProducerArgs)) } } diff --git a/pkg/scheduler/framework/plugins/BUILD b/pkg/scheduler/framework/plugins/BUILD index 9a30d1535e11..457637ad657d 100644 --- a/pkg/scheduler/framework/plugins/BUILD +++ b/pkg/scheduler/framework/plugins/BUILD @@ -20,6 +20,7 @@ go_library( "//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library", "//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library", "//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library", + "//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library", "//pkg/scheduler/framework/plugins/tainttoleration:go_default_library", "//pkg/scheduler/framework/plugins/volumebinding:go_default_library", "//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library", @@ -55,6 +56,7 @@ filegroup( "//pkg/scheduler/framework/plugins/nodeunschedulable:all-srcs", "//pkg/scheduler/framework/plugins/nodevolumelimits:all-srcs", "//pkg/scheduler/framework/plugins/podtopologyspread:all-srcs", + "//pkg/scheduler/framework/plugins/requestedtocapacityratio:all-srcs", "//pkg/scheduler/framework/plugins/tainttoleration:all-srcs", "//pkg/scheduler/framework/plugins/volumebinding:all-srcs", "//pkg/scheduler/framework/plugins/volumerestrictions:all-srcs", diff --git a/pkg/scheduler/framework/plugins/default_registry.go b/pkg/scheduler/framework/plugins/default_registry.go index 8066afb4e2e0..ec2e87ef34a5 100644 --- a/pkg/scheduler/framework/plugins/default_registry.go +++ b/pkg/scheduler/framework/plugins/default_registry.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions" @@ -78,6 +79,7 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry { nodevolumelimits.CinderName: nodevolumelimits.NewCinder, interpodaffinity.Name: interpodaffinity.New, nodelabel.Name: nodelabel.New, + requestedtocapacityratio.Name: requestedtocapacityratio.New, } } @@ -89,6 +91,8 @@ type ConfigProducerArgs struct { Weight int32 // NodeLabelArgs is the args for the NodeLabel plugin. NodeLabelArgs *nodelabel.Args + // RequestedToCapacityRatioArgs is the args for the RequestedToCapacityRatio plugin. + RequestedToCapacityRatioArgs *requestedtocapacityratio.Args } // ConfigProducer produces a framework's configuration. @@ -200,16 +204,7 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { registry.RegisterPredicate(nodelabel.Name, func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil) - encoding, err := json.Marshal(args.NodeLabelArgs) - if err != nil { - klog.Fatalf("Failed to marshal %+v", args.NodeLabelArgs) - return - } - config := config.PluginConfig{ - Name: nodelabel.Name, - Args: runtime.Unknown{Raw: encoding}, - } - pluginConfig = append(pluginConfig, config) + pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs)) return }) @@ -253,6 +248,13 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { return }) + registry.RegisterPriority(requestedtocapacityratio.Name, + func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { + plugins.Score = appendToPluginSet(plugins.Score, requestedtocapacityratio.Name, &args.Weight) + pluginConfig = append(pluginConfig, makePluginConfig(requestedtocapacityratio.Name, args.RequestedToCapacityRatioArgs)) + return + }) + return registry } @@ -285,3 +287,16 @@ func appendToPluginSet(set *config.PluginSet, name string, weight *int32) *confi set.Enabled = append(set.Enabled, cfg) return set } + +func makePluginConfig(pluginName string, args interface{}) config.PluginConfig { + encoding, err := json.Marshal(args) + if err != nil { + klog.Fatal(fmt.Errorf("Failed to marshal %+v: %v", args, err)) + return config.PluginConfig{} + } + config := config.PluginConfig{ + Name: pluginName, + Args: runtime.Unknown{Raw: encoding}, + } + return config +} diff --git a/pkg/scheduler/framework/plugins/requestedtocapacityratio/BUILD b/pkg/scheduler/framework/plugins/requestedtocapacityratio/BUILD new file mode 100644 index 000000000000..d4c9f2f88917 --- /dev/null +++ b/pkg/scheduler/framework/plugins/requestedtocapacityratio/BUILD @@ -0,0 +1,43 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["requested_to_capacity_ratio.go"], + importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio", + visibility = ["//visibility:public"], + deps = [ + "//pkg/scheduler/algorithm/priorities:go_default_library", + "//pkg/scheduler/framework/plugins/migration:go_default_library", + "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["requested_to_capacity_ratio_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/scheduler/framework/plugins/requestedtocapacityratio/requested_to_capacity_ratio.go b/pkg/scheduler/framework/plugins/requestedtocapacityratio/requested_to_capacity_ratio.go new file mode 100644 index 000000000000..35568aef4424 --- /dev/null +++ b/pkg/scheduler/framework/plugins/requestedtocapacityratio/requested_to_capacity_ratio.go @@ -0,0 +1,80 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package requestedtocapacityratio + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" +) + +// Name of this plugin. +const Name = "RequestedToCapacityRatio" + +// Args holds the args that are used to configure the plugin. +type Args struct { + FunctionShape priorities.FunctionShape + ResourceToWeightMap priorities.ResourceToWeightMap +} + +// New initializes a new plugin and returns it. +func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) { + args := &Args{} + if err := framework.DecodeInto(plArgs, args); err != nil { + return nil, err + } + p := priorities.RequestedToCapacityRatioResourceAllocationPriority(args.FunctionShape, args.ResourceToWeightMap) + return &RequestedToCapacityRatio{ + handle: handle, + prioritize: p.PriorityMap, + }, nil +} + +// RequestedToCapacityRatio is a score plugin that allow users to apply bin packing +// on core resources like CPU, Memory as well as extended resources like accelerators. +type RequestedToCapacityRatio struct { + handle framework.FrameworkHandle + prioritize priorities.PriorityMapFunction +} + +var _ framework.ScorePlugin = &RequestedToCapacityRatio{} + +// Name returns name of the plugin. It is used in logs, etc. +func (pl *RequestedToCapacityRatio) Name() string { + return Name +} + +// Score invoked at the score extension point. +func (pl *RequestedToCapacityRatio) Score(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { + nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + // Note that RequestedToCapacityRatioPriority doesn't use priority metadata, hence passing nil here. + s, err := pl.prioritize(pod, nil, nodeInfo) + return s.Score, migration.ErrorToFrameworkStatus(err) +} + +// ScoreExtensions of the Score plugin. +func (pl *RequestedToCapacityRatio) ScoreExtensions() framework.ScoreExtensions { + return nil +} diff --git a/pkg/scheduler/framework/plugins/requestedtocapacityratio/requested_to_capacity_ratio_test.go b/pkg/scheduler/framework/plugins/requestedtocapacityratio/requested_to_capacity_ratio_test.go new file mode 100644 index 000000000000..860ae2661917 --- /dev/null +++ b/pkg/scheduler/framework/plugins/requestedtocapacityratio/requested_to_capacity_ratio_test.go @@ -0,0 +1,121 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package requestedtocapacityratio + +import ( + "context" + "reflect" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" +) + +func TestRequestedToCapacityRatio(t *testing.T) { + type test struct { + name string + requestedPod *v1.Pod + nodes []*v1.Node + scheduledPods []*v1.Pod + expectedPriorities framework.NodeScoreList + } + + tests := []test{ + { + name: "nothing scheduled, nothing requested (default - least requested nodes have priority)", + requestedPod: makePod("", 0, 0), + nodes: []*v1.Node{makeNode("node1", 4000, 10000), makeNode("node2", 4000, 10000)}, + scheduledPods: []*v1.Pod{makePod("node1", 0, 0), makePod("node2", 0, 0)}, + expectedPriorities: []framework.NodeScore{{Name: "node1", Score: 100}, {Name: "node2", Score: 100}}, + }, + { + name: "nothing scheduled, resources requested, differently sized machines (default - least requested nodes have priority)", + requestedPod: makePod("", 3000, 5000), + nodes: []*v1.Node{makeNode("node1", 4000, 10000), makeNode("node2", 6000, 10000)}, + scheduledPods: []*v1.Pod{makePod("node1", 0, 0), makePod("node2", 0, 0)}, + expectedPriorities: []framework.NodeScore{{Name: "node1", Score: 38}, {Name: "node2", Score: 50}}, + }, + { + name: "no resources requested, pods scheduled with resources (default - least requested nodes have priority)", + requestedPod: makePod("", 0, 0), + nodes: []*v1.Node{makeNode("node1", 4000, 10000), makeNode("node2", 6000, 10000)}, + scheduledPods: []*v1.Pod{makePod("node1", 3000, 5000), makePod("node2", 3000, 5000)}, + expectedPriorities: []framework.NodeScore{{Name: "node1", Score: 38}, {Name: "node2", Score: 50}}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + state := framework.NewCycleState() + snapshot := nodeinfosnapshot.NewSnapshot(test.scheduledPods, test.nodes) + fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot)) + args := &runtime.Unknown{Raw: []byte(`{"FunctionShape" : [{"Utilization" : 0, "Score" : 100}, {"Utilization" : 100, "Score" : 0}], "ResourceToWeightMap" : {"memory" : 1, "cpu" : 1}}`)} + p, _ := New(args, fh) + + var gotPriorities framework.NodeScoreList + for _, n := range test.nodes { + score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name) + if !status.IsSuccess() { + t.Errorf("unexpected error: %v", status) + } + gotPriorities = append(gotPriorities, framework.NodeScore{Name: n.Name, Score: score}) + } + + if !reflect.DeepEqual(test.expectedPriorities, gotPriorities) { + t.Errorf("expected:\n\t%+v,\ngot:\n\t%+v", test.expectedPriorities, gotPriorities) + } + }) + } +} + +func makeNode(name string, milliCPU, memory int64) *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI), + }, + }, + } +} + +func makePod(node string, milliCPU, memory int64) *v1.Pod { + return &v1.Pod{ + Spec: v1.PodSpec{ + NodeName: node, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(memory, resource.DecimalSI), + }, + }, + }, + }, + }, + } +}