diff --git a/requirements.txt b/requirements.txt index 23f9461a..7ec6a707 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,18 @@ Rx==3.2.0 -aiohttp==3.10.2 + +aiohttp==3.11.12; python_version > "3.8" +aiohttp==3.10.11; python_version == "3.8" + aioquic==1.2.0 -asyncstdlib==3.12.5 -asyncclick==8.1.7.2 +asyncstdlib==3.13.0 + +asyncclick==8.1.8; python_version > "3.8" +asyncclick==8.1.7.2; python_version == "3.8" + coverage==6.5.0 coveralls==3.3.1 decoy==2.1.1 -flake8==7.1.1 +flake8==7.1.2 pytest-asyncio==0.25.3; python_version > "3.8" pytest-asyncio==0.23.4; python_version == "3.8" @@ -16,21 +22,34 @@ pytest-cov==4.1.0 pytest-profiling==1.8.1; python_version > "3.8" pytest-profiling==1.7.0; python_version == "3.8" -pytest-rerunfailures==13.0 +pytest-rerunfailures==15.0; python_version > "3.8" +pytest-rerunfailures==14.0; python_version == "3.8" + pytest-timeout==2.3.1 pytest-xdist==3.6.1 pytest==8.3.4; python_version > "3.8" pytest==7.4.4; python_version == "3.8" -quart==0.19.9 +quart==0.20.0; python_version > "3.8" +quart==0.19.9; python_version == "3.8" + reactivex==4.0.4 -starlette==0.40.0 + +starlette==0.45.3; python_version > "3.8" +starlette==0.44.0; python_version == "3.8" + cbitstruct==1.1.0; python_version <= "3.12" cloudevents==1.11.0 -pydantic==1.10.18 -Werkzeug==3.0.4 -graphql-core==3.2.5 -gql==3.5.0 -websockets==13.1 +pydantic==1.10.21 + +Werkzeug==3.1.3; python_version > "3.8" +Werkzeug==3.0.6; python_version == "3.8" + +graphql-core==3.2.3 +gql==3.5.1 + +websockets==15.0; python_version > "3.8" +websockets==13.1; python_version == "3.8" + asyncwebsockets==0.9.4 \ No newline at end of file diff --git a/rsocket/graphql/rsocket_transport.py b/rsocket/graphql/rsocket_transport.py index 407a013f..19188760 100644 --- a/rsocket/graphql/rsocket_transport.py +++ b/rsocket/graphql/rsocket_transport.py @@ -9,6 +9,7 @@ from reactivestreams.subscriber import DefaultSubscriber from rsocket.extensions.helpers import composite, route from rsocket.frame_helpers import str_to_bytes +from rsocket.logger import logger from rsocket.payload import Payload from rsocket.rsocket_client import RSocketClient @@ -104,6 +105,14 @@ def on_complete(self): def on_error(self, exception: Exception): self._received_queue.put_nowait(exception) + def cancel(self): + if self.subscription is not None: + self.subscription.cancel() + + def request(self, n: int): + if self.subscription is not None: + self.subscription.request(n) + rsocket_payload = self._create_rsocket_payload(document, variable_values, operation_name) received_queue = Queue() @@ -112,12 +121,20 @@ def on_error(self, exception: Exception): self._rsocket_client.request_stream(rsocket_payload).subscribe(subscriber) while True: - response = await received_queue.get() + try: + response = await received_queue.get() + + if isinstance(response, Exception): + raise response + + if response is complete_object: + break - if isinstance(response, Exception): - raise response + execution_result = self._response_to_execution_result(response) - if response is complete_object: - break + yield execution_result + except GeneratorExit: + logger().debug('Generator exited') + subscriber.cancel() + return - yield self._response_to_execution_result(response) diff --git a/rsocket/transports/aioquic_transport.py b/rsocket/transports/aioquic_transport.py index bee8de7c..7b50704e 100644 --- a/rsocket/transports/aioquic_transport.py +++ b/rsocket/transports/aioquic_transport.py @@ -102,6 +102,8 @@ async def send_frame(self, frame: Frame): async def incoming_data_listener(self): try: + logger().debug('Quic - Listener started') + await self._quic_protocol.wait_connected() while True: @@ -118,8 +120,11 @@ async def incoming_data_listener(self): logger().debug('Asyncio task canceled: incoming_data_listener') except Exception: self._incoming_frame_queue.put_nowait(RSocketTransportError()) + finally: + logger().debug('Quic - Listener stopped') async def close(self): + logger().debug('Quic - Closing transport') await cancel_if_task_exists(self._listener) self._quic_protocol.close() diff --git a/tests/rsocket/test_concurrency.py b/tests/rsocket/test_concurrency.py index 7a3d2893..813ce4de 100644 --- a/tests/rsocket/test_concurrency.py +++ b/tests/rsocket/test_concurrency.py @@ -1,4 +1,5 @@ import asyncio +import logging from typing import Tuple, Optional import pytest @@ -76,6 +77,8 @@ async def run(): request_2 = asyncio.create_task(measure_time(client.request_response(Payload(b'10')))) return await request_1, await request_2 + logging.debug("Starting concurrent requests") + measure_result = await measure_time(run()) results = measure_result.result diff --git a/tests/rsocket/test_routing.py b/tests/rsocket/test_routing.py index 32f94900..d452c5c1 100644 --- a/tests/rsocket/test_routing.py +++ b/tests/rsocket/test_routing.py @@ -5,12 +5,14 @@ from reactivestreams.subscriber import DefaultSubscriber from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket +from rsocket.error_codes import ErrorCode from rsocket.extensions.authentication import Authentication, AuthenticationSimple from rsocket.extensions.composite_metadata import CompositeMetadata from rsocket.extensions.helpers import route, composite, authenticate_simple from rsocket.extensions.mimetypes import WellKnownMimeTypes from rsocket.helpers import create_response from rsocket.payload import Payload +from rsocket.request_handler import BaseRequestHandler from rsocket.routing.request_router import RequestRouter from rsocket.routing.routing_request_handler import RoutingRequestHandler from rsocket.rx_support.rx_rsocket import RxRSocket @@ -357,3 +359,33 @@ def handler_factory(): 'pass')))) assert result.data == b'result' + + +@pytest.mark.allow_error_log(regex_filter='(RSocket error REJECTED_SETUP|Setup error)') +async def test_invalid_metadata_for_routing(lazy_pipe): + router = RequestRouter() + + async def authenticate(path: str, authentication: Authentication): + if not isinstance(authentication, AuthenticationSimple) or authentication.password != b'pass': + raise Exception('Invalid credentials') + + error_wait = asyncio.Event() + + def client_handler_factory(): + class ClientHandler(BaseRequestHandler): + async def on_error(self, error_code: ErrorCode, payload: Payload): + error_wait.set() + + return ClientHandler() + + @router.response('test.path') + async def response(): + return create_response(b'result') + + def handler_factory(): + return RoutingRequestHandler(router, authentication_verifier=authenticate) + + async with lazy_pipe( + client_arguments={'handler_factory': client_handler_factory}, + server_arguments={'handler_factory': handler_factory}) as (server, client): + await error_wait.wait() diff --git a/tests/test_integrations/test_graphql.py b/tests/test_integrations/test_graphql.py index 0bfd2771..6aff5327 100644 --- a/tests/test_integrations/test_graphql.py +++ b/tests/test_integrations/test_graphql.py @@ -1,3 +1,5 @@ +import asyncio + from gql import Client, gql from rsocket.extensions.mimetypes import WellKnownMimeTypes @@ -75,4 +77,38 @@ def handler_factory(): gql("""{__schema { types { name } } }"""), get_execution_result=True) - assert response.data == expected_schema \ No newline at end of file + assert response.data == expected_schema + + +async def test_graphql_break_loop(lazy_pipe, graphql_schema): + def handler_factory(): + return RoutingRequestHandler(graphql_handler(graphql_schema, 'graphql')) + + async with lazy_pipe( + client_arguments={'metadata_encoding': WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA}, + server_arguments={'handler_factory': handler_factory}) as (server, client): + graphql = Client( + schema=graphql_schema, + transport=RSocketTransport(client), + ) + + responses = [] + i = 0 + async for response in graphql.subscribe_async( + document=gql(""" + subscription { + greetings {message} + } + """), + get_execution_result=True): + responses.append(response.data) + i += 1 + if i > 4: + break + + assert len(responses) == 5 + assert responses[0] == {'greetings': {'message': 'Hello world 0'}} + + await asyncio.sleep(1) + + # assert responses[9] == {'greetings': {'message': 'Hello world 9'}} diff --git a/tests/tools/fixtures_aioquic.py b/tests/tools/fixtures_aioquic.py index 34f56032..2310b272 100644 --- a/tests/tools/fixtures_aioquic.py +++ b/tests/tools/fixtures_aioquic.py @@ -1,3 +1,4 @@ +import logging from asyncio import Event from contextlib import asynccontextmanager from typing import Optional @@ -34,12 +35,16 @@ def store_server(new_server): server = new_server wait_for_server.set() + logging.debug('test quic - starting server') + quic_server = await rsocket_serve(host='localhost', port=unused_tcp_port, configuration=server_configuration, on_server_create=store_server, **(server_arguments or {})) + logging.debug('test quic - server started') + try: # from datetime import timedelta # test_overrides = {'keep_alive_period': timedelta(minutes=20)} @@ -49,7 +54,12 @@ def store_server(new_server): configuration=quic_client_configuration(certificate)) as transport: async with RSocketClient(single_transport_provider(transport), **client_arguments) as client: + logging.debug('test quic - waiting for server to be ready') + await wait_for_server.wait() + + logging.debug('test quic - server and client ready, starting test') + yield server, client finally: if server is not None: