Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding kubernetes core rate limiter handlers #3472

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions upup/pkg/fi/cloudup/awsup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/kubernetes/federation/pkg/dnsprovider:go_default_library",
"//vendor/k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53:go_default_library",
"//vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/aws:go_default_library",
],
)

Expand Down
64 changes: 60 additions & 4 deletions upup/pkg/fi/cloudup/awsup/aws_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package awsup
import (
"fmt"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
Expand All @@ -43,6 +45,7 @@ import (
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
dnsproviderroute53 "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53"
k8s_aws "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just copy it in, not add another k/k dependency

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually on seconds thoughts it is such a beast it's probably better not to copy it until we have to...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM ... yes beast

)

// By default, aws-sdk-go only retries 3 times, which doesn't give
Expand Down Expand Up @@ -144,6 +147,13 @@ type awsCloudImplementation struct {
region string

tags map[string]string

regionDelayers *RegionDelayers
}

type RegionDelayers struct {
mutex sync.Mutex
delayerMap map[string]*k8s_aws.CrossRequestRetryDelay
}

var _ fi.Cloud = &awsCloudImplementation{}
Expand All @@ -161,16 +171,19 @@ var awsCloudInstances map[string]AWSCloud = make(map[string]AWSCloud)
func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
raw := awsCloudInstances[region]
if raw == nil {
c := &awsCloudImplementation{region: region}
c := &awsCloudImplementation{
region: region,
regionDelayers: &RegionDelayers{
delayerMap: make(map[string]*k8s_aws.CrossRequestRetryDelay),
},
}

config := aws.NewConfig().WithRegion(region)

// Add some logging of retries
config.Retryer = newLoggingRetryer(ClientMaxRetries)

// This avoids a confusing error message when we fail to get credentials
// e.g. https://github.com/kubernetes/kops/issues/605
config = config.WithCredentialsChainVerboseErrors(true)
config = request.WithRetryer(config, newLoggingRetryer(ClientMaxRetries))

requestLogger := newRequestLogger(2)

Expand All @@ -180,41 +193,47 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
}
c.cf = cloudformation.New(sess, config)
c.cf.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.cf.Handlers)

sess, err = session.NewSession(config)
if err != nil {
return c, err
}
c.ec2 = ec2.New(sess, config)
c.ec2.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.ec2.Handlers)

sess, err = session.NewSession(config)
if err != nil {
return c, err
}
c.iam = iam.New(sess, config)
c.iam.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.iam.Handlers)

sess, err = session.NewSession(config)
if err != nil {
return c, err
}
c.elb = elb.New(sess, config)
c.elb.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.elb.Handlers)

sess, err = session.NewSession(config)
if err != nil {
return c, err
}
c.autoscaling = autoscaling.New(sess, config)
c.autoscaling.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.autoscaling.Handlers)

sess, err = session.NewSession(config)
if err != nil {
return c, err
}
c.route53 = route53.New(sess, config)
c.route53.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.route53.Handlers)

awsCloudInstances[region] = c
raw = c
Expand All @@ -225,6 +244,43 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
return i, nil
}

func (c *awsCloudImplementation) addHandlers(regionName string, h *request.Handlers) {

delayer := c.getCrossRequestRetryDelay(regionName)
if delayer != nil {
h.Sign.PushFrontNamed(request.NamedHandler{
Name: "kops/delay-presign",
Fn: delayer.BeforeSign,
})

h.AfterRetry.PushFrontNamed(request.NamedHandler{
Name: "kops/delay-afterretry",
Fn: delayer.AfterRetry,
})
}
}

// Get a CrossRequestRetryDelay, scoped to the region, not to the request.
// This means that when we hit a limit on a call, we will delay _all_ calls to the API.
// We do this to protect the AWS account from becoming overloaded and effectively locked.
// We also log when we hit request limits.
// Note that this delays the current goroutine; this is bad behaviour and will
// likely cause kops to become slow or unresponsive for cloud operations.
// However, this throttle is intended only as a last resort. When we observe
// this throttling, we need to address the root cause (e.g. add a delay to a
// controller retry loop)
func (c *awsCloudImplementation) getCrossRequestRetryDelay(regionName string) *k8s_aws.CrossRequestRetryDelay {
c.regionDelayers.mutex.Lock()
defer c.regionDelayers.mutex.Unlock()

delayer, found := c.regionDelayers.delayerMap[regionName]
if !found {
delayer = k8s_aws.NewCrossRequestRetryDelay()
c.regionDelayers.delayerMap[regionName] = delayer
}
return delayer
}

func NewEC2Filter(name string, values ...string) *ec2.Filter {
awsValues := []*string{}
for _, value := range values {
Expand Down