Skip to content
2 changes: 2 additions & 0 deletions executorlib/task_scheduler/file/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ def execute_tasks_h5(
cache_directory, task_key + "_o.h5"
) not in get_cache_files(cache_directory=cache_directory):
file_name = os.path.join(cache_directory, task_key + "_i.h5")
if os.path.exists(file_name):
os.remove(file_name)
dump(file_name=file_name, data_dict=data_dict)
if not disable_dependencies:
task_dependent_lst = [
Expand Down
32 changes: 32 additions & 0 deletions tests/test_fluxclusterexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

try:
import flux.job
from executorlib.task_scheduler.file.hdf import dump

skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("EXECUTORLIB_PMIX", None)
Expand Down Expand Up @@ -57,5 +58,36 @@ def test_executor_no_cwd(self):
self.assertEqual(len(os.listdir("executorlib_cache")), 2)
self.assertTrue(fs1.done())

def test_executor_existing_files(self):
with FluxClusterExecutor(
resource_dict={"cores": 2, "cwd": "executorlib_cache"},
block_allocation=False,
cache_directory="executorlib_cache",
) as exe:
cloudpickle_register(ind=1)
fs1 = exe.submit(mpi_funct, 1)
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
self.assertTrue(fs1.done())
self.assertEqual(len(os.listdir("executorlib_cache")), 4)
for file_name in os.listdir("executorlib_cache"):
file_path = os.path.join("executorlib_cache", file_name )
os.remove(file_path)
if ".h5" in file_path:
task_key = file_path[:-5] + "_i.h5"
dump(file_name=task_key, data_dict={"a": 1})

with FluxClusterExecutor(
resource_dict={"cores": 2, "cwd": "executorlib_cache"},
block_allocation=False,
cache_directory="executorlib_cache",
) as exe:
cloudpickle_register(ind=1)
fs1 = exe.submit(mpi_funct, 1)
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
self.assertTrue(fs1.done())
self.assertEqual(len(os.listdir("executorlib_cache")), 4)

def tearDown(self):
shutil.rmtree("executorlib_cache", ignore_errors=True)
Loading