Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koordlet: add mid runtimehooks #1984

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/koordlet/runtimehooks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (
"k8s.io/component-base/featuregate"

"github.com/koordinator-sh/koordinator/pkg/features"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/batchresource"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/coresched"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/cpunormalization"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/cpuset"
extendedresource "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/extendedresource"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/groupidentity"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/terwayqos"
Expand Down Expand Up @@ -97,7 +97,7 @@ var (
GroupIdentity: groupidentity.Object(),
CPUSetAllocator: cpuset.Object(),
GPUEnvInject: gpu.Object(),
BatchResource: batchresource.Object(),
BatchResource: extendedresource.Object(),
CPUNormalization: cpunormalization.Object(),
CoreSched: coresched.Object(),
TerwayQoS: terwayqos.Object(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/koordlet/runtimehooks/hooks/cpuset/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *cpusetRule) getContainerCPUSet(containerReq *protocol.ContainerRequest)
}
// check if cpu resource is allocated in numa-level since there can be numa allocation without cpu
if !numaNode.Resources.Cpu().IsZero() ||
util.GetBatchMilliCPUFromResourceList(numaNode.Resources) > 0 {
util.GetExtendedMilliCPUFromResourceList(numaNode.Resources) > 0 {
isNUMAAware = true
break
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package batchresource
package extendedresourcee

import (
"fmt"
Expand All @@ -24,6 +24,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

"github.com/koordinator-sh/koordinator/apis/extension"
apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks"
Expand All @@ -37,8 +38,8 @@ import (
)

const (
name = "BatchResource"
description = "set fundamental cgroups value for batch pod"
name = "ExtendedResource"
description = "set fundamental cgroups value for mid pod and batch pod"

ruleNameForNodeSLO = name + " (nodeSLO)"
ruleNameForNodeMeta = name + " (nodeMeta)"
Expand All @@ -49,7 +50,7 @@ type plugin struct {
executor resourceexecutor.ResourceUpdateExecutor
}

var podQOSConditions = []string{string(apiext.QoSBE)}
var podPriorityConditions = []string{string(extension.PriorityMid), string(extension.PriorityBatch)}

func (p *plugin) Register(op hooks.Options) {
klog.V(5).Infof("register hook %v", name)
Expand All @@ -63,17 +64,17 @@ func (p *plugin) Register(op hooks.Options) {
hooks.Register(rmconfig.PreCreateContainer, name, description+" (container)", p.SetContainerResources)
hooks.Register(rmconfig.PreUpdateContainerResources, name, description+" (container)", p.SetContainerResources)
reconciler.RegisterCgroupReconciler(reconciler.PodLevel, sysutil.CPUShares, description+" (pod cpu shares)",
p.SetPodCPUShares, reconciler.PodQOSFilter(), podQOSConditions...)
p.SetPodCPUShares, reconciler.PodPriorityFilter(), podPriorityConditions...)
reconciler.RegisterCgroupReconciler(reconciler.PodLevel, sysutil.CPUCFSQuota, description+" (pod cfs quota)",
p.SetPodCFSQuota, reconciler.PodQOSFilter(), podQOSConditions...)
p.SetPodCFSQuota, reconciler.PodPriorityFilter(), podPriorityConditions...)
reconciler.RegisterCgroupReconciler(reconciler.PodLevel, sysutil.MemoryLimit, description+" (pod memory limit)",
p.SetPodMemoryLimit, reconciler.PodQOSFilter(), podQOSConditions...)
p.SetPodMemoryLimit, reconciler.PodPriorityFilter(), podPriorityConditions...)
reconciler.RegisterCgroupReconciler(reconciler.ContainerLevel, sysutil.CPUShares, description+" (container cpu shares)",
p.SetContainerCPUShares, reconciler.PodQOSFilter(), podQOSConditions...)
p.SetContainerCPUShares, reconciler.PodPriorityFilter(), podPriorityConditions...)
reconciler.RegisterCgroupReconciler(reconciler.ContainerLevel, sysutil.CPUCFSQuota, description+" (container cfs quota)",
p.SetContainerCFSQuota, reconciler.PodQOSFilter(), podQOSConditions...)
p.SetContainerCFSQuota, reconciler.PodPriorityFilter(), podPriorityConditions...)
reconciler.RegisterCgroupReconciler(reconciler.ContainerLevel, sysutil.MemoryLimit, description+" (container memory limit)",
p.SetContainerMemoryLimit, reconciler.PodQOSFilter(), podQOSConditions...)
p.SetContainerMemoryLimit, reconciler.PodPriorityFilter(), podPriorityConditions...)
p.executor = op.Executor
}

Expand Down Expand Up @@ -125,7 +126,7 @@ func (p *plugin) SetPodCPUShares(proto protocol.HooksProtocol) error {
return fmt.Errorf("pod protocol is nil for plugin %v", name)
}

if !isPodQoSBEByAttr(podCtx.Request.Labels, podCtx.Request.Annotations) {
if !isPodQoSMatchedByAttr(podCtx.Request.Labels, podCtx.Request.Annotations) {
return nil
}

Expand All @@ -141,7 +142,7 @@ func (p *plugin) SetPodCPUShares(proto protocol.HooksProtocol) error {
if c.Requests == nil {
continue
}
containerRequest := util.GetBatchMilliCPUFromResourceList(c.Requests)
containerRequest := util.GetExtendedMilliCPUFromResourceList(c.Requests)
if containerRequest <= 0 {
continue
}
Expand All @@ -159,7 +160,7 @@ func (p *plugin) SetPodCFSQuota(proto protocol.HooksProtocol) error {
return fmt.Errorf("pod protocol is nil for plugin %v", name)
}

if !isPodQoSBEByAttr(podCtx.Request.Labels, podCtx.Request.Annotations) {
if !isPodQoSMatchedByAttr(podCtx.Request.Labels, podCtx.Request.Annotations) {
return nil
}

Expand All @@ -186,7 +187,7 @@ func (p *plugin) SetPodCFSQuota(proto protocol.HooksProtocol) error {
milliCPULimit = -1
break
}
containerLimit := util.GetBatchMilliCPUFromResourceList(c.Limits)
containerLimit := util.GetExtendedMilliCPUFromResourceList(c.Limits)
if containerLimit <= 0 { // pod unlimited once a container is unlimited
milliCPULimit = -1
break
Expand All @@ -212,7 +213,7 @@ func (p *plugin) SetPodMemoryLimit(proto protocol.HooksProtocol) error {
return fmt.Errorf("pod protocol is nil for plugin %v", name)
}

if !isPodQoSBEByAttr(podCtx.Request.Labels, podCtx.Request.Annotations) {
if !isPodQoSMatchedByAttr(podCtx.Request.Labels, podCtx.Request.Annotations) {
return nil
}

Expand All @@ -229,7 +230,7 @@ func (p *plugin) SetPodMemoryLimit(proto protocol.HooksProtocol) error {
memoryLimit = -1
break
}
containerLimit := util.GetBatchMemoryFromResourceList(c.Limits)
containerLimit := util.GetExtendedMemoryFromResourceList(c.Limits)
if containerLimit <= 0 { // pod unlimited once a container is unlimited
memoryLimit = -1
break
Expand Down Expand Up @@ -277,7 +278,7 @@ func (p *plugin) SetContainerCPUShares(proto protocol.HooksProtocol) error {
return fmt.Errorf("container protocol is nil for plugin %v", name)
}

if !isPodQoSBEByAttr(containerCtx.Request.PodLabels, containerCtx.Request.PodAnnotations) {
if !isPodQoSMatchedByAttr(containerCtx.Request.PodLabels, containerCtx.Request.PodAnnotations) {
return nil
}

Expand All @@ -289,7 +290,7 @@ func (p *plugin) SetContainerCPUShares(proto protocol.HooksProtocol) error {

milliCPURequest := int64(0)
if containerSpec.Requests != nil {
containerRequest := util.GetBatchMilliCPUFromResourceList(containerSpec.Requests)
containerRequest := util.GetExtendedMilliCPUFromResourceList(containerSpec.Requests)
if containerRequest > 0 {
milliCPURequest = containerRequest
}
Expand All @@ -306,7 +307,7 @@ func (p *plugin) SetContainerCFSQuota(proto protocol.HooksProtocol) error {
return fmt.Errorf("container protocol is nil for plugin %v", name)
}

if !isPodQoSBEByAttr(containerCtx.Request.PodLabels, containerCtx.Request.PodAnnotations) {
if !isPodQoSMatchedByAttr(containerCtx.Request.PodLabels, containerCtx.Request.PodAnnotations) {
return nil
}

Expand All @@ -329,7 +330,7 @@ func (p *plugin) SetContainerCFSQuota(proto protocol.HooksProtocol) error {

milliCPULimit := int64(0)
if containerSpec.Limits != nil {
containerLimit := util.GetBatchMilliCPUFromResourceList(containerSpec.Limits)
containerLimit := util.GetExtendedMilliCPUFromResourceList(containerSpec.Limits)
if containerLimit > 0 {
milliCPULimit = containerLimit
}
Expand All @@ -354,7 +355,7 @@ func (p *plugin) SetContainerMemoryLimit(proto protocol.HooksProtocol) error {
return fmt.Errorf("container protocol is nil for plugin %v", name)
}

if !isPodQoSBEByAttr(containerCtx.Request.PodLabels, containerCtx.Request.PodAnnotations) {
if !isPodQoSMatchedByAttr(containerCtx.Request.PodLabels, containerCtx.Request.PodAnnotations) {
return nil
}

Expand All @@ -366,7 +367,7 @@ func (p *plugin) SetContainerMemoryLimit(proto protocol.HooksProtocol) error {

memoryLimit := int64(0)
if containerSpec.Limits != nil {
containerLimit := util.GetBatchMemoryFromResourceList(containerSpec.Limits)
containerLimit := util.GetExtendedMemoryFromResourceList(containerSpec.Limits)
if containerLimit > 0 {
memoryLimit = containerLimit
}
Expand All @@ -379,6 +380,7 @@ func (p *plugin) SetContainerMemoryLimit(proto protocol.HooksProtocol) error {
return nil
}

func isPodQoSBEByAttr(labels map[string]string, annotations map[string]string) bool {
return apiext.GetQoSClassByAttrs(labels, annotations) == apiext.QoSBE
func isPodQoSMatchedByAttr(labels map[string]string, annotations map[string]string) bool {
podQoS := apiext.GetQoSClassByAttrs(labels, annotations)
return podQoS == apiext.QoSBE || podQoS == apiext.QoSLS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which cgroup will Mid+LS pods use
/besteffort may be not appropriate
more things need to be clarify if using /burstable, such as the fairness of cpushares value between Prodand Mid

Copy link
Contributor Author

@j4ckstraw j4ckstraw Apr 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is just a filter which means it allow LS and BE.
I think mid+LS pods use /besteffort or /burstable should be restricted in validating webhook.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mid+BE in /besteffort seems reasonable, while Mid+LS should better use /burstable(but this needs some hack works since kubelet is not allowed yet)

I think now we can let koordlet supports Mid+BE first.

So here are the things we should fix:

  1. keep podQOSConditions still as apiext.QoSBE
  2. define independent GetContainerMidMilliCPURequest and Memory instead of using GetContainerBatchMilliCPURequest

Moreover, there are several QoS policies for BE, we need to clarify whether they should support Mid+BE and how to support, for example the evict order of pods. it will be better if we have some docs first.

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package batchresource
package extendedresourcee

import (
"encoding/json"
Expand Down Expand Up @@ -808,7 +808,7 @@ func Test_plugin_SetContainerResources(t *testing.T) {
}
}

func Test_isPodQoSBEByAttr(t *testing.T) {
func Test_isPodQoSMatchedByAttr(t *testing.T) {
tests := []struct {
name string
arg map[string]string
Expand All @@ -823,11 +823,11 @@ func Test_isPodQoSBEByAttr(t *testing.T) {
want: true,
},
{
name: "qos is not BE",
name: "qos is LS",
arg: map[string]string{
apiext.LabelPodQoS: string(apiext.QoSLS),
},
want: false,
want: true,
},
{
name: "qos is not BE 1",
Expand All @@ -836,7 +836,7 @@ func Test_isPodQoSBEByAttr(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := isPodQoSBEByAttr(tt.arg, tt.arg1)
got := isPodQoSMatchedByAttr(tt.arg, tt.arg1)
assert.Equal(t, tt.want, got)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package batchresource
package extendedresourcee

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package batchresource
package extendedresourcee

import (
"encoding/json"
Expand Down
35 changes: 30 additions & 5 deletions pkg/koordlet/runtimehooks/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,40 @@ func PodQOSFilter() Filter {
return singletonPodQOSFilter
}

type podPriorityFilter struct{}

const (
PodPriorityFilterName = "podPriority"
)

func (p *podPriorityFilter) Name() string {
return PodPriorityFilterName
}

func (p *podPriorityFilter) Filter(podMeta *statesinformer.PodMeta) string {
priority := apiext.GetPodPriorityClassRaw(podMeta.Pod)

return string(priority)
}

var singletonPodPriorityFilter *podPriorityFilter

// PodPriorityFilter returns a Filter which filters pod priority name
func PodPriorityFilter() *podPriorityFilter {
if singletonPodPriorityFilter == nil {
singletonPodPriorityFilter = &podPriorityFilter{}
}
return singletonPodPriorityFilter
}

type reconcileFunc func(protocol.HooksProtocol) error

// RegisterCgroupReconciler registers a cgroup reconciler according to the cgroup file, reconcile function and filter
// conditions. A cgroup file of one level can have multiple reconcile functions with different filtered conditions.
//
// e.g. pod-level cfs_quota can be registered both by cpuset hook and batchresource hook. While cpuset hook reconciles
// cfs_quota for LSE and LSR pods, batchresource reconciles pods of BE QoS.
// e.g. pod-level cfs_quota can be registered both by cpuset hook and extendedresource hook. While cpuset hook reconciles
// cfs_quota for LSE and LSR pods, extendedresource reconciles pods of BE QoS.
//
// TODO: support priority+qos filter.
func RegisterCgroupReconciler(level ReconcilerLevel, cgroupFile system.Resource, description string,
fn reconcileFunc, filter Filter, conditions ...string) {
if len(conditions) <= 0 { // default condition
Expand All @@ -153,12 +178,12 @@ func RegisterCgroupReconciler(level ReconcilerLevel, cgroupFile system.Resource,
continue
}

// if reconciler exist
if r.filter.Name() != filter.Name() {
klog.Fatalf("%v of level %v is already registered with filter %v by %v, cannot change to %v by %v",
klog.Warningf("%v of level %v is already registered with filter %v by [%v], and now add new filter %v by [%v]",
cgroupFile.ResourceType(), level, r.filter.Name(), r.description, filter.Name(), description)
}

// NOTE: different filter should have different condition, so no conflict here if many filter registered
for _, condition := range conditions {
if _, ok := r.fn[condition]; ok {
klog.Fatalf("%v of level %v is already registered with condition %v by %v, cannot change by %v",
Expand Down