diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 4791fe9a..ce30c30a 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -32,6 +32,14 @@ def __init__(self, max_cores: Optional[int] = None): self._future_queue: Optional[queue.Queue] = queue.Queue() self._process: Optional[Union[Thread, list[Thread]]] = None + @property + def max_workers(self) -> Optional[int]: + return self._process_kwargs.get("max_workers") + + @max_workers.setter + def max_workers(self, max_workers: int): + raise NotImplementedError("The max_workers setter is not implemented.") + @property def info(self) -> Optional[dict]: """ diff --git a/executorlib/interactive/blockallocation.py b/executorlib/interactive/blockallocation.py index fb1e79e4..9e6500b7 100644 --- a/executorlib/interactive/blockallocation.py +++ b/executorlib/interactive/blockallocation.py @@ -1,3 +1,4 @@ +import queue from concurrent.futures import Future from threading import Thread from typing import Callable, Optional @@ -27,7 +28,7 @@ class BlockAllocationExecutor(ExecutorBase): Examples: >>> import numpy as np - >>> from executorlib.interactive.shared import BlockAllocationExecutor + >>> from executorlib.interactive.blockallocation import BlockAllocationExecutor >>> >>> def calc(i, j, k): >>> from mpi4py import MPI @@ -58,16 +59,46 @@ def __init__( executor_kwargs["spawner"] = spawner executor_kwargs["queue_join_on_shutdown"] = False self._process_kwargs = executor_kwargs + self._max_workers = max_workers self._set_process( process=[ Thread( target=execute_tasks, kwargs=executor_kwargs, ) - for _ in range(max_workers) + for _ in range(self._max_workers) ], ) + @property + def max_workers(self) -> int: + return self._max_workers + + @max_workers.setter + def max_workers(self, max_workers: int): + if isinstance(self._future_queue, queue.Queue) and isinstance( + self._process, list + ): + if self._max_workers > max_workers: + for _ in range(self._max_workers - max_workers): + self._future_queue.queue.insert(0, {"shutdown": True, "wait": True}) + while len(self._process) > max_workers: + self._process = [ + process for process in self._process if process.is_alive() + ] + elif self._max_workers < max_workers: + new_process_lst = [ + Thread( + target=execute_tasks, + kwargs=self._process_kwargs, + ) + for _ in range(max_workers - self._max_workers) + ] + for process_instance in new_process_lst: + process_instance.start() + self._process += new_process_lst + self._max_workers = max_workers + def submit( # type: ignore self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs ) -> Future: diff --git a/executorlib/interactive/dependency.py b/executorlib/interactive/dependency.py index 95c7ca98..fe821bab 100644 --- a/executorlib/interactive/dependency.py +++ b/executorlib/interactive/dependency.py @@ -76,12 +76,39 @@ def info(self) -> Optional[dict]: if isinstance(self._future_queue, queue.Queue): f: Future = Future() self._future_queue.queue.insert( - 0, {"internal": True, "task": "info", "future": f} + 0, {"internal": True, "task": "get_info", "future": f} ) return f.result() else: return None + @property + def max_workers(self) -> Optional[int]: + if isinstance(self._future_queue, queue.Queue): + f: Future = Future() + self._future_queue.queue.insert( + 0, {"internal": True, "task": "get_max_workers", "future": f} + ) + return f.result() + else: + return None + + @max_workers.setter + def max_workers(self, max_workers: int): + if isinstance(self._future_queue, queue.Queue): + f: Future = Future() + self._future_queue.queue.insert( + 0, + { + "internal": True, + "task": "set_max_workers", + "max_workers": max_workers, + "future": f, + }, + ) + if not f.result(): + raise NotImplementedError("The max_workers setter is not implemented.") + def submit( # type: ignore self, fn: Callable[..., Any], @@ -188,8 +215,17 @@ def _execute_tasks_with_dependencies( if ( # shutdown the executor task_dict is not None and "internal" in task_dict and task_dict["internal"] ): - if task_dict["task"] == "info": + if task_dict["task"] == "get_info": task_dict["future"].set_result(executor.info) + elif task_dict["task"] == "get_max_workers": + task_dict["future"].set_result(executor.max_workers) + elif task_dict["task"] == "set_max_workers": + try: + executor.max_workers = task_dict["max_workers"] + except NotImplementedError: + task_dict["future"].set_result(False) + else: + task_dict["future"].set_result(True) elif ( # handle function submitted to the executor task_dict is not None and "fn" in task_dict and "future" in task_dict ): diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 8c134bba..1c39aaad 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -1,3 +1,4 @@ +import contextlib import importlib.util import os import queue @@ -57,7 +58,7 @@ def execute_tasks( task_dict = future_queue.get() if "shutdown" in task_dict and task_dict["shutdown"]: interface.shutdown(wait=task_dict["wait"]) - future_queue.task_done() + _task_done(future_queue=future_queue) if queue_join_on_shutdown: future_queue.join() break @@ -117,10 +118,10 @@ def _execute_task_without_cache( f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) except Exception as thread_exception: interface.shutdown(wait=True) - future_queue.task_done() + _task_done(future_queue=future_queue) f.set_exception(exception=thread_exception) else: - future_queue.task_done() + _task_done(future_queue=future_queue) def _execute_task_with_cache( @@ -161,13 +162,18 @@ def _execute_task_with_cache( f.set_result(result) except Exception as thread_exception: interface.shutdown(wait=True) - future_queue.task_done() + _task_done(future_queue=future_queue) f.set_exception(exception=thread_exception) raise thread_exception else: - future_queue.task_done() + _task_done(future_queue=future_queue) else: _, result = get_output(file_name=file_name) future = task_dict["future"] future.set_result(result) + _task_done(future_queue=future_queue) + + +def _task_done(future_queue: queue.Queue): + with contextlib.suppress(ValueError): future_queue.task_done() diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index a52a1327..4d36fb86 100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -77,6 +77,14 @@ def test_pympiexecutor_two_workers(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) + def test_max_workers(self): + with BlockAllocationExecutor( + max_workers=2, + executor_kwargs={}, + spawner=MpiExecSpawner, + ) as exe: + self.assertEqual(exe.max_workers, 2) + def test_pympiexecutor_one_worker(self): with BlockAllocationExecutor( max_workers=1, @@ -107,6 +115,14 @@ def test_pympiexecutor_two_workers(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) + def test_max_workers(self): + with OneTaskPerProcessExecutor( + max_workers=2, + executor_kwargs={}, + spawner=MpiExecSpawner, + ) as exe: + self.assertEqual(exe.max_workers, 2) + def test_pympiexecutor_one_worker(self): with OneTaskPerProcessExecutor( max_cores=1, diff --git a/tests/test_local_executor_resize.py b/tests/test_local_executor_resize.py new file mode 100644 index 00000000..a33b3293 --- /dev/null +++ b/tests/test_local_executor_resize.py @@ -0,0 +1,80 @@ +import unittest +from executorlib import SingleNodeExecutor +from executorlib.standalone.serialize import cloudpickle_register + + +def sleep_funct(sec): + from time import sleep + sleep(sec) + return sec + + +class TestResizing(unittest.TestCase): + def test_without_dependencies_decrease(self): + cloudpickle_register(ind=1) + with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=True) as exe: + future_lst = [exe.submit(sleep_funct, 1) for _ in range(4)] + self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) + self.assertEqual(len(exe), 4) + sleep_funct(sec=0.5) + exe.max_workers = 1 + self.assertTrue(len(exe) >= 1) + self.assertEqual(len(exe._process), 1) + self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3) + self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1]) + self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) + + def test_without_dependencies_increase(self): + cloudpickle_register(ind=1) + with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=True) as exe: + future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)] + self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) + self.assertEqual(len(exe), 4) + self.assertEqual(exe.max_workers, 1) + future_lst[0].result() + exe.max_workers = 2 + self.assertEqual(exe.max_workers, 2) + self.assertTrue(len(exe) >= 1) + self.assertEqual(len(exe._process), 2) + self.assertEqual([f.done() for f in future_lst], [True, False, False, False]) + self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1]) + self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) + + def test_with_dependencies_decrease(self): + cloudpickle_register(ind=1) + with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=False) as exe: + future_lst = [exe.submit(sleep_funct, 1) for _ in range(4)] + self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) + self.assertEqual(len(exe), 4) + sleep_funct(sec=0.5) + exe.max_workers = 1 + self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3) + self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1]) + self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) + + def test_with_dependencies_increase(self): + cloudpickle_register(ind=1) + with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=False) as exe: + future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)] + self.assertEqual([f.done() for f in future_lst], [False, False, False, False]) + self.assertEqual(len(exe), 4) + self.assertEqual(exe.max_workers, 1) + future_lst[0].result() + exe.max_workers = 2 + self.assertEqual(exe.max_workers, 2) + self.assertEqual([f.done() for f in future_lst], [True, False, False, False]) + self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1]) + self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) + + def test_no_block_allocation(self): + with self.assertRaises(NotImplementedError): + with SingleNodeExecutor(block_allocation=False, disable_dependencies=False) as exe: + exe.max_workers = 2 + with self.assertRaises(NotImplementedError): + with SingleNodeExecutor(block_allocation=False, disable_dependencies=True) as exe: + exe.max_workers = 2 + + def test_max_workers_stopped_executor(self): + exe = SingleNodeExecutor(block_allocation=True) + exe.shutdown(wait=True) + self.assertIsNone(exe.max_workers)