From 65e88d013d52fb4a6b5d97445ebde09e3f048730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 10:06:36 +0100 Subject: [PATCH 1/4] Attach process kwargs to Executor --- executorlib/base/executor.py | 10 ++++------ executorlib/cache/executor.py | 21 +++++++++++---------- executorlib/interactive/executor.py | 14 +++++++------- executorlib/interactive/shared.py | 2 ++ 4 files changed, 24 insertions(+), 23 deletions(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 199a8841..d9efb7a3 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -27,6 +27,7 @@ def __init__(self, max_cores: Optional[int] = None): Initialize the ExecutorBase class. """ cloudpickle_register(ind=3) + self._process_kwargs = {} self._max_cores = max_cores self._future_queue: Optional[queue.Queue] = queue.Queue() self._process: Optional[Union[Thread, list[Thread]]] = None @@ -39,16 +40,13 @@ def info(self) -> Optional[dict]: Returns: Optional[dict]: Information about the executor. """ + meta_data_dict = self._process_kwargs.copy() + if "future_queue" in meta_data_dict: + del meta_data_dict["future_queue"] if self._process is not None and isinstance(self._process, list): - meta_data_dict = self._process[0]._kwargs.copy() # type: ignore - if "future_queue" in meta_data_dict: - del meta_data_dict["future_queue"] meta_data_dict["max_workers"] = len(self._process) return meta_data_dict elif self._process is not None: - meta_data_dict = self._process._kwargs.copy() # type: ignore - if "future_queue" in meta_data_dict: - del meta_data_dict["future_queue"] return meta_data_dict else: return None diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 36b29d26..27b18f8e 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -63,19 +63,20 @@ def __init__( terminate_function = terminate_subprocess cache_directory_path = os.path.abspath(cache_directory) os.makedirs(cache_directory_path, exist_ok=True) + self._process_kwargs = { + "future_queue": self._future_queue, + "execute_function": execute_function, + "cache_directory": cache_directory_path, + "resource_dict": resource_dict, + "terminate_function": terminate_function, + "pysqa_config_directory": pysqa_config_directory, + "backend": backend, + "disable_dependencies": disable_dependencies, + } self._set_process( Thread( target=execute_tasks_h5, - kwargs={ - "future_queue": self._future_queue, - "execute_function": execute_function, - "cache_directory": cache_directory_path, - "resource_dict": resource_dict, - "terminate_function": terminate_function, - "pysqa_config_directory": pysqa_config_directory, - "backend": backend, - "disable_dependencies": disable_dependencies, - }, + kwargs=self._kwargs, ) ) diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index 8d46b1bc..105799b8 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -40,16 +40,16 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, ) -> None: super().__init__(max_cores=max_cores) + self._process_kwargs = { + "future_queue": self._future_queue, + "executor_queue": executor._future_queue, + "executor": executor, + "refresh_rate": refresh_rate, + } self._set_process( Thread( target=execute_tasks_with_dependencies, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "executor_queue": executor._future_queue, - "executor": executor, - "refresh_rate": refresh_rate, - }, + kwargs=self._process_kwargs, ) ) self._future_hash_dict: dict = {} diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index f070d5ab..c174cf10 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -152,6 +152,7 @@ def __init__( executor_kwargs["future_queue"] = self._future_queue executor_kwargs["spawner"] = spawner executor_kwargs["queue_join_on_shutdown"] = False + self._process_kwargs = executor_kwargs self._set_process( process=[ Thread( @@ -209,6 +210,7 @@ def __init__( executor_kwargs["spawner"] = spawner executor_kwargs["max_cores"] = max_cores executor_kwargs["max_workers"] = max_workers + self._process_kwargs = executor_kwargs self._set_process( Thread( target=execute_separate_tasks, From 1c77bbd94ce827d6c8264842708a44931094acbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 10:09:16 +0100 Subject: [PATCH 2/4] refactor dict --- executorlib/interactive/shared.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index c174cf10..bc0e3772 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -206,10 +206,12 @@ def __init__( if executor_kwargs is None: executor_kwargs = {} super().__init__(max_cores=executor_kwargs.get("max_cores")) - executor_kwargs["future_queue"] = self._future_queue - executor_kwargs["spawner"] = spawner - executor_kwargs["max_cores"] = max_cores - executor_kwargs["max_workers"] = max_workers + executor_kwargs.update({ + "future_queue": self._future_queue, + "spawner": spawner, + "max_cores": max_cores, + "max_workers": max_workers, + }) self._process_kwargs = executor_kwargs self._set_process( Thread( From e43a51e1e02892780d3cad2c29f7f7462ce52ab5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 15 Feb 2025 09:09:27 +0000 Subject: [PATCH 3/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/interactive/shared.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index bc0e3772..4078e89b 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -206,12 +206,14 @@ def __init__( if executor_kwargs is None: executor_kwargs = {} super().__init__(max_cores=executor_kwargs.get("max_cores")) - executor_kwargs.update({ - "future_queue": self._future_queue, - "spawner": spawner, - "max_cores": max_cores, - "max_workers": max_workers, - }) + executor_kwargs.update( + { + "future_queue": self._future_queue, + "spawner": spawner, + "max_cores": max_cores, + "max_workers": max_workers, + } + ) self._process_kwargs = executor_kwargs self._set_process( Thread( From 4b714ecd90feb3ce1b4b1e14987080e09675e1e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 15 Feb 2025 10:11:21 +0100 Subject: [PATCH 4/4] fixes --- executorlib/base/executor.py | 2 +- executorlib/cache/executor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index d9efb7a3..4791fe9a 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -27,7 +27,7 @@ def __init__(self, max_cores: Optional[int] = None): Initialize the ExecutorBase class. """ cloudpickle_register(ind=3) - self._process_kwargs = {} + self._process_kwargs: dict = {} self._max_cores = max_cores self._future_queue: Optional[queue.Queue] = queue.Queue() self._process: Optional[Union[Thread, list[Thread]]] = None diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 27b18f8e..9f9582c3 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -76,7 +76,7 @@ def __init__( self._set_process( Thread( target=execute_tasks_h5, - kwargs=self._kwargs, + kwargs=self._process_kwargs, ) )