-
Notifications
You must be signed in to change notification settings - Fork 621
/
transport.go
72 lines (59 loc) · 1.71 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package api
import (
"net/http"
"time"
"golang.org/x/time/rate"
)
// RateLimitedTransport is a rate-limited HTTP transport for requests to the
// W&B backend.
//
// It implements [http.RoundTripper] so that it can be used as the transport
// of an HTTP client.
type RateLimitedTransport struct {
// delegate is the underlying transport that actually performs each request
// once the rate limiter has admitted it.
delegate http.RoundTripper
// rateLimiter is the rate limit for all outgoing requests. Every request
// waits on it before being sent, and its limit is updated whenever a
// response carries rate-limit headers.
rateLimiter *rate.Limiter
// rlTracker computes dynamic adjustments to the rate limit based on server
// backpressure. It is notified of every outgoing request and of the
// rate-limit information parsed from response headers.
rlTracker *RateLimitTracker
}
// NewRateLimitedTransport wraps delegate in a [RateLimitedTransport] for the
// W&B backend.
//
// The returned transport starts out limited to maxRequestsPerSecond with a
// burst of maxBurst; its tracker is configured to keep the target rate between
// minRequestsPerSecond and maxRequestsPerSecond.
func NewRateLimitedTransport(
	delegate http.RoundTripper,
) *RateLimitedTransport {
	limiter := rate.NewLimiter(maxRequestsPerSecond, maxBurst)

	// TODO: Allow changing these through settings.
	trackerParams := RateLimitTrackerParams{
		MinPerSecond:           minRequestsPerSecond,
		MaxPerSecond:           maxRequestsPerSecond,
		Smoothing:              0.2,
		MinRequestsForEstimate: 5,
	}
	tracker := NewRateLimitTracker(trackerParams)

	return &RateLimitedTransport{
		delegate:    delegate,
		rateLimiter: limiter,
		rlTracker:   tracker,
	}
}
// RoundTrip implements [http.RoundTripper].
//
// It blocks until the rate limiter admits the request (or the request's
// context ends), records the request with the rate-limit tracker, forwards it
// to the delegate transport, and finally feeds any rate-limit headers on the
// response back into the limiter.
//
// If waiting on the rate limiter fails, the request is never sent: the request
// body is closed and the limiter's error is returned with a nil response.
func (transport *RateLimitedTransport) RoundTrip(
	req *http.Request,
) (*http.Response, error) {
	if err := transport.rateLimiter.Wait(req.Context()); err != nil {
		// Errors happen if:
		//   - The request is canceled
		//   - The rate limit exceeds the request deadline
		//
		// The RoundTripper contract requires the request body to be closed
		// even on error. The delegate is never reached on this path, so the
		// body must be closed here.
		if req.Body != nil {
			// The Close error is deliberately dropped: the limiter error is
			// the one the caller needs to see.
			_ = req.Body.Close()
		}
		return nil, err
	}

	transport.rlTracker.TrackRequest()

	resp, err := transport.delegate.RoundTrip(req)
	if resp != nil {
		// A response may be present regardless of err; use its headers
		// whenever there is one.
		transport.processRateLimitHeaders(resp)
	}

	return resp, err
}
// processRateLimitHeaders adjusts the outgoing rate limit from the rate-limit
// headers on resp.
//
// When the headers cannot be parsed the limiter is left untouched. Otherwise
// the tracker's estimates are updated with the parsed values and the current
// time, and the limiter is set to the tracker's new target rate.
func (transport *RateLimitedTransport) processRateLimitHeaders(
	resp *http.Response,
) {
	if info, found := ParseRateLimitHeaders(resp.Header); found {
		transport.rlTracker.UpdateEstimates(time.Now(), info)

		target := transport.rlTracker.TargetRateLimit()
		transport.rateLimiter.SetLimit(rate.Limit(target))
	}
}