Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries on 429s & 5xxs to the Auth webhooks #27993

Merged
merged 1 commit into from
Jun 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 12 additions & 2 deletions plugin/pkg/auth/authenticator/token/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/apis/authentication.k8s.io/v1beta1"
"k8s.io/kubernetes/pkg/auth/authenticator"
"k8s.io/kubernetes/pkg/auth/user"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/util/cache"
"k8s.io/kubernetes/plugin/pkg/webhook"

Expand All @@ -35,6 +36,8 @@ var (
groupVersions = []unversioned.GroupVersion{v1beta1.SchemeGroupVersion}
)

const retryBackoff = 500 * time.Millisecond

// Ensure WebhookTokenAuthenticator implements the authenticator.Token interface.
var _ authenticator.Token = (*WebhookTokenAuthenticator)(nil)

Expand All @@ -46,7 +49,12 @@ type WebhookTokenAuthenticator struct {

// New creates a new WebhookTokenAuthenticator from the provided kubeconfig file.
func New(kubeConfigFile string, ttl time.Duration) (*WebhookTokenAuthenticator, error) {
gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions)
return newWithBackoff(kubeConfigFile, ttl, retryBackoff)
}

// newWithBackoff allows tests to skip the sleep.
func newWithBackoff(kubeConfigFile string, ttl, initialBackoff time.Duration) (*WebhookTokenAuthenticator, error) {
gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, initialBackoff)
if err != nil {
return nil, err
}
Expand All @@ -61,7 +69,9 @@ func (w *WebhookTokenAuthenticator) AuthenticateToken(token string) (user.Info,
if entry, ok := w.responseCache.Get(r.Spec); ok {
r.Status = entry.(v1beta1.TokenReviewStatus)
} else {
result := w.RestClient.Post().Body(r).Do()
result := w.WithExponentialBackoff(func() restclient.Result {
return w.RestClient.Post().Body(r).Do()
})
if err := result.Error(); err != nil {
return nil, false, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func newTokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, c
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
return nil, err
}
return New(p, cacheTime)
return newWithBackoff(p, cacheTime, 0)
}

func TestTLSConfig(t *testing.T) {
Expand Down
14 changes: 12 additions & 2 deletions plugin/pkg/auth/authorizer/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/authorization/v1beta1"
"k8s.io/kubernetes/pkg/auth/authorizer"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/util/cache"
"k8s.io/kubernetes/plugin/pkg/webhook"

Expand All @@ -36,6 +37,8 @@ var (
groupVersions = []unversioned.GroupVersion{v1beta1.SchemeGroupVersion}
)

const retryBackoff = 500 * time.Millisecond

// Ensure Webhook implements the authorizer.Authorizer interface.
var _ authorizer.Authorizer = (*WebhookAuthorizer)(nil)

Expand Down Expand Up @@ -67,7 +70,12 @@ type WebhookAuthorizer struct {
// For additional HTTP configuration, refer to the kubeconfig documentation
// http://kubernetes.io/v1.1/docs/user-guide/kubeconfig-file.html.
func New(kubeConfigFile string, authorizedTTL, unauthorizedTTL time.Duration) (*WebhookAuthorizer, error) {
gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions)
return newWithBackoff(kubeConfigFile, authorizedTTL, unauthorizedTTL, retryBackoff)
}

// newWithBackoff allows tests to skip the sleep.
func newWithBackoff(kubeConfigFile string, authorizedTTL, unauthorizedTTL, initialBackoff time.Duration) (*WebhookAuthorizer, error) {
gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, initialBackoff)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -148,7 +156,9 @@ func (w *WebhookAuthorizer) Authorize(attr authorizer.Attributes) (err error) {
if entry, ok := w.responseCache.Get(string(key)); ok {
r.Status = entry.(v1beta1.SubjectAccessReviewStatus)
} else {
result := w.RestClient.Post().Body(r).Do()
result := w.WithExponentialBackoff(func() restclient.Result {
return w.RestClient.Post().Body(r).Do()
})
if err := result.Error(); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/pkg/auth/authorizer/webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ current-context: default
return fmt.Errorf("failed to execute test template: %v", err)
}
// Create a new authorizer
_, err = New(p, 0, 0)
_, err = newWithBackoff(p, 0, 0, 0)
return err
}()
if err != nil && !tt.wantErr {
Expand Down Expand Up @@ -291,7 +291,7 @@ func newAuthorizer(callbackURL string, clientCert, clientKey, ca []byte, cacheTi
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
return nil, err
}
return New(p, cacheTime, cacheTime)
return newWithBackoff(p, cacheTime, cacheTime, 0)
}

func TestTLSConfig(t *testing.T) {
Expand Down
35 changes: 32 additions & 3 deletions plugin/pkg/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package webhook

import (
"fmt"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
Expand All @@ -27,16 +28,18 @@ import (
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/runtime"
runtimeserializer "k8s.io/kubernetes/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/util/wait"

_ "k8s.io/kubernetes/pkg/apis/authorization/install"
)

type GenericWebhook struct {
RestClient *restclient.RESTClient
RestClient *restclient.RESTClient
initialBackoff time.Duration
}

// New creates a new GenericWebhook from the provided kubeconfig file.
func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupVersion) (*GenericWebhook, error) {
func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupVersion, initialBackoff time.Duration) (*GenericWebhook, error) {
for _, groupVersion := range groupVersions {
if !registered.IsEnabledVersion(groupVersion) {
return nil, fmt.Errorf("webhook plugin requires enabling extension resource: %s", groupVersion)
Expand Down Expand Up @@ -64,5 +67,31 @@ func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupV

// TODO(ericchiang): Can we ensure remote service is reachable?

return &GenericWebhook{restClient}, nil
return &GenericWebhook{restClient, initialBackoff}, nil
}

// WithExponentialBackoff will retry webhookFn 5 times w/ exponentially
// increasing backoff when a 429 or a 5xx response code is returned.
func (g *GenericWebhook) WithExponentialBackoff(webhookFn func() restclient.Result) restclient.Result {
backoff := wait.Backoff{
Duration: g.initialBackoff,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}
var result restclient.Result
wait.ExponentialBackoff(backoff, func() (bool, error) {
result = webhookFn()
// Return from Request.Do() errors immediately.
if err := result.Error(); err != nil {
return false, err
}
// Retry 429s, and 5xxs.
var statusCode int
if result.StatusCode(&statusCode); statusCode == 429 || statusCode >= 500 {
return false, nil
}
return true, nil
})
return result
}