From 51f79a7dd30db0075961f802b1340ef339f3c1e7 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Thu, 24 Mar 2022 18:23:41 +0200 Subject: [PATCH 01/15] initial draft of quic implementation --- requirements.txt | 1 + ...act_websocket.py => abstract_messaging.py} | 4 +- rsocket/transports/aiohttp_websocket.py | 6 +- rsocket/transports/aioquic_transport.py | 80 +++++++++++++++++++ rsocket/transports/quart_websocket.py | 6 +- rsocket/transports/tcp.py | 2 +- rsocket/transports/transport.py | 2 +- setup.py | 3 +- tests/rsocket/test_connection_lost.py | 4 +- 9 files changed, 95 insertions(+), 13 deletions(-) rename rsocket/transports/{abstract_websocket.py => abstract_messaging.py} (72%) create mode 100644 rsocket/transports/aioquic_transport.py diff --git a/requirements.txt b/requirements.txt index e13e5ee3..12d5e8a5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ Rx==3.2.0 aiohttp==3.8.1 quart==0.16.3 coveralls==3.3.1 +aioquic==0.9.19 \ No newline at end of file diff --git a/rsocket/transports/abstract_websocket.py b/rsocket/transports/abstract_messaging.py similarity index 72% rename from rsocket/transports/abstract_websocket.py rename to rsocket/transports/abstract_messaging.py index 1918207b..6d1133f5 100644 --- a/rsocket/transports/abstract_websocket.py +++ b/rsocket/transports/abstract_messaging.py @@ -4,12 +4,12 @@ from rsocket.transports.transport import Transport -class AbstractWebsocketTransport(Transport, metaclass=abc.ABCMeta): +class AbstractMessagingTransport(Transport, metaclass=abc.ABCMeta): def __init__(self): super().__init__() self._incoming_frame_queue = asyncio.Queue() - async def next_frame_generator(self, is_server_alive): + async def next_frame_generator(self, is_server_alive: bool): frame = await self._incoming_frame_queue.get() async def frame_generator(): diff --git a/rsocket/transports/aiohttp_websocket.py b/rsocket/transports/aiohttp_websocket.py index b9a8f1da..e67e44d0 100644 --- a/rsocket/transports/aiohttp_websocket.py +++ b/rsocket/transports/aiohttp_websocket.py @@ -9,7 +9,7 @@ from rsocket.logger import logger from rsocket.rsocket_client import RSocketClient from rsocket.rsocket_server import RSocketServer -from rsocket.transports.abstract_websocket import AbstractWebsocketTransport +from rsocket.transports.abstract_messaging import AbstractMessagingTransport @asynccontextmanager @@ -35,7 +35,7 @@ async def websocket_handler(request): return websocket_handler -class TransportAioHttpClient(AbstractWebsocketTransport): +class TransportAioHttpClient(AbstractMessagingTransport): def __init__(self, url): super().__init__() @@ -69,7 +69,7 @@ async def close(self): await self._message_handler -class TransportAioHttpWebsocket(AbstractWebsocketTransport): +class TransportAioHttpWebsocket(AbstractMessagingTransport): def __init__(self, websocket): super().__init__() self._ws = websocket diff --git a/rsocket/transports/aioquic_transport.py b/rsocket/transports/aioquic_transport.py new file mode 100644 index 00000000..183273bf --- /dev/null +++ b/rsocket/transports/aioquic_transport.py @@ -0,0 +1,80 @@ +import asyncio +from typing import cast + +from aioquic.asyncio import QuicConnectionProtocol, connect, serve +from aioquic.quic.configuration import QuicConfiguration +from aioquic.quic.events import QuicEvent, StreamDataReceived + +from rsocket.frame import Frame +from rsocket.rsocket_server import RSocketServer +from rsocket.transports.abstract_messaging import AbstractMessagingTransport +from rsocket.transports.transport import Transport + + +def rsocket_connect(host: str, port: int, configuration: QuicConfiguration = None) -> Transport: + if configuration is None: + configuration = QuicConfiguration(alpn_protocols=["doq-i03"], is_client=True) + + client = cast(RSocketQuicProtocol, await connect( + host, + port, + configuration=configuration, + create_protocol=RSocketQuicProtocol, + ).__anext__()) + + return RSocketQuicTransport(client) + + +async def rsocket_serve(host: str, + port: int, + handler_factory, + configuration: QuicConfiguration = None, + **kwargs): + def protocol_factory(*protocol_args, **protocol_kwargs): + protocol = RSocketQuicProtocol(*protocol_args, **protocol_kwargs) + RSocketServer(RSocketQuicTransport(protocol), handler_factory=handler_factory, **kwargs) + return protocol + + return asyncio.create_task(serve( + host, + port, + create_protocol=protocol_factory, + configuration=configuration)) + + +class RSocketQuicProtocol(QuicConnectionProtocol): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.frame_queue = asyncio.Queue() + + async def query(self, frame: Frame) -> None: + data = frame.serialize() + + stream_id = self._quic.get_next_available_stream_id() + self._quic.send_stream_data(stream_id, data, end_stream=True) + self.transmit() + + def quic_event_received(self, event: QuicEvent) -> None: + if isinstance(event, StreamDataReceived): + self.frame_queue.put_nowait(event.data) + + +class RSocketQuicTransport(AbstractMessagingTransport): + def __init__(self, quic_protocol: RSocketQuicProtocol): + super().__init__() + self._quic_protocol = quic_protocol + self._incoming_bytes_queue = quic_protocol.frame_queue + asyncio.create_task(self.incoming_data_listener()) + + async def send_frame(self, frame: Frame): + await self._quic_protocol.query(frame) + + async def incoming_data_listener(self): + while True: + data = await self._incoming_bytes_queue.get() + + async for frame in self._frame_parser.receive_data(data, 0): + self._incoming_frame_queue.put_nowait(frame) + + async def close(self): + self._quic_protocol.close() diff --git a/rsocket/transports/quart_websocket.py b/rsocket/transports/quart_websocket.py index 7fdf0a65..ba456d95 100644 --- a/rsocket/transports/quart_websocket.py +++ b/rsocket/transports/quart_websocket.py @@ -5,12 +5,12 @@ from rsocket.frame import Frame from rsocket.logger import logger from rsocket.rsocket_server import RSocketServer -from rsocket.transports.abstract_websocket import AbstractWebsocketTransport +from rsocket.transports.abstract_messaging import AbstractMessagingTransport async def websocket_handler(*args, on_server_create=None, **kwargs): transport = TransportQuartWebsocket() - server = RSocketServer(transport=transport, *args, **kwargs) + server = RSocketServer(transport, *args, **kwargs) if on_server_create is not None: on_server_create(server) @@ -18,7 +18,7 @@ async def websocket_handler(*args, on_server_create=None, **kwargs): await transport.handle_incoming_ws_messages() -class TransportQuartWebsocket(AbstractWebsocketTransport): +class TransportQuartWebsocket(AbstractMessagingTransport): async def handle_incoming_ws_messages(self): try: diff --git a/rsocket/transports/tcp.py b/rsocket/transports/tcp.py index 376123c2..8263a194 100644 --- a/rsocket/transports/tcp.py +++ b/rsocket/transports/tcp.py @@ -23,7 +23,7 @@ async def close(self): self._writer.close() await self._writer.wait_closed() - async def next_frame_generator(self, is_server_alive): + async def next_frame_generator(self, is_server_alive: bool): with wrap_transport_exception(): data = await self._reader.read(1024) diff --git a/rsocket/transports/transport.py b/rsocket/transports/transport.py index 9ef41777..c461571c 100644 --- a/rsocket/transports/transport.py +++ b/rsocket/transports/transport.py @@ -17,7 +17,7 @@ async def send_frame(self, frame: Frame): ... @abc.abstractmethod - async def next_frame_generator(self, is_server_alive): + async def next_frame_generator(self, is_server_alive: bool): ... @abc.abstractmethod diff --git a/setup.py b/setup.py index 49567ab8..da387011 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,8 @@ extras_require={ 'rx': {'Rx >= 3.0.0'}, 'aiohttp': {'aiohttp >= 3.0.0'}, - 'quart': {'quart >= 0.15.0'} + 'quart': {'quart >= 0.15.0'}, + 'quic': {'aioquic >= 0.9.0'} }, classifiers=[ 'Development Status :: 3 - Alpha', diff --git a/tests/rsocket/test_connection_lost.py b/tests/rsocket/test_connection_lost.py index 80be7062..69043689 100644 --- a/tests/rsocket/test_connection_lost.py +++ b/tests/rsocket/test_connection_lost.py @@ -110,7 +110,7 @@ async def connect(self): async def send_frame(self, frame: Frame): pass - async def next_frame_generator(self, is_server_alive): + async def next_frame_generator(self, is_server_alive: bool): pass async def close(self): @@ -118,7 +118,7 @@ async def close(self): @pytest.mark.allow_error_log(regex_filter='Connection error') -async def test_connection_failure(unused_tcp_port): +async def test_connection_failure(unused_tcp_port: int): index_iterator = iter(range(1, 3)) wait_for_server = Event() From 209e105018c43dc9efa54d55964e0e3138d14f46 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Thu, 24 Mar 2022 19:59:49 +0200 Subject: [PATCH 02/15] first attempt at integrating to unit tests. minor fixes. stopped due to required certificate for tests --- rsocket/transports/aioquic_transport.py | 26 ++++++--- tests/conftest.py | 73 ++++++++++++++++++++++++- tests/rsocket/test_request_response.py | 1 + 3 files changed, 90 insertions(+), 10 deletions(-) diff --git a/rsocket/transports/aioquic_transport.py b/rsocket/transports/aioquic_transport.py index 183273bf..99808fff 100644 --- a/rsocket/transports/aioquic_transport.py +++ b/rsocket/transports/aioquic_transport.py @@ -11,7 +11,7 @@ from rsocket.transports.transport import Transport -def rsocket_connect(host: str, port: int, configuration: QuicConfiguration = None) -> Transport: +async def rsocket_connect(host: str, port: int, configuration: QuicConfiguration = None) -> Transport: if configuration is None: configuration = QuicConfiguration(alpn_protocols=["doq-i03"], is_client=True) @@ -20,19 +20,29 @@ def rsocket_connect(host: str, port: int, configuration: QuicConfiguration = Non port, configuration=configuration, create_protocol=RSocketQuicProtocol, - ).__anext__()) + ).__aenter__()) return RSocketQuicTransport(client) -async def rsocket_serve(host: str, - port: int, - handler_factory, - configuration: QuicConfiguration = None, - **kwargs): +def rsocket_serve(host: str, + port: int, + configuration: QuicConfiguration = None, + on_server_create=None, + **kwargs): + if configuration is None: + configuration = QuicConfiguration( + alpn_protocols=["doq-i03"], + is_client=False + ) + def protocol_factory(*protocol_args, **protocol_kwargs): protocol = RSocketQuicProtocol(*protocol_args, **protocol_kwargs) - RSocketServer(RSocketQuicTransport(protocol), handler_factory=handler_factory, **kwargs) + server = RSocketServer(RSocketQuicTransport(protocol), **kwargs) + + if on_server_create is not None: + on_server_create(server) + return protocol return asyncio.create_task(serve( diff --git a/tests/conftest.py b/tests/conftest.py index 888d4efd..7dbb8de8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,7 +5,8 @@ from asyncio import Event from asyncio.base_events import Server from contextlib import asynccontextmanager -from typing import Optional +import socket +from typing import Optional, Tuple import pytest @@ -15,6 +16,7 @@ from rsocket.rsocket_client import RSocketClient from rsocket.rsocket_server import RSocketServer from rsocket.transports.aiohttp_websocket import websocket_client, websocket_handler_factory +from rsocket.transports.aioquic_transport import rsocket_connect, rsocket_serve from rsocket.transports.tcp import TransportTCP from tests.rsocket.helpers import assert_no_open_streams @@ -24,7 +26,8 @@ tested_transports = [ 'tcp', 'aiohttp', - 'quart' + 'quart', + 'quic' ] @@ -85,6 +88,8 @@ def get_pipe_factory_by_id(aiohttp_raw_server, transport_id: str): return pipe_factory_quart_websocket if transport_id == 'aiohttp': return functools.partial(pipe_factory_aiohttp_websocket, aiohttp_raw_server) + if transport_id == 'quic': + return pipe_factory_quic @pytest.fixture @@ -175,6 +180,39 @@ async def finalize() -> None: event_loop.run_until_complete(finalize()) +@asynccontextmanager +async def pipe_factory_quic(unused_tcp_port, + client_arguments=None, + server_arguments=None): + server: Optional[RSocketBase] = None + wait_for_server = Event() + + def store_server(new_server): + nonlocal server + server = new_server + wait_for_server.set() + + quic_server = await rsocket_serve(host='localhost', + port=unused_tcp_port, + on_server_create=store_server, + **(server_arguments or {})) + + # test_overrides = {'keep_alive_period': timedelta(minutes=20)} + client_arguments = client_arguments or {} + # client_arguments.update(test_overrides) + transport = await rsocket_connect('localhost', unused_tcp_port) + + async with RSocketClient(single_transport_provider(transport), + **client_arguments) as client: + await wait_for_server.wait() + yield server, client + await server.close() + assert_no_open_streams(client, server) + + quic_server.cancel() + await quic_server + + @asynccontextmanager async def pipe_factory_aiohttp_websocket(aiohttp_raw_server, unused_tcp_port, client_arguments=None, server_arguments=None): @@ -236,3 +274,34 @@ async def ws(): await server_task except asyncio.CancelledError: pass + + +def generate_openssl_certificate_and_key() -> Tuple[str, bytes]: + import random + from OpenSSL import crypto + + pkey = crypto.PKey() + pkey.generate_key(crypto.TYPE_RSA, 2048) + + x509 = crypto.X509() + subject = x509.get_subject() + subject.commonName = socket.gethostname() + x509.set_issuer(subject) + x509.gmtime_adj_notBefore(0) + x509.gmtime_adj_notAfter(5 * 365 * 24 * 60 * 60) + x509.set_pubkey(pkey) + x509.set_serial_number(random.randrange(100000)) + x509.set_version(2) + x509.add_extensions([ + crypto.X509Extension(b'subjectAltName', False, + ','.join([ + 'DNS:%s' % socket.gethostname(), + 'DNS:*.%s' % socket.gethostname(), + 'DNS:localhost', + 'DNS:*.localhost']).encode()), + crypto.X509Extension(b"basicConstraints", True, b"CA:false")]) + + x509.sign(pkey, 'SHA256') + + return (crypto.dump_certificate(crypto.FILETYPE_PEM, x509), + crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)) diff --git a/tests/rsocket/test_request_response.py b/tests/rsocket/test_request_response.py index 6db97ba0..f8df0854 100644 --- a/tests/rsocket/test_request_response.py +++ b/tests/rsocket/test_request_response.py @@ -10,6 +10,7 @@ from tests.rsocket.helpers import future_from_payload +@pytest.mark.timeout(4) async def test_request_response_awaitable_wrapper(pipe): class Handler(BaseRequestHandler): async def request_response(self, request: Payload): From 8be8eb6984197197b3b6b1b681c23daacefbf920 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Fri, 25 Mar 2022 16:13:54 +0300 Subject: [PATCH 03/15] refactored large conftest to smaller code segments. quic protocol connects but only sends one frame and closes. --- rsocket/transports/aioquic_transport.py | 5 +- tests/conftest.py | 224 +----------------------- tests/tools/__init__.py | 0 tests/tools/fixtures_aiohttp.py | 56 ++++++ tests/tools/fixtures_aioquic.py | 102 +++++++++++ tests/tools/fixtures_quart.py | 46 +++++ tests/tools/fixtures_tcp.py | 67 +++++++ 7 files changed, 279 insertions(+), 221 deletions(-) create mode 100644 tests/tools/__init__.py create mode 100644 tests/tools/fixtures_aiohttp.py create mode 100644 tests/tools/fixtures_aioquic.py create mode 100644 tests/tools/fixtures_quart.py create mode 100644 tests/tools/fixtures_tcp.py diff --git a/rsocket/transports/aioquic_transport.py b/rsocket/transports/aioquic_transport.py index 99808fff..52e9076e 100644 --- a/rsocket/transports/aioquic_transport.py +++ b/rsocket/transports/aioquic_transport.py @@ -13,7 +13,9 @@ async def rsocket_connect(host: str, port: int, configuration: QuicConfiguration = None) -> Transport: if configuration is None: - configuration = QuicConfiguration(alpn_protocols=["doq-i03"], is_client=True) + configuration = QuicConfiguration( + is_client=True + ) client = cast(RSocketQuicProtocol, await connect( host, @@ -32,7 +34,6 @@ def rsocket_serve(host: str, **kwargs): if configuration is None: configuration = QuicConfiguration( - alpn_protocols=["doq-i03"], is_client=False ) diff --git a/tests/conftest.py b/tests/conftest.py index 7dbb8de8..f6e54777 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,25 +1,15 @@ -import asyncio import functools import logging import re -from asyncio import Event -from asyncio.base_events import Server -from contextlib import asynccontextmanager -import socket -from typing import Optional, Tuple import pytest from rsocket.frame_parser import FrameParser -from rsocket.helpers import single_transport_provider -from rsocket.rsocket_base import RSocketBase -from rsocket.rsocket_client import RSocketClient -from rsocket.rsocket_server import RSocketServer -from rsocket.transports.aiohttp_websocket import websocket_client, websocket_handler_factory -from rsocket.transports.aioquic_transport import rsocket_connect, rsocket_serve - -from rsocket.transports.tcp import TransportTCP -from tests.rsocket.helpers import assert_no_open_streams +# noinspection PyUnresolvedReferences +from tests.tools.fixtures_aiohttp import pipe_factory_aiohttp_websocket, aiohttp_raw_server +from tests.tools.fixtures_aioquic import pipe_factory_quic +from tests.tools.fixtures_quart import pipe_factory_quart_websocket +from tests.tools.fixtures_tcp import pipe_factory_tcp logging.basicConfig(level=logging.DEBUG) @@ -98,210 +88,6 @@ async def pipe_tcp_without_auto_connect(unused_tcp_port): yield components -@asynccontextmanager -async def pipe_factory_tcp(unused_tcp_port, client_arguments=None, server_arguments=None, auto_connect_client=True): - wait_for_server = Event() - - def session(*connection): - nonlocal server - server = RSocketServer(TransportTCP(*connection), **(server_arguments or {})) - wait_for_server.set() - - async def start(): - nonlocal service, client - service = await asyncio.start_server(session, host, port) - connection = await asyncio.open_connection(host, port) - nonlocal client_arguments - # test_overrides = {'keep_alive_period': timedelta(minutes=20)} - client_arguments = client_arguments or {} - - # client_arguments.update(test_overrides) - - client = RSocketClient(single_transport_provider(TransportTCP(*connection)), **(client_arguments or {})) - - if auto_connect_client: - await client.connect() - - async def finish(): - if auto_connect_client: - await client.close() - - await server.close() - - service.close() - - service: Optional[Server] = None - server: Optional[RSocketServer] = None - client: Optional[RSocketClient] = None - port = unused_tcp_port - host = 'localhost' - - await start() - - async def server_provider(): - await wait_for_server.wait() - return server - - try: - if auto_connect_client: - await wait_for_server.wait() - yield server, client - else: - yield server_provider, client - - assert_no_open_streams(client, server) - finally: - await finish() - - @pytest.fixture def frame_parser(): return FrameParser() - - -@pytest.fixture -def aiohttp_raw_server(event_loop: asyncio.BaseEventLoop, unused_tcp_port): - from aiohttp.test_utils import RawTestServer - - servers = [] - - async def go(handler, *args, **kwargs): # type: ignore[no-untyped-def] - server = RawTestServer(handler, port=unused_tcp_port) - await server.start_server(**kwargs) - servers.append(server) - return server - - yield go - - async def finalize() -> None: - while servers: - await servers.pop().close() - - event_loop.run_until_complete(finalize()) - - -@asynccontextmanager -async def pipe_factory_quic(unused_tcp_port, - client_arguments=None, - server_arguments=None): - server: Optional[RSocketBase] = None - wait_for_server = Event() - - def store_server(new_server): - nonlocal server - server = new_server - wait_for_server.set() - - quic_server = await rsocket_serve(host='localhost', - port=unused_tcp_port, - on_server_create=store_server, - **(server_arguments or {})) - - # test_overrides = {'keep_alive_period': timedelta(minutes=20)} - client_arguments = client_arguments or {} - # client_arguments.update(test_overrides) - transport = await rsocket_connect('localhost', unused_tcp_port) - - async with RSocketClient(single_transport_provider(transport), - **client_arguments) as client: - await wait_for_server.wait() - yield server, client - await server.close() - assert_no_open_streams(client, server) - - quic_server.cancel() - await quic_server - - -@asynccontextmanager -async def pipe_factory_aiohttp_websocket(aiohttp_raw_server, unused_tcp_port, client_arguments=None, - server_arguments=None): - server: Optional[RSocketBase] = None - wait_for_server = Event() - - def store_server(new_server): - nonlocal server - server = new_server - wait_for_server.set() - - await aiohttp_raw_server(websocket_handler_factory(on_server_create=store_server, **(server_arguments or {}))) - - # test_overrides = {'keep_alive_period': timedelta(minutes=20)} - client_arguments = client_arguments or {} - # client_arguments.update(test_overrides) - - async with websocket_client('http://localhost:{}'.format(unused_tcp_port), - **client_arguments) as client: - await wait_for_server.wait() - yield server, client - await server.close() - assert_no_open_streams(client, server) - - -@asynccontextmanager -async def pipe_factory_quart_websocket(unused_tcp_port, client_arguments=None, server_arguments=None): - from quart import Quart - from rsocket.transports.quart_websocket import websocket_handler - - app = Quart(__name__) - server: Optional[RSocketBase] = None - wait_for_server = Event() - - def store_server(new_server): - nonlocal server - server = new_server - wait_for_server.set() - - @app.websocket("/") - async def ws(): - await websocket_handler(on_server_create=store_server, **(server_arguments or {})) - # test_overrides = {'keep_alive_period': timedelta(minutes=20)} - - client_arguments = client_arguments or {} - # client_arguments.update(test_overrides) - server_task = asyncio.create_task(app.run_task(port=unused_tcp_port)) - await asyncio.sleep(0.1) - - async with websocket_client('http://localhost:{}'.format(unused_tcp_port), - **client_arguments) as client: - await wait_for_server.wait() - yield server, client - await server.close() - assert_no_open_streams(client, server) - - try: - server_task.cancel() - await server_task - except asyncio.CancelledError: - pass - - -def generate_openssl_certificate_and_key() -> Tuple[str, bytes]: - import random - from OpenSSL import crypto - - pkey = crypto.PKey() - pkey.generate_key(crypto.TYPE_RSA, 2048) - - x509 = crypto.X509() - subject = x509.get_subject() - subject.commonName = socket.gethostname() - x509.set_issuer(subject) - x509.gmtime_adj_notBefore(0) - x509.gmtime_adj_notAfter(5 * 365 * 24 * 60 * 60) - x509.set_pubkey(pkey) - x509.set_serial_number(random.randrange(100000)) - x509.set_version(2) - x509.add_extensions([ - crypto.X509Extension(b'subjectAltName', False, - ','.join([ - 'DNS:%s' % socket.gethostname(), - 'DNS:*.%s' % socket.gethostname(), - 'DNS:localhost', - 'DNS:*.localhost']).encode()), - crypto.X509Extension(b"basicConstraints", True, b"CA:false")]) - - x509.sign(pkey, 'SHA256') - - return (crypto.dump_certificate(crypto.FILETYPE_PEM, x509), - crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)) diff --git a/tests/tools/__init__.py b/tests/tools/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/tools/fixtures_aiohttp.py b/tests/tools/fixtures_aiohttp.py new file mode 100644 index 00000000..df3abb50 --- /dev/null +++ b/tests/tools/fixtures_aiohttp.py @@ -0,0 +1,56 @@ +import asyncio +from asyncio import Event +from contextlib import asynccontextmanager +from typing import Optional + +import pytest + +from rsocket.rsocket_base import RSocketBase +from rsocket.transports.aiohttp_websocket import websocket_client, websocket_handler_factory +from tests.rsocket.helpers import assert_no_open_streams + + +@pytest.fixture +def aiohttp_raw_server(event_loop: asyncio.BaseEventLoop, unused_tcp_port): + from aiohttp.test_utils import RawTestServer + + servers = [] + + async def go(handler, *args, **kwargs): # type: ignore[no-untyped-def] + server = RawTestServer(handler, port=unused_tcp_port) + await server.start_server(**kwargs) + servers.append(server) + return server + + yield go + + async def finalize() -> None: + while servers: + await servers.pop().close() + + event_loop.run_until_complete(finalize()) + + +@asynccontextmanager +async def pipe_factory_aiohttp_websocket(aiohttp_raw_server, unused_tcp_port, client_arguments=None, + server_arguments=None): + server: Optional[RSocketBase] = None + wait_for_server = Event() + + def store_server(new_server): + nonlocal server + server = new_server + wait_for_server.set() + + await aiohttp_raw_server(websocket_handler_factory(on_server_create=store_server, **(server_arguments or {}))) + + # test_overrides = {'keep_alive_period': timedelta(minutes=20)} + client_arguments = client_arguments or {} + # client_arguments.update(test_overrides) + + async with websocket_client('http://localhost:{}'.format(unused_tcp_port), + **client_arguments) as client: + await wait_for_server.wait() + yield server, client + await server.close() + assert_no_open_streams(client, server) \ No newline at end of file diff --git a/tests/tools/fixtures_aioquic.py b/tests/tools/fixtures_aioquic.py new file mode 100644 index 00000000..7fec067b --- /dev/null +++ b/tests/tools/fixtures_aioquic.py @@ -0,0 +1,102 @@ +import datetime +from asyncio import Event +from contextlib import asynccontextmanager +from typing import Optional + +from aioquic.quic.configuration import QuicConfiguration +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec + +from rsocket.helpers import single_transport_provider +from rsocket.rsocket_base import RSocketBase +from rsocket.rsocket_client import RSocketClient +from rsocket.transports.aioquic_transport import rsocket_connect, rsocket_serve +from tests.rsocket.helpers import assert_no_open_streams + + +def generate_certificate(*, alternative_names, common_name, hash_algorithm, key): + subject = issuer = x509.Name( + [x509.NameAttribute(x509.NameOID.COMMON_NAME, common_name)] + ) + + builder = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.datetime.utcnow()) + .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=10)) + ) + if alternative_names: + builder = builder.add_extension( + x509.SubjectAlternativeName( + [x509.DNSName(name) for name in alternative_names] + ), + critical=False, + ) + cert = builder.sign(key, hash_algorithm) + return cert, key + + +def generate_ec_certificate(common_name, alternative_names=None, curve=ec.SECP256R1): + if alternative_names is None: + alternative_names = [] + + key = ec.generate_private_key(curve=curve) + return generate_certificate( + alternative_names=alternative_names, + common_name=common_name, + hash_algorithm=hashes.SHA256(), + key=key, + ) + + +@asynccontextmanager +async def pipe_factory_quic(unused_tcp_port, + client_arguments=None, + server_arguments=None): + certificate, private_key = generate_ec_certificate(common_name="localhost") + + server_configuration = QuicConfiguration( + certificate=certificate, + private_key=private_key, + is_client=False + ) + + client_configuration = QuicConfiguration( + is_client=True + ) + cadata = certificate.public_bytes(serialization.Encoding.PEM) + client_configuration.load_verify_locations(cadata=cadata, cafile=None) + + server: Optional[RSocketBase] = None + wait_for_server = Event() + + def store_server(new_server): + nonlocal server + server = new_server + wait_for_server.set() + + quic_server = await rsocket_serve(host='localhost', + port=unused_tcp_port, + configuration=server_configuration, + on_server_create=store_server, + **(server_arguments or {})) + + # test_overrides = {'keep_alive_period': timedelta(minutes=20)} + client_arguments = client_arguments or {} + # client_arguments.update(test_overrides) + transport = await rsocket_connect('localhost', unused_tcp_port, + configuration=client_configuration) + + async with RSocketClient(single_transport_provider(transport), + **client_arguments) as client: + await wait_for_server.wait() + yield server, client + await server.close() + assert_no_open_streams(client, server) + + quic_server.cancel() + await quic_server diff --git a/tests/tools/fixtures_quart.py b/tests/tools/fixtures_quart.py new file mode 100644 index 00000000..f9011ddb --- /dev/null +++ b/tests/tools/fixtures_quart.py @@ -0,0 +1,46 @@ +import asyncio +from asyncio import Event +from contextlib import asynccontextmanager +from typing import Optional + +from rsocket.rsocket_base import RSocketBase +from rsocket.transports.aiohttp_websocket import websocket_client +from tests.rsocket.helpers import assert_no_open_streams + + +@asynccontextmanager +async def pipe_factory_quart_websocket(unused_tcp_port, client_arguments=None, server_arguments=None): + from quart import Quart + from rsocket.transports.quart_websocket import websocket_handler + + app = Quart(__name__) + server: Optional[RSocketBase] = None + wait_for_server = Event() + + def store_server(new_server): + nonlocal server + server = new_server + wait_for_server.set() + + @app.websocket("/") + async def ws(): + await websocket_handler(on_server_create=store_server, **(server_arguments or {})) + # test_overrides = {'keep_alive_period': timedelta(minutes=20)} + + client_arguments = client_arguments or {} + # client_arguments.update(test_overrides) + server_task = asyncio.create_task(app.run_task(port=unused_tcp_port)) + await asyncio.sleep(0.1) + + async with websocket_client('http://localhost:{}'.format(unused_tcp_port), + **client_arguments) as client: + await wait_for_server.wait() + yield server, client + await server.close() + assert_no_open_streams(client, server) + + try: + server_task.cancel() + await server_task + except asyncio.CancelledError: + pass diff --git a/tests/tools/fixtures_tcp.py b/tests/tools/fixtures_tcp.py new file mode 100644 index 00000000..f2227a52 --- /dev/null +++ b/tests/tools/fixtures_tcp.py @@ -0,0 +1,67 @@ +import asyncio +from asyncio import Event +from asyncio.base_events import Server +from contextlib import asynccontextmanager +from typing import Optional + +from rsocket.helpers import single_transport_provider +from rsocket.rsocket_client import RSocketClient +from rsocket.rsocket_server import RSocketServer +from rsocket.transports.tcp import TransportTCP +from tests.rsocket.helpers import assert_no_open_streams + + +@asynccontextmanager +async def pipe_factory_tcp(unused_tcp_port, client_arguments=None, server_arguments=None, auto_connect_client=True): + wait_for_server = Event() + + def session(*connection): + nonlocal server + server = RSocketServer(TransportTCP(*connection), **(server_arguments or {})) + wait_for_server.set() + + async def start(): + nonlocal service, client + service = await asyncio.start_server(session, host, port) + connection = await asyncio.open_connection(host, port) + nonlocal client_arguments + # test_overrides = {'keep_alive_period': timedelta(minutes=20)} + client_arguments = client_arguments or {} + + # client_arguments.update(test_overrides) + + client = RSocketClient(single_transport_provider(TransportTCP(*connection)), **(client_arguments or {})) + + if auto_connect_client: + await client.connect() + + async def finish(): + if auto_connect_client: + await client.close() + + await server.close() + + service.close() + + service: Optional[Server] = None + server: Optional[RSocketServer] = None + client: Optional[RSocketClient] = None + port = unused_tcp_port + host = 'localhost' + + await start() + + async def server_provider(): + await wait_for_server.wait() + return server + + try: + if auto_connect_client: + await wait_for_server.wait() + yield server, client + else: + yield server_provider, client + + assert_no_open_streams(client, server) + finally: + await finish() From cdffe8d4964cb69f9eec1942a22215870374a546 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Fri, 25 Mar 2022 17:01:24 +0300 Subject: [PATCH 04/15] initial working quic support. --- examples/client_quic.py | 0 examples/server_quic.py | 0 rsocket/transports/aioquic_transport.py | 30 ++++++++++++------------- tests/tools/fixtures_aioquic.py | 23 +++++++++---------- 4 files changed, 26 insertions(+), 27 deletions(-) create mode 100644 examples/client_quic.py create mode 100644 examples/server_quic.py diff --git a/examples/client_quic.py b/examples/client_quic.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/server_quic.py b/examples/server_quic.py new file mode 100644 index 00000000..e69de29b diff --git a/rsocket/transports/aioquic_transport.py b/rsocket/transports/aioquic_transport.py index 52e9076e..af9b1ab7 100644 --- a/rsocket/transports/aioquic_transport.py +++ b/rsocket/transports/aioquic_transport.py @@ -1,5 +1,5 @@ import asyncio -from typing import cast +from contextlib import asynccontextmanager from aioquic.asyncio import QuicConnectionProtocol, connect, serve from aioquic.quic.configuration import QuicConfiguration @@ -11,20 +11,20 @@ from rsocket.transports.transport import Transport +@asynccontextmanager async def rsocket_connect(host: str, port: int, configuration: QuicConfiguration = None) -> Transport: if configuration is None: configuration = QuicConfiguration( is_client=True ) - client = cast(RSocketQuicProtocol, await connect( - host, - port, - configuration=configuration, - create_protocol=RSocketQuicProtocol, - ).__aenter__()) - - return RSocketQuicTransport(client) + async with connect( + host, + port, + configuration=configuration, + create_protocol=RSocketQuicProtocol, + ) as client: + yield RSocketQuicTransport(client) def rsocket_serve(host: str, @@ -46,23 +46,22 @@ def protocol_factory(*protocol_args, **protocol_kwargs): return protocol - return asyncio.create_task(serve( + return serve( host, port, create_protocol=protocol_factory, - configuration=configuration)) + configuration=configuration) class RSocketQuicProtocol(QuicConnectionProtocol): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.frame_queue = asyncio.Queue() + self._stream_id = self._quic.get_next_available_stream_id() async def query(self, frame: Frame) -> None: data = frame.serialize() - - stream_id = self._quic.get_next_available_stream_id() - self._quic.send_stream_data(stream_id, data, end_stream=True) + self._quic.send_stream_data(self._stream_id, data, end_stream=False) self.transmit() def quic_event_received(self, event: QuicEvent) -> None: @@ -75,7 +74,7 @@ def __init__(self, quic_protocol: RSocketQuicProtocol): super().__init__() self._quic_protocol = quic_protocol self._incoming_bytes_queue = quic_protocol.frame_queue - asyncio.create_task(self.incoming_data_listener()) + self._listener = asyncio.create_task(self.incoming_data_listener()) async def send_frame(self, frame: Frame): await self._quic_protocol.query(frame) @@ -89,3 +88,4 @@ async def incoming_data_listener(self): async def close(self): self._quic_protocol.close() + self._listener.cancel() diff --git a/tests/tools/fixtures_aioquic.py b/tests/tools/fixtures_aioquic.py index 7fec067b..7fba4b67 100644 --- a/tests/tools/fixtures_aioquic.py +++ b/tests/tools/fixtures_aioquic.py @@ -85,18 +85,17 @@ def store_server(new_server): on_server_create=store_server, **(server_arguments or {})) + # from datetime import timedelta # test_overrides = {'keep_alive_period': timedelta(minutes=20)} client_arguments = client_arguments or {} # client_arguments.update(test_overrides) - transport = await rsocket_connect('localhost', unused_tcp_port, - configuration=client_configuration) - - async with RSocketClient(single_transport_provider(transport), - **client_arguments) as client: - await wait_for_server.wait() - yield server, client - await server.close() - assert_no_open_streams(client, server) - - quic_server.cancel() - await quic_server + async with rsocket_connect('localhost', unused_tcp_port, + configuration=client_configuration) as transport: + async with RSocketClient(single_transport_provider(transport), + **client_arguments) as client: + await wait_for_server.wait() + yield server, client + await server.close() + assert_no_open_streams(client, server) + + quic_server.close() From a31ceaef97f00bbae5140c9acdfe84363e8342b5 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Fri, 25 Mar 2022 17:19:09 +0300 Subject: [PATCH 05/15] temporary fix for jinja2 breaking quart. --- requirements.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 12d5e8a5..a8979588 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,6 @@ Rx==3.2.0 aiohttp==3.8.1 quart==0.16.3 coveralls==3.3.1 -aioquic==0.9.19 \ No newline at end of file +aioquic==0.9.19 + +Jinja2==3.0.3 \ No newline at end of file From 68275c8ea338a6d62996cacd69260ae1a1d8a8a4 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Fri, 25 Mar 2022 17:19:37 +0300 Subject: [PATCH 06/15] temporary fix for jinja2 breaking quart. --- tox.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tox.ini b/tox.ini index 4d4f4183..36db5e11 100644 --- a/tox.ini +++ b/tox.ini @@ -13,5 +13,7 @@ deps = flake8==4.0.1 aiohttp==3.8.1 quart==0.16.3 coveralls==3.3.1 + aioquic==0.9.19 + Jinja2==3.0.3 commands = pytest --cov-report=html --cov --ignore=examples From df0d3854d4a4d8034157fbe2a81fe084912a2269 Mon Sep 17 00:00:00 2001 From: "gabis@precog.co" Date: Fri, 25 Mar 2022 17:33:54 +0300 Subject: [PATCH 07/15] added quic examples server/client --- examples/certificates/pycacert.pem | 99 ++++++++++++++++++++++++++++++ examples/certificates/ssl_cert.pem | 34 ++++++++++ examples/certificates/ssl_key.pem | 40 ++++++++++++ examples/client_quic.py | 55 +++++++++++++++++ examples/server_quic.py | 48 +++++++++++++++ examples/test_examples.py | 13 ++++ 6 files changed, 289 insertions(+) create mode 100644 examples/certificates/pycacert.pem create mode 100644 examples/certificates/ssl_cert.pem create mode 100644 examples/certificates/ssl_key.pem diff --git a/examples/certificates/pycacert.pem b/examples/certificates/pycacert.pem new file mode 100644 index 00000000..73150c96 --- /dev/null +++ b/examples/certificates/pycacert.pem @@ -0,0 +1,99 @@ +Certificate: + Data: + Version: 3 (0x2) + Serial Number: + cb:2d:80:99:5a:69:52:5b + Signature Algorithm: sha256WithRSAEncryption + Issuer: C=XY, O=Python Software Foundation CA, CN=our-ca-server + Validity + Not Before: Aug 29 14:23:16 2018 GMT + Not After : Aug 26 14:23:16 2028 GMT + Subject: C=XY, O=Python Software Foundation CA, CN=our-ca-server + Subject Public Key Info: + Public Key Algorithm: rsaEncryption + Public-Key: (3072 bit) + Modulus: + 00:97:ed:55:41:ba:36:17:95:db:71:1c:d3:e1:61: + ac:58:73:e3:c6:96:cf:2b:1f:b8:08:f5:9d:4b:4b: + c7:30:f6:b8:0b:b3:52:72:a0:bb:c9:4d:3b:8e:df: + 22:8e:01:57:81:c9:92:73:cc:00:c6:ec:70:b0:3a: + 17:40:c1:df:f2:8c:36:4c:c4:a7:81:e7:b6:24:68: + e2:a0:7e:35:07:2f:a0:5b:f9:45:46:f7:1e:f0:46: + 11:fe:ca:1a:3c:50:f1:26:a9:5f:9c:22:9c:f8:41: + e1:df:4f:12:95:19:2f:5c:90:01:17:6e:7e:3e:7d: + cf:e9:09:af:25:f8:f8:42:77:2d:6d:5f:36:f2:78: + 1e:7d:4a:87:68:63:6c:06:71:1b:8d:fa:25:fe:d4: + d3:f5:a5:17:b1:ef:ea:17:cb:54:c8:27:99:80:cb: + 3c:45:f1:2c:52:1c:dd:1f:51:45:20:50:1e:5e:ab: + 57:73:1b:41:78:96:de:84:a4:7a:dd:8f:30:85:36: + 58:79:76:a0:d2:61:c8:1b:a9:94:99:63:c6:ee:f8: + 14:bf:b4:52:56:31:97:fa:eb:ac:53:9e:95:ce:4c: + c4:5a:4a:b7:ca:03:27:5b:35:57:ce:02:dc:ec:ca: + 69:f8:8a:5a:39:cb:16:20:15:03:24:61:6c:f4:7a: + fc:b6:48:e5:59:10:5c:49:d0:23:9f:fb:71:5e:3a: + e9:68:9f:34:72:80:27:b6:3f:4c:b1:d9:db:63:7f: + 67:68:4a:6e:11:f8:e8:c0:f4:5a:16:39:53:0b:68: + de:77:fa:45:e7:f8:91:cd:78:cd:28:94:97:71:54: + fb:cf:f0:37:de:c9:26:c5:dc:1b:9e:89:6d:09:ac: + c8:44:71:cb:6d:f1:97:31:d5:4c:20:33:bf:75:4a: + a0:e0:dc:69:11:ed:2a:b4:64:10:11:30:8b:0e:b0: + a7:10:d8:8a:c5:aa:1b:c8:26:8a:25:e7:66:9f:a5: + 6a:1a:2f:7c:5f:83:c6:78:4f:1f + Exponent: 65537 (0x10001) + X509v3 extensions: + X509v3 Subject Key Identifier: + DD:BF:CA:DA:E6:D1:34:BA:37:75:21:CA:6F:9A:08:28:F2:35:B6:48 + X509v3 Authority Key Identifier: + keyid:DD:BF:CA:DA:E6:D1:34:BA:37:75:21:CA:6F:9A:08:28:F2:35:B6:48 + + X509v3 Basic Constraints: + CA:TRUE + Signature Algorithm: sha256WithRSAEncryption + 33:6a:54:d3:6b:c0:d7:01:5f:9d:f4:05:c1:93:66:90:50:d0: + b7:18:e9:b0:1e:4a:a0:b6:da:76:93:af:84:db:ad:15:54:31: + 15:13:e4:de:7e:4e:0c:d5:09:1c:34:35:b6:e5:4c:d6:6f:65: + 7d:32:5f:eb:fc:a9:6b:07:f7:49:82:e5:81:7e:07:80:9a:63: + f8:2c:c3:40:bc:8f:d4:2a:da:3e:d1:ee:08:b7:4d:a7:84:ca: + f4:3f:a1:98:45:be:b1:05:69:e7:df:d7:99:ab:1b:ee:8b:30: + cc:f7:fc:e7:d4:0b:17:ae:97:bf:e4:7b:fd:0f:a7:b4:85:79: + e3:59:e2:16:87:bf:1f:29:45:2c:23:93:76:be:c0:87:1d:de: + ec:2b:42:6a:e5:bb:c8:f4:0a:4a:08:0a:8c:5c:d8:7d:4d:d1: + b8:bf:d5:f7:29:ed:92:d1:94:04:e8:35:06:57:7f:2c:23:97: + 87:a5:35:8d:26:d3:1a:47:f2:16:d7:d9:c6:d4:1f:23:43:d3: + 26:99:39:ca:20:f4:71:23:6f:0c:4a:76:76:f7:76:1f:b3:fe: + bf:47:b0:fc:2a:56:81:e1:d2:dd:ee:08:d8:f4:ff:5a:dc:25: + 61:8a:91:02:b9:86:1c:f2:50:73:76:25:35:fc:b6:25:26:15: + cb:eb:c4:2b:61:0c:1c:e7:ee:2f:17:9b:ec:f0:d4:a1:84:e7: + d2:af:de:e4:1b:24:14:a7:01:87:e3:ab:29:58:46:a0:d9:c0: + 0a:e0:8d:d7:59:d3:1b:f8:54:20:3e:78:a5:a5:c8:4f:8b:03: + c4:96:9f:ec:fb:47:cf:76:2d:8d:65:34:27:bf:fa:ae:01:05: + 8a:f3:92:0a:dd:89:6c:97:a1:c7:e7:60:51:e7:ac:eb:4b:7d: + 2c:b8:65:c9:fe:5d:6a:48:55:8e:e4:c7:f9:6a:40:e1:b8:64: + 45:e9:b5:59:29:a5:5f:cf:7d:58:7d:64:79:e5:a4:09:ac:1e: + 76:65:3d:94:c4:68 +-----BEGIN CERTIFICATE----- +MIIEbTCCAtWgAwIBAgIJAMstgJlaaVJbMA0GCSqGSIb3DQEBCwUAME0xCzAJBgNV +BAYTAlhZMSYwJAYDVQQKDB1QeXRob24gU29mdHdhcmUgRm91bmRhdGlvbiBDQTEW +MBQGA1UEAwwNb3VyLWNhLXNlcnZlcjAeFw0xODA4MjkxNDIzMTZaFw0yODA4MjYx +NDIzMTZaME0xCzAJBgNVBAYTAlhZMSYwJAYDVQQKDB1QeXRob24gU29mdHdhcmUg +Rm91bmRhdGlvbiBDQTEWMBQGA1UEAwwNb3VyLWNhLXNlcnZlcjCCAaIwDQYJKoZI +hvcNAQEBBQADggGPADCCAYoCggGBAJftVUG6NheV23Ec0+FhrFhz48aWzysfuAj1 +nUtLxzD2uAuzUnKgu8lNO47fIo4BV4HJknPMAMbscLA6F0DB3/KMNkzEp4HntiRo +4qB+NQcvoFv5RUb3HvBGEf7KGjxQ8SapX5winPhB4d9PEpUZL1yQARdufj59z+kJ +ryX4+EJ3LW1fNvJ4Hn1Kh2hjbAZxG436Jf7U0/WlF7Hv6hfLVMgnmYDLPEXxLFIc +3R9RRSBQHl6rV3MbQXiW3oSket2PMIU2WHl2oNJhyBuplJljxu74FL+0UlYxl/rr +rFOelc5MxFpKt8oDJ1s1V84C3OzKafiKWjnLFiAVAyRhbPR6/LZI5VkQXEnQI5/7 +cV466WifNHKAJ7Y/TLHZ22N/Z2hKbhH46MD0WhY5Uwto3nf6Ref4kc14zSiUl3FU ++8/wN97JJsXcG56JbQmsyERxy23xlzHVTCAzv3VKoODcaRHtKrRkEBEwiw6wpxDY +isWqG8gmiiXnZp+lahovfF+DxnhPHwIDAQABo1AwTjAdBgNVHQ4EFgQU3b/K2ubR +NLo3dSHKb5oIKPI1tkgwHwYDVR0jBBgwFoAU3b/K2ubRNLo3dSHKb5oIKPI1tkgw +DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAYEAM2pU02vA1wFfnfQFwZNm +kFDQtxjpsB5KoLbadpOvhNutFVQxFRPk3n5ODNUJHDQ1tuVM1m9lfTJf6/ypawf3 +SYLlgX4HgJpj+CzDQLyP1CraPtHuCLdNp4TK9D+hmEW+sQVp59/Xmasb7oswzPf8 +59QLF66Xv+R7/Q+ntIV541niFoe/HylFLCOTdr7Ahx3e7CtCauW7yPQKSggKjFzY +fU3RuL/V9yntktGUBOg1Bld/LCOXh6U1jSbTGkfyFtfZxtQfI0PTJpk5yiD0cSNv +DEp2dvd2H7P+v0ew/CpWgeHS3e4I2PT/WtwlYYqRArmGHPJQc3YlNfy2JSYVy+vE +K2EMHOfuLxeb7PDUoYTn0q/e5BskFKcBh+OrKVhGoNnACuCN11nTG/hUID54paXI +T4sDxJaf7PtHz3YtjWU0J7/6rgEFivOSCt2JbJehx+dgUees60t9LLhlyf5dakhV +juTH+WpA4bhkRem1WSmlX899WH1keeWkCawedmU9lMRo +-----END CERTIFICATE----- diff --git a/examples/certificates/ssl_cert.pem b/examples/certificates/ssl_cert.pem new file mode 100644 index 00000000..1f7127a4 --- /dev/null +++ b/examples/certificates/ssl_cert.pem @@ -0,0 +1,34 @@ +-----BEGIN CERTIFICATE----- +MIIF8TCCBFmgAwIBAgIJAMstgJlaaVJcMA0GCSqGSIb3DQEBCwUAME0xCzAJBgNV +BAYTAlhZMSYwJAYDVQQKDB1QeXRob24gU29mdHdhcmUgRm91bmRhdGlvbiBDQTEW +MBQGA1UEAwwNb3VyLWNhLXNlcnZlcjAeFw0xODA4MjkxNDIzMTZaFw0yODA3MDcx +NDIzMTZaMF8xCzAJBgNVBAYTAlhZMRcwFQYDVQQHDA5DYXN0bGUgQW50aHJheDEj +MCEGA1UECgwaUHl0aG9uIFNvZnR3YXJlIEZvdW5kYXRpb24xEjAQBgNVBAMMCWxv +Y2FsaG9zdDCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBAJ8oLzdB739k +YxZiFukBFGIpyjqYkj0I015p/sDz1MT7DljcZLBLy7OqnkLpB5tnM8256DwdihPA +3zlnfEzTfr9DD0qFBW2H5cMCoz7X17koeRhzGDd3dkjUeBjXvR5qRosG8wM3lQug +U7AizY+3Azaj1yN3mZ9K5a20jr58Kqinz+Xxx6sb2JfYYff2neJbBahNm5id0AD2 +pi/TthZqO5DURJYo+MdgZOcy+7jEjOJsLWZd3Yzq78iM07qDjbpIoVpENZCTHTWA +hX8LIqz0OBmh4weQpm4+plU7E4r4D82uauocWw8iyuznCTtABWO7n9fWySmf9QZC +WYxHAFpBQs6zUVqAD7nhFdTqpQ9bRiaEnjE4HiAccPW+MAoSxFnv/rNzEzI6b4zU +NspFMfg1aNVamdjxdpUZ1GG1Okf0yPJykqEX4PZl3La1Be2q7YZ1wydR523Xd+f3 +EO4/g+imETSKn8gyCf6Rvib175L4r2WV1CXQH7gFwZYCod6WHYq5TQIDAQABo4IB +wDCCAbwwFAYDVR0RBA0wC4IJbG9jYWxob3N0MA4GA1UdDwEB/wQEAwIFoDAdBgNV +HSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAdBgNVHQ4E +FgQUj+od4zNcABazi29rb9NMy7XLfFUwfQYDVR0jBHYwdIAU3b/K2ubRNLo3dSHK +b5oIKPI1tkihUaRPME0xCzAJBgNVBAYTAlhZMSYwJAYDVQQKDB1QeXRob24gU29m +dHdhcmUgRm91bmRhdGlvbiBDQTEWMBQGA1UEAwwNb3VyLWNhLXNlcnZlcoIJAMst +gJlaaVJbMIGDBggrBgEFBQcBAQR3MHUwPAYIKwYBBQUHMAKGMGh0dHA6Ly90ZXN0 +Y2EucHl0aG9udGVzdC5uZXQvdGVzdGNhL3B5Y2FjZXJ0LmNlcjA1BggrBgEFBQcw +AYYpaHR0cDovL3Rlc3RjYS5weXRob250ZXN0Lm5ldC90ZXN0Y2Evb2NzcC8wQwYD +VR0fBDwwOjA4oDagNIYyaHR0cDovL3Rlc3RjYS5weXRob250ZXN0Lm5ldC90ZXN0 +Y2EvcmV2b2NhdGlvbi5jcmwwDQYJKoZIhvcNAQELBQADggGBACf1jFkQ9MbnKAC/ +uo17EwPxHKZfswZVpCK527LVRr33DN1DbrR5ZWchDCpV7kCOhZ+fR7sKKk22ZHSY +oH+u3PEu20J3GOB1iyY1aMNB7WvId3JvappdVWkC/VpUyFfLsGUDFuIPADmZZqCb +iJMX4loteTVfl1d4xK/1mV6Gq9MRrRqiDfpSELn+v53OM9mGspwW+NZ1CIrbCuW0 +KxZ/tPkqn8PSd9fNZR70bB7rWbnwrl+kH8xKxLl6qdlrMmg74WWwhLeQxK7+9DdP +IaDenzqx5cwWBGY/C0HcQj0gPuy3lSs1V/q+f7Y6uspPWP51PgiJLIywXS75iRAr ++UFGTzwAtyfTZSQoFyMmMULqfk6T5HtoVMqfRvPvK+mFDLWEstU1NIB1K/CRI7gI +AY65ClTU+zRS/tlF8IA7tsFvgtEf8jsI9kamlidhS1gyeg4dWcVErV4aeTPB1AUv +StPYQkKNM+NjytWHl5tNuBoDNLsc0gI/WSPiI4CIY8LwomOoiw== +-----END CERTIFICATE----- diff --git a/examples/certificates/ssl_key.pem b/examples/certificates/ssl_key.pem new file mode 100644 index 00000000..b8fdec10 --- /dev/null +++ b/examples/certificates/ssl_key.pem @@ -0,0 +1,40 @@ +-----BEGIN PRIVATE KEY----- +MIIG/QIBADANBgkqhkiG9w0BAQEFAASCBucwggbjAgEAAoIBgQCfKC83Qe9/ZGMW +YhbpARRiKco6mJI9CNNeaf7A89TE+w5Y3GSwS8uzqp5C6QebZzPNueg8HYoTwN85 +Z3xM036/Qw9KhQVth+XDAqM+19e5KHkYcxg3d3ZI1HgY170eakaLBvMDN5ULoFOw +Is2PtwM2o9cjd5mfSuWttI6+fCqop8/l8cerG9iX2GH39p3iWwWoTZuYndAA9qYv +07YWajuQ1ESWKPjHYGTnMvu4xIzibC1mXd2M6u/IjNO6g426SKFaRDWQkx01gIV/ +CyKs9DgZoeMHkKZuPqZVOxOK+A/NrmrqHFsPIsrs5wk7QAVju5/X1skpn/UGQlmM +RwBaQULOs1FagA+54RXU6qUPW0YmhJ4xOB4gHHD1vjAKEsRZ7/6zcxMyOm+M1DbK +RTH4NWjVWpnY8XaVGdRhtTpH9MjycpKhF+D2Zdy2tQXtqu2GdcMnUedt13fn9xDu +P4PophE0ip/IMgn+kb4m9e+S+K9lldQl0B+4BcGWAqHelh2KuU0CAwEAAQKCAYEA +lKiWIYjmyRjdLKUGPTES9vWNvNmRjozV0RQ0LcoSbMMLDZkeO0UwyWqOVHUQ8+ib +jIcfEjeNJxI57oZopeHOO5vJhpNlFH+g7ltiW2qERqA1K88lSXm99Bzw6FNqhCRE +K8ub5N9fyfJA+P4o/xm0WK8EXk5yIUV17p/9zJJxzgKgv2jsVTi3QG2OZGvn4Oug +ByomMZEGHkBDzdxz8c/cP1Tlk1RFuwSgews178k2xq7AYSM/s0YmHi7b/RSvptX6 +1v8P8kXNUe4AwTaNyrlvF2lwIadZ8h1hA7tCE2n44b7a7KfhAkwcbr1T59ioYh6P +zxsyPT678uD51dbtD/DXJCcoeeFOb8uzkR2KNcrnQzZpCJnRq4Gp5ybxwsxxuzpr +gz0gbNlhuWtE7EoSzmIK9t+WTS7IM2CvZymd6/OAh1Fuw6AQhSp64XRp3OfMMAAC +Ie2EPtKj4islWGT8VoUjuRYGmdRh4duAH1dkiAXOWA3R7y5a1/y/iE8KE8BtxocB +AoHBAM8aiURgpu1Fs0Oqz6izec7KSLL3l8hmW+MKUOfk/Ybng6FrTFsL5YtzR+Ap +wW4wwWnnIKEc1JLiZ7g8agRETK8hr5PwFXUn/GSWC0SMsazLJToySQS5LOV0tLzK +kJ3jtNU7tnlDGNkCHTHSoVL2T/8t+IkZI/h5Z6wjlYPvU2Iu0nVIXtiG+alv4A6M +Hrh9l5or4mjB6rGnVXeYohLkCm6s/W97ahVxLMcEdbsBo1prm2JqGnSoiR/tEFC/ +QHQnbQKBwQDEu7kW0Yg9sZ89QtYtVQ1YpixFZORaUeRIRLnpEs1w7L1mCbOZ2Lj9 +JHxsH05cYAc7HJfPwwxv3+3aGAIC/dfu4VSwEFtatAzUpzlhzKS5+HQCWB4JUNNU +MQ3+FwK2xQX4Ph8t+OzrFiYcK2g0An5UxWMa2HWIAWUOhnTOydAVsoH6yP31cVm4 +0hxoABCwflaNLNGjRUyfBpLTAcNu/YtcE+KREy7YAAgXXrhRSO4XpLsSXwLnLT7/ +YOkoBWDcTWECgcBPWnSUDZCIQ3efithMZJBciqd2Y2X19Dpq8O31HImD4jtOY0V7 +cUB/wSkeHAGwjd/eCyA2e0x8B2IEdqmMfvr+86JJxekC3dJYXCFvH5WIhsH53YCa +3bT1KlWCLP9ib/g+58VQC0R/Cc9T4sfLePNH7D5ZkZd1wlbV30CPr+i8KwKay6MD +xhvtLx+jk07GE+E9wmjbCMo7TclyrLoVEOlqZMAqshgApT+p9eyCPetwXuDHwa3n +WxhHclcZCV7R4rUCgcAkdGSnxcvpIrDPOUNWwxvmAWTStw9ZbTNP8OxCNCm9cyDl +d4bAS1h8D/a+Uk7C70hnu7Sl2w7C7Eu2zhwRUdhhe3+l4GINPK/j99i6NqGPlGpq +xMlMEJ4YS768BqeKFpg0l85PRoEgTsphDeoROSUPsEPdBZ9BxIBlYKTkbKESZDGR +twzYHljx1n1NCDYPflmrb1KpXn4EOcObNghw2KqqNUUWfOeBPwBA1FxzM4BrAStp +DBINpGS4Dc0mjViVegECgcA3hTtm82XdxQXj9LQmb/E3lKx/7H87XIOeNMmvjYuZ +iS9wKrkF+u42vyoDxcKMCnxP5056wpdST4p56r+SBwVTHcc3lGBSGcMTIfwRXrj3 +thOA2our2n4ouNIsYyTlcsQSzifwmpRmVMRPxl9fYVdEWUgB83FgHT0D9avvZnF9 +t9OccnGJXShAIZIBADhVj/JwG4FbaX42NijD5PNpVLk1Y17OV0I576T9SfaQoBjJ +aH1M/zC4aVaS0DYB/Gxq7v8= +-----END PRIVATE KEY----- diff --git a/examples/client_quic.py b/examples/client_quic.py index e69de29b..bd8e9ee2 100644 --- a/examples/client_quic.py +++ b/examples/client_quic.py @@ -0,0 +1,55 @@ +import asyncio +import logging +import sys +from pathlib import Path + +from aioquic.quic.configuration import QuicConfiguration + +from reactivestreams.subscriber import DefaultSubscriber +from rsocket.helpers import single_transport_provider +from rsocket.payload import Payload +from rsocket.rsocket_client import RSocketClient +from rsocket.transports.aioquic_transport import rsocket_connect + + +class StreamSubscriber(DefaultSubscriber): + + def on_next(self, value, is_complete=False): + logging.info('RS: {}'.format(value)) + self.subscription.request(1) + + +async def main(server_port): + logging.info('Connecting to server at localhost:%s', server_port) + + client_configuration = QuicConfiguration( + is_client=True + ) + ca_file_path = Path(__file__).parent / 'certificates' / 'pycacert.pem' + client_configuration.load_verify_locations(cafile=str(ca_file_path)) + + async with rsocket_connect('localhost', server_port, + configuration=client_configuration) as transport: + async with RSocketClient(single_transport_provider(transport)) as client: + payload = Payload(b'%Y-%m-%d %H:%M:%S') + + async def run_request_response(): + try: + while True: + result = await client.request_response(payload) + logging.info('Response: {}'.format(result.data)) + await asyncio.sleep(1) + except asyncio.CancelledError: + pass + + task = asyncio.create_task(run_request_response()) + + await asyncio.sleep(5) + task.cancel() + await task + + +if __name__ == '__main__': + port = sys.argv[1] if len(sys.argv) > 1 else 6565 + logging.basicConfig(level=logging.DEBUG) + asyncio.run(main(port)) diff --git a/examples/server_quic.py b/examples/server_quic.py index e69de29b..dbf975b5 100644 --- a/examples/server_quic.py +++ b/examples/server_quic.py @@ -0,0 +1,48 @@ +import asyncio +import logging +import sys +from datetime import datetime +from pathlib import Path + +from aioquic.quic.configuration import QuicConfiguration + +from rsocket.helpers import create_future +from rsocket.payload import Payload +from rsocket.request_handler import BaseRequestHandler +from rsocket.transports.aioquic_transport import rsocket_serve + + +class Handler(BaseRequestHandler): + async def request_response(self, payload: Payload) -> asyncio.Future: + await asyncio.sleep(0.1) # Simulate not immediate process + date_time_format = payload.data.decode('utf-8') + formatted_date_time = datetime.now().strftime(date_time_format) + return create_future(Payload(formatted_date_time.encode('utf-8'))) + + +def run_server(server_port): + logging.info('Starting server at localhost:%s', server_port) + + configuration = QuicConfiguration( + is_client=False + ) + + certificates_path = Path(__file__).parent / 'certificates' + configuration.load_cert_chain(certificates_path / 'ssl_cert.pem', certificates_path / 'ssl_key.pem') + + return rsocket_serve(host='localhost', + port=server_port, + configuration=configuration, + handler_factory=Handler) + + +if __name__ == '__main__': + port = sys.argv[1] if len(sys.argv) > 1 else 6565 + logging.basicConfig(level=logging.DEBUG) + + loop = asyncio.get_event_loop() + loop.run_until_complete(run_server(port)) + try: + loop.run_forever() + except KeyboardInterrupt: + pass diff --git a/examples/test_examples.py b/examples/test_examples.py index 2a4c2f9e..c2c54286 100644 --- a/examples/test_examples.py +++ b/examples/test_examples.py @@ -19,6 +19,19 @@ def test_simple_client_server(unused_tcp_port): os.kill(pid, signal.SIGTERM) +def test_quic_client_server(unused_tcp_port): + pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_quic.py', str(unused_tcp_port)) + + try: + sleep(2) + client = subprocess.Popen(['python3', './client.py_quic', str(unused_tcp_port)]) + client.wait(timeout=10) + + assert client.returncode == 0 + finally: + os.kill(pid, signal.SIGTERM) + + @pytest.mark.timeout(30) def test_client_server_with_routing(unused_tcp_port): pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_with_routing.py', str(unused_tcp_port)) From 70a98ef4fa7082f67ccdc1bfa3c1da1579953416 Mon Sep 17 00:00:00 2001 From: gabi Date: Fri, 25 Mar 2022 18:51:50 +0300 Subject: [PATCH 08/15] tests - make ssl certificate generation session scoped. --- tests/conftest.py | 17 ++++++++++------- tests/tools/fixtures_aioquic.py | 13 ++++++++++--- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index f6e54777..181002de 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,7 +7,8 @@ from rsocket.frame_parser import FrameParser # noinspection PyUnresolvedReferences from tests.tools.fixtures_aiohttp import pipe_factory_aiohttp_websocket, aiohttp_raw_server -from tests.tools.fixtures_aioquic import pipe_factory_quic +# noinspection PyUnresolvedReferences +from tests.tools.fixtures_aioquic import pipe_factory_quic, generate_test_certificates from tests.tools.fixtures_quart import pipe_factory_quart_websocket from tests.tools.fixtures_tcp import pipe_factory_tcp @@ -48,14 +49,14 @@ def is_allowed_error(record): @pytest.fixture(params=tested_transports) -async def lazy_pipe(request, aiohttp_raw_server, unused_tcp_port): - pipe_factory = get_pipe_factory_by_id(aiohttp_raw_server, request.param) +async def lazy_pipe(request, aiohttp_raw_server, unused_tcp_port, generate_test_certificates): + pipe_factory = get_pipe_factory_by_id(aiohttp_raw_server, request.param, generate_test_certificates) yield functools.partial(pipe_factory, unused_tcp_port) @pytest.fixture(params=tested_transports) -async def pipe(request, aiohttp_raw_server, unused_tcp_port): - pipe_factory = get_pipe_factory_by_id(aiohttp_raw_server, request.param) +async def pipe(request, aiohttp_raw_server, unused_tcp_port, generate_test_certificates): + pipe_factory = get_pipe_factory_by_id(aiohttp_raw_server, request.param, generate_test_certificates) async with pipe_factory(unused_tcp_port) as components: yield components @@ -71,7 +72,9 @@ async def lazy_pipe_tcp(aiohttp_raw_server, unused_tcp_port): yield functools.partial(pipe_factory_tcp, unused_tcp_port) -def get_pipe_factory_by_id(aiohttp_raw_server, transport_id: str): +def get_pipe_factory_by_id(aiohttp_raw_server, + transport_id: str, + generate_test_certificates): if transport_id == 'tcp': return pipe_factory_tcp if transport_id == 'quart': @@ -79,7 +82,7 @@ def get_pipe_factory_by_id(aiohttp_raw_server, transport_id: str): if transport_id == 'aiohttp': return functools.partial(pipe_factory_aiohttp_websocket, aiohttp_raw_server) if transport_id == 'quic': - return pipe_factory_quic + return functools.partial(pipe_factory_quic, generate_test_certificates) @pytest.fixture diff --git a/tests/tools/fixtures_aioquic.py b/tests/tools/fixtures_aioquic.py index 7fba4b67..8eab1667 100644 --- a/tests/tools/fixtures_aioquic.py +++ b/tests/tools/fixtures_aioquic.py @@ -3,6 +3,7 @@ from contextlib import asynccontextmanager from typing import Optional +import pytest from aioquic.quic.configuration import QuicConfiguration from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization @@ -53,11 +54,17 @@ def generate_ec_certificate(common_name, alternative_names=None, curve=ec.SECP25 ) +@pytest.fixture(scope="session") +def generate_test_certificates(): + return generate_ec_certificate(common_name="localhost") + + @asynccontextmanager -async def pipe_factory_quic(unused_tcp_port, +async def pipe_factory_quic(generate_test_certificates, + unused_tcp_port, client_arguments=None, server_arguments=None): - certificate, private_key = generate_ec_certificate(common_name="localhost") + certificate, private_key = generate_test_certificates server_configuration = QuicConfiguration( certificate=certificate, @@ -90,7 +97,7 @@ def store_server(new_server): client_arguments = client_arguments or {} # client_arguments.update(test_overrides) async with rsocket_connect('localhost', unused_tcp_port, - configuration=client_configuration) as transport: + configuration=client_configuration) as transport: async with RSocketClient(single_transport_provider(transport), **client_arguments) as client: await wait_for_server.wait() From 6e45985d00bfa2a049a3432425cabe341c48d34c Mon Sep 17 00:00:00 2001 From: gabi Date: Fri, 25 Mar 2022 21:01:47 +0300 Subject: [PATCH 09/15] tests - reduced default timeout to 7 seconds --- setup.cfg | 2 +- tests/rsocket/test_lease.py | 2 ++ tests/rsocket/test_request_stream.py | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index d5bfe179..a6d4318f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -10,5 +10,5 @@ test = pytest [tool:pytest] addopts = --verbose asyncio_mode = auto -timeout = 15 +timeout = 7 diff --git a/tests/rsocket/test_lease.py b/tests/rsocket/test_lease.py index 17a24241..ac6e4250 100644 --- a/tests/rsocket/test_lease.py +++ b/tests/rsocket/test_lease.py @@ -56,6 +56,7 @@ async def request_response(self, request: Payload): assert response == Payload(b'data: dog', b'meta: cat') +@pytest.mark.timeout(15) async def test_request_response_with_client_and_server_side_lease_works(lazy_pipe): class Handler(BaseRequestHandler): async def request_response(self, request: Payload): @@ -103,6 +104,7 @@ async def request_response(self, request: Payload): await asyncio.wait_for(client.request_response(Payload(b'invalid request')), 3) +@pytest.mark.timeout(15) async def test_request_response_with_lease_client_side_exception_requests_late(lazy_pipe): class Handler(BaseRequestHandler): async def request_response(self, request: Payload): diff --git a/tests/rsocket/test_request_stream.py b/tests/rsocket/test_request_stream.py index d1ddd21c..c868e6d6 100644 --- a/tests/rsocket/test_request_stream.py +++ b/tests/rsocket/test_request_stream.py @@ -246,6 +246,7 @@ async def request_stream(self, payload: Payload) -> Publisher: assert fragments_sent == 24 +@pytest.mark.timeout(15) async def test_request_stream_concurrent_request_n(pipe: Tuple[RSocketServer, RSocketClient]): server, client = pipe From 7f7319a8c1bb16e6c7286d832c0265ddaad32cf0 Mon Sep 17 00:00:00 2001 From: Gabriel Shaar Date: Fri, 25 Mar 2022 21:11:33 +0300 Subject: [PATCH 10/15] disabled treating warnings as errors --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 292315e4..8a5320a1 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -36,7 +36,7 @@ jobs: flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest run: | - pytest --cov-report=html --cov -Werror tests + pytest --cov-report=html --cov tests - name: Archive code coverage html report uses: actions/upload-artifact@v2 with: From 239717a3aacf53971c826edf9710564a7422ec63 Mon Sep 17 00:00:00 2001 From: gabi Date: Fri, 25 Mar 2022 21:28:35 +0300 Subject: [PATCH 11/15] possibly quic transport fixes dangling asyncio tasks --- rsocket/transports/aioquic_transport.py | 19 +++++++++++++------ tests/tools/fixtures_aiohttp.py | 5 +++-- tests/tools/fixtures_aioquic.py | 9 +++++---- tests/tools/fixtures_quart.py | 5 +++-- 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/rsocket/transports/aioquic_transport.py b/rsocket/transports/aioquic_transport.py index af9b1ab7..58c3b7b1 100644 --- a/rsocket/transports/aioquic_transport.py +++ b/rsocket/transports/aioquic_transport.py @@ -6,6 +6,8 @@ from aioquic.quic.events import QuicEvent, StreamDataReceived from rsocket.frame import Frame +from rsocket.helpers import wrap_transport_exception +from rsocket.logger import logger from rsocket.rsocket_server import RSocketServer from rsocket.transports.abstract_messaging import AbstractMessagingTransport from rsocket.transports.transport import Transport @@ -77,15 +79,20 @@ def __init__(self, quic_protocol: RSocketQuicProtocol): self._listener = asyncio.create_task(self.incoming_data_listener()) async def send_frame(self, frame: Frame): - await self._quic_protocol.query(frame) + with wrap_transport_exception(): + await self._quic_protocol.query(frame) async def incoming_data_listener(self): - while True: - data = await self._incoming_bytes_queue.get() + with wrap_transport_exception(): + try: + while True: + data = await self._incoming_bytes_queue.get() - async for frame in self._frame_parser.receive_data(data, 0): - self._incoming_frame_queue.put_nowait(frame) + async for frame in self._frame_parser.receive_data(data, 0): + self._incoming_frame_queue.put_nowait(frame) + except asyncio.CancelledError: + logger().debug('Asyncio task canceled: incoming_data_listener') async def close(self): - self._quic_protocol.close() self._listener.cancel() + self._quic_protocol.close() diff --git a/tests/tools/fixtures_aiohttp.py b/tests/tools/fixtures_aiohttp.py index df3abb50..2e3e3591 100644 --- a/tests/tools/fixtures_aiohttp.py +++ b/tests/tools/fixtures_aiohttp.py @@ -52,5 +52,6 @@ def store_server(new_server): **client_arguments) as client: await wait_for_server.wait() yield server, client - await server.close() - assert_no_open_streams(client, server) \ No newline at end of file + + await server.close() + assert_no_open_streams(client, server) diff --git a/tests/tools/fixtures_aioquic.py b/tests/tools/fixtures_aioquic.py index 8eab1667..281bbcb5 100644 --- a/tests/tools/fixtures_aioquic.py +++ b/tests/tools/fixtures_aioquic.py @@ -75,8 +75,8 @@ async def pipe_factory_quic(generate_test_certificates, client_configuration = QuicConfiguration( is_client=True ) - cadata = certificate.public_bytes(serialization.Encoding.PEM) - client_configuration.load_verify_locations(cadata=cadata, cafile=None) + ca_data = certificate.public_bytes(serialization.Encoding.PEM) + client_configuration.load_verify_locations(cadata=ca_data, cafile=None) server: Optional[RSocketBase] = None wait_for_server = Event() @@ -102,7 +102,8 @@ def store_server(new_server): **client_arguments) as client: await wait_for_server.wait() yield server, client - await server.close() - assert_no_open_streams(client, server) + + await server.close() + assert_no_open_streams(client, server) quic_server.close() diff --git a/tests/tools/fixtures_quart.py b/tests/tools/fixtures_quart.py index f9011ddb..655cf3de 100644 --- a/tests/tools/fixtures_quart.py +++ b/tests/tools/fixtures_quart.py @@ -36,8 +36,9 @@ async def ws(): **client_arguments) as client: await wait_for_server.wait() yield server, client - await server.close() - assert_no_open_streams(client, server) + + await server.close() + assert_no_open_streams(client, server) try: server_task.cancel() From 47a72ceb850869f3ea54c922de2eae0fdb3a7b43 Mon Sep 17 00:00:00 2001 From: gabi Date: Fri, 25 Mar 2022 21:33:27 +0300 Subject: [PATCH 12/15] ignore example tests --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 8a5320a1..5b151458 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -36,7 +36,7 @@ jobs: flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest run: | - pytest --cov-report=html --cov tests + pytest --cov-report=html --cov --ignore=examples tests - name: Archive code coverage html report uses: actions/upload-artifact@v2 with: From 0b3231c90a51d2db12a1a9535c91f6ded1890e08 Mon Sep 17 00:00:00 2001 From: gabi Date: Fri, 25 Mar 2022 21:50:07 +0300 Subject: [PATCH 13/15] configure pytest to treat all warnings as errors except deprecation warnings --- setup.cfg | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index a6d4318f..27fbb769 100644 --- a/setup.cfg +++ b/setup.cfg @@ -11,4 +11,6 @@ test = pytest addopts = --verbose asyncio_mode = auto timeout = 7 - +filterwarnings = + error + ignore::DeprecationWarning From 20d263ccb2d36274e2bc395cb712aebf5e19ef22 Mon Sep 17 00:00:00 2001 From: gabi Date: Fri, 25 Mar 2022 21:56:42 +0300 Subject: [PATCH 14/15] flake8 fixes --- tests/tools/fixtures_aioquic.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/tools/fixtures_aioquic.py b/tests/tools/fixtures_aioquic.py index 281bbcb5..984fc13b 100644 --- a/tests/tools/fixtures_aioquic.py +++ b/tests/tools/fixtures_aioquic.py @@ -21,15 +21,14 @@ def generate_certificate(*, alternative_names, common_name, hash_algorithm, key) [x509.NameAttribute(x509.NameOID.COMMON_NAME, common_name)] ) - builder = ( - x509.CertificateBuilder() - .subject_name(subject) - .issuer_name(issuer) - .public_key(key.public_key()) - .serial_number(x509.random_serial_number()) - .not_valid_before(datetime.datetime.utcnow()) - .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=10)) - ) + builder = (x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.datetime.utcnow()) + .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=10)) + ) if alternative_names: builder = builder.add_extension( x509.SubjectAlternativeName( From b976253de97d18aa238f8f8b51a0eda9015e77c4 Mon Sep 17 00:00:00 2001 From: Gabriel Shaar Date: Fri, 25 Mar 2022 22:27:52 +0300 Subject: [PATCH 15/15] updated readme.md with quic support --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 3997169f..7de561d2 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ all the examples | server (python) | server (java) | client (python) | client(java) | |-----------------------------|---------------|------------------------------------|-----------------| | server.py | | client.py | | +| server_quic.py | | client_quic.py | | | server_with_lease.py | | | ClientWithLease | | server_with_routing.py | | client_with_routing.py | Client | | | Server | run_against_example_java_server.py | | @@ -74,6 +75,7 @@ all the examples - [ ] Transports - [X] TCP - [X] Websocket + - [X] QUIC - [ ] HTTP/2 - [ ] Aeron - [X] RxPy Integration