Fix computing of cpu nano core usage #74933

Merged: 1 commit, Mar 5, 2019
1 change: 1 addition & 0 deletions pkg/kubelet/metrics/collectors/volume_stats_test.go
@@ -131,6 +131,7 @@ func TestVolumeStatsCollector(t *testing.T) {

mockStatsProvider := new(statstest.StatsProvider)
mockStatsProvider.On("ListPodStats").Return(podStats, nil)
mockStatsProvider.On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil)
if err := testutil.CollectAndCompare(&volumeStatsCollector{statsProvider: mockStatsProvider}, strings.NewReader(want), metrics...); err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
7 changes: 5 additions & 2 deletions pkg/kubelet/server/server_test.go
@@ -257,8 +257,11 @@ func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Vo
return map[string]volume.Volume{}, true
}

func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil }
func (*fakeKubelet) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
return nil, nil
}
func (*fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil }
func (*fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (*fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil }
9 changes: 8 additions & 1 deletion pkg/kubelet/server/stats/handler.go
@@ -43,8 +43,15 @@ type Provider interface {
//
// ListPodStats returns the stats of all the containers managed by pods.
ListPodStats() ([]statsapi.PodStats, error)
// ListPodCPUAndMemoryStats returns the CPU and memory stats of all the containers managed by pods.
ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error)
// ListPodStatsAndUpdateCPUNanoCoreUsage returns the stats of all the
// containers managed by pods and force-updates the cpu usageNanoCores.
// This is a workaround for CRI runtimes that do not integrate with
// cadvisor. See https://github.com/kubernetes/kubernetes/issues/72788
// for more details.
ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error)
// ImageFsStats returns the stats of the image filesystem.
ImageFsStats() (*statsapi.FsStats, error)

8 changes: 7 additions & 1 deletion pkg/kubelet/server/stats/summary.go
@@ -84,10 +84,16 @@ func (sp *summaryProviderImpl) Get(updateStats bool) (*statsapi.Summary, error)
if err != nil {
return nil, fmt.Errorf("failed to get imageFs stats: %v", err)
}
var podStats []statsapi.PodStats
if updateStats {
podStats, err = sp.provider.ListPodStatsAndUpdateCPUNanoCoreUsage()
} else {
podStats, err = sp.provider.ListPodStats()
}
if err != nil {
return nil, fmt.Errorf("failed to list pod stats: %v", err)
}

rlimit, err := sp.provider.RlimitStats()
if err != nil {
return nil, fmt.Errorf("failed to get rlimit stats: %v", err)
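For context, the branch added above is the routing point for the whole fix: Get(true) recomputes usageNanoCores through the new provider method, while Get(false) keeps serving cached values. Below is a minimal, self-contained sketch of that selection logic; the stand-in functions and return strings are illustrative only, not the real Provider interface.

package main

import "fmt"

// Stand-ins for the real provider methods in handler.go; the bodies are fake.
func listPodStats() string { return "cached usageNanoCores" }

func listPodStatsAndUpdateCPUNanoCoreUsage() string {
	return "recomputed usageNanoCores"
}

// get mirrors the branch added to summaryProviderImpl.Get above.
func get(updateStats bool) string {
	if updateStats {
		return listPodStatsAndUpdateCPUNanoCoreUsage()
	}
	return listPodStats()
}

func main() {
	fmt.Println(get(true))  // periodic caller (e.g. the eviction manager) updates the cache
	fmt.Println(get(false)) // other readers reuse the cached value
}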
1 change: 1 addition & 0 deletions pkg/kubelet/server/stats/summary_test.go
@@ -74,6 +74,7 @@ func TestSummaryProviderGetStats(t *testing.T) {
On("GetNodeConfig").Return(nodeConfig).
On("GetPodCgroupRoot").Return(cgroupRoot).
On("ListPodStats").Return(podStats, nil).
On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
On("ImageFsStats").Return(imageFsStats, nil).
On("RootFsStats").Return(rootFsStats, nil).
On("RlimitStats").Return(rlimitStats, nil).
1 change: 1 addition & 0 deletions pkg/kubelet/server/stats/summary_windows_test.go
@@ -57,6 +57,7 @@ func TestSummaryProvider(t *testing.T) {
On("GetNodeConfig").Return(nodeConfig).
On("GetPodCgroupRoot").Return(cgroupRoot).
On("ListPodStats").Return(podStats, nil).
On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
On("ImageFsStats").Return(imageFsStats, nil).
On("RootFsStats").Return(rootFsStats, nil).
On("RlimitStats").Return(nil, nil).
23 changes: 23 additions & 0 deletions pkg/kubelet/server/stats/testing/mock_stats_provider.go
@@ -275,6 +275,29 @@ func (_m *StatsProvider) ListPodStats() ([]v1alpha1.PodStats, error) {
return r0, r1
}

// ListPodStatsAndUpdateCPUNanoCoreUsage provides a mock function with given fields:
func (_m *StatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]v1alpha1.PodStats, error) {
ret := _m.Called()

var r0 []v1alpha1.PodStats
if rf, ok := ret.Get(0).(func() []v1alpha1.PodStats); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]v1alpha1.PodStats)
}
}

var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// ListPodCPUAndMemoryStats provides a mock function with given fields:
func (_m *StatsProvider) ListPodCPUAndMemoryStats() ([]v1alpha1.PodStats, error) {
ret := _m.Called()
8 changes: 8 additions & 0 deletions pkg/kubelet/stats/cadvisor_stats_provider.go
@@ -155,6 +155,14 @@ func (p *cadvisorStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
return result, nil
}

// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
// the containers and returns the stats for all the pod-managed containers.
// For cadvisor, cpu nano core usages are pre-computed and cached, so this
// function simply calls ListPodStats.
func (p *cadvisorStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
return p.ListPodStats()
}

// ListPodCPUAndMemoryStats returns the cpu and memory stats of all the pod-managed containers.
func (p *cadvisorStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
infos, err := getCadvisorContainerInfo(p.cadvisor)
105 changes: 86 additions & 19 deletions pkg/kubelet/stats/cri_stats_provider.go
@@ -45,6 +45,12 @@ var (
defaultCachePeriod = 10 * time.Minute
)

// cpuUsageRecord holds the cpu usage stats and the calculated usageNanoCores.
type cpuUsageRecord struct {
stats *runtimeapi.CpuUsage
usageNanoCores *uint64
}

// criStatsProvider implements the containerStatsProvider interface by getting
// the container stats from CRI.
type criStatsProvider struct {
@@ -63,8 +69,8 @@ type criStatsProvider struct {
logMetricsService LogMetricsService

// cpuUsageCache caches the cpu usage for containers.
cpuUsageCache map[string]*cpuUsageRecord
mutex sync.RWMutex
}

// newCRIStatsProvider returns a containerStatsProvider implementation that
@@ -82,12 +88,32 @@ func newCRIStatsProvider(
runtimeService: runtimeService,
imageService: imageService,
logMetricsService: logMetricsService,
cpuUsageCache: make(map[string]*cpuUsageRecord),
}
}

// ListPodStats returns the stats of all the pod-managed containers.
func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
// Don't update CPU nano core usage.
return p.listPodStats(false)
}

// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
// the containers and returns the stats for all the pod-managed containers.
// This is a workaround because CRI runtimes do not supply nano core usages,
// so this function calculates the difference between the current and the
// last (cached) cpu stats to compute this metric. The implementation
// assumes a single caller periodically invokes this function to update the
// metric. If there are multiple callers, the period used to compute the cpu
// usage may vary and the usage could be incoherent (e.g., spiky). If no
// caller calls this function, the cpu usage will stay nil. Right now, the
// eviction manager is the only caller, and it calls this function every 10s.
func (p *criStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
// Update CPU nano core usage.
return p.listPodStats(true)
}

func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi.PodStats, error) {
// Gets node root filesystem information, which will be used to populate
// the available and capacity bytes/inodes in container stats.
rootFsInfo, err := p.cadvisor.RootFsInfo()
@@ -157,7 +183,7 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
}

// Fill available stats for full set of required pod stats
cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata().GetUid())
cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata().GetUid(), updateCPUNanoCoreUsage)
p.addPodNetworkStats(ps, podSandboxID, caInfos, cs)
p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)

@@ -435,6 +461,7 @@ func (p *criStatsProvider) makeContainerStats(
rootFsInfo *cadvisorapiv2.FsInfo,
fsIDtoInfo map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo,
uid string,
updateCPUNanoCoreUsage bool,
) *statsapi.ContainerStats {
result := &statsapi.ContainerStats{
Name: stats.Attributes.Metadata.Name,
Expand All @@ -450,8 +477,12 @@ func (p *criStatsProvider) makeContainerStats(
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}

var usageNanoCores *uint64
if updateCPUNanoCoreUsage {
usageNanoCores = p.getAndUpdateContainerUsageNanoCores(stats)
} else {
usageNanoCores = p.getContainerUsageNanoCores(stats)
}
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
@@ -541,27 +572,63 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
return result
}

// getContainerUsageNanoCores gets the cached usageNanoCores.
func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
if stats == nil || stats.Attributes == nil {
return nil
}

p.mutex.RLock()
defer p.mutex.RUnlock()

cached, ok := p.cpuUsageCache[stats.Attributes.Id]
if !ok || cached.usageNanoCores == nil {
return nil
}
// return a copy of the usage
latestUsage := *cached.usageNanoCores
return &latestUsage
}

// getAndUpdateContainerUsageNanoCores computes usageNanoCores based on the given and
// the cached usageCoreNanoSeconds, updates the cache with the computed
// usageNanoCores, and returns the usageNanoCores.
func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
if stats == nil || stats.Attributes == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
return nil
}
id := stats.Attributes.Id
usage, err := func() (*uint64, error) {
p.mutex.Lock()
defer p.mutex.Unlock()

cached, ok := p.cpuUsageCache[id]
if !ok || cached.stats.UsageCoreNanoSeconds == nil {
// Cannot compute the usage now, but update the cached stats anyway
p.cpuUsageCache[id] = &cpuUsageRecord{stats: stats.Cpu, usageNanoCores: nil}
return nil, nil
}

newStats := stats.Cpu
cachedStats := cached.stats
nanoSeconds := newStats.Timestamp - cachedStats.Timestamp
if nanoSeconds <= 0 {
return nil, fmt.Errorf("zero or negative interval (%v - %v)", newStats.Timestamp, cachedStats.Timestamp)
}
usageNanoCores := (newStats.UsageCoreNanoSeconds.Value - cachedStats.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)

// Update cache with new value.
usageToUpdate := usageNanoCores
p.cpuUsageCache[id] = &cpuUsageRecord{stats: newStats, usageNanoCores: &usageToUpdate}

return &usageNanoCores, nil
}()

if err != nil {
// This should not happen. Log now to raise visibility
klog.Errorf("failed updating cpu usage nano core: %v", err)
[inline review comment from a Contributor: Should nil be returned in this case?]
}
return usage
}

func (p *criStatsProvider) cleanupOutdatedCaches() {
@@ -573,7 +640,7 @@ func (p *criStatsProvider) cleanupOutdatedCaches() {
delete(p.cpuUsageCache, k)
}

if time.Since(time.Unix(0, v.stats.Timestamp)) > defaultCachePeriod {
delete(p.cpuUsageCache, k)
}
}
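To make the arithmetic concrete, here is a self-contained sketch of the delta computation performed by getAndUpdateContainerUsageNanoCores, stripped of the cache and locking machinery. The cached sample mirrors the one in the test below; the newer sample is made up for illustration.

package main

import "fmt"

func main() {
	// Cached sample: at t=0 the container had consumed 10 core-seconds,
	// reported by CRI as cumulative core-nanoseconds.
	var cachedTimestamp int64 = 0
	var cachedUsageCoreNanoSeconds uint64 = 10000000000

	// New sample 10 seconds later: 20 core-seconds consumed in total.
	var newTimestamp int64 = 10000000000 // ns
	var newUsageCoreNanoSeconds uint64 = 20000000000

	// Same formula as above: (delta core-ns) * 1e9 / (delta time in ns).
	nanoSeconds := newTimestamp - cachedTimestamp
	usageNanoCores := (newUsageCoreNanoSeconds - cachedUsageCoreNanoSeconds) *
		uint64(1e9) / uint64(nanoSeconds)

	fmt.Println(usageNanoCores) // 1000000000 nano-cores, i.e. one full core
}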
41 changes: 27 additions & 14 deletions pkg/kubelet/stats/cri_stats_provider_test.go
@@ -692,17 +692,17 @@ func TestGetContainerUsageNanoCores(t *testing.T) {

tests := []struct {
desc string
cpuUsageCache map[string]*cpuUsageRecord
stats *runtimeapi.ContainerStats
expected *uint64
}{
{
desc: "should return nil if stats is nil",
cpuUsageCache: map[string]*cpuUsageRecord{},
},
{
desc: "should return nil if cpu stats is nil",
cpuUsageCache: map[string]*cpuUsageRecord{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
@@ -712,7 +712,7 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
{
desc: "should return nil if usageCoreNanoSeconds is nil",
cpuUsageCache: map[string]*cpuUsageRecord{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
@@ -725,7 +725,7 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
{
desc: "should return nil if cpu stats is not cached yet",
cpuUsageCache: map[string]*cpuUsageRecord{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
@@ -751,11 +751,13 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
},
},
cpuUsageCache: map[string]*cpuUsageRecord{
"1": {
stats: &runtimeapi.CpuUsage{
Timestamp: 0,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
},
@@ -774,11 +776,13 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
},
},
cpuUsageCache: map[string]*cpuUsageRecord{
"1": {
stats: &runtimeapi.CpuUsage{
Timestamp: 0,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
},
@@ -788,7 +792,16 @@ func TestGetContainerUsageNanoCores(t *testing.T) {

for _, test := range tests {
provider := &criStatsProvider{cpuUsageCache: test.cpuUsageCache}
// Before the update, the cached value should be nil
cached := provider.getContainerUsageNanoCores(test.stats)
assert.Nil(t, cached)

// Update the cache and get the latest value.
real := provider.getAndUpdateContainerUsageNanoCores(test.stats)
assert.Equal(t, test.expected, real, test.desc)

// After the update, the cached value should be up-to-date
cached = provider.getContainerUsageNanoCores(test.stats)
assert.Equal(t, test.expected, cached, test.desc)
}
}
1 change: 1 addition & 0 deletions pkg/kubelet/stats/stats_provider.go
@@ -88,6 +88,7 @@ type StatsProvider struct {
// containers managed by pods.
type containerStatsProvider interface {
ListPodStats() ([]statsapi.PodStats, error)
ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error)
ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error)
ImageFsStats() (*statsapi.FsStats, error)
ImageFsDevice() (string, error)