Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions executorlib/standalone/command.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import importlib.util
import os
import sys


def get_command_path(executable: str) -> str:
Expand All @@ -12,3 +14,55 @@ def get_command_path(executable: str) -> str:
str: absolute path to the executable script
"""
return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))


def get_cache_execute_command(file_name: str, cores: int = 1) -> list:
    """
    Build the command line used to execute a cached task via the backend.

    Args:
        file_name (str): Name of the cache file handed to the backend script.
        cores (int, optional): Number of cores used to execute the task.
                               Defaults to 1.

    Returns:
        list: Command as a list of strings — the python interpreter (wrapped
              in ``mpiexec`` for parallel execution) followed by the backend
              script and the cache file name.

    Raises:
        ImportError: If more than one core is requested but mpi4py is not
                     installed.
    """
    if cores <= 1:
        # Serial path: plain python invocation of the serial backend script.
        return [
            sys.executable,
            get_command_path(executable="cache_serial.py"),
            file_name,
        ]
    if importlib.util.find_spec("mpi4py") is None:
        raise ImportError(
            "mpi4py is required for parallel calculations. Please install mpi4py."
        )
    # Parallel path: wrap the python invocation in mpiexec with the core count.
    return [
        "mpiexec",
        "-n",
        str(cores),
        sys.executable,
        get_command_path(executable="cache_parallel.py"),
        file_name,
    ]
Comment on lines +19 to +43
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify MPI command construction and consider edge cases.

The function logic is mostly correct, but there are a few considerations:

  1. The MPI command construction for parallel execution looks correct
  2. The error handling for missing mpi4py is appropriate
  3. However, there's an inconsistency between the two functions in MPI command construction

The get_cache_execute_command function adds mpiexec -n <cores> before the Python executable, while get_interactive_execute_command doesn't. This inconsistency could lead to different execution behaviors:


🏁 Script executed:

#!/bin/bash
# Verify the MPI command construction difference between cache and interactive modes
echo "=== Cache command (with mpiexec) ==="
ast-grep --pattern $'get_cache_execute_command($$$) {
  $$$
  command_lst = (
    ["mpiexec", "-n", str(cores)]
    + command_lst
    + [get_command_path($$$), file_name]
  )
  $$$
}'

echo "=== Interactive command (without mpiexec) ==="
ast-grep --pattern $'get_interactive_execute_command($$$) {
  $$$
  command_lst += [get_command_path($$$)]
  $$$
}'

Length of output: 485


🏁 Script executed:

#!/bin/bash
# Locate and display the definition of get_interactive_execute_command
rg -n "def get_interactive_execute_command" -n

Length of output: 122


🏁 Script executed:

#!/bin/bash
# Display get_interactive_execute_command implementation for comparison
sed -n '46,90p' executorlib/standalone/command.py

Length of output: 918


Inconsistent MPI invocation in interactive mode

The get_interactive_execute_command function does not wrap the call in mpiexec -n <cores>, unlike get_cache_execute_command. This leads to different parallel execution behaviors:

• File: executorlib/standalone/command.py
get_cache_execute_command (lines 19–43) prefixes the command with ["mpiexec", "-n", str(cores)].
get_interactive_execute_command (lines 46–66) only appends [sys.executable, …] without mpiexec.

To unify behavior, update get_interactive_execute_command so that when cores > 1 && mpi4py is available, it constructs the command as:

-    if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
-        command_lst += [get_command_path(executable="interactive_parallel.py")]
+    if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
+        command_lst = (
+            ["mpiexec", "-n", str(cores)]
+            + command_lst
+            + [get_command_path(executable="interactive_parallel.py")]
+        )

This ensures both “cache” and “interactive” modes use MPI correctly for parallel execution.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In executorlib/standalone/command.py around lines 46 to 66, update the
get_interactive_execute_command function to prepend the command list with
["mpiexec", "-n", str(cores)] when cores > 1 and mpi4py is available, similar to
get_cache_execute_command. This involves checking for mpi4py using
importlib.util.find_spec and raising an ImportError if mpi4py is missing when
cores > 1. Adjust the command construction to ensure consistent MPI invocation
for parallel execution in both cache and interactive modes.



def get_interactive_execute_command(
    cores: int,
) -> list:
    """
    Build the command line used to start the interactive backend.

    Args:
        cores (int): Number of cores used to execute the task; values greater
                     than one select ``interactive_parallel.py``, otherwise
                     ``interactive_serial.py`` is used.

    Returns:
        list: Command as a list of strings — the python interpreter followed
              by the backend script to execute.

    Raises:
        ImportError: If more than one core is requested but mpi4py is not
                     installed.
    """
    # NOTE(review): unlike get_cache_execute_command, no "mpiexec -n <cores>"
    # prefix is added here — presumably the interactive spawner passed to
    # interface_bootup handles MPI process placement itself; confirm against
    # the spawner implementation before changing this.
    if cores > 1:
        if importlib.util.find_spec("mpi4py") is None:
            raise ImportError(
                "mpi4py is required for parallel calculations. Please install mpi4py."
            )
        backend_script = get_command_path(executable="interactive_parallel.py")
    else:
        backend_script = get_command_path(executable="interactive_serial.py")
    return [sys.executable, backend_script]
Comment on lines +46 to +68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix inconsistent MPI command construction.

The get_interactive_execute_command function doesn't prepend mpiexec for parallel execution, which is inconsistent with get_cache_execute_command. This could cause parallel execution to fail.

Apply this diff to fix the MPI command construction:

 def get_interactive_execute_command(
     cores: int,
 ) -> list:
     """
     Get command to call backend as a list of two strings
 
     Args:
         cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py
                      else interactive_serial.py
 
     Returns:
         list[str]: List of strings containing the python executable path and the backend script to execute
     """
     command_lst = [sys.executable]
     if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
-        command_lst += [get_command_path(executable="interactive_parallel.py")]
+        command_lst = (
+            ["mpiexec", "-n", str(cores)]
+            + command_lst
+            + [get_command_path(executable="interactive_parallel.py")]
+        )
     elif cores > 1:
         raise ImportError(
             "mpi4py is required for parallel calculations. Please install mpi4py."
         )
     else:
         command_lst += [get_command_path(executable="interactive_serial.py")]
     return command_lst
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def get_interactive_execute_command(
cores: int,
) -> list:
"""
Get command to call backend as a list of two strings
Args:
cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py
else interactive_serial.py
Returns:
list[str]: List of strings containing the python executable path and the backend script to execute
"""
command_lst = [sys.executable]
if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
command_lst += [get_command_path(executable="interactive_parallel.py")]
elif cores > 1:
raise ImportError(
"mpi4py is required for parallel calculations. Please install mpi4py."
)
else:
command_lst += [get_command_path(executable="interactive_serial.py")]
return command_lst
def get_interactive_execute_command(
cores: int,
) -> list:
"""
Get command to call backend as a list of two strings
Args:
cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py
else interactive_serial.py
Returns:
list[str]: List of strings containing the python executable path and the backend script to execute
"""
command_lst = [sys.executable]
if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
command_lst = (
["mpiexec", "-n", str(cores)]
+ command_lst
+ [get_command_path(executable="interactive_parallel.py")]
)
elif cores > 1:
raise ImportError(
"mpi4py is required for parallel calculations. Please install mpi4py."
)
else:
command_lst += [get_command_path(executable="interactive_serial.py")]
return command_lst
🤖 Prompt for AI Agents
In executorlib/standalone/command.py around lines 46 to 68, the function
get_interactive_execute_command constructs the command list for parallel
execution without prepending 'mpiexec', unlike get_cache_execute_command. To fix
this, modify the command list to start with 'mpiexec' followed by sys.executable
and the parallel script when cores > 1 and mpi4py is available. This ensures
consistent MPI command construction and proper parallel execution.

33 changes: 2 additions & 31 deletions executorlib/task_scheduler/file/shared.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import contextlib
import importlib.util
import os
import queue
import sys
from concurrent.futures import Future
from typing import Any, Callable, Optional

from executorlib.standalone.cache import get_cache_files
from executorlib.standalone.command import get_command_path
from executorlib.standalone.command import get_cache_execute_command
from executorlib.standalone.serialize import serialize_funct_h5
from executorlib.task_scheduler.file.hdf import get_output
from executorlib.task_scheduler.file.subprocess_spawner import terminate_subprocess
Expand Down Expand Up @@ -153,7 +151,7 @@ def execute_tasks_h5(
)
task_dependent_lst = []
process_dict[task_key] = execute_function(
command=_get_execute_command(
command=get_cache_execute_command(
file_name=file_name,
cores=task_resource_dict["cores"],
),
Expand Down Expand Up @@ -183,33 +181,6 @@ def execute_tasks_h5(
}


def _get_execute_command(file_name: str, cores: int = 1) -> list:
"""
Get command to call backend as a list of two strings

Args:
file_name (str): The name of the file.
cores (int, optional): Number of cores used to execute the task. Defaults to 1.

Returns:
list[str]: List of strings containing the python executable path and the backend script to execute
"""
command_lst = [sys.executable]
if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
command_lst = (
["mpiexec", "-n", str(cores)]
+ command_lst
+ [get_command_path(executable="cache_parallel.py"), file_name]
)
elif cores > 1:
raise ImportError(
"mpi4py is required for parallel calculations. Please install mpi4py."
)
else:
command_lst += [get_command_path(executable="cache_serial.py"), file_name]
return command_lst


def _check_task_output(
task_key: str, future_obj: Future, cache_directory: str
) -> Future:
Expand Down
30 changes: 2 additions & 28 deletions executorlib/task_scheduler/interactive/shared.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import contextlib
import importlib.util
import os
import queue
import sys
import time
from typing import Callable, Optional

from executorlib.standalone.cache import get_cache_files
from executorlib.standalone.command import get_command_path
from executorlib.standalone.command import get_interactive_execute_command
from executorlib.standalone.interactive.communication import (
SocketInterface,
interface_bootup,
Expand Down Expand Up @@ -51,7 +49,7 @@ def execute_tasks(
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
"""
interface = interface_bootup(
command_lst=_get_backend_path(
command_lst=get_interactive_execute_command(
cores=cores,
),
connections=spawner(cores=cores, **kwargs),
Expand Down Expand Up @@ -87,30 +85,6 @@ def execute_tasks(
)


def _get_backend_path(
cores: int,
) -> list:
"""
Get command to call backend as a list of two strings

Args:
cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py

Returns:
list[str]: List of strings containing the python executable path and the backend script to execute
"""
command_lst = [sys.executable]
if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
command_lst += [get_command_path(executable="interactive_parallel.py")]
elif cores > 1:
raise ImportError(
"mpi4py is required for parallel calculations. Please install mpi4py."
)
else:
command_lst += [get_command_path(executable="interactive_serial.py")]
return command_lst


def _execute_task_without_cache(
interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_fluxclusterexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

from executorlib import FluxClusterExecutor
from executorlib.standalone.serialize import cloudpickle_register
from executorlib.standalone.command import get_cache_execute_command

try:
import flux.job
from executorlib.task_scheduler.file.hdf import dump
from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache, execute_with_pysqa
from executorlib.task_scheduler.file.shared import _get_execute_command

skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("EXECUTORLIB_PMIX", None)
Expand Down Expand Up @@ -63,7 +63,7 @@ def test_executor_no_cwd(self):

def test_pysqa_interface(self):
queue_id = execute_with_pysqa(
command=_get_execute_command(
command=get_cache_execute_command(
file_name="test_i.h5",
cores=1,
),
Expand Down
Loading