/
ratelimit.go
115 lines (96 loc) · 2.83 KB
/
ratelimit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package interceptor
import (
"context"
"time"
"github.com/zhufuyi/pkg/errcode"
rl "github.com/zhufuyi/pkg/shield/ratelimit"
"google.golang.org/grpc"
)
// ---------------------------------- server interceptor ----------------------------------
// ErrLimitExceed is the error reported when the rate limiter is triggered and
// a request is rejected because the limit was exceeded. It is re-exported from
// the underlying ratelimit package so callers do not have to import it.
var ErrLimitExceed = rl.ErrLimitExceed
// RatelimitOption is a functional option that modifies the rate limit
// settings used when building a rate limit interceptor.
type RatelimitOption func(*ratelimitOptions)
// ratelimitOptions collects the settings that the interceptors in this file
// forward to the underlying limiter constructor (rl.NewLimiter).
type ratelimitOptions struct {
	// window is forwarded to rl.WithWindow; defaults to 10 seconds.
	window time.Duration
	// bucket is forwarded to rl.WithBucket; defaults to 100.
	bucket int
	// cpuThreshold is forwarded to rl.WithCPUThreshold; defaults to 800.
	// NOTE(review): the unit is defined by the rl package (presumably
	// per-mille, i.e. 800 = 80%) — confirm.
	cpuThreshold int64
	// cpuQuota is forwarded to rl.WithCPUQuota; it has no explicit default and
	// therefore starts at the zero value.
	cpuQuota float64
}
// defaultRatelimitOptions builds the baseline settings used before any
// RatelimitOption is applied: a 10 second window, 100 buckets and a CPU
// threshold of 800. cpuQuota is left at its zero value.
func defaultRatelimitOptions() *ratelimitOptions {
	defaults := new(ratelimitOptions)
	defaults.window = 10 * time.Second
	defaults.bucket = 100
	defaults.cpuThreshold = 800
	return defaults
}
// apply runs every supplied option against o, in the order given, so later
// options override earlier ones that touch the same field.
func (o *ratelimitOptions) apply(opts ...RatelimitOption) {
	for i := range opts {
		opts[i](o)
	}
}
// WithWindow returns an option that sets the limiter's window duration to d,
// replacing the default of 10 seconds.
func WithWindow(d time.Duration) RatelimitOption {
	setWindow := func(cfg *ratelimitOptions) {
		cfg.window = d
	}
	return setWindow
}
// WithBucket returns an option that sets the limiter's bucket size to b,
// replacing the default of 100.
func WithBucket(b int) RatelimitOption {
	setBucket := func(cfg *ratelimitOptions) {
		cfg.bucket = b
	}
	return setBucket
}
// WithCPUThreshold returns an option that sets the limiter's CPU threshold,
// replacing the default of 800.
func WithCPUThreshold(threshold int64) RatelimitOption {
	setThreshold := func(cfg *ratelimitOptions) {
		cfg.cpuThreshold = threshold
	}
	return setThreshold
}
// WithCPUQuota returns an option that supplies the real CPU quota, for use
// when it cannot be collected correctly from the process.
func WithCPUQuota(quota float64) RatelimitOption {
	setQuota := func(cfg *ratelimitOptions) {
		cfg.cpuQuota = quota
	}
	return setQuota
}
// UnaryServerRateLimit returns a server-side unary interceptor that applies
// rate limiting to incoming calls.
//
// The limiter is created once, when this function is called, from the default
// settings overridden by opts; every call passing through the returned
// interceptor shares it. If the limiter rejects a call, the handler is not
// invoked and the rejection is returned as an RPC error built with
// errcode.StatusLimitExceed.ToRPCErr. Otherwise the handler runs and its
// result is returned unchanged.
func UnaryServerRateLimit(opts ...RatelimitOption) grpc.UnaryServerInterceptor {
	o := defaultRatelimitOptions()
	o.apply(opts...)
	limiter := rl.NewLimiter(
		rl.WithWindow(o.window),
		rl.WithBucket(o.bucket),
		rl.WithCPUThreshold(o.cpuThreshold),
		rl.WithCPUQuota(o.cpuQuota),
	)

	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		done, err := limiter.Allow()
		if err != nil {
			return nil, errcode.StatusLimitExceed.ToRPCErr(err.Error())
		}
		// Report completion from a defer so the limiter is notified even when
		// the handler panics (e.g. with a recovery interceptor further up the
		// chain); otherwise the completion callback would never run for that
		// call. On a panic err is still nil at this point.
		defer func() {
			done(rl.DoneInfo{Err: err})
		}()

		return handler(ctx, req)
	}
}
// StreamServerRateLimit returns a server-side stream interceptor that applies
// rate limiting to incoming streams.
//
// The limiter is created once, when this function is called, from the default
// settings overridden by opts; every stream passing through the returned
// interceptor shares it. The limiter is consulted once per stream, before the
// handler starts. If it rejects the stream, the handler is not invoked and the
// rejection is returned as an RPC error built with
// errcode.StatusLimitExceed.ToRPCErr. Otherwise the handler's error is
// returned unchanged.
func StreamServerRateLimit(opts ...RatelimitOption) grpc.StreamServerInterceptor {
	o := defaultRatelimitOptions()
	o.apply(opts...)
	limiter := rl.NewLimiter(
		rl.WithWindow(o.window),
		rl.WithBucket(o.bucket),
		rl.WithCPUThreshold(o.cpuThreshold),
		rl.WithCPUQuota(o.cpuQuota),
	)

	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		done, err := limiter.Allow()
		if err != nil {
			return errcode.StatusLimitExceed.ToRPCErr(err.Error())
		}
		// Report completion from a defer so the limiter is notified even when
		// the handler panics (e.g. with a recovery interceptor further up the
		// chain); otherwise the completion callback would never run for that
		// stream. The closure reads err when it runs, so it sees the handler's
		// result on a normal return and nil on a panic.
		defer func() {
			done(rl.DoneInfo{Err: err})
		}()

		err = handler(srv, ss)
		return err
	}
}