Kubelet: add usageNanoCores from CRI stats provider #73659

Merged (1 commit, Feb 9, 2019)
63 changes: 63 additions & 0 deletions pkg/kubelet/stats/cri_stats_provider.go
@@ -22,6 +22,7 @@ import (
"path"
"sort"
"strings"
"sync"
"time"

cadvisorfs "github.com/google/cadvisor/fs"
@@ -38,6 +39,11 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)

var (
// defaultCachePeriod is how long a cached cpuUsage entry is kept before it is considered outdated.
defaultCachePeriod = 10 * time.Minute
)

// criStatsProvider implements the containerStatsProvider interface by getting
// the container stats from CRI.
type criStatsProvider struct {
@@ -54,6 +60,10 @@ type criStatsProvider struct {
imageService internalapi.ImageManagerService
// logMetrics provides the metrics for container logs
logMetricsService LogMetricsService

// cpuUsageCache caches the cpu usage for containers.
cpuUsageCache map[string]*runtimeapi.CpuUsage
mutex sync.Mutex
}

// newCRIStatsProvider returns a containerStatsProvider implementation that
@@ -71,6 +81,7 @@ func newCRIStatsProvider(
runtimeService: runtimeService,
imageService: imageService,
logMetricsService: logMetricsService,
cpuUsageCache: make(map[string]*runtimeapi.CpuUsage),
}
}

@@ -165,6 +176,8 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
}
ps.Containers = append(ps.Containers, *cs)
}
// Clean up outdated caches.
p.cleanupOutdatedCaches()

result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
for _, s := range sandboxIDToPodStats {
@@ -247,6 +260,8 @@ func (p *criStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
}
ps.Containers = append(ps.Containers, *cs)
}
// Clean up outdated caches.
p.cleanupOutdatedCaches()

result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
for _, s := range sandboxIDToPodStats {
@@ -450,6 +465,11 @@ func (p *criStatsProvider) makeContainerStats(
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}

usageNanoCores := p.getContainerUsageNanoCores(stats)
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
}
if stats.Memory != nil {
result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
@@ -506,6 +526,11 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}

usageNanoCores := p.getContainerUsageNanoCores(stats)
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
}
if stats.Memory != nil {
result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
@@ -516,6 +541,44 @@
return result
}

// getContainerUsageNanoCores gets usageNanoCores based on cached usageCoreNanoSeconds.
func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
if stats == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
return nil
}

p.mutex.Lock()
defer func() {
// Update cache with new value.
p.cpuUsageCache[stats.Attributes.Id] = stats.Cpu
p.mutex.Unlock()
}()

cached, ok := p.cpuUsageCache[stats.Attributes.Id]
if !ok || cached.UsageCoreNanoSeconds == nil {
return nil
}

nanoSeconds := stats.Cpu.Timestamp - cached.Timestamp
if nanoSeconds <= 0 {
// Guard against a zero or negative sampling interval, which would
// otherwise cause a divide-by-zero panic below.
return nil
}
usageNanoCores := (stats.Cpu.UsageCoreNanoSeconds.Value - cached.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
return &usageNanoCores
}
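
Here uint64(time.Second/time.Nanosecond) is 10^9, so the returned value is Δ(usageCoreNanoSeconds) × 10^9 / Δ(timestamp in ns). Below is a minimal standalone sketch of the same arithmetic, with illustrative names that are not part of the kubelet; note that the multiply-before-divide overflows uint64 once the usage delta exceeds roughly 18 core-seconds.

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// Two cumulative CRI samples taken one second apart:
		// timestamps in nanoseconds, usage in core-nanoseconds.
		var prevTime, curTime int64 = 0, int64(time.Second)
		var prevUsage, curUsage uint64 = 10000000000, 20000000000

		// 10^10 core-nanoseconds consumed over 10^9 ns of wall clock
		// is 10^10 nanocores, i.e. ten full cores.
		elapsedNs := uint64(curTime - prevTime)
		usageNanoCores := (curUsage - prevUsage) * uint64(time.Second/time.Nanosecond) / elapsedNs
		fmt.Println(usageNanoCores) // 10000000000
	}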

func (p *criStatsProvider) cleanupOutdatedCaches() {
p.mutex.Lock()
defer p.mutex.Unlock()

for k, v := range p.cpuUsageCache {
if v == nil {
delete(p.cpuUsageCache, k)
continue
}

if time.Since(time.Unix(0, v.Timestamp)) > defaultCachePeriod {
delete(p.cpuUsageCache, k)
}
}
}

Review thread on p.mutex.Lock():

Contributor: Since there may not be any expired entries for successive calls to cleanupOutdatedCaches, I wonder if the func can return early if it is called within defaultCachePeriod of the previous call.

Member (author): Yep, but that requires more work here, e.g. another property indicating the oldest Timestamp in the cache.

Contributor: How about adding a property recording the time of the last cleanup (so that we don't call this so often)?

Review thread on the v == nil check:

Contributor: stats.Cpu is checked on line 546 above; when would the value be nil?

Member (author): This is for preventing panics, e.g. in tests.
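
A sketch of the early-return idea from the thread above, assuming a hypothetical lastCleanupTime time.Time field on criStatsProvider (the merged PR does not add one). The trade-off: the full scan is skipped when a cleanup already ran within defaultCachePeriod, but a stale entry can then linger for up to roughly twice the period.

	// Hypothetical variant of cleanupOutdatedCaches; lastCleanupTime would be
	// a new criStatsProvider field, guarded by the same mutex.
	func (p *criStatsProvider) cleanupOutdatedCaches() {
		p.mutex.Lock()
		defer p.mutex.Unlock()

		// Skip the scan entirely if a cleanup already ran recently.
		if time.Since(p.lastCleanupTime) < defaultCachePeriod {
			return
		}
		p.lastCleanupTime = time.Now()

		for k, v := range p.cpuUsageCache {
			if v == nil || time.Since(time.Unix(0, v.Timestamp)) > defaultCachePeriod {
				delete(p.cpuUsageCache, k)
			}
		}
	}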

// removeTerminatedContainer returns the specified container but with
// the stats of the terminated containers removed.
func removeTerminatedContainer(containers []*runtimeapi.Container) []*runtimeapi.Container {
107 changes: 107 additions & 0 deletions pkg/kubelet/stats/cri_stats_provider_test.go
@@ -645,3 +645,110 @@ func makeFakeLogStats(seed int) *volume.Metrics {
m.InodesUsed = resource.NewQuantity(int64(seed+offsetInodeUsage), resource.BinarySI)
return m
}

func TestGetContainerUsageNanoCores(t *testing.T) {
var value0 uint64
var value1 uint64 = 10000000000

tests := []struct {
desc string
cpuUsageCache map[string]*runtimeapi.CpuUsage
stats *runtimeapi.ContainerStats
expected *uint64
}{
{
desc: "should return nil if stats is nil",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
},
{
desc: "should return nil if cpu stats is nil",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: nil,
},
},
{
desc: "should return nil if usageCoreNanoSeconds is nil",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: 1,
UsageCoreNanoSeconds: nil,
},
},
},
{
desc: "should return nil if cpu stats is not cached yet",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: 1,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
},
{
desc: "should return zero value if cached cpu stats is equal to current value",
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: 1,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
cpuUsageCache: map[string]*runtimeapi.CpuUsage{
"1": {
Timestamp: 0,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
expected: &value0,
},
{
desc: "should return correct value if cached cpu stats is not equal to current value",
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: int64(time.Second / time.Nanosecond),
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 20000000000,
},
},
},
cpuUsageCache: map[string]*runtimeapi.CpuUsage{
"1": {
Timestamp: 0,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
expected: &value1,
},
}

for _, test := range tests {
provider := &criStatsProvider{cpuUsageCache: test.cpuUsageCache}
real := provider.getContainerUsageNanoCores(test.stats)
assert.Equal(t, test.expected, real, test.desc)
}
}
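
The expected values follow directly from the formula in getContainerUsageNanoCores: when the cached and current counters are equal the delta is zero, so the result is zero nanocores (value0); in the final case the counter grows by 20000000000 - 10000000000 = 10^10 core-nanoseconds over one second (10^9 ns), giving 10^10 × 10^9 / 10^9 = 10000000000 nanocores, i.e. ten full cores (value1).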