Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ v0.4.0
- fragment size now includes frame header and length.
- Added checking fragment size limit (minimum 64) as in java implementation
- Updated examples
- Added reactivex (RxPy version 4) wrapper client

v0.3.0
======
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ pip install rsocket

or install any of the extras:

* rx
* rx (RxPy3)
* reactivex (RxPy4)
* aiohttp
* quart
* uic

Example:

```shell
pip install --pre rsocket[rx]
pip install --pre rsocket[reactivex]
```

Alternatively, download the source code, build a package:
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ aiohttp==3.8.1
quart==0.17.0
coveralls==3.3.1
aioquic==0.9.19
reactivex==4.0.4
Empty file added rsocket/reactivex/__init__.py
Empty file.
77 changes: 77 additions & 0 deletions rsocket/reactivex/back_pressure_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import asyncio
from typing import Optional

import reactivex
from reactivex import Observable, Observer
from reactivex.notification import OnNext, OnError, OnCompleted
from reactivex.operators import materialize
from reactivex.subject import Subject

from reactivestreams.subscriber import Subscriber
from rsocket.helpers import DefaultPublisherSubscription
from rsocket.logger import logger
from rsocket.reactivex.subscriber_adapter import SubscriberAdapter


async def observable_to_async_event_generator(observable: Observable):
queue = asyncio.Queue()

def on_next(i):
queue.put_nowait(i)

observable.pipe(materialize()).subscribe(
on_next=on_next
)

while True:
value = await queue.get()
yield value
queue.task_done()


def from_aiter(iterator, feedback: Optional[Observable] = None):
# noinspection PyUnusedLocal
def on_subscribe(observer: Observer, scheduler):
async def _aio_next():
try:
event = await iterator.__anext__()

if isinstance(event, OnNext):
observer.on_next(event.value)
elif isinstance(event, OnError):
observer.on_error(event.exception)
elif isinstance(event, OnCompleted):
observer.on_completed()
except StopAsyncIteration:
pass
except Exception as exception:
logger().error(str(exception), exc_info=True)
observer.on_error(exception)

def create_next_task():
asyncio.create_task(_aio_next())

return feedback.subscribe(
on_next=lambda i: create_next_task()
)

return reactivex.create(on_subscribe)


class BackPressurePublisher(DefaultPublisherSubscription):
def __init__(self, wrapped_observable: Observable):
self._wrapped_observable = wrapped_observable
self._feedback = None

def subscribe(self, subscriber: Subscriber):
super().subscribe(subscriber)
self._feedback = Subject()
async_iterator = observable_to_async_event_generator(self._wrapped_observable).__aiter__()
from_aiter(async_iterator, self._feedback).subscribe(SubscriberAdapter(subscriber))

def request(self, n: int):
for i in range(n):
self._feedback.on_next(True)

def cancel(self):
self._feedback.on_completed()
92 changes: 92 additions & 0 deletions rsocket/reactivex/from_rsocket_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import asyncio
import functools

import reactivex
from reactivex import Observable, Observer
from reactivex.disposable import Disposable

from reactivestreams.publisher import Publisher
from reactivestreams.subscriber import Subscriber
from reactivestreams.subscription import Subscription
from rsocket.logger import logger


class RxSubscriber(Subscriber):
def __init__(self, observer, limit_rate: int):
self.limit_rate = limit_rate
self.observer = observer
self._received_messages = 0
self.done = asyncio.Event()
self.get_next_n = asyncio.Event()
self.subscription = None

def on_subscribe(self, subscription: Subscription):
self.subscription = subscription

def on_next(self, value, is_complete=False):
self._received_messages += 1
self.observer.on_next(value)
if is_complete:
self.observer.on_completed()
self._finish()

else:
if self._received_messages == self.limit_rate:
self._received_messages = 0
self.get_next_n.set()

def _finish(self):
self.done.set()

def on_error(self, exception: Exception):
self.observer.on_error(exception)
self._finish()

def on_complete(self):
self.observer.on_completed()
self._finish()


async def _aio_sub(publisher: Publisher, subscriber: RxSubscriber, observer: Observer, loop):
try:
publisher.subscribe(subscriber)
await subscriber.done.wait()

except asyncio.CancelledError:
if not subscriber.done.is_set():
subscriber.subscription.cancel()
except Exception as exception:
loop.call_soon(functools.partial(observer.on_error, exception))


async def _trigger_next_request_n(subscriber, limit_rate):
try:
while True:
await subscriber.get_next_n.wait()
subscriber.subscription.request(limit_rate)
subscriber.get_next_n.clear()
except asyncio.CancelledError:
logger().debug('Asyncio task canceled: trigger_next_request_n')


def from_rsocket_publisher(publisher: Publisher, limit_rate=5) -> Observable:
loop = asyncio.get_event_loop()

# noinspection PyUnusedLocal
def on_subscribe(observer: Observer, scheduler):
subscriber = RxSubscriber(observer, limit_rate)

get_next_task = asyncio.create_task(
_trigger_next_request_n(subscriber, limit_rate)
)
task = asyncio.create_task(
_aio_sub(publisher, subscriber, observer, loop)
)

def dispose():
get_next_task.cancel()
task.cancel()

return Disposable(dispose)

return reactivex.create(on_subscribe)
57 changes: 57 additions & 0 deletions rsocket/reactivex/reactivex_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from asyncio import Future

from typing import Optional, cast

import reactivex
from reactivex import Observable

from rsocket.frame import MAX_REQUEST_N
from rsocket.payload import Payload
from rsocket.rsocket import RSocket
from rsocket.reactivex.back_pressure_publisher import BackPressurePublisher
from rsocket.reactivex.from_rsocket_publisher import from_rsocket_publisher


class ReactiveXClient:
def __init__(self, rsocket: RSocket):
self._rsocket = rsocket

def request_stream(self, request: Payload, request_limit: int = MAX_REQUEST_N) -> Observable:
response_publisher = self._rsocket.request_stream(request).initial_request_n(request_limit)
return from_rsocket_publisher(response_publisher, request_limit)

def request_response(self, request: Payload) -> Observable:
return reactivex.from_future(cast(Future, self._rsocket.request_response(request)))

def request_channel(self,
request: Payload,
request_limit: int = MAX_REQUEST_N,
observable: Optional[Observable] = None) -> Observable:
if observable is not None:
local_publisher = BackPressurePublisher(observable)
else:
local_publisher = None

response_publisher = self._rsocket.request_channel(
request, local_publisher
).initial_request_n(request_limit)
return from_rsocket_publisher(response_publisher, request_limit)

def fire_and_forget(self, request: Payload) -> Observable:
return reactivex.from_future(cast(Future, self._rsocket.fire_and_forget(request)))

def metadata_push(self, metadata: bytes) -> Observable:
return reactivex.from_future(cast(Future, self._rsocket.metadata_push(metadata)))

async def connect(self):
return await self._rsocket.connect()

async def close(self):
await self._rsocket.close()

async def __aenter__(self):
await self._rsocket.__aenter__()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._rsocket.__aexit__(exc_type, exc_val, exc_tb)
17 changes: 17 additions & 0 deletions rsocket/reactivex/subscriber_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from reactivex.abc import ObserverBase

from reactivestreams.subscriber import Subscriber


class SubscriberAdapter(ObserverBase):
def __init__(self, subscriber: Subscriber):
self._subscriber = subscriber

def on_next(self, value):
self._subscriber.on_next(value)

def on_error(self, error):
self._subscriber.on_error(error)

def on_completed(self):
self._subscriber.on_complete()
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
python_requires='>=3.8',
extras_require={
'rx': {'Rx >= 3.0.0'},
'reactivex': {'reactivex >= 4.0.0'},
'aiohttp': {'aiohttp >= 3.0.0'},
'quart': {'quart >= 0.15.0'},
'quic': {'aioquic >= 0.9.0'}
Expand Down
Empty file added tests/rx_support/__init__.py
Empty file.
Empty file.
Loading