diff --git a/pympipool/__init__.py b/pympipool/__init__.py index aa9bc881..15cdb67a 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -38,6 +38,7 @@ class Executor: oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed + sleep_interval (float): synchronization interval - default 0.1 hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -76,6 +77,7 @@ def __init__( oversubscribe=False, init_function=None, cwd=None, + sleep_interval=0.1, executor=None, hostname_localhost=False, ): @@ -91,6 +93,7 @@ def __new__( oversubscribe=False, init_function=None, cwd=None, + sleep_interval=0.1, executor=None, hostname_localhost=False, ): @@ -110,6 +113,7 @@ def __new__( oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed + sleep_interval (float): synchronization interval - default 0.1 hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -132,6 +136,7 @@ def __new__( gpus_per_worker=gpus_per_worker, init_function=init_function, cwd=cwd, + sleep_interval=sleep_interval, hostname_localhost=hostname_localhost, ) elif slurm_installed: @@ -140,6 +145,7 @@ def __new__( cores_per_worker=cores_per_worker, init_function=init_function, cwd=cwd, + sleep_interval=sleep_interval, hostname_localhost=hostname_localhost, ) else: @@ -162,5 +168,6 @@ def __new__( cores_per_worker=cores_per_worker, init_function=init_function, cwd=cwd, + sleep_interval=sleep_interval, hostname_localhost=hostname_localhost, ) diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index 31a724ed..4e5b7b81 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -26,6 +26,7 @@ class PyFluxExecutor(ExecutorBase): gpus_per_worker (int): number of GPUs per worker - defaults to 0 init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed + sleep_interval (float): synchronization interval - default 0.1 executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an @@ -64,6 +65,7 @@ def __init__( gpus_per_worker=0, init_function=None, cwd=None, + sleep_interval=0.1, executor=None, hostname_localhost=False, ): @@ -74,6 +76,7 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, + "sleep_interval": sleep_interval, "hostname_localhost": hostname_localhost, "executor_class": PyFluxSingleTaskExecutor, # Executor Arguments diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index eefa2431..94add9f5 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -22,6 +22,7 @@ class PyMPIExecutor(ExecutorBase): oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed + sleep_interval (float): synchronization interval - default 0.1 hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -58,6 +59,7 @@ def __init__( oversubscribe=False, init_function=None, cwd=None, + sleep_interval=0.1, hostname_localhost=False, ): super().__init__() @@ -67,6 +69,7 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, + "sleep_interval": sleep_interval, "executor_class": PyMPISingleTaskExecutor, "hostname_localhost": hostname_localhost, # Executor Arguments diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index b0bc3cd7..40f4a96a 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -7,6 +7,7 @@ import os import queue import sys +from time import sleep import cloudpickle @@ -175,6 +176,7 @@ def executor_broker( future_queue, max_workers, executor_class, + sleep_interval=0.1, **kwargs, ): meta_future_lst = _get_executor_dict( @@ -183,14 +185,17 @@ def executor_broker( **kwargs, ) while True: - if execute_task_dict( - task_dict=future_queue.get(), meta_future_lst=meta_future_lst - ): - future_queue.task_done() + try: + task_dict = future_queue.get_nowait() + except queue.Empty: + sleep(sleep_interval) else: - future_queue.task_done() - future_queue.join() - break + if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): + future_queue.task_done() + else: + future_queue.task_done() + future_queue.join() + break def execute_task_dict(task_dict, meta_future_lst): diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py index 8be8ada7..d9eda6fa 100644 --- a/pympipool/shell/executor.py +++ b/pympipool/shell/executor.py @@ -67,6 +67,7 @@ class SubprocessExecutor(ExecutorBase): Args: max_workers (int): defines the number workers which can execute functions in parallel + sleep_interval (float): synchronization interval - default 0.1 Examples: @@ -81,6 +82,7 @@ class SubprocessExecutor(ExecutorBase): def __init__( self, max_workers=1, + sleep_interval=0.1, ): super().__init__() self._process = RaisingThread( @@ -89,6 +91,7 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, + "sleep_interval": sleep_interval, "executor_class": SubprocessSingleExecutor, }, ) diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index 4824441f..397d61b0 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -62,6 +62,7 @@ def __init__( oversubscribe=False, init_function=None, cwd=None, + sleep_interval=0.1, hostname_localhost=False, ): super().__init__() @@ -71,6 +72,7 @@ def __init__( # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, + "sleep_interval": sleep_interval, "hostname_localhost": hostname_localhost, "executor_class": PySlurmSingleTaskExecutor, # Executor Arguments