# Rate Limiting Strategies with Azure LLM Toolkit

This notebook demonstrates rate limiting strategies for staying within Azure OpenAI API limits and recovering cleanly when they are exceeded.

## Topics Covered

1. Understanding Azure OpenAI rate limits
2. Token bucket algorithm
3. Sliding window rate limiting
4. Adaptive rate limiting
5. Concurrent request management
6. Handling 429 errors gracefully
7. Best practices for high-throughput applications

## Azure OpenAI Rate Limits

Azure OpenAI enforces several types of rate limits:
- **TPM (Tokens Per Minute)**: Total tokens processed per minute
- **RPM (Requests Per Minute)**: Total API requests per minute
- **Concurrent Requests**: Maximum simultaneous requests

Exceeding these limits results in HTTP 429 (Too Many Requests) errors.
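
Which limit binds first depends on how large your requests are. The sketch below is plain arithmetic rather than a toolkit API: `effective_rpm` and its parameters are hypothetical names, and it assumes every request consumes roughly the same number of tokens.

```python
def effective_rpm(rpm_limit: int, tpm_limit: int, tokens_per_request: int) -> float:
    """Return the highest sustainable requests/minute under both limits."""
    if tokens_per_request <= 0:
        raise ValueError("tokens_per_request must be positive")
    # The token budget alone allows tpm_limit / tokens_per_request requests.
    token_bound = tpm_limit / tokens_per_request
    # The stricter of the two bounds is the one you actually hit.
    return min(rpm_limit, token_bound)


# Small requests: 90000 / 500 = 180 > 60, so the RPM limit binds.
small = effective_rpm(60, 90_000, 500)
# Large requests: 90000 / 3000 = 30 < 60, so the TPM limit binds.
large = effective_rpm(60, 90_000, 3_000)
print(f"small requests: {small:.0f}/min, large requests: {large:.0f}/min")
```

With the example limits used later in this notebook (60 RPM, 90,000 TPM), requests larger than 1,500 tokens are constrained by TPM rather than RPM.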

## Setup

In [None]:
import asyncio
import time
from typing import List

from azure_llm_toolkit import AzureLLMClient, AzureConfig
from azure_llm_toolkit.rate_limiter import RateLimiter, RateLimitConfig

## 1. Basic Rate Limiting

The toolkit includes automatic rate limiting with sensible defaults:

In [None]:
# Create client with default rate limiting
client = AzureLLMClient()

# The client automatically handles rate limits
print("✅ Client created with automatic rate limiting")

### Test Basic Rate Limiting

In [None]:
async def test_basic_rate_limiting():
    """Send multiple requests and observe rate limiting in action."""

    messages = [{"role": "user", "content": "Say 'hello' in one word."}]

    print("Sending 10 requests...")
    start_time = time.time()

    for i in range(10):
        response = await client.chat_completion(messages=messages, max_tokens=10)
        elapsed = time.time() - start_time
        print(f"Request {i + 1}: {response.choices[0].message.content.strip()[:20]} (elapsed: {elapsed:.2f}s)")

    total_time = time.time() - start_time
    print(f"\n✅ Completed 10 requests in {total_time:.2f}s")
    print(f"Average: {total_time / 10:.2f}s per request")


await test_basic_rate_limiting()

## 2. Custom Rate Limit Configuration

Configure rate limits to match your Azure deployment:

In [None]:
# Configure rate limits based on your Azure deployment
rate_limit_config = RateLimitConfig(
    max_requests_per_minute=60,  # RPM limit
    max_tokens_per_minute=90000,  # TPM limit
    max_concurrent_requests=10,  # Concurrent request limit
    retry_max_attempts=5,  # Number of retry attempts
    retry_initial_delay=1.0,  # Initial retry delay (seconds)
    retry_max_delay=60.0,  # Maximum retry delay (seconds)
    retry_exponential_base=2.0,  # Exponential backoff multiplier
)

# Create client with custom configuration
custom_client = AzureLLMClient(rate_limit_config=rate_limit_config)

print("✅ Client created with custom rate limits:")
print(f"  RPM: {rate_limit_config.max_requests_per_minute}")
print(f"  TPM: {rate_limit_config.max_tokens_per_minute}")
print(f"  Concurrent: {rate_limit_config.max_concurrent_requests}")

## 3. Token Bucket Algorithm

The toolkit uses a token bucket algorithm for smooth rate limiting:

In [None]:
# Create a standalone rate limiter to observe its behavior
rate_limiter = RateLimiter(max_requests_per_minute=30, max_tokens_per_minute=10000)


async def demonstrate_token_bucket():
    """Demonstrate token bucket behavior."""

    print("Token Bucket Demonstration")
    print("=" * 50)

    # Try to acquire tokens
    for i in range(5):
        start = time.time()

        # Acquire 100 tokens
        await rate_limiter.acquire(tokens=100)

        elapsed = time.time() - start
        stats = rate_limiter.get_stats()

        print(f"\nAcquisition {i + 1}:")
        print(f"  Wait time: {elapsed:.3f}s")
        print(f"  Tokens available: {stats['tokens_available']:.0f}")
        print(f"  Requests available: {stats['requests_available']:.0f}")


await demonstrate_token_bucket()
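
To make the algorithm itself concrete, here is a minimal standalone token bucket using only the standard library. It is a sketch of the general technique, not the toolkit's `RateLimiter` implementation; the class `TokenBucket`, its methods, and the injectable `clock` parameter are all hypothetical names chosen for illustration.

```python
import time
from typing import Callable


class TokenBucket:
    """Holds up to `capacity` tokens, refilled continuously at a fixed rate."""

    def __init__(self, capacity: float, refill_per_second: float,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self._clock = clock
        self._tokens = capacity  # start full so a short burst passes immediately
        self._last = clock()

    def _refill(self) -> None:
        # Credit tokens for the time elapsed since the last call, up to capacity.
        now = self._clock()
        earned = (now - self._last) * self.refill_per_second
        self._tokens = min(self.capacity, self._tokens + earned)
        self._last = now

    def try_acquire(self, amount: float = 1.0) -> bool:
        """Take `amount` tokens if available; return False without waiting otherwise."""
        self._refill()
        if amount <= self._tokens:
            self._tokens -= amount
            return True
        return False

    def seconds_until(self, amount: float = 1.0) -> float:
        """Time a caller would need to wait before `amount` tokens are available."""
        if amount > self.capacity:
            raise ValueError("amount exceeds bucket capacity")
        self._refill()
        missing = max(0.0, amount - self._tokens)
        return missing / self.refill_per_second


# 30 requests per minute: capacity 30, refilled at 0.5 tokens per second.
bucket = TokenBucket(capacity=30, refill_per_second=30 / 60)
granted = sum(bucket.try_acquire() for _ in range(40))
print(f"Granted {granted} of 40 back-to-back requests")
print(f"Next request possible in about {bucket.seconds_until():.1f}s")
```

The bucket allows a burst up to its capacity and then throttles callers to the refill rate, which is why the technique suits quotas expressed per minute.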

## 4. Handling Concurrent Requests

Process multiple requests concurrently while respecting rate limits:

In [None]:
async def process_batch_with_concurrency(prompts: List[str], max_concurrent: int = 5):
    """Process a batch of prompts with controlled concurrency."""

    # Configure client with concurrency limit
    batch_client = AzureLLMClient(rate_limit_config=RateLimitConfig(max_concurrent_requests=max_concurrent))

    async def process_one(prompt: str, idx: int):
        """Process a single prompt."""
        start = time.time()
        response = await batch_client.chat_completion(messages=[{"role": "user", "content": prompt}], max_tokens=50)
        elapsed = time.time() - start
        return {
            "idx": idx,
            "prompt": prompt[:30] + "...",
            "response": response.choices[0].message.content[:50],
            "time": elapsed,
        }

    # Process all prompts concurrently
    start_time = time.time()
    tasks = [process_one(prompt, i) for i, prompt in enumerate(prompts)]
    results = await asyncio.gather(*tasks)
    total_time = time.time() - start_time

    await batch_client.close()

    return results, total_time


# Test with 20 prompts
test_prompts = [f"What is {i} + {i + 1}?" for i in range(20)]

results, total_time = await process_batch_with_concurrency(test_prompts, max_concurrent=5)

print(f"\n✅ Processed {len(results)} prompts in {total_time:.2f}s")
print(f"Average time per request: {total_time / len(results):.2f}s")
print(f"Throughput: {len(results) / total_time:.2f} requests/second")
print("\nFirst 3 results:")
for result in results[:3]:
    print(f"  [{result['idx']}] {result['prompt']} -> {result['response'][:30]}...")
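
A concurrency cap like `max_concurrent_requests` is commonly built on a semaphore. The sketch below shows that mechanism in isolation, with `asyncio.sleep` standing in for the network call. It is an assumption about the general pattern, not a description of the toolkit's internals, and `run_limited` is a hypothetical helper.

```python
import asyncio


async def run_limited(n_tasks: int, max_concurrent: int) -> int:
    """Run n_tasks fake requests and return the peak number in flight at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with semaphore:  # blocks while max_concurrent requests are running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the API call
            in_flight -= 1

    # All tasks are scheduled at once; the semaphore decides how many proceed.
    await asyncio.gather(*(fake_request() for _ in range(n_tasks)))
    return peak


peak = asyncio.run(run_limited(n_tasks=20, max_concurrent=5))
print(f"Peak concurrency: {peak}")
```

Inside a notebook, where an event loop is already running, replace `asyncio.run(...)` with `await run_limited(...)`.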

## 5. Adaptive Rate Limiting

Automatically adjust rate limits based on API responses:

In [None]:
from azure_llm_toolkit.rate_limiter import AdaptiveRateLimiter

# Create adaptive rate limiter
adaptive_limiter = AdaptiveRateLimiter(
    initial_rpm=60,
    initial_tpm=90000,
    adjustment_factor=0.8,  # Reduce by 20% on 429 errors
    recovery_factor=1.1,  # Increase by 10% on success
)

adaptive_client = AzureLLMClient(rate_limiter=adaptive_limiter)

print("✅ Client created with adaptive rate limiting")
print(f"Initial RPM: {adaptive_limiter.current_rpm}")
print(f"Initial TPM: {adaptive_limiter.current_tpm}")

### Test Adaptive Behavior

In [None]:
async def test_adaptive_limiting():
    """Test adaptive rate limiting behavior."""

    print("Testing adaptive rate limiting...\n")

    for i in range(10):
        try:
            response = await adaptive_client.chat_completion(
                messages=[{"role": "user", "content": f"Count to {i + 1}"}], max_tokens=20
            )

            stats = adaptive_limiter.get_stats()
            print(f"Request {i + 1}: Success")
            print(f"  Current RPM: {stats['current_rpm']:.0f}")
            print(f"  Current TPM: {stats['current_tpm']:.0f}")

        except Exception as e:
            print(f"Request {i + 1}: Error - {str(e)}")
            stats = adaptive_limiter.get_stats()
            print(f"  Adjusted RPM: {stats['current_rpm']:.0f}")
            print(f"  Adjusted TPM: {stats['current_tpm']:.0f}")

        await asyncio.sleep(0.5)


await test_adaptive_limiting()
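
The adjustment rule behind the `adjustment_factor` and `recovery_factor` parameters can be sketched in a few lines. This is an illustration under stated assumptions, not the `AdaptiveRateLimiter` source: the class `AdaptiveLimit` is hypothetical, and the floor and ceiling clamps are additions that keep the limit from collapsing to zero or climbing past the deployment's real quota.

```python
class AdaptiveLimit:
    """One adjustable limit, always kept within [floor, ceiling]."""

    def __init__(self, initial: float, floor: float, ceiling: float,
                 decrease_factor: float = 0.8, increase_factor: float = 1.1):
        self.current = initial
        self.floor = floor
        self.ceiling = ceiling
        self.decrease_factor = decrease_factor
        self.increase_factor = increase_factor

    def on_rate_limited(self) -> float:
        """Back off after a 429: shrink the limit, but never below the floor."""
        self.current = max(self.floor, self.current * self.decrease_factor)
        return self.current

    def on_success(self) -> float:
        """Recover after a success: grow the limit, but never above the ceiling."""
        self.current = min(self.ceiling, self.current * self.increase_factor)
        return self.current


rpm = AdaptiveLimit(initial=60, floor=6, ceiling=60)
print(f"After one 429:       {rpm.on_rate_limited():.1f}")
for _ in range(3):
    rpm.on_success()
print(f"After three successes: {rpm.current:.1f}")
```

Because the decrease (20%) is larger than the increase (10%), the limit drops quickly under pressure and recovers gradually.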

## 6. Handling 429 Errors

Best practices for handling rate limit errors:

In [None]:
from azure_llm_toolkit.exceptions import RateLimitError


async def safe_request_with_retry(messages: List[dict], max_retries: int = 5):
    """Make a request with custom retry logic."""

    for attempt in range(max_retries):
        try:
            response = await client.chat_completion(messages=messages, max_tokens=100)
            return response

        except RateLimitError:
            if attempt < max_retries - 1:
                wait_time = 2**attempt  # Exponential backoff: 1s, 2s, 4s, ...
                print(f"⚠️  Rate limit hit (attempt {attempt + 1}/{max_retries})")
                print(f"   Waiting {wait_time}s before retry...")
                await asyncio.sleep(wait_time)
            else:
                print(f"❌ Failed after {max_retries} attempts")
                raise

        except Exception as e:
            print(f"❌ Unexpected error: {str(e)}")
            raise


# Test the retry logic
try:
    response = await safe_request_with_retry(messages=[{"role": "user", "content": "Hello!"}])
    print(f"✅ Success: {response.choices[0].message.content}")
except Exception as e:
    print(f"Final error: {e}")
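
Plain exponential backoff can leave many clients retrying at the same instants. A common refinement is to randomize the delay ("full jitter") and to prefer the server's `Retry-After` hint when a 429 response includes one. The helper below is a hypothetical sketch of that delay calculation. How the toolkit's `RateLimitError` exposes response headers is not shown in this notebook, so the `retry_after` argument is simply passed in by the caller.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  retry_after: Optional[float] = None,
                  rng: Optional[random.Random] = None) -> float:
    """Seconds to sleep before retry number `attempt` (0-based)."""
    if retry_after is not None:
        # A server-provided hint takes priority over our own estimate.
        return min(cap, max(0.0, retry_after))
    rng = rng if rng is not None else random.Random()
    # Exponential growth, capped so late attempts do not wait indefinitely.
    ceiling = min(cap, base * (2 ** attempt))
    # Full jitter: pick uniformly in [0, ceiling] to spread retries out.
    return rng.uniform(0.0, ceiling)


for attempt in range(5):
    print(f"attempt {attempt}: sleep up to {min(60.0, 2 ** attempt):.0f}s, "
          f"chose {backoff_delay(attempt):.2f}s")
print(f"with Retry-After of 7s: {backoff_delay(3, retry_after=7):.0f}s")
```

To use it in a retry loop, replace a fixed `2**attempt` wait with `await asyncio.sleep(backoff_delay(attempt))`.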

## 7. Monitoring Rate Limit Usage

Track rate limit utilization in real-time:

In [None]:
def print_rate_limit_stats(client: AzureLLMClient):
    """Print current rate limit statistics."""

    if hasattr(client, "rate_limiter"):
        stats = client.rate_limiter.get_stats()

        print("📊 Rate Limit Statistics")
        print("=" * 50)

        # Request limits
        rpm_used = stats["max_requests_per_minute"] - stats["requests_available"]
        rpm_percent = (rpm_used / stats["max_requests_per_minute"]) * 100
        print(f"Requests:")
        print(f"  Used: {rpm_used:.0f}/{stats['max_requests_per_minute']:.0f} ({rpm_percent:.1f}%)")
        print(f"  Available: {stats['requests_available']:.0f}")

        # Token limits
        tpm_used = stats["max_tokens_per_minute"] - stats["tokens_available"]
        tpm_percent = (tpm_used / stats["max_tokens_per_minute"]) * 100
        print(f"\nTokens:")
        print(f"  Used: {tpm_used:.0f}/{stats['max_tokens_per_minute']:.0f} ({tpm_percent:.1f}%)")
        print(f"  Available: {stats['tokens_available']:.0f}")

        # Concurrent requests
        print(f"\nConcurrent Requests: {stats.get('active_requests', 0)}/{stats.get('max_concurrent', 'N/A')}")

        # Errors
        print(f"\nErrors:")
        print(f"  Rate limit errors: {stats.get('rate_limit_errors', 0)}")
        print(f"  Retry attempts: {stats.get('retry_attempts', 0)}")
    else:
        print("❌ Rate limiter not available")


# Print current stats
print_rate_limit_stats(client)
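
If you want usage figures that do not depend on the limiter's internal counters, a sliding window over recent requests is one option. The sketch below keeps a log of timestamped events and reports how many requests and tokens fall within the last 60 seconds. `SlidingWindowUsage` is a hypothetical helper written for this notebook, not part of the toolkit, and the injectable `clock` exists only to make it easy to test.

```python
import time
from collections import deque
from typing import Callable, Deque, Tuple


class SlidingWindowUsage:
    """Record (timestamp, tokens) events and total those within the window."""

    def __init__(self, window: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.window = window
        self._clock = clock
        self._events: Deque[Tuple[float, int]] = deque()

    def record(self, tokens: int) -> None:
        """Log one completed request and the tokens it consumed."""
        self._events.append((self._clock(), tokens))

    def _evict(self) -> None:
        # Drop events that are `window` seconds old or older.
        cutoff = self._clock() - self.window
        while self._events and self._events[0][0] <= cutoff:
            self._events.popleft()

    def usage(self) -> Tuple[int, int]:
        """Return (requests, tokens) observed within the window."""
        self._evict()
        return len(self._events), sum(tokens for _, tokens in self._events)


tracker = SlidingWindowUsage(window=60.0)
for tokens in (120, 80, 300):
    tracker.record(tokens)
requests, tokens = tracker.usage()
print(f"Last 60s: {requests} requests, {tokens} tokens")
```

Unlike a token bucket, the window reports exactly what was sent in the trailing interval, which makes it convenient for comparing against the RPM and TPM figures shown in the Azure portal.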

## 8. Batch Processing with Rate Limiting

Use the batch runner for efficient processing of many requests:

In [None]:
from azure_llm_toolkit.batch import ChatBatchRunner, ChatBatchItem


async def process_large_batch():
    """Process a large batch of requests efficiently."""

    # Create batch items
    items = [
        ChatBatchItem(
            id=f"question_{i}", messages=[{"role": "user", "content": f"What is {i} squared?"}], max_tokens=20
        )
        for i in range(50)
    ]

    # Create batch runner with rate limiting
    runner = ChatBatchRunner(
        client=client,
        max_concurrent=10,  # Process 10 at a time
        show_progress=True,  # Show progress bar
    )

    # Process batch
    print("Processing 50 chat completions...\n")
    start_time = time.time()

    results = await runner.run(items)

    total_time = time.time() - start_time

    # Analyze results
    successful = sum(1 for r in results if r.success)
    failed = len(results) - successful

    print("\n✅ Batch processing complete!")
    print(f"Total time: {total_time:.2f}s")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")
    print(f"Throughput: {successful / total_time:.2f} requests/second")

    # Show sample results
    print(f"\nSample results:")
    for result in results[:3]:
        if result.success:
            content = result.response.choices[0].message.content
            print(f"  {result.id}: {content[:50]}...")


await process_large_batch()
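
The pattern a batch runner follows can be sketched without the toolkit: cap concurrency, and capture each item's failure as a result so that one error does not abort the whole batch. The code below illustrates that idea only. `ItemResult`, `run_batch`, and `fake_worker` are hypothetical names, and nothing here is taken from the `ChatBatchRunner` implementation.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional


@dataclass
class ItemResult:
    id: str
    success: bool
    value: Optional[str] = None
    error: Optional[str] = None


async def run_batch(ids: List[str], worker: Callable[[str], Awaitable[str]],
                    max_concurrent: int = 10) -> List[ItemResult]:
    """Run `worker` for every id, at most `max_concurrent` at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(item_id: str) -> ItemResult:
        async with semaphore:
            try:
                return ItemResult(item_id, True, value=await worker(item_id))
            except Exception as exc:  # record the failure, keep the batch going
                return ItemResult(item_id, False, error=str(exc))

    # gather returns results in the same order as the input ids.
    return list(await asyncio.gather(*(run_one(i) for i in ids)))


async def fake_worker(item_id: str) -> str:
    """Stand-in for an API call; the item ending in '3' fails on purpose."""
    await asyncio.sleep(0)
    if item_id.endswith("3"):
        raise RuntimeError("simulated failure")
    return item_id.upper()


batch = asyncio.run(run_batch([f"q{i}" for i in range(5)], fake_worker, max_concurrent=2))
succeeded = sum(r.success for r in batch)
print(f"{succeeded} succeeded, {len(batch) - succeeded} failed")
```

Inside a notebook, use `await run_batch(...)` instead of `asyncio.run(...)`.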

## 9. Best Practices Summary

### Do's ✅

1. **Configure limits based on your deployment**: Check your Azure portal for actual limits
2. **Use batch processing**: For many requests, use `ChatBatchRunner` or `EmbeddingBatchRunner`
3. **Implement exponential backoff**: On 429 errors, wait progressively longer
4. **Monitor usage**: Track rate limit utilization to optimize throughput
5. **Use adaptive limiting**: Let the system adjust automatically based on API responses
6. **Set appropriate concurrency**: More isn't always better; match your deployment capacity

### Don'ts ❌

1. **Don't ignore rate limits**: Always configure them; default values may not match your deployment
2. **Don't retry immediately**: Always wait before retrying after a 429 error
3. **Don't set excessive concurrency**: Requests beyond your deployment's capacity only queue up or trigger 429 errors, which adds latency without raising throughput
4. **Don't forget to close clients**: Always call `await client.close()` when done
5. **Don't use synchronous code for high throughput**: Use async for better performance

## 10. Advanced: Dynamic Rate Limit Adjustment

In [None]:
class DynamicRateLimitManager:
    """Dynamically adjust rate limits based on system load."""

    def __init__(self, client: AzureLLMClient):
        self.client = client
        self.error_count = 0
        self.success_count = 0
        self.adjustment_threshold = 5

    async def adaptive_request(self, messages: List[dict], **kwargs):
        """Make a request with adaptive rate limiting."""

        try:
            response = await self.client.chat_completion(messages=messages, **kwargs)
            self.success_count += 1

            # Gradually increase limits on success
            if self.success_count >= self.adjustment_threshold:
                self._increase_limits()
                self.success_count = 0

            return response

        except RateLimitError as e:
            self.error_count += 1

            # Decrease limits on errors
            if self.error_count >= 2:
                self._decrease_limits()
                self.error_count = 0

            raise

    def _increase_limits(self):
        """Increase rate limits by 5%."""
        if hasattr(self.client, "rate_limiter"):
            limiter = self.client.rate_limiter
            limiter.max_requests_per_minute *= 1.05
            limiter.max_tokens_per_minute *= 1.05
            print(
                f"📈 Increased limits: RPM={limiter.max_requests_per_minute:.0f}, TPM={limiter.max_tokens_per_minute:.0f}"
            )

    def _decrease_limits(self):
        """Decrease rate limits by 20%."""
        if hasattr(self.client, "rate_limiter"):
            limiter = self.client.rate_limiter
            limiter.max_requests_per_minute *= 0.8
            limiter.max_tokens_per_minute *= 0.8
            print(
                f"📉 Decreased limits: RPM={limiter.max_requests_per_minute:.0f}, TPM={limiter.max_tokens_per_minute:.0f}"
            )


# Test dynamic manager
manager = DynamicRateLimitManager(client)

print("Testing dynamic rate limit adjustment...\n")
for i in range(10):
    try:
        response = await manager.adaptive_request(messages=[{"role": "user", "content": f"Number {i}"}], max_tokens=10)
        print(f"✅ Request {i + 1} succeeded")
    except Exception as e:
        print(f"❌ Request {i + 1} failed: {str(e)[:50]}")

    await asyncio.sleep(0.5)

## Clean Up

In [None]:
# Close all clients
await client.close()
await custom_client.close()
await adaptive_client.close()

print("✅ All clients closed")

## Next Steps

- **03_cost_optimization.ipynb**: Learn how to minimize API costs
- **04_rag_implementation.ipynb**: Build a RAG system
- **05_agent_patterns.ipynb**: Create intelligent agents

## Resources

- [Azure OpenAI Rate Limits](https://learn.microsoft.com/en-us/azure/ai-services/openai/quotas-limits)
- [Token Bucket Algorithm](https://en.wikipedia.org/wiki/Token_bucket)
- [Examples Directory](https://github.com/tsoernes/azure-llm-toolkit/tree/main/examples)