diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 0c30ca38..b9800103 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -2,7 +2,11 @@ from typing import Optional from executorlib.base.executor import ExecutorBase -from executorlib.cache.shared import execute_in_subprocess, execute_tasks_h5 +from executorlib.cache.shared import execute_tasks_h5 +from executorlib.standalone.cache.spawner import ( + execute_in_subprocess, + terminate_subprocess, +) from executorlib.standalone.thread import RaisingThread @@ -13,6 +17,7 @@ def __init__( execute_function: callable = execute_in_subprocess, cores_per_worker: int = 1, cwd: Optional[str] = None, + terminate_function: Optional[callable] = None, ): """ Initialize the FileExecutor. @@ -22,8 +27,11 @@ def __init__( execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess. cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1. cwd (str/None): current working directory where the parallel python task is executed + terminate_function (callable, optional): The function to terminate the tasks. """ super().__init__() + if execute_function == execute_in_subprocess and terminate_function is None: + terminate_function = terminate_subprocess cache_directory_path = os.path.abspath(cache_directory) os.makedirs(cache_directory_path, exist_ok=True) self._set_process( @@ -35,6 +43,7 @@ def __init__( "cache_directory": cache_directory_path, "cores_per_worker": cores_per_worker, "cwd": cwd, + "terminate_function": terminate_function, }, ) ) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 77a7da43..ffe8a95b 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -1,7 +1,6 @@ import importlib.util import os import queue -import subprocess import sys from concurrent.futures import Future from typing import Optional, Tuple @@ -47,36 +46,13 @@ def done(self) -> bool: return get_output(file_name=self._file_name)[0] -def execute_in_subprocess( - command: list, - task_dependent_lst: list = [], - cwd: Optional[str] = None, -) -> subprocess.Popen: - """ - Execute a command in a subprocess. - - Args: - command (list): The command to be executed. - task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. - cwd (str/None): current working directory where the parallel python task is executed - - Returns: - subprocess.Popen: The subprocess object. - - """ - while len(task_dependent_lst) > 0: - task_dependent_lst = [ - task for task in task_dependent_lst if task.poll() is None - ] - return subprocess.Popen(command, universal_newlines=True, cwd=cwd) - - def execute_tasks_h5( future_queue: queue.Queue, cache_directory: str, cores_per_worker: int, execute_function: callable, cwd: Optional[str], + terminate_function: Optional[callable] = None, ) -> None: """ Execute tasks stored in a queue using HDF5 files. @@ -87,6 +63,7 @@ def execute_tasks_h5( cores_per_worker (int): The number of cores per worker. execute_function (callable): The function to execute the tasks. cwd (str/None): current working directory where the parallel python task is executed + terminate_function (callable): The function to terminate the tasks. Returns: None @@ -104,6 +81,9 @@ def execute_tasks_h5( and "shutdown" in task_dict.keys() and task_dict["shutdown"] ): + if terminate_function is not None: + for task in process_dict.values(): + terminate_function(task=task) future_queue.task_done() future_queue.join() break diff --git a/executorlib/standalone/cache/__init__.py b/executorlib/standalone/cache/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/executorlib/standalone/cache/spawner.py b/executorlib/standalone/cache/spawner.py new file mode 100644 index 00000000..aa6a39dc --- /dev/null +++ b/executorlib/standalone/cache/spawner.py @@ -0,0 +1,39 @@ +import subprocess +import time +from typing import Optional + + +def execute_in_subprocess( + command: list, + task_dependent_lst: list = [], + cwd: Optional[str] = None, +) -> subprocess.Popen: + """ + Execute a command in a subprocess. + + Args: + command (list): The command to be executed. + task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. + cwd (str/None): current working directory where the parallel python task is executed + + Returns: + subprocess.Popen: The subprocess object. + + """ + while len(task_dependent_lst) > 0: + task_dependent_lst = [ + task for task in task_dependent_lst if task.poll() is None + ] + return subprocess.Popen(command, universal_newlines=True, cwd=cwd) + + +def terminate_subprocess(task): + """ + Terminate a subprocess and wait for it to complete. + + Args: + task (subprocess.Popen): The subprocess.Popen instance to terminate + """ + task.terminate() + while task.poll() is None: + time.sleep(0.1) diff --git a/tests/test_cache_executor_mpi.py b/tests/test_cache_executor_mpi.py index 1110be6d..8fa66ccd 100644 --- a/tests/test_cache_executor_mpi.py +++ b/tests/test_cache_executor_mpi.py @@ -6,7 +6,6 @@ try: from executorlib import FileExecutor - from executorlib.cache.shared import execute_tasks_h5, execute_in_subprocess skip_h5io_test = False except ImportError: diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 7b3c57fb..a4d453f7 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -8,7 +8,11 @@ try: from executorlib import FileExecutor - from executorlib.cache.shared import execute_tasks_h5, execute_in_subprocess + from executorlib.cache.shared import execute_tasks_h5 + from executorlib.standalone.cache.spawner import ( + execute_in_subprocess, + terminate_subprocess, + ) skip_h5io_test = False except ImportError: @@ -62,6 +66,7 @@ def test_executor_function(self): "execute_function": execute_in_subprocess, "cores_per_worker": 1, "cwd": None, + "terminate_function": terminate_subprocess, }, ) process.start() @@ -87,6 +92,7 @@ def test_executor_function_dependence_kwargs(self): "execute_function": execute_in_subprocess, "cores_per_worker": 1, "cwd": None, + "terminate_function": terminate_subprocess, }, ) process.start() @@ -112,6 +118,7 @@ def test_executor_function_dependence_args(self): "execute_function": execute_in_subprocess, "cores_per_worker": 1, "cwd": None, + "terminate_function": terminate_subprocess, }, ) process.start()