diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index c51b82f12..83e43d443 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -31,9 +31,9 @@ ) @click.option( "-n", - "--num-workers", - "num_workers", - help="Number of worker processes. Defaults to single processing", + "--num-processes", + "num_processes", + help="Number of processes to spawn for each worker. Defaults to single processing", default=1, type=click.IntRange(1), ) @@ -56,13 +56,35 @@ help="Port for distributed communication." " mrun will find an open port if None is provided to the manager", ) -@click.option("-N", "--num-chunks", "num_chunks", default=0, type=int) -@click.option("-w", "--num-workers", "num_workers", default=0, type=int) +@click.option( + "-N", + "--num-chunks", + "num_chunks", + default=0, + type=int, + help="Number of chunks to distribute to workers", +) +@click.option( + "-d", + "--num-workers", + "num_workers", + default=0, + type=int, + help="Number of distributed workers to process chunks", +) @click.option( "--no_bars", is_flag=True, help="Turns of Progress Bars for headless operations" ) def run( - builders, verbosity, reporting_store, num_workers, url, port, num_chunks, no_bars + builders, + verbosity, + reporting_store, + num_workers, + url, + port, + num_chunks, + no_bars, + num_processes, ): # Set Logging @@ -101,12 +123,18 @@ def run( root.critical(f"Using random port for mrun manager: {port}") loop.run_until_complete( manager( - url=url, port=port, builders=builder_objects, num_chunks=num_chunks, num_workers=num_workers + url=url, + port=port, + builders=builder_objects, + num_chunks=num_chunks, + num_workers=num_workers, ) ) else: # worker - loop.run_until_complete(worker(url=url, port=port, num_workers=num_workers)) + loop.run_until_complete( + worker(url=url, port=port, num_processes=num_processes) + ) else: if num_workers == 1: for builder in builder_objects: diff --git a/src/maggma/cli/distributed.py b/src/maggma/cli/distributed.py index baba10240..623b9ea6e 100644 --- a/src/maggma/cli/distributed.py +++ b/src/maggma/cli/distributed.py @@ -80,7 +80,7 @@ async def manager( socket.close() -async def worker(url: str, port: int, num_workers: int): +async def worker(url: str, port: int, num_processes: int): """ Simple distributed worker that connects to a manager asks for work and deploys using multiprocessing @@ -105,7 +105,7 @@ async def worker(url: str, port: int, num_workers: int): if "@class" in work and "@module" in work: # We have a valid builder builder = MontyDecoder().process_decoded(work) - await multi(builder, num_workers) + await multi(builder, num_processes) elif work == "EXIT": # End the worker running = False