-
Notifications
You must be signed in to change notification settings - Fork 63
/
interceptor_retry_simple.go
213 lines (182 loc) · 6.6 KB
/
interceptor_retry_simple.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package clientv2
import (
"context"
"math/rand"
"net"
"net/http"
"time"
clientv1 "github.com/qiniu/go-sdk/v7/client"
internal_io "github.com/qiniu/go-sdk/v7/internal/io"
"github.com/qiniu/go-sdk/v7/storagev2/backoff"
"github.com/qiniu/go-sdk/v7/storagev2/chooser"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
"github.com/qiniu/go-sdk/v7/storagev2/retrier"
)
type (
contextKeyBufferResponse struct{}
SimpleRetryConfig struct {
RetryMax int // 最大重试次数
RetryInterval func() time.Duration // 重试时间间隔 v1
Backoff backoff.Backoff // 重试时间间隔 v2,优先级高于 RetryInterval
ShouldRetry func(req *http.Request, resp *http.Response, err error) bool
Resolver resolver.Resolver // 主备域名解析器
Chooser chooser.Chooser // IP 选择器
Retrier retrier.Retrier // 重试器
BeforeResolve func(*http.Request) // 域名解析前回调函数
AfterResolve func(*http.Request, []net.IP) // 域名解析后回调函数
ResolveError func(*http.Request, error) // 域名解析错误回调函数
BeforeBackoff func(*http.Request, *retrier.RetrierOptions, time.Duration) // 退避前回调函数
AfterBackoff func(*http.Request, *retrier.RetrierOptions, time.Duration) // 退避后回调函数
BeforeRequest func(*http.Request, *retrier.RetrierOptions) // 请求前回调函数
AfterResponse func(*http.Response, *retrier.RetrierOptions, error) // 请求后回调函数
}
simpleRetryInterceptor struct {
config SimpleRetryConfig
}
)
func (c *SimpleRetryConfig) init() {
if c == nil {
return
}
if c.RetryMax < 0 {
c.RetryMax = 0
}
}
func (c *SimpleRetryConfig) getRetryInterval(ctx context.Context, attempts int) time.Duration {
if bf := c.Backoff; bf != nil {
return bf.Time(ctx, &backoff.BackoffOptions{Attempts: attempts})
}
if ri := c.RetryInterval; ri != nil {
return ri()
}
return defaultRetryInterval()
}
var errorRetrier = retrier.NewErrorRetrier()
func (c *SimpleRetryConfig) getRetryDecision(req *http.Request, resp *http.Response, err error, attempts int) retrier.RetryDecision {
if c.ShouldRetry != nil {
if c.ShouldRetry(req, resp, err) {
return retrier.RetryRequest
} else {
return retrier.DontRetry
}
} else {
r := errorRetrier
if c.Retrier != nil {
r = c.Retrier
}
return r.Retry(resp, err, &retrier.RetrierOptions{Attempts: attempts})
}
}
func NewSimpleRetryInterceptor(config SimpleRetryConfig) Interceptor {
return &simpleRetryInterceptor{config: config}
}
func (interceptor *simpleRetryInterceptor) Priority() InterceptorPriority {
return InterceptorPriorityRetrySimple
}
func (interceptor *simpleRetryInterceptor) Intercept(req *http.Request, handler Handler) (resp *http.Response, err error) {
var chosenIPs []net.IP
if interceptor == nil || req == nil {
return interceptor.callHandler(req, &retrier.RetrierOptions{Attempts: 0}, handler)
}
interceptor.config.init()
hostname := req.URL.Hostname()
resolvedIPs := interceptor.resolve(req, hostname)
// 可能会被重试多次
for i := 0; ; i++ {
req, chosenIPs = interceptor.choose(req, resolvedIPs, hostname)
// Clone 防止后面 Handler 处理对 req 有污染
reqBefore := cloneReq(req)
resp, err = interceptor.callHandler(req, &retrier.RetrierOptions{Attempts: i}, handler)
retryDecision := interceptor.config.getRetryDecision(reqBefore, resp, err, i)
if retryDecision == retrier.DontRetry {
interceptor.feedbackGood(req, hostname, chosenIPs)
return resp, err
}
interceptor.feedbackBad(req, hostname, chosenIPs)
req = reqBefore
if retryDecision == retrier.TryNextHost || i >= interceptor.config.RetryMax {
break
}
if resp != nil && resp.Body != nil {
_ = internal_io.SinkAll(resp.Body)
resp.Body.Close()
}
interceptor.backoff(req, i)
}
return resp, err
}
func (interceptor *simpleRetryInterceptor) callHandler(req *http.Request, options *retrier.RetrierOptions, handler Handler) (resp *http.Response, err error) {
if interceptor.config.BeforeRequest != nil {
interceptor.config.BeforeRequest(req, options)
}
resp, err = handler(req)
if interceptor.config.AfterResponse != nil {
interceptor.config.AfterResponse(resp, options, err)
}
return
}
func (interceptor *simpleRetryInterceptor) resolve(req *http.Request, hostname string) []net.IP {
var (
ips []net.IP
err error
)
if resolver := interceptor.config.Resolver; resolver != nil {
if interceptor.config.BeforeResolve != nil {
interceptor.config.BeforeResolve(req)
}
if ips, err = resolver.Resolve(req.Context(), hostname); err == nil {
if interceptor.config.AfterResolve != nil {
interceptor.config.AfterResolve(req, ips)
}
} else if err != nil && interceptor.config.ResolveError != nil {
interceptor.config.ResolveError(req, err)
}
}
return ips
}
func (interceptor *simpleRetryInterceptor) choose(req *http.Request, ips []net.IP, hostname string) (*http.Request, []net.IP) {
if len(ips) > 0 {
if cs := interceptor.config.Chooser; cs != nil {
ips = cs.Choose(req.Context(), ips, &chooser.ChooseOptions{Domain: hostname})
}
req = req.WithContext(clientv1.WithResolvedIPs(req.Context(), hostname, ips))
}
return req, ips
}
func (interceptor *simpleRetryInterceptor) feedbackGood(req *http.Request, hostname string, ips []net.IP) {
if len(ips) > 0 {
if cs := interceptor.config.Chooser; cs != nil {
cs.FeedbackGood(req.Context(), ips, &chooser.FeedbackOptions{Domain: hostname})
}
}
}
func (interceptor *simpleRetryInterceptor) feedbackBad(req *http.Request, hostname string, ips []net.IP) {
if len(ips) > 0 {
if cs := interceptor.config.Chooser; cs != nil {
cs.FeedbackBad(req.Context(), ips, &chooser.FeedbackOptions{Domain: hostname})
}
}
}
func (interceptor *simpleRetryInterceptor) backoff(req *http.Request, attempts int) {
retryInterval := interceptor.config.getRetryInterval(req.Context(), attempts)
if interceptor.config.BeforeBackoff != nil {
interceptor.config.BeforeBackoff(req, &retrier.RetrierOptions{Attempts: attempts}, retryInterval)
}
if retryInterval >= time.Microsecond {
time.Sleep(retryInterval)
}
if interceptor.config.AfterBackoff != nil {
interceptor.config.AfterBackoff(req, &retrier.RetrierOptions{Attempts: attempts}, retryInterval)
}
}
func bufferResponse(resp *http.Response) error {
buffer, err := internal_io.ReadAll(resp.Body)
if err != nil {
return err
}
resp.Body = internal_io.NewBytesNopCloser(buffer)
return nil
}
func defaultRetryInterval() time.Duration {
return time.Duration(50+rand.Int()%50) * time.Millisecond
}