From 08506aebfc7c8e83199cc6943524d145ce009de6 Mon Sep 17 00:00:00 2001 From: Frame Date: Thu, 5 Jan 2023 17:00:18 +0800 Subject: [PATCH] koordlet: add metrics for batch resources (#913) Signed-off-by: saintube --- pkg/koordlet/koordlet.go | 2 +- pkg/koordlet/metrics/common.go | 1 + pkg/koordlet/metrics/metrics.go | 5 +- pkg/koordlet/metrics/metrics_test.go | 110 +++++++++++- pkg/koordlet/metrics/resource_summary.go | 94 ++++++++++ pkg/koordlet/metricsadvisor/collector.go | 20 +-- pkg/koordlet/metricsadvisor/collector_test.go | 14 +- .../performance_collector_linux_test.go | 18 +- pkg/koordlet/resmanager/cpu_suppress.go | 2 +- pkg/koordlet/resmanager/resmanager.go | 2 +- pkg/koordlet/resmanager/resmanager_test.go | 18 +- pkg/koordlet/statesinformer/config_test.go | 19 +- pkg/koordlet/statesinformer/states_node.go | 32 +++- .../statesinformer/states_node_test.go | 58 +++++- pkg/koordlet/statesinformer/states_pods.go | 60 ++++++- .../statesinformer/states_pods_test.go | 165 +++++++++++++++++- pkg/util/resource.go | 4 + pkg/util/resource_test.go | 37 ++++ 18 files changed, 593 insertions(+), 68 deletions(-) create mode 100644 pkg/koordlet/metrics/resource_summary.go diff --git a/pkg/koordlet/koordlet.go b/pkg/koordlet/koordlet.go index 600586e97..2af636c9c 100644 --- a/pkg/koordlet/koordlet.go +++ b/pkg/koordlet/koordlet.go @@ -199,7 +199,7 @@ func (d *daemon) Run(stopCh <-chan struct{}) { go func() { if err := d.runtimeHook.Run(stopCh); err != nil { - klog.Errorf("Unable to run the runtimeHook: ", err) + klog.Error("Unable to run the runtimeHook: ", err) os.Exit(1) } }() diff --git a/pkg/koordlet/metrics/common.go b/pkg/koordlet/metrics/common.go index 6227f3cf1..008b71fd6 100644 --- a/pkg/koordlet/metrics/common.go +++ b/pkg/koordlet/metrics/common.go @@ -26,6 +26,7 @@ var ( Name: "start_time", Help: "the start time of koordlet", }, []string{NodeKey}) + CollectNodeCPUInfoStatus = prometheus.NewCounterVec(prometheus.CounterOpts{ Subsystem: KoordletSubsystem, Name: "collect_node_cpu_info_status", diff --git a/pkg/koordlet/metrics/metrics.go b/pkg/koordlet/metrics/metrics.go index e93b3bf16..7217da4f6 100644 --- a/pkg/koordlet/metrics/metrics.go +++ b/pkg/koordlet/metrics/metrics.go @@ -26,6 +26,7 @@ import ( func init() { prometheus.MustRegister(CommonCollectors...) + prometheus.MustRegister(ResourceSummaryCollectors...) prometheus.MustRegister(CPICollectors...) prometheus.MustRegister(PSICollectors...) prometheus.MustRegister(CPUSuppressCollector...) 
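The MustRegister call above follows the file's existing pattern: ResourceSummaryCollectors (defined in the new pkg/koordlet/metrics/resource_summary.go later in this patch) join the default Prometheus registry, so whatever /metrics handler the koordlet already serves picks the new gauges up without extra wiring. A minimal, self-contained sketch of that pattern; the "koordlet" subsystem string, the port, and the sample label values are assumptions for illustration, not taken from this patch:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// nodeResourceAllocatable mirrors the shape of the gauge this patch adds in resource_summary.go.
var nodeResourceAllocatable = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Subsystem: "koordlet", // assumed value of KoordletSubsystem
	Name:      "node_resource_allocatable",
	Help:      "the node allocatable of resources updated by koordinator",
}, []string{"node", "resource"})

func init() {
	// same call pattern as prometheus.MustRegister(ResourceSummaryCollectors...)
	prometheus.MustRegister(nodeResourceAllocatable)
}

func main() {
	// a recorded sample is exposed as koordlet_node_resource_allocatable{node="...",resource="..."}
	nodeResourceAllocatable.WithLabelValues("demo-node", "kubernetes.io/batch-cpu").Set(50000)
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil) // port chosen only for the sketch
}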
@@ -38,7 +39,7 @@ const ( NodeKey = "node" StatusKey = "status" - StatusSucceed = "succeed" + StatusSucceed = "succeeded" StatusFailed = "failed" EvictionReasonKey = "reason" @@ -50,6 +51,8 @@ const ( PodUID = "pod_uid" PodName = "pod_name" PodNamespace = "pod_namespace" + + ResourceKey = "resource" ) var ( diff --git a/pkg/koordlet/metrics/metrics_test.go b/pkg/koordlet/metrics/metrics_test.go index 74c863613..ff2e4b2bf 100644 --- a/pkg/koordlet/metrics/metrics_test.go +++ b/pkg/koordlet/metrics/metrics_test.go @@ -26,8 +26,10 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiext "github.com/koordinator-sh/koordinator/apis/extension" koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" + "github.com/koordinator-sh/koordinator/pkg/util" ) func TestGenNodeLabels(t *testing.T) { @@ -131,8 +133,8 @@ func TestCommonCollectors(t *testing.T) { RecordCollectNodeCPUInfoStatus(testingErr) RecordCollectNodeCPUInfoStatus(nil) RecordBESuppressCores("cfsQuota", float64(1000)) - RecordBESuppressLSUsedCPU(float64(1000)) - RecordNodeUsedCPU(float64(2000)) + RecordBESuppressLSUsedCPU(1.0) + RecordNodeUsedCPU(2.0) RecordContainerScaledCFSBurstUS(testingPod.Namespace, testingPod.Name, testingContainer.ContainerID, testingContainer.Name, 1000000) RecordContainerScaledCFSQuotaUS(testingPod.Namespace, testingPod.Name, testingContainer.ContainerID, testingContainer.Name, 1000000) RecordPodEviction("evictByCPU") @@ -144,3 +146,107 @@ func TestCommonCollectors(t *testing.T) { RecordPodPSI(testingPod, testingPSI) }) } + +func TestResourceSummaryCollectors(t *testing.T) { + testingNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Labels: map[string]string{}, + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("200Gi"), + apiext.BatchCPU: resource.MustParse("50000"), + apiext.BatchMemory: resource.MustParse("80Gi"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("200Gi"), + apiext.BatchCPU: resource.MustParse("50000"), + apiext.BatchMemory: resource.MustParse("80Gi"), + }, + }, + } + testingPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_pod", + Namespace: "test_pod_namespace", + UID: "test01", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test_container", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "test_container", + ContainerID: "containerd://testxxx", + }, + }, + }, + } + testingBatchPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_batch_pod", + Namespace: "test_batch_pod_namespace", + UID: "batch01", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test_batch_container", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + apiext.BatchCPU: resource.MustParse("1000"), + apiext.BatchMemory: resource.MustParse("2Gi"), + }, + Limits: corev1.ResourceList{ + apiext.BatchCPU: 
resource.MustParse("1000"), + apiext.BatchMemory: resource.MustParse("2Gi"), + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "test_batch_container", + ContainerID: "containerd://batchxxx", + }, + }, + }, + } + + t.Run("test not panic", func(t *testing.T) { + Register(testingNode) + defer Register(nil) + + RecordNodeResourceAllocatable(string(apiext.BatchCPU), float64(util.QuantityPtr(testingNode.Status.Allocatable[apiext.BatchCPU]).Value())) + RecordNodeResourceAllocatable(string(apiext.BatchMemory), float64(util.QuantityPtr(testingNode.Status.Allocatable[apiext.BatchMemory]).Value())) + RecordContainerResourceRequests(string(corev1.ResourceCPU), &testingPod.Status.ContainerStatuses[0], testingPod, float64(testingPod.Spec.Containers[0].Resources.Requests.Cpu().Value())) + RecordContainerResourceRequests(string(corev1.ResourceMemory), &testingPod.Status.ContainerStatuses[0], testingPod, float64(testingPod.Spec.Containers[0].Resources.Requests.Memory().Value())) + RecordContainerResourceRequests(string(apiext.BatchCPU), &testingBatchPod.Status.ContainerStatuses[0], testingBatchPod, float64(util.QuantityPtr(testingBatchPod.Spec.Containers[0].Resources.Requests[apiext.BatchCPU]).Value())) + RecordContainerResourceRequests(string(apiext.BatchMemory), &testingBatchPod.Status.ContainerStatuses[0], testingBatchPod, float64(util.QuantityPtr(testingBatchPod.Spec.Containers[0].Resources.Requests[apiext.BatchMemory]).Value())) + RecordContainerResourceLimits(string(apiext.BatchCPU), &testingBatchPod.Status.ContainerStatuses[0], testingBatchPod, float64(util.QuantityPtr(testingBatchPod.Spec.Containers[0].Resources.Limits[apiext.BatchCPU]).Value())) + RecordContainerResourceLimits(string(apiext.BatchMemory), &testingBatchPod.Status.ContainerStatuses[0], testingBatchPod, float64(util.QuantityPtr(testingBatchPod.Spec.Containers[0].Resources.Limits[apiext.BatchMemory]).Value())) + + ResetContainerResourceRequests() + ResetContainerResourceLimits() + }) +} diff --git a/pkg/koordlet/metrics/resource_summary.go b/pkg/koordlet/metrics/resource_summary.go new file mode 100644 index 000000000..0032d3040 --- /dev/null +++ b/pkg/koordlet/metrics/resource_summary.go @@ -0,0 +1,94 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package metrics + +import ( + corev1 "k8s.io/api/core/v1" + + "github.com/prometheus/client_golang/prometheus" +) + +var ( + NodeResourceAllocatable = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: KoordletSubsystem, + Name: "node_resource_allocatable", + Help: "the node allocatable of resources updated by koordinator", + }, []string{NodeKey, ResourceKey}) + + ContainerResourceRequests = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: KoordletSubsystem, + Name: "container_resource_requests", + Help: "the container requests of resources updated by koordinator", + }, []string{NodeKey, ResourceKey, PodUID, PodName, PodNamespace, ContainerID, ContainerName}) + + ContainerResourceLimits = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: KoordletSubsystem, + Name: "container_resource_limits", + Help: "the container limits of resources updated by koordinator", + }, []string{NodeKey, ResourceKey, PodUID, PodName, PodNamespace, ContainerID, ContainerName}) + + ResourceSummaryCollectors = []prometheus.Collector{ + NodeResourceAllocatable, + ContainerResourceRequests, + ContainerResourceLimits, + } +) + +func RecordNodeResourceAllocatable(resourceName string, value float64) { + labels := genNodeLabels() + if labels == nil { + return + } + labels[ResourceKey] = resourceName + NodeResourceAllocatable.With(labels).Set(value) +} + +func RecordContainerResourceRequests(resourceName string, status *corev1.ContainerStatus, pod *corev1.Pod, value float64) { + labels := genNodeLabels() + if labels == nil { + return + } + labels[ResourceKey] = resourceName + labels[PodUID] = string(pod.UID) + labels[PodName] = pod.Name + labels[PodNamespace] = pod.Namespace + labels[ContainerID] = status.ContainerID + labels[ContainerName] = status.Name + ContainerResourceRequests.With(labels).Set(value) +} + +func ResetContainerResourceRequests() { + ContainerResourceRequests.Reset() +} + +func RecordContainerResourceLimits(resourceName string, status *corev1.ContainerStatus, pod *corev1.Pod, value float64) { + labels := genNodeLabels() + if labels == nil { + return + } + labels[ResourceKey] = resourceName + labels[PodUID] = string(pod.UID) + labels[PodName] = pod.Name + labels[PodNamespace] = pod.Namespace + labels[ContainerID] = status.ContainerID + labels[ContainerName] = status.Name + ContainerResourceLimits.With(labels).Set(value) +} + +func ResetContainerResourceLimits() { + ContainerResourceLimits.Reset() +} diff --git a/pkg/koordlet/metricsadvisor/collector.go b/pkg/koordlet/metricsadvisor/collector.go index 6699a2e0b..eaf0b1914 100644 --- a/pkg/koordlet/metricsadvisor/collector.go +++ b/pkg/koordlet/metricsadvisor/collector.go @@ -125,11 +125,11 @@ func (c *collector) Run(stopCh <-chan struct{}) error { go wait.Until(func() { c.collectGPUUsage() c.collectNodeResUsed() - // add sync metaService cache check before collect pod information + // add sync statesInformer cache check before collect pod information // because collect function will get all pods. if !cache.WaitForCacheSync(stopCh, c.statesInformer.HasSynced) { - klog.Errorf("timed out waiting for meta service caches to sync") - // Koordlet exit because of metaService sync failed. + klog.Errorf("timed out waiting for states informer caches to sync") + // Koordlet exit because of statesInformer sync failed. 
os.Exit(1) return } @@ -142,11 +142,11 @@ func (c *collector) Run(stopCh <-chan struct{}) error { ic := NewPerformanceCollector(c.statesInformer, c.metricCache, c.config.CPICollectorTimeWindowSeconds) util.RunFeature(func() { - // add sync metaService cache check before collect pod information + // add sync statesInformer cache check before collect pod information // because collect function will get all pods. if !cache.WaitForCacheSync(stopCh, c.statesInformer.HasSynced) { - // Koordlet exit because of metaService sync failed. - klog.Fatalf("timed out waiting for meta service caches to sync") + // Koordlet exit because of statesInformer sync failed. + klog.Fatalf("timed out waiting for states informer caches to sync") return } ic.collectContainerCPI() @@ -158,11 +158,11 @@ func (c *collector) Run(stopCh <-chan struct{}) error { klog.Fatalf("collect psi fail, need anolis os") return } - // add sync metaService cache check before collect pod information + // add sync statesInformer cache check before collect pod information // because collect function will get all pods. if !cache.WaitForCacheSync(stopCh, c.statesInformer.HasSynced) { - // Koordlet exit because of metaService sync failed. - klog.Fatalf("timed out waiting for meta service caches to sync") + // Koordlet exit because of statesInformer sync failed. + klog.Fatalf("timed out waiting for states informer caches to sync") return } ic.collectContainerPSI() @@ -217,7 +217,7 @@ func (c *collector) collectNodeResUsed() { // update collect time c.state.RefreshTime(nodeResUsedUpdateTime) - metrics.RecordNodeUsedCPU(cpuUsageValue * 1000) + metrics.RecordNodeUsedCPU(cpuUsageValue) // in cpu cores klog.Infof("collectNodeResUsed finished %+v", nodeMetric) } diff --git a/pkg/koordlet/metricsadvisor/collector_test.go b/pkg/koordlet/metricsadvisor/collector_test.go index 808e4cc05..abceb7977 100644 --- a/pkg/koordlet/metricsadvisor/collector_test.go +++ b/pkg/koordlet/metricsadvisor/collector_test.go @@ -35,9 +35,9 @@ import ( func TestNewCollector(t *testing.T) { type args struct { - cfg *Config - metaService statesinformer.StatesInformer - metricCache metriccache.MetricCache + cfg *Config + statesInformer statesinformer.StatesInformer + metricCache metriccache.MetricCache } tests := []struct { name string @@ -46,15 +46,15 @@ func TestNewCollector(t *testing.T) { { name: "new-collector", args: args{ - cfg: &Config{}, - metaService: nil, - metricCache: nil, + cfg: &Config{}, + statesInformer: nil, + metricCache: nil, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := NewCollector(tt.args.cfg, tt.args.metaService, tt.args.metricCache); got == nil { + if got := NewCollector(tt.args.cfg, tt.args.statesInformer, tt.args.metricCache); got == nil { t.Errorf("NewCollector() = %v", got) } }) diff --git a/pkg/koordlet/metricsadvisor/performance_collector_linux_test.go b/pkg/koordlet/metricsadvisor/performance_collector_linux_test.go index df7c25691..33d741cad 100644 --- a/pkg/koordlet/metricsadvisor/performance_collector_linux_test.go +++ b/pkg/koordlet/metricsadvisor/performance_collector_linux_test.go @@ -37,10 +37,10 @@ import ( func TestNewPerformanceCollector(t *testing.T) { type args struct { - cfg *Config - metaService statesinformer.StatesInformer - metricCache metriccache.MetricCache - timeWindow int + cfg *Config + statesInformer statesinformer.StatesInformer + metricCache metriccache.MetricCache + timeWindow int } tests := []struct { name string @@ -49,16 +49,16 @@ func TestNewPerformanceCollector(t *testing.T) { 
{ name: "new-performance-collector", args: args{ - cfg: &Config{}, - metaService: nil, - metricCache: nil, - timeWindow: 10, + cfg: &Config{}, + statesInformer: nil, + metricCache: nil, + timeWindow: 10, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := NewPerformanceCollector(tt.args.metaService, tt.args.metricCache, tt.args.timeWindow); got == nil { + if got := NewPerformanceCollector(tt.args.statesInformer, tt.args.metricCache, tt.args.timeWindow); got == nil { t.Errorf("NewPerformanceCollector() = %v", got) } }) diff --git a/pkg/koordlet/resmanager/cpu_suppress.go b/pkg/koordlet/resmanager/cpu_suppress.go index ac54f2467..f63317f71 100644 --- a/pkg/koordlet/resmanager/cpu_suppress.go +++ b/pkg/koordlet/resmanager/cpu_suppress.go @@ -146,7 +146,7 @@ func (r *CPUSuppress) calculateBESuppressCPU(node *corev1.Node, nodeMetric *metr node.Status.Allocatable.Cpu().Format) nodeBESuppressCPU.Sub(podLSUsedCPU) nodeBESuppressCPU.Sub(systemUsedCPU) - metrics.RecordBESuppressLSUsedCPU(float64(podLSUsedCPU.Value())) + metrics.RecordBESuppressLSUsedCPU(float64(podLSUsedCPU.MilliValue()) / 1000) klog.Infof("nodeSuppressBE[CPU(Core)]:%v = node.Total:%v * SLOPercent:%v%% - systemUsage:%v - podLSUsed:%v\n", nodeBESuppressCPU.Value(), node.Status.Allocatable.Cpu().Value(), beCPUUsedThreshold, systemUsedCPU.Value(), podLSUsedCPU.Value()) diff --git a/pkg/koordlet/resmanager/resmanager.go b/pkg/koordlet/resmanager/resmanager.go index 567258f41..0cf8b70fd 100644 --- a/pkg/koordlet/resmanager/resmanager.go +++ b/pkg/koordlet/resmanager/resmanager.go @@ -124,7 +124,7 @@ func (r *resmanager) Run(stopCh <-chan struct{}) error { go configextensions.RunQOSGreyCtrlPlugins(r.kubeClient, stopCh) if !cache.WaitForCacheSync(stopCh, r.statesInformer.HasSynced) { - return fmt.Errorf("time out waiting for kubelet meta service caches to sync") + return fmt.Errorf("time out waiting for states informer caches to sync") } cgroupResourceReconcile := NewCgroupResourcesReconcile(r) diff --git a/pkg/koordlet/resmanager/resmanager_test.go b/pkg/koordlet/resmanager/resmanager_test.go index 9e8c448ce..5a34efe0e 100644 --- a/pkg/koordlet/resmanager/resmanager_test.go +++ b/pkg/koordlet/resmanager/resmanager_test.go @@ -54,10 +54,11 @@ func TestNewResManager(t *testing.T) { kubeClient := &kubernetes.Clientset{} crdClient := &clientsetalpha1.Clientset{} nodeName := "test-node" - metaService := mock_statesinformer.NewMockStatesInformer(ctrl) + statesInformer := mock_statesinformer.NewMockStatesInformer(ctrl) metricCache := mock_metriccache.NewMockMetricCache(ctrl) - _ = NewResManager(NewDefaultConfig(), scheme, kubeClient, crdClient, nodeName, metaService, metricCache, int64(metricsadvisor.NewDefaultConfig().CollectResUsedIntervalSeconds)) + r := NewResManager(NewDefaultConfig(), scheme, kubeClient, crdClient, nodeName, statesInformer, metricCache, int64(metricsadvisor.NewDefaultConfig().CollectResUsedIntervalSeconds)) + assert.NotNil(t, r) }) } @@ -151,11 +152,13 @@ func Test_EvictPodsIfNotEvicted(t *testing.T) { client := clientsetfake.NewSimpleClientset() r := &resmanager{eventRecorder: fakeRecorder, kubeClient: client, podsEvicted: expireCache.NewCacheDefault()} stop := make(chan struct{}) - r.podsEvicted.Run(stop) + err := r.podsEvicted.Run(stop) + assert.NoError(t, err) defer func() { stop <- struct{}{} }() // create pod - client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + _, err = client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, 
metav1.CreateOptions{}) + assert.NoError(t, err) // evict success r.evictPodsIfNotEvicted([]*corev1.Pod{pod}, node, "evict pod first", "") @@ -187,16 +190,17 @@ func Test_evictPod(t *testing.T) { fakeRecorder := &FakeRecorder{} client := clientsetfake.NewSimpleClientset() - resmanager := &resmanager{statesInformer: mockStatesInformer, eventRecorder: fakeRecorder, kubeClient: client} + r := &resmanager{statesInformer: mockStatesInformer, eventRecorder: fakeRecorder, kubeClient: client} // create pod - client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + _, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + assert.NoError(t, err) // check pod existPod, err := client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) assert.NotNil(t, existPod, "pod exist in k8s!", err) // evict success - resmanager.evictPod(pod, node, "evict pod first", "") + r.evictPod(pod, node, "evict pod first", "") getEvictObject, err := client.Tracker().Get(podsResource, pod.Namespace, pod.Name) assert.NoError(t, err) assert.NotNil(t, getEvictObject, "evictPod Fail", err) diff --git a/pkg/koordlet/statesinformer/config_test.go b/pkg/koordlet/statesinformer/config_test.go index 6e491013f..988d4155c 100644 --- a/pkg/koordlet/statesinformer/config_test.go +++ b/pkg/koordlet/statesinformer/config_test.go @@ -18,10 +18,10 @@ package statesinformer import ( "flag" - "reflect" "testing" "time" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" ) @@ -47,9 +47,8 @@ func TestNewDefaultConfig(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := NewDefaultConfig(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("NewDefaultConfig() = %v, want %v", got, tt.want) - } + got := NewDefaultConfig() + assert.Equal(t, tt.want, got) }) } } @@ -103,7 +102,6 @@ func TestConfig_InitFlags(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - raw := &Config{ KubeletPreferredAddressType: tt.fields.KubeletPreferredAddressType, KubeletSyncInterval: tt.fields.KubeletSyncInterval, @@ -111,15 +109,14 @@ func TestConfig_InitFlags(t *testing.T) { InsecureKubeletTLS: tt.fields.InsecureKubeletTLS, KubeletReadOnlyPort: tt.fields.KubeletReadOnlyPort, NodeTopologySyncInterval: tt.fields.NodeTopologySyncInterval, - DisableQueryKubeletConfig: true, - EnableNodeMetricReport: false, + DisableQueryKubeletConfig: tt.fields.DisableQueryKubeletConfig, + EnableNodeMetricReport: tt.fields.EnableNodeMetricReport, } c := NewDefaultConfig() c.InitFlags(tt.args.fs) - tt.args.fs.Parse(cmdArgs[1:]) - if !reflect.DeepEqual(raw, c) { - t.Fatalf("InitFlags got: \n%+v, \nwant: \n%+v", c, raw) - } + err := tt.args.fs.Parse(cmdArgs[1:]) + assert.NoError(t, err) + assert.Equal(t, raw, c) }) } } diff --git a/pkg/koordlet/statesinformer/states_node.go b/pkg/koordlet/statesinformer/states_node.go index 0184ae5f4..0eaf55e59 100644 --- a/pkg/koordlet/statesinformer/states_node.go +++ b/pkg/koordlet/statesinformer/states_node.go @@ -30,7 +30,9 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + apiext "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" + "github.com/koordinator-sh/koordinator/pkg/util" ) const ( @@ -153,5 +155,33 @@ func (s *nodeInformer) syncNode(newNode *corev1.Node) { s.node = newNode.DeepCopy() // also register node for metrics - metrics.Register(newNode) + recordNodeResourceMetrics(newNode) +} + +func 
recordNodeResourceMetrics(node *corev1.Node) { + // register node labels + metrics.Register(node) + // record node resource metrics + recordNodeResources(node) + + klog.V(5).Info("record node prometheus metrics successfully") +} + +func recordNodeResources(node *corev1.Node) { + if node == nil || node.Status.Allocatable == nil { + klog.V(4).Infof("failed to record node resources metrics, node is invalid: %v", node) + return + } + + // record node allocatable of BatchCPU & BatchMemory + if q, ok := node.Status.Allocatable[apiext.BatchCPU]; ok { + metrics.RecordNodeResourceAllocatable(string(apiext.BatchCPU), float64(util.QuantityPtr(q).Value())) + } else { + metrics.RecordNodeResourceAllocatable(string(apiext.BatchCPU), 0) + } + if q, ok := node.Status.Allocatable[apiext.BatchMemory]; ok { + metrics.RecordNodeResourceAllocatable(string(apiext.BatchMemory), float64(util.QuantityPtr(q).Value())) + } else { + metrics.RecordNodeResourceAllocatable(string(apiext.BatchMemory), 0) + } } diff --git a/pkg/koordlet/statesinformer/states_node_test.go b/pkg/koordlet/statesinformer/states_node_test.go index ac95d853a..f558c20e3 100644 --- a/pkg/koordlet/statesinformer/states_node_test.go +++ b/pkg/koordlet/statesinformer/states_node_test.go @@ -20,22 +20,62 @@ import ( "testing" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiext "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" ) func Test_statesInformer_syncNode(t *testing.T) { - testingNode := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Labels: map[string]string{}, + tests := []struct { + name string + arg *corev1.Node + }{ + { + name: "node is nil", + arg: nil, + }, + { + name: "node is incomplete", + arg: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Labels: map[string]string{}, + }, + }, + }, + { + name: "node is valid", + arg: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Labels: map[string]string{}, + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("200Gi"), + apiext.BatchCPU: resource.MustParse("50000"), + apiext.BatchMemory: resource.MustParse("80Gi"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("200Gi"), + apiext.BatchCPU: resource.MustParse("50000"), + apiext.BatchMemory: resource.MustParse("80Gi"), + }, + }, + }, }, } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &nodeInformer{} + metrics.Register(tt.arg) + defer metrics.Register(nil) - m := &nodeInformer{} - metrics.Register(testingNode) - defer metrics.Register(nil) - - m.syncNode(testingNode) + m.syncNode(tt.arg) + }) + } } diff --git a/pkg/koordlet/statesinformer/states_pods.go b/pkg/koordlet/statesinformer/states_pods.go index 63403353e..be978f2da 100644 --- a/pkg/koordlet/statesinformer/states_pods.go +++ b/pkg/koordlet/statesinformer/states_pods.go @@ -28,6 +28,8 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client/config" + apiext "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" "github.com/koordinator-sh/koordinator/pkg/koordlet/pleg" koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" @@ 
-117,6 +119,7 @@ func (s *podsInformer) Start(stopCh <-chan struct{}) { klog.Fatalf("Unable to run the pleg: ", err) } }() + klog.V(2).Infof("pod informer started") <-stopCh } @@ -146,16 +149,21 @@ func (s *podsInformer) syncPods() error { return err } newPodMap := make(map[string]*PodMeta, len(podList.Items)) + // reset pod container metrics + resetPodMetrics() for _, pod := range podList.Items { - newPodMap[string(pod.UID)] = &PodMeta{ + podMeta := &PodMeta{ Pod: pod.DeepCopy(), CgroupDir: genPodCgroupParentDir(&pod), } + newPodMap[string(pod.UID)] = podMeta + // record pod container metrics + recordPodResourceMetrics(podMeta) } s.podMap = newPodMap s.podHasSynced.Store(true) s.podUpdatedTime = time.Now() - klog.Infof("get pods success, len %d", len(s.podMap)) + klog.Infof("get pods success, len %d, time %s", len(s.podMap), s.podUpdatedTime.String()) s.callbackRunner.SendCallback(RegisterTypeAllPods) return nil } @@ -233,3 +241,51 @@ func genPodCgroupParentDir(pod *corev1.Pod) string { // e.g. kubepods-burstable.slice/kubepods-burstable-pod9dba1d9e_67ba_4db6_8a73_fb3ea297c363.slice/ return koordletutil.GetPodKubeRelativePath(pod) } + +func resetPodMetrics() { + metrics.ResetContainerResourceRequests() + metrics.ResetContainerResourceLimits() +} + +func recordPodResourceMetrics(podMeta *PodMeta) { + if podMeta == nil || podMeta.Pod == nil { + klog.V(5).Infof("failed to record pod resources metric, pod is invalid: %v", podMeta) + return + } + pod := podMeta.Pod + + // record (regular) container metrics + containerStatusMap := map[string]*corev1.ContainerStatus{} + for i := range pod.Status.ContainerStatuses { + containerStatus := &pod.Status.ContainerStatuses[i] + containerStatusMap[containerStatus.Name] = containerStatus + } + for i := range pod.Spec.Containers { + c := &pod.Spec.Containers[i] + containerStatus, ok := containerStatusMap[c.Name] + if !ok { + klog.V(6).Infof("skip record container resources metric, container %s/%s/%s status not exist", + pod.Namespace, pod.Name, c.Name) + continue + } + recordContainerResourceMetrics(c, containerStatus, pod) + } + + klog.V(6).Infof("record pod prometheus metrics successfully, pod %s", pod.Namespace, pod.Name) +} + +func recordContainerResourceMetrics(container *corev1.Container, containerStatus *corev1.ContainerStatus, pod *corev1.Pod) { + // record pod requests/limits of BatchCPU & BatchMemory + if q, ok := container.Resources.Requests[apiext.BatchCPU]; ok { + metrics.RecordContainerResourceRequests(string(apiext.BatchCPU), containerStatus, pod, float64(util.QuantityPtr(q).Value())) + } + if q, ok := container.Resources.Requests[apiext.BatchMemory]; ok { + metrics.RecordContainerResourceRequests(string(apiext.BatchMemory), containerStatus, pod, float64(util.QuantityPtr(q).Value())) + } + if q, ok := container.Resources.Limits[apiext.BatchCPU]; ok { + metrics.RecordContainerResourceLimits(string(apiext.BatchCPU), containerStatus, pod, float64(util.QuantityPtr(q).Value())) + } + if q, ok := container.Resources.Limits[apiext.BatchMemory]; ok { + metrics.RecordContainerResourceLimits(string(apiext.BatchMemory), containerStatus, pod, float64(util.QuantityPtr(q).Value())) + } +} diff --git a/pkg/koordlet/statesinformer/states_pods_test.go b/pkg/koordlet/statesinformer/states_pods_test.go index b8a32de79..f51efead2 100644 --- a/pkg/koordlet/statesinformer/states_pods_test.go +++ b/pkg/koordlet/statesinformer/states_pods_test.go @@ -25,13 +25,17 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.uber.org/atomic" corev1 
"k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" + apiext "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" ) @@ -138,17 +142,16 @@ func Test_statesInformer_syncPods(t *testing.T) { callbackRunner: NewCallbackRunner(), } - m.syncPods() + err := m.syncPods() + assert.NoError(t, err) if len(m.GetAllPods()) != 1 { t.Fatal("failed to update pods") } m.kubelet = &testErrorKubeletStub{} - err := m.syncPods() - if err == nil { - t.Fatalf("need not nil error, but get error %+v", err) - } + err = m.syncPods() + assert.Error(t, err) } func Test_newKubeletStub(t *testing.T) { @@ -250,7 +253,8 @@ func Test_newKubeletStub(t *testing.T) { func setConfigs(t *testing.T, dir string) { // Set KUBECONFIG env value kubeconfigEnvPath := filepath.Join(dir, "kubeconfig-text-context") - os.WriteFile(kubeconfigEnvPath, []byte(genKubeconfig("from-env")), 0644) + err := os.WriteFile(kubeconfigEnvPath, []byte(genKubeconfig("from-env")), 0644) + assert.NoError(t, err) t.Setenv(clientcmd.RecommendedConfigPathEnvVar, kubeconfigEnvPath) } @@ -295,3 +299,152 @@ func Test_statesInformer_syncKubeletLoop(t *testing.T) { time.Sleep(5 * time.Second) close(stopCh) } + +func Test_resetPodMetrics(t *testing.T) { + testingNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Labels: map[string]string{}, + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("200Gi"), + apiext.BatchCPU: resource.MustParse("50000"), + apiext.BatchMemory: resource.MustParse("80Gi"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("200Gi"), + apiext.BatchCPU: resource.MustParse("50000"), + apiext.BatchMemory: resource.MustParse("80Gi"), + }, + }, + } + assert.NotPanics(t, func() { + metrics.Register(testingNode) + defer metrics.Register(nil) + + resetPodMetrics() + }) +} + +func Test_recordPodResourceMetrics(t *testing.T) { + testingNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Labels: map[string]string{}, + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("200Gi"), + apiext.BatchCPU: resource.MustParse("50000"), + apiext.BatchMemory: resource.MustParse("80Gi"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("200Gi"), + apiext.BatchCPU: resource.MustParse("50000"), + apiext.BatchMemory: resource.MustParse("80Gi"), + }, + }, + } + testingPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_pod", + Namespace: "test_pod_namespace", + UID: "test01", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test_container", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ 
+ ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "test_container", + ContainerID: "containerd://testxxx", + }, + }, + }, + } + testingBatchPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_batch_pod", + Namespace: "test_batch_pod_namespace", + UID: "batch01", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test_batch_container", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + apiext.BatchCPU: resource.MustParse("1000"), + apiext.BatchMemory: resource.MustParse("2Gi"), + }, + Limits: corev1.ResourceList{ + apiext.BatchCPU: resource.MustParse("1000"), + apiext.BatchMemory: resource.MustParse("2Gi"), + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "test_batch_container", + ContainerID: "containerd://batchxxx", + }, + }, + }, + } + tests := []struct { + name string + arg *PodMeta + }{ + { + name: "pod meta is invalid", + arg: nil, + }, + { + name: "pod meta is invalid 1", + arg: &PodMeta{}, + }, + { + name: "record a normally pod", + arg: &PodMeta{ + Pod: testingPod, + }, + }, + { + name: "record a batch pod", + arg: &PodMeta{ + Pod: testingBatchPod, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics.Register(testingNode) + defer metrics.Register(nil) + + recordPodResourceMetrics(tt.arg) + }) + } +} diff --git a/pkg/util/resource.go b/pkg/util/resource.go index c9b4dc76c..ea2109780 100644 --- a/pkg/util/resource.go +++ b/pkg/util/resource.go @@ -68,3 +68,7 @@ func IsResourceDiff(old, new corev1.ResourceList, resourceName corev1.ResourceNa return newQuant >= oldQuant*(1+diffThreshold) || newQuant <= oldQuant*(1-diffThreshold) } + +func QuantityPtr(q resource.Quantity) *resource.Quantity { + return &q +} diff --git a/pkg/util/resource_test.go b/pkg/util/resource_test.go index 7323916dd..c5e0473e3 100644 --- a/pkg/util/resource_test.go +++ b/pkg/util/resource_test.go @@ -19,6 +19,7 @@ package util import ( "testing" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" ) @@ -92,3 +93,39 @@ func TestIsResourceDiff(t *testing.T) { }) } } + +func TestQuantityPtr(t *testing.T) { + testQuantity := resource.MustParse("1000") + testQuantityPtr := &testQuantity + testQuantity1 := resource.MustParse("20Gi") + testQuantityPtr1 := &testQuantity1 + testQuantityPtr2 := resource.NewQuantity(1000, resource.DecimalSI) + testQuantity2 := *testQuantityPtr2 + tests := []struct { + name string + arg resource.Quantity + want *resource.Quantity + }{ + { + name: "quantity 0", + arg: testQuantity, + want: testQuantityPtr, + }, + { + name: "quantity 1", + arg: testQuantity1, + want: testQuantityPtr1, + }, + { + name: "quantity 2", + arg: testQuantity2, + want: testQuantityPtr2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := QuantityPtr(tt.arg) + assert.Equal(t, tt.want, got) + }) + } +}
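The metric helpers added in resource_summary.go are exercised from two places in this patch: the node informer records the batch allocatable in recordNodeResources, and the pod informer resets and re-records the per-container request/limit gauges on every syncPods pass. A condensed usage sketch of that flow; the node, pod, and quantity values here are illustrative fixtures, not taken from the patch:

package main

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	apiext "github.com/koordinator-sh/koordinator/apis/extension"
	"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
)

func main() {
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-node", Labels: map[string]string{}},
		Status: corev1.NodeStatus{
			Allocatable: corev1.ResourceList{
				apiext.BatchCPU:    resource.MustParse("50000"),
				apiext.BatchMemory: resource.MustParse("80Gi"),
			},
		},
	}
	// Register stores the node so genNodeLabels can build the common labels;
	// the record helpers return early when those labels are unavailable.
	metrics.Register(node)

	// node-level gauge, one sample per extended resource
	batchCPU := node.Status.Allocatable[apiext.BatchCPU]
	metrics.RecordNodeResourceAllocatable(string(apiext.BatchCPU), float64(batchCPU.Value()))

	// container-level gauges are reset before each re-record, as syncPods does
	metrics.ResetContainerResourceRequests()
	metrics.ResetContainerResourceLimits()

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-batch-pod", Namespace: "default", UID: "demo01"},
		Status: corev1.PodStatus{
			ContainerStatuses: []corev1.ContainerStatus{
				{Name: "demo-container", ContainerID: "containerd://demo"},
			},
		},
	}
	request := resource.MustParse("1000") // same unit as the batch-cpu fixtures used in the tests
	metrics.RecordContainerResourceRequests(string(apiext.BatchCPU), &pod.Status.ContainerStatuses[0], pod, float64(request.Value()))
	metrics.RecordContainerResourceLimits(string(apiext.BatchCPU), &pod.Status.ContainerStatuses[0], pod, float64(request.Value()))
}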