From 1d36d1719fa59cc0f46c02dc4673cf1640176335 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 15:45:47 +0100 Subject: [PATCH 01/12] [feature] resize block allocation executor --- executorlib/interactive/blockallocation.py | 33 ++++++++++++++++++++-- executorlib/interactive/shared.py | 17 +++++++---- tests/test_local_executor_resize.py | 22 +++++++++++++++ 3 files changed, 65 insertions(+), 7 deletions(-) create mode 100644 tests/test_local_executor_resize.py diff --git a/executorlib/interactive/blockallocation.py b/executorlib/interactive/blockallocation.py index fb1e79e4..666d8b7c 100644 --- a/executorlib/interactive/blockallocation.py +++ b/executorlib/interactive/blockallocation.py @@ -27,7 +27,7 @@ class BlockAllocationExecutor(ExecutorBase): Examples: >>> import numpy as np - >>> from executorlib.interactive.shared import BlockAllocationExecutor + >>> from executorlib.interactive.blockallocation import BlockAllocationExecutor >>> >>> def calc(i, j, k): >>> from mpi4py import MPI @@ -58,16 +58,45 @@ def __init__( executor_kwargs["spawner"] = spawner executor_kwargs["queue_join_on_shutdown"] = False self._process_kwargs = executor_kwargs + self._max_workers = max_workers self._set_process( process=[ Thread( target=execute_tasks, kwargs=executor_kwargs, ) - for _ in range(max_workers) + for _ in range(self._max_workers) ], ) + @property + def max_workers(self) -> int: + return self._max_workers + + @max_workers.setter + def max_workers(self, max_workers: int): + if self._max_workers > max_workers: + for _ in range(self._max_workers - max_workers): + self._future_queue.queue.insert(0, {"shutdown": True, "wait": True}) + while len(self._process) > max_workers: + self._process = [ + process + for process in self._process + if process.is_alive() + ] + elif self._max_workers < max_workers: + new_process_lst = [ + Thread( + target=execute_tasks, + kwargs=self._process_kwargs, + ) + for _ in range(max_workers - self._max_workers) + ] + for process_instance in new_process_lst: + process_instance.start() + self._process += new_process_lst + self._max_workers = max_workers + def submit( # type: ignore self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs ) -> Future: diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 8c134bba..97ceea8e 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -57,7 +57,7 @@ def execute_tasks( task_dict = future_queue.get() if "shutdown" in task_dict and task_dict["shutdown"]: interface.shutdown(wait=task_dict["wait"]) - future_queue.task_done() + _task_done(future_queue=future_queue) if queue_join_on_shutdown: future_queue.join() break @@ -117,10 +117,10 @@ def _execute_task_without_cache( f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) except Exception as thread_exception: interface.shutdown(wait=True) - future_queue.task_done() + _task_done(future_queue=future_queue) f.set_exception(exception=thread_exception) else: - future_queue.task_done() + _task_done(future_queue=future_queue) def _execute_task_with_cache( @@ -161,13 +161,20 @@ def _execute_task_with_cache( f.set_result(result) except Exception as thread_exception: interface.shutdown(wait=True) - future_queue.task_done() + _task_done(future_queue=future_queue) f.set_exception(exception=thread_exception) raise thread_exception else: - future_queue.task_done() + _task_done(future_queue=future_queue) else: _, result = get_output(file_name=file_name) future = task_dict["future"] future.set_result(result) + _task_done(future_queue=future_queue) + + +def _task_done(future_queue: queue.Queue): + try: future_queue.task_done() + except ValueError: + pass \ No newline at end of file diff --git a/tests/test_local_executor_resize.py b/tests/test_local_executor_resize.py new file mode 100644 index 00000000..8c3e32f0 --- /dev/null +++ b/tests/test_local_executor_resize.py @@ -0,0 +1,22 @@ +import unittest +from executorlib import SingleNodeExecutor + + +def sleep_funct(sec): + from time import sleep + sleep(sec) + return sec + + +class TestResizing(unittest.TestCase): + def test_without_dependencies(self): + with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=True) as exe: + future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)] + self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) + self.assertEqual(len(exe), 4) + sleep_funct(sec=0.01) + exe.max_workers = 1 + self.assertEqual(len(exe), 1) + self.assertEqual(len(exe._process), 1) + self.assertEqual([f.done() for f in future_lst], [True, True, False, False]) + self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1]) From 5899fc848e19a965a453f540ad41a625de94b9cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 16:26:50 +0100 Subject: [PATCH 02/12] extend test --- tests/test_local_executor_resize.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/tests/test_local_executor_resize.py b/tests/test_local_executor_resize.py index 8c3e32f0..7e0bfcc9 100644 --- a/tests/test_local_executor_resize.py +++ b/tests/test_local_executor_resize.py @@ -9,14 +9,28 @@ def sleep_funct(sec): class TestResizing(unittest.TestCase): - def test_without_dependencies(self): + def test_without_dependencies_decrease(self): with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=True) as exe: - future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)] + future_lst = [exe.submit(sleep_funct, 1) for _ in range(4)] self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) self.assertEqual(len(exe), 4) - sleep_funct(sec=0.01) + sleep_funct(sec=0.5) exe.max_workers = 1 self.assertEqual(len(exe), 1) self.assertEqual(len(exe._process), 1) self.assertEqual([f.done() for f in future_lst], [True, True, False, False]) + self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1]) + self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) + + def test_without_dependencies_increase(self): + with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=True) as exe: + future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)] + self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) + self.assertEqual(len(exe), 4) + future_lst[0].result() + exe.max_workers = 2 + self.assertEqual(len(exe), 2) + self.assertEqual(len(exe._process), 2) + self.assertEqual([f.done() for f in future_lst], [True, False, False, False]) self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1]) + self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) \ No newline at end of file From 003b2c4d6426d53d62444129a1ad66fa3e5fe0ef Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 15 Feb 2025 15:27:36 +0000 Subject: [PATCH 03/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/interactive/blockallocation.py | 4 +--- executorlib/interactive/shared.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/executorlib/interactive/blockallocation.py b/executorlib/interactive/blockallocation.py index 666d8b7c..8c2af4e5 100644 --- a/executorlib/interactive/blockallocation.py +++ b/executorlib/interactive/blockallocation.py @@ -80,9 +80,7 @@ def max_workers(self, max_workers: int): self._future_queue.queue.insert(0, {"shutdown": True, "wait": True}) while len(self._process) > max_workers: self._process = [ - process - for process in self._process - if process.is_alive() + process for process in self._process if process.is_alive() ] elif self._max_workers < max_workers: new_process_lst = [ diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 97ceea8e..85d37bdc 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -177,4 +177,4 @@ def _task_done(future_queue: queue.Queue): try: future_queue.task_done() except ValueError: - pass \ No newline at end of file + pass From 147d56ee2cf97e6b0074cfac77e3df49f2dec4a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 16:33:32 +0100 Subject: [PATCH 04/12] fixes --- executorlib/interactive/blockallocation.py | 16 +++++++++------- executorlib/interactive/shared.py | 5 ++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/executorlib/interactive/blockallocation.py b/executorlib/interactive/blockallocation.py index 666d8b7c..4a6be7a2 100644 --- a/executorlib/interactive/blockallocation.py +++ b/executorlib/interactive/blockallocation.py @@ -78,12 +78,13 @@ def max_workers(self, max_workers: int): if self._max_workers > max_workers: for _ in range(self._max_workers - max_workers): self._future_queue.queue.insert(0, {"shutdown": True, "wait": True}) - while len(self._process) > max_workers: - self._process = [ - process - for process in self._process - if process.is_alive() - ] + if isinstance(self._process, list): + while len(self._process) > max_workers: + self._process = [ + process + for process in self._process + if process.is_alive() + ] elif self._max_workers < max_workers: new_process_lst = [ Thread( @@ -94,7 +95,8 @@ def max_workers(self, max_workers: int): ] for process_instance in new_process_lst: process_instance.start() - self._process += new_process_lst + if isinstance(self._process, list): + self._process += new_process_lst self._max_workers = max_workers def submit( # type: ignore diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 97ceea8e..1c39aaad 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -1,3 +1,4 @@ +import contextlib import importlib.util import os import queue @@ -174,7 +175,5 @@ def _execute_task_with_cache( def _task_done(future_queue: queue.Queue): - try: + with contextlib.suppress(ValueError): future_queue.task_done() - except ValueError: - pass \ No newline at end of file From 529f9c0decbe217615d70a48a84ccb16a043cf5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 17:09:54 +0100 Subject: [PATCH 05/12] fix type check --- executorlib/interactive/blockallocation.py | 32 +++++++++++----------- tests/test_local_executor_resize.py | 3 ++ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/executorlib/interactive/blockallocation.py b/executorlib/interactive/blockallocation.py index 30f7935d..dbddb113 100644 --- a/executorlib/interactive/blockallocation.py +++ b/executorlib/interactive/blockallocation.py @@ -1,6 +1,7 @@ from concurrent.futures import Future from threading import Thread from typing import Callable, Optional +import queue from executorlib.base.executor import ExecutorBase, cancel_items_in_queue from executorlib.interactive.shared import execute_tasks @@ -75,27 +76,26 @@ def max_workers(self) -> int: @max_workers.setter def max_workers(self, max_workers: int): - if self._max_workers > max_workers: - for _ in range(self._max_workers - max_workers): - self._future_queue.queue.insert(0, {"shutdown": True, "wait": True}) - if isinstance(self._process, list): + if isinstance(self._future_queue, queue.Queue) and isinstance(self._process, list): + if self._max_workers > max_workers: + for _ in range(self._max_workers - max_workers): + self._future_queue.queue.insert(0, {"shutdown": True, "wait": True}) while len(self._process) > max_workers: self._process = [ process for process in self._process if process.is_alive() ] - elif self._max_workers < max_workers: - new_process_lst = [ - Thread( - target=execute_tasks, - kwargs=self._process_kwargs, - ) - for _ in range(max_workers - self._max_workers) - ] - for process_instance in new_process_lst: - process_instance.start() - if isinstance(self._process, list): + elif self._max_workers < max_workers: + new_process_lst = [ + Thread( + target=execute_tasks, + kwargs=self._process_kwargs, + ) + for _ in range(max_workers - self._max_workers) + ] + for process_instance in new_process_lst: + process_instance.start() self._process += new_process_lst - self._max_workers = max_workers + self._max_workers = max_workers def submit( # type: ignore self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs diff --git a/tests/test_local_executor_resize.py b/tests/test_local_executor_resize.py index 7e0bfcc9..c88058b5 100644 --- a/tests/test_local_executor_resize.py +++ b/tests/test_local_executor_resize.py @@ -1,5 +1,6 @@ import unittest from executorlib import SingleNodeExecutor +from executorlib.standalone.serialize import cloudpickle_register def sleep_funct(sec): @@ -10,6 +11,7 @@ def sleep_funct(sec): class TestResizing(unittest.TestCase): def test_without_dependencies_decrease(self): + cloudpickle_register(ind=1) with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=True) as exe: future_lst = [exe.submit(sleep_funct, 1) for _ in range(4)] self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) @@ -23,6 +25,7 @@ def test_without_dependencies_decrease(self): self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) def test_without_dependencies_increase(self): + cloudpickle_register(ind=1) with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=True) as exe: future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)] self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) From 2b81d0bb5dc049e47d8aa3e5c3f95896428fbbc5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 15 Feb 2025 16:10:03 +0000 Subject: [PATCH 06/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/interactive/blockallocation.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/executorlib/interactive/blockallocation.py b/executorlib/interactive/blockallocation.py index dbddb113..9e6500b7 100644 --- a/executorlib/interactive/blockallocation.py +++ b/executorlib/interactive/blockallocation.py @@ -1,7 +1,7 @@ +import queue from concurrent.futures import Future from threading import Thread from typing import Callable, Optional -import queue from executorlib.base.executor import ExecutorBase, cancel_items_in_queue from executorlib.interactive.shared import execute_tasks @@ -76,7 +76,9 @@ def max_workers(self) -> int: @max_workers.setter def max_workers(self, max_workers: int): - if isinstance(self._future_queue, queue.Queue) and isinstance(self._process, list): + if isinstance(self._future_queue, queue.Queue) and isinstance( + self._process, list + ): if self._max_workers > max_workers: for _ in range(self._max_workers - max_workers): self._future_queue.queue.insert(0, {"shutdown": True, "wait": True}) From 158eb9301819d9eee218fd703565b5585afebb6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 17:26:42 +0100 Subject: [PATCH 07/12] fixes --- tests/test_local_executor_resize.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_local_executor_resize.py b/tests/test_local_executor_resize.py index c88058b5..63aea566 100644 --- a/tests/test_local_executor_resize.py +++ b/tests/test_local_executor_resize.py @@ -18,7 +18,7 @@ def test_without_dependencies_decrease(self): self.assertEqual(len(exe), 4) sleep_funct(sec=0.5) exe.max_workers = 1 - self.assertEqual(len(exe), 1) + self.assertTrue(len(exe) >= 1) self.assertEqual(len(exe._process), 1) self.assertEqual([f.done() for f in future_lst], [True, True, False, False]) self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1]) From 705db80c8982f9cd2f96dd4fbb71c497212c0bf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 17:33:36 +0100 Subject: [PATCH 08/12] test property --- tests/test_local_executor_resize.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_local_executor_resize.py b/tests/test_local_executor_resize.py index 63aea566..8960690c 100644 --- a/tests/test_local_executor_resize.py +++ b/tests/test_local_executor_resize.py @@ -30,8 +30,10 @@ def test_without_dependencies_increase(self): future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)] self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) self.assertEqual(len(exe), 4) + self.assertEqual(exe.max_workers, 1) future_lst[0].result() exe.max_workers = 2 + self.assertEqual(exe.max_workers, 2) self.assertEqual(len(exe), 2) self.assertEqual(len(exe._process), 2) self.assertEqual([f.done() for f in future_lst], [True, False, False, False]) From e9c856da33bde0743d33da1c87af78db0fc76084 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 17:57:25 +0100 Subject: [PATCH 09/12] test with dependencies --- executorlib/base/executor.py | 11 +++++++++ executorlib/interactive/dependency.py | 34 ++++++++++++++++++++++++-- tests/test_local_executor_resize.py | 35 +++++++++++++++++++++++++-- 3 files changed, 76 insertions(+), 4 deletions(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 4791fe9a..b10aae27 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -32,6 +32,17 @@ def __init__(self, max_cores: Optional[int] = None): self._future_queue: Optional[queue.Queue] = queue.Queue() self._process: Optional[Union[Thread, list[Thread]]] = None + @property + def max_workers(self) -> Optional[int]: + if isinstance(self._process, list): + return len(self._process) + else: + return self._process_kwargs.get("max_workers") + + @max_workers.setter + def max_workers(self, max_workers: int): + raise NotImplementedError("The max_workers setter is not implemented.") + @property def info(self) -> Optional[dict]: """ diff --git a/executorlib/interactive/dependency.py b/executorlib/interactive/dependency.py index 95c7ca98..19a79054 100644 --- a/executorlib/interactive/dependency.py +++ b/executorlib/interactive/dependency.py @@ -76,12 +76,33 @@ def info(self) -> Optional[dict]: if isinstance(self._future_queue, queue.Queue): f: Future = Future() self._future_queue.queue.insert( - 0, {"internal": True, "task": "info", "future": f} + 0, {"internal": True, "task": "get_info", "future": f} ) return f.result() else: return None + @property + def max_workers(self) -> Optional[int]: + if isinstance(self._future_queue, queue.Queue): + f: Future = Future() + self._future_queue.queue.insert( + 0, {"internal": True, "task": "get_max_workers", "future": f} + ) + return f.result() + else: + return None + + @max_workers.setter + def max_workers(self, max_workers: int): + if isinstance(self._future_queue, queue.Queue): + f: Future = Future() + self._future_queue.queue.insert( + 0, {"internal": True, "task": "set_max_workers", "max_workers": max_workers, "future": f} + ) + if not f.result(): + raise NotImplementedError("The max_workers setter is not implemented.") + def submit( # type: ignore self, fn: Callable[..., Any], @@ -188,8 +209,17 @@ def _execute_tasks_with_dependencies( if ( # shutdown the executor task_dict is not None and "internal" in task_dict and task_dict["internal"] ): - if task_dict["task"] == "info": + if task_dict["task"] == "get_info": task_dict["future"].set_result(executor.info) + elif task_dict["task"] == "get_max_workers": + task_dict["future"].set_result(executor.max_workers) + elif task_dict["task"] == "set_max_workers": + try: + executor.max_workers = task_dict["max_workers"] + except NotImplementedError: + task_dict["future"].set_result(False) + else: + task_dict["future"].set_result(True) elif ( # handle function submitted to the executor task_dict is not None and "fn" in task_dict and "future" in task_dict ): diff --git a/tests/test_local_executor_resize.py b/tests/test_local_executor_resize.py index 8960690c..c62aa7b3 100644 --- a/tests/test_local_executor_resize.py +++ b/tests/test_local_executor_resize.py @@ -34,8 +34,39 @@ def test_without_dependencies_increase(self): future_lst[0].result() exe.max_workers = 2 self.assertEqual(exe.max_workers, 2) - self.assertEqual(len(exe), 2) + self.assertTrue(len(exe) >= 1) self.assertEqual(len(exe._process), 2) self.assertEqual([f.done() for f in future_lst], [True, False, False, False]) self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1]) - self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) \ No newline at end of file + self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) + + def test_with_dependencies_decrease(self): + cloudpickle_register(ind=1) + with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=False) as exe: + future_lst = [exe.submit(sleep_funct, 1) for _ in range(4)] + self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) + self.assertEqual(len(exe), 4) + sleep_funct(sec=0.5) + exe.max_workers = 1 + self.assertEqual([f.done() for f in future_lst], [True, True, False, False]) + self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1]) + self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) + + def test_with_dependencies_increase(self): + cloudpickle_register(ind=1) + with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=False) as exe: + future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)] + self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) + self.assertEqual(len(exe), 4) + self.assertEqual(exe.max_workers, 1) + future_lst[0].result() + exe.max_workers = 2 + self.assertEqual(exe.max_workers, 2) + self.assertEqual([f.done() for f in future_lst], [True, False, False, False]) + self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1]) + self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) + + def test_no_block_allocation(self): + with self.assertRaises(NotImplementedError): + with SingleNodeExecutor(block_allocation=False) as exe: + exe.max_workers = 2 \ No newline at end of file From f384d76626caa40524b4ef4fe8324a43c1b8ffcd Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 15 Feb 2025 16:57:36 +0000 Subject: [PATCH 10/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/interactive/dependency.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/executorlib/interactive/dependency.py b/executorlib/interactive/dependency.py index 19a79054..fe821bab 100644 --- a/executorlib/interactive/dependency.py +++ b/executorlib/interactive/dependency.py @@ -98,7 +98,13 @@ def max_workers(self, max_workers: int): if isinstance(self._future_queue, queue.Queue): f: Future = Future() self._future_queue.queue.insert( - 0, {"internal": True, "task": "set_max_workers", "max_workers": max_workers, "future": f} + 0, + { + "internal": True, + "task": "set_max_workers", + "max_workers": max_workers, + "future": f, + }, ) if not f.result(): raise NotImplementedError("The max_workers setter is not implemented.") From 6022dec24b8b2b6b9fb8976b0f4b986e6b5c2e32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 18:09:45 +0100 Subject: [PATCH 11/12] fixes and increased test coverage --- tests/test_local_executor.py | 16 ++++++++++++++++ tests/test_local_executor_resize.py | 16 ++++++++++++---- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index a52a1327..4d36fb86 100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -77,6 +77,14 @@ def test_pympiexecutor_two_workers(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) + def test_max_workers(self): + with BlockAllocationExecutor( + max_workers=2, + executor_kwargs={}, + spawner=MpiExecSpawner, + ) as exe: + self.assertEqual(exe.max_workers, 2) + def test_pympiexecutor_one_worker(self): with BlockAllocationExecutor( max_workers=1, @@ -107,6 +115,14 @@ def test_pympiexecutor_two_workers(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) + def test_max_workers(self): + with OneTaskPerProcessExecutor( + max_workers=2, + executor_kwargs={}, + spawner=MpiExecSpawner, + ) as exe: + self.assertEqual(exe.max_workers, 2) + def test_pympiexecutor_one_worker(self): with OneTaskPerProcessExecutor( max_cores=1, diff --git a/tests/test_local_executor_resize.py b/tests/test_local_executor_resize.py index c62aa7b3..a33b3293 100644 --- a/tests/test_local_executor_resize.py +++ b/tests/test_local_executor_resize.py @@ -20,7 +20,7 @@ def test_without_dependencies_decrease(self): exe.max_workers = 1 self.assertTrue(len(exe) >= 1) self.assertEqual(len(exe._process), 1) - self.assertEqual([f.done() for f in future_lst], [True, True, False, False]) + self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3) self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1]) self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) @@ -48,7 +48,7 @@ def test_with_dependencies_decrease(self): self.assertEqual(len(exe), 4) sleep_funct(sec=0.5) exe.max_workers = 1 - self.assertEqual([f.done() for f in future_lst], [True, True, False, False]) + self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3) self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1]) self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) @@ -68,5 +68,13 @@ def test_with_dependencies_increase(self): def test_no_block_allocation(self): with self.assertRaises(NotImplementedError): - with SingleNodeExecutor(block_allocation=False) as exe: - exe.max_workers = 2 \ No newline at end of file + with SingleNodeExecutor(block_allocation=False, disable_dependencies=False) as exe: + exe.max_workers = 2 + with self.assertRaises(NotImplementedError): + with SingleNodeExecutor(block_allocation=False, disable_dependencies=True) as exe: + exe.max_workers = 2 + + def test_max_workers_stopped_executor(self): + exe = SingleNodeExecutor(block_allocation=True) + exe.shutdown(wait=True) + self.assertIsNone(exe.max_workers) From 35f97d9751c61af8137134bcb3d6297c99391232 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 18:43:56 +0100 Subject: [PATCH 12/12] fix base property --- executorlib/base/executor.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index b10aae27..ce30c30a 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -34,10 +34,7 @@ def __init__(self, max_cores: Optional[int] = None): @property def max_workers(self) -> Optional[int]: - if isinstance(self._process, list): - return len(self._process) - else: - return self._process_kwargs.get("max_workers") + return self._process_kwargs.get("max_workers") @max_workers.setter def max_workers(self, max_workers: int):