Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ v0.4.1
- Performance test examples available in *performance* folder
- WSS (Secure websocket) example and support (aiohttp)
- Refactored Websocket transport to allow providing either url or an existing websocket
- Added command line tool (rsocket-py)

v0.4.0
======
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ or install any of the extras:
* aiohttp
* quart
* quic
* cli

Example:

Expand Down Expand Up @@ -62,7 +63,6 @@ all the examples
| | ServerWithFragmentation | client_with_routing.py | |
| server_quart_websocket.py | | client_websocket.py | |
| server_aiohttp_websocket.py | | client_websocket.py | |
| server_aiohttp_websocket_secure.py | | client_wss.py | |

# Build Status

Expand Down
32 changes: 24 additions & 8 deletions examples/client_websocket.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
import asyncio
import logging
import sys

import aiohttp
import asyncclick as click

from rsocket.helpers import single_transport_provider
from rsocket.payload import Payload
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.aiohttp_websocket import TransportAioHttpClient
from rsocket.transports.aiohttp_websocket import websocket_client


async def application(serve_port):
async with websocket_client('http://localhost:%s' % serve_port) as client:
result = await client.request_response(Payload(b'ping'))
print(result)
async def application(with_ssl: bool, serve_port: int):
if with_ssl:
async with aiohttp.ClientSession() as session:
async with session.ws_connect('wss://localhost:%s' % serve_port, verify_ssl=False) as websocket:
async with RSocketClient(
single_transport_provider(TransportAioHttpClient(websocket=websocket))) as client:
result = await client.request_response(Payload(b'ping'))
print(result)

else:
async with websocket_client('http://localhost:%s' % serve_port) as client:
result = await client.request_response(Payload(b'ping'))
print(result)


if __name__ == '__main__':
port = sys.argv[1] if len(sys.argv) > 1 else 6565
@click.command()
@click.option('--with-ssl', is_flag=False, default=False)
@click.option('--port', is_flag=False, default=6565)
async def command(with_ssl, port: int):
logging.basicConfig(level=logging.DEBUG)
asyncio.run(application(port))
asyncio.run(application(with_ssl, port))
24 changes: 0 additions & 24 deletions examples/client_wss.py

This file was deleted.

40 changes: 35 additions & 5 deletions examples/server_aiohttp_websocket.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import logging
import sys
import ssl

import asyncclick as click
from aiohttp import web

from examples.fixtures import cert_gen
from rsocket.helpers import create_future
from rsocket.local_typing import Awaitable
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.transports.aiohttp_websocket import websocket_handler_factory
from rsocket.rsocket_server import RSocketServer
from rsocket.transports.aiohttp_websocket import TransportAioHttpWebsocket


class Handler(BaseRequestHandler):
Expand All @@ -16,9 +19,36 @@ async def request_response(self, payload: Payload) -> Awaitable[Payload]:
return create_future(Payload(b'pong'))


if __name__ == '__main__':
port = sys.argv[1] if len(sys.argv) > 1 else 6565
def websocket_handler_factory( **kwargs):
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
transport = TransportAioHttpWebsocket(ws)
RSocketServer(transport, **kwargs)
await transport.handle_incoming_ws_messages()
return ws

return websocket_handler


@click.command()
@click.option('--port', help='Port to listen on', default=6565, type=int)
@click.option('--with-ssl', is_flag=True, help='Enable SSL mode')
async def start_server(with_ssl: bool, port: int):
logging.basicConfig(level=logging.DEBUG)
app = web.Application()
app.add_routes([web.get('/', websocket_handler_factory(handler_factory=Handler))])
web.run_app(app, port=port)

if with_ssl:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

with cert_gen() as (certificate, key):
ssl_context.load_cert_chain(certificate, key)
else:
ssl_context = None

await web._run_app(app, port=port, ssl_context=ssl_context)


if __name__ == '__main__':
start_server()
32 changes: 0 additions & 32 deletions examples/server_aiohttp_websocket_secure.py

This file was deleted.

56 changes: 47 additions & 9 deletions examples/server_with_routing.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import asyncio
import logging
import sys
import ssl
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional

import asyncclick as click
from aiohttp import web

from examples.example_fixtures import large_data1
from examples.fixtures import cert_gen
from examples.response_channel import response_stream_1, LoggingSubscriber
from response_stream import response_stream_2
from rsocket.extensions.authentication import Authentication, AuthenticationSimple
Expand All @@ -15,6 +19,7 @@
from rsocket.routing.request_router import RequestRouter
from rsocket.routing.routing_request_handler import RoutingRequestHandler
from rsocket.rsocket_server import RSocketServer
from rsocket.transports.aiohttp_websocket import TransportAioHttpWebsocket
from rsocket.transports.tcp import TransportTCP

router = RequestRouter()
Expand Down Expand Up @@ -106,16 +111,49 @@ def handle_client(reader, writer):
RSocketServer(TransportTCP(reader, writer), handler_factory=handler_factory)


async def run_server(server_port):
logging.info('Starting server at localhost:%s', server_port)
def websocket_handler_factory(**kwargs):
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
transport = TransportAioHttpWebsocket(ws)
RSocketServer(transport, **kwargs)
await transport.handle_incoming_ws_messages()
return ws

return websocket_handler


@click.command()
@click.option('--port', help='Port to listen on', default=6565, type=int)
@click.option('--with-ssl', is_flag=True, help='Enable SSL mode')
@click.option('--transport', is_flag=False, default='tcp')
async def start_server(with_ssl: bool, port: int, transport: str):
logging.basicConfig(level=logging.DEBUG)

logging.info(f'Starting {transport} server at localhost:{port}')

if transport in ['ws', 'wss']:
app = web.Application()
app.add_routes([web.get('/', websocket_handler_factory(handler_factory=handler_factory))])

if with_ssl:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

with cert_gen() as (certificate, key):
ssl_context.load_cert_chain(certificate, key)
else:
ssl_context = None

server = await asyncio.start_server(handle_client, 'localhost', server_port)
await web._run_app(app, port=port, ssl_context=ssl_context)
elif transport == 'tcp':

async with server:
await server.serve_forever()
server = await asyncio.start_server(handle_client, 'localhost', port)

async with server:
await server.serve_forever()
else:
raise Exception(f'Unsupported transport {transport}')


if __name__ == '__main__':
port = sys.argv[1] if len(sys.argv) > 1 else 6565
logging.basicConfig(level=logging.DEBUG)
asyncio.run(run_server(port))
start_server()
9 changes: 5 additions & 4 deletions examples/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ def test_client_server_over_websocket_aiohttp(unused_tcp_port):


def test_client_server_over_websocket_secure_aiohttp(unused_tcp_port):
pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_aiohttp_websocket_secure.py', str(unused_tcp_port))
pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', 'server_aiohttp_websocket.py', '--port', str(unused_tcp_port),
'--with-ssl')

try:
sleep(2)
client = subprocess.Popen(['python3', './client_wss.py', str(unused_tcp_port)])
client = subprocess.Popen(['python3', './client_websocket.py', '--port', str(unused_tcp_port), '--with-ssl'])
client.wait(timeout=3)

assert client.returncode == 0
Expand All @@ -103,11 +104,11 @@ def test_client_server_over_websocket_secure_aiohttp(unused_tcp_port):


def test_client_server_over_websocket_quart(unused_tcp_port):
pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_quart_websocket.py', str(unused_tcp_port))
pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_quart_websocket.py', '--port', str(unused_tcp_port))

try:
sleep(2)
client = subprocess.Popen(['python3', './client_websocket.py', str(unused_tcp_port)])
client = subprocess.Popen(['python3', './client_websocket.py', '--port', str(unused_tcp_port)])
client.wait(timeout=3)

assert client.returncode == 0
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ quart==0.18.2
coveralls==3.3.1
aioquic==0.9.20
reactivex==4.0.4
starlette==0.16.0
starlette==0.16.0
asyncclick==8.1.3.4
1 change: 1 addition & 0 deletions rsocket/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = '0.4.1'
12 changes: 6 additions & 6 deletions rsocket/awaitable/awaitable_rsocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ async def request_response(self, payload: Payload) -> Payload:

async def request_stream(self,
payload: Payload,
initial_request_n=MAX_REQUEST_N) -> List[Payload]:
subscriber = CollectorSubscriber()
limit_rate=MAX_REQUEST_N) -> List[Payload]:
subscriber = CollectorSubscriber(limit_rate)

self._rsocket.request_stream(payload).initial_request_n(initial_request_n).subscribe(subscriber)
self._rsocket.request_stream(payload).initial_request_n(limit_rate).subscribe(subscriber)

return await subscriber.run()

async def request_channel(self,
payload: Payload,
publisher: Optional[Publisher] = None,
initial_request_n=MAX_REQUEST_N) -> List[Payload]:
subscriber = CollectorSubscriber()
limit_rate=MAX_REQUEST_N) -> List[Payload]:
subscriber = CollectorSubscriber(limit_rate)

self._rsocket.request_channel(payload, publisher).initial_request_n(initial_request_n).subscribe(subscriber)
self._rsocket.request_channel(payload, publisher).initial_request_n(limit_rate).subscribe(subscriber)

return await subscriber.run()

Expand Down
17 changes: 16 additions & 1 deletion rsocket/awaitable/collector_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

from reactivestreams.subscriber import Subscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.frame import MAX_REQUEST_N


class CollectorSubscriber(Subscriber):

def __init__(self) -> None:
def __init__(self, limit_rate=MAX_REQUEST_N, limit_count=None) -> None:
self._limit_count = limit_count
self._limit_rate = limit_rate
self._received_count = 0
self._total_received_count = 0
self.is_done = asyncio.Event()
self.error = None
self.values = []
Expand All @@ -21,8 +26,18 @@ def on_subscribe(self, subscription: DefaultSubscription):
def on_next(self, value, is_complete=False):
self.values.append(value)

self._received_count += 1
self._total_received_count += 1

if is_complete:
self.is_done.set()
elif self._limit_count is not None and self._limit_count == self._total_received_count:
self.subscription.cancel()
self.is_done.set()
else:
if self._received_count == self._limit_rate:
self._received_count = 0
self.subscription.request(self._limit_rate)

def on_error(self, exception: Exception):
self.error = exception
Expand Down
Empty file added rsocket/cli/__init__.py
Empty file.
Loading