Skip to content

Commit

Permalink
koordlet: support numa topology reporting
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed Aug 2, 2023
1 parent 53efaaf commit 66c0920
Show file tree
Hide file tree
Showing 18 changed files with 1,226 additions and 128 deletions.
1 change: 1 addition & 0 deletions pkg/koordlet/metriccache/metric_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

const (
NodeCPUInfoKey = "node_cpu_info"
NodeNUMAInfoKey = "node_numa_info"
NodeLocalStorageInfoKey = "node_local_storage_info"
)

Expand Down
20 changes: 20 additions & 0 deletions pkg/koordlet/metrics/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ var (
Help: "the count of CollectNodeCPUInfo status",
}, []string{NodeKey, StatusKey})

CollectNodeNUMAInfoStatus = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: KoordletSubsystem,
Name: "collect_node_numa_info_status",
Help: "the count of CollectNodeNUMAInfo status",
}, []string{NodeKey, StatusKey})

CollectNodeLocalStorageInfoStatus = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: KoordletSubsystem,
Name: "collect_node_local_storage_info_status",
Expand Down Expand Up @@ -62,6 +68,8 @@ var (
CommonCollectors = []prometheus.Collector{
KoordletStartTime,
CollectNodeCPUInfoStatus,
CollectNodeNUMAInfoStatus,
CollectNodeLocalStorageInfoStatus,
PodEviction,
PodEvictionDetail.GetCounterVec(),
NodeUsedCPU,
Expand All @@ -87,6 +95,18 @@ func RecordCollectNodeCPUInfoStatus(err error) {
CollectNodeCPUInfoStatus.With(labels).Inc()
}

func RecordCollectNodeNUMAInfoStatus(err error) {
labels := genNodeLabels()
if labels == nil {
return
}
labels[StatusKey] = StatusSucceed
if err != nil {
labels[StatusKey] = StatusFailed
}
CollectNodeNUMAInfoStatus.With(labels).Inc()
}

func RecordCollectNodeLocalStorageInfoStatus(err error) {
labels := genNodeLabels()
if labels == nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/koordlet/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package metrics

import (
"fmt"

"testing"
"time"

Expand Down Expand Up @@ -132,6 +131,10 @@ func TestCommonCollectors(t *testing.T) {
RecordKoordletStartTime(testingNode.Name, float64(testingNow.Unix()))
RecordCollectNodeCPUInfoStatus(testingErr)
RecordCollectNodeCPUInfoStatus(nil)
RecordCollectNodeNUMAInfoStatus(testingErr)
RecordCollectNodeNUMAInfoStatus(nil)
RecordCollectNodeLocalStorageInfoStatus(testingErr)
RecordCollectNodeLocalStorageInfoStatus(nil)
RecordBESuppressCores("cfsQuota", float64(1000))
RecordBESuppressLSUsedCPU(1.0)
RecordNodeUsedCPU(2.0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package nodecpuinfo
package nodeinfo

import (
"time"
Expand All @@ -30,7 +30,7 @@ import (
)

const (
CollectorName = "NodeCPUInfoCollector"
CollectorName = "NodeInfoCollector"
)

// TODO more ut is needed for this plugin
Expand All @@ -55,31 +55,66 @@ func (n *nodeInfoCollector) Enabled() bool {
func (n *nodeInfoCollector) Setup(s *framework.Context) {}

func (n *nodeInfoCollector) Run(stopCh <-chan struct{}) {
go wait.Until(n.collectNodeCPUInfo, n.collectInterval, stopCh)
go wait.Until(n.collectNodeInfo, n.collectInterval, stopCh)
}

func (n *nodeInfoCollector) Started() bool {
return n.started.Load()
}

func (n *nodeInfoCollector) collectNodeCPUInfo() {
func (n *nodeInfoCollector) collectNodeInfo() {
started := time.Now()

err := n.collectNodeCPUInfo()
if err != nil {
klog.Warningf("failed to collect node CPU info, err: %s", err)
return
}

err = n.collectNodeNUMAInfo()
if err != nil {
klog.Warningf("failed to collect node NUMA info, err: %s", err)
return
}

n.started.Store(true)
klog.V(4).Infof("collect node info finished, elapsed %s", time.Since(started).String())
}

func (n *nodeInfoCollector) collectNodeCPUInfo() error {
klog.V(6).Info("start collect node cpu info")

localCPUInfo, err := koordletutil.GetLocalCPUInfo()
if err != nil {
klog.Warningf("failed to collect node cpu info, err: %s", err)
metrics.RecordCollectNodeCPUInfoStatus(err)
return
return err
}

nodeCPUInfo := &metriccache.NodeCPUInfo{
BasicInfo: localCPUInfo.BasicInfo,
ProcessorInfos: localCPUInfo.ProcessorInfos,
TotalInfo: localCPUInfo.TotalInfo,
}
klog.V(6).Infof("collect cpu info finished, nodeCPUInfo %v", nodeCPUInfo)
klog.V(6).Infof("collect cpu info finished, info: %+v", nodeCPUInfo)

n.storage.Set(metriccache.NodeCPUInfoKey, nodeCPUInfo)
n.started.Store(true)
klog.Infof("collectNodeCPUInfo finished, cpu info: processors %v", len(nodeCPUInfo.ProcessorInfos))
klog.V(4).Infof("collectNodeCPUInfo finished, processors num %v", len(nodeCPUInfo.ProcessorInfos))
metrics.RecordCollectNodeCPUInfoStatus(nil)
return nil
}

func (n *nodeInfoCollector) collectNodeNUMAInfo() error {
klog.V(6).Info("start collect node NUMA info")

nodeNUMAInfo, err := koordletutil.GetNodeNUMAInfo()
if err != nil {
metrics.RecordCollectNodeNUMAInfoStatus(err)
return err
}
klog.V(6).Info("collect NUMA info successfully, info %+v", nodeNUMAInfo)

n.storage.Set(metriccache.NodeNUMAInfoKey, nodeNUMAInfo)
klog.V(4).Infof("collectNodeNUMAInfo finished, NUMA node num %v", len(nodeNUMAInfo.NUMAInfos))
metrics.RecordCollectNodeNUMAInfoStatus(nil)
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodeinfo

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/atomic"

"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metricsadvisor/framework"
koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
)

func TestNodeInfoCollector(t *testing.T) {
helper := system.NewFileTestUtil(t)
defer helper.Cleanup()
metricCache, err := metriccache.NewMetricCache(&metriccache.Config{
TSDBPath: t.TempDir(),
TSDBEnablePromMetrics: false,
})
assert.NoError(t, err)
defer func() {
err = metricCache.Close()
assert.NoError(t, err)
}()
t.Run("test", func(t *testing.T) {
opt := &framework.Options{
Config: &framework.Config{
CollectNodeCPUInfoInterval: 60 * time.Second,
},
MetricCache: metricCache,
}
c := New(opt)
assert.NotNil(t, c)

c.Setup(nil)
assert.True(t, c.Enabled())
assert.False(t, c.Started())

collector := c.(*nodeInfoCollector)
assert.NotPanics(t, func() {
collector.collectNodeInfo()
})
})
}

func Test_collectNodeNUMAInfo(t *testing.T) {
helper := system.NewFileTestUtil(t)
defer helper.Cleanup()
metricCache, err := metriccache.NewMetricCache(&metriccache.Config{
TSDBPath: t.TempDir(),
TSDBEnablePromMetrics: false,
})
assert.NoError(t, err)
defer func() {
err = metricCache.Close()
assert.NoError(t, err)
}()

numaMemInfoContentStr0 := `Node 0 MemTotal: 263432804 kB
Node 0 MemFree: 254391744 kB
Node 0 MemAvailable: 256703236 kB
Node 0 Buffers: 958096 kB
Node 0 Cached: 0 kB
Node 0 SwapCached: 0 kB
Node 0 Active: 2786012 kB
Node 0 Inactive: 2223752 kB
Node 0 Active(anon): 289488 kB
Node 0 Inactive(anon): 1300 kB
Node 0 Active(file): 2496524 kB
Node 0 Inactive(file): 2222452 kB
Node 0 Unevictable: 0 kB
Node 0 Mlocked: 0 kB
Node 0 SwapTotal: 0 kB
Node 0 SwapFree: 0 kB
Node 0 Dirty: 624 kB
Node 0 Writeback: 0 kB
Node 0 AnonPages: 281748 kB
Node 0 Mapped: 495936 kB
Node 0 Shmem: 2340 kB
Node 0 Slab: 1097040 kB
Node 0 SReclaimable: 445164 kB
Node 0 SUnreclaim: 651876 kB
Node 0 KernelStack: 20944 kB
Node 0 PageTables: 7896 kB
Node 0 NFS_Unstable: 0 kB
Node 0 Bounce: 0 kB
Node 0 WritebackTmp: 0 kB
Node 0 AnonHugePages: 38912 kB
Node 0 ShmemHugePages: 0 kB
Node 0 ShmemPmdMapped: 0 kB
Node 0 HugePages_Total: 0
Node 0 HugePages_Free: 0
Node 0 HugePages_Rsvd: 0
Node 0 HugePages_Surp: 0`
numaMemInfoContentStr1 := `Node 1 MemTotal: 263432000 kB
Node 1 MemFree: 254391744 kB
Node 1 MemAvailable: 256703236 kB
Node 1 Buffers: 958096 kB
Node 1 Cached: 0 kB
Node 1 SwapCached: 0 kB
Node 1 Active: 2786012 kB
Node 1 Inactive: 2223752 kB
Node 1 Active(anon): 289488 kB
Node 1 Inactive(anon): 1300 kB
Node 1 Active(file): 2496524 kB
Node 1 Inactive(file): 2222452 kB
Node 1 Unevictable: 0 kB
Node 1 Mlocked: 0 kB
Node 1 SwapTotal: 0 kB
Node 1 SwapFree: 0 kB
Node 1 Dirty: 624 kB
Node 1 Writeback: 0 kB
Node 1 AnonPages: 281748 kB
Node 1 Mapped: 495936 kB
Node 1 Shmem: 2340 kB
Node 1 Slab: 1097040 kB
Node 1 SReclaimable: 445164 kB
Node 1 SUnreclaim: 651876 kB
Node 1 KernelStack: 20944 kB
Node 1 PageTables: 7896 kB
Node 1 NFS_Unstable: 0 kB
Node 1 Bounce: 0 kB
Node 1 WritebackTmp: 0 kB
Node 1 AnonHugePages: 38912 kB
Node 1 ShmemHugePages: 0 kB
Node 1 ShmemPmdMapped: 0 kB
Node 1 HugePages_Total: 0
Node 1 HugePages_Free: 0
Node 1 HugePages_Rsvd: 0
Node 1 HugePages_Surp: 0`
numaMemInfoPath0 := system.GetNUMAMemInfoPath("node0")
helper.WriteFileContents(numaMemInfoPath0, numaMemInfoContentStr0)
numaMemInfoPath1 := system.GetNUMAMemInfoPath("node1")
helper.WriteFileContents(numaMemInfoPath1, numaMemInfoContentStr1)
testMemInfo0 := &koordletutil.MemInfo{
MemTotal: 263432804, MemFree: 254391744, MemAvailable: 256703236,
Buffers: 958096, Cached: 0, SwapCached: 0,
Active: 2786012, Inactive: 2223752, ActiveAnon: 289488,
InactiveAnon: 1300, ActiveFile: 2496524, InactiveFile: 2222452,
Unevictable: 0, Mlocked: 0, SwapTotal: 0,
SwapFree: 0, Dirty: 624, Writeback: 0,
AnonPages: 281748, Mapped: 495936, Shmem: 2340,
Slab: 1097040, SReclaimable: 445164, SUnreclaim: 651876,
KernelStack: 20944, PageTables: 7896, NFS_Unstable: 0,
Bounce: 0, WritebackTmp: 0, AnonHugePages: 38912,
HugePages_Total: 0, HugePages_Free: 0, HugePages_Rsvd: 0,
HugePages_Surp: 0,
}
testMemInfo1 := &koordletutil.MemInfo{
MemTotal: 263432000, MemFree: 254391744, MemAvailable: 256703236,
Buffers: 958096, Cached: 0, SwapCached: 0,
Active: 2786012, Inactive: 2223752, ActiveAnon: 289488,
InactiveAnon: 1300, ActiveFile: 2496524, InactiveFile: 2222452,
Unevictable: 0, Mlocked: 0, SwapTotal: 0,
SwapFree: 0, Dirty: 624, Writeback: 0,
AnonPages: 281748, Mapped: 495936, Shmem: 2340,
Slab: 1097040, SReclaimable: 445164, SUnreclaim: 651876,
KernelStack: 20944, PageTables: 7896, NFS_Unstable: 0,
Bounce: 0, WritebackTmp: 0, AnonHugePages: 38912,
HugePages_Total: 0, HugePages_Free: 0, HugePages_Rsvd: 0,
HugePages_Surp: 0,
}
expected := &koordletutil.NodeNUMAInfo{
NUMAInfos: []koordletutil.NUMAInfo{
{
NUMANodeID: 0,
MemInfo: testMemInfo0,
},
{
NUMANodeID: 1,
MemInfo: testMemInfo1,
},
},
MemInfoMap: map[int32]*koordletutil.MemInfo{
0: testMemInfo0,
1: testMemInfo1,
},
}
t.Run("test", func(t *testing.T) {
c := &nodeInfoCollector{
collectInterval: 60 * time.Second,
storage: metricCache,
started: atomic.NewBool(false),
}

// test collect successfully
err = c.collectNodeNUMAInfo()
assert.NoError(t, err)
nodeNUMAInfoRaw, ok := c.storage.Get(metriccache.NodeNUMAInfoKey)
assert.True(t, ok)
nodeNUMAInfo, ok := nodeNUMAInfoRaw.(*koordletutil.NodeNUMAInfo)
assert.True(t, ok)
assert.Equal(t, expected, nodeNUMAInfo)

// test collect failed when sys files are missing
helper.Cleanup()
err = c.collectNodeNUMAInfo()
assert.Error(t, err)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ func (n *nodeResourceCollector) collectNodeResUsed() {
// get the accumulated cpu ticks
currentCPUTick, err0 := koordletutil.GetCPUStatUsageTicks()
// NOTE: The collected memory usage is in kilobytes not bytes.
memUsageKB, err1 := koordletutil.GetMemInfoUsageKB()
memInfo, err1 := koordletutil.GetMemInfo()
if err0 != nil || err1 != nil {
klog.Warningf("failed to collect node usage, CPU err: %s, Memory err: %s", err0, err1)
return
}

memUsageValue := 1024 * float64(memUsageKB)
memUsageValue := float64(memInfo.MemUsageBytes())
memUsageMetrics, err := metriccache.NodeMemoryUsageMetric.GenerateSample(nil, collectTime, memUsageValue)
if err != nil {
klog.Warningf("generate node cpu metrics failed, err %v", err)
Expand Down

0 comments on commit 66c0920

Please sign in to comment.