Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a rate limiter, use it to rate limit docker pulls. #1457

Merged
merged 1 commit into from
Sep 26, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ var (
etcdServerList util.StringList
rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).")
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]")
registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
)

func init() {
Expand Down Expand Up @@ -157,7 +159,9 @@ func main() {
cadvisorClient,
etcdClient,
*rootDirectory,
*syncFrequency)
*syncFrequency,
float32(*registryPullQPS),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why bother to convert to 32 bit? Is float even really needed, or could we just spec QPS as int?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are most of the time going to want to pass in < 1 QPS

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. milliQPS? :) j/k

*registryBurst)

health.AddHealthChecker("exec", health.NewExecHealthChecker(k))
health.AddHealthChecker("http", health.NewHTTPHealthChecker(&http.Client{}))
Expand Down
24 changes: 21 additions & 3 deletions pkg/kubelet/dockertools/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
)
Expand Down Expand Up @@ -64,8 +65,13 @@ type dockerPuller struct {
keyring *dockerKeyring
}

type throttledDockerPuller struct {
puller dockerPuller
limiter util.RateLimiter
}

// NewDockerPuller creates a new instance of the default implementation of DockerPuller.
func NewDockerPuller(client DockerInterface) DockerPuller {
func NewDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
dp := dockerPuller{
client: client,
keyring: newDockerKeyring(),
Expand All @@ -81,8 +87,13 @@ func NewDockerPuller(client DockerInterface) DockerPuller {
if dp.keyring.count() == 0 {
glog.Infof("Continuing with empty docker keyring")
}

return dp
if qps == 0.0 {
return dp
}
return &throttledDockerPuller{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By making this a hidden detail of the puller, you're sort of making an assumption that the caller will retry on a long-enough timescale to eventually pass. we already have a problem where we can't tell if a pull failed because of network error, because of server error, or because the image doesn't exist. We will have to do better on error reporting eventually. I guess we can revisit the error type later, and maybe pass up more info in a specific type for this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, we should add richer errors here, esp. so we can try to actually run the container in some errors (e.g. network problem)

puller: dp,
limiter: util.NewTokenBucketRateLimiter(qps, burst),
}
}

type dockerContainerCommandRunner struct{}
Expand Down Expand Up @@ -130,6 +141,13 @@ func (p dockerPuller) Pull(image string) error {
return p.client.PullImage(opts, creds)
}

func (p throttledDockerPuller) Pull(image string) error {
if p.limiter.CanAccept() {
return p.puller.Pull(image)
}
return fmt.Errorf("pull QPS exceeded.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just block until the QPS block is lifted?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the question is whether there's ever a situation where you'd rather fail than respect the rate limit? I have a hard time imagining such a mode, except in a very extreme situation where you might be better off getting rescheduled on a different host...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you might want to fail here and try a local image if it exists. Do we then need a queueing API here that says "I have no local image, I want to run, put me in the queue" to prevent DoS?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the more I think about it the more I think you're going to usually just spend all of the QPS budget on calls that always fail fast, starving legit calls. I think the management system needs to slice up the QPS budget and explicitly not starve other pods.

Basically I don't think you can do this while hiding the QPS budget behind the docker puller interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really just to prevent us from DDOS-ing Docker. (there were complaints yesterday).

We should also have an explicit "back off in case of image pull failures" component, but that is not this one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I don't want to block, for the reasons Tim mentioned. I want to retry using a local image. I realize that the code doesn't do that right now as written, since we'll need to introduce our own error type. It's TODO'd

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a TODO? @dchen1107 because we were talking about error-handling around pulls yesterday - I think we could spend a bit of time in this space to make it safer, not necessary as part of this PR

}

// DockerContainers is a map of containers
type DockerContainers map[DockerID]*docker.APIContainers

Expand Down
16 changes: 13 additions & 3 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func NewMainKubelet(
cc CadvisorInterface,
ec tools.EtcdClient,
rd string,
ri time.Duration) *Kubelet {
ri time.Duration,
pullQPS float32,
pullBurst int) *Kubelet {
return &Kubelet{
hostname: hn,
dockerClient: dc,
Expand All @@ -80,6 +82,8 @@ func NewMainKubelet(
podWorkers: newPodWorkers(),
runner: dockertools.NewDockerContainerCommandRunner(),
httpClient: &http.Client{},
pullQPS: pullQPS,
pullBurst: pullBurst,
}
}

Expand Down Expand Up @@ -121,6 +125,10 @@ type Kubelet struct {
runner dockertools.ContainerCommandRunner
// Optional, client for http requests, defaults to empty client
httpClient httpGetInterface
// Optional, maximum pull QPS from the docker registry, 0.0 means unlimited.
pullQPS float32
// Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0
pullBurst int
}

// Run starts the kubelet reacting to config updates
Expand All @@ -129,7 +137,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient)
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
if kl.healthChecker == nil {
kl.healthChecker = health.NewHealthChecker()
Expand Down Expand Up @@ -404,7 +412,9 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error
Image: networkContainerImage,
Ports: ports,
}
kl.dockerPuller.Pull(networkContainerImage)
if err := kl.dockerPuller.Pull(networkContainerImage); err != nil {
return "", err
}
return kl.runContainer(pod, container, nil, "")
}

Expand Down
104 changes: 104 additions & 0 deletions pkg/util/throttle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2014 Google Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"sync"
"time"
)

type RateLimiter interface {
// CanAccept returns true if the rate is below the limit, false otherwise
CanAccept() bool
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
}

type tickRateLimiter struct {
lock sync.Mutex
tokens chan bool
ticker <-chan time.Time
stop chan bool
}

// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
// smoothed qps rate of 'qps'.
// The bucket is initially filled with 'burst' tokens, the rate limiter spawns a go routine
// which refills the bucket with one token at a rate of 'qps'. The maximum number of tokens in
// the bucket is capped at 'burst'.
// When done with the limiter, Stop() must be called to halt the associated goroutine.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
ticker := time.Tick(time.Duration(float32(time.Second) / qps))
rate := newTokenBucketRateLimiterFromTicker(ticker, burst)
go rate.run()
return rate
}

func newTokenBucketRateLimiterFromTicker(ticker <-chan time.Time, burst int) *tickRateLimiter {
if burst < 1 {
panic("burst must be a positive integer")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't do a lot of precondition checking, and Go does not seem to endorse CHECK failing - is this a pattern we should follow elsewhere? I have wanted to do this many times...

}
rate := &tickRateLimiter{
tokens: make(chan bool, burst),
ticker: ticker,
stop: make(chan bool),
}
for i := 0; i < burst; i++ {
rate.tokens <- true
}
return rate
}

func (t *tickRateLimiter) CanAccept() bool {
select {
case <-t.tokens:
return true
default:
return false
}
}

func (t *tickRateLimiter) Stop() {
close(t.stop)
}

func (r *tickRateLimiter) run() {
for {
if !r.step() {
break
}
}
}

func (r *tickRateLimiter) step() bool {
select {
case <-r.ticker:
r.increment()
return true
case <-r.stop:
return false
}
}

func (t *tickRateLimiter) increment() {
// non-blocking send
select {
case t.tokens <- true:
default:
}
}
62 changes: 62 additions & 0 deletions pkg/util/throttle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
Copyright 2014 Google Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"testing"
"time"
)

func TestBasicThrottle(t *testing.T) {
ticker := make(chan time.Time, 1)
r := newTokenBucketRateLimiterFromTicker(ticker, 3)
for i := 0; i < 3; i++ {
if !r.CanAccept() {
t.Error("unexpected false accept")
}
}
if r.CanAccept() {
t.Error("unexpected true accept")
}
}

func TestIncrementThrottle(t *testing.T) {
ticker := make(chan time.Time, 1)
r := newTokenBucketRateLimiterFromTicker(ticker, 1)
if !r.CanAccept() {
t.Error("unexpected false accept")
}
if r.CanAccept() {
t.Error("unexpected true accept")
}
ticker <- time.Now()
r.step()

if !r.CanAccept() {
t.Error("unexpected false accept")
}
}

func TestOverBurst(t *testing.T) {
ticker := make(chan time.Time, 1)
r := newTokenBucketRateLimiterFromTicker(ticker, 3)

for i := 0; i < 4; i++ {
ticker <- time.Now()
r.step()
}
}