-
Notifications
You must be signed in to change notification settings - Fork 343
/
leakybucket.go
123 lines (108 loc) · 3.99 KB
/
leakybucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package ratelimit
import (
"context"
_ "embed"
"fmt"
"time"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/zalando/skipper/metrics"
"github.com/zalando/skipper/net"
)
type ClusterLeakyBucket struct {
capacity int
emission time.Duration
labelPrefix string
script *net.RedisScript
ringClient *net.RedisRingClient
metrics metrics.Metrics
now func() time.Time
}
const (
leakyBucketRedisKeyPrefix = "lkb."
leakyBucketMetricPrefix = "leakybucket.redis."
leakyBucketMetricLatency = leakyBucketMetricPrefix + "latency"
leakyBucketSpanName = "redis_leakybucket"
)
// Implements leaky bucket algorithm as a Redis lua script.
// Redis guarantees that a script is executed in an atomic way:
// no other script or Redis command will be executed while a script is being executed.
//
// Possible optimization: substitute capacity and emission in script source code
// on script creation in order not to send them over the wire on each call.
// This way every distinct bucket configuration will get its own script.
//
// See https://redis.io/commands/eval
//
//go:embed leakybucket.lua
var leakyBucketScript string
// NewClusterLeakyBucket creates a class of leaky buckets of a given capacity and emission.
// Emission is the reciprocal of the leak rate and equals the time to leak one unit.
//
// The leaky bucket is an algorithm based on an analogy of how a bucket with a constant leak will overflow if either
// the average rate at which water is poured in exceeds the rate at which the bucket leaks or if more water than
// the capacity of the bucket is poured in all at once.
// See https://en.wikipedia.org/wiki/Leaky_bucket
func NewClusterLeakyBucket(r *Registry, capacity int, emission time.Duration) *ClusterLeakyBucket {
return newClusterLeakyBucket(r.redisRing, capacity, emission, time.Now)
}
func newClusterLeakyBucket(ringClient *net.RedisRingClient, capacity int, emission time.Duration, now func() time.Time) *ClusterLeakyBucket {
return &ClusterLeakyBucket{
capacity: capacity,
emission: emission,
labelPrefix: fmt.Sprintf("%d-%v-", capacity, emission),
script: ringClient.NewScript(leakyBucketScript),
ringClient: ringClient,
metrics: metrics.Default,
now: now,
}
}
// Add adds an increment amount to the bucket identified by the label.
// It returns true if the amount was successfully added to the bucket or a time to wait for the next attempt.
// It also returns any error occurred during the attempt.
func (b *ClusterLeakyBucket) Add(ctx context.Context, label string, increment int) (added bool, retry time.Duration, err error) {
if increment > b.capacity {
// not allowed to add more than capacity and retry is not possible
return false, 0, nil
}
now := b.now()
span := b.startSpan(ctx)
defer span.Finish()
defer b.metrics.MeasureSince(leakyBucketMetricLatency, now)
added, retry, err = b.add(ctx, label, increment, now)
if err != nil {
ext.Error.Set(span, true)
}
return
}
func (b *ClusterLeakyBucket) add(ctx context.Context, label string, increment int, now time.Time) (added bool, retry time.Duration, err error) {
r, err := b.ringClient.RunScript(ctx, b.script,
[]string{b.getBucketId(label)},
b.capacity,
b.emission.Microseconds(),
increment,
now.UnixMicro(),
)
if err == nil {
x := r.(int64)
if x >= 0 {
added, retry = true, 0
} else {
added, retry = false, -time.Duration(x)*time.Microsecond
}
}
return
}
func (b *ClusterLeakyBucket) getBucketId(label string) string {
return leakyBucketRedisKeyPrefix + getHashedKey(b.labelPrefix+label)
}
func (b *ClusterLeakyBucket) startSpan(ctx context.Context) (span opentracing.Span) {
spanOpts := []opentracing.StartSpanOption{opentracing.Tags{
string(ext.Component): "skipper",
string(ext.SpanKind): "client",
}}
if parent := opentracing.SpanFromContext(ctx); parent != nil {
spanOpts = append(spanOpts, opentracing.ChildOf(parent.Context()))
}
return b.ringClient.StartSpan(leakyBucketSpanName, spanOpts...)
}