Skip to content

Commit

Permalink
Rework of resource consumer.
Browse files Browse the repository at this point in the history
Major rework of resource consumer: added controller running as a pod that spreads requests around consumers. This should fix #21664 and #23536.
  • Loading branch information
jszczepkowski committed Jun 1, 2016
1 parent 2976e89 commit 967a7c9
Show file tree
Hide file tree
Showing 8 changed files with 429 additions and 136 deletions.
4 changes: 4 additions & 0 deletions hack/verify-flags/known-flags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ configure-cbr0
configure-cloud-routes
conntrack-max
conntrack-tcp-timeout-established
consumer-port
consumer-service-name
consumer-service-namespace
contain-pod-resources
container-port
container-runtime
Expand Down Expand Up @@ -463,3 +466,4 @@ watch-only
whitelist-override-label
windows-line-endings
www-prefix

164 changes: 78 additions & 86 deletions test/e2e/autoscaling_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ const (
timeoutRC = 120 * time.Second
startServiceTimeout = time.Minute
startServiceInterval = 5 * time.Second
resourceConsumerImage = "gcr.io/google_containers/resource_consumer:beta2"
resourceConsumerImage = "gcr.io/google_containers/resource_consumer:beta4"
resourceConsumerControllerImage = "gcr.io/google_containers/resource_consumer/controller:beta4"
rcIsNil = "ERROR: replicationController = nil"
deploymentIsNil = "ERROR: deployment = nil"
rsIsNil = "ERROR: replicaset = nil"
Expand All @@ -58,6 +59,7 @@ rc.ConsumeCPU(300)
*/
type ResourceConsumer struct {
name string
controllerName string
kind string
framework *framework.Framework
cpu chan int
Expand Down Expand Up @@ -97,6 +99,7 @@ func newResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTo
runServiceAndWorkloadForResourceConsumer(f.Client, f.Namespace.Name, name, kind, replicas, cpuLimit, memLimit)
rc := &ResourceConsumer{
name: name,
controllerName: name + "-controller",
kind: kind,
framework: f,
cpu: make(chan int),
Expand All @@ -111,8 +114,10 @@ func newResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTo
requestSizeInMegabytes: requestSizeInMegabytes,
requestSizeCustomMetric: requestSizeCustomMetric,
}

go rc.makeConsumeCPURequests()
rc.ConsumeCPU(initCPUTotal)

go rc.makeConsumeMemRequests()
rc.ConsumeMem(initMemoryTotal)
go rc.makeConsumeCustomMetric()
Expand Down Expand Up @@ -140,25 +145,15 @@ func (rc *ResourceConsumer) ConsumeCustomMetric(amount int) {

func (rc *ResourceConsumer) makeConsumeCPURequests() {
defer GinkgoRecover()
var count int
var rest int
sleepTime := time.Duration(0)
millicores := 0
for {
select {
case millicores := <-rc.cpu:
framework.Logf("RC %s: consume %v millicores in total", rc.name, millicores)
if rc.requestSizeInMillicores != 0 {
count = millicores / rc.requestSizeInMillicores
}
rest = millicores - count*rc.requestSizeInMillicores
case millicores = <-rc.cpu:
framework.Logf("RC %s: setting consumption to %v millicores in total", rc.name, millicores)
case <-time.After(sleepTime):
framework.Logf("RC %s: sending %v requests to consume %v millicores each and 1 request to consume %v millicores", rc.name, count, rc.requestSizeInMillicores, rest)
if count > 0 {
rc.sendConsumeCPURequests(count, rc.requestSizeInMillicores, rc.consumptionTimeInSeconds)
}
if rest > 0 {
go rc.sendOneConsumeCPURequest(rest, rc.consumptionTimeInSeconds)
}
framework.Logf("RC %s: sending request to consume %d millicores", rc.name, millicores)
rc.sendConsumeCPURequest(millicores)
sleepTime = rc.sleepTime
case <-rc.stopCPU:
return
Expand All @@ -168,25 +163,15 @@ func (rc *ResourceConsumer) makeConsumeCPURequests() {

func (rc *ResourceConsumer) makeConsumeMemRequests() {
defer GinkgoRecover()
var count int
var rest int
sleepTime := time.Duration(0)
megabytes := 0
for {
select {
case megabytes := <-rc.mem:
framework.Logf("RC %s: consume %v MB in total", rc.name, megabytes)
if rc.requestSizeInMegabytes != 0 {
count = megabytes / rc.requestSizeInMegabytes
}
rest = megabytes - count*rc.requestSizeInMegabytes
case megabytes = <-rc.mem:
framework.Logf("RC %s: setting consumption to %v MB in total", rc.name, megabytes)
case <-time.After(sleepTime):
framework.Logf("RC %s: sending %v requests to consume %v MB each and 1 request to consume %v MB", rc.name, count, rc.requestSizeInMegabytes, rest)
if count > 0 {
rc.sendConsumeMemRequests(count, rc.requestSizeInMegabytes, rc.consumptionTimeInSeconds)
}
if rest > 0 {
go rc.sendOneConsumeMemRequest(rest, rc.consumptionTimeInSeconds)
}
framework.Logf("RC %s: sending request to consume %d MB", rc.name, megabytes)
rc.sendConsumeMemRequest(megabytes)
sleepTime = rc.sleepTime
case <-rc.stopMem:
return
Expand All @@ -196,91 +181,64 @@ func (rc *ResourceConsumer) makeConsumeMemRequests() {

func (rc *ResourceConsumer) makeConsumeCustomMetric() {
defer GinkgoRecover()
var count int
var rest int
sleepTime := time.Duration(0)
delta := 0
for {
select {
case total := <-rc.customMetric:
framework.Logf("RC %s: consume custom metric %v in total", rc.name, total)
if rc.requestSizeInMegabytes != 0 {
count = total / rc.requestSizeCustomMetric
}
rest = total - count*rc.requestSizeCustomMetric
case delta := <-rc.customMetric:
framework.Logf("RC %s: setting bump of metric %s to %d in total", rc.name, customMetricName, delta)
case <-time.After(sleepTime):
framework.Logf("RC %s: sending %v requests to consume %v custom metric each and 1 request to consume %v",
rc.name, count, rc.requestSizeCustomMetric, rest)
if count > 0 {
rc.sendConsumeCustomMetric(count, rc.requestSizeCustomMetric, rc.consumptionTimeInSeconds)
}
if rest > 0 {
go rc.sendOneConsumeCustomMetric(rest, rc.consumptionTimeInSeconds)
}
framework.Logf("RC %s: sending request to consume %d of custom metric %s", rc.name, delta, customMetricName)
rc.sendConsumeCustomMetric(delta)
sleepTime = rc.sleepTime
case <-rc.stopCustomMetric:
return
}
}
}

func (rc *ResourceConsumer) sendConsumeCPURequests(requests, millicores, durationSec int) {
for i := 0; i < requests; i++ {
go rc.sendOneConsumeCPURequest(millicores, durationSec)
}
}

func (rc *ResourceConsumer) sendConsumeMemRequests(requests, megabytes, durationSec int) {
for i := 0; i < requests; i++ {
go rc.sendOneConsumeMemRequest(megabytes, durationSec)
}
}

func (rc *ResourceConsumer) sendConsumeCustomMetric(requests, delta, durationSec int) {
for i := 0; i < requests; i++ {
go rc.sendOneConsumeCustomMetric(delta, durationSec)
}
}

// sendOneConsumeCPURequest sends POST request for cpu consumption
func (rc *ResourceConsumer) sendOneConsumeCPURequest(millicores int, durationSec int) {
defer GinkgoRecover()
func (rc *ResourceConsumer) sendConsumeCPURequest(millicores int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post())
framework.ExpectNoError(err)
_, err = proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.name).
req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.controllerName).
Suffix("ConsumeCPU").
Param("millicores", strconv.Itoa(millicores)).
Param("durationSec", strconv.Itoa(durationSec)).
DoRaw()
Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
Param("requestSizeMillicores", strconv.Itoa(rc.requestSizeInMillicores))
framework.Logf("URL: %v", *req.URL())
_, err = req.DoRaw()
framework.ExpectNoError(err)
}

// sendOneConsumeMemRequest sends POST request for memory consumption
func (rc *ResourceConsumer) sendOneConsumeMemRequest(megabytes int, durationSec int) {
defer GinkgoRecover()
// sendConsumeMemRequest sends POST request for memory consumption
func (rc *ResourceConsumer) sendConsumeMemRequest(megabytes int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post())
framework.ExpectNoError(err)
_, err = proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.name).
req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.controllerName).
Suffix("ConsumeMem").
Param("megabytes", strconv.Itoa(megabytes)).
Param("durationSec", strconv.Itoa(durationSec)).
DoRaw()
Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
Param("requestSizeMegabytes", strconv.Itoa(rc.requestSizeInMegabytes))
framework.Logf("URL: %v", *req.URL())
_, err = req.DoRaw()
framework.ExpectNoError(err)
}

// sendOneConsumeCustomMetric sends POST request for custom metric consumption
func (rc *ResourceConsumer) sendOneConsumeCustomMetric(delta int, durationSec int) {
defer GinkgoRecover()
// sendConsumeCustomMetric sends POST request for custom metric consumption
func (rc *ResourceConsumer) sendConsumeCustomMetric(delta int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post())
framework.ExpectNoError(err)
_, err = proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.name).
req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Name(rc.controllerName).
Suffix("BumpMetric").
Param("metric", customMetricName).
Param("delta", strconv.Itoa(delta)).
Param("durationSec", strconv.Itoa(durationSec)).
DoRaw()
Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
Param("requestSizeMetrics", strconv.Itoa(rc.requestSizeCustomMetric))
framework.Logf("URL: %v", *req.URL())
_, err = req.DoRaw()
framework.ExpectNoError(err)
}

Expand Down Expand Up @@ -346,6 +304,8 @@ func (rc *ResourceConsumer) CleanUp() {
time.Sleep(10 * time.Second)
framework.ExpectNoError(framework.DeleteRC(rc.framework.Client, rc.framework.Namespace.Name, rc.name))
framework.ExpectNoError(rc.framework.Client.Services(rc.framework.Namespace.Name).Delete(rc.name))
framework.ExpectNoError(framework.DeleteRC(rc.framework.Client, rc.framework.Namespace.Name, rc.controllerName))
framework.ExpectNoError(rc.framework.Client.Services(rc.framework.Namespace.Name).Delete(rc.controllerName))
}

func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind string, replicas int, cpuLimitMillis, memLimitMb int64) {
Expand Down Expand Up @@ -400,6 +360,38 @@ func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind s
framework.Failf(invalidKind)
}

By(fmt.Sprintf("Running controller"))
controllerName := name + "-controller"
_, err = c.Services(ns).Create(&api.Service{
ObjectMeta: api.ObjectMeta{
Name: controllerName,
},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{
Port: port,
TargetPort: intstr.FromInt(targetPort),
}},

Selector: map[string]string{
"name": controllerName,
},
},
})
framework.ExpectNoError(err)

dnsClusterFirst := api.DNSClusterFirst
controllerRcConfig := framework.RCConfig{
Client: c,
Image: resourceConsumerControllerImage,
Name: controllerName,
Namespace: ns,
Timeout: timeoutRC,
Replicas: 1,
Command: []string{"/controller", "--consumer-service-name=" + name, "--consumer-service-namespace=" + ns, "--consumer-port=80"},
DNSPolicy: &dnsClusterFirst,
}
framework.ExpectNoError(framework.RunRC(controllerRcConfig))

// Make sure endpoints are propagated.
// TODO(piosz): replace sleep with endpoints watch.
time.Sleep(10 * time.Second)
Expand Down
7 changes: 6 additions & 1 deletion test/e2e/framework/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ type RCConfig struct {
MemRequest int64 // bytes
MemLimit int64 // bytes
ReadinessProbe *api.Probe
DNSPolicy *api.DNSPolicy

// Env vars, set the same for every pod.
Env map[string]string
Expand Down Expand Up @@ -2181,6 +2182,10 @@ func RunRC(config RCConfig) error {

func (config *RCConfig) create() error {
By(fmt.Sprintf("creating replication controller %s in namespace %s", config.Name, config.Namespace))
dnsDefault := api.DNSDefault
if config.DNSPolicy == nil {
config.DNSPolicy = &dnsDefault
}
rc := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: config.Name,
Expand All @@ -2204,7 +2209,7 @@ func (config *RCConfig) create() error {
ReadinessProbe: config.ReadinessProbe,
},
},
DNSPolicy: api.DNSDefault,
DNSPolicy: *config.DNSPolicy,
},
},
},
Expand Down
8 changes: 6 additions & 2 deletions test/images/resource-consumer/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.

TAG = beta2
TAG = beta4
PREFIX = gcr.io/google_containers

all: clean consumer

consumer:
CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o consume-cpu/consume-cpu ./consume-cpu/consume_cpu.go
CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o consumer .
CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o controller/controller ./controller/controller.go

container: image

image:
sudo docker build -t $(PREFIX)/resource_consumer:$(TAG) .
sudo docker build -t $(PREFIX)/resource_consumer/controller:$(TAG) controller

run_container:
docker run --publish=8080:8080 $(PREFIX)/resource_consumer:$(TAG)

push:
@echo "This image is not meant to be pushed."
gcloud docker push ${PREFIX}/resource_consumer:${TAG}
gcloud docker push ${PREFIX}/resource_consumer/controller:${TAG}

clean:
rm -f consumer
rm -f consume-cpu/consume-cpu
rm -f controller/controller
41 changes: 41 additions & 0 deletions test/images/resource-consumer/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

const (
ConsumeCPUAddress = "/ConsumeCPU"
ConsumeMemAddress = "/ConsumeMem"
BumpMetricAddress = "/BumpMetric"
GetCurrentStatusAddress = "/GetCurrentStatus"
MetricsAddress = "/Metrics"

MillicoresQuery = "millicores"
MegabytesQuery = "megabytes"
MetricNameQuery = "metric"
DeltaQuery = "delta"
DurationSecQuery = "durationSec"
RequestSizeInMillicoresQuery = "requestSizeMillicores"
RequestSizeInMegabytesQuery = "requestSizeMegabytes"
RequestSizeCustomMetricQuery = "requestSizeMetrics"

BadRequest = "Bad request. Not a POST request"
UnknownFunction = "unknown function"
IncorrectFunctionArgument = "incorrect function argument"
NotGivenFunctionArgument = "not given function argument"

FrameworkName = "horizontal-pod-autoscaling"
)
Loading

0 comments on commit 967a7c9

Please sign in to comment.