Automated cherry pick of #73659: Kubelet: add usageNanoCores from CRI stats provider #74933: Fix computing of cpu nano core usage #76770

Merged
1 change: 1 addition & 0 deletions pkg/kubelet/metrics/collectors/volume_stats_test.go
@@ -129,6 +129,7 @@ func TestVolumeStatsCollector(t *testing.T) {

mockStatsProvider := new(statstest.StatsProvider)
mockStatsProvider.On("ListPodStats").Return(podStats, nil)
mockStatsProvider.On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil)
if err := gatherAndCompare(&volumeStatsCollector{statsProvider: mockStatsProvider}, want, metrics); err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
7 changes: 5 additions & 2 deletions pkg/kubelet/server/server_test.go
@@ -260,8 +260,11 @@ func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Vo
func (_ *fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (_ *fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil }
func (_ *fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil }
func (_ *fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (_ *fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil }
func (_ *fakeKubelet) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
return nil, nil
}
func (_ *fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (_ *fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil }
func (_ *fakeKubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) {
return nil, nil, nil
}
9 changes: 8 additions & 1 deletion pkg/kubelet/server/stats/handler.go
@@ -43,8 +43,15 @@ type StatsProvider interface {
//
// ListPodStats returns the stats of all the containers managed by pods.
ListPodStats() ([]statsapi.PodStats, error)
// ListPodCPUAndMemoryStats returns the CPU and memory stats of all the containers managed by pods.
ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error)
// ListPodStatsAndUpdateCPUNanoCoreUsage returns the stats of all the
// containers managed by pods and force-updates the cpu usageNanoCores.
// This is a workaround for CRI runtimes that do not integrate with
// cadvisor. See https://github.com/kubernetes/kubernetes/issues/72788
// for more details.
ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error)
// ImageFsStats returns the stats of the image filesystem.
ImageFsStats() (*statsapi.FsStats, error)

8 changes: 7 additions & 1 deletion pkg/kubelet/server/stats/summary.go
@@ -83,10 +83,16 @@ func (sp *summaryProviderImpl) Get(updateStats bool) (*statsapi.Summary, error)
if err != nil {
return nil, fmt.Errorf("failed to get imageFs stats: %v", err)
}
podStats, err := sp.provider.ListPodStats()
var podStats []statsapi.PodStats
if updateStats {
podStats, err = sp.provider.ListPodStatsAndUpdateCPUNanoCoreUsage()
} else {
podStats, err = sp.provider.ListPodStats()
}
if err != nil {
return nil, fmt.Errorf("failed to list pod stats: %v", err)
}

rlimit, err := sp.provider.RlimitStats()
if err != nil {
return nil, fmt.Errorf("failed to get rlimit stats: %v", err)
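The Get(updateStats) branch above is the only switch between the cached-read path (ListPodStats) and the forced-update path (ListPodStatsAndUpdateCPUNanoCoreUsage). As a rough, self-contained sketch of the single periodic caller that this workaround assumes (all names here are hypothetical stand-ins, not the kubelet's actual eviction-manager code):

package main

import (
	"fmt"
	"time"
)

// Summary and SummaryProvider stand in for statsapi.Summary and the
// kubelet's SummaryProvider; only what this sketch needs is mirrored.
type Summary struct{}

type SummaryProvider interface {
	Get(updateStats bool) (*Summary, error)
}

type fakeProvider struct{}

func (fakeProvider) Get(updateStats bool) (*Summary, error) {
	fmt.Printf("Get(updateStats=%v)\n", updateStats)
	return &Summary{}, nil
}

// pollStats mimics the single periodic caller: each tick forces a
// usageNanoCores refresh by passing updateStats=true.
func pollStats(sp SummaryProvider, interval time.Duration, ticks int) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for i := 0; i < ticks; i++ {
		<-ticker.C
		if _, err := sp.Get(true); err != nil {
			fmt.Printf("failed to get summary: %v\n", err)
		}
	}
}

func main() {
	// The real kubelet caller runs every 10s; shortened so the sketch
	// finishes quickly.
	pollStats(fakeProvider{}, 100*time.Millisecond, 3)
}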
1 change: 1 addition & 0 deletions pkg/kubelet/server/stats/summary_test.go
@@ -74,6 +74,7 @@ func TestSummaryProviderGetStats(t *testing.T) {
On("GetNodeConfig").Return(nodeConfig).
On("GetPodCgroupRoot").Return(cgroupRoot).
On("ListPodStats").Return(podStats, nil).
On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
On("ImageFsStats").Return(imageFsStats, nil).
On("RootFsStats").Return(rootFsStats, nil).
On("RlimitStats").Return(rlimitStats, nil).
1 change: 1 addition & 0 deletions pkg/kubelet/server/stats/summary_windows_test.go
@@ -57,6 +57,7 @@ func TestSummaryProvider(t *testing.T) {
On("GetNodeConfig").Return(nodeConfig).
On("GetPodCgroupRoot").Return(cgroupRoot).
On("ListPodStats").Return(podStats, nil).
On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
On("ImageFsStats").Return(imageFsStats, nil).
On("RootFsStats").Return(rootFsStats, nil).
On("RlimitStats").Return(nil, nil).
23 changes: 23 additions & 0 deletions pkg/kubelet/server/stats/testing/mock_stats_provider.go
@@ -275,6 +275,29 @@ func (_m *StatsProvider) ListPodStats() ([]v1alpha1.PodStats, error) {
return r0, r1
}

// ListPodStatsAndUpdateCPUNanoCoreUsage provides a mock function with given fields:
func (_m *StatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]v1alpha1.PodStats, error) {
ret := _m.Called()

var r0 []v1alpha1.PodStats
if rf, ok := ret.Get(0).(func() []v1alpha1.PodStats); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]v1alpha1.PodStats)
}
}

var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// ListPodCPUAndMemoryStats provides a mock function with given fields:
func (_m *StatsProvider) ListPodCPUAndMemoryStats() ([]v1alpha1.PodStats, error) {
ret := _m.Called()
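The generated mock above follows the stretchr/testify mock.Mock pattern: _m.Called() records the invocation and returns whatever a matching On(...) registration supplied, and fails the test if none exists. That is why each test fixture in this PR that stubs ListPodStats now also stubs ListPodStatsAndUpdateCPUNanoCoreUsage. A minimal sketch of the pattern, with simplified types rather than the real StatsProvider:

package main

import (
	"fmt"

	"github.com/stretchr/testify/mock"
)

// StatsProvider is a toy stand-in for the generated mock above.
type StatsProvider struct{ mock.Mock }

func (m *StatsProvider) ListPodStats() ([]string, error) {
	// Called() panics the test unless an On("ListPodStats") expectation
	// has been registered.
	ret := m.Called()
	return ret.Get(0).([]string), ret.Error(1)
}

func main() {
	sp := new(StatsProvider)
	sp.On("ListPodStats").Return([]string{"pod-a"}, nil)
	stats, err := sp.ListPodStats()
	fmt.Println(stats, err) // [pod-a] <nil>
}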
8 changes: 8 additions & 0 deletions pkg/kubelet/stats/cadvisor_stats_provider.go
@@ -145,6 +145,14 @@ func (p *cadvisorStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
return result, nil
}

// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
// the containers and returns the stats for all the pod-managed containers.
// For cadvisor, cpu nano core usages are pre-computed and cached, so this
// function simply calls ListPodStats.
func (p *cadvisorStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
return p.ListPodStats()
}

// ListPodCPUAndMemoryStats returns the cpu and memory stats of all the pod-managed containers.
func (p *cadvisorStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
infos, err := getCadvisorContainerInfo(p.cadvisor)
132 changes: 131 additions & 1 deletion pkg/kubelet/stats/cri_stats_provider.go
@@ -22,6 +23,7 @@ import (
"path"
"sort"
"strings"
"sync"
"time"

cadvisorfs "github.com/google/cadvisor/fs"
@@ -39,6 +40,17 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)

var (
// defaultCachePeriod is the default cache period for each cpuUsage.
defaultCachePeriod = 10 * time.Minute
)

// cpuUsageRecord holds the cpu usage stats and the calculated usageNanoCores.
type cpuUsageRecord struct {
stats *runtimeapi.CpuUsage
usageNanoCores *uint64
}

// criStatsProvider implements the containerStatsProvider interface by getting
// the container stats from CRI.
type criStatsProvider struct {
@@ -55,6 +67,10 @@ type criStatsProvider struct {
imageService internalapi.ImageManagerService
// logMetrics provides the metrics for container logs
logMetricsService LogMetricsService

// cpuUsageCache caches the cpu usage for containers.
cpuUsageCache map[string]*cpuUsageRecord
mutex sync.RWMutex
}

// newCRIStatsProvider returns a containerStatsProvider implementation that
@@ -72,11 +88,32 @@ func newCRIStatsProvider(
runtimeService: runtimeService,
imageService: imageService,
logMetricsService: logMetricsService,
cpuUsageCache: make(map[string]*cpuUsageRecord),
}
}

// ListPodStats returns the stats of all the pod-managed containers.
func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
// Don't update CPU nano core usage.
return p.listPodStats(false)
}

// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
// the containers and returns the stats for all the pod-managed containers.
// This is a workaround because CRI runtimes do not supply nano core usages,
// so this function calculates the difference between the current and the
// last (cached) cpu stats to compute this metric. The implementation
// assumes that a single caller periodically invokes this function to update
// the metric. If there are multiple callers, the period used to compute the
// cpu usage may vary and the usage could be incoherent (e.g., spiky). If no
// caller calls this function, the cpu usage will stay nil. Right now,
// eviction manager is the only caller, and it calls this function every 10s.
func (p *criStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
// Update CPU nano core usage.
return p.listPodStats(true)
}

func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi.PodStats, error) {
// Gets node root filesystem information, which will be used to populate
// the available and capacity bytes/inodes in container stats.
rootFsInfo, err := p.cadvisor.RootFsInfo()
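For reference, the computation that listPodStats(true) ultimately performs reduces to usageNanoCores = delta(usageCoreNanoSeconds) * 1e9 / delta(timestamp in nanoseconds). A small runnable sketch with made-up numbers (nanoCores is a hypothetical helper mirroring the formula in getAndUpdateContainerUsageNanoCores further down):

package main

import (
	"fmt"
	"time"
)

// nanoCores mirrors the rate computation: cumulative core-nanoseconds
// consumed between two samples, divided by the elapsed wall-clock
// nanoseconds, scaled to nanocores (1e9 nanocores = 1 core).
func nanoCores(prevUsage, curUsage uint64, prevTS, curTS int64) uint64 {
	elapsed := uint64(curTS - prevTS)
	return (curUsage - prevUsage) * uint64(time.Second/time.Nanosecond) / elapsed
}

func main() {
	t0 := time.Now().UnixNano()
	t1 := t0 + (10 * time.Second).Nanoseconds()
	// 6e9 core-nanoseconds consumed over 10s => 0.6 cores on average,
	// i.e. 600000000 nanocores.
	fmt.Println(nanoCores(4e9, 10e9, t0, t1))
}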
@@ -146,7 +183,7 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
}

// Fill available stats for full set of required pod stats
cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata().GetUid())
cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata().GetUid(), updateCPUNanoCoreUsage)
p.addPodNetworkStats(ps, podSandboxID, caInfos, cs)
p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)

Expand All @@ -160,6 +197,8 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
}
ps.Containers = append(ps.Containers, *cs)
}
// cleanup outdated caches.
p.cleanupOutdatedCaches()

result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
for _, s := range sandboxIDToPodStats {
@@ -242,6 +281,8 @@ func (p *criStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, erro
}
ps.Containers = append(ps.Containers, *cs)
}
// cleanup outdated caches.
p.cleanupOutdatedCaches()

result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
for _, s := range sandboxIDToPodStats {
@@ -420,6 +461,7 @@ func (p *criStatsProvider) makeContainerStats(
rootFsInfo *cadvisorapiv2.FsInfo,
fsIDtoInfo map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo,
uid string,
updateCPUNanoCoreUsage bool,
) *statsapi.ContainerStats {
result := &statsapi.ContainerStats{
Name: stats.Attributes.Metadata.Name,
Expand All @@ -435,6 +477,15 @@ func (p *criStatsProvider) makeContainerStats(
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}
var usageNanoCores *uint64
if updateCPUNanoCoreUsage {
usageNanoCores = p.getAndUpdateContainerUsageNanoCores(stats)
} else {
usageNanoCores = p.getContainerUsageNanoCores(stats)
}
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
} else {
result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.CPU.UsageCoreNanoSeconds = Uint64Ptr(0)
@@ -497,6 +548,11 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}

usageNanoCores := p.getContainerUsageNanoCores(stats)
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
} else {
result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.CPU.UsageCoreNanoSeconds = Uint64Ptr(0)
@@ -514,6 +570,80 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
return result
}

// getContainerUsageNanoCores gets the cached usageNanoCores.
func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
if stats == nil || stats.Attributes == nil {
return nil
}

p.mutex.RLock()
defer p.mutex.RUnlock()

cached, ok := p.cpuUsageCache[stats.Attributes.Id]
if !ok || cached.usageNanoCores == nil {
return nil
}
// return a copy of the usage
latestUsage := *cached.usageNanoCores
return &latestUsage
}

// getAndUpdateContainerUsageNanoCores computes usageNanoCores based on the
// given and the cached usageCoreNanoSeconds, updates the cache with the
// computed usageNanoCores, and returns the usageNanoCores.
func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
if stats == nil || stats.Attributes == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
return nil
}
id := stats.Attributes.Id
usage, err := func() (*uint64, error) {
p.mutex.Lock()
defer p.mutex.Unlock()

cached, ok := p.cpuUsageCache[id]
if !ok || cached.stats.UsageCoreNanoSeconds == nil {
// Cannot compute the usage now, but update the cached stats anyway
p.cpuUsageCache[id] = &cpuUsageRecord{stats: stats.Cpu, usageNanoCores: nil}
return nil, nil
}

newStats := stats.Cpu
cachedStats := cached.stats
nanoSeconds := newStats.Timestamp - cachedStats.Timestamp
if nanoSeconds <= 0 {
return nil, fmt.Errorf("zero or negative interval (%v - %v)", newStats.Timestamp, cachedStats.Timestamp)
}
usageNanoCores := (newStats.UsageCoreNanoSeconds.Value - cachedStats.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)

// Update cache with new value.
usageToUpdate := usageNanoCores
p.cpuUsageCache[id] = &cpuUsageRecord{stats: newStats, usageNanoCores: &usageToUpdate}

return &usageNanoCores, nil
}()

if err != nil {
// This should not happen. Log now to raise visibility
klog.Errorf("failed updating cpu usage nano core: %v", err)
}
return usage
}

func (p *criStatsProvider) cleanupOutdatedCaches() {
p.mutex.Lock()
defer p.mutex.Unlock()

for k, v := range p.cpuUsageCache {
if v == nil {
delete(p.cpuUsageCache, k)
continue
}

if time.Since(time.Unix(0, v.stats.Timestamp)) > defaultCachePeriod {
delete(p.cpuUsageCache, k)
}
}
}

// removeTerminatedContainer returns the specified container but with
// the stats of the terminated containers removed.
func removeTerminatedContainer(containers []*runtimeapi.Container) []*runtimeapi.Container {
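Putting the pieces together, here is a condensed, self-contained sketch of the cache lifecycle these functions implement, with simplified types standing in for runtimeapi.CpuUsage and the provider: the first update only primes the cache and returns nil, the next one returns a rate, and read-only lookups hand back a copy so callers never alias cache memory.

package main

import (
	"fmt"
	"sync"
	"time"
)

// sample stands in for runtimeapi.CpuUsage.
type sample struct {
	timestamp            int64  // nanoseconds
	usageCoreNanoSeconds uint64 // cumulative
}

// record stands in for cpuUsageRecord above.
type record struct {
	stats          sample
	usageNanoCores *uint64
}

type cache struct {
	mu sync.RWMutex
	m  map[string]*record
}

// getAndUpdate mirrors getAndUpdateContainerUsageNanoCores: compute the
// rate against the cached sample, then replace the cache entry.
func (c *cache) getAndUpdate(id string, s sample) *uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	cached, ok := c.m[id]
	if !ok {
		c.m[id] = &record{stats: s} // prime the cache; no rate yet
		return nil
	}
	elapsed := s.timestamp - cached.stats.timestamp
	if elapsed <= 0 {
		return nil // refuse zero or negative intervals
	}
	usage := (s.usageCoreNanoSeconds - cached.stats.usageCoreNanoSeconds) *
		uint64(time.Second/time.Nanosecond) / uint64(elapsed)
	c.m[id] = &record{stats: s, usageNanoCores: &usage}
	return &usage
}

// get mirrors getContainerUsageNanoCores: a read-locked lookup that
// returns a copy of the cached value.
func (c *cache) get(id string) *uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	r, ok := c.m[id]
	if !ok || r.usageNanoCores == nil {
		return nil
	}
	v := *r.usageNanoCores
	return &v
}

func main() {
	c := &cache{m: make(map[string]*record)}
	t0 := time.Now().UnixNano()
	fmt.Println(c.getAndUpdate("c1", sample{t0, 4e9})) // <nil>: cache primed
	u := c.getAndUpdate("c1", sample{t0 + 10e9, 10e9}) // second sample, 10s later
	fmt.Println(*u, *c.get("c1"))                      // 600000000 600000000
}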