|
8 | 8 |
|
9 | 9 | from executorlib.base.executor import ExecutorBase, cancel_items_in_queue |
10 | 10 | from executorlib.standalone.command import get_command_path |
11 | | -from executorlib.standalone.inputcheck import check_resource_dict |
| 11 | +from executorlib.standalone.inputcheck import ( |
| 12 | + check_resource_dict, |
| 13 | + check_resource_dict_is_empty, |
| 14 | +) |
12 | 15 | from executorlib.standalone.interactive.communication import ( |
13 | 16 | SocketInterface, |
14 | 17 | interface_bootup, |
|
19 | 22 |
|
20 | 23 |
|
21 | 24 | class ExecutorBroker(ExecutorBase): |
| 25 | + def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future: |
| 26 | + """ |
| 27 | + Submits a callable to be executed with the given arguments. |
| 28 | +
|
| 29 | + Schedules the callable to be executed as fn(*args, **kwargs) and returns |
| 30 | + a Future instance representing the execution of the callable. |
| 31 | +
|
| 32 | + Args: |
| 33 | + fn (callable): function to submit for execution |
| 34 | + args: arguments for the submitted function |
| 35 | + kwargs: keyword arguments for the submitted function |
| 36 | + resource_dict (dict): resource dictionary, which defines the resources used for the execution of the |
| 37 | + function. Example resource dictionary: { |
| 38 | + cores: 1, |
| 39 | + threads_per_core: 1, |
| 40 | + gpus_per_worker: 0, |
| 41 | + oversubscribe: False, |
| 42 | + cwd: None, |
| 43 | + executor: None, |
| 44 | + hostname_localhost: False, |
| 45 | + } |
| 46 | +
|
| 47 | + Returns: |
| 48 | + Future: A Future representing the given call. |
| 49 | + """ |
| 50 | + check_resource_dict_is_empty(resource_dict=resource_dict) |
| 51 | + check_resource_dict(function=fn) |
| 52 | + f = Future() |
| 53 | + self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) |
| 54 | + return f |
| 55 | + |
22 | 56 | def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): |
23 | 57 | """Clean-up the resources associated with the Executor. |
24 | 58 |
|
@@ -57,46 +91,6 @@ def _set_process(self, process: List[RaisingThread]): |
57 | 91 | process.start() |
58 | 92 |
|
59 | 93 |
|
60 | | -class ExecutorSteps(ExecutorBase): |
61 | | - def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): |
62 | | - """ |
63 | | - Submits a callable to be executed with the given arguments. |
64 | | -
|
65 | | - Schedules the callable to be executed as fn(*args, **kwargs) and returns |
66 | | - a Future instance representing the execution of the callable. |
67 | | -
|
68 | | - Args: |
69 | | - fn (callable): function to submit for execution |
70 | | - args: arguments for the submitted function |
71 | | - kwargs: keyword arguments for the submitted function |
72 | | - resource_dict (dict): resource dictionary, which defines the resources used for the execution of the |
73 | | - function. Example resource dictionary: { |
74 | | - cores: 1, |
75 | | - threads_per_core: 1, |
76 | | - gpus_per_worker: 0, |
77 | | - oversubscribe: False, |
78 | | - cwd: None, |
79 | | - executor: None, |
80 | | - hostname_localhost: False, |
81 | | - } |
82 | | -
|
83 | | - Returns: |
84 | | - A Future representing the given call. |
85 | | - """ |
86 | | - check_resource_dict(function=fn) |
87 | | - f = Future() |
88 | | - self._future_queue.put( |
89 | | - { |
90 | | - "fn": fn, |
91 | | - "args": args, |
92 | | - "kwargs": kwargs, |
93 | | - "future": f, |
94 | | - "resource_dict": resource_dict, |
95 | | - } |
96 | | - ) |
97 | | - return f |
98 | | - |
99 | | - |
100 | 94 | class InteractiveExecutor(ExecutorBroker): |
101 | 95 | """ |
102 | 96 | The executorlib.interactive.executor.InteractiveExecutor leverages the exeutorlib interfaces to distribute python |
@@ -151,7 +145,7 @@ def __init__( |
151 | 145 | ) |
152 | 146 |
|
153 | 147 |
|
154 | | -class InteractiveStepExecutor(ExecutorSteps): |
| 148 | +class InteractiveStepExecutor(ExecutorBase): |
155 | 149 | """ |
156 | 150 | The executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib interfaces to distribute python |
157 | 151 | tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.interactive.executor.InteractiveStepExecutor |
@@ -596,7 +590,10 @@ def _execute_task_with_cache( |
596 | 590 |
|
597 | 591 | future = task_dict["future"] |
598 | 592 | task_key, data_dict = serialize_funct_h5( |
599 | | - task_dict["fn"], *task_dict["args"], **task_dict["kwargs"] |
| 593 | + fn=task_dict["fn"], |
| 594 | + fn_args=task_dict["args"], |
| 595 | + fn_kwargs=task_dict["kwargs"], |
| 596 | + resource_dict=task_dict["resource_dict"], |
600 | 597 | ) |
601 | 598 | os.makedirs(cache_directory, exist_ok=True) |
602 | 599 | file_name = os.path.join(cache_directory, task_key + ".h5out") |
|
0 commit comments