From d540f1565134027d2c267c5cdbad4f17f12729c9 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 9 Nov 2024 16:27:10 +0100 Subject: [PATCH 1/5] Update validate_number_of_cores() --- executorlib/interactive/executor.py | 22 ++++++++++++++++---- executorlib/interactive/shared.py | 31 ++++++++++++++++++++++------ executorlib/standalone/inputcheck.py | 18 +++++++++------- 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index cd5ce42c..6e55ae5a 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -197,7 +197,6 @@ def create_executor( of the individual function. init_function (None): optional function to preset arguments for functions which are submitted later """ - max_cores = validate_number_of_cores(max_cores=max_cores, max_workers=max_workers) check_init_function(block_allocation=block_allocation, init_function=init_function) if flux_executor is not None and backend != "flux": backend = "flux" @@ -218,13 +217,18 @@ def create_executor( if block_allocation: resource_dict["init_function"] = init_function return InteractiveExecutor( - max_workers=int(max_cores / cores_per_worker), + max_workers=validate_number_of_cores( + max_cores=max_cores, + max_workers=max_workers, + set_local_cores=False, + ), executor_kwargs=resource_dict, spawner=FluxPythonSpawner, ) else: return InteractiveStepExecutor( max_cores=max_cores, + max_workers=max_workers, executor_kwargs=resource_dict, spawner=FluxPythonSpawner, ) @@ -234,13 +238,18 @@ def create_executor( if block_allocation: resource_dict["init_function"] = init_function return InteractiveExecutor( - max_workers=int(max_cores / cores_per_worker), + max_workers=validate_number_of_cores( + max_cores=max_cores, + max_workers=max_workers, + set_local_cores=False, + ), executor_kwargs=resource_dict, spawner=SrunSpawner, ) else: return InteractiveStepExecutor( max_cores=max_cores, + max_workers=max_workers, executor_kwargs=resource_dict, spawner=SrunSpawner, ) @@ -258,13 +267,18 @@ def create_executor( if block_allocation: resource_dict["init_function"] = init_function return InteractiveExecutor( - max_workers=int(max_cores / cores_per_worker), + max_workers=validate_number_of_cores( + max_cores=max_cores, + max_workers=max_workers, + set_local_cores=True, + ), executor_kwargs=resource_dict, spawner=MpiExecSpawner, ) else: return InteractiveStepExecutor( max_cores=max_cores, + max_workers=max_workers, executor_kwargs=resource_dict, spawner=MpiExecSpawner, ) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index aebca15d..dc8014bf 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -179,7 +179,8 @@ class InteractiveStepExecutor(ExecutorBase): def __init__( self, - max_cores: int = 1, + max_cores: Optional[int] = None, + max_workers: Optional[int] = None, executor_kwargs: dict = {}, spawner: BaseSpawner = MpiExecSpawner, ): @@ -187,6 +188,7 @@ def __init__( executor_kwargs["future_queue"] = self._future_queue executor_kwargs["spawner"] = spawner executor_kwargs["max_cores"] = max_cores + executor_kwargs["max_workers"] = max_workers self._set_process( RaisingThread( target=execute_separate_tasks, @@ -256,7 +258,8 @@ def execute_parallel_tasks( def execute_separate_tasks( future_queue: queue.Queue, spawner: BaseSpawner = MpiExecSpawner, - max_cores: int = 1, + max_cores: Optional[int] = None, + max_workers: Optional[int] = None, hostname_localhost: Optional[bool] = None, **kwargs, ): @@ -267,6 +270,9 @@ def execute_separate_tasks( future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process spawner (BaseSpawner): Interface to start process on selected compute resources max_cores (int): defines the number cores which can be used in parallel + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of + cores which can be used in parallel - just like the max_cores parameter. Using max_cores is + recommended, as computers have a limited number of compute cores. hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -296,6 +302,7 @@ def execute_separate_tasks( spawner=spawner, executor_kwargs=kwargs, max_cores=max_cores, + max_workers=max_workers, hostname_localhost=hostname_localhost, ) qtask_lst.append(qtask) @@ -389,7 +396,7 @@ def _get_backend_path( def _wait_for_free_slots( - active_task_dict: dict, cores_requested: int, max_cores: int + active_task_dict: dict, cores_requested: int, max_cores: Optional[int] = None, max_workers: Optional[int] = None, ) -> dict: """ Wait for available computing resources to become available. @@ -398,12 +405,19 @@ def _wait_for_free_slots( active_task_dict (dict): Dictionary containing the future objects and the number of cores they require cores_requested (int): Number of cores required for executing the next task max_cores (int): Maximum number cores which can be used + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of + cores which can be used in parallel - just like the max_cores parameter. Using max_cores is + recommended, as computers have a limited number of compute cores. Returns: dict: Dictionary containing the future objects and the number of cores they require """ - while sum(active_task_dict.values()) + cores_requested > max_cores: - active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} + if max_cores is not None: + while sum(active_task_dict.values()) + cores_requested > max_cores: + active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} + elif max_workers is not None and max_cores is None: + while len(active_task_dict.values()) + 1 > max_workers: + active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} return active_task_dict @@ -490,7 +504,8 @@ def _submit_function_to_separate_process( qtask: queue.Queue, spawner: BaseSpawner, executor_kwargs: dict, - max_cores: int = 1, + max_cores: Optional[int] = None, + max_workers: Optional[int] = None, hostname_localhost: Optional[bool] = None, ): """ @@ -503,6 +518,9 @@ def _submit_function_to_separate_process( spawner (BaseSpawner): Interface to start process on selected compute resources executor_kwargs (dict): keyword parameters used to initialize the Executor max_cores (int): defines the number cores which can be used in parallel + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of + cores which can be used in parallel - just like the max_cores parameter. Using max_cores is + recommended, as computers have a limited number of compute cores. hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -525,6 +543,7 @@ def _submit_function_to_separate_process( active_task_dict=active_task_dict, cores_requested=resource_dict["cores"], max_cores=max_cores, + max_workers=max_workers, ) active_task_dict[task_dict["future"]] = resource_dict["cores"] task_kwargs = executor_kwargs.copy() diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index 76f6c823..64429882 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -170,14 +170,18 @@ def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None: def validate_number_of_cores( - max_cores: Optional[int], max_workers: Optional[int] + max_cores: Optional[int] = None, max_workers: Optional[int] = None, set_local_cores: bool = False, ) -> int: """ Validate the number of cores and return the appropriate value. """ - if max_workers is None and max_cores is None: - return multiprocessing.cpu_count() - elif max_workers is not None and max_cores is None: - return max_workers - else: - return max_cores + if max_cores is None and max_workers is None: + if not set_local_cores: + raise ValueError( + "Block allocation requires a fixed set of computational resources. Neither max_cores nor max_workers are defined." + ) + else: + max_workers = multiprocessing.cpu_count() + elif max_cores is not None and max_workers is None: + max_workers = int(max_cores / cores_per_worker) + return max_workers From c859e07d43ea4f561c4314e493debe7ff68e9115 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 9 Nov 2024 15:28:08 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/interactive/shared.py | 13 ++++++++++--- executorlib/standalone/inputcheck.py | 4 +++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index dc8014bf..727aa58f 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -396,7 +396,10 @@ def _get_backend_path( def _wait_for_free_slots( - active_task_dict: dict, cores_requested: int, max_cores: Optional[int] = None, max_workers: Optional[int] = None, + active_task_dict: dict, + cores_requested: int, + max_cores: Optional[int] = None, + max_workers: Optional[int] = None, ) -> dict: """ Wait for available computing resources to become available. @@ -414,10 +417,14 @@ def _wait_for_free_slots( """ if max_cores is not None: while sum(active_task_dict.values()) + cores_requested > max_cores: - active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} + active_task_dict = { + k: v for k, v in active_task_dict.items() if not k.done() + } elif max_workers is not None and max_cores is None: while len(active_task_dict.values()) + 1 > max_workers: - active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} + active_task_dict = { + k: v for k, v in active_task_dict.items() if not k.done() + } return active_task_dict diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index 64429882..d776cf6b 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -170,7 +170,9 @@ def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None: def validate_number_of_cores( - max_cores: Optional[int] = None, max_workers: Optional[int] = None, set_local_cores: bool = False, + max_cores: Optional[int] = None, + max_workers: Optional[int] = None, + set_local_cores: bool = False, ) -> int: """ Validate the number of cores and return the appropriate value. From 28e214aa8ceef799f2d092f2ad993f42c09b22db Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 9 Nov 2024 16:34:21 +0100 Subject: [PATCH 3/5] fixes --- executorlib/interactive/executor.py | 3 +++ executorlib/standalone/inputcheck.py | 1 + tests/test_shared_input_check.py | 11 ++++++----- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index 6e55ae5a..9dee5abd 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -220,6 +220,7 @@ def create_executor( max_workers=validate_number_of_cores( max_cores=max_cores, max_workers=max_workers, + cores_per_worker=cores_per_worker, set_local_cores=False, ), executor_kwargs=resource_dict, @@ -241,6 +242,7 @@ def create_executor( max_workers=validate_number_of_cores( max_cores=max_cores, max_workers=max_workers, + cores_per_worker=cores_per_worker, set_local_cores=False, ), executor_kwargs=resource_dict, @@ -270,6 +272,7 @@ def create_executor( max_workers=validate_number_of_cores( max_cores=max_cores, max_workers=max_workers, + cores_per_worker=cores_per_worker, set_local_cores=True, ), executor_kwargs=resource_dict, diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index d776cf6b..040bbe51 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -172,6 +172,7 @@ def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None: def validate_number_of_cores( max_cores: Optional[int] = None, max_workers: Optional[int] = None, + cores_per_worker: Optional[int] = None, set_local_cores: bool = False, ) -> int: """ diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py index 44f5e599..21cc089d 100644 --- a/tests/test_shared_input_check.py +++ b/tests/test_shared_input_check.py @@ -98,12 +98,13 @@ def test_check_pysqa_config_directory(self): check_pysqa_config_directory(pysqa_config_directory="path/to/config") def test_validate_number_of_cores(self): + with self.assertRaises(ValueError): + validate_number_of_cores(max_cores=None, max_workers=None, cores_per_worker=None) + with self.assertRaises(TypeError): + validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=None) self.assertIsInstance( - validate_number_of_cores(max_cores=None, max_workers=None), int - ) - self.assertIsInstance( - validate_number_of_cores(max_cores=1, max_workers=None), int + validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=1), int ) self.assertIsInstance( - validate_number_of_cores(max_cores=None, max_workers=1), int + validate_number_of_cores(max_cores=None, max_workers=1, cores_per_worker=None), int ) From a345de5896443a2c125eeb5ed19572f7a8634638 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 9 Nov 2024 15:34:29 +0000 Subject: [PATCH 4/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_shared_input_check.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py index 21cc089d..5e3b0766 100644 --- a/tests/test_shared_input_check.py +++ b/tests/test_shared_input_check.py @@ -99,12 +99,20 @@ def test_check_pysqa_config_directory(self): def test_validate_number_of_cores(self): with self.assertRaises(ValueError): - validate_number_of_cores(max_cores=None, max_workers=None, cores_per_worker=None) + validate_number_of_cores( + max_cores=None, max_workers=None, cores_per_worker=None + ) with self.assertRaises(TypeError): - validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=None) + validate_number_of_cores( + max_cores=1, max_workers=None, cores_per_worker=None + ) self.assertIsInstance( - validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=1), int + validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=1), + int, ) self.assertIsInstance( - validate_number_of_cores(max_cores=None, max_workers=1, cores_per_worker=None), int + validate_number_of_cores( + max_cores=None, max_workers=1, cores_per_worker=None + ), + int, ) From 5d4671592a16a4032f05e1dc19855009fbd3368a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 10 Nov 2024 08:43:53 +0100 Subject: [PATCH 5/5] fix test --- tests/test_executor_backend_mpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 9a4309b2..29a8d58d 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -91,7 +91,7 @@ def tearDown(self): ) def test_meta_executor_parallel_cache(self): with Executor( - max_workers=2, + max_cores=2, resource_dict={"cores": 2}, backend="local", block_allocation=True,