Draft: Adding new worker which uses PSI/J to run tasks #694

Merged
merged 45 commits into from
Sep 24, 2023
Commits
71d0744
add psij worker
adi611 Sep 1, 2023
6d80f02
add psij worker - 2
adi611 Sep 1, 2023
85a05f6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2023
99ab49a
test
adi611 Sep 7, 2023
bb1078d
update psijworker
adi611 Sep 7, 2023
942a68d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 7, 2023
233a076
fix: distributing tests with pytest -n auto
adi611 Sep 12, 2023
d9bca02
fix: issue with distributing tests with pytest -n auto
adi611 Sep 12, 2023
86b4377
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 12, 2023
3eda5d3
generalize use-case for psijworker
adi611 Sep 15, 2023
f2907c0
fixes for conflicts
adi611 Sep 15, 2023
af84f02
fix path for stdout/stderr
adi611 Sep 16, 2023
4fd8bb2
replace with single function for psijworker
adi611 Sep 18, 2023
8ea8256
add psij subtype to WORKERS
adi611 Sep 19, 2023
c4dbb9b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 19, 2023
6b4e098
Merge remote-tracking branch 'origin/master' into adi611-patch-testps…
adi611 Sep 20, 2023
5035284
add testing for psij
adi611 Sep 20, 2023
892f258
add testing for psij - 2
adi611 Sep 20, 2023
fe97f8e
fix: could not raise exception
adi611 Sep 20, 2023
1ba1a8d
fix: check rerun error
adi611 Sep 20, 2023
cf51ce7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2023
7f7662a
remove hardcoding of plugin value - test_wf_lzoutall_st_2a
adi611 Sep 20, 2023
f3ebeda
test psij-slurm
adi611 Sep 20, 2023
089c360
test psij-slurm - 2
adi611 Sep 20, 2023
589755d
add psij option in conftest.py
adi611 Sep 20, 2023
8ba3219
add testing for psij - 3
adi611 Sep 20, 2023
c2eefb8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2023
8ff8fa4
add psij option in conftest.py - 2
adi611 Sep 20, 2023
ecd5a93
Merge branch 'adi611-patch-testpsij-1' of https://github.com/adi611/p…
adi611 Sep 20, 2023
50c12b2
fix: psij slurm workflow invalid requirement
adi611 Sep 20, 2023
cbfd9d6
check only for psij-slurm when --psij=slurm
adi611 Sep 20, 2023
4033c0d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2023
3677954
remove redundant imports
adi611 Sep 21, 2023
f0c62c1
make subtype as required parameter
adi611 Sep 21, 2023
25cbc2f
add/improve documentation for PsijWorker
adi611 Sep 21, 2023
105f38f
crlf to lf
adi611 Sep 22, 2023
22b97b5
correct line endings
adi611 Sep 22, 2023
8f160a0
check if subtype is valid
adi611 Sep 22, 2023
7a70c18
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 22, 2023
99a8b3d
add psijworker description
adi611 Sep 22, 2023
789800e
use pathlib
adi611 Sep 23, 2023
a2941eb
Merge branch 'adi611-patch-testpsij-1' of https://github.com/adi611/p…
adi611 Sep 23, 2023
07f8107
make naming more clear
adi611 Sep 23, 2023
9b7d4e5
improve psijworker performance - remove unnecessary pickle dumps
adi611 Sep 23, 2023
2c695d5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 23, 2023
45 changes: 45 additions & 0 deletions .github/workflows/testpsijlocal.yml
@@ -0,0 +1,45 @@
name: PSI/J-Local

on:
  push:
    branches:
      - master
  pull_request:

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

permissions:
  contents: read

jobs:
  test:
    strategy:
      matrix:
        os: [ubuntu-latest, macos-latest]
        python-version: ['3.11']
      fail-fast: false
    runs-on: ${{ matrix.os }}

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          repository: ${{ github.repository }}

      - name: Setup Python version ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies for PSI/J
        run: |
          pip install -e ".[test, psij]"

      - name: Run tests for PSI/J
        run: |
          pytest --color=yes -vs --psij=local -n auto pydra/engine --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml

      - name: Upload to codecov
        run: codecov -f cov.xml -F unittests -e GITHUB_WORKFLOW
54 changes: 54 additions & 0 deletions .github/workflows/testpsijslurm.yml
@@ -0,0 +1,54 @@
name: PSI/J-SLURM

on:
  push:
    branches:
      - master
  pull_request:

jobs:
  build:
    strategy:
      matrix:
        python-version: [3.11.5]
      fail-fast: false
    runs-on: ubuntu-latest
    env:
      DOCKER_IMAGE: adi611/docker-centos7-slurm:23.02.1

    steps:
      - name: Disable etelemetry
        run: echo "NO_ET=TRUE" >> $GITHUB_ENV
      - uses: actions/checkout@v4
      - name: Pull docker image
        run: |
          docker pull $DOCKER_IMAGE
          # Have image running in the background
          docker run `bash <(curl -s https://codecov.io/env)` -itd -h slurmctl --cap-add sys_admin -d --name slurm -v `pwd`:/pydra -e NO_ET=$NO_ET $DOCKER_IMAGE
      - name: Display previous jobs with sacct
        run: |
          echo "Allowing ports/daemons time to start" && sleep 10
          docker exec slurm bash -c "sacctmgr -i add account none,test Cluster=linux Description='none' Organization='none'"
          docker exec slurm bash -c "sacct && sinfo && squeue" 2&> /dev/null
          if [ $? -ne 0 ]; then
            echo "Slurm docker image error"
            exit 1
          fi
      - name: Setup Python
        run: |
          docker exec slurm bash -c "echo $NO_ET"
          docker exec slurm bash -c "ls -la && echo list top level dir"
          docker exec slurm bash -c "ls -la /pydra && echo list pydra dir"
          if [[ "${{ matrix.python-version }}" == "3.11.5" ]]; then
            docker exec slurm bash -c "CONFIGURE_OPTS=\"-with-openssl=/opt/openssl\" pyenv install -v 3.11.5"
          fi
          docker exec slurm bash -c "pyenv global ${{ matrix.python-version }}"
          docker exec slurm bash -c "pip install --upgrade pip && pip install -e /pydra[test,psij] && python -c 'import pydra; print(pydra.__version__)'"
      - name: Run pytest
        run: |
          docker exec slurm bash -c "pytest --color=yes -vs -n auto --psij=slurm --cov pydra --cov-config /pydra/.coveragerc --cov-report xml:/pydra/cov.xml --doctest-modules /pydra/pydra/ -k 'not test_audit_prov and not test_audit_prov_messdir_1 and not test_audit_prov_messdir_2 and not test_audit_prov_wf and not test_audit_all'"
      - name: Upload to codecov
        run: |
          docker exec slurm bash -c "pip install urllib3==1.26.6"
          docker exec slurm bash -c "codecov --root /pydra -f /pydra/cov.xml -F unittests"
          docker rm -f slurm
26 changes: 26 additions & 0 deletions pydra/conftest.py
@@ -7,6 +7,12 @@

def pytest_addoption(parser):
    parser.addoption("--dask", action="store_true", help="run all combinations")
    parser.addoption(
        "--psij",
        action="store",
        help="run with psij subtype plugin",
        choices=["local", "slurm"],
    )


def pytest_generate_tests(metafunc):
@@ -21,6 +27,16 @@ def pytest_generate_tests(metafunc):
        except ValueError:
            # Called as --pyargs, so --dask isn't available
            pass
        try:
            if metafunc.config.getoption("psij"):
                Plugins.append("psij-" + metafunc.config.getoption("psij"))
                if (
                    bool(shutil.which("sbatch"))
                    and metafunc.config.getoption("psij") == "slurm"
                ):
                    Plugins.remove("slurm")
        except ValueError:
            pass
        metafunc.parametrize("plugin_dask_opt", Plugins)

    if "plugin" in metafunc.fixturenames:
@@ -35,6 +51,16 @@ def pytest_generate_tests(metafunc):
            Plugins = ["slurm"]
        else:
            Plugins = ["cf"]
        try:
            if metafunc.config.getoption("psij"):
                Plugins.append("psij-" + metafunc.config.getoption("psij"))
                if (
                    bool(shutil.which("sbatch"))
                    and metafunc.config.getoption("psij") == "slurm"
                ):
                    Plugins.remove("slurm")
        except ValueError:
            pass
        metafunc.parametrize("plugin", Plugins)
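
The plugin-selection logic in the conftest.py hunks above can be read as a small pure function. The following is a minimal sketch, not code from this PR: the name `select_plugins` and its two parameters are hypothetical, and the `slurm`/`cf` default is inferred from the context lines visible in the last hunk. It is only meant to show which plugin names the tests end up parametrized with for a given `--psij` value.

```python
def select_plugins(psij_option, sbatch_available):
    """Return the plugin names a test would be parametrized with.

    Hypothetical helper: psij_option stands for the value of --psij
    (None, "local" or "slurm"); sbatch_available stands for
    bool(shutil.which("sbatch")).
    """
    # Assumed default, taken from the context lines of the diff:
    # use the native slurm worker when sbatch is on PATH, else "cf".
    plugins = ["slurm"] if sbatch_available else ["cf"]

    if psij_option:
        plugins.append("psij-" + psij_option)
        # With --psij=slurm on a machine that has sbatch, only the
        # PSI/J-backed slurm plugin is kept.
        if sbatch_available and psij_option == "slurm":
            plugins.remove("slurm")
    return plugins


local_run = select_plugins("local", sbatch_available=False)
slurm_run = select_plugins("slurm", sbatch_available=True)
plain_run = select_plugins(None, sbatch_available=True)
```

Under these assumptions, `--psij=local` adds `psij-local` next to the default plugin, while `--psij=slurm` on a SLURM host replaces the native `slurm` plugin with `psij-slurm`.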


31 changes: 31 additions & 0 deletions pydra/engine/run_pickled.py
@@ -0,0 +1,31 @@
import pickle
import sys
from pydra.engine.helpers import load_and_run


def run_pickled(*file_paths, rerun=False):
    loaded_objects = []

    for file_path in file_paths:
        with open(file_path, "rb") as file:
            loaded_objects.append(pickle.load(file))

    if len(loaded_objects) == 1:
        result = loaded_objects[0](rerun=rerun)
    elif len(loaded_objects) == 2:
        result = load_and_run(loaded_objects[0], loaded_objects[1], rerun=rerun)
    else:
        raise ValueError("Unsupported number of loaded objects")

    print(f"Result: {result}")


if __name__ == "__main__":
    rerun = False  # Default value for rerun
    file_paths = sys.argv[1:]

    if "--rerun" in file_paths:
        rerun = True
        file_paths.remove("--rerun")

    run_pickled(*file_paths, rerun=rerun)
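
run_pickled.py is written to be launched as a separate process that receives paths to pickle files on its command line. The round trip can be sketched with the standard library alone. This is an illustration under stated assumptions, not the PR's code: `subprocess` stands in for a job executor, `CHILD_CODE` is a simplified stand-in for the script above (one pickle, no `rerun` flag, no pydra import), and `run_in_subprocess` and the file names are hypothetical. It treats any output on stderr as a failure, which mirrors the check used by the worker in this PR.

```python
import functools
import operator
import pickle
import subprocess
import sys
import tempfile
from pathlib import Path

# Simplified stand-in for run_pickled.py (assumption): load one pickle,
# call the object, and print the result.
CHILD_CODE = (
    "import pickle, sys\n"
    "with open(sys.argv[1], 'rb') as f:\n"
    "    func = pickle.load(f)\n"
    "print(f'Result: {func()}')\n"
)


def run_in_subprocess(func, workdir):
    """Pickle func, call it in a fresh interpreter, return the child's stdout."""
    workdir = Path(workdir)
    pkl_path = workdir / "callable.pkl"
    stdout_path = workdir / "job.stdout"
    stderr_path = workdir / "job.stderr"

    with open(pkl_path, "wb") as f:
        pickle.dump(func, f)

    # Redirect both streams to files, as a batch scheduler would.
    with open(stdout_path, "w") as out, open(stderr_path, "w") as err:
        subprocess.run(
            [sys.executable, "-c", CHILD_CODE, str(pkl_path)],
            stdout=out,
            stderr=err,
            check=False,
        )

    # Any content on stderr is reported as an error.
    if stderr_path.stat().st_size > 0:
        raise RuntimeError(f"job wrote to stderr:\n{stderr_path.read_text()}")
    return stdout_path.read_text()


with tempfile.TemporaryDirectory() as tmp:
    # functools.partial over a stdlib function pickles by reference,
    # so the child interpreter can load it without extra modules.
    output = run_in_subprocess(functools.partial(operator.add, 2, 3), tmp)
```

One consequence of the stderr-based check is that a job which only emits a warning on stderr is still reported as failed; checking the exit status instead would be an alternative design.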
2 changes: 1 addition & 1 deletion pydra/engine/tests/test_workflow.py
@@ -4092,7 +4092,7 @@ def test_wf_lzoutall_st_2a(plugin, tmpdir):
     wf.plugin = plugin
     wf.cache_dir = tmpdir

-    with Submitter(plugin="cf") as sub:
+    with Submitter(plugin=plugin) as sub:
         sub(wf)

     assert wf.output_dir.exists()
148 changes: 148 additions & 0 deletions pydra/engine/workers.py
@@ -892,10 +892,158 @@ def close(self):
        pass


class PsijWorker(Worker):
    """A worker to execute tasks using PSI/J."""

    def __init__(self, subtype, **kwargs):
        """
        Initialize PsijWorker.

        Parameters
        ----------
        subtype : str
            Scheduler for PSI/J.
        """
        try:
            import psij
        except ImportError:
            logger.critical("Please install psij.")
            raise
        logger.debug("Initialize PsijWorker")
        self.psij = psij

        # Check if the provided subtype is valid
        valid_subtypes = ["local", "slurm"]
        if subtype not in valid_subtypes:
            raise ValueError(
                f"Invalid 'subtype' provided. Available options: {', '.join(valid_subtypes)}"
            )

        self.subtype = subtype

    def run_el(self, interface, rerun=False, **kwargs):
        """Run a task."""
        return self.exec_psij(interface, rerun=rerun)

    def make_spec(self, cmd=None, arg=None):
        """
        Create a PSI/J job specification.

        Parameters
        ----------
        cmd : str, optional
            Executable command. Defaults to None.
        arg : list, optional
            List of arguments. Defaults to None.

        Returns
        -------
        psij.JobSpec
            PSI/J job specification.
        """
        spec = self.psij.JobSpec()
        spec.executable = cmd
        spec.arguments = arg

        return spec

    def make_job(self, spec, attributes):
        """
        Create a PSI/J job.

        Parameters
        ----------
        spec : psij.JobSpec
            PSI/J job specification.
        attributes : any
            Job attributes.

        Returns
        -------
        psij.Job
            PSI/J job.
        """
        job = self.psij.Job()
        job.spec = spec
        return job

    async def exec_psij(self, runnable, rerun=False):
        """
        Run a task (coroutine wrapper).

        Raises
        ------
        Exception
            If stderr is not empty.

        Returns
        -------
        None
        """
        import pickle
        from pathlib import Path

        jex = self.psij.JobExecutor.get_instance(self.subtype)
        absolute_path = Path(__file__).parent

        if isinstance(runnable, TaskBase):
            cache_dir = runnable.cache_dir
            file_path = cache_dir / "runnable_function.pkl"
            with open(file_path, "wb") as file:
                pickle.dump(runnable._run, file)
            func_path = absolute_path / "run_pickled.py"
            spec = self.make_spec("python", [func_path, file_path])
        else:  # it could be tuple that includes pickle files with tasks and inputs
            cache_dir = runnable[-1].cache_dir
            file_path_1 = cache_dir / "taskmain.pkl"
            file_path_2 = cache_dir / "ind.pkl"
            ind, task_main_pkl, task_orig = runnable
            with open(file_path_1, "wb") as file:
                pickle.dump(task_main_pkl, file)
            with open(file_path_2, "wb") as file:
                pickle.dump(ind, file)
            func_path = absolute_path / "run_pickled.py"
            spec = self.make_spec(
                "python",
                [
                    func_path,
                    file_path_1,
                    file_path_2,
                ],
            )

        if rerun:
            spec.arguments.append("--rerun")

        spec.stdout_path = cache_dir / "demo.stdout"
        spec.stderr_path = cache_dir / "demo.stderr"

        job = self.make_job(spec, None)
        jex.submit(job)
        job.wait()

        if spec.stderr_path.stat().st_size > 0:
            with open(spec.stderr_path, "r") as stderr_file:
                stderr_contents = stderr_file.read()
            raise Exception(
                f"stderr_path '{spec.stderr_path}' is not empty. Contents:\n{stderr_contents}"
            )

        return

    def close(self):
        """Finalize the internal pool of tasks."""
        pass


WORKERS = {
    "serial": SerialWorker,
    "cf": ConcurrentFuturesWorker,
    "slurm": SlurmWorker,
    "dask": DaskWorker,
    "sge": SGEWorker,
    **{
        "psij-" + subtype: lambda subtype=subtype: PsijWorker(subtype=subtype)
        for subtype in ["local", "slurm"]
    },
}
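
The `lambda subtype=subtype:` form in the `WORKERS` mapping binds each subtype at the moment the lambda is created. A minimal sketch of why that default argument matters follows; `StubWorker` is a hypothetical stand-in for `PsijWorker` so that the snippet runs without pydra or psij installed.

```python
class StubWorker:
    """Hypothetical stand-in for PsijWorker; it only records the subtype."""

    def __init__(self, subtype):
        self.subtype = subtype


subtypes = ["local", "slurm"]

# Late binding: each lambda looks up `subtype` when it is called,
# after the comprehension has finished, so both see the last value.
late_bound = {
    "psij-" + subtype: (lambda: StubWorker(subtype=subtype))
    for subtype in subtypes
}

# Early binding: the default argument captures the current value of
# `subtype` when the lambda is defined, as in the WORKERS mapping.
early_bound = {
    "psij-" + subtype: (lambda subtype=subtype: StubWorker(subtype=subtype))
    for subtype in subtypes
}

late_local = late_bound["psij-local"]().subtype
early_local = early_bound["psij-local"]().subtype
```

Without the default argument, `late_bound["psij-local"]()` would construct a worker for `slurm`; with it, each key produces a worker for its own subtype.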
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -48,6 +48,9 @@ classifiers = [
dynamic = ["version"]

[project.optional-dependencies]
psij = [
    "psij-python",
]
dask = [
    "dask",
    "distributed",