Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions executorlib/standalone/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import contextlib
import subprocess
from typing import Optional, Union

from pysqa import QueueAdapter


def terminate_with_pysqa(
queue_id: int,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
):
"""
Delete job from queuing system

Args:
queue_id (int): Queuing system ID of the job to delete.
config_directory (str, optional): path to the config directory.
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
"""
qa = QueueAdapter(
directory=config_directory,
queue_type=backend,
execute_command=pysqa_execute_command,
)
status = qa.get_status_of_job(process_id=queue_id)
if status is not None and status not in ["finished", "error"]:
with contextlib.suppress(subprocess.CalledProcessError):
qa.delete_job(process_id=queue_id)


def pysqa_execute_command(
commands: str,
working_directory: Optional[str] = None,
split_output: bool = True,
shell: bool = False,
error_filename: str = "pysqa.err",
) -> Union[str, list[str]]:
Comment on lines +32 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Fix type annotation and parameter usage inconsistency.

The function signature declares commands: str but the implementation handles both strings and lists. Additionally, the error_filename parameter is documented but never used in the implementation.

 def pysqa_execute_command(
-    commands: str,
+    commands: Union[str, list[str]],
     working_directory: Optional[str] = None,
     split_output: bool = True,
     shell: bool = False,
-    error_filename: str = "pysqa.err",
 ) -> Union[str, list[str]]:

If error_filename is intended for future use, consider adding a comment explaining its purpose or implement its usage.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def pysqa_execute_command(
commands: str,
working_directory: Optional[str] = None,
split_output: bool = True,
shell: bool = False,
error_filename: str = "pysqa.err",
) -> Union[str, list[str]]:
def pysqa_execute_command(
commands: Union[str, list[str]],
working_directory: Optional[str] = None,
split_output: bool = True,
shell: bool = False,
) -> Union[str, list[str]]:
# ...
🤖 Prompt for AI Agents
In executorlib/standalone/scheduler.py around lines 32 to 38, the function
pysqa_execute_command has a type annotation for commands as str, but the
implementation supports both str and list types, causing inconsistency. Also,
the error_filename parameter is declared but unused. Fix this by updating the
commands parameter type annotation to accept both str and list[str], and either
implement the usage of error_filename in the function or add a comment
explaining its intended future use.

"""
A wrapper around the subprocess.check_output function. Modified from pysqa to raise an exception if the subprocess
fails to submit the job to the queue.

Args:
commands (str): The command(s) to be executed on the command line
working_directory (str, optional): The directory where the command is executed. Defaults to None.
split_output (bool, optional): Boolean flag to split newlines in the output. Defaults to True.
shell (bool, optional): Additional switch to convert commands to a single string. Defaults to False.
error_filename (str, optional): In case the execution fails, the output is written to this file. Defaults to "pysqa.err".

Returns:
Union[str, List[str]]: Output of the shell command either as a string or as a list of strings
"""
if shell and isinstance(commands, list):
commands = " ".join(commands)
out = subprocess.check_output(
commands,
cwd=working_directory,
stderr=subprocess.STDOUT,
universal_newlines=True,
shell=not isinstance(commands, list),
)
if out is not None and split_output:
return out.split("\n")
else:
return out
67 changes: 3 additions & 64 deletions executorlib/task_scheduler/file/queue_spawner.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import contextlib
import os
import subprocess
from typing import Optional, Union
from typing import Optional

from pysqa import QueueAdapter

from executorlib.standalone.inputcheck import check_file_exists
from executorlib.standalone.scheduler import pysqa_execute_command, terminate_with_pysqa
from executorlib.task_scheduler.file.hdf import dump, get_queue_id


Expand Down Expand Up @@ -43,7 +42,7 @@ def execute_with_pysqa(
qa = QueueAdapter(
directory=config_directory,
queue_type=backend,
execute_command=_pysqa_execute_command,
execute_command=pysqa_execute_command,
)
queue_id = get_queue_id(file_name=file_name)
if os.path.exists(file_name) and (
Expand Down Expand Up @@ -91,30 +90,6 @@ def execute_with_pysqa(
return queue_id


def terminate_with_pysqa(
queue_id: int,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
):
"""
Delete job from queuing system

Args:
queue_id (int): Queuing system ID of the job to delete.
config_directory (str, optional): path to the config directory.
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
"""
qa = QueueAdapter(
directory=config_directory,
queue_type=backend,
execute_command=_pysqa_execute_command,
)
status = qa.get_status_of_job(process_id=queue_id)
if status is not None and status not in ["finished", "error"]:
with contextlib.suppress(subprocess.CalledProcessError):
qa.delete_job(process_id=queue_id)


def terminate_tasks_in_cache(
cache_directory: str,
config_directory: Optional[str] = None,
Expand All @@ -140,39 +115,3 @@ def terminate_tasks_in_cache(
config_directory=config_directory,
backend=backend,
)


def _pysqa_execute_command(
commands: str,
working_directory: Optional[str] = None,
split_output: bool = True,
shell: bool = False,
error_filename: str = "pysqa.err",
) -> Union[str, list[str]]:
"""
A wrapper around the subprocess.check_output function. Modified from pysqa to raise an exception if the subprocess
fails to submit the job to the queue.

Args:
commands (str): The command(s) to be executed on the command line
working_directory (str, optional): The directory where the command is executed. Defaults to None.
split_output (bool, optional): Boolean flag to split newlines in the output. Defaults to True.
shell (bool, optional): Additional switch to convert commands to a single string. Defaults to False.
error_filename (str, optional): In case the execution fails, the output is written to this file. Defaults to "pysqa.err".

Returns:
Union[str, List[str]]: Output of the shell command either as a string or as a list of strings
"""
if shell and isinstance(commands, list):
commands = " ".join(commands)
out = subprocess.check_output(
commands,
cwd=working_directory,
stderr=subprocess.STDOUT,
universal_newlines=True,
shell=not isinstance(commands, list),
)
if out is not None and split_output:
return out.split("\n")
else:
return out
6 changes: 2 additions & 4 deletions executorlib/task_scheduler/file/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
)

try:
from executorlib.task_scheduler.file.queue_spawner import (
execute_with_pysqa,
terminate_with_pysqa,
)
from executorlib.standalone.scheduler import terminate_with_pysqa
from executorlib.task_scheduler.file.queue_spawner import execute_with_pysqa
except ImportError:
# If pysqa is not available fall back to executing tasks in a subprocess
execute_with_pysqa = execute_in_subprocess # type: ignore
Expand Down
3 changes: 2 additions & 1 deletion tests/test_fluxclusterexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
try:
import flux.job
from executorlib.task_scheduler.file.hdf import dump
from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache, execute_with_pysqa
from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache, execute_with_pysqa
from executorlib.standalone.scheduler import terminate_with_pysqa

skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("EXECUTORLIB_PMIX", None)
Expand Down
8 changes: 4 additions & 4 deletions tests/test_interactive_slurmspawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from executorlib.task_scheduler.interactive.slurmspawner import generate_slurm_command

try:
from executorlib.task_scheduler.file.queue_spawner import _pysqa_execute_command
from executorlib.standalone.scheduler import pysqa_execute_command

skip_pysqa_test = False
except ImportError:
Expand All @@ -14,7 +14,7 @@
)
class TestPysqaExecuteCommand(unittest.TestCase):
def test_pysqa_execute_command_list(self):
out = _pysqa_execute_command(
out = pysqa_execute_command(
commands=["echo", "test"],
working_directory=None,
split_output=True,
Expand All @@ -25,7 +25,7 @@ def test_pysqa_execute_command_list(self):
self.assertEqual("test", out[0])

def test_pysqa_execute_command_string(self):
out = _pysqa_execute_command(
out = pysqa_execute_command(
commands="echo test",
working_directory=None,
split_output=False,
Expand All @@ -37,7 +37,7 @@ def test_pysqa_execute_command_string(self):

def test_pysqa_execute_command_fail(self):
with self.assertRaises(FileNotFoundError):
_pysqa_execute_command(
pysqa_execute_command(
commands=["no/executable/available"],
working_directory=None,
split_output=True,
Expand Down
Loading