Skip to content

Commit

Permalink
RequestedToCapacityRatio as score plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkm4ntr committed Oct 30, 2019
1 parent f8b45a1 commit 4b72af9
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 21 deletions.
1 change: 1 addition & 0 deletions pkg/scheduler/BUILD
Expand Up @@ -23,6 +23,7 @@ go_library(
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/debugger:go_default_library",
Expand Down
20 changes: 14 additions & 6 deletions pkg/scheduler/algorithm_factory.go
Expand Up @@ -34,6 +34,7 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
Expand Down Expand Up @@ -372,8 +373,9 @@ func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) strin

// RegisterCustomPriorityFunction registers a custom priority function with the algorithm registry.
// Returns the name, with which the priority function was registered.
func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy, args *plugins.ConfigProducerArgs) string {
var pcf *PriorityConfigFactory
name := policy.Name

validatePriorityOrDie(policy)

Expand Down Expand Up @@ -401,17 +403,23 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
Weight: policy.Weight,
}
} else if policy.Argument.RequestedToCapacityRatioArguments != nil {
scoringFunctionShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(policy.Argument.RequestedToCapacityRatioArguments)
args.RequestedToCapacityRatioArgs = &requestedtocapacityratio.Args{
FunctionShape: scoringFunctionShape,
ResourceToWeightMap: resources,
}
pcf = &PriorityConfigFactory{
MapReduceFunction: func(args PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
scoringFunctionShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(policy.Argument.RequestedToCapacityRatioArguments)
p := priorities.RequestedToCapacityRatioResourceAllocationPriority(scoringFunctionShape, resources)
return p.PriorityMap, nil
},
Weight: policy.Weight,
}
// We do not allow specifying the name for custom plugins, see #83472
name = requestedtocapacityratio.Name
}
} else if existingPcf, ok := priorityFunctionMap[policy.Name]; ok {
klog.V(2).Infof("Priority type %s already registered, reusing.", policy.Name)
} else if existingPcf, ok := priorityFunctionMap[name]; ok {
klog.V(2).Infof("Priority type %s already registered, reusing.", name)
// set/update the weight based on the policy
pcf = &PriorityConfigFactory{
Function: existingPcf.Function,
Expand All @@ -421,10 +429,10 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
}

if pcf == nil {
klog.Fatalf("Invalid configuration: Priority type not found for %s", policy.Name)
klog.Fatalf("Invalid configuration: Priority type not found for %s", name)
}

return RegisterPriorityConfigFactory(policy.Name, *pcf)
return RegisterPriorityConfigFactory(name, *pcf)
}

func buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(arguments *schedulerapi.RequestedToCapacityRatioArguments) (priorities.FunctionShape, priorities.ResourceToWeightMap) {
Expand Down
9 changes: 5 additions & 4 deletions pkg/scheduler/api/compatibility/compatibility_test.go
Expand Up @@ -788,7 +788,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"EqualPriority",
"SelectorSpreadPriority",
"InterPodAffinityPriority",
"RequestedToCapacityRatioPriority",
),
wantPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": {
Expand All @@ -814,6 +813,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesMostAllocated", Weight: 2},
{Name: "NodeAffinity", Weight: 2},
{Name: "NodePreferAvoidPods", Weight: 2},
{Name: "RequestedToCapacityRatio", Weight: 2},
{Name: "TaintToleration", Weight: 2},
},
},
Expand Down Expand Up @@ -896,7 +896,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"EqualPriority",
"SelectorSpreadPriority",
"InterPodAffinityPriority",
"RequestedToCapacityRatioPriority",
),
wantPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": {
Expand All @@ -923,6 +922,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesMostAllocated", Weight: 2},
{Name: "NodeAffinity", Weight: 2},
{Name: "NodePreferAvoidPods", Weight: 2},
{Name: "RequestedToCapacityRatio", Weight: 2},
{Name: "TaintToleration", Weight: 2},
},
},
Expand Down Expand Up @@ -1004,7 +1004,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"EqualPriority",
"SelectorSpreadPriority",
"InterPodAffinityPriority",
"RequestedToCapacityRatioPriority",
),
wantPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": {
Expand Down Expand Up @@ -1032,6 +1031,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesMostAllocated", Weight: 2},
{Name: "NodeAffinity", Weight: 2},
{Name: "NodePreferAvoidPods", Weight: 2},
{Name: "RequestedToCapacityRatio", Weight: 2},
{Name: "TaintToleration", Weight: 2},
},
},
Expand Down Expand Up @@ -1117,7 +1117,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"EqualPriority",
"SelectorSpreadPriority",
"InterPodAffinityPriority",
"RequestedToCapacityRatioPriority",
),
wantPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": {
Expand Down Expand Up @@ -1145,6 +1144,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
{Name: "NodeResourcesMostAllocated", Weight: 2},
{Name: "NodeAffinity", Weight: 2},
{Name: "NodePreferAvoidPods", Weight: 2},
{Name: "RequestedToCapacityRatio", Weight: 2},
{Name: "TaintToleration", Weight: 2},
},
},
Expand Down Expand Up @@ -1194,6 +1194,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"NodeResourcesLeastAllocated": "LeastRequestedPriority",
"NodeResourcesBalancedAllocation": "BalancedResourceAllocation",
"NodeResourcesMostAllocated": "MostRequestedPriority",
"RequestedToCapacityRatio": "RequestedToCapacityRatioPriority",
}

for v, tc := range schedulerFiles {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/factory.go
Expand Up @@ -341,7 +341,7 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, er
} else {
for _, priority := range policy.Priorities {
klog.V(2).Infof("Registering priority: %s", priority.Name)
priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
priorityKeys.Insert(RegisterCustomPriorityFunction(priority, c.configProducerArgs))
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/framework/plugins/BUILD
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library",
"//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library",
"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",
Expand Down Expand Up @@ -55,6 +56,7 @@ filegroup(
"//pkg/scheduler/framework/plugins/nodeunschedulable:all-srcs",
"//pkg/scheduler/framework/plugins/nodevolumelimits:all-srcs",
"//pkg/scheduler/framework/plugins/podtopologyspread:all-srcs",
"//pkg/scheduler/framework/plugins/requestedtocapacityratio:all-srcs",
"//pkg/scheduler/framework/plugins/tainttoleration:all-srcs",
"//pkg/scheduler/framework/plugins/volumebinding:all-srcs",
"//pkg/scheduler/framework/plugins/volumerestrictions:all-srcs",
Expand Down
35 changes: 25 additions & 10 deletions pkg/scheduler/framework/plugins/default_registry.go
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
Expand Down Expand Up @@ -78,6 +79,7 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry {
nodevolumelimits.CinderName: nodevolumelimits.NewCinder,
interpodaffinity.Name: interpodaffinity.New,
nodelabel.Name: nodelabel.New,
requestedtocapacityratio.Name: requestedtocapacityratio.New,
}
}

Expand All @@ -89,6 +91,8 @@ type ConfigProducerArgs struct {
Weight int32
// NodeLabelArgs is the args for the NodeLabel plugin.
NodeLabelArgs *nodelabel.Args
// RequestedToCapacityRatioArgs is the args for the RequestedToCapacityRatio plugin.
RequestedToCapacityRatioArgs *requestedtocapacityratio.Args
}

// ConfigProducer produces a framework's configuration.
Expand Down Expand Up @@ -200,16 +204,7 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
registry.RegisterPredicate(nodelabel.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil)
encoding, err := json.Marshal(args.NodeLabelArgs)
if err != nil {
klog.Fatalf("Failed to marshal %+v", args.NodeLabelArgs)
return
}
config := config.PluginConfig{
Name: nodelabel.Name,
Args: runtime.Unknown{Raw: encoding},
}
pluginConfig = append(pluginConfig, config)
pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
return
})

Expand Down Expand Up @@ -253,6 +248,13 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
return
})

registry.RegisterPriority(requestedtocapacityratio.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, requestedtocapacityratio.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(requestedtocapacityratio.Name, args.RequestedToCapacityRatioArgs))
return
})

return registry
}

Expand Down Expand Up @@ -285,3 +287,16 @@ func appendToPluginSet(set *config.PluginSet, name string, weight *int32) *confi
set.Enabled = append(set.Enabled, cfg)
return set
}

func makePluginConfig(pluginName string, args interface{}) config.PluginConfig {
encoding, err := json.Marshal(args)
if err != nil {
klog.Fatal(fmt.Errorf("Failed to marshal %+v: %v", args, err))
return config.PluginConfig{}
}
config := config.PluginConfig{
Name: pluginName,
Args: runtime.Unknown{Raw: encoding},
}
return config
}
43 changes: 43 additions & 0 deletions pkg/scheduler/framework/plugins/requestedtocapacityratio/BUILD
@@ -0,0 +1,43 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["requested_to_capacity_ratio.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/requestedtocapacityratio",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["requested_to_capacity_ratio_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
@@ -0,0 +1,80 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package requestedtocapacityratio

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// Name of this plugin.
const Name = "RequestedToCapacityRatio"

// Args holds the args that are used to configure the plugin.
type Args struct {
FunctionShape priorities.FunctionShape
ResourceToWeightMap priorities.ResourceToWeightMap
}

// New initializes a new plugin and returns it.
func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
args := &Args{}
if err := framework.DecodeInto(plArgs, args); err != nil {
return nil, err
}
p := priorities.RequestedToCapacityRatioResourceAllocationPriority(args.FunctionShape, args.ResourceToWeightMap)
return &RequestedToCapacityRatio{
handle: handle,
prioritize: p.PriorityMap,
}, nil
}

// RequestedToCapacityRatio is a score plugin that allow users to apply bin packing
// on core resources like CPU, Memory as well as extended resources like accelerators.
type RequestedToCapacityRatio struct {
handle framework.FrameworkHandle
prioritize priorities.PriorityMapFunction
}

var _ framework.ScorePlugin = &RequestedToCapacityRatio{}

// Name returns name of the plugin. It is used in logs, etc.
func (pl *RequestedToCapacityRatio) Name() string {
return Name
}

// Score invoked at the score extension point.
func (pl *RequestedToCapacityRatio) Score(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
// Note that RequestedToCapacityRatioPriority doesn't use priority metadata, hence passing nil here.
s, err := pl.prioritize(pod, nil, nodeInfo)
return s.Score, migration.ErrorToFrameworkStatus(err)
}

// ScoreExtensions of the Score plugin.
func (pl *RequestedToCapacityRatio) ScoreExtensions() framework.ScoreExtensions {
return nil
}

0 comments on commit 4b72af9

Please sign in to comment.