From 2bd945f803bec3769668729a89c8532c330affd7 Mon Sep 17 00:00:00 2001 From: Jason Munro Date: Wed, 26 Jan 2022 11:53:30 -0800 Subject: [PATCH] Add worker error test --- tests/cli/test_distributed.py | 38 +++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/cli/test_distributed.py b/tests/cli/test_distributed.py index 6cbefedbc..f9708400a 100644 --- a/tests/cli/test_distributed.py +++ b/tests/cli/test_distributed.py @@ -41,6 +41,17 @@ def prechunk(self, num_chunks): return [{"val": i} for i in range(num_chunks)] +class DummyBuilderError(DummyBuilderWithNoPrechunk): + def prechunk(self, num_chunks): + return [{"val": i} for i in range(num_chunks)] + + def get_items(self): + raise ValueError("Dummy error") + + def process_items(self, items): + raise ValueError("Dummy error") + + SERVER_URL = "tcp://127.0.0.1" SERVER_PORT = 8234 @@ -106,6 +117,33 @@ async def test_worker(): worker_task.cancel() +@pytest.mark.asyncio +async def test_worker_error(): + context = zmq.Context() + socket = context.socket(REP) + socket.bind(f"{SERVER_URL}:{SERVER_PORT}") + + worker_task = asyncio.create_task(worker(SERVER_URL, SERVER_PORT, num_workers=1)) + + message = await socket.recv() + assert message == HOSTNAME.encode("utf-8") + + dummy_work = { + "@module": "tests.cli.test_distributed", + "@class": "DummyBuilderError", + "@version": None, + "dummy_prechunk": False, + "val": 0, + } + + await socket.send(json.dumps(dummy_work).encode("utf-8")) + await asyncio.sleep(1) + message = await socket.recv() + assert message.decode("utf-8") == "ERROR" + + worker_task.cancel() + + @pytest.mark.asyncio async def test_no_prechunk(caplog):