From c2ad08949ebe6ea20e65a69b05610d716db18ab4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 16 Jul 2025 22:31:22 +0200 Subject: [PATCH 1/9] Add terminate_tasks_in_cache() function to kill remaining tasks --- executorlib/__init__.py | 6 +++++ .../task_scheduler/file/queue_spawner.py | 25 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 8547f6b2..3697e298 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -36,4 +36,10 @@ "SlurmClusterExecutor", ] +try: + from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache + __all__ += ["terminate_tasks_in_cache"] +except ImportError: + pass + __version__ = _version.__version__ diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py index 7060dd6a..2465a17c 100644 --- a/executorlib/task_scheduler/file/queue_spawner.py +++ b/executorlib/task_scheduler/file/queue_spawner.py @@ -113,6 +113,31 @@ def terminate_with_pysqa( qa.delete_job(process_id=queue_id) +def terminate_tasks_in_cache( + cache_directory: str, + config_directory: Optional[str] = None, + backend: Optional[str] = None, +): + """ + Delete all jobs stored in the cache directory from the queuing system + + Args: + cache_directory (str): The directory to store cache files. + config_directory (str, optional): path to the config directory. + backend (str, optional): name of the backend used to spawn tasks. + """ + hdf5_file_lst = [] + for root, folder, files in os.walk(cache_directory): + hdf5_file_lst += [os.path.join(root, f) for f in files if "_i.h5" == f[-5:]] + + for f in hdf5_file_lst: + terminate_with_pysqa( + queue_id=get_queue_id(f), + config_directory=config_directory, + backend=backend, + ) + + def _pysqa_execute_command( commands: str, working_directory: Optional[str] = None, From a8b8c8ca623185c25a2c904b295ac2f0e927e8ad Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 16 Jul 2025 20:34:42 +0000 Subject: [PATCH 2/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/__init__.py | 1 + executorlib/task_scheduler/file/queue_spawner.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 3697e298..2bf66ec7 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -38,6 +38,7 @@ try: from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache + __all__ += ["terminate_tasks_in_cache"] except ImportError: pass diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py index 2465a17c..a3dd319f 100644 --- a/executorlib/task_scheduler/file/queue_spawner.py +++ b/executorlib/task_scheduler/file/queue_spawner.py @@ -128,7 +128,7 @@ def terminate_tasks_in_cache( """ hdf5_file_lst = [] for root, folder, files in os.walk(cache_directory): - hdf5_file_lst += [os.path.join(root, f) for f in files if "_i.h5" == f[-5:]] + hdf5_file_lst += [os.path.join(root, f) for f in files if f[-5:] == "_i.h5"] for f in hdf5_file_lst: terminate_with_pysqa( From a6dffc55f3fcd5e2af401db572d2dd08c67affc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 16 Jul 2025 22:36:42 +0200 Subject: [PATCH 3/9] Remove unused variable --- executorlib/task_scheduler/file/queue_spawner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py index 2465a17c..50bb2d53 100644 --- a/executorlib/task_scheduler/file/queue_spawner.py +++ b/executorlib/task_scheduler/file/queue_spawner.py @@ -127,7 +127,7 @@ def terminate_tasks_in_cache( backend (str, optional): name of the backend used to spawn tasks. """ hdf5_file_lst = [] - for root, folder, files in os.walk(cache_directory): + for root, _, files in os.walk(cache_directory): hdf5_file_lst += [os.path.join(root, f) for f in files if "_i.h5" == f[-5:]] for f in hdf5_file_lst: From f1cc703a4290ed4b3bbb9ac12aa1a20ac8a1b832 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 16 Jul 2025 20:38:11 +0000 Subject: [PATCH 4/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/task_scheduler/file/queue_spawner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py index 50bb2d53..ebced754 100644 --- a/executorlib/task_scheduler/file/queue_spawner.py +++ b/executorlib/task_scheduler/file/queue_spawner.py @@ -128,7 +128,7 @@ def terminate_tasks_in_cache( """ hdf5_file_lst = [] for root, _, files in os.walk(cache_directory): - hdf5_file_lst += [os.path.join(root, f) for f in files if "_i.h5" == f[-5:]] + hdf5_file_lst += [os.path.join(root, f) for f in files if f[-5:] == "_i.h5"] for f in hdf5_file_lst: terminate_with_pysqa( From 689cc573a1aa8c64cf12868bde4895feceeb4a0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 16 Jul 2025 22:42:25 +0200 Subject: [PATCH 5/9] fix type hint --- executorlib/task_scheduler/file/queue_spawner.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py index 50bb2d53..7d01e402 100644 --- a/executorlib/task_scheduler/file/queue_spawner.py +++ b/executorlib/task_scheduler/file/queue_spawner.py @@ -131,11 +131,13 @@ def terminate_tasks_in_cache( hdf5_file_lst += [os.path.join(root, f) for f in files if "_i.h5" == f[-5:]] for f in hdf5_file_lst: - terminate_with_pysqa( - queue_id=get_queue_id(f), - config_directory=config_directory, - backend=backend, - ) + queue_id = get_queue_id(f) + if queue_id is not None: + terminate_with_pysqa( + queue_id=queue_id, + config_directory=config_directory, + backend=backend, + ) def _pysqa_execute_command( From d535493ab33fa4fff9fe136512811fec36214806 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 16 Jul 2025 23:07:18 +0200 Subject: [PATCH 6/9] Add Docstrings --- executorlib/task_scheduler/file/queue_spawner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py index 7307d713..16dff14f 100644 --- a/executorlib/task_scheduler/file/queue_spawner.py +++ b/executorlib/task_scheduler/file/queue_spawner.py @@ -33,7 +33,7 @@ def execute_with_pysqa( cwd: None, } config_directory (str, optional): path to the config directory. - backend (str, optional): name of the backend used to spawn tasks. + backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"]. Returns: int: queuing system ID @@ -102,7 +102,7 @@ def terminate_with_pysqa( Args: queue_id (int): Queuing system ID of the job to delete. config_directory (str, optional): path to the config directory. - backend (str, optional): name of the backend used to spawn tasks. + backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"]. """ qa = QueueAdapter( directory=config_directory, @@ -126,7 +126,7 @@ def terminate_tasks_in_cache( Args: cache_directory (str): The directory to store cache files. config_directory (str, optional): path to the config directory. - backend (str, optional): name of the backend used to spawn tasks. + backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"]. """ hdf5_file_lst = [] for root, _, files in os.walk(cache_directory): From 8aad2404c358fcda137842456e49dcebf16877d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 16 Jul 2025 23:07:30 +0200 Subject: [PATCH 7/9] Add flux based tests --- tests/test_fluxclusterexecutor.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index cfb838aa..d555aec8 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -9,6 +9,7 @@ try: import flux.job from executorlib.task_scheduler.file.hdf import dump + from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("EXECUTORLIB_PMIX", None) @@ -58,6 +59,9 @@ def test_executor_no_cwd(self): self.assertEqual(len(os.listdir("executorlib_cache")), 2) self.assertTrue(fs1.done()) + def test_terminate_with_pysqa(self): + self.assertisNone(terminate_with_pysqa(queue_id=1, backend="flux")) + def test_executor_existing_files(self): with FluxClusterExecutor( resource_dict={"cores": 2, "cwd": "executorlib_cache"}, @@ -89,5 +93,13 @@ def test_executor_existing_files(self): self.assertTrue(fs1.done()) self.assertEqual(len(os.listdir("executorlib_cache")), 4) + def terminate_tasks_in_cache(self): + file = os.path.join("executorlib_cache", "test_i.h5") + dump(file_name=file, data_dict={"queue_id": 1}) + self.assertisNone(terminate_tasks_in_cache( + cache_directory="executorlib_cache", + backend="flux", + )) + def tearDown(self): shutil.rmtree("executorlib_cache", ignore_errors=True) From 3d82db4075c02dcc6e97bd4db7720e965e5f6b6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 16 Jul 2025 23:12:42 +0200 Subject: [PATCH 8/9] fix tests --- tests/test_fluxclusterexecutor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index d555aec8..6d19fb8c 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -60,7 +60,7 @@ def test_executor_no_cwd(self): self.assertTrue(fs1.done()) def test_terminate_with_pysqa(self): - self.assertisNone(terminate_with_pysqa(queue_id=1, backend="flux")) + self.assertIsNone(terminate_with_pysqa(queue_id=1, backend="flux")) def test_executor_existing_files(self): with FluxClusterExecutor( @@ -96,7 +96,7 @@ def test_executor_existing_files(self): def terminate_tasks_in_cache(self): file = os.path.join("executorlib_cache", "test_i.h5") dump(file_name=file, data_dict={"queue_id": 1}) - self.assertisNone(terminate_tasks_in_cache( + self.assertIsNone(terminate_tasks_in_cache( cache_directory="executorlib_cache", backend="flux", )) From b27655214ac477503c5d4ab47c4a94a9f02a4c56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 16 Jul 2025 23:18:03 +0200 Subject: [PATCH 9/9] functions need to be named test --- tests/test_fluxclusterexecutor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index 6d19fb8c..46637606 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -93,7 +93,7 @@ def test_executor_existing_files(self): self.assertTrue(fs1.done()) self.assertEqual(len(os.listdir("executorlib_cache")), 4) - def terminate_tasks_in_cache(self): + def test_terminate_tasks_in_cache(self): file = os.path.join("executorlib_cache", "test_i.h5") dump(file_name=file, data_dict={"queue_id": 1}) self.assertIsNone(terminate_tasks_in_cache(