From 43c933021692d9c055dd28ef2f96e1d9ea331245 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 11:16:57 -0600 Subject: [PATCH] Improve the code quality --- pympipool/interfaces/fluxbroker.py | 53 +++++++++++------------- pympipool/interfaces/taskbroker.py | 2 +- pympipool/interfaces/taskexecutor.py | 2 +- pympipool/legacy/interfaces/executor.py | 2 +- pympipool/legacy/shared/backend.py | 4 +- pympipool/shared/backend.py | 4 +- pympipool/{interfaces => shared}/base.py | 3 ++ pympipool/shared/broker.py | 10 ++--- pympipool/shared/communication.py | 4 +- pympipool/shared/taskexecutor.py | 4 +- tests/test_meta.py | 12 +++--- 11 files changed, 49 insertions(+), 51 deletions(-) rename pympipool/{interfaces => shared}/base.py (96%) diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py index 69663c10..19bc9b81 100644 --- a/pympipool/interfaces/fluxbroker.py +++ b/pympipool/interfaces/fluxbroker.py @@ -5,14 +5,14 @@ from time import sleep from pympipool.shared.broker import ( - _get_future_done, - _execute_task_dict, + get_future_done, + execute_task_dict, ) -from pympipool.interfaces.base import ExecutorBase +from pympipool.shared.base import ExecutorBase from pympipool.shared.thread import RaisingThread from pympipool.shared.taskexecutor import ( cloudpickle_register, - _execute_parallel_tasks_loop, + execute_parallel_tasks_loop, ) from pympipool.shared.connections import FluxPythonInterface from pympipool.shared.communication import SocketInterface @@ -29,33 +29,28 @@ class SingleTaskExecutor(ExecutorBase): Args: cores (int): defines the number of MPI ranks to use for each function call gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter Simple example: ``` - import numpy as np - from pympipool import Executor - - def calc(i, j, k): - from mpi4py import MPI - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - return np.array([i, j, k]), size, rank - - def init_k(): - return {"k": 3} - - with Executor(cores=2, init_function=init_k) as p: - fs = p.submit(calc, 2, j=4) - print(fs.result()) - - >>> [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + >>> import numpy as np + >>> from pympipool import Executor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> def init_k(): + >>> return {"k": 3} + >>> + >>> with Executor(cores=2, init_function=init_k) as p: + >>> fs = p.submit(calc, 2, j=4) + >>> print(fs.result()) + + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] ``` """ @@ -149,7 +144,7 @@ def execute_parallel_tasks( gpus_per_core=gpus_per_task, executor=executor, ) - _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) + execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) def interface_bootup( @@ -203,7 +198,7 @@ def executor_broker( except queue.Empty: sleep(sleep_interval) else: - if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): + if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): future_queue.task_done() else: future_queue.task_done() @@ -219,7 +214,7 @@ def _get_executor_list( executor=None, ): return { - _get_future_done(): SingleTaskExecutor( + get_future_done(): SingleTaskExecutor( cores=cores_per_worker, gpus_per_task=int(gpus_per_worker / cores_per_worker), init_function=init_function, diff --git a/pympipool/interfaces/taskbroker.py b/pympipool/interfaces/taskbroker.py index 23b4c3c9..1b962f9d 100644 --- a/pympipool/interfaces/taskbroker.py +++ b/pympipool/interfaces/taskbroker.py @@ -1,4 +1,4 @@ -from pympipool.interfaces.base import ExecutorBase +from pympipool.shared.base import ExecutorBase from pympipool.shared.thread import RaisingThread from pympipool.shared.broker import executor_broker diff --git a/pympipool/interfaces/taskexecutor.py b/pympipool/interfaces/taskexecutor.py index 5fae9c43..4315e309 100644 --- a/pympipool/interfaces/taskexecutor.py +++ b/pympipool/interfaces/taskexecutor.py @@ -1,4 +1,4 @@ -from pympipool.interfaces.base import ExecutorBase +from pympipool.shared.base import ExecutorBase from pympipool.shared.thread import RaisingThread from pympipool.shared.taskexecutor import ( execute_parallel_tasks, diff --git a/pympipool/legacy/interfaces/executor.py b/pympipool/legacy/interfaces/executor.py index 56b2905e..62c62fb6 100644 --- a/pympipool/legacy/interfaces/executor.py +++ b/pympipool/legacy/interfaces/executor.py @@ -1,4 +1,4 @@ -from pympipool.interfaces.base import ExecutorBase +from pympipool.shared.base import ExecutorBase from pympipool.shared.thread import RaisingThread from pympipool.legacy.shared.interface import execute_serial_tasks from pympipool.shared.taskexecutor import cloudpickle_register diff --git a/pympipool/legacy/shared/backend.py b/pympipool/legacy/shared/backend.py index 854297ad..6ab70d36 100644 --- a/pympipool/legacy/shared/backend.py +++ b/pympipool/legacy/shared/backend.py @@ -1,6 +1,6 @@ from tqdm import tqdm -from pympipool.shared.backend import call_funct, _update_default_dict_from_arguments +from pympipool.shared.backend import call_funct, update_default_dict_from_arguments def map_funct(executor, funct, lst, chunksize=1, cores_per_task=1, map_flag=True): @@ -42,7 +42,7 @@ def parse_arguments(argument_lst): Returns: dict: dictionary with the parsed arguments and their corresponding values """ - return _update_default_dict_from_arguments( + return update_default_dict_from_arguments( argument_lst=argument_lst, argument_dict={ "total_cores": "--cores-total", diff --git a/pympipool/shared/backend.py b/pympipool/shared/backend.py index 9821e750..5c917df9 100644 --- a/pympipool/shared/backend.py +++ b/pympipool/shared/backend.py @@ -40,7 +40,7 @@ def parse_arguments(argument_lst): Returns: dict: dictionary with the parsed arguments and their corresponding values """ - return _update_default_dict_from_arguments( + return update_default_dict_from_arguments( argument_lst=argument_lst, argument_dict={ "zmqport": "--zmqport", @@ -50,7 +50,7 @@ def parse_arguments(argument_lst): ) -def _update_default_dict_from_arguments(argument_lst, argument_dict, default_dict): +def update_default_dict_from_arguments(argument_lst, argument_dict, default_dict): default_dict.update( { k: argument_lst[argument_lst.index(v) + 1] diff --git a/pympipool/interfaces/base.py b/pympipool/shared/base.py similarity index 96% rename from pympipool/interfaces/base.py rename to pympipool/shared/base.py index 0a95e895..bba80301 100644 --- a/pympipool/interfaces/base.py +++ b/pympipool/shared/base.py @@ -9,6 +9,9 @@ def __init__(self): self._future_queue = queue.Queue() self._process = None + def future_queue(self): + return self._future_queue + def submit(self, fn, *args, **kwargs): """Submits a callable to be executed with the given arguments. diff --git a/pympipool/shared/broker.py b/pympipool/shared/broker.py index bf7790d8..f2c01091 100644 --- a/pympipool/shared/broker.py +++ b/pympipool/shared/broker.py @@ -37,18 +37,18 @@ def executor_broker( except queue.Empty: sleep(sleep_interval) else: - if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): + if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): future_queue.task_done() else: future_queue.task_done() break -def _execute_task_dict(task_dict, meta_future_lst): +def execute_task_dict(task_dict, meta_future_lst): if "fn" in task_dict.keys(): meta_future = next(as_completed(meta_future_lst.keys())) executor = meta_future_lst.pop(meta_future) - executor._future_queue.put(task_dict) + executor.future_queue.put(task_dict) meta_future_lst[task_dict["future"]] = executor return True elif "shutdown" in task_dict.keys() and task_dict["shutdown"]: @@ -72,7 +72,7 @@ def _get_executor_list( queue_adapter_kwargs=None, ): return { - _get_future_done(): Executor( + get_future_done(): Executor( cores=cores_per_worker, gpus_per_task=int(gpus_per_worker / cores_per_worker), oversubscribe=oversubscribe, @@ -87,7 +87,7 @@ def _get_executor_list( } -def _get_future_done(): +def get_future_done(): f = Future() f.set_result(True) return f diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py index 35969c33..09fecb6c 100644 --- a/pympipool/shared/communication.py +++ b/pympipool/shared/communication.py @@ -1,5 +1,5 @@ import cloudpickle -import socket +from socket import gethostname import zmq from pympipool.shared.connections import get_connection_interface @@ -111,7 +111,7 @@ def interface_bootup( if enable_flux_backend or enable_slurm_backend or queue_adapter is not None: command_lst += [ "--host", - socket.gethostname(), + gethostname(), ] connections = get_connection_interface( cwd=cwd, diff --git a/pympipool/shared/taskexecutor.py b/pympipool/shared/taskexecutor.py index 8ee7367d..4bffcd2b 100644 --- a/pympipool/shared/taskexecutor.py +++ b/pympipool/shared/taskexecutor.py @@ -88,10 +88,10 @@ def execute_parallel_tasks( queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs, ) - _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) + execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) -def _execute_parallel_tasks_loop(interface, future_queue): +def execute_parallel_tasks_loop(interface, future_queue): while True: task_dict = future_queue.get() if "shutdown" in task_dict.keys() and task_dict["shutdown"]: diff --git a/tests/test_meta.py b/tests/test_meta.py index ef72853c..d4351a44 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -3,8 +3,8 @@ import unittest from pympipool.shared.broker import ( executor_broker, - _execute_task_dict, - _get_future_done, + execute_task_dict, + get_future_done, _get_executor_list, ) @@ -24,7 +24,7 @@ def mpi_funct(i): class TestFutureCreation(unittest.TestCase): def test_get_future_done(self): - f = _get_future_done() + f = get_future_done() self.assertTrue(isinstance(f, Future)) self.assertTrue(f.done()) @@ -43,13 +43,13 @@ def test_meta_executor_future(self): def test_execute_task_dict(self): meta_future_lst = _get_executor_list(max_workers=1) f = Future() - self.assertTrue(_execute_task_dict( + self.assertTrue(execute_task_dict( task_dict={"fn": calc, "args": (1,), "kwargs": {}, "future": f}, meta_future_lst=meta_future_lst )) self.assertEqual(f.result(), 1) self.assertTrue(f.done()) - self.assertFalse(_execute_task_dict( + self.assertFalse(execute_task_dict( task_dict={"shutdown": True, "wait": True}, meta_future_lst=meta_future_lst )) @@ -57,7 +57,7 @@ def test_execute_task_dict(self): def test_execute_task_dict_error(self): meta_future_lst = _get_executor_list(max_workers=1) with self.assertRaises(ValueError): - _execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst) + execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst) list(meta_future_lst.values())[0].shutdown(wait=True) def test_executor_broker(self):