Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ Changelog

v0.4.15
=======
- Websockets (https://github.com/python-websockets/websockets) server support
- Websockets server support (https://github.com/python-websockets/websockets)
- AsyncWebsockets client support (https://github.com/Fuyukai/asyncwebsockets)

v0.4.14
=======
Expand Down
25 changes: 13 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@ pip install rsocket

You may also install using some **extras**:

| Extra | Functionality | Documentation |
|-------------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------------|
| rx | ReactiveX ([v3](https://pypi.org/project/Rx/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) |
| reactivex | [ReactiveX](https://reactivex.io/) ([v4](https://pypi.org/project/reactivex/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) |
| aiohttp | [aiohttp](https://docs.aiohttp.org/en/stable/) Websocket transport (server/client) | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/websocket) |
| quart | [Quart](https://pgjones.gitlab.io/quart/) Websocket transport (server only) | |
| quic | [QUIC](https://github.com/aiortc/aioquic) transport | |
| websockets | [Websockets](https://github.com/python-websockets/websockets) transport (server only) | |
| cli | Command line | [Tutorial](https://rsocket.io/guides/rsocket-py/cli) |
| optimized | Frame parse/serialize optimizations | |
| cloudevents | [CloudEvents](https://cloudevents.io/) integration | |
| graphql | [GraphQL](https://graphql.org/) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/graphql) |
| Extra | Functionality | Documentation |
|-----------------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------------|
| rx | ReactiveX ([v3](https://pypi.org/project/Rx/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) |
| reactivex | [ReactiveX](https://reactivex.io/) ([v4](https://pypi.org/project/reactivex/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) |
| aiohttp | [aiohttp](https://docs.aiohttp.org/en/stable/) Websocket transport (server/client) | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/websocket) |
| quart | [Quart](https://pgjones.gitlab.io/quart/) Websocket transport (server only) | |
| quic | [QUIC](https://github.com/aiortc/aioquic) transport | |
| websockets | [Websockets](https://github.com/python-websockets/websockets) transport (server only) | |
| asyncwebsockets | [Websockets](https://github.com/Fuyukai/asyncwebsockets) transport (client only) | |
| cli | Command line | [Tutorial](https://rsocket.io/guides/rsocket-py/cli) |
| optimized | Frame parse/serialize optimizations | |
| cloudevents | [CloudEvents](https://cloudevents.io/) integration | |
| graphql | [GraphQL](https://graphql.org/) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/graphql) |

For example:

Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ pydantic==1.10.13
Werkzeug==3.0.0
graphql-core==3.2.3
gql==3.4.1
websockets==11.0.3
websockets==11.0.3
asyncwebsockets==0.9.4
56 changes: 56 additions & 0 deletions rsocket/transports/asyncwebsockets_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
from contextlib import asynccontextmanager

from rsocket.exceptions import RSocketTransportError
from rsocket.frame import Frame
from rsocket.helpers import wrap_transport_exception, single_transport_provider
from rsocket.logger import logger
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.abstract_messaging import AbstractMessagingTransport


@asynccontextmanager
async def websocket_client(url: str,
**kwargs) -> RSocketClient:
"""
Helper method to instantiate an RSocket client using a websocket url over asyncwebsockets client.
"""
from asyncwebsockets import open_websocket
async with open_websocket(url) as websocket:
async with RSocketClient(single_transport_provider(TransportAsyncWebsocketsClient(websocket)),
**kwargs) as client:
yield client


class TransportAsyncWebsocketsClient(AbstractMessagingTransport):
"""
RSocket transport over client side asyncwebsockets.
"""

def __init__(self, websocket):
super().__init__()
self._ws = websocket
self._message_handler = None

async def connect(self):
self._message_handler = asyncio.create_task(self.handle_incoming_ws_messages())

async def handle_incoming_ws_messages(self):
from wsproto.events import BytesMessage
try:
async for message in self._ws:
if isinstance(message, BytesMessage):
async for frame in self._frame_parser.receive_data(message.data, 0):
self._incoming_frame_queue.put_nowait(frame)
except asyncio.CancelledError:
logger().debug('Asyncio task canceled: incoming_data_listener')
except Exception:
self._incoming_frame_queue.put_nowait(RSocketTransportError())

async def send_frame(self, frame: Frame):
with wrap_transport_exception():
await self._ws.send(frame.serialize())

async def close(self):
self._message_handler.cancel()
await self._message_handler
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ cloudevents =
graphql =
graphql-core>=3.2.0
gql>=3.4.0
websockets =
websockets>=11.0.0
websockets = websockets>=11.0.0
asyncwebsockets = asyncwebsockets>=0.9.4

[options.entry_points]
cli.console_scripts =
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ def setup_logging(level=logging.DEBUG, use_file: bool = False):
setup_logging(logging.WARN)

tested_transports = [
'tcp'
'tcp',
'quart'
]

if sys.version_info[:3] < (3, 11, 5):
tested_transports += [
'aiohttp',
'quart',
'quic',
'http3',
# 'websockets'
Expand Down
4 changes: 2 additions & 2 deletions tests/tools/fixtures_quart.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
async def pipe_factory_quart_websocket(unused_tcp_port, client_arguments=None, server_arguments=None):
from quart import Quart
from rsocket.transports.quart_websocket import websocket_handler
from rsocket.transports.aiohttp_websocket import websocket_client
from rsocket.transports.asyncwebsockets_transport import websocket_client

app = Quart(__name__)
server: Optional[RSocketBase] = None
Expand All @@ -32,7 +32,7 @@ async def ws():
server_task = asyncio.create_task(app.run_task(port=unused_tcp_port))
await asyncio.sleep(0)

async with websocket_client('http://localhost:{}'.format(unused_tcp_port),
async with websocket_client('ws://localhost:{}'.format(unused_tcp_port),
**client_arguments) as client:
await wait_for_server.wait()
yield server, client
Expand Down