diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 199a8841..4791fe9a 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -27,6 +27,7 @@ def __init__(self, max_cores: Optional[int] = None): Initialize the ExecutorBase class. """ cloudpickle_register(ind=3) + self._process_kwargs: dict = {} self._max_cores = max_cores self._future_queue: Optional[queue.Queue] = queue.Queue() self._process: Optional[Union[Thread, list[Thread]]] = None @@ -39,16 +40,13 @@ def info(self) -> Optional[dict]: Returns: Optional[dict]: Information about the executor. """ + meta_data_dict = self._process_kwargs.copy() + if "future_queue" in meta_data_dict: + del meta_data_dict["future_queue"] if self._process is not None and isinstance(self._process, list): - meta_data_dict = self._process[0]._kwargs.copy() # type: ignore - if "future_queue" in meta_data_dict: - del meta_data_dict["future_queue"] meta_data_dict["max_workers"] = len(self._process) return meta_data_dict elif self._process is not None: - meta_data_dict = self._process._kwargs.copy() # type: ignore - if "future_queue" in meta_data_dict: - del meta_data_dict["future_queue"] return meta_data_dict else: return None diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 36b29d26..9f9582c3 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -63,19 +63,20 @@ def __init__( terminate_function = terminate_subprocess cache_directory_path = os.path.abspath(cache_directory) os.makedirs(cache_directory_path, exist_ok=True) + self._process_kwargs = { + "future_queue": self._future_queue, + "execute_function": execute_function, + "cache_directory": cache_directory_path, + "resource_dict": resource_dict, + "terminate_function": terminate_function, + "pysqa_config_directory": pysqa_config_directory, + "backend": backend, + "disable_dependencies": disable_dependencies, + } self._set_process( Thread( target=execute_tasks_h5, - kwargs={ - "future_queue": self._future_queue, - "execute_function": execute_function, - "cache_directory": cache_directory_path, - "resource_dict": resource_dict, - "terminate_function": terminate_function, - "pysqa_config_directory": pysqa_config_directory, - "backend": backend, - "disable_dependencies": disable_dependencies, - }, + kwargs=self._process_kwargs, ) ) diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index 8d46b1bc..105799b8 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -40,16 +40,16 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, ) -> None: super().__init__(max_cores=max_cores) + self._process_kwargs = { + "future_queue": self._future_queue, + "executor_queue": executor._future_queue, + "executor": executor, + "refresh_rate": refresh_rate, + } self._set_process( Thread( target=execute_tasks_with_dependencies, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "executor_queue": executor._future_queue, - "executor": executor, - "refresh_rate": refresh_rate, - }, + kwargs=self._process_kwargs, ) ) self._future_hash_dict: dict = {} diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index f070d5ab..4078e89b 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -152,6 +152,7 @@ def __init__( executor_kwargs["future_queue"] = self._future_queue executor_kwargs["spawner"] = spawner executor_kwargs["queue_join_on_shutdown"] = False + self._process_kwargs = executor_kwargs self._set_process( process=[ Thread( @@ -205,10 +206,15 @@ def __init__( if executor_kwargs is None: executor_kwargs = {} super().__init__(max_cores=executor_kwargs.get("max_cores")) - executor_kwargs["future_queue"] = self._future_queue - executor_kwargs["spawner"] = spawner - executor_kwargs["max_cores"] = max_cores - executor_kwargs["max_workers"] = max_workers + executor_kwargs.update( + { + "future_queue": self._future_queue, + "spawner": spawner, + "max_cores": max_cores, + "max_workers": max_workers, + } + ) + self._process_kwargs = executor_kwargs self._set_process( Thread( target=execute_separate_tasks,