Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added handling custom metrics to resource consumer. #18511

Merged
merged 1 commit into from
Dec 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 11 additions & 9 deletions test/images/resource-consumer/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Resource Consumer

## Overview

Resource Consumer is a tool which allows to generate cpu/memory utilization in a container.
The reason why it was created is testing kubernetes autoscaling.
Resource Consumer can help with autoscaling tests for:
Expand All @@ -10,8 +9,7 @@ Resource Consumer can help with autoscaling tests for:
- vertical autoscaling of pod - changing its resource limits.

## Usage

Resource Consumer starts an HTTP server and handle sended requests.
Resource Consumer starts an HTTP server and handle sent requests.
It listens on port given as a flag (default 8080).
Action of consuming resources is send to the container by a POST http request.
Each http request creates new process.
Expand All @@ -20,12 +18,10 @@ Http request handler is in file resource_consumer_handler.go
The container consumes specified amount of resources:

- CPU in millicores,
- Memory in megabytes.


- Memory in megabytes,
- Fake custom metrics.

###Consume CPU http request

- suffix "ConsumeCPU",
- parameters "millicores" and "durationSec".

Expand All @@ -36,13 +32,19 @@ and if consumption is too high binary sleeps for 10 millisecond.
One replica of Resource Consumer cannot consume more that 1 cpu.

###Consume Memory http request

- suffix "ConsumeMem",
- parameters "megabytes" and "durationSec".

Consumes specified amount of megabytes for durationSec seconds.
Consume Memory uses stress tool (stress -m 1 --vm-bytes megabytes --vm-hang 0 -t durationSec).
Request leading to consumig more memory then container limit will be ignored.
Request leading to consuming more memory then container limit will be ignored.

###Bump value of a fake custom metric
- suffix "BumpMetric",
- parameters "metric", "delta" and "durationSec".

Bumps metric with given name by delta for durationSec seconds.
Custom metrics in Prometheus format are exposed on "/metrics" endpoint.

###CURL example
```console
Expand Down
2 changes: 1 addition & 1 deletion test/images/resource-consumer/resource_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ var port = flag.Int("port", 8080, "Port number.")

func main() {
flag.Parse()
var resourceConsumerHandler ResourceConsumerHandler
resourceConsumerHandler := NewResourceConsumerHandler()
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), resourceConsumerHandler))
}
125 changes: 100 additions & 25 deletions test/images/resource-consumer/resource_consumer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"
)

const (
Expand All @@ -30,17 +32,34 @@ const (
notGivenFunctionArgument = "not given function argument"
consumeCPUAddress = "/ConsumeCPU"
consumeMemAddress = "/ConsumeMem"
bumpMetricAddress = "/BumpMetric"
getCurrentStatusAddress = "/GetCurrentStatus"
metricsAddress = "/metrics"
millicoresQuery = "millicores"
megabytesQuery = "megabytes"
metricNameQuery = "metric"
deltaQuery = "delta"
durationSecQuery = "durationSec"
)

type ResourceConsumerHandler struct{}
type ResourceConsumerHandler struct {
metrics map[string]float64
metricsLock sync.Mutex
}

func NewResourceConsumerHandler() ResourceConsumerHandler {
return ResourceConsumerHandler{metrics: map[string]float64{}}
}

func (handler ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// handle exposing metrics in Prometheus format (both GET & POST)
if req.URL.Path == metricsAddress {
handler.handleMetrics(w)
return
}
if req.Method != "POST" {
http.Error(w, badRequest, http.StatusBadRequest)
return
}
// parsing POST request data and URL data
if err := req.ParseForm(); err != nil {
Expand All @@ -62,6 +81,11 @@ func (handler ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *htt
handler.handleGetCurrentStatus(w)
return
}
// handle bumpMetric
if req.URL.Path == bumpMetricAddress {
handler.handleBumpMetric(w, req.Form)
return
}
http.Error(w, unknownFunction, http.StatusNotFound)
}

Expand All @@ -72,21 +96,20 @@ func (handler ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, q
if durationSecString == "" || millicoresString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
return
} else {
// convert data (strings to ints) for consumeCPU
durationSec, durationSecError := strconv.Atoi(durationSecString)
millicores, millicoresError := strconv.Atoi(millicoresString)
if durationSecError != nil || millicoresError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}
go ConsumeCPU(millicores, durationSec)
fmt.Fprintln(w, consumeCPUAddress[1:])
fmt.Fprintln(w, millicores, millicoresQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}

// convert data (strings to ints) for consumeCPU
durationSec, durationSecError := strconv.Atoi(durationSecString)
millicores, millicoresError := strconv.Atoi(millicoresString)
if durationSecError != nil || millicoresError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}

go ConsumeCPU(millicores, durationSec)
fmt.Fprintln(w, consumeCPUAddress[1:])
fmt.Fprintln(w, millicores, millicoresQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}

func (handler ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, query url.Values) {
Expand All @@ -96,23 +119,75 @@ func (handler ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, q
if durationSecString == "" || megabytesString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
return
} else {
// convert data (strings to ints) for consumeMem
durationSec, durationSecError := strconv.Atoi(durationSecString)
megabytes, megabytesError := strconv.Atoi(megabytesString)
if durationSecError != nil || megabytesError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}
go ConsumeMem(megabytes, durationSec)
fmt.Fprintln(w, consumeMemAddress[1:])
fmt.Fprintln(w, megabytes, megabytesQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}

// convert data (strings to ints) for consumeMem
durationSec, durationSecError := strconv.Atoi(durationSecString)
megabytes, megabytesError := strconv.Atoi(megabytesString)
if durationSecError != nil || megabytesError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}

go ConsumeMem(megabytes, durationSec)
fmt.Fprintln(w, consumeMemAddress[1:])
fmt.Fprintln(w, megabytes, megabytesQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}

func (handler ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWriter) {
GetCurrentStatus()
fmt.Fprintln(w, "Warning: not implemented!")
fmt.Fprint(w, getCurrentStatusAddress[1:])
}

func (handler ResourceConsumerHandler) handleMetrics(w http.ResponseWriter) {
handler.metricsLock.Lock()
defer handler.metricsLock.Unlock()
for k, v := range handler.metrics {
fmt.Fprintf(w, "# HELP %s info message.\n", k)
fmt.Fprintf(w, "# TYPE %s gauge\n", k)
fmt.Fprintf(w, "%s %f\n", k, v)
}
}

func (handler ResourceConsumerHandler) bumpMetric(metric string, delta float64, duration time.Duration) {
handler.metricsLock.Lock()
if _, ok := handler.metrics[metric]; ok {
handler.metrics[metric] += delta
} else {
handler.metrics[metric] = delta
}
handler.metricsLock.Unlock()

time.Sleep(duration)

handler.metricsLock.Lock()
handler.metrics[metric] -= delta
handler.metricsLock.Unlock()
}

func (handler ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, query url.Values) {
// geting string data for handleBumpMetric
metric := query.Get(metricNameQuery)
deltaString := query.Get(deltaQuery)
durationSecString := query.Get(durationSecQuery)
if durationSecString == "" || metric == "" || deltaString == "" {
http.Error(w, notGivenFunctionArgument, http.StatusBadRequest)
return
}

// convert data (strings to ints/floats) for handleBumpMetric
durationSec, durationSecError := strconv.Atoi(durationSecString)
delta, deltaError := strconv.ParseFloat(deltaString, 64)
if durationSecError != nil || deltaError != nil {
http.Error(w, incorrectFunctionArgument, http.StatusBadRequest)
return
}

go handler.bumpMetric(metric, delta, time.Duration(durationSec)*time.Second)
fmt.Fprintln(w, bumpMetricAddress[1:])
fmt.Fprintln(w, metric, metricNameQuery)
fmt.Fprintln(w, delta, deltaQuery)
fmt.Fprintln(w, durationSec, durationSecQuery)
}