From 0b3395675c0711f99aa7329b4423f154720f3047 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 2 Jul 2025 17:47:07 +0200 Subject: [PATCH 01/10] Fix SlurmClusterExecutor bug --- executorlib/task_scheduler/file/shared.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index 5af53074..99623eea 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -112,7 +112,7 @@ def execute_tasks_h5( cache_key=cache_key, ) if task_key not in memory_dict: - if task_key + "_o.h5" not in get_cache_files( + if os.path.join(cache_directory, task_key + "_o.h5") not in get_cache_files( cache_directory=cache_directory ): file_name = os.path.join(cache_directory, task_key + "_i.h5") From 504b08bfd6e2dadec400b0443eacc01cb2f54193 Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Wed, 2 Jul 2025 15:48:02 +0000 Subject: [PATCH 02/10] Format black --- executorlib/task_scheduler/file/shared.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index 99623eea..d7212674 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -112,9 +112,9 @@ def execute_tasks_h5( cache_key=cache_key, ) if task_key not in memory_dict: - if os.path.join(cache_directory, task_key + "_o.h5") not in get_cache_files( - cache_directory=cache_directory - ): + if os.path.join( + cache_directory, task_key + "_o.h5" + ) not in get_cache_files(cache_directory=cache_directory): file_name = os.path.join(cache_directory, task_key + "_i.h5") dump(file_name=file_name, data_dict=data_dict) if not disable_dependencies: From 2af3c615926378f5d38a55e0cbe5b0635f39c290 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 2 Jul 2025 18:04:25 +0200 Subject: [PATCH 03/10] SlurmClusterExecutor - resubmission of crashed jobs --- executorlib/task_scheduler/file/shared.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index 5af53074..5ccb7782 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -116,7 +116,11 @@ def execute_tasks_h5( cache_directory=cache_directory ): file_name = os.path.join(cache_directory, task_key + "_i.h5") - dump(file_name=file_name, data_dict=data_dict) + try: + dump(file_name=file_name, data_dict=data_dict) + except ValueError: + os.remove(file_name) + dump(file_name=file_name, data_dict=data_dict) if not disable_dependencies: task_dependent_lst = [ process_dict[k] for k in future_wait_key_lst From e85953e8b2cdd1104989ce9984d377f4f7eee085 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 2 Jul 2025 18:07:55 +0200 Subject: [PATCH 04/10] remove file rather than try and except --- executorlib/task_scheduler/file/shared.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index 5ccb7782..50e22de5 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -116,11 +116,9 @@ def execute_tasks_h5( cache_directory=cache_directory ): file_name = os.path.join(cache_directory, task_key + "_i.h5") - try: - dump(file_name=file_name, data_dict=data_dict) - except ValueError: + if os.path.exists(file_name): os.remove(file_name) - dump(file_name=file_name, data_dict=data_dict) + dump(file_name=file_name, data_dict=data_dict) if not disable_dependencies: task_dependent_lst = [ process_dict[k] for k in future_wait_key_lst From 39d53fcd8fa71cd0375de1e3b7362f7ab43577b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 2 Jul 2025 19:22:11 +0200 Subject: [PATCH 05/10] add test --- tests/test_fluxclusterexecutor.py | 44 +++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index dab9985e..92fa3606 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -41,7 +41,51 @@ def test_executor(self): fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertEqual(len(os.listdir("executorlib_cache")), 4) self.assertTrue(fs1.done()) + def test_executor_existing_files(self): + with FluxClusterExecutor( + resource_dict={"cores": 2, "cwd": "executorlib_cache"}, + block_allocation=False, + cache_directory="executorlib_cache", + ) as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(mpi_funct, 1) + self.assertFalse(fs1.done()) + self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs1.done()) + self.assertEqual(len(os.listdir("executorlib_cache")), 4) + + with FluxClusterExecutor( + resource_dict={"cores": 2, "cwd": "executorlib_cache"}, + block_allocation=False, + cache_directory="executorlib_cache", + ) as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(mpi_funct, 1) + self.assertFalse(fs1.done()) + self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs1.done()) + self.assertEqual(len(os.listdir("executorlib_cache")), 4) + for file_name in os.listdir("executorlib_cache"): + file_path = os.path.join("executorlib_cache", file_name ) + os.remove(file_path) + if ".h5" in file_path: + with open(file_path, "w") as f: + f.write("test") + + with FluxClusterExecutor( + resource_dict={"cores": 2, "cwd": "executorlib_cache"}, + block_allocation=False, + cache_directory="executorlib_cache", + ) as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(mpi_funct, 1) + self.assertFalse(fs1.done()) + self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs1.done()) + self.assertEqual(len(os.listdir("executorlib_cache")), 4) + def tearDown(self): shutil.rmtree("executorlib_cache", ignore_errors=True) From 740058121c04ff8038afe8425ce4da808904e81f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 2 Jul 2025 19:29:48 +0200 Subject: [PATCH 06/10] extend tests --- tests/test_fluxclusterexecutor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index 92fa3606..88a7fa88 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -67,7 +67,7 @@ def test_executor_existing_files(self): self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) self.assertTrue(fs1.done()) - self.assertEqual(len(os.listdir("executorlib_cache")), 4) + self.assertEqual(len(os.listdir("executorlib_cache")), 5) for file_name in os.listdir("executorlib_cache"): file_path = os.path.join("executorlib_cache", file_name ) os.remove(file_path) @@ -85,7 +85,7 @@ def test_executor_existing_files(self): self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) self.assertTrue(fs1.done()) - self.assertEqual(len(os.listdir("executorlib_cache")), 4) + self.assertEqual(len(os.listdir("executorlib_cache")), 6) def tearDown(self): shutil.rmtree("executorlib_cache", ignore_errors=True) From 17fadfae1f228120c67b5a97185226adbd6c38b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 2 Jul 2025 19:43:15 +0200 Subject: [PATCH 07/10] fixes --- tests/test_fluxclusterexecutor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index 88a7fa88..92fa3606 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -67,7 +67,7 @@ def test_executor_existing_files(self): self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) self.assertTrue(fs1.done()) - self.assertEqual(len(os.listdir("executorlib_cache")), 5) + self.assertEqual(len(os.listdir("executorlib_cache")), 4) for file_name in os.listdir("executorlib_cache"): file_path = os.path.join("executorlib_cache", file_name ) os.remove(file_path) @@ -85,7 +85,7 @@ def test_executor_existing_files(self): self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) self.assertTrue(fs1.done()) - self.assertEqual(len(os.listdir("executorlib_cache")), 6) + self.assertEqual(len(os.listdir("executorlib_cache")), 4) def tearDown(self): shutil.rmtree("executorlib_cache", ignore_errors=True) From 8a7b9932a6b34ae311da4073c61c89510716fc1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 2 Jul 2025 19:53:57 +0200 Subject: [PATCH 08/10] fix file write --- tests/test_fluxclusterexecutor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index 92fa3606..8fa1b095 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -5,6 +5,7 @@ from executorlib import FluxClusterExecutor from executorlib.standalone.serialize import cloudpickle_register +from executorlib.task_scheduler.file.hdf import dump try: import flux.job @@ -72,8 +73,7 @@ def test_executor_existing_files(self): file_path = os.path.join("executorlib_cache", file_name ) os.remove(file_path) if ".h5" in file_path: - with open(file_path, "w") as f: - f.write("test") + dump(file_name=file_path, data_dict={"a": 1}) with FluxClusterExecutor( resource_dict={"cores": 2, "cwd": "executorlib_cache"}, From 5c0493116915511136f6a3f91eed32c3b53a57d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 2 Jul 2025 19:57:04 +0200 Subject: [PATCH 09/10] fix import --- tests/test_fluxclusterexecutor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index 8fa1b095..e37d1018 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -5,10 +5,10 @@ from executorlib import FluxClusterExecutor from executorlib.standalone.serialize import cloudpickle_register -from executorlib.task_scheduler.file.hdf import dump try: import flux.job + from executorlib.task_scheduler.file.hdf import dump skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("EXECUTORLIB_PMIX", None) From 21e5a4f3790ccbc4e989caba723ceb1504929ce6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 2 Jul 2025 20:22:09 +0200 Subject: [PATCH 10/10] input files --- tests/test_fluxclusterexecutor.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index e37d1018..f99b9fb7 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -46,18 +46,6 @@ def test_executor(self): self.assertTrue(fs1.done()) def test_executor_existing_files(self): - with FluxClusterExecutor( - resource_dict={"cores": 2, "cwd": "executorlib_cache"}, - block_allocation=False, - cache_directory="executorlib_cache", - ) as exe: - cloudpickle_register(ind=1) - fs1 = exe.submit(mpi_funct, 1) - self.assertFalse(fs1.done()) - self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) - self.assertTrue(fs1.done()) - self.assertEqual(len(os.listdir("executorlib_cache")), 4) - with FluxClusterExecutor( resource_dict={"cores": 2, "cwd": "executorlib_cache"}, block_allocation=False, @@ -73,7 +61,8 @@ def test_executor_existing_files(self): file_path = os.path.join("executorlib_cache", file_name ) os.remove(file_path) if ".h5" in file_path: - dump(file_name=file_path, data_dict={"a": 1}) + task_key = file_path[:-5] + "_i.h5" + dump(file_name=task_key, data_dict={"a": 1}) with FluxClusterExecutor( resource_dict={"cores": 2, "cwd": "executorlib_cache"},