Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Changelog
---------

v0.3.1
======

- Added ability to await fire_and_forget and push_metadata. Waits until the client finishes sending the frame.
7 changes: 5 additions & 2 deletions rsocket/frame.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import abc
import struct
from abc import ABCMeta
from asyncio import Future
from enum import IntEnum, unique
from typing import Tuple, Optional

Expand Down Expand Up @@ -86,7 +87,8 @@ class Frame(Header, metaclass=ABCMeta):
'data',
'flags_follows',
'flags_complete',
'metadata_only'
'metadata_only',
'sent_future'
)

def __init__(self, frame_type: FrameType):
Expand All @@ -100,9 +102,10 @@ def __init__(self, frame_type: FrameType):
self.flags_metadata = False
self.flags_follows = False
self.flags_complete = False

self.metadata_only = False

self.sent_future: Optional[Future] = None

def parse_metadata(self, buffer: bytes, offset: int) -> int:
if not self.flags_metadata:
return 0
Expand Down
9 changes: 7 additions & 2 deletions rsocket/frame_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
RequestFireAndForgetFrame, SetupFrame,
MetadataPushFrame, KeepAliveFrame,
MAX_REQUEST_N)
from rsocket.helpers import create_future
from rsocket.payload import Payload


Expand Down Expand Up @@ -69,11 +70,13 @@ def to_request_response_frame(stream_id: int, payload: Payload):
return request


def to_fire_and_forget_frame(stream_id: int, payload: Payload):
def to_fire_and_forget_frame(stream_id: int, payload: Payload) -> RequestFireAndForgetFrame:
frame = RequestFireAndForgetFrame()
frame.stream_id = stream_id
frame.data = payload.data
frame.metadata = payload.metadata
frame.sent_future = create_future()

return frame


Expand All @@ -95,9 +98,11 @@ def to_setup_frame(payload,
return setup


def to_metadata_push_frame(metadata: bytes):
def to_metadata_push_frame(metadata: bytes) -> MetadataPushFrame:
frame = MetadataPushFrame()
frame.metadata = metadata
frame.sent_future = create_future()

return frame


Expand Down
8 changes: 4 additions & 4 deletions rsocket/load_balancer/load_balancer_rsocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ def request_channel(self, payload: Payload, local_publisher: Optional[Publisher]
def request_response(self, payload: Payload) -> Future:
return self._select_client().request_response(payload)

def fire_and_forget(self, payload: Payload):
self._select_client().fire_and_forget(payload)
def fire_and_forget(self, payload: Payload) -> Future:
return self._select_client().fire_and_forget(payload)

def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]:
return self._select_client().request_stream(payload)

def metadata_push(self, metadata: bytes):
self._select_client().metadata_push(metadata)
def metadata_push(self, metadata: bytes) -> Future:
return self._select_client().metadata_push(metadata)

async def connect(self):
await self._strategy.connect()
Expand Down
4 changes: 2 additions & 2 deletions rsocket/rsocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ def request_response(self, payload: Payload) -> Future:
...

@abc.abstractmethod
def fire_and_forget(self, payload: Payload):
def fire_and_forget(self, payload: Payload) -> Future:
...

@abc.abstractmethod
def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]:
...

@abc.abstractmethod
def metadata_push(self, metadata: bytes):
def metadata_push(self, metadata: bytes) -> Future:
...

@abc.abstractmethod
Expand Down
15 changes: 11 additions & 4 deletions rsocket/rsocket_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ async def _sender(self):
log_frame(frame, self._log_identifier(), 'Sent')
self._send_queue.task_done()

if frame.sent_future is not None:
frame.sent_future.set_result(None)

if self._send_queue.empty():
await transport.on_send_queue_empty()
except RSocketTransportError as exception:
Expand Down Expand Up @@ -441,12 +444,14 @@ def request_response(self, payload: Payload) -> Future:
self.register_new_stream(requester).setup()
return requester.run()

def fire_and_forget(self, payload: Payload):
def fire_and_forget(self, payload: Payload) -> Future:
logger().debug('%s: fire-and-forget: %s', self._log_identifier(), payload)

stream_id = self._allocate_stream()
self.send_request(to_fire_and_forget_frame(stream_id, payload))
frame = to_fire_and_forget_frame(stream_id, payload)
self.send_request(frame)
self.finish_stream(stream_id)
return frame.sent_future

def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]:
logger().debug('%s: request-stream: %s', self._log_identifier(), payload)
Expand All @@ -463,10 +468,12 @@ def request_channel(
requester = RequestChannelRequester(self, payload, local_publisher)
return self.register_new_stream(requester)

def metadata_push(self, metadata: bytes):
def metadata_push(self, metadata: bytes) -> Future:
logger().debug('%s: metadata-push: %s', self._log_identifier(), metadata)

self.send_frame(to_metadata_push_frame(metadata))
frame = to_metadata_push_frame(metadata)
self.send_frame(frame)
return frame.sent_future

def _is_frame_allowed_to_send(self, frame: Frame) -> bool:
if isinstance(frame, initiate_request_frame_types):
Expand Down
8 changes: 4 additions & 4 deletions rsocket/rx_support/rx_rsocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ def request_channel(self,
).initial_request_n(request_limit)
return from_rsocket_publisher(response_publisher, request_limit)

def fire_and_forget(self, request: Payload):
self._rsocket.fire_and_forget(request)
def fire_and_forget(self, request: Payload) -> rx.Observable:
return rx.from_future(self._rsocket.fire_and_forget(request))

def metadata_push(self, metadata: bytes):
self._rsocket.metadata_push(metadata)
def metadata_push(self, metadata: bytes) -> rx.Observable:
return rx.from_future(self._rsocket.metadata_push(metadata))

async def connect(self):
return await self._rsocket.connect()
Expand Down
13 changes: 13 additions & 0 deletions tests/rsocket/test_fire_and_forget.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ def handler_factory(socket):
assert handler.received_payload.metadata == b'cat'


async def test_request_fire_and_forget_wait(lazy_pipe):
handler: Optional[FireAndForgetHandler] = None

def handler_factory(socket):
nonlocal handler
handler = FireAndForgetHandler(socket)
return handler

async with lazy_pipe(
server_arguments={'handler_factory': handler_factory}) as (server, client):
await client.fire_and_forget(Payload(b'dog', b'cat'))


async def test_request_fire_and_forget_awaitable_client(lazy_pipe):
handler: Optional[FireAndForgetHandler] = None

Expand Down
16 changes: 16 additions & 0 deletions tests/rsocket/test_metadata_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def handler_factory(socket):
client: RSocketClient = pipe[1]
server.set_handler_using_factory(handler_factory)

# noinspection PyAsyncCall
client.metadata_push(b'cat')

await handler.received.wait()
Expand All @@ -39,6 +40,21 @@ def handler_factory(socket):
assert handler.received_payload.metadata == b'cat'


async def test_metadata_push_await(pipe):
handler: Optional[MetadataPushHandler] = None

def handler_factory(socket):
nonlocal handler
handler = MetadataPushHandler(socket)
return handler

server: RSocketServer = pipe[0]
client: RSocketClient = pipe[1]
server.set_handler_using_factory(handler_factory)

await client.metadata_push(b'cat')


async def test_metadata_push_awaitable_client(pipe):
handler: Optional[MetadataPushHandler] = None

Expand Down
4 changes: 2 additions & 2 deletions tests/rx_support/test_rx_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ async def on_metadata_push(self, payload: Payload):
server.set_handler_using_factory(Handler)

rx_client = RxRSocket(client)
rx_client.metadata_push(b'request text')
await rx_client.metadata_push(b'request text')

await received_item_event.wait()

Expand All @@ -221,7 +221,7 @@ async def request_fire_and_forget(self, payload: Payload):
server.set_handler_using_factory(Handler)

rx_client = RxRSocket(client)
rx_client.fire_and_forget(Payload(b'request text'))
await rx_client.fire_and_forget(Payload(b'request text'))

await received_item_event.wait()

Expand Down