Skip to content

Commit

Permalink
Fix slurm and mpi tests
Browse files: browse the repository at this point in the history
  • Loading branch information
frthjf committed May 25, 2023
1 parent 492da00 commit 6df495a
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 50 deletions.
2 changes: 1 addition & 1 deletion docs/examples/slurm_and_mpi_execution/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ Covers:
- MPI
- Slurm
- Slurm+MPI
- on_write_metadata
- on_write_metadata
15 changes: 10 additions & 5 deletions docs/examples/slurm_and_mpi_execution/mpi.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import stat
import subprocess

from machinable import Execution
Expand All @@ -9,17 +11,20 @@ class Config:
n: int = 1

def __call__(self):
for component in self.components:
for executable in self.pending_executables:
script_file = self.save_file(
f"mpi-{executable.id}.sh",
executable.dispatch_code(),
)
st = os.stat(script_file)
os.chmod(script_file, st.st_mode | stat.S_IEXEC)
print(
subprocess.check_output(
[
self.config.runner,
"-n",
str(self.config.n),
self.save_file(
f"mpi-{component.id}.sh",
component.dispatch_code(),
),
script_file,
]
).decode("ascii")
)
10 changes: 6 additions & 4 deletions docs/examples/slurm_and_mpi_execution/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
class Slurm(Execution):
def __call__(self):
runner = commandlib.Command("sbatch")
script = "#!/usr/bin/env bash"
for component in self.components:
script = "#!/usr/bin/env bash\n"
for component in self.pending_executables:
resources = component.resources()
if "--job-name" not in resources:
resources["--job-name"] = f"{component.id}"
Expand All @@ -32,10 +32,12 @@ def __call__(self):
job_id = int(output.rsplit(" ", maxsplit=1)[-1])
except ValueError:
job_id = False
print(f"{output} for component {component.id}")
print(
f"{output} for component {component.id} ({component.local_directory()})"
)

# save job information
self.save_data(
self.save_file(
filepath="slurm.json",
data={
"job_id": job_id,
Expand Down
23 changes: 13 additions & 10 deletions docs/examples/slurm_and_mpi_execution/test_mpi.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import shutil

import pytest
from machinable import Component, Execution, Project
from machinable import Component, Execution, Index, Project

try:
import mpi4py
except ImportError:
mpi4py = None


class ExternalComponent(Component):
def on_create(self):
class MpiExample(Component):
def __call__(self):
print("Hello from MPI script")
self.save_file("test.txt", "hello")

Expand All @@ -19,10 +19,13 @@ def on_create(self):
not shutil.which("mpirun") or mpi4py is None,
reason="Test requires MPI environment",
)
def test_mpi_execution(tmp_storage):
with Project.instance("docs/examples"):
component = ExternalComponent()
with Execution.get("execution.mpi"):
component.launch()
assert component.is_finished()
assert component.load_file("test.txt") == "hello"
def test_mpi_execution(tmp_path):
with Index(
{"directory": str(tmp_path), "database": str(tmp_path / "test.sqlite")}
):
with Project("docs/examples/slurm_and_mpi_execution"):
component = MpiExample()
with Execution.get("mpi"):
component.launch()
assert component.is_finished()
assert component.load_file("test.txt") == "hello"
42 changes: 27 additions & 15 deletions docs/examples/slurm_and_mpi_execution/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import os
import shutil
import time
from pathlib import Path

import pytest
from machinable import Component, Execution, Project
from machinable import Component, Execution, Index, Project


class SlurmComponent(Component):
Expand All @@ -18,20 +19,31 @@ def __call__(self):
or "MACHINABLE_SLURM_TEST_RESOURCES" not in os.environ,
reason="Test requires Slurm environment",
)
def test_slurm_execution(tmp_storage):
def test_slurm_execution(tmp_path):
component = SlurmComponent()
with Project.instance("docs/examples"), Execution.get(
"execution.slurm",
resources=json.loads(
os.environ.get("MACHINABLE_SLURM_TEST_RESOURCES", "{}")
),
directory = os.environ.get("MACHINABLE_SLURM_TEST_DIRECTORY", None)
if directory is not None:
tmp_path = Path(directory) / component.uuid
with Index(
{"directory": str(tmp_path), "database": str(tmp_path / "test.sqlite")}
):
component.launch()
for _ in range(30):
if component.is_finished():
assert "Hello world from Slurm" in component.output()
assert component.load_file("test_run.json")["success"] is True
return
with Project("docs/examples/slurm_and_mpi_execution"):
with Execution.get(
"slurm",
resources=json.loads(
os.environ.get("MACHINABLE_SLURM_TEST_RESOURCES", "{}")
),
):
component.launch()

time.sleep(1)
assert False, "Timeout"
for _ in range(60):
if component.is_finished():
assert "Hello world from Slurm" in component.output()
assert (
component.load_file("test_run.json")["success"] is True
)
return

time.sleep(1)
print(component.output())
assert False, f"Timeout for {component.local_directory()}"
13 changes: 9 additions & 4 deletions src/machinable/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,23 @@ def collect(cls, components) -> "ComponentCollection":
"""Returns a collection of components"""
return ComponentCollection(components)

@property
def resources(self) -> Optional[Dict]:
if self.execution is None:
return None
return self.execution.load_file(f"resources-{self.id}.json", None)

def dispatch(self) -> Self:
"""Dispatch the component lifecycle"""
writes_meta_data = (
self.on_write_meta_data() is not False and self.is_mounted()
)
try:
self.on_before_dispatch()

self.on_seeding()

# meta-data
writes_meta_data = (
self.on_write_meta_data() is not False and self.is_mounted()
)

if writes_meta_data:
self.update_status("started")
self.save_file(
Expand Down Expand Up @@ -139,6 +139,11 @@ def beat():
raise errors.ComponentException(
f"{self.__class__.__name__} dispatch failed"
) from _ex
finally:
if writes_meta_data:
# propagate changes
for storage in Storage.active():
storage.update(self)

@property
def host_info(self) -> Optional[Dict]:
Expand Down
2 changes: 1 addition & 1 deletion src/machinable/element.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def uuid(self) -> str:

@property
def id(self) -> str:
return uuid_to_id(self.uuid)
return self.uuid[:6]

def version(
self, version: VersionType = sentinel, overwrite: bool = False
Expand Down
10 changes: 4 additions & 6 deletions src/machinable/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,17 @@ def active(cls) -> List["Storage"]:

return [cls.instance()]

def commit(self, interface: "Interface") -> bool:
def commit(self, interface: "Interface") -> None:
directory = interface.local_directory()
if not os.path.exists(directory):
os.makedirs(directory)
interface.to_directory(directory)

def update(self, interface: "Interface") -> None:
pass

def contains(self, uuid: str) -> bool:
return False

def retrieve(self, uuid: str, local_directory: str) -> bool:
return False

def update_status(
self, uuid: str, local_directory: Optional[str] = None
) -> None:
pass
3 changes: 3 additions & 0 deletions tests/test_component.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import stat

import commandlib
import pytest
Expand Down Expand Up @@ -179,6 +180,8 @@ def test_component_export(tmp_storage):
Execution().add(component).commit()
script = component.dispatch_code(inline=True)
script_filepath = component.save_file("run.sh", script)
st = os.stat(script_filepath)
os.chmod(script_filepath, st.st_mode | stat.S_IEXEC)

print(commandlib.Command("bash")(script_filepath).output())
assert component.is_finished()
Expand Down
8 changes: 4 additions & 4 deletions tests/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,21 @@ def default_resources(self, component):
r = {"test": 1, "a": True}
with Execution(resources={}) as execution:
component = Component()
assert component.resources is None
assert component.resources() is None
execution.resources(r)
component.launch()
assert component.resources == r
assert component.resources() == r

with Execution(resources={}) as execution:
# component is already finished so updating resources has no effect
execution.resources({"a": 2})
component.launch()
assert component.resources["a"] is True
assert component.resources()["a"] is True

e2 = Component()
execution.resources({"a": 3})
e2.launch()
assert e2.resources["a"] == 3
assert e2.resources()["a"] == 3


def test_interrupted_execution(tmp_storage):
Expand Down

0 comments on commit 6df495a

Please sign in to comment.