/
context_client.go
executable file
·270 lines (247 loc) · 7.49 KB
/
context_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
package httpclient
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
xhttp "net/http"
"strings"
"time"
"go.opencensus.io/trace"
"golang.org/x/net/http2"
"github.com/pkg/errors"
"github.com/subscan-explorer/subscan-common/core/backoff"
"github.com/subscan-explorer/subscan-common/core/retry"
"github.com/subscan-explorer/subscan-common/core/util/xtime"
)
const (
// minRead is the initial capacity, in bytes (16 KiB), of the buffer that
// request asks readAll to allocate for a response body.
minRead = 16 * 1024 // 16 KiB
// defaultRetryCount is the retry count NewHTTPClient assigns to a new
// client: zero retries, i.e. Do makes a single attempt.
defaultRetryCount int = 0
)
// Config holds the settings NewHTTPClient uses to build an HttpClient.
type Config struct {
// Dial is used as the net.Dialer timeout for establishing connections.
Dial xtime.Duration
// Timeout bounds each individual request attempt: request derives a
// context with this timeout before sending.
Timeout xtime.Duration
// KeepAlive is used both as the dialer keep-alive period and as the
// transport's idle connection timeout.
KeepAlive xtime.Duration
// MaxConns is applied as the transport's MaxConnsPerHost.
MaxConns int
// MaxIdle is applied as the transport's MaxIdleConnsPerHost.
MaxIdle int
// BackoffInterval is handed to backoff.NewConstantBackoff to build the
// default retrier. The original note says the interval is in seconds —
// TODO confirm against how xtime.Duration values are parsed.
BackoffInterval xtime.Duration
// retryCount is not read anywhere in the visible code; new clients start
// from defaultRetryCount and are adjusted through SetRetryCount.
retryCount int
}
// HttpClient wraps a net/http client with a per-attempt timeout, a
// configurable number of retries and JSON decoding of response bodies.
type HttpClient struct {
// conf is the configuration the client was built from; request reads its
// Timeout for every attempt.
conf *Config
// client is the underlying net/http client that performs the requests.
client *xhttp.Client
// retryCount is the number of additional attempts Do makes after a
// failed first attempt.
retryCount int
// retrier supplies the wait time between attempts via NextInterval.
retrier retry.Retriable
}
// NewHTTPClient builds an HttpClient from c.
//
// The underlying transport dials with c.Dial as timeout and c.KeepAlive as
// keep-alive period, limits connections per host with c.MaxConns and
// c.MaxIdle, and closes idle connections after c.KeepAlive. TLS certificate
// verification is disabled (InsecureSkipVerify is true). The returned client
// starts with defaultRetryCount retries and a constant backoff retrier built
// from c.BackoffInterval. c must not be nil.
func NewHTTPClient(c *Config) *HttpClient {
	keepAlive := time.Duration(c.KeepAlive)

	transport := &xhttp.Transport{
		DialContext: (&net.Dialer{
			Timeout:   time.Duration(c.Dial),
			KeepAlive: keepAlive,
		}).DialContext,
		MaxConnsPerHost:     c.MaxConns,
		MaxIdleConnsPerHost: c.MaxIdle,
		IdleConnTimeout:     keepAlive,
		TLSClientConfig:     &tls.Config{InsecureSkipVerify: true},
	}
	// The HTTP/2 setup error is discarded: the transport is used as-is
	// whether or not the configuration succeeded.
	_ = http2.ConfigureTransport(transport)

	constant := backoff.NewConstantBackoff(c.BackoffInterval)
	hc := &HttpClient{
		conf:       c,
		client:     &xhttp.Client{Transport: transport},
		retryCount: defaultRetryCount,
		retrier:    retry.NewRetrier(constant),
	}
	return hc
}
// SetRetryCount sets how many times Do retries a failed request; the total
// number of attempts is count+1.
//
// A negative count is treated as 0. Do loops while the attempt index is
// <= the retry count, so a negative value would make it return nil without
// ever sending the request.
func (c *HttpClient) SetRetryCount(count int) {
	if count < 0 {
		count = 0
	}
	c.retryCount = count
}
// SetRetrier replaces the retrier that Do consults (through NextInterval)
// for the wait time between attempts.
//
// A nil retrier is ignored and the current one is kept: Do calls
// NextInterval on the retrier when an attempt fails and would panic on nil.
func (c *HttpClient) SetRetrier(retrier retry.Retriable) {
	if retrier == nil {
		return
	}
	c.retrier = retrier
}
// Get issues an HTTP GET for url inside a "httpclient Get" trace span and
// decodes the JSON response into res.
//
// headers is attached to the request as given (it may be nil). The request
// is executed through Do, so the client's retry settings apply. A failure to
// build the request is returned wrapped; any other error comes from Do.
func (c *HttpClient) Get(ctx context.Context, url string, headers xhttp.Header, res interface{}) (err error) {
	spanCtx, span := trace.StartSpan(ctx, "httpclient Get")
	defer span.End()

	req, reqErr := xhttp.NewRequest(xhttp.MethodGet, url, nil)
	if reqErr != nil {
		err = errors.Wrap(reqErr, "GET - request creation failed")
		return err
	}
	req.Header = headers

	// Record the target URL both as an annotation and as span attributes.
	attrs := []trace.Attribute{trace.StringAttribute("Get URL", url)}
	span.Annotate(attrs, "GET Request")
	span.AddAttributes(attrs...)

	err = c.Do(spanCtx, req, res)
	return err
}
// Post sends an HTTP POST to url inside a "httpclient Post" trace span and
// decodes the JSON response into res.
//
// The request body is built from param according to contentType (see
// reqBody) and the Content-Type header is set to contentType. The caller's
// headers are copied before Content-Type is added, so the map passed in is
// never modified and may be shared between calls or goroutines. The request
// is executed through Do, so the client's retry settings apply. A failure to
// build the request is returned wrapped; any other error comes from Do.
func (c *HttpClient) Post(ctx context.Context, url, contentType string, headers xhttp.Header, param, res interface{}) (err error) {
	ctx, span := trace.StartSpan(ctx, "httpclient Post")
	defer span.End()

	request, err := xhttp.NewRequest(xhttp.MethodPost, url, reqBody(contentType, param))
	if err != nil {
		return errors.Wrap(err, "POST - request creation failed")
	}

	// Work on a private copy: writing Content-Type into the caller's map
	// would leak into their later requests and race with concurrent use.
	reqHeaders := make(xhttp.Header, len(headers)+1)
	for key, values := range headers {
		reqHeaders[key] = append([]string(nil), values...)
	}
	reqHeaders.Set("Content-Type", contentType)
	request.Header = reqHeaders

	// The marshal error is deliberately ignored: the encoding only feeds the
	// trace attribute, which is left empty when param cannot be marshalled.
	paramByte, _ := json.Marshal(param)
	ats := []trace.Attribute{
		trace.StringAttribute("POST URL", url),
		trace.StringAttribute("POST PARAM ", string(paramByte)),
	}
	span.Annotate(ats, "POST Request")
	span.AddAttributes(ats...)

	return c.Do(ctx, request, res)
}
// Put sends an HTTP PUT to url inside a "httpclient Put" trace span and
// decodes the JSON response into res.
//
// The request body is built from param according to contentType (see
// reqBody) and the Content-Type header is set to contentType. The caller's
// headers are copied before Content-Type is added, so the map passed in is
// never modified and may be shared between calls or goroutines. The request
// is executed through Do, so the client's retry settings apply. A failure to
// build the request is returned wrapped; any other error comes from Do.
func (c *HttpClient) Put(ctx context.Context, url, contentType string, headers xhttp.Header, param, res interface{}) (err error) {
	ctx, span := trace.StartSpan(ctx, "httpclient Put")
	defer span.End()

	request, err := xhttp.NewRequest(xhttp.MethodPut, url, reqBody(contentType, param))
	if err != nil {
		return errors.Wrap(err, "PUT - request creation failed")
	}

	// Work on a private copy: writing Content-Type into the caller's map
	// would leak into their later requests and race with concurrent use.
	reqHeaders := make(xhttp.Header, len(headers)+1)
	for key, values := range headers {
		reqHeaders[key] = append([]string(nil), values...)
	}
	reqHeaders.Set("Content-Type", contentType)
	request.Header = reqHeaders

	// The marshal error is deliberately ignored: the encoding only feeds the
	// trace attribute, which is left empty when param cannot be marshalled.
	paramByte, _ := json.Marshal(param)
	ats := []trace.Attribute{
		trace.StringAttribute("PUT URL", url),
		trace.StringAttribute("PUT PARAM ", string(paramByte)),
	}
	span.Annotate(ats, "PUT Request")
	span.AddAttributes(ats...)

	return c.Do(ctx, request, res)
}
// PATCH sends an HTTP PATCH to url inside a "httpclient Patch" trace span
// and decodes the JSON response into res.
//
// The request body is built from param according to contentType (see
// reqBody) and the Content-Type header is set to contentType. The caller's
// headers are copied before Content-Type is added, so the map passed in is
// never modified and may be shared between calls or goroutines. The request
// is executed through Do, so the client's retry settings apply. A failure to
// build the request is returned wrapped; any other error comes from Do.
func (c *HttpClient) PATCH(ctx context.Context, url, contentType string, headers xhttp.Header, param, res interface{}) (err error) {
	ctx, span := trace.StartSpan(ctx, "httpclient Patch")
	defer span.End()

	request, err := xhttp.NewRequest(xhttp.MethodPatch, url, reqBody(contentType, param))
	if err != nil {
		return errors.Wrap(err, "PATCH - request creation failed")
	}

	// Work on a private copy: writing Content-Type into the caller's map
	// would leak into their later requests and race with concurrent use.
	reqHeaders := make(xhttp.Header, len(headers)+1)
	for key, values := range headers {
		reqHeaders[key] = append([]string(nil), values...)
	}
	reqHeaders.Set("Content-Type", contentType)
	request.Header = reqHeaders

	// The marshal error is deliberately ignored: the encoding only feeds the
	// trace attribute, which is left empty when param cannot be marshalled.
	paramByte, _ := json.Marshal(param)
	ats := []trace.Attribute{
		trace.StringAttribute("PATCH URL", url),
		trace.StringAttribute("PATCH PARAM ", string(paramByte)),
	}
	span.Annotate(ats, "PATCH Request")
	span.AddAttributes(ats...)

	return c.Do(ctx, request, res)
}
// Delete sends an HTTP DELETE to url inside a "httpclient Delete" trace span
// and decodes the JSON response into res.
//
// The Content-Type header is set to contentType. The caller's headers are
// copied before Content-Type is added, so the map passed in is never
// modified and may be shared between calls or goroutines. The request is
// executed through Do, so the client's retry settings apply. A failure to
// build the request is returned wrapped; any other error comes from Do.
//
// NOTE(review): param is only recorded on the trace span. The request is
// created with a nil body, so param is never sent to the server — confirm
// whether that is intended before relying on it.
func (c *HttpClient) Delete(ctx context.Context, url, contentType string, headers xhttp.Header, param, res interface{}) (err error) {
	ctx, span := trace.StartSpan(ctx, "httpclient Delete")
	defer span.End()

	request, err := xhttp.NewRequest(xhttp.MethodDelete, url, nil)
	if err != nil {
		return errors.Wrap(err, "DELETE - request creation failed")
	}

	// Work on a private copy: writing Content-Type into the caller's map
	// would leak into their later requests and race with concurrent use.
	reqHeaders := make(xhttp.Header, len(headers)+1)
	for key, values := range headers {
		reqHeaders[key] = append([]string(nil), values...)
	}
	reqHeaders.Set("Content-Type", contentType)
	request.Header = reqHeaders

	// The marshal error is deliberately ignored: the encoding only feeds the
	// trace attribute, which is left empty when param cannot be marshalled.
	paramByte, _ := json.Marshal(param)
	ats := []trace.Attribute{
		trace.StringAttribute("DELETE URL", url),
		trace.StringAttribute("DELETE PARAM ", string(paramByte)),
	}
	span.Annotate(ats, "DELETE Request")
	span.AddAttributes(ats...)

	return c.Do(ctx, request, res)
}
// Do executes req and decodes the JSON response into res, retrying failed
// attempts up to the client's retry count.
//
// At least one attempt is always made, even if the retry count is negative.
// Between attempts Do waits for the interval reported by the retrier; the
// wait is abandoned with ctx.Err() as soon as ctx is cancelled, and no wait
// happens after the final attempt. When the request carries a body and
// exposes GetBody, every retry is sent with a fresh copy of the body, since
// the previous attempt has already consumed the original reader.
//
// It returns nil on the first successful attempt, otherwise the error of the
// last attempt (or the context error if the wait was interrupted).
func (c *HttpClient) Do(ctx context.Context, req *xhttp.Request, res interface{}) (err error) {
	retries := c.retryCount
	if retries < 0 {
		retries = 0
	}

	for i := 0; i <= retries; i++ {
		attempt := req
		if i > 0 && req.GetBody != nil {
			// Rewind on a shallow copy so the caller's request is untouched.
			body, bodyErr := req.GetBody()
			if bodyErr != nil {
				return errors.Wrap(bodyErr, "rewinding request body for retry")
			}
			attempt = req.WithContext(ctx)
			attempt.Body = body
		}

		if err = c.request(ctx, attempt, res); err == nil {
			return nil
		}
		if i == retries {
			// Out of attempts: report the failure without a pointless wait.
			break
		}

		timer := time.NewTimer(c.retrier.NextInterval(i))
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
	return err
}
// request performs a single attempt of req and decodes the JSON response
// body into res.
//
// The attempt runs under a context derived from ctx with the configured
// Timeout. If sending fails and that context has ended, the context error
// (deadline exceeded or cancelled) is returned; any other transport failure
// is returned immediately as-is instead of waiting for the timeout to
// elapse. A response status of 500 or above is reported as a "Server Error";
// every other status has its body read and unmarshalled into res.
func (c *HttpClient) request(ctx context.Context, req *xhttp.Request, res interface{}) (err error) {
	ctx, cancel := context.WithTimeout(ctx, time.Duration(c.conf.Timeout))
	defer cancel()

	response, err := c.client.Do(req.WithContext(ctx))
	if err != nil {
		// Keep reporting expiry/cancellation as the bare context error, but
		// never block on ctx.Done(): failures such as a refused connection
		// happen long before the deadline and must surface right away with
		// their real cause.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		return err
	}
	defer response.Body.Close()

	if response.StatusCode >= xhttp.StatusInternalServerError {
		return errors.Wrap(errors.New("Server Error"), fmt.Sprintf("Remote URL %s Response.StatusCode %d", req.URL, response.StatusCode))
	}

	bs, err := readAll(response.Body, minRead)
	if err != nil {
		return err
	}
	// &res is kept on purpose: encoding/json follows the interface to the
	// pointer stored in it, and a nil res is tolerated instead of rejected.
	return json.Unmarshal(bs, &res)
}
// reqBody turns param into a request body suited to contentType.
//
// For MIMEPOSTForm, param is used verbatim when it is a string (the
// already-encoded form); any other type yields no body. For MIMEJSON, param
// is JSON-encoded; if encoding fails no JSON body is produced. Every other
// content type yields a nil body.
func reqBody(contentType string, param interface{}) (body io.Reader) {
	if contentType == MIMEPOSTForm {
		if form, isString := param.(string); isString {
			body = strings.NewReader(form)
		}
	}

	if contentType == MIMEJSON {
		payload := &bytes.Buffer{}
		if encodeErr := json.NewEncoder(payload).Encode(param); encodeErr != nil {
			// The signature has no error result, so a failed encoding simply
			// leaves the body as it was.
			return body
		}
		body = payload
	}

	return body
}
// readAll drains r into a buffer pre-allocated with the given capacity and
// returns everything that was read together with any read error.
//
// A bytes.ErrTooLarge panic raised while the buffer grows is converted into
// a returned error; every other panic is propagated unchanged.
func readAll(r io.Reader, capacity int64) (b []byte, err error) {
	storage := make([]byte, 0, capacity)
	sink := bytes.NewBuffer(storage)

	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		tooLarge, isErr := recovered.(error)
		if !isErr || tooLarge != bytes.ErrTooLarge {
			// Not the overflow we know how to report: let it keep unwinding.
			panic(recovered)
		}
		err = tooLarge
	}()

	_, err = sink.ReadFrom(r)
	return sink.Bytes(), err
}