/
transport.go
406 lines (360 loc) · 12.7 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
// Copyright (c) 2022 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package http
import (
"context"
"math/rand"
"net"
"net/http"
"sync"
"time"
"github.com/opentracing/opentracing-go"
"go.uber.org/net/metrics"
backoffapi "go.uber.org/yarpc/api/backoff"
"go.uber.org/yarpc/api/peer"
"go.uber.org/yarpc/api/transport"
yarpctls "go.uber.org/yarpc/api/transport/tls"
"go.uber.org/yarpc/internal/backoff"
"go.uber.org/yarpc/pkg/lifecycle"
"go.uber.org/zap"
)
// transportOptions collects every setting that a TransportOption can adjust.
// NewTransport starts from newTransportOptions, applies the caller's options
// to it, and then builds the Transport from the result.
type transportOptions struct {
	// Settings forwarded by buildHTTPClient to the default net.Dialer
	// (keepAlive) and to the http.Transport (all the others).
	keepAlive time.Duration
	maxIdleConns int
	maxIdleConnsPerHost int
	idleConnTimeout time.Duration
	disableKeepAlives bool
	disableCompression bool
	responseHeaderTimeout time.Duration

	// Peer connection management settings; newTransport copies them onto the
	// Transport unchanged.
	connTimeout time.Duration
	connBackoffStrategy backoffapi.Strategy
	innocenceWindow time.Duration

	// dialContext, when non-nil, replaces the default net.Dialer-based dial
	// function in buildHTTPClient.
	dialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// jitter is copied onto the Transport; the default is rand.Int63n.
	jitter func(int64) int64

	// tracer is copied onto the Transport; newTransportOptions initializes it
	// to opentracing.GlobalTracer().
	tracer opentracing.Tracer

	// buildClient constructs the *http.Client for the Transport. It defaults
	// to buildHTTPClient and is only overridden through the unexported
	// buildClient option.
	buildClient func(*transportOptions) *http.Client

	// logger may be nil; newTransport substitutes a no-op logger in that case.
	logger *zap.Logger

	// meter and serviceName are copied onto the Transport as-is.
	meter *metrics.Scope
	serviceName string

	// outboundTLSConfigProvider is copied onto the Transport as-is.
	outboundTLSConfigProvider yarpctls.OutboundTLSConfigProvider
}
// defaultTransportOptions is the baseline that newTransportOptions copies
// before any TransportOption is applied. Fields that are not listed here keep
// their zero value.
var defaultTransportOptions = transportOptions{
	// HTTP client defaults.
	buildClient:         buildHTTPClient,
	keepAlive:           30 * time.Second,
	maxIdleConnsPerHost: 2,
	idleConnTimeout:     defaultIdleConnTimeout,

	// Peer connection management defaults.
	connTimeout:         defaultConnTimeout,
	connBackoffStrategy: backoff.DefaultExponential,
	innocenceWindow:     defaultInnocenceWindow,
	jitter:              rand.Int63n,
}
// newTransportOptions returns a copy of defaultTransportOptions whose tracer
// is set to the OpenTracing global tracer at the time of the call.
func newTransportOptions() transportOptions {
	opts := defaultTransportOptions // value copy; the package default is not mutated
	opts.tracer = opentracing.GlobalTracer()
	return opts
}
// TransportOption customizes the behavior of an HTTP transport.
//
// Each option is a function that mutates the transportOptions it is handed;
// NewTransport applies the options in the order in which they are passed.
type TransportOption func(*transportOptions)

// httpOption is an empty marker method on TransportOption.
// NOTE(review): presumably it satisfies a package-level option interface
// declared in another file — confirm.
func (TransportOption) httpOption() {}
// KeepAlive sets the keep-alive period for network connections opened by the
// transport's default dialer (the value is passed through as
// net.Dialer.KeepAlive). It has no effect when a custom dial function is
// installed with DialContext.
//
// NOTE(review): this comment used to state that a zero value disables
// keep-alives. Current Go releases document a zero net.Dialer.KeepAlive as
// "use the default period" and a negative value as "disabled" — confirm
// against the Go version in use.
//
// Defaults to 30 seconds.
func KeepAlive(t time.Duration) TransportOption {
	return func(o *transportOptions) { o.keepAlive = t }
}
// MaxIdleConns caps the total number of idle (keep-alive) connections kept
// across all hosts. Zero, which is also the default, means no limit.
func MaxIdleConns(i int) TransportOption {
	return func(o *transportOptions) { o.maxIdleConns = i }
}
// MaxIdleConnsPerHost sets how many idle (keep-alive) HTTP connections are
// kept open per host. An existing idle connection is reused in preference to
// opening a new one.
//
// Defaults to 2 connections.
func MaxIdleConnsPerHost(i int) TransportOption {
	return func(o *transportOptions) { o.maxIdleConnsPerHost = i }
}
// IdleConnTimeout sets the longest time an idle (keep-alive) connection may
// stay idle before it closes itself. Zero means no limit.
//
// Defaults to 15 minutes.
func IdleConnTimeout(t time.Duration) TransportOption {
	return func(o *transportOptions) { o.idleConnTimeout = t }
}
// DisableKeepAlives turns off re-use of TCP connections between different
// HTTP requests.
func DisableKeepAlives() TransportOption {
	return func(o *transportOptions) { o.disableKeepAlives = true }
}
// DisableCompression stops the Transport from asking for compression with an
// "Accept-Encoding: gzip" request header when the Request carries no
// Accept-Encoding value of its own.
//
// Without this option, a gzipped response that the Transport requested on its
// own is transparently decoded in Response.Body. A response to a gzip request
// made explicitly by the user is never uncompressed automatically.
func DisableCompression() TransportOption {
	return func(o *transportOptions) { o.disableCompression = true }
}
// ResponseHeaderTimeout, when non-zero, bounds how long the transport waits
// for a server's response headers after the request (including its body, if
// any) has been fully written. Time spent reading the response body is not
// counted.
func ResponseHeaderTimeout(t time.Duration) TransportOption {
	return func(o *transportOptions) { o.responseHeaderTimeout = t }
}
// ConnTimeout sets how long the transport waits for a single connection
// attempt. For a peer that has been retained by a peer list, connection
// attempts run in a goroutine off the request path.
//
// The default is half a second.
func ConnTimeout(d time.Duration) TransportOption {
	return func(o *transportOptions) { o.connTimeout = d }
}
// ConnBackoff sets the backoff strategy that determines the delay between
// successive connection attempts for each peer.
//
// The default is exponential backoff starting with 10ms fully jittered,
// doubling each attempt, with a maximum interval of 30s.
func ConnBackoff(s backoffapi.Strategy) TransportOption {
	return func(o *transportOptions) { o.connBackoffStrategy = s }
}
// InnocenceWindow sets how long the peer connection management loop suspends
// suspicion of a peer after it has successfully verified, with a fresh TCP
// connection, that the peer is live.
//
// The default innocence window is 5 seconds.
//
// A timeout does not necessarily mean that a peer is unavailable, but it can
// mean that the connection is half-open: the peer died without sending a TCP
// FIN packet. While suspicious of a connection, the management loop tries to
// open a TCP connection in the background once per innocence window, and the
// peer stays available until such an attempt fails.
func InnocenceWindow(d time.Duration) TransportOption {
	return func(o *transportOptions) { o.innocenceWindow = d }
}
// DialContext sets the dial function used to create TCP connections on the
// outbound. It replaces the default dial function, which has a 30 second
// timeout and respects the KeepAlive option.
//
// See https://golang.org/pkg/net/http/#Transport.DialContext for details.
func DialContext(f func(ctx context.Context, network, addr string) (net.Conn, error)) TransportOption {
	return func(o *transportOptions) { o.dialContext = f }
}
// Tracer sets the tracer used by the transport and all its inbounds and
// outbounds.
//
// The default is the OpenTracing global tracer at the time the transport
// options are created.
func Tracer(tracer opentracing.Tracer) TransportOption {
	return func(o *transportOptions) { o.tracer = tracer }
}
// Logger sets the logger used for the transport's internal logging.
//
// The default is to not write any logs: a nil logger is replaced with a no-op
// logger when the transport is built.
func Logger(logger *zap.Logger) TransportOption {
	return func(o *transportOptions) { o.logger = logger }
}
// Meter sets the metrics scope used for internal transport metrics.
//
// The default is to not emit any metrics.
func Meter(meter *metrics.Scope) TransportOption {
	return func(o *transportOptions) { o.meter = meter }
}
// ServiceName sets the service name that appears in transport logging and
// metrics.
func ServiceName(name string) TransportOption {
	return func(o *transportOptions) { o.serviceName = name }
}
// OutboundTLSConfigProvider returns a TransportOption that sets the outbound
// TLS config provider of the transport.
func OutboundTLSConfigProvider(provider yarpctls.OutboundTLSConfigProvider) TransportOption {
	return func(o *transportOptions) { o.outboundTLSConfigProvider = provider }
}
// buildClient is a hidden option that replaces the function used to construct
// the transport's *http.Client (buildHTTPClient by default). It exists only
// for testing.
func buildClient(f func(*transportOptions) *http.Client) TransportOption {
	return func(o *transportOptions) { o.buildClient = f }
}
// NewTransport creates a new HTTP transport for managing peers and sending
// requests. The given options are applied, in order, on top of the defaults
// returned by newTransportOptions.
func NewTransport(opts ...TransportOption) *Transport {
	settings := newTransportOptions()
	for i := range opts {
		opts[i](&settings)
	}
	return settings.newTransport()
}
// newTransport builds a Transport from the collected options. The HTTP client
// comes from the configured buildClient function, the peer table starts
// empty, and a nil logger is replaced with a no-op logger.
func (o *transportOptions) newTransport() *Transport {
	t := &Transport{
		once:                     lifecycle.NewOnce(),
		client:                   o.buildClient(o),
		peers:                    make(map[string]*httpPeer),
		connTimeout:              o.connTimeout,
		connBackoffStrategy:      o.connBackoffStrategy,
		innocenceWindow:          o.innocenceWindow,
		jitter:                   o.jitter,
		tracer:                   o.tracer,
		logger:                   o.logger,
		meter:                    o.meter,
		serviceName:              o.serviceName,
		ouboundTLSConfigProvider: o.outboundTLSConfigProvider,
	}
	if t.logger == nil {
		// The transport always carries a usable logger.
		t.logger = zap.NewNop()
	}
	return t
}
// buildHTTPClient is the default constructor for the transport's HTTP client.
// It uses the dial function from the options when one is set; otherwise it
// dials through a net.Dialer with a 30 second timeout and the configured
// keep-alive period. The remaining options are copied onto the
// http.Transport.
func buildHTTPClient(options *transportOptions) *http.Client {
	dial := options.dialContext
	if dial == nil {
		dialer := &net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: options.keepAlive,
		}
		dial = dialer.DialContext
	}

	// options lifted from https://golang.org/src/net/http/transport.go
	roundTripper := &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           dial,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		MaxIdleConns:          options.maxIdleConns,
		MaxIdleConnsPerHost:   options.maxIdleConnsPerHost,
		IdleConnTimeout:       options.idleConnTimeout,
		DisableKeepAlives:     options.disableKeepAlives,
		DisableCompression:    options.disableCompression,
		ResponseHeaderTimeout: options.responseHeaderTimeout,
	}
	return &http.Client{Transport: roundTripper}
}
// Transport keeps track of HTTP peers and the associated HTTP client. It
// allows using a single HTTP client to make requests to multiple YARPC
// services and pooling the resources needed therein.
type Transport struct {
	// lock is held by RetainPeer and ReleasePeer while they read or modify
	// peers.
	lock sync.Mutex

	// once backs Start, Stop and IsRunning.
	once *lifecycle.Once

	// client is the HTTP client produced by the configured buildClient
	// function; Stop closes its idle connections.
	client *http.Client

	// peers maps a peer identifier string to the httpPeer created for it.
	peers map[string]*httpPeer

	// connTimeout and connBackoffStrategy are copied from the transport
	// options when the Transport is built.
	connTimeout time.Duration
	connBackoffStrategy backoffapi.Strategy

	// connectorsGroup is incremented once for every peer created by
	// getOrCreatePeer, just before its MaintainConn goroutine is started, and
	// is waited on by Stop.
	connectorsGroup sync.WaitGroup

	// innocenceWindow and jitter are copied from the transport options when
	// the Transport is built.
	innocenceWindow time.Duration
	jitter func(int64) int64

	// tracer, logger, meter and serviceName are copied from the transport
	// options; logger is never nil on a Transport built by newTransport.
	tracer opentracing.Tracer
	logger *zap.Logger
	meter *metrics.Scope
	serviceName string

	// ouboundTLSConfigProvider holds the outbound TLS config provider option.
	// NOTE(review): the field name is misspelled ("oubound"); renaming it
	// requires updating every user of the field.
	ouboundTLSConfigProvider yarpctls.OutboundTLSConfigProvider
}

// Compile-time check that *Transport implements transport.Transport.
var _ transport.Transport = (*Transport)(nil)
// Start starts the HTTP transport. The transport has no start-up work of its
// own, so the function handed to the lifecycle does nothing and reports
// success.
func (a *Transport) Start() error {
	noop := func() error { return nil }
	return a.once.Start(noop)
}
// Stop stops the HTTP transport. Through the lifecycle it closes the client's
// idle connections and then blocks on connectorsGroup, which is incremented
// for each peer connection goroutine started by getOrCreatePeer.
func (a *Transport) Stop() error {
	shutdown := func() error {
		closeIdleConnections(a.client)
		a.connectorsGroup.Wait()
		return nil
	}
	return a.once.Stop(shutdown)
}
// IsRunning reports whether the HTTP transport is running, as tracked by its
// lifecycle.
func (a *Transport) IsRunning() bool {
	running := a.once.IsRunning()
	return running
}
// RetainPeer gets or creates the Peer for pid and subscribes sub (usually a
// peer.Chooser) to it. The transport lock is held for the whole call. The
// returned error is always nil.
func (a *Transport) RetainPeer(pid peer.Identifier, sub peer.Subscriber) (peer.Peer, error) {
	a.lock.Lock()
	defer a.lock.Unlock()

	retained := a.getOrCreatePeer(pid)
	retained.Subscribe(sub)
	return retained, nil
}
// getOrCreatePeer returns the peer already tracked for pid or, if there is
// none, creates it, records it in the peer table and starts its MaintainConn
// goroutine (counted in connectorsGroup).
//
// **NOTE** must only be called while a.lock is held.
func (a *Transport) getOrCreatePeer(pid peer.Identifier) *httpPeer {
	addr := pid.Identifier()
	existing, found := a.peers[addr]
	if found {
		return existing
	}

	created := newPeer(addr, a)
	a.peers[addr] = created

	// Account for the goroutine before starting it so Stop can wait on it.
	a.connectorsGroup.Add(1)
	go created.MaintainConn()
	return created
}
// ReleasePeer unsubscribes sub from the peer identified by pid and, once that
// peer has no subscribers left, removes it from the Transport and releases it.
//
// It returns peer.ErrTransportHasNoReferenceToPeer when the Transport is not
// tracking pid, and passes through any error returned by the peer's
// Unsubscribe. The transport lock is held for the whole call.
func (a *Transport) ReleasePeer(pid peer.Identifier, sub peer.Subscriber) error {
	a.lock.Lock()
	defer a.lock.Unlock()

	// Resolve the identifier once, as getOrCreatePeer does, so that the
	// lookup, the error value and the delete are guaranteed to use one key.
	addr := pid.Identifier()

	p, ok := a.peers[addr]
	if !ok {
		return peer.ErrTransportHasNoReferenceToPeer{
			TransportName:  "http.Transport",
			PeerIdentifier: addr,
		}
	}

	if err := p.Unsubscribe(sub); err != nil {
		return err
	}

	// Drop the peer only when nothing is listening to it any more.
	if p.NumSubscribers() == 0 {
		delete(a.peers, addr)
		p.Release()
	}
	return nil
}