Skip to content

Commit

Permalink
Merge branch 'main' into dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Apr 22, 2024
2 parents 033677b + 73b3178 commit c6094e6
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 4 deletions.
1 change: 1 addition & 0 deletions .github/workflows/unittest-flux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ jobs:
run: >
flux start
coverage run -a --omit="pympipool/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py;
coverage xml
env:
OMPI_MCA_plm: 'isolated'
OMPI_MCA_rmaps_base_oversubscribe: 'yes'
Expand Down
11 changes: 7 additions & 4 deletions pympipool/shared/executorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
from pympipool.shared.communication import interface_bootup
from pympipool.shared.thread import RaisingThread
from pympipool.shared.interface import BaseInterface
from pympipool.shared.inputcheck import (
check_resource_dict,
check_resource_dict_is_empty,
)


class ExecutorBase(FutureExecutor):
Expand Down Expand Up @@ -67,10 +71,8 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
Returns:
A Future representing the given call.
"""
if len(resource_dict) > 0:
raise ValueError(
"When block_allocation is enabled, the resource requirements have to be defined on the executor level."
)
check_resource_dict_is_empty(resource_dict=resource_dict)
check_resource_dict(function=fn)
f = Future()
self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
return f
Expand Down Expand Up @@ -172,6 +174,7 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
Returns:
A Future representing the given call.
"""
check_resource_dict(function=fn)
f = Future()
self._future_queue.put(
{
Expand Down
18 changes: 18 additions & 0 deletions pympipool/shared/inputcheck.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import inspect


def check_oversubscribe(oversubscribe):
if oversubscribe:
raise ValueError(
Expand Down Expand Up @@ -38,3 +41,18 @@ def check_executor(executor):
raise ValueError(
"The executor parameter is only supported for the flux framework backend."
)


def check_resource_dict(function):
if "resource_dict" in inspect.signature(function).parameters.keys():
raise ValueError(
"The parameter resource_dict is used internally in pympipool, "
"so it cannot be used as parameter in the submitted functions."
)


def check_resource_dict_is_empty(resource_dict):
if len(resource_dict) > 0:
raise ValueError(
"When block_allocation is enabled, the resource requirements have to be defined on the executor level."
)
14 changes: 14 additions & 0 deletions tests/test_executor_backend_mpi_noblock.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def mpi_funct(i):
return i, size, rank


def resource_dict(resource_dict):
return resource_dict


class TestExecutorBackend(unittest.TestCase):
def test_meta_executor_serial(self):
with Executor(
Expand Down Expand Up @@ -58,3 +62,13 @@ def test_errors(self):
hostname_localhost=True,
backend="mpi",
)
with self.assertRaises(ValueError):
with Executor(
max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=False
) as exe:
exe.submit(resource_dict, resource_dict={})
with self.assertRaises(ValueError):
with Executor(
max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=True
) as exe:
exe.submit(resource_dict, resource_dict={})

0 comments on commit c6094e6

Please sign in to comment.