Skip to content

Commit

Permalink
koordlet: add metrics for batch resources (koordinator-sh#913)
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube authored and lucming committed Feb 8, 2023
1 parent 731a99d commit 08506ae
Show file tree
Hide file tree
Showing 18 changed files with 593 additions and 68 deletions.
2 changes: 1 addition & 1 deletion pkg/koordlet/koordlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (d *daemon) Run(stopCh <-chan struct{}) {

go func() {
if err := d.runtimeHook.Run(stopCh); err != nil {
klog.Errorf("Unable to run the runtimeHook: ", err)
klog.Error("Unable to run the runtimeHook: ", err)
os.Exit(1)
}
}()
Expand Down
1 change: 1 addition & 0 deletions pkg/koordlet/metrics/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
Name: "start_time",
Help: "the start time of koordlet",
}, []string{NodeKey})

CollectNodeCPUInfoStatus = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: KoordletSubsystem,
Name: "collect_node_cpu_info_status",
Expand Down
5 changes: 4 additions & 1 deletion pkg/koordlet/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

func init() {
prometheus.MustRegister(CommonCollectors...)
prometheus.MustRegister(ResourceSummaryCollectors...)
prometheus.MustRegister(CPICollectors...)
prometheus.MustRegister(PSICollectors...)
prometheus.MustRegister(CPUSuppressCollector...)
Expand All @@ -38,7 +39,7 @@ const (
NodeKey = "node"

StatusKey = "status"
StatusSucceed = "succeed"
StatusSucceed = "succeeded"
StatusFailed = "failed"

EvictionReasonKey = "reason"
Expand All @@ -50,6 +51,8 @@ const (
PodUID = "pod_uid"
PodName = "pod_name"
PodNamespace = "pod_namespace"

ResourceKey = "resource"
)

var (
Expand Down
110 changes: 108 additions & 2 deletions pkg/koordlet/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
"github.com/koordinator-sh/koordinator/pkg/util"
)

func TestGenNodeLabels(t *testing.T) {
Expand Down Expand Up @@ -131,8 +133,8 @@ func TestCommonCollectors(t *testing.T) {
RecordCollectNodeCPUInfoStatus(testingErr)
RecordCollectNodeCPUInfoStatus(nil)
RecordBESuppressCores("cfsQuota", float64(1000))
RecordBESuppressLSUsedCPU(float64(1000))
RecordNodeUsedCPU(float64(2000))
RecordBESuppressLSUsedCPU(1.0)
RecordNodeUsedCPU(2.0)
RecordContainerScaledCFSBurstUS(testingPod.Namespace, testingPod.Name, testingContainer.ContainerID, testingContainer.Name, 1000000)
RecordContainerScaledCFSQuotaUS(testingPod.Namespace, testingPod.Name, testingContainer.ContainerID, testingContainer.Name, 1000000)
RecordPodEviction("evictByCPU")
Expand All @@ -144,3 +146,107 @@ func TestCommonCollectors(t *testing.T) {
RecordPodPSI(testingPod, testingPSI)
})
}

func TestResourceSummaryCollectors(t *testing.T) {
testingNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
Labels: map[string]string{},
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100"),
corev1.ResourceMemory: resource.MustParse("200Gi"),
apiext.BatchCPU: resource.MustParse("50000"),
apiext.BatchMemory: resource.MustParse("80Gi"),
},
Capacity: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100"),
corev1.ResourceMemory: resource.MustParse("200Gi"),
apiext.BatchCPU: resource.MustParse("50000"),
apiext.BatchMemory: resource.MustParse("80Gi"),
},
},
}
testingPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test_pod",
Namespace: "test_pod_namespace",
UID: "test01",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test_container",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("4Gi"),
},
},
},
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "test_container",
ContainerID: "containerd://testxxx",
},
},
},
}
testingBatchPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test_batch_pod",
Namespace: "test_batch_pod_namespace",
UID: "batch01",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test_batch_container",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
apiext.BatchCPU: resource.MustParse("1000"),
apiext.BatchMemory: resource.MustParse("2Gi"),
},
Limits: corev1.ResourceList{
apiext.BatchCPU: resource.MustParse("1000"),
apiext.BatchMemory: resource.MustParse("2Gi"),
},
},
},
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "test_batch_container",
ContainerID: "containerd://batchxxx",
},
},
},
}

t.Run("test not panic", func(t *testing.T) {
Register(testingNode)
defer Register(nil)

RecordNodeResourceAllocatable(string(apiext.BatchCPU), float64(util.QuantityPtr(testingNode.Status.Allocatable[apiext.BatchCPU]).Value()))
RecordNodeResourceAllocatable(string(apiext.BatchMemory), float64(util.QuantityPtr(testingNode.Status.Allocatable[apiext.BatchMemory]).Value()))
RecordContainerResourceRequests(string(corev1.ResourceCPU), &testingPod.Status.ContainerStatuses[0], testingPod, float64(testingPod.Spec.Containers[0].Resources.Requests.Cpu().Value()))
RecordContainerResourceRequests(string(corev1.ResourceMemory), &testingPod.Status.ContainerStatuses[0], testingPod, float64(testingPod.Spec.Containers[0].Resources.Requests.Memory().Value()))
RecordContainerResourceRequests(string(apiext.BatchCPU), &testingBatchPod.Status.ContainerStatuses[0], testingBatchPod, float64(util.QuantityPtr(testingBatchPod.Spec.Containers[0].Resources.Requests[apiext.BatchCPU]).Value()))
RecordContainerResourceRequests(string(apiext.BatchMemory), &testingBatchPod.Status.ContainerStatuses[0], testingBatchPod, float64(util.QuantityPtr(testingBatchPod.Spec.Containers[0].Resources.Requests[apiext.BatchMemory]).Value()))
RecordContainerResourceLimits(string(apiext.BatchCPU), &testingBatchPod.Status.ContainerStatuses[0], testingBatchPod, float64(util.QuantityPtr(testingBatchPod.Spec.Containers[0].Resources.Limits[apiext.BatchCPU]).Value()))
RecordContainerResourceLimits(string(apiext.BatchMemory), &testingBatchPod.Status.ContainerStatuses[0], testingBatchPod, float64(util.QuantityPtr(testingBatchPod.Spec.Containers[0].Resources.Limits[apiext.BatchMemory]).Value()))

ResetContainerResourceRequests()
ResetContainerResourceLimits()
})
}
94 changes: 94 additions & 0 deletions pkg/koordlet/metrics/resource_summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
corev1 "k8s.io/api/core/v1"

"github.com/prometheus/client_golang/prometheus"
)

var (
NodeResourceAllocatable = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: KoordletSubsystem,
Name: "node_resource_allocatable",
Help: "the node allocatable of resources updated by koordinator",
}, []string{NodeKey, ResourceKey})

ContainerResourceRequests = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: KoordletSubsystem,
Name: "container_resource_requests",
Help: "the container requests of resources updated by koordinator",
}, []string{NodeKey, ResourceKey, PodUID, PodName, PodNamespace, ContainerID, ContainerName})

ContainerResourceLimits = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: KoordletSubsystem,
Name: "container_resource_limits",
Help: "the container limits of resources updated by koordinator",
}, []string{NodeKey, ResourceKey, PodUID, PodName, PodNamespace, ContainerID, ContainerName})

ResourceSummaryCollectors = []prometheus.Collector{
NodeResourceAllocatable,
ContainerResourceRequests,
ContainerResourceLimits,
}
)

func RecordNodeResourceAllocatable(resourceName string, value float64) {
labels := genNodeLabels()
if labels == nil {
return
}
labels[ResourceKey] = resourceName
NodeResourceAllocatable.With(labels).Set(value)
}

func RecordContainerResourceRequests(resourceName string, status *corev1.ContainerStatus, pod *corev1.Pod, value float64) {
labels := genNodeLabels()
if labels == nil {
return
}
labels[ResourceKey] = resourceName
labels[PodUID] = string(pod.UID)
labels[PodName] = pod.Name
labels[PodNamespace] = pod.Namespace
labels[ContainerID] = status.ContainerID
labels[ContainerName] = status.Name
ContainerResourceRequests.With(labels).Set(value)
}

func ResetContainerResourceRequests() {
ContainerResourceRequests.Reset()
}

func RecordContainerResourceLimits(resourceName string, status *corev1.ContainerStatus, pod *corev1.Pod, value float64) {
labels := genNodeLabels()
if labels == nil {
return
}
labels[ResourceKey] = resourceName
labels[PodUID] = string(pod.UID)
labels[PodName] = pod.Name
labels[PodNamespace] = pod.Namespace
labels[ContainerID] = status.ContainerID
labels[ContainerName] = status.Name
ContainerResourceLimits.With(labels).Set(value)
}

func ResetContainerResourceLimits() {
ContainerResourceLimits.Reset()
}
20 changes: 10 additions & 10 deletions pkg/koordlet/metricsadvisor/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ func (c *collector) Run(stopCh <-chan struct{}) error {
go wait.Until(func() {
c.collectGPUUsage()
c.collectNodeResUsed()
// add sync metaService cache check before collect pod information
// add sync statesInformer cache check before collect pod information
// because collect function will get all pods.
if !cache.WaitForCacheSync(stopCh, c.statesInformer.HasSynced) {
klog.Errorf("timed out waiting for meta service caches to sync")
// Koordlet exit because of metaService sync failed.
klog.Errorf("timed out waiting for states informer caches to sync")
// Koordlet exit because of statesInformer sync failed.
os.Exit(1)
return
}
Expand All @@ -142,11 +142,11 @@ func (c *collector) Run(stopCh <-chan struct{}) error {

ic := NewPerformanceCollector(c.statesInformer, c.metricCache, c.config.CPICollectorTimeWindowSeconds)
util.RunFeature(func() {
// add sync metaService cache check before collect pod information
// add sync statesInformer cache check before collect pod information
// because collect function will get all pods.
if !cache.WaitForCacheSync(stopCh, c.statesInformer.HasSynced) {
// Koordlet exit because of metaService sync failed.
klog.Fatalf("timed out waiting for meta service caches to sync")
// Koordlet exit because of statesInformer sync failed.
klog.Fatalf("timed out waiting for states informer caches to sync")
return
}
ic.collectContainerCPI()
Expand All @@ -158,11 +158,11 @@ func (c *collector) Run(stopCh <-chan struct{}) error {
klog.Fatalf("collect psi fail, need anolis os")
return
}
// add sync metaService cache check before collect pod information
// add sync statesInformer cache check before collect pod information
// because collect function will get all pods.
if !cache.WaitForCacheSync(stopCh, c.statesInformer.HasSynced) {
// Koordlet exit because of metaService sync failed.
klog.Fatalf("timed out waiting for meta service caches to sync")
// Koordlet exit because of statesInformer sync failed.
klog.Fatalf("timed out waiting for states informer caches to sync")
return
}
ic.collectContainerPSI()
Expand Down Expand Up @@ -217,7 +217,7 @@ func (c *collector) collectNodeResUsed() {

// update collect time
c.state.RefreshTime(nodeResUsedUpdateTime)
metrics.RecordNodeUsedCPU(cpuUsageValue * 1000)
metrics.RecordNodeUsedCPU(cpuUsageValue) // in cpu cores

klog.Infof("collectNodeResUsed finished %+v", nodeMetric)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/koordlet/metricsadvisor/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import (

func TestNewCollector(t *testing.T) {
type args struct {
cfg *Config
metaService statesinformer.StatesInformer
metricCache metriccache.MetricCache
cfg *Config
statesInformer statesinformer.StatesInformer
metricCache metriccache.MetricCache
}
tests := []struct {
name string
Expand All @@ -46,15 +46,15 @@ func TestNewCollector(t *testing.T) {
{
name: "new-collector",
args: args{
cfg: &Config{},
metaService: nil,
metricCache: nil,
cfg: &Config{},
statesInformer: nil,
metricCache: nil,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewCollector(tt.args.cfg, tt.args.metaService, tt.args.metricCache); got == nil {
if got := NewCollector(tt.args.cfg, tt.args.statesInformer, tt.args.metricCache); got == nil {
t.Errorf("NewCollector() = %v", got)
}
})
Expand Down
18 changes: 9 additions & 9 deletions pkg/koordlet/metricsadvisor/performance_collector_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import (

func TestNewPerformanceCollector(t *testing.T) {
type args struct {
cfg *Config
metaService statesinformer.StatesInformer
metricCache metriccache.MetricCache
timeWindow int
cfg *Config
statesInformer statesinformer.StatesInformer
metricCache metriccache.MetricCache
timeWindow int
}
tests := []struct {
name string
Expand All @@ -49,16 +49,16 @@ func TestNewPerformanceCollector(t *testing.T) {
{
name: "new-performance-collector",
args: args{
cfg: &Config{},
metaService: nil,
metricCache: nil,
timeWindow: 10,
cfg: &Config{},
statesInformer: nil,
metricCache: nil,
timeWindow: 10,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewPerformanceCollector(tt.args.metaService, tt.args.metricCache, tt.args.timeWindow); got == nil {
if got := NewPerformanceCollector(tt.args.statesInformer, tt.args.metricCache, tt.args.timeWindow); got == nil {
t.Errorf("NewPerformanceCollector() = %v", got)
}
})
Expand Down
Loading

0 comments on commit 08506ae

Please sign in to comment.