Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
Rx==3.2.0
aiohttp==3.10.2

aiohttp==3.11.12; python_version > "3.8"
aiohttp==3.10.11; python_version == "3.8"

aioquic==1.2.0
asyncstdlib==3.12.5
asyncclick==8.1.7.2
asyncstdlib==3.13.0

asyncclick==8.1.8; python_version > "3.8"
asyncclick==8.1.7.2; python_version == "3.8"

coverage==6.5.0
coveralls==3.3.1
decoy==2.1.1
flake8==7.1.1
flake8==7.1.2

pytest-asyncio==0.25.3; python_version > "3.8"
pytest-asyncio==0.23.4; python_version == "3.8"
Expand All @@ -16,21 +22,34 @@ pytest-cov==4.1.0
pytest-profiling==1.8.1; python_version > "3.8"
pytest-profiling==1.7.0; python_version == "3.8"

pytest-rerunfailures==13.0
pytest-rerunfailures==15.0; python_version > "3.8"
pytest-rerunfailures==14.0; python_version == "3.8"

pytest-timeout==2.3.1
pytest-xdist==3.6.1

pytest==8.3.4; python_version > "3.8"
pytest==7.4.4; python_version == "3.8"

quart==0.19.9
quart==0.20.0; python_version > "3.8"
quart==0.19.9; python_version == "3.8"

reactivex==4.0.4
starlette==0.40.0

starlette==0.45.3; python_version > "3.8"
starlette==0.44.0; python_version == "3.8"

cbitstruct==1.1.0; python_version <= "3.12"
cloudevents==1.11.0
pydantic==1.10.18
Werkzeug==3.0.4
graphql-core==3.2.5
gql==3.5.0
websockets==13.1
pydantic==1.10.21

Werkzeug==3.1.3; python_version > "3.8"
Werkzeug==3.0.6; python_version == "3.8"

graphql-core==3.2.3
gql==3.5.1

websockets==15.0; python_version > "3.8"
websockets==13.1; python_version == "3.8"

asyncwebsockets==0.9.4
29 changes: 23 additions & 6 deletions rsocket/graphql/rsocket_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from reactivestreams.subscriber import DefaultSubscriber
from rsocket.extensions.helpers import composite, route
from rsocket.frame_helpers import str_to_bytes
from rsocket.logger import logger
from rsocket.payload import Payload
from rsocket.rsocket_client import RSocketClient

Expand Down Expand Up @@ -104,6 +105,14 @@ def on_complete(self):
def on_error(self, exception: Exception):
self._received_queue.put_nowait(exception)

def cancel(self):
if self.subscription is not None:
self.subscription.cancel()

def request(self, n: int):
if self.subscription is not None:
self.subscription.request(n)

rsocket_payload = self._create_rsocket_payload(document, variable_values, operation_name)

received_queue = Queue()
Expand All @@ -112,12 +121,20 @@ def on_error(self, exception: Exception):
self._rsocket_client.request_stream(rsocket_payload).subscribe(subscriber)

while True:
response = await received_queue.get()
try:
response = await received_queue.get()

if isinstance(response, Exception):
raise response

if response is complete_object:
break

if isinstance(response, Exception):
raise response
execution_result = self._response_to_execution_result(response)

if response is complete_object:
break
yield execution_result
except GeneratorExit:
logger().debug('Generator exited')
subscriber.cancel()
return

yield self._response_to_execution_result(response)
5 changes: 5 additions & 0 deletions rsocket/transports/aioquic_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ async def send_frame(self, frame: Frame):

async def incoming_data_listener(self):
try:
logger().debug('Quic - Listener started')

await self._quic_protocol.wait_connected()

while True:
Expand All @@ -118,8 +120,11 @@ async def incoming_data_listener(self):
logger().debug('Asyncio task canceled: incoming_data_listener')
except Exception:
self._incoming_frame_queue.put_nowait(RSocketTransportError())
finally:
logger().debug('Quic - Listener stopped')

async def close(self):
logger().debug('Quic - Closing transport')
await cancel_if_task_exists(self._listener)
self._quic_protocol.close()

Expand Down
3 changes: 3 additions & 0 deletions tests/rsocket/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
from typing import Tuple, Optional

import pytest
Expand Down Expand Up @@ -76,6 +77,8 @@ async def run():
request_2 = asyncio.create_task(measure_time(client.request_response(Payload(b'10'))))
return await request_1, await request_2

logging.debug("Starting concurrent requests")

measure_result = await measure_time(run())

results = measure_result.result
Expand Down
32 changes: 32 additions & 0 deletions tests/rsocket/test_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

from reactivestreams.subscriber import DefaultSubscriber
from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket
from rsocket.error_codes import ErrorCode
from rsocket.extensions.authentication import Authentication, AuthenticationSimple
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.extensions.helpers import route, composite, authenticate_simple
from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.helpers import create_response
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.routing.request_router import RequestRouter
from rsocket.routing.routing_request_handler import RoutingRequestHandler
from rsocket.rx_support.rx_rsocket import RxRSocket
Expand Down Expand Up @@ -357,3 +359,33 @@ def handler_factory():
'pass'))))

assert result.data == b'result'


@pytest.mark.allow_error_log(regex_filter='(RSocket error REJECTED_SETUP|Setup error)')
async def test_invalid_metadata_for_routing(lazy_pipe):
router = RequestRouter()

async def authenticate(path: str, authentication: Authentication):
if not isinstance(authentication, AuthenticationSimple) or authentication.password != b'pass':
raise Exception('Invalid credentials')

error_wait = asyncio.Event()

def client_handler_factory():
class ClientHandler(BaseRequestHandler):
async def on_error(self, error_code: ErrorCode, payload: Payload):
error_wait.set()

return ClientHandler()

@router.response('test.path')
async def response():
return create_response(b'result')

def handler_factory():
return RoutingRequestHandler(router, authentication_verifier=authenticate)

async with lazy_pipe(
client_arguments={'handler_factory': client_handler_factory},
server_arguments={'handler_factory': handler_factory}) as (server, client):
await error_wait.wait()
38 changes: 37 additions & 1 deletion tests/test_integrations/test_graphql.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

from gql import Client, gql

from rsocket.extensions.mimetypes import WellKnownMimeTypes
Expand Down Expand Up @@ -75,4 +77,38 @@ def handler_factory():
gql("""{__schema { types { name } } }"""),
get_execution_result=True)

assert response.data == expected_schema
assert response.data == expected_schema


async def test_graphql_break_loop(lazy_pipe, graphql_schema):
def handler_factory():
return RoutingRequestHandler(graphql_handler(graphql_schema, 'graphql'))

async with lazy_pipe(
client_arguments={'metadata_encoding': WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA},
server_arguments={'handler_factory': handler_factory}) as (server, client):
graphql = Client(
schema=graphql_schema,
transport=RSocketTransport(client),
)

responses = []
i = 0
async for response in graphql.subscribe_async(
document=gql("""
subscription {
greetings {message}
}
"""),
get_execution_result=True):
responses.append(response.data)
i += 1
if i > 4:
break

assert len(responses) == 5
assert responses[0] == {'greetings': {'message': 'Hello world 0'}}

await asyncio.sleep(1)

# assert responses[9] == {'greetings': {'message': 'Hello world 9'}}
10 changes: 10 additions & 0 deletions tests/tools/fixtures_aioquic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from asyncio import Event
from contextlib import asynccontextmanager
from typing import Optional
Expand Down Expand Up @@ -34,12 +35,16 @@ def store_server(new_server):
server = new_server
wait_for_server.set()

logging.debug('test quic - starting server')

quic_server = await rsocket_serve(host='localhost',
port=unused_tcp_port,
configuration=server_configuration,
on_server_create=store_server,
**(server_arguments or {}))

logging.debug('test quic - server started')

try:
# from datetime import timedelta
# test_overrides = {'keep_alive_period': timedelta(minutes=20)}
Expand All @@ -49,7 +54,12 @@ def store_server(new_server):
configuration=quic_client_configuration(certificate)) as transport:
async with RSocketClient(single_transport_provider(transport),
**client_arguments) as client:
logging.debug('test quic - waiting for server to be ready')

await wait_for_server.wait()

logging.debug('test quic - server and client ready, starting test')

yield server, client
finally:
if server is not None:
Expand Down
Loading