From 6329dd38e1048e995dc98ce866c56c0f365c4fd2 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Sun, 23 Oct 2022 12:39:04 +0300 Subject: [PATCH 01/21] initial wip cli --- requirements.txt | 3 +- rsocket/cli/__init__.py | 0 rsocket/cli/command.py | 114 ++++++++++++++++++++++++++++++++++++++++ setup.py | 3 +- 4 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 rsocket/cli/__init__.py create mode 100644 rsocket/cli/command.py diff --git a/requirements.txt b/requirements.txt index 4fd6d968..a9b87dd4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,5 @@ quart==0.18.2 coveralls==3.3.1 aioquic==0.9.20 reactivex==4.0.4 -starlette==0.16.0 \ No newline at end of file +starlette==0.16.0 +asyncclick==8.1.3.4 \ No newline at end of file diff --git a/rsocket/cli/__init__.py b/rsocket/cli/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py new file mode 100644 index 00000000..39f0e1d6 --- /dev/null +++ b/rsocket/cli/command.py @@ -0,0 +1,114 @@ +import asyncio +from dataclasses import dataclass +from typing import Optional, Type + +import asyncclick as click + +from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket +from rsocket.extensions.helpers import route, composite, authenticate_simple +from rsocket.extensions.mimetypes import WellKnownMimeTypes +from rsocket.frame_helpers import ensure_bytes +from rsocket.helpers import single_transport_provider +from rsocket.payload import Payload +from rsocket.rsocket_client import RSocketClient +from rsocket.transports.abstract_messaging import AbstractMessagingTransport +from rsocket.transports.aiohttp_websocket import TransportAioHttpClient +from rsocket.transports.tcp import TransportTCP + + +@dataclass(frozen=True) +class RSocketUri: + host: str + port: str + schema: str + path: Optional[str] = None + original_uri: Optional[str] = None + + +def parse_uri(uri: str): + schema, rest = uri.split(':', 1) + rest = rest.strip('/') + host_port = rest.split('/', 1) + host, port = host_port[0].split(':') + + if len(host_port) > 1: + rest = host_port[1] + else: + rest = None + + return RSocketUri(host, port, schema, rest, uri) + + +async def transport_from_uri(uri: RSocketUri) -> Type[AbstractMessagingTransport]: + if uri.schema == 'tcp': + connection = await asyncio.open_connection(uri.host, uri.port) + return TransportTCP(*connection) + elif uri.schema == 'ws': + return TransportAioHttpClient(uri.original_uri) + + raise Exception('Unsupported schema in CLI') + + +def build_composite_metadata(auth_simple, route_value): + composite_items = [] + if route_value is not None: + composite_items.append(route(route_value)) + if auth_simple is not None: + composite_items.append(authenticate_simple(*auth_simple.split(':'))) + return composite_items + + +@click.command() +@click.option('-d', '--data', is_flag=False) +@click.option('-m', '--metadata', is_flag=False, default=None) +@click.option('-r', '--route', 'route_value', is_flag=False, default=None) +@click.option('-u', '--as', '--authSimple', 'auth_simple', is_flag=False, default=None) +@click.option('--dataMimeType', is_flag=False, default='application/json') +@click.option('--request', is_flag=True) +@click.option('--stream', is_flag=True) +@click.option('--channel', is_flag=True) +@click.option('--fnf', is_flag=True) +@click.option('--version', is_flag=True) +@click.argument('uri') +async def command(data, + metadata, route_value, auth_simple, + datamimetype, + request, stream, channel, fnf, + uri, version): + parsed_uri = parse_uri(uri) + + composite_items = build_composite_metadata(auth_simple, route_value) + + transport = await transport_from_uri(parsed_uri) + + client_arguments = {} + + if len(composite_items) > 0: + client_arguments['metadata_encoding'] = WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA + + async with RSocketClient(single_transport_provider(transport), **client_arguments) as client: + awaitable_client = AwaitableRSocket(client) + + if len(composite_items) > 0: + metadata_value = composite(*composite_items) + else: + metadata_value = metadata + + payload = Payload(ensure_bytes(data), metadata_value) + + result = None + if request: + result = await awaitable_client.request_response(payload) + elif stream: + result = await awaitable_client.request_stream(payload) + elif channel: + result = await awaitable_client.request_channel(payload) + elif fnf: + await awaitable_client.fire_and_forget(payload) + + if result is not None: + print(result.data.decode('utf-8')) + + +if __name__ == '__main__': + command() diff --git a/setup.py b/setup.py index f9d3f5b8..2030490d 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,8 @@ 'reactivex': {'reactivex >= 4.0.0'}, 'aiohttp': {'aiohttp >= 3.0.0'}, 'quart': {'quart >= 0.15.0'}, - 'quic': {'aioquic >= 0.9.0'} + 'quic': {'aioquic >= 0.9.0'}, + 'cli': {'asyncclick >= 8.0.0'} }, classifiers=[ 'Development Status :: 3 - Alpha', From cd0af1c07c10a2b525a004ec753e6d56f0858053 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Sun, 23 Oct 2022 12:52:01 +0300 Subject: [PATCH 02/21] WIP cli --- rsocket/cli/command.py | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index 39f0e1d6..086d59df 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -1,11 +1,11 @@ import asyncio from dataclasses import dataclass -from typing import Optional, Type +from typing import Optional, Type, Collection import asyncclick as click from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket -from rsocket.extensions.helpers import route, composite, authenticate_simple +from rsocket.extensions.helpers import route, composite, authenticate_simple, authenticate_bearer from rsocket.extensions.mimetypes import WellKnownMimeTypes from rsocket.frame_helpers import ensure_bytes from rsocket.helpers import single_transport_provider @@ -49,35 +49,50 @@ async def transport_from_uri(uri: RSocketUri) -> Type[AbstractMessagingTransport raise Exception('Unsupported schema in CLI') -def build_composite_metadata(auth_simple, route_value): +def build_composite_metadata(auth_simple: str, route_value: str, auth_bearer: str): composite_items = [] + if route_value is not None: composite_items.append(route(route_value)) + if auth_simple is not None: composite_items.append(authenticate_simple(*auth_simple.split(':'))) + + if auth_bearer is not None: + composite_items.append(authenticate_bearer(auth_bearer)) + return composite_items @click.command() @click.option('-d', '--data', is_flag=False) +@click.option('-l', '--load', is_flag=False) @click.option('-m', '--metadata', is_flag=False, default=None) @click.option('-r', '--route', 'route_value', is_flag=False, default=None) +# @click.option('--limitRate', 'limit_rate', is_flag=False, default=None) +@click.option('--take', is_flag=False, default=None) @click.option('-u', '--as', '--authSimple', 'auth_simple', is_flag=False, default=None) -@click.option('--dataMimeType', is_flag=False, default='application/json') +@click.option('--ab', '--authBearer', 'auth_bearer', is_flag=False, default=None) +@click.option('--dataMimeType', '--dmt', 'data_mime_type', is_flag=False, default='application/json') +@click.option('--metadataMimeType', '--mmt', 'metadata_mime_type', is_flag=False, default='application/json') @click.option('--request', is_flag=True) @click.option('--stream', is_flag=True) @click.option('--channel', is_flag=True) @click.option('--fnf', is_flag=True) +@click.option('--debug', is_flag=True) +@click.option('--quiet', '-q', is_flag=True) @click.option('--version', is_flag=True) @click.argument('uri') -async def command(data, - metadata, route_value, auth_simple, - datamimetype, +async def command(data, load, + metadata, route_value, auth_simple, auth_bearer, + # limit_rate, + take, + data_mime_type, metadata_mime_type, request, stream, channel, fnf, - uri, version): + uri, debug, version, quiet): parsed_uri = parse_uri(uri) - composite_items = build_composite_metadata(auth_simple, route_value) + composite_items = build_composite_metadata(auth_simple, route_value, auth_bearer) transport = await transport_from_uri(parsed_uri) @@ -97,6 +112,7 @@ async def command(data, payload = Payload(ensure_bytes(data), metadata_value) result = None + if request: result = await awaitable_client.request_response(payload) elif stream: @@ -106,8 +122,10 @@ async def command(data, elif fnf: await awaitable_client.fire_and_forget(payload) - if result is not None: + if isinstance(result, Payload): print(result.data.decode('utf-8')) + elif isinstance(result, Collection): + print([p.data.decode('utf-8') for p in result]) if __name__ == '__main__': From adb59fb5671a96b47d7ac4ea5af2b0127f225983 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Sun, 23 Oct 2022 12:53:19 +0300 Subject: [PATCH 03/21] WIP cli --- rsocket/cli/command.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index 086d59df..10928227 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -49,7 +49,9 @@ async def transport_from_uri(uri: RSocketUri) -> Type[AbstractMessagingTransport raise Exception('Unsupported schema in CLI') -def build_composite_metadata(auth_simple: str, route_value: str, auth_bearer: str): +def build_composite_metadata(auth_simple: str, + route_value: str, + auth_bearer: str): composite_items = [] if route_value is not None: From a3fda0f51a0ad8fe837919d07cf1bf9d0b469f42 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Sun, 23 Oct 2022 21:29:46 +0300 Subject: [PATCH 04/21] improved cli. cli depends on reactivex package --- rsocket/cli/command.py | 72 ++++++++++++++++++++++++++++++++++++------ setup.py | 2 +- 2 files changed, 64 insertions(+), 10 deletions(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index 10928227..ac65c5ae 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -1,15 +1,18 @@ import asyncio +import logging from dataclasses import dataclass from typing import Optional, Type, Collection import asyncclick as click +from reactivex import operators -from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket from rsocket.extensions.helpers import route, composite, authenticate_simple, authenticate_bearer from rsocket.extensions.mimetypes import WellKnownMimeTypes +from rsocket.frame import MAX_REQUEST_N from rsocket.frame_helpers import ensure_bytes from rsocket.helpers import single_transport_provider from rsocket.payload import Payload +from rsocket.reactivex.reactivex_client import ReactiveXClient from rsocket.rsocket_client import RSocketClient from rsocket.transports.abstract_messaging import AbstractMessagingTransport from rsocket.transports.aiohttp_websocket import TransportAioHttpClient @@ -71,7 +74,7 @@ def build_composite_metadata(auth_simple: str, @click.option('-l', '--load', is_flag=False) @click.option('-m', '--metadata', is_flag=False, default=None) @click.option('-r', '--route', 'route_value', is_flag=False, default=None) -# @click.option('--limitRate', 'limit_rate', is_flag=False, default=None) +@click.option('--limitRate', 'limit_rate', is_flag=False, default=None) @click.option('--take', is_flag=False, default=None) @click.option('-u', '--as', '--authSimple', 'auth_simple', is_flag=False, default=None) @click.option('--ab', '--authBearer', 'auth_bearer', is_flag=False, default=None) @@ -87,42 +90,93 @@ def build_composite_metadata(auth_simple: str, @click.argument('uri') async def command(data, load, metadata, route_value, auth_simple, auth_bearer, - # limit_rate, + limit_rate, take, data_mime_type, metadata_mime_type, request, stream, channel, fnf, uri, debug, version, quiet): + if version: + print('v0.4') + return + + if quiet: + logging.basicConfig(handlers=[]) + + if debug: + logging.basicConfig(level=logging.DEBUG) + + take_n = None + + if take is not None: + take_n = int(take) + + if take_n == 0: + return + + if limit_rate is not None: + limit_rate = int(limit_rate) + + if not limit_rate > 0: + limit_rate = MAX_REQUEST_N + else: + limit_rate = MAX_REQUEST_N + parsed_uri = parse_uri(uri) composite_items = build_composite_metadata(auth_simple, route_value, auth_bearer) transport = await transport_from_uri(parsed_uri) - client_arguments = {} + client_arguments = { + 'data_encoding': data_mime_type or WellKnownMimeTypes.APPLICATION_JSON, + 'metadata_encoding': metadata_mime_type or WellKnownMimeTypes.APPLICATION_JSON, + } if len(composite_items) > 0: client_arguments['metadata_encoding'] = WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA async with RSocketClient(single_transport_provider(transport), **client_arguments) as client: - awaitable_client = AwaitableRSocket(client) + awaitable_client = ReactiveXClient(client) if len(composite_items) > 0: metadata_value = composite(*composite_items) else: metadata_value = metadata + if load: + with open(load) as fd: + data = fd.read() + payload = Payload(ensure_bytes(data), metadata_value) result = None if request: - result = await awaitable_client.request_response(payload) + result = await awaitable_client.request_response(payload).pipe(operators.single()) elif stream: - result = await awaitable_client.request_stream(payload) + operations = [operators.to_list()] + + if take_n is not None: + operations.append(operators.take(take)) + + result = await awaitable_client.request_stream(payload, request_limit=limit_rate).pipe( + *operations + ) elif channel: - result = await awaitable_client.request_channel(payload) + operations = [operators.to_list()] + + if take is not None: + take_n = int(take) + + if take_n == 0: + return + + operations.append(operators.take(take)) + result = await awaitable_client.request_channel(payload, request_limit=limit_rate).pipe( + *operations + ) elif fnf: - await awaitable_client.fire_and_forget(payload) + await awaitable_client.fire_and_forget(payload).pipe(operators.single()) if isinstance(result, Payload): print(result.data.decode('utf-8')) diff --git a/setup.py b/setup.py index 2030490d..f7a2585d 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ 'aiohttp': {'aiohttp >= 3.0.0'}, 'quart': {'quart >= 0.15.0'}, 'quic': {'aioquic >= 0.9.0'}, - 'cli': {'asyncclick >= 8.0.0'} + 'cli': {'asyncclick >= 8.0.0', 'reactivex >= 4.0.0'} }, classifiers=[ 'Development Status :: 3 - Alpha', From d6fc4c9e3a5639bd92c4176f188a4a524ac23f83 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Sun, 23 Oct 2022 21:46:52 +0300 Subject: [PATCH 05/21] removed cli dependency on reactivex --- rsocket/__init__.py | 1 + rsocket/awaitable/awaitable_rsocket.py | 12 +++--- rsocket/awaitable/collector_subscriber.py | 11 ++++- rsocket/cli/command.py | 51 +++++++---------------- setup.py | 2 +- tests/rsocket/test_request_stream.py | 8 +--- 6 files changed, 34 insertions(+), 51 deletions(-) diff --git a/rsocket/__init__.py b/rsocket/__init__.py index e69de29b..f0ede3d3 100644 --- a/rsocket/__init__.py +++ b/rsocket/__init__.py @@ -0,0 +1 @@ +__version__ = '0.4.1' diff --git a/rsocket/awaitable/awaitable_rsocket.py b/rsocket/awaitable/awaitable_rsocket.py index 64e2764d..43fc4fb2 100644 --- a/rsocket/awaitable/awaitable_rsocket.py +++ b/rsocket/awaitable/awaitable_rsocket.py @@ -24,20 +24,20 @@ async def request_response(self, payload: Payload) -> Payload: async def request_stream(self, payload: Payload, - initial_request_n=MAX_REQUEST_N) -> List[Payload]: - subscriber = CollectorSubscriber() + limit_rate=MAX_REQUEST_N) -> List[Payload]: + subscriber = CollectorSubscriber(limit_rate) - self._rsocket.request_stream(payload).initial_request_n(initial_request_n).subscribe(subscriber) + self._rsocket.request_stream(payload).initial_request_n(limit_rate).subscribe(subscriber) return await subscriber.run() async def request_channel(self, payload: Payload, publisher: Optional[Publisher] = None, - initial_request_n=MAX_REQUEST_N) -> List[Payload]: - subscriber = CollectorSubscriber() + limit_rate=MAX_REQUEST_N) -> List[Payload]: + subscriber = CollectorSubscriber(limit_rate) - self._rsocket.request_channel(payload, publisher).initial_request_n(initial_request_n).subscribe(subscriber) + self._rsocket.request_channel(payload, publisher).initial_request_n(limit_rate).subscribe(subscriber) return await subscriber.run() diff --git a/rsocket/awaitable/collector_subscriber.py b/rsocket/awaitable/collector_subscriber.py index 025f13a3..fd2f7504 100644 --- a/rsocket/awaitable/collector_subscriber.py +++ b/rsocket/awaitable/collector_subscriber.py @@ -2,11 +2,14 @@ from reactivestreams.subscriber import Subscriber from reactivestreams.subscription import DefaultSubscription +from rsocket.frame import MAX_REQUEST_N class CollectorSubscriber(Subscriber): - def __init__(self) -> None: + def __init__(self, limit_rate=MAX_REQUEST_N) -> None: + self._limit_rate = limit_rate + self._received_count = 0 self.is_done = asyncio.Event() self.error = None self.values = [] @@ -21,8 +24,14 @@ def on_subscribe(self, subscription: DefaultSubscription): def on_next(self, value, is_complete=False): self.values.append(value) + self._received_count += 1 + if is_complete: self.is_done.set() + else: + if self._received_count == self._limit_rate: + self._received_count = 0 + self.subscription.request(self._limit_rate) def on_error(self, exception: Exception): self.error = exception diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index ac65c5ae..f463b009 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -4,20 +4,19 @@ from typing import Optional, Type, Collection import asyncclick as click -from reactivex import operators +from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket from rsocket.extensions.helpers import route, composite, authenticate_simple, authenticate_bearer from rsocket.extensions.mimetypes import WellKnownMimeTypes from rsocket.frame import MAX_REQUEST_N from rsocket.frame_helpers import ensure_bytes from rsocket.helpers import single_transport_provider from rsocket.payload import Payload -from rsocket.reactivex.reactivex_client import ReactiveXClient from rsocket.rsocket_client import RSocketClient from rsocket.transports.abstract_messaging import AbstractMessagingTransport from rsocket.transports.aiohttp_websocket import TransportAioHttpClient from rsocket.transports.tcp import TransportTCP - +from importlib.metadata import version as get_version @dataclass(frozen=True) class RSocketUri: @@ -75,7 +74,7 @@ def build_composite_metadata(auth_simple: str, @click.option('-m', '--metadata', is_flag=False, default=None) @click.option('-r', '--route', 'route_value', is_flag=False, default=None) @click.option('--limitRate', 'limit_rate', is_flag=False, default=None) -@click.option('--take', is_flag=False, default=None) +# @click.option('--take', is_flag=False, default=None) @click.option('-u', '--as', '--authSimple', 'auth_simple', is_flag=False, default=None) @click.option('--ab', '--authBearer', 'auth_bearer', is_flag=False, default=None) @click.option('--dataMimeType', '--dmt', 'data_mime_type', is_flag=False, default='application/json') @@ -91,12 +90,12 @@ def build_composite_metadata(auth_simple: str, async def command(data, load, metadata, route_value, auth_simple, auth_bearer, limit_rate, - take, + # take, data_mime_type, metadata_mime_type, request, stream, channel, fnf, uri, debug, version, quiet): if version: - print('v0.4') + print(get_version('rsocket')) return if quiet: @@ -105,13 +104,11 @@ async def command(data, load, if debug: logging.basicConfig(level=logging.DEBUG) - take_n = None - - if take is not None: - take_n = int(take) - - if take_n == 0: - return + # if take is not None: + # take_n = int(take) + # + # if take_n == 0: + # return if limit_rate is not None: limit_rate = int(limit_rate) @@ -136,7 +133,7 @@ async def command(data, load, client_arguments['metadata_encoding'] = WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA async with RSocketClient(single_transport_provider(transport), **client_arguments) as client: - awaitable_client = ReactiveXClient(client) + awaitable_client = AwaitableRSocket(client) if len(composite_items) > 0: metadata_value = composite(*composite_items) @@ -152,31 +149,13 @@ async def command(data, load, result = None if request: - result = await awaitable_client.request_response(payload).pipe(operators.single()) + result = await awaitable_client.request_response(payload) elif stream: - operations = [operators.to_list()] - - if take_n is not None: - operations.append(operators.take(take)) - - result = await awaitable_client.request_stream(payload, request_limit=limit_rate).pipe( - *operations - ) + result = await awaitable_client.request_stream(payload, limit_rate=limit_rate) elif channel: - operations = [operators.to_list()] - - if take is not None: - take_n = int(take) - - if take_n == 0: - return - - operations.append(operators.take(take)) - result = await awaitable_client.request_channel(payload, request_limit=limit_rate).pipe( - *operations - ) + result = await awaitable_client.request_channel(payload, limit_rate=limit_rate) elif fnf: - await awaitable_client.fire_and_forget(payload).pipe(operators.single()) + await awaitable_client.fire_and_forget(payload) if isinstance(result, Payload): print(result.data.decode('utf-8')) diff --git a/setup.py b/setup.py index f7a2585d..2030490d 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ 'aiohttp': {'aiohttp >= 3.0.0'}, 'quart': {'quart >= 0.15.0'}, 'quic': {'aioquic >= 0.9.0'}, - 'cli': {'asyncclick >= 8.0.0', 'reactivex >= 4.0.0'} + 'cli': {'asyncclick >= 8.0.0'} }, classifiers=[ 'Development Status :: 3 - Alpha', diff --git a/tests/rsocket/test_request_stream.py b/tests/rsocket/test_request_stream.py index c9ed844d..d2b711d4 100644 --- a/tests/rsocket/test_request_stream.py +++ b/tests/rsocket/test_request_stream.py @@ -200,15 +200,9 @@ def request(self, n: int): async def request_stream(self, payload: Payload) -> Publisher: return self - class StreamSubscriber(CollectorSubscriber): - - def on_next(self, value, is_complete=False): - super().on_next(value, is_complete) - self.subscription.request(1) - server.set_handler_using_factory(Handler) - stream_subscriber = StreamSubscriber() + stream_subscriber = CollectorSubscriber(limit_rate=1) client.request_stream(Payload()).initial_request_n(1).subscribe(stream_subscriber) From 8663184fa140277eaea599d033a010eae89548da Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Mon, 24 Oct 2022 09:06:19 +0300 Subject: [PATCH 06/21] WIP cli - refactoring. added help to options. load data from stdin. limit total stream response support. small fixes. --- rsocket/awaitable/collector_subscriber.py | 8 +- rsocket/cli/command.py | 89 +++++++++++++---------- 2 files changed, 59 insertions(+), 38 deletions(-) diff --git a/rsocket/awaitable/collector_subscriber.py b/rsocket/awaitable/collector_subscriber.py index fd2f7504..cf29c599 100644 --- a/rsocket/awaitable/collector_subscriber.py +++ b/rsocket/awaitable/collector_subscriber.py @@ -7,9 +7,11 @@ class CollectorSubscriber(Subscriber): - def __init__(self, limit_rate=MAX_REQUEST_N) -> None: + def __init__(self, limit_rate=MAX_REQUEST_N, limit_count=None) -> None: + self._limit_count = limit_count self._limit_rate = limit_rate self._received_count = 0 + self._total_received_count = 0 self.is_done = asyncio.Event() self.error = None self.values = [] @@ -25,9 +27,13 @@ def on_next(self, value, is_complete=False): self.values.append(value) self._received_count += 1 + self._total_received_count += 1 if is_complete: self.is_done.set() + elif self._limit_count is not None and self._limit_count == self._total_received_count: + self.subscription.cancel() + self.is_done.set() else: if self._received_count == self._limit_rate: self._received_count = 0 diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index f463b009..3fd24abb 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -18,6 +18,7 @@ from rsocket.transports.tcp import TransportTCP from importlib.metadata import version as get_version + @dataclass(frozen=True) class RSocketUri: host: str @@ -69,33 +70,42 @@ def build_composite_metadata(auth_simple: str, @click.command() -@click.option('-d', '--data', is_flag=False) -@click.option('-l', '--load', is_flag=False) -@click.option('-m', '--metadata', is_flag=False, default=None) -@click.option('-r', '--route', 'route_value', is_flag=False, default=None) -@click.option('--limitRate', 'limit_rate', is_flag=False, default=None) -# @click.option('--take', is_flag=False, default=None) -@click.option('-u', '--as', '--authSimple', 'auth_simple', is_flag=False, default=None) -@click.option('--ab', '--authBearer', 'auth_bearer', is_flag=False, default=None) -@click.option('--dataMimeType', '--dmt', 'data_mime_type', is_flag=False, default='application/json') -@click.option('--metadataMimeType', '--mmt', 'metadata_mime_type', is_flag=False, default='application/json') +@click.option('-d', '--data', is_flag=False, help='Data. Use "-" to read data from standard input. (default: )') +@click.option('-l', '--load', is_flag=False, help='Load a file as Data. (e.g. ./foo.txt, /tmp/foo.txt)') +@click.option('-m', '--metadata', is_flag=False, default=None, help='Metadata (default: )') +@click.option('-r', '--route', 'route_value', is_flag=False, default=None, help='Enable Routing Metadata Extension') +@click.option('--limitRate', 'limit_rate', is_flag=False, default=None, type=int, help='Enable limitRate(rate)') +@click.option('--take', 'take_n', is_flag=False, default=None, type=int) +@click.option('-u', '--as', '--authSimple', 'auth_simple', is_flag=False, default=None, + help='Enable Authentication Metadata Extension (Simple). The format must be "username: password"') +@click.option('--sd', '--setupData', 'setup_data', is_flag=False, default=None) +@click.option('--sm', '--setupMetadata', 'setup_metadata', is_flag=False, default=None) +@click.option('--ab', '--authBearer', 'auth_bearer', is_flag=False, default=None, + help='Enable Authentication Metadata Extension (Bearer)') +@click.option('--dataMimeType', '--dmt', 'data_mime_type', is_flag=False, + help='MimeType for data (default: application/json)') +@click.option('--metadataMimeType', '--mmt', 'metadata_mime_type', is_flag=False, + help='MimeType for metadata (default:application/json)') @click.option('--request', is_flag=True) @click.option('--stream', is_flag=True) @click.option('--channel', is_flag=True) @click.option('--fnf', is_flag=True) -@click.option('--debug', is_flag=True) -@click.option('--quiet', '-q', is_flag=True) -@click.option('--version', is_flag=True) +@click.option('--debug', is_flag=True, help='Show debug log') +@click.option('--quiet', '-q', is_flag=True, help='Disable the output on next') +@click.option('--version', is_flag=True, help='Print version') @click.argument('uri') async def command(data, load, metadata, route_value, auth_simple, auth_bearer, - limit_rate, - # take, + limit_rate, take_n, + setup_data, setup_metadata, data_mime_type, metadata_mime_type, request, stream, channel, fnf, uri, debug, version, quiet): if version: - print(get_version('rsocket')) + try: + print(get_version('rsocket')) + except Exception: + print('Failed to find version') return if quiet: @@ -104,17 +114,11 @@ async def command(data, load, if debug: logging.basicConfig(level=logging.DEBUG) - # if take is not None: - # take_n = int(take) - # - # if take_n == 0: - # return - - if limit_rate is not None: - limit_rate = int(limit_rate) + if take_n == 0: + return - if not limit_rate > 0: - limit_rate = MAX_REQUEST_N + if limit_rate is not None and not limit_rate > 0: + limit_rate = MAX_REQUEST_N else: limit_rate = MAX_REQUEST_N @@ -122,17 +126,27 @@ async def command(data, load, composite_items = build_composite_metadata(auth_simple, route_value, auth_bearer) - transport = await transport_from_uri(parsed_uri) + if data == '-': + stdin_text = click.get_text_stream('stdin') + data = stdin_text.read() - client_arguments = { - 'data_encoding': data_mime_type or WellKnownMimeTypes.APPLICATION_JSON, - 'metadata_encoding': metadata_mime_type or WellKnownMimeTypes.APPLICATION_JSON, - } + transport = await transport_from_uri(parsed_uri) if len(composite_items) > 0: - client_arguments['metadata_encoding'] = WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA + metadata_mime_type = WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA + + setup_payload = None + + if setup_data is not None or setup_metadata is not None: + setup_payload = Payload( + ensure_bytes(setup_data), + ensure_bytes(setup_metadata) + ) - async with RSocketClient(single_transport_provider(transport), **client_arguments) as client: + async with RSocketClient(single_transport_provider(transport), + data_encoding=data_mime_type or WellKnownMimeTypes.APPLICATION_JSON, + metadata_encoding=metadata_mime_type or WellKnownMimeTypes.APPLICATION_JSON, + setup_payload=setup_payload) as client: awaitable_client = AwaitableRSocket(client) if len(composite_items) > 0: @@ -157,10 +171,11 @@ async def command(data, load, elif fnf: await awaitable_client.fire_and_forget(payload) - if isinstance(result, Payload): - print(result.data.decode('utf-8')) - elif isinstance(result, Collection): - print([p.data.decode('utf-8') for p in result]) + if not quiet: + if isinstance(result, Payload): + print(result.data.decode('utf-8')) + elif isinstance(result, Collection): + print([p.data.decode('utf-8') for p in result]) if __name__ == '__main__': From 289217d5d16d7c27419a95ccc9cbb36a7dd1858a Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Mon, 24 Oct 2022 09:11:08 +0300 Subject: [PATCH 07/21] WIP cli - refactoring --- rsocket/cli/command.py | 109 +++++++++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 36 deletions(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index 3fd24abb..b2d38b14 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -117,31 +117,20 @@ async def command(data, load, if take_n == 0: return - if limit_rate is not None and not limit_rate > 0: - limit_rate = MAX_REQUEST_N - else: - limit_rate = MAX_REQUEST_N + limit_rate = normalize_limit_rate(limit_rate) parsed_uri = parse_uri(uri) composite_items = build_composite_metadata(auth_simple, route_value, auth_bearer) - if data == '-': - stdin_text = click.get_text_stream('stdin') - data = stdin_text.read() + data = normalize_data(data, load) transport = await transport_from_uri(parsed_uri) if len(composite_items) > 0: metadata_mime_type = WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA - setup_payload = None - - if setup_data is not None or setup_metadata is not None: - setup_payload = Payload( - ensure_bytes(setup_data), - ensure_bytes(setup_metadata) - ) + setup_payload = create_setup_payload(setup_data, setup_metadata) async with RSocketClient(single_transport_provider(transport), data_encoding=data_mime_type or WellKnownMimeTypes.APPLICATION_JSON, @@ -149,33 +138,81 @@ async def command(data, load, setup_payload=setup_payload) as client: awaitable_client = AwaitableRSocket(client) - if len(composite_items) > 0: - metadata_value = composite(*composite_items) - else: - metadata_value = metadata - - if load: - with open(load) as fd: - data = fd.read() + metadata_value = get_metadata_value(composite_items, metadata) payload = Payload(ensure_bytes(data), metadata_value) - result = None - - if request: - result = await awaitable_client.request_response(payload) - elif stream: - result = await awaitable_client.request_stream(payload, limit_rate=limit_rate) - elif channel: - result = await awaitable_client.request_channel(payload, limit_rate=limit_rate) - elif fnf: - await awaitable_client.fire_and_forget(payload) + result = await execute_request(awaitable_client, + channel, + fnf, + limit_rate, + payload, + request, + stream) if not quiet: - if isinstance(result, Payload): - print(result.data.decode('utf-8')) - elif isinstance(result, Collection): - print([p.data.decode('utf-8') for p in result]) + output_result(result) + + +def output_result(result): + if isinstance(result, Payload): + print(result.data.decode('utf-8')) + elif isinstance(result, Collection): + print([p.data.decode('utf-8') for p in result]) + + +async def execute_request(awaitable_client, channel, fnf, limit_rate, payload, request, stream): + result = None + + if request: + result = await awaitable_client.request_response(payload) + elif stream: + result = await awaitable_client.request_stream(payload, limit_rate=limit_rate) + elif channel: + result = await awaitable_client.request_channel(payload, limit_rate=limit_rate) + elif fnf: + await awaitable_client.fire_and_forget(payload) + + return result + + +def get_metadata_value(composite_items, metadata) -> bytes: + if len(composite_items) > 0: + metadata_value = composite(*composite_items) + else: + metadata_value = metadata + + return ensure_bytes(metadata_value) + + +def create_setup_payload(setup_data, setup_metadata) -> Optional[Payload]: + setup_payload = None + if setup_data is not None or setup_metadata is not None: + setup_payload = Payload( + ensure_bytes(setup_data), + ensure_bytes(setup_metadata) + ) + return setup_payload + + +def normalize_data(data: str, load: str) -> bytes: + if data == '-': + stdin_text = click.get_text_stream('stdin') + data = stdin_text.read() + + if load: + with open(load) as fd: + data = fd.read() + + return ensure_bytes(data) + + +def normalize_limit_rate(limit_rate): + if limit_rate is not None and not limit_rate > 0: + limit_rate = MAX_REQUEST_N + else: + limit_rate = MAX_REQUEST_N + return limit_rate if __name__ == '__main__': From 5410e3290659f588bef723a3f0fd354095e36dcf Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Mon, 24 Oct 2022 09:13:25 +0300 Subject: [PATCH 08/21] WIP cli - refactoring --- rsocket/cli/command.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index b2d38b14..02d4f72c 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -1,5 +1,6 @@ import asyncio import logging +from contextlib import asynccontextmanager from dataclasses import dataclass from typing import Optional, Type, Collection @@ -69,6 +70,17 @@ def build_composite_metadata(auth_simple: str, return composite_items +@asynccontextmanager +async def create_client(parsed_uri, data_mime_type, metadata_mime_type, setup_payload): + transport = await transport_from_uri(parsed_uri) + + async with RSocketClient(single_transport_provider(transport), + data_encoding=data_mime_type or WellKnownMimeTypes.APPLICATION_JSON, + metadata_encoding=metadata_mime_type or WellKnownMimeTypes.APPLICATION_JSON, + setup_payload=setup_payload) as client: + yield AwaitableRSocket(client) + + @click.command() @click.option('-d', '--data', is_flag=False, help='Data. Use "-" to read data from standard input. (default: )') @click.option('-l', '--load', is_flag=False, help='Load a file as Data. (e.g. ./foo.txt, /tmp/foo.txt)') @@ -125,24 +137,18 @@ async def command(data, load, data = normalize_data(data, load) - transport = await transport_from_uri(parsed_uri) - if len(composite_items) > 0: metadata_mime_type = WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA setup_payload = create_setup_payload(setup_data, setup_metadata) - async with RSocketClient(single_transport_provider(transport), - data_encoding=data_mime_type or WellKnownMimeTypes.APPLICATION_JSON, - metadata_encoding=metadata_mime_type or WellKnownMimeTypes.APPLICATION_JSON, - setup_payload=setup_payload) as client: - awaitable_client = AwaitableRSocket(client) + async with create_client(parsed_uri, data_mime_type, metadata_mime_type, setup_payload) as client: metadata_value = get_metadata_value(composite_items, metadata) payload = Payload(ensure_bytes(data), metadata_value) - result = await execute_request(awaitable_client, + result = await execute_request(client, channel, fnf, limit_rate, From 9ac6dc3606e68fa58ca695084e5f9d73669e9ec5 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Mon, 24 Oct 2022 09:14:10 +0300 Subject: [PATCH 09/21] WIP cli - refactoring --- rsocket/cli/command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index 02d4f72c..1fb4cf5b 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -146,7 +146,7 @@ async def command(data, load, metadata_value = get_metadata_value(composite_items, metadata) - payload = Payload(ensure_bytes(data), metadata_value) + payload = Payload(data, metadata_value) result = await execute_request(client, channel, From 1e2a6f47ad4288803fe01a13dd2a8d2caf2f854a Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Mon, 24 Oct 2022 09:20:06 +0300 Subject: [PATCH 10/21] WIP cli - refactoring --- rsocket/cli/command.py | 39 +++++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index 1fb4cf5b..f23afa59 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -129,30 +129,19 @@ async def command(data, load, if take_n == 0: return - limit_rate = normalize_limit_rate(limit_rate) - - parsed_uri = parse_uri(uri) - composite_items = build_composite_metadata(auth_simple, route_value, auth_bearer) - data = normalize_data(data, load) - - if len(composite_items) > 0: - metadata_mime_type = WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA - - setup_payload = create_setup_payload(setup_data, setup_metadata) - - async with create_client(parsed_uri, data_mime_type, metadata_mime_type, setup_payload) as client: - - metadata_value = get_metadata_value(composite_items, metadata) - - payload = Payload(data, metadata_value) + async with create_client(parse_uri(uri), + data_mime_type, + normalize_metadata_mime_type(composite_items, metadata_mime_type), + create_setup_payload(setup_data, setup_metadata) + ) as client: result = await execute_request(client, channel, fnf, - limit_rate, - payload, + normalize_limit_rate(limit_rate), + create_request_payload(data, load, metadata, composite_items), request, stream) @@ -160,6 +149,20 @@ async def command(data, load, output_result(result) +def normalize_metadata_mime_type(composite_items, metadata_mime_type): + if len(composite_items) > 0: + metadata_mime_type = WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA + + return metadata_mime_type + + +def create_request_payload(data, load, metadata, composite_items) -> Payload: + data = normalize_data(data, load) + metadata_value = get_metadata_value(composite_items, metadata) + payload = Payload(data, metadata_value) + return payload + + def output_result(result): if isinstance(result, Payload): print(result.data.decode('utf-8')) From e663fe0c771ac64585beed20ba988d2ec102bd15 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Mon, 24 Oct 2022 09:27:55 +0300 Subject: [PATCH 11/21] WIP cli - type hints and basic tests --- rsocket/cli/command.py | 29 ++++++++++------- tests/rsocket/test_cli_command.py | 52 +++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 12 deletions(-) create mode 100644 tests/rsocket/test_cli_command.py diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index f23afa59..b21753e4 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -2,7 +2,7 @@ import logging from contextlib import asynccontextmanager from dataclasses import dataclass -from typing import Optional, Type, Collection +from typing import Optional, Type, Collection, List import asyncclick as click @@ -29,7 +29,7 @@ class RSocketUri: original_uri: Optional[str] = None -def parse_uri(uri: str): +def parse_uri(uri: str) -> RSocketUri: schema, rest = uri.split(':', 1) rest = rest.strip('/') host_port = rest.split('/', 1) @@ -53,9 +53,9 @@ async def transport_from_uri(uri: RSocketUri) -> Type[AbstractMessagingTransport raise Exception('Unsupported schema in CLI') -def build_composite_metadata(auth_simple: str, - route_value: str, - auth_bearer: str): +def build_composite_metadata(auth_simple: Optional[str], + route_value: Optional[str], + auth_bearer: Optional[str]): composite_items = [] if route_value is not None: @@ -156,11 +156,13 @@ def normalize_metadata_mime_type(composite_items, metadata_mime_type): return metadata_mime_type -def create_request_payload(data, load, metadata, composite_items) -> Payload: +def create_request_payload(data: Optional[str], + load: Optional[str], + metadata: Optional[str], + composite_items: List) -> Payload: data = normalize_data(data, load) metadata_value = get_metadata_value(composite_items, metadata) - payload = Payload(data, metadata_value) - return payload + return Payload(data, metadata_value) def output_result(result): @@ -185,7 +187,7 @@ async def execute_request(awaitable_client, channel, fnf, limit_rate, payload, r return result -def get_metadata_value(composite_items, metadata) -> bytes: +def get_metadata_value(composite_items: List, metadata: Optional[str]) -> bytes: if len(composite_items) > 0: metadata_value = composite(*composite_items) else: @@ -194,22 +196,24 @@ def get_metadata_value(composite_items, metadata) -> bytes: return ensure_bytes(metadata_value) -def create_setup_payload(setup_data, setup_metadata) -> Optional[Payload]: +def create_setup_payload(setup_data: Optional[str], setup_metadata: Optional[str]) -> Optional[Payload]: setup_payload = None + if setup_data is not None or setup_metadata is not None: setup_payload = Payload( ensure_bytes(setup_data), ensure_bytes(setup_metadata) ) + return setup_payload -def normalize_data(data: str, load: str) -> bytes: +def normalize_data(data: Optional[str], load: Optional[str]) -> bytes: if data == '-': stdin_text = click.get_text_stream('stdin') data = stdin_text.read() - if load: + if load is not None: with open(load) as fd: data = fd.read() @@ -221,6 +225,7 @@ def normalize_limit_rate(limit_rate): limit_rate = MAX_REQUEST_N else: limit_rate = MAX_REQUEST_N + return limit_rate diff --git a/tests/rsocket/test_cli_command.py b/tests/rsocket/test_cli_command.py new file mode 100644 index 00000000..ba4e9045 --- /dev/null +++ b/tests/rsocket/test_cli_command.py @@ -0,0 +1,52 @@ +from rsocket.cli.command import parse_uri, build_composite_metadata, create_request_payload, get_metadata_value, \ + create_setup_payload, normalize_data, normalize_limit_rate +from rsocket.frame import MAX_REQUEST_N + + +def test_parse_uri(): + parsed = parse_uri('wss://localhost:6565') + + assert parsed.schema == 'wss' + assert parsed.port == '6565' + assert parsed.host == 'localhost' + + +def test_build_composite_metadata(): + composite = build_composite_metadata( + None, None, None + ) + + assert len(composite) == 0 + + +def test_create_request_payload(): + payload = create_request_payload( + None, None, None, [] + ) + + assert payload.data is None + assert payload.metadata is None + + +def test_get_metadata_value(): + result = get_metadata_value([], None) + + assert result is None + + +def test_create_setup_payload(): + result = create_setup_payload(None, None) + + assert result is None + + +def test_normalize_data(): + data = normalize_data(None, None) + + assert data is None + + +def test_normalize_limit_rate(): + result = normalize_limit_rate(None) + + assert result == MAX_REQUEST_N From 14d8987a1ade61f3ce269447b219fee271b86bd7 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Mon, 24 Oct 2022 09:30:52 +0300 Subject: [PATCH 12/21] WIP cli - organized help messages --- rsocket/cli/command.py | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index b21753e4..1d10fa1b 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -82,16 +82,24 @@ async def create_client(parsed_uri, data_mime_type, metadata_mime_type, setup_pa @click.command() -@click.option('-d', '--data', is_flag=False, help='Data. Use "-" to read data from standard input. (default: )') -@click.option('-l', '--load', is_flag=False, help='Load a file as Data. (e.g. ./foo.txt, /tmp/foo.txt)') -@click.option('-m', '--metadata', is_flag=False, default=None, help='Metadata (default: )') -@click.option('-r', '--route', 'route_value', is_flag=False, default=None, help='Enable Routing Metadata Extension') -@click.option('--limitRate', 'limit_rate', is_flag=False, default=None, type=int, help='Enable limitRate(rate)') -@click.option('--take', 'take_n', is_flag=False, default=None, type=int) +@click.option('-d', '--data', is_flag=False, + help='Data. Use "-" to read data from standard input. (default: )') +@click.option('-l', '--load', is_flag=False, + help='Load a file as Data. (e.g. ./foo.txt, /tmp/foo.txt)') +@click.option('-m', '--metadata', is_flag=False, default=None, + help='Metadata (default: )') +@click.option('-r', '--route', 'route_value', is_flag=False, default=None, + help='Enable Routing Metadata Extension') +@click.option('--limitRate', 'limit_rate', is_flag=False, default=None, type=int, + help='Enable limitRate(rate)') +@click.option('--take', 'take_n', is_flag=False, default=None, type=int, + help='Enable take(n)') @click.option('-u', '--as', '--authSimple', 'auth_simple', is_flag=False, default=None, help='Enable Authentication Metadata Extension (Simple). The format must be "username: password"') -@click.option('--sd', '--setupData', 'setup_data', is_flag=False, default=None) -@click.option('--sm', '--setupMetadata', 'setup_metadata', is_flag=False, default=None) +@click.option('--sd', '--setupData', 'setup_data', is_flag=False, default=None, + help='Data for Setup payload') +@click.option('--sm', '--setupMetadata', 'setup_metadata', is_flag=False, default=None, + help='Metadata for Setup payload') @click.option('--ab', '--authBearer', 'auth_bearer', is_flag=False, default=None, help='Enable Authentication Metadata Extension (Bearer)') @click.option('--dataMimeType', '--dmt', 'data_mime_type', is_flag=False, @@ -102,10 +110,13 @@ async def create_client(parsed_uri, data_mime_type, metadata_mime_type, setup_pa @click.option('--stream', is_flag=True) @click.option('--channel', is_flag=True) @click.option('--fnf', is_flag=True) -@click.option('--debug', is_flag=True, help='Show debug log') -@click.option('--quiet', '-q', is_flag=True, help='Disable the output on next') -@click.option('--version', is_flag=True, help='Print version') -@click.argument('uri') +@click.option('--debug', is_flag=True, + help='Show debug log') +@click.option('--quiet', '-q', is_flag=True, + help='Disable the output on next') +@click.option('--version', is_flag=True, + help='Print version') +@click.argument('uri', help='Connection URI. supported: tcp/ws') async def command(data, load, metadata, route_value, auth_simple, auth_bearer, limit_rate, take_n, From b6454c39758fd0c86610adb0e3b748709425eb3f Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Mon, 24 Oct 2022 09:33:53 +0300 Subject: [PATCH 13/21] WIP cli - organized help messages --- rsocket/cli/command.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index 1d10fa1b..b8b6ab79 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -81,7 +81,7 @@ async def create_client(parsed_uri, data_mime_type, metadata_mime_type, setup_pa yield AwaitableRSocket(client) -@click.command() +@click.command(help='Supported connection strings: tcp/ws') @click.option('-d', '--data', is_flag=False, help='Data. Use "-" to read data from standard input. (default: )') @click.option('-l', '--load', is_flag=False, @@ -116,7 +116,7 @@ async def create_client(parsed_uri, data_mime_type, metadata_mime_type, setup_pa help='Disable the output on next') @click.option('--version', is_flag=True, help='Print version') -@click.argument('uri', help='Connection URI. supported: tcp/ws') +@click.argument('uri') async def command(data, load, metadata, route_value, auth_simple, auth_bearer, limit_rate, take_n, From a85abcb4014de6361b5bd3772f6d8b330be9e517 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Mon, 24 Oct 2022 09:36:19 +0300 Subject: [PATCH 14/21] WIP cli - updated changelog --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 908d810a..611fe2c5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -6,6 +6,7 @@ v0.4.1 ====== - Removed data and metadata content from logs. Replaced with data and metadata sizes - Performance test examples available in *performance* folder +- Added command line tool (rsocket-py) v0.4.0 ====== From c104a0a7db531611ad4023ad76c0e874f9c54929 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Mon, 24 Oct 2022 10:41:59 +0300 Subject: [PATCH 15/21] WIP cli - added rsocket-py entry point extra --- README.md | 1 + setup.py | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/README.md b/README.md index e8b179dd..815d5126 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ or install any of the extras: * aiohttp * quart * quic +* cli Example: diff --git a/setup.py b/setup.py index 2030490d..c6d716e1 100644 --- a/setup.py +++ b/setup.py @@ -24,6 +24,11 @@ 'quic': {'aioquic >= 0.9.0'}, 'cli': {'asyncclick >= 8.0.0'} }, + entry_points={ + 'console_scripts': [ + 'rsocket-py = rsocket.cli.command:command [cli]', + ], + }, classifiers=[ 'Development Status :: 3 - Alpha', 'Programming Language :: Python', From 464df7fd660451b4e5eb4f4b58997c3e047f308b Mon Sep 17 00:00:00 2001 From: Gabriel Shaar Date: Tue, 25 Oct 2022 12:48:19 +0300 Subject: [PATCH 16/21] WIP - command line improvements and tests --- rsocket/cli/command.py | 16 +++++++----- tests/rsocket/helpers.py | 6 ++++- tests/rsocket/test_cli_command.py | 41 +++++++++++++++++++++++++++++++ tests/rsocket/test_fragments.py | 5 +--- 4 files changed, 57 insertions(+), 11 deletions(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index b8b6ab79..a8455aa2 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -106,10 +106,14 @@ async def create_client(parsed_uri, data_mime_type, metadata_mime_type, setup_pa help='MimeType for data (default: application/json)') @click.option('--metadataMimeType', '--mmt', 'metadata_mime_type', is_flag=False, help='MimeType for metadata (default:application/json)') -@click.option('--request', is_flag=True) -@click.option('--stream', is_flag=True) -@click.option('--channel', is_flag=True) -@click.option('--fnf', is_flag=True) +@click.option('--request', is_flag=True, + help='Request response') +@click.option('--stream', is_flag=True, + help='Request stream') +@click.option('--channel', is_flag=True, + help='Request channel') +@click.option('--fnf', is_flag=True, + help='Fire and Forget') @click.option('--debug', is_flag=True, help='Show debug log') @click.option('--quiet', '-q', is_flag=True, @@ -222,11 +226,11 @@ def create_setup_payload(setup_data: Optional[str], setup_metadata: Optional[str def normalize_data(data: Optional[str], load: Optional[str]) -> bytes: if data == '-': stdin_text = click.get_text_stream('stdin') - data = stdin_text.read() + return ensure_bytes(stdin_text.read()) if load is not None: with open(load) as fd: - data = fd.read() + return ensure_bytes(fd.read()) return ensure_bytes(data) diff --git a/tests/rsocket/helpers.py b/tests/rsocket/helpers.py index 010e233d..096f5875 100644 --- a/tests/rsocket/helpers.py +++ b/tests/rsocket/helpers.py @@ -6,7 +6,7 @@ from typing import Tuple, Any from typing import Type, Callable -from rsocket.frame_helpers import str_to_bytes +from rsocket.frame_helpers import str_to_bytes, ensure_bytes from rsocket.helpers import create_future, noop from rsocket.logger import logger from rsocket.payload import Payload @@ -97,3 +97,7 @@ def get_components(pipe) -> Tuple[RSocketServer, RSocketClient]: def to_json_bytes(item: Any) -> bytes: return str_to_bytes(json.dumps(item)) + + +def create_data(base: bytes, multiplier: int, limit: float = None): + return b''.join([ensure_bytes(str(i)) + base for i in range(multiplier)])[0:limit] diff --git a/tests/rsocket/test_cli_command.py b/tests/rsocket/test_cli_command.py index ba4e9045..f075f116 100644 --- a/tests/rsocket/test_cli_command.py +++ b/tests/rsocket/test_cli_command.py @@ -1,6 +1,11 @@ +import io +import sys +import tempfile + from rsocket.cli.command import parse_uri, build_composite_metadata, create_request_payload, get_metadata_value, \ create_setup_payload, normalize_data, normalize_limit_rate from rsocket.frame import MAX_REQUEST_N +from tests.rsocket.helpers import create_data def test_parse_uri(): @@ -46,6 +51,42 @@ def test_normalize_data(): assert data is None +def test_normalize_data_from_file(): + with tempfile.NamedTemporaryFile() as fd: + fixture_data = create_data(b'1234567890', 20) + fd.write(fixture_data) + fd.flush() + + data = normalize_data(None, fd.name) + + assert data == fixture_data + + +def test_normalize_data_from_stdin(): + fixture_data = create_data(b'1234567890', 20) + stdin = io.BytesIO(fixture_data) + sys.stdin = stdin + + data = normalize_data('-', None) + + assert data == fixture_data + + +def test_normalize_data_from_stdin_takes_precedence_over_load_from_file(): + with tempfile.NamedTemporaryFile() as fd: + fixture_data_file = create_data(b'1234567890', 20) + fd.write(fixture_data_file) + fd.flush() + + fixture_data_stdin = create_data(b'0987654321', 20) + stdin = io.BytesIO(fixture_data_stdin) + sys.stdin = stdin + + data = normalize_data('-', fd.name) + + assert data == fixture_data_stdin + + def test_normalize_limit_rate(): result = normalize_limit_rate(None) diff --git a/tests/rsocket/test_fragments.py b/tests/rsocket/test_fragments.py index 714df9e9..5e79e5de 100644 --- a/tests/rsocket/test_fragments.py +++ b/tests/rsocket/test_fragments.py @@ -8,10 +8,7 @@ from rsocket.frame_fragment_cache import FrameFragmentCache from rsocket.frame_helpers import ensure_bytes from rsocket.payload import Payload - - -def create_data(base: bytes, multiplier: int, limit: float = None): - return b''.join([ensure_bytes(str(i)) + base for i in range(multiplier)])[0:limit] +from tests.rsocket.helpers import create_data def test_create_data(): From 3b2eb84bfc76302387cd34cd51b7e1911ebb7620 Mon Sep 17 00:00:00 2001 From: Gabriel Shaar Date: Tue, 25 Oct 2022 14:43:01 +0300 Subject: [PATCH 17/21] removed duplicate websocket examples (with/without ssl) --- README.md | 1 - examples/client_websocket.py | 32 +++++++++++++++------ examples/client_wss.py | 24 ---------------- examples/server_aiohttp_websocket.py | 25 +++++++++++++--- examples/server_aiohttp_websocket_secure.py | 32 --------------------- examples/test_examples.py | 9 +++--- 6 files changed, 50 insertions(+), 73 deletions(-) delete mode 100644 examples/client_wss.py delete mode 100644 examples/server_aiohttp_websocket_secure.py diff --git a/README.md b/README.md index 04e853a9..90730dc0 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,6 @@ all the examples | | ServerWithFragmentation | client_with_routing.py | | | server_quart_websocket.py | | client_websocket.py | | | server_aiohttp_websocket.py | | client_websocket.py | | -| server_aiohttp_websocket_secure.py | | client_wss.py | | # Build Status diff --git a/examples/client_websocket.py b/examples/client_websocket.py index f7a25ea5..a65ddebb 100644 --- a/examples/client_websocket.py +++ b/examples/client_websocket.py @@ -1,18 +1,34 @@ import asyncio import logging -import sys +import aiohttp +import asyncclick as click + +from rsocket.helpers import single_transport_provider from rsocket.payload import Payload +from rsocket.rsocket_client import RSocketClient +from rsocket.transports.aiohttp_websocket import TransportAioHttpClient from rsocket.transports.aiohttp_websocket import websocket_client -async def application(serve_port): - async with websocket_client('http://localhost:%s' % serve_port) as client: - result = await client.request_response(Payload(b'ping')) - print(result) +async def application(with_ssl: bool, serve_port: int): + if with_ssl: + async with aiohttp.ClientSession() as session: + async with session.ws_connect('wss://localhost:%s' % serve_port, verify_ssl=False) as websocket: + async with RSocketClient( + single_transport_provider(TransportAioHttpClient(websocket=websocket))) as client: + result = await client.request_response(Payload(b'ping')) + print(result) + + else: + async with websocket_client('http://localhost:%s' % serve_port) as client: + result = await client.request_response(Payload(b'ping')) + print(result) -if __name__ == '__main__': - port = sys.argv[1] if len(sys.argv) > 1 else 6565 +@click.command() +@click.option('--with-ssl', is_flag=False, default=False) +@click.option('--port', is_flag=False, default=6565) +async def command(with_ssl, port: int): logging.basicConfig(level=logging.DEBUG) - asyncio.run(application(port)) + asyncio.run(application(with_ssl, port)) diff --git a/examples/client_wss.py b/examples/client_wss.py deleted file mode 100644 index 97d085df..00000000 --- a/examples/client_wss.py +++ /dev/null @@ -1,24 +0,0 @@ -import asyncio -import logging -import sys - -import aiohttp - -from rsocket.helpers import single_transport_provider -from rsocket.payload import Payload -from rsocket.rsocket_client import RSocketClient -from rsocket.transports.aiohttp_websocket import TransportAioHttpClient - - -async def application(serve_port): - async with aiohttp.ClientSession() as session: - async with session.ws_connect('wss://localhost:%s' % serve_port, verify_ssl=False) as websocket: - async with RSocketClient(single_transport_provider(TransportAioHttpClient(websocket=websocket))) as client: - result = await client.request_response(Payload(b'ping')) - print(result) - - -if __name__ == '__main__': - port = sys.argv[1] if len(sys.argv) > 1 else 6565 - logging.basicConfig(level=logging.DEBUG) - asyncio.run(application(port)) diff --git a/examples/server_aiohttp_websocket.py b/examples/server_aiohttp_websocket.py index c77f0ad4..75231609 100644 --- a/examples/server_aiohttp_websocket.py +++ b/examples/server_aiohttp_websocket.py @@ -1,8 +1,10 @@ import logging -import sys +import ssl +import asyncclick as click from aiohttp import web +from examples.fixtures import cert_gen from rsocket.helpers import create_future from rsocket.local_typing import Awaitable from rsocket.payload import Payload @@ -16,9 +18,24 @@ async def request_response(self, payload: Payload) -> Awaitable[Payload]: return create_future(Payload(b'pong')) -if __name__ == '__main__': - port = sys.argv[1] if len(sys.argv) > 1 else 6565 +@click.command() +@click.option('--port', help='Port to listen on', default=6565, type=int) +@click.option('--with-ssl', is_flag=True, help='Enable SSL mode') +async def start_server(with_ssl: bool, port: int): logging.basicConfig(level=logging.DEBUG) app = web.Application() app.add_routes([web.get('/', websocket_handler_factory(handler_factory=Handler))]) - web.run_app(app, port=port) + + if with_ssl: + ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + + with cert_gen() as (certificate, key): + ssl_context.load_cert_chain(certificate, key) + else: + ssl_context = None + + await web._run_app(app, port=port, ssl_context=ssl_context) + + +if __name__ == '__main__': + start_server() diff --git a/examples/server_aiohttp_websocket_secure.py b/examples/server_aiohttp_websocket_secure.py deleted file mode 100644 index b60d14b1..00000000 --- a/examples/server_aiohttp_websocket_secure.py +++ /dev/null @@ -1,32 +0,0 @@ -import logging -import ssl -import sys - -from aiohttp import web - -from examples.fixtures import cert_gen -from rsocket.helpers import create_future -from rsocket.local_typing import Awaitable -from rsocket.payload import Payload -from rsocket.request_handler import BaseRequestHandler -from rsocket.transports.aiohttp_websocket import websocket_handler_factory - - -class Handler(BaseRequestHandler): - - async def request_response(self, payload: Payload) -> Awaitable[Payload]: - return create_future(Payload(b'pong')) - - -if __name__ == '__main__': - port = sys.argv[1] if len(sys.argv) > 1 else 6565 - logging.basicConfig(level=logging.DEBUG) - app = web.Application() - app.add_routes([web.get('/', websocket_handler_factory(handler_factory=Handler))]) - - ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - - with cert_gen() as (certificate, key): - ssl_context.load_cert_chain(certificate, key) - - web.run_app(app, port=port, ssl_context=ssl_context) diff --git a/examples/test_examples.py b/examples/test_examples.py index ab17bef5..fdaa8124 100644 --- a/examples/test_examples.py +++ b/examples/test_examples.py @@ -90,11 +90,12 @@ def test_client_server_over_websocket_aiohttp(unused_tcp_port): def test_client_server_over_websocket_secure_aiohttp(unused_tcp_port): - pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_aiohttp_websocket_secure.py', str(unused_tcp_port)) + pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', 'server_aiohttp_websocket.py', '--port', str(unused_tcp_port), + '--with-ssl') try: sleep(2) - client = subprocess.Popen(['python3', './client_wss.py', str(unused_tcp_port)]) + client = subprocess.Popen(['python3', './client_websocket.py', '--port', str(unused_tcp_port), '--with-ssl']) client.wait(timeout=3) assert client.returncode == 0 @@ -103,11 +104,11 @@ def test_client_server_over_websocket_secure_aiohttp(unused_tcp_port): def test_client_server_over_websocket_quart(unused_tcp_port): - pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_quart_websocket.py', str(unused_tcp_port)) + pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_quart_websocket.py', '--port', str(unused_tcp_port)) try: sleep(2) - client = subprocess.Popen(['python3', './client_websocket.py', str(unused_tcp_port)]) + client = subprocess.Popen(['python3', './client_websocket.py', '--port', str(unused_tcp_port)]) client.wait(timeout=3) assert client.returncode == 0 From aa791ee1143c879aa84657701cf2661093cceffd Mon Sep 17 00:00:00 2001 From: Gabriel Shaar Date: Tue, 25 Oct 2022 15:07:01 +0300 Subject: [PATCH 18/21] added wss support for command line --- examples/server_aiohttp_websocket.py | 15 +++++- rsocket/cli/command.py | 74 ++++++++++++++++++++-------- 2 files changed, 68 insertions(+), 21 deletions(-) diff --git a/examples/server_aiohttp_websocket.py b/examples/server_aiohttp_websocket.py index 75231609..d976638d 100644 --- a/examples/server_aiohttp_websocket.py +++ b/examples/server_aiohttp_websocket.py @@ -9,7 +9,8 @@ from rsocket.local_typing import Awaitable from rsocket.payload import Payload from rsocket.request_handler import BaseRequestHandler -from rsocket.transports.aiohttp_websocket import websocket_handler_factory +from rsocket.rsocket_server import RSocketServer +from rsocket.transports.aiohttp_websocket import TransportAioHttpWebsocket class Handler(BaseRequestHandler): @@ -18,6 +19,18 @@ async def request_response(self, payload: Payload) -> Awaitable[Payload]: return create_future(Payload(b'pong')) +def websocket_handler_factory( **kwargs): + async def websocket_handler(request): + ws = web.WebSocketResponse() + await ws.prepare(request) + transport = TransportAioHttpWebsocket(ws) + RSocketServer(transport, **kwargs) + await transport.handle_incoming_ws_messages() + return ws + + return websocket_handler + + @click.command() @click.option('--port', help='Port to listen on', default=6565, type=int) @click.option('--with-ssl', is_flag=True, help='Enable SSL mode') diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index a8455aa2..9aafe427 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -4,13 +4,15 @@ from dataclasses import dataclass from typing import Optional, Type, Collection, List +import aiohttp import asyncclick as click +from werkzeug.routing import Map from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket from rsocket.extensions.helpers import route, composite, authenticate_simple, authenticate_bearer from rsocket.extensions.mimetypes import WellKnownMimeTypes from rsocket.frame import MAX_REQUEST_N -from rsocket.frame_helpers import ensure_bytes +from rsocket.frame_helpers import ensure_bytes, safe_len from rsocket.helpers import single_transport_provider from rsocket.payload import Payload from rsocket.rsocket_client import RSocketClient @@ -43,14 +45,21 @@ def parse_uri(uri: str) -> RSocketUri: return RSocketUri(host, port, schema, rest, uri) -async def transport_from_uri(uri: RSocketUri) -> Type[AbstractMessagingTransport]: +@asynccontextmanager +async def transport_from_uri(uri: RSocketUri, + verify_ssl=True, + headers: Optional[Map] = None) -> Type[AbstractMessagingTransport]: if uri.schema == 'tcp': connection = await asyncio.open_connection(uri.host, uri.port) - return TransportTCP(*connection) - elif uri.schema == 'ws': - return TransportAioHttpClient(uri.original_uri) - - raise Exception('Unsupported schema in CLI') + yield TransportTCP(*connection) + elif uri.schema in ['wss', 'ws']: + async with aiohttp.ClientSession() as session: + async with session.ws_connect(uri.original_uri, + verify_ssl=verify_ssl, + headers=headers) as websocket: + yield TransportAioHttpClient(websocket=websocket) + else: + raise Exception('Unsupported schema in CLI') def build_composite_metadata(auth_simple: Optional[str], @@ -71,17 +80,21 @@ def build_composite_metadata(auth_simple: Optional[str], @asynccontextmanager -async def create_client(parsed_uri, data_mime_type, metadata_mime_type, setup_payload): - transport = await transport_from_uri(parsed_uri) - - async with RSocketClient(single_transport_provider(transport), - data_encoding=data_mime_type or WellKnownMimeTypes.APPLICATION_JSON, - metadata_encoding=metadata_mime_type or WellKnownMimeTypes.APPLICATION_JSON, - setup_payload=setup_payload) as client: - yield AwaitableRSocket(client) - - -@click.command(help='Supported connection strings: tcp/ws') +async def create_client(parsed_uri, + data_mime_type, + metadata_mime_type, + setup_payload, + allow_untrusted_ssl=False, + http_headers=None): + async with transport_from_uri(parsed_uri, verify_ssl=not allow_untrusted_ssl, headers=http_headers) as transport: + async with RSocketClient(single_transport_provider(transport), + data_encoding=data_mime_type or WellKnownMimeTypes.APPLICATION_JSON, + metadata_encoding=metadata_mime_type or WellKnownMimeTypes.APPLICATION_JSON, + setup_payload=setup_payload) as client: + yield AwaitableRSocket(client) + + +@click.command(help='Supported connection strings: tcp/ws/wss') @click.option('-d', '--data', is_flag=False, help='Data. Use "-" to read data from standard input. (default: )') @click.option('-l', '--load', is_flag=False, @@ -106,6 +119,8 @@ async def create_client(parsed_uri, data_mime_type, metadata_mime_type, setup_pa help='MimeType for data (default: application/json)') @click.option('--metadataMimeType', '--mmt', 'metadata_mime_type', is_flag=False, help='MimeType for metadata (default:application/json)') +@click.option('--allowUntrustedSsl', 'allow_untrusted_ssl', is_flag=True, default=False, + help='Do not verify SSL certificate (for wss:// urls)') @click.option('--request', is_flag=True, help='Request response') @click.option('--stream', is_flag=True, @@ -120,11 +135,13 @@ async def create_client(parsed_uri, data_mime_type, metadata_mime_type, setup_pa help='Disable the output on next') @click.option('--version', is_flag=True, help='Print version') +@click.option('--httpHeader', 'http_header', multiple=True, help='ws/wss headers') @click.argument('uri') async def command(data, load, metadata, route_value, auth_simple, auth_bearer, - limit_rate, take_n, + limit_rate, take_n, allow_untrusted_ssl, setup_data, setup_metadata, + http_header, data_mime_type, metadata_mime_type, request, stream, channel, fnf, uri, debug, version, quiet): @@ -144,12 +161,16 @@ async def command(data, load, if take_n == 0: return + http_headers = parse_headers(http_header) + composite_items = build_composite_metadata(auth_simple, route_value, auth_bearer) async with create_client(parse_uri(uri), data_mime_type, normalize_metadata_mime_type(composite_items, metadata_mime_type), - create_setup_payload(setup_data, setup_metadata) + create_setup_payload(setup_data, setup_metadata), + allow_untrusted_ssl=allow_untrusted_ssl, + http_headers=http_headers ) as client: result = await execute_request(client, @@ -164,6 +185,19 @@ async def command(data, load, output_result(result) +def parse_headers(http_headers): + if safe_len(http_headers) > 0: + headers = dict() + + for header in http_headers: + parts = header.split('=', 2) + headers[parts[0]] = parts[1] + + return headers + + return None + + def normalize_metadata_mime_type(composite_items, metadata_mime_type): if len(composite_items) > 0: metadata_mime_type = WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA From a963321c0604bf3a1a5e686b2a03326eb9de687c Mon Sep 17 00:00:00 2001 From: Gabriel Shaar Date: Tue, 25 Oct 2022 15:09:14 +0300 Subject: [PATCH 19/21] cli - reorganized help options --- rsocket/cli/command.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index 9aafe427..3c2e80db 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -95,6 +95,14 @@ async def create_client(parsed_uri, @click.command(help='Supported connection strings: tcp/ws/wss') +@click.option('--request', is_flag=True, + help='Request response') +@click.option('--stream', is_flag=True, + help='Request stream') +@click.option('--channel', is_flag=True, + help='Request channel') +@click.option('--fnf', is_flag=True, + help='Fire and Forget') @click.option('-d', '--data', is_flag=False, help='Data. Use "-" to read data from standard input. (default: )') @click.option('-l', '--load', is_flag=False, @@ -121,21 +129,14 @@ async def create_client(parsed_uri, help='MimeType for metadata (default:application/json)') @click.option('--allowUntrustedSsl', 'allow_untrusted_ssl', is_flag=True, default=False, help='Do not verify SSL certificate (for wss:// urls)') -@click.option('--request', is_flag=True, - help='Request response') -@click.option('--stream', is_flag=True, - help='Request stream') -@click.option('--channel', is_flag=True, - help='Request channel') -@click.option('--fnf', is_flag=True, - help='Fire and Forget') +@click.option('--httpHeader', 'http_header', multiple=True, + help='ws/wss headers') @click.option('--debug', is_flag=True, help='Show debug log') @click.option('--quiet', '-q', is_flag=True, help='Disable the output on next') @click.option('--version', is_flag=True, help='Print version') -@click.option('--httpHeader', 'http_header', multiple=True, help='ws/wss headers') @click.argument('uri') async def command(data, load, metadata, route_value, auth_simple, auth_bearer, From 16c91d8e6518f99a2a0ada4d1a13ace7b401118b Mon Sep 17 00:00:00 2001 From: Gabriel Shaar Date: Tue, 25 Oct 2022 15:12:01 +0300 Subject: [PATCH 20/21] cli - reorganized help options --- rsocket/cli/command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket/cli/command.py b/rsocket/cli/command.py index 3c2e80db..b87072c1 100644 --- a/rsocket/cli/command.py +++ b/rsocket/cli/command.py @@ -94,7 +94,7 @@ async def create_client(parsed_uri, yield AwaitableRSocket(client) -@click.command(help='Supported connection strings: tcp/ws/wss') +@click.command(name='rsocket-py', help='Supported connection strings: tcp/ws/wss') @click.option('--request', is_flag=True, help='Request response') @click.option('--stream', is_flag=True, From b78c8d3f4eb19347eefdc8e06da794e278d6f077 Mon Sep 17 00:00:00 2001 From: Gabriel Shaar Date: Tue, 25 Oct 2022 15:23:20 +0300 Subject: [PATCH 21/21] server with routing example, added ws/wss transport options --- examples/server_with_routing.py | 56 +++++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/examples/server_with_routing.py b/examples/server_with_routing.py index b1b7d31e..aeb406eb 100644 --- a/examples/server_with_routing.py +++ b/examples/server_with_routing.py @@ -1,11 +1,15 @@ import asyncio import logging -import sys +import ssl from dataclasses import dataclass from datetime import timedelta from typing import Optional +import asyncclick as click +from aiohttp import web + from examples.example_fixtures import large_data1 +from examples.fixtures import cert_gen from examples.response_channel import response_stream_1, LoggingSubscriber from response_stream import response_stream_2 from rsocket.extensions.authentication import Authentication, AuthenticationSimple @@ -15,6 +19,7 @@ from rsocket.routing.request_router import RequestRouter from rsocket.routing.routing_request_handler import RoutingRequestHandler from rsocket.rsocket_server import RSocketServer +from rsocket.transports.aiohttp_websocket import TransportAioHttpWebsocket from rsocket.transports.tcp import TransportTCP router = RequestRouter() @@ -106,16 +111,49 @@ def handle_client(reader, writer): RSocketServer(TransportTCP(reader, writer), handler_factory=handler_factory) -async def run_server(server_port): - logging.info('Starting server at localhost:%s', server_port) +def websocket_handler_factory(**kwargs): + async def websocket_handler(request): + ws = web.WebSocketResponse() + await ws.prepare(request) + transport = TransportAioHttpWebsocket(ws) + RSocketServer(transport, **kwargs) + await transport.handle_incoming_ws_messages() + return ws + + return websocket_handler + + +@click.command() +@click.option('--port', help='Port to listen on', default=6565, type=int) +@click.option('--with-ssl', is_flag=True, help='Enable SSL mode') +@click.option('--transport', is_flag=False, default='tcp') +async def start_server(with_ssl: bool, port: int, transport: str): + logging.basicConfig(level=logging.DEBUG) + + logging.info(f'Starting {transport} server at localhost:{port}') + + if transport in ['ws', 'wss']: + app = web.Application() + app.add_routes([web.get('/', websocket_handler_factory(handler_factory=handler_factory))]) + + if with_ssl: + ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + + with cert_gen() as (certificate, key): + ssl_context.load_cert_chain(certificate, key) + else: + ssl_context = None - server = await asyncio.start_server(handle_client, 'localhost', server_port) + await web._run_app(app, port=port, ssl_context=ssl_context) + elif transport == 'tcp': - async with server: - await server.serve_forever() + server = await asyncio.start_server(handle_client, 'localhost', port) + + async with server: + await server.serve_forever() + else: + raise Exception(f'Unsupported transport {transport}') if __name__ == '__main__': - port = sys.argv[1] if len(sys.argv) > 1 else 6565 - logging.basicConfig(level=logging.DEBUG) - asyncio.run(run_server(port)) + start_server()