From 833db95ad02d8ed51798a30016ecb24d91c55b3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 11 Jul 2025 08:14:49 +0200 Subject: [PATCH 01/12] Create cache directory only during dump() --- executorlib/task_scheduler/interactive/shared.py | 1 - 1 file changed, 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 3742dcfc..7c27e043 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -160,7 +160,6 @@ def _execute_task_with_cache( resource_dict=task_dict.get("resource_dict", {}), cache_key=cache_key, ) - os.makedirs(cache_directory, exist_ok=True) file_name = os.path.abspath(os.path.join(cache_directory, task_key + "_o.h5")) if file_name not in get_cache_files(cache_directory=cache_directory): f = task_dict.pop("future") From 1213de220156048997091e4671e1e338f28f94eb Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Jul 2025 09:55:36 +0200 Subject: [PATCH 02/12] Update task_scheduler.py --- executorlib/task_scheduler/file/task_scheduler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index 74eaccdd..f2fdfd96 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -61,12 +61,10 @@ def __init__( ) if execute_function == execute_in_subprocess and terminate_function is None: terminate_function = terminate_subprocess - cache_directory_path = os.path.abspath(cache_directory) - os.makedirs(cache_directory_path, exist_ok=True) self._process_kwargs = { "future_queue": self._future_queue, "execute_function": execute_function, - "cache_directory": cache_directory_path, + "cache_directory": os.path.abspath(cache_directory), "resource_dict": resource_dict, "terminate_function": terminate_function, "pysqa_config_directory": pysqa_config_directory, From e1c6c73f3378f7b419468a9306e6446ec4a610df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 11 Jul 2025 10:14:13 +0200 Subject: [PATCH 03/12] fix cache directory in resource_dict --- executorlib/task_scheduler/file/shared.py | 21 ++++++++++++------- .../task_scheduler/file/task_scheduler.py | 11 +++++----- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index 665ecbe4..4a5fa67c 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -52,7 +52,6 @@ def done(self) -> bool: def execute_tasks_h5( future_queue: queue.Queue, - cache_directory: str, execute_function: Callable, resource_dict: dict, terminate_function: Optional[Callable] = None, @@ -65,7 +64,6 @@ def execute_tasks_h5( Args: future_queue (queue.Queue): The queue containing the tasks. - cache_directory (str): The directory to store the HDF5 files. resource_dict (dict): A dictionary of resources required by the task. With the following keys: - cores (int): number of MPI cores to be used for each function call - cwd (str/None): current working directory where the parallel python task is executed @@ -104,6 +102,7 @@ def execute_tasks_h5( {k: v for k, v in resource_dict.items() if k not in task_resource_dict} ) cache_key = task_resource_dict.pop("cache_key", None) + cache_directory = task_resource_dict.pop("cache_directory") task_key, data_dict = serialize_funct_h5( fn=task_dict["fn"], fn_args=task_args, @@ -146,15 +145,23 @@ def execute_tasks_h5( file_name_dict[task_key] = os.path.join( cache_directory, task_key + "_o.h5" ) - memory_dict[task_key] = task_dict["future"] + memory_dict[task_key] = { + "future": task_dict["future"], + "cache_directory": cache_directory, + } future_queue.task_done() else: memory_dict = { - key: _check_task_output( - task_key=key, future_obj=value, cache_directory=cache_directory - ) + key: { + "future": _check_task_output( + task_key=key, + future_obj=value["future"], + cache_directory=value["cache_directory"], + ), + "cache_directory": value["cache_directory"], + } for key, value in memory_dict.items() - if not value.done() + if not value["future"].done() } diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index 74eaccdd..c1098927 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -27,7 +27,7 @@ class FileTaskScheduler(TaskSchedulerBase): def __init__( self, - cache_directory: str = "executorlib_cache", + cache_directory: Optional[str] = None, resource_dict: Optional[dict] = None, execute_function: Callable = execute_with_pysqa, terminate_function: Optional[Callable] = None, @@ -53,6 +53,7 @@ def __init__( default_resource_dict = { "cores": 1, "cwd": None, + "cache_directory": cache_directory, } if resource_dict is None: resource_dict = {} @@ -61,12 +62,9 @@ def __init__( ) if execute_function == execute_in_subprocess and terminate_function is None: terminate_function = terminate_subprocess - cache_directory_path = os.path.abspath(cache_directory) - os.makedirs(cache_directory_path, exist_ok=True) self._process_kwargs = { "future_queue": self._future_queue, "execute_function": execute_function, - "cache_directory": cache_directory_path, "resource_dict": resource_dict, "terminate_function": terminate_function, "pysqa_config_directory": pysqa_config_directory, @@ -98,7 +96,9 @@ def create_file_executor( disable_dependencies: bool = False, ): if cache_directory is None: - cache_directory = "executorlib_cache" + resource_dict["cache_directory"] = os.path.abspath("executorlib_cache") + else: + resource_dict["cache_directory"] = os.path.abspath(cache_directory) if block_allocation: raise ValueError( "The option block_allocation is not available with the pysqa based backend." @@ -114,7 +114,6 @@ def create_file_executor( check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) check_flux_log_files(flux_log_files=flux_log_files) return FileTaskScheduler( - cache_directory=cache_directory, resource_dict=resource_dict, pysqa_config_directory=pysqa_config_directory, backend=backend.split("_submission")[0], From 059f88887582cdbad543d5d6c19bd7ef80634b83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 11 Jul 2025 10:20:03 +0200 Subject: [PATCH 04/12] fix type hints --- executorlib/task_scheduler/file/task_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index c1098927..2bfc9cdf 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -63,9 +63,9 @@ def __init__( if execute_function == execute_in_subprocess and terminate_function is None: terminate_function = terminate_subprocess self._process_kwargs = { + "resource_dict": resource_dict, "future_queue": self._future_queue, "execute_function": execute_function, - "resource_dict": resource_dict, "terminate_function": terminate_function, "pysqa_config_directory": pysqa_config_directory, "backend": backend, @@ -80,11 +80,11 @@ def __init__( def create_file_executor( + resource_dict: dict, max_workers: Optional[int] = None, backend: str = "flux_submission", max_cores: Optional[int] = None, cache_directory: Optional[str] = None, - resource_dict: Optional[dict] = None, flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, From 2b049e5da275b89ce0f1a03f7e17dff1a3697f9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 11 Jul 2025 10:29:21 +0200 Subject: [PATCH 05/12] fixes to test --- .../task_scheduler/file/task_scheduler.py | 3 --- tests/test_cache_fileexecutor_mpi.py | 3 ++- tests/test_cache_fileexecutor_serial.py | 25 ++++++++++++++----- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index 2bfc9cdf..9e504438 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -27,7 +27,6 @@ class FileTaskScheduler(TaskSchedulerBase): def __init__( self, - cache_directory: Optional[str] = None, resource_dict: Optional[dict] = None, execute_function: Callable = execute_with_pysqa, terminate_function: Optional[Callable] = None, @@ -39,7 +38,6 @@ def __init__( Initialize the FileExecutor. Args: - cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache". resource_dict (dict): A dictionary of resources required by the task. With the following keys: - cores (int): number of MPI cores to be used for each function call - cwd (str/None): current working directory where the parallel python task is executed @@ -53,7 +51,6 @@ def __init__( default_resource_dict = { "cores": 1, "cwd": None, - "cache_directory": cache_directory, } if resource_dict is None: resource_dict = {} diff --git a/tests/test_cache_fileexecutor_mpi.py b/tests/test_cache_fileexecutor_mpi.py index 38f93a9c..19635a21 100644 --- a/tests/test_cache_fileexecutor_mpi.py +++ b/tests/test_cache_fileexecutor_mpi.py @@ -32,7 +32,8 @@ def mpi_funct(i): class TestCacheExecutorMPI(unittest.TestCase): def test_executor(self): with FileTaskScheduler( - resource_dict={"cores": 2}, execute_function=execute_in_subprocess + resource_dict={"cores": 2, "cache_directory": "executorlib_cache"}, + execute_function=execute_in_subprocess, ) as exe: fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) diff --git a/tests/test_cache_fileexecutor_serial.py b/tests/test_cache_fileexecutor_serial.py index 8b68df53..5fe79439 100644 --- a/tests/test_cache_fileexecutor_serial.py +++ b/tests/test_cache_fileexecutor_serial.py @@ -36,21 +36,30 @@ def get_error(a): ) class TestCacheExecutorSerial(unittest.TestCase): def test_executor_mixed(self): - with FileTaskScheduler(execute_function=execute_in_subprocess) as exe: + with FileTaskScheduler( + execute_function=execute_in_subprocess, + resource_dict={"cache_directory": "executorlib_cache"}, + ) as exe: fs1 = exe.submit(my_funct, 1, b=2) self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), 3) self.assertTrue(fs1.done()) def test_executor_mixed_cache_key(self): - with FileTaskScheduler(execute_function=execute_in_subprocess) as exe: + with FileTaskScheduler( + execute_function=execute_in_subprocess, + resource_dict={"cache_directory": "executorlib_cache"}, + ) as exe: fs1 = exe.submit(my_funct, 1, b=2, resource_dict={"cache_key": "a/b/c"}) self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), 3) self.assertTrue(fs1.done()) def test_executor_dependence_mixed(self): - with FileTaskScheduler(execute_function=execute_in_subprocess) as exe: + with FileTaskScheduler( + execute_function=execute_in_subprocess, + resource_dict={"cache_directory": "executorlib_cache"}, + ) as exe: fs1 = exe.submit(my_funct, 1, b=2) fs2 = exe.submit(my_funct, 1, b=fs1) self.assertFalse(fs2.done()) @@ -66,7 +75,9 @@ def test_create_file_executor_error(self): def test_executor_dependence_error(self): with self.assertRaises(ValueError): with FileTaskScheduler( - execute_function=execute_in_subprocess, disable_dependencies=True + execute_function=execute_in_subprocess, + disable_dependencies=True, + resource_dict={"cache_directory": "executorlib_cache"}, ) as exe: fs = exe.submit(my_funct, 1, b=exe.submit(my_funct, 1, b=2)) fs.result() @@ -74,7 +85,8 @@ def test_executor_dependence_error(self): def test_executor_working_directory(self): cwd = os.path.join(os.path.dirname(__file__), "executables") with FileTaskScheduler( - resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess + resource_dict={"cwd": cwd, "cache_directory": "executorlib_cache"}, + execute_function=execute_in_subprocess, ) as exe: fs1 = exe.submit(list_files_in_working_directory) self.assertEqual(fs1.result(), os.listdir(cwd)) @@ -82,7 +94,8 @@ def test_executor_working_directory(self): def test_executor_error(self): cwd = os.path.join(os.path.dirname(__file__), "executables") with FileTaskScheduler( - resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess + resource_dict={"cwd": cwd, "cache_directory": "executorlib_cache"}, + execute_function=execute_in_subprocess, ) as exe: fs1 = exe.submit(get_error, a=1) with self.assertRaises(ValueError): From 45941f60a0788ef26c4b78381131ccf249dd699a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 11 Jul 2025 10:42:02 +0200 Subject: [PATCH 06/12] fix file checks --- executorlib/task_scheduler/file/shared.py | 2 +- executorlib/task_scheduler/file/task_scheduler.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index 4a5fa67c..e4966ca4 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -102,7 +102,7 @@ def execute_tasks_h5( {k: v for k, v in resource_dict.items() if k not in task_resource_dict} ) cache_key = task_resource_dict.pop("cache_key", None) - cache_directory = task_resource_dict.pop("cache_directory") + cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory")) task_key, data_dict = serialize_funct_h5( fn=task_dict["fn"], fn_args=task_args, diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index 9e504438..2aad465f 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -93,9 +93,9 @@ def create_file_executor( disable_dependencies: bool = False, ): if cache_directory is None: - resource_dict["cache_directory"] = os.path.abspath("executorlib_cache") + resource_dict["cache_directory"] = "executorlib_cache" else: - resource_dict["cache_directory"] = os.path.abspath(cache_directory) + resource_dict["cache_directory"] = cache_directory if block_allocation: raise ValueError( "The option block_allocation is not available with the pysqa based backend." From 452f3f5d7b4ee07a04dc3cdc455e3aae5128010c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 11 Jul 2025 08:42:27 +0000 Subject: [PATCH 07/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/task_scheduler/file/task_scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index 2aad465f..73d3f8e5 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -1,4 +1,3 @@ -import os from threading import Thread from typing import Callable, Optional From 6d8e451eada10e4f69f3f8aebedc26e2829b7156 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 11 Jul 2025 11:02:01 +0200 Subject: [PATCH 08/12] Create directory later --- executorlib/task_scheduler/file/task_scheduler.py | 1 - executorlib/task_scheduler/interactive/shared.py | 1 - 2 files changed, 2 deletions(-) diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index 74eaccdd..a373d902 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -62,7 +62,6 @@ def __init__( if execute_function == execute_in_subprocess and terminate_function is None: terminate_function = terminate_subprocess cache_directory_path = os.path.abspath(cache_directory) - os.makedirs(cache_directory_path, exist_ok=True) self._process_kwargs = { "future_queue": self._future_queue, "execute_function": execute_function, diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 3742dcfc..7c27e043 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -160,7 +160,6 @@ def _execute_task_with_cache( resource_dict=task_dict.get("resource_dict", {}), cache_key=cache_key, ) - os.makedirs(cache_directory, exist_ok=True) file_name = os.path.abspath(os.path.join(cache_directory, task_key + "_o.h5")) if file_name not in get_cache_files(cache_directory=cache_directory): f = task_dict.pop("future") From 91c6110d02fee5fc2d4b6cb4ae785dd588e05ae0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 11 Jul 2025 12:06:42 +0200 Subject: [PATCH 09/12] introduce cache_dir_dict --- executorlib/task_scheduler/file/shared.py | 22 ++++++------- .../task_scheduler/file/task_scheduler.py | 7 ++--- tests/test_cache_fileexecutor_mpi.py | 3 +- tests/test_cache_fileexecutor_serial.py | 31 ++++++------------- 4 files changed, 23 insertions(+), 40 deletions(-) diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index e4966ca4..98642e93 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -79,6 +79,7 @@ def execute_tasks_h5( """ memory_dict: dict = {} process_dict: dict = {} + cache_dir_dict: dict = {} file_name_dict: dict = {} while True: task_dict = None @@ -145,23 +146,18 @@ def execute_tasks_h5( file_name_dict[task_key] = os.path.join( cache_directory, task_key + "_o.h5" ) - memory_dict[task_key] = { - "future": task_dict["future"], - "cache_directory": cache_directory, - } + memory_dict[task_key] = task_dict["future"] + cache_dir_dict[task_key] = cache_directory future_queue.task_done() else: memory_dict = { - key: { - "future": _check_task_output( - task_key=key, - future_obj=value["future"], - cache_directory=value["cache_directory"], - ), - "cache_directory": value["cache_directory"], - } + key: _check_task_output( + task_key=key, + future_obj=value, + cache_directory=cache_dir_dict[key], + ) for key, value in memory_dict.items() - if not value["future"].done() + if not value.done() } diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index 73d3f8e5..f76babb2 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -50,6 +50,7 @@ def __init__( default_resource_dict = { "cores": 1, "cwd": None, + "cache_directory": "executorlib_cache", } if resource_dict is None: resource_dict = {} @@ -91,10 +92,6 @@ def create_file_executor( init_function: Optional[Callable] = None, disable_dependencies: bool = False, ): - if cache_directory is None: - resource_dict["cache_directory"] = "executorlib_cache" - else: - resource_dict["cache_directory"] = cache_directory if block_allocation: raise ValueError( "The option block_allocation is not available with the pysqa based backend." @@ -103,6 +100,8 @@ def create_file_executor( raise ValueError( "The option to specify an init_function is not available with the pysqa based backend." ) + if cache_directory is not None: + resource_dict["cache_directory"] = cache_directory check_flux_executor_pmi_mode(flux_executor_pmi_mode=flux_executor_pmi_mode) check_max_workers_and_cores(max_cores=max_cores, max_workers=max_workers) check_hostname_localhost(hostname_localhost=hostname_localhost) diff --git a/tests/test_cache_fileexecutor_mpi.py b/tests/test_cache_fileexecutor_mpi.py index 19635a21..c53d469b 100644 --- a/tests/test_cache_fileexecutor_mpi.py +++ b/tests/test_cache_fileexecutor_mpi.py @@ -32,8 +32,7 @@ def mpi_funct(i): class TestCacheExecutorMPI(unittest.TestCase): def test_executor(self): with FileTaskScheduler( - resource_dict={"cores": 2, "cache_directory": "executorlib_cache"}, - execute_function=execute_in_subprocess, + resource_dict={"cores": 2}, execute_function=execute_in_subprocess, ) as exe: fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) diff --git a/tests/test_cache_fileexecutor_serial.py b/tests/test_cache_fileexecutor_serial.py index 5fe79439..2311f74c 100644 --- a/tests/test_cache_fileexecutor_serial.py +++ b/tests/test_cache_fileexecutor_serial.py @@ -36,30 +36,21 @@ def get_error(a): ) class TestCacheExecutorSerial(unittest.TestCase): def test_executor_mixed(self): - with FileTaskScheduler( - execute_function=execute_in_subprocess, - resource_dict={"cache_directory": "executorlib_cache"}, - ) as exe: + with FileTaskScheduler(execute_function=execute_in_subprocess) as exe: fs1 = exe.submit(my_funct, 1, b=2) self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), 3) self.assertTrue(fs1.done()) def test_executor_mixed_cache_key(self): - with FileTaskScheduler( - execute_function=execute_in_subprocess, - resource_dict={"cache_directory": "executorlib_cache"}, - ) as exe: + with FileTaskScheduler(execute_function=execute_in_subprocess) as exe: fs1 = exe.submit(my_funct, 1, b=2, resource_dict={"cache_key": "a/b/c"}) self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), 3) self.assertTrue(fs1.done()) def test_executor_dependence_mixed(self): - with FileTaskScheduler( - execute_function=execute_in_subprocess, - resource_dict={"cache_directory": "executorlib_cache"}, - ) as exe: + with FileTaskScheduler(execute_function=execute_in_subprocess) as exe: fs1 = exe.submit(my_funct, 1, b=2) fs2 = exe.submit(my_funct, 1, b=fs1) self.assertFalse(fs2.done()) @@ -67,17 +58,17 @@ def test_executor_dependence_mixed(self): self.assertTrue(fs2.done()) def test_create_file_executor_error(self): + with self.assertRaises(TypeError): + create_file_executor() with self.assertRaises(ValueError): - create_file_executor(block_allocation=True) + create_file_executor(block_allocation=True, resource_dict={}) with self.assertRaises(ValueError): - create_file_executor(init_function=True) + create_file_executor(init_function=True, resource_dict={}) def test_executor_dependence_error(self): with self.assertRaises(ValueError): with FileTaskScheduler( - execute_function=execute_in_subprocess, - disable_dependencies=True, - resource_dict={"cache_directory": "executorlib_cache"}, + execute_function=execute_in_subprocess, disable_dependencies=True, ) as exe: fs = exe.submit(my_funct, 1, b=exe.submit(my_funct, 1, b=2)) fs.result() @@ -85,8 +76,7 @@ def test_executor_dependence_error(self): def test_executor_working_directory(self): cwd = os.path.join(os.path.dirname(__file__), "executables") with FileTaskScheduler( - resource_dict={"cwd": cwd, "cache_directory": "executorlib_cache"}, - execute_function=execute_in_subprocess, + resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess, ) as exe: fs1 = exe.submit(list_files_in_working_directory) self.assertEqual(fs1.result(), os.listdir(cwd)) @@ -94,8 +84,7 @@ def test_executor_working_directory(self): def test_executor_error(self): cwd = os.path.join(os.path.dirname(__file__), "executables") with FileTaskScheduler( - resource_dict={"cwd": cwd, "cache_directory": "executorlib_cache"}, - execute_function=execute_in_subprocess, + resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess, ) as exe: fs1 = exe.submit(get_error, a=1) with self.assertRaises(ValueError): From b615fd0e9603e3dbfbf53fd314930f1406103111 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 11 Jul 2025 12:07:37 +0200 Subject: [PATCH 10/12] minor fixes --- tests/test_cache_fileexecutor_mpi.py | 2 +- tests/test_cache_fileexecutor_serial.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_cache_fileexecutor_mpi.py b/tests/test_cache_fileexecutor_mpi.py index c53d469b..38f93a9c 100644 --- a/tests/test_cache_fileexecutor_mpi.py +++ b/tests/test_cache_fileexecutor_mpi.py @@ -32,7 +32,7 @@ def mpi_funct(i): class TestCacheExecutorMPI(unittest.TestCase): def test_executor(self): with FileTaskScheduler( - resource_dict={"cores": 2}, execute_function=execute_in_subprocess, + resource_dict={"cores": 2}, execute_function=execute_in_subprocess ) as exe: fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) diff --git a/tests/test_cache_fileexecutor_serial.py b/tests/test_cache_fileexecutor_serial.py index 2311f74c..bb723a45 100644 --- a/tests/test_cache_fileexecutor_serial.py +++ b/tests/test_cache_fileexecutor_serial.py @@ -68,7 +68,7 @@ def test_create_file_executor_error(self): def test_executor_dependence_error(self): with self.assertRaises(ValueError): with FileTaskScheduler( - execute_function=execute_in_subprocess, disable_dependencies=True, + execute_function=execute_in_subprocess, disable_dependencies=True ) as exe: fs = exe.submit(my_funct, 1, b=exe.submit(my_funct, 1, b=2)) fs.result() @@ -76,7 +76,7 @@ def test_executor_dependence_error(self): def test_executor_working_directory(self): cwd = os.path.join(os.path.dirname(__file__), "executables") with FileTaskScheduler( - resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess, + resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess ) as exe: fs1 = exe.submit(list_files_in_working_directory) self.assertEqual(fs1.result(), os.listdir(cwd)) @@ -84,7 +84,7 @@ def test_executor_working_directory(self): def test_executor_error(self): cwd = os.path.join(os.path.dirname(__file__), "executables") with FileTaskScheduler( - resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess, + resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess ) as exe: fs1 = exe.submit(get_error, a=1) with self.assertRaises(ValueError): From 157635f9b58b1dba345c21b9bd258585f7d4a921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 11 Jul 2025 12:16:43 +0200 Subject: [PATCH 11/12] fix tests --- tests/test_cache_fileexecutor_serial.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/test_cache_fileexecutor_serial.py b/tests/test_cache_fileexecutor_serial.py index bb723a45..eb62c166 100644 --- a/tests/test_cache_fileexecutor_serial.py +++ b/tests/test_cache_fileexecutor_serial.py @@ -108,9 +108,8 @@ def test_executor_function(self): target=execute_tasks_h5, kwargs={ "future_queue": q, - "cache_directory": cache_dir, "execute_function": execute_in_subprocess, - "resource_dict": {"cores": 1, "cwd": None}, + "resource_dict": {"cores": 1, "cwd": None, "cache_directory": cache_dir}, "terminate_function": terminate_subprocess, }, ) @@ -149,9 +148,8 @@ def test_executor_function_dependence_kwargs(self): target=execute_tasks_h5, kwargs={ "future_queue": q, - "cache_directory": cache_dir, "execute_function": execute_in_subprocess, - "resource_dict": {"cores": 1, "cwd": None}, + "resource_dict": {"cores": 1, "cwd": None, "cache_directory": cache_dir}, "terminate_function": terminate_subprocess, }, ) @@ -190,9 +188,8 @@ def test_executor_function_dependence_args(self): target=execute_tasks_h5, kwargs={ "future_queue": q, - "cache_directory": cache_dir, "execute_function": execute_in_subprocess, - "resource_dict": {"cores": 1}, + "resource_dict": {"cores": 1, "cache_directory": cache_dir}, "terminate_function": terminate_subprocess, }, ) From 1aff63116bc97391cb9ab3c7cff4af629ccf8f31 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Jul 2025 16:12:27 +0200 Subject: [PATCH 12/12] Update executorlib/task_scheduler/file/task_scheduler.py Co-authored-by: Liam Huber --- executorlib/task_scheduler/file/task_scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index f76babb2..65daffab 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -40,6 +40,7 @@ def __init__( resource_dict (dict): A dictionary of resources required by the task. With the following keys: - cores (int): number of MPI cores to be used for each function call - cwd (str/None): current working directory where the parallel python task is executed + - cache_directory (str): The directory to store cache files. execute_function (Callable, optional): The function to execute tasks. Defaults to execute_in_subprocess. terminate_function (Callable, optional): The function to terminate the tasks. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).