This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

MegaFix global behavior bugs. #225

Merged
merged 24 commits on Mar 19, 2024
Commits (24)
f02cb5b
Fix global behavior `ResetTime` bug.
Baliedge Feb 26, 2024
a665c3c
Refine request time propagation.
Baliedge Feb 28, 2024
427dd2b
Merge branch 'master' of github.com:mailgun/gubernator into Baliedge/…
Baliedge Feb 28, 2024
57a5c97
Fix race condition in global behavior.
Baliedge Mar 6, 2024
56a0b22
Fix compile error.
Baliedge Mar 7, 2024
cb3816a
Fix intermittent test error caused by `TestHealthCheck`.
Baliedge Mar 7, 2024
e2b8853
Refactor global behavior and functional tests for stability.
Baliedge Mar 11, 2024
7c67a32
Fix lint errors.
Baliedge Mar 11, 2024
24bee89
Tidy code.
Baliedge Mar 11, 2024
ea42442
Add back tests that were erroneously removed.
Baliedge Mar 11, 2024
bd38ee6
Fix tests.
Baliedge Mar 11, 2024
65ee4fa
Fix benchmark test errors.
Baliedge Mar 11, 2024
d5c74d2
Fix TestHealthCheck.
Baliedge Mar 11, 2024
cd7bbab
Fix flaky test `TestHealthCheck`.
Baliedge Mar 11, 2024
0fb2a33
Backwards compatibility needed for upgrading.
Baliedge Mar 11, 2024
47eede5
Fix compile error.
Baliedge Mar 11, 2024
2229596
Fix test.
Baliedge Mar 11, 2024
c0608d5
Fix for overlimit metric doublecounting on non-owner and owner.
Baliedge Mar 11, 2024
517bf63
Metric `gubernator_getratelimit_counter` adds calltype value "local n…
Baliedge Mar 11, 2024
5f137ad
Changed mind. Instead of `calltype="local non-owner"`, just don't in…
Baliedge Mar 11, 2024
f51861d
Don't call `OnChange()` event from non-owner.
Baliedge Mar 12, 2024
d55016d
Simplify cache item expiration check.
Baliedge Mar 13, 2024
5ce3bc1
Rename `RequestTime` to `CreatedAt` in protos.
Baliedge Mar 13, 2024
0b2adf6
Revert optimization that won't work.
Baliedge Mar 13, 2024
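Several of the commits above (f02cb5b "Fix global behavior `ResetTime` bug.", a665c3c "Refine request time propagation.", 5ce3bc1 "Rename `RequestTime` to `CreatedAt` in protos.") center on stamping one creation time on a request and reusing it on every peer, instead of each node reading its own clock. The sketch below is illustrative only — `stampCreatedAt` and the cut-down struct are hypothetical stand-ins, not code from this PR — but it shows why a shared `CreatedAt` keeps `ResetTime` consistent between the owning peer and non-owners.

```go
package main

import (
	"fmt"
	"time"
)

// Cut-down stand-in for the RateLimitReq fields relevant here; the real
// message lives in the gubernator protos.
type rateLimitReq struct {
	CreatedAt *int64 // unix milliseconds, stamped once per request
	Duration  int64  // limit duration in milliseconds
}

// stampCreatedAt records the request time exactly once, so every peer that
// later handles the request works from the same clock reading.
func stampCreatedAt(r *rateLimitReq) {
	if r.CreatedAt == nil {
		now := time.Now().UnixMilli()
		r.CreatedAt = &now
	}
}

func main() {
	r := &rateLimitReq{Duration: 60_000}
	stampCreatedAt(r)
	// Owner and non-owner peers now derive the same reset time.
	fmt.Println("reset at:", *r.CreatedAt+r.Duration)
}
```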
4 changes: 2 additions & 2 deletions Makefile
@@ -13,7 +13,7 @@ $(GOLANGCI_LINT): ## Download Go linter

.PHONY: lint
lint: $(GOLANGCI_LINT) ## Run Go linter
$(GOLANGCI_LINT) run -v --fix -c .golangci.yml ./...
$(GOLANGCI_LINT) run -v -c .golangci.yml ./...

.PHONY: test
test: ## Run unit tests and measure code coverage
@@ -24,7 +24,7 @@ test: ## Run unit tests and measure code coverage

.PHONY: bench
bench: ## Run Go benchmarks
go test ./... -bench . -benchtime 5s -timeout 0 -run=XXX -benchmem
go test ./... -bench . -benchtime 5s -timeout 0 -run='^$$' -benchmem

.PHONY: docker
docker: ## Build Docker image
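A note on the `bench` target change above: `-run=XXX` only works because no test happens to be named "XXX", while `-run='^$$'` (Make's `$$` expands to a literal `$`, so the shell sees `-run='^$'`) is an anchored regexp that matches no test name at all, so only benchmarks execute. Below is a benchmark of the kind `make bench` would now pick up — this particular function is an illustration, not one from the repository:

```go
// bench_example_test.go — illustrative only. With `-bench . -run='^$'`,
// Benchmark* functions like this run while every Test* function is skipped,
// because ^$ matches no (non-empty) test name.
package example

import (
	"strings"
	"testing"
)

func BenchmarkBuildHashKey(b *testing.B) {
	for i := 0; i < b.N; i++ {
		_ = strings.Join([]string{"account_1234", "get_page"}, "_") // stand-in for real work
	}
}
```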
87 changes: 49 additions & 38 deletions algorithms.go
@@ -34,8 +34,7 @@ import (
// with 100 emails and the request will succeed. You can override this default behavior with `DRAIN_OVER_LIMIT`

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {

func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
defer tokenBucketTimer.ObserveDuration()

@@ -100,7 +99,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
s.Remove(ctx, hashKey)
}

return tokenBucketNewItem(ctx, s, c, r)
return tokenBucketNewItem(ctx, s, c, r, reqState)
}

// Update the limit if it changed.
@@ -133,12 +132,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}

// If our new duration means we are currently expired.
now := MillisecondNow()
if expire <= now {
createdAt := *r.CreatedAt
if expire <= createdAt {
// Renew item.
span.AddEvent("Limit has expired")
expire = now + r.Duration
t.CreatedAt = now
expire = createdAt + r.Duration
t.CreatedAt = createdAt
t.Remaining = t.Limit
}

@@ -147,7 +146,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
rl.ResetTime = expire
}

if s != nil {
if s != nil && reqState.IsOwner {
defer func() {
s.OnChange(ctx, r, item)
}()
@@ -162,7 +161,9 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// If we are already at the limit.
if rl.Remaining == 0 && r.Hits > 0 {
trace.SpanFromContext(ctx).AddEvent("Already over the limit")
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
t.Status = rl.Status
return rl, nil
@@ -180,7 +181,9 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// without updating the cache.
if r.Hits > t.Remaining {
trace.SpanFromContext(ctx).AddEvent("Over the limit")
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
if HasBehavior(r.Behavior, Behavior_DRAIN_OVER_LIMIT) {
// DRAIN_OVER_LIMIT behavior drains the remaining counter.
@@ -196,19 +199,19 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}

// Item is not found in cache or store, create new.
return tokenBucketNewItem(ctx, s, c, r)
return tokenBucketNewItem(ctx, s, c, r, reqState)
}

// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
now := MillisecondNow()
expire := now + r.Duration
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
createdAt := *r.CreatedAt
expire := createdAt + r.Duration

t := &TokenBucketItem{
Limit: r.Limit,
Duration: r.Duration,
Remaining: r.Limit - r.Hits,
CreatedAt: now,
CreatedAt: createdAt,
}

// Add a new rate limit to the cache.
@@ -236,31 +239,33 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
// Client could be requesting that we always return OVER_LIMIT.
if r.Hits > r.Limit {
trace.SpanFromContext(ctx).AddEvent("Over the limit")
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
rl.Remaining = r.Limit
t.Remaining = r.Limit
}

c.Add(item)

if s != nil {
if s != nil && reqState.IsOwner {
s.OnChange(ctx, r, item)
}

return rl, nil
}

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
defer leakyBucketTimer.ObserveDuration()

if r.Burst == 0 {
r.Burst = r.Limit
}

now := MillisecondNow()
createdAt := *r.CreatedAt

// Get rate limit from cache.
hashKey := r.HashKey()
@@ -309,7 +314,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
s.Remove(ctx, hashKey)
}

return leakyBucketNewItem(ctx, s, c, r)
return leakyBucketNewItem(ctx, s, c, r, reqState)
}

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
@@ -349,16 +354,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}

if r.Hits != 0 {
c.UpdateExpiration(r.HashKey(), now+duration)
c.UpdateExpiration(r.HashKey(), createdAt+duration)
}

// Calculate how much leaked out of the bucket since the last time we leaked a hit
elapsed := now - b.UpdatedAt
elapsed := createdAt - b.UpdatedAt
leak := float64(elapsed) / rate

if int64(leak) > 0 {
b.Remaining += leak
b.UpdatedAt = now
b.UpdatedAt = createdAt
}

if int64(b.Remaining) > b.Burst {
@@ -369,20 +374,22 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
Limit: b.Limit,
Remaining: int64(b.Remaining),
Status: Status_UNDER_LIMIT,
ResetTime: now + (b.Limit-int64(b.Remaining))*int64(rate),
ResetTime: createdAt + (b.Limit-int64(b.Remaining))*int64(rate),
}

// TODO: Feature missing: check for Duration change between item/request.

if s != nil {
if s != nil && reqState.IsOwner {
defer func() {
s.OnChange(ctx, r, item)
}()
}

// If we are already at the limit
if int64(b.Remaining) == 0 && r.Hits > 0 {
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
return rl, nil
}
@@ -391,14 +398,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if int64(b.Remaining) == r.Hits {
b.Remaining = 0
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
rl.ResetTime = createdAt + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket, unless `DRAIN_OVER_LIMIT` is set.
if r.Hits > int64(b.Remaining) {
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT

// DRAIN_OVER_LIMIT behavior drains the remaining counter.
@@ -417,16 +426,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

b.Remaining -= float64(r.Hits)
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
rl.ResetTime = createdAt + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

return leakyBucketNewItem(ctx, s, c, r)
return leakyBucketNewItem(ctx, s, c, r, reqState)
}

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
now := MillisecondNow()
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
createdAt := *r.CreatedAt
duration := r.Duration
rate := float64(duration) / float64(r.Limit)
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
@@ -445,36 +454,38 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
Remaining: float64(r.Burst - r.Hits),
Limit: r.Limit,
Duration: duration,
UpdatedAt: now,
UpdatedAt: createdAt,
Burst: r.Burst,
}

rl := RateLimitResp{
Status: Status_UNDER_LIMIT,
Limit: b.Limit,
Remaining: r.Burst - r.Hits,
ResetTime: now + (b.Limit-(r.Burst-r.Hits))*int64(rate),
ResetTime: createdAt + (b.Limit-(r.Burst-r.Hits))*int64(rate),
}

// Client could be requesting that we start with the bucket OVER_LIMIT
if r.Hits > r.Burst {
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
rl.Remaining = 0
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
rl.ResetTime = createdAt + (rl.Limit-rl.Remaining)*int64(rate)
b.Remaining = 0
}

item := &CacheItem{
ExpireAt: now + duration,
ExpireAt: createdAt + duration,
Algorithm: r.Algorithm,
Key: r.HashKey(),
Value: &b,
}

c.Add(item)

if s != nil {
if s != nil && reqState.IsOwner {
s.OnChange(ctx, r, item)
}

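The recurring `reqState.IsOwner` guards in the diff above gate the over-limit metric and the store `OnChange()` callback so that only the peer that owns a key reports and persists state; a non-owner answering from its cached copy under global behavior stays quiet, which is what addresses the double counting described in commit c0608d5 and the `OnChange()` issue in commit f51861d. Below is a minimal sketch of the pattern, assuming a `RateLimitReqState` shaped the way the new function signatures use it (the struct here is inferred from usage, not copied from the PR):

```go
package main

import "fmt"

// RateLimitReqState as inferred from the new signatures: the only field
// exercised in this diff is IsOwner, marking whether this peer owns the key.
type RateLimitReqState struct {
	IsOwner bool
}

// recordOverLimit counts a rejected hit only on the owning peer, so the same
// request is no longer counted once by the non-owner and again by the owner.
func recordOverLimit(state RateLimitReqState, counter *int64) {
	if state.IsOwner {
		*counter++
	}
}

func main() {
	var overLimit int64
	recordOverLimit(RateLimitReqState{IsOwner: false}, &overLimit) // non-owner: skipped
	recordOverLimit(RateLimitReqState{IsOwner: true}, &overLimit)  // owner: counted
	fmt.Println("over-limit events recorded:", overLimit)          // 1
}
```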