Implement rate limiter with dual token buckets and header parsing#31
Conversation
Add client-layer rate limiting that automatically throttles all API calls.

- Dual token buckets: general (100 req/min), search (30 req/min)
- Search endpoints (/records, /communities, /licenses) consume both buckets
- Proactive sleep when approaching limits
- X-RateLimit-Remaining/Reset header parsing adjusts bucket state
- Integrated into Client.do() and Client.GetRaw() — all requests rate-limited
- Logging when throttling occurs
- 9 tests: bucket creation, path detection, token consumption, refill, cap, wait duration, header updates, nil safety

Closes #6

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
This PR implements a dual token bucket rate limiter to automatically throttle API requests to Zenodo's endpoints, preventing rate limit violations. The rate limiter operates transparently at the HTTP client layer, checking limits before each request and synchronizing with server-reported rate limit headers.
Changes:
- Added dual token bucket rate limiter with separate limits for general (100/min) and search (30/min) endpoints
- Integrated rate limiter into Client with automatic throttling before requests and state updates from response headers
- Added comprehensive unit tests for bucket behavior, path detection, and header parsing
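The dual-bucket rule the changes describe (search endpoints draw from both buckets, everything else only from the general one) can be sketched roughly as follows. The limits and the names `bucket` and `isSearchPath` come from the PR description; the struct fields and prefix check are illustrative assumptions, not the PR's actual code:

```go
package main

import (
	"fmt"
	"strings"
)

// bucket is a minimal token bucket; the real implementation also
// tracks a refill rate and a lastRefill timestamp.
type bucket struct {
	tokens, maxTokens float64
}

func (b *bucket) consume() { b.tokens-- }

// isSearchPath approximates the search-endpoint detection described above.
func isSearchPath(path string) bool {
	for _, p := range []string{"/records", "/communities", "/licenses"} {
		if strings.HasPrefix(path, p) {
			return true
		}
	}
	return false
}

func main() {
	general := &bucket{tokens: 100, maxTokens: 100} // 100 req/min
	search := &bucket{tokens: 30, maxTokens: 30}    // 30 req/min

	// A search request consumes from both buckets;
	// a non-search request consumes only from the general bucket.
	if isSearchPath("/records/123") {
		search.consume()
	}
	general.consume()

	fmt.Println(general.tokens, search.tokens) // 99 29
}
```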
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| internal/api/ratelimit.go | New rate limiter with dual token buckets, Wait() for proactive throttling, UpdateFromHeaders() for server synchronization, and isSearchPath() helper |
| internal/api/ratelimit_test.go | Unit tests covering rate limiter creation, path detection, token consumption, refill logic, wait duration calculation, and header updates |
| internal/api/client.go | Added rateLimiter field to Client, integrated Wait() calls before requests and UpdateFromHeaders() after responses in both do() and GetRaw() |
```go
resp := &http.Response{
	Header: http.Header{
		"X-Ratelimit-Remaining": []string{"5"},
```
The header name in the test uses different casing from the production code: the test writes "X-Ratelimit-Remaining" (lowercase 'l' in 'limit'), while the production code in ratelimit.go line 121 calls Get("X-RateLimit-Remaining"). In practice the test still exercises the parsing path: HTTP header names are case-insensitive, and Go's http.Header.Get() canonicalizes its argument with textproto.CanonicalMIMEHeaderKey, which maps both spellings to "X-Ratelimit-Remaining", the exact key the test's map literal uses. Still, for readability and to match Zenodo's actual header format, the test should use the same casing as the production code.
```diff
- "X-Ratelimit-Remaining": []string{"5"},
+ "X-RateLimit-Remaining": []string{"5"},
```
```go
if isSearchPath(path) && rem < rl.search.tokens {
	slog.Debug("rate limit: server reports lower search remaining", "remaining", rem)
	rl.search.tokens = rem
}

if rem < rl.general.tokens {
```
The UpdateFromHeaders method only updates bucket tokens when the server reports fewer remaining tokens than the client's local count (lines 134 and 139 check rem < bucket.tokens). However, this approach doesn't handle the case where the server reports MORE tokens than locally tracked. This can happen after the rate limit window resets on the server side. The local buckets will continue to be constrained by their stale low count until enough time passes for local refills to catch up, potentially throttling requests unnecessarily even though the server has reset its limits.
```diff
- if isSearchPath(path) && rem < rl.search.tokens {
- 	slog.Debug("rate limit: server reports lower search remaining", "remaining", rem)
- 	rl.search.tokens = rem
- }
- if rem < rl.general.tokens {
+ if isSearchPath(path) && rem != rl.search.tokens {
+ 	slog.Debug("rate limit: server reports lower search remaining", "remaining", rem)
+ 	rl.search.tokens = rem
+ }
+ if rem != rl.general.tokens {
```
```go
	return time.Duration(deficit/b.refillRate*1000) * time.Millisecond
}

func (b *bucket) consume() {
```
The consume() method decrements tokens without checking if they're available (b.tokens--). Combined with the race condition in Wait(), where multiple goroutines might consume after waiting, this can result in negative token counts. While the token bucket algorithm should handle this through increased wait times on subsequent requests, it violates the invariant that tokens should stay between 0 and maxTokens. Consider adding a check to prevent tokens from going below 0, or restructure the Wait() method to prevent concurrent consumption races.
```diff
- func (b *bucket) consume() {
+ func (b *bucket) consume() {
+ 	if b.tokens <= 0 {
+ 		return
+ 	}
```
```go
package api

import (
	"net/http"
	"testing"
	"time"
)

func TestNewRateLimiter(t *testing.T) {
	rl := NewRateLimiter()
	if rl.general.maxTokens != 100 {
		t.Errorf("general max = %f, want 100", rl.general.maxTokens)
	}
	if rl.search.maxTokens != 30 {
		t.Errorf("search max = %f, want 30", rl.search.maxTokens)
	}
}

func TestIsSearchPath(t *testing.T) {
	tests := []struct {
		path   string
		search bool
	}{
		{"/records", true},
		{"/records/123", true},
		{"/communities", true},
		{"/licenses", true},
		{"/deposit/depositions", false},
		{"/api/user/records", false},
	}
	for _, tt := range tests {
		if got := isSearchPath(tt.path); got != tt.search {
			t.Errorf("isSearchPath(%q) = %v, want %v", tt.path, got, tt.search)
		}
	}
}

func TestRateLimiter_GeneralConsumes(t *testing.T) {
	rl := NewRateLimiter()

	// Consume a general token.
	startTokens := rl.general.tokens
	rl.Wait("/deposit/depositions/123")

	rl.mu.Lock()
	after := rl.general.tokens
	rl.mu.Unlock()

	// Should have consumed ~1 token (might refill a tiny amount).
	if after > startTokens-0.5 {
		t.Errorf("expected general tokens to decrease, before=%f after=%f", startTokens, after)
	}
}

func TestRateLimiter_SearchConsumesBoth(t *testing.T) {
	rl := NewRateLimiter()

	generalBefore := rl.general.tokens
	searchBefore := rl.search.tokens

	rl.Wait("/records")

	rl.mu.Lock()
	generalAfter := rl.general.tokens
	searchAfter := rl.search.tokens
	rl.mu.Unlock()

	if generalAfter > generalBefore-0.5 {
		t.Errorf("expected general tokens to decrease, before=%f after=%f", generalBefore, generalAfter)
	}
	if searchAfter > searchBefore-0.5 {
		t.Errorf("expected search tokens to decrease, before=%f after=%f", searchBefore, searchAfter)
	}
}

func TestBucket_Refill(t *testing.T) {
	b := newBucket(10, 60) // 1 token per second
	b.tokens = 0
	b.lastRefill = time.Now().Add(-2 * time.Second) // 2 seconds ago

	b.refill()

	// Should have ~2 tokens after refill.
	if b.tokens < 1.5 || b.tokens > 2.5 {
		t.Errorf("expected ~2 tokens after 2s refill, got %f", b.tokens)
	}
}

func TestBucket_RefillCapped(t *testing.T) {
	b := newBucket(10, 60)
	b.tokens = 10
	b.lastRefill = time.Now().Add(-10 * time.Second)

	b.refill()

	if b.tokens != 10 {
		t.Errorf("tokens should be capped at max, got %f", b.tokens)
	}
}

func TestBucket_WaitDuration(t *testing.T) {
	b := newBucket(100, 60) // 1 token per second
	b.tokens = 0.5

	wait := b.waitDuration()
	// Need 0.5 more tokens at 1/sec = ~500ms
	if wait < 400*time.Millisecond || wait > 600*time.Millisecond {
		t.Errorf("expected ~500ms wait, got %v", wait)
	}

	b.tokens = 5
	wait = b.waitDuration()
	if wait != 0 {
		t.Errorf("expected 0 wait with tokens available, got %v", wait)
	}
}

func TestUpdateFromHeaders(t *testing.T) {
	rl := NewRateLimiter()

	resp := &http.Response{
		Header: http.Header{
			"X-Ratelimit-Remaining": []string{"5"},
		},
	}

	rl.UpdateFromHeaders(resp, "/records")

	rl.mu.Lock()
	defer rl.mu.Unlock()

	if rl.general.tokens > 5.5 {
		t.Errorf("expected general tokens <=5 after header update, got %f", rl.general.tokens)
	}
	if rl.search.tokens > 5.5 {
		t.Errorf("expected search tokens <=5 after header update, got %f", rl.search.tokens)
	}
}

func TestUpdateFromHeaders_Nil(t *testing.T) {
	rl := NewRateLimiter()
	// Should not panic.
	rl.UpdateFromHeaders(nil, "/test")
}
```
Missing test coverage for concurrent access patterns. The RateLimiter uses a mutex and is designed to handle concurrent requests from multiple goroutines, but there are no tests that verify thread-safety or correct behavior under concurrent load. Given that the Wait() method has potential race conditions (unlocking during sleep), tests should verify that multiple goroutines calling Wait() simultaneously don't violate rate limits or cause data races.
```go
if reset != "" {
	resetTime, err := strconv.ParseInt(reset, 10, 64)
	if err == nil {
		resetAt := time.Unix(resetTime, 0)
		untilReset := time.Until(resetAt)
		if untilReset > 0 && rem <= 5 {
			slog.Warn("rate limit: approaching limit, server resets in", "seconds", untilReset.Seconds(), "remaining", rem)
		}
	}
}
```
The X-RateLimit-Reset header is parsed but not actually used to synchronize the rate limiter state. Lines 146-152 parse the reset timestamp and log a warning, but don't update the lastRefill time or adjust the token count based on when the server's rate limit window actually resets. This means the client's token bucket refill schedule can drift out of sync with the server's actual rate limit windows, potentially causing the client to be too conservative or too aggressive compared to the server's limits.
```go
defer rl.mu.Unlock()

// Refill both buckets.
rl.general.refill()
rl.search.refill()

// Check general bucket.
if wait := rl.general.waitDuration(); wait > 0 {
	slog.Info("rate limiting: waiting for general bucket", "wait", wait)
	rl.mu.Unlock()
	time.Sleep(wait)
	rl.mu.Lock()
	rl.general.refill()
}

// Check search bucket for search endpoints.
if isSearchPath(path) {
	if wait := rl.search.waitDuration(); wait > 0 {
		slog.Info("rate limiting: waiting for search bucket", "wait", wait)
		rl.mu.Unlock()
		time.Sleep(wait)
		rl.mu.Lock()
		rl.search.refill()
	}
	rl.search.consume()
}

rl.general.consume()
```
Race condition: Similar to the general bucket handling, the mutex is unlocked during sleep (line 100) and re-locked (line 102). Between these operations, other goroutines can modify the search bucket state. After re-acquiring the lock, the code refills the bucket (line 103) and then immediately consumes a token (line 105) without re-checking if tokens are actually available. This can lead to the bucket going negative when multiple goroutines are competing for search endpoint access.
```diff
- defer rl.mu.Unlock()
-
- // Refill both buckets.
- rl.general.refill()
- rl.search.refill()
- // Check general bucket.
- if wait := rl.general.waitDuration(); wait > 0 {
- 	slog.Info("rate limiting: waiting for general bucket", "wait", wait)
- 	rl.mu.Unlock()
- 	time.Sleep(wait)
- 	rl.mu.Lock()
- 	rl.general.refill()
- }
- // Check search bucket for search endpoints.
- if isSearchPath(path) {
- 	if wait := rl.search.waitDuration(); wait > 0 {
- 		slog.Info("rate limiting: waiting for search bucket", "wait", wait)
- 		rl.mu.Unlock()
- 		time.Sleep(wait)
- 		rl.mu.Lock()
- 		rl.search.refill()
- 	}
- 	rl.search.consume()
- }
- rl.general.consume()
+ // Refill both buckets.
+ rl.general.refill()
+ rl.search.refill()
+ // Check general bucket.
+ for {
+ 	if wait := rl.general.waitDuration(); wait > 0 {
+ 		slog.Info("rate limiting: waiting for general bucket", "wait", wait)
+ 		rl.mu.Unlock()
+ 		time.Sleep(wait)
+ 		rl.mu.Lock()
+ 		rl.general.refill()
+ 		continue
+ 	}
+ 	break
+ }
+ // Check search bucket for search endpoints.
+ if isSearchPath(path) {
+ 	for {
+ 		if wait := rl.search.waitDuration(); wait > 0 {
+ 			slog.Info("rate limiting: waiting for search bucket", "wait", wait)
+ 			rl.mu.Unlock()
+ 			time.Sleep(wait)
+ 			rl.mu.Lock()
+ 			rl.search.refill()
+ 			continue
+ 		}
+ 		break
+ 	}
+ 	rl.search.consume()
+ }
+ rl.general.consume()
+ rl.mu.Unlock()
```
```go
if untilReset > 0 && rem <= 5 {
	slog.Warn("rate limit: approaching limit, server resets in", "seconds", untilReset.Seconds(), "remaining", rem)
}
```
The UpdateFromHeaders function logs a warning when remaining tokens are low and approaching the limit (line 151), but this logic doesn't trigger any proactive backoff. The requirement NFR-2.3 specifies "proactive sleep when remaining < 10% of limit," but this is only satisfied by the token bucket algorithm's sleep behavior when tokens are exhausted. There's no proactive backoff based on the server-reported remaining count. When the server reports only 5 remaining tokens out of 100 (5%), the client should proactively slow down or pause, but currently it only logs and continues consuming tokens normally until they're exhausted.
```diff
- if untilReset > 0 && rem <= 5 {
- 	slog.Warn("rate limit: approaching limit, server resets in", "seconds", untilReset.Seconds(), "remaining", rem)
- }
+ // Proactive backoff when remaining quota is below 10% of the limit (NFR-2.3).
+ limit := 100.0
+ if isSearchPath(path) {
+ 	limit = 30.0
+ }
+ threshold := 0.1 * limit
+ if untilReset > 0 && rem <= threshold {
+ 	slog.Warn("rate limit: approaching limit, applying proactive backoff", "seconds_until_reset", untilReset.Seconds(), "remaining", rem, "limit", limit)
+ 	// Sleep for a fraction of the remaining reset window to slow down proactively.
+ 	// This satisfies "proactive sleep when remaining < 10% of limit" while avoiding
+ 	// a full stop until reset.
+ 	sleepDur := untilReset / 10
+ 	if sleepDur > 0 {
+ 		time.Sleep(sleepDur)
+ 	}
+ }
```
```go
func TestUpdateFromHeaders(t *testing.T) {
	rl := NewRateLimiter()

	resp := &http.Response{
		Header: http.Header{
			"X-Ratelimit-Remaining": []string{"5"},
		},
	}

	rl.UpdateFromHeaders(resp, "/records")

	rl.mu.Lock()
	defer rl.mu.Unlock()

	if rl.general.tokens > 5.5 {
		t.Errorf("expected general tokens <=5 after header update, got %f", rl.general.tokens)
	}
	if rl.search.tokens > 5.5 {
		t.Errorf("expected search tokens <=5 after header update, got %f", rl.search.tokens)
	}
}
```
Missing test coverage for error handling in UpdateFromHeaders. The function handles errors from strconv.ParseFloat (line 129) and strconv.ParseInt (line 147) by silently returning, but there are no tests that verify this behavior with invalid header values like "invalid", "NaN", or very large numbers that might overflow.
```go
// Should have ~2 tokens after refill.
if b.tokens < 1.5 || b.tokens > 2.5 {
	t.Errorf("expected ~2 tokens after 2s refill, got %f", b.tokens)
}
```
The test verifies that tokens are refilled correctly, but doesn't verify that lastRefill is updated to the current time after refill(). This is an important invariant because if lastRefill isn't updated, subsequent refills would add too many tokens. Consider adding an assertion that b.lastRefill is approximately equal to the current time after calling refill().
```diff
  	}
  }
+ 	// lastRefill should be updated to approximately now.
+ 	now := time.Now()
+ 	if b.lastRefill.After(now) {
+ 		t.Errorf("lastRefill should not be in the future, got %v > %v", b.lastRefill, now)
+ 	}
+ 	if now.Sub(b.lastRefill) > 200*time.Millisecond {
+ 		t.Errorf("lastRefill should be updated close to now, got %v (delta %v)", b.lastRefill, now.Sub(b.lastRefill))
+ 	}
```
```go
	rl.search.tokens = rem
}

if rem < rl.general.tokens {
	slog.Debug("rate limit: server reports lower general remaining", "remaining", rem)
	rl.general.tokens = rem
```
When UpdateFromHeaders sets tokens to a server-reported value (lines 136 and 141), it doesn't update the lastRefill timestamp. This can lead to incorrect behavior on the next refill() call because the elapsed time calculation (line 38) will use a stale lastRefill time, potentially adding too many tokens. When synchronizing with server state, lastRefill should be reset to time.Now() to prevent over-refilling.
```diff
- 	rl.search.tokens = rem
- }
- if rem < rl.general.tokens {
- 	slog.Debug("rate limit: server reports lower general remaining", "remaining", rem)
- 	rl.general.tokens = rem
+ 	rl.search.tokens = rem
+ 	rl.search.lastRefill = time.Now()
+ }
+ if rem < rl.general.tokens {
+ 	slog.Debug("rate limit: server reports lower general remaining", "remaining", rem)
+ 	rl.general.tokens = rem
+ 	rl.general.lastRefill = time.Now()
```
ELI5
Zenodo limits how many API requests you can make per minute (100 general, 30 for searches). If you blow past those limits, your requests get rejected. This PR adds an automatic speed governor inside the HTTP client — before every request, it checks if you have "tokens" left in your bucket. If not, it waits the right amount of time. It also reads Zenodo's response headers to stay in sync with the server's actual count. You never have to think about rate limits; the CLI handles it.
Summary
- Search endpoints (`/records`, `/communities`, `/licenses`) consume from both buckets
- Parses `X-RateLimit-Remaining` and `X-RateLimit-Reset` headers
- Integrated into `Client.do()` and `Client.GetRaw()` — every request is rate-limited automatically

Code changes

- `internal/api/ratelimit.go`: new `RateLimiter`, dual `bucket`, `Wait()`, `UpdateFromHeaders()`, `isSearchPath()`
- `internal/api/ratelimit_test.go`
- `internal/api/client.go`: `rateLimiter` field, `Wait()` before requests, `UpdateFromHeaders()` after responses

Test plan

- `go test ./...` — all 35 tests pass
- `go build ./cmd/zenodo/` compiles

Closes #6
🤖 Generated with Claude Code