From e8e48722a8d16ecfa0118fabf64b24ed16d7bde6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 19 Jul 2025 14:42:26 +0200 Subject: [PATCH 1/3] Refactor get_cache_execute_command() and get_interactive_execute_command() --- executorlib/standalone/command.py | 54 +++++++++++++++++++ executorlib/task_scheduler/file/shared.py | 33 +----------- .../task_scheduler/interactive/shared.py | 30 +---------- tests/test_fluxclusterexecutor.py | 4 +- 4 files changed, 60 insertions(+), 61 deletions(-) diff --git a/executorlib/standalone/command.py b/executorlib/standalone/command.py index 8dfa8e83..03ce8572 100644 --- a/executorlib/standalone/command.py +++ b/executorlib/standalone/command.py @@ -1,4 +1,6 @@ import os +import sys +import importlib def get_command_path(executable: str) -> str: @@ -12,3 +14,55 @@ def get_command_path(executable: str) -> str: str: absolute path to the executable script """ return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable)) + + +def get_cache_execute_command(file_name: str, cores: int = 1) -> list: + """ + Get command to call backend as a list of two strings + + Args: + file_name (str): The name of the file. + cores (int, optional): Number of cores used to execute the task. Defaults to 1. + + Returns: + list[str]: List of strings containing the python executable path and the backend script to execute + """ + command_lst = [sys.executable] + if cores > 1 and importlib.util.find_spec("mpi4py") is not None: + command_lst = ( + ["mpiexec", "-n", str(cores)] + + command_lst + + [get_command_path(executable="cache_parallel.py"), file_name] + ) + elif cores > 1: + raise ImportError( + "mpi4py is required for parallel calculations. Please install mpi4py." 
+ ) + else: + command_lst += [get_command_path(executable="cache_serial.py"), file_name] + return command_lst + + +def get_interactive_execute_command( + cores: int, +) -> list: + """ + Get command to call backend as a list of two strings + + Args: + cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py + else interactive_serial.py + + Returns: + list[str]: List of strings containing the python executable path and the backend script to execute + """ + command_lst = [sys.executable] + if cores > 1 and importlib.util.find_spec("mpi4py") is not None: + command_lst += [get_command_path(executable="interactive_parallel.py")] + elif cores > 1: + raise ImportError( + "mpi4py is required for parallel calculations. Please install mpi4py." + ) + else: + command_lst += [get_command_path(executable="interactive_serial.py")] + return command_lst diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index 1944321f..0c5ac882 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -1,13 +1,11 @@ import contextlib -import importlib.util import os import queue -import sys from concurrent.futures import Future from typing import Any, Callable, Optional from executorlib.standalone.cache import get_cache_files -from executorlib.standalone.command import get_command_path +from executorlib.standalone.command import get_cache_execute_command from executorlib.standalone.serialize import serialize_funct_h5 from executorlib.task_scheduler.file.hdf import get_output from executorlib.task_scheduler.file.subprocess_spawner import terminate_subprocess @@ -153,7 +151,7 @@ def execute_tasks_h5( ) task_dependent_lst = [] process_dict[task_key] = execute_function( - command=_get_execute_command( + command=get_cache_execute_command( file_name=file_name, cores=task_resource_dict["cores"], ), @@ -183,33 +181,6 @@ def execute_tasks_h5( } -def 
_get_execute_command(file_name: str, cores: int = 1) -> list: - """ - Get command to call backend as a list of two strings - - Args: - file_name (str): The name of the file. - cores (int, optional): Number of cores used to execute the task. Defaults to 1. - - Returns: - list[str]: List of strings containing the python executable path and the backend script to execute - """ - command_lst = [sys.executable] - if cores > 1 and importlib.util.find_spec("mpi4py") is not None: - command_lst = ( - ["mpiexec", "-n", str(cores)] - + command_lst - + [get_command_path(executable="cache_parallel.py"), file_name] - ) - elif cores > 1: - raise ImportError( - "mpi4py is required for parallel calculations. Please install mpi4py." - ) - else: - command_lst += [get_command_path(executable="cache_serial.py"), file_name] - return command_lst - - def _check_task_output( task_key: str, future_obj: Future, cache_directory: str ) -> Future: diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index cc22ca2c..47a04e5a 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -1,13 +1,11 @@ import contextlib -import importlib.util import os import queue -import sys import time from typing import Callable, Optional from executorlib.standalone.cache import get_cache_files -from executorlib.standalone.command import get_command_path +from executorlib.standalone.command import get_interactive_execute_command from executorlib.standalone.interactive.communication import ( SocketInterface, interface_bootup, @@ -51,7 +49,7 @@ def execute_tasks( log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. 
""" interface = interface_bootup( - command_lst=_get_backend_path( + command_lst=get_interactive_execute_command( cores=cores, ), connections=spawner(cores=cores, **kwargs), @@ -87,30 +85,6 @@ def execute_tasks( ) -def _get_backend_path( - cores: int, -) -> list: - """ - Get command to call backend as a list of two strings - - Args: - cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py - - Returns: - list[str]: List of strings containing the python executable path and the backend script to execute - """ - command_lst = [sys.executable] - if cores > 1 and importlib.util.find_spec("mpi4py") is not None: - command_lst += [get_command_path(executable="interactive_parallel.py")] - elif cores > 1: - raise ImportError( - "mpi4py is required for parallel calculations. Please install mpi4py." - ) - else: - command_lst += [get_command_path(executable="interactive_serial.py")] - return command_lst - - def _execute_task_without_cache( interface: SocketInterface, task_dict: dict, future_queue: queue.Queue ): diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index f2f90ce0..51b18500 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -6,12 +6,12 @@ from executorlib import FluxClusterExecutor from executorlib.standalone.serialize import cloudpickle_register +from executorlib.standalone.command import get_cache_execute_command try: import flux.job from executorlib.task_scheduler.file.hdf import dump from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache, execute_with_pysqa - from executorlib.task_scheduler.file.shared import _get_execute_command skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("EXECUTORLIB_PMIX", None) @@ -63,7 +63,7 @@ def test_executor_no_cwd(self): def test_pysqa_interface(self): queue_id = execute_with_pysqa( - command=_get_execute_command( 
+ command=get_cache_execute_command( file_name="test_i.h5", cores=1, ), From a23aee8b564ee7d4fedf402278e02ebdbc9f767f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 19 Jul 2025 12:42:59 +0000 Subject: [PATCH 2/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/standalone/command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/standalone/command.py b/executorlib/standalone/command.py index 03ce8572..6620cb08 100644 --- a/executorlib/standalone/command.py +++ b/executorlib/standalone/command.py @@ -1,6 +1,6 @@ +import importlib import os import sys -import importlib def get_command_path(executable: str) -> str: From 12d6d33116377f89540fc521a0f4ff0652409de6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 19 Jul 2025 14:45:39 +0200 Subject: [PATCH 3/3] fix import --- executorlib/standalone/command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/standalone/command.py b/executorlib/standalone/command.py index 6620cb08..aa396caa 100644 --- a/executorlib/standalone/command.py +++ b/executorlib/standalone/command.py @@ -1,4 +1,4 @@ -import importlib +import importlib.util import os import sys