From b4a03664f6341963804045d23980465fa489c4e3 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Mon, 19 Dec 2022 21:02:04 +0200 Subject: [PATCH 1/3] added timeout test for reactivex stream --- .../test_reactivex_disconnect.py | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 tests/test_reactivex/test_reactivex_disconnect.py diff --git a/tests/test_reactivex/test_reactivex_disconnect.py b/tests/test_reactivex/test_reactivex_disconnect.py new file mode 100644 index 00000000..c539be13 --- /dev/null +++ b/tests/test_reactivex/test_reactivex_disconnect.py @@ -0,0 +1,52 @@ +import asyncio +from time import sleep +from typing import Tuple + +import pytest +import reactivex +from reactivex import operators, Observable +from reactivex.scheduler import ThreadPoolScheduler + +from rsocket.frame_helpers import ensure_bytes +from rsocket.payload import Payload +from rsocket.reactivex.reactivex_client import ReactiveXClient +from rsocket.reactivex.reactivex_handler import BaseReactivexHandler +from rsocket.reactivex.reactivex_handler_adapter import reactivex_handler_factory +from rsocket.rsocket_client import RSocketClient +from rsocket.rsocket_server import RSocketServer + + +async def test_serve_reactivex_stream_disconnected(pipe: Tuple[RSocketServer, RSocketClient]): + sent_counter = 0 + + def increment_sent_counter(): + nonlocal sent_counter + sent_counter += 1 + + def delayed(message): + sleep(0.3) + return message + + class Handler(BaseReactivexHandler): + + async def request_stream(self, payload: Payload) -> Observable: + return reactivex.from_((delayed('Feed Item: {}'.format(index)) for index in range(10)), + ThreadPoolScheduler()).pipe( + operators.do_action(on_next=lambda _: increment_sent_counter()), + operators.map(lambda _: Payload(ensure_bytes(_))) + ) + + server, client = pipe + + server.set_handler_using_factory(reactivex_handler_factory(Handler)) + + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for( + ReactiveXClient(client).request_stream(Payload(b'request text'), + request_limit=2).pipe( + operators.map(lambda payload: payload.data), + operators.to_list() + ), 1) + + assert 0 < sent_counter < 5 + From 793a9e19769b6e9dbf4a4ae518a057c2647f9ac4 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Mon, 19 Dec 2022 21:07:52 +0200 Subject: [PATCH 2/3] added timeout test for reactivex stream --- .../test_reactivex_disconnect.py | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/tests/test_reactivex/test_reactivex_disconnect.py b/tests/test_reactivex/test_reactivex_disconnect.py index c539be13..429b33a1 100644 --- a/tests/test_reactivex/test_reactivex_disconnect.py +++ b/tests/test_reactivex/test_reactivex_disconnect.py @@ -4,9 +4,12 @@ import pytest import reactivex +from apscheduler.schedulers.asyncio import AsyncIOScheduler from reactivex import operators, Observable +from reactivex.operators._tofuture import to_future_ from reactivex.scheduler import ThreadPoolScheduler +from rsocket.exceptions import RSocketProtocolError from rsocket.frame_helpers import ensure_bytes from rsocket.payload import Payload from rsocket.reactivex.reactivex_client import ReactiveXClient @@ -40,13 +43,22 @@ async def request_stream(self, payload: Payload) -> Observable: server.set_handler_using_factory(reactivex_handler_factory(Handler)) - with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for( - ReactiveXClient(client).request_stream(Payload(b'request text'), - request_limit=2).pipe( - operators.map(lambda payload: payload.data), - operators.to_list() - ), 1) + async def request(): + await ReactiveXClient(client).request_stream(Payload(b'request text'), + request_limit=2).pipe( + operators.map(lambda payload: payload.data), + operators.to_list(), + to_future_(scheduler=AsyncIOScheduler(loop=asyncio.get_event_loop())) + ) + + task = asyncio.create_task(request()) + + await asyncio.sleep(1) + + await client.close() assert 0 < sent_counter < 5 + with pytest.raises(RSocketProtocolError): + await task + From ae4d5474892962cd1239605b3bee3388a870a023 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Tue, 20 Dec 2022 11:08:26 +0200 Subject: [PATCH 3/3] fixed timeout test for reactivex stream --- tests/test_reactivex/test_reactivex_disconnect.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_reactivex/test_reactivex_disconnect.py b/tests/test_reactivex/test_reactivex_disconnect.py index 429b33a1..b65507ff 100644 --- a/tests/test_reactivex/test_reactivex_disconnect.py +++ b/tests/test_reactivex/test_reactivex_disconnect.py @@ -4,10 +4,10 @@ import pytest import reactivex -from apscheduler.schedulers.asyncio import AsyncIOScheduler from reactivex import operators, Observable from reactivex.operators._tofuture import to_future_ from reactivex.scheduler import ThreadPoolScheduler +from reactivex.scheduler.eventloop import AsyncIOScheduler from rsocket.exceptions import RSocketProtocolError from rsocket.frame_helpers import ensure_bytes @@ -45,7 +45,7 @@ async def request_stream(self, payload: Payload) -> Observable: async def request(): await ReactiveXClient(client).request_stream(Payload(b'request text'), - request_limit=2).pipe( + request_limit=2).pipe( operators.map(lambda payload: payload.data), operators.to_list(), to_future_(scheduler=AsyncIOScheduler(loop=asyncio.get_event_loop())) @@ -61,4 +61,3 @@ async def request(): with pytest.raises(RSocketProtocolError): await task -