Skip to content

AsyncTenacityTransport does not release connections on retries #2724

@tobias-bahls

Description

@tobias-bahls

Initial Checks

Description

I found one more issue with AsyncTenacityTransport: the connection does not appear to be released back to the pool on retries (see the reproducer below). This manifests as an httpx.PoolTimeout once the number of attempts exceeds the connection pool size.

Example Code

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "pydantic-ai==0.8.0",
#     "httpx==0.28.1",
#     "tenacity",
# ]
# ///

import asyncio
import threading
import time
from http.server import HTTPServer, BaseHTTPRequestHandler

import httpx
from httpx import AsyncHTTPTransport
from pydantic_ai.retries import AsyncTenacityTransport, wait_retry_after
from pydantic_ai.retries import RetryConfig as PydanticAiRetryConfig
from tenacity import stop_after_attempt, retry_if_exception_type, wait_exponential


class AlwaysReturnHTTP429Handler(BaseHTTPRequestHandler):
    """HTTP handler that answers every GET with 429 and ``Retry-After: 1``."""

    def do_GET(self):
        """Write a 429 status line, a Retry-After header, and a short body."""
        status, retry_after, payload = 429, '1', b'Rate limited'
        self.send_response(status)
        self.send_header('Retry-After', retry_after)
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format, *args):
        """Override the base-class request logger with a no-op."""
        return None


def start_test_server(port: int = 8429) -> HTTPServer:
    """Start an always-429 HTTP server on localhost in a background thread.

    Args:
        port: TCP port to bind on ``localhost``.

    Returns:
        The running ``HTTPServer``; the caller is responsible for shutting
        it down.
    """
    address = ('localhost', port)
    server = HTTPServer(address, AlwaysReturnHTTP429Handler)

    # Daemon thread: it will not keep the interpreter alive at exit.
    worker = threading.Thread(target=server.serve_forever, daemon=True)
    worker.start()

    # Brief pause before handing the server back to the caller.
    time.sleep(0.1)
    return server


def create_retrying_client() -> httpx.AsyncClient:
    """Build an ``httpx.AsyncClient`` backed by a retrying transport.

    The client uses an ``AsyncTenacityTransport`` wrapping an
    ``AsyncHTTPTransport`` whose pool is limited to 2 connections. Responses
    are validated with ``raise_for_status()``, and the retry config retries
    on ``httpx.HTTPStatusError``, stops after 5 attempts, and re-raises the
    final error.
    """
    def _raise_on_error_status(response: httpx.Response) -> None:
        # Turns error status codes into httpx.HTTPStatusError, the exception
        # type the retry config below is set to retry on.
        response.raise_for_status()

    fallback_wait = wait_exponential(multiplier=1, max=2)
    retry_config = PydanticAiRetryConfig(
        stop=stop_after_attempt(5),
        wait=wait_retry_after(max_wait=5, fallback_strategy=fallback_wait),
        retry=retry_if_exception_type(httpx.HTTPStatusError),
        reraise=True,
    )

    # Deliberately tiny pool: fewer connections (2) than retry attempts (5).
    pool_limits = httpx.Limits(
        max_connections=2,
        max_keepalive_connections=2,
        keepalive_expiry=30,
    )
    pooled_transport = AsyncHTTPTransport(limits=pool_limits)

    retrying_transport = AsyncTenacityTransport(
        config=retry_config,
        validate_response=_raise_on_error_status,
        wrapped=pooled_transport,
    )
    return httpx.AsyncClient(transport=retrying_transport)


async def make_request(client: httpx.AsyncClient, url: str, request_id: int):
    """GET *url* through *client*, printing progress tagged with *request_id*.

    Returns:
        The response object returned by ``client.get``.

    Raises:
        Any exception raised while requesting or reporting; a failure line
        is printed first and the exception is then re-raised unchanged.
    """
    print(f"Starting request {request_id}")
    try:
        response = await client.get(url)
        status = response.status_code
        print(f"Request {request_id} completed with status: {status}")
    except Exception as exc:
        print(f"Request {request_id} failed after all retries: {exc}")
        raise
    else:
        return response


async def main():
    """Run the reproducer: start the 429 server, issue one retried request, clean up.

    Any exception raised by the request propagates to the caller after
    cleanup has run.
    """
    print("Starting server that always returns 429...")
    server = start_test_server(8429)
    test_url = "http://localhost:8429/test"

    print("Creating client with 2 connection pool and retry on all requests...")
    client = create_retrying_client()

    try:
        print("Making single request (will retry 5 times)...")
        # make_request either returns a response or re-raises, so the result
        # is never an Exception instance and needs no isinstance check.
        result = await make_request(client, test_url, 1)
        print(f"Request completed with status: {result.status_code}")
    finally:
        try:
            await client.aclose()
        finally:
            # Stop the serve_forever loop, then release the listening socket;
            # shutdown() alone leaves the socket open. Runs even if closing
            # the client fails.
            server.shutdown()
            server.server_close()
        print("Cleanup completed")

# Script entry point: run the async reproducer on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())

Python, Pydantic AI & LLM client version

python 3.13, pydantic-ai 0.8.0

Metadata

Metadata

Assignees

Labels

bug: Something isn't working

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions