From a72c2b0c410575011c9bbe2a3fbbdaa942c05dfd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 08:36:55 +0200
Subject: [PATCH 1/2] Move scheduler to standalone

---
 executorlib/standalone/scheduler.py        | 66 ++++++++++++++++++
 .../task_scheduler/file/queue_spawner.py   | 67 +------------------
 .../task_scheduler/file/task_scheduler.py  |  6 +-
 tests/test_fluxclusterexecutor.py          |  3 +-
 tests/test_interactive_slurmspawner.py     |  8 +--
 5 files changed, 77 insertions(+), 73 deletions(-)
 create mode 100644 executorlib/standalone/scheduler.py

diff --git a/executorlib/standalone/scheduler.py b/executorlib/standalone/scheduler.py
new file mode 100644
index 00000000..27668c13
--- /dev/null
+++ b/executorlib/standalone/scheduler.py
@@ -0,0 +1,66 @@
+import contextlib
+import subprocess
+from typing import Optional, Union
+
+from pysqa import QueueAdapter
+
+
+
+def terminate_with_pysqa(
+    queue_id: int,
+    config_directory: Optional[str] = None,
+    backend: Optional[str] = None,
+):
+    """
+    Delete job from queuing system
+
+    Args:
+        queue_id (int): Queuing system ID of the job to delete.
+        config_directory (str, optional): path to the config directory.
+        backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
+    """
+    qa = QueueAdapter(
+        directory=config_directory,
+        queue_type=backend,
+        execute_command=pysqa_execute_command,
+    )
+    status = qa.get_status_of_job(process_id=queue_id)
+    if status is not None and status not in ["finished", "error"]:
+        with contextlib.suppress(subprocess.CalledProcessError):
+            qa.delete_job(process_id=queue_id)
+
+
+def pysqa_execute_command(
+    commands: str,
+    working_directory: Optional[str] = None,
+    split_output: bool = True,
+    shell: bool = False,
+    error_filename: str = "pysqa.err",
+) -> Union[str, list[str]]:
+    """
+    A wrapper around the subprocess.check_output function. Modified from pysqa to raise an exception if the subprocess
+    fails to submit the job to the queue.
+
+    Args:
+        commands (str): The command(s) to be executed on the command line
+        working_directory (str, optional): The directory where the command is executed. Defaults to None.
+        split_output (bool, optional): Boolean flag to split newlines in the output. Defaults to True.
+        shell (bool, optional): Additional switch to convert commands to a single string. Defaults to False.
+        error_filename (str, optional): In case the execution fails, the output is written to this file. Defaults to "pysqa.err".
+
+    Returns:
+        Union[str, List[str]]: Output of the shell command either as a string or as a list of strings
+    """
+    if shell and isinstance(commands, list):
+        commands = " ".join(commands)
+    out = subprocess.check_output(
+        commands,
+        cwd=working_directory,
+        stderr=subprocess.STDOUT,
+        universal_newlines=True,
+        shell=not isinstance(commands, list),
+    )
+    if out is not None and split_output:
+        return out.split("\n")
+    else:
+        return out
diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py
index 16dff14f..3cd55587 100644
--- a/executorlib/task_scheduler/file/queue_spawner.py
+++ b/executorlib/task_scheduler/file/queue_spawner.py
@@ -1,11 +1,10 @@
-import contextlib
 import os
-import subprocess
-from typing import Optional, Union
+from typing import Optional
 
 from pysqa import QueueAdapter
 
 from executorlib.standalone.inputcheck import check_file_exists
+from executorlib.standalone.scheduler import terminate_with_pysqa, pysqa_execute_command
 from executorlib.task_scheduler.file.hdf import dump, get_queue_id
 
 
@@ -43,7 +42,7 @@ def execute_with_pysqa(
     qa = QueueAdapter(
         directory=config_directory,
         queue_type=backend,
-        execute_command=_pysqa_execute_command,
+        execute_command=pysqa_execute_command,
     )
     queue_id = get_queue_id(file_name=file_name)
     if os.path.exists(file_name) and (
@@ -91,30 +90,6 @@ def execute_with_pysqa(
     return queue_id
 
 
-def terminate_with_pysqa(
-    queue_id: int,
-    config_directory: Optional[str] = None,
-    backend: Optional[str] = None,
-):
-    """
-    Delete job from queuing system
-
-    Args:
-        queue_id (int): Queuing system ID of the job to delete.
-        config_directory (str, optional): path to the config directory.
-        backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
-    """
-    qa = QueueAdapter(
-        directory=config_directory,
-        queue_type=backend,
-        execute_command=_pysqa_execute_command,
-    )
-    status = qa.get_status_of_job(process_id=queue_id)
-    if status is not None and status not in ["finished", "error"]:
-        with contextlib.suppress(subprocess.CalledProcessError):
-            qa.delete_job(process_id=queue_id)
-
-
 def terminate_tasks_in_cache(
     cache_directory: str,
     config_directory: Optional[str] = None,
@@ -140,39 +115,3 @@ def terminate_tasks_in_cache(
             config_directory=config_directory,
             backend=backend,
         )
-
-
-def _pysqa_execute_command(
-    commands: str,
-    working_directory: Optional[str] = None,
-    split_output: bool = True,
-    shell: bool = False,
-    error_filename: str = "pysqa.err",
-) -> Union[str, list[str]]:
-    """
-    A wrapper around the subprocess.check_output function. Modified from pysqa to raise an exception if the subprocess
-    fails to submit the job to the queue.
-
-    Args:
-        commands (str): The command(s) to be executed on the command line
-        working_directory (str, optional): The directory where the command is executed. Defaults to None.
-        split_output (bool, optional): Boolean flag to split newlines in the output. Defaults to True.
-        shell (bool, optional): Additional switch to convert commands to a single string. Defaults to False.
-        error_filename (str, optional): In case the execution fails, the output is written to this file. Defaults to "pysqa.err".
-
-    Returns:
-        Union[str, List[str]]: Output of the shell command either as a string or as a list of strings
-    """
-    if shell and isinstance(commands, list):
-        commands = " ".join(commands)
-    out = subprocess.check_output(
-        commands,
-        cwd=working_directory,
-        stderr=subprocess.STDOUT,
-        universal_newlines=True,
-        shell=not isinstance(commands, list),
-    )
-    if out is not None and split_output:
-        return out.split("\n")
-    else:
-        return out
diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py
index fe719d8b..47bcda04 100644
--- a/executorlib/task_scheduler/file/task_scheduler.py
+++ b/executorlib/task_scheduler/file/task_scheduler.py
@@ -17,10 +17,8 @@
 )
 
 try:
-    from executorlib.task_scheduler.file.queue_spawner import (
-        execute_with_pysqa,
-        terminate_with_pysqa,
-    )
+    from executorlib.standalone.scheduler import terminate_with_pysqa
+    from executorlib.task_scheduler.file.queue_spawner import execute_with_pysqa
 except ImportError:
     # If pysqa is not available fall back to executing tasks in a subprocess
     execute_with_pysqa = execute_in_subprocess  # type: ignore
diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py
index 51b18500..27645d86 100644
--- a/tests/test_fluxclusterexecutor.py
+++ b/tests/test_fluxclusterexecutor.py
@@ -11,7 +11,8 @@
 try:
     import flux.job
     from executorlib.task_scheduler.file.hdf import dump
-    from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache, execute_with_pysqa
+    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache, execute_with_pysqa
+    from executorlib.standalone.scheduler import terminate_with_pysqa
 
     skip_flux_test = "FLUX_URI" not in os.environ
     pmi = os.environ.get("EXECUTORLIB_PMIX", None)
diff --git a/tests/test_interactive_slurmspawner.py b/tests/test_interactive_slurmspawner.py
index a0af5b67..2617b9e9 100644
--- a/tests/test_interactive_slurmspawner.py
+++ b/tests/test_interactive_slurmspawner.py
@@ -2,7 +2,7 @@
 from executorlib.task_scheduler.interactive.slurmspawner import generate_slurm_command
 
 try:
-    from executorlib.task_scheduler.file.queue_spawner import _pysqa_execute_command
+    from executorlib.standalone.scheduler import pysqa_execute_command
 
     skip_pysqa_test = False
 except ImportError:
@@ -14,7 +14,7 @@
 )
 class TestPysqaExecuteCommand(unittest.TestCase):
     def test_pysqa_execute_command_list(self):
-        out = _pysqa_execute_command(
+        out = pysqa_execute_command(
             commands=["echo", "test"],
             working_directory=None,
             split_output=True,
@@ -25,7 +25,7 @@ def test_pysqa_execute_command_list(self):
         self.assertEqual("test", out[0])
 
     def test_pysqa_execute_command_string(self):
-        out = _pysqa_execute_command(
+        out = pysqa_execute_command(
             commands="echo test",
             working_directory=None,
             split_output=False,
@@ -37,7 +37,7 @@ def test_pysqa_execute_command_fail(self):
         with self.assertRaises(FileNotFoundError):
-            _pysqa_execute_command(
+            pysqa_execute_command(
                 commands=["no/executable/available"],
                 working_directory=None,
                 split_output=True,

From bafbf4441272485cf0353af10657f53efb8d5823 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Sun, 27 Jul 2025 06:37:23 +0000
Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 executorlib/standalone/scheduler.py              | 1 -
 executorlib/task_scheduler/file/queue_spawner.py | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/executorlib/standalone/scheduler.py b/executorlib/standalone/scheduler.py
index 27668c13..bc68187b 100644
--- a/executorlib/standalone/scheduler.py
+++ b/executorlib/standalone/scheduler.py
@@ -5,7 +5,6 @@
 from pysqa import QueueAdapter
 
 
-
 def terminate_with_pysqa(
     queue_id: int,
     config_directory: Optional[str] = None,
diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py
index 3cd55587..06cdf8f1 100644
--- a/executorlib/task_scheduler/file/queue_spawner.py
+++ b/executorlib/task_scheduler/file/queue_spawner.py
@@ -4,7 +4,7 @@
 from pysqa import QueueAdapter
 
 from executorlib.standalone.inputcheck import check_file_exists
-from executorlib.standalone.scheduler import terminate_with_pysqa, pysqa_execute_command
+from executorlib.standalone.scheduler import pysqa_execute_command, terminate_with_pysqa
 from executorlib.task_scheduler.file.hdf import dump, get_queue_id
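
Reviewer note, not part of the patch series: a minimal usage sketch of the two helpers after they move to executorlib/standalone/scheduler.py. The import path and keyword arguments come from the patch itself; the queue ID, backend, and config directory values below are hypothetical placeholders.

# Illustrative sketch only -- exercises the relocated helpers.
from executorlib.standalone.scheduler import pysqa_execute_command, terminate_with_pysqa

# Run a shell command the same way pysqa submission commands are executed;
# with split_output=True the output is returned as a list of lines.
lines = pysqa_execute_command(
    commands=["echo", "test"],
    working_directory=None,
    split_output=True,
    shell=False,
)
print(lines[0])  # "test"

# Delete a job from the queuing system; ID and path are placeholders.
terminate_with_pysqa(
    queue_id=12345,                      # hypothetical queuing-system job ID
    config_directory="~/pysqa_config",   # hypothetical pysqa config directory
    backend="slurm",
)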