Skip to content

Commit

Permalink
Rename num_workers
Browse files Browse the repository at this point in the history
  • Loading branch information
munrojm committed Jan 26, 2022
1 parent c27da97 commit 4104335
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 10 deletions.
44 changes: 36 additions & 8 deletions src/maggma/cli/__init__.py
Expand Up @@ -31,9 +31,9 @@
)
@click.option(
"-n",
"--num-workers",
"num_workers",
help="Number of worker processes. Defaults to single processing",
"--num-processes",
"num_processes",
help="Number of processes to spawn for each worker. Defaults to single processing",
default=1,
type=click.IntRange(1),
)
Expand All @@ -56,13 +56,35 @@
help="Port for distributed communication."
" mrun will find an open port if None is provided to the manager",
)
@click.option("-N", "--num-chunks", "num_chunks", default=0, type=int)
@click.option("-w", "--num-workers", "num_workers", default=0, type=int)
@click.option(
"-N",
"--num-chunks",
"num_chunks",
default=0,
type=int,
help="Number of chunks to distribute to workers",
)
@click.option(
"-d",
"--num-workers",
"num_workers",
default=0,
type=int,
help="Number of distributed workers to process chunks",
)
@click.option(
"--no_bars", is_flag=True, help="Turns of Progress Bars for headless operations"
)
def run(
builders, verbosity, reporting_store, num_workers, url, port, num_chunks, no_bars
builders,
verbosity,
reporting_store,
num_workers,
url,
port,
num_chunks,
no_bars,
num_processes,
):

# Set Logging
Expand Down Expand Up @@ -101,12 +123,18 @@ def run(
root.critical(f"Using random port for mrun manager: {port}")
loop.run_until_complete(
manager(
url=url, port=port, builders=builder_objects, num_chunks=num_chunks, num_workers=num_workers
url=url,
port=port,
builders=builder_objects,
num_chunks=num_chunks,
num_workers=num_workers,
)
)
else:
# worker
loop.run_until_complete(worker(url=url, port=port, num_workers=num_workers))
loop.run_until_complete(
worker(url=url, port=port, num_processes=num_processes)
)
else:
if num_workers == 1:
for builder in builder_objects:
Expand Down
4 changes: 2 additions & 2 deletions src/maggma/cli/distributed.py
Expand Up @@ -80,7 +80,7 @@ async def manager(
socket.close()


async def worker(url: str, port: int, num_workers: int):
async def worker(url: str, port: int, num_processes: int):
"""
Simple distributed worker that connects to a manager asks for work and deploys
using multiprocessing
Expand All @@ -105,7 +105,7 @@ async def worker(url: str, port: int, num_workers: int):
if "@class" in work and "@module" in work:
# We have a valid builder
builder = MontyDecoder().process_decoded(work)
await multi(builder, num_workers)
await multi(builder, num_processes)
elif work == "EXIT":
# End the worker
running = False
Expand Down

0 comments on commit 4104335

Please sign in to comment.