diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index 8547f6b2..2bf66ec7 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -36,4 +36,11 @@
     "SlurmClusterExecutor",
 ]
 
+try:
+    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache
+
+    __all__ += ["terminate_tasks_in_cache"]
+except ImportError:
+    pass
+
 __version__ = _version.__version__
diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py
index 2ec1400e..16dff14f 100644
--- a/executorlib/task_scheduler/file/queue_spawner.py
+++ b/executorlib/task_scheduler/file/queue_spawner.py
@@ -33,7 +33,7 @@ def execute_with_pysqa(
                cwd: None,
            }
        config_directory (str, optional): path to the config directory.
-       backend (str, optional): name of the backend used to spawn tasks.
+       backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
 
    Returns:
        int: queuing system ID
@@ -102,7 +102,7 @@ def terminate_with_pysqa(
    Args:
        queue_id (int): Queuing system ID of the job to delete.
        config_directory (str, optional): path to the config directory.
-       backend (str, optional): name of the backend used to spawn tasks.
+       backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
    """
    qa = QueueAdapter(
        directory=config_directory,
@@ -115,6 +115,33 @@
        qa.delete_job(process_id=queue_id)
 
 
+def terminate_tasks_in_cache(
+    cache_directory: str,
+    config_directory: Optional[str] = None,
+    backend: Optional[str] = None,
+):
+    """
+    Delete all jobs stored in the cache directory from the queuing system
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+        config_directory (str, optional): path to the config directory.
+        backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
+    """
+    hdf5_file_lst = []
+    for root, _, files in os.walk(cache_directory):
+        hdf5_file_lst += [os.path.join(root, f) for f in files if f[-5:] == "_i.h5"]
+
+    for f in hdf5_file_lst:
+        queue_id = get_queue_id(f)
+        if queue_id is not None:
+            terminate_with_pysqa(
+                queue_id=queue_id,
+                config_directory=config_directory,
+                backend=backend,
+            )
+
+
 def _pysqa_execute_command(
     commands: str,
     working_directory: Optional[str] = None,
diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py
index cfb838aa..46637606 100644
--- a/tests/test_fluxclusterexecutor.py
+++ b/tests/test_fluxclusterexecutor.py
@@ -9,6 +9,7 @@
 try:
     import flux.job
     from executorlib.task_scheduler.file.hdf import dump
+    from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache
 
     skip_flux_test = "FLUX_URI" not in os.environ
     pmi = os.environ.get("EXECUTORLIB_PMIX", None)
@@ -58,6 +59,9 @@ def test_executor_no_cwd(self):
         self.assertEqual(len(os.listdir("executorlib_cache")), 2)
         self.assertTrue(fs1.done())
 
+    def test_terminate_with_pysqa(self):
+        self.assertIsNone(terminate_with_pysqa(queue_id=1, backend="flux"))
+
     def test_executor_existing_files(self):
         with FluxClusterExecutor(
             resource_dict={"cores": 2, "cwd": "executorlib_cache"},
@@ -89,5 +93,13 @@
         self.assertTrue(fs1.done())
         self.assertEqual(len(os.listdir("executorlib_cache")), 4)
 
+    def test_terminate_tasks_in_cache(self):
+        file = os.path.join("executorlib_cache", "test_i.h5")
+        dump(file_name=file, data_dict={"queue_id": 1})
+        self.assertIsNone(terminate_tasks_in_cache(
+            cache_directory="executorlib_cache",
+            backend="flux",
+        ))
+
     def tearDown(self):
         shutil.rmtree("executorlib_cache", ignore_errors=True)