Skip to content

Commit

Permalink
add more metrics in kcmas (#169)
Browse files Browse the repository at this point in the history
* add more metric for kcmas and refine implementation for malachite fetcher

* add locks for registerred metric fetcher
  • Loading branch information
waynepeking348 committed Jul 31, 2023
1 parent f9e16b5 commit 71cad3a
Show file tree
Hide file tree
Showing 25 changed files with 788 additions and 682 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/google/cadvisor v0.44.1
github.com/kubewharf/katalyst-api v0.1.12
github.com/kubewharf/katalyst-api v0.1.14
github.com/opencontainers/runc v1.1.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.1.12 h1:dmfXMzknvgAPL/DI5hUmU9JGbrI6X3TUs4M9a0jZxYg=
github.com/kubewharf/katalyst-api v0.1.12/go.mod h1:iVILS5UL5PRtkUPH2Iu1K/gFGTPMNItnth5fmQ80VGE=
github.com/kubewharf/katalyst-api v0.1.14 h1:sqqON9qztKq5s8PVkbT2R0OvQJFRhBYN/odTrXFJZ5U=
github.com/kubewharf/katalyst-api v0.1.14/go.mod h1:iVILS5UL5PRtkUPH2Iu1K/gFGTPMNItnth5fmQ80VGE=
github.com/kubewharf/kubelet v1.24.6-kubewharf.6 h1:36IfOYzDL4Eb8uwJgpq2080lIn04Il+MbmFx5yi46UA=
github.com/kubewharf/kubelet v1.24.6-kubewharf.6/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/sysadvisor/metacache/metacache.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type AdvisorMetaWriter interface {
SetRegionInfo(regionName string, regionInfo *types.RegionInfo) error
}

type AdvisorNotifier struct{}

type MetaCache interface {
MetaReader
RawMetaWriter
Expand Down
73 changes: 73 additions & 0 deletions pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/advisor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"

apimetricnode "github.com/kubewharf/katalyst-api/pkg/metric/node"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
pkgconsts "github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/custom-metric/store/data"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/metric"
)

func (n *MetricSyncerNode) advisorMetric(ctx context.Context) {
tags := n.generateMetricTag(ctx)
general.InfofV(4, "get metric advisor metric for node")

// todo add metrics for knob-status

pod2Pools := make(map[string]string)
n.metaReader.RangeContainer(func(podUID string, containerName string, containerInfo *types.ContainerInfo) bool {
pod2Pools[podUID] = containerInfo.OwnerPoolName
return true
})

pool2Pods := make(map[string][]*v1.Pod)
_, err := n.metaServer.GetPodList(ctx, func(pod *v1.Pod) bool {
if pool, ok := pod2Pools[string(pod.UID)]; ok {
pool2Pods[pool] = append(pool2Pods[pool], pod)
}
return false
})
if err != nil {
general.Errorf("get podList from metaServer failed: %v", err)
return
}

for pool, pods := range pool2Pods {
v := n.metaServer.AggregatePodMetric(pods, pkgconsts.MetricLoad1MinContainer, metric.AggregatorSum, metric.DefaultContainerMetricFilter)
_ = n.dataEmitter.StoreFloat64(apimetricnode.CustomMetricNodeAdvisorPoolLoad1Min, v.Value, metrics.MetricTypeNameRaw, append(tags,
[]metrics.MetricTag{
{
Key: fmt.Sprintf("%s", data.CustomMetricLabelKeyTimestamp),
Val: fmt.Sprintf("%v", v.Time.UnixMilli()),
},
{
Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "pool"),
Val: fmt.Sprintf("%v", pool),
},
}...)...)
}

}
10 changes: 8 additions & 2 deletions pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package node
import (
"context"
"fmt"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

apimetricnode "github.com/kubewharf/katalyst-api/pkg/metric/node"
Expand All @@ -41,7 +43,8 @@ import (
// nodeRawMetricNameMapping maps the raw metricName (collected from agent.MetricsFetcher)
// to the standard metricName (used by custom-metric-api-server)
var nodeRawMetricNameMapping = map[string]string{
consts.MetricLoad1MinSystem: apimetricnode.CustomMetricNodeCPULoad1Min,
consts.MetricLoad1MinSystem: apimetricnode.CustomMetricNodeCPULoad1Min,

consts.MetricMemFreeSystem: apimetricnode.CustomMetricNodeMemoryFree,
consts.MetricMemAvailableSystem: apimetricnode.CustomMetricNodeMemoryAvailable,
}
Expand Down Expand Up @@ -103,6 +106,9 @@ func (n *MetricSyncerNode) Run(ctx context.Context) {
}

go n.receiveRawNode(ctx, rChan)
go wait.Until(func() {
n.advisorMetric(ctx)
}, time.Second*3, ctx.Done())
}

// receiveRawNode receives notified response from raw data source
Expand Down Expand Up @@ -139,7 +145,7 @@ func (n *MetricSyncerNode) receiveRawNode(ctx context.Context, rChan chan metric

// generateMetricTag generates tags that are bounded to current Node object
func (n *MetricSyncerNode) generateMetricTag(ctx context.Context) (tags []metrics.MetricTag) {
if n.node == nil {
if n.node == nil && n.metaServer != nil && n.metaServer.NodeFetcher != nil {
node, err := n.metaServer.GetNode(ctx)
if err != nil {
klog.Warningf("get current node failed: %v", err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
var podRawMetricNameMapping = map[string]string{
consts.MetricLoad1MinContainer: apimetricpod.CustomMetricPodCPULoad1Min,
consts.MetricCPUUsageContainer: apimetricpod.CustomMetricPodCPUUsage,

consts.MetricMemRssContainer: apimetricpod.CustomMetricPodMemoryRSS,
}

type podRawChanel struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/sysadvisor/types/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type ControlKnob map[ControlKnobName]ControlKnobValue
type ControlKnobName string

const (
// ControlKnobNonReclaimedCPUSize refers to cpu requirement of non reclaimed workloads
// ControlKnobNonReclaimedCPUSize refers to cpu requirement of non-reclaimed workloads
ControlKnobNonReclaimedCPUSize ControlKnobName = "non-reclaimed-cpu-size"

// ControlKnobNonReclaimedCPUSizeUpper refers to the upper cpu size, for isolated pods now
Expand Down
11 changes: 5 additions & 6 deletions pkg/metaserver/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,17 @@ func NewMetaAgent(conf *config.Configuration, clientSet *client.GenericClientSet
}

metaAgent := &MetaAgent{
start: false,
PodFetcher: podFetcher,
NodeFetcher: node.NewRemoteNodeFetcher(conf.NodeName, clientSet.KubeClient.CoreV1().Nodes()),
CNRFetcher: cnr.NewCachedCNRFetcher(conf.NodeName, conf.CNRCacheTTL,
clientSet.InternalClient.NodeV1alpha1().CustomNodeResources()),
start: false,
PodFetcher: podFetcher,
NodeFetcher: node.NewRemoteNodeFetcher(conf.NodeName, clientSet.KubeClient.CoreV1().Nodes()),
CNRFetcher: cnr.NewCachedCNRFetcher(conf.NodeName, conf.CNRCacheTTL, clientSet.InternalClient.NodeV1alpha1().CustomNodeResources()),
KubeletConfigFetcher: kubeletconfig.NewKubeletConfigFetcher(conf, emitter),
KatalystMachineInfo: machineInfo,
Conf: conf,
}

if conf.EnableMetricsFetcher {
metaAgent.MetricsFetcher = malachite.NewMalachiteMetricsFetcher(emitter, conf)
metaAgent.MetricsFetcher = malachite.NewMalachiteMetricsFetcher(emitter, metaAgent, conf)
} else {
metaAgent.MetricsFetcher = metric.NewFakeMetricsFetcher(emitter)
}
Expand Down
21 changes: 17 additions & 4 deletions pkg/metaserver/agent/metric/fake_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ package metric

import (
"context"
"sync"

v1 "k8s.io/api/core/v1"

Expand All @@ -49,19 +50,31 @@ func NewFakeMetricsFetcher(emitter metrics.MetricEmitter) MetricsFetcher {
}

type FakeMetricsFetcher struct {
metricStore *metric.MetricStore
emitter metrics.MetricEmitter
sync.RWMutex
metricStore *metric.MetricStore
emitter metrics.MetricEmitter
registeredMetric []func(store *metric.MetricStore)
}

func (f *FakeMetricsFetcher) Run(ctx context.Context) {}
func (f *FakeMetricsFetcher) Run(ctx context.Context) {
f.RLock()
defer f.RUnlock()
for _, fu := range f.registeredMetric {
fu(f.metricStore)
}
}

func (f *FakeMetricsFetcher) RegisterNotifier(scope MetricsScope, req NotifiedRequest, response chan NotifiedResponse) string {
return ""
}

func (f *FakeMetricsFetcher) DeRegisterNotifier(scope MetricsScope, key string) {}

func (f *FakeMetricsFetcher) RegisterExternalMetric(_ func(store *metric.MetricStore)) {}
func (f *FakeMetricsFetcher) RegisterExternalMetric(fu func(store *metric.MetricStore)) {
f.Lock()
defer f.Unlock()
f.registeredMetric = append(f.registeredMetric, fu)
}

func (f *FakeMetricsFetcher) GetNodeMetric(metricName string) (metric.MetricData, error) {
return f.metricStore.GetNodeMetric(metricName)
Expand Down
157 changes: 0 additions & 157 deletions pkg/metaserver/agent/metric/malachite/cgroup/cgroup_test.go

This file was deleted.

0 comments on commit 71cad3a

Please sign in to comment.