From 21321e3976daa3d7eeec654d63662eb3ab5b87e3 Mon Sep 17 00:00:00 2001 From: Reflex Date: Thu, 14 May 2026 23:13:27 +0000 Subject: [PATCH] fix(upload_file): unstick concurrent uploads stalling at 30s on the server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Customers running many concurrent upload_file calls through a single SDK instance were hitting 408 "Request timed out waiting for request data" from mux/Jetty. Triage on 2026-05-14 found the request-body simply wasn't arriving within Jetty's 30 s data-wait window. Three contributing causes, all introduced by recent SDK changes: 1. DEFAULT_CONNECTION_LIMITS was lowered from (100, 20) to (20, 10) in #797. Combined with the new shared process-global pool, parallel uploads divide bandwidth across far fewer TCP connections than before and starve each other on multipart bodies. Restore the prior (100, 20). 2. _should_retry blanket-retries 408, including upload_file. Each retry pushes another large multipart body onto the same exhausted pool and amplifies the stall instead of recovering from it. Carve out upload_file 408 — other endpoints still retry 408 as before. 3. _files.py read the entire PathLike into memory via read_bytes() before the request was built, contradicting the docstring claim of streaming. Hand httpx an open file handle so the multipart encoder reads lazily and the pool slot isn't held during disk I/O. Tests updated to assert IsInstance(io.IOBase) instead of IsBytes() and to close handles. Also exposes a public connection_limits knob on Runloop / AsyncRunloop (and copy()) so customers can raise the cap above 100 for upload-heavy workloads without needing a custom httpx client. Passing connection_limits implicitly opts out of the shared pool, since the shared pool is process-global and can't honor per-client limits. Includes examples/stress_upload_file.py — a standalone repro that fires N concurrent upload_file calls and reports status-code distribution and latency percentiles, so the before/after delta is one command away. Co-Authored-By: Claude Opus 4.7 --- examples/stress_upload_file.py | 216 +++++++++++++++++++++++++ src/runloop_api_client/_base_client.py | 30 ++++ src/runloop_api_client/_client.py | 28 ++++ src/runloop_api_client/_constants.py | 2 +- src/runloop_api_client/_files.py | 13 +- tests/test_client.py | 45 ++++++ tests/test_files.py | 49 ++++-- tests/test_shared_pool.py | 100 ++++++++++++ 8 files changed, 468 insertions(+), 15 deletions(-) create mode 100644 examples/stress_upload_file.py diff --git a/examples/stress_upload_file.py b/examples/stress_upload_file.py new file mode 100644 index 000000000..39ac09e80 --- /dev/null +++ b/examples/stress_upload_file.py @@ -0,0 +1,216 @@ +#!/usr/bin/env -S uv run python +"""Stress test for AsyncRunloop.devboxes.upload_file. + +Reproduces the 408 "Request timed out waiting for request data" pattern seen +in production when many upload_file calls run concurrently through a single +SDK client (shared httpx connection pool). + +Usage: + + export RUNLOOP_API_KEY=... + + # Repro path: create a devbox, fire 200 uploads with 50 in flight at once. + ./examples/stress_upload_file.py --concurrency 50 --total 200 + + # Reuse an existing devbox instead of creating one: + ./examples/stress_upload_file.py --devbox-id dbx_abc... --total 200 + + # Tune file size and SDK retry behavior: + ./examples/stress_upload_file.py --file-size-kb 256 --max-retries 0 +""" + +from __future__ import annotations + +import os +import sys +import time +import asyncio +import argparse +import tempfile +import statistics +from pathlib import Path +from dataclasses import dataclass + +import httpx + +from runloop_api_client import AsyncRunloop +from runloop_api_client._exceptions import APIStatusError + + +@dataclass +class UploadResult: + index: int + started_at: float + finished_at: float + status: str # "ok", "408", "other_error" + detail: str = "" + + @property + def duration_ms(self) -> float: + return (self.finished_at - self.started_at) * 1000.0 + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--devbox-id", default=os.environ.get("RUNLOOP_DEVBOX_ID"), + help="Reuse an existing devbox (default: create a new one)") + p.add_argument("--base-url", default=os.environ.get("RUNLOOP_BASE_URL"), + help="Override Runloop API base URL") + p.add_argument("--concurrency", type=int, default=50, + help="Number of upload_file calls in flight at once (default: 50)") + p.add_argument("--total", type=int, default=200, + help="Total number of uploads to perform (default: 200)") + p.add_argument("--file-size-kb", type=int, default=64, + help="Size of each uploaded file in KB (default: 64)") + p.add_argument("--max-retries", type=int, default=None, + help="Override SDK max_retries (default: SDK default of 5)") + p.add_argument("--max-connections", type=int, default=None, + help="Override SDK max_connections via connection_limits (default: SDK default of 100)") + p.add_argument("--max-keepalive", type=int, default=None, + help="Override max_keepalive_connections (only meaningful with --max-connections)") + p.add_argument("--shutdown", action="store_true", + help="Shut down the devbox after the run (only if we created it)") + p.add_argument("--blueprint-name", default=None, + help="Blueprint to launch from when creating a devbox") + return p.parse_args() + + +async def ensure_devbox(client: AsyncRunloop, args: argparse.Namespace) -> tuple[str, bool]: + """Return (devbox_id, created_by_us).""" + if args.devbox_id: + print(f"[setup] reusing devbox {args.devbox_id}") + return args.devbox_id, False + + print("[setup] creating a new devbox for the stress run...") + kwargs: dict[str, object] = {} + if args.blueprint_name: + kwargs["blueprint_name"] = args.blueprint_name + devbox = await client.devboxes.create_and_await_running(**kwargs) # type: ignore[arg-type] + print(f"[setup] devbox {devbox.id} is running") + return devbox.id, True + + +async def one_upload( + client: AsyncRunloop, + devbox_id: str, + src_path: Path, + index: int, + sem: asyncio.Semaphore, +) -> UploadResult: + async with sem: + started = time.monotonic() + try: + await client.devboxes.upload_file( + devbox_id, + path=f"/tmp/stress_{index}.bin", + file=src_path, + ) + return UploadResult(index=index, started_at=started, finished_at=time.monotonic(), status="ok") + except APIStatusError as exc: + if exc.status_code == 408: + status = "408" + else: + status = f"http_{exc.status_code}" + return UploadResult( + index=index, + started_at=started, + finished_at=time.monotonic(), + status=status, + detail=str(exc)[:200], + ) + except (httpx.HTTPError, asyncio.TimeoutError) as exc: + return UploadResult( + index=index, + started_at=started, + finished_at=time.monotonic(), + status=f"transport_{type(exc).__name__}", + detail=str(exc)[:200], + ) + + +def summarize(results: list[UploadResult], wall_seconds: float) -> int: + by_status: dict[str, list[UploadResult]] = {} + for r in results: + by_status.setdefault(r.status, []).append(r) + + print() + print("=" * 72) + print(f"Total uploads: {len(results)}") + print(f"Wall time: {wall_seconds:.1f}s") + print(f"Effective throughput:{len(results) / wall_seconds:.2f} uploads/s") + print() + print("Outcome breakdown:") + for status in sorted(by_status): + bucket = by_status[status] + durations = sorted(r.duration_ms for r in bucket) + p50 = statistics.median(durations) + p95 = durations[int(0.95 * (len(durations) - 1))] + p99 = durations[int(0.99 * (len(durations) - 1))] + print(f" {status:>20} count={len(bucket):>5} p50={p50:>8.0f}ms p95={p95:>8.0f}ms p99={p99:>8.0f}ms") + + sample_408 = by_status.get("408", [])[:3] + if sample_408: + print() + print("Sample 408 errors:") + for r in sample_408: + print(f" #{r.index} duration={r.duration_ms:.0f}ms detail={r.detail}") + + success = len(by_status.get("ok", [])) + return 0 if success == len(results) else 1 + + +async def run() -> int: + args = parse_args() + + if "RUNLOOP_API_KEY" not in os.environ: + print("error: RUNLOOP_API_KEY is required", file=sys.stderr) + return 2 + + client_kwargs: dict[str, object] = {} + if args.base_url: + client_kwargs["base_url"] = args.base_url + if args.max_retries is not None: + client_kwargs["max_retries"] = args.max_retries + if args.max_connections is not None: + limits_kwargs: dict[str, int] = {"max_connections": args.max_connections} + if args.max_keepalive is not None: + limits_kwargs["max_keepalive_connections"] = args.max_keepalive + client_kwargs["connection_limits"] = httpx.Limits(**limits_kwargs) + + payload = os.urandom(args.file_size_kb * 1024) + with tempfile.NamedTemporaryFile(prefix="rl-stress-", suffix=".bin", delete=False) as f: + f.write(payload) + src_path = Path(f.name) + print(f"[setup] payload: {src_path} ({len(payload)} bytes)") + + try: + async with AsyncRunloop(**client_kwargs) as client: # type: ignore[arg-type] + devbox_id, created = await ensure_devbox(client, args) + + print( + f"[run] firing {args.total} upload_file calls " + f"(concurrency={args.concurrency}) to {devbox_id}" + ) + sem = asyncio.Semaphore(args.concurrency) + t0 = time.monotonic() + results = await asyncio.gather( + *(one_upload(client, devbox_id, src_path, i, sem) for i in range(args.total)) + ) + wall = time.monotonic() - t0 + + exit_code = summarize(results, wall) + + if created and args.shutdown: + print(f"[teardown] shutting down devbox {devbox_id}") + await client.devboxes.shutdown(devbox_id) + finally: + try: + src_path.unlink() + except OSError: + pass + + return exit_code + + +if __name__ == "__main__": + sys.exit(asyncio.run(run())) diff --git a/src/runloop_api_client/_base_client.py b/src/runloop_api_client/_base_client.py index 88e0bbb3b..0c031265c 100644 --- a/src/runloop_api_client/_base_client.py +++ b/src/runloop_api_client/_base_client.py @@ -870,6 +870,12 @@ def _should_retry(self, response: httpx.Response) -> bool: # Retry on request timeouts. if response.status_code == 408: + # upload_file 408s mean the server gave up waiting for the request body. + # Retrying just queues another large request behind the same exhausted + # connection pool and amplifies the stall. + if response.request.url.path.endswith("/upload_file"): + log.debug("Not retrying upload_file 408 (request-body stall)") + return False log.debug("Retrying due to status code %i", response.status_code) return True @@ -931,6 +937,7 @@ class SyncAPIClient(BaseClient[httpx.Client, Stream[Any]]): _client: httpx.Client _default_stream_cls: type[Stream[Any]] | None = None _uses_shared_pool: bool + _connection_limits: httpx.Limits | None _closed: bool def __init__( @@ -945,6 +952,7 @@ def __init__( custom_query: Mapping[str, object] | None = None, _strict_response_validation: bool, shared_http_pool: bool = True, + connection_limits: httpx.Limits | None = None, ) -> None: if not is_given(timeout): # if the user passed in a custom http client with a non-default @@ -976,10 +984,20 @@ def __init__( ) self._closed = False + self._connection_limits = connection_limits if http_client is not None: self._client = http_client self._uses_shared_pool = False + elif connection_limits is not None: + # Custom limits get a private pool — the shared pool is process-global + # so per-client limits can't be respected through it. + self._client = SyncHttpxClientWrapper( + base_url=base_url, + timeout=cast(Timeout, timeout), + transport=httpx.HTTPTransport(limits=connection_limits, http2=True), + ) + self._uses_shared_pool = False elif shared_http_pool: global _shared_sync_transport with _pool_lock: @@ -1563,6 +1581,7 @@ class AsyncAPIClient(BaseClient[httpx.AsyncClient, AsyncStream[Any]]): _client: httpx.AsyncClient _default_stream_cls: type[AsyncStream[Any]] | None = None _uses_shared_pool: bool + _connection_limits: httpx.Limits | None _closed: bool def __init__( @@ -1577,6 +1596,7 @@ def __init__( custom_headers: Mapping[str, str] | None = None, custom_query: Mapping[str, object] | None = None, shared_http_pool: bool = True, + connection_limits: httpx.Limits | None = None, ) -> None: if not is_given(timeout): # if the user passed in a custom http client with a non-default @@ -1608,10 +1628,20 @@ def __init__( ) self._closed = False + self._connection_limits = connection_limits if http_client is not None: self._client = http_client self._uses_shared_pool = False + elif connection_limits is not None: + # Custom limits get a private pool — the shared pool is process-global + # so per-client limits can't be respected through it. + self._client = AsyncHttpxClientWrapper( + base_url=base_url, + timeout=cast(Timeout, timeout), + transport=httpx.AsyncHTTPTransport(limits=connection_limits, http2=True), + ) + self._uses_shared_pool = False elif shared_http_pool: try: loop: asyncio.AbstractEventLoop | None = asyncio.get_running_loop() diff --git a/src/runloop_api_client/_client.py b/src/runloop_api_client/_client.py index 9e422721e..5c1425ac2 100644 --- a/src/runloop_api_client/_client.py +++ b/src/runloop_api_client/_client.py @@ -94,6 +94,13 @@ def __init__( # Enables HTTP/2 multiplexing and avoids ConnectTimeout storms under high concurrency. # Set to False to create a private connection pool (old behavior). shared_http_pool: bool = True, + # Override the httpx connection pool limits (max concurrent connections and + # keep-alive size). When provided, the client gets a private pool with these + # limits and `shared_http_pool` is implicitly disabled — the shared pool is + # process-global so per-client limits can't be expressed through it. + # Use this to raise the cap above the default 100 for upload-heavy workloads, + # or to lower it in memory-constrained environments. + connection_limits: httpx.Limits | None = None, # Enable or disable schema validation for data returned by the API. # When enabled an error APIResponseValidationError is raised # if the API responds with invalid data for the expected schema. @@ -140,6 +147,7 @@ def __init__( custom_query=default_query, _strict_response_validation=_strict_response_validation, shared_http_pool=shared_http_pool, + connection_limits=connection_limits, ) self._idempotency_header = "x-request-id" @@ -276,6 +284,7 @@ def copy( timeout: float | Timeout | None | NotGiven = not_given, http_client: httpx.Client | None = None, shared_http_pool: bool | None = None, + connection_limits: httpx.Limits | None | NotGiven = not_given, max_retries: int | NotGiven = not_given, default_headers: Mapping[str, str] | None = None, set_default_headers: Mapping[str, str] | None = None, @@ -304,8 +313,12 @@ def copy( elif set_default_query is not None: params = set_default_query + resolved_limits = self._connection_limits if isinstance(connection_limits, NotGiven) else connection_limits + if http_client is not None: resolved_shared = False + elif resolved_limits is not None: + resolved_shared = False elif shared_http_pool is not None: resolved_shared = shared_http_pool else: @@ -317,6 +330,7 @@ def copy( timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, shared_http_pool=resolved_shared, + connection_limits=resolved_limits, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, @@ -382,6 +396,13 @@ def __init__( # Enables HTTP/2 multiplexing and avoids ConnectTimeout storms under high concurrency. # Set to False to create a private connection pool (old behavior). shared_http_pool: bool = True, + # Override the httpx connection pool limits (max concurrent connections and + # keep-alive size). When provided, the client gets a private pool with these + # limits and `shared_http_pool` is implicitly disabled — the shared pool is + # process-global so per-client limits can't be expressed through it. + # Use this to raise the cap above the default 100 for upload-heavy workloads, + # or to lower it in memory-constrained environments. + connection_limits: httpx.Limits | None = None, # Enable or disable schema validation for data returned by the API. # When enabled an error APIResponseValidationError is raised # if the API responds with invalid data for the expected schema. @@ -428,6 +449,7 @@ def __init__( custom_query=default_query, _strict_response_validation=_strict_response_validation, shared_http_pool=shared_http_pool, + connection_limits=connection_limits, ) self._idempotency_header = "x-request-id" @@ -564,6 +586,7 @@ def copy( timeout: float | Timeout | None | NotGiven = not_given, http_client: httpx.AsyncClient | None = None, shared_http_pool: bool | None = None, + connection_limits: httpx.Limits | None | NotGiven = not_given, max_retries: int | NotGiven = not_given, default_headers: Mapping[str, str] | None = None, set_default_headers: Mapping[str, str] | None = None, @@ -592,8 +615,12 @@ def copy( elif set_default_query is not None: params = set_default_query + resolved_limits = self._connection_limits if isinstance(connection_limits, NotGiven) else connection_limits + if http_client is not None: resolved_shared = False + elif resolved_limits is not None: + resolved_shared = False elif shared_http_pool is not None: resolved_shared = shared_http_pool else: @@ -605,6 +632,7 @@ def copy( timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, shared_http_pool=resolved_shared, + connection_limits=resolved_limits, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, diff --git a/src/runloop_api_client/_constants.py b/src/runloop_api_client/_constants.py index 88f944ce2..d6361c8ad 100644 --- a/src/runloop_api_client/_constants.py +++ b/src/runloop_api_client/_constants.py @@ -8,7 +8,7 @@ # default timeout is 30 seconds DEFAULT_TIMEOUT = httpx.Timeout(timeout=30, connect=5.0) DEFAULT_MAX_RETRIES = 5 -DEFAULT_CONNECTION_LIMITS = httpx.Limits(max_connections=20, max_keepalive_connections=10) +DEFAULT_CONNECTION_LIMITS = httpx.Limits(max_connections=100, max_keepalive_connections=20) INITIAL_RETRY_DELAY = 1.0 MAX_RETRY_DELAY = 60.0 diff --git a/src/runloop_api_client/_files.py b/src/runloop_api_client/_files.py index 1a013e434..aea6ca730 100644 --- a/src/runloop_api_client/_files.py +++ b/src/runloop_api_client/_files.py @@ -66,7 +66,11 @@ def _transform_file(file: FileTypes) -> HttpxFileTypes: if is_file_content(file): if isinstance(file, os.PathLike): path = pathlib.Path(file) - return (path.name, path.read_bytes()) + # Hand httpx an open file handle so the multipart encoder reads + # lazily. read_bytes() buffers the entire file in memory before + # the request is even built, which holds a connection-pool slot + # during the read and compounds upload_file stalls under concurrency. + return (path.name, path.open("rb")) return file @@ -107,8 +111,11 @@ async def async_to_httpx_files(files: RequestFiles | None) -> HttpxRequestFiles async def _async_transform_file(file: FileTypes) -> HttpxFileTypes: if is_file_content(file): if isinstance(file, os.PathLike): - path = anyio.Path(file) - return (path.name, await path.read_bytes()) + path = pathlib.Path(file) + # Same rationale as the sync path: avoid buffering the file in + # memory. httpx's multipart encoder will read from this handle in + # chunks as it serializes the request body. + return (path.name, path.open("rb")) return file diff --git a/tests/test_client.py b/tests/test_client.py index 7728bf5bb..cc89603b5 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1082,6 +1082,51 @@ def test_follow_redirects_disabled(self, respx_mock: MockRouter, client: Runloop assert exc_info.value.response.status_code == 302 assert exc_info.value.response.headers["Location"] == f"{base_url}/redirected" + @mock.patch("runloop_api_client._base_client.BaseClient._calculate_retry_timeout", _low_retry_timeout) + @pytest.mark.respx(base_url=base_url) + def test_upload_file_408_does_not_retry(self, respx_mock: MockRouter, client: Runloop) -> None: + # 408 on upload_file means the server gave up waiting for the request body. + # Retrying piles another large request onto the same exhausted connection + # pool and amplifies the stall, so the SDK must NOT auto-retry these. + client = client.with_options(max_retries=4) + + call_count = 0 + + def handler(_request: httpx.Request) -> httpx.Response: + nonlocal call_count + call_count += 1 + return httpx.Response(408) + + respx_mock.post("/v1/devboxes/dbx_test/upload_file").mock(side_effect=handler) + + with pytest.raises(APIStatusError) as exc_info: + client.devboxes.upload_file(id="dbx_test", path="/tmp/foo", file=b"payload") + + assert exc_info.value.response.status_code == 408 + assert call_count == 1, f"upload_file 408 should not retry, got {call_count} attempts" + + @mock.patch("runloop_api_client._base_client.BaseClient._calculate_retry_timeout", _low_retry_timeout) + @pytest.mark.respx(base_url=base_url) + def test_non_upload_file_408_still_retries(self, respx_mock: MockRouter, client: Runloop) -> None: + # Sanity check that the upload_file carve-out didn't break the general + # 408-retries-by-default behavior for other routes. + client = client.with_options(max_retries=2) + + call_count = 0 + + def handler(_request: httpx.Request) -> httpx.Response: + nonlocal call_count + call_count += 1 + return httpx.Response(408) + + respx_mock.post("/v1/devboxes").mock(side_effect=handler) + + with pytest.raises(APIStatusError): + client.devboxes.create() + + # initial attempt + 2 retries + assert call_count == 3 + class TestAsyncRunloop: @pytest.mark.respx(base_url=base_url) diff --git a/tests/test_files.py b/tests/test_files.py index b5ee73ae2..11a377ba7 100644 --- a/tests/test_files.py +++ b/tests/test_files.py @@ -1,8 +1,9 @@ +import io from pathlib import Path import anyio import pytest -from dirty_equals import IsDict, IsList, IsBytes, IsTuple +from dirty_equals import IsDict, IsList, IsTuple, IsInstance from runloop_api_client._files import to_httpx_files, deepcopy_with_paths, async_to_httpx_files from runloop_api_client._utils import extract_files @@ -10,37 +11,63 @@ readme_path = Path(__file__).parent.parent.joinpath("README.md") +def _close_file_handles(value: object) -> None: + """Recursively close any open file handles in to_httpx_files output. + + The transform now returns streaming file handles instead of bytes, so tests + that don't actually issue a request must close them to avoid resource warnings. + """ + if isinstance(value, io.IOBase): + value.close() + elif isinstance(value, dict): + for v in value.values(): + _close_file_handles(v) + elif isinstance(value, (list, tuple)): + for v in value: + _close_file_handles(v) + + def test_pathlib_includes_file_name() -> None: result = to_httpx_files({"file": readme_path}) - print(result) - assert result == IsDict({"file": IsTuple("README.md", IsBytes())}) + try: + assert result == IsDict({"file": IsTuple("README.md", IsInstance(io.IOBase))}) + finally: + _close_file_handles(result) def test_tuple_input() -> None: result = to_httpx_files([("file", readme_path)]) - print(result) - assert result == IsList(IsTuple("file", IsTuple("README.md", IsBytes()))) + try: + assert result == IsList(IsTuple("file", IsTuple("README.md", IsInstance(io.IOBase)))) + finally: + _close_file_handles(result) @pytest.mark.asyncio async def test_async_pathlib_includes_file_name() -> None: result = await async_to_httpx_files({"file": readme_path}) - print(result) - assert result == IsDict({"file": IsTuple("README.md", IsBytes())}) + try: + assert result == IsDict({"file": IsTuple("README.md", IsInstance(io.IOBase))}) + finally: + _close_file_handles(result) @pytest.mark.asyncio async def test_async_supports_anyio_path() -> None: result = await async_to_httpx_files({"file": anyio.Path(readme_path)}) - print(result) - assert result == IsDict({"file": IsTuple("README.md", IsBytes())}) + try: + assert result == IsDict({"file": IsTuple("README.md", IsInstance(io.IOBase))}) + finally: + _close_file_handles(result) @pytest.mark.asyncio async def test_async_tuple_input() -> None: result = await async_to_httpx_files([("file", readme_path)]) - print(result) - assert result == IsList(IsTuple("file", IsTuple("README.md", IsBytes()))) + try: + assert result == IsList(IsTuple("file", IsTuple("README.md", IsInstance(io.IOBase)))) + finally: + _close_file_handles(result) def test_string_not_allowed() -> None: diff --git a/tests/test_shared_pool.py b/tests/test_shared_pool.py index 4220f8ba9..261db94ae 100644 --- a/tests/test_shared_pool.py +++ b/tests/test_shared_pool.py @@ -288,6 +288,106 @@ async def test_copy_with_custom_client_disables_sharing(self): assert c2._client is custom +class TestSyncConnectionLimits: + def test_custom_limits_force_private_pool(self): + limits = httpx.Limits(max_connections=42, max_keepalive_connections=7) + c = _make_client(connection_limits=limits) + + assert c._uses_shared_pool is False + assert c._connection_limits is limits + transport = _get_transport(c) + # Real httpx.HTTPTransport (not the shared wrapper) + assert isinstance(transport, httpx.HTTPTransport) + # Pool actually got the requested limits + assert transport._pool._max_connections == 42 + assert transport._pool._max_keepalive_connections == 7 + + c.close() + + def test_custom_limits_override_shared_pool_request(self): + # Even with shared_http_pool=True (default), explicit limits take precedence + # and the client gets a private pool. + limits = httpx.Limits(max_connections=5) + c = _make_client(shared_http_pool=True, connection_limits=limits) + + assert c._uses_shared_pool is False + + c.close() + + def test_copy_inherits_connection_limits(self): + limits = httpx.Limits(max_connections=25) + c1 = _make_client(connection_limits=limits) + c2 = c1.copy() + + assert c2._connection_limits is limits + assert c2._uses_shared_pool is False + assert _get_transport(c2)._pool._max_connections == 25 + + c1.close() + c2.close() + + def test_copy_can_override_connection_limits(self): + c1 = _make_client(connection_limits=httpx.Limits(max_connections=25)) + new_limits = httpx.Limits(max_connections=200, max_keepalive_connections=50) + c2 = c1.copy(connection_limits=new_limits) + + assert c2._connection_limits is new_limits + assert _get_transport(c2)._pool._max_connections == 200 + + c1.close() + c2.close() + + def test_copy_can_reset_connection_limits(self): + c1 = _make_client(connection_limits=httpx.Limits(max_connections=25)) + # Passing None resets to the SDK default limits but keeps the inherited + # sharing mode (parent was private, so the copy stays private). Use + # shared_http_pool=True alongside to also re-enable sharing. + c2 = c1.copy(connection_limits=None) + + assert c2._connection_limits is None + assert c2._uses_shared_pool is False + + c3 = c1.copy(connection_limits=None, shared_http_pool=True) + assert c3._connection_limits is None + assert c3._uses_shared_pool is True + + c1.close() + c2.close() + c3.close() + + +class TestAsyncConnectionLimits: + async def test_custom_limits_force_private_pool(self): + limits = httpx.Limits(max_connections=42, max_keepalive_connections=7) + c = _make_async_client(connection_limits=limits) + + assert c._uses_shared_pool is False + assert c._connection_limits is limits + transport = _get_transport(c) + assert isinstance(transport, httpx.AsyncHTTPTransport) + assert transport._pool._max_connections == 42 + assert transport._pool._max_keepalive_connections == 7 + + await c.close() + + async def test_custom_limits_override_shared_pool_request(self): + limits = httpx.Limits(max_connections=5) + c = _make_async_client(shared_http_pool=True, connection_limits=limits) + + assert c._uses_shared_pool is False + + await c.close() + + async def test_copy_inherits_connection_limits(self): + limits = httpx.Limits(max_connections=25) + c1 = _make_async_client(connection_limits=limits) + c2 = c1.copy() + + assert c2._connection_limits is limits + assert c2._uses_shared_pool is False + assert _get_transport(c2)._pool._max_connections == 25 + + class TestAsyncCrossLoop: def test_separate_loops_get_separate_transports(self): """Clients created in different asyncio.run() calls must not share a transport."""