From 780005c48c16344a17a088abbb0f1b2770db07dc Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 22 Apr 2024 08:51:53 -0500 Subject: [PATCH 1/3] Raise error when resource_dict is used in function parameters --- pympipool/shared/executorbase.py | 4 ++++ tests/test_executor_backend_mpi_noblock.py | 14 ++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index d85fb996..ffbcc1f6 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -70,6 +70,8 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): raise ValueError( "When block_allocation is enabled, the resource requirements have to be defined on the executor level." ) + if "resource_dict" in inspect.signature(fn).parameters.keys(): + raise ValueError("The parameter resource_dict is used internally in pympipool, so it cannot be used as parameter in the submitted functions.") f = Future() self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) return f @@ -171,6 +173,8 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): Returns: A Future representing the given call. """ + if "resource_dict" in inspect.signature(fn).parameters.keys(): + raise ValueError("The parameter resource_dict is used internally in pympipool, so it cannot be used as parameter in the submitted functions.") f = Future() self._future_queue.put( { diff --git a/tests/test_executor_backend_mpi_noblock.py b/tests/test_executor_backend_mpi_noblock.py index 0d040dd0..246d3983 100644 --- a/tests/test_executor_backend_mpi_noblock.py +++ b/tests/test_executor_backend_mpi_noblock.py @@ -16,6 +16,10 @@ def mpi_funct(i): return i, size, rank +def resource_dict(resource_dict): + return resource_dict + + class TestExecutorBackend(unittest.TestCase): def test_meta_executor_serial(self): with Executor( @@ -58,3 +62,13 @@ def test_errors(self): hostname_localhost=True, backend="mpi", ) + with self.assertRaises(ValueError): + with Executor( + max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=False + ) as exe: + exe.submit(resource_dict, resource_dict={}) + with self.assertRaises(ValueError): + with Executor( + max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=True + ) as exe: + exe.submit(resource_dict, resource_dict={}) From 60ff224de0f108f830bf6166dc3a7d87caac846a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 22 Apr 2024 08:52:47 -0500 Subject: [PATCH 2/3] black formatting --- pympipool/shared/executorbase.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index ffbcc1f6..cae37416 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -71,7 +71,9 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): "When block_allocation is enabled, the resource requirements have to be defined on the executor level." ) if "resource_dict" in inspect.signature(fn).parameters.keys(): - raise ValueError("The parameter resource_dict is used internally in pympipool, so it cannot be used as parameter in the submitted functions.") + raise ValueError( + "The parameter resource_dict is used internally in pympipool, so it cannot be used as parameter in the submitted functions." + ) f = Future() self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) return f @@ -174,7 +176,9 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): A Future representing the given call. """ if "resource_dict" in inspect.signature(fn).parameters.keys(): - raise ValueError("The parameter resource_dict is used internally in pympipool, so it cannot be used as parameter in the submitted functions.") + raise ValueError( + "The parameter resource_dict is used internally in pympipool, so it cannot be used as parameter in the submitted functions." + ) f = Future() self._future_queue.put( { From f9b65ce398b2d17a62129102c4491860150f87b9 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 22 Apr 2024 09:10:31 -0500 Subject: [PATCH 3/3] Store functions in central place --- pympipool/shared/executorbase.py | 19 +++++++------------ pympipool/shared/inputcheck.py | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index cae37416..70b596d5 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -13,6 +13,10 @@ from pympipool.shared.communication import interface_bootup from pympipool.shared.thread import RaisingThread from pympipool.shared.interface import BaseInterface +from pympipool.shared.inputcheck import ( + check_resource_dict, + check_resource_dict_is_empty, +) class ExecutorBase(FutureExecutor): @@ -66,14 +70,8 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): Returns: A Future representing the given call. """ - if len(resource_dict) > 0: - raise ValueError( - "When block_allocation is enabled, the resource requirements have to be defined on the executor level." - ) - if "resource_dict" in inspect.signature(fn).parameters.keys(): - raise ValueError( - "The parameter resource_dict is used internally in pympipool, so it cannot be used as parameter in the submitted functions." - ) + check_resource_dict_is_empty(resource_dict=resource_dict) + check_resource_dict(function=fn) f = Future() self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) return f @@ -175,10 +173,7 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): Returns: A Future representing the given call. """ - if "resource_dict" in inspect.signature(fn).parameters.keys(): - raise ValueError( - "The parameter resource_dict is used internally in pympipool, so it cannot be used as parameter in the submitted functions." - ) + check_resource_dict(function=fn) f = Future() self._future_queue.put( { diff --git a/pympipool/shared/inputcheck.py b/pympipool/shared/inputcheck.py index 7cb97e75..e4a53870 100644 --- a/pympipool/shared/inputcheck.py +++ b/pympipool/shared/inputcheck.py @@ -1,3 +1,6 @@ +import inspect + + def check_oversubscribe(oversubscribe): if oversubscribe: raise ValueError( @@ -38,3 +41,18 @@ def check_executor(executor): raise ValueError( "The executor parameter is only supported for the flux framework backend." ) + + +def check_resource_dict(function): + if "resource_dict" in inspect.signature(function).parameters.keys(): + raise ValueError( + "The parameter resource_dict is used internally in pympipool, " + "so it cannot be used as parameter in the submitted functions." + ) + + +def check_resource_dict_is_empty(resource_dict): + if len(resource_dict) > 0: + raise ValueError( + "When block_allocation is enabled, the resource requirements have to be defined on the executor level." + )