From 8a9f99a312ffcbe1bee680ee8da4d6b76ef1c0d9 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Thu, 29 Sep 2022 12:30:58 -0500 Subject: [PATCH 1/4] Rename endpoint_peering to remote_ops --- .../__init__.py | 0 .../__main__.py | 2 +- .../{endpoint_peering => remote_ops}/main.py | 18 +++++++++++------- .../{endpoint_peering => remote_ops}/ops.py | 0 .../__init__.py | 0 .../main_test.py | 6 +++--- .../ops_test.py | 2 +- 7 files changed, 16 insertions(+), 12 deletions(-) rename psbench/benchmarks/{endpoint_peering => remote_ops}/__init__.py (100%) rename psbench/benchmarks/{endpoint_peering => remote_ops}/__main__.py (61%) rename psbench/benchmarks/{endpoint_peering => remote_ops}/main.py (93%) rename psbench/benchmarks/{endpoint_peering => remote_ops}/ops.py (100%) rename tests/benchmarks/{endpoint_peering => remote_ops}/__init__.py (100%) rename tests/benchmarks/{endpoint_peering => remote_ops}/main_test.py (91%) rename tests/benchmarks/{endpoint_peering => remote_ops}/ops_test.py (94%) diff --git a/psbench/benchmarks/endpoint_peering/__init__.py b/psbench/benchmarks/remote_ops/__init__.py similarity index 100% rename from psbench/benchmarks/endpoint_peering/__init__.py rename to psbench/benchmarks/remote_ops/__init__.py diff --git a/psbench/benchmarks/endpoint_peering/__main__.py b/psbench/benchmarks/remote_ops/__main__.py similarity index 61% rename from psbench/benchmarks/endpoint_peering/__main__.py rename to psbench/benchmarks/remote_ops/__main__.py index c064eb8..f51049c 100644 --- a/psbench/benchmarks/endpoint_peering/__main__.py +++ b/psbench/benchmarks/remote_ops/__main__.py @@ -1,6 +1,6 @@ from __future__ import annotations -from psbench.benchmarks.endpoint_peering.main import main +from psbench.benchmarks.remote_ops.main import main if __name__ == '__main__': raise SystemExit(main()) diff --git a/psbench/benchmarks/endpoint_peering/main.py b/psbench/benchmarks/remote_ops/main.py similarity index 93% rename from psbench/benchmarks/endpoint_peering/main.py rename to psbench/benchmarks/remote_ops/main.py index bed1827..18c487f 100644 --- a/psbench/benchmarks/endpoint_peering/main.py +++ b/psbench/benchmarks/remote_ops/main.py @@ -1,4 +1,8 @@ -"""Endpoint Peering Performance Test.""" +"""Remote Operation Performance Test. + +Provides comparisons between remote operations with endpoints +and Redis servers. +""" from __future__ import annotations import argparse @@ -18,10 +22,10 @@ from proxystore.endpoint.endpoint import Endpoint from psbench.argparse import add_logging_options -from psbench.benchmarks.endpoint_peering.ops import test_evict -from psbench.benchmarks.endpoint_peering.ops import test_exists -from psbench.benchmarks.endpoint_peering.ops import test_get -from psbench.benchmarks.endpoint_peering.ops import test_set +from psbench.benchmarks.remote_ops.ops import test_evict +from psbench.benchmarks.remote_ops.ops import test_exists +from psbench.benchmarks.remote_ops.ops import test_get +from psbench.benchmarks.remote_ops.ops import test_set from psbench.csv import CSVLogger from psbench.logging import init_logging from psbench.logging import TESTING_LOG_LEVEL @@ -161,11 +165,11 @@ async def runner( def main(argv: Sequence[str] | None = None) -> int: - """Endpoint Peering test entrypoint.""" + """Remote ops test entrypoint.""" argv = argv if argv is not None else sys.argv[1:] parser = argparse.ArgumentParser( - description='ProxyStore Endpoint Peering Bandwidth/Latency Test.', + description='Remote ops performance test.', formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( diff --git a/psbench/benchmarks/endpoint_peering/ops.py b/psbench/benchmarks/remote_ops/ops.py similarity index 100% rename from psbench/benchmarks/endpoint_peering/ops.py rename to psbench/benchmarks/remote_ops/ops.py diff --git a/tests/benchmarks/endpoint_peering/__init__.py b/tests/benchmarks/remote_ops/__init__.py similarity index 100% rename from tests/benchmarks/endpoint_peering/__init__.py rename to tests/benchmarks/remote_ops/__init__.py diff --git a/tests/benchmarks/endpoint_peering/main_test.py b/tests/benchmarks/remote_ops/main_test.py similarity index 91% rename from tests/benchmarks/endpoint_peering/main_test.py rename to tests/benchmarks/remote_ops/main_test.py index 787c336..d14d4fc 100644 --- a/tests/benchmarks/endpoint_peering/main_test.py +++ b/tests/benchmarks/remote_ops/main_test.py @@ -12,8 +12,8 @@ import pytest -from psbench.benchmarks.endpoint_peering.main import main -from psbench.benchmarks.endpoint_peering.main import runner +from psbench.benchmarks.remote_ops.main import main +from psbench.benchmarks.remote_ops.main import runner @pytest.mark.asyncio @@ -44,7 +44,7 @@ async def test_csv_logging() -> None: def test_main() -> None: with mock.patch( - 'psbench.benchmarks.endpoint_peering.main.runner', + 'psbench.benchmarks.remote_ops.main.runner', AsyncMock(), ): assert ( diff --git a/tests/benchmarks/endpoint_peering/ops_test.py b/tests/benchmarks/remote_ops/ops_test.py similarity index 94% rename from tests/benchmarks/endpoint_peering/ops_test.py rename to tests/benchmarks/remote_ops/ops_test.py index 807d14d..94e201f 100644 --- a/tests/benchmarks/endpoint_peering/ops_test.py +++ b/tests/benchmarks/remote_ops/ops_test.py @@ -7,7 +7,7 @@ import pytest_asyncio from proxystore.endpoint.endpoint import Endpoint -from psbench.benchmarks.endpoint_peering import ops +from psbench.benchmarks.remote_ops import ops @pytest_asyncio.fixture From 5f435c7af890111c61d600f044e477d7c1a01a84 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Thu, 29 Sep 2022 12:42:03 -0500 Subject: [PATCH 2/4] Framework for multiple backends in remote_ops --- psbench/benchmarks/remote_ops/main.py | 74 +++++++++++++++--------- tests/benchmarks/remote_ops/main_test.py | 18 +++--- 2 files changed, 57 insertions(+), 35 deletions(-) diff --git a/psbench/benchmarks/remote_ops/main.py b/psbench/benchmarks/remote_ops/main.py index 18c487f..37d57ab 100644 --- a/psbench/benchmarks/remote_ops/main.py +++ b/psbench/benchmarks/remote_ops/main.py @@ -50,7 +50,7 @@ class RunStats(NamedTuple): avg_bandwidth_mbps: float | None -async def run( +async def run_endpoint( endpoint: Endpoint, remote_endpoint: uuid.UUID | None, op: OP_TYPE, @@ -118,7 +118,7 @@ async def run( ) -async def runner( +async def runner_endpoint( remote_endpoint: uuid.UUID | None, ops: list[OP_TYPE], *, @@ -147,7 +147,7 @@ async def runner( ) as endpoint: for op in ops: for payload_size in payload_sizes: - run_stats = await run( + run_stats = await run_endpoint( endpoint, remote_endpoint=remote_endpoint, op=op, @@ -173,10 +173,25 @@ def main(argv: Sequence[str] | None = None) -> int: formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( - '--remote', - required=True, + 'backend', + choices=['ENDPOINT', 'REDIS'], + help='Remote objects store backend to test', + ) + parser.add_argument( + '--endpoint', + required='ENDPOINT' in sys.argv, help='Remote Endpoint UUID', ) + parser.add_argument( + '--redis-host', + required='REDIS' in sys.argv, + help='Redis server hostname/IP', + ) + parser.add_argument( + '--redis-port', + required='REDIS' in sys.argv, + help='Redis server port', + ) parser.add_argument( '--ops', choices=['GET', 'SET', 'EXISTS', 'EVICT'], @@ -193,7 +208,7 @@ def main(argv: Sequence[str] | None = None) -> int: ) parser.add_argument( '--server', - required=True, + required='ENDPOINT' in sys.argv, help='Signaling server address for connecting to the remote endpoint', ) parser.add_argument( @@ -205,33 +220,38 @@ def main(argv: Sequence[str] | None = None) -> int: parser.add_argument( '--no-uvloop', action='store_true', - help='Override using uvloop if available', + help='Override using uvloop if available (for ENDPOINT backend only)', ) add_logging_options(parser) args = parser.parse_args(argv) init_logging(args.log_file, args.log_level, force=True) - if not args.no_uvloop: - try: - import uvloop - - uvloop.install() - logger.info('uvloop available... using as event loop') - except ImportError: # pragma: no cover - logger.info('uvloop unavailable... using asyncio event loop') + if args.backend == 'ENDPOINT': + if not args.no_uvloop: + try: + import uvloop + + uvloop.install() + logger.info('uvloop available... using as event loop') + except ImportError: # pragma: no cover + logger.info('uvloop unavailable... using asyncio event loop') + else: + logger.info('uvloop override... using asyncio event loop') + + asyncio.run( + runner_endpoint( + uuid.UUID(args.endpoint), + args.ops, + payload_sizes=args.payload_sizes, + repeat=args.repeat, + server=args.server, + csv_file=args.csv_file, + ), + ) + elif args.backend == 'REDIS': + pass else: - logger.info('uvloop override... using asyncio event loop') - - asyncio.run( - runner( - uuid.UUID(args.remote), - args.ops, - payload_sizes=args.payload_sizes, - repeat=args.repeat, - server=args.server, - csv_file=args.csv_file, - ), - ) + raise AssertionError('Unreachable.') return 0 diff --git a/tests/benchmarks/remote_ops/main_test.py b/tests/benchmarks/remote_ops/main_test.py index d14d4fc..fbff8b1 100644 --- a/tests/benchmarks/remote_ops/main_test.py +++ b/tests/benchmarks/remote_ops/main_test.py @@ -13,12 +13,12 @@ import pytest from psbench.benchmarks.remote_ops.main import main -from psbench.benchmarks.remote_ops.main import runner +from psbench.benchmarks.remote_ops.main import runner_endpoint @pytest.mark.asyncio -async def test_runner() -> None: - await runner( +async def test_runner_endpoint() -> None: + await runner_endpoint( None, ['GET', 'SET', 'EVICT', 'EXISTS'], payload_sizes=[100, 1000], @@ -28,10 +28,10 @@ async def test_runner() -> None: @pytest.mark.asyncio -async def test_csv_logging() -> None: +async def test_csv_logging_endpoint() -> None: with tempfile.NamedTemporaryFile() as f: assert len(f.readlines()) == 0 - await runner( + await runner_endpoint( remote_endpoint=None, ops=['EXISTS', 'EVICT'], payload_sizes=[1, 2, 3], @@ -44,13 +44,14 @@ async def test_csv_logging() -> None: def test_main() -> None: with mock.patch( - 'psbench.benchmarks.remote_ops.main.runner', + 'psbench.benchmarks.remote_ops.main.runner_endpoint', AsyncMock(), ): assert ( main( [ - '--remote', + 'ENDPOINT', + '--endpoint', str(uuid.uuid4()), '--ops', 'GET', @@ -66,7 +67,8 @@ def test_main() -> None: assert ( main( [ - '--remote', + 'ENDPOINT', + '--endpoint', str(uuid.uuid4()), '--ops', 'GET', From 5c80f6ff55ba4ebb841dfb21adcce3b5e51fb137 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Thu, 29 Sep 2022 13:34:58 -0500 Subject: [PATCH 3/4] Support Redis in remote_ops benchmark --- .pre-commit-config.yaml | 2 +- .../remote_ops/{ops.py => endpoint_ops.py} | 0 psbench/benchmarks/remote_ops/main.py | 140 ++++++++++++++++-- psbench/benchmarks/remote_ops/redis_ops.py | 122 +++++++++++++++ setup.cfg | 1 + testing/mocking.py | 30 ++++ .../{ops_test.py => endpoint_ops_test.py} | 2 +- tests/benchmarks/remote_ops/main_test.py | 47 +++++- tests/benchmarks/remote_ops/redis_ops_test.py | 35 +++++ tox.ini | 2 +- 10 files changed, 362 insertions(+), 19 deletions(-) rename psbench/benchmarks/remote_ops/{ops.py => endpoint_ops.py} (100%) create mode 100644 psbench/benchmarks/remote_ops/redis_ops.py create mode 100644 testing/mocking.py rename tests/benchmarks/remote_ops/{ops_test.py => endpoint_ops_test.py} (94%) create mode 100644 tests/benchmarks/remote_ops/redis_ops_test.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b38857e..b92e7e5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -55,4 +55,4 @@ repos: hooks: - id: mypy # Add any additional typed package dependencies here - additional_dependencies: [types-requests, proxystore==0.4.0a1] + additional_dependencies: [types-redis, types-requests, proxystore==0.4.0a1] diff --git a/psbench/benchmarks/remote_ops/ops.py b/psbench/benchmarks/remote_ops/endpoint_ops.py similarity index 100% rename from psbench/benchmarks/remote_ops/ops.py rename to psbench/benchmarks/remote_ops/endpoint_ops.py diff --git a/psbench/benchmarks/remote_ops/main.py b/psbench/benchmarks/remote_ops/main.py index 37d57ab..ff985e6 100644 --- a/psbench/benchmarks/remote_ops/main.py +++ b/psbench/benchmarks/remote_ops/main.py @@ -11,6 +11,7 @@ import socket import sys import uuid +from typing import Any from typing import NamedTuple from typing import Sequence @@ -19,30 +20,29 @@ else: # pragma: <3.8 cover from typing_extensions import Literal +import redis from proxystore.endpoint.endpoint import Endpoint from psbench.argparse import add_logging_options -from psbench.benchmarks.remote_ops.ops import test_evict -from psbench.benchmarks.remote_ops.ops import test_exists -from psbench.benchmarks.remote_ops.ops import test_get -from psbench.benchmarks.remote_ops.ops import test_set +import psbench.benchmarks.remote_ops.endpoint_ops as endpoint_ops +import psbench.benchmarks.remote_ops.redis_ops as redis_ops from psbench.csv import CSVLogger from psbench.logging import init_logging from psbench.logging import TESTING_LOG_LEVEL +BACKEND_TYPE = Literal['ENDPOINT', 'REDIS'] OP_TYPE = Literal['EVICT', 'EXISTS', 'GET', 'SET'] -logger = logging.getLogger('endpoint-peering') +logger = logging.getLogger('remote-ops') class RunStats(NamedTuple): """Stats for a given run configuration.""" + backend: BACKEND_TYPE op: OP_TYPE payload_size_bytes: int | None repeat: int - local_endpoint_uuid: str - remote_endpoint_uuid: str total_time_ms: float avg_time_ms: float min_time_ms: float @@ -75,18 +75,26 @@ async def run_endpoint( logger.log(TESTING_LOG_LEVEL, f'starting endpoint peering test for {op}') if op == 'EVICT': - times_ms = await test_evict(endpoint, remote_endpoint, repeat) + times_ms = await endpoint_ops.test_evict( + endpoint, + remote_endpoint, + repeat, + ) elif op == 'EXISTS': - times_ms = await test_exists(endpoint, remote_endpoint, repeat) + times_ms = await endpoint_ops.test_exists( + endpoint, + remote_endpoint, + repeat, + ) elif op == 'GET': - times_ms = await test_get( + times_ms = await endpoint_ops.test_get( endpoint, remote_endpoint, payload_size, repeat, ) elif op == 'SET': - times_ms = await test_set( + times_ms = await endpoint_ops.test_set( endpoint, remote_endpoint, payload_size, @@ -105,11 +113,65 @@ async def run_endpoint( ) return RunStats( + backend='ENDPOINT', + op=op, + payload_size_bytes=payload_size if op in ('GET', 'SET') else None, + repeat=repeat, + total_time_ms=sum(times_ms), + avg_time_ms=sum(times_ms) / len(times_ms), + min_time_ms=min(times_ms), + max_time_ms=max(times_ms), + avg_bandwidth_mbps=avg_bandwidth_mbps, + ) + + +def run_redis( + client: redis.StrictRedis[Any], + op: OP_TYPE, + payload_size: int = 0, + repeat: int = 3, +) -> RunStats: + """Run test for single operation and measure performance. + + Args: + client (StrictRedis): Redis client connected to remote server. + op (str): endpoint operation to test. + payload_size (int): bytes to send/receive for GET/SET operations. + repeat (int): number of times to repeat operation. If repeat is greater + than or equal to three, the slowest and fastest times will be + dropped to account for the first op being slower while establishing + a connection. + + Returns: + RunStats with summary of test run. + """ + logger.log(TESTING_LOG_LEVEL, f'starting remote redis test for {op}') + + if op == 'EVICT': + times_ms = redis_ops.test_evict(client, repeat) + elif op == 'EXISTS': + times_ms = redis_ops.test_exists(client, repeat) + elif op == 'GET': + times_ms = redis_ops.test_get(client, payload_size, repeat) + elif op == 'SET': + times_ms = redis_ops.test_set(client, payload_size, repeat) + else: + raise AssertionError(f'Unsupported operation {op}') + + if len(times_ms) >= 3: + times_ms = times_ms[1:-1] + + avg_time_s = sum(times_ms) / 1000 / len(times_ms) + payload_mb = payload_size / 1e6 + avg_bandwidth_mbps = ( + payload_mb / avg_time_s if op in ('GET', 'SET') else None + ) + + return RunStats( + backend='REDIS', op=op, payload_size_bytes=payload_size if op in ('GET', 'SET') else None, repeat=repeat, - local_endpoint_uuid=str(endpoint.uuid), - remote_endpoint_uuid=str(remote_endpoint), total_time_ms=sum(times_ms), avg_time_ms=sum(times_ms) / len(times_ms), min_time_ms=min(times_ms), @@ -127,7 +189,7 @@ async def runner_endpoint( server: str | None = None, csv_file: str | None = None, ) -> None: - """Run matrix of test test configurations. + """Run matrix of test test configurations with an Endpoint. Args: remote_endpoint (UUID): remote endpoint UUID to peer with. @@ -164,6 +226,47 @@ async def runner_endpoint( logger.log(TESTING_LOG_LEVEL, f'results logged to {csv_file}') +def runner_redis( + host: str, + port: int, + ops: list[OP_TYPE], + *, + payload_sizes: list[int], + repeat: int, + csv_file: str | None = None, +) -> None: + """Run matrix of test test configurations with a Redis server. + + Args: + host (str): remote Redis server hostname/IP. + port (int): remote Redis server port. + ops (str): endpoint operations to test. + payload_sizes (int): bytes to send/receive for GET/SET operations. + repeat (int): number of times to repeat operations. + csv_file (str): optional csv filepath to log results to. + """ + if csv_file is not None: + csv_logger = CSVLogger(csv_file, RunStats) + + client = redis.StrictRedis(host=host, port=port) + for op in ops: + for payload_size in payload_sizes: + run_stats = run_redis( + client, + op=op, + payload_size=payload_size, + repeat=repeat, + ) + + logger.log(TESTING_LOG_LEVEL, run_stats) + if csv_file is not None: + csv_logger.log(run_stats) + + if csv_file is not None: + csv_logger.close() + logger.log(TESTING_LOG_LEVEL, f'results logged to {csv_file}') + + def main(argv: Sequence[str] | None = None) -> int: """Remote ops test entrypoint.""" argv = argv if argv is not None else sys.argv[1:] @@ -250,7 +353,14 @@ def main(argv: Sequence[str] | None = None) -> int: ), ) elif args.backend == 'REDIS': - pass + runner_redis( + args.redis_host, + args.redis_port, + args.ops, + payload_sizes=args.payload_sizes, + repeat=args.repeat, + csv_file=args.csv_file, + ) else: raise AssertionError('Unreachable.') diff --git a/psbench/benchmarks/remote_ops/redis_ops.py b/psbench/benchmarks/remote_ops/redis_ops.py new file mode 100644 index 0000000..6c1a504 --- /dev/null +++ b/psbench/benchmarks/remote_ops/redis_ops.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +import time +from typing import Any + +import redis + +from psbench.utils import randbytes + + +def test_evict( + client: redis.StrictRedis[Any], + repeat: int = 1, +) -> list[float]: + """Test Redis eviction. + + Args: + client (StrictRedis): client connection to remote Redis server. + repeat (int): repeat the operation this many times (default: 1). + + Returns: + list of times for each operation to complete. + """ + times_ms: list[float] = [] + + for _ in range(repeat): + start = time.perf_counter_ns() + client.delete('missing-key') + end = time.perf_counter_ns() + times_ms.append((end - start) / 1e6) + + return times_ms + + +def test_exists( + client: redis.StrictRedis[Any], + repeat: int = 1, +) -> list[float]: + """Test Redis key exists. + + Args: + client (StrictRedis): client connection to remote Redis server. + repeat (int): repeat the operation this many times (default: 1). + + Returns: + list of times for each operation to complete. + """ + times_ms: list[float] = [] + + for _ in range(repeat): + start = time.perf_counter_ns() + client.exists('missing-key') + end = time.perf_counter_ns() + times_ms.append((end - start) / 1e6) + + return times_ms + + +def test_get( + client: redis.StrictRedis[Any], + payload_size_bytes: int, + repeat: int = 1, +) -> list[float]: + """Test Redis get data. + + Args: + client (StrictRedis): client connection to remote Redis server. + payload_size_bytes (int): size of payload to request from target + endpoint. + repeat (int): repeat the operation this many times (default: 1). + + Returns: + list of times for each operation to complete. + """ + times_ms: list[float] = [] + + data = randbytes(payload_size_bytes) + client.set('key', data) + + for _ in range(repeat): + start = time.perf_counter_ns() + res = client.get('key') + assert isinstance(res, bytes) + end = time.perf_counter_ns() + times_ms.append((end - start) / 1e6) + + client.delete('key') + + return times_ms + + +def test_set( + client: redis.StrictRedis[Any], + payload_size_bytes: int, + repeat: int = 1, +) -> list[float]: + """Test Redis set data. + + Args: + client (StrictRedis): client connection to remote Redis server. + payload_size_bytes (int): size of payload to request from target + endpoint. + repeat (int): repeat the operation this many times (default: 1). + + Returns: + list of times for each operation to complete. + """ + times_ms: list[float] = [] + + data = randbytes(payload_size_bytes) + + for i in range(repeat): + key = f'key-{i}' + start = time.perf_counter_ns() + client.set(key, data) + end = time.perf_counter_ns() + times_ms.append((end - start) / 1e6) + + # Evict key immediately to keep memory usage low + client.delete(key) + + return times_ms diff --git a/setup.cfg b/setup.cfg index 4d73c86..4d57414 100644 --- a/setup.cfg +++ b/setup.cfg @@ -21,6 +21,7 @@ install_requires = funcx>=1.0.3 funcx-endpoint>=1.0.3 proxystore[endpoints]==0.4.0a2 + redis>=3.4 requests python_requires = >=3.7 include_package_data = True diff --git a/testing/mocking.py b/testing/mocking.py new file mode 100644 index 0000000..840e8c6 --- /dev/null +++ b/testing/mocking.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from typing import Any + + +class MockStrictRedis: + """Mock StrictRedis.""" + + def __init__(self, *args, **kwargs): + """Init MockStrictRedis.""" + self.data = {} + + def delete(self, key: str) -> None: + """Delete key.""" + if key in self.data: + del self.data[key] + + def exists(self, key: str) -> bool: + """Check if key exists.""" + return key in self.data + + def get(self, key: str) -> Any: + """Get value with key.""" + if key in self.data: + return self.data[key] + return None # pragma: no cover + + def set(self, key: str, value: str | bytes | int | float) -> None: + """Set value with key.""" + self.data[key] = value diff --git a/tests/benchmarks/remote_ops/ops_test.py b/tests/benchmarks/remote_ops/endpoint_ops_test.py similarity index 94% rename from tests/benchmarks/remote_ops/ops_test.py rename to tests/benchmarks/remote_ops/endpoint_ops_test.py index 94e201f..bb88ee4 100644 --- a/tests/benchmarks/remote_ops/ops_test.py +++ b/tests/benchmarks/remote_ops/endpoint_ops_test.py @@ -7,7 +7,7 @@ import pytest_asyncio from proxystore.endpoint.endpoint import Endpoint -from psbench.benchmarks.remote_ops import ops +import psbench.benchmarks.remote_ops.endpoint_ops as ops @pytest_asyncio.fixture diff --git a/tests/benchmarks/remote_ops/main_test.py b/tests/benchmarks/remote_ops/main_test.py index fbff8b1..62be556 100644 --- a/tests/benchmarks/remote_ops/main_test.py +++ b/tests/benchmarks/remote_ops/main_test.py @@ -12,8 +12,10 @@ import pytest +from testing.mocking import MockStrictRedis from psbench.benchmarks.remote_ops.main import main from psbench.benchmarks.remote_ops.main import runner_endpoint +from psbench.benchmarks.remote_ops.main import runner_redis @pytest.mark.asyncio @@ -27,6 +29,17 @@ async def test_runner_endpoint() -> None: ) +def test_runner_redis() -> None: + with mock.patch('redis.StrictRedis', side_effect=MockStrictRedis): + runner_redis( + 'localhost', + 1234, + ['GET', 'SET', 'EVICT', 'EXISTS'], + payload_sizes=[100, 1000], + repeat=1, + ) + + @pytest.mark.asyncio async def test_csv_logging_endpoint() -> None: with tempfile.NamedTemporaryFile() as f: @@ -42,11 +55,26 @@ async def test_csv_logging_endpoint() -> None: assert len(f.readlines()) == 1 + (2 * 3) +def test_csv_logging_redis() -> None: + with tempfile.NamedTemporaryFile() as f: + assert len(f.readlines()) == 0 + with mock.patch('redis.StrictRedis', side_effect=MockStrictRedis): + runner_redis( + 'localhost', + 1234, + ops=['EXISTS', 'EVICT'], + payload_sizes=[1, 2, 3], + repeat=3, + csv_file=f.name, + ) + assert len(f.readlines()) == 1 + (2 * 3) + + def test_main() -> None: with mock.patch( 'psbench.benchmarks.remote_ops.main.runner_endpoint', AsyncMock(), - ): + ), mock.patch('psbench.benchmarks.remote_ops.main.runner_redis'): assert ( main( [ @@ -81,3 +109,20 @@ def test_main() -> None: ) == 0 ) + + assert ( + main( + [ + 'REDIS', + '--redis-host', + 'localhost', + '--redis-port', + '1234', + '--ops', + 'GET', + '--payload-sizes', + '1000', + ], + ) + == 0 + ) diff --git a/tests/benchmarks/remote_ops/redis_ops_test.py b/tests/benchmarks/remote_ops/redis_ops_test.py new file mode 100644 index 0000000..4dba4e4 --- /dev/null +++ b/tests/benchmarks/remote_ops/redis_ops_test.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from typing import Any +from typing import Generator + +import pytest +import redis + +import psbench.benchmarks.remote_ops.redis_ops as ops +from testing.mocking import MockStrictRedis + + +@pytest.fixture +def client() -> Generator[redis.StrictRedis[Any], None, None]: + yield MockStrictRedis() # type: ignore + + +def test_evict(client: redis.StrictRedis[Any]) -> None: + times = ops.test_evict(client, 2) + assert len(times) == 2 + + +def test_exists(client: redis.StrictRedis[Any]) -> None: + times = ops.test_exists(client, 2) + assert len(times) == 2 + + +def test_get(client: redis.StrictRedis[Any]) -> None: + times = ops.test_get(client, 100, 2) + assert len(times) == 2 + + +def test_set(client: redis.StrictRedis[Any]) -> None: + times = ops.test_set(client, 100, 2) + assert len(times) == 2 diff --git a/tox.ini b/tox.ini index d1bf5d2..95a1aab 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py37, py38, py39, py310, pre-commit, docs +envlist = py37, py38, py39, py310, pre-commit [testenv] deps = -rrequirements-dev.txt From 350700d0b8ac513cdc8c6e4adf2d045c39a7b391 Mon Sep 17 00:00:00 2001 From: Greg Pauloski <18683347+gpauloski@users.noreply.github.com> Date: Thu, 29 Sep 2022 15:36:12 -0500 Subject: [PATCH 4/4] Fix repeated evict/exists ops --- psbench/benchmarks/remote_ops/main.py | 50 +++++++++++++----------- tests/benchmarks/remote_ops/main_test.py | 9 +++-- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/psbench/benchmarks/remote_ops/main.py b/psbench/benchmarks/remote_ops/main.py index ff985e6..ea37762 100644 --- a/psbench/benchmarks/remote_ops/main.py +++ b/psbench/benchmarks/remote_ops/main.py @@ -208,18 +208,20 @@ async def runner_endpoint( signaling_server=server, ) as endpoint: for op in ops: - for payload_size in payload_sizes: - run_stats = await run_endpoint( - endpoint, - remote_endpoint=remote_endpoint, - op=op, - payload_size=payload_size, - repeat=repeat, - ) - - logger.log(TESTING_LOG_LEVEL, run_stats) - if csv_file is not None: - csv_logger.log(run_stats) + for i, payload_size in enumerate(payload_sizes): + # Only need to repeat for payload_size for GET/SET + if i == 0 or op in ['GET', 'SET']: + run_stats = await run_endpoint( + endpoint, + remote_endpoint=remote_endpoint, + op=op, + payload_size=payload_size, + repeat=repeat, + ) + + logger.log(TESTING_LOG_LEVEL, run_stats) + if csv_file is not None: + csv_logger.log(run_stats) if csv_file is not None: csv_logger.close() @@ -250,17 +252,19 @@ def runner_redis( client = redis.StrictRedis(host=host, port=port) for op in ops: - for payload_size in payload_sizes: - run_stats = run_redis( - client, - op=op, - payload_size=payload_size, - repeat=repeat, - ) - - logger.log(TESTING_LOG_LEVEL, run_stats) - if csv_file is not None: - csv_logger.log(run_stats) + for i, payload_size in enumerate(payload_sizes): + # Only need to repeat for payload_size for GET/SET + if i == 0 or op in ['GET', 'SET']: + run_stats = run_redis( + client, + op=op, + payload_size=payload_size, + repeat=repeat, + ) + + logger.log(TESTING_LOG_LEVEL, run_stats) + if csv_file is not None: + csv_logger.log(run_stats) if csv_file is not None: csv_logger.close() diff --git a/tests/benchmarks/remote_ops/main_test.py b/tests/benchmarks/remote_ops/main_test.py index 62be556..ecf95e4 100644 --- a/tests/benchmarks/remote_ops/main_test.py +++ b/tests/benchmarks/remote_ops/main_test.py @@ -46,13 +46,14 @@ async def test_csv_logging_endpoint() -> None: assert len(f.readlines()) == 0 await runner_endpoint( remote_endpoint=None, - ops=['EXISTS', 'EVICT'], + ops=['EXISTS', 'EVICT', 'GET'], payload_sizes=[1, 2, 3], repeat=3, server=None, csv_file=f.name, ) - assert len(f.readlines()) == 1 + (2 * 3) + # 1 for header, 1 for exists, 1 for evict, 3 for get (3 payload sizes) + assert len(f.readlines()) == 1 + 2 + 3 def test_csv_logging_redis() -> None: @@ -62,12 +63,12 @@ def test_csv_logging_redis() -> None: runner_redis( 'localhost', 1234, - ops=['EXISTS', 'EVICT'], + ops=['EXISTS', 'EVICT', 'GET'], payload_sizes=[1, 2, 3], repeat=3, csv_file=f.name, ) - assert len(f.readlines()) == 1 + (2 * 3) + assert len(f.readlines()) == 1 + 2 + 3 def test_main() -> None: