diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index f827688..44dcf9e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -55,4 +55,4 @@ repos:
     hooks:
       - id: mypy
         # Add any additional typed package dependencies here
-        additional_dependencies: [types-requests, proxystore==0.4.0a1]
+        additional_dependencies: [types-redis, types-requests, proxystore==0.4.0a1]
diff --git a/psbench/benchmarks/endpoint_peering/main.py b/psbench/benchmarks/endpoint_peering/main.py
deleted file mode 100644
index bed1827..0000000
--- a/psbench/benchmarks/endpoint_peering/main.py
+++ /dev/null
@@ -1,233 +0,0 @@
-"""Endpoint Peering Performance Test."""
-from __future__ import annotations
-
-import argparse
-import asyncio
-import logging
-import socket
-import sys
-import uuid
-from typing import NamedTuple
-from typing import Sequence
-
-if sys.version_info >= (3, 8):  # pragma: >3.7 cover
-    from typing import Literal
-else:  # pragma: <3.8 cover
-    from typing_extensions import Literal
-
-from proxystore.endpoint.endpoint import Endpoint
-
-from psbench.argparse import add_logging_options
-from psbench.benchmarks.endpoint_peering.ops import test_evict
-from psbench.benchmarks.endpoint_peering.ops import test_exists
-from psbench.benchmarks.endpoint_peering.ops import test_get
-from psbench.benchmarks.endpoint_peering.ops import test_set
-from psbench.csv import CSVLogger
-from psbench.logging import init_logging
-from psbench.logging import TESTING_LOG_LEVEL
-
-OP_TYPE = Literal['EVICT', 'EXISTS', 'GET', 'SET']
-
-logger = logging.getLogger('endpoint-peering')
-
-
-class RunStats(NamedTuple):
-    """Stats for a given run configuration."""
-
-    op: OP_TYPE
-    payload_size_bytes: int | None
-    repeat: int
-    local_endpoint_uuid: str
-    remote_endpoint_uuid: str
-    total_time_ms: float
-    avg_time_ms: float
-    min_time_ms: float
-    max_time_ms: float
-    avg_bandwidth_mbps: float | None
-
-
-async def run(
-    endpoint: Endpoint,
-    remote_endpoint: uuid.UUID | None,
-    op: OP_TYPE,
-    payload_size: int = 0,
-    repeat: int = 3,
-) -> RunStats:
-    """Run test for single operation and measure performance.
-
-    Args:
-        endpoint (Endpoint): local endpoint.
-        remote_endpoint (UUID): UUID of remote endpoint to peer with.
-        op (str): endpoint operation to test.
-        payload_size (int): bytes to send/receive for GET/SET operations.
-        repeat (int): number of times to repeat operation. If repeat is
-            greater than or equal to three, the slowest and fastest times
-            will be dropped to account for the first op being slower while
-            establishing a connection.
-
-    Returns:
-        RunStats with summary of test run.
- """ - logger.log(TESTING_LOG_LEVEL, f'starting endpoint peering test for {op}') - - if op == 'EVICT': - times_ms = await test_evict(endpoint, remote_endpoint, repeat) - elif op == 'EXISTS': - times_ms = await test_exists(endpoint, remote_endpoint, repeat) - elif op == 'GET': - times_ms = await test_get( - endpoint, - remote_endpoint, - payload_size, - repeat, - ) - elif op == 'SET': - times_ms = await test_set( - endpoint, - remote_endpoint, - payload_size, - repeat, - ) - else: - raise AssertionError(f'Unsupported operation {op}') - - if len(times_ms) >= 3: - times_ms = times_ms[1:-1] - - avg_time_s = sum(times_ms) / 1000 / len(times_ms) - payload_mb = payload_size / 1e6 - avg_bandwidth_mbps = ( - payload_mb / avg_time_s if op in ('GET', 'SET') else None - ) - - return RunStats( - op=op, - payload_size_bytes=payload_size if op in ('GET', 'SET') else None, - repeat=repeat, - local_endpoint_uuid=str(endpoint.uuid), - remote_endpoint_uuid=str(remote_endpoint), - total_time_ms=sum(times_ms), - avg_time_ms=sum(times_ms) / len(times_ms), - min_time_ms=min(times_ms), - max_time_ms=max(times_ms), - avg_bandwidth_mbps=avg_bandwidth_mbps, - ) - - -async def runner( - remote_endpoint: uuid.UUID | None, - ops: list[OP_TYPE], - *, - payload_sizes: list[int], - repeat: int, - server: str | None = None, - csv_file: str | None = None, -) -> None: - """Run matrix of test test configurations. - - Args: - remote_endpoint (UUID): remote endpoint UUID to peer with. - ops (str): endpoint operations to test. - payload_sizes (int): bytes to send/receive for GET/SET operations. - repeat (int): number of times to repeat operations. - server (str): signaling server address - csv_file (str): optional csv filepath to log results to. - """ - if csv_file is not None: - csv_logger = CSVLogger(csv_file, RunStats) - - async with Endpoint( - name=socket.gethostname(), - uuid=uuid.uuid4(), - signaling_server=server, - ) as endpoint: - for op in ops: - for payload_size in payload_sizes: - run_stats = await run( - endpoint, - remote_endpoint=remote_endpoint, - op=op, - payload_size=payload_size, - repeat=repeat, - ) - - logger.log(TESTING_LOG_LEVEL, run_stats) - if csv_file is not None: - csv_logger.log(run_stats) - - if csv_file is not None: - csv_logger.close() - logger.log(TESTING_LOG_LEVEL, f'results logged to {csv_file}') - - -def main(argv: Sequence[str] | None = None) -> int: - """Endpoint Peering test entrypoint.""" - argv = argv if argv is not None else sys.argv[1:] - - parser = argparse.ArgumentParser( - description='ProxyStore Endpoint Peering Bandwidth/Latency Test.', - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - parser.add_argument( - '--remote', - required=True, - help='Remote Endpoint UUID', - ) - parser.add_argument( - '--ops', - choices=['GET', 'SET', 'EXISTS', 'EVICT'], - nargs='+', - required=True, - help='Endpoint operations to measure', - ) - parser.add_argument( - '--payload-sizes', - type=int, - nargs='+', - default=0, - help='Payload sizes for GET/SET operations', - ) - parser.add_argument( - '--server', - required=True, - help='Signaling server address for connecting to the remote endpoint', - ) - parser.add_argument( - '--repeat', - type=int, - default=10, - help='Number of times to repeat operations', - ) - parser.add_argument( - '--no-uvloop', - action='store_true', - help='Override using uvloop if available', - ) - add_logging_options(parser) - args = parser.parse_args(argv) - - init_logging(args.log_file, args.log_level, force=True) - - if not args.no_uvloop: - try: - import 
-            import uvloop
-
-            uvloop.install()
-            logger.info('uvloop available... using as event loop')
-        except ImportError:  # pragma: no cover
-            logger.info('uvloop unavailable... using asyncio event loop')
-    else:
-        logger.info('uvloop override... using asyncio event loop')
-
-    asyncio.run(
-        runner(
-            uuid.UUID(args.remote),
-            args.ops,
-            payload_sizes=args.payload_sizes,
-            repeat=args.repeat,
-            server=args.server,
-            csv_file=args.csv_file,
-        ),
-    )
-
-    return 0
diff --git a/psbench/benchmarks/endpoint_peering/__init__.py b/psbench/benchmarks/remote_ops/__init__.py
similarity index 100%
rename from psbench/benchmarks/endpoint_peering/__init__.py
rename to psbench/benchmarks/remote_ops/__init__.py
diff --git a/psbench/benchmarks/endpoint_peering/__main__.py b/psbench/benchmarks/remote_ops/__main__.py
similarity index 61%
rename from psbench/benchmarks/endpoint_peering/__main__.py
rename to psbench/benchmarks/remote_ops/__main__.py
index c064eb8..f51049c 100644
--- a/psbench/benchmarks/endpoint_peering/__main__.py
+++ b/psbench/benchmarks/remote_ops/__main__.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from psbench.benchmarks.endpoint_peering.main import main
+from psbench.benchmarks.remote_ops.main import main
 
 if __name__ == '__main__':
     raise SystemExit(main())
diff --git a/psbench/benchmarks/endpoint_peering/ops.py b/psbench/benchmarks/remote_ops/endpoint_ops.py
similarity index 100%
rename from psbench/benchmarks/endpoint_peering/ops.py
rename to psbench/benchmarks/remote_ops/endpoint_ops.py
diff --git a/psbench/benchmarks/remote_ops/main.py b/psbench/benchmarks/remote_ops/main.py
new file mode 100644
index 0000000..ea37762
--- /dev/null
+++ b/psbench/benchmarks/remote_ops/main.py
@@ -0,0 +1,371 @@
+"""Remote Operation Performance Test.
+
+Provides comparisons between remote operations with endpoints
+and Redis servers.
+"""
+from __future__ import annotations
+
+import argparse
+import asyncio
+import logging
+import socket
+import sys
+import uuid
+from typing import Any
+from typing import NamedTuple
+from typing import Sequence
+
+if sys.version_info >= (3, 8):  # pragma: >3.7 cover
+    from typing import Literal
+else:  # pragma: <3.8 cover
+    from typing_extensions import Literal
+
+import redis
+from proxystore.endpoint.endpoint import Endpoint
+
+from psbench.argparse import add_logging_options
+import psbench.benchmarks.remote_ops.endpoint_ops as endpoint_ops
+import psbench.benchmarks.remote_ops.redis_ops as redis_ops
+from psbench.csv import CSVLogger
+from psbench.logging import init_logging
+from psbench.logging import TESTING_LOG_LEVEL
+
+BACKEND_TYPE = Literal['ENDPOINT', 'REDIS']
+OP_TYPE = Literal['EVICT', 'EXISTS', 'GET', 'SET']
+
+logger = logging.getLogger('remote-ops')
+
+
+class RunStats(NamedTuple):
+    """Stats for a given run configuration."""
+
+    backend: BACKEND_TYPE
+    op: OP_TYPE
+    payload_size_bytes: int | None
+    repeat: int
+    total_time_ms: float
+    avg_time_ms: float
+    min_time_ms: float
+    max_time_ms: float
+    avg_bandwidth_mbps: float | None
+
+
+async def run_endpoint(
+    endpoint: Endpoint,
+    remote_endpoint: uuid.UUID | None,
+    op: OP_TYPE,
+    payload_size: int = 0,
+    repeat: int = 3,
+) -> RunStats:
+    """Run test for single operation and measure performance.
+
+    Args:
+        endpoint (Endpoint): local endpoint.
+        remote_endpoint (UUID): UUID of remote endpoint to peer with.
+        op (str): endpoint operation to test.
+        payload_size (int): bytes to send/receive for GET/SET operations.
+        repeat (int): number of times to repeat operation. If repeat is
+            greater than or equal to three, the first and last times are
+            dropped to account for the first operation being slower while
+            the connection is established.
+
+    Returns:
+        RunStats with summary of test run.
+    """
+    logger.log(TESTING_LOG_LEVEL, f'starting endpoint peering test for {op}')
+
+    if op == 'EVICT':
+        times_ms = await endpoint_ops.test_evict(
+            endpoint,
+            remote_endpoint,
+            repeat,
+        )
+    elif op == 'EXISTS':
+        times_ms = await endpoint_ops.test_exists(
+            endpoint,
+            remote_endpoint,
+            repeat,
+        )
+    elif op == 'GET':
+        times_ms = await endpoint_ops.test_get(
+            endpoint,
+            remote_endpoint,
+            payload_size,
+            repeat,
+        )
+    elif op == 'SET':
+        times_ms = await endpoint_ops.test_set(
+            endpoint,
+            remote_endpoint,
+            payload_size,
+            repeat,
+        )
+    else:
+        raise AssertionError(f'Unsupported operation {op}')
+
+    if len(times_ms) >= 3:
+        times_ms = times_ms[1:-1]
+
+    avg_time_s = sum(times_ms) / 1000 / len(times_ms)
+    payload_mb = payload_size / 1e6
+    avg_bandwidth_mbps = (
+        payload_mb / avg_time_s if op in ('GET', 'SET') else None
+    )
+
+    return RunStats(
+        backend='ENDPOINT',
+        op=op,
+        payload_size_bytes=payload_size if op in ('GET', 'SET') else None,
+        repeat=repeat,
+        total_time_ms=sum(times_ms),
+        avg_time_ms=sum(times_ms) / len(times_ms),
+        min_time_ms=min(times_ms),
+        max_time_ms=max(times_ms),
+        avg_bandwidth_mbps=avg_bandwidth_mbps,
+    )
+
+
+def run_redis(
+    client: redis.StrictRedis[Any],
+    op: OP_TYPE,
+    payload_size: int = 0,
+    repeat: int = 3,
+) -> RunStats:
+    """Run test for single operation and measure performance.
+
+    Args:
+        client (StrictRedis): Redis client connected to remote server.
+        op (str): remote operation to test.
+        payload_size (int): bytes to send/receive for GET/SET operations.
+        repeat (int): number of times to repeat operation. If repeat is
+            greater than or equal to three, the first and last times are
+            dropped to account for the first operation being slower while
+            the connection is established.
+
+    Returns:
+        RunStats with summary of test run.
+    """
+    logger.log(TESTING_LOG_LEVEL, f'starting remote redis test for {op}')
+
+    if op == 'EVICT':
+        times_ms = redis_ops.test_evict(client, repeat)
+    elif op == 'EXISTS':
+        times_ms = redis_ops.test_exists(client, repeat)
+    elif op == 'GET':
+        times_ms = redis_ops.test_get(client, payload_size, repeat)
+    elif op == 'SET':
+        times_ms = redis_ops.test_set(client, payload_size, repeat)
+    else:
+        raise AssertionError(f'Unsupported operation {op}')
+
+    if len(times_ms) >= 3:
+        times_ms = times_ms[1:-1]
+
+    avg_time_s = sum(times_ms) / 1000 / len(times_ms)
+    payload_mb = payload_size / 1e6
+    avg_bandwidth_mbps = (
+        payload_mb / avg_time_s if op in ('GET', 'SET') else None
+    )
+
+    return RunStats(
+        backend='REDIS',
+        op=op,
+        payload_size_bytes=payload_size if op in ('GET', 'SET') else None,
+        repeat=repeat,
+        total_time_ms=sum(times_ms),
+        avg_time_ms=sum(times_ms) / len(times_ms),
+        min_time_ms=min(times_ms),
+        max_time_ms=max(times_ms),
+        avg_bandwidth_mbps=avg_bandwidth_mbps,
+    )
+
+
+async def runner_endpoint(
+    remote_endpoint: uuid.UUID | None,
+    ops: list[OP_TYPE],
+    *,
+    payload_sizes: list[int],
+    repeat: int,
+    server: str | None = None,
+    csv_file: str | None = None,
+) -> None:
+    """Run a matrix of test configurations with an Endpoint.
+
+    Args:
+        remote_endpoint (UUID): remote endpoint UUID to peer with.
+        ops (list[str]): endpoint operations to test.
+        payload_sizes (list[int]): payload sizes for GET/SET operations.
+        repeat (int): number of times to repeat operations.
+        server (str): signaling server address.
+        csv_file (str): optional csv filepath to log results to.
+    """
+    if csv_file is not None:
+        csv_logger = CSVLogger(csv_file, RunStats)
+
+    async with Endpoint(
+        name=socket.gethostname(),
+        uuid=uuid.uuid4(),
+        signaling_server=server,
+    ) as endpoint:
+        for op in ops:
+            for i, payload_size in enumerate(payload_sizes):
+                # Payload size only affects GET/SET, so run other ops once
+                if i == 0 or op in ['GET', 'SET']:
+                    run_stats = await run_endpoint(
+                        endpoint,
+                        remote_endpoint=remote_endpoint,
+                        op=op,
+                        payload_size=payload_size,
+                        repeat=repeat,
+                    )
+
+                    logger.log(TESTING_LOG_LEVEL, run_stats)
+                    if csv_file is not None:
+                        csv_logger.log(run_stats)
+
+    if csv_file is not None:
+        csv_logger.close()
+        logger.log(TESTING_LOG_LEVEL, f'results logged to {csv_file}')
+
+
+def runner_redis(
+    host: str,
+    port: int,
+    ops: list[OP_TYPE],
+    *,
+    payload_sizes: list[int],
+    repeat: int,
+    csv_file: str | None = None,
+) -> None:
+    """Run a matrix of test configurations with a Redis server.
+
+    Args:
+        host (str): remote Redis server hostname/IP.
+        port (int): remote Redis server port.
+        ops (list[str]): remote operations to test.
+        payload_sizes (list[int]): payload sizes for GET/SET operations.
+        repeat (int): number of times to repeat operations.
+        csv_file (str): optional csv filepath to log results to.
+    """
+    if csv_file is not None:
+        csv_logger = CSVLogger(csv_file, RunStats)
+
+    client = redis.StrictRedis(host=host, port=port)
+    for op in ops:
+        for i, payload_size in enumerate(payload_sizes):
+            # Payload size only affects GET/SET, so run other ops once
+            if i == 0 or op in ['GET', 'SET']:
+                run_stats = run_redis(
+                    client,
+                    op=op,
+                    payload_size=payload_size,
+                    repeat=repeat,
+                )
+
+                logger.log(TESTING_LOG_LEVEL, run_stats)
+                if csv_file is not None:
+                    csv_logger.log(run_stats)
+
+    if csv_file is not None:
+        csv_logger.close()
+        logger.log(TESTING_LOG_LEVEL, f'results logged to {csv_file}')
+
+
+def main(argv: Sequence[str] | None = None) -> int:
+    """Remote ops test entrypoint."""
+    argv = argv if argv is not None else sys.argv[1:]
+
+    parser = argparse.ArgumentParser(
+        description='Remote ops performance test.',
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
+    parser.add_argument(
+        'backend',
+        choices=['ENDPOINT', 'REDIS'],
+        help='Remote object store backend to test',
+    )
+    parser.add_argument(
+        '--endpoint',
+        required='ENDPOINT' in sys.argv,
+        help='Remote Endpoint UUID',
+    )
+    parser.add_argument(
+        '--redis-host',
+        required='REDIS' in sys.argv,
+        help='Redis server hostname/IP',
+    )
+    parser.add_argument(
+        '--redis-port',
+        required='REDIS' in sys.argv,
+        help='Redis server port',
+    )
+    parser.add_argument(
+        '--ops',
+        choices=['GET', 'SET', 'EXISTS', 'EVICT'],
+        nargs='+',
+        required=True,
+        help='Operations to measure',
+    )
+    parser.add_argument(
+        '--payload-sizes',
+        type=int,
+        nargs='+',
+        default=[0],
+        help='Payload sizes for GET/SET operations',
+    )
+    parser.add_argument(
+        '--server',
+        required='ENDPOINT' in sys.argv,
+        help='Signaling server address for connecting to the remote endpoint',
+    )
+    parser.add_argument(
+        '--repeat',
+        type=int,
+        default=10,
+        help='Number of times to repeat operations',
+    )
+    parser.add_argument(
+        '--no-uvloop',
+        action='store_true',
+        help='Override using uvloop if available (for ENDPOINT backend only)',
+    )
+    add_logging_options(parser)
+    args = parser.parse_args(argv)
+
+    init_logging(args.log_file, args.log_level, force=True)
+
+    if args.backend == 'ENDPOINT':
+        if not args.no_uvloop:
+            try:
+                import uvloop
+
+                uvloop.install()
+                logger.info('uvloop available... using as event loop')
+            except ImportError:  # pragma: no cover
+                logger.info('uvloop unavailable... using asyncio event loop')
+        else:
+            logger.info('uvloop override... using asyncio event loop')
+
+        asyncio.run(
+            runner_endpoint(
+                uuid.UUID(args.endpoint),
+                args.ops,
+                payload_sizes=args.payload_sizes,
+                repeat=args.repeat,
+                server=args.server,
+                csv_file=args.csv_file,
+            ),
+        )
+    elif args.backend == 'REDIS':
+        runner_redis(
+            args.redis_host,
+            args.redis_port,
+            args.ops,
+            payload_sizes=args.payload_sizes,
+            repeat=args.repeat,
+            csv_file=args.csv_file,
+        )
+    else:
+        raise AssertionError('Unreachable.')
+
+    return 0
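
For reference, the new entrypoint can be driven programmatically the same way the tests below drive it. A minimal sketch against the REDIS backend, assuming a Redis server is reachable at localhost:6379 (host, port, and payload sizes are placeholder values; the example also assumes --csv-file is registered by add_logging_options(), since main() reads args.csv_file):

    # Sketch: programmatic invocation of the remote-ops benchmark.
    from psbench.benchmarks.remote_ops.main import main

    main(
        [
            'REDIS',
            '--redis-host', 'localhost',
            '--redis-port', '6379',
            '--ops', 'GET', 'SET',
            '--payload-sizes', '1000', '100000',
            '--repeat', '10',
        ],
    )
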
+ """ + times_ms: list[float] = [] + + data = randbytes(payload_size_bytes) + + for i in range(repeat): + key = f'key-{i}' + start = time.perf_counter_ns() + client.set(key, data) + end = time.perf_counter_ns() + times_ms.append((end - start) / 1e6) + + # Evict key immediately to keep memory usage low + client.delete(key) + + return times_ms diff --git a/setup.cfg b/setup.cfg index 4d73c86..4d57414 100644 --- a/setup.cfg +++ b/setup.cfg @@ -21,6 +21,7 @@ install_requires = funcx>=1.0.3 funcx-endpoint>=1.0.3 proxystore[endpoints]==0.4.0a2 + redis>=3.4 requests python_requires = >=3.7 include_package_data = True diff --git a/testing/mocking.py b/testing/mocking.py new file mode 100644 index 0000000..840e8c6 --- /dev/null +++ b/testing/mocking.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from typing import Any + + +class MockStrictRedis: + """Mock StrictRedis.""" + + def __init__(self, *args, **kwargs): + """Init MockStrictRedis.""" + self.data = {} + + def delete(self, key: str) -> None: + """Delete key.""" + if key in self.data: + del self.data[key] + + def exists(self, key: str) -> bool: + """Check if key exists.""" + return key in self.data + + def get(self, key: str) -> Any: + """Get value with key.""" + if key in self.data: + return self.data[key] + return None # pragma: no cover + + def set(self, key: str, value: str | bytes | int | float) -> None: + """Set value with key.""" + self.data[key] = value diff --git a/tests/benchmarks/endpoint_peering/main_test.py b/tests/benchmarks/endpoint_peering/main_test.py deleted file mode 100644 index 787c336..0000000 --- a/tests/benchmarks/endpoint_peering/main_test.py +++ /dev/null @@ -1,81 +0,0 @@ -from __future__ import annotations - -import sys -import tempfile -import uuid -from unittest import mock - -if sys.version_info >= (3, 8): # pragma: >=3.8 cover - from unittest.mock import AsyncMock -else: # pragma: <3.8 cover - from asynctest import CoroutineMock as AsyncMock - -import pytest - -from psbench.benchmarks.endpoint_peering.main import main -from psbench.benchmarks.endpoint_peering.main import runner - - -@pytest.mark.asyncio -async def test_runner() -> None: - await runner( - None, - ['GET', 'SET', 'EVICT', 'EXISTS'], - payload_sizes=[100, 1000], - repeat=1, - server=None, - ) - - -@pytest.mark.asyncio -async def test_csv_logging() -> None: - with tempfile.NamedTemporaryFile() as f: - assert len(f.readlines()) == 0 - await runner( - remote_endpoint=None, - ops=['EXISTS', 'EVICT'], - payload_sizes=[1, 2, 3], - repeat=3, - server=None, - csv_file=f.name, - ) - assert len(f.readlines()) == 1 + (2 * 3) - - -def test_main() -> None: - with mock.patch( - 'psbench.benchmarks.endpoint_peering.main.runner', - AsyncMock(), - ): - assert ( - main( - [ - '--remote', - str(uuid.uuid4()), - '--ops', - 'GET', - '--payload-sizes', - '1000', - '--server', - 'wss://localhost:8765', - ], - ) - == 0 - ) - - assert ( - main( - [ - '--remote', - str(uuid.uuid4()), - '--ops', - 'GET', - '--payload-sizes', - '1000', - '--server', - 'wss://localhost:8765', - '--no-uvloop', - ], - ) - == 0 - ) diff --git a/tests/benchmarks/endpoint_peering/__init__.py b/tests/benchmarks/remote_ops/__init__.py similarity index 100% rename from tests/benchmarks/endpoint_peering/__init__.py rename to tests/benchmarks/remote_ops/__init__.py diff --git a/tests/benchmarks/endpoint_peering/ops_test.py b/tests/benchmarks/remote_ops/endpoint_ops_test.py similarity index 94% rename from tests/benchmarks/endpoint_peering/ops_test.py rename to 
diff --git a/tests/benchmarks/endpoint_peering/main_test.py b/tests/benchmarks/endpoint_peering/main_test.py
deleted file mode 100644
index 787c336..0000000
--- a/tests/benchmarks/endpoint_peering/main_test.py
+++ /dev/null
@@ -1,81 +0,0 @@
-from __future__ import annotations
-
-import sys
-import tempfile
-import uuid
-from unittest import mock
-
-if sys.version_info >= (3, 8):  # pragma: >=3.8 cover
-    from unittest.mock import AsyncMock
-else:  # pragma: <3.8 cover
-    from asynctest import CoroutineMock as AsyncMock
-
-import pytest
-
-from psbench.benchmarks.endpoint_peering.main import main
-from psbench.benchmarks.endpoint_peering.main import runner
-
-
-@pytest.mark.asyncio
-async def test_runner() -> None:
-    await runner(
-        None,
-        ['GET', 'SET', 'EVICT', 'EXISTS'],
-        payload_sizes=[100, 1000],
-        repeat=1,
-        server=None,
-    )
-
-
-@pytest.mark.asyncio
-async def test_csv_logging() -> None:
-    with tempfile.NamedTemporaryFile() as f:
-        assert len(f.readlines()) == 0
-        await runner(
-            remote_endpoint=None,
-            ops=['EXISTS', 'EVICT'],
-            payload_sizes=[1, 2, 3],
-            repeat=3,
-            server=None,
-            csv_file=f.name,
-        )
-        assert len(f.readlines()) == 1 + (2 * 3)
-
-
-def test_main() -> None:
-    with mock.patch(
-        'psbench.benchmarks.endpoint_peering.main.runner',
-        AsyncMock(),
-    ):
-        assert (
-            main(
-                [
-                    '--remote',
-                    str(uuid.uuid4()),
-                    '--ops',
-                    'GET',
-                    '--payload-sizes',
-                    '1000',
-                    '--server',
-                    'wss://localhost:8765',
-                ],
-            )
-            == 0
-        )
-
-        assert (
-            main(
-                [
-                    '--remote',
-                    str(uuid.uuid4()),
-                    '--ops',
-                    'GET',
-                    '--payload-sizes',
-                    '1000',
-                    '--server',
-                    'wss://localhost:8765',
-                    '--no-uvloop',
-                ],
-            )
-            == 0
-        )
diff --git a/tests/benchmarks/endpoint_peering/__init__.py b/tests/benchmarks/remote_ops/__init__.py
similarity index 100%
rename from tests/benchmarks/endpoint_peering/__init__.py
rename to tests/benchmarks/remote_ops/__init__.py
diff --git a/tests/benchmarks/endpoint_peering/ops_test.py b/tests/benchmarks/remote_ops/endpoint_ops_test.py
similarity index 94%
rename from tests/benchmarks/endpoint_peering/ops_test.py
rename to tests/benchmarks/remote_ops/endpoint_ops_test.py
index 807d14d..bb88ee4 100644
--- a/tests/benchmarks/endpoint_peering/ops_test.py
+++ b/tests/benchmarks/remote_ops/endpoint_ops_test.py
@@ -7,7 +7,7 @@ import pytest_asyncio
 
 from proxystore.endpoint.endpoint import Endpoint
 
-from psbench.benchmarks.endpoint_peering import ops
+import psbench.benchmarks.remote_ops.endpoint_ops as ops
 
 
 @pytest_asyncio.fixture
diff --git a/tests/benchmarks/remote_ops/main_test.py b/tests/benchmarks/remote_ops/main_test.py
new file mode 100644
index 0000000..ecf95e4
--- /dev/null
+++ b/tests/benchmarks/remote_ops/main_test.py
@@ -0,0 +1,129 @@
+from __future__ import annotations
+
+import sys
+import tempfile
+import uuid
+from unittest import mock
+
+if sys.version_info >= (3, 8):  # pragma: >=3.8 cover
+    from unittest.mock import AsyncMock
+else:  # pragma: <3.8 cover
+    from asynctest import CoroutineMock as AsyncMock
+
+import pytest
+
+from testing.mocking import MockStrictRedis
+from psbench.benchmarks.remote_ops.main import main
+from psbench.benchmarks.remote_ops.main import runner_endpoint
+from psbench.benchmarks.remote_ops.main import runner_redis
+
+
+@pytest.mark.asyncio
+async def test_runner_endpoint() -> None:
+    await runner_endpoint(
+        None,
+        ['GET', 'SET', 'EVICT', 'EXISTS'],
+        payload_sizes=[100, 1000],
+        repeat=1,
+        server=None,
+    )
+
+
+def test_runner_redis() -> None:
+    with mock.patch('redis.StrictRedis', side_effect=MockStrictRedis):
+        runner_redis(
+            'localhost',
+            1234,
+            ['GET', 'SET', 'EVICT', 'EXISTS'],
+            payload_sizes=[100, 1000],
+            repeat=1,
+        )
+
+
+@pytest.mark.asyncio
+async def test_csv_logging_endpoint() -> None:
+    with tempfile.NamedTemporaryFile() as f:
+        assert len(f.readlines()) == 0
+        await runner_endpoint(
+            remote_endpoint=None,
+            ops=['EXISTS', 'EVICT', 'GET'],
+            payload_sizes=[1, 2, 3],
+            repeat=3,
+            server=None,
+            csv_file=f.name,
+        )
+        # 1 for header, 1 for exists, 1 for evict, 3 for get (3 payload sizes)
+        assert len(f.readlines()) == 1 + 2 + 3
+
+
+def test_csv_logging_redis() -> None:
+    with tempfile.NamedTemporaryFile() as f:
+        assert len(f.readlines()) == 0
+        with mock.patch('redis.StrictRedis', side_effect=MockStrictRedis):
+            runner_redis(
+                'localhost',
+                1234,
+                ops=['EXISTS', 'EVICT', 'GET'],
+                payload_sizes=[1, 2, 3],
+                repeat=3,
+                csv_file=f.name,
+            )
+        assert len(f.readlines()) == 1 + 2 + 3
+
+
+def test_main() -> None:
+    with mock.patch(
+        'psbench.benchmarks.remote_ops.main.runner_endpoint',
+        AsyncMock(),
+    ), mock.patch('psbench.benchmarks.remote_ops.main.runner_redis'):
+        assert (
+            main(
+                [
+                    'ENDPOINT',
+                    '--endpoint',
+                    str(uuid.uuid4()),
+                    '--ops',
+                    'GET',
+                    '--payload-sizes',
+                    '1000',
+                    '--server',
+                    'wss://localhost:8765',
+                ],
+            )
+            == 0
+        )
+
+        assert (
+            main(
+                [
+                    'ENDPOINT',
+                    '--endpoint',
+                    str(uuid.uuid4()),
+                    '--ops',
+                    'GET',
+                    '--payload-sizes',
+                    '1000',
+                    '--server',
+                    'wss://localhost:8765',
+                    '--no-uvloop',
+                ],
+            )
+            == 0
+        )
+
+        assert (
+            main(
+                [
+                    'REDIS',
+                    '--redis-host',
+                    'localhost',
+                    '--redis-port',
+                    '1234',
+                    '--ops',
+                    'GET',
+                    '--payload-sizes',
+                    '1000',
+                ],
+            )
+            == 0
+        )
diff --git a/tests/benchmarks/remote_ops/redis_ops_test.py b/tests/benchmarks/remote_ops/redis_ops_test.py
new file mode 100644
index 0000000..4dba4e4
--- /dev/null
+++ b/tests/benchmarks/remote_ops/redis_ops_test.py
@@ -0,0 +1,35 @@
+from __future__ import annotations
+
+from typing import Any
+from typing import Generator
+
+import pytest
+import redis
+
+import psbench.benchmarks.remote_ops.redis_ops as ops
+from testing.mocking import MockStrictRedis
+
+
+@pytest.fixture
+def client() -> Generator[redis.StrictRedis[Any], None, None]:
+    yield MockStrictRedis()  # type: ignore
+
+
+def test_evict(client: redis.StrictRedis[Any]) -> None:
+    times = ops.test_evict(client, 2)
+    assert len(times) == 2
+
+
+def test_exists(client: redis.StrictRedis[Any]) -> None:
+    times = ops.test_exists(client, 2)
+    assert len(times) == 2
+
+
+def test_get(client: redis.StrictRedis[Any]) -> None:
+    times = ops.test_get(client, 100, 2)
+    assert len(times) == 2
+
+
+def test_set(client: redis.StrictRedis[Any]) -> None:
+    times = ops.test_set(client, 100, 2)
+    assert len(times) == 2
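
One detail worth sanity-checking in the benchmark code above: when repeat >= 3 the first and last samples are trimmed before averaging, and bandwidth is computed as payload megabytes over average seconds (so the avg_bandwidth_mbps field carries megabytes per second). A worked sketch of that arithmetic with hypothetical timings:

    # Worked example of the RunStats arithmetic in main.py.
    times_ms = [9.0, 4.0, 5.0, 6.0, 12.0]  # hypothetical GET timings
    times_ms = times_ms[1:-1]               # trim first/last -> [4.0, 5.0, 6.0]
    avg_time_ms = sum(times_ms) / len(times_ms)        # 5.0 ms
    avg_time_s = sum(times_ms) / 1000 / len(times_ms)  # 0.005 s
    payload_mb = 100_000 / 1e6                         # 0.1 MB payload
    avg_bandwidth = payload_mb / avg_time_s            # 20.0 MB/s
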
diff --git a/tox.ini b/tox.ini
index d1bf5d2..95a1aab 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
 [tox]
-envlist = py37, py38, py39, py310, pre-commit, docs
+envlist = py37, py38, py39, py310, pre-commit
 
 [testenv]
 deps = -rrequirements-dev.txt
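
Finally, runs that pass --csv-file produce a plain header-plus-rows CSV (the tests above count one header line plus one line per logged RunStats). A sketch for post-processing, assuming the header names match the RunStats fields and 'results.csv' is the path given via --csv-file:

    # Sketch: summarize benchmark output with only the standard library.
    import csv

    with open('results.csv') as f:
        for row in csv.DictReader(f):
            print(
                row['backend'],
                row['op'],
                row['payload_size_bytes'],
                row['avg_time_ms'],
            )
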