Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
from typing import Optional

from executorlib.base.executor import ExecutorBase
from executorlib.cache.shared import execute_in_subprocess, execute_tasks_h5
from executorlib.cache.shared import execute_tasks_h5
from executorlib.standalone.cache.spawner import (
execute_in_subprocess,
terminate_subprocess,
)
from executorlib.standalone.thread import RaisingThread


Expand All @@ -13,6 +17,7 @@ def __init__(
execute_function: callable = execute_in_subprocess,
cores_per_worker: int = 1,
cwd: Optional[str] = None,
terminate_function: Optional[callable] = None,
):
"""
Initialize the FileExecutor.
Expand All @@ -22,8 +27,11 @@ def __init__(
execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
cwd (str/None): current working directory where the parallel python task is executed
terminate_function (callable, optional): The function to terminate the tasks.
"""
super().__init__()
if execute_function == execute_in_subprocess and terminate_function is None:
terminate_function = terminate_subprocess
cache_directory_path = os.path.abspath(cache_directory)
os.makedirs(cache_directory_path, exist_ok=True)
self._set_process(
Expand All @@ -35,6 +43,7 @@ def __init__(
"cache_directory": cache_directory_path,
"cores_per_worker": cores_per_worker,
"cwd": cwd,
"terminate_function": terminate_function,
},
)
)
30 changes: 5 additions & 25 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import importlib.util
import os
import queue
import subprocess
import sys
from concurrent.futures import Future
from typing import Optional, Tuple
Expand Down Expand Up @@ -47,36 +46,13 @@ def done(self) -> bool:
return get_output(file_name=self._file_name)[0]


def execute_in_subprocess(
    command: list,
    task_dependent_lst: Optional[list] = None,
    cwd: Optional[str] = None,
) -> subprocess.Popen:
    """
    Execute a command in a subprocess.

    Args:
        command (list): The command to be executed.
        task_dependent_lst (list/None): A list of subprocess.Popen objects that the
            current subprocess depends on; the new process is only spawned once all
            of them have finished. Defaults to None (no dependencies).
        cwd (str/None): current working directory where the parallel python task is executed

    Returns:
        subprocess.Popen: The subprocess object.
    """
    # B006: never use a mutable default argument; normalize None to an empty list.
    if task_dependent_lst is None:
        task_dependent_lst = []
    # Wait until every dependency has terminated: poll() returns the exit code
    # once a process is done and None while it is still running.
    while len(task_dependent_lst) > 0:
        task_dependent_lst = [
            task for task in task_dependent_lst if task.poll() is None
        ]
    return subprocess.Popen(command, universal_newlines=True, cwd=cwd)


def execute_tasks_h5(
future_queue: queue.Queue,
cache_directory: str,
cores_per_worker: int,
execute_function: callable,
cwd: Optional[str],
terminate_function: Optional[callable] = None,
) -> None:
"""
Execute tasks stored in a queue using HDF5 files.
Expand All @@ -87,6 +63,7 @@ def execute_tasks_h5(
cores_per_worker (int): The number of cores per worker.
execute_function (callable): The function to execute the tasks.
cwd (str/None): current working directory where the parallel python task is executed
terminate_function (callable): The function to terminate the tasks.

Returns:
None
Expand All @@ -104,6 +81,9 @@ def execute_tasks_h5(
and "shutdown" in task_dict.keys()
and task_dict["shutdown"]
):
if terminate_function is not None:
for task in process_dict.values():
terminate_function(task=task)
Comment on lines +84 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling and logging to termination logic.

The current implementation might leave processes running if termination fails for any task.

Consider this more robust implementation:

     if terminate_function is not None:
+        termination_errors = []
         for task in process_dict.values():
-            terminate_function(task=task)
+            try:
+                terminate_function(task=task)
+            except Exception as e:
+                termination_errors.append(f"Failed to terminate task: {e}")
+        if termination_errors:
+            raise RuntimeError(f"Errors during shutdown: {'; '.join(termination_errors)}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if terminate_function is not None:
for task in process_dict.values():
terminate_function(task=task)
if terminate_function is not None:
termination_errors = []
for task in process_dict.values():
try:
terminate_function(task=task)
except Exception as e:
termination_errors.append(f"Failed to terminate task: {e}")
if termination_errors:
raise RuntimeError(f"Errors during shutdown: {'; '.join(termination_errors)}")

future_queue.task_done()
future_queue.join()
break
Expand Down
Empty file.
39 changes: 39 additions & 0 deletions executorlib/standalone/cache/spawner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import subprocess
import time
from typing import Optional


def execute_in_subprocess(
    command: list,
    task_dependent_lst: Optional[list] = None,
    cwd: Optional[str] = None,
) -> subprocess.Popen:
    """
    Execute a command in a subprocess.

    Args:
        command (list): The command to be executed.
        task_dependent_lst (list/None): A list of subprocess.Popen objects that the
            current subprocess depends on; the new process is only spawned once all
            of them have finished. Defaults to None (no dependencies).
        cwd (str/None): current working directory where the parallel python task is executed

    Returns:
        subprocess.Popen: The subprocess object.
    """
    # B006: never use a mutable default argument; normalize None to an empty list.
    if task_dependent_lst is None:
        task_dependent_lst = []
    # Wait until every dependency has terminated: poll() returns the exit code
    # once a process is done and None while it is still running.  Sleep briefly
    # between polls so the wait does not spin a CPU core.
    while len(task_dependent_lst) > 0:
        task_dependent_lst = [
            task for task in task_dependent_lst if task.poll() is None
        ]
        time.sleep(0.01)
    return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling for subprocess creation

The subprocess creation should be wrapped in a try-except block to handle potential errors.

Here's a suggested implementation:

-    return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
+    try:
+        return subprocess.Popen(
+            command,
+            universal_newlines=True,
+            cwd=cwd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE
+        )
+    except (OSError, subprocess.SubprocessError) as e:
+        raise RuntimeError(f"Failed to execute command {command}: {str(e)}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
try:
return subprocess.Popen(
command,
universal_newlines=True,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
except (OSError, subprocess.SubprocessError) as e:
raise RuntimeError(f"Failed to execute command {command}: {str(e)}")

Comment on lines +23 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add timeout and error handling for dependent tasks

The while loop could potentially run indefinitely if dependent tasks never complete. Consider adding a timeout mechanism and proper error handling.

Here's a suggested implementation:

+    if task_dependent_lst is None:
+        task_dependent_lst = []
+    start_time = time.time()
+    timeout = 3600  # 1 hour timeout, adjust as needed
     while len(task_dependent_lst) > 0:
+        if time.time() - start_time > timeout:
+            raise TimeoutError("Dependent tasks did not complete within the timeout period")
         task_dependent_lst = [
             task for task in task_dependent_lst if task.poll() is None
         ]
+        time.sleep(0.1)  # Add small delay to prevent CPU spinning

Committable suggestion was skipped due to low confidence.



def terminate_subprocess(task: subprocess.Popen) -> None:
    """
    Terminate a subprocess and wait for it to complete.

    Args:
        task (subprocess.Popen): The subprocess.Popen instance to terminate
    """
    task.terminate()
    # Popen.wait() blocks until the process has exited; this replaces the
    # previous hand-rolled poll()/sleep() busy loop with the stdlib equivalent.
    task.wait()
Comment on lines +30 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance subprocess termination with graceful shutdown and timeout

The current implementation could be improved with proper type hints, graceful shutdown attempt, timeout mechanism, and error handling.

Here's a suggested implementation:

-def terminate_subprocess(task):
+def terminate_subprocess(task: subprocess.Popen, timeout: float = 10.0) -> None:
     """
     Terminate a subprocess and wait for it to complete.
 
     Args:
         task (subprocess.Popen): The subprocess.Popen instance to terminate
+        timeout (float): Maximum time to wait for process termination in seconds
+
+    Raises:
+        TimeoutError: If the process doesn't terminate within the timeout period
     """
-    task.terminate()
-    while task.poll() is None:
-        time.sleep(0.1)
+    try:
+        # First attempt graceful shutdown
+        task.terminate()
+        start_time = time.time()
+        
+        while task.poll() is None:
+            if time.time() - start_time > timeout:
+                # Force kill if graceful shutdown fails
+                task.kill()
+                if task.poll() is None:
+                    raise TimeoutError(f"Failed to terminate process {task.pid} within {timeout} seconds")
+            time.sleep(0.1)
+    except Exception as e:
+        raise RuntimeError(f"Error while terminating process {task.pid}: {str(e)}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def terminate_subprocess(task):
"""
Terminate a subprocess and wait for it to complete.
Args:
task (subprocess.Popen): The subprocess.Popen instance to terminate
"""
task.terminate()
while task.poll() is None:
time.sleep(0.1)
def terminate_subprocess(task: subprocess.Popen, timeout: float = 10.0) -> None:
"""
Terminate a subprocess and wait for it to complete.
Args:
task (subprocess.Popen): The subprocess.Popen instance to terminate
timeout (float): Maximum time to wait for process termination in seconds
Raises:
TimeoutError: If the process doesn't terminate within the timeout period
"""
try:
# First attempt graceful shutdown
task.terminate()
start_time = time.time()
while task.poll() is None:
if time.time() - start_time > timeout:
# Force kill if graceful shutdown fails
task.kill()
if task.poll() is None:
raise TimeoutError(f"Failed to terminate process {task.pid} within {timeout} seconds")
time.sleep(0.1)
except Exception as e:
raise RuntimeError(f"Error while terminating process {task.pid}: {str(e)}")

1 change: 0 additions & 1 deletion tests/test_cache_executor_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

try:
from executorlib import FileExecutor
from executorlib.cache.shared import execute_tasks_h5, execute_in_subprocess

skip_h5io_test = False
except ImportError:
Expand Down
9 changes: 8 additions & 1 deletion tests/test_cache_executor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

try:
from executorlib import FileExecutor
from executorlib.cache.shared import execute_tasks_h5, execute_in_subprocess
from executorlib.cache.shared import execute_tasks_h5
from executorlib.standalone.cache.spawner import (
execute_in_subprocess,
terminate_subprocess,
)

skip_h5io_test = False
except ImportError:
Expand Down Expand Up @@ -62,6 +66,7 @@ def test_executor_function(self):
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
"terminate_function": terminate_subprocess,
},
)
process.start()
Expand All @@ -87,6 +92,7 @@ def test_executor_function_dependence_kwargs(self):
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
"terminate_function": terminate_subprocess,
},
)
process.start()
Expand All @@ -112,6 +118,7 @@ def test_executor_function_dependence_args(self):
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
"terminate_function": terminate_subprocess,
},
)
process.start()
Expand Down
Loading