Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QoS Manager Framework #590

Merged
merged 6 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/koordlet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ func main() {
os.Exit(1)
}

// Init config from ConfigMap.
if err = cfg.InitFromConfigMap(); err != nil {
klog.Error("Unable to init config from ConfigMap: ", err)
os.Exit(1)
}

d, err := agent.NewDaemon(cfg)
if err != nil {
klog.Error("Unable to setup koordlet daemon: ", err)
Expand Down
22 changes: 22 additions & 0 deletions pkg/features/koordlet_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ limitations under the License.
package features

import (
"fmt"

"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/component-base/featuregate"

slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
)

const (
Expand Down Expand Up @@ -75,3 +79,21 @@ var (
Accelerators: {Default: false, PreRelease: featuregate.Alpha},
}
)

// IsFeatureDisabled reports whether the given feature is switched off by the
// nodeSLO configuration.
//
// For BECPUSuppress, BEMemoryEvict and BECPUEvict the answer is the negation of
// nodeSLO.Spec.ResourceUsedThresholdWithBE.Enable. In every failure case the
// function returns true together with an error: when nodeSLO is nil, when its
// spec equals the zero value, when the threshold config or its Enable flag is
// missing, or when the feature is not one of the supported ones.
func IsFeatureDisabled(nodeSLO *slov1alpha1.NodeSLO, feature featuregate.Feature) (bool, error) {
	// Both "invalid nodeSLO" paths report the same error, so build it in one place.
	invalidNodeSLO := func() (bool, error) {
		return true, fmt.Errorf("cannot parse feature config for invalid nodeSLO %v", nodeSLO)
	}

	if nodeSLO == nil || nodeSLO.Spec == (slov1alpha1.NodeSLOSpec{}) {
		return invalidNodeSLO()
	}

	switch feature {
	case BECPUSuppress, BEMemoryEvict, BECPUEvict:
		threshold := nodeSLO.Spec.ResourceUsedThresholdWithBE
		if threshold == nil || threshold.Enable == nil {
			return invalidNodeSLO()
		}
		enabled := *threshold.Enable
		return !enabled, nil
	}

	return true, fmt.Errorf("cannot parse feature config for unsupported feature %s", feature)
}
34 changes: 34 additions & 0 deletions pkg/koordlet/common/reason/reason.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reason

// Reason identifiers for update operations.
// NOTE(review): presumably used as event/audit reasons — confirm against callers.
const (
	UpdateCPU             = "UpdateCPU"
	UpdateMemory          = "UpdateMemory"
	UpdateCgroups         = "UpdateCgroups" // cgroup updates other than the options already listed above
	UpdateSystemConfig    = "UpdateSystemConfig"
	UpdateResctrlSchemata = "UpdateResctrlSchemata" // resctrl L3 CAT schemata update
	UpdateResctrlTasks    = "UpdateResctrlTasks"    // resctrl tasks update
)

// Reason identifiers for pod evictions.
const (
	EvictPodByNodeMemoryUsage   = "EvictPodByNodeMemoryUsage"
	EvictPodByBECPUSatisfaction = "EvictPodByBECPUSatisfaction"
)

// AdjustBEByNodeCPUUsage is the reason identifier for a BE adjustment by node CPU usage.
const AdjustBEByNodeCPUUsage = "AdjustBEByNodeCPUUsage"

// Eviction outcome identifiers. Unlike the identifiers above, their values
// start with a lower-case letter.
const (
	EvictPodSuccess = "evictPodSuccess"
	EvictPodFail    = "evictPodFail"
)
79 changes: 79 additions & 0 deletions pkg/koordlet/common/testutil/k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package testutil

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
)

// FakeRecorder is a test double exposing Event, Eventf and AnnotatedEventf.
// It remembers the reason of the most recent event and prints every event to
// standard output instead of sending it anywhere.
type FakeRecorder struct {
	// eventReason holds the reason passed to the latest Event/Eventf call.
	eventReason string
}

// Event stores reason as the latest event reason and prints the event.
// The object argument is ignored.
func (f *FakeRecorder) Event(object runtime.Object, eventType, reason, message string) {
	f.eventReason = reason
	fmt.Printf("send event:eventType:%s,reason:%s,message:%s", eventType, reason, message)
}

// Eventf renders messageFmt with args and reports the resulting message
// through Event, so the printed message contains the formatted text rather
// than the raw format template.
func (f *FakeRecorder) Eventf(object runtime.Object, eventType, reason, messageFmt string, args ...interface{}) {
	f.Event(object, eventType, reason, fmt.Sprintf(messageFmt, args...))
}

// AnnotatedEventf behaves like Eventf; the annotations are ignored.
func (f *FakeRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventType, reason, messageFmt string, args ...interface{}) {
	f.Eventf(object, eventType, reason, messageFmt, args...)
}

// MockTestNode builds a Node named "test-node" in namespace "default" whose
// Allocatable and Capacity both report the given cpu and memory quantities.
// The quantity strings are parsed with resource.MustParse.
func MockTestNode(cpu, memory string) *corev1.Node {
	// Called once per status field, so Allocatable and Capacity each get
	// their own freshly parsed resource list.
	newResourceList := func() corev1.ResourceList {
		return corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse(cpu),
			corev1.ResourceMemory: resource.MustParse(memory),
		}
	}

	node := &corev1.Node{}
	node.Name = "test-node"
	node.Namespace = "default"
	node.Status.Allocatable = newResourceList()
	node.Status.Capacity = newResourceList()
	return node
}

// MockTestPod builds a Pod of kind "Pod" whose name and UID are both derived
// from name and whose only label is apiext.LabelPodQoS set to qosClass.
func MockTestPod(qosClass apiext.QoSClass, name string) *corev1.Pod {
	pod := &corev1.Pod{}
	pod.Kind = "Pod"
	pod.Name = name
	pod.UID = types.UID(name)
	pod.Labels = map[string]string{apiext.LabelPodQoS: string(qosClass)}
	return pod
}
22 changes: 22 additions & 0 deletions pkg/koordlet/common/types/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package types

const (
	// DefaultCFSPeriod is the default CFS period value.
	// NOTE(review): presumably expressed in microseconds — confirm against callers.
	DefaultCFSPeriod = 100000

	// DefaultMemUnlimit equals 2^63 - 4096 (9223372036854771712).
	// NOTE(review): presumably the value denoting "no memory limit" — confirm against callers.
	DefaultMemUnlimit = 1<<63 - 4096
)
54 changes: 54 additions & 0 deletions pkg/koordlet/common/utils/containers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/util"
"github.com/koordinator-sh/koordinator/pkg/util/runtime"
)

// KillContainers stops every running container declared in the pod spec.
//
// Containers are handled independently: a container whose status cannot be
// found, that is not running, or whose runtime handler is unavailable is
// skipped (with a log entry where applicable) and the remaining containers are
// still processed. message is used as a prefix for the log entries. A nil pod
// is a no-op.
func KillContainers(pod *corev1.Pod, message string) {
	if pod == nil {
		return
	}

	for i := range pod.Spec.Containers {
		container := &pod.Spec.Containers[i]

		containerID, containerStatus, err := util.FindContainerIdAndStatusByName(&pod.Status, container.Name)
		if err != nil {
			klog.Errorf("failed to find container id and status, error: %v", err)
			// One container without a status must not prevent the others from being stopped.
			continue
		}

		// Only running containers need to be stopped.
		if containerStatus == nil || containerStatus.State.Running == nil {
			continue
		}

		if containerID == "" {
			klog.Warningf("%s, get container ID failed, pod %s/%s containerName %s status: %v", message, pod.Namespace, pod.Name, container.Name, pod.Status.ContainerStatuses)
			continue
		}

		// Only the runtime type is needed here; the other return values are ignored.
		// NOTE(review): an unparsable ID is expected to surface as a GetRuntimeHandler failure below — confirm.
		runtimeType, _, _ := util.ParseContainerId(containerStatus.ContainerID)
		runtimeHandler, err := runtime.GetRuntimeHandler(runtimeType)
		if err != nil || runtimeHandler == nil {
			klog.Errorf("%s, kill container(%s) error! GetRuntimeHandler fail! error: %v", message, containerStatus.ContainerID, err)
			continue
		}

		// NOTE(review): the second argument is presumably a stop timeout — confirm against the handler API.
		if err := runtimeHandler.StopContainer(containerID, 0); err != nil {
			klog.Errorf("%s, stop container error! error: %v", message, err)
		}
	}
}
36 changes: 36 additions & 0 deletions pkg/koordlet/common/utils/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
corev1 "k8s.io/api/core/v1"

"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/util"
)

// GetPodMetas converts pods into a slice of statesinformer.PodMeta of the same
// length and order. Each entry carries the pod's cgroup directory as returned
// by util.GetPodKubeRelativePath and a deep copy of the pod.
func GetPodMetas(pods []*corev1.Pod) []*statesinformer.PodMeta {
	metas := make([]*statesinformer.PodMeta, 0, len(pods))

	for _, p := range pods {
		metas = append(metas, &statesinformer.PodMeta{
			CgroupDir: util.GetPodKubeRelativePath(p),
			Pod:       p.DeepCopy(),
		})
	}

	return metas
}
47 changes: 47 additions & 0 deletions pkg/koordlet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ limitations under the License.
package config

import (
"context"
"encoding/json"
"errors"
"flag"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
cliflag "k8s.io/component-base/cli/flag"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand All @@ -28,44 +33,61 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/audit"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metricsadvisor"
qosmanagerconfig "github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/config"
"github.com/koordinator-sh/koordinator/pkg/koordlet/reporter"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resmanager"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/util/system"
)

// Default location of the koordlet ConfigMap.
const (
	DefaultKoordletConfigMapNamespace = "koordinator-system"
	DefaultKoordletConfigMapName      = "koordlet-config"
)

// CMKeyQoSPluginExtraConfigs is the ConfigMap data key under which the extra
// configs for the QoS Manager are stored.
const CMKeyQoSPluginExtraConfigs = "qos-plugin-extra-configs"

// Configuration aggregates the settings of the koordlet: where its ConfigMap
// lives, the Kubernetes REST config, one config object per component, and the
// feature gates.
type Configuration struct {
	// ConfigMapName and ConfigMapNamesapce locate the koordlet ConfigMap.
	// NOTE: "Namesapce" is a misspelling of "Namespace"; the exported field
	// name is kept as is so existing users of the struct keep compiling.
	ConfigMapName      string
	ConfigMapNamesapce string

	// KubeRestConf is the REST config used to build Kubernetes clients.
	KubeRestConf *rest.Config

	// Per-component configurations.
	StatesInformerConf *statesinformer.Config
	ReporterConf       *reporter.Config
	CollectorConf      *metricsadvisor.Config
	MetricCacheConf    *metriccache.Config
	ResManagerConf     *resmanager.Config
	QosManagerConf     *qosmanagerconfig.Config
	RuntimeHookConf    *runtimehooks.Config
	AuditConf          *audit.Config

	// FeatureGates maps a feature gate name to its enabled state.
	FeatureGates map[string]bool
}

// NewConfiguration returns a Configuration pointing at the default koordlet
// ConfigMap, with every component config set to that component's default.
// KubeRestConf and FeatureGates are left at their zero values.
func NewConfiguration() *Configuration {
	cfg := new(Configuration)

	cfg.ConfigMapName = DefaultKoordletConfigMapName
	cfg.ConfigMapNamesapce = DefaultKoordletConfigMapNamespace

	cfg.StatesInformerConf = statesinformer.NewDefaultConfig()
	cfg.ReporterConf = reporter.NewDefaultConfig()
	cfg.CollectorConf = metricsadvisor.NewDefaultConfig()
	cfg.MetricCacheConf = metriccache.NewDefaultConfig()
	cfg.ResManagerConf = resmanager.NewDefaultConfig()
	cfg.QosManagerConf = qosmanagerconfig.NewDefaultConfig()
	cfg.RuntimeHookConf = runtimehooks.NewDefaultConfig()
	cfg.AuditConf = audit.NewDefaultConfig()

	return cfg
}

func (c *Configuration) InitFlags(fs *flag.FlagSet) {
fs.StringVar(&c.ConfigMapName, "configmap-name", DefaultKoordletConfigMapName, "determines the name the koordlet configmap uses.")
fs.StringVar(&c.ConfigMapNamesapce, "configmap-namespace", DefaultKoordletConfigMapNamespace, "determines the namespace of configmap uses.")
system.Conf.InitFlags(fs)
c.StatesInformerConf.InitFlags(fs)
c.ReporterConf.InitFlags(fs)
c.CollectorConf.InitFlags(fs)
c.MetricCacheConf.InitFlags(fs)
c.ResManagerConf.InitFlags(fs)
c.QosManagerConf.InitFlags(fs)
c.RuntimeHookConf.InitFlags(fs)
c.AuditConf.InitFlags(fs)
fs.Var(cliflag.NewMapStringBool(&c.FeatureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
Expand All @@ -81,3 +103,28 @@ func (c *Configuration) InitClient() error {
c.KubeRestConf = cfg
return nil
}

// InitFromConfigMap loads settings from the koordlet ConfigMap identified by
// c.ConfigMapNamesapce and c.ConfigMapName.
//
// When the ConfigMap contains the CMKeyQoSPluginExtraConfigs key, its value is
// decoded as a JSON object of string keys to string values and stored in
// c.QosManagerConf.PluginExtraConfigs. When the key is absent, nothing is
// changed.
//
// An error is returned when KubeRestConf is nil, when the client cannot be
// created, when the ConfigMap cannot be fetched, when the key is present but
// QosManagerConf is nil, or when the value cannot be decoded.
func (c *Configuration) InitFromConfigMap() error {
	if c.KubeRestConf == nil {
		return errors.New("KubeRestConf is nil")
	}
	cli, err := kubernetes.NewForConfig(c.KubeRestConf)
	if err != nil {
		return err
	}
	cm, err := cli.CoreV1().ConfigMaps(c.ConfigMapNamesapce).Get(context.TODO(), c.ConfigMapName, metav1.GetOptions{})
	if err != nil {
		return err
	}

	// Setup extra configs for QoS Manager.
	qosPluginExtraConfigRaw, found := cm.Data[CMKeyQoSPluginExtraConfigs]
	if !found {
		return nil
	}
	// Report a missing QoS Manager config as an error, like KubeRestConf
	// above, instead of panicking on the assignment below.
	if c.QosManagerConf == nil {
		return errors.New("QosManagerConf is nil")
	}
	var extraConfigs map[string]string
	if err = json.Unmarshal([]byte(qosPluginExtraConfigRaw), &extraConfigs); err != nil {
		return err
	}
	c.QosManagerConf.PluginExtraConfigs = extraConfigs

	return nil
}
Loading