diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py
index e36301a9..067dec50 100644
--- a/executorlib/interactive/executor.py
+++ b/executorlib/interactive/executor.py
@@ -197,7 +197,6 @@ def create_executor(
                                       of the individual function.
         init_function (None): optional function to preset arguments for functions which are submitted later
     """
-    max_cores = validate_number_of_cores(max_cores=max_cores, max_workers=max_workers)
     check_init_function(block_allocation=block_allocation, init_function=init_function)
     if flux_executor is not None and backend != "flux":
         backend = "flux"
@@ -218,13 +217,19 @@ def create_executor(
         if block_allocation:
             resource_dict["init_function"] = init_function
             return InteractiveExecutor(
-                max_workers=int(max_cores / cores_per_worker),
+                max_workers=validate_number_of_cores(
+                    max_cores=max_cores,
+                    max_workers=max_workers,
+                    cores_per_worker=cores_per_worker,
+                    set_local_cores=False,
+                ),
                 executor_kwargs=resource_dict,
                 spawner=FluxPythonSpawner,
             )
         else:
             return InteractiveStepExecutor(
                 max_cores=max_cores,
+                max_workers=max_workers,
                 executor_kwargs=resource_dict,
                 spawner=FluxPythonSpawner,
             )
@@ -234,13 +239,19 @@ def create_executor(
         if block_allocation:
             resource_dict["init_function"] = init_function
             return InteractiveExecutor(
-                max_workers=int(max_cores / cores_per_worker),
+                max_workers=validate_number_of_cores(
+                    max_cores=max_cores,
+                    max_workers=max_workers,
+                    cores_per_worker=cores_per_worker,
+                    set_local_cores=False,
+                ),
                 executor_kwargs=resource_dict,
                 spawner=SrunSpawner,
             )
         else:
             return InteractiveStepExecutor(
                 max_cores=max_cores,
+                max_workers=max_workers,
                 executor_kwargs=resource_dict,
                 spawner=SrunSpawner,
             )
@@ -258,13 +269,19 @@ def create_executor(
         if block_allocation:
             resource_dict["init_function"] = init_function
             return InteractiveExecutor(
-                max_workers=int(max_cores / cores_per_worker),
+                max_workers=validate_number_of_cores(
+                    max_cores=max_cores,
+                    max_workers=max_workers,
+                    cores_per_worker=cores_per_worker,
+                    set_local_cores=True,
+                ),
                 executor_kwargs=resource_dict,
                 spawner=MpiExecSpawner,
             )
         else:
             return InteractiveStepExecutor(
                 max_cores=max_cores,
+                max_workers=max_workers,
                 executor_kwargs=resource_dict,
                 spawner=MpiExecSpawner,
             )
diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py
index d163cbfe..3eb79986 100644
--- a/executorlib/interactive/shared.py
+++ b/executorlib/interactive/shared.py
@@ -179,7 +179,8 @@
 
     def __init__(
         self,
-        max_cores: int = 1,
+        max_cores: Optional[int] = None,
+        max_workers: Optional[int] = None,
         executor_kwargs: dict = {},
         spawner: BaseSpawner = MpiExecSpawner,
     ):
@@ -187,6 +188,7 @@ def __init__(
         executor_kwargs["future_queue"] = self._future_queue
         executor_kwargs["spawner"] = spawner
         executor_kwargs["max_cores"] = max_cores
+        executor_kwargs["max_workers"] = max_workers
         self._set_process(
             RaisingThread(
                 target=execute_separate_tasks,
@@ -256,7 +258,8 @@ def execute_parallel_tasks(
 def execute_separate_tasks(
     future_queue: queue.Queue,
     spawner: BaseSpawner = MpiExecSpawner,
-    max_cores: int = 1,
+    max_cores: Optional[int] = None,
+    max_workers: Optional[int] = None,
     hostname_localhost: Optional[bool] = None,
     **kwargs,
 ):
@@ -267,6 +270,9 @@ def execute_separate_tasks(
         future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
         spawner (BaseSpawner): Interface to start process on selected compute resources
         max_cores (int): defines the number cores which can be used in parallel
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
+                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
+                           recommended, as computers have a limited number of compute cores.
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection.
                                       In the context of an HPC cluster this essential to be able to communicate to an
                                       Executor running on a different compute node within the same allocation. And
@@ -296,6 +302,7 @@ def execute_separate_tasks(
             spawner=spawner,
             executor_kwargs=kwargs,
             max_cores=max_cores,
+            max_workers=max_workers,
             hostname_localhost=hostname_localhost,
         )
         qtask_lst.append(qtask)
@@ -389,7 +396,10 @@ def _get_backend_path(
 
 
 def _wait_for_free_slots(
-    active_task_dict: dict, cores_requested: int, max_cores: int
+    active_task_dict: dict,
+    cores_requested: int,
+    max_cores: Optional[int] = None,
+    max_workers: Optional[int] = None,
 ) -> dict:
     """
     Wait for available computing resources to become available.
@@ -398,12 +408,23 @@ def _wait_for_free_slots(
     Args:
         active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
         cores_requested (int): Number of cores required for executing the next task
         max_cores (int): Maximum number cores which can be used
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
+                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
+                           recommended, as computers have a limited number of compute cores.
 
     Returns:
         dict: Dictionary containing the future objects and the number of cores they require
     """
-    while sum(active_task_dict.values()) + cores_requested > max_cores:
-        active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
+    if max_cores is not None:
+        while sum(active_task_dict.values()) + cores_requested > max_cores:
+            active_task_dict = {
+                k: v for k, v in active_task_dict.items() if not k.done()
+            }
+    elif max_workers is not None and max_cores is None:
+        while len(active_task_dict.values()) + 1 > max_workers:
+            active_task_dict = {
+                k: v for k, v in active_task_dict.items() if not k.done()
+            }
     return active_task_dict
 
@@ -490,7 +511,8 @@ def _submit_function_to_separate_process(
     qtask: queue.Queue,
     spawner: BaseSpawner,
     executor_kwargs: dict,
-    max_cores: int = 1,
+    max_cores: Optional[int] = None,
+    max_workers: Optional[int] = None,
     hostname_localhost: Optional[bool] = None,
 ):
     """
@@ -503,6 +525,9 @@
         spawner (BaseSpawner): Interface to start process on selected compute resources
         executor_kwargs (dict): keyword parameters used to initialize the Executor
         max_cores (int): defines the number cores which can be used in parallel
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
+                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
+                           recommended, as computers have a limited number of compute cores.
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection.
                                       In the context of an HPC cluster this essential to be able to communicate to an
                                       Executor running on a different compute node within the same allocation. And
@@ -525,6 +550,7 @@ def _submit_function_to_separate_process(
         active_task_dict=active_task_dict,
         cores_requested=resource_dict["cores"],
         max_cores=max_cores,
+        max_workers=max_workers,
     )
     active_task_dict[task_dict["future"]] = resource_dict["cores"]
     task_kwargs = executor_kwargs.copy()
diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py
index 76f6c823..040bbe51 100644
--- a/executorlib/standalone/inputcheck.py
+++ b/executorlib/standalone/inputcheck.py
@@ -170,14 +170,21 @@ def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None:
 
 
 def validate_number_of_cores(
-    max_cores: Optional[int], max_workers: Optional[int]
+    max_cores: Optional[int] = None,
+    max_workers: Optional[int] = None,
+    cores_per_worker: Optional[int] = None,
+    set_local_cores: bool = False,
 ) -> int:
     """
     Validate the number of cores and return the appropriate value.
     """
-    if max_workers is None and max_cores is None:
-        return multiprocessing.cpu_count()
-    elif max_workers is not None and max_cores is None:
-        return max_workers
-    else:
-        return max_cores
+    if max_cores is None and max_workers is None:
+        if not set_local_cores:
+            raise ValueError(
+                "Block allocation requires a fixed set of computational resources. Neither max_cores nor max_workers are defined."
+            )
+        else:
+            max_workers = multiprocessing.cpu_count()
+    elif max_cores is not None and max_workers is None:
+        max_workers = int(max_cores / cores_per_worker)
+    return max_workers
diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py
index 4876cbfc..9a002136 100644
--- a/tests/test_executor_backend_mpi.py
+++ b/tests/test_executor_backend_mpi.py
@@ -97,7 +97,7 @@ def tearDown(self):
     )
     def test_meta_executor_parallel_cache(self):
         with Executor(
-            max_workers=2,
+            max_cores=2,
             resource_dict={"cores": 2},
             backend="local",
             block_allocation=True,
diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py
index 44f5e599..5e3b0766 100644
--- a/tests/test_shared_input_check.py
+++ b/tests/test_shared_input_check.py
@@ -98,12 +98,21 @@ def test_check_pysqa_config_directory(self):
             check_pysqa_config_directory(pysqa_config_directory="path/to/config")
 
     def test_validate_number_of_cores(self):
+        with self.assertRaises(ValueError):
+            validate_number_of_cores(
+                max_cores=None, max_workers=None, cores_per_worker=None
+            )
+        with self.assertRaises(TypeError):
+            validate_number_of_cores(
+                max_cores=1, max_workers=None, cores_per_worker=None
+            )
         self.assertIsInstance(
-            validate_number_of_cores(max_cores=None, max_workers=None), int
-        )
-        self.assertIsInstance(
-            validate_number_of_cores(max_cores=1, max_workers=None), int
+            validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=1),
+            int,
         )
         self.assertIsInstance(
-            validate_number_of_cores(max_cores=None, max_workers=1), int
+            validate_number_of_cores(
+                max_cores=None, max_workers=1, cores_per_worker=None
+            ),
+            int,
         )