Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions rsocket/transports/aioquic_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from aioquic.quic.events import QuicEvent, StreamDataReceived, ConnectionTerminated

from rsocket.exceptions import RSocketTransportError
from rsocket.frame import Frame
from rsocket.frame import Frame, serialize_with_frame_size_header
from rsocket.helpers import wrap_transport_exception, cancel_if_task_exists
from rsocket.logger import logger
from rsocket.rsocket_server import RSocketServer
Expand Down Expand Up @@ -63,7 +63,7 @@ def __init__(self, *args, **kwargs):
self._stream_id = self._quic.get_next_available_stream_id()

async def query(self, frame: Frame) -> None:
data = frame.serialize()
data = serialize_with_frame_size_header(frame)
self._quic.send_stream_data(self._stream_id, data, end_stream=False)
self.transmit()

Expand Down Expand Up @@ -103,7 +103,7 @@ async def incoming_data_listener(self):
self._incoming_frame_queue.put_nowait(data)
return
else:
async for frame in self._frame_parser.receive_data(data, 0):
async for frame in self._frame_parser.receive_data(data):
self._incoming_frame_queue.put_nowait(frame)

except asyncio.CancelledError:
Expand All @@ -116,3 +116,6 @@ async def close(self):
self._quic_protocol.close()

await self._quic_protocol.wait_closed()

def requires_length_header(self) -> bool:
return True
10 changes: 7 additions & 3 deletions rsocket/transports/http3_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from starlette.websockets import WebSocket, WebSocketDisconnect

from rsocket.exceptions import RSocketTransportError
from rsocket.frame import Frame
from rsocket.frame import Frame, serialize_with_frame_size_header
from rsocket.helpers import wrap_transport_exception, cancel_if_task_exists
from rsocket.logger import logger
from rsocket.transports.abstract_messaging import AbstractMessagingTransport
Expand Down Expand Up @@ -159,7 +159,8 @@ def __init__(self, websocket: Union[WebSocket, ClientWebSocket]):
async def send_frame(self, frame: Frame):
with wrap_transport_exception():
try:
await self._websocket.send_bytes(frame.serialize())
data = serialize_with_frame_size_header(frame)
await self._websocket.send_bytes(data)
except WebSocketDisconnect:
self._disconnect_event.set()

Expand All @@ -177,7 +178,7 @@ async def incoming_data_listener(self):
self._disconnect_event.set()
break

async for frame in self._frame_parser.receive_data(data, 0):
async for frame in self._frame_parser.receive_data(data):
self._incoming_frame_queue.put_nowait(frame)

except asyncio.CancelledError:
Expand All @@ -189,3 +190,6 @@ async def incoming_data_listener(self):

async def wait_for_disconnect(self):
await self._disconnect_event.wait()

def requires_length_header(self) -> bool:
return True