diff --git a/rsocket/transports/aioquic_transport.py b/rsocket/transports/aioquic_transport.py index 55534f65..53230772 100644 --- a/rsocket/transports/aioquic_transport.py +++ b/rsocket/transports/aioquic_transport.py @@ -6,7 +6,7 @@ from aioquic.quic.events import QuicEvent, StreamDataReceived, ConnectionTerminated from rsocket.exceptions import RSocketTransportError -from rsocket.frame import Frame +from rsocket.frame import Frame, serialize_with_frame_size_header from rsocket.helpers import wrap_transport_exception, cancel_if_task_exists from rsocket.logger import logger from rsocket.rsocket_server import RSocketServer @@ -63,7 +63,7 @@ def __init__(self, *args, **kwargs): self._stream_id = self._quic.get_next_available_stream_id() async def query(self, frame: Frame) -> None: - data = frame.serialize() + data = serialize_with_frame_size_header(frame) self._quic.send_stream_data(self._stream_id, data, end_stream=False) self.transmit() @@ -103,7 +103,7 @@ async def incoming_data_listener(self): self._incoming_frame_queue.put_nowait(data) return else: - async for frame in self._frame_parser.receive_data(data, 0): + async for frame in self._frame_parser.receive_data(data): self._incoming_frame_queue.put_nowait(frame) except asyncio.CancelledError: @@ -116,3 +116,6 @@ async def close(self): self._quic_protocol.close() await self._quic_protocol.wait_closed() + + def requires_length_header(self) -> bool: + return True diff --git a/rsocket/transports/http3_transport.py b/rsocket/transports/http3_transport.py index 79e24d27..3a179f3f 100644 --- a/rsocket/transports/http3_transport.py +++ b/rsocket/transports/http3_transport.py @@ -17,7 +17,7 @@ from starlette.websockets import WebSocket, WebSocketDisconnect from rsocket.exceptions import RSocketTransportError -from rsocket.frame import Frame +from rsocket.frame import Frame, serialize_with_frame_size_header from rsocket.helpers import wrap_transport_exception, cancel_if_task_exists from rsocket.logger import logger from rsocket.transports.abstract_messaging import AbstractMessagingTransport @@ -159,7 +159,8 @@ def __init__(self, websocket: Union[WebSocket, ClientWebSocket]): async def send_frame(self, frame: Frame): with wrap_transport_exception(): try: - await self._websocket.send_bytes(frame.serialize()) + data = serialize_with_frame_size_header(frame) + await self._websocket.send_bytes(data) except WebSocketDisconnect: self._disconnect_event.set() @@ -177,7 +178,7 @@ async def incoming_data_listener(self): self._disconnect_event.set() break - async for frame in self._frame_parser.receive_data(data, 0): + async for frame in self._frame_parser.receive_data(data): self._incoming_frame_queue.put_nowait(frame) except asyncio.CancelledError: @@ -189,3 +190,6 @@ async def incoming_data_listener(self): async def wait_for_disconnect(self): await self._disconnect_event.wait() + + def requires_length_header(self) -> bool: + return True