Skip to content

Commit

Permalink
Merge pull request #149 from materialsproject/dependabot/pip/pytest-a…
Browse files Browse the repository at this point in the history
…syncio-0.11.0

Bump pytest-asyncio from 0.10.0 to 0.11.0
  • Loading branch information
shyamd committed May 2, 2020
2 parents 00cdf91 + 4cc23d4 commit b562b3d
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 22 deletions.
2 changes: 1 addition & 1 deletion requirements-testing.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pre-commit==2.3.0
pytest==5.4.1
pytest-asyncio==0.10.0
pytest-asyncio==0.11.0
pytest-cov==2.8.1
pytest-mock==3.1.0
moto==1.3.14
Expand Down
9 changes: 6 additions & 3 deletions src/maggma/cli/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@ async def master(url: str, builders: List[Builder], num_chunks: int):
try:

builder.connect()
chunks_dicts = builder.prechunk(num_chunks)
chunks_dicts = list(builder.prechunk(num_chunks))

logger.info(f"Distributing {num_chunks} chunks to workers")
logger.info(f"Distributing {len(chunks_dicts)} chunks to workers")
for chunk_dict in tqdm(chunks_dicts, desc="Chunks"):
temp_builder_dict = dict(**builder_dict)
temp_builder_dict.update(chunk_dict)
temp_builder_dict = jsanitize(temp_builder_dict)

# Wait for client connection that announces client and says it is ready to do work
logger.debug("Waiting for a worker")
worker = await workers.arecv_msg()
logger.debug(
f"Got connection from worker: {worker.pipe.remote_address}"
Expand All @@ -57,7 +58,9 @@ async def master(url: str, builders: List[Builder], num_chunks: int):
)

# Clean up and tell workers to shut down
await wait([pipe.asend("{}".encode("utf-8")) for pipe in workers.pipes])
await wait(
[pipe.asend(json.dumps({}).encode("utf-8")) for pipe in workers.pipes]
)


async def worker(url: str, num_workers: int):
Expand Down
33 changes: 15 additions & 18 deletions tests/cli/test_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,35 @@ def prechunk(self, num_chunks):
return [{"val": i} for i in range(num_chunks)]


@pytest.fixture
async def master_server():
SERVER_URL = "tcp://127.0.0.1:8234"


@pytest.fixture("function")
async def master_server(event_loop, log_to_stdout):

task = asyncio.create_task(
master(
"tcp://127.0.0.1:8234", [DummyBuilder(dummy_prechunk=False)], num_chunks=10
)
master(SERVER_URL, [DummyBuilder(dummy_prechunk=False)], num_chunks=10)
)
yield
yield task
task.cancel()


@pytest.mark.asyncio
async def test_master_wait_for_ready(master_server):
with Pair1(
dial="tcp://127.0.0.1:8234", polyamorous=True, recv_timeout=100
) as master:
with Pair1(dial=SERVER_URL, polyamorous=True, recv_timeout=100) as master:
with pytest.raises(Timeout):
master.recv()


@pytest.mark.asyncio
async def test_master_give_out_chunks(master_server):

with Pair1(dial="tcp://127.0.0.1:8234", polyamorous=True) as master_socket:
async def test_master_give_out_chunks(master_server, log_to_stdout):
with Pair1(dial=SERVER_URL, polyamorous=True, recv_timeout=500) as master_socket:

for i in range(0, 10):
log_to_stdout.debug(f"Going to ask Master for work: {i}")
await master_socket.asend(b"Ready")
message = await master_socket.arecv()

print(message)
work = json.loads(message.decode("utf-8"))

assert work["@class"] == "DummyBuilder"
Expand All @@ -77,11 +76,9 @@ async def test_master_give_out_chunks(master_server):

@pytest.mark.asyncio
async def test_worker():
with Pair1(
listen="tcp://127.0.0.1:8234", polyamorous=True, recv_timeout=100
) as worker_socket:
with Pair1(listen=SERVER_URL, polyamorous=True, recv_timeout=100) as worker_socket:

worker_task = asyncio.create_task(worker("tcp://127.0.0.1:8234", num_workers=1))
worker_task = asyncio.create_task(worker(SERVER_URL, num_workers=1))

message = await worker_socket.arecv()
assert message == b"Ready"
Expand Down Expand Up @@ -113,7 +110,7 @@ async def test_no_prechunk(caplog):

asyncio.create_task(
master(
"tcp://127.0.0.1:8234",
SERVER_URL,
[DummyBuilderWithNoPrechunk(dummy_prechunk=False)],
num_chunks=10,
)
Expand Down
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from pathlib import Path
import sys
import pytest
import logging


@pytest.fixture
Expand All @@ -14,3 +16,17 @@ def db_json(test_dir):
db_dir = test_dir / "settings_files"
db_json = db_dir / "db.json"
return db_json.resolve()


@pytest.fixture
def log_to_stdout():
# Set Logging
root = logging.getLogger()
root.setLevel(logging.DEBUG)
ch = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
ch.setFormatter(formatter)
root.addHandler(ch)
return root

0 comments on commit b562b3d

Please sign in to comment.