Skip to content

Commit

Permalink
koord-manager: revise batch resource calculation, add default priorit…
Browse files Browse the repository at this point in the history
…yclass and qosclass, fix noderesource plugin arg

Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed Jul 7, 2023
1 parent b288455 commit a88f244
Show file tree
Hide file tree
Showing 9 changed files with 415 additions and 161 deletions.
4 changes: 0 additions & 4 deletions apis/extension/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ const (
PriorityNone PriorityClass = ""
)

var (
DefaultPriorityClass = PriorityNone
)

// Define Koordinator priority as a variable value to support customizing different priority ranges
var (
PriorityProdValueMax int32 = 9999
Expand Down
45 changes: 45 additions & 0 deletions apis/extension/priority_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package extension

import corev1 "k8s.io/api/core/v1"

// NOTE: functions in this file can be overwritten for extension

// DefaultPriorityClass is the PriorityClass returned by GetPodPriorityClassWithQoS when the given QoSClass does not
// map to a priority class. It is initialized to PriorityNone.
var DefaultPriorityClass = PriorityNone

// GetPodPriorityClassWithDefault resolves the PriorityClass of a pod, applying the default config.
// The class reported by GetPriorityClass wins when it is not PriorityNone; otherwise the class is derived from the
// pod's (defaulted) QoSClass through GetPodPriorityClassWithQoS.
func GetPodPriorityClassWithDefault(pod *corev1.Pod) PriorityClass {
	if explicit := GetPriorityClass(pod); explicit != PriorityNone {
		return explicit
	}

	// No explicit priority class: map the pod's QoSClass to its default priority class.
	qos := GetPodQoSClassWithDefault(pod)
	return GetPodPriorityClassWithQoS(qos)
}

// GetPodPriorityClassWithQoS maps a QoSClass to the PriorityClass used by default for a pod that does not specify
// a PriorityClass explicitly:
//   - QoSSystem, QoSLSE, QoSLSR and QoSLS map to PriorityProd;
//   - QoSBE maps to PriorityBatch;
//   - any other value maps to DefaultPriorityClass.
func GetPodPriorityClassWithQoS(qos QoSClass) PriorityClass {
	if qos == QoSSystem || qos == QoSLSE || qos == QoSLSR || qos == QoSLS {
		return PriorityProd
	}
	if qos == QoSBE {
		return PriorityBatch
	}
	return DefaultPriorityClass
}
44 changes: 43 additions & 1 deletion apis/extension/qos_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,44 @@ limitations under the License.

package extension

import corev1 "k8s.io/api/core/v1"
import (
corev1 "k8s.io/api/core/v1"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
)

// NOTE: functions in this file can be overwritten for extension

// QoSClassForGuaranteed is the koordinator QoSClass that GetPodQoSClassWithKubeQoS returns for the kubernetes
// Guaranteed QoS class, i.e. the default for a Guaranteed pod that does not specify a koordinator QoSClass.
// It is initialized to QoSLSR.
// TODO: add component options to customize it.
var QoSClassForGuaranteed = QoSLSR

// GetPodQoSClassWithDefault gets the pod's QoSClass with the default config.
// A koordinator QoSClass reported by GetPodQoSClass takes precedence; otherwise the QoSClass is derived from the
// pod's kubernetes QoS class via GetPodQoSClassWithKubeQoS.
// A nil pod yields QoSNone, consistent with GetPodQoSClass.
func GetPodQoSClassWithDefault(pod *corev1.Pod) QoSClass {
	// Guard before the kubernetes QoS fallback: GetPodQoSClass tolerates a nil pod and returns QoSNone, but
	// GetKubeQosClass dereferences pod.Status and would panic on nil.
	if pod == nil {
		return QoSNone
	}

	qosClass := GetPodQoSClass(pod)
	if qosClass != QoSNone {
		return qosClass
	}

	return GetPodQoSClassWithKubeQoS(GetKubeQosClass(pod))
}

// GetPodQoSClassWithKubeQoS translates a kubernetes QoS class into the koordinator QoSClass used by default for a
// pod that does not specify a koordinator QoSClass explicitly:
//   - Guaranteed maps to QoSClassForGuaranteed;
//   - Burstable maps to QoSLS;
//   - BestEffort maps to QoSBE.
//
// https://koordinator.sh/docs/architecture/qos#koordinator-qos-vs-kubernetes-qos
func GetPodQoSClassWithKubeQoS(kubeQOS corev1.PodQOSClass) QoSClass {
	var mapped QoSClass
	switch {
	case kubeQOS == corev1.PodQOSGuaranteed:
		mapped = QoSClassForGuaranteed
	case kubeQOS == corev1.PodQOSBurstable:
		mapped = QoSLS
	case kubeQOS == corev1.PodQOSBestEffort:
		mapped = QoSBE
	default:
		// should never reach here
		mapped = QoSNone
	}
	return mapped
}

func GetPodQoSClass(pod *corev1.Pod) QoSClass {
if pod == nil || pod.Labels == nil {
return QoSNone
Expand All @@ -34,3 +68,11 @@ func GetQoSClassByAttrs(labels, annotations map[string]string) QoSClass {
}
return QoSNone
}

// GetKubeQosClass returns the kubernetes QoS class of the pod. The class recorded in pod.Status.QOSClass is used
// when it is non-empty; otherwise the class is computed with v1qos.GetPodQOS.
// The pod must be non-nil since its status is dereferenced.
func GetKubeQosClass(pod *corev1.Pod) corev1.PodQOSClass {
	if recorded := pod.Status.QOSClass; recorded != "" {
		return recorded
	}
	// The status carries no QoS class; compute it from the pod.
	return v1qos.GetPodQOS(pod)
}
7 changes: 2 additions & 5 deletions pkg/koordlet/resmanager/cgroup_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func getPodResourceQoSByQoSClass(pod *corev1.Pod, strategy *slov1alpha1.Resource
return nil
}
var resourceQoS *slov1alpha1.ResourceQOS
podQoS := apiext.GetPodQoSClass(pod)
podQoS := apiext.GetPodQoSClassWithDefault(pod)
switch podQoS {
case apiext.QoSLSR:
resourceQoS = strategy.LSRClass
Expand All @@ -561,10 +561,7 @@ func getPodResourceQoSByQoSClass(pod *corev1.Pod, strategy *slov1alpha1.Resource
case apiext.QoSBE:
resourceQoS = strategy.BEClass
default:
// qos=None pods uses config mapped from kubeQoS
resourceQoS = getKubeQoSResourceQoSByQoSClass(util.GetKubeQosClass(pod), strategy, config)
klog.V(6).Infof("get pod ResourceQOS according to kubeQoS for QoS=None pods, pod %s, "+
"resourceQoS %v", util.GetPodKey(pod), util.DumpJSON(resourceQoS))
// should never reach here
}
return resourceQoS
}
12 changes: 6 additions & 6 deletions pkg/slo-controller/noderesource/noderesource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,20 @@ func (r *NodeResourceReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

// InitFlags registers the command-line flag that selects the enabled noderesource plugins, binding it to
// NodeResourcePlugins with the current value of NodeResourcePlugins as the default.
// NOTE(review): the visible body registers the flag through the package-level pflag.StringSliceVar and does not
// reference the fs parameter — confirm this is intended.
func InitFlags(fs *flag.FlagSet) {
pflag.StringSliceVar(&NodeResourcePlugins, "noderesourceplugins", NodeResourcePlugins, fmt.Sprintf("A list of noderesource plugins to enable. "+
"'-noderesourceplugins=*' enables all plugins. "+
"'-noderesourceplugins=BatchResource' means only the 'BatchResource' plugin is enabled. "+
"'-noderesourceplugins=*,-BatchResource' means all plugins except the 'BatchResource' plugin are enabled.\n"+
pflag.StringSliceVar(&NodeResourcePlugins, "noderesource-plugins", NodeResourcePlugins, fmt.Sprintf("A list of noderesource plugins to enable. "+
"'-noderesource-plugins=*' enables all plugins. "+
"'-noderesource-plugins=BatchResource' means only the 'BatchResource' plugin is enabled. "+
"'-noderesource-plugins=*,-BatchResource' means all plugins except the 'BatchResource' plugin are enabled.\n"+
"All plugins: %s", strings.Join(NodeResourcePlugins, ", ")))
}

func isPluginEnabled(pluginName string) bool {
hasStar := false
for _, p := range NodeResourcePlugins {
if p == Name {
if p == pluginName {
return true
}
if p == "-"+Name {
if p == "-"+pluginName {
return false
}
if p == "*" {
Expand Down
59 changes: 30 additions & 29 deletions pkg/slo-controller/noderesource/plugins/batchresource/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (p *Plugin) Reset(node *corev1.Node, message string) []framework.ResourceIt
}

// Calculate calculates Batch resources using the formula below:
// Node.Total - Node.Reserved - System.Used - Pod(non-BE).Used, System.Used = Node.Used - Pod(All).Used.
// Node.Total - Node.Reserved - System.Used - Pod(High-Priority).Used, System.Used = Node.Used - Pod(All).Used.
func (p *Plugin) Calculate(strategy *extension.ColocationStrategy, node *corev1.Node, podList *corev1.PodList,
resourceMetrics *framework.ResourceMetrics) ([]framework.ResourceItem, error) {
if strategy == nil || node == nil || podList == nil || resourceMetrics == nil || resourceMetrics.NodeMetric == nil {
Expand All @@ -93,41 +93,43 @@ func (p *Plugin) Calculate(strategy *extension.ColocationStrategy, node *corev1.
"degrade node resource because of abnormal nodeMetric, reason: degradedByBatchResource"), nil
}

// NOTE: currently, non-BE pods are considered as LS, and BE pods are considered using Batch
podLSRequest := util.NewZeroResourceList()
podLSUsed := util.NewZeroResourceList()
// pod(All).Used = pod(LS).Used + pod(BE).Used
// compute the requests and usages according to the pods' priority classes.
// HP means High-Priority (i.e. not Batch or Free) pods
// pod(HP).Used = pod(All).Used - pod(Batch/Free).Used
podAllUsed := util.NewZeroResourceList()
podHPRequest := util.NewZeroResourceList()
podHPUsed := util.NewZeroResourceList()

nodeMetric := resourceMetrics.NodeMetric
podMetricMap := make(map[string]*slov1alpha1.PodMetricInfo)
for _, podMetric := range nodeMetric.Status.PodsMetric {
podMetricMap[util.GetPodMetricKey(podMetric)] = podMetric
}

for _, pod := range podList.Items {
for i := range podList.Items {
pod := &podList.Items[i]
if pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
continue
}

qosClass := extension.GetPodQoSClass(&pod)

podRequest := util.GetPodRequest(&pod, corev1.ResourceCPU, corev1.ResourceMemory)
if qosClass != extension.QoSBE {
podLSRequest = quotav1.Add(podLSRequest, podRequest)
priorityClass := extension.GetPodPriorityClassWithDefault(pod)
podRequest := util.GetPodRequest(pod, corev1.ResourceCPU, corev1.ResourceMemory)
isPodHighPriority := priorityClass != extension.PriorityBatch && priorityClass != extension.PriorityFree
if isPodHighPriority {
podHPRequest = quotav1.Add(podHPRequest, podRequest)
}
podKey := util.GetPodKey(&pod)
podKey := util.GetPodKey(pod)
podMetric, ok := podMetricMap[podKey]
if !ok {
if qosClass != extension.QoSBE {
podLSUsed = quotav1.Add(podLSUsed, podRequest)
if isPodHighPriority {
podHPUsed = quotav1.Add(podHPUsed, podRequest)
}
podAllUsed = quotav1.Add(podAllUsed, podRequest)
continue
}

if qosClass != extension.QoSBE {
podLSUsed = quotav1.Add(podLSUsed, getPodMetricUsage(podMetric))
if isPodHighPriority {
podHPUsed = quotav1.Add(podHPUsed, getPodMetricUsage(podMetric))
}
podAllUsed = quotav1.Add(podAllUsed, getPodMetricUsage(podMetric))
}
Expand All @@ -144,8 +146,7 @@ func (p *Plugin) Calculate(strategy *extension.ColocationStrategy, node *corev1.
systemUsed = quotav1.Max(systemUsed, nodeAnnoReserved)

batchAllocatable, cpuMsg, memMsg := calculateBatchResourceByPolicy(strategy, node, nodeAllocatable,
nodeReservation, systemUsed,
podLSRequest, podLSUsed)
nodeReservation, systemUsed, podHPRequest, podHPUsed)

metrics.RecordNodeExtendedResourceAllocatableInternal(node, string(extension.BatchCPU), metrics.UnitInteger, float64(batchAllocatable.Cpu().MilliValue())/1000)
metrics.RecordNodeExtendedResourceAllocatableInternal(node, string(extension.BatchMemory), metrics.UnitByte, float64(batchAllocatable.Memory().Value()))
Expand All @@ -167,31 +168,31 @@ func (p *Plugin) Calculate(strategy *extension.ColocationStrategy, node *corev1.
}

func calculateBatchResourceByPolicy(strategy *extension.ColocationStrategy, node *corev1.Node,
nodeAllocatable, nodeReserve, systemUsed, podLSReq, podLSUsed corev1.ResourceList) (corev1.ResourceList, string, string) {
// Node(BE).Alloc = Node.Total - Node.Reserved - System.Used - Pod(LS).Used
nodeAllocatable, nodeReserve, systemUsed, podHPReq, podHPUsed corev1.ResourceList) (corev1.ResourceList, string, string) {
// Node(Batch).Alloc = Node.Total - Node.Reserved - System.Used - Pod(Prod/Mid).Used
batchAllocatableByUsage := quotav1.Max(quotav1.Subtract(quotav1.Subtract(quotav1.Subtract(
nodeAllocatable, nodeReserve), systemUsed), podLSUsed), util.NewZeroResourceList())
nodeAllocatable, nodeReserve), systemUsed), podHPUsed), util.NewZeroResourceList())

// Node(BE).Alloc = Node.Total - Node.Reserved - Pod(LS).Request
// Node(Batch).Alloc = Node.Total - Node.Reserved - Pod(Prod/Mid).Request
batchAllocatableByRequest := quotav1.Max(quotav1.Subtract(quotav1.Subtract(nodeAllocatable, nodeReserve),
podLSReq), util.NewZeroResourceList())
podHPReq), util.NewZeroResourceList())

batchAllocatable := batchAllocatableByUsage
cpuMsg := fmt.Sprintf("batchAllocatable[CPU(Milli-Core)]:%v = nodeAllocatable:%v - nodeReservation:%v - systemUsage:%v - podLSUsed:%v",
cpuMsg := fmt.Sprintf("batchAllocatable[CPU(Milli-Core)]:%v = nodeAllocatable:%v - nodeReservation:%v - systemUsage:%v - podHPUsed:%v",
batchAllocatable.Cpu().MilliValue(), nodeAllocatable.Cpu().MilliValue(), nodeReserve.Cpu().MilliValue(),
systemUsed.Cpu().MilliValue(), podLSUsed.Cpu().MilliValue())
systemUsed.Cpu().MilliValue(), podHPUsed.Cpu().MilliValue())

var memMsg string
if strategy != nil && strategy.MemoryCalculatePolicy != nil && *strategy.MemoryCalculatePolicy == extension.CalculateByPodRequest {
batchAllocatable[corev1.ResourceMemory] = *batchAllocatableByRequest.Memory()
memMsg = fmt.Sprintf("batchAllocatable[Mem(GB)]:%v = nodeAllocatable:%v - nodeReservation:%v - podLSRequest:%v",
memMsg = fmt.Sprintf("batchAllocatable[Mem(GB)]:%v = nodeAllocatable:%v - nodeReservation:%v - podHPRequest:%v",
batchAllocatable.Memory().ScaledValue(resource.Giga), nodeAllocatable.Memory().ScaledValue(resource.Giga),
nodeReserve.Memory().ScaledValue(resource.Giga), podLSReq.Memory().ScaledValue(resource.Giga))
nodeReserve.Memory().ScaledValue(resource.Giga), podHPReq.Memory().ScaledValue(resource.Giga))
} else { // use CalculatePolicy "usage" by default
memMsg = fmt.Sprintf("batchAllocatable[Mem(GB)]:%v = nodeAllocatable:%v - nodeReservation:%v - systemUsage:%v - podLSUsed:%v",
memMsg = fmt.Sprintf("batchAllocatable[Mem(GB)]:%v = nodeAllocatable:%v - nodeReservation:%v - systemUsage:%v - podHPUsed:%v",
batchAllocatable.Memory().ScaledValue(resource.Giga), nodeAllocatable.Memory().ScaledValue(resource.Giga),
nodeReserve.Memory().ScaledValue(resource.Giga), systemUsed.Memory().ScaledValue(resource.Giga),
podLSUsed.Memory().ScaledValue(resource.Giga))
podHPUsed.Memory().ScaledValue(resource.Giga))
}

return batchAllocatable, cpuMsg, memMsg
Expand Down

0 comments on commit a88f244

Please sign in to comment.