From 4d01b06635b8e8df308f780cd7c20baab2b9a07a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Oct 2024 18:45:09 +0100 Subject: [PATCH 1/6] Cache: Terminate processes when closing executor --- executorlib/cache/executor.py | 7 ++++++- executorlib/cache/shared.py | 14 +++++++++++++- tests/test_cache_executor_serial.py | 5 ++++- 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 280f6f6c..a7657b41 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -1,7 +1,8 @@ import os +from typing import Optional from executorlib.base.executor import ExecutorBase -from executorlib.cache.shared import execute_in_subprocess, execute_tasks_h5 +from executorlib.cache.shared import execute_in_subprocess, execute_tasks_h5, terminate_subprocess from executorlib.standalone.thread import RaisingThread @@ -11,6 +12,7 @@ def __init__( cache_directory: str = "cache", execute_function: callable = execute_in_subprocess, cores_per_worker: int = 1, + terminate_function: Optional[callable] = None, ): """ Initialize the FileExecutor. @@ -21,6 +23,8 @@ def __init__( cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1. """ super().__init__() + if execute_function == execute_in_subprocess and terminate_function is None: + terminate_function = terminate_subprocess cache_directory_path = os.path.abspath(cache_directory) os.makedirs(cache_directory_path, exist_ok=True) self._set_process( @@ -31,6 +35,7 @@ def __init__( "execute_function": execute_function, "cache_directory": cache_directory_path, "cores_per_worker": cores_per_worker, + "terminate_function": terminate_function, }, ) ) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 5db0454b..9a4bf391 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -3,8 +3,9 @@ import queue import subprocess import sys +import time from concurrent.futures import Future -from typing import Tuple +from typing import Tuple, Optional from executorlib.standalone.command import get_command_path from executorlib.standalone.hdf import dump, get_output @@ -68,11 +69,18 @@ def execute_in_subprocess( return subprocess.Popen(command, universal_newlines=True) +def terminate_subprocess(task): + task.terminate() + while task.poll() is None: + time.sleep(0.1) + + def execute_tasks_h5( future_queue: queue.Queue, cache_directory: str, cores_per_worker: int, execute_function: callable, + terminate_function: Optional[callable] = None, ) -> None: """ Execute tasks stored in a queue using HDF5 files. @@ -82,6 +90,7 @@ def execute_tasks_h5( cache_directory (str): The directory to store the HDF5 files. cores_per_worker (int): The number of cores per worker. execute_function (callable): The function to execute the tasks. + terminate_function (callable): The function to terminate the tasks. Returns: None @@ -99,6 +108,9 @@ def execute_tasks_h5( and "shutdown" in task_dict.keys() and task_dict["shutdown"] ): + if terminate_function is not None: + for task in process_dict.values(): + terminate_function(task=task) future_queue.task_done() future_queue.join() break diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 0884f601..a8bbf3c3 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -8,7 +8,7 @@ try: from executorlib import FileExecutor - from executorlib.cache.shared import execute_tasks_h5, execute_in_subprocess + from executorlib.cache.shared import execute_tasks_h5, execute_in_subprocess, terminate_subprocess skip_h5io_test = False except ImportError: @@ -51,6 +51,7 @@ def test_executor_function(self): "cache_directory": cache_dir, "execute_function": execute_in_subprocess, "cores_per_worker": 1, + "terminate_function": terminate_subprocess, }, ) process.start() @@ -74,6 +75,7 @@ def test_executor_function_dependence_kwargs(self): "cache_directory": cache_dir, "execute_function": execute_in_subprocess, "cores_per_worker": 1, + "terminate_function": terminate_subprocess, }, ) process.start() @@ -97,6 +99,7 @@ def test_executor_function_dependence_args(self): "cache_directory": cache_dir, "execute_function": execute_in_subprocess, "cores_per_worker": 1, + "terminate_function": terminate_subprocess, }, ) process.start() From 9307b0172178d980aa98032a5258bdd7f44dde7d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 27 Oct 2024 17:45:33 +0000 Subject: [PATCH 2/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/cache/executor.py | 6 +++++- executorlib/cache/shared.py | 2 +- tests/test_cache_executor_serial.py | 6 +++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index a7657b41..d62f801e 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -2,7 +2,11 @@ from typing import Optional from executorlib.base.executor import ExecutorBase -from executorlib.cache.shared import execute_in_subprocess, execute_tasks_h5, terminate_subprocess +from executorlib.cache.shared import ( + execute_in_subprocess, + execute_tasks_h5, + terminate_subprocess, +) from executorlib.standalone.thread import RaisingThread diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 9a4bf391..8e3267b6 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -5,7 +5,7 @@ import sys import time from concurrent.futures import Future -from typing import Tuple, Optional +from typing import Optional, Tuple from executorlib.standalone.command import get_command_path from executorlib.standalone.hdf import dump, get_output diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index a8bbf3c3..e14c5801 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -8,7 +8,11 @@ try: from executorlib import FileExecutor - from executorlib.cache.shared import execute_tasks_h5, execute_in_subprocess, terminate_subprocess + from executorlib.cache.shared import ( + execute_tasks_h5, + execute_in_subprocess, + terminate_subprocess, + ) skip_h5io_test = False except ImportError: From ad441a53272f45fb5bbcf073d688033802e98169 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Oct 2024 18:49:41 +0100 Subject: [PATCH 3/6] Merge main --- executorlib/cache/executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 617022d0..2191fcb2 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -16,8 +16,8 @@ def __init__( cache_directory: str = "cache", execute_function: callable = execute_in_subprocess, cores_per_worker: int = 1, - cwd: Optional[str] = None, - terminate_function: Optional[callable] = None, + cwd: Optional[str] = None, + terminate_function: Optional[callable] = None, ): """ Initialize the FileExecutor. From 54924b540f6806afe76a4d8a64901376f13d839f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Oct 2024 18:50:22 +0100 Subject: [PATCH 4/6] fixes --- executorlib/cache/executor.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 2191fcb2..357df6b9 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -16,8 +16,8 @@ def __init__( cache_directory: str = "cache", execute_function: callable = execute_in_subprocess, cores_per_worker: int = 1, - cwd: Optional[str] = None, - terminate_function: Optional[callable] = None, + cwd: Optional[str] = None, + terminate_function: Optional[callable] = None, ): """ Initialize the FileExecutor. @@ -27,6 +27,7 @@ def __init__( execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess. cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1. cwd (str/None): current working directory where the parallel python task is executed + terminate_function (callable, optional): The function to terminate the tasks. """ super().__init__() if execute_function == execute_in_subprocess and terminate_function is None: From 5a18e973a956db2a849772bd3679b70c35fdc598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Oct 2024 18:56:14 +0100 Subject: [PATCH 5/6] refactor cache spawner --- executorlib/cache/executor.py | 4 +-- executorlib/cache/shared.py | 32 ------------------- executorlib/standalone/cache/__init__.py | 0 executorlib/standalone/cache/spawner.py | 39 ++++++++++++++++++++++++ tests/test_cache_executor_mpi.py | 1 - tests/test_cache_executor_serial.py | 4 +-- 6 files changed, 43 insertions(+), 37 deletions(-) create mode 100644 executorlib/standalone/cache/__init__.py create mode 100644 executorlib/standalone/cache/spawner.py diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 357df6b9..b9800103 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -2,9 +2,9 @@ from typing import Optional from executorlib.base.executor import ExecutorBase -from executorlib.cache.shared import ( +from executorlib.cache.shared import execute_tasks_h5 +from executorlib.standalone.cache.spawner import ( execute_in_subprocess, - execute_tasks_h5, terminate_subprocess, ) from executorlib.standalone.thread import RaisingThread diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 1effa7a3..ffe8a95b 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -1,9 +1,7 @@ import importlib.util import os import queue -import subprocess import sys -import time from concurrent.futures import Future from typing import Optional, Tuple @@ -48,36 +46,6 @@ def done(self) -> bool: return get_output(file_name=self._file_name)[0] -def execute_in_subprocess( - command: list, - task_dependent_lst: list = [], - cwd: Optional[str] = None, -) -> subprocess.Popen: - """ - Execute a command in a subprocess. - - Args: - command (list): The command to be executed. - task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. - cwd (str/None): current working directory where the parallel python task is executed - - Returns: - subprocess.Popen: The subprocess object. - - """ - while len(task_dependent_lst) > 0: - task_dependent_lst = [ - task for task in task_dependent_lst if task.poll() is None - ] - return subprocess.Popen(command, universal_newlines=True, cwd=cwd) - - -def terminate_subprocess(task): - task.terminate() - while task.poll() is None: - time.sleep(0.1) - - def execute_tasks_h5( future_queue: queue.Queue, cache_directory: str, diff --git a/executorlib/standalone/cache/__init__.py b/executorlib/standalone/cache/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/executorlib/standalone/cache/spawner.py b/executorlib/standalone/cache/spawner.py new file mode 100644 index 00000000..50325094 --- /dev/null +++ b/executorlib/standalone/cache/spawner.py @@ -0,0 +1,39 @@ +import subprocess +import time +from typing import Optional + + +def execute_in_subprocess( + command: list, + task_dependent_lst: list = [], + cwd: Optional[str] = None, +) -> subprocess.Popen: + """ + Execute a command in a subprocess. + + Args: + command (list): The command to be executed. + task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. + cwd (str/None): current working directory where the parallel python task is executed + + Returns: + subprocess.Popen: The subprocess object. + + """ + while len(task_dependent_lst) > 0: + task_dependent_lst = [ + task for task in task_dependent_lst if task.poll() is None + ] + return subprocess.Popen(command, universal_newlines=True, cwd=cwd) + + +def terminate_subprocess(task): + """ + Terminate a subprocess and wait for it to complete. + + Args: + task (subprocess.Popen): The subprocess.Popen instance to terminate + """ + task.terminate() + while task.poll() is None: + time.sleep(0.1) \ No newline at end of file diff --git a/tests/test_cache_executor_mpi.py b/tests/test_cache_executor_mpi.py index 1110be6d..8fa66ccd 100644 --- a/tests/test_cache_executor_mpi.py +++ b/tests/test_cache_executor_mpi.py @@ -6,7 +6,6 @@ try: from executorlib import FileExecutor - from executorlib.cache.shared import execute_tasks_h5, execute_in_subprocess skip_h5io_test = False except ImportError: diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 77b74fc7..a4d453f7 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -8,8 +8,8 @@ try: from executorlib import FileExecutor - from executorlib.cache.shared import ( - execute_tasks_h5, + from executorlib.cache.shared import execute_tasks_h5 + from executorlib.standalone.cache.spawner import ( execute_in_subprocess, terminate_subprocess, ) From cf5b211e84bbe06e76bb7114c797a0e00763128e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 27 Oct 2024 17:56:24 +0000 Subject: [PATCH 6/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/standalone/cache/spawner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/standalone/cache/spawner.py b/executorlib/standalone/cache/spawner.py index 50325094..aa6a39dc 100644 --- a/executorlib/standalone/cache/spawner.py +++ b/executorlib/standalone/cache/spawner.py @@ -36,4 +36,4 @@ def terminate_subprocess(task): """ task.terminate() while task.poll() is None: - time.sleep(0.1) \ No newline at end of file + time.sleep(0.1)