diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py index c1e46c4d..be8fa27c 100644 --- a/executorlib/executor/flux.py +++ b/executorlib/executor/flux.py @@ -43,6 +43,7 @@ class FluxJobExecutor(BaseExecutor): compute notes. Defaults to False. - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + - restart_limit (int): The maximum number of restarting worker processes. Default: 0 pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py index 8c8c5ba1..2f75d0c0 100644 --- a/executorlib/executor/single.py +++ b/executorlib/executor/single.py @@ -120,6 +120,7 @@ def __init__( only) - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + - restart_limit (int): The maximum number of restarting worker processes. Default: 0 hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py index 16747bd5..e8244d1b 100644 --- a/executorlib/executor/slurm.py +++ b/executorlib/executor/slurm.py @@ -43,6 +43,7 @@ class SlurmClusterExecutor(BaseExecutor): - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only) - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. 
+ - restart_limit (int): The maximum number of restarting worker processes. Default: 0 pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index f8d5ca5b..68c6379a 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -1,7 +1,7 @@ import logging import sys from socket import gethostname -from typing import Any, Optional +from typing import Any, Callable, Optional import cloudpickle import zmq @@ -42,6 +42,17 @@ def __init__( if log_obj_size: self._logger = logging.getLogger("executorlib") self._spawner = spawner + self._command_lst: list[str] = [] + self._booted_sucessfully: bool = False + self._stop_function: Optional[Callable] = None + + @property + def status(self) -> bool: + return self._booted_sucessfully + + @status.setter + def status(self, status: bool): + self._booted_sucessfully = status def send_dict(self, input_dict: dict): """ @@ -67,7 +78,9 @@ def receive_dict(self) -> dict: while len(response_lst) == 0: response_lst = self._poller.poll(self._time_out_ms) if not self._spawner.poll(): - raise ExecutorlibSocketError() + raise ExecutorlibSocketError( + "SocketInterface crashed during execution." + ) data = self._socket.recv(zmq.NOBLOCK) if self._logger is not None: self._logger.warning( @@ -105,20 +118,30 @@ def bind_to_random_port(self) -> int: def bootup( self, - command_lst: list[str], - ) -> bool: + command_lst: Optional[list[str]] = None, + stop_function: Optional[Callable] = None, + ): """ Boot up the client process to connect to the SocketInterface. 
Args: command_lst (list): list of strings to start the client process - - Returns: - bool: Whether the interface was successfully started. + stop_function (Callable): Function to stop the interface. """ - return self._spawner.bootup( - command_lst=command_lst, - ) + if command_lst is not None: + self._command_lst = command_lst + if stop_function is not None: + self._stop_function = stop_function + if len(self._command_lst) == 0: + raise ValueError("No command defined to boot up SocketInterface.") + if not self._spawner.bootup( + command_lst=self._command_lst, + stop_function=self._stop_function, + ): + self._reset_socket() + self._booted_sucessfully = False + else: + self._booted_sucessfully = True def shutdown(self, wait: bool = True): """ @@ -162,6 +185,7 @@ def interface_bootup( hostname_localhost: Optional[bool] = None, log_obj_size: bool = False, worker_id: Optional[int] = None, + stop_function: Optional[Callable] = None, ) -> SocketInterface: """ Start interface for ZMQ communication @@ -180,6 +204,7 @@ def interface_bootup( log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects. worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource distribution. + stop_function (Callable): Function to stop the interface. 
Returns: executorlib.shared.communication.SocketInterface: socket interface for zmq communication @@ -203,6 +228,7 @@ def interface_bootup( ] interface.bootup( command_lst=command_lst, + stop_function=stop_function, ) return interface diff --git a/executorlib/standalone/interactive/spawner.py b/executorlib/standalone/interactive/spawner.py index 054bcdc2..0f957c1a 100644 --- a/executorlib/standalone/interactive/spawner.py +++ b/executorlib/standalone/interactive/spawner.py @@ -1,7 +1,7 @@ import os import subprocess from abc import ABC, abstractmethod -from typing import Optional +from typing import Callable, Optional MPI_COMMAND = "mpiexec" @@ -29,12 +29,17 @@ def __init__( def bootup( self, command_lst: list[str], - ): + stop_function: Optional[Callable] = None, + ) -> bool: """ Method to start the interface. Args: command_lst (list[str]): The command list to execute. + stop_function (Callable): Function to stop the interface. + + Returns: + bool: Whether the interface was successfully started. """ raise NotImplementedError @@ -87,12 +92,17 @@ def __init__( def bootup( self, command_lst: list[str], - ): + stop_function: Optional[Callable] = None, + ) -> bool: """ Method to start the subprocess interface. Args: command_lst (list[str]): The command list to execute. + stop_function (Callable): Function to stop the interface. + + Returns: + bool: Whether the interface was successfully started. 
""" if self._cwd is not None: os.makedirs(self._cwd, exist_ok=True) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index bfc153c3..d093b77d 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -1,4 +1,5 @@ import queue +import random from concurrent.futures import Future from threading import Thread from typing import Callable, Optional @@ -8,11 +9,21 @@ check_resource_dict, check_resource_dict_is_empty, ) -from executorlib.standalone.interactive.communication import interface_bootup +from executorlib.standalone.interactive.communication import ( + ExecutorlibSocketError, + SocketInterface, + interface_bootup, +) from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner from executorlib.standalone.queue import cancel_items_in_queue from executorlib.task_scheduler.base import TaskSchedulerBase -from executorlib.task_scheduler.interactive.shared import execute_task_dict, task_done +from executorlib.task_scheduler.interactive.shared import ( + execute_task_dict, + reset_task_dict, + task_done, +) + +_interrupt_bootup_dict: dict = {} class BlockAllocationTaskScheduler(TaskSchedulerBase): @@ -63,11 +74,18 @@ def __init__( executor_kwargs["queue_join_on_shutdown"] = False self._process_kwargs = executor_kwargs self._max_workers = max_workers + self_id = random.getrandbits(128) + self._self_id = self_id + _interrupt_bootup_dict[self._self_id] = False self._set_process( process=[ Thread( target=_execute_multiple_tasks, - kwargs=executor_kwargs | {"worker_id": worker_id}, + kwargs=executor_kwargs + | { + "worker_id": worker_id, + "stop_function": lambda: _interrupt_bootup_dict[self_id], + }, ) for worker_id in range(self._max_workers) ], @@ -157,6 +175,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): if cancel_futures: 
cancel_items_in_queue(que=self._future_queue) if isinstance(self._process, list): + _interrupt_bootup_dict[self._self_id] = True for _ in range(len(self._process)): self._future_queue.put({"shutdown": True, "wait": wait}) if wait: @@ -190,6 +209,8 @@ def _execute_multiple_tasks( log_obj_size: bool = False, error_log_file: Optional[str] = None, worker_id: Optional[int] = None, + stop_function: Optional[Callable] = None, + restart_limit: int = 0, **kwargs, ) -> None: """ @@ -216,6 +237,8 @@ def _execute_multiple_tasks( submitted to the Executor. worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource distribution. + stop_function (Callable): Function to stop the interface. + restart_limit (int): The maximum number of restarting worker processes. """ interface = interface_bootup( command_lst=get_interactive_execute_command( @@ -225,34 +248,66 @@ def _execute_multiple_tasks( hostname_localhost=hostname_localhost, log_obj_size=log_obj_size, worker_id=worker_id, + stop_function=stop_function, + ) + interface_initialization_exception = _set_init_function( + interface=interface, + init_function=init_function, ) + restart_counter = 0 + while True: + if not interface.status and restart_counter > restart_limit: + interface.status = True # no more restarts + interface_initialization_exception = ExecutorlibSocketError( + "SocketInterface crashed during execution." 
+ ) + elif not interface.status: + interface.bootup() + interface_initialization_exception = _set_init_function( + interface=interface, + init_function=init_function, + ) + restart_counter += 1 + else: # interface.status == True + task_dict = future_queue.get() + if "shutdown" in task_dict and task_dict["shutdown"]: + if interface.status: + interface.shutdown(wait=task_dict["wait"]) + task_done(future_queue=future_queue) + if queue_join_on_shutdown: + future_queue.join() + break + elif "fn" in task_dict and "future" in task_dict: + f = task_dict.pop("future") + if interface_initialization_exception is not None: + f.set_exception(exception=interface_initialization_exception) + else: + # execute_task_dict() returns False when the interface crashed during the execution + interface.status = execute_task_dict( + task_dict=task_dict, + future_obj=f, + interface=interface, + cache_directory=cache_directory, + cache_key=cache_key, + error_log_file=error_log_file, + ) + if not interface.status: + reset_task_dict( + future_obj=f, future_queue=future_queue, task_dict=task_dict + ) + task_done(future_queue=future_queue) + + +def _set_init_function( + interface: SocketInterface, + init_function: Optional[Callable] = None, +) -> Optional[Exception]: interface_initialization_exception = None - if init_function is not None: + if init_function is not None and interface.status: try: _ = interface.send_and_receive_dict( input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} ) except Exception as init_exception: interface_initialization_exception = init_exception - while True: - task_dict = future_queue.get() - if "shutdown" in task_dict and task_dict["shutdown"]: - interface.shutdown(wait=task_dict["wait"]) - task_done(future_queue=future_queue) - if queue_join_on_shutdown: - future_queue.join() - break - elif "fn" in task_dict and "future" in task_dict: - f = task_dict.pop("future") - if interface_initialization_exception is not None: - f.set_exception(exception=interface_initialization_exception) - 
else: - execute_task_dict( - task_dict=task_dict, - future_obj=f, - interface=interface, - cache_directory=cache_directory, - cache_key=cache_key, - error_log_file=error_log_file, - ) - task_done(future_queue=future_queue) + return interface_initialization_exception diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index aa443133..d303ea94 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -4,7 +4,10 @@ from typing import Optional from executorlib.standalone.command import get_interactive_execute_command -from executorlib.standalone.interactive.communication import interface_bootup +from executorlib.standalone.interactive.communication import ( + ExecutorlibSocketError, + interface_bootup, +) from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner from executorlib.task_scheduler.base import TaskSchedulerBase from executorlib.task_scheduler.interactive.shared import execute_task_dict @@ -230,7 +233,7 @@ def _execute_task_in_thread( error_log_file: Optional[str] = None, worker_id: Optional[int] = None, **kwargs, -) -> None: +): """ Execute a single tasks in parallel using the message passing interface (MPI). @@ -256,7 +259,7 @@ def _execute_task_in_thread( worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource distribution. 
""" - execute_task_dict( + if not execute_task_dict( task_dict=task_dict, future_obj=future_obj, interface=interface_bootup( @@ -271,4 +274,7 @@ def _execute_task_in_thread( cache_directory=cache_directory, cache_key=cache_key, error_log_file=error_log_file, - ) + ): + future_obj.set_exception( + ExecutorlibSocketError("SocketInterface crashed during execution.") + ) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 3199f9da..e4084222 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -3,9 +3,13 @@ import queue import time from concurrent.futures import Future +from concurrent.futures._base import PENDING from typing import Optional -from executorlib.standalone.interactive.communication import SocketInterface +from executorlib.standalone.interactive.communication import ( + ExecutorlibSocketError, + SocketInterface, +) from executorlib.standalone.serialize import serialize_funct @@ -16,7 +20,7 @@ def execute_task_dict( cache_directory: Optional[str] = None, cache_key: Optional[str] = None, error_log_file: Optional[str] = None, -): +) -> bool: """ Execute the task in the task_dict by communicating it via the interface. @@ -30,22 +34,27 @@ def execute_task_dict( overwritten by setting the cache_key. error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + + Returns: + bool: True if the task was submitted successfully, False otherwise. 
""" if not future_obj.done() and future_obj.set_running_or_notify_cancel(): if error_log_file is not None: task_dict["error_log_file"] = error_log_file if cache_directory is None: - _execute_task_without_cache( + return _execute_task_without_cache( interface=interface, task_dict=task_dict, future_obj=future_obj ) else: - _execute_task_with_cache( + return _execute_task_with_cache( interface=interface, task_dict=task_dict, cache_directory=cache_directory, cache_key=cache_key, future_obj=future_obj, ) + else: + return True def task_done(future_queue: queue.Queue): @@ -59,9 +68,27 @@ def task_done(future_queue: queue.Queue): future_queue.task_done() +def reset_task_dict(future_obj: Future, future_queue: queue.Queue, task_dict: dict): + """ + Reset the task dictionary for resubmission to the queue. + + Args: + future_obj (Future): A Future representing the given call. + future_queue (queue): Queue of task dictionaries waiting for execution. + task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys + {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} + + Returns: + None: the future is reset to pending and the task is put back into the queue. + """ + future_obj._state = PENDING + task_done(future_queue=future_queue) + future_queue.put(task_dict | {"future": future_obj}) + + def _execute_task_without_cache( interface: SocketInterface, task_dict: dict, future_obj: Future -): +) -> bool: """ Execute the task in the task_dict by communicating it via the interface. @@ -70,11 +97,18 @@ def _execute_task_without_cache( task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} future_obj (Future): A Future representing the given call. + + Returns: + bool: True if the task was submitted successfully, False otherwise. 
""" try: future_obj.set_result(interface.send_and_receive_dict(input_dict=task_dict)) except Exception as thread_exception: - future_obj.set_exception(exception=thread_exception) + if isinstance(thread_exception, ExecutorlibSocketError): + return False + else: + future_obj.set_exception(exception=thread_exception) + return True def _execute_task_with_cache( @@ -83,7 +117,7 @@ def _execute_task_with_cache( future_obj: Future, cache_directory: str, cache_key: Optional[str] = None, -): +) -> bool: """ Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory. @@ -115,7 +149,11 @@ def _execute_task_with_cache( dump(file_name=file_name, data_dict=data_dict) future_obj.set_result(result) except Exception as thread_exception: - future_obj.set_exception(exception=thread_exception) + if isinstance(thread_exception, ExecutorlibSocketError): + return False + else: + future_obj.set_exception(exception=thread_exception) else: _, _, result = get_output(file_name=file_name) future_obj.set_result(result) + return True diff --git a/executorlib/task_scheduler/interactive/spawner_flux.py b/executorlib/task_scheduler/interactive/spawner_flux.py index 99e08bd2..3c061a41 100644 --- a/executorlib/task_scheduler/interactive/spawner_flux.py +++ b/executorlib/task_scheduler/interactive/spawner_flux.py @@ -1,5 +1,5 @@ import os -from typing import Optional +from typing import Callable, Optional import flux import flux.job @@ -75,12 +75,14 @@ def __init__( def bootup( self, command_lst: list[str], + stop_function: Optional[Callable] = None, ) -> bool: """ Boot up the client process to connect to the SocketInterface. Args: command_lst (list[str]): List of strings to start the client process. + stop_function (Callable): Function to stop the interface. Raises: ValueError: If oversubscribing is not supported for the Flux adapter or if conda environments are not supported. 
diff --git a/tests/test_standalone_interactive_communication.py b/tests/test_standalone_interactive_communication.py index 2a1f410a..a644a42d 100644 --- a/tests/test_standalone_interactive_communication.py +++ b/tests/test_standalone_interactive_communication.py @@ -3,6 +3,7 @@ import sys import unittest from time import sleep +from typing import Callable, Optional import numpy as np import zmq @@ -26,6 +27,10 @@ def calc(i): return np.array(i**2) +class BrokenSpawner(MpiExecSpawner): + def bootup(self, command_lst: list[str], stop_function: Optional[Callable] = None,): + return False + class TestInterface(unittest.TestCase): @unittest.skipIf( skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." @@ -36,7 +41,7 @@ def test_interface_mpi(self): interface = SocketInterface( spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False) ) - success_flag = interface.bootup( + interface.bootup( command_lst=[ sys.executable, os.path.abspath( @@ -53,7 +58,7 @@ def test_interface_mpi(self): str(interface.bind_to_random_port()), ] ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertEqual( interface.send_and_receive_dict(input_dict=task_dict), np.array(4) ) @@ -66,7 +71,7 @@ def test_interface_serial_without_debug(self): spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), log_obj_size=False, ) - success_flag = interface.bootup( + interface.bootup( command_lst=[ sys.executable, os.path.abspath( @@ -83,7 +88,7 @@ def test_interface_serial_without_debug(self): str(interface.bind_to_random_port()), ] ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertEqual( interface.send_and_receive_dict(input_dict=task_dict), np.array(4) ) @@ -96,7 +101,7 @@ def test_interface_serial_with_debug(self): spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), log_obj_size=True, ) - success_flag = interface.bootup( + interface.bootup( command_lst=[ sys.executable, 
os.path.abspath( @@ -113,7 +118,7 @@ def test_interface_serial_with_debug(self): str(interface.bind_to_random_port()), ] ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertEqual( interface.send_and_receive_dict(input_dict=task_dict), np.array(4) ) @@ -125,13 +130,31 @@ def test_interface_serial_with_error(self): spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), log_obj_size=True, ) - success_flag = interface.bootup(command_lst=["bash", "exit"]) - self.assertTrue(success_flag) + interface.bootup(command_lst=["bash", "exit"]) + self.assertTrue(interface.status) while interface._spawner.poll(): sleep(0.1) self.assertFalse(interface._spawner.poll()) interface.shutdown(wait=True) + def test_interface_serial_wrong_input(self): + cloudpickle_register(ind=1) + interface = SocketInterface( + spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), + log_obj_size=True, + ) + with self.assertRaises(ValueError): + interface.bootup(command_lst=None) + + def test_interface_serial_with_broken_spawner(self): + cloudpickle_register(ind=1) + interface = SocketInterface( + spawner=BrokenSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), + log_obj_size=True, + ) + interface.bootup(command_lst=["bash", "exit"]) + self.assertFalse(interface.status) + def test_interface_serial_with_stopped_process(self): cloudpickle_register(ind=1) task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}} @@ -139,7 +162,7 @@ def test_interface_serial_with_stopped_process(self): spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), log_obj_size=True, ) - success_flag = interface.bootup( + interface.bootup( command_lst=[ sys.executable, os.path.abspath( @@ -156,7 +179,7 @@ def test_interface_serial_with_stopped_process(self): str(interface.bind_to_random_port()), ] ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) interface.send_dict(input_dict=task_dict) interface._spawner._process.terminate() 
with self.assertRaises(ExecutorlibSocketError): diff --git a/tests/test_task_scheduler_interactive_shared.py b/tests/test_task_scheduler_interactive_shared.py new file mode 100644 index 00000000..bd691d3b --- /dev/null +++ b/tests/test_task_scheduler_interactive_shared.py @@ -0,0 +1,192 @@ +import shutil +from concurrent.futures import Future +import unittest + +from executorlib.standalone.command import get_interactive_execute_command +from executorlib.standalone.interactive.communication import interface_bootup, ExecutorlibSocketError +from executorlib.standalone.interactive.spawner import SubprocessSpawner +from executorlib.standalone.serialize import cloudpickle_register +from executorlib.task_scheduler.interactive.shared import execute_task_dict + +try: + import h5py + + skip_h5py_test = False +except ImportError: + skip_h5py_test = True + + +def get_error(): + raise ExecutorlibSocketError() + + +class TestExecuteTaskDictWithoutCache(unittest.TestCase): + def test_execute_task_sum(self): + cloudpickle_register(ind=1) + f = Future() + interface = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(interface.status) + self.assertFalse(f.done()) + result = execute_task_dict( + task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, + future_obj=f, + interface=interface, + cache_directory=None, + cache_key=None, + error_log_file=None, + ) + self.assertTrue(result) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 3) + + def test_execute_task_done(self): + cloudpickle_register(ind=1) + f = Future() + f.set_result(5) + interface = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(interface.status) + 
self.assertTrue(f.done()) + result = execute_task_dict( + task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, + future_obj=f, + interface=interface, + cache_directory=None, + cache_key=None, + error_log_file=None, + ) + self.assertTrue(result) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 5) + + def test_execute_task_error(self): + cloudpickle_register(ind=1) + f = Future() + interface = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(interface.status) + self.assertFalse(f.done()) + result = execute_task_dict( + task_dict={"fn": get_error, "args": (), "kwargs": {}}, + future_obj=f, + interface=interface, + cache_directory=None, + cache_key=None, + error_log_file=None, + ) + self.assertFalse(result) + self.assertFalse(f.done()) + + +@unittest.skipIf( + skip_h5py_test, "h5py is not installed, so the h5io tests are skipped." 
+) +class TestExecuteTaskDictWithCache(unittest.TestCase): + def tearDown(self): + shutil.rmtree("cache_execute_task", ignore_errors=True) + + def test_execute_task_sum(self): + cloudpickle_register(ind=1) + f = Future() + interface = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(interface.status) + self.assertFalse(f.done()) + result = execute_task_dict( + task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, + future_obj=f, + interface=interface, + cache_directory="cache_execute_task", + cache_key=None, + error_log_file=None, + ) + self.assertTrue(result) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 3) + + def test_execute_task_done(self): + cloudpickle_register(ind=1) + f = Future() + f.set_result(5) + interface = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(interface.status) + self.assertTrue(f.done()) + result = execute_task_dict( + task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, + future_obj=f, + interface=interface, + cache_directory="cache_execute_task", + cache_key=None, + error_log_file=None, + ) + self.assertTrue(result) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 5) + + def test_execute_task_error(self): + cloudpickle_register(ind=1) + f = Future() + interface = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(interface.status) + self.assertFalse(f.done()) + result = execute_task_dict( + task_dict={"fn": get_error, "args": (), "kwargs": {}}, + future_obj=f, + interface=interface, + 
cache_directory="cache_execute_task", + cache_key=None, + error_log_file=None, + ) + self.assertFalse(result) + self.assertFalse(f.done()) \ No newline at end of file