-
Notifications
You must be signed in to change notification settings - Fork 38.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add resource monitoring of kube-system pods #16505
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |
"encoding/json" | ||
"fmt" | ||
"io/ioutil" | ||
"math" | ||
"net/http" | ||
"sort" | ||
"strconv" | ||
|
@@ -209,8 +210,10 @@ func (r *containerResourceUsage) isStrictlyGreaterThan(rhs *containerResourceUsa | |
return r.CPUUsageInCores > rhs.CPUUsageInCores && r.MemoryUsageInBytes > rhs.MemoryUsageInBytes && r.MemoryWorkingSetInBytes > rhs.MemoryWorkingSetInBytes | ||
} | ||
|
||
type resourceUsagePerContainer map[string]*containerResourceUsage | ||
|
||
// getOneTimeResourceUsageOnNode queries the node's /stats/container endpoint | ||
// and returns the resource usage of targetContainers for the past | ||
// and returns the resource usage of all containerNames for the past | ||
// cpuInterval. | ||
// The acceptable range of the interval is 2s~120s. Be warned that as the | ||
// interval (and #containers) increases, the size of kubelet's response | ||
|
@@ -223,7 +226,19 @@ func (r *containerResourceUsage) isStrictlyGreaterThan(rhs *containerResourceUsa | |
// Note that this is an approximation and may not be accurate, hence we also | ||
// write the actual interval used for calculation (based on the timestamps of | ||
// the stats points in containerResourceUsage.CPUInterval. | ||
func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterval time.Duration) (map[string]*containerResourceUsage, error) { | ||
// | ||
// containerNames is a function returning a collection of container names in which
// the caller is interested. expectMissingContainers is a flag which controls whether
// the function should fail if one of the containers listed by containerNames is
// missing on any node: if set to true, the function is more forgiving and ignores
// missing containers (useful e.g. when looking for system containers or daemons
// that run only on some nodes).
func getOneTimeResourceUsageOnNode( | ||
c *client.Client, | ||
nodeName string, | ||
cpuInterval time.Duration, | ||
containerNames func() []string, | ||
expectMissingContainers bool, | ||
) (resourceUsagePerContainer, error) { | ||
numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds) | ||
if numStats < 2 || numStats > maxNumStatsToRequest { | ||
return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest) | ||
|
@@ -238,12 +253,15 @@ func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterva | |
return nil, err | ||
} | ||
// Process container infos that are relevant to us. | ||
containers := targetContainers() | ||
usageMap := make(map[string]*containerResourceUsage, len(containers)) | ||
containers := containerNames() | ||
usageMap := make(resourceUsagePerContainer, len(containers)) | ||
for _, name := range containers { | ||
info, ok := containerInfos[name] | ||
if !ok { | ||
return nil, fmt.Errorf("missing info for container %q on node %q", name, nodeName) | ||
if !expectMissingContainers { | ||
return nil, fmt.Errorf("missing info for container %q on node %q", name, nodeName) | ||
} | ||
continue | ||
} | ||
first := info.Stats[0] | ||
last := info.Stats[len(info.Stats)-1] | ||
|
@@ -252,12 +270,58 @@ func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterva | |
return usageMap, nil | ||
} | ||
|
||
func getKubeSystemContainersResourceUsage(c *client.Client) (resourceUsagePerContainer, error) { | ||
pods, err := c.Pods("kube-system").List(labels.Everything(), fields.Everything()) | ||
if err != nil { | ||
return resourceUsagePerContainer{}, err | ||
} | ||
nodes, err := c.Nodes().List(labels.Everything(), fields.Everything()) | ||
if err != nil { | ||
return resourceUsagePerContainer{}, err | ||
} | ||
containerIDToNameMap := make(map[string]string) | ||
containerIDs := make([]string, 0) | ||
for _, pod := range pods.Items { | ||
for _, container := range pod.Status.ContainerStatuses { | ||
containerID := strings.TrimPrefix(container.ContainerID, "docker:/") | ||
containerIDToNameMap[containerID] = pod.Name + "/" + container.Name | ||
containerIDs = append(containerIDs, containerID) | ||
} | ||
} | ||
|
||
mutex := sync.Mutex{} | ||
wg := sync.WaitGroup{} | ||
wg.Add(len(nodes.Items)) | ||
errors := make([]error, 0) | ||
nameToUsageMap := make(resourceUsagePerContainer, len(containerIDToNameMap)) | ||
for _, node := range nodes.Items { | ||
go func(nodeName string) { | ||
defer wg.Done() | ||
nodeUsage, err := getOneTimeResourceUsageOnNode(c, nodeName, 5*time.Second, func() []string { return containerIDs }, true) | ||
mutex.Lock() | ||
defer mutex.Unlock() | ||
if err != nil { | ||
errors = append(errors, err) | ||
return | ||
} | ||
for k, v := range nodeUsage { | ||
nameToUsageMap[containerIDToNameMap[k]] = v | ||
} | ||
}(node.Name) | ||
} | ||
wg.Wait() | ||
if len(errors) != 0 { | ||
return resourceUsagePerContainer{}, fmt.Errorf("Errors while gathering usage data: %v", errors) | ||
} | ||
return nameToUsageMap, nil | ||
} | ||
|
||
// logOneTimeResourceUsageSummary collects container resource for the list of | ||
// nodes, formats and logs the stats. | ||
func logOneTimeResourceUsageSummary(c *client.Client, nodeNames []string, cpuInterval time.Duration) { | ||
var summary []string | ||
for _, nodeName := range nodeNames { | ||
stats, err := getOneTimeResourceUsageOnNode(c, nodeName, cpuInterval) | ||
stats, err := getOneTimeResourceUsageOnNode(c, nodeName, cpuInterval, targetContainers, false) | ||
if err != nil { | ||
summary = append(summary, fmt.Sprintf("Error getting resource usage from node %q, err: %v", nodeName, err)) | ||
} else { | ||
|
@@ -267,7 +331,7 @@ func logOneTimeResourceUsageSummary(c *client.Client, nodeNames []string, cpuInt | |
Logf("\n%s", strings.Join(summary, "\n")) | ||
} | ||
|
||
func formatResourceUsageStats(nodeName string, containerStats map[string]*containerResourceUsage) string { | ||
func formatResourceUsageStats(nodeName string, containerStats resourceUsagePerContainer) string { | ||
// Example output: | ||
// | ||
// Resource usage for node "e2e-test-foo-minion-abcde": | ||
|
@@ -287,6 +351,120 @@ func formatResourceUsageStats(nodeName string, containerStats map[string]*contai | |
return fmt.Sprintf("Resource usage on node %q:\n%s", nodeName, buf.String()) | ||
} | ||
|
||
// int64arr adapts a []int64 to sort.Interface so memory statistics can be
// sorted in ascending order with sort.Sort.
type int64arr []int64

// Len returns the number of elements in the slice.
func (a int64arr) Len() int {
	return len(a)
}

// Swap exchanges the elements at indices i and j.
func (a int64arr) Swap(i, j int) {
	a[i], a[j] = a[j], a[i]
}

// Less reports whether the element at index i sorts before the one at j.
func (a int64arr) Less(i, j int) bool {
	return a[i] < a[j]
}
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is ridiculous :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather do it in a followup PR - it needs dealing with the legacy code. |
||
|
||
// usageDataPerContainer holds the raw time series of resource samples
// collected for one container, used as scratch space while computing
// percentiles (see computePercentiles).
type usageDataPerContainer struct {
	cpuData        []float64 // CPU usage in cores, one sample per collected snapshot
	memUseData     []int64   // memory usage in bytes, one sample per collected snapshot
	memWorkSetData []int64   // memory working set in bytes, one sample per collected snapshot
}
|
||
func computePercentiles(timeSeries map[time.Time]resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer { | ||
if len(timeSeries) == 0 { | ||
return make(map[int]resourceUsagePerContainer) | ||
} | ||
dataMap := make(map[string]*usageDataPerContainer) | ||
for _, v := range timeSeries { | ||
for k := range v { | ||
dataMap[k] = &usageDataPerContainer{ | ||
cpuData: make([]float64, len(timeSeries)), | ||
memUseData: make([]int64, len(timeSeries)), | ||
memWorkSetData: make([]int64, len(timeSeries)), | ||
} | ||
} | ||
break | ||
} | ||
for _, singleStatistic := range timeSeries { | ||
for name, data := range singleStatistic { | ||
dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores) | ||
dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes) | ||
dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes) | ||
} | ||
} | ||
for _, v := range dataMap { | ||
sort.Float64s(v.cpuData) | ||
sort.Sort(int64arr(v.memUseData)) | ||
sort.Sort(int64arr(v.memWorkSetData)) | ||
} | ||
|
||
result := make(map[int]resourceUsagePerContainer) | ||
for _, perc := range percentilesToCompute { | ||
data := make(resourceUsagePerContainer) | ||
for k, v := range dataMap { | ||
percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1 | ||
data[k] = &containerResourceUsage{ | ||
Name: k, | ||
CPUUsageInCores: v.cpuData[percentileIndex], | ||
MemoryUsageInBytes: v.memUseData[percentileIndex], | ||
MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex], | ||
} | ||
} | ||
result[perc] = data | ||
} | ||
return result | ||
} | ||
|
||
// containerResourceGatherer periodically samples kube-system container
// resource usage in a background goroutine (started by startGatheringData)
// and accumulates the samples until stopAndPrintData is called.
// It embeds a sync.WaitGroup, so values must not be copied after first use;
// handle it via a pointer.
type containerResourceGatherer struct {
	usageTimeseries map[time.Time]resourceUsagePerContainer // sample time -> usage of each container at that time
	stopCh          chan struct{}                           // closed by stopAndPrintData to terminate the polling goroutine
	timer           *time.Ticker                            // drives the periodic polling
	wg              sync.WaitGroup                          // lets stopAndPrintData wait for the polling goroutine to exit
}
|
||
func (g *containerResourceGatherer) startGatheringData(c *client.Client, period time.Duration) { | ||
g.usageTimeseries = make(map[time.Time]resourceUsagePerContainer) | ||
g.wg.Add(1) | ||
g.stopCh = make(chan struct{}) | ||
g.timer = time.NewTicker(period) | ||
go func() error { | ||
for { | ||
select { | ||
case <-g.timer.C: | ||
now := time.Now() | ||
data, err := getKubeSystemContainersResourceUsage(c) | ||
if err != nil { | ||
return err | ||
} | ||
g.usageTimeseries[now] = data | ||
case <-g.stopCh: | ||
g.wg.Done() | ||
return nil | ||
} | ||
} | ||
}() | ||
} | ||
|
||
func (g *containerResourceGatherer) stopAndPrintData(percentiles []int) { | ||
close(g.stopCh) | ||
g.timer.Stop() | ||
g.wg.Wait() | ||
if len(percentiles) == 0 { | ||
Logf("Warning! Empty percentile list for stopAndPrintData.") | ||
return | ||
} | ||
stats := computePercentiles(g.usageTimeseries, percentiles) | ||
sortedKeys := []string{} | ||
for name := range stats[percentiles[0]] { | ||
sortedKeys = append(sortedKeys, name) | ||
} | ||
sort.Strings(sortedKeys) | ||
for _, perc := range percentiles { | ||
buf := &bytes.Buffer{} | ||
w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0) | ||
fmt.Fprintf(w, "container\tcpu(cores)\tmemory(MB)\n") | ||
for _, name := range sortedKeys { | ||
usage := stats[perc][name] | ||
fmt.Fprintf(w, "%q\t%.3f\t%.2f\n", name, usage.CPUUsageInCores, float64(usage.MemoryWorkingSetInBytes)/(1024*1024)) | ||
} | ||
w.Flush() | ||
Logf("%v percentile:\n%v", perc, buf.String()) | ||
} | ||
} | ||
|
||
// Performs a get on a node proxy endpoint given the nodename and rest client. | ||
func nodeProxyRequest(c *client.Client, node, endpoint string) client.Result { | ||
return c.Get(). | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is expectMissingContainers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment.