# 3.5 OpenAI API Deep Dive — Rate Limiting, Error Handling & Best Practices

## Playground Notebook

Production applications **will** encounter errors. This notebook covers the errors you are most likely to meet and shows how to handle each one.

| Topic | What You'll Learn |
|-------|-------------------|
| **Rate Limits** | What they are, how tiers work, reading headers |
| **Error Types** | Every error you'll encounter + whether to retry |
| **Retry Logic** | Exponential backoff with jitter |
| **Best Practices** | Timeouts, token management, cost optimization |

> **Model:** `gpt-4o-mini`

---

In [1]:
import os
import time
import random
from dotenv import load_dotenv
from openai import OpenAI
import openai
from IPython.display import display, Markdown, HTML

load_dotenv()

MODEL = "gpt-4o-mini"
client = OpenAI()

print(f"\u2705 Client ready | Model: {MODEL}")

✅ Client ready | Model: gpt-4o-mini


In [2]:
# ============================================================
#  HELPER FUNCTIONS
# ============================================================

def chat(messages, max_tokens=150, **kwargs):
    """Send messages to OpenAI and display the response."""
    start = time.time()
    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        max_tokens=max_tokens,
        **kwargs
    )
    elapsed = time.time() - start
    content = response.choices[0].message.content
    if content:
        display(Markdown(content))
    print(f"\n\u23f1\ufe0f {elapsed:.2f}s | Tokens: {response.usage.prompt_tokens}+{response.usage.completion_tokens}={response.usage.total_tokens}")
    return response


def show_messages(messages):
    """Pretty-print the message list being sent."""
    from html import escape  # escape message text so it cannot inject markup

    colors = {"system": "#e74c3c", "user": "#3498db", "assistant": "#2ecc71", "tool": "#f39c12"}
    html = ""
    for msg in messages:
        # Accept both plain dicts and SDK message objects
        if isinstance(msg, dict):
            role = msg.get("role", "unknown")
            content = msg.get("content", "")
        else:
            role = getattr(msg, "role", "unknown")
            content = getattr(msg, "content", "(empty)")
        # Truncate long messages so the display stays compact
        if content and len(str(content)) > 200:
            content = str(content)[:200] + "..."
        color = colors.get(role, "#888")
        html += (
            f'<div style="margin:6px 0;padding:8px 12px;border-left:4px solid {color};'
            f'background:#1e1e1e;border-radius:4px;">'
            f'<strong style="color:{color};text-transform:uppercase;">{escape(str(role))}</strong>'
            f'<br><span style="color:#ccc;">{escape(str(content))}</span></div>'
        )
    display(HTML(html))


print("\u2705 Helpers loaded")

✅ Helpers loaded


---

## 1. Rate Limits — Understanding the Boundaries

OpenAI enforces limits along several dimensions. The **three** you are most likely to hit with chat models are:

| Dimension | What It Measures |
|-----------|------------------|
| **RPM** | Requests Per Minute |
| **TPM** | Tokens Per Minute |
| **RPD** | Requests Per Day |

### Rate Limit Tiers

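Limits rise as your account moves up the usage tiers. The figures below are illustrative: actual limits are set per model and change over time, so treat your account's limits page and the response headers read in Experiment 1A as authoritative.

| Tier | RPM | TPM | Qualification |
|------|-----|-----|---------------|
| Free | 3 | 40,000 | New account |
| Tier 1 | 500 | 200,000 | $5 paid |
| Tier 2 | 5,000 | 2,000,000 | $50 paid |
| Tier 3 | 5,000 | 4,000,000 | $100 paid |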
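
Once you know your RPM ceiling, a client can pace itself instead of waiting for a 429. The following is a minimal sketch of a sliding-window limiter, not part of the OpenAI SDK; the class name `SlidingWindowLimiter` and its `clock` and `sleep` parameters are hypothetical, introduced here so the logic can be exercised without real waiting. It tracks requests only, not tokens.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `max_requests` calls in any rolling `window` seconds."""

    def __init__(self, max_requests, window=60.0, clock=time.monotonic, sleep=time.sleep):
        self.max_requests = max_requests
        self.window = window
        self.clock = clock   # injectable so the limiter can be tested with a fake clock
        self.sleep = sleep
        self.sent = deque()  # timestamps of requests still inside the window

    def acquire(self):
        """Block until a request slot is free, then record the request."""
        while True:
            now = self.clock()
            # Drop timestamps that have aged out of the window
            while self.sent and now - self.sent[0] >= self.window:
                self.sent.popleft()
            if len(self.sent) < self.max_requests:
                self.sent.append(now)
                return
            # Window is full: wait until the oldest request leaves it
            self.sleep(self.window - (now - self.sent[0]))
```

Calling `limiter.acquire()` immediately before each API request keeps a single process under the chosen RPM. Several processes sharing one API key would need a shared store, which this sketch does not attempt.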

### Experiment 1A: Reading Rate Limit Headers

In [3]:
# Make a request and inspect the raw HTTP response headers
response = client.chat.completions.with_raw_response.create(
    model=MODEL,
    messages=[{"role": "user", "content": "Hi"}],
    max_tokens=5
)

# Extract rate limit headers
headers = response.headers
print("\u250c\u2500 RATE LIMIT HEADERS \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510")

rate_headers = [
    ("x-ratelimit-limit-requests",     "Max requests/min"),
    ("x-ratelimit-limit-tokens",       "Max tokens/min"),
    ("x-ratelimit-remaining-requests", "Remaining requests"),
    ("x-ratelimit-remaining-tokens",   "Remaining tokens"),
    ("x-ratelimit-reset-requests",     "Requests reset in"),
    ("x-ratelimit-reset-tokens",       "Tokens reset in"),
]

for header, label in rate_headers:
    value = headers.get(header, "N/A")
    print(f"\u2502 {label:25s} {value}")

print(f"\u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518")

# Parse the actual completion too
completion = response.parse()
print(f"\nResponse: {completion.choices[0].message.content}")

┌─ RATE LIMIT HEADERS ────────────────────────────┐
│ Max requests/min          5000
│ Max tokens/min            2000000
│ Remaining requests        4999
│ Remaining tokens          1999997
│ Requests reset in         12ms
│ Tokens reset in           0s
└────────────────────────────────────────────────┘

Response: Hello! How can I
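
The reset headers arrive as duration strings (`12ms` and `0s` in the output above), so they need converting before a program can sleep on them. The helper below is a sketch under the assumption that values are built from `h`, `m`, `s`, and `ms` components, possibly combined as in `6m0s`; `parse_reset` is a hypothetical name, not an SDK function.

```python
import re

_UNIT_SECONDS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0}
# "ms" is listed first so it is not misread as "m" followed by "s"
_PART = re.compile(r"(\d+(?:\.\d+)?)(ms|s|m|h)")


def parse_reset(value):
    """Convert a duration string such as '12ms' or '6m0s' into seconds."""
    parts = _PART.findall(value)
    # Reject anything the pattern does not fully account for (e.g. 'N/A')
    if not parts or "".join(number + unit for number, unit in parts) != value:
        raise ValueError(f"unrecognized duration: {value!r}")
    return sum(float(number) * _UNIT_SECONDS[unit] for number, unit in parts)
```

With this, `time.sleep(parse_reset(headers["x-ratelimit-reset-requests"]))` would wait out the request window, provided the header is present and follows the assumed format.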


---

## 2. Error Types — Every Error You Will Encounter

| Error | Status | Retry? | Action |
|-------|--------|--------|--------|
| `AuthenticationError` | 401 | ❌ Never | Fix your API key |
| `PermissionDeniedError` | 403 | ❌ Never | Check permissions |
| `NotFoundError` | 404 | ❌ Never | Fix model name/endpoint |
| `BadRequestError` | 400 | ❌ Never | Fix your request |
| `RateLimitError` | 429 | ✅ Yes | Wait + retry with backoff (a 429 with code `insufficient_quota` is a billing problem that retrying will not fix) |
| `InternalServerError` | ≥500 | ✅ Yes | Retry with backoff |
| `APIConnectionError` | - | ✅ Yes | Check network |
| `APITimeoutError` | - | ✅ Yes | Increase timeout (subclass of `APIConnectionError`) |
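
The retry column above reduces to a small decision rule on the HTTP status. The function below is a sketch of that rule with no SDK dependency; `should_retry` is a hypothetical helper, and treating the `insufficient_quota` error code as non-retriable is an assumption about how quota exhaustion is reported.

```python
def should_retry(status_code=None, error_code=None):
    """Return True if a failed request is worth retrying.

    status_code: HTTP status, or None when no response arrived
                 (connection error or timeout).
    error_code:  optional machine-readable code from the error body.
    """
    if status_code is None:
        return True  # connection error or timeout: usually transient
    if status_code == 429:
        # Assumed: quota exhaustion is a billing issue, not a transient limit
        return error_code != "insufficient_quota"
    return status_code >= 500  # server-side failures; all other 4xx are caller bugs
```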

### Experiment 2A: Triggering and Catching Common Errors

In [4]:
from openai import (
    AuthenticationError,
    BadRequestError,
    NotFoundError,
    RateLimitError,
    APIConnectionError,
    APITimeoutError,
    APIError
)

# --- 1. AuthenticationError (401) ---
print("1. AuthenticationError (bad key)")
try:
    bad = OpenAI(api_key="sk-invalid")
    bad.chat.completions.create(
        model=MODEL, messages=[{"role": "user", "content": "Hi"}], max_tokens=5
    )
except AuthenticationError as e:
    print(f"   \u274c Caught! Status {e.status_code}: {str(e)[:80]}")

# --- 2. NotFoundError (404) ---
print("\n2. NotFoundError (wrong model name)")
try:
    client.chat.completions.create(
        model="gpt-99-turbo", messages=[{"role": "user", "content": "Hi"}], max_tokens=5
    )
except NotFoundError as e:
    print(f"   \u274c Caught! Status {e.status_code}: model not found")

# --- 3. BadRequestError (400) ---
print("\n3. BadRequestError (invalid request)")
try:
    client.chat.completions.create(
        model=MODEL, messages=[], max_tokens=5  # empty messages!
    )
except BadRequestError as e:
    print(f"   \u274c Caught! Status {e.status_code}: {str(e)[:80]}")

print("\n\u2705 All error types caught correctly!")

1. AuthenticationError (bad key)
   ❌ Caught! Status 401: Error code: 401 - {'error': {'message': 'Incorrect API key provided: sk-invalid.

2. NotFoundError (wrong model name)
   ❌ Caught! Status 404: model not found

3. BadRequestError (invalid request)
   ❌ Caught! Status 400: Error code: 400 - {'error': {'message': "Invalid 'messages': empty array. Expect

✅ All error types caught correctly!


### Experiment 2B: The Complete Error Handler

In [5]:
def safe_chat(messages, max_tokens=80):
    """Make an API call with comprehensive error handling."""
    try:
        response = client.chat.completions.create(
            model=MODEL, messages=messages, max_tokens=max_tokens
        )
        content = response.choices[0].message.content
        display(Markdown(content))
        print(f"\n\u23f1\ufe0f Tokens: {response.usage.prompt_tokens}+{response.usage.completion_tokens}={response.usage.total_tokens}")
        return content

    except AuthenticationError:
        print("\u274c Auth failed: check your API key")
    except BadRequestError as e:
        print(f"\u274c Bad request, fix your input: {e}")
    except NotFoundError:
        print(f"\u274c Model '{MODEL}' not found")
    except RateLimitError:
        print("\u26a0\ufe0f Rate limited: should retry with backoff")
    except APITimeoutError:
        # Must precede APIConnectionError, which is its parent class
        print("\u26a0\ufe0f Timeout: try increasing timeout or simplifying request")
    except APIConnectionError:
        print("\u26a0\ufe0f Connection error: check internet")
    except APIError as e:
        # Catch-all for remaining SDK errors; not every APIError carries a status code
        status = getattr(e, "status_code", "n/a")
        print(f"\u26a0\ufe0f API error ({status}): retry only if it is a 5xx")

    return None


# Test with a valid request
messages = [{"role": "user", "content": "Say hello in 3 words."}]
show_messages(messages)
result = safe_chat(messages)
print("\u2705 safe_chat worked!" if result is not None else "\u274c safe_chat returned None")

Hello there, friend!


⏱️ Tokens: 14+5=19
✅ safe_chat worked!
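
Python tries `except` clauses from top to bottom and takes the first match, and in the `openai` SDK `APITimeoutError` is a subclass of `APIConnectionError`. A handler chain like the one in `safe_chat` therefore only reaches its timeout branch if that clause comes before the connection clause. The sketch below shows the effect with two stand-in exception classes (hypothetical names, used so the example runs without the SDK or a network).

```python
class ConnectionProblem(Exception):
    """Stand-in for openai.APIConnectionError."""


class TimeoutProblem(ConnectionProblem):
    """Stand-in for openai.APITimeoutError (a subclass of the above)."""


def classify_general_first(exc):
    try:
        raise exc
    except ConnectionProblem:
        return "connection"
    except TimeoutProblem:  # unreachable: the parent clause above already matched
        return "timeout"


def classify_specific_first(exc):
    try:
        raise exc
    except TimeoutProblem:
        return "timeout"
    except ConnectionProblem:
        return "connection"


print(classify_general_first(TimeoutProblem()))   # connection
print(classify_specific_first(TimeoutProblem()))  # timeout
```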


---

## 3. Retry Logic — Exponential Backoff with Jitter

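When a **retriable** error occurs, wait before retrying, and wait longer after each failed attempt. Note that the `openai` Python SDK already retries connection errors, 408, 409, 429, and 5xx responses on its own (twice by default, configurable with `max_retries` on the client), so the manual loops in this section run on top of that unless the client is created with `max_retries=0`.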

```
Attempt 1: wait 1s
Attempt 2: wait 2s
Attempt 3: wait 4s  (doubles each time)
```

**Jitter** adds randomness to prevent all clients retrying at the same moment (thundering herd problem).
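
The wait calculation can be isolated into one function, which also makes it easy to respect a delay suggested by the server. This is a sketch under stated assumptions: `next_wait`, the 60-second `cap`, and the `retry_after` parameter (for example, a value read from a `Retry-After` response header) are illustrative additions, while the 50% jitter range matches the code used in this section.

```python
import random


def next_wait(attempt, retry_after=None, base=1.0, cap=60.0, rng=random):
    """Seconds to sleep before retry number `attempt` (0-based).

    retry_after: server-suggested delay in seconds, if the response carried one.
    """
    backoff = min(base * 2 ** attempt, cap)  # 1, 2, 4, ... up to the cap
    jitter = rng.uniform(0, backoff * 0.5)   # random extra of up to 50%
    wait = backoff + jitter
    if retry_after is not None:
        wait = max(wait, retry_after)        # never retry sooner than the server asked
    return wait
```

Passing a seeded `random.Random` instance as `rng` makes the schedule reproducible in tests.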

### Experiment 3A: Manual Exponential Backoff

In [6]:
def chat_with_retry(messages, max_retries=3, max_tokens=80):
    """Chat with exponential backoff + jitter for retriable errors."""
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=MODEL, messages=messages, max_tokens=max_tokens
            )
            content = response.choices[0].message.content
            display(Markdown(content))
            print(f"\n\u23f1\ufe0f Tokens: {response.usage.prompt_tokens}+{response.usage.completion_tokens}={response.usage.total_tokens}")
            return content

        # 5xx responses are retriable too (see the error table in section 2)
        except (RateLimitError, APIConnectionError, APITimeoutError, openai.InternalServerError) as e:
            if attempt == max_retries:
                print(f"\u274c All {max_retries} retries exhausted.")
                raise

            # Exponential backoff with jitter
            base_wait = 2 ** attempt  # 1, 2, 4, 8...
            jitter = random.uniform(0, base_wait * 0.5)
            wait = base_wait + jitter

            print(f"  \u26a0\ufe0f Attempt {attempt + 1} failed: {type(e).__name__}")
            print(f"     Waiting {wait:.1f}s before retry...")
            time.sleep(wait)

        except (AuthenticationError, BadRequestError, NotFoundError) as e:
            # Non-retriable — fail immediately
            print(f"\u274c Non-retriable error: {type(e).__name__}")
            raise


# Test with a valid request (should succeed on first try)
messages = [{"role": "user", "content": "Name a planet. One word."}]
show_messages(messages)
result = chat_with_retry(messages)
print(f"\u2705 Succeeded on first try!")

Mars.


⏱️ Tokens: 14+2=16
✅ Succeeded on first try!


### Experiment 3B: Using the `tenacity` Library (Production Approach)

In [7]:
try:
    from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type

    @retry(
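        # wait_exponential adds no jitter; tenacity's wait_random_exponential does
        wait=wait_exponential(multiplier=1, min=1, max=10),
        stop=stop_after_attempt(3),  # 3 attempts in total: the first call plus 2 retries
        retry=retry_if_exception_type(
            (RateLimitError, APIConnectionError, APITimeoutError, openai.InternalServerError)
        ),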
        before_sleep=lambda retry_state: print(f"  \u26a0\ufe0f Retrying in {retry_state.next_action.sleep:.1f}s...")
    )
    def chat_tenacity(messages, max_tokens=80):
        response = client.chat.completions.create(
            model=MODEL, messages=messages, max_tokens=max_tokens
        )
        return response.choices[0].message.content

    result = chat_tenacity([{"role": "user", "content": "Name a color. One word."}])
    print(f"\u2705 tenacity result: {result}")

except ImportError:
    print("\u26a0\ufe0f tenacity not installed. Install with: pip install tenacity")
    print("   The manual retry pattern above works just as well.")

✅ tenacity result: Crimson.


### Visualizing Backoff Timing

In [8]:
# Visualize what exponential backoff with jitter looks like
print("Exponential Backoff Timing (simulated):")
print(f"{'Attempt':<10} {'Base Wait':<12} {'With Jitter':<14} {'Cumulative'}")
print("-" * 50)

cumulative = 0
for i in range(6):
    base = 2 ** i
    jitter = random.uniform(0, base * 0.5)
    actual = base + jitter
    cumulative += actual
    bar = "\u2588" * int(actual * 2)
    print(f"{i+1:<10} {base:<12.1f} {actual:<14.2f} {cumulative:.1f}s  {bar}")

print(f"\n\u2139\ufe0f After 6 retries, total wait \u2248 {cumulative:.0f}s")

Exponential Backoff Timing (simulated):
Attempt    Base Wait    With Jitter    Cumulative
--------------------------------------------------
1          1.0          1.16           1.2s  ██
2          2.0          2.77           3.9s  █████
3          4.0          4.40           8.3s  ████████
4          8.0          10.37          18.7s  ████████████████████
5          16.0         16.83          35.5s  █████████████████████████████████
6          32.0         36.97          72.5s  █████████████████████████████████████████████████████████████████████████

ℹ️ After 6 retries, total wait ≈ 72s
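
Because the base waits form a geometric series, the total wait can be bounded without simulation. The sketch below assumes the same schedule as the cell above (base wait `2 ** i` plus uniform jitter of up to 50%); `total_wait_bounds` is a hypothetical helper name.

```python
def total_wait_bounds(retries, base=1.0, jitter_fraction=0.5):
    """Return (minimum, expected, maximum) cumulative wait in seconds."""
    backoff_sum = base * (2 ** retries - 1)             # 1 + 2 + 4 + ... (geometric series)
    minimum = backoff_sum                               # every jitter draw is 0
    expected = backoff_sum * (1 + jitter_fraction / 2)  # uniform jitter averages half its range
    maximum = backoff_sum * (1 + jitter_fraction)       # every jitter draw is the full range
    return minimum, expected, maximum


print(total_wait_bounds(6))  # (63.0, 78.75, 94.5)
```

The simulated run above, at 72.5s, sits inside these bounds, and the upper bound is the figure to compare against any end-to-end deadline your caller has.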


---

## 4. Best Practices

### Experiment 4A: Setting Timeouts

In [9]:
# Client with custom timeout (the SDK default is 10 minutes, too long for most interactive use)
timed_client = OpenAI(timeout=30.0)  # 30 second timeout

# Or per-request timeout
response = client.chat.completions.create(
    model=MODEL,
    messages=[{"role": "user", "content": "Say hi."}],
    max_tokens=10,
    timeout=10.0  # 10 second timeout for this request only
)
print(f"\u2705 Response: {response.choices[0].message.content}")

print("\nRecommended Timeouts:")
print(f"  {'Use Case':<30} {'Timeout'}")
print(f"  {'-'*30} {'-'*10}")
print(f"  {'Short chat completion':<30} {'10-15s'}")
print(f"  {'Long generation':<30} {'30-60s'}")
print(f"  {'Function calling':<30} {'30s'}")
print(f"  {'Whisper transcription':<30} {'60s'}")
print(f"  {'TTS generation':<30} {'30s'}")

✅ Response: Hi! How can I assist you today?

Recommended Timeouts:
  Use Case                       Timeout
  ------------------------------ ----------
  Short chat completion          10-15s
  Long generation                30-60s
  Function calling               30s
  Whisper transcription          60s
  TTS generation                 30s


### Experiment 4B: Counting Tokens Before Sending

In [10]:
try:
    import tiktoken

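    try:
        enc = tiktoken.encoding_for_model(MODEL)
    except KeyError:
        # Older tiktoken releases do not know this model; the gpt-4o family uses o200k_base
        enc = tiktoken.get_encoding("o200k_base")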

    texts = [
        "Hello",
        "Explain quantum computing",
        "Write a detailed essay about the history of artificial intelligence from its origins to modern day.",
    ]

    print(f"{'Text':<70} {'Tokens'}")
    print("-" * 80)
    for text in texts:
        tokens = enc.encode(text)
        display_text = text[:65] + "..." if len(text) > 65 else text
        print(f"{display_text:<70} {len(tokens)}")

    # Show tokenization
    sample = "OpenAI's GPT-4o-mini is cost-effective."
    tokens = enc.encode(sample)
    print(f"\n\u2139\ufe0f Tokenization of: \"{sample}\"")
    print(f"   Tokens ({len(tokens)}): {[enc.decode([t]) for t in tokens]}")

except ImportError:
    print("\u26a0\ufe0f tiktoken not installed. Install with: pip install tiktoken")
    print("   Rule of thumb: 1 token \u2248 4 characters \u2248 0.75 words")

Text                                                                   Tokens
--------------------------------------------------------------------------------
Hello                                                                  1
Explain quantum computing                                              3
Write a detailed essay about the history of artificial intelligen...   17

ℹ️ Tokenization of: "OpenAI's GPT-4o-mini is cost-effective."
   Tokens (12): ['Open', 'AI', "'s", ' GPT', '-', '4', 'o', '-mini', ' is', ' cost', '-effective', '.']


### Experiment 4C: Tracking API Costs

In [11]:
# Simple cost tracker
class CostTracker:
    # USD per 1M tokens at the time of writing; verify against the current pricing page
    PRICING = {
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4o":      {"input": 2.50, "output": 10.00},
    }

    def __init__(self):
        self.total_input = 0
        self.total_output = 0
        self.total_cost = 0.0
        self.requests = 0

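    def track(self, response):
        usage = response.usage
        # The API reports dated snapshots (e.g. "gpt-4o-mini-2024-07-18"), so match
        # on the longest known prefix instead of requiring an exact key
        model = response.model
        key = max(
            (name for name in self.PRICING if model.startswith(name)),
            key=len,
            default="gpt-4o-mini",
        )
        prices = self.PRICING[key]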

        input_cost = (usage.prompt_tokens / 1_000_000) * prices["input"]
        output_cost = (usage.completion_tokens / 1_000_000) * prices["output"]
        cost = input_cost + output_cost

        self.total_input += usage.prompt_tokens
        self.total_output += usage.completion_tokens
        self.total_cost += cost
        self.requests += 1
        return cost

    def summary(self):
        print(f"\u250c\u2500 COST SUMMARY \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510")
        print(f"\u2502 Requests:      {self.requests}")
        print(f"\u2502 Input tokens:  {self.total_input:,}")
        print(f"\u2502 Output tokens: {self.total_output:,}")
        print(f"\u2502 Total cost:    ${self.total_cost:.6f}")
        print(f"\u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518")


tracker = CostTracker()

# Make a few requests and track costs
for q in ["What is AI?", "Name 3 colors.", "Say hello."]:
    resp = client.chat.completions.create(
        model=MODEL,
        messages=[{"role": "user", "content": q}],
        max_tokens=30
    )
    cost = tracker.track(resp)
    print(f"  \"{q}\" \u2192 ${cost:.6f}")

print()
tracker.summary()

  "What is AI?" → $0.000020
  "Name 3 colors." → $0.000006
  "Say hello." → $0.000007

┌─ COST SUMMARY ───────────────────┐
│ Requests:      3
│ Input tokens:  33
│ Output tokens: 46
│ Total cost:    $0.000033
└─────────────────────────────────┘
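
Tracking costs after the fact pairs well with a check before sending. The sketch below computes an upper bound for one request by assuming every one of the `max_tokens` output tokens is generated; the prices are copied from the `CostTracker` table above, and `max_request_cost` and `fits_budget` are hypothetical helper names.

```python
# USD per 1M tokens, copied from the CostTracker table in this section
PRICES = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o": {"input": 2.50, "output": 10.00},
}


def max_request_cost(prompt_tokens, max_tokens, model="gpt-4o-mini"):
    """Upper bound on one request's cost, assuming all max_tokens are generated."""
    price = PRICES[model]
    return (prompt_tokens * price["input"] + max_tokens * price["output"]) / 1_000_000


def fits_budget(prompt_tokens, max_tokens, remaining_usd, model="gpt-4o-mini"):
    """True if the worst-case cost of the request fits in the remaining budget."""
    return max_request_cost(prompt_tokens, max_tokens, model) <= remaining_usd
```

For example, a 1,000-token prompt with `max_tokens=500` is bounded at $0.00045 on `gpt-4o-mini` and $0.0075 on `gpt-4o`.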


### Experiment 4D: Managing Conversation History Size

In [12]:
def trim_history(messages, max_messages=10):
    """Keep conversation within bounds. Always preserve system message."""
    if len(messages) <= max_messages:
        return messages

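    # Keep system message(s) + as many of the most recent messages as still fit
    system = [m for m in messages if m["role"] == "system"]
    non_system = [m for m in messages if m["role"] != "system"]
    keep = max(max_messages - len(system), 0)
    # Guard keep == 0: non_system[-0:] would return the whole list
    trimmed = system + (non_system[-keep:] if keep else [])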

    print(f"  \u2702\ufe0f Trimmed {len(messages)} \u2192 {len(trimmed)} messages")
    return trimmed


# Simulate a growing conversation
history = [{"role": "system", "content": "Be concise."}]

for i in range(8):
    history.append({"role": "user", "content": f"Message {i+1}"})
    history.append({"role": "assistant", "content": f"Reply {i+1}"})

print(f"Before trim: {len(history)} messages")
trimmed = trim_history(history, max_messages=7)
print(f"After trim:  {len(trimmed)} messages")
print(f"Kept: system + messages {[m['content'] for m in trimmed[1:]]}")

Before trim: 17 messages
  ✂️ Trimmed 17 → 7 messages
After trim:  7 messages
Kept: system + messages ['Message 6', 'Reply 6', 'Message 7', 'Reply 7', 'Message 8', 'Reply 8']
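
Counting messages is a coarse proxy, since cost and context limits are measured in tokens. The variant below trims to a token budget instead. It is a sketch: `estimate_tokens` uses the rough rule of 1 token per 4 characters, ignores per-message formatting overhead, and assumes every message has string `content`; a real tokenizer can be passed as `count` for accuracy. Both function names are hypothetical.

```python
def estimate_tokens(text):
    """Rough estimate using the 1 token ~ 4 characters rule of thumb."""
    return max(1, len(text) // 4)


def trim_to_token_budget(messages, max_tokens, count=estimate_tokens):
    """Keep system messages plus the newest messages that fit in max_tokens."""
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]

    # System messages are always kept, so they are charged against the budget first
    budget = max_tokens - sum(count(m["content"]) for m in system)
    kept = []
    # Walk backwards from the newest message and stop at the first that does not fit
    for msg in reversed(rest):
        cost = count(msg["content"])
        if cost > budget:
            break
        kept.append(msg)
        budget -= cost
    return system + kept[::-1]  # restore chronological order
```

Stopping at the first message that does not fit, instead of skipping it and continuing, keeps the retained history contiguous.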


---

## 5. Production-Ready Client Pattern

In [13]:
class RobustOpenAI:
    """Production-ready OpenAI client with retry, timeout, and cost tracking."""

    def __init__(self, model="gpt-4o-mini", max_retries=3, timeout=30.0):
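        # max_retries=0 turns off the SDK's built-in retries so they do not
        # multiply with the retry loop in chat()
        self.client = OpenAI(timeout=timeout, max_retries=0)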
        self.model = model
        self.max_retries = max_retries
        self.tracker = CostTracker()

    def chat(self, messages, max_tokens=100):
        for attempt in range(self.max_retries + 1):
            try:
                response = self.client.chat.completions.create(
                    model=self.model, messages=messages, max_tokens=max_tokens
                )
                self.tracker.track(response)
                return response.choices[0].message.content

            # 5xx responses are retriable as well
            except (RateLimitError, APIConnectionError, APITimeoutError, openai.InternalServerError) as e:
                if attempt == self.max_retries:
                    raise
                wait = (2 ** attempt) + random.uniform(0, 1)
                print(f"  Retry {attempt+1}/{self.max_retries} in {wait:.1f}s ({type(e).__name__})")
                time.sleep(wait)

            except (AuthenticationError, BadRequestError, NotFoundError):
                raise  # don't retry


# Use it
ai = RobustOpenAI()

result = ai.chat([{"role": "user", "content": "What is 2+2? Just the number."}], max_tokens=10)
print(f"Result: {result}")

result = ai.chat([{"role": "user", "content": "Name a fruit. One word."}], max_tokens=10)
print(f"Result: {result}")

print()
ai.tracker.summary()

Result: 4
Result: Mango.

┌─ COST SUMMARY ───────────────────┐
│ Requests:      2
│ Input tokens:  32
│ Output tokens: 4
│ Total cost:    $0.000007
└─────────────────────────────────┘
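
Retry logic is hard to exercise against the live API, because the errors it handles are rare by design. One option is to separate the loop from the API call and inject the sleep function, so a fake callable can drive it in a unit test. The following is a sketch of that idea using only the standard library; `call_with_retry` and `Flaky` are hypothetical names, and the wait schedule mirrors `RobustOpenAI.chat`.

```python
import random
import time


def call_with_retry(fn, retriable, max_retries=3, sleep=time.sleep, rng=random):
    """Call fn(); on a retriable exception wait with backoff + jitter and try again."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except retriable:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            sleep(2 ** attempt + rng.uniform(0, 1))


class Flaky:
    """Fake operation that fails a fixed number of times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise TimeoutError("simulated timeout")
        return "ok"


waits = []  # records requested sleeps instead of actually waiting
flaky = Flaky(failures=2)
result = call_with_retry(flaky, retriable=(TimeoutError,), sleep=waits.append)
print(result, flaky.calls, len(waits))  # ok 3 2
```

In real use, `fn` would be a `lambda` wrapping the API call and `retriable` the tuple of SDK exception classes from the retry table.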


---

## Key Takeaways

| Concept | What to Remember |
|---------|------------------|
| **Rate Limits** | RPM, TPM, RPD — check headers to monitor |
| **Tiers** | Limits grow automatically as you spend more |
| **Non-retriable** | 401, 403, 404, 400 — fix the code, don't retry |
| **Retriable** | 429, 500, connection, timeout — retry with backoff |
| **Backoff** | Exponential: 1s, 2s, 4s, 8s... + random jitter |
| **Jitter** | Random offset prevents thundering herd |
| **Timeouts** | Set per-client or per-request (10-60s typical) |
| **Token counting** | Use `tiktoken` before sending to estimate cost |
| **History trim** | Keep conversation size bounded to manage costs |