diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py
index d7212674..665ecbe4 100644
--- a/executorlib/task_scheduler/file/shared.py
+++ b/executorlib/task_scheduler/file/shared.py
@@ -116,6 +116,13 @@ def execute_tasks_h5(
                     cache_directory, task_key + "_o.h5"
                 ) not in get_cache_files(cache_directory=cache_directory):
                     file_name = os.path.join(cache_directory, task_key + "_i.h5")
+                    # No output file exists for this task, so an input file
+                    # found here is a leftover; remove it before dump() writes
+                    # the new one. EAFP avoids an exists()/remove() race.
+                    try:
+                        os.remove(file_name)
+                    except FileNotFoundError:
+                        pass
                     dump(file_name=file_name, data_dict=data_dict)
                     if not disable_dependencies:
                         task_dependent_lst = [
diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py
index 695f0709..cfb838aa 100644
--- a/tests/test_fluxclusterexecutor.py
+++ b/tests/test_fluxclusterexecutor.py
@@ -8,6 +8,7 @@
 
 try:
     import flux.job
+    from executorlib.task_scheduler.file.hdf import dump
 
     skip_flux_test = "FLUX_URI" not in os.environ
     pmi = os.environ.get("EXECUTORLIB_PMIX", None)
@@ -57,5 +58,43 @@ def test_executor_no_cwd(self):
             self.assertEqual(len(os.listdir("executorlib_cache")), 2)
             self.assertTrue(fs1.done())
 
+    def test_executor_existing_files(self):
+        """Resubmit a task whose cache holds only a leftover input file."""
+        with FluxClusterExecutor(
+            resource_dict={"cores": 2, "cwd": "executorlib_cache"},
+            block_allocation=False,
+            cache_directory="executorlib_cache",
+        ) as exe:
+            cloudpickle_register(ind=1)
+            fs1 = exe.submit(mpi_funct, 1)
+            self.assertFalse(fs1.done())
+            self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
+            self.assertTrue(fs1.done())
+            self.assertEqual(len(os.listdir("executorlib_cache")), 4)
+
+        # Empty the cache after leaving the executor context, remembering
+        # which tasks had HDF5 files so one placeholder "<task>_i.h5" per
+        # task can be written once the directory is clean.
+        stale_inputs = set()
+        for file_name in os.listdir("executorlib_cache"):
+            file_path = os.path.join("executorlib_cache", file_name)
+            os.remove(file_path)
+            if file_name.endswith(("_i.h5", "_o.h5")):
+                stale_inputs.add(file_path[: -len("_i.h5")] + "_i.h5")
+        for stale_input in sorted(stale_inputs):
+            dump(file_name=stale_input, data_dict={"a": 1})
+
+        # The second run has to cope with the pre-existing input file.
+        with FluxClusterExecutor(
+            resource_dict={"cores": 2, "cwd": "executorlib_cache"},
+            block_allocation=False,
+            cache_directory="executorlib_cache",
+        ) as exe:
+            cloudpickle_register(ind=1)
+            fs1 = exe.submit(mpi_funct, 1)
+            self.assertFalse(fs1.done())
+            self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
+            self.assertTrue(fs1.done())
+            self.assertEqual(len(os.listdir("executorlib_cache")), 4)
+
     def tearDown(self):
         shutil.rmtree("executorlib_cache", ignore_errors=True)