forked from snowflakedb/gosnowflake
-
Notifications
You must be signed in to change notification settings - Fork 0
/
retry.go
124 lines (110 loc) · 2.87 KB
/
retry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright (c) 2017-2018 Snowflake Computing Inc. All right reserved.
package gosnowflake
import (
"bytes"
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"time"
"context"
"sync"
)
var random *rand.Rand
func init() {
random = rand.New(rand.NewSource(time.Now().UnixNano()))
}
type waitAlgo struct {
mutex *sync.Mutex // required for random.Int63n
base time.Duration // base wait time
cap time.Duration // maximum wait time
}
func randSecondDuration(n time.Duration) time.Duration {
return time.Duration(random.Int63n(int64(n/time.Second))) * time.Second
}
// decorrelated jitter backoff
func (w *waitAlgo) decorr(attempt int, sleep time.Duration) time.Duration {
w.mutex.Lock()
defer w.mutex.Unlock()
t := 3*sleep - w.base
switch {
case t > 0:
return durationMin(w.cap, randSecondDuration(t)+w.base)
case t < 0:
return durationMin(w.cap, randSecondDuration(-t)+3*sleep)
}
return w.base
}
var defaultWaitAlgo = &waitAlgo{
mutex: &sync.Mutex{},
base: 5 * time.Second,
cap: 160 * time.Second,
}
type requestFunc func(method, urlStr string, body io.Reader) (*http.Request, error)
type clientInterface interface {
Do(req *http.Request) (*http.Response, error)
}
func retryHTTP(
ctx context.Context,
client clientInterface,
req requestFunc,
method string,
fullURL string,
headers map[string]string,
body []byte,
timeout time.Duration) (res *http.Response, err error) {
totalTimeout := timeout
glog.V(2).Infof("retryHTTP.totalTimeout: %v", totalTimeout)
retryCounter := 0
sleepTime := time.Duration(0)
for {
req, err := req(method, fullURL, bytes.NewReader(body))
if err != nil {
return nil, err
}
if req != nil {
// req can be nil in tests
req = req.WithContext(ctx)
}
for k, v := range headers {
req.Header.Set(k, v)
}
res, err = client.Do(req)
if err == nil && res.StatusCode == http.StatusOK {
// success
break
}
if err == context.Canceled {
break
}
// cannot just return 4xx and 5xx status as the error can be sporadic. retry often helps.
if err != nil {
glog.V(2).Infof(
"failed http connection. no response is returned. err: %v. retrying...\n", err)
} else {
glog.V(2).Infof(
"failed http connection. HTTP Status: %v. retrying...\n", res.StatusCode)
}
// uses decorrelated jitter backoff
sleepTime = defaultWaitAlgo.decorr(retryCounter, sleepTime)
if totalTimeout > 0 {
glog.V(2).Infof("to timeout: %v", totalTimeout)
// if any timeout is set
totalTimeout -= sleepTime
if totalTimeout <= 0 {
if err != nil {
return nil, fmt.Errorf("timeout. err: %v. Hanging?", err)
}
if res != nil {
return nil, fmt.Errorf("timeout. HTTP Status: %v. Hanging?", res.StatusCode)
}
return nil, errors.New("timeout. Hanging?")
}
}
retryCounter++
glog.V(2).Infof("sleeping %v. to timeout: %v. retrying", sleepTime, totalTimeout)
time.Sleep(sleepTime)
}
return res, err
}