Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NF: Add Environment class, with initial Native/Docker implementations #516

Merged
merged 39 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9406a9c
NF: Add Environment class, with initial Native/Docker implementations
effigies Mar 22, 2022
9f11b85
TEST: Add basic test for native environment
effigies Mar 31, 2022
58b4a34
FIX: import and arg
effigies Mar 31, 2022
39874d4
removing resetting self.output_ to None (not sure why it was needed)
djarecka Jun 15, 2022
b6b8130
implementing execute for the Docker env; changes to the ShellCommandT…
djarecka Jan 19, 2023
5922c47
updating docker tests; fixing issue with working directory for docker
djarecka Jan 30, 2023
dcac97e
adding need docker to the tests
djarecka Feb 27, 2023
4f21181
FIX: Bad rebase
effigies Aug 29, 2023
42a0588
RF: Rewrite _check_input with FileSets
effigies Aug 29, 2023
3dbc054
FIX: Missing argument
effigies Aug 29, 2023
81abb09
FIX: Get mode
effigies Aug 29, 2023
f495366
TEST: Update tests to be typing-friendly
effigies Aug 29, 2023
377692a
Merge branch 'rf/environments' of https://github.com/nipype/pydra int…
djarecka Sep 16, 2023
2eb88da
Update pydra/engine/task.py
djarecka Sep 17, 2023
ff3f5d0
Update pydra/engine/task.py
djarecka Sep 17, 2023
6889577
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 17, 2023
25f14a2
Merge branch 'rf/environments' of https://github.com/nipype/pydra int…
djarecka Sep 17, 2023
e4644a5
removing temporary old env, some renaming
djarecka Sep 17, 2023
fa240d9
Merge pull request #705 from djarecka/env
djarecka Sep 17, 2023
aaa399f
adding environment to run_el for slurm worker
djarecka Sep 18, 2023
7d39b4d
cleaning DockerTask remains
djarecka Sep 19, 2023
4d2098b
Merge pull request #706 from djarecka/env
djarecka Sep 19, 2023
bd76c3d
adding the Singularity environment class
djarecka Oct 5, 2023
9cc0904
Merge pull request #711 from djarecka/env
djarecka Oct 5, 2023
979bb60
removing psi plugin from one test
djarecka Oct 28, 2023
2dd5603
creating Container class
djarecka Nov 2, 2023
161635b
cleaning: removing ContainerTask and ContainerSpec
djarecka Nov 3, 2023
8d60dd1
adding xarg to env command, changing output_cpath to root used in the…
djarecka Nov 3, 2023
58038f5
Merge pull request #718 from djarecka/env
djarecka Nov 3, 2023
9673908
Update pydra/engine/environments.py
djarecka Nov 15, 2023
89de9e6
Update pydra/engine/environments.py
djarecka Nov 15, 2023
bfaa681
Update pydra/engine/environments.py
djarecka Nov 15, 2023
575785e
Update pydra/engine/tests/test_specs.py
djarecka Nov 15, 2023
3d54253
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 15, 2023
73e6856
Update pydra/engine/environments.py
djarecka Nov 15, 2023
d1793cb
small fix
djarecka Nov 15, 2023
89c1e27
Merge pull request #721 from djarecka/env
djarecka Nov 15, 2023
be5a870
small edits to the core
djarecka Nov 15, 2023
0b0c71b
Merge pull request #722 from djarecka/env
djarecka Nov 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pydra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
import attr

from . import mark
from .engine import AuditFlag, DockerTask, ShellCommandTask, Submitter, Workflow, specs
from .engine import AuditFlag, ShellCommandTask, Submitter, Workflow, specs

__all__ = (
"Submitter",
"Workflow",
"AuditFlag",
"ShellCommandTask",
"DockerTask",
"specs",
"mark",
)
Expand Down
3 changes: 1 addition & 2 deletions pydra/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
"""The core of the workflow engine."""
from .submitter import Submitter
from .core import Workflow
from .task import AuditFlag, ShellCommandTask, DockerTask
from .task import AuditFlag, ShellCommandTask
from . import specs

__all__ = [
"AuditFlag",
"DockerTask",
"ShellCommandTask",
"Submitter",
"Workflow",
Expand Down
19 changes: 13 additions & 6 deletions pydra/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,13 @@ def cont_dim(self, cont_dim):
self._cont_dim = cont_dim

def __call__(
self, submitter=None, plugin=None, plugin_kwargs=None, rerun=False, **kwargs
self,
submitter=None,
plugin=None,
plugin_kwargs=None,
rerun=False,
environment=None,
**kwargs,
):
"""Make tasks callable themselves."""
from .submitter import Submitter
Expand All @@ -449,9 +455,9 @@ def __call__(
if submitter:
with submitter as sub:
self.inputs = attr.evolve(self.inputs, **kwargs)
res = sub(self)
res = sub(self, environment=environment)
else: # tasks without state could be run without a submitter
res = self._run(rerun=rerun, **kwargs)
res = self._run(rerun=rerun, environment=environment, **kwargs)
return res

def _modify_inputs(self):
Expand Down Expand Up @@ -501,7 +507,7 @@ def _populate_filesystem(self, checksum, output_dir):
shutil.rmtree(output_dir)
output_dir.mkdir(parents=False, exist_ok=self.can_resume)

def _run(self, rerun=False, **kwargs):
def _run(self, rerun=False, environment=None, **kwargs):
self.inputs = attr.evolve(self.inputs, **kwargs)
self.inputs.check_fields_input_spec()

Expand All @@ -518,6 +524,7 @@ def _run(self, rerun=False, **kwargs):
return result
cwd = os.getcwd()
self._populate_filesystem(checksum, output_dir)
os.chdir(output_dir)
orig_inputs = self._modify_inputs()
result = Result(output=None, runtime=None, errored=False)
self.hooks.pre_run_task(self)
Expand All @@ -526,7 +533,7 @@ def _run(self, rerun=False, **kwargs):
self.audit.audit_task(task=self)
try:
self.audit.monitor()
self._run_task()
self._run_task(environment=environment)
result.output = self._collect_outputs(output_dir=output_dir)
except Exception:
etype, eval, etr = sys.exc_info()
Expand All @@ -538,7 +545,7 @@ def _run(self, rerun=False, **kwargs):
self.hooks.post_run_task(self, result)
self.audit.finalize_audit(result)
save(output_dir, result=result, task=self)
self.output_ = None
# self.output_ = None
djarecka marked this conversation as resolved.
Show resolved Hide resolved
# removing the additional file with the chcksum
(self.cache_dir / f"{self.uid}_info.json").unlink()
# # function etc. shouldn't change anyway, so removing
Expand Down
123 changes: 123 additions & 0 deletions pydra/engine/environments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from .helpers import execute

from pathlib import Path


class Environment:
effigies marked this conversation as resolved.
Show resolved Hide resolved
def setup(self):
pass

Check warning on line 8 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L8

Added line #L8 was not covered by tests

def execute(self, task):
raise NotImplementedError

Check warning on line 11 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L11

Added line #L11 was not covered by tests

def teardown(self):
pass

Check warning on line 14 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L14

Added line #L14 was not covered by tests


class Native(Environment):
def execute(self, task):
# breakpoint()
# args = task.render_arguments_in_root()
keys = ["return_code", "stdout", "stderr"]
values = execute(task.command_args(), strip=task.strip)
output = dict(zip(keys, values))
if output["return_code"]:
msg = f"Error running '{task.name}' task with {task.command_args()}:"
if output["stderr"]:
msg += "\n\nstderr:\n" + output["stderr"]
if output["stdout"]:
msg += "\n\nstdout:\n" + output["stdout"]
raise RuntimeError(msg)
return output


class Docker(Environment):
def __init__(self, image, tag="latest", output_cpath="/output_pydra", xargs=None):
self.image = image
self.tag = tag
self.xargs = xargs
self.output_cpath = output_cpath

@staticmethod
def bind(loc, mode="ro", root="/mnt/pydra"): # TODO
# XXX Failure mode: {loc} overwrites a critical directory in image
djarecka marked this conversation as resolved.
Show resolved Hide resolved
# To fix, we'll need to update any args within loc to a new location
# such as /mnt/pydra/loc
loc_abs = Path(loc).absolute()
return f"{loc_abs}:{root}{loc_abs}:{mode}" # TODO: moving entire path?

def execute(self, task, root="/mnt/pydra"):
# XXX Need to mount all input locations
docker_img = f"{self.image}:{self.tag}"
# TODO ?
# Skips over any inputs in task.cache_dir
# Needs to include `out_file`s when not relative to working dir
# Possibly a `TargetFile` type to distinguish between `File` and `str`?
mounts = task.get_bindings(root=root)

# todo adding xargsy etc
docker_args = ["docker", "run", "-v", self.bind(task.cache_dir, "rw")]
docker_args.extend(
" ".join(
[f"-v {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
).split()
)
docker_args.extend(["-w", f"{root}{task.output_dir}"])
keys = ["return_code", "stdout", "stderr"]
# print("\n Docker args", docker_args)
djarecka marked this conversation as resolved.
Show resolved Hide resolved

values = execute(
docker_args + [docker_img] + task.command_args(root="/mnt/pydra"),
strip=task.strip,
)
output = dict(zip(keys, values))
if output["return_code"]:
if output["stderr"]:
raise RuntimeError(output["stderr"])

Check warning on line 76 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L76

Added line #L76 was not covered by tests
else:
raise RuntimeError(output["stdout"])

Check warning on line 78 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L78

Added line #L78 was not covered by tests
# Any outputs that have been created with a re-rooted path need
# to be de-rooted
# task.finalize_outputs("/mnt/pydra") TODO: probably don't need it
return output


class Singularity(Docker):
djarecka marked this conversation as resolved.
Show resolved Hide resolved
def execute(self, task, root="/mnt/pydra"):
# XXX Need to mount all input locations
singularity_img = f"{self.image}:{self.tag}"
# TODO ?
# Skips over any inputs in task.cache_dir
# Needs to include `out_file`s when not relative to working dir
# Possibly a `TargetFile` type to distinguish between `File` and `str`?
mounts = task.get_bindings(root=root)

# todo adding xargsy etc
singularity_args = [
"singularity",
"exec",
"-B",
self.bind(task.cache_dir, "rw"),
]
singularity_args.extend(
" ".join(
[f"-B {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
).split()
)
singularity_args.extend(["--pwd", f"{root}{task.output_dir}"])
keys = ["return_code", "stdout", "stderr"]

values = execute(
singularity_args + [singularity_img] + task.command_args(root="/mnt/pydra"),
strip=task.strip,
)
output = dict(zip(keys, values))
if output["return_code"]:
if output["stderr"]:
raise RuntimeError(output["stderr"])

Check warning on line 117 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L117

Added line #L117 was not covered by tests
else:
raise RuntimeError(output["stdout"])

Check warning on line 119 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L119

Added line #L119 was not covered by tests
# Any outputs that have been created with a re-rooted path need
# to be de-rooted
# task.finalize_outputs("/mnt/pydra") TODO: probably don't need it
return output
14 changes: 0 additions & 14 deletions pydra/engine/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,20 +693,6 @@ class ContainerSpec(ShellSpec):
)
djarecka marked this conversation as resolved.
Show resolved Hide resolved


@attr.s(auto_attribs=True, kw_only=True)
class DockerSpec(ContainerSpec):
"""Particularize container specifications to the Docker engine."""

container: str = attr.ib("docker", metadata={"help_string": "container"})


@attr.s(auto_attribs=True, kw_only=True)
class SingularitySpec(ContainerSpec):
"""Particularize container specifications to Singularity."""

container: str = attr.ib("singularity", metadata={"help_string": "container type"})


@attr.s
class LazyInterface:
_task: "core.TaskBase" = attr.ib()
Expand Down
15 changes: 9 additions & 6 deletions pydra/engine/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ def __init__(self, plugin="cf", **kwargs):
raise NotImplementedError(f"No worker for {self.plugin}")
self.worker.loop = self.loop

def __call__(self, runnable, cache_locations=None, rerun=False):
def __call__(self, runnable, cache_locations=None, rerun=False, environment=None):
"""Submitter run function."""
if cache_locations is not None:
runnable.cache_locations = cache_locations
self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
self.loop.run_until_complete(
self.submit_from_call(runnable, rerun, environment)
)
return runnable.result()

async def submit_from_call(self, runnable, rerun):
async def submit_from_call(self, runnable, rerun, environment):
"""
This coroutine should only be called once per Submitter call,
and serves as the bridge between sync/async lands.
Expand All @@ -56,7 +58,7 @@ async def submit_from_call(self, runnable, rerun):
Once Python 3.10 is the minimum, this should probably be refactored into using
structural pattern matching.
"""
if is_workflow(runnable):
if is_workflow(runnable): # TODO: env to wf
# connect and calculate the checksum of the graph before running
runnable._connect_and_propagate_to_tasks(override_task_caches=True)
# 0
Expand All @@ -74,10 +76,11 @@ async def submit_from_call(self, runnable, rerun):
# 2
if runnable.state is None:
# run_el should always return a coroutine
await self.worker.run_el(runnable, rerun=rerun)
print("in SUBM", environment)
await self.worker.run_el(runnable, rerun=rerun, environment=environment)
# 3
else:
await self.expand_runnable(runnable, wait=True, rerun=rerun)
await self.expand_runnable(runnable, wait=True, rerun=rerun) # TODO
return True

async def expand_runnable(self, runnable, wait=False, rerun=False):
Expand Down
Loading
Loading