7 changes: 7 additions & 0 deletions executorlib/__init__.py
@@ -36,4 +36,11 @@
    "SlurmClusterExecutor",
]

try:
    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache

    __all__ += ["terminate_tasks_in_cache"]
except ImportError:
    pass

__version__ = _version.__version__
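
The try/except guard above makes terminate_tasks_in_cache an optional export: it only appears in the executorlib namespace when the pysqa dependency behind queue_spawner is importable. A minimal sketch of what that means for downstream code (the hasattr probe is illustrative, not part of this PR):

import executorlib

# Sketch: probe for the optional export before using it, since the
# guarded import above silently falls back when pysqa is not installed.
if hasattr(executorlib, "terminate_tasks_in_cache"):
    executorlib.terminate_tasks_in_cache(cache_directory="executorlib_cache")
else:
    print("pysqa not available; queue termination helpers are not exported")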
31 changes: 29 additions & 2 deletions executorlib/task_scheduler/file/queue_spawner.py
@@ -33,7 +33,7 @@ def execute_with_pysqa(
                cwd: None,
            }
        config_directory (str, optional): path to the config directory.
-        backend (str, optional): name of the backend used to spawn tasks.
+        backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].

    Returns:
        int: queuing system ID
@@ -102,7 +102,7 @@ def terminate_with_pysqa(
    Args:
        queue_id (int): Queuing system ID of the job to delete.
        config_directory (str, optional): path to the config directory.
-        backend (str, optional): name of the backend used to spawn tasks.
+        backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
    """
    qa = QueueAdapter(
        directory=config_directory,
@@ -115,6 +115,33 @@
    qa.delete_job(process_id=queue_id)


def terminate_tasks_in_cache(
    cache_directory: str,
    config_directory: Optional[str] = None,
    backend: Optional[str] = None,
):
    """
    Delete all jobs stored in the cache directory from the queuing system

    Args:
        cache_directory (str): The directory to store cache files.
        config_directory (str, optional): path to the config directory.
        backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
    """
    hdf5_file_lst = []
    for root, _, files in os.walk(cache_directory):
        hdf5_file_lst += [os.path.join(root, f) for f in files if f[-5:] == "_i.h5"]

    for f in hdf5_file_lst:
        queue_id = get_queue_id(f)
        if queue_id is not None:
            terminate_with_pysqa(
                queue_id=queue_id,
                config_directory=config_directory,
                backend=backend,
            )


def _pysqa_execute_command(
    commands: str,
    working_directory: Optional[str] = None,
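Taken together, the new helper walks the cache directory, reads the stored queue ID from every *_i.h5 input file via get_queue_id, and hands each one to terminate_with_pysqa; files without a stored queue_id are skipped. A minimal usage sketch, assuming a populated executorlib_cache and a flux queue configuration:

from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache

# Cancel every job whose input file is still in the cache; entries that
# were never submitted (no queue_id) are left untouched.
terminate_tasks_in_cache(
    cache_directory="executorlib_cache",
    backend="flux",
)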
12 changes: 12 additions & 0 deletions tests/test_fluxclusterexecutor.py
@@ -9,6 +9,7 @@
try:
    import flux.job
    from executorlib.task_scheduler.file.hdf import dump
    from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache

    skip_flux_test = "FLUX_URI" not in os.environ
    pmi = os.environ.get("EXECUTORLIB_PMIX", None)
@@ -58,6 +59,9 @@ def test_executor_no_cwd(self):
        self.assertEqual(len(os.listdir("executorlib_cache")), 2)
        self.assertTrue(fs1.done())

    def test_terminate_with_pysqa(self):
        self.assertIsNone(terminate_with_pysqa(queue_id=1, backend="flux"))
Comment on lines +62 to +63
🛠️ Refactor suggestion

Enhance test coverage for better validation.

The current test only verifies that the function executes without errors, but doesn't validate the actual termination behavior. Consider:

  1. Mock the QueueAdapter: Use mocking to verify that get_status_of_job and delete_job are called appropriately
  2. Test edge cases: Test scenarios where the job doesn't exist or is already finished
  3. Parameterize backends: Test both "slurm" and "flux" backends if possible

Example improvement:

+    @unittest.mock.patch('executorlib.task_scheduler.file.queue_spawner.QueueAdapter')
+    def test_terminate_with_pysqa(self, mock_qa_class):
+        mock_qa = mock_qa_class.return_value
+        mock_qa.get_status_of_job.return_value = "running"
+        
+        result = terminate_with_pysqa(queue_id=1, backend="flux")
+        
+        self.assertIsNone(result)
+        mock_qa.get_status_of_job.assert_called_once_with(process_id=1)
+        mock_qa.delete_job.assert_called_once_with(process_id=1)
-    def test_terminate_with_pysqa(self):
-        self.assertIsNone(terminate_with_pysqa(queue_id=1, backend="flux"))
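The suggestion above covers point 1; point 3 could follow the same mocking pattern, using subTest to run both supported backends. A hedged sketch (test name and structure are illustrative, not from the PR):

    @unittest.mock.patch("executorlib.task_scheduler.file.queue_spawner.QueueAdapter")
    def test_terminate_with_pysqa_backends(self, mock_qa_class):
        mock_qa = mock_qa_class.return_value
        mock_qa.get_status_of_job.return_value = "running"
        # Sketch: the same mocked termination check, parameterized over backends.
        for backend in ["slurm", "flux"]:
            with self.subTest(backend=backend):
                self.assertIsNone(terminate_with_pysqa(queue_id=1, backend=backend))
                mock_qa.delete_job.assert_called_with(process_id=1)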


    def test_executor_existing_files(self):
        with FluxClusterExecutor(
            resource_dict={"cores": 2, "cwd": "executorlib_cache"},
@@ -89,5 +93,13 @@ def test_executor_existing_files(self):
        self.assertTrue(fs1.done())
        self.assertEqual(len(os.listdir("executorlib_cache")), 4)

    def test_terminate_tasks_in_cache(self):
        file = os.path.join("executorlib_cache", "test_i.h5")
        dump(file_name=file, data_dict={"queue_id": 1})
        self.assertIsNone(terminate_tasks_in_cache(
            cache_directory="executorlib_cache",
            backend="flux",
        ))

    def tearDown(self):
        shutil.rmtree("executorlib_cache", ignore_errors=True)
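
One edge case the new helper explicitly guards (the `if queue_id is not None` branch) is a cache entry that was never submitted. A hedged sketch of such a test, assuming get_queue_id returns None when no queue_id was dumped and that dump accepts an arbitrary data_dict:

    def test_terminate_tasks_in_cache_without_queue_id(self):
        # Sketch: an input file without a queue_id should be skipped
        # without any call to the queuing system.
        file = os.path.join("executorlib_cache", "never_submitted_i.h5")
        dump(file_name=file, data_dict={})  # assumption: empty payload is accepted
        self.assertIsNone(terminate_tasks_in_cache(
            cache_directory="executorlib_cache",
            backend="flux",
        ))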