forked from kubernetes-sigs/aws-load-balancer-controller
-
Notifications
You must be signed in to change notification settings - Fork 0
/
throttler.go
70 lines (59 loc) · 2.09 KB
/
throttler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package throttle
import (
"github.com/aws/aws-sdk-go/aws/request"
"golang.org/x/time/rate"
"regexp"
)
const sdkHandlerRequestThrottle = "requestThrottle"
type conditionLimiter struct {
condition Condition
limiter *rate.Limiter
}
type throttler struct {
conditionLimiters []conditionLimiter
}
// NewThrottler constructs new request throttler instance.
func NewThrottler(config *ServiceOperationsThrottleConfig) *throttler {
throttler := &throttler{}
for serviceID, operationsThrottleConfigs := range config.value {
for _, operationsThrottleConfig := range operationsThrottleConfigs {
throttler = throttler.WithOperationPatternThrottle(
serviceID,
operationsThrottleConfig.operationPtn,
operationsThrottleConfig.r,
operationsThrottleConfig.burst)
}
}
return throttler
}
func (t *throttler) WithConditionThrottle(condition Condition, r rate.Limit, burst int) *throttler {
limiter := rate.NewLimiter(r, burst)
t.conditionLimiters = append(t.conditionLimiters, conditionLimiter{
condition: condition,
limiter: limiter,
})
return t
}
func (t *throttler) WithServiceThrottle(serviceID string, r rate.Limit, burst int) *throttler {
return t.WithConditionThrottle(matchService(serviceID), r, burst)
}
func (t *throttler) WithOperationThrottle(serviceID string, operation string, r rate.Limit, burst int) *throttler {
return t.WithConditionThrottle(matchServiceOperation(serviceID, operation), r, burst)
}
func (t *throttler) WithOperationPatternThrottle(serviceID string, operationPtn *regexp.Regexp, r rate.Limit, burst int) *throttler {
return t.WithConditionThrottle(matchServiceOperationPattern(serviceID, operationPtn), r, burst)
}
func (t *throttler) InjectHandlers(handlers *request.Handlers) {
handlers.Sign.PushFrontNamed(request.NamedHandler{
Name: sdkHandlerRequestThrottle,
Fn: t.beforeSign,
})
}
// beforeSign is added to the Sign chain; called before each request
func (t *throttler) beforeSign(r *request.Request) {
for _, conditionLimiter := range t.conditionLimiters {
if conditionLimiter.condition(r) {
conditionLimiter.limiter.Wait(r.Context())
}
}
}