diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 408758b5..f070d5ab 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -3,11 +3,10 @@ import queue import sys import time -from asyncio.exceptions import CancelledError -from concurrent.futures import Future, TimeoutError +from concurrent.futures import Future from threading import Thread from time import sleep -from typing import Any, Callable, Optional, Union +from typing import Callable, Optional from executorlib.base.executor import ExecutorBase, cancel_items_in_queue from executorlib.standalone.command import get_command_path @@ -15,6 +14,12 @@ check_resource_dict, check_resource_dict_is_empty, ) +from executorlib.standalone.interactive.arguments import ( + check_exception_was_raised, + get_exception_lst, + get_future_objects_from_input, + update_futures_in_input, +) from executorlib.standalone.interactive.communication import ( SocketInterface, interface_bootup, @@ -361,14 +366,16 @@ def execute_tasks_with_dependencies( elif ( # handle function submitted to the executor task_dict is not None and "fn" in task_dict and "future" in task_dict ): - future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict) - exception_lst = _get_exception_lst(future_lst=future_lst) - if not _get_exception(future_obj=task_dict["future"]): + future_lst, ready_flag = get_future_objects_from_input( + args=task_dict["args"], kwargs=task_dict["kwargs"] + ) + exception_lst = get_exception_lst(future_lst=future_lst) + if not check_exception_was_raised(future_obj=task_dict["future"]): if len(exception_lst) > 0: task_dict["future"].set_exception(exception_lst[0]) elif len(future_lst) == 0 or ready_flag: # No future objects are used in the input or all future objects are already done - task_dict["args"], task_dict["kwargs"] = _update_futures_in_input( + task_dict["args"], task_dict["kwargs"] = update_futures_in_input( args=task_dict["args"], kwargs=task_dict["kwargs"] ) executor_queue.put(task_dict) @@ -460,12 +467,12 @@ def _submit_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> l """ wait_tmp_lst = [] for task_wait_dict in wait_lst: - exception_lst = _get_exception_lst(future_lst=task_wait_dict["future_lst"]) + exception_lst = get_exception_lst(future_lst=task_wait_dict["future_lst"]) if len(exception_lst) > 0: task_wait_dict["future"].set_exception(exception_lst[0]) elif all(future.done() for future in task_wait_dict["future_lst"]): del task_wait_dict["future_lst"] - task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input( + task_wait_dict["args"], task_wait_dict["kwargs"] = update_futures_in_input( args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"] ) executor_queue.put(task_wait_dict) @@ -474,63 +481,6 @@ def _submit_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> l return wait_tmp_lst -def _update_futures_in_input(args: tuple, kwargs: dict) -> tuple[tuple, dict]: - """ - Evaluate future objects in the arguments and keyword arguments by calling future.result() - - Args: - args (tuple): function arguments - kwargs (dict): function keyword arguments - - Returns: - tuple, dict: arguments and keyword arguments with each future object in them being evaluated - """ - - def get_result(arg: Union[list[Future], Future]) -> Any: - if isinstance(arg, Future): - return arg.result() - elif isinstance(arg, list): - return [get_result(arg=el) for el in arg] - elif isinstance(arg, dict): - return {k: get_result(arg=v) for k, v in arg.items()} - else: - return arg - - args = tuple([get_result(arg=arg) for arg in args]) - kwargs = {key: get_result(arg=value) for key, value in kwargs.items()} - return args, kwargs - - -def _get_future_objects_from_input(task_dict: dict): - """ - Check the input parameters if they contain future objects and which of these future objects are executed - - Args: - task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys - {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} - - Returns: - list, boolean: list of future objects and boolean flag if all future objects are already done - """ - future_lst = [] - - def find_future_in_list(lst): - for el in lst: - if isinstance(el, Future): - future_lst.append(el) - elif isinstance(el, list): - find_future_in_list(lst=el) - elif isinstance(el, dict): - find_future_in_list(lst=el.values()) - - find_future_in_list(lst=task_dict["args"]) - find_future_in_list(lst=task_dict["kwargs"].values()) - boolean_flag = len([future for future in future_lst if future.done()]) == len( - future_lst - ) - return future_lst, boolean_flag - - def _submit_function_to_separate_process( task_dict: dict, active_task_dict: dict, @@ -670,15 +620,3 @@ def _execute_task_with_cache( future = task_dict["future"] future.set_result(result) future_queue.task_done() - - -def _get_exception_lst(future_lst: list[Future]) -> list: - return [f.exception() for f in future_lst if _get_exception(future_obj=f)] - - -def _get_exception(future_obj: Future) -> bool: - try: - excp = future_obj.exception(timeout=10**-10) - return excp is not None and not isinstance(excp, CancelledError) - except TimeoutError: - return False diff --git a/executorlib/standalone/__init__.py b/executorlib/standalone/__init__.py index c752f544..757663a8 100644 --- a/executorlib/standalone/__init__.py +++ b/executorlib/standalone/__init__.py @@ -1,3 +1,8 @@ +""" +Submodules in the executorlib.standalone module do not depend on other modules of the executorlib package. This strict +separation simplifies the development, testing and debugging. +""" + from executorlib.standalone.interactive.communication import ( SocketInterface, interface_bootup, diff --git a/executorlib/standalone/interactive/arguments.py b/executorlib/standalone/interactive/arguments.py new file mode 100644 index 00000000..f51f0b02 --- /dev/null +++ b/executorlib/standalone/interactive/arguments.py @@ -0,0 +1,93 @@ +from asyncio.exceptions import CancelledError +from concurrent.futures import Future, TimeoutError +from typing import Any, Union + + +def get_future_objects_from_input(args: tuple, kwargs: dict): + """ + Check the input parameters if they contain future objects and which of these future objects are executed + + Args: + args (tuple): function arguments + kwargs (dict): function keyword arguments + + Returns: + list, boolean: list of future objects and boolean flag if all future objects are already done + """ + future_lst = [] + + def find_future_in_list(lst): + for el in lst: + if isinstance(el, Future): + future_lst.append(el) + elif isinstance(el, list): + find_future_in_list(lst=el) + elif isinstance(el, dict): + find_future_in_list(lst=el.values()) + + find_future_in_list(lst=args) + find_future_in_list(lst=kwargs.values()) + boolean_flag = len([future for future in future_lst if future.done()]) == len( + future_lst + ) + return future_lst, boolean_flag + + +def get_exception_lst(future_lst: list[Future]) -> list: + """ + Get list of exceptions raised by the future objects in the list of future objects + + Args: + future_lst (list): list of future objects + + Returns: + list: list of exceptions raised by the future objects in the list of future objects. Returns empty list if no + exception was raised. + """ + return [ + f.exception() for f in future_lst if check_exception_was_raised(future_obj=f) + ] + + +def check_exception_was_raised(future_obj: Future) -> bool: + """ + Check if exception was raised by future object + + Args: + future_obj (Future): future object + + Returns: + bool: True if exception was raised, False otherwise + """ + try: + excp = future_obj.exception(timeout=10**-10) + return excp is not None and not isinstance(excp, CancelledError) + except TimeoutError: + return False + + +def update_futures_in_input(args: tuple, kwargs: dict) -> tuple[tuple, dict]: + """ + Evaluate future objects in the arguments and keyword arguments by calling future.result() + + Args: + args (tuple): function arguments + kwargs (dict): function keyword arguments + + Returns: + tuple, dict: arguments and keyword arguments with each future object in them being evaluated + """ + + def get_result(arg: Union[list[Future], Future]) -> Any: + if isinstance(arg, Future): + return arg.result() + elif isinstance(arg, list): + return [get_result(arg=el) for el in arg] + elif isinstance(arg, dict): + return {k: get_result(arg=v) for k, v in arg.items()} + else: + return arg + + args = tuple([get_result(arg=arg) for arg in args]) + kwargs = {key: get_result(arg=value) for key, value in kwargs.items()} + return args, kwargs diff --git a/tests/test_interactive_future_arguments.py b/tests/test_interactive_future_arguments.py new file mode 100644 index 00000000..2e86e9eb --- /dev/null +++ b/tests/test_interactive_future_arguments.py @@ -0,0 +1,69 @@ +from concurrent.futures import Future +import unittest + +from executorlib.standalone.interactive.arguments import ( + check_exception_was_raised, + get_exception_lst, + get_future_objects_from_input, + update_futures_in_input, +) + + +class TestSerial(unittest.TestCase): + def test_get_future_objects_from_input_with_future(self): + input_args = (1, 2, Future(), [Future()], {3: Future()}) + input_kwargs = {"a": 1, "b": [Future()], "c": {"d": Future()}, "e": Future()} + future_lst, boolean_flag = get_future_objects_from_input(args=input_args, kwargs=input_kwargs) + self.assertEqual(len(future_lst), 6) + self.assertFalse(boolean_flag) + + def test_get_future_objects_from_input_without_future(self): + input_args = (1, 2) + input_kwargs = {"a": 1} + future_lst, boolean_flag = get_future_objects_from_input(args=input_args, kwargs=input_kwargs) + self.assertEqual(len(future_lst), 0) + self.assertTrue(boolean_flag) + + def test_update_futures_in_input_with_future(self): + f1 = Future() + f1.set_result(1) + f2 = Future() + f2.set_result(2) + f3 = Future() + f3.set_result(3) + f4 = Future() + f4.set_result(4) + f5 = Future() + f5.set_result(5) + f6 = Future() + f6.set_result(6) + input_args = (1, 2, f1, [f2], {3: f3}) + input_kwargs = {"a": 1, "b": [f4], "c": {"d": f5}, "e": f6} + output_args, output_kwargs = update_futures_in_input(args=input_args, kwargs=input_kwargs) + self.assertEqual(output_args, (1, 2, 1, [2], {3: 3})) + self.assertEqual(output_kwargs, {"a": 1, "b": [4], "c": {"d": 5}, "e": 6}) + + def test_update_futures_in_input_without_future(self): + input_args = (1, 2) + input_kwargs = {"a": 1} + output_args, output_kwargs = update_futures_in_input(args=input_args, kwargs=input_kwargs) + self.assertEqual(input_args, output_args) + self.assertEqual(input_kwargs, output_kwargs) + + def test_check_exception_was_raised(self): + f_with_exception = Future() + f_with_exception.set_exception(ValueError()) + f_without_exception = Future() + self.assertTrue(check_exception_was_raised(future_obj=f_with_exception)) + self.assertFalse(check_exception_was_raised(future_obj=f_without_exception)) + + def test_get_exception_lst(self): + f_with_exception = Future() + f_with_exception.set_exception(ValueError()) + f_without_exception = Future() + future_with_exception_lst = [f_with_exception, f_with_exception, f_without_exception, f_without_exception, f_with_exception] + future_without_exception_lst = [f_without_exception, f_without_exception, f_without_exception, f_without_exception] + exception_lst = get_exception_lst(future_lst=future_with_exception_lst) + self.assertEqual(len(exception_lst), 3) + exception_lst = get_exception_lst(future_lst=future_without_exception_lst) + self.assertEqual(len(exception_lst), 0)