diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index d85fb996..70b596d5 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -13,6 +13,10 @@ from pympipool.shared.communication import interface_bootup from pympipool.shared.thread import RaisingThread from pympipool.shared.interface import BaseInterface +from pympipool.shared.inputcheck import ( + check_resource_dict, + check_resource_dict_is_empty, +) class ExecutorBase(FutureExecutor): @@ -66,10 +70,8 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): Returns: A Future representing the given call. """ - if len(resource_dict) > 0: - raise ValueError( - "When block_allocation is enabled, the resource requirements have to be defined on the executor level." - ) + check_resource_dict_is_empty(resource_dict=resource_dict) + check_resource_dict(function=fn) f = Future() self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) return f @@ -171,6 +173,7 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): Returns: A Future representing the given call. """ + check_resource_dict(function=fn) f = Future() self._future_queue.put( { diff --git a/pympipool/shared/inputcheck.py b/pympipool/shared/inputcheck.py index 7cb97e75..e4a53870 100644 --- a/pympipool/shared/inputcheck.py +++ b/pympipool/shared/inputcheck.py @@ -1,3 +1,6 @@ +import inspect + + def check_oversubscribe(oversubscribe): if oversubscribe: raise ValueError( @@ -38,3 +41,18 @@ def check_executor(executor): raise ValueError( "The executor parameter is only supported for the flux framework backend." ) + + +def check_resource_dict(function): + if "resource_dict" in inspect.signature(function).parameters.keys(): + raise ValueError( + "The parameter resource_dict is used internally in pympipool, " + "so it cannot be used as parameter in the submitted functions." + ) + + +def check_resource_dict_is_empty(resource_dict): + if len(resource_dict) > 0: + raise ValueError( + "When block_allocation is enabled, the resource requirements have to be defined on the executor level." + ) diff --git a/tests/test_executor_backend_mpi_noblock.py b/tests/test_executor_backend_mpi_noblock.py index 0d040dd0..246d3983 100644 --- a/tests/test_executor_backend_mpi_noblock.py +++ b/tests/test_executor_backend_mpi_noblock.py @@ -16,6 +16,10 @@ def mpi_funct(i): return i, size, rank +def resource_dict(resource_dict): + return resource_dict + + class TestExecutorBackend(unittest.TestCase): def test_meta_executor_serial(self): with Executor( @@ -58,3 +62,13 @@ def test_errors(self): hostname_localhost=True, backend="mpi", ) + with self.assertRaises(ValueError): + with Executor( + max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=False + ) as exe: + exe.submit(resource_dict, resource_dict={}) + with self.assertRaises(ValueError): + with Executor( + max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=True + ) as exe: + exe.submit(resource_dict, resource_dict={})