From 5288c91eb921913c228c745823844ff04e3394af Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 11:28:28 -0600 Subject: [PATCH 1/4] Import gethostname() directly --- pympipool/interfaces/fluxbroker.py | 4 ++-- pympipool/shared/communication.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py index 69663c10..83cca5ba 100644 --- a/pympipool/interfaces/fluxbroker.py +++ b/pympipool/interfaces/fluxbroker.py @@ -1,6 +1,6 @@ import os import queue -import socket +from socket import gethostname import sys from time import sleep @@ -161,7 +161,7 @@ def interface_bootup( ): command_lst += [ "--host", - socket.gethostname(), + gethostname(), ] connections = FluxPythonInterface( cwd=cwd, diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py index 35969c33..09fecb6c 100644 --- a/pympipool/shared/communication.py +++ b/pympipool/shared/communication.py @@ -1,5 +1,5 @@ import cloudpickle -import socket +from socket import gethostname import zmq from pympipool.shared.connections import get_connection_interface @@ -111,7 +111,7 @@ def interface_bootup( if enable_flux_backend or enable_slurm_backend or queue_adapter is not None: command_lst += [ "--host", - socket.gethostname(), + gethostname(), ] connections = get_connection_interface( cwd=cwd, From 47d77ab48604abdc92f37b465ce0789cff3c1cf4 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 11:34:41 -0600 Subject: [PATCH 2/4] Code improvements --- pympipool/interfaces/base.py | 4 ++++ pympipool/interfaces/fluxbroker.py | 17 ++++++----------- pympipool/legacy/shared/backend.py | 4 ++-- pympipool/shared/backend.py | 4 ++-- pympipool/shared/broker.py | 6 +++--- pympipool/shared/taskexecutor.py | 4 ++-- 6 files changed, 19 insertions(+), 20 deletions(-) diff --git a/pympipool/interfaces/base.py b/pympipool/interfaces/base.py index 0a95e895..30cdfa15 100644 --- a/pympipool/interfaces/base.py +++ b/pympipool/interfaces/base.py @@ -9,6 +9,10 @@ def __init__(self): self._future_queue = queue.Queue() self._process = None + @property + def future_queue(self): + return self._future_queue + def submit(self, fn, *args, **kwargs): """Submits a callable to be executed with the given arguments. diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py index 83cca5ba..e1611839 100644 --- a/pympipool/interfaces/fluxbroker.py +++ b/pympipool/interfaces/fluxbroker.py @@ -5,14 +5,14 @@ from time import sleep from pympipool.shared.broker import ( - _get_future_done, - _execute_task_dict, + get_future_done, + execute_task_dict, ) from pympipool.interfaces.base import ExecutorBase from pympipool.shared.thread import RaisingThread from pympipool.shared.taskexecutor import ( cloudpickle_register, - _execute_parallel_tasks_loop, + execute_parallel_tasks_loop, ) from pympipool.shared.connections import FluxPythonInterface from pympipool.shared.communication import SocketInterface @@ -29,13 +29,8 @@ class SingleTaskExecutor(ExecutorBase): Args: cores (int): defines the number of MPI ranks to use for each function call gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter Simple example: ``` @@ -149,7 +144,7 @@ def execute_parallel_tasks( gpus_per_core=gpus_per_task, executor=executor, ) - _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) + execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) def interface_bootup( @@ -203,7 +198,7 @@ def executor_broker( except queue.Empty: sleep(sleep_interval) else: - if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): + if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): future_queue.task_done() else: future_queue.task_done() @@ -219,7 +214,7 @@ def _get_executor_list( executor=None, ): return { - _get_future_done(): SingleTaskExecutor( + get_future_done(): SingleTaskExecutor( cores=cores_per_worker, gpus_per_task=int(gpus_per_worker / cores_per_worker), init_function=init_function, diff --git a/pympipool/legacy/shared/backend.py b/pympipool/legacy/shared/backend.py index 854297ad..6ab70d36 100644 --- a/pympipool/legacy/shared/backend.py +++ b/pympipool/legacy/shared/backend.py @@ -1,6 +1,6 @@ from tqdm import tqdm -from pympipool.shared.backend import call_funct, _update_default_dict_from_arguments +from pympipool.shared.backend import call_funct, update_default_dict_from_arguments def map_funct(executor, funct, lst, chunksize=1, cores_per_task=1, map_flag=True): @@ -42,7 +42,7 @@ def parse_arguments(argument_lst): Returns: dict: dictionary with the parsed arguments and their corresponding values """ - return _update_default_dict_from_arguments( + return update_default_dict_from_arguments( argument_lst=argument_lst, argument_dict={ "total_cores": "--cores-total", diff --git a/pympipool/shared/backend.py b/pympipool/shared/backend.py index 9821e750..5c917df9 100644 --- a/pympipool/shared/backend.py +++ b/pympipool/shared/backend.py @@ -40,7 +40,7 @@ def parse_arguments(argument_lst): Returns: dict: dictionary with the parsed arguments and their corresponding values """ - return _update_default_dict_from_arguments( + return update_default_dict_from_arguments( argument_lst=argument_lst, argument_dict={ "zmqport": "--zmqport", @@ -50,7 +50,7 @@ def parse_arguments(argument_lst): ) -def _update_default_dict_from_arguments(argument_lst, argument_dict, default_dict): +def update_default_dict_from_arguments(argument_lst, argument_dict, default_dict): default_dict.update( { k: argument_lst[argument_lst.index(v) + 1] diff --git a/pympipool/shared/broker.py b/pympipool/shared/broker.py index bf7790d8..bb7d1fd0 100644 --- a/pympipool/shared/broker.py +++ b/pympipool/shared/broker.py @@ -48,7 +48,7 @@ def _execute_task_dict(task_dict, meta_future_lst): if "fn" in task_dict.keys(): meta_future = next(as_completed(meta_future_lst.keys())) executor = meta_future_lst.pop(meta_future) - executor._future_queue.put(task_dict) + executor.future_queue.put(task_dict) meta_future_lst[task_dict["future"]] = executor return True elif "shutdown" in task_dict.keys() and task_dict["shutdown"]: @@ -72,7 +72,7 @@ def _get_executor_list( queue_adapter_kwargs=None, ): return { - _get_future_done(): Executor( + get_future_done(): Executor( cores=cores_per_worker, gpus_per_task=int(gpus_per_worker / cores_per_worker), oversubscribe=oversubscribe, @@ -87,7 +87,7 @@ def _get_executor_list( } -def _get_future_done(): +def get_future_done(): f = Future() f.set_result(True) return f diff --git a/pympipool/shared/taskexecutor.py b/pympipool/shared/taskexecutor.py index 8ee7367d..4bffcd2b 100644 --- a/pympipool/shared/taskexecutor.py +++ b/pympipool/shared/taskexecutor.py @@ -88,10 +88,10 @@ def execute_parallel_tasks( queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs, ) - _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) + execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) -def _execute_parallel_tasks_loop(interface, future_queue): +def execute_parallel_tasks_loop(interface, future_queue): while True: task_dict = future_queue.get() if "shutdown" in task_dict.keys() and task_dict["shutdown"]: From 170f028d36602305109402e5cd9e69c16ec55644 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 11:35:48 -0600 Subject: [PATCH 3/4] fix test --- tests/test_meta.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_meta.py b/tests/test_meta.py index ef72853c..f9cb8714 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -4,7 +4,7 @@ from pympipool.shared.broker import ( executor_broker, _execute_task_dict, - _get_future_done, + get_future_done, _get_executor_list, ) @@ -24,7 +24,7 @@ def mpi_funct(i): class TestFutureCreation(unittest.TestCase): def test_get_future_done(self): - f = _get_future_done() + f = get_future_done() self.assertTrue(isinstance(f, Future)) self.assertTrue(f.done()) From 0102bf1524941db56ed8b49dc2c86446a9d95fdd Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 11:37:01 -0600 Subject: [PATCH 4/4] More fixes --- pympipool/shared/broker.py | 4 ++-- tests/test_meta.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pympipool/shared/broker.py b/pympipool/shared/broker.py index bb7d1fd0..f2c01091 100644 --- a/pympipool/shared/broker.py +++ b/pympipool/shared/broker.py @@ -37,14 +37,14 @@ def executor_broker( except queue.Empty: sleep(sleep_interval) else: - if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): + if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): future_queue.task_done() else: future_queue.task_done() break -def _execute_task_dict(task_dict, meta_future_lst): +def execute_task_dict(task_dict, meta_future_lst): if "fn" in task_dict.keys(): meta_future = next(as_completed(meta_future_lst.keys())) executor = meta_future_lst.pop(meta_future) diff --git a/tests/test_meta.py b/tests/test_meta.py index f9cb8714..d4351a44 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -3,7 +3,7 @@ import unittest from pympipool.shared.broker import ( executor_broker, - _execute_task_dict, + execute_task_dict, get_future_done, _get_executor_list, ) @@ -43,13 +43,13 @@ def test_meta_executor_future(self): def test_execute_task_dict(self): meta_future_lst = _get_executor_list(max_workers=1) f = Future() - self.assertTrue(_execute_task_dict( + self.assertTrue(execute_task_dict( task_dict={"fn": calc, "args": (1,), "kwargs": {}, "future": f}, meta_future_lst=meta_future_lst )) self.assertEqual(f.result(), 1) self.assertTrue(f.done()) - self.assertFalse(_execute_task_dict( + self.assertFalse(execute_task_dict( task_dict={"shutdown": True, "wait": True}, meta_future_lst=meta_future_lst )) @@ -57,7 +57,7 @@ def test_execute_task_dict(self): def test_execute_task_dict_error(self): meta_future_lst = _get_executor_list(max_workers=1) with self.assertRaises(ValueError): - _execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst) + execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst) list(meta_future_lst.values())[0].shutdown(wait=True) def test_executor_broker(self):