Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion performance/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import logging


def pytest_configure(config):
config.addinivalue_line("markers", "performance: marks performance tests")


def setup_logging(level=logging.DEBUG, use_file: bool = False):
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

Expand All @@ -19,4 +23,4 @@ def setup_logging(level=logging.DEBUG, use_file: bool = False):
logging.basicConfig(level=level, handlers=handlers)


setup_logging(logging.ERROR)
setup_logging(logging.DEBUG)
32 changes: 21 additions & 11 deletions performance/performance_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,28 @@
from rsocket.rsocket_client import RSocketClient
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator
from rsocket.transports.tcp import TransportTCP
from tests.rsocket.helpers import to_json_bytes
from tests.rsocket.helpers import to_json_bytes, create_large_random_data


data_size = 1920 # * 1080 * 3
large_data = create_large_random_data(data_size)


def sample_publisher(wait_for_requester_complete: Event,
response_count: int = 3) -> Publisher:
response_count: int = 3,
data_generator=lambda index: ('Item to server from client on channel: %s' % index).encode('utf-8')
) -> Publisher:
async def generator() -> AsyncGenerator[Tuple[Fragment, bool], None]:
current_response = 0
for i in range(response_count):
is_complete = (current_response + 1) == response_count
is_complete = (i + 1) == response_count

message = 'Item to server from client on channel: %s' % current_response
yield Fragment(message.encode('utf-8')), is_complete
message = data_generator(i)
yield Payload(message), is_complete

if is_complete:
wait_for_requester_complete.set()
break

current_response += 1

return StreamFromAsyncGenerator(generator)


Expand All @@ -47,13 +50,20 @@ async def request_response(self):

return await self._client.request_response(payload)

async def large_request(self):
payload = Payload(large_data, composite(
route('large'),
authenticate_simple('user', '12345')
))

return await self._client.request_response(payload)

async def request_channel(self):
requester_completion_event = Event()
payload = Payload(b'The quick brown fox', composite(
route('channel'),
authenticate_simple('user', '12345')
))
publisher = sample_publisher(requester_completion_event)
publisher = sample_publisher(Event())

return await self._client.request_channel(payload, publisher, limit_rate=5)

Expand Down Expand Up @@ -97,7 +107,7 @@ async def __aenter__(self):
connection = await asyncio.open_connection('localhost', self._server_port)

self._client = AwaitableRSocket(RSocketClient(
single_transport_provider(TransportTCP(*connection)),
single_transport_provider(TransportTCP(*connection, read_buffer_size=data_size + 3000)),
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA)
)

Expand Down
14 changes: 12 additions & 2 deletions performance/performance_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
from rsocket.rsocket_server import RSocketServer
from rsocket.transports.tcp import TransportTCP
from performance.sample_responses import response_stream_2, response_stream_1, LoggingSubscriber
from tests.rsocket.helpers import create_large_random_data

data_size = 1920 # * 1080 * 3
large_data = create_large_random_data(data_size)

router = RequestRouter()

Expand All @@ -34,6 +38,12 @@ async def single_request_response(payload, composite_metadata):
return create_future(Payload(b'single_response'))


@router.response('large')
async def single_request_response(payload, composite_metadata):
logging.info('Got single request')
return create_future(Payload(large_data))


@router.response('last_fnf')
async def get_last_fnf():
logging.info('Got single request')
Expand Down Expand Up @@ -102,9 +112,9 @@ def handler_factory():

def client_handler_factory(on_ready=None):
def handle_client(reader, writer):
RSocketServer(TransportTCP(reader, writer),
RSocketServer(TransportTCP(reader, writer, read_buffer_size=data_size + 3000),
handler_factory=handler_factory,
fragment_size_bytes=64_000,
# fragment_size_bytes=64_000,
on_ready=on_ready
)

Expand Down
8 changes: 8 additions & 0 deletions performance/test_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from performance.performance_client import PerformanceClient
from performance.performance_server import run_server
from rsocket.rsocket_server import RSocketServer
from tests.tools.helpers import measure_time


@pytest.mark.timeout(5)
Expand All @@ -31,6 +32,13 @@ async def test_request_stream(unused_tcp_port):
assert result is not None


@pytest.mark.performance
async def test_large_request():
async with run_with_client(6565) as client:
result = await measure_time(client.large_request())
print(result.delta)


@asynccontextmanager
async def run_against_server(unused_tcp_port: int) -> PerformanceClient:
server_ready = asyncio.Event()
Expand Down
Loading