Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .ci_support/environment-mpich.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ dependencies:
- networkx =3.4.2
- pygraphviz =1.14
- ipython =8.29.0
- pysqa =0.2.0
1 change: 1 addition & 0 deletions .ci_support/environment-openmpi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ dependencies:
- matplotlib =3.9.2
- networkx =3.4.2
- pygraphviz =1.14
- pysqa =0.2.0
- ipython =8.29.0
2 changes: 1 addition & 1 deletion .github/workflows/unittest-flux-mpich.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ jobs:
timeout-minutes: 5
run: >
flux start
python -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py;
python -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py tests/test_cache_executor_pysqa_flux.py;
2 changes: 1 addition & 1 deletion .github/workflows/unittest-flux-openmpi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
timeout-minutes: 5
run: >
flux start
coverage run -a --omit="executorlib/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py;
coverage run -a --omit="executorlib/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py tests/test_cache_executor_pysqa_flux.py;
coverage xml
env:
PYMPIPOOL_PMIX: "pmix"
Expand Down
8 changes: 7 additions & 1 deletion executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def __init__(
cwd: Optional[str] = None,
execute_function: callable = execute_in_subprocess,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
):
"""
Initialize the FileExecutor.
Expand All @@ -27,7 +29,9 @@ def __init__(
execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
terminate_function (callable, optional): The function to terminate the tasks.
cwd (str/None): current working directory where the parallel python task is executed
cwd (str, optional): current working directory where the parallel python task is executed
config_directory (str, optional): path to the config directory.
backend (str, optional): name of the backend used to spawn tasks.
"""
super().__init__()
if execute_function == execute_in_subprocess and terminate_function is None:
Expand All @@ -44,6 +48,8 @@ def __init__(
"cores_per_worker": cores_per_worker,
"cwd": cwd,
"terminate_function": terminate_function,
"config_directory": config_directory,
"backend": backend,
},
)
)
6 changes: 6 additions & 0 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ def execute_tasks_h5(
cores_per_worker: int = 1,
cwd: Optional[str] = None,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
) -> None:
"""
Execute tasks stored in a queue using HDF5 files.
Expand All @@ -64,6 +66,8 @@ def execute_tasks_h5(
execute_function (callable): The function to execute the tasks.
cwd (str/None): current working directory where the parallel python task is executed
terminate_function (callable): The function to terminate the tasks.
config_directory (str, optional): path to the config directory.
backend (str, optional): name of the backend used to spawn tasks.

Returns:
None
Expand Down Expand Up @@ -117,6 +121,8 @@ def execute_tasks_h5(
process_dict[k] for k in future_wait_key_lst
],
resource_dict=resource_dict,
config_directory=config_directory,
backend=backend,
)
file_name_dict[task_key] = os.path.join(
cache_directory, task_key + ".h5out"
Expand Down
39 changes: 39 additions & 0 deletions executorlib/standalone/cache/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import List, Optional

from pysqa import QueueAdapter


def execute_with_pysqa(
    command: List[str],
    resource_dict: Optional[dict] = None,
    task_dependent_lst: Optional[List[int]] = None,
    config_directory: Optional[str] = None,
    backend: Optional[str] = None,
) -> int:
    """
    Execute a command by submitting it to the queuing system

    Args:
        command (list): The command to be executed.
        resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
            Example resource dictionary: {
                cwd: None,
            }
        task_dependent_lst (list): A list of queuing system IDs the current task depends on. Defaults to [].
        config_directory (str, optional): path to the config directory.
        backend (str, optional): name of the backend used to spawn tasks.

    Returns:
        int: queuing system ID
    """
    # Avoid the mutable-default-argument pitfall: a shared [] default would
    # leak dependencies between calls.
    if task_dependent_lst is None:
        task_dependent_lst = []
    if resource_dict is None:
        resource_dict = {"cwd": "."}
    # Work on a copy so the caller's dictionary is not mutated below.
    resource_dict = dict(resource_dict)
    qa = QueueAdapter(directory=config_directory, queue_type=backend)
    submit_kwargs = {
        "command": " ".join(command),
        "dependency_list": [str(qid) for qid in task_dependent_lst],
        # pop() with a default so a dict without "cwd" no longer raises KeyError
        "working_directory": resource_dict.pop("cwd", None),
    }
    # Any remaining resource entries are forwarded verbatim to pysqa.
    submit_kwargs.update(resource_dict)
    return qa.submit_job(**submit_kwargs)
10 changes: 10 additions & 0 deletions executorlib/standalone/cache/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ def execute_in_subprocess(
command: list,
task_dependent_lst: list = [],
resource_dict: Optional[dict] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
) -> subprocess.Popen:
"""
Execute a command in a subprocess.
Expand All @@ -18,6 +20,8 @@ def execute_in_subprocess(
Example resource dictionary: {
cwd: None,
}
config_directory (str, optional): path to the config directory.
backend (str, optional): name of the backend used to spawn tasks.

Returns:
subprocess.Popen: The subprocess object.
Expand All @@ -27,6 +31,12 @@ def execute_in_subprocess(
task_dependent_lst = [
task for task in task_dependent_lst if task.poll() is None
]
if config_directory is not None:
raise ValueError(
"config_directory parameter is not supported for subprocess spawner."
)
if backend is not None:
raise ValueError("backend parameter is not supported for subprocess spawner.")
if resource_dict is None:
resource_dict = {"cwd": None}
elif len(resource_dict) == 0:
Expand Down
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ Repository = "https://github.com/pyiron/executorlib"

[project.optional-dependencies]
mpi = ["mpi4py==4.0.1"]
hdf = [
"h5py==3.12.1",
]
hdf = ["h5py==3.12.1"]
graph = [
"pygraphviz==1.14",
"matplotlib==3.9.2",
"networkx==3.4.2",
"ipython==8.29.0",
]
queue = ["pysqa==0.2.0"]

[tool.setuptools.packages.find]
include = ["executorlib*"]
Expand Down
46 changes: 46 additions & 0 deletions tests/test_cache_executor_pysqa_flux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import importlib.util  # explicit submodule import: `import importlib` alone does not bind importlib.util
import os
import shutil
import unittest

# flux and executorlib are optional at test time; skip the whole module when
# either is missing instead of failing at import.
try:
    import flux.job
    from executorlib import FileExecutor
    from executorlib.standalone.cache.queue import execute_with_pysqa

    skip_flux_test = "FLUX_URI" not in os.environ
    pmi = os.environ.get("PYMPIPOOL_PMIX", None)
except ImportError:
    skip_flux_test = True


# mpi4py availability is probed without importing it.
skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None


def mpi_funct(i):
    """Return the input value together with the MPI world size and rank of this process."""
    from mpi4py import MPI

    world = MPI.COMM_WORLD
    return i, world.Get_size(), world.Get_rank()


@unittest.skipIf(
    skip_flux_test or skip_mpi4py_test,
    # message matches the actual condition: only flux and mpi4py are probed here
    "mpi4py or flux are not installed, so the flux and mpi4py tests are skipped.",
)
class TestCacheExecutorPysqa(unittest.TestCase):
    """Integration test: run an MPI-parallel task through FileExecutor using the pysqa flux backend."""

    def test_executor(self):
        # Two cores per worker so mpi_funct is executed on two MPI ranks.
        with FileExecutor(
            cores_per_worker=2,
            execute_function=execute_with_pysqa,
            backend="flux",
        ) as exe:
            fs1 = exe.submit(mpi_funct, 1)
            # submission is asynchronous, so the future is not done yet
            self.assertFalse(fs1.done())
            # one (i, size, rank) tuple per MPI rank
            self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
            self.assertTrue(fs1.done())

    def tearDown(self):
        # remove the on-disk cache directory created by FileExecutor
        if os.path.exists("cache"):
            shutil.rmtree("cache")
Loading