diff --git a/examples/stress_upload_file.py b/examples/stress_upload_file.py new file mode 100644 index 000000000..39ac09e80 --- /dev/null +++ b/examples/stress_upload_file.py @@ -0,0 +1,216 @@ +#!/usr/bin/env -S uv run python +"""Stress test for AsyncRunloop.devboxes.upload_file. + +Reproduces the 408 "Request timed out waiting for request data" pattern seen +in production when many upload_file calls run concurrently through a single +SDK client (shared httpx connection pool). + +Usage: + + export RUNLOOP_API_KEY=... + + # Repro path: create a devbox, fire 200 uploads with 50 in flight at once. + ./examples/stress_upload_file.py --concurrency 50 --total 200 + + # Reuse an existing devbox instead of creating one: + ./examples/stress_upload_file.py --devbox-id dbx_abc... --total 200 + + # Tune file size and SDK retry behavior: + ./examples/stress_upload_file.py --file-size-kb 256 --max-retries 0 +""" + +from __future__ import annotations + +import os +import sys +import time +import asyncio +import argparse +import tempfile +import statistics +from pathlib import Path +from dataclasses import dataclass + +import httpx + +from runloop_api_client import AsyncRunloop +from runloop_api_client._exceptions import APIStatusError + + +@dataclass +class UploadResult: + index: int + started_at: float + finished_at: float + status: str # "ok", "408", "other_error" + detail: str = "" + + @property + def duration_ms(self) -> float: + return (self.finished_at - self.started_at) * 1000.0 + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--devbox-id", default=os.environ.get("RUNLOOP_DEVBOX_ID"), + help="Reuse an existing devbox (default: create a new one)") + p.add_argument("--base-url", default=os.environ.get("RUNLOOP_BASE_URL"), + help="Override Runloop API base URL") + p.add_argument("--concurrency", type=int, default=50, + help="Number of upload_file calls in flight at once (default: 50)") + p.add_argument("--total", type=int, default=200, + help="Total number of uploads to perform (default: 200)") + p.add_argument("--file-size-kb", type=int, default=64, + help="Size of each uploaded file in KB (default: 64)") + p.add_argument("--max-retries", type=int, default=None, + help="Override SDK max_retries (default: SDK default of 5)") + p.add_argument("--max-connections", type=int, default=None, + help="Override SDK max_connections via connection_limits (default: SDK default of 100)") + p.add_argument("--max-keepalive", type=int, default=None, + help="Override max_keepalive_connections (only meaningful with --max-connections)") + p.add_argument("--shutdown", action="store_true", + help="Shut down the devbox after the run (only if we created it)") + p.add_argument("--blueprint-name", default=None, + help="Blueprint to launch from when creating a devbox") + return p.parse_args() + + +async def ensure_devbox(client: AsyncRunloop, args: argparse.Namespace) -> tuple[str, bool]: + """Return (devbox_id, created_by_us).""" + if args.devbox_id: + print(f"[setup] reusing devbox {args.devbox_id}") + return args.devbox_id, False + + print("[setup] creating a new devbox for the stress run...") + kwargs: dict[str, object] = {} + if args.blueprint_name: + kwargs["blueprint_name"] = args.blueprint_name + devbox = await client.devboxes.create_and_await_running(**kwargs) # type: ignore[arg-type] + print(f"[setup] devbox {devbox.id} is running") + return devbox.id, True + + +async def one_upload( + client: AsyncRunloop, + devbox_id: str, + src_path: Path, + index: int, + sem: asyncio.Semaphore, +) -> UploadResult: + async with sem: + started = time.monotonic() + try: + await client.devboxes.upload_file( + devbox_id, + path=f"/tmp/stress_{index}.bin", + file=src_path, + ) + return UploadResult(index=index, started_at=started, finished_at=time.monotonic(), status="ok") + except APIStatusError as exc: + if exc.status_code == 408: + status = "408" + else: + status = f"http_{exc.status_code}" + return UploadResult( + index=index, + started_at=started, + finished_at=time.monotonic(), + status=status, + detail=str(exc)[:200], + ) + except (httpx.HTTPError, asyncio.TimeoutError) as exc: + return UploadResult( + index=index, + started_at=started, + finished_at=time.monotonic(), + status=f"transport_{type(exc).__name__}", + detail=str(exc)[:200], + ) + + +def summarize(results: list[UploadResult], wall_seconds: float) -> int: + by_status: dict[str, list[UploadResult]] = {} + for r in results: + by_status.setdefault(r.status, []).append(r) + + print() + print("=" * 72) + print(f"Total uploads: {len(results)}") + print(f"Wall time: {wall_seconds:.1f}s") + print(f"Effective throughput:{len(results) / wall_seconds:.2f} uploads/s") + print() + print("Outcome breakdown:") + for status in sorted(by_status): + bucket = by_status[status] + durations = sorted(r.duration_ms for r in bucket) + p50 = statistics.median(durations) + p95 = durations[int(0.95 * (len(durations) - 1))] + p99 = durations[int(0.99 * (len(durations) - 1))] + print(f" {status:>20} count={len(bucket):>5} p50={p50:>8.0f}ms p95={p95:>8.0f}ms p99={p99:>8.0f}ms") + + sample_408 = by_status.get("408", [])[:3] + if sample_408: + print() + print("Sample 408 errors:") + for r in sample_408: + print(f" #{r.index} duration={r.duration_ms:.0f}ms detail={r.detail}") + + success = len(by_status.get("ok", [])) + return 0 if success == len(results) else 1 + + +async def run() -> int: + args = parse_args() + + if "RUNLOOP_API_KEY" not in os.environ: + print("error: RUNLOOP_API_KEY is required", file=sys.stderr) + return 2 + + client_kwargs: dict[str, object] = {} + if args.base_url: + client_kwargs["base_url"] = args.base_url + if args.max_retries is not None: + client_kwargs["max_retries"] = args.max_retries + if args.max_connections is not None: + limits_kwargs: dict[str, int] = {"max_connections": args.max_connections} + if args.max_keepalive is not None: + limits_kwargs["max_keepalive_connections"] = args.max_keepalive + client_kwargs["connection_limits"] = httpx.Limits(**limits_kwargs) + + payload = os.urandom(args.file_size_kb * 1024) + with tempfile.NamedTemporaryFile(prefix="rl-stress-", suffix=".bin", delete=False) as f: + f.write(payload) + src_path = Path(f.name) + print(f"[setup] payload: {src_path} ({len(payload)} bytes)") + + try: + async with AsyncRunloop(**client_kwargs) as client: # type: ignore[arg-type] + devbox_id, created = await ensure_devbox(client, args) + + print( + f"[run] firing {args.total} upload_file calls " + f"(concurrency={args.concurrency}) to {devbox_id}" + ) + sem = asyncio.Semaphore(args.concurrency) + t0 = time.monotonic() + results = await asyncio.gather( + *(one_upload(client, devbox_id, src_path, i, sem) for i in range(args.total)) + ) + wall = time.monotonic() - t0 + + exit_code = summarize(results, wall) + + if created and args.shutdown: + print(f"[teardown] shutting down devbox {devbox_id}") + await client.devboxes.shutdown(devbox_id) + finally: + try: + src_path.unlink() + except OSError: + pass + + return exit_code + + +if __name__ == "__main__": + sys.exit(asyncio.run(run())) diff --git a/src/runloop_api_client/_base_client.py b/src/runloop_api_client/_base_client.py index 88e0bbb3b..0c031265c 100644 --- a/src/runloop_api_client/_base_client.py +++ b/src/runloop_api_client/_base_client.py @@ -870,6 +870,12 @@ def _should_retry(self, response: httpx.Response) -> bool: # Retry on request timeouts. if response.status_code == 408: + # upload_file 408s mean the server gave up waiting for the request body. + # Retrying just queues another large request behind the same exhausted + # connection pool and amplifies the stall. + if response.request.url.path.endswith("/upload_file"): + log.debug("Not retrying upload_file 408 (request-body stall)") + return False log.debug("Retrying due to status code %i", response.status_code) return True @@ -931,6 +937,7 @@ class SyncAPIClient(BaseClient[httpx.Client, Stream[Any]]): _client: httpx.Client _default_stream_cls: type[Stream[Any]] | None = None _uses_shared_pool: bool + _connection_limits: httpx.Limits | None _closed: bool def __init__( @@ -945,6 +952,7 @@ def __init__( custom_query: Mapping[str, object] | None = None, _strict_response_validation: bool, shared_http_pool: bool = True, + connection_limits: httpx.Limits | None = None, ) -> None: if not is_given(timeout): # if the user passed in a custom http client with a non-default @@ -976,10 +984,20 @@ def __init__( ) self._closed = False + self._connection_limits = connection_limits if http_client is not None: self._client = http_client self._uses_shared_pool = False + elif connection_limits is not None: + # Custom limits get a private pool — the shared pool is process-global + # so per-client limits can't be respected through it. + self._client = SyncHttpxClientWrapper( + base_url=base_url, + timeout=cast(Timeout, timeout), + transport=httpx.HTTPTransport(limits=connection_limits, http2=True), + ) + self._uses_shared_pool = False elif shared_http_pool: global _shared_sync_transport with _pool_lock: @@ -1563,6 +1581,7 @@ class AsyncAPIClient(BaseClient[httpx.AsyncClient, AsyncStream[Any]]): _client: httpx.AsyncClient _default_stream_cls: type[AsyncStream[Any]] | None = None _uses_shared_pool: bool + _connection_limits: httpx.Limits | None _closed: bool def __init__( @@ -1577,6 +1596,7 @@ def __init__( custom_headers: Mapping[str, str] | None = None, custom_query: Mapping[str, object] | None = None, shared_http_pool: bool = True, + connection_limits: httpx.Limits | None = None, ) -> None: if not is_given(timeout): # if the user passed in a custom http client with a non-default @@ -1608,10 +1628,20 @@ def __init__( ) self._closed = False + self._connection_limits = connection_limits if http_client is not None: self._client = http_client self._uses_shared_pool = False + elif connection_limits is not None: + # Custom limits get a private pool — the shared pool is process-global + # so per-client limits can't be respected through it. + self._client = AsyncHttpxClientWrapper( + base_url=base_url, + timeout=cast(Timeout, timeout), + transport=httpx.AsyncHTTPTransport(limits=connection_limits, http2=True), + ) + self._uses_shared_pool = False elif shared_http_pool: try: loop: asyncio.AbstractEventLoop | None = asyncio.get_running_loop() diff --git a/src/runloop_api_client/_client.py b/src/runloop_api_client/_client.py index 9e422721e..5c1425ac2 100644 --- a/src/runloop_api_client/_client.py +++ b/src/runloop_api_client/_client.py @@ -94,6 +94,13 @@ def __init__( # Enables HTTP/2 multiplexing and avoids ConnectTimeout storms under high concurrency. # Set to False to create a private connection pool (old behavior). shared_http_pool: bool = True, + # Override the httpx connection pool limits (max concurrent connections and + # keep-alive size). When provided, the client gets a private pool with these + # limits and `shared_http_pool` is implicitly disabled — the shared pool is + # process-global so per-client limits can't be expressed through it. + # Use this to raise the cap above the default 100 for upload-heavy workloads, + # or to lower it in memory-constrained environments. + connection_limits: httpx.Limits | None = None, # Enable or disable schema validation for data returned by the API. # When enabled an error APIResponseValidationError is raised # if the API responds with invalid data for the expected schema. @@ -140,6 +147,7 @@ def __init__( custom_query=default_query, _strict_response_validation=_strict_response_validation, shared_http_pool=shared_http_pool, + connection_limits=connection_limits, ) self._idempotency_header = "x-request-id" @@ -276,6 +284,7 @@ def copy( timeout: float | Timeout | None | NotGiven = not_given, http_client: httpx.Client | None = None, shared_http_pool: bool | None = None, + connection_limits: httpx.Limits | None | NotGiven = not_given, max_retries: int | NotGiven = not_given, default_headers: Mapping[str, str] | None = None, set_default_headers: Mapping[str, str] | None = None, @@ -304,8 +313,12 @@ def copy( elif set_default_query is not None: params = set_default_query + resolved_limits = self._connection_limits if isinstance(connection_limits, NotGiven) else connection_limits + if http_client is not None: resolved_shared = False + elif resolved_limits is not None: + resolved_shared = False elif shared_http_pool is not None: resolved_shared = shared_http_pool else: @@ -317,6 +330,7 @@ def copy( timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, shared_http_pool=resolved_shared, + connection_limits=resolved_limits, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, @@ -382,6 +396,13 @@ def __init__( # Enables HTTP/2 multiplexing and avoids ConnectTimeout storms under high concurrency. # Set to False to create a private connection pool (old behavior). shared_http_pool: bool = True, + # Override the httpx connection pool limits (max concurrent connections and + # keep-alive size). When provided, the client gets a private pool with these + # limits and `shared_http_pool` is implicitly disabled — the shared pool is + # process-global so per-client limits can't be expressed through it. + # Use this to raise the cap above the default 100 for upload-heavy workloads, + # or to lower it in memory-constrained environments. + connection_limits: httpx.Limits | None = None, # Enable or disable schema validation for data returned by the API. # When enabled an error APIResponseValidationError is raised # if the API responds with invalid data for the expected schema. @@ -428,6 +449,7 @@ def __init__( custom_query=default_query, _strict_response_validation=_strict_response_validation, shared_http_pool=shared_http_pool, + connection_limits=connection_limits, ) self._idempotency_header = "x-request-id" @@ -564,6 +586,7 @@ def copy( timeout: float | Timeout | None | NotGiven = not_given, http_client: httpx.AsyncClient | None = None, shared_http_pool: bool | None = None, + connection_limits: httpx.Limits | None | NotGiven = not_given, max_retries: int | NotGiven = not_given, default_headers: Mapping[str, str] | None = None, set_default_headers: Mapping[str, str] | None = None, @@ -592,8 +615,12 @@ def copy( elif set_default_query is not None: params = set_default_query + resolved_limits = self._connection_limits if isinstance(connection_limits, NotGiven) else connection_limits + if http_client is not None: resolved_shared = False + elif resolved_limits is not None: + resolved_shared = False elif shared_http_pool is not None: resolved_shared = shared_http_pool else: @@ -605,6 +632,7 @@ def copy( timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, shared_http_pool=resolved_shared, + connection_limits=resolved_limits, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, diff --git a/src/runloop_api_client/_constants.py b/src/runloop_api_client/_constants.py index 88f944ce2..d6361c8ad 100644 --- a/src/runloop_api_client/_constants.py +++ b/src/runloop_api_client/_constants.py @@ -8,7 +8,7 @@ # default timeout is 30 seconds DEFAULT_TIMEOUT = httpx.Timeout(timeout=30, connect=5.0) DEFAULT_MAX_RETRIES = 5 -DEFAULT_CONNECTION_LIMITS = httpx.Limits(max_connections=20, max_keepalive_connections=10) +DEFAULT_CONNECTION_LIMITS = httpx.Limits(max_connections=100, max_keepalive_connections=20) INITIAL_RETRY_DELAY = 1.0 MAX_RETRY_DELAY = 60.0 diff --git a/src/runloop_api_client/_files.py b/src/runloop_api_client/_files.py index 1a013e434..aea6ca730 100644 --- a/src/runloop_api_client/_files.py +++ b/src/runloop_api_client/_files.py @@ -66,7 +66,11 @@ def _transform_file(file: FileTypes) -> HttpxFileTypes: if is_file_content(file): if isinstance(file, os.PathLike): path = pathlib.Path(file) - return (path.name, path.read_bytes()) + # Hand httpx an open file handle so the multipart encoder reads + # lazily. read_bytes() buffers the entire file in memory before + # the request is even built, which holds a connection-pool slot + # during the read and compounds upload_file stalls under concurrency. + return (path.name, path.open("rb")) return file @@ -107,8 +111,11 @@ async def async_to_httpx_files(files: RequestFiles | None) -> HttpxRequestFiles async def _async_transform_file(file: FileTypes) -> HttpxFileTypes: if is_file_content(file): if isinstance(file, os.PathLike): - path = anyio.Path(file) - return (path.name, await path.read_bytes()) + path = pathlib.Path(file) + # Same rationale as the sync path: avoid buffering the file in + # memory. httpx's multipart encoder will read from this handle in + # chunks as it serializes the request body. + return (path.name, path.open("rb")) return file diff --git a/tests/test_client.py b/tests/test_client.py index 7728bf5bb..cc89603b5 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1082,6 +1082,51 @@ def test_follow_redirects_disabled(self, respx_mock: MockRouter, client: Runloop assert exc_info.value.response.status_code == 302 assert exc_info.value.response.headers["Location"] == f"{base_url}/redirected" + @mock.patch("runloop_api_client._base_client.BaseClient._calculate_retry_timeout", _low_retry_timeout) + @pytest.mark.respx(base_url=base_url) + def test_upload_file_408_does_not_retry(self, respx_mock: MockRouter, client: Runloop) -> None: + # 408 on upload_file means the server gave up waiting for the request body. + # Retrying piles another large request onto the same exhausted connection + # pool and amplifies the stall, so the SDK must NOT auto-retry these. + client = client.with_options(max_retries=4) + + call_count = 0 + + def handler(_request: httpx.Request) -> httpx.Response: + nonlocal call_count + call_count += 1 + return httpx.Response(408) + + respx_mock.post("/v1/devboxes/dbx_test/upload_file").mock(side_effect=handler) + + with pytest.raises(APIStatusError) as exc_info: + client.devboxes.upload_file(id="dbx_test", path="/tmp/foo", file=b"payload") + + assert exc_info.value.response.status_code == 408 + assert call_count == 1, f"upload_file 408 should not retry, got {call_count} attempts" + + @mock.patch("runloop_api_client._base_client.BaseClient._calculate_retry_timeout", _low_retry_timeout) + @pytest.mark.respx(base_url=base_url) + def test_non_upload_file_408_still_retries(self, respx_mock: MockRouter, client: Runloop) -> None: + # Sanity check that the upload_file carve-out didn't break the general + # 408-retries-by-default behavior for other routes. + client = client.with_options(max_retries=2) + + call_count = 0 + + def handler(_request: httpx.Request) -> httpx.Response: + nonlocal call_count + call_count += 1 + return httpx.Response(408) + + respx_mock.post("/v1/devboxes").mock(side_effect=handler) + + with pytest.raises(APIStatusError): + client.devboxes.create() + + # initial attempt + 2 retries + assert call_count == 3 + class TestAsyncRunloop: @pytest.mark.respx(base_url=base_url) diff --git a/tests/test_files.py b/tests/test_files.py index b5ee73ae2..11a377ba7 100644 --- a/tests/test_files.py +++ b/tests/test_files.py @@ -1,8 +1,9 @@ +import io from pathlib import Path import anyio import pytest -from dirty_equals import IsDict, IsList, IsBytes, IsTuple +from dirty_equals import IsDict, IsList, IsTuple, IsInstance from runloop_api_client._files import to_httpx_files, deepcopy_with_paths, async_to_httpx_files from runloop_api_client._utils import extract_files @@ -10,37 +11,63 @@ readme_path = Path(__file__).parent.parent.joinpath("README.md") +def _close_file_handles(value: object) -> None: + """Recursively close any open file handles in to_httpx_files output. + + The transform now returns streaming file handles instead of bytes, so tests + that don't actually issue a request must close them to avoid resource warnings. + """ + if isinstance(value, io.IOBase): + value.close() + elif isinstance(value, dict): + for v in value.values(): + _close_file_handles(v) + elif isinstance(value, (list, tuple)): + for v in value: + _close_file_handles(v) + + def test_pathlib_includes_file_name() -> None: result = to_httpx_files({"file": readme_path}) - print(result) - assert result == IsDict({"file": IsTuple("README.md", IsBytes())}) + try: + assert result == IsDict({"file": IsTuple("README.md", IsInstance(io.IOBase))}) + finally: + _close_file_handles(result) def test_tuple_input() -> None: result = to_httpx_files([("file", readme_path)]) - print(result) - assert result == IsList(IsTuple("file", IsTuple("README.md", IsBytes()))) + try: + assert result == IsList(IsTuple("file", IsTuple("README.md", IsInstance(io.IOBase)))) + finally: + _close_file_handles(result) @pytest.mark.asyncio async def test_async_pathlib_includes_file_name() -> None: result = await async_to_httpx_files({"file": readme_path}) - print(result) - assert result == IsDict({"file": IsTuple("README.md", IsBytes())}) + try: + assert result == IsDict({"file": IsTuple("README.md", IsInstance(io.IOBase))}) + finally: + _close_file_handles(result) @pytest.mark.asyncio async def test_async_supports_anyio_path() -> None: result = await async_to_httpx_files({"file": anyio.Path(readme_path)}) - print(result) - assert result == IsDict({"file": IsTuple("README.md", IsBytes())}) + try: + assert result == IsDict({"file": IsTuple("README.md", IsInstance(io.IOBase))}) + finally: + _close_file_handles(result) @pytest.mark.asyncio async def test_async_tuple_input() -> None: result = await async_to_httpx_files([("file", readme_path)]) - print(result) - assert result == IsList(IsTuple("file", IsTuple("README.md", IsBytes()))) + try: + assert result == IsList(IsTuple("file", IsTuple("README.md", IsInstance(io.IOBase)))) + finally: + _close_file_handles(result) def test_string_not_allowed() -> None: diff --git a/tests/test_shared_pool.py b/tests/test_shared_pool.py index 4220f8ba9..261db94ae 100644 --- a/tests/test_shared_pool.py +++ b/tests/test_shared_pool.py @@ -288,6 +288,106 @@ async def test_copy_with_custom_client_disables_sharing(self): assert c2._client is custom +class TestSyncConnectionLimits: + def test_custom_limits_force_private_pool(self): + limits = httpx.Limits(max_connections=42, max_keepalive_connections=7) + c = _make_client(connection_limits=limits) + + assert c._uses_shared_pool is False + assert c._connection_limits is limits + transport = _get_transport(c) + # Real httpx.HTTPTransport (not the shared wrapper) + assert isinstance(transport, httpx.HTTPTransport) + # Pool actually got the requested limits + assert transport._pool._max_connections == 42 + assert transport._pool._max_keepalive_connections == 7 + + c.close() + + def test_custom_limits_override_shared_pool_request(self): + # Even with shared_http_pool=True (default), explicit limits take precedence + # and the client gets a private pool. + limits = httpx.Limits(max_connections=5) + c = _make_client(shared_http_pool=True, connection_limits=limits) + + assert c._uses_shared_pool is False + + c.close() + + def test_copy_inherits_connection_limits(self): + limits = httpx.Limits(max_connections=25) + c1 = _make_client(connection_limits=limits) + c2 = c1.copy() + + assert c2._connection_limits is limits + assert c2._uses_shared_pool is False + assert _get_transport(c2)._pool._max_connections == 25 + + c1.close() + c2.close() + + def test_copy_can_override_connection_limits(self): + c1 = _make_client(connection_limits=httpx.Limits(max_connections=25)) + new_limits = httpx.Limits(max_connections=200, max_keepalive_connections=50) + c2 = c1.copy(connection_limits=new_limits) + + assert c2._connection_limits is new_limits + assert _get_transport(c2)._pool._max_connections == 200 + + c1.close() + c2.close() + + def test_copy_can_reset_connection_limits(self): + c1 = _make_client(connection_limits=httpx.Limits(max_connections=25)) + # Passing None resets to the SDK default limits but keeps the inherited + # sharing mode (parent was private, so the copy stays private). Use + # shared_http_pool=True alongside to also re-enable sharing. + c2 = c1.copy(connection_limits=None) + + assert c2._connection_limits is None + assert c2._uses_shared_pool is False + + c3 = c1.copy(connection_limits=None, shared_http_pool=True) + assert c3._connection_limits is None + assert c3._uses_shared_pool is True + + c1.close() + c2.close() + c3.close() + + +class TestAsyncConnectionLimits: + async def test_custom_limits_force_private_pool(self): + limits = httpx.Limits(max_connections=42, max_keepalive_connections=7) + c = _make_async_client(connection_limits=limits) + + assert c._uses_shared_pool is False + assert c._connection_limits is limits + transport = _get_transport(c) + assert isinstance(transport, httpx.AsyncHTTPTransport) + assert transport._pool._max_connections == 42 + assert transport._pool._max_keepalive_connections == 7 + + await c.close() + + async def test_custom_limits_override_shared_pool_request(self): + limits = httpx.Limits(max_connections=5) + c = _make_async_client(shared_http_pool=True, connection_limits=limits) + + assert c._uses_shared_pool is False + + await c.close() + + async def test_copy_inherits_connection_limits(self): + limits = httpx.Limits(max_connections=25) + c1 = _make_async_client(connection_limits=limits) + c2 = c1.copy() + + assert c2._connection_limits is limits + assert c2._uses_shared_pool is False + assert _get_transport(c2)._pool._max_connections == 25 + + class TestAsyncCrossLoop: def test_separate_loops_get_separate_transports(self): """Clients created in different asyncio.run() calls must not share a transport."""