diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index ce30c30a..59a42322 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -72,6 +72,7 @@ def future_queue(self) -> Optional[queue.Queue]: def submit( # type: ignore self, fn: Callable, + /, *args, resource_dict: Optional[dict] = None, **kwargs, @@ -178,3 +179,111 @@ def __del__(self): """ with contextlib.suppress(AttributeError, RuntimeError): self.shutdown(wait=False) + + +class ExecutorInterface(FutureExecutor): + """ + Interface class for the executor. + + Args: + executor (ExecutorBase): internal executor + """ + + def __init__(self, executor: ExecutorBase): + self._task_scheduler = executor + + @property + def max_workers(self) -> Optional[int]: + return self._task_scheduler.max_workers + + @max_workers.setter + def max_workers(self, max_workers: int): + self._task_scheduler.max_workers = max_workers + + @property + def info(self) -> Optional[dict]: + """ + Get the information about the executor. + + Returns: + Optional[dict]: Information about the executor. + """ + return self._task_scheduler.info + + @property + def future_queue(self) -> Optional[queue.Queue]: + """ + Get the future queue. + + Returns: + queue.Queue: The future queue. + """ + return self._task_scheduler.future_queue + + def submit( # type: ignore + self, + fn: Callable, + /, + *args, + resource_dict: Optional[dict] = None, + **kwargs, + ) -> Future: + """ + Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Args: + fn (callable): function to submit for execution + args: arguments for the submitted function + kwargs: keyword arguments for the submitted function + resource_dict (dict): resource dictionary, which defines the resources used for the execution of the + function. Example resource dictionary: { + cores: 1, + threads_per_core: 1, + gpus_per_worker: 0, + oversubscribe: False, + cwd: None, + executor: None, + hostname_localhost: False, + } + + Returns: + Future: A Future representing the given call. + """ + return self._task_scheduler.submit( + *([fn] + list(args)), resource_dict=resource_dict, **kwargs + ) + + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): + """ + Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait (bool): If True then shutdown will not return until all running + futures have finished executing and the resources used by the + parallel_executors have been reclaimed. + cancel_futures (bool): If True then shutdown will cancel all pending + futures. Futures that are completed or running will not be + cancelled. + """ + self._task_scheduler.shutdown(wait=wait, cancel_futures=cancel_futures) + + def __len__(self) -> int: + """ + Get the length of the executor. + + Returns: + int: The length of the executor. + """ + return len(self._task_scheduler) + + def __exit__(self, *args, **kwargs) -> None: + """ + Exit method called when exiting the context manager. + """ + self._task_scheduler.__exit__(*args, **kwargs) diff --git a/executorlib/interfaces/flux.py b/executorlib/interfaces/flux.py index b6b1c058..0d85b5b3 100644 --- a/executorlib/interfaces/flux.py +++ b/executorlib/interfaces/flux.py @@ -1,6 +1,7 @@ import contextlib from typing import Callable, Optional, Union +from executorlib.base.executor import ExecutorInterface from executorlib.interactive.blockallocation import BlockAllocationExecutor from executorlib.interactive.dependency import DependencyExecutor from executorlib.interactive.onetoone import OneTaskPerProcessExecutor @@ -21,7 +22,7 @@ ) -class FluxJobExecutor: +class FluxJobExecutor(ExecutorInterface): """ The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or preferable the flux framework for distributing python functions within a given resource allocation. In contrast to @@ -104,27 +105,6 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, - ): - # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. - pass - - def __new__( - cls, - max_workers: Optional[int] = None, - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - flux_executor=None, - flux_executor_pmi_mode: Optional[str] = None, - flux_executor_nesting: bool = False, - flux_log_files: bool = False, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, ): """ Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, @@ -186,7 +166,31 @@ def __new__( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) if not disable_dependencies: - return DependencyExecutor( + super().__init__( + executor=DependencyExecutor( + executor=create_flux_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + flux_executor=flux_executor, + flux_executor_pmi_mode=flux_executor_pmi_mode, + flux_executor_nesting=flux_executor_nesting, + flux_log_files=flux_log_files, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) + ) + else: + check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) + check_refresh_rate(refresh_rate=refresh_rate) + super().__init__( executor=create_flux_executor( max_workers=max_workers, cache_directory=cache_directory, @@ -199,31 +203,11 @@ def __new__( hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, - ), - max_cores=max_cores, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, - plot_dependency_graph_filename=plot_dependency_graph_filename, - ) - else: - check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) - check_refresh_rate(refresh_rate=refresh_rate) - return create_flux_executor( - max_workers=max_workers, - cache_directory=cache_directory, - max_cores=max_cores, - resource_dict=resource_dict, - flux_executor=flux_executor, - flux_executor_pmi_mode=flux_executor_pmi_mode, - flux_executor_nesting=flux_executor_nesting, - flux_log_files=flux_log_files, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, + ) ) -class FluxClusterExecutor: +class FluxClusterExecutor(ExecutorInterface): """ The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or preferable the flux framework for distributing python functions within a given resource allocation. In contrast to @@ -299,24 +283,6 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, - ): - # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. - pass - - def __new__( - cls, - max_workers: Optional[int] = None, - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - pysqa_config_directory: Optional[str] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, ): """ Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, @@ -377,41 +343,45 @@ def __new__( if not plot_dependency_graph: from executorlib.cache.executor import create_file_executor - return create_file_executor( - max_workers=max_workers, - backend="flux_submission", - max_cores=max_cores, - cache_directory=cache_directory, - resource_dict=resource_dict, - flux_executor=None, - flux_executor_pmi_mode=None, - flux_executor_nesting=False, - flux_log_files=False, - pysqa_config_directory=pysqa_config_directory, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, - disable_dependencies=disable_dependencies, - ) - else: - return DependencyExecutor( - executor=create_flux_executor( + super().__init__( + executor=create_file_executor( max_workers=max_workers, - cache_directory=cache_directory, + backend="flux_submission", max_cores=max_cores, + cache_directory=cache_directory, resource_dict=resource_dict, flux_executor=None, flux_executor_pmi_mode=None, flux_executor_nesting=False, flux_log_files=False, + pysqa_config_directory=pysqa_config_directory, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, - ), - max_cores=max_cores, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, - plot_dependency_graph_filename=plot_dependency_graph_filename, + disable_dependencies=disable_dependencies, + ) + ) + else: + super().__init__( + executor=DependencyExecutor( + executor=create_flux_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + flux_executor=None, + flux_executor_pmi_mode=None, + flux_executor_nesting=False, + flux_log_files=False, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) ) diff --git a/executorlib/interfaces/single.py b/executorlib/interfaces/single.py index e551d55b..667104a8 100644 --- a/executorlib/interfaces/single.py +++ b/executorlib/interfaces/single.py @@ -1,5 +1,6 @@ from typing import Callable, Optional, Union +from executorlib.base.executor import ExecutorInterface from executorlib.interactive.blockallocation import BlockAllocationExecutor from executorlib.interactive.dependency import DependencyExecutor from executorlib.interactive.onetoone import OneTaskPerProcessExecutor @@ -14,7 +15,7 @@ from executorlib.standalone.interactive.spawner import MpiExecSpawner -class SingleNodeExecutor: +class SingleNodeExecutor(ExecutorInterface): """ The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or preferable the flux framework for distributing python functions within a given resource allocation. In contrast to @@ -88,23 +89,6 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, - ): - # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. - pass - - def __new__( - cls, - max_workers: Optional[int] = None, - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, ): """ Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, @@ -162,7 +146,27 @@ def __new__( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) if not disable_dependencies: - return DependencyExecutor( + super().__init__( + executor=DependencyExecutor( + executor=create_single_node_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) + ) + else: + check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) + check_refresh_rate(refresh_rate=refresh_rate) + super().__init__( executor=create_single_node_executor( max_workers=max_workers, cache_directory=cache_directory, @@ -171,23 +175,7 @@ def __new__( hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, - ), - max_cores=max_cores, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, - plot_dependency_graph_filename=plot_dependency_graph_filename, - ) - else: - check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) - check_refresh_rate(refresh_rate=refresh_rate) - return create_single_node_executor( - max_workers=max_workers, - cache_directory=cache_directory, - max_cores=max_cores, - resource_dict=resource_dict, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, + ) ) diff --git a/executorlib/interfaces/slurm.py b/executorlib/interfaces/slurm.py index bdb2ef82..d9365fd2 100644 --- a/executorlib/interfaces/slurm.py +++ b/executorlib/interfaces/slurm.py @@ -1,5 +1,6 @@ from typing import Callable, Optional, Union +from executorlib.base.executor import ExecutorInterface from executorlib.interactive.blockallocation import BlockAllocationExecutor from executorlib.interactive.dependency import DependencyExecutor from executorlib.interactive.onetoone import OneTaskPerProcessExecutor @@ -12,7 +13,7 @@ ) -class SlurmClusterExecutor: +class SlurmClusterExecutor(ExecutorInterface): """ The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or preferable the flux framework for distributing python functions within a given resource allocation. In contrast to @@ -88,24 +89,6 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, - ): - # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. - pass - - def __new__( - cls, - max_workers: Optional[int] = None, - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - pysqa_config_directory: Optional[str] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, ): """ Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, @@ -166,41 +149,45 @@ def __new__( if not plot_dependency_graph: from executorlib.cache.executor import create_file_executor - return create_file_executor( - max_workers=max_workers, - backend="slurm_submission", - max_cores=max_cores, - cache_directory=cache_directory, - resource_dict=resource_dict, - flux_executor=None, - flux_executor_pmi_mode=None, - flux_executor_nesting=False, - flux_log_files=False, - pysqa_config_directory=pysqa_config_directory, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, - disable_dependencies=disable_dependencies, - ) - else: - return DependencyExecutor( - executor=create_slurm_executor( + super().__init__( + executor=create_file_executor( max_workers=max_workers, - cache_directory=cache_directory, + backend="slurm_submission", max_cores=max_cores, + cache_directory=cache_directory, resource_dict=resource_dict, + flux_executor=None, + flux_executor_pmi_mode=None, + flux_executor_nesting=False, + flux_log_files=False, + pysqa_config_directory=pysqa_config_directory, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, - ), - max_cores=max_cores, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, - plot_dependency_graph_filename=plot_dependency_graph_filename, + disable_dependencies=disable_dependencies, + ) + ) + else: + super().__init__( + executor=DependencyExecutor( + executor=create_slurm_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) ) -class SlurmJobExecutor: +class SlurmJobExecutor(ExecutorInterface): """ The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or preferable the flux framework for distributing python functions within a given resource allocation. In contrast to @@ -278,23 +265,6 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, - ): - # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. - pass - - def __new__( - cls, - max_workers: Optional[int] = None, - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, ): """ Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, @@ -356,7 +326,27 @@ def __new__( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) if not disable_dependencies: - return DependencyExecutor( + super().__init__( + executor=DependencyExecutor( + executor=create_slurm_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) + ) + else: + check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) + check_refresh_rate(refresh_rate=refresh_rate) + super().__init__( executor=create_slurm_executor( max_workers=max_workers, cache_directory=cache_directory, @@ -365,23 +355,7 @@ def __new__( hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, - ), - max_cores=max_cores, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, - plot_dependency_graph_filename=plot_dependency_graph_filename, - ) - else: - check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) - check_refresh_rate(refresh_rate=refresh_rate) - return create_slurm_executor( - max_workers=max_workers, - cache_directory=cache_directory, - max_cores=max_cores, - resource_dict=resource_dict, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, + ) ) diff --git a/notebooks/4-developer.ipynb b/notebooks/4-developer.ipynb index 14e3ed56..97f7b0c9 100644 --- a/notebooks/4-developer.ipynb +++ b/notebooks/4-developer.ipynb @@ -80,7 +80,7 @@ "Given the level of separation the integration of submodules from the standalone module in external software packages should be the easiest way to benefit from the developments in executorlib beyond just using the `Executor` class. \n", "\n", "## Interface Class Hierarchy\n", - "executorlib provides five different interfaces, namely `SingleNodeExecutor`, `SlurmClusterExecutor`, `SlurmJobExecutor`, `FluxClusterExecutor` and `FluxJobExecutor`, internally these are mapped to four types of `Executor` classes, namely `BlockAllocationExecutor`, `DependencyExecutor`, `FileExecutor` and `OneTaskPerProcessExecutor` depending on which options are selected. The dependence is illustrated in the following table:\n", + "executorlib provides five different interfaces, namely `SingleNodeExecutor`, `SlurmClusterExecutor`, `SlurmJobExecutor`, `FluxClusterExecutor` and `FluxJobExecutor`, internally these are mapped to four types of task schedulers `Executor._task_scheduler`, namely `BlockAllocationExecutor`, `DependencyExecutor`, `FileExecutor` and `OneTaskPerProcessExecutor` depending on which options are selected. The dependence is illustrated in the following table:\n", "\n", "| | `BlockAllocationExecutor` | `DependencyExecutor` | `FileExecutor` | `OneTaskPerProcessExecutor` |\n", "|-------------------------------------------------------------------------|---------------------------|--------------------------|----------------|-----------------------------|\n", diff --git a/tests/test_fluxjobexecutor_plot.py b/tests/test_fluxjobexecutor_plot.py index 1cc5ce54..74b920f6 100644 --- a/tests/test_fluxjobexecutor_plot.py +++ b/tests/test_fluxjobexecutor_plot.py @@ -53,12 +53,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -98,12 +98,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._future_hash_dict), 7) - self.assertEqual(len(exe._task_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) @@ -124,12 +124,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -167,12 +167,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._future_hash_dict), 7) - self.assertEqual(len(exe._task_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) diff --git a/tests/test_singlenodeexecutor_plot_dependency.py b/tests/test_singlenodeexecutor_plot_dependency.py index 8b8f4741..fd7c50ee 100644 --- a/tests/test_singlenodeexecutor_plot_dependency.py +++ b/tests/test_singlenodeexecutor_plot_dependency.py @@ -59,12 +59,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -120,12 +120,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._future_hash_dict), 7) - self.assertEqual(len(exe._task_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) @@ -137,12 +137,12 @@ def test_future_input_dict(self): return_input_dict, input_dict={"a": exe.submit(sum, [2, 2])}, ) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 4) @@ -165,12 +165,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -210,12 +210,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._future_hash_dict), 7) - self.assertEqual(len(exe._task_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) @@ -236,12 +236,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -279,12 +279,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._future_hash_dict), 7) - self.assertEqual(len(exe._task_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) diff --git a/tests/test_singlenodeexecutor_resize.py b/tests/test_singlenodeexecutor_resize.py index a33b3293..90589aa2 100644 --- a/tests/test_singlenodeexecutor_resize.py +++ b/tests/test_singlenodeexecutor_resize.py @@ -19,7 +19,7 @@ def test_without_dependencies_decrease(self): sleep_funct(sec=0.5) exe.max_workers = 1 self.assertTrue(len(exe) >= 1) - self.assertEqual(len(exe._process), 1) + self.assertEqual(len(exe._task_scheduler._process), 1) self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3) self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1]) self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) @@ -35,7 +35,7 @@ def test_without_dependencies_increase(self): exe.max_workers = 2 self.assertEqual(exe.max_workers, 2) self.assertTrue(len(exe) >= 1) - self.assertEqual(len(exe._process), 2) + self.assertEqual(len(exe._task_scheduler._process), 2) self.assertEqual([f.done() for f in future_lst], [True, False, False, False]) self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1]) self.assertEqual([f.done() for f in future_lst], [True, True, True, True])