Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #64630: Wait a minimum amount of time for polling operations #65119

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/cloudprovider/providers/gce/cloud/BUILD
Expand Up @@ -4,7 +4,9 @@ go_library(
name = "go_default_library",
srcs = [
"constants.go",
"context.go",
"doc.go",
"errors.go",
"gce_projects.go",
"gen.go",
"op.go",
Expand All @@ -31,6 +33,8 @@ go_test(
srcs = [
"gen_test.go",
"mock_test.go",
"ratelimit_test.go",
"service_test.go",
"utils_test.go",
],
embed = [":go_default_library"],
Expand Down
31 changes: 31 additions & 0 deletions pkg/cloudprovider/providers/gce/cloud/context.go
@@ -0,0 +1,31 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloud

import (
"context"
"time"
)

const (
defaultCallTimeout = 1 * time.Hour
)

// ContextWithCallTimeout returns a context with a default timeout, used for generated client calls.
func ContextWithCallTimeout() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultCallTimeout)
}
48 changes: 48 additions & 0 deletions pkg/cloudprovider/providers/gce/cloud/errors.go
@@ -0,0 +1,48 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloud

import "fmt"

// OperationPollingError occurs when the GCE Operation cannot be retrieved for a prolonged period.
type OperationPollingError struct {
LastPollError error
}

// Error returns a string representation including the last poll error encountered.
func (e *OperationPollingError) Error() string {
return fmt.Sprintf("GCE operation polling error: %v", e.LastPollError)
}

// GCEOperationError occurs when the GCE Operation finishes with an error.
type GCEOperationError struct {
// HTTPStatusCode is the HTTP status code of the final error.
// For example, a failed operation may have 400 - BadRequest.
HTTPStatusCode int
// Code is GCE's code of what went wrong.
// For example, RESOURCE_IN_USE_BY_ANOTHER_RESOURCE
Code string
// Message is a human readable message.
// For example, "The network resource 'xxx' is already being used by 'xxx'"
Message string
}

// Error returns a string representation including the HTTP Status code, GCE's error code
// and a human readable message.
func (e *GCEOperationError) Error() string {
return fmt.Sprintf("GCE %v - %v: %v", e.HTTPStatusCode, e.Code, e.Message)
}
52 changes: 49 additions & 3 deletions pkg/cloudprovider/providers/gce/cloud/op.go
Expand Up @@ -29,10 +29,17 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)

const (
operationStatusDone = "DONE"
)

// operation is a GCE operation that can be watied on.
type operation interface {
// isDone queries GCE for the done status. This call can block.
isDone(ctx context.Context) (bool, error)
// error returns the resulting error of the operation. This may be nil if the operations
// was successful.
error() error
// rateLimitKey returns the rate limit key to use for the given operation.
// This rate limit will govern how fast the server will be polled for
// operation completion status.
Expand All @@ -43,6 +50,7 @@ type gaOperation struct {
s *Service
projectID string
key *meta.Key
err error
}

func (o *gaOperation) String() string {
Expand Down Expand Up @@ -71,7 +79,15 @@ func (o *gaOperation) isDone(ctx context.Context) (bool, error) {
if err != nil {
return false, err
}
return op != nil && op.Status == "DONE", nil
if op == nil || op.Status != operationStatusDone {
return false, nil
}

if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil {
e := op.Error.Errors[0]
o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message}
}
return true, nil
}

func (o *gaOperation) rateLimitKey() *RateLimitKey {
Expand All @@ -83,10 +99,15 @@ func (o *gaOperation) rateLimitKey() *RateLimitKey {
}
}

func (o *gaOperation) error() error {
return o.err
}

type alphaOperation struct {
s *Service
projectID string
key *meta.Key
err error
}

func (o *alphaOperation) String() string {
Expand Down Expand Up @@ -115,7 +136,15 @@ func (o *alphaOperation) isDone(ctx context.Context) (bool, error) {
if err != nil {
return false, err
}
return op != nil && op.Status == "DONE", nil
if op == nil || op.Status != operationStatusDone {
return false, nil
}

if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil {
e := op.Error.Errors[0]
o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message}
}
return true, nil
}

func (o *alphaOperation) rateLimitKey() *RateLimitKey {
Expand All @@ -127,10 +156,15 @@ func (o *alphaOperation) rateLimitKey() *RateLimitKey {
}
}

func (o *alphaOperation) error() error {
return o.err
}

type betaOperation struct {
s *Service
projectID string
key *meta.Key
err error
}

func (o *betaOperation) String() string {
Expand Down Expand Up @@ -159,7 +193,15 @@ func (o *betaOperation) isDone(ctx context.Context) (bool, error) {
if err != nil {
return false, err
}
return op != nil && op.Status == "DONE", nil
if op == nil || op.Status != operationStatusDone {
return false, nil
}

if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil {
e := op.Error.Errors[0]
o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message}
}
return true, nil
}

func (o *betaOperation) rateLimitKey() *RateLimitKey {
Expand All @@ -170,3 +212,7 @@ func (o *betaOperation) rateLimitKey() *RateLimitKey {
Version: meta.VersionBeta,
}
}

func (o *betaOperation) error() error {
return o.err
}
62 changes: 50 additions & 12 deletions pkg/cloudprovider/providers/gce/cloud/ratelimit.go
Expand Up @@ -47,22 +47,60 @@ type RateLimiter interface {
Accept(ctx context.Context, key *RateLimitKey) error
}

// acceptor is an object which blocks within Accept until a call is allowed to run.
// Accept is a behavior of the flowcontrol.RateLimiter interface.
type acceptor interface {
// Accept blocks until a call is allowed to run.
Accept()
}

// AcceptRateLimiter wraps an Acceptor with RateLimiter parameters.
type AcceptRateLimiter struct {
// Acceptor is the underlying rate limiter.
Acceptor acceptor
}

// Accept wraps an Acceptor and blocks on Accept or context.Done(). Key is ignored.
func (rl *AcceptRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error {
ch := make(chan struct{})
go func() {
rl.Acceptor.Accept()
close(ch)
}()
select {
case <-ch:
break
case <-ctx.Done():
return ctx.Err()
}
return nil
}

// NopRateLimiter is a rate limiter that performs no rate limiting.
type NopRateLimiter struct {
}

// Accept the operation to be rate limited.
// Accept everything immediately.
func (*NopRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error {
// Rate limit polling of the Operation status to avoid hammering GCE
// for the status of an operation.
const pollTime = time.Duration(1) * time.Second
if key.Operation == "Get" && key.Service == "Operations" {
select {
case <-time.NewTimer(pollTime).C:
break
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

// MinimumRateLimiter wraps a RateLimiter and will only call its Accept until the minimum
// duration has been met or the context is cancelled.
type MinimumRateLimiter struct {
// RateLimiter is the underlying ratelimiter which is called after the mininum time is reacehd.
RateLimiter RateLimiter
// Minimum is the minimum wait time before the underlying ratelimiter is called.
Minimum time.Duration
}

// Accept blocks on the minimum duration and context. Once the minimum duration is met,
// the func is blocked on the underlying ratelimiter.
func (m *MinimumRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error {
select {
case <-time.After(m.Minimum):
return m.RateLimiter.Accept(ctx, key)
case <-ctx.Done():
return ctx.Err()
}
}
80 changes: 80 additions & 0 deletions pkg/cloudprovider/providers/gce/cloud/ratelimit_test.go
@@ -0,0 +1,80 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloud

import (
"context"
"testing"
"time"
)

type FakeAcceptor struct{ accept func() }

func (f *FakeAcceptor) Accept() {
f.accept()
}

func TestAcceptRateLimiter(t *testing.T) {
fa := &FakeAcceptor{accept: func() {}}
arl := &AcceptRateLimiter{fa}
err := arl.Accept(context.Background(), nil)
if err != nil {
t.Errorf("AcceptRateLimiter.Accept() = %v, want nil", err)
}

// Use context that has been cancelled and expect a context error returned.
ctxCancelled, cancelled := context.WithCancel(context.Background())
cancelled()
// Verify context is cancelled by now.
<-ctxCancelled.Done()

fa.accept = func() { time.Sleep(1 * time.Second) }
err = arl.Accept(ctxCancelled, nil)
if err != ctxCancelled.Err() {
t.Errorf("AcceptRateLimiter.Accept() = %v, want %v", err, ctxCancelled.Err())
}
}

func TestMinimumRateLimiter(t *testing.T) {
fa := &FakeAcceptor{accept: func() {}}
arl := &AcceptRateLimiter{fa}
var called bool
fa.accept = func() { called = true }
m := &MinimumRateLimiter{RateLimiter: arl, Minimum: 10 * time.Millisecond}

err := m.Accept(context.Background(), nil)
if err != nil {
t.Errorf("MinimumRateLimiter.Accept = %v, want nil", err)
}
if !called {
t.Errorf("`called` = false, want true")
}

// Use context that has been cancelled and expect a context error returned.
ctxCancelled, cancelled := context.WithCancel(context.Background())
cancelled()
// Verify context is cancelled by now.
<-ctxCancelled.Done()
called = false
err = m.Accept(ctxCancelled, nil)
if err != ctxCancelled.Err() {
t.Errorf("AcceptRateLimiter.Accept() = %v, want %v", err, ctxCancelled.Err())
}
if called {
t.Errorf("`called` = true, want false")
}
}