Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ reactivex==4.0.4
starlette==0.22.0
asyncclick==8.1.3.4
pytest-profiling==1.7.0
pytest-xdist==3.0.2
pytest-xdist==3.0.2
decoy==1.11.1
2 changes: 1 addition & 1 deletion rsocket/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ def exception_to_error_frame(stream_id: int, exception: Exception) -> ErrorFrame
frame.data = ensure_bytes(exception.data)
else:
frame.error_code = ErrorCode.APPLICATION_ERROR
frame.data = str(exception).encode()
frame.data = ensure_bytes(str(exception))

return frame

Expand Down
4 changes: 1 addition & 3 deletions rsocket/reactivex/reactivex_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
from datetime import timedelta
from typing import Optional, Union, Callable

import reactivex
from reactivex import Observable, Subject

from rsocket.error_codes import ErrorCode
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.helpers import create_error_future
from rsocket.logger import logger
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_channel import ReactivexChannel
Expand Down Expand Up @@ -82,7 +80,7 @@ async def request_fire_and_forget(self, payload: Payload):
"""The requester isn't listening for errors. Nothing to do."""

async def request_response(self, payload: Payload) -> Observable:
return reactivex.from_future(create_error_future(RuntimeError('Not implemented')))
raise RuntimeError('Not implemented')

async def request_stream(self, payload: Payload) -> Observable:
raise RuntimeError('Not implemented')
Expand Down
3 changes: 1 addition & 2 deletions rsocket/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from reactivestreams.subscriber import Subscriber
from rsocket.error_codes import ErrorCode
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.helpers import create_error_future
from rsocket.local_typing import Awaitable
from rsocket.logger import logger
from rsocket.payload import Payload
Expand Down Expand Up @@ -90,7 +89,7 @@ async def on_metadata_push(self, payload: Payload):
"""Nothing by default"""

async def request_response(self, payload: Payload) -> Awaitable[Payload]:
return create_error_future(RuntimeError('Not implemented'))
raise RuntimeError('Not implemented')

async def request_stream(self, payload: Payload) -> Publisher:
raise RuntimeError('Not implemented')
Expand Down
4 changes: 1 addition & 3 deletions rsocket/rx_support/rx_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@
from datetime import timedelta
from typing import Optional, Union, Callable

import rx
from rx import Observable
from rx.core.typing import Subject

from rsocket.error_codes import ErrorCode
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.helpers import create_error_future
from rsocket.logger import logger
from rsocket.payload import Payload
from rsocket.rx_support.rx_channel import RxChannel
Expand Down Expand Up @@ -83,7 +81,7 @@ async def request_fire_and_forget(self, payload: Payload):
"""The requester isn't listening for errors. Nothing to do."""

async def request_response(self, payload: Payload) -> Observable:
return rx.from_future(create_error_future(RuntimeError('Not implemented')))
raise RuntimeError('Not implemented')

async def request_stream(self, payload: Payload) -> Observable:
raise RuntimeError('Not implemented')
Expand Down
45 changes: 44 additions & 1 deletion tests/rsocket/test_cli_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import tempfile

import pytest
from decoy import Decoy

from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket
from rsocket.cli.command import parse_uri, build_composite_metadata, create_request_payload, get_metadata_value, \
create_setup_payload, normalize_data, normalize_limit_rate, RequestType, get_request_type, parse_headers, \
normalize_metadata_mime_type
normalize_metadata_mime_type, execute_request
from rsocket.extensions.helpers import route, authenticate_simple, authenticate_bearer
from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.frame import MAX_REQUEST_N
from rsocket.helpers import create_future
from rsocket.payload import Payload
from tests.rsocket.helpers import create_data

Expand Down Expand Up @@ -175,3 +178,43 @@ def test_normalize_metadata_mime_type(composite_items, metadata_mime_type, expec
actual = normalize_metadata_mime_type(composite_items, metadata_mime_type)

assert actual == expected


async def test_execute_request_response(decoy: Decoy):
client = decoy.mock(cls=AwaitableRSocket)

decoy.when(await client.request_response(Payload())).then_return(Payload(b'abc'))

result = await execute_request(client, RequestType.response, 3, Payload())

assert result.data == b'abc'


async def test_execute_request_stream(decoy: Decoy):
client = decoy.mock(cls=AwaitableRSocket)

decoy.when(await client.request_stream(Payload(), limit_rate=3)).then_return([Payload(b'abc')])

result = await execute_request(client, RequestType.stream, 3, Payload())

assert result[0].data == b'abc'


async def test_execute_request_channel(decoy: Decoy):
client = decoy.mock(cls=AwaitableRSocket)

decoy.when(await client.request_channel(Payload(), limit_rate=3)).then_return([Payload(b'abc')])

result = await execute_request(client, RequestType.channel, 3, Payload())

assert result[0].data == b'abc'


async def test_execute_request_fnf(decoy: Decoy):
client = decoy.mock(cls=AwaitableRSocket)

decoy.when(client.fire_and_forget(Payload())).then_return(create_future(None))

result = await execute_request(client, RequestType.fnf, 3, Payload())

assert result is None
23 changes: 23 additions & 0 deletions tests/rsocket/test_stream_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from asyncio import Queue

from rsocket.streams.helpers import async_generator_from_queue


async def test_async_generator_from_queue():
queue = Queue()

for i in range(10):
queue.put_nowait(i)

queue.put_nowait(None)

async def collect():
results = []
async for i in async_generator_from_queue(queue):
results.append(i)

return results

r = await collect()

assert r == list(range(10))
17 changes: 17 additions & 0 deletions tests/rx_support/test_rx_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pytest

from rsocket.payload import Payload
from rsocket.rx_support.rx_handler import BaseRxHandler


async def test_rx_handler():
handler = BaseRxHandler()

with pytest.raises(Exception):
await handler.request_channel(Payload())

with pytest.raises(Exception):
await handler.request_response(Payload())

with pytest.raises(Exception):
await handler.request_channel(Payload())
17 changes: 17 additions & 0 deletions tests/test_reactivex/test_reactivex_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pytest

from rsocket.payload import Payload
from rsocket.reactivex.reactivex_handler import BaseReactivexHandler


async def test_reactivex_handler():
handler = BaseReactivexHandler()

with pytest.raises(Exception):
await handler.request_channel(Payload())

with pytest.raises(Exception):
await handler.request_response(Payload())

with pytest.raises(Exception):
await handler.request_channel(Payload())