Skip to content

Commit

Permalink
Merge pull request kubernetes#110100 from tkashem/client-go-backoff-fix
Browse files Browse the repository at this point in the history
client-go: fix backoff delay
  • Loading branch information
k8s-ci-robot committed May 23, 2022
2 parents fbb5717 + 60e74a9 commit 9997897
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 47 deletions.
142 changes: 103 additions & 39 deletions staging/src/k8s.io/client-go/rest/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,9 @@ func TestCheckRetryClosesBody(t *testing.T) {
defer testServer.Close()

backoff := &testBackoffManager{}
expectedSleeps := []time.Duration{0, time.Second, 0, time.Second, 0, time.Second, 0, time.Second, 0}

// testBackoffManager.CalculateBackoff always returns 0
expectedSleeps := []time.Duration{0, time.Second, time.Second, time.Second, time.Second}

c := testRESTClient(t, testServer)
c.createBackoffMgr = func() BackoffManager { return backoff }
Expand Down Expand Up @@ -1440,10 +1442,12 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
// We have a sleep before each retry (including the initial one) and for
// every "retry-after" call - thus 5 together.
if len(backoff.sleeps) != 5 {
t.Errorf("Expected 5 retries, got: %d", len(backoff.sleeps))
if count != 3 {
t.Errorf("Expected 3 attempts, got: %d", count)
}
// We have a sleep before each retry (including the initial one) thus 3 together.
if len(backoff.sleeps) != 3 {
t.Errorf("Expected 3 backoff.Sleep, got: %d", len(backoff.sleeps))
}
}

Expand Down Expand Up @@ -2824,7 +2828,8 @@ type withRateLimiterBackoffManagerAndMetrics struct {
flowcontrol.RateLimiter
*NoBackoff
metrics.ResultMetric
backoffWaitSeconds int
calculateBackoffSeq int64
calculateBackoffFn func(i int64) time.Duration

invokeOrderGot []string
sleepsGot []string
Expand All @@ -2839,9 +2844,8 @@ func (lb *withRateLimiterBackoffManagerAndMetrics) Wait(ctx context.Context) err
func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *url.URL) time.Duration {
lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.CalculateBackoff")

// we simulate a sleep sequence of 0m, 2m, 4m, 6m, ...
waitFor := time.Duration(lb.backoffWaitSeconds) * time.Minute
lb.backoffWaitSeconds += 2
waitFor := lb.calculateBackoffFn(lb.calculateBackoffSeq)
lb.calculateBackoffSeq++
return waitFor
}

Expand All @@ -2868,14 +2872,16 @@ func (lb *withRateLimiterBackoffManagerAndMetrics) Do() {

func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
type expected struct {
attempts int
order []string
attempts int
order []string
sleeps []string
statusCodes []string
}

// we define the expected order of how the client invokes the
// rate limiter, backoff, and metrics methods.
// scenario:
// - A: original request fails with a retryable response: (500, 'Retry-After: 1')
// - A: original request fails with a retryable response: (500, 'Retry-After: N')
// - B: retry 1: successful with a status code 200
// so we have a total of 2 attempts
invokeOrderWant := []string{
Expand All @@ -2887,17 +2893,16 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
"BackoffManager.Sleep",

// A: first attempt for which the server sends a retryable response
// status code: 500, Retry-Afer: N
"Client.Do",

// we got a response object, status code: 500, Retry-Afer: 1
// we got a response object, status code: 500, Retry-Afer: N
// - call metrics method with appropriate status code
// - update backoff parameters with the status code returned
// - sleep for N seconds from 'Retry-After: N' response header
"RequestResult.Increment",
"BackoffManager.UpdateBackoff",
"BackoffManager.Sleep",
// sleep for delay dictated by backoff parameters
"BackoffManager.CalculateBackoff",
// sleep for delay=max(BackoffManager.CalculateBackoff, Retry-After: N)
"BackoffManager.Sleep",
// wait as dictated by the client rate lmiter
"RateLimiter.Wait",
Expand All @@ -2910,46 +2915,104 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
"RequestResult.Increment",
"BackoffManager.UpdateBackoff",
}
sleepWant := []string{
// initial backoff.Sleep before we send the request to the server for the first time
"0s",
// from 'Retry-After: 1' response header (A)
(1 * time.Second).String(),
// backoff.Sleep before retry 1 (B)
(2 * time.Minute).String(),
}
statusCodesWant := []string{
"500",
"200",
}

tests := []struct {
name string
maxRetries int
serverReturns []responseErr
name string
maxRetries int
serverReturns []responseErr
calculateBackoffFn func(i int64) time.Duration
// expectations differ based on whether it is 'Watch', 'Stream' or 'Do'
expectations map[string]expected
}{
{
name: "success after one retry",
name: "success after one retry, Retry-After: N > BackoffManager.CalculateBackoff",
maxRetries: 1,
serverReturns: []responseErr{
{response: retryAfterResponse(), err: nil},
{response: retryAfterResponseWithDelay("5"), err: nil},
{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
},
// we simulate a sleep sequence of 0s, 1s, 2s, 3s, ...
calculateBackoffFn: func(i int64) time.Duration { return time.Duration(i * int64(time.Second)) },
expectations: map[string]expected{
"Do": {
attempts: 2,
order: invokeOrderWant,
attempts: 2,
order: invokeOrderWant,
statusCodes: statusCodesWant,
sleeps: []string{
// initial backoff.Sleep before we send the request to the server for the first time
"0s",
// maximum of:
// - 'Retry-After: 5' response header from (A)
// - BackoffManager.CalculateBackoff (will return 1s)
(5 * time.Second).String(),
},
},
"Watch": {
attempts: 2,
// Watch does not do 'RateLimiter.Wait' before initially sending the request to the server
order: invokeOrderWant[1:],
order: invokeOrderWant[1:],
statusCodes: statusCodesWant,
sleeps: []string{
"0s",
(5 * time.Second).String(),
},
},
"Stream": {
attempts: 2,
order: invokeOrderWant,
statusCodes: statusCodesWant,
sleeps: []string{
"0s",
(5 * time.Second).String(),
},
},
},
},
{
name: "success after one retry, Retry-After: N < BackoffManager.CalculateBackoff",
maxRetries: 1,
serverReturns: []responseErr{
{response: retryAfterResponseWithDelay("2"), err: nil},
{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
},
// we simulate a sleep sequence of 0s, 4s, 8s, 16s, ...
calculateBackoffFn: func(i int64) time.Duration { return time.Duration(i * int64(4*time.Second)) },
expectations: map[string]expected{
"Do": {
attempts: 2,
order: invokeOrderWant,
statusCodes: statusCodesWant,
sleeps: []string{
// initial backoff.Sleep before we send the request to the server for the first time
"0s",
// maximum of:
// - 'Retry-After: 2' response header from (A)
// - BackoffManager.CalculateBackoff (will return 4s)
(4 * time.Second).String(),
},
},
"Watch": {
attempts: 2,
order: invokeOrderWant,
// Watch does not do 'RateLimiter.Wait' before initially sending the request to the server
order: invokeOrderWant[1:],
statusCodes: statusCodesWant,
sleeps: []string{
"0s",
(4 * time.Second).String(),
},
},
"Stream": {
attempts: 2,
order: invokeOrderWant,
statusCodes: statusCodesWant,
sleeps: []string{
"0s",
(4 * time.Second).String(),
},
},
},
},
Expand All @@ -2958,8 +3021,9 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
interceptor := &withRateLimiterBackoffManagerAndMetrics{
RateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
NoBackoff: &NoBackoff{},
RateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
NoBackoff: &NoBackoff{},
calculateBackoffFn: test.calculateBackoffFn,
}

// TODO: today this is the only site where a test overrides the
Expand Down Expand Up @@ -3027,11 +3091,11 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
if !cmp.Equal(want.order, interceptor.invokeOrderGot) {
t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(want.order, interceptor.invokeOrderGot))
}
if !cmp.Equal(sleepWant, interceptor.sleepsGot) {
t.Errorf("%s: Expected sleep sequence to match, diff: %s", key, cmp.Diff(sleepWant, interceptor.sleepsGot))
if !cmp.Equal(want.sleeps, interceptor.sleepsGot) {
t.Errorf("%s: Expected sleep sequence to match, diff: %s", key, cmp.Diff(want.sleeps, interceptor.sleepsGot))
}
if !cmp.Equal(statusCodesWant, interceptor.statusCodesGot) {
t.Errorf("%s: Expected status codes to match, diff: %s", key, cmp.Diff(statusCodesWant, interceptor.statusCodesGot))
if !cmp.Equal(want.statusCodes, interceptor.statusCodesGot) {
t.Errorf("%s: Expected status codes to match, diff: %s", key, cmp.Diff(want.statusCodes, interceptor.statusCodesGot))
}
})
}
Expand Down
21 changes: 13 additions & 8 deletions staging/src/k8s.io/client-go/rest/with_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
if r.retryAfter == nil {
// we do a backoff sleep before the first attempt is made,
// (preserving current behavior).
request.backoff.Sleep(request.backoff.CalculateBackoff(url))
if request.backoff != nil {
request.backoff.Sleep(request.backoff.CalculateBackoff(url))
}
return nil
}

Expand All @@ -222,12 +224,11 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {

// if we are here, we have made attempt(s) al least once before.
if request.backoff != nil {
// TODO(tkashem) with default set to use exponential backoff
// we can merge these two sleeps:
// BackOffManager.Sleep(max(backoffManager.CalculateBackoff(), retryAfter))
// see https://github.com/kubernetes/kubernetes/issues/108302
request.backoff.Sleep(r.retryAfter.Wait)
request.backoff.Sleep(request.backoff.CalculateBackoff(url))
delay := request.backoff.CalculateBackoff(url)
if r.retryAfter.Wait > delay {
delay = r.retryAfter.Wait
}
request.backoff.Sleep(delay)
}

// We are retrying the request that we already send to
Expand Down Expand Up @@ -349,8 +350,12 @@ func readAndCloseResponseBody(resp *http.Response) {
}

func retryAfterResponse() *http.Response {
return retryAfterResponseWithDelay("1")
}

func retryAfterResponseWithDelay(delay string) *http.Response {
return &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
Header: http.Header{"Retry-After": []string{delay}},
}
}

0 comments on commit 9997897

Please sign in to comment.