Skip to content

Commit

Permalink
client-go: make retry in Request thread safe
Browse files Browse the repository at this point in the history
Kubernetes-commit: 6618b8ef7c0b552839555d4578b64427d20524ef
  • Loading branch information
tkashem authored and k8s-publishing-bot committed Mar 29, 2022
1 parent 33011f1 commit d8531f5
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 45 deletions.
52 changes: 34 additions & 18 deletions rest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ func (r *RequestConstructionError) Error() string {

var noBackoff = &NoBackoff{}

type requestRetryFunc func(maxRetries int) WithRetry

func defaultRequestRetryFn(maxRetries int) WithRetry {
return &withRetry{maxRetries: maxRetries}
}

// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
Expand All @@ -93,6 +99,7 @@ type Request struct {
rateLimiter flowcontrol.RateLimiter
backoff BackoffManager
timeout time.Duration
maxRetries int

// generic components accessible via method setters
verb string
Expand All @@ -109,9 +116,10 @@ type Request struct {
subresource string

// output
err error
body io.Reader
retry WithRetry
err error
body io.Reader

retryFn requestRetryFunc
}

// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
Expand Down Expand Up @@ -142,7 +150,8 @@ func NewRequest(c *RESTClient) *Request {
backoff: backoff,
timeout: timeout,
pathPrefix: pathPrefix,
retry: &withRetry{maxRetries: 10},
maxRetries: 10,
retryFn: defaultRequestRetryFn,
warningHandler: c.warningHandler,
}

Expand Down Expand Up @@ -408,7 +417,10 @@ func (r *Request) Timeout(d time.Duration) *Request {
// function is specifically called with a different value.
// A zero maxRetries prevent it from doing retires and return an error immediately.
func (r *Request) MaxRetries(maxRetries int) *Request {
r.retry.SetMaxRetries(maxRetries)
if maxRetries < 0 {
maxRetries = 0
}
r.maxRetries = maxRetries
return r
}

Expand Down Expand Up @@ -612,27 +624,29 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
}
return false
}
retry := r.retryFn(r.maxRetries)
url := r.URL().String()
for {
if err := r.retry.Before(ctx, r); err != nil {
return nil, r.retry.WrapPreviousError(err)
if err := retry.Before(ctx, r); err != nil {
return nil, retry.WrapPreviousError(err)
}

req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}

resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
r.retry.After(ctx, r, resp, err)
retry.After(ctx, r, resp, err)
if err == nil && resp.StatusCode == http.StatusOK {
return r.newStreamWatcher(resp)
}

done, transformErr := func() (bool, error) {
defer readAndCloseResponseBody(resp)

if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
return false, nil
}

Expand All @@ -654,7 +668,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
// we need to return the error object from that.
err = transformErr
}
return nil, r.retry.WrapPreviousError(err)
return nil, retry.WrapPreviousError(err)
}
}
}
Expand Down Expand Up @@ -719,9 +733,10 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
client = http.DefaultClient
}

retry := r.retryFn(r.maxRetries)
url := r.URL().String()
for {
if err := r.retry.Before(ctx, r); err != nil {
if err := retry.Before(ctx, r); err != nil {
return nil, err
}

Expand All @@ -734,7 +749,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
r.retry.After(ctx, r, resp, err)
retry.After(ctx, r, resp, err)
if err != nil {
// we only retry on an HTTP response with 'Retry-After' header
return nil, err
Expand All @@ -749,7 +764,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
done, transformErr := func() (bool, error) {
defer resp.Body.Close()

if r.retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
return false, nil
}
result := r.transformResponse(resp, req)
Expand Down Expand Up @@ -856,9 +871,10 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
}

// Right now we make about ten retry attempts if we get a Retry-After response.
retry := r.retryFn(r.maxRetries)
for {
if err := r.retry.Before(ctx, r); err != nil {
return r.retry.WrapPreviousError(err)
if err := retry.Before(ctx, r); err != nil {
return retry.WrapPreviousError(err)
}
req, err := r.newHTTPRequest(ctx)
if err != nil {
Expand All @@ -871,7 +887,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
}
r.retry.After(ctx, r, resp, err)
retry.After(ctx, r, resp, err)

done := func() bool {
defer readAndCloseResponseBody(resp)
Expand All @@ -884,15 +900,15 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
fn(req, resp)
}

if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
return false
}

f(req, resp)
return true
}()
if done {
return r.retry.WrapPreviousError(err)
return retry.WrapPreviousError(err)
}
}
}
Expand Down
36 changes: 22 additions & 14 deletions rest/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,8 @@ func TestRequestWatch(t *testing.T) {
c.Client = client
}
testCase.Request.backoff = &noSleepBackOff{}
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
testCase.Request.maxRetries = testCase.maxRetries
testCase.Request.retryFn = defaultRequestRetryFn

watch, err := testCase.Request.Watch(context.Background())

Expand Down Expand Up @@ -1211,7 +1212,8 @@ func TestRequestStream(t *testing.T) {
c.Client = client
}
testCase.Request.backoff = &noSleepBackOff{}
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
testCase.Request.maxRetries = testCase.maxRetries
testCase.Request.retryFn = defaultRequestRetryFn

body, err := testCase.Request.Stream(context.Background())

Expand Down Expand Up @@ -1266,7 +1268,7 @@ func TestRequestDo(t *testing.T) {
}
for i, testCase := range testCases {
testCase.Request.backoff = &NoBackoff{}
testCase.Request.retry = &withRetry{}
testCase.Request.retryFn = defaultRequestRetryFn
body, err := testCase.Request.Do(context.Background()).Raw()
hasErr := err != nil
if hasErr != testCase.Err {
Expand Down Expand Up @@ -1429,8 +1431,9 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) {
return nil, &net.OpError{Err: syscall.ECONNRESET}
}),
},
backoff: backoff,
retry: &withRetry{maxRetries: 10},
backoff: backoff,
maxRetries: 10,
retryFn: defaultRequestRetryFn,
}
// We expect two retries of "connection reset by peer" and the success.
_, err := req.Do(context.Background()).Raw()
Expand Down Expand Up @@ -2504,8 +2507,9 @@ func TestRequestWithRetry(t *testing.T) {
c: &RESTClient{
Client: client,
},
backoff: &noSleepBackOff{},
retry: &withRetry{maxRetries: 1},
backoff: &noSleepBackOff{},
maxRetries: 1,
retryFn: defaultRequestRetryFn,
}

var transformFuncInvoked int
Expand Down Expand Up @@ -2782,8 +2786,9 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
content: defaultContentConfig(),
Client: client,
},
backoff: &noSleepBackOff{},
retry: &withRetry{maxRetries: test.maxRetries},
backoff: &noSleepBackOff{},
maxRetries: test.maxRetries,
retryFn: defaultRequestRetryFn,
}

doFunc(context.Background(), req)
Expand Down Expand Up @@ -3006,7 +3011,8 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
pathPrefix: "/api/v1",
rateLimiter: interceptor,
backoff: interceptor,
retry: &withRetry{maxRetries: test.maxRetries},
maxRetries: test.maxRetries,
retryFn: defaultRequestRetryFn,
}

doFunc(ctx, req)
Expand Down Expand Up @@ -3140,7 +3146,7 @@ func testWithRetryInvokeOrder(t *testing.T, key string, doFunc func(ctx context.
pathPrefix: "/api/v1",
rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
backoff: &NoBackoff{},
retry: interceptor,
retryFn: func(_ int) WithRetry { return interceptor },
}

doFunc(context.Background(), req)
Expand Down Expand Up @@ -3315,7 +3321,8 @@ func testWithWrapPreviousError(t *testing.T, doFunc func(ctx context.Context, r
pathPrefix: "/api/v1",
rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
backoff: &noSleepBackOff{},
retry: &withRetry{maxRetries: test.maxRetries},
maxRetries: test.maxRetries,
retryFn: defaultRequestRetryFn,
}

err = doFunc(context.Background(), req)
Expand Down Expand Up @@ -3618,8 +3625,9 @@ func TestRequestBodyResetOrder(t *testing.T) {
content: defaultContentConfig(),
Client: client,
},
backoff: &noSleepBackOff{},
retry: &withRetry{maxRetries: 1},
backoff: &noSleepBackOff{},
maxRetries: 1,
retryFn: defaultRequestRetryFn,
}

req.Do(context.Background())
Expand Down
13 changes: 0 additions & 13 deletions rest/with_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ var neverRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool {
// Note that WithRetry is not safe for concurrent use by multiple
// goroutines without additional locking or coordination.
type WithRetry interface {
// SetMaxRetries makes the request use the specified integer as a ceiling
// for retries upon receiving a 429 status code and the "Retry-After" header
// in the response.
// A zero maxRetries should prevent from doing any retry and return immediately.
SetMaxRetries(maxRetries int)

// IsNextRetry advances the retry counter appropriately
// and returns true if the request should be retried,
// otherwise it returns false, if:
Expand Down Expand Up @@ -144,13 +138,6 @@ type withRetry struct {
previousErr, currentErr error
}

func (r *withRetry) SetMaxRetries(maxRetries int) {
if maxRetries < 0 {
maxRetries = 0
}
r.maxRetries = maxRetries
}

func (r *withRetry) trackPreviousError(err error) {
// keep track of two most recent errors
if r.currentErr != nil {
Expand Down

0 comments on commit d8531f5

Please sign in to comment.