# Step 8 — Rate Limiter Module

**What we built**: A `RateLimitModule` using the token bucket algorithm with per-provider shared state.

**Why it matters**: Provider APIs enforce rate limits (e.g., requests per minute). Without client-side throttling, thousands of concurrent agents would blast the API, get HTTP 429 (Too Many Requests) responses, and waste time on retries. The rate limiter sits *innermost* in the module stack, throttling *before* the call goes out, so fallback and retry don't even trigger on rate-limit waits.

**Key decisions**:
- **D-055**: Token bucket algorithm (allows bursts, enforces average rate; battle-tested, e.g., in AWS API Gateway throttling)
- **D-056**: Per-provider shared buckets (rate limits are per API key, not per model)
- **D-058**: Async wait + WARNING log (transparent to agents, no error raised)
- **D-060**: Innermost position: `Retry(Fallback(RateLimit(adapter)))`

**Stack position**: `Telemetry → Retry → Fallback → RateLimit → Adapter`

In [None]:
# Setup: ensure arcllm is importable
import sys, os
sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), '..', 'src')))

---
## 1. The Token Bucket Algorithm

The token bucket is a classic rate-limiting algorithm:

1. **Bucket starts full** — `capacity` tokens available
2. **Each request consumes 1 token** — if available, proceed immediately
3. **If empty, wait** — sleep until a token refills
4. **Tokens refill continuously** — at `refill_rate` tokens/second, capped at `capacity`

```
capacity=5, refill_rate=1/sec:

t=0: [●●●●●]  5 tokens (full)
t=0: [●●●●○]  4 tokens (1 request consumed)
t=0: [●●●○○]  3 tokens (burst: 2nd request, no wait)
t=0: [●●○○○]  2 tokens (burst: 3rd request)
t=1: [●●●○○]  3 tokens (1 refilled after 1 second)
t=1: [●●○○○]  2 tokens (4th request)
```

**Why token bucket vs. other algorithms?**
- Sliding window (log): precise, but needs a stored timestamp per request
- Fixed window: allows up to 2x the limit in a short span around a window boundary
- Leaky bucket (queue form): drains at a constant rate, so bursts are queued and smoothed out rather than served immediately
- **Token bucket: simple counter + timestamp, allows bursts, enforces average rate**
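
The fixed-window weakness is easiest to see with numbers. The sketch below is illustrative only: `fixed_window_admitted` and `token_bucket_admitted` are hypothetical helpers, not part of arcllm, and this token bucket *rejects* requests instead of waiting so that admissions can be counted. With a limit of 5 requests per 10 s, ten requests straddling the t=10 s boundary all pass a fixed-window counter, while a full 5-token bucket refilling at 0.5 tokens/sec admits only 5.

```python
import math


def fixed_window_admitted(timestamps, limit, window):
    """Count requests a fixed-window counter lets through (the excess is rejected)."""
    counts = {}
    admitted = 0
    for t in timestamps:
        w = math.floor(t / window)  # index of the window this request falls into
        if counts.get(w, 0) < limit:
            counts[w] = counts.get(w, 0) + 1
            admitted += 1
    return admitted


def token_bucket_admitted(timestamps, capacity, refill_rate):
    """Count requests a token bucket lets through if it rejects instead of waiting."""
    cap = float(capacity)
    tokens = cap  # bucket starts full
    last = timestamps[0] if timestamps else 0.0
    admitted = 0
    for t in timestamps:
        tokens = min(cap, tokens + (t - last) * refill_rate)  # continuous refill
        last = t
        if tokens >= 1.0:
            tokens -= 1.0
            admitted += 1
    return admitted


# Limit: 5 requests per 10 s. Ten requests straddle the window boundary at t=10 s.
boundary_requests = [9.9] * 5 + [10.1] * 5
print(fixed_window_admitted(boundary_requests, limit=5, window=10))           # 10
print(token_bucket_admitted(boundary_requests, capacity=5, refill_rate=0.5))  # 5
```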

---
## 2. Constructing a TokenBucket

In [None]:
from arcllm.modules.rate_limit import TokenBucket

# 60 RPM = 1 request/second refill rate, bucket holds 60 tokens
bucket = TokenBucket(capacity=60, refill_rate=1.0)

print(f"Capacity:     {bucket._capacity}")
print(f"Tokens:       {bucket._tokens}")
print(f"Refill rate:  {bucket._refill_rate} tokens/sec")
print(f"Lock type:    {type(bucket._lock).__name__}")

Notice:
- Bucket starts **full** (`tokens == capacity`)
- Uses `asyncio.Lock` for concurrent safety (multiple agents sharing one bucket)
- `refill_rate` is in tokens/second (RPM / 60)
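
The unit conversion is simple arithmetic, and it also tells you how long a drained bucket needs to recover: `capacity / refill_rate` seconds. A small sketch with hypothetical helper names:

```python
def rpm_to_refill_rate(requests_per_minute: float) -> float:
    """Convert a requests-per-minute limit into tokens per second."""
    return requests_per_minute / 60.0


def seconds_to_refill(capacity: float, refill_rate: float) -> float:
    """Time for a completely empty bucket to refill to capacity."""
    return capacity / refill_rate


rate = rpm_to_refill_rate(60)
print(rate)                         # 1.0 (tokens/sec)
print(seconds_to_refill(60, rate))  # 60.0 (seconds from empty to full)
```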

---
## 3. Acquiring Tokens — Immediate vs. Wait

In [None]:
import asyncio

# Small bucket for demo: capacity=3, refill 10/sec (fast for demo)
bucket = TokenBucket(capacity=3, refill_rate=10.0)

# Immediate acquires (tokens available)
for i in range(3):
    wait = await bucket.acquire()
    print(f"Acquire {i+1}: wait={wait:.4f}s, tokens_remaining={bucket._tokens:.1f}")

print("\n--- Bucket empty! Next acquire will wait ---\n")

# This one has to wait for refill
wait = await bucket.acquire()
print(f"Acquire 4: wait={wait:.4f}s (waited for refill)")

Key behaviors:
- Returns `0.0` when a token is immediately available
- Returns `> 0.0` when it had to wait (the actual wait time in seconds)
- The caller sees **no error** — just a transparent delay
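
The size of that wait follows from the refill rate: a bucket holding `tokens < 1` needs `(1 - tokens) / refill_rate` more seconds before a whole token exists. This is the standard token-bucket calculation; treat it as an assumption about how `TokenBucket` computes its wait, sketched here with a hypothetical helper:

```python
def time_until_next_token(tokens: float, refill_rate: float) -> float:
    """Seconds until at least one whole token is available."""
    if tokens >= 1.0:
        return 0.0  # a token is already there: acquire would return immediately
    return (1.0 - tokens) / refill_rate


# Drained demo bucket (refill_rate=10.0): roughly 0.1 s per token
print(time_until_next_token(0.0, 10.0))  # 0.1
print(time_until_next_token(0.5, 10.0))  # 0.05
```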

---
## 4. Refill Mechanics

Tokens refill based on elapsed time since last refill, capped at capacity.

In [None]:
import time
from unittest.mock import patch

# Demonstrate refill with mocked time
with patch("arcllm.modules.rate_limit.time.monotonic") as mock_mono:
    mock_mono.return_value = 1000.0
    bucket = TokenBucket(capacity=10, refill_rate=2.0)  # 2 tokens/sec
    
    # Drain all tokens
    for _ in range(10):
        # Direct internal drain (bypassing async for demo)
        bucket._tokens -= 1.0
    print(f"After draining: {bucket._tokens} tokens")
    
    # Advance time by 3 seconds → should refill 6 tokens
    mock_mono.return_value = 1003.0
    bucket._refill()
    print(f"After 3s refill: {bucket._tokens} tokens (2/sec × 3s = 6)")
    
    # Advance 100 more seconds → would add 200 tokens, but capped at 10
    mock_mono.return_value = 1103.0
    bucket._refill()
    print(f"After 100s more: {bucket._tokens} tokens (capped at capacity={bucket._capacity})")

The refill formula:
```python
now = time.monotonic()
elapsed = now - last_refill
tokens = min(capacity, tokens + elapsed * refill_rate)
last_refill = now  # advance the clock so no interval is counted twice
```

- `time.monotonic()` — immune to system clock adjustments
- Cap prevents unbounded token accumulation during idle periods
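
The same arithmetic as a pure function, taking `now` as a parameter instead of reading the clock, reproduces the mocked-time numbers above without patching anything. `BucketState` and `refill` are hypothetical names for this sketch, not arcllm internals:

```python
from dataclasses import dataclass


@dataclass
class BucketState:
    tokens: float
    last_refill: float  # monotonic timestamp of the previous refill


def refill(state: BucketState, capacity: float, refill_rate: float, now: float) -> None:
    """Add tokens for the time elapsed since the last refill, capped at capacity."""
    elapsed = now - state.last_refill
    state.tokens = min(capacity, state.tokens + elapsed * refill_rate)
    state.last_refill = now  # elapsed time is never counted twice


# Replays the mocked-time demo: drained bucket, capacity=10, 2 tokens/sec
state = BucketState(tokens=0.0, last_refill=1000.0)
refill(state, capacity=10.0, refill_rate=2.0, now=1003.0)
print(state.tokens)  # 6.0
refill(state, capacity=10.0, refill_rate=2.0, now=1103.0)
print(state.tokens)  # 10.0 (capped)
```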

---
## 5. Burst Capacity

The `capacity` parameter controls **burst allowance** — how many requests can fire immediately before throttling kicks in.

In [None]:
# burst_capacity=5 allows 5 immediate requests
bucket = TokenBucket(capacity=5, refill_rate=1.0)

waits = []
for i in range(5):
    w = await bucket.acquire()
    waits.append(w)

print(f"5 burst acquires: {waits}")
print(f"All immediate:    {all(w == 0.0 for w in waits)}")
print(f"Tokens remaining: {bucket._tokens}")

**Why separate burst_capacity from RPM?**

| Config | RPM | Burst | Behavior |
|--------|-----|-------|----------|
| Default | 60 | 60 | Fire 60 immediately, then 1/sec sustained |
| Conservative | 60 | 10 | Fire 10 immediately, then 1/sec sustained |
| Aggressive | 60 | 120 | Fire 120 immediately, then 1/sec sustained |

Default: `burst_capacity = requests_per_minute` (sensible for most APIs).
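
One consequence worth checking when picking a row: because the bucket starts full, a caller can get at most about `burst_capacity + refill_rate * T` requests through in a T-second window. Over the first minute, the Default row can therefore admit up to roughly 120 requests rather than 60; if a provider counts requests over a strict rolling minute, a smaller `burst_capacity` keeps the client further from its limit. A quick sketch of that bound (the helper is hypothetical):

```python
def max_requests_in_window(burst_capacity: float, rpm: float, window_seconds: float) -> int:
    """Upper bound on requests a full token bucket admits within one window.

    The bucket starts with burst_capacity tokens and gains rpm / 60 tokens per
    second, so at most burst + rate * window whole tokens can be spent.
    """
    refill_rate = rpm / 60.0
    return int(burst_capacity + refill_rate * window_seconds)


for name, rpm, burst in [("Default", 60, 60), ("Conservative", 60, 10), ("Aggressive", 60, 120)]:
    print(f"{name:<12} first 60 s: up to {max_requests_in_window(burst, rpm, 60)} requests")
```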

---
## 6. The RateLimitModule

In [None]:
from unittest.mock import AsyncMock, MagicMock
from arcllm.modules.rate_limit import RateLimitModule, clear_buckets
from arcllm.types import LLMProvider, LLMResponse, Message, Usage

clear_buckets()  # Clean state

# Create a mock inner adapter
inner = MagicMock(spec=LLMProvider)
inner.name = "anthropic"
inner.model_name = "claude-sonnet-4-20250514"
inner.invoke = AsyncMock(return_value=LLMResponse(
    content="Hello!",
    usage=Usage(input_tokens=10, output_tokens=5, total_tokens=15),
    model="claude-sonnet-4-20250514",
    stop_reason="end_turn",
))

# Wrap with rate limiting: 60 RPM, burst=5
config = {"requests_per_minute": 60, "burst_capacity": 5}
module = RateLimitModule(config, inner)

print(f"Module type:    {type(module).__name__}")
print(f"Provider name:  {module._provider_name}")
print(f"Bucket capacity: {module._bucket._capacity}")
print(f"Bucket refill:  {module._bucket._refill_rate} tokens/sec")
print("  (60 RPM / 60 = 1.0 tokens/sec)")

In [None]:
# Invoke — acquires token, then delegates to inner
messages = [Message(role="user", content="hi")]
result = await module.invoke(messages)

print(f"Response: {result.content}")
print(f"Inner called: {inner.invoke.await_count} time(s)")

The flow:
```
module.invoke(messages)
  → bucket.acquire()          # Get a token (wait if empty)
  → if waited: log WARNING    # "Rate limited for 'anthropic'. Waited 0.42s"
  → inner.invoke(messages)    # Delegate to actual adapter
  → return response
```
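
The same flow as a minimal, self-contained sketch. `RateLimitedProvider`, `ScriptedLimiter`, `fake_adapter`, and the logger name are hypothetical stand-ins, not arcllm's `RateLimitModule`; the limiter is a test double that returns pre-set wait times, so the logging branch can be exercised without real sleeps:

```python
import asyncio
import logging

logger = logging.getLogger("rate_limit_sketch")  # hypothetical logger name


class ScriptedLimiter:
    """Test double: returns pre-set wait times instead of actually sleeping."""

    def __init__(self, waits):
        self._waits = list(waits)

    async def acquire(self) -> float:
        return self._waits.pop(0)


class RateLimitedProvider:
    """Sketch of the wrapper: acquire a token, warn if we waited, then delegate."""

    def __init__(self, name, limiter, inner_invoke):
        self.name = name
        self._limiter = limiter            # anything with `async acquire() -> float`
        self._inner_invoke = inner_invoke  # the wrapped adapter's invoke coroutine function

    async def invoke(self, messages):
        waited = await self._limiter.acquire()
        if waited > 0:
            logger.warning("Rate limited for '%s'. Waited %.2fs", self.name, waited)
        return await self._inner_invoke(messages)


async def fake_adapter(messages):
    return f"echo: {messages[-1]}"


async def main():
    provider = RateLimitedProvider("anthropic", ScriptedLimiter([0.0, 0.42]), fake_adapter)
    first = await provider.invoke(["hi"])      # token was available: no warning
    second = await provider.invoke(["again"])  # scripted 0.42 s wait: WARNING logged
    return [first, second]
```

Run it with `asyncio.run(main())` in a script, or `await main()` in this notebook, which already has a running event loop.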

---
## 7. Per-Provider Shared Buckets

Rate limits are per API key, not per model. So all agents using the same provider share one bucket.

In [None]:
from arcllm.modules.rate_limit import _bucket_registry

clear_buckets()

# Two modules for same provider
inner1 = MagicMock(spec=LLMProvider)
inner1.name = "anthropic"
inner2 = MagicMock(spec=LLMProvider)
inner2.name = "anthropic"

config = {"requests_per_minute": 60}
m1 = RateLimitModule(config, inner1)
m2 = RateLimitModule(config, inner2)

print(f"Same bucket?     {m1._bucket is m2._bucket}")
print(f"Registry entries: {list(_bucket_registry.keys())}")

# Different provider → different bucket
inner3 = MagicMock(spec=LLMProvider)
inner3.name = "openai"
m3 = RateLimitModule(config, inner3)

print(f"\nAfter OpenAI module:")
print(f"Registry entries: {list(_bucket_registry.keys())}")
print(f"Same as m1?      {m1._bucket is m3._bucket}")

In [None]:
# clear_buckets() resets shared state — used by tests and registry.clear_cache()
print(f"Before clear: {len(_bucket_registry)} buckets")
clear_buckets()
print(f"After clear:  {len(_bucket_registry)} buckets")

---
## 8. Config Validation

In [None]:
from arcllm.exceptions import ArcLLMConfigError

inner = MagicMock(spec=LLMProvider)
inner.name = "test"

# Zero RPM → rejected (would mean no requests ever)
try:
    RateLimitModule({"requests_per_minute": 0}, inner)
except ArcLLMConfigError as e:
    print(f"Zero RPM:      {e}")

# Negative RPM → rejected
try:
    RateLimitModule({"requests_per_minute": -10}, inner)
except ArcLLMConfigError as e:
    print(f"Negative RPM:  {e}")

# Zero burst → rejected (can't make any request)
try:
    RateLimitModule({"requests_per_minute": 60, "burst_capacity": 0}, inner)
except ArcLLMConfigError as e:
    print(f"Zero burst:    {e}")

In [None]:
# burst_capacity defaults to RPM when not specified
clear_buckets()
module = RateLimitModule({"requests_per_minute": 120}, inner)
print(f"RPM=120, no burst specified → burst_capacity={module._bucket._capacity}")
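
The rules above (positive RPM, positive burst, burst defaulting to RPM) fit in one small resolution function. `resolve_rate_limit` and `ConfigError` are hypothetical stand-ins for illustration; the 60 RPM fallback mirrors the `rate_limit=True` default mentioned in this notebook:

```python
class ConfigError(ValueError):
    """Hypothetical stand-in for arcllm's ArcLLMConfigError."""


def resolve_rate_limit(config: dict) -> tuple:
    """Return (capacity, refill_rate) for a rate-limit config dict (sketch).

    requests_per_minute must be positive; burst_capacity defaults to
    requests_per_minute and must also be positive.
    """
    rpm = config.get("requests_per_minute", 60)  # assumed fallback: 60 RPM
    burst = config.get("burst_capacity", rpm)
    if rpm <= 0:
        raise ConfigError(f"requests_per_minute must be > 0, got {rpm}")
    if burst <= 0:
        raise ConfigError(f"burst_capacity must be > 0, got {burst}")
    return float(burst), rpm / 60.0


print(resolve_rate_limit({"requests_per_minute": 120}))  # (120.0, 2.0)
```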

---
## 9. Throttle Logging

When the bucket is empty and a caller waits, a WARNING is logged with the provider name and wait duration.

In [None]:
import logging

clear_buckets()

# Set up logging capture
handler = logging.StreamHandler()
handler.setLevel(logging.WARNING)
rl_logger = logging.getLogger("arcllm.modules.rate_limit")
rl_logger.addHandler(handler)
rl_logger.setLevel(logging.WARNING)

# Tiny burst so we hit the limit fast
inner = MagicMock(spec=LLMProvider)
inner.name = "anthropic"
inner.invoke = AsyncMock(return_value=LLMResponse(
    content="ok",
    usage=Usage(input_tokens=10, output_tokens=5, total_tokens=15),
    model="test",
    stop_reason="end_turn",
))

module = RateLimitModule({"requests_per_minute": 60, "burst_capacity": 1}, inner)

# First call: immediate (has 1 token)
await module.invoke([Message(role="user", content="first")])
print("First call: immediate (no log)")

# Second call: must wait → WARNING logged
print("\nSecond call (will trigger WARNING):")
await module.invoke([Message(role="user", content="second")])

# Cleanup
rl_logger.removeHandler(handler)

---
## 10. Concurrent Safety

The `asyncio.Lock` ensures multiple concurrent agents sharing one bucket don't corrupt token counts.

In [None]:
# Simulate 5 concurrent agents competing for refills in a drained 3-token bucket
bucket = TokenBucket(capacity=3, refill_rate=100.0)  # Fast refill for demo

# Drain all tokens first
for _ in range(3):
    await bucket.acquire()

print(f"Tokens after drain: {bucket._tokens}")
print("Launching 5 concurrent acquires...\n")

# All 5 compete for tokens simultaneously
results = await asyncio.gather(
    bucket.acquire(),
    bucket.acquire(),
    bucket.acquire(),
    bucket.acquire(),
    bucket.acquire(),
)

print(f"Wait times: {[f'{w:.4f}s' for w in results]}")
print(f"Tokens after: {bucket._tokens:.1f} (>= 0 guaranteed)")
print(f"All floats: {all(isinstance(w, float) for w in results)}")

The lock is held **only** during the refill+check. Sleep happens **outside** the lock:

```python
total_wait = 0.0
while True:
    async with self._lock:        # Hold lock briefly
        self._refill()
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return total_wait
        wait_seconds = ...        # Calculate wait

    await asyncio.sleep(wait_seconds)  # Sleep OUTSIDE lock
    total_wait += wait_seconds         # Loop back to re-check
```

This prevents one sleeping caller from blocking others.
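
For experimentation outside arcllm, here is a complete, runnable version of that loop. It is a sketch, not the library source: `SketchTokenBucket` and `demo` are hypothetical, and the wait formula `(1 - tokens) / refill_rate` is the standard token-bucket calculation, assumed here rather than taken from `rate_limit.py`. `demo()` races five callers against a full 3-token bucket refilling at 10 tokens/sec:

```python
import asyncio
import time


class SketchTokenBucket:
    """Token bucket whose lock guards only refill + check; sleeping happens outside it."""

    def __init__(self, capacity: float, refill_rate: float) -> None:
        self._capacity = float(capacity)
        self._refill_rate = refill_rate
        self._tokens = float(capacity)  # start full
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._tokens = min(self._capacity, self._tokens + elapsed * self._refill_rate)
        self._last_refill = now

    async def acquire(self) -> float:
        """Take one token, sleeping as needed; return the total time slept."""
        total_wait = 0.0
        while True:
            async with self._lock:
                self._refill()
                if self._tokens >= 1.0:
                    self._tokens -= 1.0
                    return total_wait
                # Assumed formula: time until one whole token has refilled
                wait_seconds = (1.0 - self._tokens) / self._refill_rate
            await asyncio.sleep(wait_seconds)  # lock released: others can check meanwhile
            total_wait += wait_seconds


async def demo() -> list:
    # Create the bucket inside the running loop so its Lock belongs to that loop.
    bucket = SketchTokenBucket(capacity=3, refill_rate=10.0)
    return await asyncio.gather(*(bucket.acquire() for _ in range(5)))
```

`await demo()` in the notebook should return three `0.0` waits followed by two positive ones; the exact positive values vary with scheduling.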

---
## 11. Registry Integration

`load_model()` now accepts a `rate_limit` kwarg with the same 4-level config resolution.

In [None]:
from arcllm.registry import load_model, clear_cache
import os

os.environ.setdefault("ANTHROPIC_API_KEY", "test-key")
os.environ.setdefault("OPENAI_API_KEY", "test-key")
clear_cache()

# Enable rate limiting
model = load_model("anthropic", rate_limit=True)
print(f"Type: {type(model).__name__}")
print(f"Inner: {type(model._inner).__name__}")

In [None]:
clear_cache()

# Custom RPM
model = load_model("anthropic", rate_limit={"requests_per_minute": 120})
print(f"Custom RPM: bucket capacity = {model._bucket._capacity}")

In [None]:
clear_cache()

# Full stack: Retry(Fallback(RateLimit(adapter)))
model = load_model("anthropic", retry=True, fallback=True, rate_limit=True)
print(f"Layer 1 (outer): {type(model).__name__}")
print(f"Layer 2:         {type(model._inner).__name__}")
print(f"Layer 3:         {type(model._inner._inner).__name__}")
print(f"Layer 4 (inner): {type(model._inner._inner._inner).__name__}")

Stacking order (innermost first):
1. **Adapter** — makes the actual HTTP call
2. **RateLimit** — throttles before call goes out
3. **Fallback** — switches provider on failure
4. **Retry** — retries transient failures

Why RateLimit is innermost:
- Throttles *before* the call → prevents hitting the API too fast
- Fallback doesn't trigger on rate-limit waits (it's just a delay, not an error)
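- Retry wraps everything: if a call fails after getting through the limiter, Retry re-invokes the whole inner stack, so each retry attempt passes through RateLimit again and consumes a token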
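
The ordering argument can be checked with toy layers that only record what runs. Everything here is hypothetical: the classes mimic the wrapping pattern, not arcllm's modules, `ConnectionError` stands in for a transient failure, and Fallback is left out for brevity. The adapter fails once, Retry re-invokes its inner stack, and the limiter is passed again on the second attempt.

```python
import asyncio

calls: list = []  # records the order in which each layer runs


class Adapter:
    """Fake adapter: fails once with a transient error, then succeeds."""

    def __init__(self) -> None:
        self.attempts = 0

    async def invoke(self, messages):
        self.attempts += 1
        calls.append(f"adapter attempt {self.attempts}")
        if self.attempts == 1:
            raise ConnectionError("transient failure")
        return "ok"


class RateLimit:
    def __init__(self, inner) -> None:
        self._inner = inner
        self.tokens_taken = 0

    async def invoke(self, messages):
        self.tokens_taken += 1  # stand-in for `await bucket.acquire()`
        calls.append("rate limit: token acquired")
        return await self._inner.invoke(messages)


class Retry:
    def __init__(self, inner, max_attempts: int = 3) -> None:
        self._inner = inner
        self._max_attempts = max_attempts

    async def invoke(self, messages):
        for attempt in range(1, self._max_attempts + 1):
            try:
                return await self._inner.invoke(messages)
            except ConnectionError:
                calls.append(f"retry: attempt {attempt} failed")
                if attempt == self._max_attempts:
                    raise
```

With `limiter = RateLimit(Adapter())`, awaiting `Retry(limiter).invoke(["hi"])` returns `"ok"` and leaves `limiter.tokens_taken == 2`: the failed attempt and the retry each cost a token.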

---
## 12. Agent Code Unchanged

The agent never knows rate limiting exists — same `model.invoke()` call.

In [None]:
# Agent code is identical with or without rate limiting:

# Without rate limiting
# model = load_model("anthropic")

# With rate limiting
# model = load_model("anthropic", rate_limit=True)

# Agent code (unchanged either way):
# response = await model.invoke(messages, tools)
# if response.stop_reason == "tool_use":
#     ...

print("Agent code is the same with or without rate_limit=True")
print("The only difference: calls may be slightly delayed when throttled")

---
## 13. Mocked Rate-Limited Cycle

Simulate what happens when an agent hits the rate limit.

In [None]:
clear_buckets()

inner = MagicMock(spec=LLMProvider)
inner.name = "anthropic"
inner.invoke = AsyncMock(return_value=LLMResponse(
    content="I can help with that!",
    usage=Usage(input_tokens=20, output_tokens=15, total_tokens=35),
    model="claude-sonnet-4-20250514",
    stop_reason="end_turn",
))

# burst_capacity=2, so the 3rd and 4th requests each wait ~1s (60 RPM = 1 token/sec)
module = RateLimitModule({"requests_per_minute": 60, "burst_capacity": 2}, inner)

for i in range(4):
    msg = [Message(role="user", content=f"Agent request #{i+1}")]
    result = await module.invoke(msg)
    print(f"Request {i+1}: '{result.content}' (bucket: {module._bucket._tokens:.1f} tokens)")
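
The expected timing of that loop can be worked out by hand. This sketch uses a hypothetical helper that assumes each call itself takes zero time, and computes when each of `n` back-to-back requests starts:

```python
def back_to_back_schedule(n: int, capacity: float, refill_rate: float) -> list:
    """Start time (seconds) of each of n sequential requests issued from t=0.

    Each request takes one token; when none is left it waits exactly until one
    token has refilled, then proceeds. Assumes zero time spent in the call itself.
    """
    tokens = float(capacity)
    now = 0.0
    starts = []
    for _ in range(n):
        if tokens < 1.0:
            wait = (1.0 - tokens) / refill_rate
            now += wait
            tokens += wait * refill_rate  # refilled exactly up to one token
        tokens -= 1.0
        starts.append(now)
    return starts


print(back_to_back_schedule(4, capacity=2, refill_rate=1.0))  # [0.0, 0.0, 1.0, 2.0]
```

So with `burst_capacity=2` at 60 RPM, requests 3 and 4 should each be delayed by about one second, and each delay should trigger the throttle WARNING.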

---
## 14. Implementation Details

Complete source: `src/arcllm/modules/rate_limit.py` (124 lines)

In [None]:
import inspect
from arcllm.modules.rate_limit import TokenBucket, RateLimitModule

# TokenBucket acquire method
print("=== TokenBucket.acquire ===")
print(inspect.getsource(TokenBucket.acquire))

In [None]:
# RateLimitModule invoke method
print("=== RateLimitModule.invoke ===")
print(inspect.getsource(RateLimitModule.invoke))

---
## 15. Live API Call with Rate Limiting

Real Anthropic API call with `rate_limit=True`.

In [None]:
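from dotenv import load_dotenv

# override=True: the mocked cells above may have set ANTHROPIC_API_KEY="test-key"
# via os.environ.setdefault(), and load_dotenv() keeps existing variables otherwise.
load_dotenv(os.path.join(os.getcwd(), '..', '.env'), override=True)

api_key = os.environ.get("ANTHROPIC_API_KEY", "")
has_key = bool(api_key) and api_key != "test-key"  # ignore the mock placeholder
print(f"Anthropic API key: {'found' if has_key else 'not found (skip live tests)'}")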

In [None]:
if has_key:
    clear_cache()
    
    # Load with rate limiting (60 RPM default)
    model = load_model("anthropic", rate_limit=True)
    print(f"Model type: {type(model).__name__}")
    print(f"Inner type: {type(model._inner).__name__}")
    
    response = await model.invoke([
        Message(role="user", content="What is a token bucket? One sentence.")
    ])
    print(f"\nResponse: {response.content}")
    print(f"Tokens: {response.usage.input_tokens} in / {response.usage.output_tokens} out")
    print(f"Bucket tokens remaining: {model._bucket._tokens:.1f}")
    
    await model.close()
else:
    print("Skipped — no API key")

In [None]:
if has_key:
    clear_cache()
    
    # Two-layer stack: Retry(RateLimit(adapter))
    model = load_model("anthropic", rate_limit=True, retry=True)
    outer = type(model).__name__
    middle = type(model._inner).__name__
    innermost = type(model._inner._inner).__name__
    print(f"Stack: {outer}({middle}({innermost}))")
    
    response = await model.invoke([
        Message(role="user", content="Say 'rate limited and retried' in exactly those words.")
    ])
    print(f"Response: {response.content}")
    
    await model._inner._inner.close()  # Close the adapter
else:
    print("Skipped — no API key")

---
## Summary

| Component | What | Why |
|-----------|------|-----|
| `TokenBucket` | Counter + timestamp + async lock | Classic algorithm, allows bursts, enforces average rate |
| `RateLimitModule` | Acquires token before each `invoke()` | Transparent throttling, no agent code changes |
| Shared buckets | Per-provider registry dict | Matches provider reality: limits are per API key |
| `clear_buckets()` | Resets shared state | Test isolation, hooked into `registry.clear_cache()` |
| WARNING log | Emitted when caller waits | Operators see why calls are slower |
| Stack position | Innermost (before Fallback/Retry) | Throttle before call, not after failure |

**Config**:
```python
load_model("anthropic", rate_limit=True)                        # 60 RPM default
load_model("anthropic", rate_limit={"requests_per_minute": 120}) # Custom RPM
load_model("anthropic", rate_limit=False)                        # Disable
```

**Test count**: 244 passed, 1 skipped (25 new rate limiter tests)