From c0e8cb3183addd7a44eed693cd11597eca6c4100 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 5 May 2024 10:46:24 -0500 Subject: [PATCH 1/4] Remove ShellExecutor and SubprocessExecutor --- pympipool/shell/__init__.py | 2 - pympipool/shell/executor.py | 89 ---------------- pympipool/shell/interactive.py | 181 -------------------------------- tests/test_shell_executor.py | 86 --------------- tests/test_shell_interactive.py | 70 ------------ 5 files changed, 428 deletions(-) delete mode 100644 pympipool/shell/__init__.py delete mode 100644 pympipool/shell/executor.py delete mode 100644 pympipool/shell/interactive.py delete mode 100644 tests/test_shell_executor.py delete mode 100644 tests/test_shell_interactive.py diff --git a/pympipool/shell/__init__.py b/pympipool/shell/__init__.py deleted file mode 100644 index 3086c26f..00000000 --- a/pympipool/shell/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from pympipool.shell.executor import SubprocessExecutor -from pympipool.shell.interactive import ShellExecutor diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py deleted file mode 100644 index 92e4803a..00000000 --- a/pympipool/shell/executor.py +++ /dev/null @@ -1,89 +0,0 @@ -import queue -from concurrent.futures import Future -import subprocess - -from pympipool.shared.executorbase import ExecutorBroker -from pympipool.shared.thread import RaisingThread - - -def execute_single_task(future_queue: queue.Queue): - """ - Process items received via the queue. - - Args: - future_queue (queue.Queue): - """ - while True: - task_dict = future_queue.get() - if "shutdown" in task_dict.keys() and task_dict["shutdown"]: - future_queue.task_done() - future_queue.join() - break - elif "future" in task_dict.keys(): - f = task_dict.pop("future") - if f.set_running_or_notify_cancel(): - try: - f.set_result( - subprocess.check_output( - *task_dict["args"], **task_dict["kwargs"] - ) - ) - except Exception as thread_exception: - future_queue.task_done() - f.set_exception(exception=thread_exception) - raise thread_exception - else: - future_queue.task_done() - else: - raise KeyError(task_dict) - - -class SubprocessExecutor(ExecutorBroker): - """ - The pympipool.shell.SubprocessExecutor enables the submission of command line calls via the subprocess.check_output() - interface of the python standard library. It is based on the concurrent.futures.Executor class and returns a - concurrent.futures.Future object for every submitted command line call. Still it does not provide any option to - interact with the external executable during the execution. - - Args: - max_workers (int): defines the number workers which can execute functions in parallel - - Examples: - - >>> from pympipool import SubprocessExecutor - >>> with SubprocessExecutor(max_workers=2) as exe: - >>> future = exe.submit(["echo", "test"], universal_newlines=True) - >>> print(future.done(), future.result(), future.done()) - (False, "test", True) - - """ - - def __init__( - self, - max_workers: int = 1, - ): - super().__init__() - self._set_process( - process=[ - RaisingThread( - target=execute_single_task, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - }, - ) - for _ in range(max_workers) - ], - ) - - def submit(self, *args, **kwargs): - """ - Submit a command line call to be executed. The given arguments are provided to subprocess.Popen() as additional - inputs to control the execution. - - Returns: - A Future representing the given call. 
- """ - f = Future() - self._future_queue.put({"future": f, "args": args, "kwargs": kwargs}) - return f diff --git a/pympipool/shell/interactive.py b/pympipool/shell/interactive.py deleted file mode 100644 index 1a3dc6c4..00000000 --- a/pympipool/shell/interactive.py +++ /dev/null @@ -1,181 +0,0 @@ -import queue -import threading -from typing import Optional -from concurrent.futures import Future -import subprocess -from time import sleep - -from pympipool.shared.executorbase import cancel_items_in_queue, ExecutorBase -from pympipool.shared.thread import RaisingThread - - -def wait_for_process_to_stop(process: threading.Thread, sleep_interval: float = 10e-10): - """ - Wait for the subprocess.Popen() process to stop executing - - Args: - process (subprocess.Popen): process object - sleep_interval (float): interval to sleep during poll() calls - """ - while process.poll() is None: - sleep(sleep_interval) - - -def execute_single_task(future_queue: queue.Queue): - """ - Process items received via the queue. - - Args: - future_queue (queue.Queue): - """ - process = None - while True: - task_dict = future_queue.get() - if "shutdown" in task_dict.keys() and task_dict["shutdown"]: - if process is not None and process.poll() is None: - process.stdin.flush() - process.stdin.close() - process.stdout.close() - process.stderr.close() - process.terminate() - wait_for_process_to_stop(process=process) - future_queue.task_done() - # future_queue.join() - break - elif "init" in task_dict.keys() and task_dict["init"]: - process = subprocess.Popen( - *task_dict["args"], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **task_dict["kwargs"], - ) - elif "future" in task_dict.keys(): - if process is None: - raise ValueError("process not initialized") - elif process.poll() is None: - f = task_dict.pop("future") - if f.set_running_or_notify_cancel(): - try: - process.stdin.write(task_dict["input"]) - process.stdin.flush() - lines_count = 0 - output = "" - while True: - output_current = process.stdout.readline() - output += output_current - lines_count += 1 - if ( - task_dict["stop_read_pattern"] is not None - and task_dict["stop_read_pattern"] in output_current - ): - break - elif ( - task_dict["lines_to_read"] is not None - and task_dict["lines_to_read"] == lines_count - ): - break - f.set_result(output) - except Exception as thread_exception: - future_queue.task_done() - f.set_exception(exception=thread_exception) - raise thread_exception - else: - future_queue.task_done() - else: - raise ValueError("process exited") - - -class ShellExecutor(ExecutorBase): - """ - In contrast to the other pympipool.shell.SubprocessExecutor and the pympipool.Executor the pympipool.shell.ShellExecutor - can only execute a single process at a given time. Still it adds the capability to interact with this process during - its execution. The initialization of the pympipool.shell.ShellExecutor takes the same input arguments as the - subprocess.Popen() call for the standard library to start a subprocess. 
- - Examples - - >>> from pympipool import ShellExecutor - >>> with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe: - >>> future_lines = exe.submit(string_input="4", lines_to_read=5) - >>> print(future_lines.done(), future_lines.result(), future_lines.done()) - (False, "0\n1\n2\n3\ndone\n", True) - - >>> from pympipool import ShellExecutor - >>> with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe: - >>> future_pattern = exe.submit(string_input="4", stop_read_pattern="done") - >>> print(future_pattern.done(), future_pattern.result(), future_pattern.done()) - (False, "0\n1\n2\n3\ndone\n", True) - """ - - def __init__(self, *args, **kwargs): - super().__init__() - self._set_process( - process=RaisingThread( - target=execute_single_task, - kwargs={ - "future_queue": self._future_queue, - }, - ), - ) - self._future_queue.put({"init": True, "args": args, "kwargs": kwargs}) - - def submit( - self, - string_input: str, - lines_to_read: Optional[int] = None, - stop_read_pattern: Optional[str] = None, - ): - """ - Submit the input as a string to the executable. In addition to the input the ShellExecutor also needs a measure - to identify the completion of the execution. This can either be provided based on the number of lines to read - using the `lines_to_read` parameter or by providing a string pattern using the `stop_read_pattern` to stop - reading new lines. One of these two stopping criteria has to be defined. - - Args: - string_input (str): Input to be communicated to the underlying executable - lines_to_read (None/int): integer number of lines to read from the command line (optional) - stop_read_pattern (None/str): string pattern to indicate the command line output is completed (optional) - - Returns: - A Future representing the given call. - """ - if lines_to_read is None and stop_read_pattern is None: - raise ValueError( - "Either the number of lines_to_read (int) or the stop_read_pattern (str) has to be defined." - ) - if string_input[-1:] != "\n": - string_input += "\n" - f = Future() - self._future_queue.put( - { - "future": f, - "input": string_input, - "lines_to_read": lines_to_read, - "stop_read_pattern": stop_read_pattern, - } - ) - return f - - def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - parallel_executors have been reclaimed. - cancel_futures: If True then shutdown will cancel all pending - futures. Futures that are completed or running will not be - cancelled. 
- """ - if cancel_futures: - cancel_items_in_queue(que=self._future_queue) - self._future_queue.put({"shutdown": True, "wait": wait}) - if wait: - self._process.join() - # self._future_queue.join() - self._process = None - self._future_queue = None diff --git a/tests/test_shell_executor.py b/tests/test_shell_executor.py deleted file mode 100644 index d2b8c255..00000000 --- a/tests/test_shell_executor.py +++ /dev/null @@ -1,86 +0,0 @@ -from concurrent.futures import Future -import queue -import unittest - -from pympipool.shell.executor import SubprocessExecutor, execute_single_task - - -class SubprocessExecutorTest(unittest.TestCase): - def test_execute_single_task(self): - test_queue = queue.Queue() - f = Future() - test_queue.put( - { - "future": f, - "args": [["echo", "test"]], - "kwargs": {"universal_newlines": True}, - } - ) - test_queue.put({"shutdown": True}) - self.assertFalse(f.done()) - execute_single_task(future_queue=test_queue) - self.assertTrue(f.done()) - self.assertEqual("test\n", f.result()) - - def test_wrong_error(self): - test_queue = queue.Queue() - test_queue.put({"wrong_key": True}) - with self.assertRaises(KeyError): - execute_single_task(future_queue=test_queue) - - def test_broken_executable(self): - test_queue = queue.Queue() - f = Future() - test_queue.put( - { - "future": f, - "args": [["/executable/does/not/exist"]], - "kwargs": {"universal_newlines": True}, - } - ) - with self.assertRaises(FileNotFoundError): - execute_single_task(future_queue=test_queue) - - def test_shell_static_executor_args(self): - with SubprocessExecutor(max_workers=1) as exe: - future = exe.submit(["echo", "test"], universal_newlines=True, shell=False) - self.assertFalse(future.done()) - self.assertEqual("test\n", future.result()) - self.assertTrue(future.done()) - - def test_shell_static_executor_binary(self): - with SubprocessExecutor(max_workers=1) as exe: - future = exe.submit(["echo", "test"], universal_newlines=False, shell=False) - self.assertFalse(future.done()) - self.assertEqual(b"test\n", future.result()) - self.assertTrue(future.done()) - - def test_shell_static_executor_shell(self): - with SubprocessExecutor(max_workers=1) as exe: - future = exe.submit("echo test", universal_newlines=True, shell=True) - self.assertFalse(future.done()) - self.assertEqual("test\n", future.result()) - self.assertTrue(future.done()) - - def test_shell_executor(self): - with SubprocessExecutor(max_workers=2) as exe: - f_1 = exe.submit(["echo", "test_1"], universal_newlines=True) - f_2 = exe.submit(["echo", "test_2"], universal_newlines=True) - f_3 = exe.submit(["echo", "test_3"], universal_newlines=True) - f_4 = exe.submit(["echo", "test_4"], universal_newlines=True) - self.assertFalse(f_1.done()) - self.assertFalse(f_2.done()) - self.assertFalse(f_3.done()) - self.assertFalse(f_4.done()) - self.assertEqual("test_1\n", f_1.result()) - self.assertEqual("test_2\n", f_2.result()) - self.assertTrue(f_1.done()) - self.assertTrue(f_2.done()) - self.assertFalse(f_3.done()) - self.assertFalse(f_4.done()) - self.assertEqual("test_3\n", f_3.result()) - self.assertEqual("test_4\n", f_4.result()) - self.assertTrue(f_1.done()) - self.assertTrue(f_2.done()) - self.assertTrue(f_3.done()) - self.assertTrue(f_4.done()) diff --git a/tests/test_shell_interactive.py b/tests/test_shell_interactive.py deleted file mode 100644 index 7eee1ba0..00000000 --- a/tests/test_shell_interactive.py +++ /dev/null @@ -1,70 +0,0 @@ -from concurrent.futures import Future -import os -import queue -import unittest - -from 
pympipool.shell.interactive import ShellExecutor, execute_single_task - - -class ShellInteractiveExecutorTest(unittest.TestCase): - def setUp(self): - self.executable_path = os.path.join( - os.path.dirname(__file__), "executables", "count.py" - ) - - def test_execute_single_task(self): - test_queue = queue.Queue() - future_lines = Future() - future_pattern = Future() - test_queue.put( - { - "init": True, - "args": [["python", self.executable_path]], - "kwargs": {"universal_newlines": True}, - } - ) - test_queue.put( - { - "future": future_lines, - "input": "4\n", - "lines_to_read": 5, - "stop_read_pattern": None, - } - ) - test_queue.put( - { - "future": future_pattern, - "input": "4\n", - "lines_to_read": None, - "stop_read_pattern": "done", - } - ) - test_queue.put({"shutdown": True}) - self.assertFalse(future_lines.done()) - self.assertFalse(future_pattern.done()) - execute_single_task(future_queue=test_queue) - self.assertTrue(future_lines.done()) - self.assertTrue(future_pattern.done()) - self.assertEqual("0\n1\n2\n3\ndone\n", future_lines.result()) - self.assertEqual("0\n1\n2\n3\ndone\n", future_pattern.result()) - - def test_shell_interactive_executor(self): - with ShellExecutor( - ["python", self.executable_path], universal_newlines=True - ) as exe: - future_lines = exe.submit( - string_input="4", lines_to_read=5, stop_read_pattern=None - ) - future_pattern = exe.submit( - string_input="4", lines_to_read=None, stop_read_pattern="done" - ) - self.assertFalse(future_lines.done()) - self.assertFalse(future_pattern.done()) - self.assertEqual("0\n1\n2\n3\ndone\n", future_lines.result()) - self.assertEqual("0\n1\n2\n3\ndone\n", future_pattern.result()) - self.assertTrue(future_lines.done()) - self.assertTrue(future_pattern.done()) - - def test_meta(self): - with ShellExecutor(["sleep"]) as exe: - self.assertEqual(exe.info, {}) From c7de6064abbec999472b650eaf0051314969ef36 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 5 May 2024 12:59:22 -0500 Subject: [PATCH 2/4] Depend on executorlib_core --- .ci_support/environment-docs.yml | 1 + .ci_support/environment-mpich.yml | 1 + .ci_support/environment-old.yml | 1 + .ci_support/environment-openmpi.yml | 1 + .ci_support/environment-win.yml | 1 + binder/environment.yml | 1 + pympipool/backend/mpiexec.py | 2 +- pympipool/backend/serial.py | 2 +- pympipool/scheduler/flux.py | 4 +- pympipool/scheduler/mpi.py | 4 +- pympipool/scheduler/slurm.py | 6 +- pympipool/shared/__init__.py | 10 -- pympipool/shared/communication.py | 179 --------------------------- pympipool/shared/dependencies.py | 3 +- pympipool/shared/executorbase.py | 184 +--------------------------- pympipool/shared/inputcheck.py | 15 --- pyproject.toml | 1 + tests/test_shared_communication.py | 20 +-- tests/test_shared_input_check.py | 15 +-- tests/test_shared_thread.py | 15 --- 20 files changed, 28 insertions(+), 438 deletions(-) delete mode 100644 pympipool/shared/communication.py delete mode 100644 tests/test_shared_thread.py diff --git a/.ci_support/environment-docs.yml b/.ci_support/environment-docs.yml index b07fcece..c83c350c 100644 --- a/.ci_support/environment-docs.yml +++ b/.ci_support/environment-docs.yml @@ -8,6 +8,7 @@ dependencies: - numpy - openmpi - cloudpickle =3.0.0 +- executorlib-core =0.0.1 - mpi4py =3.1.6 - tqdm =4.66.2 - pyzmq =26.0.2 diff --git a/.ci_support/environment-mpich.yml b/.ci_support/environment-mpich.yml index 202a3b64..0fd232a6 100644 --- a/.ci_support/environment-mpich.yml +++ b/.ci_support/environment-mpich.yml @@ -5,6 +5,7 @@ dependencies: - 
numpy - mpich - cloudpickle =3.0.0 +- executorlib-core =0.0.1 - mpi4py =3.1.6 - tqdm =4.66.2 - pyzmq =26.0.2 diff --git a/.ci_support/environment-old.yml b/.ci_support/environment-old.yml index 161f7ae1..99f442a5 100644 --- a/.ci_support/environment-old.yml +++ b/.ci_support/environment-old.yml @@ -5,6 +5,7 @@ dependencies: - numpy - openmpi =4.1.4 - cloudpickle =2.0.0 +- executorlib-core =0.0.1 - mpi4py =3.1.4 - tqdm =4.44.0 - pyzmq =25.0.0 \ No newline at end of file diff --git a/.ci_support/environment-openmpi.yml b/.ci_support/environment-openmpi.yml index 3214294c..f8cab2de 100644 --- a/.ci_support/environment-openmpi.yml +++ b/.ci_support/environment-openmpi.yml @@ -5,6 +5,7 @@ dependencies: - numpy - openmpi - cloudpickle =3.0.0 +- executorlib-core =0.0.1 - mpi4py =3.1.6 - tqdm =4.66.2 - pyzmq =26.0.2 diff --git a/.ci_support/environment-win.yml b/.ci_support/environment-win.yml index 7551e67a..cde0a6cb 100644 --- a/.ci_support/environment-win.yml +++ b/.ci_support/environment-win.yml @@ -5,6 +5,7 @@ dependencies: - numpy - msmpi - cloudpickle =3.0.0 +- executorlib-core =0.0.1 - mpi4py =3.1.6 - tqdm =4.66.2 - pyzmq =26.0.2 diff --git a/binder/environment.yml b/binder/environment.yml index c36df88c..0cd72958 100644 --- a/binder/environment.yml +++ b/binder/environment.yml @@ -5,6 +5,7 @@ dependencies: - numpy - openmpi - cloudpickle =3.0.0 +- executorlib-core =0.0.1 - mpi4py =3.1.6 - tqdm =4.66.2 - pyzmq =26.0.0 diff --git a/pympipool/backend/mpiexec.py b/pympipool/backend/mpiexec.py index 44217cf0..b5fe1373 100644 --- a/pympipool/backend/mpiexec.py +++ b/pympipool/backend/mpiexec.py @@ -4,7 +4,7 @@ import cloudpickle -from pympipool.shared.communication import ( +from executorlib_core.communication import ( interface_connect, interface_send, interface_shutdown, diff --git a/pympipool/backend/serial.py b/pympipool/backend/serial.py index e6398fdb..e3c8a683 100644 --- a/pympipool/backend/serial.py +++ b/pympipool/backend/serial.py @@ -2,7 +2,7 @@ import sys from typing import Optional -from pympipool.shared.communication import ( +from executorlib_core.communication import ( interface_connect, interface_send, interface_shutdown, diff --git a/pympipool/scheduler/flux.py b/pympipool/scheduler/flux.py index 32389de4..3438e192 100644 --- a/pympipool/scheduler/flux.py +++ b/pympipool/scheduler/flux.py @@ -1,16 +1,16 @@ import os from typing import Optional +from executorlib_core.base import ExecutorBroker +from executorlib_core.thread import RaisingThread import flux.job from pympipool.shared.executorbase import ( execute_parallel_tasks, execute_separate_tasks, - ExecutorBroker, ExecutorSteps, ) from pympipool.shared.interface import BaseInterface -from pympipool.shared.thread import RaisingThread class PyFluxExecutor(ExecutorBroker): diff --git a/pympipool/scheduler/mpi.py b/pympipool/scheduler/mpi.py index 0cbec602..45e2eb6c 100644 --- a/pympipool/scheduler/mpi.py +++ b/pympipool/scheduler/mpi.py @@ -1,13 +1,13 @@ from typing import Optional +from executorlib_core.base import ExecutorBroker +from executorlib_core.thread import RaisingThread from pympipool.shared.executorbase import ( execute_parallel_tasks, execute_separate_tasks, - ExecutorBroker, ExecutorSteps, ) from pympipool.shared.interface import MpiExecInterface -from pympipool.shared.thread import RaisingThread class PyMPIExecutor(ExecutorBroker): diff --git a/pympipool/scheduler/slurm.py b/pympipool/scheduler/slurm.py index 4cddacea..f8b93f13 100644 --- a/pympipool/scheduler/slurm.py +++ b/pympipool/scheduler/slurm.py @@ -1,12 
+1,14 @@ from typing import Optional + +from executorlib_core.base import ExecutorBroker +from executorlib_core.thread import RaisingThread + from pympipool.shared.executorbase import ( execute_parallel_tasks, execute_separate_tasks, - ExecutorBroker, ExecutorSteps, ) from pympipool.shared.interface import SrunInterface -from pympipool.shared.thread import RaisingThread class PySlurmExecutor(ExecutorBroker): diff --git a/pympipool/shared/__init__.py b/pympipool/shared/__init__.py index 2daf2f44..424dc731 100644 --- a/pympipool/shared/__init__.py +++ b/pympipool/shared/__init__.py @@ -1,11 +1 @@ -from pympipool.shared.communication import ( - SocketInterface, - interface_bootup, - interface_connect, - interface_send, - interface_shutdown, - interface_receive, -) -from pympipool.shared.executorbase import cancel_items_in_queue -from pympipool.shared.thread import RaisingThread from pympipool.shared.interface import MpiExecInterface, SrunInterface diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py deleted file mode 100644 index 6d15a325..00000000 --- a/pympipool/shared/communication.py +++ /dev/null @@ -1,179 +0,0 @@ -from socket import gethostname - -import cloudpickle -import zmq - - -class SocketInterface(object): - """ - The SocketInterface is an abstraction layer on top of the zero message queue. - - Args: - interface (pympipool.shared.interface.BaseInterface): Interface for starting the parallel process - """ - - def __init__(self, interface=None): - self._context = zmq.Context() - self._socket = self._context.socket(zmq.PAIR) - self._process = None - self._interface = interface - - def send_dict(self, input_dict: dict): - """ - Send a dictionary with instructions to a connected client process. - - Args: - input_dict (dict): dictionary of commands to be communicated. The key "shutdown" is reserved to stop the - connected client from listening. - """ - self._socket.send(cloudpickle.dumps(input_dict)) - - def receive_dict(self): - """ - Receive a dictionary from a connected client process. - - Returns: - dict: dictionary with response received from the connected client - """ - output = cloudpickle.loads(self._socket.recv()) - if "result" in output.keys(): - return output["result"] - else: - error_type = output["error_type"].split("'")[1] - raise eval(error_type)(output["error"]) - - def send_and_receive_dict(self, input_dict: dict) -> dict: - """ - Combine both the send_dict() and receive_dict() function in a single call. - - Args: - input_dict (dict): dictionary of commands to be communicated. The key "shutdown" is reserved to stop the - connected client from listening. - - Returns: - dict: dictionary with response received from the connected client - """ - self.send_dict(input_dict=input_dict) - return self.receive_dict() - - def bind_to_random_port(self): - """ - Identify a random port typically in the range from 49152 to 65536 to bind the SocketInterface instance to. Other - processes can then connect to this port to receive instructions and send results. - - Returns: - int: port the SocketInterface instance is bound to. - """ - return self._socket.bind_to_random_port("tcp://*") - - def bootup(self, command_lst: list[str]): - """ - Boot up the client process to connect to the SocketInterface. 
- - Args: - command_lst (list): list of strings to start the client process - """ - self._interface.bootup(command_lst=command_lst) - - def shutdown(self, wait: bool = True): - result = None - if self._interface.poll(): - result = self.send_and_receive_dict( - input_dict={"shutdown": True, "wait": wait} - ) - self._interface.shutdown(wait=wait) - if self._socket is not None: - self._socket.close() - if self._context is not None: - self._context.term() - self._process = None - self._socket = None - self._context = None - return result - - def __del__(self): - self.shutdown(wait=True) - - -def interface_bootup( - command_lst: list[str], - connections, - hostname_localhost: bool = False, -): - """ - Start interface for ZMQ communication - - Args: - command_lst (list): List of commands as strings - connections (pympipool.shared.interface.BaseInterface): Interface to start parallel process, like MPI, SLURM or - Flux - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. So on MacOS it is required to set this - option to true - - Returns: - pympipool.shared.communication.SocketInterface: socket interface for zmq communication - """ - if not hostname_localhost: - command_lst += [ - "--host", - gethostname(), - ] - interface = SocketInterface(interface=connections) - command_lst += [ - "--zmqport", - str(interface.bind_to_random_port()), - ] - interface.bootup(command_lst=command_lst) - return interface - - -def interface_connect(host: str, port: str): - """ - Connect to an existing SocketInterface instance by providing the hostname and the port as strings. - - Args: - host (str): hostname of the host running the SocketInterface instance to connect to. - port (str): port on the host the SocketInterface instance is running on. - """ - context = zmq.Context() - socket = context.socket(zmq.PAIR) - socket.connect("tcp://" + host + ":" + port) - return context, socket - - -def interface_send(socket: zmq.Socket, result_dict: dict): - """ - Send results to a SocketInterface instance. - - Args: - socket (zmq.Socket): socket for the connection - result_dict (dict): dictionary to be sent, supported keys are result, error and error_type. - """ - socket.send(cloudpickle.dumps(result_dict)) - - -def interface_receive(socket: zmq.Socket): - """ - Receive instructions from a SocketInterface instance. - - Args: - socket (zmq.Socket): socket for the connection - """ - return cloudpickle.loads(socket.recv()) - - -def interface_shutdown(socket: zmq.Socket, context: zmq.Context): - """ - Close the connection to a SocketInterface instance. 
- - Args: - socket (zmq.Socket): socket for the connection - context (zmq.sugar.context.Context): context for the connection - """ - socket.close() - context.term() diff --git a/pympipool/shared/dependencies.py b/pympipool/shared/dependencies.py index d561902a..30cfcdf3 100644 --- a/pympipool/shared/dependencies.py +++ b/pympipool/shared/dependencies.py @@ -1,6 +1,7 @@ +from executorlib_core.thread import RaisingThread + from pympipool.scheduler import create_executor from pympipool.shared.executorbase import ExecutorSteps, execute_tasks_with_dependencies -from pympipool.shared.thread import RaisingThread class ExecutorWithDependencies(ExecutorSteps): diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index d95916c5..0be57298 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -1,151 +1,19 @@ -from concurrent.futures import ( - Executor as FutureExecutor, - Future, -) -import inspect +from concurrent.futures import Future import os import queue import sys from time import sleep from typing import Optional, List -import cloudpickle - -from pympipool.shared.communication import interface_bootup -from pympipool.shared.thread import RaisingThread -from pympipool.shared.interface import BaseInterface -from pympipool.shared.inputcheck import ( +from executorlib_core.communication import interface_bootup +from executorlib_core.thread import RaisingThread +from executorlib_core.inputcheck import ( check_resource_dict, check_resource_dict_is_empty, ) +from executorlib_core.base import ExecutorBase - -class ExecutorBase(FutureExecutor): - def __init__(self): - cloudpickle_register(ind=3) - self._future_queue = queue.Queue() - self._process = None - - @property - def info(self): - if self._process is not None and isinstance(self._process, list): - meta_data_dict = self._process[0]._kwargs.copy() - if "future_queue" in meta_data_dict.keys(): - del meta_data_dict["future_queue"] - meta_data_dict["max_workers"] = len(self._process) - return meta_data_dict - elif self._process is not None: - meta_data_dict = self._process._kwargs.copy() - if "future_queue" in meta_data_dict.keys(): - del meta_data_dict["future_queue"] - return meta_data_dict - else: - return None - - @property - def future_queue(self): - return self._future_queue - - def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): - """ - Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the callable. - - Args: - fn (callable): function to submit for execution - args: arguments for the submitted function - kwargs: keyword arguments for the submitted function - resource_dict (dict): resource dictionary, which defines the resources used for the execution of the - function. Example resource dictionary: { - cores: 1, - threads_per_core: 1, - gpus_per_worker: 0, - oversubscribe: False, - cwd: None, - executor: None, - hostname_localhost: False, - } - - Returns: - A Future representing the given call. - """ - check_resource_dict_is_empty(resource_dict=resource_dict) - check_resource_dict(function=fn) - f = Future() - self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) - return f - - def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): - """ - Clean-up the resources associated with the Executor. - - It is safe to call this method several times. 
Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - parallel_executors have been reclaimed. - cancel_futures: If True then shutdown will cancel all pending - futures. Futures that are completed or running will not be - cancelled. - """ - if cancel_futures: - cancel_items_in_queue(que=self._future_queue) - self._future_queue.put({"shutdown": True, "wait": wait}) - if wait and self._process is not None: - self._process.join() - self._future_queue.join() - self._process = None - self._future_queue = None - - def _set_process(self, process: RaisingThread): - self._process = process - self._process.start() - - def __len__(self): - return self._future_queue.qsize() - - def __del__(self): - try: - self.shutdown(wait=False) - except (AttributeError, RuntimeError): - pass - - -class ExecutorBroker(ExecutorBase): - def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - parallel_executors have been reclaimed. - cancel_futures: If True then shutdown will cancel all pending - futures. Futures that are completed or running will not be - cancelled. - """ - if cancel_futures: - cancel_items_in_queue(que=self._future_queue) - if self._process is not None: - for _ in range(len(self._process)): - self._future_queue.put({"shutdown": True, "wait": wait}) - if wait: - for process in self._process: - process.join() - self._future_queue.join() - self._process = None - self._future_queue = None - - def _set_process(self, process: List[RaisingThread]): - self._process = process - for process in self._process: - process.start() +from pympipool.shared.interface import BaseInterface class ExecutorSteps(ExecutorBase): @@ -212,46 +80,6 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): self._future_queue = None -def cancel_items_in_queue(que: queue.Queue): - """ - Cancel items which are still waiting in the queue. If the executor is busy tasks remain in the queue, so the future - objects have to be cancelled when the executor shuts down. - - Args: - que (queue.Queue): Queue with task objects which should be executed - """ - while True: - try: - item = que.get_nowait() - if isinstance(item, dict) and "future" in item.keys(): - item["future"].cancel() - que.task_done() - except queue.Empty: - break - - -def cloudpickle_register(ind: int = 2): - """ - Cloudpickle can either pickle by value or pickle by reference. The functions which are communicated have to - be pickled by value rather than by reference, so the module which calls the map function is pickled by value. 
- https://github.com/cloudpipe/cloudpickle#overriding-pickles-serialization-mechanism-for-importable-constructs - inspect can help to find the module which is calling pympipool - https://docs.python.org/3/library/inspect.html - to learn more about inspect another good read is: - http://pymotw.com/2/inspect/index.html#module-inspect - 1 refers to 1 level higher than the map function - - Args: - ind (int): index of the level at which pickle by value starts while for the rest pickle by reference is used - """ - try: # When executed in a jupyter notebook this can cause a ValueError - in this case we just ignore it. - cloudpickle.register_pickle_by_value(inspect.getmodule(inspect.stack()[ind][0])) - except IndexError: - cloudpickle_register(ind=ind - 1) - except ValueError: - pass - - def execute_parallel_tasks( future_queue: queue.Queue, cores: int, diff --git a/pympipool/shared/inputcheck.py b/pympipool/shared/inputcheck.py index 69e6f87e..834241c0 100644 --- a/pympipool/shared/inputcheck.py +++ b/pympipool/shared/inputcheck.py @@ -43,21 +43,6 @@ def check_executor(executor): ) -def check_resource_dict(function): - if "resource_dict" in inspect.signature(function).parameters.keys(): - raise ValueError( - "The parameter resource_dict is used internally in pympipool, " - "so it cannot be used as parameter in the submitted functions." - ) - - -def check_resource_dict_is_empty(resource_dict): - if len(resource_dict) > 0: - raise ValueError( - "When block_allocation is enabled, the resource requirements have to be defined on the executor level." - ) - - def check_refresh_rate(refresh_rate): if refresh_rate != 0.01: raise ValueError( diff --git a/pyproject.toml b/pyproject.toml index ddc04349..b6d19001 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ classifiers = [ ] dependencies = [ "cloudpickle==3.0.0", + "executorlib-core==0.0.1", "mpi4py==3.1.6", "pyzmq==26.0.2", "tqdm==4.66.2", diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py index 940b4ee9..ae3c38d6 100644 --- a/tests/test_shared_communication.py +++ b/tests/test_shared_communication.py @@ -3,16 +3,15 @@ import unittest import numpy as np -import zmq -from pympipool.shared.communication import ( +from executorlib_core.communication import ( interface_connect, interface_shutdown, interface_send, interface_receive, SocketInterface, ) -from pympipool.shared.executorbase import cloudpickle_register +from executorlib_core.base import cloudpickle_register from pympipool.shared.interface import MpiExecInterface @@ -43,18 +42,3 @@ def test_interface(self): interface.send_and_receive_dict(input_dict=task_dict), np.array(4) ) interface.shutdown(wait=True) - - -class TestZMQ(unittest.TestCase): - def test_initialize_zmq(self): - message = "test" - host = "localhost" - - context_server = zmq.Context() - socket_server = context_server.socket(zmq.PAIR) - port = str(socket_server.bind_to_random_port("tcp://*")) - context_client, socket_client = interface_connect(host=host, port=port) - interface_send(socket=socket_server, result_dict={"message": message}) - self.assertEqual(interface_receive(socket=socket_client), {"message": message}) - interface_shutdown(socket=socket_client, context=context_client) - interface_shutdown(socket=socket_server, context=context_server) diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py index bf1076f2..931e2b97 100644 --- a/tests/test_shared_input_check.py +++ b/tests/test_shared_input_check.py @@ -9,8 +9,6 @@ check_backend, 
check_init_function, check_refresh_rate, - check_resource_dict, - check_resource_dict_is_empty, ) @@ -45,15 +43,4 @@ def test_check_init_function(self): def test_check_refresh_rate(self): with self.assertRaises(ValueError): - check_refresh_rate(refresh_rate=1) - - def test_check_resource_dict(self): - def simple_function(resource_dict): - return resource_dict - - with self.assertRaises(ValueError): - check_resource_dict(function=simple_function) - - def test_check_resource_dict_is_empty(self): - with self.assertRaises(ValueError): - check_resource_dict_is_empty(resource_dict={"a": 1}) + check_refresh_rate(refresh_rate=1) \ No newline at end of file diff --git a/tests/test_shared_thread.py b/tests/test_shared_thread.py deleted file mode 100644 index 5f58fda2..00000000 --- a/tests/test_shared_thread.py +++ /dev/null @@ -1,15 +0,0 @@ -import unittest - -from pympipool.shared.thread import RaisingThread - - -def raise_error(): - raise ValueError - - -class TestRaisingThread(unittest.TestCase): - def test_raising_thread(self): - with self.assertRaises(ValueError): - process = RaisingThread(target=raise_error) - process.start() - process.join() From bcbfe83ffff75e0038bad350e52f6f2070efd0e1 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 5 May 2024 13:01:00 -0500 Subject: [PATCH 3/4] Remove shellexecutor and subprocessexecutor from init --- pympipool/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pympipool/__init__.py b/pympipool/__init__.py index f6e0ee3d..9c95d7d4 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -1,8 +1,6 @@ from typing import Optional from ._version import get_versions from pympipool.scheduler import create_executor -from pympipool.shell.executor import SubprocessExecutor -from pympipool.shell.interactive import ShellExecutor from pympipool.shared.dependencies import ExecutorWithDependencies from pympipool.shared.inputcheck import check_refresh_rate as _check_refresh_rate From e7e37db8b8d26c78b3fe431b5a07679610234a8f Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 5 May 2024 13:04:30 -0500 Subject: [PATCH 4/4] fix tests --- tests/test_dependencies_executor.py | 2 +- tests/test_executor_backend_mpi.py | 2 +- tests/test_executor_backend_mpi_noblock.py | 2 +- tests/test_flux_executor.py | 3 ++- tests/test_integration_pyiron_workflow.py | 2 +- tests/test_mpi_executor.py | 10 +++++----- tests/test_shared_executorbase.py | 23 ---------------------- 7 files changed, 11 insertions(+), 33 deletions(-) delete mode 100644 tests/test_shared_executorbase.py diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index de35d333..ae5644ba 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -2,7 +2,7 @@ from time import sleep from pympipool import Executor -from pympipool.shared.executorbase import cloudpickle_register +from executorlib_core.base import cloudpickle_register def add_function(parameter_1, parameter_2): diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 27798b5c..92186494 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -1,7 +1,7 @@ import unittest from pympipool import Executor -from pympipool.shared.executorbase import cloudpickle_register +from executorlib_core.base import cloudpickle_register def calc(i): diff --git a/tests/test_executor_backend_mpi_noblock.py b/tests/test_executor_backend_mpi_noblock.py index 246d3983..ea9c88fd 100644 --- 
a/tests/test_executor_backend_mpi_noblock.py +++ b/tests/test_executor_backend_mpi_noblock.py @@ -1,7 +1,7 @@ import unittest from pympipool import Executor -from pympipool.shared.executorbase import cloudpickle_register +from executorlib_core.base import cloudpickle_register def calc(i): diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index 2861ebdc..9a109d9e 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -4,8 +4,9 @@ import unittest import numpy as np +from executorlib_core.base import cloudpickle_register -from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks +from pympipool.shared.executorbase import execute_parallel_tasks try: diff --git a/tests/test_integration_pyiron_workflow.py b/tests/test_integration_pyiron_workflow.py index e9d101f2..77f151e1 100644 --- a/tests/test_integration_pyiron_workflow.py +++ b/tests/test_integration_pyiron_workflow.py @@ -12,7 +12,7 @@ import unittest from pympipool import Executor -from pympipool.shared.executorbase import cloudpickle_register +from executorlib_core.base import cloudpickle_register class Foo: diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 35b26431..07c3438f 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -4,15 +4,15 @@ import unittest import numpy as np - -from pympipool.scheduler.mpi import PyMPIExecutor, PyMPIStepExecutor, MpiExecInterface -from pympipool.shared.backend import call_funct -from pympipool.shared.executorbase import ( +from executorlib_core.base import ( cloudpickle_register, - execute_parallel_tasks, ExecutorBase, ) +from pympipool.scheduler.mpi import PyMPIExecutor, PyMPIStepExecutor, MpiExecInterface +from pympipool.shared.backend import call_funct +from pympipool.shared.executorbase import execute_parallel_tasks + def calc(i): return i diff --git a/tests/test_shared_executorbase.py b/tests/test_shared_executorbase.py deleted file mode 100644 index 348fe59e..00000000 --- a/tests/test_shared_executorbase.py +++ /dev/null @@ -1,23 +0,0 @@ -from concurrent.futures import Future, CancelledError -from queue import Queue -import unittest - -from pympipool.shared.executorbase import cancel_items_in_queue - - -class TestQueue(unittest.TestCase): - def test_cancel_items_in_queue(self): - q = Queue() - fs1 = Future() - fs2 = Future() - q.put({"future": fs1}) - q.put({"future": fs2}) - cancel_items_in_queue(que=q) - self.assertEqual(q.qsize(), 0) - self.assertTrue(fs1.done()) - with self.assertRaises(CancelledError): - self.assertTrue(fs1.result()) - self.assertTrue(fs2.done()) - with self.assertRaises(CancelledError): - self.assertTrue(fs2.result()) - q.join()
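
The first patch removes pympipool.shell.SubprocessExecutor without a drop-in replacement inside pympipool itself. For code that relied on it, the same submit-a-command-and-get-a-Future behavior is available from the Python standard library alone. A minimal sketch, illustration only and not part of the patch series or any pympipool API:

    # Stand-in for the removed SubprocessExecutor: each submitted command is
    # executed via subprocess.check_output() and wrapped in a Future by
    # concurrent.futures.ThreadPoolExecutor.
    from concurrent.futures import ThreadPoolExecutor
    import subprocess


    def run_command(*args, **kwargs):
        # Forward the same positional and keyword arguments the removed
        # executor passed through to subprocess.check_output().
        return subprocess.check_output(*args, **kwargs)


    with ThreadPoolExecutor(max_workers=2) as exe:
        future = exe.submit(run_command, ["echo", "test"], universal_newlines=True)
        print(future.done())    # may be False while the command is still running
        print(future.result())  # "test\n"

Unlike the removed implementation, this sketch does not re-raise worker-side errors in a background RaisingThread; a failing check_output() call simply propagates through future.result().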
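
The interactive ShellExecutor removed in the same patch can likewise be approximated with subprocess.Popen directly: write input to the process's stdin and read stdout until a stop pattern appears, mirroring the stop_read_pattern logic of the deleted execute_single_task(). A minimal sketch, assuming the counting script used by the removed tests (tests/executables/count.py) is still available:

    # Stand-in for the removed interactive ShellExecutor: communicate with a
    # long-running process over its pipes instead of through a Future queue.
    import subprocess

    proc = subprocess.Popen(
        ["python", "count.py"],  # counting script from the removed test fixtures
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    proc.stdin.write("4\n")
    proc.stdin.flush()
    output = ""
    while True:
        line = proc.stdout.readline()
        output += line
        if "done" in line:  # stop pattern, as in stop_read_pattern="done"
            break
    print(output)  # "0\n1\n2\n3\ndone\n"
    proc.stdin.close()
    proc.terminate()
    proc.wait()

This trades the Future-based interface for direct pipe handling, which is the essence of what the deleted module wrapped.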