Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
pytest -n 1 --cov-report=html --cov --ignore=examples tests
pytest -n 4 --cov-report=html --cov --ignore=examples tests
- name: Archive code coverage html report
uses: actions/upload-artifact@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion examples/tutorial/reactivex/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ async def download(self, file_name):
)

return await ReactiveXClient(self._rsocket).request_response(request).pipe(
operators.map(lambda _:_.data),
operators.map(lambda _: _.data),
operators.last()
)

Expand Down
1 change: 1 addition & 0 deletions examples/tutorial/reactivex/chat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Callable
from weakref import WeakValueDictionary, WeakSet

import reactivex
from more_itertools import first
from reactivex import Observable, operators, Subject, Observer
Expand Down
3 changes: 2 additions & 1 deletion examples/tutorial/step4/chat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
from typing import Dict, Optional, Set, Awaitable
from weakref import WeakValueDictionary, WeakSet

from examples.tutorial.step4.models import (Message, chat_filename_mimetype, dataclass_to_payload)
from more_itertools import first

from examples.tutorial.step4.models import (Message, chat_filename_mimetype, dataclass_to_payload)
from reactivestreams.publisher import DefaultPublisher, Publisher
from reactivestreams.subscriber import Subscriber
from reactivestreams.subscription import DefaultSubscription
Expand Down
2 changes: 0 additions & 2 deletions examples/tutorial/step6/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ async def send_statistics(self):
await self._rsocket.fire_and_forget(payload)

def listen_for_statistics(self) -> StatisticsHandler:

self._statistics_subscriber = StatisticsHandler()
self._rsocket.request_channel(Payload(metadata=composite(
route('statistics')
Expand Down Expand Up @@ -152,7 +151,6 @@ async def main():
async with RSocketClient(single_transport_provider(TransportTCP(*connection2)),
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA,
fragment_size_bytes=1_000_000) as client2:

user1 = ChatClient(client1)
user2 = ChatClient(client2)

Expand Down
1 change: 1 addition & 0 deletions examples/tutorial/step6/chat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Awaitable, Tuple
from weakref import WeakValueDictionary, WeakSet

from more_itertools import first

from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest,
Expand Down
2 changes: 2 additions & 0 deletions examples/tutorial/step7/chat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Awaitable, Tuple
from weakref import WeakValueDictionary, WeakSet

from more_itertools import first

from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest,
Expand All @@ -29,6 +30,7 @@
class SessionId(str): # allow weak reference
pass


@dataclass()
class UserSessionData:
username: str
Expand Down
1 change: 1 addition & 0 deletions rsocket/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,5 +759,6 @@ def serialize_with_frame_size_header(frame: Frame) -> bytes:
RequestChannelFrame: 10,
}


def get_header_length(frame: FragmentableFrame) -> int:
return frame_header_length[frame.__class__]
1 change: 0 additions & 1 deletion rsocket/frame_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
to avoid circular dependencies.
"""


import struct
from typing import Union, Tuple, Optional

Expand Down
4 changes: 2 additions & 2 deletions rsocket/rsocket_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,8 @@ async def _handle_next_frame(self, frame: Frame, async_frame_handler_by_type):
elif self._stream_control.handle_stream(complete_frame):
return
else:
logger().debug('%s: Dropping frame from unknown stream %d', self._log_identifier(),
complete_frame.stream_id)
logger().warning('%s: Dropping frame from unknown stream %d', self._log_identifier(),
complete_frame.stream_id)

async def _handle_frame_by_type(self, frame: Frame, async_frame_handler_by_type):
frame_handler = async_frame_handler_by_type.get(type(frame), async_noop)
Expand Down
2 changes: 1 addition & 1 deletion rsocket/rx_support/rx_handler_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from rsocket.error_codes import ErrorCode
from rsocket.payload import Payload
from rsocket.request_handler import RequestHandler
from rsocket.rx_support.back_pressure_publisher import BackPressurePublisher, observable_to_publisher
from rsocket.rx_support.back_pressure_publisher import observable_to_publisher
from rsocket.rx_support.from_rsocket_publisher import RxSubscriberFromObserver
from rsocket.rx_support.rx_handler import RxHandler

Expand Down
2 changes: 0 additions & 2 deletions rsocket/streams/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,3 @@ async def async_generator_from_queue(queue: Queue, stop_value=None):
else:
yield value
queue.task_done()


1 change: 1 addition & 0 deletions rsocket/transports/aioquic_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async def send_frame(self, frame: Frame):

with wrap_transport_exception():
await self._quic_protocol.query(frame)
await asyncio.sleep(0)

async def incoming_data_listener(self):
try:
Expand Down
1 change: 1 addition & 0 deletions rsocket/transports/http3_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ async def send_frame(self, frame: Frame):
try:
data = serialize_with_frame_size_header(frame)
await self._websocket.send_bytes(data)
await asyncio.sleep(0)
except WebSocketDisconnect:
self._disconnect_event.set()

Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ test = pytest
[tool:pytest]
addopts = --verbose
asyncio_mode = auto
timeout = 7
timeout = 10

; TODO: Remove ignoring ResourceWarning after finding out why connection is not always closed after each test (rare event)
filterwarnings =
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import pytest

from rsocket.frame_parser import FrameParser
from tests.tools.helpers_aiohttp import pipe_factory_aiohttp_websocket
from tests.tools.fixtures_aioquic import pipe_factory_quic
from tests.tools.fixtures_http3 import pipe_factory_http3
from tests.tools.fixtures_quart import pipe_factory_quart_websocket
from tests.tools.fixtures_tcp import pipe_factory_tcp
from tests.tools.helpers_aiohttp import pipe_factory_aiohttp_websocket

pytest_plugins = [
"tests.tools.fixtures_shared",
Expand All @@ -35,7 +35,7 @@ def setup_logging(level=logging.DEBUG, use_file: bool = False):
logging.basicConfig(level=level, handlers=handlers)


setup_logging()
setup_logging(logging.WARN)

tested_transports = [
'tcp',
Expand Down
13 changes: 9 additions & 4 deletions tests/rsocket/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
from typing import Tuple, Optional

import pytest

from rsocket.async_helpers import async_range
from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket
from rsocket.frame_helpers import ensure_bytes
Expand Down Expand Up @@ -43,16 +45,17 @@ async def generator():

assert len(results[0].result) == 2000
assert len(results[1].result) == 10
assert delta > 0.8
assert delta > 0.2


async def test_concurrent_fragmented_responses(lazy_pipe_tcp): # check problems with quic and http3 frame boundary
@pytest.mark.timeout(15)
async def test_concurrent_fragmented_responses(lazy_pipe):
class Handler(BaseRequestHandler):
async def request_response(self, request: Payload):
data = 'a' * 100 * int(utf8_decode(request.data))
return create_future(Payload(ensure_bytes(data)))

async with lazy_pipe_tcp(
async with lazy_pipe(
server_arguments={'handler_factory': Handler, 'fragment_size_bytes': 100},
client_arguments={'fragment_size_bytes': 100}) as (server, client):
request_1 = asyncio.create_task(measure_time(client.request_response(Payload(b'10000'))))
Expand All @@ -66,4 +69,6 @@ async def request_response(self, request: Payload):

assert len(results[0].result.data) == 10000 * 100
assert len(results[1].result.data) == 10 * 100
assert delta > 0.8
assert delta > 0.2

await asyncio.sleep(2)
4 changes: 3 additions & 1 deletion tests/rsocket/test_internal.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
from weakref import WeakKeyDictionary

import pytest


Expand All @@ -21,6 +22,7 @@ async def test_fail_on_error_log(fail_on_error_log):
def test_weak_ref():
class S(str):
pass

d = WeakKeyDictionary()
a = S('abc')
d[a] = 1
Expand All @@ -37,4 +39,4 @@ async def loop(ii):
await asyncio.sleep(0)
print(ii + str(i))

await asyncio.gather(loop('a'), loop('b'))
await asyncio.gather(loop('a'), loop('b'))
3 changes: 2 additions & 1 deletion tests/rsocket/test_multiple_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def requester_generator():
sending_done=sending_done
).subscribe(CollectorSubscriber(limit_count=1))

messages_received_from_server_stream = await AwaitableRSocket(client).request_stream(Payload(b'request text stream'))
messages_received_from_server_stream = await AwaitableRSocket(client).request_stream(
Payload(b'request text stream'))

await sending_done.wait()

Expand Down
2 changes: 1 addition & 1 deletion tests/test_reactivex/test_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ async def test_concurrent_streams(pipe: Tuple[RSocketServer, RSocketClient]):

delta = abs(results[0].delta - results[1].delta)

assert delta > 0.8
assert delta > 0.2
3 changes: 0 additions & 3 deletions tests/test_reactivex/test_helper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import asyncio

import pytest
from reactivex import operators, Subject

Expand Down Expand Up @@ -33,4 +31,3 @@ async def generator():
assert len(result) == expected_n

# await asyncio.sleep(1) # wait for task to finish

2 changes: 1 addition & 1 deletion tests/test_reactivex/test_reactivex_canceled.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
from reactivestreams.subscription import Subscription
from rsocket.error_codes import ErrorCode
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_client import ReactiveXClient
from rsocket.request_handler import BaseRequestHandler
from rsocket.rsocket_client import RSocketClient
from rsocket.rsocket_server import RSocketServer
from rsocket.reactivex.reactivex_client import ReactiveXClient
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator


Expand Down
4 changes: 2 additions & 2 deletions tests/test_reactivex/test_reactivex_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
import pytest
import reactivex
from reactivex import operators, Observer
from reactivex.scheduler.scheduler import Scheduler
from reactivex.disposable import Disposable
from reactivex.scheduler.scheduler import Scheduler

from reactivestreams.publisher import Publisher
from reactivestreams.subscriber import Subscriber, DefaultSubscriber
from reactivestreams.subscription import Subscription
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_client import ReactiveXClient
from rsocket.request_handler import BaseRequestHandler
from rsocket.rsocket_client import RSocketClient
from rsocket.rsocket_server import RSocketServer
from rsocket.reactivex.reactivex_client import ReactiveXClient
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator


Expand Down
2 changes: 1 addition & 1 deletion tests/test_reactivex/test_reactivex_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
from reactivestreams.subscription import Subscription
from rsocket.helpers import create_future, DefaultPublisherSubscription
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_client import ReactiveXClient
from rsocket.request_handler import BaseRequestHandler
from rsocket.rsocket_client import RSocketClient
from rsocket.rsocket_server import RSocketServer
from rsocket.reactivex.reactivex_client import ReactiveXClient
from rsocket.streams.empty_stream import EmptyStream
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator

Expand Down
2 changes: 1 addition & 1 deletion tests/test_reactivex/test_reactivex_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from reactivestreams.publisher import Publisher
from rsocket.helpers import create_future, DefaultPublisherSubscription
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_client import ReactiveXClient
from rsocket.request_handler import BaseRequestHandler
from rsocket.rsocket_client import RSocketClient
from rsocket.rsocket_server import RSocketServer
from rsocket.reactivex.reactivex_client import ReactiveXClient


async def test_rx_support_request_stream_cancel_on_timeout(pipe: Tuple[RSocketServer, RSocketClient]):
Expand Down
4 changes: 0 additions & 4 deletions tests/tools/fixtures_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from cryptography.hazmat.primitives.asymmetric import ec



def generate_certificate(*, alternative_names, common_name, hash_algorithm, key):
subject = issuer = x509.Name(
[x509.NameAttribute(x509.NameOID.COMMON_NAME, common_name)]
Expand Down Expand Up @@ -47,6 +46,3 @@ def generate_ec_certificate(common_name, alternative_names=None, curve=ec.SECP25
@pytest.fixture(scope="session")
def generate_test_certificates():
return generate_ec_certificate(common_name="localhost")



2 changes: 1 addition & 1 deletion tests/tools/helpers_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ def store_server(new_server):
yield server, client

await server.close()
assert_no_open_streams(client, server)
assert_no_open_streams(client, server)