AWS: Delay all AWS calls when we observe RequestLimitExceeded errors #19335

Merged
54 changes: 49 additions & 5 deletions pkg/cloudprovider/providers/aws/aws.go
@@ -228,13 +228,57 @@ type awsSdkEC2 struct {

type awsSDKProvider struct {
creds *credentials.Credentials

mutex sync.Mutex
regionDelayers map[string]*CrossRequestRetryDelay
}

func newAWSSDKProvider(creds *credentials.Credentials) *awsSDKProvider {
return &awsSDKProvider{
creds: creds,
regionDelayers: make(map[string]*CrossRequestRetryDelay),
}
}

func addHandlers(h *request.Handlers) {
func (p *awsSDKProvider) addHandlers(regionName string, h *request.Handlers) {
h.Sign.PushFrontNamed(request.NamedHandler{
Name: "k8s/logger",
Fn: awsHandlerLogger,
})

delayer := p.getCrossRequestRetryDelay(regionName)
if delayer != nil {
h.Sign.PushFrontNamed(request.NamedHandler{
Name: "k8s/delay-presign",
Fn: delayer.BeforeSign,
})

h.AfterRetry.PushFrontNamed(request.NamedHandler{
Name: "k8s/delay-afterretry",
Fn: delayer.AfterRetry,
})
}
}

// Get a CrossRequestRetryDelay, scoped to the region, not to the request.
// This means that when we hit a limit on a call, we will delay _all_ calls to the API.
// We do this to protect the AWS account from becoming overloaded and effectively locked.
// We also log when we hit request limits.
// Note that this delays the current goroutine; this is bad behaviour and will
// likely cause k8s to become slow or unresponsive for cloud operations.
// However, this throttle is intended only as a last resort. When we observe
// this throttling, we need to address the root cause (e.g. add a delay to a
// controller retry loop)
func (p *awsSDKProvider) getCrossRequestRetryDelay(regionName string) *CrossRequestRetryDelay {
p.mutex.Lock()
defer p.mutex.Unlock()

delayer, found := p.regionDelayers[regionName]
if !found {
delayer = NewCrossRequestRetryDelay()
p.regionDelayers[regionName] = delayer
}
return delayer
}

func (p *awsSDKProvider) Compute(regionName string) (EC2, error) {
@@ -243,7 +287,7 @@ func (p *awsSDKProvider) Compute(regionName string) (EC2, error) {
Credentials: p.creds,
}))

addHandlers(&service.Handlers)
p.addHandlers(regionName, &service.Handlers)

ec2 := &awsSdkEC2{
ec2: service,
@@ -257,7 +301,7 @@ func (p *awsSDKProvider) LoadBalancing(regionName string) (ELB, error) {
Credentials: p.creds,
}))

addHandlers(&elbClient.Handlers)
p.addHandlers(regionName, &elbClient.Handlers)

return elbClient, nil
}
@@ -268,7 +312,7 @@ func (p *awsSDKProvider) Autoscaling(regionName string) (ASG, error) {
Credentials: p.creds,
}))

addHandlers(&client.Handlers)
p.addHandlers(regionName, &client.Handlers)

return client, nil
}
@@ -458,7 +502,7 @@ func init() {
},
&credentials.SharedCredentialsProvider{},
})
aws := &awsSDKProvider{creds: creds}
aws := newAWSSDKProvider(creds)
return newAWSCloud(config, aws)
})
}
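A minimal sketch (not part of this PR; it assumes it sits inside package aws alongside the code above, and the region strings are illustrative) of why getCrossRequestRetryDelay scopes the delayer to the region rather than to the individual client: every client built for the same region shares one CrossRequestRetryDelay, so a RequestLimitExceeded observed on an ELB call also delays subsequent EC2 calls in that region, while other regions keep their own independent delayers.

package aws

import "github.com/aws/aws-sdk-go/aws/credentials"

func exampleSharedDelayer(creds *credentials.Credentials) {
	provider := newAWSSDKProvider(creds)

	// Same region, different services: addHandlers wires both clients to the
	// same region-scoped delayer, so throttling seen by one slows the other.
	ec2East, _ := provider.Compute("us-east-1")
	elbEast, _ := provider.LoadBalancing("us-east-1")

	// A different region gets its own, independent delayer.
	ec2West, _ := provider.Compute("us-west-2")

	_, _, _ = ec2East, elbEast, ec2West
}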
161 changes: 161 additions & 0 deletions pkg/cloudprovider/providers/aws/retry_handler.go
@@ -0,0 +1,161 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aws

import (
"math"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/golang/glog"
)

const (
decayIntervalSeconds = 20
decayFraction = 0.8
maxDelay = 60 * time.Second
)

// CrossRequestRetryDelay inserts delays before AWS calls when we are observing RequestLimitExceeded errors.
// Note that we share a CrossRequestRetryDelay across multiple AWS requests; this is a process-wide back-off,
// whereas the aws-sdk-go implements a per-request exponential backoff/retry.
type CrossRequestRetryDelay struct {
backoff Backoff
}

// Create a new CrossRequestRetryDelay
func NewCrossRequestRetryDelay() *CrossRequestRetryDelay {
c := &CrossRequestRetryDelay{}
c.backoff.init(decayIntervalSeconds, decayFraction, maxDelay)
return c
}

// Added to the Sign chain; called before each request
func (c *CrossRequestRetryDelay) BeforeSign(r *request.Request) {
now := time.Now()
delay := c.backoff.ComputeDelayForRequest(now)
if delay > 0 {
glog.Warningf("Inserting delay before AWS request (%s) to avoid RequestLimitExceeded: %s",
describeRequest(r), delay.String())
r.Config.SleepDelay(delay)

// Avoid clock skew problems
r.Time = now
}
}

// Return a user-friendly string describing the request, for use in log messages
func describeRequest(r *request.Request) string {
service := r.ClientInfo.ServiceName

name := "?"
if r.Operation != nil {
name = r.Operation.Name
}

return service + "::" + name
}

// Added to the AfterRetry chain; called after any error
func (c *CrossRequestRetryDelay) AfterRetry(r *request.Request) {
if r.Error == nil {
return
}
awsError, ok := r.Error.(awserr.Error)
if !ok {
return
}
if awsError.Code() == "RequestLimitExceeded" {
c.backoff.ReportError()
glog.Warningf("Got RequestLimitExceeded error on AWS request (%s)",
describeRequest(r))
}
}

// Backoff manages a backoff that varies based on the recently observed failures
type Backoff struct {
decayIntervalSeconds int64
decayFraction float64
maxDelay time.Duration

mutex sync.Mutex

// We count all requests & the number of requests which hit a
// RequestLimit. We only really care about 'recent' requests, so we
// decay the counts exponentially to bias towards recent values.
countErrorsRequestLimit float32
countRequests float32
lastDecay int64
}

func (b *Backoff) init(decayIntervalSeconds int, decayFraction float64, maxDelay time.Duration) {
b.lastDecay = time.Now().Unix()
// Bias so that if the first request hits the limit we don't immediately apply the full delay
b.countRequests = 4
b.decayIntervalSeconds = int64(decayIntervalSeconds)
b.decayFraction = decayFraction
b.maxDelay = maxDelay
}
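// Worked example of the seed above (a sketch, not code added by this PR): with
// countRequests starting at 4, a single throttled request leaves the counters at
// 5 requests / 1 error, so the next call sees an error fraction of 1/6 ≈ 0.17 and
// is delayed roughly 60s * 0.17 ≈ 10s, rather than the 30s it would get if the
// counters started from zero.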

// Computes the delay required for a request, also updating internal state to count this request
func (b *Backoff) ComputeDelayForRequest(now time.Time) time.Duration {
b.mutex.Lock()
defer b.mutex.Unlock()

// Apply exponential decay to the counters
timeDeltaSeconds := now.Unix() - b.lastDecay
if timeDeltaSeconds > b.decayIntervalSeconds {
intervals := float64(timeDeltaSeconds) / float64(b.decayIntervalSeconds)
decay := float32(math.Pow(b.decayFraction, intervals))
b.countErrorsRequestLimit *= decay
b.countRequests *= decay
b.lastDecay = now.Unix()
}

// Count this request
b.countRequests += 1.0

// Compute the failure rate
errorFraction := float32(0.0)
if b.countRequests > 0.5 {
// Avoid tiny residuals & rounding errors
errorFraction = b.countErrorsRequestLimit / b.countRequests
}

// Ignore a low fraction of errors
// This also allows them to time-out
if errorFraction < 0.1 {
return time.Duration(0)
}

// Delay by the max delay multiplied by the recent error rate
// (i.e. we apply a linear delay function)
// TODO: This is pretty arbitrary
delay := time.Nanosecond * time.Duration(float32(b.maxDelay.Nanoseconds())*errorFraction)
// Round down to the nearest second for sanity
return time.Second * time.Duration(int(delay.Seconds()))
}
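// Worked example of the linear delay above (a sketch using the constants at the
// top of this file, not code added by this PR): with maxDelay = 60s, a recent
// error fraction of 0.5 yields 60s * 0.5 = 30s, truncated to whole seconds;
// fractions below 0.1 yield no delay, so the smallest non-zero delay is about 6s.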

// Called when we observe a throttling error
func (b *Backoff) ReportError() {
b.mutex.Lock()
defer b.mutex.Unlock()

b.countErrorsRequestLimit += 1.0
}
135 changes: 135 additions & 0 deletions pkg/cloudprovider/providers/aws/retry_handler_test.go
@@ -0,0 +1,135 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aws

import (
"testing"
"time"
)

// There follows a group of tests for the backoff logic. There's nothing
// particularly special about the values chosen: if we tweak the values in the
// backoff logic then we might well have to update the tests. However the key
// behavioural elements should remain (e.g. no errors => no backoff), and these
// are each tested by one of the tests below.

// Test that we don't apply any delays when there are no errors
func TestBackoffNoErrors(t *testing.T) {
b := &Backoff{}
b.init(decayIntervalSeconds, decayFraction, maxDelay)

now := time.Now()
for i := 0; i < 100; i++ {
d := b.ComputeDelayForRequest(now)
if d.Nanoseconds() != 0 {
t.Fatalf("unexpected delay during no-error case")
}
now = now.Add(time.Second)
}
}

// Test that we always apply a delay when there are errors, and also that we
// don't "flap" - that our own delay doesn't cause us to oscillate between
// delay and no-delay.
func TestBackoffAllErrors(t *testing.T) {
b := &Backoff{}
b.init(decayIntervalSeconds, decayFraction, maxDelay)

now := time.Now()
Review comment (Contributor):

This is the typical use case of a fake clock, so you don't need to depend on the system clock in a unit test. E.g.:

expiredTime := fakeTime.Add(-(ttl + 1))

Reply (Member, Author):

I only use time.Now because it's the simplest way to build a time value :-) Then ComputeDelayForRequest naturally takes a timestamp, because of the need to set "r.Time = now" to avoid clock skew problems with the aws-sdk.

That said, I think it's a total wash - happy to switch to a fake clock if that's clearer...

Reply (Contributor):

Up to you; yeah, the real danger case is subtracting time values in a unit test, because the system clock can just randomly reset, causing flakes. As long as we're not hitting that it's fine. (A sketch of the fake-clock approach appears after the test file, at the end of this diff.)

// Warm up
for i := 0; i < 10; i++ {
_ = b.ComputeDelayForRequest(now)
b.ReportError()
now = now.Add(time.Second)
}

for i := 0; i < 100; i++ {
d := b.ComputeDelayForRequest(now)
b.ReportError()
if d.Seconds() < 5 {
t.Fatalf("unexpected short-delay during all-error case: %v", d)
}
t.Logf("delay @%d %v", i, d)
now = now.Add(d)
}
}

// Test that we do come close to our max delay, when we see all errors at 1
// second intervals (this simulates multiple concurrent requests, because we
// don't wait for delay in between requests)
func TestBackoffHitsMax(t *testing.T) {
b := &Backoff{}
b.init(decayIntervalSeconds, decayFraction, maxDelay)

now := time.Now()
for i := 0; i < 100; i++ {
_ = b.ComputeDelayForRequest(now)
b.ReportError()
now = now.Add(time.Second)
}

for i := 0; i < 10; i++ {
d := b.ComputeDelayForRequest(now)
b.ReportError()
if float32(d.Nanoseconds()) < (float32(maxDelay.Nanoseconds()) * 0.95) {
t.Fatalf("expected delay to be >= 95 percent of max delay, was %v", d)
}
t.Logf("delay @%d %v", i, d)
now = now.Add(time.Second)
}
}

// Test that after a phase of errors, we eventually stop applying a delay once there are
// no more errors.
func TestBackoffRecovers(t *testing.T) {
b := &Backoff{}
b.init(decayIntervalSeconds, decayFraction, maxDelay)

now := time.Now()

// Phase of all-errors
for i := 0; i < 100; i++ {
_ = b.ComputeDelayForRequest(now)
b.ReportError()
now = now.Add(time.Second)
}

for i := 0; i < 10; i++ {
d := b.ComputeDelayForRequest(now)
b.ReportError()
if d.Seconds() < 5 {
t.Fatalf("unexpected short-delay during all-error phase: %v", d)
}
t.Logf("error phase delay @%d %v", i, d)
now = now.Add(time.Second)
}

// Phase of no errors
for i := 0; i < 100; i++ {
_ = b.ComputeDelayForRequest(now)
now = now.Add(3 * time.Second)
}

for i := 0; i < 10; i++ {
d := b.ComputeDelayForRequest(now)
if d.Seconds() != 0 {
t.Fatalf("unexpected delay during error recovery phase: %v", d)
}
t.Logf("no-error phase delay @%d %v", i, d)
now = now.Add(time.Second)
}
}
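
A minimal sketch of the fake-clock approach discussed in the review comments above (the test name and the fixed start instant are illustrative, not part of this PR; it assumes it lives next to retry_handler_test.go in package aws). Because ComputeDelayForRequest already takes the time as a parameter, the test can advance a synthetic timestamp instead of reading the system clock between iterations. Note that Backoff.init still seeds its decay timestamp from time.Now(), so with a fixed past instant the decay branch simply never fires in this sketch; the assertion holds either way.

package aws

import (
	"testing"
	"time"
)

// TestBackoffAllErrorsFakeClock mirrors TestBackoffAllErrors, but drives the
// backoff from a fixed, synthetic timestamp rather than the system clock.
func TestBackoffAllErrorsFakeClock(t *testing.T) {
	b := &Backoff{}
	b.init(decayIntervalSeconds, decayFraction, maxDelay)

	// Fixed starting instant; the test never subtracts real wall-clock times.
	fakeTime := time.Date(2016, time.January, 1, 0, 0, 0, 0, time.UTC)

	// Sustained RequestLimitExceeded errors at one-second intervals.
	for i := 0; i < 100; i++ {
		_ = b.ComputeDelayForRequest(fakeTime)
		b.ReportError()
		fakeTime = fakeTime.Add(time.Second)
	}

	// After that much throttling, a substantial delay should be applied.
	if d := b.ComputeDelayForRequest(fakeTime); d.Seconds() < 5 {
		t.Fatalf("expected a significant delay after sustained errors, got %v", d)
	}
}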