Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,8 @@ def __init__(self, request: HTTPRequest):
class $3L:
""\"An asynchronous HTTP client solely for testing purposes.""\"
TIMEOUT_EXCEPTIONS = ()
def __init__(self, *, client_config: HTTPClientConfiguration | None = None):
self._client_config = client_config
Expand All @@ -644,6 +646,8 @@ async def send(
class $4L:
""\"An asynchronous HTTP client solely for testing purposes.""\"
TIMEOUT_EXCEPTIONS = ()
def __init__(
self,
*,
Expand Down
45 changes: 26 additions & 19 deletions packages/smithy-core/src/smithy_core/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ..auth import AuthParams
from ..deserializers import DeserializeableShape, ShapeDeserializer
from ..endpoints import EndpointResolverParams
from ..exceptions import RetryError, SmithyError
from ..exceptions import ClientTimeoutError, RetryError, SmithyError
from ..interceptors import (
InputContext,
Interceptor,
Expand Down Expand Up @@ -448,24 +448,31 @@ async def _handle_attempt[I: SerializeableShape, O: DeserializeableShape](

_LOGGER.debug("Sending request %s", request_context.transport_request)

if request_future is not None:
# If we have an input event stream (or duplex event stream) then we
# need to let the client return ASAP so that it can start sending
# events. So here we start the transport send in a background task
# then set the result of the request future. It's important to sequence
# it just like that so that the client gets a stream that's ready
# to send.
transport_task = asyncio.create_task(
self.transport.send(request=request_context.transport_request)
)
request_future.set_result(request_context)
transport_response = await transport_task
else:
# If we don't have an input stream, there's no point in creating a
# task, so we just immediately await the coroutine.
transport_response = await self.transport.send(
request=request_context.transport_request
)
try:
if request_future is not None:
# If we have an input event stream (or duplex event stream) then we
# need to let the client return ASAP so that it can start sending
# events. So here we start the transport send in a background task
# then set the result of the request future. It's important to sequence
# it just like that so that the client gets a stream that's ready
# to send.
transport_task = asyncio.create_task(
self.transport.send(request=request_context.transport_request)
)
request_future.set_result(request_context)
transport_response = await transport_task
else:
# If we don't have an input stream, there's no point in creating a
# task, so we just immediately await the coroutine.
transport_response = await self.transport.send(
request=request_context.transport_request
)
except Exception as e:
if isinstance(e, self.transport.TIMEOUT_EXCEPTIONS):
raise ClientTimeoutError(
message=f"Client timeout occurred: {e}"
) from e
raise

_LOGGER.debug("Received response: %s", transport_response)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ async def resolve_endpoint(self, params: EndpointResolverParams[Any]) -> Endpoin


class ClientTransport[I: Request, O: Response](Protocol):
"""Protocol-agnostic representation of a client tranport (e.g. an HTTP client)."""
"""Protocol-agnostic representation of a client transport (e.g. an HTTP client)."""

TIMEOUT_EXCEPTIONS: tuple[type[Exception], ...]

async def send(self, request: I) -> O:
"""Send a request over the transport and receive the response."""
Expand Down
17 changes: 17 additions & 0 deletions packages/smithy-core/src/smithy_core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class CallError(SmithyError):
is_throttling_error: bool = False
"""Whether the error is a throttling error."""

is_timeout_error: bool = False
"""Whether the error represents a timeout condition."""

def __post_init__(self):
super().__init__(self.message)

Expand All @@ -61,6 +64,20 @@ class ModeledError(CallError):
fault: Fault = "client"


@dataclass(kw_only=True)
class ClientTimeoutError(CallError):
"""Exception raised when a client-side timeout occurs.

This error indicates that the client transport layer encountered a timeout while
attempting to communicate with the server. This typically occurs when network
requests take longer than the configured timeout period.
"""

fault: Fault = "client"
is_timeout_error: bool = True
is_retry_safe: bool | None = True


class SerializationError(SmithyError):
"""Base exception type for exceptions raised during serialization."""

Expand Down
3 changes: 3 additions & 0 deletions packages/smithy-core/src/smithy_core/interfaces/retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ class ErrorRetryInfo(Protocol):
is_throttling_error: bool = False
"""Whether the error is a throttling error."""

is_timeout_error: bool = False
"""Whether the error is a timeout error."""


class RetryBackoffStrategy(Protocol):
"""Stateless strategy for computing retry delays based on retry attempt account."""
Expand Down
7 changes: 6 additions & 1 deletion packages/smithy-core/src/smithy_core/retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,12 @@ def acquire(self, *, error: Exception) -> int:
If there's insufficient capacity available, raise an exception.
Otherwise, return the amount of capacity successfully allocated.
"""
capacity_amount = self.RETRY_COST

is_timeout = (
isinstance(error, retries_interface.ErrorRetryInfo)
and error.is_timeout_error
)
capacity_amount = self.TIMEOUT_RETRY_COST if is_timeout else self.RETRY_COST

with self._lock:
if capacity_amount > self._available_capacity:
Expand Down
9 changes: 9 additions & 0 deletions packages/smithy-core/tests/unit/test_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,12 @@ def test_retry_quota_release_caps_at_max(
# Release more than we acquired. Should cap at initial capacity.
retry_quota.release(release_amount=50)
assert retry_quota.available_capacity == 10


def test_retry_quota_acquire_timeout_error(
retry_quota: StandardRetryQuota,
) -> None:
timeout_error = CallError(is_timeout_error=True, is_retry_safe=True)
acquired = retry_quota.acquire(error=timeout_error)
assert acquired == StandardRetryQuota.TIMEOUT_RETRY_COST
assert retry_quota.available_capacity == 0
2 changes: 2 additions & 0 deletions packages/smithy-http/src/smithy_http/aio/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def __post_init__(self) -> None:
class AIOHTTPClient(HTTPClient):
"""Implementation of :py:class:`.interfaces.HTTPClient` using aiohttp."""

TIMEOUT_EXCEPTIONS = (TimeoutError,)

def __init__(
self,
*,
Expand Down
32 changes: 23 additions & 9 deletions packages/smithy-http/src/smithy_http/aio/crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from inspect import iscoroutinefunction
from typing import TYPE_CHECKING, Any

from awscrt.exceptions import AwsCrtError

if TYPE_CHECKING:
# pyright doesn't like optional imports. This is reasonable because if we use these
# in type hints then they'd result in runtime errors.
Expand Down Expand Up @@ -129,9 +131,16 @@ def __post_init__(self) -> None:
_assert_crt()


class _CRTTimeoutError(Exception):
"""Internal wrapper for CRT timeout errors."""


class AWSCRTHTTPClient(http_aio_interfaces.HTTPClient):
_HTTP_PORT = 80
_HTTPS_PORT = 443
_TIMEOUT_ERROR_NAMES = frozenset(["AWS_IO_SOCKET_TIMEOUT", "AWS_IO_SOCKET_CLOSED"])

TIMEOUT_EXCEPTIONS = (_CRTTimeoutError,)

def __init__(
self,
Expand Down Expand Up @@ -163,18 +172,23 @@ async def send(
:param request: The request including destination URI, fields, payload.
:param request_config: Configuration specific to this request.
"""
crt_request = self._marshal_request(request)
connection = await self._get_connection(request.destination)
try:
crt_request = self._marshal_request(request)
connection = await self._get_connection(request.destination)

# Convert body to async iterator for request_body_generator
body_generator = self._create_body_generator(request.body)
# Convert body to async iterator for request_body_generator
body_generator = self._create_body_generator(request.body)

crt_stream = connection.request(
crt_request,
request_body_generator=body_generator,
)
crt_stream = connection.request(
crt_request,
request_body_generator=body_generator,
)

return await self._await_response(crt_stream)
return await self._await_response(crt_stream)
except AwsCrtError as e:
if e.name in self._TIMEOUT_ERROR_NAMES:
raise _CRTTimeoutError() from e
raise

async def _await_response(
self, stream: "AIOHttpClientStreamUnified"
Expand Down
11 changes: 8 additions & 3 deletions packages/smithy-http/src/smithy_http/aio/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ async def _create_error(
)
return error_shape.deserialize(deserializer)

is_throttle = response.status == 429
message = (
f"Unknown error for operation {operation.schema.id} "
f"- status: {response.status}"
Expand All @@ -224,11 +223,17 @@ async def _create_error(
message += f" - id: {error_id}"
if response.reason is not None:
message += f" - reason: {response.status}"

is_timeout = response.status == 408
is_throttle = response.status == 429
fault = "client" if response.status < 500 else "server"

return CallError(
message=message,
fault="client" if response.status < 500 else "server",
fault=fault,
is_throttling_error=is_throttle,
is_retry_safe=is_throttle or None,
is_timeout_error=is_timeout,
is_retry_safe=is_throttle or is_timeout or None,
)

def _matches_content_type(self, response: HTTPResponse) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions packages/smithy-http/src/smithy_http/testing/mockhttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class MockHTTPClient(HTTPClient):
requests are captured for inspection.
"""

TIMEOUT_EXCEPTIONS = (TimeoutError,)

def __init__(
self,
*,
Expand Down
4 changes: 2 additions & 2 deletions packages/smithy-http/tests/unit/aio/test_protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from smithy_http.aio.protocols import HttpClientProtocol


class TestProtocol(HttpClientProtocol):
class MockProtocol(HttpClientProtocol):
_id = ShapeID("ns.foo#bar")

@property
Expand Down Expand Up @@ -125,7 +125,7 @@ def deserialize_response(
def test_http_protocol_joins_uris(
request_uri: URI, endpoint_uri: URI, expected: URI
) -> None:
protocol = TestProtocol()
protocol = MockProtocol()
request = HTTPRequest(
destination=request_uri,
method="GET",
Expand Down
Loading