diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 4740f5ef..9da71492 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -36,7 +36,7 @@ jobs: flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest run: | - pytest -n 1 --cov-report=html --cov --ignore=examples tests + pytest -n 4 --cov-report=html --cov --ignore=examples tests - name: Archive code coverage html report uses: actions/upload-artifact@v2 with: diff --git a/examples/tutorial/reactivex/chat_client.py b/examples/tutorial/reactivex/chat_client.py index c1a5abba..81403016 100644 --- a/examples/tutorial/reactivex/chat_client.py +++ b/examples/tutorial/reactivex/chat_client.py @@ -129,7 +129,7 @@ async def download(self, file_name): ) return await ReactiveXClient(self._rsocket).request_response(request).pipe( - operators.map(lambda _:_.data), + operators.map(lambda _: _.data), operators.last() ) diff --git a/examples/tutorial/reactivex/chat_server.py b/examples/tutorial/reactivex/chat_server.py index cad76fdf..21b0c4c3 100644 --- a/examples/tutorial/reactivex/chat_server.py +++ b/examples/tutorial/reactivex/chat_server.py @@ -7,6 +7,7 @@ from dataclasses import dataclass, field from typing import Dict, Optional, Set, Callable from weakref import WeakValueDictionary, WeakSet + import reactivex from more_itertools import first from reactivex import Observable, operators, Subject, Observer diff --git a/examples/tutorial/step4/chat_server.py b/examples/tutorial/step4/chat_server.py index f09666a0..bdde1934 100644 --- a/examples/tutorial/step4/chat_server.py +++ b/examples/tutorial/step4/chat_server.py @@ -8,8 +8,9 @@ from typing import Dict, Optional, Set, Awaitable from weakref import WeakValueDictionary, WeakSet -from examples.tutorial.step4.models import (Message, chat_filename_mimetype, dataclass_to_payload) from more_itertools import first + +from examples.tutorial.step4.models import (Message, chat_filename_mimetype, dataclass_to_payload) from reactivestreams.publisher import DefaultPublisher, Publisher from reactivestreams.subscriber import Subscriber from reactivestreams.subscription import DefaultSubscription diff --git a/examples/tutorial/step6/chat_client.py b/examples/tutorial/step6/chat_client.py index 06344bf8..e0a118d0 100644 --- a/examples/tutorial/step6/chat_client.py +++ b/examples/tutorial/step6/chat_client.py @@ -100,7 +100,6 @@ async def send_statistics(self): await self._rsocket.fire_and_forget(payload) def listen_for_statistics(self) -> StatisticsHandler: - self._statistics_subscriber = StatisticsHandler() self._rsocket.request_channel(Payload(metadata=composite( route('statistics') @@ -152,7 +151,6 @@ async def main(): async with RSocketClient(single_transport_provider(TransportTCP(*connection2)), metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA, fragment_size_bytes=1_000_000) as client2: - user1 = ChatClient(client1) user2 = ChatClient(client2) diff --git a/examples/tutorial/step6/chat_server.py b/examples/tutorial/step6/chat_server.py index c762fdf3..77398141 100644 --- a/examples/tutorial/step6/chat_server.py +++ b/examples/tutorial/step6/chat_server.py @@ -7,6 +7,7 @@ from dataclasses import dataclass, field from typing import Dict, Optional, Set, Awaitable, Tuple from weakref import WeakValueDictionary, WeakSet + from more_itertools import first from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, diff --git a/examples/tutorial/step7/chat_server.py b/examples/tutorial/step7/chat_server.py index f1ab0782..3115359c 100644 --- a/examples/tutorial/step7/chat_server.py +++ b/examples/tutorial/step7/chat_server.py @@ -7,6 +7,7 @@ from dataclasses import dataclass, field from typing import Dict, Optional, Set, Awaitable, Tuple from weakref import WeakValueDictionary, WeakSet + from more_itertools import first from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest, @@ -29,6 +30,7 @@ class SessionId(str): # allow weak reference pass + @dataclass() class UserSessionData: username: str diff --git a/rsocket/frame.py b/rsocket/frame.py index 2af3eef4..a6146691 100644 --- a/rsocket/frame.py +++ b/rsocket/frame.py @@ -759,5 +759,6 @@ def serialize_with_frame_size_header(frame: Frame) -> bytes: RequestChannelFrame: 10, } + def get_header_length(frame: FragmentableFrame) -> int: return frame_header_length[frame.__class__] diff --git a/rsocket/frame_helpers.py b/rsocket/frame_helpers.py index 09e817cb..78863efe 100644 --- a/rsocket/frame_helpers.py +++ b/rsocket/frame_helpers.py @@ -3,7 +3,6 @@ to avoid circular dependencies. """ - import struct from typing import Union, Tuple, Optional diff --git a/rsocket/rsocket_base.py b/rsocket/rsocket_base.py index 710984ba..d2848b5f 100644 --- a/rsocket/rsocket_base.py +++ b/rsocket/rsocket_base.py @@ -381,8 +381,8 @@ async def _handle_next_frame(self, frame: Frame, async_frame_handler_by_type): elif self._stream_control.handle_stream(complete_frame): return else: - logger().debug('%s: Dropping frame from unknown stream %d', self._log_identifier(), - complete_frame.stream_id) + logger().warning('%s: Dropping frame from unknown stream %d', self._log_identifier(), + complete_frame.stream_id) async def _handle_frame_by_type(self, frame: Frame, async_frame_handler_by_type): frame_handler = async_frame_handler_by_type.get(type(frame), async_noop) diff --git a/rsocket/rx_support/rx_handler_adapter.py b/rsocket/rx_support/rx_handler_adapter.py index 48b29c26..4844fafd 100644 --- a/rsocket/rx_support/rx_handler_adapter.py +++ b/rsocket/rx_support/rx_handler_adapter.py @@ -9,7 +9,7 @@ from rsocket.error_codes import ErrorCode from rsocket.payload import Payload from rsocket.request_handler import RequestHandler -from rsocket.rx_support.back_pressure_publisher import BackPressurePublisher, observable_to_publisher +from rsocket.rx_support.back_pressure_publisher import observable_to_publisher from rsocket.rx_support.from_rsocket_publisher import RxSubscriberFromObserver from rsocket.rx_support.rx_handler import RxHandler diff --git a/rsocket/streams/helpers.py b/rsocket/streams/helpers.py index e91a2fa2..b6ec67dc 100644 --- a/rsocket/streams/helpers.py +++ b/rsocket/streams/helpers.py @@ -10,5 +10,3 @@ async def async_generator_from_queue(queue: Queue, stop_value=None): else: yield value queue.task_done() - - diff --git a/rsocket/transports/aioquic_transport.py b/rsocket/transports/aioquic_transport.py index 53230772..3e35ce25 100644 --- a/rsocket/transports/aioquic_transport.py +++ b/rsocket/transports/aioquic_transport.py @@ -91,6 +91,7 @@ async def send_frame(self, frame: Frame): with wrap_transport_exception(): await self._quic_protocol.query(frame) + await asyncio.sleep(0) async def incoming_data_listener(self): try: diff --git a/rsocket/transports/http3_transport.py b/rsocket/transports/http3_transport.py index 3a179f3f..38521fdc 100644 --- a/rsocket/transports/http3_transport.py +++ b/rsocket/transports/http3_transport.py @@ -161,6 +161,7 @@ async def send_frame(self, frame: Frame): try: data = serialize_with_frame_size_header(frame) await self._websocket.send_bytes(data) + await asyncio.sleep(0) except WebSocketDisconnect: self._disconnect_event.set() diff --git a/setup.cfg b/setup.cfg index 0030e1e3..ccdbf362 100644 --- a/setup.cfg +++ b/setup.cfg @@ -10,7 +10,7 @@ test = pytest [tool:pytest] addopts = --verbose asyncio_mode = auto -timeout = 7 +timeout = 10 ; TODO: Remove ignoring ResourceWarning after finding out why connection is not always closed after each test (rare event) filterwarnings = diff --git a/tests/conftest.py b/tests/conftest.py index 885afe3d..3f77890e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,11 +5,11 @@ import pytest from rsocket.frame_parser import FrameParser -from tests.tools.helpers_aiohttp import pipe_factory_aiohttp_websocket from tests.tools.fixtures_aioquic import pipe_factory_quic from tests.tools.fixtures_http3 import pipe_factory_http3 from tests.tools.fixtures_quart import pipe_factory_quart_websocket from tests.tools.fixtures_tcp import pipe_factory_tcp +from tests.tools.helpers_aiohttp import pipe_factory_aiohttp_websocket pytest_plugins = [ "tests.tools.fixtures_shared", @@ -35,7 +35,7 @@ def setup_logging(level=logging.DEBUG, use_file: bool = False): logging.basicConfig(level=level, handlers=handlers) -setup_logging() +setup_logging(logging.WARN) tested_transports = [ 'tcp', diff --git a/tests/rsocket/test_concurrency.py b/tests/rsocket/test_concurrency.py index 2f47d1bf..1a9c7f5c 100644 --- a/tests/rsocket/test_concurrency.py +++ b/tests/rsocket/test_concurrency.py @@ -1,6 +1,8 @@ import asyncio from typing import Tuple, Optional +import pytest + from rsocket.async_helpers import async_range from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket from rsocket.frame_helpers import ensure_bytes @@ -43,16 +45,17 @@ async def generator(): assert len(results[0].result) == 2000 assert len(results[1].result) == 10 - assert delta > 0.8 + assert delta > 0.2 -async def test_concurrent_fragmented_responses(lazy_pipe_tcp): # check problems with quic and http3 frame boundary +@pytest.mark.timeout(15) +async def test_concurrent_fragmented_responses(lazy_pipe): class Handler(BaseRequestHandler): async def request_response(self, request: Payload): data = 'a' * 100 * int(utf8_decode(request.data)) return create_future(Payload(ensure_bytes(data))) - async with lazy_pipe_tcp( + async with lazy_pipe( server_arguments={'handler_factory': Handler, 'fragment_size_bytes': 100}, client_arguments={'fragment_size_bytes': 100}) as (server, client): request_1 = asyncio.create_task(measure_time(client.request_response(Payload(b'10000')))) @@ -66,4 +69,6 @@ async def request_response(self, request: Payload): assert len(results[0].result.data) == 10000 * 100 assert len(results[1].result.data) == 10 * 100 - assert delta > 0.8 + assert delta > 0.2 + + await asyncio.sleep(2) diff --git a/tests/rsocket/test_internal.py b/tests/rsocket/test_internal.py index f487c95d..ca6e18ad 100644 --- a/tests/rsocket/test_internal.py +++ b/tests/rsocket/test_internal.py @@ -1,6 +1,7 @@ import asyncio import logging from weakref import WeakKeyDictionary + import pytest @@ -21,6 +22,7 @@ async def test_fail_on_error_log(fail_on_error_log): def test_weak_ref(): class S(str): pass + d = WeakKeyDictionary() a = S('abc') d[a] = 1 @@ -37,4 +39,4 @@ async def loop(ii): await asyncio.sleep(0) print(ii + str(i)) - await asyncio.gather(loop('a'), loop('b')) \ No newline at end of file + await asyncio.gather(loop('a'), loop('b')) diff --git a/tests/rsocket/test_multiple_streams.py b/tests/rsocket/test_multiple_streams.py index 74f792f4..d28eae62 100644 --- a/tests/rsocket/test_multiple_streams.py +++ b/tests/rsocket/test_multiple_streams.py @@ -61,7 +61,8 @@ def requester_generator(): sending_done=sending_done ).subscribe(CollectorSubscriber(limit_count=1)) - messages_received_from_server_stream = await AwaitableRSocket(client).request_stream(Payload(b'request text stream')) + messages_received_from_server_stream = await AwaitableRSocket(client).request_stream( + Payload(b'request text stream')) await sending_done.wait() diff --git a/tests/test_reactivex/test_concurrency.py b/tests/test_reactivex/test_concurrency.py index 8aa7b602..8ea5ba3d 100644 --- a/tests/test_reactivex/test_concurrency.py +++ b/tests/test_reactivex/test_concurrency.py @@ -47,4 +47,4 @@ async def test_concurrent_streams(pipe: Tuple[RSocketServer, RSocketClient]): delta = abs(results[0].delta - results[1].delta) - assert delta > 0.8 + assert delta > 0.2 diff --git a/tests/test_reactivex/test_helper.py b/tests/test_reactivex/test_helper.py index 218b5b57..aa702d9f 100644 --- a/tests/test_reactivex/test_helper.py +++ b/tests/test_reactivex/test_helper.py @@ -1,5 +1,3 @@ -import asyncio - import pytest from reactivex import operators, Subject @@ -33,4 +31,3 @@ async def generator(): assert len(result) == expected_n # await asyncio.sleep(1) # wait for task to finish - diff --git a/tests/test_reactivex/test_reactivex_canceled.py b/tests/test_reactivex/test_reactivex_canceled.py index 53899742..c34cfe0e 100644 --- a/tests/test_reactivex/test_reactivex_canceled.py +++ b/tests/test_reactivex/test_reactivex_canceled.py @@ -11,10 +11,10 @@ from reactivestreams.subscription import Subscription from rsocket.error_codes import ErrorCode from rsocket.payload import Payload +from rsocket.reactivex.reactivex_client import ReactiveXClient from rsocket.request_handler import BaseRequestHandler from rsocket.rsocket_client import RSocketClient from rsocket.rsocket_server import RSocketServer -from rsocket.reactivex.reactivex_client import ReactiveXClient from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator diff --git a/tests/test_reactivex/test_reactivex_error.py b/tests/test_reactivex/test_reactivex_error.py index e0da2543..0390ce61 100644 --- a/tests/test_reactivex/test_reactivex_error.py +++ b/tests/test_reactivex/test_reactivex_error.py @@ -4,17 +4,17 @@ import pytest import reactivex from reactivex import operators, Observer -from reactivex.scheduler.scheduler import Scheduler from reactivex.disposable import Disposable +from reactivex.scheduler.scheduler import Scheduler from reactivestreams.publisher import Publisher from reactivestreams.subscriber import Subscriber, DefaultSubscriber from reactivestreams.subscription import Subscription from rsocket.payload import Payload +from rsocket.reactivex.reactivex_client import ReactiveXClient from rsocket.request_handler import BaseRequestHandler from rsocket.rsocket_client import RSocketClient from rsocket.rsocket_server import RSocketServer -from rsocket.reactivex.reactivex_client import ReactiveXClient from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator diff --git a/tests/test_reactivex/test_reactivex_support.py b/tests/test_reactivex/test_reactivex_support.py index a5219117..7ed366bf 100644 --- a/tests/test_reactivex/test_reactivex_support.py +++ b/tests/test_reactivex/test_reactivex_support.py @@ -10,10 +10,10 @@ from reactivestreams.subscription import Subscription from rsocket.helpers import create_future, DefaultPublisherSubscription from rsocket.payload import Payload +from rsocket.reactivex.reactivex_client import ReactiveXClient from rsocket.request_handler import BaseRequestHandler from rsocket.rsocket_client import RSocketClient from rsocket.rsocket_server import RSocketServer -from rsocket.reactivex.reactivex_client import ReactiveXClient from rsocket.streams.empty_stream import EmptyStream from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator diff --git a/tests/test_reactivex/test_reactivex_timeout.py b/tests/test_reactivex/test_reactivex_timeout.py index 7a21cef5..6226e272 100644 --- a/tests/test_reactivex/test_reactivex_timeout.py +++ b/tests/test_reactivex/test_reactivex_timeout.py @@ -8,10 +8,10 @@ from reactivestreams.publisher import Publisher from rsocket.helpers import create_future, DefaultPublisherSubscription from rsocket.payload import Payload +from rsocket.reactivex.reactivex_client import ReactiveXClient from rsocket.request_handler import BaseRequestHandler from rsocket.rsocket_client import RSocketClient from rsocket.rsocket_server import RSocketServer -from rsocket.reactivex.reactivex_client import ReactiveXClient async def test_rx_support_request_stream_cancel_on_timeout(pipe: Tuple[RSocketServer, RSocketClient]): diff --git a/tests/tools/fixtures_shared.py b/tests/tools/fixtures_shared.py index 9a01b617..b72d942c 100644 --- a/tests/tools/fixtures_shared.py +++ b/tests/tools/fixtures_shared.py @@ -6,7 +6,6 @@ from cryptography.hazmat.primitives.asymmetric import ec - def generate_certificate(*, alternative_names, common_name, hash_algorithm, key): subject = issuer = x509.Name( [x509.NameAttribute(x509.NameOID.COMMON_NAME, common_name)] @@ -47,6 +46,3 @@ def generate_ec_certificate(common_name, alternative_names=None, curve=ec.SECP25 @pytest.fixture(scope="session") def generate_test_certificates(): return generate_ec_certificate(common_name="localhost") - - - diff --git a/tests/tools/helpers_aiohttp.py b/tests/tools/helpers_aiohttp.py index e0927509..cb639f78 100644 --- a/tests/tools/helpers_aiohttp.py +++ b/tests/tools/helpers_aiohttp.py @@ -30,4 +30,4 @@ def store_server(new_server): yield server, client await server.close() - assert_no_open_streams(client, server) \ No newline at end of file + assert_no_open_streams(client, server)