Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NF: Add Environment class, with initial Native/Docker implementations #516

Merged
merged 39 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9406a9c
NF: Add Environment class, with initial Native/Docker implementations
effigies Mar 22, 2022
9f11b85
TEST: Add basic test for native environment
effigies Mar 31, 2022
58b4a34
FIX: import and arg
effigies Mar 31, 2022
39874d4
removing resetting self.output_ to None (not sure why it was needed)
djarecka Jun 15, 2022
b6b8130
implementing execute for the Docker env; changes to the ShellCommandT…
djarecka Jan 19, 2023
5922c47
updating docker tests; fixing issue with working directory for docker
djarecka Jan 30, 2023
dcac97e
adding need docker to the tests
djarecka Feb 27, 2023
4f21181
FIX: Bad rebase
effigies Aug 29, 2023
42a0588
RF: Rewrite _check_input with FileSets
effigies Aug 29, 2023
3dbc054
FIX: Missing argument
effigies Aug 29, 2023
81abb09
FIX: Get mode
effigies Aug 29, 2023
f495366
TEST: Update tests to be typing-friendly
effigies Aug 29, 2023
377692a
Merge branch 'rf/environments' of https://github.com/nipype/pydra int…
djarecka Sep 16, 2023
2eb88da
Update pydra/engine/task.py
djarecka Sep 17, 2023
ff3f5d0
Update pydra/engine/task.py
djarecka Sep 17, 2023
6889577
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 17, 2023
25f14a2
Merge branch 'rf/environments' of https://github.com/nipype/pydra int…
djarecka Sep 17, 2023
e4644a5
removing temporary old env, some renaming
djarecka Sep 17, 2023
fa240d9
Merge pull request #705 from djarecka/env
djarecka Sep 17, 2023
aaa399f
adding environment to run_el for slurm worker
djarecka Sep 18, 2023
7d39b4d
cleaning DockerTask remains
djarecka Sep 19, 2023
4d2098b
Merge pull request #706 from djarecka/env
djarecka Sep 19, 2023
bd76c3d
adding the Singularity environment class
djarecka Oct 5, 2023
9cc0904
Merge pull request #711 from djarecka/env
djarecka Oct 5, 2023
979bb60
removing psi plugin from one test
djarecka Oct 28, 2023
2dd5603
creating Container class
djarecka Nov 2, 2023
161635b
cleaning: removing ContainerTask and ContainerSpec
djarecka Nov 3, 2023
8d60dd1
adding xarg to env command, changing output_cpath to root used in the…
djarecka Nov 3, 2023
58038f5
Merge pull request #718 from djarecka/env
djarecka Nov 3, 2023
9673908
Update pydra/engine/environments.py
djarecka Nov 15, 2023
89de9e6
Update pydra/engine/environments.py
djarecka Nov 15, 2023
bfaa681
Update pydra/engine/environments.py
djarecka Nov 15, 2023
575785e
Update pydra/engine/tests/test_specs.py
djarecka Nov 15, 2023
3d54253
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 15, 2023
73e6856
Update pydra/engine/environments.py
djarecka Nov 15, 2023
d1793cb
small fix
djarecka Nov 15, 2023
89c1e27
Merge pull request #721 from djarecka/env
djarecka Nov 15, 2023
be5a870
small edits to the core
djarecka Nov 15, 2023
0b0c71b
Merge pull request #722 from djarecka/env
djarecka Nov 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pydra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
import attr

from . import mark
from .engine import AuditFlag, DockerTask, ShellCommandTask, Submitter, Workflow, specs
from .engine import AuditFlag, ShellCommandTask, Submitter, Workflow, specs

__all__ = (
"Submitter",
"Workflow",
"AuditFlag",
"ShellCommandTask",
"DockerTask",
"specs",
"mark",
)
Expand Down
3 changes: 1 addition & 2 deletions pydra/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
"""The core of the workflow engine."""
from .submitter import Submitter
from .core import Workflow
from .task import AuditFlag, ShellCommandTask, DockerTask
from .task import AuditFlag, ShellCommandTask
from . import specs

__all__ = [
"AuditFlag",
"DockerTask",
"ShellCommandTask",
"Submitter",
"Workflow",
Expand Down
23 changes: 14 additions & 9 deletions pydra/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,13 @@ def cont_dim(self, cont_dim):
self._cont_dim = cont_dim

def __call__(
self, submitter=None, plugin=None, plugin_kwargs=None, rerun=False, **kwargs
self,
submitter=None,
plugin=None,
plugin_kwargs=None,
rerun=False,
environment=None,
**kwargs,
):
"""Make tasks callable themselves."""
from .submitter import Submitter
Expand All @@ -449,9 +455,9 @@ def __call__(
if submitter:
with submitter as sub:
self.inputs = attr.evolve(self.inputs, **kwargs)
res = sub(self)
res = sub(self, environment=environment)
else: # tasks without state could be run without a submitter
res = self._run(rerun=rerun, **kwargs)
res = self._run(rerun=rerun, environment=environment, **kwargs)
return res

def _modify_inputs(self):
Expand Down Expand Up @@ -501,7 +507,7 @@ def _populate_filesystem(self, checksum, output_dir):
shutil.rmtree(output_dir)
output_dir.mkdir(parents=False, exist_ok=self.can_resume)

def _run(self, rerun=False, **kwargs):
def _run(self, rerun=False, environment=None, **kwargs):
self.inputs = attr.evolve(self.inputs, **kwargs)
self.inputs.check_fields_input_spec()

Expand All @@ -518,6 +524,7 @@ def _run(self, rerun=False, **kwargs):
return result
cwd = os.getcwd()
self._populate_filesystem(checksum, output_dir)
os.chdir(output_dir)
orig_inputs = self._modify_inputs()
result = Result(output=None, runtime=None, errored=False)
self.hooks.pre_run_task(self)
Expand All @@ -526,7 +533,7 @@ def _run(self, rerun=False, **kwargs):
self.audit.audit_task(task=self)
try:
self.audit.monitor()
self._run_task()
self._run_task(environment=environment)
result.output = self._collect_outputs(output_dir=output_dir)
except Exception:
etype, eval, etr = sys.exc_info()
Expand All @@ -538,7 +545,6 @@ def _run(self, rerun=False, **kwargs):
self.hooks.post_run_task(self, result)
self.audit.finalize_audit(result)
save(output_dir, result=result, task=self)
self.output_ = None
# removing the additional file with the chcksum
(self.cache_dir / f"{self.uid}_info.json").unlink()
# # function etc. shouldn't change anyway, so removing
Expand All @@ -551,15 +557,14 @@ def _run(self, rerun=False, **kwargs):
return result

def _collect_outputs(self, output_dir):
run_output = self.output_
output_klass = make_klass(self.output_spec)
output = output_klass(
**{f.name: attr.NOTHING for f in attr.fields(output_klass)}
)
other_output = output.collect_additional_outputs(
self.inputs, output_dir, run_output
self.inputs, output_dir, self.output_
)
return attr.evolve(output, **run_output, **other_output)
return attr.evolve(output, **self.output_, **other_output)

def split(
self,
Expand Down
157 changes: 157 additions & 0 deletions pydra/engine/environments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from .helpers import execute

from pathlib import Path


class Environment:
effigies marked this conversation as resolved.
Show resolved Hide resolved
"""
Base class for environments that are used to execute tasks.
Right now it is asssumed that the environment, including container images,
are available and are not removed at the end
TODO: add setup and teardown methods
"""

def setup(self):
pass

Check warning on line 15 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L15

Added line #L15 was not covered by tests

def execute(self, task):
"""
Execute the task in the environment.

Parameters
----------
task : TaskBase
the task to execute

Returns
-------
output
Output of the task.
"""
raise NotImplementedError

Check warning on line 31 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L31

Added line #L31 was not covered by tests

def teardown(self):
pass

Check warning on line 34 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L34

Added line #L34 was not covered by tests


class Native(Environment):
"""
Native environment, i.e. the tasks are executed in the current python environment.
"""

def execute(self, task):
keys = ["return_code", "stdout", "stderr"]
values = execute(task.command_args(), strip=task.strip)
output = dict(zip(keys, values))
if output["return_code"]:
msg = f"Error running '{task.name}' task with {task.command_args()}:"
if output["stderr"]:
msg += "\n\nstderr:\n" + output["stderr"]
if output["stdout"]:
msg += "\n\nstdout:\n" + output["stdout"]
raise RuntimeError(msg)
return output


class Container(Environment):
"""
Base class for container environments used by Docker and Singularity.

Parameters
----------
image : str
Name of the container image
tag : str
Tag of the container image
root : str
Base path for mounting host directories into the container
xargs : Union[str, List[str]]
Extra arguments to be passed to the container
"""

def __init__(self, image, tag="latest", root="/mnt/pydra", xargs=None):
self.image = image
self.tag = tag
if xargs is None:
xargs = []
elif isinstance(xargs, str):
xargs = xargs.split()
self.xargs = xargs
self.root = root

def bind(self, loc, mode="ro"):
loc_abs = Path(loc).absolute()
return f"{loc_abs}:{self.root}{loc_abs}:{mode}"


class Docker(Container):
"""Docker environment."""

def execute(self, task):
docker_img = f"{self.image}:{self.tag}"
# mounting all input locations
mounts = task.get_bindings(root=self.root)
effigies marked this conversation as resolved.
Show resolved Hide resolved

docker_args = [
"docker",
"run",
"-v",
self.bind(task.cache_dir, "rw"),
*self.xargs,
]
docker_args.extend(
" ".join(
[f"-v {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
).split()
)
docker_args.extend(["-w", f"{self.root}{task.output_dir}"])
keys = ["return_code", "stdout", "stderr"]

values = execute(
docker_args + [docker_img] + task.command_args(root=self.root),
strip=task.strip,
)
output = dict(zip(keys, values))
if output["return_code"]:
if output["stderr"]:
raise RuntimeError(output["stderr"])

Check warning on line 117 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L117

Added line #L117 was not covered by tests
else:
raise RuntimeError(output["stdout"])

Check warning on line 119 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L119

Added line #L119 was not covered by tests
return output


class Singularity(Container):
"""Singularity environment."""

def execute(self, task):
singularity_img = f"{self.image}:{self.tag}"
# mounting all input locations
mounts = task.get_bindings(root=self.root)

# todo adding xargsy etc
singularity_args = [
"singularity",
"exec",
"-B",
self.bind(task.cache_dir, "rw"),
*self.xargs,
]
singularity_args.extend(
" ".join(
[f"-B {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
).split()
)
singularity_args.extend(["--pwd", f"{self.root}{task.output_dir}"])
keys = ["return_code", "stdout", "stderr"]

values = execute(
singularity_args + [singularity_img] + task.command_args(root=self.root),
strip=task.strip,
)
output = dict(zip(keys, values))
if output["return_code"]:
if output["stderr"]:
raise RuntimeError(output["stderr"])

Check warning on line 154 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L154

Added line #L154 was not covered by tests
else:
raise RuntimeError(output["stdout"])

Check warning on line 156 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L156

Added line #L156 was not covered by tests
return output
31 changes: 0 additions & 31 deletions pydra/engine/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,37 +676,6 @@ def _check_requires(self, fld, inputs):
return False


@attr.s(auto_attribs=True, kw_only=True)
class ContainerSpec(ShellSpec):
"""Refine the generic command-line specification to container execution."""

image: ty.Union[File, str] = attr.ib(
metadata={"help_string": "image", "mandatory": True}
)
"""The image to be containerized."""
container: ty.Union[File, str, None] = attr.ib(
metadata={"help_string": "container"}
)
"""The container."""
container_xargs: ty.Optional[ty.List[str]] = attr.ib(
default=None, metadata={"help_string": "todo"}
)


@attr.s(auto_attribs=True, kw_only=True)
class DockerSpec(ContainerSpec):
"""Particularize container specifications to the Docker engine."""

container: str = attr.ib("docker", metadata={"help_string": "container"})


@attr.s(auto_attribs=True, kw_only=True)
class SingularitySpec(ContainerSpec):
"""Particularize container specifications to Singularity."""

container: str = attr.ib("singularity", metadata={"help_string": "container type"})


@attr.s
class LazyInterface:
_task: "core.TaskBase" = attr.ib()
Expand Down
15 changes: 9 additions & 6 deletions pydra/engine/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ def __init__(self, plugin="cf", **kwargs):
raise NotImplementedError(f"No worker for {self.plugin}")
self.worker.loop = self.loop

def __call__(self, runnable, cache_locations=None, rerun=False):
def __call__(self, runnable, cache_locations=None, rerun=False, environment=None):
"""Submitter run function."""
if cache_locations is not None:
runnable.cache_locations = cache_locations
self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
self.loop.run_until_complete(
self.submit_from_call(runnable, rerun, environment)
)
return runnable.result()

async def submit_from_call(self, runnable, rerun):
async def submit_from_call(self, runnable, rerun, environment):
"""
This coroutine should only be called once per Submitter call,
and serves as the bridge between sync/async lands.
Expand All @@ -56,7 +58,7 @@ async def submit_from_call(self, runnable, rerun):
Once Python 3.10 is the minimum, this should probably be refactored into using
structural pattern matching.
"""
if is_workflow(runnable):
if is_workflow(runnable): # TODO: env to wf
# connect and calculate the checksum of the graph before running
runnable._connect_and_propagate_to_tasks(override_task_caches=True)
# 0
Expand All @@ -74,10 +76,11 @@ async def submit_from_call(self, runnable, rerun):
# 2
if runnable.state is None:
# run_el should always return a coroutine
await self.worker.run_el(runnable, rerun=rerun)
print("in SUBM", environment)
await self.worker.run_el(runnable, rerun=rerun, environment=environment)
# 3
else:
await self.expand_runnable(runnable, wait=True, rerun=rerun)
await self.expand_runnable(runnable, wait=True, rerun=rerun) # TODO
return True

async def expand_runnable(self, runnable, wait=False, rerun=False):
Expand Down
Loading
Loading