From c883376087d0e9afd4e7f81114bb2b579c023f22 Mon Sep 17 00:00:00 2001 From: JordonPhillips Date: Thu, 13 Mar 2025 16:19:20 +0100 Subject: [PATCH 1/3] Centralize event stream wrappers --- designs/event-streams.md | 92 +++--- packages/aws-event-stream/pyproject.toml | 1 - .../_private/deserializers.py | 60 ---- .../aws_event_stream/_private/serializers.py | 61 +--- .../src/aws_event_stream/aio/__init__.py | 291 ++++++------------ .../tests/unit/_private/test_deserializers.py | 14 +- .../tests/unit/_private/test_serializers.py | 12 +- .../src/smithy_core/aio/eventstream.py} | 199 ++++-------- .../smithy_core/aio/interfaces/__init__.py | 54 +++- .../smithy_core/aio/interfaces/eventstream.py | 69 +++++ packages/smithy-event-stream/NOTICE | 1 - packages/smithy-event-stream/README.md | 0 packages/smithy-event-stream/pyproject.toml | 21 -- .../src/smithy_event_stream/__init__.py | 6 - .../src/smithy_event_stream/aio/__init__.py | 2 - .../src/smithy_event_stream/py.typed | 0 packages/smithy-event-stream/tests/py.typed | 0 uv.lock | 18 +- 18 files changed, 346 insertions(+), 555 deletions(-) rename packages/{smithy-event-stream/src/smithy_event_stream/aio/interfaces.py => smithy-core/src/smithy_core/aio/eventstream.py} (52%) create mode 100644 packages/smithy-core/src/smithy_core/aio/interfaces/eventstream.py delete mode 100644 packages/smithy-event-stream/NOTICE delete mode 100644 packages/smithy-event-stream/README.md delete mode 100644 packages/smithy-event-stream/pyproject.toml delete mode 100644 packages/smithy-event-stream/src/smithy_event_stream/__init__.py delete mode 100644 packages/smithy-event-stream/src/smithy_event_stream/aio/__init__.py delete mode 100644 packages/smithy-event-stream/src/smithy_event_stream/py.typed delete mode 100644 packages/smithy-event-stream/tests/py.typed diff --git a/designs/event-streams.md b/designs/event-streams.md index 420d183dd..89eb8025d 100644 --- a/designs/event-streams.md +++ b/designs/event-streams.md @@ -53,6 +53,8 @@ async with publisher: publisher.send(FooEvent(foo="bar")) ``` +Protocol implementations will be responsible for creating publishers. + ## Event Receivers An `AsyncEventReceiver` is used to receive events from a service. @@ -131,6 +133,8 @@ async for event in reciever: handle_event(event) ``` +Protocol implementations will be responsible for creating receivers. + ### Errors Event streams may define modeled errors that may be sent over the stream. These @@ -169,38 +173,30 @@ are handled by the following classes: * `OutputEventStream` is returned when the operation only has an output stream. ```python -class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protocol): - - input_stream: AsyncEventPublisher[I] - - _output_stream: AsyncEventReceiver[O] | None = None - _response: R | None = None - - @property - def output_stream(self) -> AsyncEventReceiver[O] | None: - return self._output_stream - - @output_stream.setter - def output_stream(self, value: AsyncEventReceiver[O]) -> None: - self._output_stream = value - - @property - def response(self) -> R | None: - return self._response - - @response.setter - def response(self, value: R) -> None: - self._response = value - - async def await_output(self) -> tuple[R, AsyncEventReceiver[O]]: +class DuplexEventStream[ + IE: SerializeableShape, + OE: DeserializeableShape, + O: DeserializeableShape, +]: + + input_stream: EventPublisher[IE] + output_stream: EventReceiver[OE] | None = None + response: O | None = None + + def __init__( + self, + *, + input_stream: EventPublisher[IE], + output_future: Future[tuple[O, EventReceiver[OE]]], + ) -> None: + self.input_stream = input_stream + self._output_future = output_future + + async def await_output(self) -> tuple[O, EventReceiver[OE]]: ... async def close(self) -> None: - if self.output_stream is None: - _, self.output_stream = await self.await_output() - - await self.input_stream.close() - await self.output_stream.close() + ... async def __aenter__(self) -> Self: return self @@ -209,21 +205,21 @@ class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protoco await self.close() -class InputEventStream[I: SerializableShape, R](Protocol): +class InputEventStream[IE: SerializeableShape, O]: - input_stream: AsyncEventPublisher[I] + input_stream: EventPublisher[IE] + response: O | None = None - _response: R | None = None + def __init__( + self, + *, + input_stream: EventPublisher[IE], + output_future: Future[O], + ) -> None: + self.input_stream = input_stream + self._output_future = output_future - @property - def response(self) -> R | None: - return self._response - - @response.setter - def response(self, value: R) -> None: - self._response = value - - async def await_output(self) -> R: + async def await_output(self) -> O: ... async def close(self) -> None: @@ -236,11 +232,14 @@ class InputEventStream[I: SerializableShape, R](Protocol): await self.close() -class OutputEventStream[O: DeserializableShape, R](Protocol): +class OutputEventStream[OE: DeserializeableShape, O: DeserializeableShape]: - output_stream: AsyncEventReceiver[O] - - response: R + output_stream: EventReceiver[OE] + response: O + + def __init__(self, output_stream: EventReceiver[OE], output: O) -> None: + self.output_stream = output_stream + self.response = output async def close(self) -> None: await self.output_stream.close() @@ -290,6 +289,9 @@ with await client.output_operation() as stream: handle_event(event) ``` +All three output types are centrally located and will be constructed by filling +in the relevant publishers and receivers from the protocol implementation. + ## Event Structure Event messages are structurally similar to HTTP messages. They consist of a map diff --git a/packages/aws-event-stream/pyproject.toml b/packages/aws-event-stream/pyproject.toml index ec994ea15..a064f9fa9 100644 --- a/packages/aws-event-stream/pyproject.toml +++ b/packages/aws-event-stream/pyproject.toml @@ -6,7 +6,6 @@ readme = "README.md" requires-python = ">=3.12" dependencies = [ "smithy-core", - "smithy-event-stream", ] [build-system] diff --git a/packages/aws-event-stream/src/aws_event_stream/_private/deserializers.py b/packages/aws-event-stream/src/aws_event_stream/_private/deserializers.py index dadf8edc0..4118204d3 100644 --- a/packages/aws-event-stream/src/aws_event_stream/_private/deserializers.py +++ b/packages/aws-event-stream/src/aws_event_stream/_private/deserializers.py @@ -1,21 +1,17 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -import asyncio import datetime import logging from collections.abc import Callable -from smithy_core.aio.interfaces import AsyncByteStream from smithy_core.codecs import Codec from smithy_core.deserializers import ( - DeserializeableShape, ShapeDeserializer, SpecificShapeDeserializer, ) from smithy_core.schemas import Schema from smithy_core.shapes import ShapeType from smithy_core.utils import expect_type -from smithy_event_stream.aio.interfaces import AsyncEventReceiver from ..events import HEADERS_DICT, Event from ..exceptions import EventError, UnmodeledEventError @@ -31,62 +27,6 @@ INITIAL_MESSAGE_TYPES = (INITIAL_REQUEST_EVENT_TYPE, INITIAL_RESPONSE_EVENT_TYPE) -class AWSAsyncEventReceiver[E: DeserializeableShape](AsyncEventReceiver[E]): - def __init__( - self, - payload_codec: Codec, - source: AsyncByteStream, - deserializer: Callable[[ShapeDeserializer], E], - is_client_mode: bool = True, - ) -> None: - self._payload_codec = payload_codec - self._source = source - self._is_client_mode = is_client_mode - self._deserializer = deserializer - self._closed = False - - async def receive(self) -> E | None: - if self._closed: - return None - - try: - event = await Event.decode_async(self._source) - except Exception as e: - await self.close() - if not isinstance(e, EventError): - raise IOError("Failed to read from stream.") from e - raise - - if event is None: - logger.debug("No event received from the source.") - return None - logger.debug("Received raw event: %s", event) - - deserializer = EventDeserializer( - event=event, - payload_codec=self._payload_codec, - is_client_mode=self._is_client_mode, - ) - result = self._deserializer(deserializer) - logger.debug("Successfully deserialized event: %s", result) - if isinstance(getattr(result, "value"), Exception): - raise result.value # type: ignore - return result - - async def close(self) -> None: - if self._closed: - return - self._closed = True - - if (close := getattr(self._source, "close", None)) is not None: - if asyncio.iscoroutine(result := close()): - await result - - @property - def closed(self) -> bool: - return self._closed - - class EventDeserializer(SpecificShapeDeserializer): def __init__( self, event: Event, payload_codec: Codec, is_client_mode: bool = True diff --git a/packages/aws-event-stream/src/aws_event_stream/_private/serializers.py b/packages/aws-event-stream/src/aws_event_stream/_private/serializers.py index 3c98a570a..31594ea3c 100644 --- a/packages/aws-event-stream/src/aws_event_stream/_private/serializers.py +++ b/packages/aws-event-stream/src/aws_event_stream/_private/serializers.py @@ -1,25 +1,20 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -import asyncio import datetime import logging -from collections.abc import Callable, Iterator +from collections.abc import Iterator from contextlib import contextmanager from io import BytesIO from typing import Never -from smithy_core.aio.interfaces import AsyncWriter from smithy_core.codecs import Codec -from smithy_core.exceptions import ExpectationNotMetException from smithy_core.schemas import Schema from smithy_core.serializers import ( InterceptingSerializer, - SerializeableShape, ShapeSerializer, SpecificShapeSerializer, ) from smithy_core.shapes import ShapeType -from smithy_event_stream.aio.interfaces import AsyncEventPublisher from ..events import EventMessage, HEADER_VALUE, Short, Byte, Long from ..exceptions import InvalidHeaderValue @@ -36,60 +31,6 @@ _DEFAULT_BLOB_CONTENT_TYPE = "application/octet-stream" -type Signer = Callable[[EventMessage], EventMessage] -"""A function that takes an event message and signs it, and returns it signed.""" - - -class AWSAsyncEventPublisher[E: SerializeableShape](AsyncEventPublisher[E]): - def __init__( - self, - payload_codec: Codec, - async_writer: AsyncWriter, - signer: Signer | None = None, - is_client_mode: bool = True, - ): - self._writer = async_writer - self._signer = signer - self._serializer = EventSerializer( - payload_codec=payload_codec, is_client_mode=is_client_mode - ) - self._closed = False - - async def send(self, event: E) -> None: - if self._closed: - raise IOError("Attempted to write to closed stream.") - logger.debug("Preparing to publish event: %s", event) - event.serialize(self._serializer) - result = self._serializer.get_result() - if result is None: - raise ExpectationNotMetException( - "Expected an event message to be serialized, but was None." - ) - if self._signer is not None: - result = self._signer(result) - - encoded_result = result.encode() - try: - logger.debug("Publishing serialized event: %s", result) - await self._writer.write(encoded_result) - except Exception as e: - await self.close() - raise IOError("Failed to write to stream.") from e - - async def close(self) -> None: - if self._closed: - return - self._closed = True - - if (close := getattr(self._writer, "close", None)) is not None: - if asyncio.iscoroutine(result := close()): - await result - - @property - def closed(self) -> bool: - return self._closed - - class EventSerializer(SpecificShapeSerializer): def __init__( self, diff --git a/packages/aws-event-stream/src/aws_event_stream/aio/__init__.py b/packages/aws-event-stream/src/aws_event_stream/aio/__init__.py index a71053c91..4403a9af7 100644 --- a/packages/aws-event-stream/src/aws_event_stream/aio/__init__.py +++ b/packages/aws-event-stream/src/aws_event_stream/aio/__init__.py @@ -1,223 +1,130 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 +import asyncio +import logging from collections.abc import Callable -from typing import Self, Awaitable -from smithy_core.aio.interfaces import AsyncByteStream, AsyncWriter, Response -from smithy_core.aio.types import AsyncBytesReader +from smithy_core.aio.interfaces import AsyncByteStream from smithy_core.codecs import Codec from smithy_core.deserializers import DeserializeableShape, ShapeDeserializer +from smithy_core.exceptions import ExpectationNotMetException from smithy_core.serializers import SerializeableShape -from smithy_event_stream.aio.interfaces import ( - AsyncEventReceiver, - DuplexEventStream, - InputEventStream, - OutputEventStream, -) - -from .._private.deserializers import AWSAsyncEventReceiver as _AWSEventReceiver -from .._private.serializers import AWSAsyncEventPublisher as _AWSEventPublisher -from .._private.serializers import Signer -from ..exceptions import MissingInitialResponse - - -class AWSDuplexEventStream[ - I: SerializeableShape, - O: DeserializeableShape, - R: DeserializeableShape, -](DuplexEventStream[I, O, R]): - """A duplex event stream using the application/vnd.amazon.eventstream format.""" +from smithy_core.aio.interfaces.eventstream import EventPublisher, EventReceiver +from smithy_core.aio.interfaces import AsyncWriter - def __init__( - self, - payload_codec: Codec, - async_writer: AsyncWriter, - deserializer: Callable[[ShapeDeserializer], O], - awaitable_response: Awaitable[Response], - awaitable_output: Awaitable[R], - deserializeable_response: type[R] | None = None, - signer: Signer | None = None, - is_client_mode: bool = True, - ) -> None: - """Construct an AWSDuplexEventStream. - - :param payload_codec: The codec to encode the event payload with. - :param async_writer: The writer to write event bytes to. - :param deserializer: A callable to deserialize events with. This should be the - union's deserialize method. - :param async_reader: The reader to read event bytes from, if available. If not - immediately available, output will be blocked on it becoming available. - :param initial_response: The deserialized operation response, if available. If - not immediately available, output will be blocked on it becoming available. - :param deserializeable_response: The deserializeable response class. Setting - this indicates that the initial response is sent over the event stream. The - deserialize method of this class will be used to deserialize it upon - calling ``await_output``. - :param signer: An optional callable to sign events with prior to them being - encoded. - :param is_client_mode: Whether the stream is being constructed for a client or - server implementation. - """ - self.input_stream = _AWSEventPublisher( - payload_codec=payload_codec, - async_writer=async_writer, - signer=signer, - is_client_mode=is_client_mode, - ) +from .._private.serializers import EventSerializer as _EventSerializer +from .._private.deserializers import EventDeserializer as _EventDeserializer +from ..events import Event, EventMessage +from ..exceptions import EventError - self._deserializer = deserializer - self._payload_codec = payload_codec - self._is_client_mode = is_client_mode - self._deserializeable_response = deserializeable_response - - self._awaitable_response = awaitable_response - self._awaitable_output = awaitable_output - self.response: R | None = None - - async def await_output(self) -> tuple[R, AsyncEventReceiver[O]]: - try: - async_reader = AsyncBytesReader((await self._awaitable_response).body) - if self.output_stream is None: - self.output_stream = _AWSEventReceiver[O]( - payload_codec=self._payload_codec, - source=async_reader, - deserializer=self._deserializer, - is_client_mode=self._is_client_mode, - ) - - if self.response is None: - if self._deserializeable_response is None: - initial_response = await self._awaitable_output - else: - initial_response_stream = _AWSEventReceiver( - payload_codec=self._payload_codec, - source=async_reader, - deserializer=self._deserializeable_response.deserialize, - is_client_mode=self._is_client_mode, - ) - initial_response = await initial_response_stream.receive() - if initial_response is None: - raise MissingInitialResponse() - self.response = initial_response - else: - initial_response = self.response - except Exception: - await self.input_stream.close() - raise +logger = logging.getLogger(__name__) - return initial_response, self.output_stream +type Signer = Callable[[EventMessage], EventMessage] +"""A function that takes an event message and signs it, and returns it signed.""" -class AWSInputEventStream[I: SerializeableShape, R](InputEventStream[I, R]): - """An input event stream using the application/vnd.amazon.eventstream format.""" +class AWSEventPublisher[E: SerializeableShape](EventPublisher[E]): def __init__( self, payload_codec: Codec, async_writer: AsyncWriter, - awaitable_output: Awaitable[R], signer: Signer | None = None, is_client_mode: bool = True, - ) -> None: - """Construct an AWSInputEventStream. - - :param payload_codec: The codec to encode the event payload with. - :param async_writer: The writer to write event bytes to. - :param initial_response: The deserialized operation response, if available. - :param signer: An optional callable to sign events with prior to them being - encoded. - :param is_client_mode: Whether the stream is being constructed for a client or - server implementation. - """ - self.response: R | None = None - self._awaitable_response = awaitable_output - - self.input_stream = _AWSEventPublisher( - payload_codec=payload_codec, - async_writer=async_writer, - signer=signer, - is_client_mode=is_client_mode, + ): + self._writer = async_writer + self._signer = signer + self._serializer = _EventSerializer( + payload_codec=payload_codec, is_client_mode=is_client_mode ) + self._closed = False + + async def send(self, event: E) -> None: + if self._closed: + raise IOError("Attempted to write to closed stream.") + logger.debug("Preparing to publish event: %s", event) + event.serialize(self._serializer) + result = self._serializer.get_result() + if result is None: + raise ExpectationNotMetException( + "Expected an event message to be serialized, but was None." + ) + if self._signer is not None: + result = self._signer(result) + + encoded_result = result.encode() + try: + logger.debug("Publishing serialized event: %s", result) + await self._writer.write(encoded_result) + except Exception as e: + await self.close() + raise IOError("Failed to write to stream.") from e - async def await_output(self) -> R: - if self.response is None: - try: - self.response = await self._awaitable_response - except Exception: - await self.input_stream.close() - raise - return self.response + async def close(self) -> None: + if self._closed: + return + self._closed = True + if (close := getattr(self._writer, "close", None)) is not None: + if asyncio.iscoroutine(result := close()): + await result -class AWSOutputEventStream[O: DeserializeableShape, R: DeserializeableShape]( - OutputEventStream[O, R] -): - """An output event stream using the application/vnd.amazon.eventstream format.""" + @property + def closed(self) -> bool: + return self._closed + +class AWSEventReceiver[E: DeserializeableShape](EventReceiver[E]): def __init__( self, payload_codec: Codec, - initial_response: R, - async_reader: AsyncByteStream, - deserializer: Callable[[ShapeDeserializer], O], + source: AsyncByteStream, + deserializer: Callable[[ShapeDeserializer], E], is_client_mode: bool = True, ) -> None: - """Construct an AWSOutputEventStream. - - :param payload_codec: The codec to decode event payloads with. - :param initial_response: The deserialized operation response. If this is not - available immediately, use ``AWSOutputEventStream.create``. - :param async_reader: An async reader to read event bytes from. - :param deserializer: A callable to deserialize events with. This should be the - union's deserialize method. - :param is_client_mode: Whether the stream is being constructed for a client or - server implementation. - """ - self.response = initial_response - self.output_stream = _AWSEventReceiver[O]( - payload_codec=payload_codec, - source=async_reader, - deserializer=deserializer, - is_client_mode=is_client_mode, - ) + self._payload_codec = payload_codec + self._source = source + self._is_client_mode = is_client_mode + self._deserializer = deserializer + self._closed = False - @classmethod - async def create( - cls, - payload_codec: Codec, - deserializeable_response: type[R], - async_reader: AsyncByteStream, - deserializer: Callable[[ShapeDeserializer], O], - is_client_mode: bool = True, - ) -> Self: - """Construct an AWSOutputEventStream and decode the initial response. - - :param payload_codec: The codec to decode event payloads with. - :param deserializeable_response: The deserializeable response class. The - deserialize method of this class will be used to deserialize the - initial response from the stream.. - :param initial_response: The deserialized operation response. If this is not - available immediately, use ``AWSOutputEventStream.create``. - :param async_reader: An async reader to read event bytes from. - :param deserializer: A callable to deserialize events with. This should be the - union's deserialize method. - :param is_client_mode: Whether the stream is being constructed for a client or - server implementation. - """ - initial_response_stream = _AWSEventReceiver( - payload_codec=payload_codec, - source=async_reader, - deserializer=deserializeable_response.deserialize, - is_client_mode=is_client_mode, - ) - initial_response = await initial_response_stream.receive() - if initial_response is None: - raise MissingInitialResponse() - - return cls( - payload_codec=payload_codec, - initial_response=initial_response, - async_reader=async_reader, - deserializer=deserializer, - is_client_mode=is_client_mode, + async def receive(self) -> E | None: + if self._closed: + return None + + try: + event = await Event.decode_async(self._source) + except Exception as e: + await self.close() + if not isinstance(e, EventError): + raise IOError("Failed to read from stream.") from e + raise + + if event is None: + logger.debug("No event received from the source.") + return None + logger.debug("Received raw event: %s", event) + + deserializer = _EventDeserializer( + event=event, + payload_codec=self._payload_codec, + is_client_mode=self._is_client_mode, ) + result = self._deserializer(deserializer) + logger.debug("Successfully deserialized event: %s", result) + if isinstance(getattr(result, "value"), Exception): + raise result.value # type: ignore + return result + + async def close(self) -> None: + if self._closed: + return + self._closed = True + + if (close := getattr(self._source, "close", None)) is not None: + if asyncio.iscoroutine(result := close()): + await result + + @property + def closed(self) -> bool: + return self._closed diff --git a/packages/aws-event-stream/tests/unit/_private/test_deserializers.py b/packages/aws-event-stream/tests/unit/_private/test_deserializers.py index e4213c0f8..3bd662d5e 100644 --- a/packages/aws-event-stream/tests/unit/_private/test_deserializers.py +++ b/packages/aws-event-stream/tests/unit/_private/test_deserializers.py @@ -8,10 +8,8 @@ from smithy_core.deserializers import DeserializeableShape from smithy_json import JSONCodec -from aws_event_stream._private.deserializers import ( - AWSAsyncEventReceiver, - EventDeserializer, -) +from aws_event_stream.aio import AWSEventReceiver +from aws_event_stream._private.deserializers import EventDeserializer from aws_event_stream.events import Event, EventMessage from aws_event_stream.exceptions import UnmodeledEventError @@ -30,7 +28,7 @@ async def test_event_receiver(expected: DeserializeableShape, given: EventMessage): source = AsyncBytesReader(given.encode()) deserializer = EventStreamDeserializer() - receiver = AWSAsyncEventReceiver[Any]( + receiver = AWSEventReceiver[Any]( payload_codec=JSONCodec(), source=source, deserializer=deserializer.deserialize ) @@ -94,7 +92,7 @@ def test_deserialize_unmodeled_error(): async def test_receiver_closes_source() -> None: source = AsyncBytesReader(b"") deserializer = EventStreamDeserializer() - receiver = AWSAsyncEventReceiver[Any]( + receiver = AWSEventReceiver[Any]( payload_codec=JSONCodec(), source=source, deserializer=deserializer.deserialize ) assert not receiver.closed @@ -107,7 +105,7 @@ async def test_receiver_closes_source() -> None: async def test_read_closed_receiver() -> None: source = AsyncBytesReader(b"") deserializer = EventStreamDeserializer() - receiver = AWSAsyncEventReceiver[Any]( + receiver = AWSEventReceiver[Any]( payload_codec=JSONCodec(), source=source, deserializer=deserializer.deserialize ) @@ -119,7 +117,7 @@ async def test_read_closed_receiver() -> None: async def test_read_closed_receiver_source() -> None: source = AsyncBytesReader(b"") deserializer = EventStreamDeserializer() - receiver = AWSAsyncEventReceiver[Any]( + receiver = AWSEventReceiver[Any]( payload_codec=JSONCodec(), source=source, deserializer=deserializer.deserialize ) diff --git a/packages/aws-event-stream/tests/unit/_private/test_serializers.py b/packages/aws-event-stream/tests/unit/_private/test_serializers.py index a8134acdf..5421940ea 100644 --- a/packages/aws-event-stream/tests/unit/_private/test_serializers.py +++ b/packages/aws-event-stream/tests/unit/_private/test_serializers.py @@ -7,10 +7,8 @@ from smithy_core.aio.types import AsyncBytesProvider from smithy_json import JSONCodec -from aws_event_stream._private.serializers import ( - EventSerializer, - AWSAsyncEventPublisher, -) +from aws_event_stream.aio import AWSEventPublisher +from aws_event_stream._private.serializers import EventSerializer from aws_event_stream.events import EventMessage from . import EVENT_STREAM_SERDE_CASES, INITIAL_REQUEST_CASE, INITIAL_RESPONSE_CASE @@ -46,7 +44,7 @@ def test_serialize_initial_response(): async def test_publisher_closes_reader(): writer = AsyncBytesProvider() - publisher: AWSAsyncEventPublisher[Any] = AWSAsyncEventPublisher( + publisher: AWSEventPublisher[Any] = AWSEventPublisher( payload_codec=JSONCodec(), async_writer=writer ) @@ -59,7 +57,7 @@ async def test_publisher_closes_reader(): async def test_send_after_close(): writer = AsyncBytesProvider() - publisher: AWSAsyncEventPublisher[Any] = AWSAsyncEventPublisher( + publisher: AWSEventPublisher[Any] = AWSEventPublisher( payload_codec=JSONCodec(), async_writer=writer ) @@ -71,7 +69,7 @@ async def test_send_after_close(): async def test_send_to_closed_writer(): writer = AsyncBytesProvider() - publisher: AWSAsyncEventPublisher[Any] = AWSAsyncEventPublisher( + publisher: AWSEventPublisher[Any] = AWSEventPublisher( payload_codec=JSONCodec(), async_writer=writer ) diff --git a/packages/smithy-event-stream/src/smithy_event_stream/aio/interfaces.py b/packages/smithy-core/src/smithy_core/aio/eventstream.py similarity index 52% rename from packages/smithy-event-stream/src/smithy_event_stream/aio/interfaces.py rename to packages/smithy-core/src/smithy_core/aio/eventstream.py index 2c5c4a547..704fdfa35 100644 --- a/packages/smithy-event-stream/src/smithy_event_stream/aio/interfaces.py +++ b/packages/smithy-core/src/smithy_core/aio/eventstream.py @@ -1,75 +1,19 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -from typing import Any, Protocol, Self +from asyncio import Future +from typing import Any, Self -from smithy_core.deserializers import DeserializeableShape -from smithy_core.serializers import SerializeableShape +from ..deserializers import DeserializeableShape +from ..serializers import SerializeableShape +from .interfaces.eventstream import EventPublisher, EventReceiver -class AsyncEventPublisher[E: SerializeableShape](Protocol): - """Asynchronously sends events to a service. - This may be used as a context manager to ensure the stream is closed before exiting. - """ - - async def send(self, event: E) -> None: - """Sends an event to the service. - - :param event: The event to send. - """ - ... - - async def close(self) -> None: - """Closes the event stream.""" - ... - - async def __aenter__(self) -> Self: - return self - - async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any): - await self.close() - - -class AsyncEventReceiver[E: DeserializeableShape](Protocol): - """Asynchronously receives events from a service. - - Events may be received via the ``receive`` method or by using this class as - an async iterable. - - This may also be used as a context manager to ensure the stream is closed before - exiting. - """ - - async def receive(self) -> E | None: - """Receive a single event from the service. - - :returns: An event or None. None indicates that no more events will be sent by - the service. - """ - ... - - async def close(self) -> None: - """Closes the event stream.""" - ... - - async def __anext__(self) -> E: - result = await self.receive() - if result is None: - await self.close() - raise StopAsyncIteration - return result - - def __aiter__(self) -> Self: - return self - - async def __enter__(self) -> Self: - return self - - async def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any): - await self.close() - - -class DuplexEventStream[I: SerializeableShape, O: DeserializeableShape, R](Protocol): +class DuplexEventStream[ + IE: SerializeableShape, + OE: DeserializeableShape, + O: DeserializeableShape, +]: """An event stream that both sends and receives messages. To ensure that streams are closed upon exiting, this class may be used as an async @@ -106,46 +50,36 @@ async def handle_output(stream: EventStream) -> None: return """ - input_stream: AsyncEventPublisher[I] + input_stream: EventPublisher[IE] """An event stream that sends events to the service.""" - # Exposing response and output_stream via @property allows implementations that - # don't have it immediately available to do things like put a future in - # await_output or otherwise reasonably implement that method while still allowing - # them to inherit directly from the protocol class. - _output_stream: AsyncEventReceiver[O] | None = None - _response: R | None = None - - @property - def output_stream(self) -> AsyncEventReceiver[O] | None: - """An event stream that receives events from the service. - - This value may be None until ``await_output`` has been called. + output_stream: EventReceiver[OE] | None = None + """An event stream that receives events from the service. - This value will also be None if the operation has no output stream. - """ - return self._output_stream + This value may be None until ``await_output`` has been called. - @output_stream.setter - def output_stream(self, value: AsyncEventReceiver[O]) -> None: - self._output_stream = value + This value will also be None if the operation has no output stream. + """ - @property - def response(self) -> R | None: - """The initial response from the service. + response: O | None = None + """The initial response from the service. - This value may be None until ``await_output`` has been called. + This value may be None until ``await_output`` has been called. - This may include context necessary to interpret output events or prepare - input events. It will always be available before any events. - """ - return self._response + This may include context necessary to interpret output events or prepare + input events. It will always be available before any events. + """ - @response.setter - def response(self, value: R) -> None: - self._response = value + def __init__( + self, + *, + input_stream: EventPublisher[IE], + output_future: Future[tuple[O, EventReceiver[OE]]], + ) -> None: + self.input_stream = input_stream + self._output_future = output_future - async def await_output(self) -> tuple[R, AsyncEventReceiver[O]]: + async def await_output(self) -> tuple[O, EventReceiver[OE]]: """Await the operation's output. The EventStream will be returned as soon as the input stream is ready to @@ -164,7 +98,8 @@ async def await_output(self) -> tuple[R, AsyncEventReceiver[O]]: :returns: A tuple containing the initial response and output stream. If the operation has no output stream, the second value will be None. """ - ... + self.response, self.output_stream = await self._output_future + return self.response, self.output_stream async def close(self) -> None: """Closes the event stream. @@ -184,7 +119,7 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any): await self.close() -class InputEventStream[I: SerializeableShape, R](Protocol): +class InputEventStream[IE: SerializeableShape, O]: """An event stream that streams messages to the service. To ensure that streams are closed upon exiting, this class may be used as an async @@ -203,47 +138,41 @@ async def main(): await stream.await_output() """ - input_stream: AsyncEventPublisher[I] + input_stream: EventPublisher[IE] """An event stream that sends events to the service.""" - # Exposing response via @property allows implementations that don't have it - # immediately available to do things like put a future in await_output or - # otherwise reasonably implement that method while still allowing them to - # inherit directly from the protocol class. - _response: R | None = None - - @property - def response(self) -> R | None: - """The initial response from the service. - - This value may be None until ``await_output`` has been called. + response: O | None = None + """The initial response from the service. - This may include context necessary to interpret output events or prepare - input events. It will always be available before any events. - """ - return self._response + This value may be None until ``await_output`` has been called. - @response.setter - def response(self, value: R) -> None: - self._response = value + This may include context necessary to interpret output events or prepare + input events. It will always be available before any events. + """ - async def await_output(self) -> R: - """Await the operation's output. + def __init__( + self, + *, + input_stream: EventPublisher[IE], + output_future: Future[O], + ) -> None: + self.input_stream = input_stream + self._output_future = output_future - The InputEventStream will be returned as soon as the input stream is ready to - receive events, which may be before the initial response has been received. + async def await_output(self) -> O: + """Await the operation's initial response. - Awaiting this method will wait until the initial response was received. The - operation response will be returned by this operation and also cached in - ``response``. + The EventStream will be returned as soon as the input stream is ready to receive + events, which may be before the initial response has been received and the + service is ready to send events. - The default implementation of this method performs the caching behavior, - delegating to the abstract ``_await_output`` method to actually retrieve the - operation response. + Awaiting this method will wait until the initial response was received. - :returns: The operation's response. + :returns: The service's initial response. """ - ... + if self.response is None: + self.response = await self._output_future + return self.response async def close(self) -> None: """Closes the event stream.""" @@ -256,7 +185,7 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any): await self.close() -class OutputEventStream[O: DeserializeableShape, R](Protocol): +class OutputEventStream[OE: DeserializeableShape, O: DeserializeableShape]: """An event stream that streams messages from the service. To ensure that streams are closed upon exiting, this class may be used as an async @@ -277,19 +206,23 @@ async def main(): return """ - output_stream: AsyncEventReceiver[O] + output_stream: EventReceiver[OE] """An event stream that receives events from the service. This value will also be None if the operation has no output stream. """ - response: R + response: O """The initial response from the service. This may include context necessary to interpret output events or prepare input events. It will always be available before any events. """ + def __init__(self, output_stream: EventReceiver[OE], output: O) -> None: + self.output_stream = output_stream + self.response = output + async def close(self) -> None: """Closes the event stream.""" await self.output_stream.close() diff --git a/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py b/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py index 359e39329..bd808e42a 100644 --- a/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py +++ b/packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py @@ -1,17 +1,21 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 from collections.abc import AsyncIterable -from typing import Protocol, runtime_checkable, TYPE_CHECKING +from typing import Protocol, runtime_checkable, TYPE_CHECKING, Callable +from ...exceptions import UnsupportedStreamException from ...interfaces import URI, Endpoint, TypedProperties from ...interfaces import StreamingBlob as SyncStreamingBlob from ...documents import TypeRegistry +from .eventstream import EventPublisher, EventReceiver + + if TYPE_CHECKING: from ...schemas import APIOperation from ...shapes import ShapeID from ...serializers import SerializeableShape - from ...deserializers import DeserializeableShape + from ...deserializers import DeserializeableShape, ShapeDeserializer @runtime_checkable @@ -138,3 +142,49 @@ async def deserialize_response[ :param context: A context bag for the request. """ ... + + def create_event_publisher[ + OperationInput: "SerializeableShape", + OperationOutput: "DeserializeableShape", + Event: "SerializeableShape", + ]( + self, + *, + operation: "APIOperation[OperationInput, OperationOutput]", + request: I, + event_type: type[Event], + context: TypedProperties, + ) -> EventPublisher[Event]: + """Creates an event publisher for a protocol event stream. + + :param operation: The event stream operation. + :param request: The transport request that was sent for this stream. + :param event_type: The type of event to publish. + :param context: A context bag for the request. + """ + raise UnsupportedStreamException() + + def create_event_receiver[ + OperationInput: "SerializeableShape", + OperationOutput: "DeserializeableShape", + Event: "DeserializeableShape", + ]( + self, + *, + operation: "APIOperation[OperationInput, OperationOutput]", + request: I, + response: O, + event_type: type[Event], + event_deserializer: Callable[["ShapeDeserializer"], Event], + context: TypedProperties, + ) -> EventReceiver[Event]: + """Creates an event receiver for a protocol event stream. + + :param operation: The event stream operation. + :param request: The transport request that was sent for this stream. + :param response: The transport response that was received for this stream. + :param event_type: The type of event to publish. + :param event_deserializer: The deserializer to be used to deserialize events. + :param context: A context bag for the request. + """ + raise UnsupportedStreamException() diff --git a/packages/smithy-core/src/smithy_core/aio/interfaces/eventstream.py b/packages/smithy-core/src/smithy_core/aio/interfaces/eventstream.py new file mode 100644 index 000000000..d0aaf8099 --- /dev/null +++ b/packages/smithy-core/src/smithy_core/aio/interfaces/eventstream.py @@ -0,0 +1,69 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from typing import Any, Protocol, Self + +from ...deserializers import DeserializeableShape +from ...serializers import SerializeableShape + + +class EventPublisher[E: SerializeableShape](Protocol): + """Asynchronously sends events to a service. + + This may be used as a context manager to ensure the stream is closed before exiting. + """ + + async def send(self, event: E) -> None: + """Sends an event to the service. + + :param event: The event to send. + """ + ... + + async def close(self) -> None: + """Closes the event stream.""" + ... + + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any): + await self.close() + + +class EventReceiver[E: DeserializeableShape](Protocol): + """Asynchronously receives events from a service. + + Events may be received via the ``receive`` method or by using this class as + an async iterable. + + This may also be used as a context manager to ensure the stream is closed before + exiting. + """ + + async def receive(self) -> E | None: + """Receive a single event from the service. + + :returns: An event or None. None indicates that no more events will be sent by + the service. + """ + ... + + async def close(self) -> None: + """Closes the event stream.""" + ... + + async def __anext__(self) -> E: + result = await self.receive() + if result is None: + await self.close() + raise StopAsyncIteration + return result + + def __aiter__(self) -> Self: + return self + + async def __enter__(self) -> Self: + return self + + async def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any): + await self.close() diff --git a/packages/smithy-event-stream/NOTICE b/packages/smithy-event-stream/NOTICE deleted file mode 100644 index 616fc5889..000000000 --- a/packages/smithy-event-stream/NOTICE +++ /dev/null @@ -1 +0,0 @@ -Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. diff --git a/packages/smithy-event-stream/README.md b/packages/smithy-event-stream/README.md deleted file mode 100644 index e69de29bb..000000000 diff --git a/packages/smithy-event-stream/pyproject.toml b/packages/smithy-event-stream/pyproject.toml deleted file mode 100644 index 8e02c864f..000000000 --- a/packages/smithy-event-stream/pyproject.toml +++ /dev/null @@ -1,21 +0,0 @@ -[project] -name = "smithy-event-stream" -version = "0.0.1" -description = "Smithy event stream interfaces and core components." -readme = "README.md" -requires-python = ">=3.12" -dependencies = [ - "smithy-core", -] - -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[tool.hatch.build] -exclude = [ - "tests", -] - -[tool.ruff] -src = ["src"] diff --git a/packages/smithy-event-stream/src/smithy_event_stream/__init__.py b/packages/smithy-event-stream/src/smithy_event_stream/__init__.py deleted file mode 100644 index a0e0eb6c0..000000000 --- a/packages/smithy-event-stream/src/smithy_event_stream/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -# SPDX-License-Identifier: Apache-2.0 - -import importlib.metadata - -__version__: str = importlib.metadata.version("smithy-event-stream") diff --git a/packages/smithy-event-stream/src/smithy_event_stream/aio/__init__.py b/packages/smithy-event-stream/src/smithy_event_stream/aio/__init__.py deleted file mode 100644 index 04f8b7b76..000000000 --- a/packages/smithy-event-stream/src/smithy_event_stream/aio/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -# SPDX-License-Identifier: Apache-2.0 diff --git a/packages/smithy-event-stream/src/smithy_event_stream/py.typed b/packages/smithy-event-stream/src/smithy_event_stream/py.typed deleted file mode 100644 index e69de29bb..000000000 diff --git a/packages/smithy-event-stream/tests/py.typed b/packages/smithy-event-stream/tests/py.typed deleted file mode 100644 index e69de29bb..000000000 diff --git a/uv.lock b/uv.lock index 788a60bc5..d716ebc20 100644 --- a/uv.lock +++ b/uv.lock @@ -8,7 +8,6 @@ members = [ "aws-sdk-signers", "smithy-aws-core", "smithy-core", - "smithy-event-stream", "smithy-http", "smithy-json", "smithy-python", @@ -99,14 +98,10 @@ version = "0.0.1" source = { editable = "packages/aws-event-stream" } dependencies = [ { name = "smithy-core" }, - { name = "smithy-event-stream" }, ] [package.metadata] -requires-dist = [ - { name = "smithy-core", editable = "packages/smithy-core" }, - { name = "smithy-event-stream", editable = "packages/smithy-event-stream" }, -] +requires-dist = [{ name = "smithy-core", editable = "packages/smithy-core" }] [[package]] name = "aws-sdk-signers" @@ -680,17 +675,6 @@ dev = [ [package.metadata.requires-dev] dev = [{ name = "freezegun", specifier = ">=1.5.1" }] -[[package]] -name = "smithy-event-stream" -version = "0.0.1" -source = { editable = "packages/smithy-event-stream" } -dependencies = [ - { name = "smithy-core" }, -] - -[package.metadata] -requires-dist = [{ name = "smithy-core", editable = "packages/smithy-core" }] - [[package]] name = "smithy-http" version = "0.0.1" From 1349b0bd960816532b0b3715d2641cb75fbb9894 Mon Sep 17 00:00:00 2001 From: JordonPhillips Date: Tue, 18 Mar 2025 17:27:09 +0100 Subject: [PATCH 2/3] Update generated stream wrappers --- .../python/codegen/ClientGenerator.java | 42 ++++++++++++++++--- .../codegen/generators/ProtocolGenerator.java | 2 - .../RestJsonProtocolGenerator.java | 33 +++------------ 3 files changed, 42 insertions(+), 35 deletions(-) diff --git a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/ClientGenerator.java b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/ClientGenerator.java index 9f35ea781..0c82a2209 100644 --- a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/ClientGenerator.java +++ b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/ClientGenerator.java @@ -200,6 +200,13 @@ def _classify_error( if (hasStreaming) { writer.addStdlibImports("typing", Set.of("Any", "Awaitable")); writer.addStdlibImport("asyncio"); + + writer.addImports("smithy_core.aio.eventstream", + Set.of( + "InputEventStream", + "OutputEventStream", + "DuplexEventStream")); + writer.addImport("smithy_core.aio.interfaces.eventstream", "EventReceiver"); writer.write( """ async def _input_stream[Input: SerializeableShape, Output: DeserializeableShape]( @@ -218,6 +225,10 @@ def _classify_error( )) request_context = await request_future ${5C|} + return InputEventStream[Any, Any]( + input_stream=publisher, + output_future=awaitable_output, + ) async def _output_stream[Input: SerializeableShape, Output: DeserializeableShape]( self, @@ -236,6 +247,10 @@ def _classify_error( ) transport_response = await response_future ${6C|} + return OutputEventStream[Any, Any]( + output_stream=receiver, + output=output + ) async def _duplex_stream[Input: SerializeableShape, Output: DeserializeableShape]( self, @@ -255,15 +270,34 @@ def _classify_error( response_future=response_future )) request_context = await request_future - ${7C|} + ${5C|} + output_future = asyncio.create_task(self._wrap_duplex_output( + response_future, awaitable_output, config, operation_name, + event_deserializer + )) + return DuplexEventStream[Any, Any, Any]( + input_stream=publisher, + output_future=output_future, + ) + + async def _wrap_duplex_output( + self, + response_future: Future[$3T], + awaitable_output: Future[Any], + config: $4T, + operation_name: str, + event_deserializer: Callable[[ShapeDeserializer], Any], + ) -> tuple[Any, EventReceiver[Any]]: + transport_response = await response_future + ${6C|} + return await awaitable_output, receiver """, pluginSymbol, transportRequest, transportResponse, configSymbol, writer.consumer(w -> context.protocolGenerator().wrapInputStream(context, w)), - writer.consumer(w -> context.protocolGenerator().wrapOutputStream(context, w)), - writer.consumer(w -> context.protocolGenerator().wrapDuplexStream(context, w))); + writer.consumer(w -> context.protocolGenerator().wrapOutputStream(context, w))); } writer.addStdlibImport("typing", "Any"); writer.addStdlibImport("asyncio", "iscoroutine"); @@ -872,7 +906,6 @@ private void generateEventStreamOperation(PythonWriter writer, OperationShape op if (inputStreamSymbol != null) { if (outputStreamSymbol != null) { - writer.addImport("smithy_event_stream.aio.interfaces", "DuplexEventStream"); writer.write(""" async def ${operationName:L}( self, @@ -922,7 +955,6 @@ raise NotImplementedError() """, writer.consumer(w -> writeSharedOperationInit(w, operation, input))); } } else { - writer.addImport("smithy_event_stream.aio.interfaces", "OutputEventStream"); writer.write(""" async def ${operationName:L}( self, diff --git a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ProtocolGenerator.java b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ProtocolGenerator.java index 0006e498f..48fe8b663 100644 --- a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ProtocolGenerator.java +++ b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ProtocolGenerator.java @@ -157,6 +157,4 @@ default void generateProtocolTests(GenerationContext context) {} default void wrapInputStream(GenerationContext context, PythonWriter writer) {} default void wrapOutputStream(GenerationContext context, PythonWriter writer) {} - - default void wrapDuplexStream(GenerationContext context, PythonWriter writer) {} } diff --git a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/integrations/RestJsonProtocolGenerator.java b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/integrations/RestJsonProtocolGenerator.java index ef55e15d7..0fa2e312a 100644 --- a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/integrations/RestJsonProtocolGenerator.java +++ b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/integrations/RestJsonProtocolGenerator.java @@ -396,13 +396,12 @@ public void wrapInputStream(GenerationContext context, PythonWriter writer) { writer.addImport("smithy_json", "JSONCodec"); writer.addImport("smithy_core.aio.types", "AsyncBytesReader"); writer.addImport("smithy_core.types", "TimestampFormat"); - writer.addImport("aws_event_stream.aio", "AWSInputEventStream"); + writer.addImport("aws_event_stream.aio", "AWSEventPublisher"); writer.write( """ codec = JSONCodec(default_timestamp_format=TimestampFormat.EPOCH_SECONDS) - return AWSInputEventStream[Any, Any]( + publisher = AWSEventPublisher[Any]( payload_codec=codec, - awaitable_output=awaitable_output, async_writer=request_context.transport_request.body, # type: ignore ) """); @@ -415,39 +414,17 @@ public void wrapOutputStream(GenerationContext context, PythonWriter writer) { writer.addImport("smithy_json", "JSONCodec"); writer.addImport("smithy_core.aio.types", "AsyncBytesReader"); writer.addImport("smithy_core.types", "TimestampFormat"); - writer.addImport("aws_event_stream.aio", "AWSOutputEventStream"); + writer.addImport("aws_event_stream.aio", "AWSEventReceiver"); writer.write( """ codec = JSONCodec(default_timestamp_format=TimestampFormat.EPOCH_SECONDS) - return AWSOutputEventStream[Any, Any]( + receiver = AWSEventReceiver( payload_codec=codec, - initial_response=output, - async_reader=AsyncBytesReader( + source=AsyncBytesReader( transport_response.body # type: ignore ), deserializer=event_deserializer, # type: ignore ) """); } - - @Override - public void wrapDuplexStream(GenerationContext context, PythonWriter writer) { - writer.addDependency(SmithyPythonDependency.SMITHY_JSON); - writer.addDependency(SmithyPythonDependency.AWS_EVENT_STREAM); - writer.addImport("smithy_json", "JSONCodec"); - writer.addImport("smithy_core.aio.types", "AsyncBytesReader"); - writer.addImport("smithy_core.types", "TimestampFormat"); - writer.addImport("aws_event_stream.aio", "AWSDuplexEventStream"); - writer.write( - """ - codec = JSONCodec(default_timestamp_format=TimestampFormat.EPOCH_SECONDS) - return AWSDuplexEventStream[Any, Any, Any]( - payload_codec=codec, - async_writer=request_context.transport_request.body, # type: ignore - awaitable_output=awaitable_output, - awaitable_response=response_future, - deserializer=event_deserializer, # type: ignore - ) - """); - } } From e6cf8508747d9ab163eb1a53cc384b5fdddebc54 Mon Sep 17 00:00:00 2001 From: JordonPhillips Date: Wed, 19 Mar 2025 19:08:43 +0100 Subject: [PATCH 3/3] Rename resposne to output in event stream wrappers --- designs/event-streams.md | 14 +++++++------- .../src/smithy_core/aio/eventstream.py | 18 +++++++++--------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/designs/event-streams.md b/designs/event-streams.md index 89eb8025d..d70465f8d 100644 --- a/designs/event-streams.md +++ b/designs/event-streams.md @@ -181,7 +181,7 @@ class DuplexEventStream[ input_stream: EventPublisher[IE] output_stream: EventReceiver[OE] | None = None - response: O | None = None + output: O | None = None def __init__( self, @@ -208,7 +208,7 @@ class DuplexEventStream[ class InputEventStream[IE: SerializeableShape, O]: input_stream: EventPublisher[IE] - response: O | None = None + output: O | None = None def __init__( self, @@ -235,11 +235,11 @@ class InputEventStream[IE: SerializeableShape, O]: class OutputEventStream[OE: DeserializeableShape, O: DeserializeableShape]: output_stream: EventReceiver[OE] - response: O + output: O def __init__(self, output_stream: EventReceiver[OE], output: O) -> None: self.output_stream = output_stream - self.response = output + self.output = output async def close(self) -> None: await self.output_stream.close() @@ -257,7 +257,7 @@ the underlying publisher and/or receiver. Both `InputEventStream` and `DuplexEventStream` have an `await_output` method that waits for the initial request to be received, returning that and the output -stream. Their `response` and `output_stream` properties will not be set until +stream. Their `output` and `output_stream` properties will not be set until then. This is important because clients MUST be able to start sending events to the service immediately, without waiting for the initial response. This is critical because there are existing services that require one or more events to @@ -277,8 +277,8 @@ with await client.input_operation() as stream: stream.input_stream.send(FooEvent(foo="bar")) ``` -The `OutputEventStream`'s initial `response` and `output_stream` will never be -`None`, however. Instead, the `ClientProtocol` MUST set values for these when +The `OutputEventStream`'s `output` and `output_stream` will never be `None`, +however. Instead, the `ClientProtocol` MUST set values for these when constructing the object. This differs from the other stream types because the lack of an input stream means that the service has nothing to wait on from the client before sending responses. diff --git a/packages/smithy-core/src/smithy_core/aio/eventstream.py b/packages/smithy-core/src/smithy_core/aio/eventstream.py index 704fdfa35..1f1057134 100644 --- a/packages/smithy-core/src/smithy_core/aio/eventstream.py +++ b/packages/smithy-core/src/smithy_core/aio/eventstream.py @@ -61,7 +61,7 @@ async def handle_output(stream: EventStream) -> None: This value will also be None if the operation has no output stream. """ - response: O | None = None + output: O | None = None """The initial response from the service. This value may be None until ``await_output`` has been called. @@ -98,8 +98,8 @@ async def await_output(self) -> tuple[O, EventReceiver[OE]]: :returns: A tuple containing the initial response and output stream. If the operation has no output stream, the second value will be None. """ - self.response, self.output_stream = await self._output_future - return self.response, self.output_stream + self.output, self.output_stream = await self._output_future + return self.output, self.output_stream async def close(self) -> None: """Closes the event stream. @@ -141,7 +141,7 @@ async def main(): input_stream: EventPublisher[IE] """An event stream that sends events to the service.""" - response: O | None = None + output: O | None = None """The initial response from the service. This value may be None until ``await_output`` has been called. @@ -170,9 +170,9 @@ async def await_output(self) -> O: :returns: The service's initial response. """ - if self.response is None: - self.response = await self._output_future - return self.response + if self.output is None: + self.output = await self._output_future + return self.output async def close(self) -> None: """Closes the event stream.""" @@ -212,7 +212,7 @@ async def main(): This value will also be None if the operation has no output stream. """ - response: O + output: O """The initial response from the service. This may include context necessary to interpret output events or prepare input @@ -221,7 +221,7 @@ async def main(): def __init__(self, output_stream: EventReceiver[OE], output: O) -> None: self.output_stream = output_stream - self.response = output + self.output = output async def close(self) -> None: """Closes the event stream."""