executorlib/base/executor.py (10 changes: 4 additions & 6 deletions)
@@ -27,6 +27,7 @@ def __init__(self, max_cores: Optional[int] = None):
         Initialize the ExecutorBase class.
         """
         cloudpickle_register(ind=3)
+        self._process_kwargs: dict = {}
         self._max_cores = max_cores
         self._future_queue: Optional[queue.Queue] = queue.Queue()
         self._process: Optional[Union[Thread, list[Thread]]] = None
@@ -39,16 +40,13 @@ def info(self) -> Optional[dict]:
         Returns:
             Optional[dict]: Information about the executor.
         """
+        meta_data_dict = self._process_kwargs.copy()
+        if "future_queue" in meta_data_dict:
+            del meta_data_dict["future_queue"]
         if self._process is not None and isinstance(self._process, list):
-            meta_data_dict = self._process[0]._kwargs.copy()  # type: ignore
-            if "future_queue" in meta_data_dict:
-                del meta_data_dict["future_queue"]
             meta_data_dict["max_workers"] = len(self._process)
             return meta_data_dict
         elif self._process is not None:
-            meta_data_dict = self._process._kwargs.copy()  # type: ignore
-            if "future_queue" in meta_data_dict:
-                del meta_data_dict["future_queue"]
             return meta_data_dict
         else:
             return None
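
To illustrate the pattern this change introduces: __init__ now seeds an empty self._process_kwargs, each subclass fills it with the keyword arguments it hands to its worker thread, and info() reports a copy with the internal future_queue removed (plus max_workers when several worker threads exist). Below is a minimal, self-contained sketch; ExecutorBaseSketch and DemoExecutor are hypothetical names for illustration, not executorlib APIs.

# Hypothetical sketch of the self._process_kwargs / info() pattern from this PR.
import queue
from threading import Thread
from typing import Optional, Union


class ExecutorBaseSketch:
    def __init__(self, max_cores: Optional[int] = None):
        self._process_kwargs: dict = {}  # subclasses fill this before spawning threads
        self._max_cores = max_cores
        self._future_queue: queue.Queue = queue.Queue()
        self._process: Optional[Union[Thread, list]] = None

    def info(self) -> Optional[dict]:
        # Report the stored worker kwargs, minus the internal queue object.
        meta_data_dict = self._process_kwargs.copy()
        meta_data_dict.pop("future_queue", None)
        if isinstance(self._process, list):
            meta_data_dict["max_workers"] = len(self._process)
            return meta_data_dict
        elif self._process is not None:
            return meta_data_dict
        return None


class DemoExecutor(ExecutorBaseSketch):
    def __init__(self, refresh_rate: float = 0.01):
        super().__init__()
        # Store the kwargs once, then hand the same dict to the worker thread.
        self._process_kwargs = {
            "future_queue": self._future_queue,
            "refresh_rate": refresh_rate,
        }
        self._process = Thread(target=lambda **kwargs: None, kwargs=self._process_kwargs)


print(DemoExecutor(refresh_rate=0.05).info())  # prints {'refresh_rate': 0.05}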
executorlib/cache/executor.py (21 changes: 11 additions & 10 deletions)
@@ -63,19 +63,20 @@ def __init__(
             terminate_function = terminate_subprocess
         cache_directory_path = os.path.abspath(cache_directory)
         os.makedirs(cache_directory_path, exist_ok=True)
+        self._process_kwargs = {
+            "future_queue": self._future_queue,
+            "execute_function": execute_function,
+            "cache_directory": cache_directory_path,
+            "resource_dict": resource_dict,
+            "terminate_function": terminate_function,
+            "pysqa_config_directory": pysqa_config_directory,
+            "backend": backend,
+            "disable_dependencies": disable_dependencies,
+        }
         self._set_process(
             Thread(
                 target=execute_tasks_h5,
-                kwargs={
-                    "future_queue": self._future_queue,
-                    "execute_function": execute_function,
-                    "cache_directory": cache_directory_path,
-                    "resource_dict": resource_dict,
-                    "terminate_function": terminate_function,
-                    "pysqa_config_directory": pysqa_config_directory,
-                    "backend": backend,
-                    "disable_dependencies": disable_dependencies,
-                },
+                kwargs=self._process_kwargs,
             )
         )
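
Each subclass touched by this PR follows the same consolidation: the worker keyword arguments are built once, stored on the executor, and passed to the Thread, instead of being recovered later from Thread._kwargs, a private attribute of CPython's threading module. A small stand-alone sketch of the difference; the lambda worker and all values below are illustrative, not executorlib code.

# Sketch: why storing the kwargs on the executor beats reading Thread._kwargs back.
from threading import Thread
import queue

future_queue: queue.Queue = queue.Queue()
process_kwargs = {
    "future_queue": future_queue,
    "cache_directory": "/tmp/executorlib_cache",  # illustrative value only
    "backend": None,
}

worker = Thread(target=lambda **kwargs: None, kwargs=process_kwargs)

# Old approach: info() reached into the Thread's private state.
assert worker._kwargs is process_kwargs  # relies on a CPython implementation detail

# New approach: keep the same dict on the executor and report it from there.
metadata = {k: v for k, v in process_kwargs.items() if k != "future_queue"}
print(metadata)  # {'cache_directory': '/tmp/executorlib_cache', 'backend': None}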

executorlib/interactive/executor.py (14 changes: 7 additions & 7 deletions)
@@ -40,16 +40,16 @@ def __init__(
         plot_dependency_graph_filename: Optional[str] = None,
     ) -> None:
         super().__init__(max_cores=max_cores)
+        self._process_kwargs = {
+            "future_queue": self._future_queue,
+            "executor_queue": executor._future_queue,
+            "executor": executor,
+            "refresh_rate": refresh_rate,
+        }
         self._set_process(
             Thread(
                 target=execute_tasks_with_dependencies,
-                kwargs={
-                    # Executor Arguments
-                    "future_queue": self._future_queue,
-                    "executor_queue": executor._future_queue,
-                    "executor": executor,
-                    "refresh_rate": refresh_rate,
-                },
+                kwargs=self._process_kwargs,
             )
         )
         self._future_hash_dict: dict = {}
executorlib/interactive/shared.py (14 changes: 10 additions & 4 deletions)
@@ -152,6 +152,7 @@ def __init__(
         executor_kwargs["future_queue"] = self._future_queue
         executor_kwargs["spawner"] = spawner
         executor_kwargs["queue_join_on_shutdown"] = False
+        self._process_kwargs = executor_kwargs
         self._set_process(
             process=[
                 Thread(
@@ -205,10 +206,15 @@ def __init__(
         if executor_kwargs is None:
             executor_kwargs = {}
         super().__init__(max_cores=executor_kwargs.get("max_cores"))
-        executor_kwargs["future_queue"] = self._future_queue
-        executor_kwargs["spawner"] = spawner
-        executor_kwargs["max_cores"] = max_cores
-        executor_kwargs["max_workers"] = max_workers
+        executor_kwargs.update(
+            {
+                "future_queue": self._future_queue,
+                "spawner": spawner,
+                "max_cores": max_cores,
+                "max_workers": max_workers,
+            }
+        )
+        self._process_kwargs = executor_kwargs
         self._set_process(
             Thread(
                 target=execute_separate_tasks,