From 53300729f5ccd96d93de05a947a5a4a32bc5541b Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 6 Nov 2024 17:15:50 +0100 Subject: [PATCH 1/2] Add option to disable_dependencies for cache --- executorlib/__init__.py | 1 + executorlib/cache/executor.py | 5 +++++ executorlib/cache/shared.py | 13 ++++++++++--- tests/test_cache_executor_serial.py | 5 +++++ 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index ea57d8a6..b4ae786e 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -195,6 +195,7 @@ def __new__( hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, + disable_dependencies=disable_dependencies, ) elif not disable_dependencies: _check_pysqa_config_directory(pysqa_config_directory=pysqa_config_directory) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index cf750780..3ecfd6f3 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -32,6 +32,7 @@ def __init__( terminate_function: Optional[callable] = None, pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, + disable_dependencies: bool = False, ): """ Initialize the FileExecutor. @@ -45,6 +46,7 @@ def __init__( terminate_function (callable, optional): The function to terminate the tasks. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). backend (str, optional): name of the backend used to spawn tasks. + disable_dependencies (boolean): Disable resolving future objects during the submission. """ super().__init__() default_resource_dict = { @@ -71,6 +73,7 @@ def __init__( "terminate_function": terminate_function, "pysqa_config_directory": pysqa_config_directory, "backend": backend, + "disable_dependencies": disable_dependencies, }, ) ) @@ -89,6 +92,7 @@ def create_file_executor( hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[callable] = None, + disable_dependencies: bool = False, ): if cache_directory is None: cache_directory = "executorlib_cache" @@ -110,4 +114,5 @@ def create_file_executor( resource_dict=resource_dict, pysqa_config_directory=pysqa_config_directory, backend=backend.split("pysqa_")[-1], + disable_dependencies=disable_dependencies, ) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 22177b32..c1f3adab 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -54,6 +54,7 @@ def execute_tasks_h5( terminate_function: Optional[callable] = None, pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, + disable_dependencies: bool = False, ) -> None: """ Execute tasks stored in a queue using HDF5 files. @@ -111,14 +112,20 @@ def execute_tasks_h5( if task_key + ".h5out" not in os.listdir(cache_directory): file_name = os.path.join(cache_directory, task_key + ".h5in") dump(file_name=file_name, data_dict=data_dict) + if not disable_dependencies: + task_dependent_lst = [ + process_dict[k] for k in future_wait_key_lst + ] + else: + if len(future_wait_key_lst) > 0: + raise ValueError("Future objects are not supported as input if disable_dependencies=True.") + task_dependent_lst = [] process_dict[task_key] = execute_function( command=_get_execute_command( file_name=file_name, cores=task_resource_dict["cores"], ), - task_dependent_lst=[ - process_dict[k] for k in future_wait_key_lst - ], + task_dependent_lst=task_dependent_lst, resource_dict=task_resource_dict, config_directory=pysqa_config_directory, backend=backend, diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 6cb2465e..3e98b1e5 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -46,6 +46,11 @@ def test_executor_dependence_mixed(self): self.assertEqual(fs2.result(), 4) self.assertTrue(fs2.done()) + def test_executor_dependence_error(self): + with self.assertRaises(ValueError): + with FileExecutor(execute_function=execute_in_subprocess, disable_dependencies=True) as exe: + exe.submit(my_funct, 1, b=exe.submit(my_funct, 1, b=2)) + def test_executor_working_directory(self): cwd = os.path.join(os.path.dirname(__file__), "executables") with FileExecutor( From 44afd9c4dad0c277135f833261e962f9c7a4f61c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 6 Nov 2024 16:16:13 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/cache/shared.py | 4 +++- tests/test_cache_executor_serial.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index c1f3adab..9cd540c0 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -118,7 +118,9 @@ def execute_tasks_h5( ] else: if len(future_wait_key_lst) > 0: - raise ValueError("Future objects are not supported as input if disable_dependencies=True.") + raise ValueError( + "Future objects are not supported as input if disable_dependencies=True." + ) task_dependent_lst = [] process_dict[task_key] = execute_function( command=_get_execute_command( diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 3e98b1e5..c5962aac 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -48,7 +48,9 @@ def test_executor_dependence_mixed(self): def test_executor_dependence_error(self): with self.assertRaises(ValueError): - with FileExecutor(execute_function=execute_in_subprocess, disable_dependencies=True) as exe: + with FileExecutor( + execute_function=execute_in_subprocess, disable_dependencies=True + ) as exe: exe.submit(my_funct, 1, b=exe.submit(my_funct, 1, b=2)) def test_executor_working_directory(self):