Skip to content

Commit

Permalink
Add worker error test
Browse files Browse the repository at this point in the history
  • Loading branch information
munrojm committed Jan 26, 2022
1 parent dd58756 commit 2bd945f
Showing 1 changed file with 38 additions and 0 deletions.
38 changes: 38 additions & 0 deletions tests/cli/test_distributed.py
Expand Up @@ -41,6 +41,17 @@ def prechunk(self, num_chunks):
return [{"val": i} for i in range(num_chunks)]


class DummyBuilderError(DummyBuilderWithNoPrechunk):
def prechunk(self, num_chunks):
return [{"val": i} for i in range(num_chunks)]

def get_items(self):
raise ValueError("Dummy error")

def process_items(self, items):
raise ValueError("Dummy error")


SERVER_URL = "tcp://127.0.0.1"
SERVER_PORT = 8234

Expand Down Expand Up @@ -106,6 +117,33 @@ async def test_worker():
worker_task.cancel()


@pytest.mark.asyncio
async def test_worker_error():
context = zmq.Context()
socket = context.socket(REP)
socket.bind(f"{SERVER_URL}:{SERVER_PORT}")

worker_task = asyncio.create_task(worker(SERVER_URL, SERVER_PORT, num_workers=1))

message = await socket.recv()
assert message == HOSTNAME.encode("utf-8")

dummy_work = {
"@module": "tests.cli.test_distributed",
"@class": "DummyBuilderError",
"@version": None,
"dummy_prechunk": False,
"val": 0,
}

await socket.send(json.dumps(dummy_work).encode("utf-8"))
await asyncio.sleep(1)
message = await socket.recv()
assert message.decode("utf-8") == "ERROR"

worker_task.cancel()


@pytest.mark.asyncio
async def test_no_prechunk(caplog):

Expand Down

0 comments on commit 2bd945f

Please sign in to comment.