-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
with_retry.go
232 lines (199 loc) · 7.94 KB
/
with_retry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"k8s.io/klog/v2"
)
// IsRetryableErrorFunc allows the client to provide its own function
// that determines whether the specified err from the server is retryable.
//
// request: the original request sent to the server
// err: the server sent this error to us
//
// The function returns true if the error is retryable and the request
// can be retried, otherwise it returns false.
// We have four modes of communication - 'Stream', 'Watch', 'Do' and 'DoRaw'; this
// function allows us to customize the retryability aspect of each.
type IsRetryableErrorFunc func(request *http.Request, err error) bool

// IsErrorRetryable invokes the underlying function with the given request
// and error and reports its verdict unchanged.
func (fn IsRetryableErrorFunc) IsErrorRetryable(request *http.Request, err error) bool {
	retryable := fn(request, err)
	return retryable
}
// neverRetryError is an IsRetryableErrorFunc that reports every error
// as not retryable, regardless of the request or the error.
var neverRetryError IsRetryableErrorFunc = func(*http.Request, error) bool {
	return false
}
// WithRetry allows the client to retry a request up to a certain number of times.
// Note that WithRetry is not safe for concurrent use by multiple
// goroutines without additional locking or coordination.
type WithRetry interface {
// SetMaxRetries makes the request use the specified integer as a ceiling
// for retries upon receiving a 429 or 5xx status code and the "Retry-After"
// header in the response.
// A maxRetries of zero means no retry is attempted and the result of the
// first attempt is returned immediately.
SetMaxRetries(maxRetries int)
// NextRetry advances the retry counter appropriately and returns true if the
// request should be retried. It returns false if any of the following holds:
// - we have already reached the maximum retry threshold.
// - the error does not fall into the retryable category.
// - the server has not sent us a 429 or 5xx status code, or the
// 'Retry-After' response header is not set with a value.
//
// If the returned bool is true, the returned RetryAfter contains the
// information regarding the next retry.
//
// request: the original request sent to the server
// resp: the response sent from the server, it is set if err is nil
// err: the server sent this error to us, if err is set then resp is nil.
// f: an IsRetryableErrorFunc function provided by the client that determines
// if the err sent by the server is retryable.
NextRetry(req *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) (*RetryAfter, bool)
// BeforeNextRetry is responsible for carrying out operations that need
// to be completed before the next retry is initiated:
// - if the request context is already canceled there is no need to
// retry, the function will return ctx.Err().
// - we need to seek to the beginning of the request body before we
// initiate the next retry, the function should return an error if
// it fails to do so.
// - we should wait for the duration the server has asked us to wait
// in the 'Retry-After' response header.
//
// If BeforeNextRetry returns an error the client should abort the retry,
// otherwise it is safe to initiate the next retry.
BeforeNextRetry(ctx context.Context, backoff BackoffManager, retryAfter *RetryAfter, url string, body io.Reader) error
}
// RetryAfter holds information associated with the next retry.
type RetryAfter struct {
// Wait is the duration the server has asked us to wait before
// the next retry is initiated.
// It is derived from the value of the 'Retry-After' response header,
// which is expressed in seconds.
Wait time.Duration
// Attempt is the Nth attempt after which we have received a retryable
// error or a 'Retry-After' response header from the server.
Attempt int
// Reason is a human-readable description of why we are retrying
// the request.
Reason string
}
// withRetry carries the retry bookkeeping for a single request. Its pointer
// type provides the SetMaxRetries, NextRetry and BeforeNextRetry methods.
type withRetry struct {
	// maxRetries is the ceiling on the number of retries.
	maxRetries int
	// attempts is the number of attempts recorded so far.
	attempts int
}

// SetMaxRetries stores maxRetries as the retry ceiling; a negative
// value is treated as zero.
func (r *withRetry) SetMaxRetries(maxRetries int) {
	r.maxRetries = 0
	if maxRetries > 0 {
		r.maxRetries = maxRetries
	}
}
// NextRetry records one more attempt and decides whether the request
// should be sent again.
//
// It returns (nil, false) when req is nil or when both resp and err are
// nil. Otherwise the returned RetryAfter always carries the attempt
// number; Wait and Reason are filled in only when the bool is true.
func (r *withRetry) NextRetry(req *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) (*RetryAfter, bool) {
	// bad input: without a request, or with neither a response nor an
	// error, there is nothing to base a decision on.
	if req == nil {
		return nil, false
	}
	if resp == nil && err == nil {
		return nil, false
	}

	r.attempts++
	next := &RetryAfter{Attempt: r.attempts}
	if r.attempts > r.maxRetries {
		// the retry budget is exhausted.
		return next, false
	}

	// an error takes precedence over the http response.
	if err != nil {
		if f == nil || !f.IsErrorRetryable(req, err) {
			return next, false
		}
		// the error is retryable, so substitute an artificial
		// "Retry-After" response for the checks below.
		resp = retryAfterResponse()
	}

	// at this point resp is either the artificial response created for
	// a retryable error, or the server's response, which still needs to
	// be checked for retryability.
	seconds, shouldWait := checkWait(resp)
	if !shouldWait {
		return next, false
	}

	next.Wait = time.Duration(seconds) * time.Second
	next.Reason = getRetryReason(r.attempts, seconds, resp, err)
	return next, true
}
// BeforeNextRetry carries out the steps that must complete before the
// request is sent again:
//   - it returns ctx.Err() if the request context is already done.
//   - if body implements io.Seeker it is rewound to its start; a failure
//     to do so is returned, wrapping the underlying Seek error.
//   - if backoff is non-nil, it sleeps for retryAfter.Wait.
//
// retryAfter must be non-nil. A nil error means it is safe to initiate
// the next retry.
func (r *withRetry) BeforeNextRetry(ctx context.Context, backoff BackoffManager, retryAfter *RetryAfter, url string, body io.Reader) error {
	// If the request context is already canceled there is no need to retry.
	if err := ctx.Err(); err != nil {
		return err
	}

	// Rewind the request body so the retry resends it from the beginning.
	// A nil body fails the type assertion, so no separate nil check is needed.
	if seeker, ok := body.(io.Seeker); ok {
		if _, err := seeker.Seek(0, io.SeekStart); err != nil {
			// report the type of the body that failed to seek (not the
			// receiver) and keep the underlying cause available to callers.
			return fmt.Errorf("can't Seek() back to beginning of body for %T: %w", body, err)
		}
	}

	klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", retryAfter.Wait, retryAfter.Attempt, url)
	if backoff != nil {
		backoff.Sleep(retryAfter.Wait)
	}
	return nil
}
// checkWait reports whether the server instructed us to wait before
// retrying and, if so, for how many seconds. Only a 429 or a 5xx status
// code is considered; any other status yields (0, false).
func checkWait(resp *http.Response) (int, bool) {
	code := resp.StatusCode
	if code != http.StatusTooManyRequests && code < 500 {
		return 0, false
	}
	// the status allows a wait; the header decides whether one was requested.
	return retryAfterSeconds(resp)
}
// getRetryReason builds a human-readable explanation of why a request is
// being retried. The message always starts with the retry count and the
// retry-after value in seconds, followed by one of:
//   - server-side throttling (status 429), with the FlowSchema UID taken
//     from the response header,
//   - the retryable error, when err is non-nil,
//   - otherwise the response status code.
//
// resp must be non-nil.
func getRetryReason(retries, seconds int, resp *http.Response, err error) string {
	// priority and fairness sets the UID of the FlowSchema
	// associated with a request in the following response Header.
	const responseHeaderMatchedFlowSchemaUID = "X-Kubernetes-PF-FlowSchema-UID"

	prefix := fmt.Sprintf("retries: %d, retry-after: %ds", retries, seconds)

	if resp.StatusCode == http.StatusTooManyRequests {
		// a 429 wins over any error: it is server-side throttling
		// from priority and fairness.
		uid := resp.Header.Get(responseHeaderMatchedFlowSchemaUID)
		return fmt.Sprintf("%s - retry-reason: due to server-side throttling, FlowSchema UID: %q", prefix, uid)
	}
	if err != nil {
		// it's a retryable error
		return fmt.Sprintf("%s - retry-reason: due to retryable error, error: %v", prefix, err)
	}
	return fmt.Sprintf("%s - retry-reason: %d", prefix, resp.StatusCode)
}
// readAndCloseResponseBody drains (up to a small limit) and closes the
// body of resp so that the underlying TCP connection can be reused for
// the next attempt. It is a no-op when resp or resp.Body is nil.
func readAndCloseResponseBody(resp *http.Response) {
	// A response without a body (for example one constructed locally
	// rather than received from a server) has nothing to drain or close.
	if resp == nil || resp.Body == nil {
		return
	}

	// Ensure the response body is fully read and closed
	// before we reconnect, so that we reuse the same TCP
	// connection.
	const maxBodySlurpSize = 2 << 10
	defer resp.Body.Close()

	// Only drain bodies whose declared length is within the limit; an
	// unknown length (-1) also qualifies, and at most maxBodySlurpSize
	// bytes are read in either case.
	if resp.ContentLength <= maxBodySlurpSize {
		// Draining is best effort: a read error only costs connection reuse.
		_, _ = io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
	}
}
// retryAfterResponse builds an artificial response with status 500 and a
// "Retry-After" header of one second. It stands in for a server response
// when a retryable error occurred instead.
func retryAfterResponse() *http.Response {
	header := make(http.Header, 1)
	header.Set("Retry-After", "1")

	resp := &http.Response{StatusCode: http.StatusInternalServerError}
	resp.Header = header
	return resp
}