diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index ba4f3a17..28c031f3 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -68,24 +68,25 @@ def __init__( hostname_localhost=False, ): super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "hostname_localhost": hostname_localhost, - "executor_class": PyFluxSingleTaskExecutor, - # Executor Arguments - "cores": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_task": int(gpus_per_worker / cores_per_worker), - "init_function": init_function, - "cwd": cwd, - "executor": executor, - }, + self._set_process( + process=RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "hostname_localhost": hostname_localhost, + "executor_class": PyFluxSingleTaskExecutor, + # Executor Arguments + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "init_function": init_function, + "cwd": cwd, + "executor": executor, + }, + ) ) - self._process.start() class PyFluxSingleTaskExecutor(ExecutorBase): @@ -121,23 +122,24 @@ def __init__( ): super().__init__() cloudpickle_register(ind=3) - self._process = RaisingThread( - target=execute_parallel_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores, - "interface_class": FluxPythonInterface, - "hostname_localhost": hostname_localhost, - "init_function": init_function, - # Interface Arguments - "threads_per_core": threads_per_core, - "gpus_per_core": gpus_per_task, - "cwd": cwd, - "executor": executor, - }, + self._set_process( + process=RaisingThread( + target=execute_parallel_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores, + "interface_class": FluxPythonInterface, + "hostname_localhost": hostname_localhost, + "init_function": init_function, + # Interface Arguments + "threads_per_core": threads_per_core, + "gpus_per_core": gpus_per_task, + "cwd": cwd, + "executor": executor, + }, + ) ) - self._process.start() class FluxPythonInterface(BaseInterface): diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index ad4ef916..4f5de6b4 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -61,22 +61,23 @@ def __init__( hostname_localhost=False, ): super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "executor_class": PyMPISingleTaskExecutor, - "hostname_localhost": hostname_localhost, - # Executor Arguments - "cores": cores_per_worker, - "oversubscribe": oversubscribe, - "init_function": init_function, - "cwd": cwd, - }, + self._set_process( + process=RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "executor_class": PyMPISingleTaskExecutor, + "hostname_localhost": hostname_localhost, + # Executor Arguments + "cores": cores_per_worker, + "oversubscribe": oversubscribe, + "init_function": init_function, + "cwd": cwd, + }, + ), ) - self._process.start() class PyMPISingleTaskExecutor(ExecutorBase): @@ -108,18 +109,19 @@ def __init__( ): super().__init__() cloudpickle_register(ind=3) - self._process = RaisingThread( - target=execute_parallel_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores, - "interface_class": MpiExecInterface, - "init_function": init_function, - # Interface Arguments - "cwd": cwd, - "oversubscribe": oversubscribe, - "hostname_localhost": hostname_localhost, - }, + self._set_process( + process=RaisingThread( + target=execute_parallel_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores, + "interface_class": MpiExecInterface, + "init_function": init_function, + # Interface Arguments + "cwd": cwd, + "oversubscribe": oversubscribe, + "hostname_localhost": hostname_localhost, + }, + ) ) - self._process.start() diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index c429f616..eb88d629 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -67,6 +67,10 @@ def __del__(self): except (AttributeError, RuntimeError): pass + def _set_process(self, process): + self._process = process + self._process.start() + def cancel_items_in_queue(que): """ diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py index 8be8ada7..cabc273c 100644 --- a/pympipool/shell/executor.py +++ b/pympipool/shell/executor.py @@ -44,13 +44,14 @@ class SubprocessSingleExecutor(ExecutorBase): def __init__(self): super().__init__() - self._process = RaisingThread( - target=execute_single_task, - kwargs={ - "future_queue": self._future_queue, - }, + self._set_process( + process=RaisingThread( + target=execute_single_task, + kwargs={ + "future_queue": self._future_queue, + }, + ), ) - self._process.start() def submit(self, *args, **kwargs): f = Future() @@ -83,16 +84,17 @@ def __init__( max_workers=1, ): super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "executor_class": SubprocessSingleExecutor, - }, + self._set_process( + process=RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "executor_class": SubprocessSingleExecutor, + }, + ), ) - self._process.start() def submit(self, *args, **kwargs): """ diff --git a/pympipool/shell/interactive.py b/pympipool/shell/interactive.py index b155577b..adef58b3 100644 --- a/pympipool/shell/interactive.py +++ b/pympipool/shell/interactive.py @@ -107,13 +107,14 @@ class ShellExecutor(ExecutorBase): def __init__(self, *args, **kwargs): super().__init__() - self._process = RaisingThread( - target=execute_single_task, - kwargs={ - "future_queue": self._future_queue, - }, + self._set_process( + process=RaisingThread( + target=execute_single_task, + kwargs={ + "future_queue": self._future_queue, + }, + ), ) - self._process.start() self._future_queue.put({"init": True, "args": args, "kwargs": kwargs}) def submit(self, string_input, lines_to_read=None, stop_read_pattern=None): diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index db11b204..7f0d5a8a 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -64,24 +64,25 @@ def __init__( hostname_localhost=False, ): super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "hostname_localhost": hostname_localhost, - "executor_class": PySlurmSingleTaskExecutor, - # Executor Arguments - "cores": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_task": int(gpus_per_worker / cores_per_worker), - "oversubscribe": oversubscribe, - "init_function": init_function, - "cwd": cwd, - }, + self._set_process( + process=RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "hostname_localhost": hostname_localhost, + "executor_class": PySlurmSingleTaskExecutor, + # Executor Arguments + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "oversubscribe": oversubscribe, + "init_function": init_function, + "cwd": cwd, + }, + ), ) - self._process.start() class PySlurmSingleTaskExecutor(ExecutorBase): @@ -117,20 +118,21 @@ def __init__( ): super().__init__() cloudpickle_register(ind=3) - self._process = RaisingThread( - target=execute_parallel_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores, - "interface_class": SrunInterface, - "init_function": init_function, - # Interface Arguments - "threads_per_core": threads_per_core, - "gpus_per_core": gpus_per_task, - "cwd": cwd, - "oversubscribe": oversubscribe, - "hostname_localhost": hostname_localhost, - }, + self._set_process( + process=RaisingThread( + target=execute_parallel_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores, + "interface_class": SrunInterface, + "init_function": init_function, + # Interface Arguments + "threads_per_core": threads_per_core, + "gpus_per_core": gpus_per_task, + "cwd": cwd, + "oversubscribe": oversubscribe, + "hostname_localhost": hostname_localhost, + }, + ), ) - self._process.start()