From 64c526dce24d29e9b7c31965653683c6eada9eb3 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Sun, 24 Apr 2022 22:04:52 +0300 Subject: [PATCH 1/5] removed positional arguments option from transport factories (aiohttp, quart) --- rsocket/frame.py | 7 +++++-- rsocket/frame_builders.py | 9 +++++++-- rsocket/load_balancer/load_balancer_rsocket.py | 8 ++++---- rsocket/rsocket.py | 4 ++-- rsocket/rsocket_base.py | 15 +++++++++++---- rsocket/rx_support/rx_rsocket.py | 8 ++++---- tests/rsocket/test_fire_and_forget.py | 13 +++++++++++++ tests/rsocket/test_metadata_push.py | 16 ++++++++++++++++ tests/rx_support/test_rx_support.py | 4 ++-- 9 files changed, 64 insertions(+), 20 deletions(-) diff --git a/rsocket/frame.py b/rsocket/frame.py index 3b4f5a74..32ecd8b2 100644 --- a/rsocket/frame.py +++ b/rsocket/frame.py @@ -1,6 +1,7 @@ import abc import struct from abc import ABCMeta +from asyncio import Future from enum import IntEnum, unique from typing import Tuple, Optional @@ -86,7 +87,8 @@ class Frame(Header, metaclass=ABCMeta): 'data', 'flags_follows', 'flags_complete', - 'metadata_only' + 'metadata_only', + 'sent_future' ) def __init__(self, frame_type: FrameType): @@ -100,9 +102,10 @@ def __init__(self, frame_type: FrameType): self.flags_metadata = False self.flags_follows = False self.flags_complete = False - self.metadata_only = False + self.sent_future: Optional[Future] = None + def parse_metadata(self, buffer: bytes, offset: int) -> int: if not self.flags_metadata: return 0 diff --git a/rsocket/frame_builders.py b/rsocket/frame_builders.py index 0df4fe35..a6c2b2fe 100644 --- a/rsocket/frame_builders.py +++ b/rsocket/frame_builders.py @@ -6,6 +6,7 @@ RequestFireAndForgetFrame, SetupFrame, MetadataPushFrame, KeepAliveFrame, MAX_REQUEST_N) +from rsocket.helpers import create_future from rsocket.payload import Payload @@ -69,11 +70,13 @@ def to_request_response_frame(stream_id: int, payload: Payload): return request -def to_fire_and_forget_frame(stream_id: int, payload: Payload): +def to_fire_and_forget_frame(stream_id: int, payload: Payload) -> RequestFireAndForgetFrame: frame = RequestFireAndForgetFrame() frame.stream_id = stream_id frame.data = payload.data frame.metadata = payload.metadata + frame.sent_future = create_future() + return frame @@ -95,9 +98,11 @@ def to_setup_frame(payload, return setup -def to_metadata_push_frame(metadata: bytes): +def to_metadata_push_frame(metadata: bytes) -> MetadataPushFrame: frame = MetadataPushFrame() frame.metadata = metadata + frame.sent_future = create_future() + return frame diff --git a/rsocket/load_balancer/load_balancer_rsocket.py b/rsocket/load_balancer/load_balancer_rsocket.py index c4e38caa..d9b98d7b 100644 --- a/rsocket/load_balancer/load_balancer_rsocket.py +++ b/rsocket/load_balancer/load_balancer_rsocket.py @@ -21,14 +21,14 @@ def request_channel(self, payload: Payload, local_publisher: Optional[Publisher] def request_response(self, payload: Payload) -> Future: return self._select_client().request_response(payload) - def fire_and_forget(self, payload: Payload): - self._select_client().fire_and_forget(payload) + def fire_and_forget(self, payload: Payload) -> Future: + return self._select_client().fire_and_forget(payload) def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]: return self._select_client().request_stream(payload) - def metadata_push(self, metadata: bytes): - self._select_client().metadata_push(metadata) + def metadata_push(self, metadata: bytes) -> Future: + return self._select_client().metadata_push(metadata) async def connect(self): await self._strategy.connect() diff --git a/rsocket/rsocket.py b/rsocket/rsocket.py index b3a80b6b..daea1530 100644 --- a/rsocket/rsocket.py +++ b/rsocket/rsocket.py @@ -21,7 +21,7 @@ def request_response(self, payload: Payload) -> Future: ... @abc.abstractmethod - def fire_and_forget(self, payload: Payload): + def fire_and_forget(self, payload: Payload) -> Future: ... @abc.abstractmethod @@ -29,7 +29,7 @@ def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]: ... @abc.abstractmethod - def metadata_push(self, metadata: bytes): + def metadata_push(self, metadata: bytes) -> Future: ... @abc.abstractmethod diff --git a/rsocket/rsocket_base.py b/rsocket/rsocket_base.py index e0c417e1..3fcc1b5e 100644 --- a/rsocket/rsocket_base.py +++ b/rsocket/rsocket_base.py @@ -390,6 +390,9 @@ async def _sender(self): log_frame(frame, self._log_identifier(), 'Sent') self._send_queue.task_done() + if frame.sent_future is not None: + frame.sent_future.set_result(None) + if self._send_queue.empty(): await transport.on_send_queue_empty() except RSocketTransportError as exception: @@ -441,12 +444,14 @@ def request_response(self, payload: Payload) -> Future: self.register_new_stream(requester).setup() return requester.run() - def fire_and_forget(self, payload: Payload): + def fire_and_forget(self, payload: Payload) -> Future: logger().debug('%s: fire-and-forget: %s', self._log_identifier(), payload) stream_id = self._allocate_stream() - self.send_request(to_fire_and_forget_frame(stream_id, payload)) + frame = to_fire_and_forget_frame(stream_id, payload) + self.send_request(frame) self.finish_stream(stream_id) + return frame.sent_future def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]: logger().debug('%s: request-stream: %s', self._log_identifier(), payload) @@ -463,10 +468,12 @@ def request_channel( requester = RequestChannelRequester(self, payload, local_publisher) return self.register_new_stream(requester) - def metadata_push(self, metadata: bytes): + def metadata_push(self, metadata: bytes) -> Future: logger().debug('%s: metadata-push: %s', self._log_identifier(), metadata) - self.send_frame(to_metadata_push_frame(metadata)) + frame = to_metadata_push_frame(metadata) + self.send_frame(frame) + return frame.sent_future def _is_frame_allowed_to_send(self, frame: Frame) -> bool: if isinstance(frame, initiate_request_frame_types): diff --git a/rsocket/rx_support/rx_rsocket.py b/rsocket/rx_support/rx_rsocket.py index c3073ac8..69a4d8cd 100644 --- a/rsocket/rx_support/rx_rsocket.py +++ b/rsocket/rx_support/rx_rsocket.py @@ -35,11 +35,11 @@ def request_channel(self, ).initial_request_n(request_limit) return from_rsocket_publisher(response_publisher, request_limit) - def fire_and_forget(self, request: Payload): - self._rsocket.fire_and_forget(request) + def fire_and_forget(self, request: Payload) -> rx.Observable: + return rx.from_future(self._rsocket.fire_and_forget(request)) - def metadata_push(self, metadata: bytes): - self._rsocket.metadata_push(metadata) + def metadata_push(self, metadata: bytes) -> rx.Observable: + return rx.from_future(self._rsocket.metadata_push(metadata)) async def connect(self): return await self._rsocket.connect() diff --git a/tests/rsocket/test_fire_and_forget.py b/tests/rsocket/test_fire_and_forget.py index d8bc0c2a..a07746f0 100644 --- a/tests/rsocket/test_fire_and_forget.py +++ b/tests/rsocket/test_fire_and_forget.py @@ -35,6 +35,19 @@ def handler_factory(socket): assert handler.received_payload.metadata == b'cat' +async def test_request_fire_and_forget_wait(lazy_pipe): + handler: Optional[FireAndForgetHandler] = None + + def handler_factory(socket): + nonlocal handler + handler = FireAndForgetHandler(socket) + return handler + + async with lazy_pipe( + server_arguments={'handler_factory': handler_factory}) as (server, client): + await client.fire_and_forget(Payload(b'dog', b'cat')) + + async def test_request_fire_and_forget_awaitable_client(lazy_pipe): handler: Optional[FireAndForgetHandler] = None diff --git a/tests/rsocket/test_metadata_push.py b/tests/rsocket/test_metadata_push.py index aa046de0..40f5b550 100644 --- a/tests/rsocket/test_metadata_push.py +++ b/tests/rsocket/test_metadata_push.py @@ -31,6 +31,7 @@ def handler_factory(socket): client: RSocketClient = pipe[1] server.set_handler_using_factory(handler_factory) + # noinspection PyAsyncCall client.metadata_push(b'cat') await handler.received.wait() @@ -39,6 +40,21 @@ def handler_factory(socket): assert handler.received_payload.metadata == b'cat' +async def test_metadata_push_await(pipe): + handler: Optional[MetadataPushHandler] = None + + def handler_factory(socket): + nonlocal handler + handler = MetadataPushHandler(socket) + return handler + + server: RSocketServer = pipe[0] + client: RSocketClient = pipe[1] + server.set_handler_using_factory(handler_factory) + + await client.metadata_push(b'cat') + + async def test_metadata_push_awaitable_client(pipe): handler: Optional[MetadataPushHandler] = None diff --git a/tests/rx_support/test_rx_support.py b/tests/rx_support/test_rx_support.py index 87376b82..f0afa22d 100644 --- a/tests/rx_support/test_rx_support.py +++ b/tests/rx_support/test_rx_support.py @@ -200,7 +200,7 @@ async def on_metadata_push(self, payload: Payload): server.set_handler_using_factory(Handler) rx_client = RxRSocket(client) - rx_client.metadata_push(b'request text') + rx_client.metadata_push(b'request text').run() await received_item_event.wait() @@ -221,7 +221,7 @@ async def request_fire_and_forget(self, payload: Payload): server.set_handler_using_factory(Handler) rx_client = RxRSocket(client) - rx_client.fire_and_forget(Payload(b'request text')) + rx_client.fire_and_forget(Payload(b'request text')).run() await received_item_event.wait() From 42b7931eaa8ab46e0d7871cb022deef121d1ff7d Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Sun, 24 Apr 2022 22:22:22 +0300 Subject: [PATCH 2/5] added changelog --- CHANGELOG.rst | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 CHANGELOG.rst diff --git a/CHANGELOG.rst b/CHANGELOG.rst new file mode 100644 index 00000000..aae74057 --- /dev/null +++ b/CHANGELOG.rst @@ -0,0 +1,6 @@ +Changelog +--------- + +v0.3.1 + +- Added ability to await fire_and_forget and push_metadata. Waits until the client finishes sending the frame. \ No newline at end of file From 6ba79565d962be3946d684868773cf34fbd6fa76 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Sun, 24 Apr 2022 22:22:33 +0300 Subject: [PATCH 3/5] added changelog --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index aae74057..e537d697 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,5 +2,6 @@ Changelog --------- v0.3.1 +====== - Added ability to await fire_and_forget and push_metadata. Waits until the client finishes sending the frame. \ No newline at end of file From e542440123cb5c54740a9858d8ce74c7ec7cf0c9 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Sun, 24 Apr 2022 22:25:57 +0300 Subject: [PATCH 4/5] remove .run() from rxrsocket tests --- tests/rx_support/test_rx_support.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/rx_support/test_rx_support.py b/tests/rx_support/test_rx_support.py index f0afa22d..87376b82 100644 --- a/tests/rx_support/test_rx_support.py +++ b/tests/rx_support/test_rx_support.py @@ -200,7 +200,7 @@ async def on_metadata_push(self, payload: Payload): server.set_handler_using_factory(Handler) rx_client = RxRSocket(client) - rx_client.metadata_push(b'request text').run() + rx_client.metadata_push(b'request text') await received_item_event.wait() @@ -221,7 +221,7 @@ async def request_fire_and_forget(self, payload: Payload): server.set_handler_using_factory(Handler) rx_client = RxRSocket(client) - rx_client.fire_and_forget(Payload(b'request text')).run() + rx_client.fire_and_forget(Payload(b'request text')) await received_item_event.wait() From aba4a41f67d09d70f2f4f9d815985c860519a68e Mon Sep 17 00:00:00 2001 From: gabi Date: Mon, 25 Apr 2022 12:03:41 +0300 Subject: [PATCH 5/5] missing await in rx rsocket tests --- tests/rx_support/test_rx_support.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/rx_support/test_rx_support.py b/tests/rx_support/test_rx_support.py index 87376b82..38234e94 100644 --- a/tests/rx_support/test_rx_support.py +++ b/tests/rx_support/test_rx_support.py @@ -200,7 +200,7 @@ async def on_metadata_push(self, payload: Payload): server.set_handler_using_factory(Handler) rx_client = RxRSocket(client) - rx_client.metadata_push(b'request text') + await rx_client.metadata_push(b'request text') await received_item_event.wait() @@ -221,7 +221,7 @@ async def request_fire_and_forget(self, payload: Payload): server.set_handler_using_factory(Handler) rx_client = RxRSocket(client) - rx_client.fire_and_forget(Payload(b'request text')) + await rx_client.fire_and_forget(Payload(b'request text')) await received_item_event.wait()