From 981b77eb7ea50677944c865a13e21c3b34260421 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 2 Jul 2025 17:52:55 +0200 Subject: [PATCH 1/5] SlurmClusterExecutor - create a separate working directory for each job Even when the user does not define a specific working directory, otherwise the submission script and the error log gets overwritten. --- executorlib/task_scheduler/file/queue_spawner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py index ac6ecc5b..fb53f240 100644 --- a/executorlib/task_scheduler/file/queue_spawner.py +++ b/executorlib/task_scheduler/file/queue_spawner.py @@ -50,7 +50,9 @@ def execute_with_pysqa( if "cwd" in resource_dict and resource_dict["cwd"] is not None: cwd = resource_dict["cwd"] else: - cwd = cache_directory + folder = command[-1].split("_i.h5")[0] + cwd = os.path.join(cache_directory, folder) + os.makedirs(cwd, exist_ok=True) submit_kwargs = { "command": " ".join(command), "dependency_list": [str(qid) for qid in task_dependent_lst], From cab77e30d9c76eeb2d19ea8ca62ea5f0ac851b06 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 2 Jul 2025 18:12:08 +0200 Subject: [PATCH 2/5] The cache directory is a mandatory input --- executorlib/task_scheduler/file/queue_spawner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py index fb53f240..ec3c34f1 100644 --- a/executorlib/task_scheduler/file/queue_spawner.py +++ b/executorlib/task_scheduler/file/queue_spawner.py @@ -10,18 +10,19 @@ def execute_with_pysqa( command: list, + cache_directory: str, task_dependent_lst: Optional[list[int]] = None, file_name: Optional[str] = None, resource_dict: Optional[dict] = None, config_directory: Optional[str] = None, backend: Optional[str] = None, - cache_directory: Optional[str] = None, ) -> Optional[int]: """ Execute a command by submitting it to the queuing system Args: command (list): The command to be executed. + cache_directory (str): The directory to store the HDF5 files. task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. file_name (str): Name of the HDF5 file which contains the Python function resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function. @@ -30,7 +31,6 @@ def execute_with_pysqa( } config_directory (str, optional): path to the config directory. backend (str, optional): name of the backend used to spawn tasks. - cache_directory (str): The directory to store the HDF5 files. Returns: int: queuing system ID From 95359586a162ebbf08d168511efd4de46e579f56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 2 Jul 2025 18:52:51 +0200 Subject: [PATCH 3/5] extend tests --- tests/test_fluxclusterexecutor.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index dab9985e..d64b2824 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -40,6 +40,20 @@ def test_executor(self): cloudpickle_register(ind=1) fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) + self.assertEqual(len(os.listdir("executorlib_cache")), 2) + self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs1.done()) + + def test_executor_no_cwd(self): + with FluxClusterExecutor( + resource_dict={"cores": 2}, + block_allocation=False, + cache_directory="executorlib_cache", + ) as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(mpi_funct, 1) + self.assertFalse(fs1.done()) + self.assertEqual(len(os.listdir("executorlib_cache")), 2) self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) self.assertTrue(fs1.done()) From da6f6898d8b58f424062110de2e31d0fe1dc8b88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 2 Jul 2025 18:57:40 +0200 Subject: [PATCH 4/5] test length after successful execution --- tests/test_fluxclusterexecutor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index d64b2824..3a01aeb0 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -40,8 +40,8 @@ def test_executor(self): cloudpickle_register(ind=1) fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) - self.assertEqual(len(os.listdir("executorlib_cache")), 2) self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertEqual(len(os.listdir("executorlib_cache")), 2) self.assertTrue(fs1.done()) def test_executor_no_cwd(self): @@ -53,8 +53,8 @@ def test_executor_no_cwd(self): cloudpickle_register(ind=1) fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) - self.assertEqual(len(os.listdir("executorlib_cache")), 2) self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertEqual(len(os.listdir("executorlib_cache")), 2) self.assertTrue(fs1.done()) def tearDown(self): From 0d9ce2b27df8bd0d500911b4abf62350585bc2d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 2 Jul 2025 19:05:21 +0200 Subject: [PATCH 5/5] fix --- tests/test_fluxclusterexecutor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index 3a01aeb0..695f0709 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -41,7 +41,7 @@ def test_executor(self): fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) - self.assertEqual(len(os.listdir("executorlib_cache")), 2) + self.assertEqual(len(os.listdir("executorlib_cache")), 4) self.assertTrue(fs1.done()) def test_executor_no_cwd(self):