From 3907f08eefd8abe5af5dd10bb950dd8245095778 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 18 Feb 2024 11:12:44 +0100 Subject: [PATCH] Implement a Executor._set_process() function to set the process --- pympipool/flux/executor.py | 66 ++++++++++++++++---------------- pympipool/mpi/executor.py | 58 ++++++++++++++-------------- pympipool/shared/executorbase.py | 4 ++ pympipool/shell/executor.py | 32 ++++++++-------- pympipool/shell/interactive.py | 13 ++++--- pympipool/slurm/executor.py | 66 ++++++++++++++++---------------- 6 files changed, 126 insertions(+), 113 deletions(-) diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index 31a724ed..e9c27b6f 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -68,24 +68,25 @@ def __init__( hostname_localhost=False, ): super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "hostname_localhost": hostname_localhost, - "executor_class": PyFluxSingleTaskExecutor, - # Executor Arguments - "cores": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_task": int(gpus_per_worker / cores_per_worker), - "init_function": init_function, - "cwd": cwd, - "executor": executor, - }, + self._set_process( + process=RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "hostname_localhost": hostname_localhost, + "executor_class": PyFluxSingleTaskExecutor, + # Executor Arguments + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "init_function": init_function, + "cwd": cwd, + "executor": executor, + }, + ) ) - self._process.start() class PyFluxSingleTaskExecutor(ExecutorBase): @@ -120,22 +121,23 @@ def __init__( hostname_localhost=False, ): super().__init__() - self._process = RaisingThread( - target=execute_parallel_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores, - "interface_class": FluxPythonInterface, - "hostname_localhost": hostname_localhost, - # Interface Arguments - "threads_per_core": threads_per_core, - "gpus_per_core": gpus_per_task, - "cwd": cwd, - "executor": executor, - }, + self._set_process( + process=RaisingThread( + target=execute_parallel_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores, + "interface_class": FluxPythonInterface, + "hostname_localhost": hostname_localhost, + # Interface Arguments + "threads_per_core": threads_per_core, + "gpus_per_core": gpus_per_task, + "cwd": cwd, + "executor": executor, + }, + ) ) - self._process.start() self._set_init_function(init_function=init_function) cloudpickle_register(ind=3) diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index eefa2431..aef34a9e 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -61,22 +61,23 @@ def __init__( hostname_localhost=False, ): super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "executor_class": PyMPISingleTaskExecutor, - "hostname_localhost": hostname_localhost, - # Executor Arguments - "cores": cores_per_worker, - "oversubscribe": oversubscribe, - "init_function": init_function, - "cwd": cwd, - }, + self._set_process( + process=RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "executor_class": PyMPISingleTaskExecutor, + "hostname_localhost": hostname_localhost, + # Executor Arguments + "cores": cores_per_worker, + "oversubscribe": oversubscribe, + "init_function": init_function, + "cwd": cwd, + }, + ), ) - self._process.start() class PyMPISingleTaskExecutor(ExecutorBase): @@ -107,19 +108,20 @@ def __init__( hostname_localhost=False, ): super().__init__() - self._process = RaisingThread( - target=execute_parallel_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores, - "interface_class": MpiExecInterface, - # Interface Arguments - "cwd": cwd, - "oversubscribe": oversubscribe, - "hostname_localhost": hostname_localhost, - }, + self._set_process( + process=RaisingThread( + target=execute_parallel_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores, + "interface_class": MpiExecInterface, + # Interface Arguments + "cwd": cwd, + "oversubscribe": oversubscribe, + "hostname_localhost": hostname_localhost, + }, + ) ) - self._process.start() self._set_init_function(init_function=init_function) cloudpickle_register(ind=3) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index b0bc3cd7..cf774d0b 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -73,6 +73,10 @@ def _set_init_function(self, init_function): {"init": True, "fn": init_function, "args": (), "kwargs": {}} ) + def _set_process(self, process): + self._process = process + self._process.start() + def cancel_items_in_queue(que): """ diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py index 8be8ada7..cabc273c 100644 --- a/pympipool/shell/executor.py +++ b/pympipool/shell/executor.py @@ -44,13 +44,14 @@ class SubprocessSingleExecutor(ExecutorBase): def __init__(self): super().__init__() - self._process = RaisingThread( - target=execute_single_task, - kwargs={ - "future_queue": self._future_queue, - }, + self._set_process( + process=RaisingThread( + target=execute_single_task, + kwargs={ + "future_queue": self._future_queue, + }, + ), ) - self._process.start() def submit(self, *args, **kwargs): f = Future() @@ -83,16 +84,17 @@ def __init__( max_workers=1, ): super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "executor_class": SubprocessSingleExecutor, - }, + self._set_process( + process=RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "executor_class": SubprocessSingleExecutor, + }, + ), ) - self._process.start() def submit(self, *args, **kwargs): """ diff --git a/pympipool/shell/interactive.py b/pympipool/shell/interactive.py index b155577b..adef58b3 100644 --- a/pympipool/shell/interactive.py +++ b/pympipool/shell/interactive.py @@ -107,13 +107,14 @@ class ShellExecutor(ExecutorBase): def __init__(self, *args, **kwargs): super().__init__() - self._process = RaisingThread( - target=execute_single_task, - kwargs={ - "future_queue": self._future_queue, - }, + self._set_process( + process=RaisingThread( + target=execute_single_task, + kwargs={ + "future_queue": self._future_queue, + }, + ), ) - self._process.start() self._future_queue.put({"init": True, "args": args, "kwargs": kwargs}) def submit(self, string_input, lines_to_read=None, stop_read_pattern=None): diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index 4824441f..92c6d994 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -65,24 +65,25 @@ def __init__( hostname_localhost=False, ): super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "hostname_localhost": hostname_localhost, - "executor_class": PySlurmSingleTaskExecutor, - # Executor Arguments - "cores": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_task": int(gpus_per_worker / cores_per_worker), - "oversubscribe": oversubscribe, - "init_function": init_function, - "cwd": cwd, - }, + self._set_process( + process=RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "hostname_localhost": hostname_localhost, + "executor_class": PySlurmSingleTaskExecutor, + # Executor Arguments + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "oversubscribe": oversubscribe, + "init_function": init_function, + "cwd": cwd, + }, + ), ) - self._process.start() class PySlurmSingleTaskExecutor(ExecutorBase): @@ -117,21 +118,22 @@ def __init__( hostname_localhost=False, ): super().__init__() - self._process = RaisingThread( - target=execute_parallel_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores, - "interface_class": SrunInterface, - # Interface Arguments - "threads_per_core": threads_per_core, - "gpus_per_core": gpus_per_task, - "cwd": cwd, - "oversubscribe": oversubscribe, - "hostname_localhost": hostname_localhost, - }, + self._set_process( + process=RaisingThread( + target=execute_parallel_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores, + "interface_class": SrunInterface, + # Interface Arguments + "threads_per_core": threads_per_core, + "gpus_per_core": gpus_per_task, + "cwd": cwd, + "oversubscribe": oversubscribe, + "hostname_localhost": hostname_localhost, + }, + ), ) - self._process.start() self._set_init_function(init_function=init_function) cloudpickle_register(ind=3)