145 changes: 145 additions & 0 deletions executorlib/interactive/blockallocation.py
@@ -0,0 +1,145 @@
from concurrent.futures import Future
from threading import Thread
from typing import Callable, Optional

from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
from executorlib.interactive.shared import execute_tasks
from executorlib.standalone.inputcheck import (
check_resource_dict,
check_resource_dict_is_empty,
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner


class BlockAllocationExecutor(ExecutorBase):
"""
The executorlib.interactive.blockallocation.BlockAllocationExecutor leverages the executorlib interfaces to distribute
python tasks on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor,
the BlockAllocationExecutor can be executed in a serial python process and does not require the python script to be
executed with MPI. Consequently, it is primarily an abstraction of this functionality that improves usability, in
particular in combination with Jupyter notebooks.

Args:
max_workers (int): defines the number of workers which can execute functions in parallel
executor_kwargs (dict): keyword arguments for the executor
spawner (BaseSpawner): interface class to initiate python processes

Examples:

>>> import numpy as np
>>> from executorlib.interactive.blockallocation import BlockAllocationExecutor
>>>
>>> def calc(i, j, k):
...     from mpi4py import MPI
...     size = MPI.COMM_WORLD.Get_size()
...     rank = MPI.COMM_WORLD.Get_rank()
...     return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
...     return {"k": 3}
>>>
>>> with BlockAllocationExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
...     fs = p.submit(calc, 2, j=4)
...     print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

"""

def __init__(
self,
max_workers: int = 1,
executor_kwargs: Optional[dict] = None,
spawner: type[BaseSpawner] = MpiExecSpawner,
):
if executor_kwargs is None:
executor_kwargs = {}

super().__init__(max_cores=executor_kwargs.get("max_cores"))
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
executor_kwargs["queue_join_on_shutdown"] = False
self._process_kwargs = executor_kwargs
self._set_process(
process=[
Thread(
target=execute_tasks,
kwargs=executor_kwargs,
)
for _ in range(max_workers)
],
)

Comment on lines +48 to +70
🛠️ Refactor suggestion

Validate or guard the max_workers parameter.

If max_workers is provided as zero or a negative number, it may result in an empty or invalid thread list, potentially causing runtime errors or wasted resources. Consider adding an assertion or handling to enforce max_workers >= 1.

 def __init__(
     self,
     max_workers: int = 1,
     executor_kwargs: Optional[dict] = None,
     spawner: type[BaseSpawner] = MpiExecSpawner,
 ):
+    if max_workers < 1:
+        raise ValueError("max_workers must be at least 1.")
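For illustration, a minimal sketch of the suggested guard in action (hypothetical: it assumes the ValueError proposed above is adopted):

# Hypothetical behaviour once the suggested guard is in place: constructing
# the executor with max_workers=0 fails fast instead of silently creating
# an executor without any worker threads.
from executorlib.interactive.blockallocation import BlockAllocationExecutor

try:
    BlockAllocationExecutor(max_workers=0)
except ValueError as error:
    print(error)  # -> max_workers must be at least 1.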

def submit( # type: ignore
self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs
) -> Future:
"""
Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.

Args:
fn (Callable): function to submit for execution
args: arguments for the submitted function
kwargs: keyword arguments for the submitted function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
function. Example resource dictionary: {
cores: 1,
threads_per_core: 1,
gpus_per_worker: 0,
oversubscribe: False,
cwd: None,
executor: None,
hostname_localhost: False,
}

Returns:
Future: A Future representing the given call.
"""
if resource_dict is None:
resource_dict = {}
check_resource_dict_is_empty(resource_dict=resource_dict)
check_resource_dict(function=fn)
f: Future = Future()
if self._future_queue is not None:
self._future_queue.put(
{"fn": fn, "args": args, "kwargs": kwargs, "future": f}
)
return f

Comment on lines +71 to +108
🛠️ Refactor suggestion

Re-examine the resource_dict checks for consistency.

This method calls both check_resource_dict_is_empty and check_resource_dict, which might conflict if resource dictionaries with certain fields are valid. Verify whether you want to allow a non-empty resource_dict or enforce it to be empty. Consider removing one of these checks or clarifying the intended behavior.

-if resource_dict is None:
-    resource_dict = {}
-check_resource_dict_is_empty(resource_dict=resource_dict)
-check_resource_dict(function=fn)
+if resource_dict is None:
+    resource_dict = {}
+check_resource_dict(function=fn)
 # Remove or correct the empty-resource check if resources are allowed
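For context, a minimal sketch of the current semantics (an assumption based on the function name: check_resource_dict_is_empty is taken to raise when a non-empty resource_dict is passed, since block allocation fixes the per-worker resources in executor_kwargs at construction time; the "cores" key is illustrative):

# Sketch under the assumption stated above: per-task resource requests are
# rejected in block allocation mode, because the resources are already fixed
# per worker when the executor is created.
from executorlib.interactive.blockallocation import BlockAllocationExecutor

with BlockAllocationExecutor(max_workers=1) as exe:
    exe.submit(sum, [1, 2, 3])  # fine: resource_dict defaults to {}
    exe.submit(sum, [1, 2, 3], resource_dict={"cores": 2})  # expected to raise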

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
"""Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other
methods can be called after this one.

Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
parallel_executors have been reclaimed.
cancel_futures: If True then shutdown will cancel all pending
futures. Futures that are completed or running will not be
cancelled.
"""
if self._future_queue is not None:
if cancel_futures:
cancel_items_in_queue(que=self._future_queue)
if isinstance(self._process, list):
for _ in range(len(self._process)):
self._future_queue.put({"shutdown": True, "wait": wait})
if wait:
for process in self._process:
process.join()
self._future_queue.join()
self._process = None
self._future_queue = None

def _set_process(self, process: list[Thread]): # type: ignore
"""
Set the worker threads for the executor and start them.

Args:
process (list[Thread]): The list of worker threads for the executor.
"""
self._process = process
for process_instance in self._process:
process_instance.start()
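As a quick smoke test of the new module, a serial usage sketch (hypothetical: it assumes the default MpiExecSpawner can start single-core workers on the local machine, that executor_kwargs accepts a cores entry matching the keyword arguments of execute_tasks, and that exiting the context manager triggers shutdown(wait=True)):

# Serial usage sketch: two worker threads running a plain Python function,
# no MPI involvement in the submitted code.
from executorlib.interactive.blockallocation import BlockAllocationExecutor


def add(i, j):
    return i + j


with BlockAllocationExecutor(max_workers=2, executor_kwargs={"cores": 1}) as exe:
    futures = [exe.submit(add, n, j=n) for n in range(4)]
    print([f.result() for f in futures])  # -> [0, 2, 4, 6]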
@@ -1,17 +1,24 @@
import queue
from concurrent.futures import Future
from threading import Thread
from time import sleep
from typing import Any, Callable, Optional

from executorlib.base.executor import ExecutorBase
from executorlib.interactive.shared import execute_tasks_with_dependencies
from executorlib.standalone.interactive.arguments import (
check_exception_was_raised,
get_exception_lst,
get_future_objects_from_input,
update_futures_in_input,
)
from executorlib.standalone.plot import (
draw,
generate_nodes_and_edges,
generate_task_hash,
)


class ExecutorWithDependencies(ExecutorBase):
class DependencyExecutor(ExecutorBase):
"""
DependencyExecutor is a class that extends ExecutorBase and provides functionality for executing tasks with
dependencies.
@@ -20,8 +27,6 @@ class ExecutorWithDependencies(ExecutorBase):
refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.

Attributes:
_future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
@@ -48,7 +53,7 @@ def __init__(
}
self._set_process(
Thread(
target=execute_tasks_with_dependencies,
target=_execute_tasks_with_dependencies,
kwargs=self._process_kwargs,
)
)
@@ -132,3 +137,93 @@ def __exit__(
edge_lst=edge_lst,
filename=self._plot_dependency_graph_filename,
)


def _execute_tasks_with_dependencies(
future_queue: queue.Queue,
executor_queue: queue.Queue,
executor: ExecutorBase,
refresh_rate: float = 0.01,
):
"""
Resolve the dependencies of multiple tasks by analysing which tasks require concurrent.futures.Future objects from
other tasks.

Args:
future_queue (Queue): Queue for receiving new tasks.
executor_queue (Queue): Queue for the internal executor.
executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
refresh_rate (float): Refresh rate in seconds, i.e. how frequently the input queue is checked.
"""
wait_lst = []
while True:
try:
task_dict = future_queue.get_nowait()
except queue.Empty:
task_dict = None
if ( # shutdown the executor
task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]
):
executor.shutdown(wait=task_dict["wait"])
future_queue.task_done()
future_queue.join()
break
elif ( # handle function submitted to the executor
task_dict is not None and "fn" in task_dict and "future" in task_dict
):
future_lst, ready_flag = get_future_objects_from_input(
args=task_dict["args"], kwargs=task_dict["kwargs"]
)
exception_lst = get_exception_lst(future_lst=future_lst)
if not check_exception_was_raised(future_obj=task_dict["future"]):
if len(exception_lst) > 0:
task_dict["future"].set_exception(exception_lst[0])
elif len(future_lst) == 0 or ready_flag:
# No future objects are used in the input or all future objects are already done
task_dict["args"], task_dict["kwargs"] = update_futures_in_input(
args=task_dict["args"], kwargs=task_dict["kwargs"]
)
executor_queue.put(task_dict)
else: # Otherwise add the function to the wait list
task_dict["future_lst"] = future_lst
wait_lst.append(task_dict)
future_queue.task_done()
elif len(wait_lst) > 0:
number_waiting = len(wait_lst)
# Check functions in the wait list and execute them if all future objects are now ready
wait_lst = _update_waiting_task(
wait_lst=wait_lst, executor_queue=executor_queue
)
# if no job is ready, sleep for a moment
if len(wait_lst) == number_waiting:
sleep(refresh_rate)
else:
# If there is nothing else to do, sleep for a moment
sleep(refresh_rate)


def _update_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> list:
"""
Submit the waiting tasks whose future inputs have been completed to the executor

Args:
wait_lst (list): List of waiting tasks
executor_queue (Queue): Queue of the internal executor

Returns:
list: list of tasks whose future inputs have not yet been completed
"""
wait_tmp_lst = []
for task_wait_dict in wait_lst:
exception_lst = get_exception_lst(future_lst=task_wait_dict["future_lst"])
if len(exception_lst) > 0:
task_wait_dict["future"].set_exception(exception_lst[0])
elif all(future.done() for future in task_wait_dict["future_lst"]):
del task_wait_dict["future_lst"]
task_wait_dict["args"], task_wait_dict["kwargs"] = update_futures_in_input(
args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
)
executor_queue.put(task_wait_dict)
else:
wait_tmp_lst.append(task_wait_dict)
return wait_tmp_lst
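To make the wait-list mechanics above concrete, a minimal sketch (assuming, as the usage above suggests, that update_futures_in_input substitutes each finished Future in args/kwargs with its result and that get_exception_lst returns an empty list for pending or successful futures):

import queue
from concurrent.futures import Future

# A task whose positional argument is a still-pending Future stays in the
# wait list; once the Future resolves, it is forwarded to the executor queue
# with the Future replaced by its result.
upstream: Future = Future()
task = {
    "fn": print,
    "args": (upstream,),
    "kwargs": {},
    "future": Future(),
    "future_lst": [upstream],
}
executor_queue: queue.Queue = queue.Queue()

assert _update_waiting_task(wait_lst=[task], executor_queue=executor_queue) == [task]

upstream.set_result(42)
assert _update_waiting_task(wait_lst=[task], executor_queue=executor_queue) == []
assert executor_queue.get()["args"] == (42,)  # Future replaced by its result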