Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions executorlib/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ def __init__(self, max_cores: Optional[int] = None):
self._future_queue: Optional[queue.Queue] = queue.Queue()
self._process: Optional[Union[Thread, list[Thread]]] = None

@property
def max_workers(self) -> Optional[int]:
    """Number of workers this executor was configured with.

    Returns:
        Optional[int]: the "max_workers" entry of the stored process
            keyword arguments, or None when it was never provided.
    """
    configured_workers = self._process_kwargs.get("max_workers")
    return configured_workers

@max_workers.setter
def max_workers(self, max_workers: int):
    """Resizing is not supported at the base-executor level.

    Subclasses that can resize their worker pool override this setter.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError("The max_workers setter is not implemented.")

@property
def info(self) -> Optional[dict]:
"""
Expand Down
35 changes: 33 additions & 2 deletions executorlib/interactive/blockallocation.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import queue
import time
from concurrent.futures import Future
from threading import Thread
from typing import Callable, Optional
Expand Down Expand Up @@ -27,7 +28,7 @@ class BlockAllocationExecutor(ExecutorBase):
Examples:

>>> import numpy as np
>>> from executorlib.interactive.shared import BlockAllocationExecutor
>>> from executorlib.interactive.blockallocation import BlockAllocationExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
Expand Down Expand Up @@ -58,16 +59,46 @@ def __init__(
executor_kwargs["spawner"] = spawner
executor_kwargs["queue_join_on_shutdown"] = False
self._process_kwargs = executor_kwargs
self._max_workers = max_workers
self._set_process(
process=[
Thread(
target=execute_tasks,
kwargs=executor_kwargs,
)
for _ in range(max_workers)
for _ in range(self._max_workers)
],
)

@property
def max_workers(self) -> int:
    """Current number of worker threads owned by this executor."""
    return self._max_workers

@max_workers.setter
def max_workers(self, max_workers: int):
    """Resize the pool of worker threads while the executor is running.

    Shrinking inserts shutdown commands at the head of the task queue so
    surplus workers exit before picking up further tasks, then waits for
    them to terminate. Growing starts additional worker threads with the
    same task-execution arguments. A no-op once the executor has been
    shut down (queue or process list no longer live).

    Args:
        max_workers (int): the desired number of worker threads.
    """
    if isinstance(self._future_queue, queue.Queue) and isinstance(
        self._process, list
    ):
        if self._max_workers > max_workers:
            # Jump the queue: surplus workers must see the shutdown command
            # before any pending user tasks.
            for _ in range(self._max_workers - max_workers):
                self._future_queue.queue.insert(0, {"shutdown": True, "wait": True})
            # Poll until enough workers have exited. Sleep between polls so
            # this loop does not busy-spin a CPU core while threads wind
            # down (the original rebuilt the list in a tight loop).
            while len(self._process) > max_workers:
                self._process = [
                    process for process in self._process if process.is_alive()
                ]
                if len(self._process) > max_workers:
                    time.sleep(0.01)
        elif self._max_workers < max_workers:
            # Start the additional workers before recording the new size.
            new_process_lst = [
                Thread(
                    target=execute_tasks,
                    kwargs=self._process_kwargs,
                )
                for _ in range(max_workers - self._max_workers)
            ]
            for process_instance in new_process_lst:
                process_instance.start()
            self._process += new_process_lst
        self._max_workers = max_workers

def submit( # type: ignore
self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs
) -> Future:
Expand Down
40 changes: 38 additions & 2 deletions executorlib/interactive/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,39 @@ def info(self) -> Optional[dict]:
if isinstance(self._future_queue, queue.Queue):
f: Future = Future()
self._future_queue.queue.insert(
0, {"internal": True, "task": "info", "future": f}
0, {"internal": True, "task": "get_info", "future": f}
)
return f.result()
else:
return None

@property
def max_workers(self) -> Optional[int]:
    """Ask the background dependency thread for the wrapped executor's worker count.

    Returns:
        Optional[int]: the wrapped executor's max_workers, or None once
            the executor has been shut down (no live task queue).
    """
    if not isinstance(self._future_queue, queue.Queue):
        return None
    result_future: Future = Future()
    self._future_queue.queue.insert(
        0, {"internal": True, "task": "get_max_workers", "future": result_future}
    )
    return result_future.result()

@max_workers.setter
def max_workers(self, max_workers: int):
    """Forward a resize request to the wrapped executor via the task queue.

    Raises:
        NotImplementedError: when the wrapped executor reports that it
            does not support resizing.
    """
    if not isinstance(self._future_queue, queue.Queue):
        return
    result_future: Future = Future()
    self._future_queue.queue.insert(
        0,
        {
            "internal": True,
            "task": "set_max_workers",
            "max_workers": max_workers,
            "future": result_future,
        },
    )
    if not result_future.result():
        raise NotImplementedError("The max_workers setter is not implemented.")

def submit( # type: ignore
self,
fn: Callable[..., Any],
Expand Down Expand Up @@ -188,8 +215,17 @@ def _execute_tasks_with_dependencies(
if ( # shutdown the executor
task_dict is not None and "internal" in task_dict and task_dict["internal"]
):
if task_dict["task"] == "info":
if task_dict["task"] == "get_info":
task_dict["future"].set_result(executor.info)
elif task_dict["task"] == "get_max_workers":
task_dict["future"].set_result(executor.max_workers)
elif task_dict["task"] == "set_max_workers":
try:
executor.max_workers = task_dict["max_workers"]
except NotImplementedError:
task_dict["future"].set_result(False)
else:
task_dict["future"].set_result(True)
elif ( # handle function submitted to the executor
task_dict is not None and "fn" in task_dict and "future" in task_dict
):
Expand Down
16 changes: 11 additions & 5 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import importlib.util
import os
import queue
Expand Down Expand Up @@ -57,7 +58,7 @@ def execute_tasks(
task_dict = future_queue.get()
if "shutdown" in task_dict and task_dict["shutdown"]:
interface.shutdown(wait=task_dict["wait"])
future_queue.task_done()
_task_done(future_queue=future_queue)
if queue_join_on_shutdown:
future_queue.join()
break
Expand Down Expand Up @@ -117,10 +118,10 @@ def _execute_task_without_cache(
f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
except Exception as thread_exception:
interface.shutdown(wait=True)
future_queue.task_done()
_task_done(future_queue=future_queue)
f.set_exception(exception=thread_exception)
else:
future_queue.task_done()
_task_done(future_queue=future_queue)


def _execute_task_with_cache(
Expand Down Expand Up @@ -161,13 +162,18 @@ def _execute_task_with_cache(
f.set_result(result)
except Exception as thread_exception:
interface.shutdown(wait=True)
future_queue.task_done()
_task_done(future_queue=future_queue)
f.set_exception(exception=thread_exception)
raise thread_exception
else:
future_queue.task_done()
_task_done(future_queue=future_queue)
else:
_, result = get_output(file_name=file_name)
future = task_dict["future"]
future.set_result(result)
_task_done(future_queue=future_queue)


def _task_done(future_queue: queue.Queue):
with contextlib.suppress(ValueError):
future_queue.task_done()
16 changes: 16 additions & 0 deletions tests/test_local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ def test_pympiexecutor_two_workers(self):
self.assertTrue(fs_1.done())
self.assertTrue(fs_2.done())

def test_max_workers(self):
    """The executor reports the worker count it was configured with."""
    worker_count = 2
    executor = BlockAllocationExecutor(
        max_workers=worker_count,
        executor_kwargs={},
        spawner=MpiExecSpawner,
    )
    with executor as exe:
        self.assertEqual(exe.max_workers, worker_count)

def test_pympiexecutor_one_worker(self):
with BlockAllocationExecutor(
max_workers=1,
Expand Down Expand Up @@ -107,6 +115,14 @@ def test_pympiexecutor_two_workers(self):
self.assertTrue(fs_1.done())
self.assertTrue(fs_2.done())

def test_max_workers(self):
    """The executor reports the worker count it was configured with."""
    worker_count = 2
    executor = OneTaskPerProcessExecutor(
        max_workers=worker_count,
        executor_kwargs={},
        spawner=MpiExecSpawner,
    )
    with executor as exe:
        self.assertEqual(exe.max_workers, worker_count)

def test_pympiexecutor_one_worker(self):
with OneTaskPerProcessExecutor(
max_cores=1,
Expand Down
80 changes: 80 additions & 0 deletions tests/test_local_executor_resize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import unittest
from executorlib import SingleNodeExecutor
from executorlib.standalone.serialize import cloudpickle_register


def sleep_funct(sec):
    """Block for *sec* seconds and return the requested duration.

    Used as a submittable workload in the resizing tests; the import is
    kept inside the function so it survives cloudpickle serialization.
    """
    import time

    time.sleep(sec)
    return sec


class TestResizing(unittest.TestCase):
    """Integration tests for resizing a running SingleNodeExecutor via the
    max_workers setter, with and without the dependency-resolving wrapper.

    NOTE(review): several assertions below are timing-based (they assume how
    many 0.1s/1s tasks finish while the pool is resized) and may be flaky on
    heavily loaded machines — confirm against CI behavior.
    """

    def test_without_dependencies_decrease(self):
        # Shrink a two-worker pool to one while tasks are in flight.
        cloudpickle_register(ind=1)
        with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=True) as exe:
            future_lst = [exe.submit(sleep_funct, 1) for _ in range(4)]
            self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
            self.assertEqual(len(exe), 4)
            sleep_funct(sec=0.5)
            # Resize mid-run: one of the two workers should shut down.
            exe.max_workers = 1
            self.assertTrue(len(exe) >= 1)
            self.assertEqual(len(exe._process), 1)
            # Timing assumption: between 1 and 2 of the four tasks are done
            # once the shrink completes — TODO confirm this holds under load.
            self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3)
            # All tasks still complete after the resize.
            self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1])
            self.assertEqual([f.done() for f in future_lst], [True, True, True, True])

    def test_without_dependencies_increase(self):
        # Grow a one-worker pool to two while tasks are queued.
        cloudpickle_register(ind=1)
        with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=True) as exe:
            future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)]
            self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
            self.assertEqual(len(exe), 4)
            self.assertEqual(exe.max_workers, 1)
            # Wait for the first task so exactly one future is done.
            future_lst[0].result()
            exe.max_workers = 2
            self.assertEqual(exe.max_workers, 2)
            self.assertTrue(len(exe) >= 1)
            self.assertEqual(len(exe._process), 2)
            self.assertEqual([f.done() for f in future_lst], [True, False, False, False])
            self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1])
            self.assertEqual([f.done() for f in future_lst], [True, True, True, True])

    def test_with_dependencies_decrease(self):
        # Same shrink scenario, but routed through the dependency wrapper
        # (resize requests travel via the internal task queue).
        cloudpickle_register(ind=1)
        with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=False) as exe:
            future_lst = [exe.submit(sleep_funct, 1) for _ in range(4)]
            self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
            self.assertEqual(len(exe), 4)
            sleep_funct(sec=0.5)
            exe.max_workers = 1
            # Timing assumption as above — TODO confirm under load.
            self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3)
            self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1])
            self.assertEqual([f.done() for f in future_lst], [True, True, True, True])

    def test_with_dependencies_increase(self):
        # Same grow scenario, routed through the dependency wrapper.
        cloudpickle_register(ind=1)
        with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=False) as exe:
            future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)]
            self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
            self.assertEqual(len(exe), 4)
            self.assertEqual(exe.max_workers, 1)
            future_lst[0].result()
            exe.max_workers = 2
            self.assertEqual(exe.max_workers, 2)
            self.assertEqual([f.done() for f in future_lst], [True, False, False, False])
            self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1])
            self.assertEqual([f.done() for f in future_lst], [True, True, True, True])

    def test_no_block_allocation(self):
        # Without block allocation the underlying executor rejects resizing,
        # regardless of whether dependency resolution is enabled.
        with self.assertRaises(NotImplementedError):
            with SingleNodeExecutor(block_allocation=False, disable_dependencies=False) as exe:
                exe.max_workers = 2
        with self.assertRaises(NotImplementedError):
            with SingleNodeExecutor(block_allocation=False, disable_dependencies=True) as exe:
                exe.max_workers = 2

    def test_max_workers_stopped_executor(self):
        # After shutdown the task queue is gone, so the getter reports None.
        exe = SingleNodeExecutor(block_allocation=True)
        exe.shutdown(wait=True)
        self.assertIsNone(exe.max_workers)
Loading