Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ class FileExecutor(ExecutorBase):
def __init__(
self,
cache_directory: str = "cache",
execute_function: callable = execute_in_subprocess,
cores_per_worker: int = 1,
cwd: Optional[str] = None,
execute_function: callable = execute_in_subprocess,
terminate_function: Optional[callable] = None,
):
"""
Expand All @@ -26,8 +26,8 @@ def __init__(
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
cwd (str/None): current working directory where the parallel python task is executed
terminate_function (callable, optional): The function to terminate the tasks.
cwd (str/None): current working directory where the parallel python task is executed
"""
super().__init__()
if execute_function == execute_in_subprocess and terminate_function is None:
Expand Down
13 changes: 9 additions & 4 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ def done(self) -> bool:
def execute_tasks_h5(
future_queue: queue.Queue,
cache_directory: str,
cores_per_worker: int,
execute_function: callable,
cwd: Optional[str],
cores_per_worker: int = 1,
cwd: Optional[str] = None,
terminate_function: Optional[callable] = None,
) -> None:
"""
Expand Down Expand Up @@ -93,11 +93,16 @@ def execute_tasks_h5(
memory_dict=memory_dict,
file_name_dict=file_name_dict,
)
resource_dict = task_dict["resource_dict"].copy()
if "cores" not in resource_dict:
resource_dict["cores"] = cores_per_worker
if "cwd" not in resource_dict:
resource_dict["cwd"] = cwd
task_key, data_dict = serialize_funct_h5(
fn=task_dict["fn"],
fn_args=task_args,
fn_kwargs=task_kwargs,
resource_dict=task_dict["resource_dict"],
resource_dict=resource_dict,
)
if task_key not in memory_dict.keys():
if task_key + ".h5out" not in os.listdir(cache_directory):
Expand All @@ -111,7 +116,7 @@ def execute_tasks_h5(
task_dependent_lst=[
process_dict[k] for k in future_wait_key_lst
],
cwd=cwd,
resource_dict=resource_dict,
)
file_name_dict[task_key] = os.path.join(
cache_directory, task_key + ".h5out"
Expand Down
13 changes: 10 additions & 3 deletions executorlib/standalone/cache/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
def execute_in_subprocess(
command: list,
task_dependent_lst: list = [],
cwd: Optional[str] = None,
resource_dict: Optional[dict] = None,
) -> subprocess.Popen:
"""
Execute a command in a subprocess.

Args:
command (list): The command to be executed.
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
cwd (str/None): current working directory where the parallel python task is executed
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
Example resource dictionary: {
cwd: None,
}

Returns:
subprocess.Popen: The subprocess object.
Expand All @@ -24,7 +27,11 @@ def execute_in_subprocess(
task_dependent_lst = [
task for task in task_dependent_lst if task.poll() is None
]
return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
if resource_dict is None:
resource_dict = {"cwd": None}
elif len(resource_dict) == 0:
resource_dict = {"cwd": None}
return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"])
Comment on lines +30 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify conditions and future-proof resource handling.

The current implementation can be simplified and made more robust.

Consider these improvements:

-    if resource_dict is None:
-        resource_dict = {"cwd": None}
-    elif len(resource_dict) == 0:
-        resource_dict = {"cwd": None}
+    if resource_dict is None or len(resource_dict) == 0:
+        resource_dict = {"cwd": None}
+    # Create a copy to avoid modifying the input dictionary
+    resource_dict = resource_dict.copy()

Additionally, consider defining a set of supported resource keys as constants to make the code more maintainable as new resource types are added in the future:

SUPPORTED_RESOURCES = {"cwd"}
🧰 Tools
🪛 Ruff

30-33: Combine if branches using logical or operator

Combine if branches

(SIM114)



def terminate_subprocess(task):
Expand Down
3 changes: 0 additions & 3 deletions tests/test_cache_executor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def test_executor_function(self):
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
"terminate_function": terminate_subprocess,
},
)
Expand Down Expand Up @@ -115,7 +114,6 @@ def test_executor_function_dependence_kwargs(self):
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
"terminate_function": terminate_subprocess,
},
)
Expand Down Expand Up @@ -157,7 +155,6 @@ def test_executor_function_dependence_args(self):
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
"terminate_function": terminate_subprocess,
},
)
Expand Down
Loading