From 0f59f579e55c0691c2e803ba1165586cc3d8cfaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Oct 2024 18:18:46 +0100 Subject: [PATCH 1/3] Cache: Add working directory parameter --- executorlib/cache/executor.py | 4 ++++ executorlib/cache/shared.py | 12 ++++++++---- tests/test_cache_executor_serial.py | 10 ++++++++++ 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 280f6f6c..0c30ca38 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -1,4 +1,5 @@ import os +from typing import Optional from executorlib.base.executor import ExecutorBase from executorlib.cache.shared import execute_in_subprocess, execute_tasks_h5 @@ -11,6 +12,7 @@ def __init__( cache_directory: str = "cache", execute_function: callable = execute_in_subprocess, cores_per_worker: int = 1, + cwd: Optional[str] = None, ): """ Initialize the FileExecutor. @@ -19,6 +21,7 @@ def __init__( cache_directory (str, optional): The directory to store cache files. Defaults to "cache". execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess. cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1. + cwd (str/None): current working directory where the parallel python task is executed """ super().__init__() cache_directory_path = os.path.abspath(cache_directory) @@ -31,6 +34,7 @@ def __init__( "execute_function": execute_function, "cache_directory": cache_directory_path, "cores_per_worker": cores_per_worker, + "cwd": cwd, }, ) ) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 5db0454b..64402b38 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -4,7 +4,7 @@ import subprocess import sys from concurrent.futures import Future -from typing import Tuple +from typing import Tuple, Optional from executorlib.standalone.command import get_command_path from executorlib.standalone.hdf import dump, get_output @@ -48,14 +48,15 @@ def done(self) -> bool: def execute_in_subprocess( - command: list, task_dependent_lst: list = [] + command: list, task_dependent_lst: list = [], cwd: Optional[str] = None, ) -> subprocess.Popen: """ Execute a command in a subprocess. Args: command (list): The command to be executed. - task_dependent_lst (list, optional): A list of subprocesses that the current subprocess depends on. Defaults to []. + task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. + cwd (str/None): current working directory where the parallel python task is executed Returns: subprocess.Popen: The subprocess object. @@ -65,7 +66,7 @@ def execute_in_subprocess( task_dependent_lst = [ task for task in task_dependent_lst if task.poll() is None ] - return subprocess.Popen(command, universal_newlines=True) + return subprocess.Popen(command, universal_newlines=True, cwd=cwd) def execute_tasks_h5( @@ -73,6 +74,7 @@ def execute_tasks_h5( cache_directory: str, cores_per_worker: int, execute_function: callable, + cwd: Optional[str], ) -> None: """ Execute tasks stored in a queue using HDF5 files. @@ -82,6 +84,7 @@ def execute_tasks_h5( cache_directory (str): The directory to store the HDF5 files. cores_per_worker (int): The number of cores per worker. execute_function (callable): The function to execute the tasks. + cwd (str/None): current working directory where the parallel python task is executed Returns: None @@ -123,6 +126,7 @@ def execute_tasks_h5( task_dependent_lst=[ process_dict[k] for k in future_wait_key_lst ], + cwd=cwd, ) file_name_dict[task_key] = os.path.join( cache_directory, task_key + ".h5out" diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 0884f601..7d60034e 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -19,6 +19,10 @@ def my_funct(a, b): return a + b +def list_files_in_working_directory(): + return os.listdir(os.getcwd()) + + @unittest.skipIf( skip_h5io_test, "h5io is not installed, so the h5io tests are skipped." ) @@ -38,6 +42,12 @@ def test_executor_dependence_mixed(self): self.assertEqual(fs2.result(), 4) self.assertTrue(fs2.done()) + def test_executor_working_directory(self): + cwd = os.path.join(os.path.dirname(__file__), "executables") + with FileExecutor(cwd=cwd) as exe: + fs1 = exe.submit(list_files_in_working_directory) + self.assertEqual(fs1.result(), os.listdir(cwd)) + def test_executor_function(self): fs1 = Future() q = Queue() From 6f9b59e4441433d3639f302ba488077b73d9afe7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 27 Oct 2024 17:19:19 +0000 Subject: [PATCH 2/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/cache/shared.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 64402b38..77a7da43 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -4,7 +4,7 @@ import subprocess import sys from concurrent.futures import Future -from typing import Tuple, Optional +from typing import Optional, Tuple from executorlib.standalone.command import get_command_path from executorlib.standalone.hdf import dump, get_output @@ -48,7 +48,9 @@ def done(self) -> bool: def execute_in_subprocess( - command: list, task_dependent_lst: list = [], cwd: Optional[str] = None, + command: list, + task_dependent_lst: list = [], + cwd: Optional[str] = None, ) -> subprocess.Popen: """ Execute a command in a subprocess. From e19d4b3ad794161a0e95fe546f5267152a64d8d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Oct 2024 18:29:12 +0100 Subject: [PATCH 3/3] fix tests --- tests/test_cache_executor_serial.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 7d60034e..7b3c57fb 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -61,6 +61,7 @@ def test_executor_function(self): "cache_directory": cache_dir, "execute_function": execute_in_subprocess, "cores_per_worker": 1, + "cwd": None, }, ) process.start() @@ -68,6 +69,7 @@ def test_executor_function(self): self.assertEqual(fs1.result(), 3) self.assertTrue(fs1.done()) q.put({"shutdown": True, "wait": True}) + process.join() def test_executor_function_dependence_kwargs(self): fs1 = Future() @@ -84,6 +86,7 @@ def test_executor_function_dependence_kwargs(self): "cache_directory": cache_dir, "execute_function": execute_in_subprocess, "cores_per_worker": 1, + "cwd": None, }, ) process.start() @@ -91,6 +94,7 @@ def test_executor_function_dependence_kwargs(self): self.assertEqual(fs2.result(), 4) self.assertTrue(fs2.done()) q.put({"shutdown": True, "wait": True}) + process.join() def test_executor_function_dependence_args(self): fs1 = Future() @@ -107,6 +111,7 @@ def test_executor_function_dependence_args(self): "cache_directory": cache_dir, "execute_function": execute_in_subprocess, "cores_per_worker": 1, + "cwd": None, }, ) process.start() @@ -114,6 +119,7 @@ def test_executor_function_dependence_args(self): self.assertEqual(fs2.result(), 5) self.assertTrue(fs2.done()) q.put({"shutdown": True, "wait": True}) + process.join() def tearDown(self): if os.path.exists("cache"):