Skip to content

Commit

Permalink
Linting
Browse files Browse the repository at this point in the history
  • Loading branch information
munrojm committed Sep 27, 2022
1 parent d0c7c49 commit ea18f55
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions src/maggma/cli/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ def manager(

# If workers send messages decode and figure out what do
if connections:
identity, _, msg = socket.recv_multipart()
identity, _, bmsg = socket.recv_multipart()

msg = msg.decode("utf-8")
msg = bmsg.decode("utf-8")

if "READY" in msg:
if identity not in workers:
Expand Down Expand Up @@ -227,7 +227,7 @@ async def worker(url: str, port: int, num_processes: int, no_bars: bool):
logger.info(f"Connnecting to Manager at {url}:{port}")
context = azmq.Context()
socket = context.socket(zmq.REQ)

socket.setsockopt_string(zmq.IDENTITY, identity)
socket.connect(f"{url}:{port}")

Expand All @@ -239,11 +239,12 @@ async def worker(url: str, port: int, num_processes: int, no_bars: bool):
while running:
await socket.send("READY_{}".format(hostname).encode("utf-8"))
try:
message = await asyncio.wait_for(socket.recv(), timeout=MANAGER_TIMEOUT)
bmessage: bytes = await asyncio.wait_for(socket.recv(), timeout=MANAGER_TIMEOUT) # type: ignore
except asyncio.TimeoutError:
socket.close()
raise RuntimeError("Stopping work as manager timed out.")
message = message.decode("utf-8")

message = bmessage.decode("utf-8")
if "@class" in message and "@module" in message:
# We have a valid builder
work = json.loads(message)
Expand Down

0 comments on commit ea18f55

Please sign in to comment.