From 1a53651245c76be87c48a0d52a06d08053b7e26f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 26 Apr 2025 07:01:53 +0200 Subject: [PATCH 01/14] Interfaces: Use Base Interface rather than factory pattern --- executorlib/base/executor.py | 104 ++++++++++++++++++++++ executorlib/interfaces/flux.py | 144 ++++++++++++------------------- executorlib/interfaces/single.py | 60 ++++++------- executorlib/interfaces/slurm.py | 134 ++++++++++++---------------- 4 files changed, 239 insertions(+), 203 deletions(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index ce30c30a..853e57d9 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -178,3 +178,107 @@ def __del__(self): """ with contextlib.suppress(AttributeError, RuntimeError): self.shutdown(wait=False) + + +class ExecutorInterface(FutureExecutor): + """ + Interface class for the executor. + + Args: + executor (ExecutorBase): internal executor + """ + def __init__(self, executor: ExecutorBase): + self._executor = executor + + @property + def max_workers(self) -> Optional[int]: + return self._executor.max_workers + + @max_workers.setter + def max_workers(self, max_workers: int): + self._executor.max_workers = max_workers + + @property + def info(self) -> Optional[dict]: + """ + Get the information about the executor. + + Returns: + Optional[dict]: Information about the executor. + """ + return self._executor.info + + @property + def future_queue(self) -> Optional[queue.Queue]: + """ + Get the future queue. + + Returns: + queue.Queue: The future queue. + """ + return self._executor.future_queue + + def submit( # type: ignore + self, + fn: Callable, + *args, + resource_dict: Optional[dict] = None, + **kwargs, + ) -> Future: + """ + Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Args: + fn (callable): function to submit for execution + args: arguments for the submitted function + kwargs: keyword arguments for the submitted function + resource_dict (dict): resource dictionary, which defines the resources used for the execution of the + function. Example resource dictionary: { + cores: 1, + threads_per_core: 1, + gpus_per_worker: 0, + oversubscribe: False, + cwd: None, + executor: None, + hostname_localhost: False, + } + + Returns: + Future: A Future representing the given call. + """ + return self._executor.submit(fn=fn, *args, resource_dict=resource_dict, **kwargs) + + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): + """ + Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait (bool): If True then shutdown will not return until all running + futures have finished executing and the resources used by the + parallel_executors have been reclaimed. + cancel_futures (bool): If True then shutdown will cancel all pending + futures. Futures that are completed or running will not be + cancelled. + """ + self._executor.shutdown(wait=wait, cancel_futures=cancel_futures) + + def __len__(self) -> int: + """ + Get the length of the executor. + + Returns: + int: The length of the executor. + """ + return len(self._executor) + + def __del__(self): + """ + Clean-up the resources associated with the Executor. + """ + del self._executor diff --git a/executorlib/interfaces/flux.py b/executorlib/interfaces/flux.py index b6b1c058..fa9a7787 100644 --- a/executorlib/interfaces/flux.py +++ b/executorlib/interfaces/flux.py @@ -1,6 +1,7 @@ import contextlib from typing import Callable, Optional, Union +from executorlib.base.executor import ExecutorInterface from executorlib.interactive.blockallocation import BlockAllocationExecutor from executorlib.interactive.dependency import DependencyExecutor from executorlib.interactive.onetoone import OneTaskPerProcessExecutor @@ -21,7 +22,7 @@ ) -class FluxJobExecutor: +class FluxJobExecutor(ExecutorInterface): """ The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or preferable the flux framework for distributing python functions within a given resource allocation. In contrast to @@ -104,27 +105,6 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, - ): - # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. - pass - - def __new__( - cls, - max_workers: Optional[int] = None, - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - flux_executor=None, - flux_executor_pmi_mode: Optional[str] = None, - flux_executor_nesting: bool = False, - flux_log_files: bool = False, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, ): """ Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, @@ -186,7 +166,31 @@ def __new__( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) if not disable_dependencies: - return DependencyExecutor( + super().__init__( + executor=DependencyExecutor( + executor=create_flux_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + flux_executor=flux_executor, + flux_executor_pmi_mode=flux_executor_pmi_mode, + flux_executor_nesting=flux_executor_nesting, + flux_log_files=flux_log_files, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) + ) + else: + check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) + check_refresh_rate(refresh_rate=refresh_rate) + super().__init__( executor=create_flux_executor( max_workers=max_workers, cache_directory=cache_directory, @@ -199,27 +203,7 @@ def __new__( hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, - ), - max_cores=max_cores, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, - plot_dependency_graph_filename=plot_dependency_graph_filename, - ) - else: - check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) - check_refresh_rate(refresh_rate=refresh_rate) - return create_flux_executor( - max_workers=max_workers, - cache_directory=cache_directory, - max_cores=max_cores, - resource_dict=resource_dict, - flux_executor=flux_executor, - flux_executor_pmi_mode=flux_executor_pmi_mode, - flux_executor_nesting=flux_executor_nesting, - flux_log_files=flux_log_files, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, + ) ) @@ -299,24 +283,6 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, - ): - # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. - pass - - def __new__( - cls, - max_workers: Optional[int] = None, - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - pysqa_config_directory: Optional[str] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, ): """ Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, @@ -377,41 +343,45 @@ def __new__( if not plot_dependency_graph: from executorlib.cache.executor import create_file_executor - return create_file_executor( - max_workers=max_workers, - backend="flux_submission", - max_cores=max_cores, - cache_directory=cache_directory, - resource_dict=resource_dict, - flux_executor=None, - flux_executor_pmi_mode=None, - flux_executor_nesting=False, - flux_log_files=False, - pysqa_config_directory=pysqa_config_directory, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, - disable_dependencies=disable_dependencies, - ) - else: - return DependencyExecutor( - executor=create_flux_executor( + super().__init__( + executor=create_file_executor( max_workers=max_workers, - cache_directory=cache_directory, + backend="flux_submission", max_cores=max_cores, + cache_directory=cache_directory, resource_dict=resource_dict, flux_executor=None, flux_executor_pmi_mode=None, flux_executor_nesting=False, flux_log_files=False, + pysqa_config_directory=pysqa_config_directory, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, - ), - max_cores=max_cores, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, - plot_dependency_graph_filename=plot_dependency_graph_filename, + disable_dependencies=disable_dependencies, + ) + ) + else: + super().__init__( + executor=DependencyExecutor( + executor=create_flux_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + flux_executor=None, + flux_executor_pmi_mode=None, + flux_executor_nesting=False, + flux_log_files=False, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) ) diff --git a/executorlib/interfaces/single.py b/executorlib/interfaces/single.py index e551d55b..667104a8 100644 --- a/executorlib/interfaces/single.py +++ b/executorlib/interfaces/single.py @@ -1,5 +1,6 @@ from typing import Callable, Optional, Union +from executorlib.base.executor import ExecutorInterface from executorlib.interactive.blockallocation import BlockAllocationExecutor from executorlib.interactive.dependency import DependencyExecutor from executorlib.interactive.onetoone import OneTaskPerProcessExecutor @@ -14,7 +15,7 @@ from executorlib.standalone.interactive.spawner import MpiExecSpawner -class SingleNodeExecutor: +class SingleNodeExecutor(ExecutorInterface): """ The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or preferable the flux framework for distributing python functions within a given resource allocation. In contrast to @@ -88,23 +89,6 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, - ): - # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. - pass - - def __new__( - cls, - max_workers: Optional[int] = None, - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, ): """ Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, @@ -162,7 +146,27 @@ def __new__( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) if not disable_dependencies: - return DependencyExecutor( + super().__init__( + executor=DependencyExecutor( + executor=create_single_node_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) + ) + else: + check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) + check_refresh_rate(refresh_rate=refresh_rate) + super().__init__( executor=create_single_node_executor( max_workers=max_workers, cache_directory=cache_directory, @@ -171,23 +175,7 @@ def __new__( hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, - ), - max_cores=max_cores, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, - plot_dependency_graph_filename=plot_dependency_graph_filename, - ) - else: - check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) - check_refresh_rate(refresh_rate=refresh_rate) - return create_single_node_executor( - max_workers=max_workers, - cache_directory=cache_directory, - max_cores=max_cores, - resource_dict=resource_dict, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, + ) ) diff --git a/executorlib/interfaces/slurm.py b/executorlib/interfaces/slurm.py index bdb2ef82..d9365fd2 100644 --- a/executorlib/interfaces/slurm.py +++ b/executorlib/interfaces/slurm.py @@ -1,5 +1,6 @@ from typing import Callable, Optional, Union +from executorlib.base.executor import ExecutorInterface from executorlib.interactive.blockallocation import BlockAllocationExecutor from executorlib.interactive.dependency import DependencyExecutor from executorlib.interactive.onetoone import OneTaskPerProcessExecutor @@ -12,7 +13,7 @@ ) -class SlurmClusterExecutor: +class SlurmClusterExecutor(ExecutorInterface): """ The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or preferable the flux framework for distributing python functions within a given resource allocation. In contrast to @@ -88,24 +89,6 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, - ): - # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. - pass - - def __new__( - cls, - max_workers: Optional[int] = None, - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - pysqa_config_directory: Optional[str] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, ): """ Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, @@ -166,41 +149,45 @@ def __new__( if not plot_dependency_graph: from executorlib.cache.executor import create_file_executor - return create_file_executor( - max_workers=max_workers, - backend="slurm_submission", - max_cores=max_cores, - cache_directory=cache_directory, - resource_dict=resource_dict, - flux_executor=None, - flux_executor_pmi_mode=None, - flux_executor_nesting=False, - flux_log_files=False, - pysqa_config_directory=pysqa_config_directory, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, - disable_dependencies=disable_dependencies, - ) - else: - return DependencyExecutor( - executor=create_slurm_executor( + super().__init__( + executor=create_file_executor( max_workers=max_workers, - cache_directory=cache_directory, + backend="slurm_submission", max_cores=max_cores, + cache_directory=cache_directory, resource_dict=resource_dict, + flux_executor=None, + flux_executor_pmi_mode=None, + flux_executor_nesting=False, + flux_log_files=False, + pysqa_config_directory=pysqa_config_directory, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, - ), - max_cores=max_cores, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, - plot_dependency_graph_filename=plot_dependency_graph_filename, + disable_dependencies=disable_dependencies, + ) + ) + else: + super().__init__( + executor=DependencyExecutor( + executor=create_slurm_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) ) -class SlurmJobExecutor: +class SlurmJobExecutor(ExecutorInterface): """ The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or preferable the flux framework for distributing python functions within a given resource allocation. In contrast to @@ -278,23 +265,6 @@ def __init__( refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, - ): - # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. - pass - - def __new__( - cls, - max_workers: Optional[int] = None, - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, ): """ Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, @@ -356,7 +326,27 @@ def __new__( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) if not disable_dependencies: - return DependencyExecutor( + super().__init__( + executor=DependencyExecutor( + executor=create_slurm_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) + ) + else: + check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) + check_refresh_rate(refresh_rate=refresh_rate) + super().__init__( executor=create_slurm_executor( max_workers=max_workers, cache_directory=cache_directory, @@ -365,23 +355,7 @@ def __new__( hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, - ), - max_cores=max_cores, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, - plot_dependency_graph_filename=plot_dependency_graph_filename, - ) - else: - check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) - check_refresh_rate(refresh_rate=refresh_rate) - return create_slurm_executor( - max_workers=max_workers, - cache_directory=cache_directory, - max_cores=max_cores, - resource_dict=resource_dict, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, + ) ) From ad7ed14c4e723940c2a52d87c85d3f54c67e6a6d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 26 Apr 2025 05:02:28 +0000 Subject: [PATCH 02/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/base/executor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 853e57d9..67b52a73 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -187,6 +187,7 @@ class ExecutorInterface(FutureExecutor): Args: executor (ExecutorBase): internal executor """ + def __init__(self, executor: ExecutorBase): self._executor = executor @@ -249,7 +250,9 @@ def submit( # type: ignore Returns: Future: A Future representing the given call. """ - return self._executor.submit(fn=fn, *args, resource_dict=resource_dict, **kwargs) + return self._executor.submit( + fn=fn, *args, resource_dict=resource_dict, **kwargs + ) def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): """ From c25422257e699dd03ed276353b905c2b3081b2d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 26 Apr 2025 07:05:23 +0200 Subject: [PATCH 03/14] special character from concurrent futures --- executorlib/base/executor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 853e57d9..2609700b 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -72,6 +72,7 @@ def future_queue(self) -> Optional[queue.Queue]: def submit( # type: ignore self, fn: Callable, + /, *args, resource_dict: Optional[dict] = None, **kwargs, @@ -221,6 +222,7 @@ def future_queue(self) -> Optional[queue.Queue]: def submit( # type: ignore self, fn: Callable, + /, *args, resource_dict: Optional[dict] = None, **kwargs, From fc003829ae7f271fa533b32e671f49896cc367ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 26 Apr 2025 07:07:22 +0200 Subject: [PATCH 04/14] fix mypy --- executorlib/interfaces/flux.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/interfaces/flux.py b/executorlib/interfaces/flux.py index fa9a7787..0d85b5b3 100644 --- a/executorlib/interfaces/flux.py +++ b/executorlib/interfaces/flux.py @@ -207,7 +207,7 @@ def __init__( ) -class FluxClusterExecutor: +class FluxClusterExecutor(ExecutorInterface): """ The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or preferable the flux framework for distributing python functions within a given resource allocation. In contrast to From 5f664194cce83ae11c1ccb686d7950559d642332 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 26 Apr 2025 07:18:56 +0200 Subject: [PATCH 05/14] another workaround --- executorlib/base/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index c646a528..2e6ee8b7 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -253,7 +253,7 @@ def submit( # type: ignore Future: A Future representing the given call. """ return self._executor.submit( - fn=fn, *args, resource_dict=resource_dict, **kwargs + *([fn] + list(args)), resource_dict=resource_dict, **kwargs ) def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): From 295c48a16cab3065e67d02033550caee1425f7a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 26 Apr 2025 07:21:59 +0200 Subject: [PATCH 06/14] fix resize test --- tests/test_singlenodeexecutor_resize.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_singlenodeexecutor_resize.py b/tests/test_singlenodeexecutor_resize.py index a33b3293..c0a30924 100644 --- a/tests/test_singlenodeexecutor_resize.py +++ b/tests/test_singlenodeexecutor_resize.py @@ -19,7 +19,7 @@ def test_without_dependencies_decrease(self): sleep_funct(sec=0.5) exe.max_workers = 1 self.assertTrue(len(exe) >= 1) - self.assertEqual(len(exe._process), 1) + self.assertEqual(len(exe._executor._process), 1) self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3) self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1]) self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) @@ -35,7 +35,7 @@ def test_without_dependencies_increase(self): exe.max_workers = 2 self.assertEqual(exe.max_workers, 2) self.assertTrue(len(exe) >= 1) - self.assertEqual(len(exe._process), 2) + self.assertEqual(len(exe._executor._process), 2) self.assertEqual([f.done() for f in future_lst], [True, False, False, False]) self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1]) self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) From a7018b0588fe6f72c6a56ee5c85b1b2c864a8d1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 26 Apr 2025 07:25:21 +0200 Subject: [PATCH 07/14] fix plot test --- ...test_singlenodeexecutor_plot_dependency.py | 56 +++++++++---------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/tests/test_singlenodeexecutor_plot_dependency.py b/tests/test_singlenodeexecutor_plot_dependency.py index 8b8f4741..a3c23d2a 100644 --- a/tests/test_singlenodeexecutor_plot_dependency.py +++ b/tests/test_singlenodeexecutor_plot_dependency.py @@ -59,12 +59,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._executor._future_hash_dict), 2) + self.assertEqual(len(exe._executor._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._executor._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._executor._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -120,12 +120,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._future_hash_dict), 7) - self.assertEqual(len(exe._task_hash_dict), 7) + self.assertEqual(len(exe._executor._future_hash_dict), 7) + self.assertEqual(len(exe._executor._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._executor._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._executor._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) @@ -137,12 +137,12 @@ def test_future_input_dict(self): return_input_dict, input_dict={"a": exe.submit(sum, [2, 2])}, ) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._executor._future_hash_dict), 2) + self.assertEqual(len(exe._executor._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._executor._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._executor._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 4) @@ -165,12 +165,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._executor._future_hash_dict), 2) + self.assertEqual(len(exe._executor._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._executor._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._executor._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -210,12 +210,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._future_hash_dict), 7) - self.assertEqual(len(exe._task_hash_dict), 7) + self.assertEqual(len(exe._executor._future_hash_dict), 7) + self.assertEqual(len(exe._executor._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._executor._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._executor._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) @@ -236,12 +236,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._executor._future_hash_dict), 2) + self.assertEqual(len(exe._executor._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._executor._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._executor._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -279,12 +279,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._future_hash_dict), 7) - self.assertEqual(len(exe._task_hash_dict), 7) + self.assertEqual(len(exe._executor._future_hash_dict), 7) + self.assertEqual(len(exe._executor._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._executor._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._executor._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) From 7e4b84abc853db628ad687ef95bc28f3fd53f2e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 26 Apr 2025 07:26:40 +0200 Subject: [PATCH 08/14] remove del --- executorlib/base/executor.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 2e6ee8b7..750d01b9 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -281,9 +281,3 @@ def __len__(self) -> int: int: The length of the executor. """ return len(self._executor) - - def __del__(self): - """ - Clean-up the resources associated with the Executor. - """ - del self._executor From 8c50b33cd46d706f36fe7b66f722a135da13eec1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 26 Apr 2025 07:59:29 +0200 Subject: [PATCH 09/14] fix exit issue --- executorlib/base/executor.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 750d01b9..9d0e5b90 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -7,7 +7,7 @@ Future, ) from threading import Thread -from typing import Callable, Optional, Union +from typing import Any, Callable, Optional, Union from executorlib.standalone.inputcheck import check_resource_dict from executorlib.standalone.queue import cancel_items_in_queue @@ -281,3 +281,20 @@ def __len__(self) -> int: int: The length of the executor. """ return len(self._executor) + + def __exit__( + self, + exc_type: Any, + exc_val: Any, + exc_tb: Any, + ) -> None: + """ + Exit method called when exiting the context manager. + + Args: + exc_type: The type of the exception. + exc_val: The exception instance. + exc_tb: The traceback object. + + """ + self._executor.__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb) \ No newline at end of file From 5be646b6a98733ea283871eb34b0789ed6ab04d7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 26 Apr 2025 05:59:40 +0000 Subject: [PATCH 10/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/base/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 9d0e5b90..4c1c18c4 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -297,4 +297,4 @@ def __exit__( exc_tb: The traceback object. """ - self._executor.__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb) \ No newline at end of file + self._executor.__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb) From 234623c48321bc8daffd554c18212fa43f70a7ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 26 Apr 2025 08:05:38 +0200 Subject: [PATCH 11/14] fixes --- executorlib/base/executor.py | 15 ++------------ tests/test_fluxjobexecutor_plot.py | 32 +++++++++++++++--------------- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 4c1c18c4..2157d344 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -282,19 +282,8 @@ def __len__(self) -> int: """ return len(self._executor) - def __exit__( - self, - exc_type: Any, - exc_val: Any, - exc_tb: Any, - ) -> None: + def __exit__(self, *args, **kwargs) -> None: """ Exit method called when exiting the context manager. - - Args: - exc_type: The type of the exception. - exc_val: The exception instance. - exc_tb: The traceback object. - """ - self._executor.__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb) + self._executor.__exit__(*args, **kwargs) diff --git a/tests/test_fluxjobexecutor_plot.py b/tests/test_fluxjobexecutor_plot.py index 1cc5ce54..de5b3a6d 100644 --- a/tests/test_fluxjobexecutor_plot.py +++ b/tests/test_fluxjobexecutor_plot.py @@ -53,12 +53,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._executor._future_hash_dict), 2) + self.assertEqual(len(exe._executor._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._executor._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._executor._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -98,12 +98,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._future_hash_dict), 7) - self.assertEqual(len(exe._task_hash_dict), 7) + self.assertEqual(len(exe._executor._future_hash_dict), 7) + self.assertEqual(len(exe._executor._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._executor._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._executor._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) @@ -124,12 +124,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) + self.assertEqual(len(exe._executor._future_hash_dict), 2) + self.assertEqual(len(exe._executor._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._executor._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._executor._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -167,12 +167,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._future_hash_dict), 7) - self.assertEqual(len(exe._task_hash_dict), 7) + self.assertEqual(len(exe._executor._future_hash_dict), 7) + self.assertEqual(len(exe._executor._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, + task_hash_dict=exe._executor._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() + v: k for k, v in exe._executor._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) From b451a6d87cff7f6cb6277f4a4c727f5cc258bd51 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 26 Apr 2025 06:05:48 +0000 Subject: [PATCH 12/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/base/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 2157d344..0921fd28 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -7,7 +7,7 @@ Future, ) from threading import Thread -from typing import Any, Callable, Optional, Union +from typing import Callable, Optional, Union from executorlib.standalone.inputcheck import check_resource_dict from executorlib.standalone.queue import cancel_items_in_queue From e53a6ca7a1a11d724aa357e1c6d625ce5bb507b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 26 Apr 2025 08:09:31 +0200 Subject: [PATCH 13/14] Rename internal variable to _task_scheduler --- executorlib/base/executor.py | 18 +++--- tests/test_fluxjobexecutor_plot.py | 32 +++++------ ...test_singlenodeexecutor_plot_dependency.py | 56 +++++++++---------- tests/test_singlenodeexecutor_resize.py | 4 +- 4 files changed, 55 insertions(+), 55 deletions(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 0921fd28..59a42322 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -190,15 +190,15 @@ class ExecutorInterface(FutureExecutor): """ def __init__(self, executor: ExecutorBase): - self._executor = executor + self._task_scheduler = executor @property def max_workers(self) -> Optional[int]: - return self._executor.max_workers + return self._task_scheduler.max_workers @max_workers.setter def max_workers(self, max_workers: int): - self._executor.max_workers = max_workers + self._task_scheduler.max_workers = max_workers @property def info(self) -> Optional[dict]: @@ -208,7 +208,7 @@ def info(self) -> Optional[dict]: Returns: Optional[dict]: Information about the executor. """ - return self._executor.info + return self._task_scheduler.info @property def future_queue(self) -> Optional[queue.Queue]: @@ -218,7 +218,7 @@ def future_queue(self) -> Optional[queue.Queue]: Returns: queue.Queue: The future queue. """ - return self._executor.future_queue + return self._task_scheduler.future_queue def submit( # type: ignore self, @@ -252,7 +252,7 @@ def submit( # type: ignore Returns: Future: A Future representing the given call. """ - return self._executor.submit( + return self._task_scheduler.submit( *([fn] + list(args)), resource_dict=resource_dict, **kwargs ) @@ -271,7 +271,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): futures. Futures that are completed or running will not be cancelled. """ - self._executor.shutdown(wait=wait, cancel_futures=cancel_futures) + self._task_scheduler.shutdown(wait=wait, cancel_futures=cancel_futures) def __len__(self) -> int: """ @@ -280,10 +280,10 @@ def __len__(self) -> int: Returns: int: The length of the executor. """ - return len(self._executor) + return len(self._task_scheduler) def __exit__(self, *args, **kwargs) -> None: """ Exit method called when exiting the context manager. """ - self._executor.__exit__(*args, **kwargs) + self._task_scheduler.__exit__(*args, **kwargs) diff --git a/tests/test_fluxjobexecutor_plot.py b/tests/test_fluxjobexecutor_plot.py index de5b3a6d..74b920f6 100644 --- a/tests/test_fluxjobexecutor_plot.py +++ b/tests/test_fluxjobexecutor_plot.py @@ -53,12 +53,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._executor._future_hash_dict), 2) - self.assertEqual(len(exe._executor._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._executor._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._executor._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -98,12 +98,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._executor._future_hash_dict), 7) - self.assertEqual(len(exe._executor._task_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._executor._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._executor._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) @@ -124,12 +124,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._executor._future_hash_dict), 2) - self.assertEqual(len(exe._executor._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._executor._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._executor._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -167,12 +167,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._executor._future_hash_dict), 7) - self.assertEqual(len(exe._executor._task_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._executor._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._executor._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) diff --git a/tests/test_singlenodeexecutor_plot_dependency.py b/tests/test_singlenodeexecutor_plot_dependency.py index a3c23d2a..fd7c50ee 100644 --- a/tests/test_singlenodeexecutor_plot_dependency.py +++ b/tests/test_singlenodeexecutor_plot_dependency.py @@ -59,12 +59,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._executor._future_hash_dict), 2) - self.assertEqual(len(exe._executor._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._executor._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._executor._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -120,12 +120,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._executor._future_hash_dict), 7) - self.assertEqual(len(exe._executor._task_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._executor._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._executor._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) @@ -137,12 +137,12 @@ def test_future_input_dict(self): return_input_dict, input_dict={"a": exe.submit(sum, [2, 2])}, ) - self.assertEqual(len(exe._executor._future_hash_dict), 2) - self.assertEqual(len(exe._executor._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._executor._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._executor._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 4) @@ -165,12 +165,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._executor._future_hash_dict), 2) - self.assertEqual(len(exe._executor._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._executor._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._executor._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -210,12 +210,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._executor._future_hash_dict), 7) - self.assertEqual(len(exe._executor._task_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._executor._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._executor._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) @@ -236,12 +236,12 @@ def test_executor_dependency_plot(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) - self.assertEqual(len(exe._executor._future_hash_dict), 2) - self.assertEqual(len(exe._executor._task_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._executor._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._executor._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 5) @@ -279,12 +279,12 @@ def test_many_to_one_plot(self): for l in lst: self.assertTrue(l.done()) self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._executor._future_hash_dict), 7) - self.assertEqual(len(exe._executor._task_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 7) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 7) nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._executor._task_hash_dict, + task_hash_dict=exe._task_scheduler._task_hash_dict, future_hash_inverse_dict={ - v: k for k, v in exe._executor._future_hash_dict.items() + v: k for k, v in exe._task_scheduler._future_hash_dict.items() }, ) self.assertEqual(len(nodes), 19) diff --git a/tests/test_singlenodeexecutor_resize.py b/tests/test_singlenodeexecutor_resize.py index c0a30924..90589aa2 100644 --- a/tests/test_singlenodeexecutor_resize.py +++ b/tests/test_singlenodeexecutor_resize.py @@ -19,7 +19,7 @@ def test_without_dependencies_decrease(self): sleep_funct(sec=0.5) exe.max_workers = 1 self.assertTrue(len(exe) >= 1) - self.assertEqual(len(exe._executor._process), 1) + self.assertEqual(len(exe._task_scheduler._process), 1) self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3) self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1]) self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) @@ -35,7 +35,7 @@ def test_without_dependencies_increase(self): exe.max_workers = 2 self.assertEqual(exe.max_workers, 2) self.assertTrue(len(exe) >= 1) - self.assertEqual(len(exe._executor._process), 2) + self.assertEqual(len(exe._task_scheduler._process), 2) self.assertEqual([f.done() for f in future_lst], [True, False, False, False]) self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1]) self.assertEqual([f.done() for f in future_lst], [True, True, True, True]) From 6a708e8ba0b59d3bcda3d71084ba0a0e1d9f42c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 26 Apr 2025 08:29:16 +0200 Subject: [PATCH 14/14] update Documentation --- notebooks/4-developer.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/notebooks/4-developer.ipynb b/notebooks/4-developer.ipynb index 14e3ed56..97f7b0c9 100644 --- a/notebooks/4-developer.ipynb +++ b/notebooks/4-developer.ipynb @@ -80,7 +80,7 @@ "Given the level of separation the integration of submodules from the standalone module in external software packages should be the easiest way to benefit from the developments in executorlib beyond just using the `Executor` class. \n", "\n", "## Interface Class Hierarchy\n", - "executorlib provides five different interfaces, namely `SingleNodeExecutor`, `SlurmClusterExecutor`, `SlurmJobExecutor`, `FluxClusterExecutor` and `FluxJobExecutor`, internally these are mapped to four types of `Executor` classes, namely `BlockAllocationExecutor`, `DependencyExecutor`, `FileExecutor` and `OneTaskPerProcessExecutor` depending on which options are selected. The dependence is illustrated in the following table:\n", + "executorlib provides five different interfaces, namely `SingleNodeExecutor`, `SlurmClusterExecutor`, `SlurmJobExecutor`, `FluxClusterExecutor` and `FluxJobExecutor`, internally these are mapped to four types of task schedulers `Executor._task_scheduler`, namely `BlockAllocationExecutor`, `DependencyExecutor`, `FileExecutor` and `OneTaskPerProcessExecutor` depending on which options are selected. The dependence is illustrated in the following table:\n", "\n", "| | `BlockAllocationExecutor` | `DependencyExecutor` | `FileExecutor` | `OneTaskPerProcessExecutor` |\n", "|-------------------------------------------------------------------------|---------------------------|--------------------------|----------------|-----------------------------|\n",