Merged
9 changes: 6 additions & 3 deletions executorlib/task_scheduler/file/shared.py
@@ -52,7 +52,6 @@ def done(self) -> bool:

 def execute_tasks_h5(
     future_queue: queue.Queue,
-    cache_directory: str,
     execute_function: Callable,
     resource_dict: dict,
     terminate_function: Optional[Callable] = None,
@@ -65,7 +64,6 @@ def execute_tasks_h5(

     Args:
         future_queue (queue.Queue): The queue containing the tasks.
-        cache_directory (str): The directory to store the HDF5 files.
         resource_dict (dict): A dictionary of resources required by the task. With the following keys:
             - cores (int): number of MPI cores to be used for each function call
             - cwd (str/None): current working directory where the parallel python task is executed
@@ -81,6 +79,7 @@ def execute_tasks_h5(
     """
     memory_dict: dict = {}
     process_dict: dict = {}
+    cache_dir_dict: dict = {}
     file_name_dict: dict = {}
     while True:
         task_dict = None
@@ -104,6 +103,7 @@ def execute_tasks_h5(
                 {k: v for k, v in resource_dict.items() if k not in task_resource_dict}
             )
             cache_key = task_resource_dict.pop("cache_key", None)
+            cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory"))
             task_key, data_dict = serialize_funct_h5(
                 fn=task_dict["fn"],
                 fn_args=task_args,
@@ -147,11 +147,14 @@ def execute_tasks_h5(
                     cache_directory, task_key + "_o.h5"
                 )
                 memory_dict[task_key] = task_dict["future"]
+                cache_dir_dict[task_key] = cache_directory
                 future_queue.task_done()
         else:
             memory_dict = {
                 key: _check_task_output(
-                    task_key=key, future_obj=value, cache_directory=cache_directory
+                    task_key=key,
+                    future_obj=value,
+                    cache_directory=cache_dir_dict[key],
                 )
                 for key, value in memory_dict.items()
                 if not value.done()
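With this change, execute_tasks_h5 no longer takes one global cache directory: each task's resource_dict carries its own, and the worker records it per task key so pending futures are checked against the right location. A minimal sketch of that bookkeeping (the task key and resource dictionary below are made up; only the pop/abspath/cache_dir_dict pattern comes from the diff):

import os

# Illustrative per-task cache bookkeeping mirroring the diff above.
resource_dict = {"cores": 1, "cwd": None, "cache_directory": "executorlib_cache"}
cache_dir_dict: dict = {}  # task_key -> absolute cache path

task_resource_dict = dict(resource_dict)  # per-task copy, may carry overrides
# The cache directory travels inside resource_dict and is resolved per task:
cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory"))

task_key = "fn_0a1b2c"  # hypothetical; executorlib derives this via serialize_funct_h5
cache_dir_dict[task_key] = cache_directory

# Later polls look up the directory recorded for each task:
print(task_key, "->", cache_dir_dict[task_key])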
16 changes: 6 additions & 10 deletions executorlib/task_scheduler/file/task_scheduler.py
@@ -1,4 +1,3 @@
-import os
 from threading import Thread
 from typing import Callable, Optional

@@ -27,7 +26,6 @@
 class FileTaskScheduler(TaskSchedulerBase):
     def __init__(
         self,
-        cache_directory: str = "executorlib_cache",
         resource_dict: Optional[dict] = None,
         execute_function: Callable = execute_with_pysqa,
         terminate_function: Optional[Callable] = None,
@@ -39,10 +37,10 @@ def __init__(
         Initialize the FileExecutor.

         Args:
-            cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
             resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                 - cores (int): number of MPI cores to be used for each function call
                 - cwd (str/None): current working directory where the parallel python task is executed
+                - cache_directory (str): The directory to store cache files.
             execute_function (Callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
             terminate_function (Callable, optional): The function to terminate the tasks.
             pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
@@ -53,6 +51,7 @@ def __init__(
         default_resource_dict = {
             "cores": 1,
             "cwd": None,
+            "cache_directory": "executorlib_cache",
         }
         if resource_dict is None:
             resource_dict = {}
@@ -61,12 +60,10 @@ def __init__(
         )
         if execute_function == execute_in_subprocess and terminate_function is None:
            terminate_function = terminate_subprocess
-        cache_directory_path = os.path.abspath(cache_directory)
         self._process_kwargs = {
+            "resource_dict": resource_dict,
             "future_queue": self._future_queue,
             "execute_function": execute_function,
-            "cache_directory": cache_directory_path,
-            "resource_dict": resource_dict,
             "terminate_function": terminate_function,
             "pysqa_config_directory": pysqa_config_directory,
             "backend": backend,
@@ -81,11 +78,11 @@


 def create_file_executor(
+    resource_dict: dict,

💡 Verification agent

🧩 Analysis chain

Verify the breaking change is intentional and documented.

The change from resource_dict: Optional[dict] = None to resource_dict: dict makes this parameter required, which is a breaking change. This should be intentional and properly documented.


🏁 Script executed:

#!/bin/bash
# Description: Check if this breaking change is mentioned in documentation or changelog
# Expected: Find references to the API change in documentation files

# Search for documentation about this change
fd -e md -e rst -e txt -e py --exec grep -l "resource_dict" {} \; | head -10
echo "---"
# Look for changelog or migration docs
fd -e md -e rst -e txt -e py -I --exec grep -l -i "breaking\|changelog\|migration" {} \; | head -5

Length of output: 608


Add documentation for the breaking change to resource_dict

I didn’t find any mentions of this API change in your documentation or changelog. Since making resource_dict required is a breaking change, please:

  • Add an entry under a “Breaking Changes” section in your CHANGELOG (e.g. docs/CHANGELOG.md), noting that
    executorlib/task_scheduler/file/task_scheduler.py: create_file_executor now requires resource_dict: dict.
  • Update any migration guide or README to reflect the new required parameter.
  • Consider adding a docstring note in executorlib/task_scheduler/file/task_scheduler.py (around line 80) calling out the parameter requirement.
🤖 Prompt for AI Agents
In executorlib/task_scheduler/file/task_scheduler.py around line 80, the
resource_dict parameter of create_file_executor is now required, which is a
breaking change. Add an entry under a "Breaking Changes" section in
docs/CHANGELOG.md describing this new requirement. Update any migration guides
or README files to mention the new required resource_dict parameter. Also, add
a note in the function's docstring near line 80 to document that resource_dict
is now a mandatory argument.

     max_workers: Optional[int] = None,
     backend: str = "flux_submission",
     max_cores: Optional[int] = None,
     cache_directory: Optional[str] = None,
-    resource_dict: Optional[dict] = None,
     flux_executor=None,
     flux_executor_pmi_mode: Optional[str] = None,
     flux_executor_nesting: bool = False,
@@ -96,8 +93,6 @@ def create_file_executor(
     init_function: Optional[Callable] = None,
     disable_dependencies: bool = False,
 ):
-    if cache_directory is None:
-        cache_directory = "executorlib_cache"
     if block_allocation:
         raise ValueError(
             "The option block_allocation is not available with the pysqa based backend."
@@ -106,14 +101,15 @@ def create_file_executor(
         raise ValueError(
             "The option to specify an init_function is not available with the pysqa based backend."
         )
+    if cache_directory is not None:
+        resource_dict["cache_directory"] = cache_directory
     check_flux_executor_pmi_mode(flux_executor_pmi_mode=flux_executor_pmi_mode)
     check_max_workers_and_cores(max_cores=max_cores, max_workers=max_workers)
     check_hostname_localhost(hostname_localhost=hostname_localhost)
     check_executor(executor=flux_executor)
     check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
     check_flux_log_files(flux_log_files=flux_log_files)
     return FileTaskScheduler(
-        cache_directory=cache_directory,
         resource_dict=resource_dict,
         pysqa_config_directory=pysqa_config_directory,
         backend=backend.split("_submission")[0],
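Net effect for callers: the cache location is configured through resource_dict, and create_file_executor now requires resource_dict explicitly. A before/after sketch of the call shape (import path taken from this diff; actually constructing the executor needs a working pysqa or flux backend, so treat this as shape only):

from executorlib.task_scheduler.file.task_scheduler import create_file_executor

# Before this PR: resource_dict was optional and cache_directory was a
# separate argument defaulting to "executorlib_cache".
# create_file_executor(cache_directory="executorlib_cache")

# After this PR: resource_dict is required; a cache_directory keyword, if
# given, is folded into resource_dict["cache_directory"], and the default
# "executorlib_cache" now lives in FileTaskScheduler's default_resource_dict.
executor = create_file_executor(
    resource_dict={"cores": 1, "cwd": None},
    cache_directory="my_cache",  # becomes resource_dict["cache_directory"]
)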
15 changes: 7 additions & 8 deletions tests/test_cache_fileexecutor_serial.py
@@ -58,10 +58,12 @@ def test_executor_dependence_mixed(self):
         self.assertTrue(fs2.done())

     def test_create_file_executor_error(self):
+        with self.assertRaises(TypeError):
+            create_file_executor()
         with self.assertRaises(ValueError):
-            create_file_executor(block_allocation=True)
+            create_file_executor(block_allocation=True, resource_dict={})
         with self.assertRaises(ValueError):
-            create_file_executor(init_function=True)
+            create_file_executor(init_function=True, resource_dict={})

     def test_executor_dependence_error(self):
         with self.assertRaises(ValueError):
@@ -106,9 +108,8 @@ def test_executor_function(self):
             target=execute_tasks_h5,
             kwargs={
                 "future_queue": q,
-                "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "resource_dict": {"cores": 1, "cwd": None},
+                "resource_dict": {"cores": 1, "cwd": None, "cache_directory": cache_dir},
                 "terminate_function": terminate_subprocess,
             },
         )
@@ -147,9 +148,8 @@ def test_executor_function_dependence_kwargs(self):
             target=execute_tasks_h5,
             kwargs={
                 "future_queue": q,
-                "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "resource_dict": {"cores": 1, "cwd": None},
+                "resource_dict": {"cores": 1, "cwd": None, "cache_directory": cache_dir},
                 "terminate_function": terminate_subprocess,
             },
         )
@@ -188,9 +188,8 @@ def test_executor_function_dependence_args(self):
             target=execute_tasks_h5,
             kwargs={
                 "future_queue": q,
-                "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "resource_dict": {"cores": 1},
+                "resource_dict": {"cores": 1, "cache_directory": cache_dir},
                 "terminate_function": terminate_subprocess,
             },
         )
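The test updates all follow the same substitution: cache_directory moves out of the thread kwargs and into resource_dict. Condensed to its essentials, spawning the worker now looks like this (the spawner import path is an assumption; adjust it to whatever module the tests actually import execute_in_subprocess and terminate_subprocess from):

import queue
from threading import Thread

from executorlib.task_scheduler.file.shared import execute_tasks_h5
# Assumed location of the subprocess helpers used by the tests:
from executorlib.task_scheduler.file.subprocess_spawner import (
    execute_in_subprocess,
    terminate_subprocess,
)

q: queue.Queue = queue.Queue()
process = Thread(
    target=execute_tasks_h5,
    kwargs={
        "future_queue": q,
        "execute_function": execute_in_subprocess,
        # cache_directory now rides inside resource_dict:
        "resource_dict": {"cores": 1, "cwd": None, "cache_directory": "cache"},
        "terminate_function": terminate_subprocess,
    },
)
process.start()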