Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ def create_executor(
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
"""
max_cores = validate_number_of_cores(max_cores=max_cores, max_workers=max_workers)
check_init_function(block_allocation=block_allocation, init_function=init_function)
if flux_executor is not None and backend != "flux":
backend = "flux"
Expand All @@ -218,13 +217,19 @@ def create_executor(
if block_allocation:
resource_dict["init_function"] = init_function
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
max_workers=validate_number_of_cores(
max_cores=max_cores,
max_workers=max_workers,
cores_per_worker=cores_per_worker,
set_local_cores=False,
),
executor_kwargs=resource_dict,
spawner=FluxPythonSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
max_workers=max_workers,
executor_kwargs=resource_dict,
spawner=FluxPythonSpawner,
)
Expand All @@ -234,13 +239,19 @@ def create_executor(
if block_allocation:
resource_dict["init_function"] = init_function
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
max_workers=validate_number_of_cores(
max_cores=max_cores,
max_workers=max_workers,
cores_per_worker=cores_per_worker,
set_local_cores=False,
),
executor_kwargs=resource_dict,
spawner=SrunSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
max_workers=max_workers,
executor_kwargs=resource_dict,
spawner=SrunSpawner,
)
Expand All @@ -258,13 +269,19 @@ def create_executor(
if block_allocation:
resource_dict["init_function"] = init_function
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
max_workers=validate_number_of_cores(
max_cores=max_cores,
max_workers=max_workers,
cores_per_worker=cores_per_worker,
set_local_cores=True,
),
executor_kwargs=resource_dict,
spawner=MpiExecSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
max_workers=max_workers,
executor_kwargs=resource_dict,
spawner=MpiExecSpawner,
)
38 changes: 32 additions & 6 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,16 @@ class InteractiveStepExecutor(ExecutorBase):

def __init__(
self,
max_cores: int = 1,
max_cores: Optional[int] = None,
max_workers: Optional[int] = None,
executor_kwargs: dict = {},
spawner: BaseSpawner = MpiExecSpawner,
):
super().__init__(max_cores=executor_kwargs.get("max_cores", None))
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
executor_kwargs["max_cores"] = max_cores
executor_kwargs["max_workers"] = max_workers
self._set_process(
RaisingThread(
target=execute_separate_tasks,
Expand Down Expand Up @@ -256,7 +258,8 @@ def execute_parallel_tasks(
def execute_separate_tasks(
future_queue: queue.Queue,
spawner: BaseSpawner = MpiExecSpawner,
max_cores: int = 1,
max_cores: Optional[int] = None,
max_workers: Optional[int] = None,
hostname_localhost: Optional[bool] = None,
**kwargs,
):
Expand All @@ -267,6 +270,9 @@ def execute_separate_tasks(
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
spawner (BaseSpawner): Interface to start process on selected compute resources
max_cores (int): defines the number cores which can be used in parallel
max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
recommended, as computers have a limited number of compute cores.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -296,6 +302,7 @@ def execute_separate_tasks(
spawner=spawner,
executor_kwargs=kwargs,
max_cores=max_cores,
max_workers=max_workers,
hostname_localhost=hostname_localhost,
)
qtask_lst.append(qtask)
Expand Down Expand Up @@ -389,7 +396,10 @@ def _get_backend_path(


def _wait_for_free_slots(
active_task_dict: dict, cores_requested: int, max_cores: int
active_task_dict: dict,
cores_requested: int,
max_cores: Optional[int] = None,
max_workers: Optional[int] = None,
) -> dict:
"""
Wait for available computing resources to become available.
Expand All @@ -398,12 +408,23 @@ def _wait_for_free_slots(
active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
cores_requested (int): Number of cores required for executing the next task
max_cores (int): Maximum number cores which can be used
max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
recommended, as computers have a limited number of compute cores.

Returns:
dict: Dictionary containing the future objects and the number of cores they require
"""
while sum(active_task_dict.values()) + cores_requested > max_cores:
active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
if max_cores is not None:
while sum(active_task_dict.values()) + cores_requested > max_cores:
active_task_dict = {
k: v for k, v in active_task_dict.items() if not k.done()
}
elif max_workers is not None and max_cores is None:
while len(active_task_dict.values()) + 1 > max_workers:
active_task_dict = {
k: v for k, v in active_task_dict.items() if not k.done()
}
return active_task_dict


Expand Down Expand Up @@ -490,7 +511,8 @@ def _submit_function_to_separate_process(
qtask: queue.Queue,
spawner: BaseSpawner,
executor_kwargs: dict,
max_cores: int = 1,
max_cores: Optional[int] = None,
max_workers: Optional[int] = None,
hostname_localhost: Optional[bool] = None,
):
"""
Expand All @@ -503,6 +525,9 @@ def _submit_function_to_separate_process(
spawner (BaseSpawner): Interface to start process on selected compute resources
executor_kwargs (dict): keyword parameters used to initialize the Executor
max_cores (int): defines the number cores which can be used in parallel
max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
recommended, as computers have a limited number of compute cores.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand All @@ -525,6 +550,7 @@ def _submit_function_to_separate_process(
active_task_dict=active_task_dict,
cores_requested=resource_dict["cores"],
max_cores=max_cores,
max_workers=max_workers,
)
active_task_dict[task_dict["future"]] = resource_dict["cores"]
task_kwargs = executor_kwargs.copy()
Expand Down
21 changes: 14 additions & 7 deletions executorlib/standalone/inputcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,21 @@ def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None:


def validate_number_of_cores(
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    cores_per_worker: Optional[int] = None,
    set_local_cores: bool = False,
) -> int:
    """
    Validate the number of cores and return the appropriate number of workers.

    Args:
        max_cores (int): total number of cores available; used to derive the
                         worker count when max_workers is not given
        max_workers (int): explicit number of workers; takes precedence and is
                           returned unchanged when provided
        cores_per_worker (int): cores occupied by each worker; required to
                                derive max_workers from max_cores
        set_local_cores (bool): when neither max_cores nor max_workers is
                                defined, fall back to the local CPU count
                                instead of raising

    Returns:
        int: the validated number of workers

    Raises:
        ValueError: if neither max_cores nor max_workers is defined and
                    set_local_cores is False, or if cores_per_worker is zero
        TypeError: if max_cores is given but cores_per_worker is None
    """
    if max_cores is None and max_workers is None:
        if not set_local_cores:
            raise ValueError(
                "Block allocation requires a fixed set of computational resources. Neither max_cores nor max_workers are defined."
            )
        else:
            max_workers = multiprocessing.cpu_count()
    elif max_cores is not None and max_workers is None:
        # Guard the division explicitly: None would raise an unexplained
        # TypeError and zero a ZeroDivisionError.
        if cores_per_worker is None:
            raise TypeError(
                "cores_per_worker is required to derive max_workers from max_cores."
            )
        if cores_per_worker == 0:
            raise ValueError("cores_per_worker must be a non-zero integer.")
        max_workers = int(max_cores / cores_per_worker)
    return max_workers
Comment on lines +175 to +189
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent potential division by zero in validate_number_of_cores

In the calculation max_workers = int(max_cores / cores_per_worker), there is a risk of division by zero or a TypeError if cores_per_worker is None or zero. Ensure that cores_per_worker is validated to be a non-zero integer before performing the division.

Consider adding a check to validate cores_per_worker:

elif max_cores is not None and max_workers is None:
+    if cores_per_worker is None or cores_per_worker == 0:
+        raise ValueError("cores_per_worker must be a non-zero integer.")
     max_workers = int(max_cores / cores_per_worker)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
cores_per_worker: Optional[int] = None,
set_local_cores: bool = False,
) -> int:
"""
Validate the number of cores and return the appropriate value.
"""
if max_workers is None and max_cores is None:
return multiprocessing.cpu_count()
elif max_workers is not None and max_cores is None:
return max_workers
else:
return max_cores
if max_cores is None and max_workers is None:
if not set_local_cores:
raise ValueError(
"Block allocation requires a fixed set of computational resources. Neither max_cores nor max_workers are defined."
)
else:
max_workers = multiprocessing.cpu_count()
elif max_cores is not None and max_workers is None:
max_workers = int(max_cores / cores_per_worker)
cores_per_worker: Optional[int] = None,
set_local_cores: bool = False,
) -> int:
"""
Validate the number of cores and return the appropriate value.
"""
if max_cores is None and max_workers is None:
if not set_local_cores:
raise ValueError(
"Block allocation requires a fixed set of computational resources. Neither max_cores nor max_workers are defined."
)
else:
max_workers = multiprocessing.cpu_count()
elif max_cores is not None and max_workers is None:
if cores_per_worker is None or cores_per_worker == 0:
raise ValueError("cores_per_worker must be a non-zero integer.")
max_workers = int(max_cores / cores_per_worker)

return max_workers
2 changes: 1 addition & 1 deletion tests/test_executor_backend_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def tearDown(self):
)
def test_meta_executor_parallel_cache(self):
with Executor(
max_workers=2,
max_cores=2,
resource_dict={"cores": 2},
backend="local",
block_allocation=True,
Expand Down
19 changes: 14 additions & 5 deletions tests/test_shared_input_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,21 @@ def test_check_pysqa_config_directory(self):
check_pysqa_config_directory(pysqa_config_directory="path/to/config")

def test_validate_number_of_cores(self):
    """Validate worker-count derivation and the error paths of validate_number_of_cores."""
    # Neither max_cores nor max_workers: block allocation cannot size itself.
    with self.assertRaises(ValueError):
        validate_number_of_cores(
            max_cores=None, max_workers=None, cores_per_worker=None
        )
    # max_cores without cores_per_worker: the division has no valid divisor.
    with self.assertRaises(TypeError):
        validate_number_of_cores(
            max_cores=1, max_workers=None, cores_per_worker=None
        )
    # max_cores together with cores_per_worker yields an integer worker count.
    self.assertIsInstance(
        validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=1),
        int,
    )
    # An explicit max_workers is accepted as-is.
    self.assertIsInstance(
        validate_number_of_cores(
            max_cores=None, max_workers=1, cores_per_worker=None
        ),
        int,
    )
Loading