Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koord-controller-manager: noderesource supports configuration plugins #1418

Merged
merged 1 commit into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/koord-manager/options/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ limitations under the License.
package options

import (
"flag"

"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/koordinator-sh/koordinator/pkg/slo-controller/nodemetric"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/nodeslo"
)

// controllerInitFlags maps a controller name to the function that registers
// that controller's command-line flags on a flag.FlagSet.
var controllerInitFlags = map[string]func(*flag.FlagSet){
noderesource.Name: noderesource.InitFlags,
}

var controllerAddFuncs = map[string]func(manager.Manager) error{
nodemetric.Name: nodemetric.Add,
noderesource.Name: noderesource.Add,
Expand Down
13 changes: 9 additions & 4 deletions cmd/koord-manager/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ import (
)

type Options struct {
ControllerAddFuncs map[string]func(manager.Manager) error
Controllers []string
ControllerAddFuncs map[string]func(manager.Manager) error
Controllers []string
ControllerInitFlags map[string]func(*flag.FlagSet)
}

func NewOptions() *Options {
return &Options{
ControllerAddFuncs: controllerAddFuncs,
Controllers: sets.StringKeySet(controllerAddFuncs).List(),
ControllerInitFlags: controllerInitFlags,
ControllerAddFuncs: controllerAddFuncs,
Controllers: sets.StringKeySet(controllerAddFuncs).List(),
}
}

Expand All @@ -45,6 +47,9 @@ func (o *Options) InitFlags(fs *flag.FlagSet) {
"'-controllers=noderesource' means only the 'noderesource' controller is enabled. "+
"'-controllers=*,-noderesource' means all controllers except the 'noderesource' controller are enabled.\n"+
"All controllers: %s", strings.Join(o.Controllers, ", ")))
for _, initFlagsFn := range o.ControllerInitFlags {
initFlagsFn(fs)
}
}

func (o *Options) ApplyTo(m manager.Manager) error {
Expand Down
38 changes: 26 additions & 12 deletions pkg/slo-controller/noderesource/framework/extender_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,18 @@ type NodePreparePlugin interface {
Execute(strategy *extension.ColocationStrategy, node *corev1.Node, nr *NodeResource) error
}

func RegisterNodePrepareExtender(plugins ...NodePreparePlugin) {
ps := make([]Plugin, len(plugins))
// FilterFn decides, given a plugin name, whether that plugin should be
// registered: the Register*Extender functions keep a plugin only when the
// filter returns true for its Name().
type FilterFn func(string) bool

// AllPass is a filter that accepts every plugin name.
var AllPass = func(string) bool {
return true
}

func RegisterNodePrepareExtender(filter FilterFn, plugins ...NodePreparePlugin) {
ps := make([]Plugin, 0, len(plugins))
for i := range plugins {
ps[i] = plugins[i]
if filter(plugins[i].Name()) {
ps = append(ps, plugins[i])
}
}
globalNodePrepareExtender.MustRegister(ps...)
}
Expand Down Expand Up @@ -75,10 +83,12 @@ type NodeSyncPlugin interface {
NeedSync(strategy *extension.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string)
}

func RegisterNodeSyncExtender(plugins ...NodeSyncPlugin) {
ps := make([]Plugin, len(plugins))
func RegisterNodeSyncExtender(filter FilterFn, plugins ...NodeSyncPlugin) {
ps := make([]Plugin, 0, len(plugins))
for i := range plugins {
ps[i] = plugins[i]
if filter(plugins[i].Name()) {
ps = append(ps, plugins[i])
}
}
globalNodeSyncExtender.MustRegister(ps...)
}
Expand Down Expand Up @@ -108,10 +118,12 @@ type NodeMetaSyncPlugin interface {
NeedSyncMeta(strategy *extension.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string)
}

func RegisterNodeMetaSyncExtender(plugins ...NodeMetaSyncPlugin) {
ps := make([]Plugin, len(plugins))
func RegisterNodeMetaSyncExtender(filter FilterFn, plugins ...NodeMetaSyncPlugin) {
ps := make([]Plugin, 0, len(plugins))
for i := range plugins {
ps[i] = plugins[i]
if filter(plugins[i].Name()) {
ps = append(ps, plugins[i])
}
}
globalNodeMetaSyncExtender.MustRegister(ps...)
}
Expand Down Expand Up @@ -159,10 +171,12 @@ type ResourceCalculatePlugin interface {
Calculate(strategy *extension.ColocationStrategy, node *corev1.Node, podList *corev1.PodList, metrics *ResourceMetrics) ([]ResourceItem, error)
}

func RegisterResourceCalculateExtender(plugins ...ResourceCalculatePlugin) {
ps := make([]Plugin, len(plugins))
func RegisterResourceCalculateExtender(filter FilterFn, plugins ...ResourceCalculatePlugin) {
ps := make([]Plugin, 0, len(plugins))
for i := range plugins {
ps[i] = plugins[i]
if filter(plugins[i].Name()) {
ps = append(ps, plugins[i])
}
}
globalResourceCalculateExtender.MustRegister(ps...)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func Test_NodePrepareExtender(t *testing.T) {
t.Run("prepare extender", func(t *testing.T) {
extender := &SetNodeAnnotation{}
startedSize := globalNodePrepareExtender.Size()
RegisterNodePrepareExtender(extender)
RegisterNodePrepareExtender(AllPass, extender)
assert.Equal(t, startedSize+1, globalNodePrepareExtender.Size())
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -68,11 +68,11 @@ func Test_RegisterAlreadyExistNodePrepareExtender(t *testing.T) {
extender := &SetNodeAnnotation{}
startedSize := globalNodePrepareExtender.Size()
// register for the first time
RegisterNodePrepareExtender(extender)
RegisterNodePrepareExtender(AllPass, extender)
assert.Equal(t, startedSize+1, globalNodePrepareExtender.Size())
// register duplicated
extender1 := &SetNodeAnnotation{}
RegisterNodePrepareExtender(extender1)
RegisterNodePrepareExtender(AllPass, extender1)
assert.Equal(t, startedSize+1, globalNodePrepareExtender.Size())
UnregisterNodePrepareExtender(extender.Name())
assert.Equal(t, startedSize, globalNodePrepareExtender.Size())
Expand Down Expand Up @@ -125,10 +125,10 @@ func TestNodeSyncPlugin(t *testing.T) {
t.Run("node sync extender", func(t *testing.T) {
plugin := &testNodeResourcePlugin{}
startedSize := globalNodeSyncExtender.Size()
RegisterNodeSyncExtender(plugin)
RegisterNodeSyncExtender(AllPass, plugin)
assert.Equal(t, startedSize+1, globalNodeSyncExtender.Size())

RegisterNodeSyncExtender(plugin)
RegisterNodeSyncExtender(AllPass, plugin)
assert.Equal(t, startedSize+1, globalNodeSyncExtender.Size(), "register duplicated")

assert.NotPanics(t, func() {
Expand All @@ -141,10 +141,10 @@ func TestNodeMetaSyncPlugin(t *testing.T) {
t.Run("node sync extender", func(t *testing.T) {
plugin := &testNodeResourcePlugin{}
startedSize := globalNodeMetaSyncExtender.Size()
RegisterNodeMetaSyncExtender(plugin)
RegisterNodeMetaSyncExtender(AllPass, plugin)
assert.Equal(t, startedSize+1, globalNodeMetaSyncExtender.Size())

RegisterNodeMetaSyncExtender(plugin)
RegisterNodeMetaSyncExtender(AllPass, plugin)
assert.Equal(t, startedSize+1, globalNodeMetaSyncExtender.Size(), "register duplicated")

assert.NotPanics(t, func() {
Expand All @@ -157,10 +157,10 @@ func TestResourceCalculatePlugin(t *testing.T) {
t.Run("resource calculate extender", func(t *testing.T) {
plugin := &testNodeResourcePlugin{}
startedSize := globalResourceCalculateExtender.Size()
RegisterResourceCalculateExtender(plugin)
RegisterResourceCalculateExtender(AllPass, plugin)
assert.Equal(t, startedSize+1, globalResourceCalculateExtender.Size())

RegisterResourceCalculateExtender(plugin)
RegisterResourceCalculateExtender(AllPass, plugin)
assert.Equal(t, startedSize+1, globalResourceCalculateExtender.Size(), "register duplicated")

assert.NotPanics(t, func() {
Expand Down
35 changes: 35 additions & 0 deletions pkg/slo-controller/noderesource/noderesource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ package noderesource

import (
"context"
"flag"
"fmt"
"strings"

"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -43,6 +47,10 @@ const (
disableInConfig string = "DisableInConfig"
)

var (
NodeResourcePlugins []string
)

type NodeResourceReconciler struct {
Client client.Client
Recorder record.EventRecorder
Expand Down Expand Up @@ -121,7 +129,34 @@ func (r *NodeResourceReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}

func InitFlags(fs *flag.FlagSet) {
pflag.StringSliceVar(&NodeResourcePlugins, "noderesourceplugins", NodeResourcePlugins, fmt.Sprintf("A list of noderesource plugins to enable. "+
"'-noderesourceplugins=*' enables all plugins. "+
"'-noderesourceplugins=BatchResource' means only the 'BatchResource' plugin is enabled. "+
"'-noderesourceplugins=*,-BatchResource' means all plugins except the 'BatchResource' plugin are enabled.\n"+
"All plugins: %s", strings.Join(NodeResourcePlugins, ", ")))
}

// isPluginEnabled reports whether the plugin called pluginName is selected by
// the entries of NodeResourcePlugins.
//
// Entries are scanned in order and the first explicit mention of the plugin
// decides the result:
//   - an entry equal to pluginName enables it;
//   - an entry equal to "-"+pluginName disables it;
//   - "*" enables every plugin that is not explicitly mentioned.
//
// A plugin that is neither mentioned nor covered by "*" is disabled.
func isPluginEnabled(pluginName string) bool {
	hasStar := false
	for _, p := range NodeResourcePlugins {
		// Match against the plugin being queried. Comparing against the
		// controller's own Name would make every plugin share one answer and
		// leave the pluginName argument unused.
		switch p {
		case pluginName:
			return true
		case "-" + pluginName:
			return false
		case "*":
			hasStar = true
		}
	}
	return hasStar
}

func Add(mgr ctrl.Manager) error {
// init plugins for NodeResource
addPlugins(isPluginEnabled)

reconciler := &NodeResourceReconciler{
Recorder: mgr.GetEventRecorderFor("noderesource-controller"),
Client: mgr.GetClient(),
Expand Down
12 changes: 6 additions & 6 deletions pkg/slo-controller/noderesource/plugins/midresource/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func (p *Plugin) Reset(node *corev1.Node, message string) []framework.ResourceIt
return items
}

// Calculate calculates Batch resources using the formula below:
// Node.Total - Node.Reserved - System.Used - Pod(non-BE).Used, System.Used = Node.Used - Pod(All).Used.
// Calculate calculates Mid resources using the formula below:
// min(ProdReclaimable, NodeAllocable * MidThresholdRatio).
func (p *Plugin) Calculate(strategy *extension.ColocationStrategy, node *corev1.Node, podList *corev1.PodList,
metrics *framework.ResourceMetrics) ([]framework.ResourceItem, error) {
if strategy == nil || node == nil || node.Status.Allocatable == nil || podList == nil ||
Expand Down Expand Up @@ -143,7 +143,7 @@ func (p *Plugin) calculate(strategy *extension.ColocationStrategy, node *corev1.
node.Name, allocatableMilliCPU)
allocatableMilliCPU = 0
}
cpu := resource.NewQuantity(allocatableMilliCPU, resource.DecimalSI)
cpuInMilliCores := resource.NewQuantity(allocatableMilliCPU, resource.DecimalSI)

memThresholdRatio := 1.0
if strategy != nil && strategy.MidMemoryThresholdPercent != nil {
Expand All @@ -160,14 +160,14 @@ func (p *Plugin) calculate(strategy *extension.ColocationStrategy, node *corev1.
memory := resource.NewQuantity(allocatableMemory, resource.BinarySI)

klog.V(6).Infof("calculated mid allocatable for node %s, cpu(milli-core) %v, memory(byte) %v",
node.Name, cpu.String(), memory.String())
node.Name, cpuInMilliCores.String(), memory.String())

return []framework.ResourceItem{
{
Name: extension.MidCPU,
Quantity: cpu, // in milli-cores
Quantity: cpuInMilliCores, // in milli-cores
Message: fmt.Sprintf("midAllocatable[CPU(milli-core)]:%v = min(nodeAllocatable:%v * thresholdRatio:%v, ProdReclaimable:%v)",
cpu.Value(), nodeAllocatable.Cpu().MilliValue(), cpuThresholdRatio, prodReclaimable.Cpu().MilliValue()),
cpuInMilliCores.Value(), nodeAllocatable.Cpu().MilliValue(), cpuThresholdRatio, prodReclaimable.Cpu().MilliValue()),
},
{
Name: extension.MidMemory,
Expand Down
14 changes: 10 additions & 4 deletions pkg/slo-controller/noderesource/plugins_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,17 @@ import (
// NOTE: functions in this file can be overwritten for extension

func init() {
	// Seed the default plugin list: the mid-resource plugin first, then the
	// batch-resource plugin.
	NodeResourcePlugins = append(NodeResourcePlugins,
		midresource.PluginName,
		batchresource.PluginName,
	)
}

func addPlugins(filter func(string) bool) {
// NOTE: plugins run in order of the registration.
framework.RegisterNodePrepareExtender(nodePreparePlugins...)
framework.RegisterNodeSyncExtender(nodeSyncPlugins...)
framework.RegisterNodeMetaSyncExtender(nodeMetaSyncPlugins...)
framework.RegisterResourceCalculateExtender(resourceCalculatePlugins...)
framework.RegisterNodePrepareExtender(filter, nodePreparePlugins...)
framework.RegisterNodeSyncExtender(filter, nodeSyncPlugins...)
framework.RegisterNodeMetaSyncExtender(filter, nodeMetaSyncPlugins...)
framework.RegisterResourceCalculateExtender(filter, resourceCalculatePlugins...)
}

var (
Expand Down
9 changes: 7 additions & 2 deletions pkg/slo-controller/noderesource/resource_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ import (
"github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/framework"
)

// init registers the noderesource plugins for this test package, passing
// framework.AllPass so that no plugin is filtered out.
func init() {
addPlugins(framework.AllPass)
}

type FakeCfgCache struct {
cfg extension.ColocationCfg
available bool
Expand Down Expand Up @@ -1005,6 +1009,7 @@ func Test_calculateNodeResource(t *testing.T) {
}...),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
memoryCalculateByReq := extension.CalculateByPodRequest
Expand Down Expand Up @@ -1738,7 +1743,7 @@ func Test_updateNodeResource(t *testing.T) {
}
oldNodeCopy := tt.args.oldNode.DeepCopy()
if len(tt.fields.prepareNodeMetaSyncPlugin) > 0 {
framework.RegisterNodeMetaSyncExtender(tt.fields.prepareNodeMetaSyncPlugin...)
framework.RegisterNodeMetaSyncExtender(framework.AllPass, tt.fields.prepareNodeMetaSyncPlugin...)
defer func() {
for _, p := range tt.fields.prepareNodeMetaSyncPlugin {
framework.UnregisterNodeMetaSyncExtender(p.Name())
Expand Down Expand Up @@ -2061,7 +2066,7 @@ func Test_isNodeResourceSyncNeeded(t *testing.T) {
Clock: clock.RealClock{},
}
if len(tt.fields.prepareNodeMetaSyncPlugin) > 0 {
framework.RegisterNodeMetaSyncExtender(tt.fields.prepareNodeMetaSyncPlugin...)
framework.RegisterNodeMetaSyncExtender(framework.AllPass, tt.fields.prepareNodeMetaSyncPlugin...)
defer func() {
for _, p := range tt.fields.prepareNodeMetaSyncPlugin {
framework.UnregisterNodeMetaSyncExtender(p.Name())
Expand Down