From abdca3a7416f376b56a821afd8ab0b1080056244 Mon Sep 17 00:00:00 2001
From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com>
Date: Thu, 22 Sep 2022 14:33:52 -0500
Subject: [PATCH] Add endpoint-peering benchmark

---
 docs/endpoint-peering.md                        |  36 +++
 .../benchmarks/endpoint_peering/__init__.py     |   0
 .../benchmarks/endpoint_peering/__main__.py     |   6 +
 psbench/benchmarks/endpoint_peering/main.py     | 233 ++++++++++++++++++
 psbench/benchmarks/endpoint_peering/ops.py      | 134 ++++++++++
 requirements-dev.txt                            |   2 +
 tests/benchmarks/endpoint_peering/__init__.py   |   0
 .../benchmarks/endpoint_peering/main_test.py    |  81 ++++++
 tests/benchmarks/endpoint_peering/ops_test.py   |  41 +++
 9 files changed, 533 insertions(+)
 create mode 100644 docs/endpoint-peering.md
 create mode 100644 psbench/benchmarks/endpoint_peering/__init__.py
 create mode 100644 psbench/benchmarks/endpoint_peering/__main__.py
 create mode 100644 psbench/benchmarks/endpoint_peering/main.py
 create mode 100644 psbench/benchmarks/endpoint_peering/ops.py
 create mode 100644 tests/benchmarks/endpoint_peering/__init__.py
 create mode 100644 tests/benchmarks/endpoint_peering/main_test.py
 create mode 100644 tests/benchmarks/endpoint_peering/ops_test.py

diff --git a/docs/endpoint-peering.md b/docs/endpoint-peering.md
new file mode 100644
index 0000000..e118835
--- /dev/null
+++ b/docs/endpoint-peering.md
@@ -0,0 +1,36 @@
+# ProxyStore Endpoint Peering Test
+
+This benchmark measures the latency and bandwidth of operations from a local endpoint to a remote endpoint.
+
+## Setup
+
+1. On the local and remote systems, create a new virtual environment.
+   ```
+   $ virtualenv venv
+   $ . venv/bin/activate
+   ```
+2. Install the `psbench` package (must be done from the root of the repository).
+   ```
+   $ pip install .
+   ```
+3. On the remote system, create and start a ProxyStore endpoint.
+   ```
+   $ proxystore-endpoint configure psbench --server {signaling-server-address}
+   $ proxystore-endpoint start psbench
+   ```
+   The returned endpoint UUID will be needed when running the benchmark.
+
+## Benchmark
+
+The benchmark is configured via CLI parameters. For example:
+
+```
+$ python -m psbench.benchmarks.endpoint_peering \
+    --remote b8aba48a-386d-4977-b5c9-9bcbbaebd0bf \
+    --ops GET SET \
+    --payload-sizes 1 1000 1000000 \
+    --repeat 5 \
+    --server {signaling-server-address}
+```
+
+The full list of options can be found using `--help`.
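The docs above drive the benchmark through the CLI; the `runner` coroutine added in `psbench/benchmarks/endpoint_peering/main.py` below can also be invoked programmatically. A minimal sketch, assuming the same installed package; the remote UUID and server address are placeholders reused from the examples in this patch:

```python
from __future__ import annotations

import asyncio
import uuid

from psbench.benchmarks.endpoint_peering.main import runner

# Placeholders: substitute the UUID printed when configuring the remote
# endpoint and the address of your signaling server.
REMOTE_UUID = uuid.UUID('b8aba48a-386d-4977-b5c9-9bcbbaebd0bf')
SERVER = 'wss://localhost:8765'

# Run the GET/SET matrix over three payload sizes, repeating each op five
# times, and log the per-configuration stats to a CSV file.
asyncio.run(
    runner(
        REMOTE_UUID,
        ['GET', 'SET'],
        payload_sizes=[1, 1000, 1000000],
        repeat=5,
        server=SERVER,
        csv_file='results.csv',
    ),
)
```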
diff --git a/psbench/benchmarks/endpoint_peering/__init__.py b/psbench/benchmarks/endpoint_peering/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/psbench/benchmarks/endpoint_peering/__main__.py b/psbench/benchmarks/endpoint_peering/__main__.py
new file mode 100644
index 0000000..c064eb8
--- /dev/null
+++ b/psbench/benchmarks/endpoint_peering/__main__.py
@@ -0,0 +1,6 @@
+from __future__ import annotations
+
+from psbench.benchmarks.endpoint_peering.main import main
+
+if __name__ == '__main__':
+    raise SystemExit(main())
diff --git a/psbench/benchmarks/endpoint_peering/main.py b/psbench/benchmarks/endpoint_peering/main.py
new file mode 100644
index 0000000..bed1827
--- /dev/null
+++ b/psbench/benchmarks/endpoint_peering/main.py
@@ -0,0 +1,233 @@
+"""Endpoint Peering Performance Test."""
+from __future__ import annotations
+
+import argparse
+import asyncio
+import logging
+import socket
+import sys
+import uuid
+from typing import NamedTuple
+from typing import Sequence
+
+if sys.version_info >= (3, 8):  # pragma: >3.7 cover
+    from typing import Literal
+else:  # pragma: <3.8 cover
+    from typing_extensions import Literal
+
+from proxystore.endpoint.endpoint import Endpoint
+
+from psbench.argparse import add_logging_options
+from psbench.benchmarks.endpoint_peering.ops import test_evict
+from psbench.benchmarks.endpoint_peering.ops import test_exists
+from psbench.benchmarks.endpoint_peering.ops import test_get
+from psbench.benchmarks.endpoint_peering.ops import test_set
+from psbench.csv import CSVLogger
+from psbench.logging import init_logging
+from psbench.logging import TESTING_LOG_LEVEL
+
+OP_TYPE = Literal['EVICT', 'EXISTS', 'GET', 'SET']
+
+logger = logging.getLogger('endpoint-peering')
+
+
+class RunStats(NamedTuple):
+    """Stats for a given run configuration."""
+
+    op: OP_TYPE
+    payload_size_bytes: int | None
+    repeat: int
+    local_endpoint_uuid: str
+    remote_endpoint_uuid: str
+    total_time_ms: float
+    avg_time_ms: float
+    min_time_ms: float
+    max_time_ms: float
+    avg_bandwidth_mbps: float | None
+
+
+async def run(
+    endpoint: Endpoint,
+    remote_endpoint: uuid.UUID | None,
+    op: OP_TYPE,
+    payload_size: int = 0,
+    repeat: int = 3,
+) -> RunStats:
+    """Run a test for a single operation and measure performance.
+
+    Args:
+        endpoint (Endpoint): local endpoint.
+        remote_endpoint (UUID): UUID of remote endpoint to peer with.
+        op (str): endpoint operation to test.
+        payload_size (int): bytes to send/receive for GET/SET operations.
+        repeat (int): number of times to repeat the operation. If repeat is
+            greater than or equal to three, the first and last times are
+            dropped to account for the first op being slower while the
+            peer connection is established.
+
+    Returns:
+        RunStats with summary of test run.
+ """ + logger.log(TESTING_LOG_LEVEL, f'starting endpoint peering test for {op}') + + if op == 'EVICT': + times_ms = await test_evict(endpoint, remote_endpoint, repeat) + elif op == 'EXISTS': + times_ms = await test_exists(endpoint, remote_endpoint, repeat) + elif op == 'GET': + times_ms = await test_get( + endpoint, + remote_endpoint, + payload_size, + repeat, + ) + elif op == 'SET': + times_ms = await test_set( + endpoint, + remote_endpoint, + payload_size, + repeat, + ) + else: + raise AssertionError(f'Unsupported operation {op}') + + if len(times_ms) >= 3: + times_ms = times_ms[1:-1] + + avg_time_s = sum(times_ms) / 1000 / len(times_ms) + payload_mb = payload_size / 1e6 + avg_bandwidth_mbps = ( + payload_mb / avg_time_s if op in ('GET', 'SET') else None + ) + + return RunStats( + op=op, + payload_size_bytes=payload_size if op in ('GET', 'SET') else None, + repeat=repeat, + local_endpoint_uuid=str(endpoint.uuid), + remote_endpoint_uuid=str(remote_endpoint), + total_time_ms=sum(times_ms), + avg_time_ms=sum(times_ms) / len(times_ms), + min_time_ms=min(times_ms), + max_time_ms=max(times_ms), + avg_bandwidth_mbps=avg_bandwidth_mbps, + ) + + +async def runner( + remote_endpoint: uuid.UUID | None, + ops: list[OP_TYPE], + *, + payload_sizes: list[int], + repeat: int, + server: str | None = None, + csv_file: str | None = None, +) -> None: + """Run matrix of test test configurations. + + Args: + remote_endpoint (UUID): remote endpoint UUID to peer with. + ops (str): endpoint operations to test. + payload_sizes (int): bytes to send/receive for GET/SET operations. + repeat (int): number of times to repeat operations. + server (str): signaling server address + csv_file (str): optional csv filepath to log results to. + """ + if csv_file is not None: + csv_logger = CSVLogger(csv_file, RunStats) + + async with Endpoint( + name=socket.gethostname(), + uuid=uuid.uuid4(), + signaling_server=server, + ) as endpoint: + for op in ops: + for payload_size in payload_sizes: + run_stats = await run( + endpoint, + remote_endpoint=remote_endpoint, + op=op, + payload_size=payload_size, + repeat=repeat, + ) + + logger.log(TESTING_LOG_LEVEL, run_stats) + if csv_file is not None: + csv_logger.log(run_stats) + + if csv_file is not None: + csv_logger.close() + logger.log(TESTING_LOG_LEVEL, f'results logged to {csv_file}') + + +def main(argv: Sequence[str] | None = None) -> int: + """Endpoint Peering test entrypoint.""" + argv = argv if argv is not None else sys.argv[1:] + + parser = argparse.ArgumentParser( + description='ProxyStore Endpoint Peering Bandwidth/Latency Test.', + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + '--remote', + required=True, + help='Remote Endpoint UUID', + ) + parser.add_argument( + '--ops', + choices=['GET', 'SET', 'EXISTS', 'EVICT'], + nargs='+', + required=True, + help='Endpoint operations to measure', + ) + parser.add_argument( + '--payload-sizes', + type=int, + nargs='+', + default=0, + help='Payload sizes for GET/SET operations', + ) + parser.add_argument( + '--server', + required=True, + help='Signaling server address for connecting to the remote endpoint', + ) + parser.add_argument( + '--repeat', + type=int, + default=10, + help='Number of times to repeat operations', + ) + parser.add_argument( + '--no-uvloop', + action='store_true', + help='Override using uvloop if available', + ) + add_logging_options(parser) + args = parser.parse_args(argv) + + init_logging(args.log_file, args.log_level, force=True) + + if not args.no_uvloop: + try: + import 
+
+            uvloop.install()
+            logger.info('uvloop available... using as event loop')
+        except ImportError:  # pragma: no cover
+            logger.info('uvloop unavailable... using asyncio event loop')
+    else:
+        logger.info('uvloop override... using asyncio event loop')
+
+    asyncio.run(
+        runner(
+            uuid.UUID(args.remote),
+            args.ops,
+            payload_sizes=args.payload_sizes,
+            repeat=args.repeat,
+            server=args.server,
+            csv_file=args.csv_file,
+        ),
+    )
+
+    return 0
diff --git a/psbench/benchmarks/endpoint_peering/ops.py b/psbench/benchmarks/endpoint_peering/ops.py
new file mode 100644
index 0000000..0ae1d52
--- /dev/null
+++ b/psbench/benchmarks/endpoint_peering/ops.py
@@ -0,0 +1,134 @@
+from __future__ import annotations
+
+import time
+import uuid
+
+from proxystore.endpoint.endpoint import Endpoint
+
+from psbench.utils import randbytes
+
+
+async def test_evict(
+    endpoint: Endpoint,
+    target_endpoint: uuid.UUID | None,
+    repeat: int = 1,
+) -> list[float]:
+    """Test endpoint eviction.
+
+    Args:
+        endpoint (Endpoint): local endpoint.
+        target_endpoint (UUID): optional UUID of target endpoint to perform
+            operation on (local endpoint forwards op to target).
+        repeat (int): repeat the operation this many times (default: 1).
+
+    Returns:
+        list of times in milliseconds for each operation to complete.
+    """
+    times_ms: list[float] = []
+
+    for _ in range(repeat):
+        start = time.perf_counter_ns()
+        await endpoint.evict('missing-key', target_endpoint)
+        end = time.perf_counter_ns()
+        times_ms.append((end - start) / 1e6)
+
+    return times_ms
+
+
+async def test_exists(
+    endpoint: Endpoint,
+    target_endpoint: uuid.UUID | None,
+    repeat: int = 1,
+) -> list[float]:
+    """Test endpoint key exists.
+
+    Args:
+        endpoint (Endpoint): local endpoint.
+        target_endpoint (UUID): optional UUID of target endpoint to perform
+            operation on (local endpoint forwards op to target).
+        repeat (int): repeat the operation this many times (default: 1).
+
+    Returns:
+        list of times in milliseconds for each operation to complete.
+    """
+    times_ms: list[float] = []
+
+    for _ in range(repeat):
+        start = time.perf_counter_ns()
+        await endpoint.exists('missing-key', target_endpoint)
+        end = time.perf_counter_ns()
+        times_ms.append((end - start) / 1e6)
+
+    return times_ms
+
+
+async def test_get(
+    endpoint: Endpoint,
+    target_endpoint: uuid.UUID | None,
+    payload_size_bytes: int,
+    repeat: int = 1,
+) -> list[float]:
+    """Test endpoint get data.
+
+    Args:
+        endpoint (Endpoint): local endpoint.
+        target_endpoint (UUID): optional UUID of target endpoint to perform
+            operation on (local endpoint forwards op to target).
+        payload_size_bytes (int): size of payload to request from target
+            endpoint.
+        repeat (int): repeat the operation this many times (default: 1).
+
+    Returns:
+        list of times in milliseconds for each operation to complete.
+    """
+    times_ms: list[float] = []
+
+    data = randbytes(payload_size_bytes)
+    await endpoint.set('key', data, target_endpoint)
+
+    for _ in range(repeat):
+        start = time.perf_counter_ns()
+        res = await endpoint.get('key', target_endpoint)
+        end = time.perf_counter_ns()
+        assert isinstance(res, bytes)
+        times_ms.append((end - start) / 1e6)
+
+    await endpoint.evict('key', target_endpoint)
+
+    return times_ms
+
+
+async def test_set(
+    endpoint: Endpoint,
+    target_endpoint: uuid.UUID | None,
+    payload_size_bytes: int,
+    repeat: int = 1,
+) -> list[float]:
+    """Test endpoint set data.
+
+    Args:
+        endpoint (Endpoint): local endpoint.
+        target_endpoint (UUID): optional UUID of target endpoint to perform
+            operation on (local endpoint forwards op to target).
+        payload_size_bytes (int): size of payload to send to the target
+            endpoint.
+        repeat (int): repeat the operation this many times (default: 1).
+
+    Returns:
+        list of times in milliseconds for each operation to complete.
+    """
+    times_ms: list[float] = []
+
+    data = randbytes(payload_size_bytes)
+
+    for i in range(repeat):
+        key = f'key-{i}'
+        start = time.perf_counter_ns()
+        await endpoint.set(key, data, target_endpoint)
+        end = time.perf_counter_ns()
+        times_ms.append((end - start) / 1e6)
+
+        # Evict key immediately to keep memory usage low
+        await endpoint.evict(key, target_endpoint)
+
+    return times_ms
diff --git a/requirements-dev.txt b/requirements-dev.txt
index d3643ab..e3c217b 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,3 +1,4 @@
+asynctest; python_version < '3.8'
 black
 covdefaults>=2.2
 coverage
@@ -8,6 +9,7 @@ mypy
 pep8-naming
 pre-commit
 pytest
+pytest-asyncio
 pytest-cov
 tox
 virtualenv
diff --git a/tests/benchmarks/endpoint_peering/__init__.py b/tests/benchmarks/endpoint_peering/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/benchmarks/endpoint_peering/main_test.py b/tests/benchmarks/endpoint_peering/main_test.py
new file mode 100644
index 0000000..787c336
--- /dev/null
+++ b/tests/benchmarks/endpoint_peering/main_test.py
@@ -0,0 +1,81 @@
+from __future__ import annotations
+
+import sys
+import tempfile
+import uuid
+from unittest import mock
+
+if sys.version_info >= (3, 8):  # pragma: >=3.8 cover
+    from unittest.mock import AsyncMock
+else:  # pragma: <3.8 cover
+    from asynctest import CoroutineMock as AsyncMock
+
+import pytest
+
+from psbench.benchmarks.endpoint_peering.main import main
+from psbench.benchmarks.endpoint_peering.main import runner
+
+
+@pytest.mark.asyncio
+async def test_runner() -> None:
+    await runner(
+        None,
+        ['GET', 'SET', 'EVICT', 'EXISTS'],
+        payload_sizes=[100, 1000],
+        repeat=1,
+        server=None,
+    )
+
+
+@pytest.mark.asyncio
+async def test_csv_logging() -> None:
+    with tempfile.NamedTemporaryFile() as f:
+        assert len(f.readlines()) == 0
+        await runner(
+            remote_endpoint=None,
+            ops=['EXISTS', 'EVICT'],
+            payload_sizes=[1, 2, 3],
+            repeat=3,
+            server=None,
+            csv_file=f.name,
+        )
+        assert len(f.readlines()) == 1 + (2 * 3)
+
+
+def test_main() -> None:
+    with mock.patch(
+        'psbench.benchmarks.endpoint_peering.main.runner',
+        AsyncMock(),
+    ):
+        assert (
+            main(
+                [
+                    '--remote',
+                    str(uuid.uuid4()),
+                    '--ops',
+                    'GET',
+                    '--payload-sizes',
+                    '1000',
+                    '--server',
+                    'wss://localhost:8765',
+                ],
+            )
+            == 0
+        )
+
+        assert (
+            main(
+                [
+                    '--remote',
+                    str(uuid.uuid4()),
+                    '--ops',
+                    'GET',
+                    '--payload-sizes',
+                    '1000',
+                    '--server',
+                    'wss://localhost:8765',
+                    '--no-uvloop',
+                ],
+            )
+            == 0
+        )
diff --git a/tests/benchmarks/endpoint_peering/ops_test.py b/tests/benchmarks/endpoint_peering/ops_test.py
new file mode 100644
index 0000000..807d14d
--- /dev/null
+++ b/tests/benchmarks/endpoint_peering/ops_test.py
@@ -0,0 +1,41 @@
+from __future__ import annotations
+
+import uuid
+from typing import AsyncGenerator
+
+import pytest
+import pytest_asyncio
+from proxystore.endpoint.endpoint import Endpoint
+
+from psbench.benchmarks.endpoint_peering import ops
+
+
+@pytest_asyncio.fixture
+@pytest.mark.asyncio
+async def endpoint() -> AsyncGenerator[Endpoint, None]:
+    async with Endpoint('test-ep', uuid.uuid4()) as ep:
+        yield ep
+
+
+@pytest.mark.asyncio
+async def test_evict(endpoint: Endpoint) -> None:
+    times = await ops.test_evict(endpoint, None, 2)
+    assert len(times) == 2
+
+
+@pytest.mark.asyncio
+async def test_exists(endpoint: Endpoint) -> None:
+    times = await ops.test_exists(endpoint, None, 2)
+    assert len(times) == 2
+
+
+@pytest.mark.asyncio
+async def test_get(endpoint: Endpoint) -> None:
+    times = await ops.test_get(endpoint, None, 100, 2)
+    assert len(times) == 2
+
+
+@pytest.mark.asyncio
+async def test_set(endpoint: Endpoint) -> None:
+    times = await ops.test_set(endpoint, None, 100, 2)
+    assert len(times) == 2
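For quick manual checks outside pytest, the ops helpers can also be driven directly against a loopback endpoint, mirroring the fixture above. A minimal sketch, not part of the patch; the endpoint name and payload size are arbitrary:

```python
from __future__ import annotations

import asyncio
import uuid

from proxystore.endpoint.endpoint import Endpoint

from psbench.benchmarks.endpoint_peering import ops


async def demo() -> None:
    # No signaling server and target_endpoint=None, so all ops stay local,
    # just as in the tests above.
    async with Endpoint('demo-ep', uuid.uuid4()) as endpoint:
        times_ms = await ops.test_set(endpoint, None, 1000, repeat=5)
        print(f'SET avg: {sum(times_ms) / len(times_ms):.3f} ms')


asyncio.run(demo())
```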