
Conversation

jan-janssen (Member) commented Feb 15, 2025

Summary by CodeRabbit

  • New Features
    • Introduced new executor options, including BlockAllocationExecutor, OneTaskPerProcessExecutor, and DependencyExecutor, enhancing parallel and dependency-based task execution for improved performance in environments like Jupyter notebooks.
  • Refactor
    • Streamlined the task execution framework by removing legacy components and standardizing executor interfaces and naming for more consistent usage.
  • Tests
    • Updated test suites to validate the new executors and execution workflows, ensuring reliable and stable functionality for task management.

coderabbitai bot (Contributor) commented Feb 15, 2025

Important

Review skipped

Review was skipped as selected files did not have any reviewable changes.

💤 Files selected but had no reviewable changes (1)
  • executorlib/interactive/fluxspawner.py


Walkthrough

This pull request refactors and extends the executor framework. It introduces new executor classes (BlockAllocationExecutor, DependencyExecutor, which renames the old ExecutorWithDependencies, and OneTaskPerProcessExecutor) with methods for task submission, dependency handling, and shutdown. Additionally, legacy executors and methods are removed or renamed (e.g. execute_parallel_tasks → execute_tasks), and interface modules as well as related tests are updated to reflect the new executor implementations and control flows.

Changes

  • executorlib/interactive/blockallocation.py, executorlib/interactive/dependency.py, executorlib/interactive/onetoone.py: Added new executor classes: BlockAllocationExecutor, DependencyExecutor (renamed from ExecutorWithDependencies), and OneTaskPerProcessExecutor. Introduced methods such as __init__, submit, shutdown, _set_process, _execute_tasks_with_dependencies, and _update_waiting_task to handle task execution and dependency management.
  • executorlib/interactive/shared.py: Removed executor classes and methods: ExecutorBroker, InteractiveExecutor, InteractiveStepExecutor; eliminated methods such as submit, shutdown, _set_process, execute_separate_tasks, _wait_for_free_slots, _submit_waiting_task, and _submit_function_to_separate_process. Renamed execute_parallel_tasks to execute_tasks.
  • executorlib/interfaces/... (flux.py, single.py, slurm.py): Updated return types and class references, replacing legacy executors with the new ones (ExecutorWithDependencies → DependencyExecutor, InteractiveExecutor → BlockAllocationExecutor, InteractiveStepExecutor → OneTaskPerProcessExecutor). Also updated function signatures (e.g. validate_max_workers_flux → validate_max_workers).
  • tests/... (test_dependencies_executor.py, test_flux_executor.py, test_local_executor.py, test_local_executor_future.py, test_shell_executor.py, test_shell_interactive.py): Updated tests to reflect the renaming of execute_parallel_tasks to execute_tasks and the replacement of the legacy executor classes with the new implementations, ensuring consistency across the execution and dependency-management pipelines.

Sequence Diagram(s)

BlockAllocationExecutor Task Submission Flow

sequenceDiagram
  participant User
  participant Executor as BlockAllocationExecutor
  participant Worker
  participant Future
  User->>Executor: submit(task, args, resource_dict)
  Executor->>Future: Return Future object
  Executor->>Worker: Launch worker thread (_set_process)
  Worker->>Executor: Execute task
  Worker->>Future: Complete Future with result
  Executor->>User: Return eventual result

DependencyExecutor Task Dependency Handling

sequenceDiagram
  participant Queue as FutureQueue
  participant Executor as DependencyExecutor
  participant Updater as _update_waiting_task
  participant Task
  Queue-->>Executor: Enqueue tasks with dependencies
  Executor->>Executor: _execute_tasks_with_dependencies (process tasks)
  Executor->>Task: Check dependency readiness
  Task-->>Executor: Return result/exception
  Executor->>Updater: Update waiting tasks if needed

Possibly related PRs

  • Debug error handling #577: Enhances error handling in task submission that directly impacts the new submit method in the BlockAllocationExecutor class.

Poem

In the meadow of code, I happily hop,
New executors spring up with a joyful plop.
BlockAllocation, Dependency, OneTask in line,
They manage tasks so gracefully fine.
With whiskers twitching at every change,
This rabbit cheers as we rearrange!
🐇✨



coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

🔭 Outside diff range comments (1)
executorlib/interactive/shared.py (1)

102-124: 🛠️ Refactor suggestion

Ensure consistent exception propagation.

In _execute_task_without_cache, if an exception occurs, you mark the Future with that exception and shut down the interface but do not re-raise. Compare this to _execute_task_with_cache, which re-raises exceptions. Consider unifying these behaviors for consistency, if continued processing after a single task failure is not intended.

🧹 Nitpick comments (17)
executorlib/interactive/blockallocation.py (1)

15-46: Docstring references the old executor class name.

Within the docstring, multiple references to InteractiveExecutor may confuse users since the class is now defined as BlockAllocationExecutor. Consider clarifying or renaming these references to match the new class.

executorlib/interactive/shared.py (2)

17-76: Refine error handling in the main execution loop.

  1. The docstring states “Execute a single tasks in parallel,” which may be grammatically incorrect or incomplete. Consider clarifying the wording to match its parallel-processing capabilities.
  2. When an exception occurs during task execution, the entire interface is shut down. This might be too aggressive if the intent is to continue processing other tasks in the queue. Evaluate whether a per-task failure strategy is more appropriate.

126-174: Prevent partial shutdown on single-task failure.

When _execute_task_with_cache encounters an exception, the interface is shut down and the exception is bubbled up. Verify that this design aligns with production needs. If you would like partial tolerance (other tasks still proceed), consider removing the shutdown call or making it configurable.

executorlib/interactive/dependency.py (3)

21-67: Rename class references for clarity.

The docstring within the DependencyExecutor class refers to “ExecutorWithDependencies.” Update these references to match the renamed class for consistency.


56-59: Single-thread approach is straightforward.

Launching a single thread that processes dependent tasks in an event loop is clear. This design avoids complex concurrency but might become a bottleneck if many tasks arrive quickly. Monitor performance if expected throughput is high.


142-203: Efficiently handle dependencies.

The _execute_tasks_with_dependencies function effectively manages a queue of tasks with dependencies, storing unsatisfied tasks in wait_lst. However, repeatedly polling for tasks and sleeping can degrade performance for large volumes. If performance is a concern, consider replacing sleep-based polling with a more event-driven approach.
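One event-driven alternative to sleep-based polling is to hang a done-callback on each dependency Future and submit the task once the last one fires. A sketch under that assumption (the `submit` callable and the task object are placeholders, not executorlib API):

```python
import threading
from concurrent.futures import Future

def schedule_when_ready(task, dependencies, submit):
    """Submit `task` once every Future in `dependencies` completes,
    without polling; callbacks fire in the thread resolving each Future."""
    remaining = len(dependencies)
    lock = threading.Lock()
    if remaining == 0:
        submit(task)
        return

    def _on_done(_future):
        nonlocal remaining
        with lock:
            remaining -= 1
            ready = remaining == 0
        if ready:
            submit(task)

    for dep in dependencies:
        dep.add_done_callback(_on_done)  # fires immediately if already done

done = []
a, b = Future(), Future()
schedule_when_ready("task-1", [a, b], done.append)
a.set_result(1)
b.set_result(2)
print(done)  # prints ['task-1']
```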

executorlib/interactive/onetoone.py (6)

1-3: Consider adding module-level docstring.
While you have comprehensive docstrings for the class and functions, adding a short module-level docstring explaining this file’s purpose (i.e., an executor that spawns tasks in individual processes or threads) would improve discoverability and maintainability.


10-40: Ensure the docstring correctly reflects class behavior.
The docstring references executorlib.interactive.executor.InteractiveStepExecutor for comparison, but that class is not in this file. It may cause confusion if someone reads this docstring in isolation. Consider clarifying that this class is newly introduced and how it compares or replaces other executors in the refactored architecture.


42-66: Validate max_cores vs. max_workers in constructor.
The constructor sets super().__init__(max_cores=executor_kwargs.get("max_cores")) but also merges user-supplied values for max_cores and max_workers in the executor_kwargs dict. If a user provides conflicting numbers, you might want to log a warning or validate consistency to avoid unintended resource usage.
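If the two knobs are meant to describe the same capacity, a lightweight consistency check at construction time avoids silent surprises. A hedged sketch (the helper name and the precedence message are assumptions for illustration, not executorlib code):

```python
import warnings

def check_worker_core_consistency(max_workers=None, max_cores=None, cores_per_worker=1):
    """Warn when user-supplied max_workers and max_cores disagree;
    illustrative helper, not part of executorlib."""
    if max_workers is None or max_cores is None:
        return  # nothing to cross-check
    if max_workers * cores_per_worker != max_cores:
        warnings.warn(
            f"max_workers * cores_per_worker ({max_workers * cores_per_worker}) "
            f"differs from max_cores ({max_cores}); check which value should win."
        )
```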


69-94: Minor grammar fix in docstring.
The docstring says “Execute a single tasks in parallel,” which is ungrammatical. Update “tasks” to “task.”

Apply this diff to fix it:

-    Execute a single tasks in parallel using the message passing interface (MPI).
+    Execute a single task in parallel using the message passing interface (MPI).

95-122: Potential busy loop when waiting for shutdown.
Currently, the while loop waits for new tasks indefinitely and breaks only upon receiving "shutdown". If the queue is empty for long periods, this thread will remain idle, continuously calling future_queue.get() in a blocking manner. Although blocking calls are typically okay, consider adding a small timeout or logging to handle graceful period inactivity or give user feedback.
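A timeout on the blocking get keeps the worker responsive without busy-waiting; the sketch below adds an optional idle hook for logging or flag checks (names are illustrative, not the module's actual signature):

```python
import queue

def worker_loop(future_queue, poll_timeout=5.0, on_idle=None):
    """Block on the queue, but wake every poll_timeout seconds so the
    thread can report idleness or check a stop flag."""
    while True:
        try:
            task = future_queue.get(timeout=poll_timeout)
        except queue.Empty:
            if on_idle is not None:
                on_idle()  # e.g. log "worker idle" or test a shutdown flag
            continue
        if task == "shutdown":
            future_queue.task_done()
            break
        # ... execute the task here ...
        future_queue.task_done()
```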


125-155: Avoid tight polling for resource availability.
Inside _wait_for_free_slots, the code repeatedly rebuilds active_task_dict until enough resources are freed. This can spin tightly if tasks are long-running. Consider adding a brief sleep interval or wait-notification approach (e.g., condition variables) to reduce CPU overhead while waiting for resources.
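A condition-variable version of the slot bookkeeping could look like this, sketched with a hypothetical SlotPool class (not executorlib's actual data structure):

```python
import threading

class SlotPool:
    """Track free cores; waiters sleep on a Condition instead of spinning."""
    def __init__(self, total_cores):
        self._free = total_cores
        self._cond = threading.Condition()

    def acquire(self, cores):
        with self._cond:
            while self._free < cores:
                self._cond.wait()  # no CPU burned while waiting
            self._free -= cores

    def release(self, cores):
        with self._cond:
            self._free += cores
            self._cond.notify_all()  # wake every waiter to re-check
```

Workers would call release(cores) when a task finishes, which wakes any submitter blocked in acquire instead of leaving it to rebuild active_task_dict in a loop.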

executorlib/interfaces/single.py (2)

202-203: Update docstring for return type.
The docstring at line 235 states "Returns: InteractiveStepExecutor/ InteractiveExecutor" but the function now returns either OneTaskPerProcessExecutor or BlockAllocationExecutor. Consider aligning the docstring’s "Returns" section with the new classes.


267-272: Consider unifying parameter name usage.
OneTaskPerProcessExecutor accepts both max_cores and max_workers. Here you pass them. Make sure code references (and docstrings) consistently mention how these two parameters interact (i.e., if they are synonyms in practice, or if they each serve a unique purpose).

executorlib/interfaces/flux.py (2)

428-429: Align return docstring with updated executor types.
Similar to other files, the docstring on line 465 references “InteractiveStepExecutor/ InteractiveExecutor.” Update it to “OneTaskPerProcessExecutor / BlockAllocationExecutor” to reflect actual return values.


505-510: OneTaskPerProcessExecutor flux semantics.
Returning OneTaskPerProcessExecutor in a flux context is interesting, as Flux typically handles multi-rank MPI tasks. Verify that the user experience is correct when mixing flux with a single-process executor.

tests/test_dependencies_executor.py (1)

9-9: Consider the implications of using a private function in tests.

The change from execute_tasks_with_dependencies to _execute_tasks_with_dependencies makes the function private (indicated by the underscore prefix). While the test coverage is maintained, consider either:

  1. Making the function public if it's part of the API
  2. Moving these tests to a private test suite if they're testing implementation details

Also applies to: 93-94, 145-146, 199-200

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3befb3f and 3bf183e.

📒 Files selected for processing (13)
  • executorlib/interactive/blockallocation.py (1 hunks)
  • executorlib/interactive/dependency.py (3 hunks)
  • executorlib/interactive/onetoone.py (1 hunks)
  • executorlib/interactive/shared.py (3 hunks)
  • executorlib/interfaces/flux.py (6 hunks)
  • executorlib/interfaces/single.py (5 hunks)
  • executorlib/interfaces/slurm.py (5 hunks)
  • tests/test_dependencies_executor.py (4 hunks)
  • tests/test_flux_executor.py (8 hunks)
  • tests/test_local_executor.py (29 hunks)
  • tests/test_local_executor_future.py (5 hunks)
  • tests/test_shell_executor.py (4 hunks)
  • tests/test_shell_interactive.py (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • tests/test_shell_executor.py
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: unittest_openmpi (macos-latest, 3.13)
  • GitHub Check: unittest_win
  • GitHub Check: unittest_mpich (macos-latest, 3.13)
🔇 Additional comments (21)
executorlib/interactive/blockallocation.py (2)

109-135: Orderly shutdown process looks good.

The shutdown logic cleanly signals all worker threads, joins them if requested, and clears state references to avoid future submissions. This approach is straightforward and meets typical executor shutdown needs.


136-146: Process initialization is clearly structured.

Using _set_process to encapsulate thread creation and starting the threads immediately is an understandable design. It keeps important concurrency concerns centralized. No changes required here.

executorlib/interactive/dependency.py (1)

205-230: Dependency resolution logic is clean.

The _update_waiting_task helper function correctly checks dependencies, updates arguments, and schedules tasks once dependencies are met. The usage of check_exception_was_raised and get_exception_lst ensures robust error handling for dependent tasks.

executorlib/interactive/onetoone.py (1)

158-222: Clarify resource override logic.
In _wrap_execute_task_in_separate_process, if resource_dict["cores"] is 1 and executor_kwargs["cores"] is ≥ 1, you override the resource dict to use executor_kwargs["cores"]. This could inadvertently override an intentionally set value of 1. If this is by design, document it; if not, add strict logic or checks to confirm which value should prevail in scenarios where both are set.
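One explicit way to make the precedence unambiguous is a sentinel that distinguishes "not set" from an intentional 1 (the sentinel and helper are illustrative, not the module's current logic):

```python
_UNSET = object()  # sentinel: "user gave no value", distinct from an explicit 1

def resolve_cores(task_cores=_UNSET, executor_cores=_UNSET, default=1):
    """Per-task setting wins over the executor-wide one; fall back to the
    default only when neither was given. Names are illustrative."""
    if task_cores is not _UNSET:
        return task_cores
    if executor_cores is not _UNSET:
        return executor_cores
    return default

print(resolve_cores(task_cores=1, executor_cores=4))  # prints 1
```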

executorlib/interfaces/single.py (3)

3-5: Imports align with refactored executors.
The removal of old executor imports and addition of BlockAllocationExecutor, DependencyExecutor, and OneTaskPerProcessExecutor is consistent with the new architecture.


164-165: Confirm user intentions when dependencies are disabled.
If disable_dependencies is False, you wrap the returned executor with DependencyExecutor. Otherwise, you return a different executor. This is correct per the design; however, ensure that your user-facing docs clarify that disabling dependencies completely omits the dependency resolution path.


256-256: Ensure correct block allocation usage.
When returning BlockAllocationExecutor, all resources must be defined on the executor, rather than in subsequent calls. Confirm that this usage is thoroughly documented or validated for new users unfamiliar with the block-allocation design.

executorlib/interfaces/flux.py (5)

3-5: Imports updated to new executor classes.
Replacing old classes with BlockAllocationExecutor, DependencyExecutor, and OneTaskPerProcessExecutor is consistent with the broader refactoring. The approach looks coherent.


17-17: Check for fallback or error handling in case flux-base is absent.
If the flux-based modules are not installed, you catch ImportError. Make sure to degrade gracefully if Flux is unavailable at runtime, so that usage paths that do not require Flux can still proceed.


187-188: DependencyExecutor layering.
When disable_dependencies is False, you wrap the returned executor with DependencyExecutor. Ensure the underlying code path doesn’t inadvertently rewrap or create circular references if chaining multiple executors in different places.


494-495: Validate concurrency constraints with validate_max_workers.
You call validate_number_of_cores and then validate_max_workers while creating a BlockAllocationExecutor. Confirm these validations are in sync so that a mismatch (e.g., max_workers < cores_per_worker) is caught once, with a consistent error message.


499-499: Block allocation usage consistency.
Returning BlockAllocationExecutor with spawner=FluxPythonSpawner is indeed aligned with block allocation under Flux. Just ensure that if the user sets partial resource requirements in subsequent calls, an appropriate exception is raised rather than partially fulfilling those requests.

tests/test_shell_interactive.py (1)

9-9: LGTM! The refactoring maintains test coverage.

The change from execute_parallel_tasks to execute_tasks is consistent with the broader refactoring while preserving the same test behavior and assertions.

Also applies to: 91-97

tests/test_local_executor_future.py (1)

8-8: LGTM! The executor refactoring maintains test coverage.

The change from InteractiveExecutor to BlockAllocationExecutor is consistent across all test cases while preserving the same test behavior and assertions.

Also applies to: 21-25, 37-41, 70-74, 111-115

tests/test_flux_executor.py (1)

8-9: LGTM! The executor refactoring maintains test coverage.

The changes are consistent with the broader refactoring:

  1. Renamed execute_parallel_tasks to execute_tasks
  2. Replaced InteractiveExecutor with BlockAllocationExecutor

All test cases preserve their behavior and assertions.

Also applies to: 51-55, 64-71, 80-88, 94-102, 115-120, 130-136, 141-149

tests/test_local_executor.py (3)

12-14: LGTM! The imports have been updated to reflect the new executor classes.

The imports have been correctly updated to use the new executor classes from their respective modules.


67-68: LGTM! Consistent replacement of executor classes across test cases.

The old executor classes have been consistently replaced with their new counterparts while maintaining the same test scenarios and functionality:

  • InteractiveExecutor → BlockAllocationExecutor
  • InteractiveStepExecutor → OneTaskPerProcessExecutor

Also applies to: 81-82, 97-98, 111-112, 130-131, 141-142, 176-177, 187-188, 219-220, 260-261, 288-289, 314-315, 373-374, 393-394, 411-412


247-248: LGTM! Consistent replacement of task execution function.

The execute_parallel_tasks function has been consistently replaced with execute_tasks across all test cases.

Also applies to: 429-430, 445-446, 460-461, 478-479, 501-502, 520-521

executorlib/interfaces/slurm.py (3)

3-6: LGTM! The imports have been updated to reflect the new executor classes.

The imports have been correctly updated to use the new executor classes and utilities from their respective modules.


186-187: LGTM! Consistent updates to executor class usage and return types.

The changes maintain consistency with the refactoring effort:

  • create_slurm_executor now returns Union[OneTaskPerProcessExecutor, BlockAllocationExecutor]
  • ExecutorWithDependencies has been replaced with DependencyExecutor
  • The executor instantiation has been updated to use the new classes

Also applies to: 359-360, 396-396, 454-458, 460-465


449-453: LGTM! Validation function has been consolidated.

The change from validate_max_workers_slurm to validate_max_workers suggests a consolidation of validation functions, which improves maintainability.

Comment on lines +48 to +70
def __init__(
self,
max_workers: int = 1,
executor_kwargs: Optional[dict] = None,
spawner: type[BaseSpawner] = MpiExecSpawner,
):
if executor_kwargs is None:
executor_kwargs = {}
super().__init__(max_cores=executor_kwargs.get("max_cores"))
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
executor_kwargs["queue_join_on_shutdown"] = False
self._process_kwargs = executor_kwargs
self._set_process(
process=[
Thread(
target=execute_tasks,
kwargs=executor_kwargs,
)
for _ in range(max_workers)
],
)


🛠️ Refactor suggestion

Validate or guard the max_workers parameter.

If max_workers is provided as zero or a negative number, it may result in an empty or invalid thread list, potentially causing runtime errors or wasted resources. Consider adding an assertion or handling to enforce max_workers >= 1.

 def __init__(
     self,
     max_workers: int = 1,
     executor_kwargs: Optional[dict] = None,
     spawner: type[BaseSpawner] = MpiExecSpawner,
 ):
+    if max_workers < 1:
+        raise ValueError("max_workers must be at least 1.")
📝 Committable suggestion (review and test carefully before committing):

Suggested change:
def __init__(
self,
max_workers: int = 1,
executor_kwargs: Optional[dict] = None,
spawner: type[BaseSpawner] = MpiExecSpawner,
):
if max_workers < 1:
raise ValueError("max_workers must be at least 1.")
if executor_kwargs is None:
executor_kwargs = {}
super().__init__(max_cores=executor_kwargs.get("max_cores"))
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
executor_kwargs["queue_join_on_shutdown"] = False
self._process_kwargs = executor_kwargs
self._set_process(
process=[
Thread(
target=execute_tasks,
kwargs=executor_kwargs,
)
for _ in range(max_workers)
],
)

Comment on lines +71 to +108
def submit( # type: ignore
self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs
) -> Future:
"""
Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.

Args:
fn (Callable): function to submit for execution
args: arguments for the submitted function
kwargs: keyword arguments for the submitted function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
function. Example resource dictionary: {
cores: 1,
threads_per_core: 1,
gpus_per_worker: 0,
oversubscribe: False,
cwd: None,
executor: None,
hostname_localhost: False,
}

Returns:
Future: A Future representing the given call.
"""
if resource_dict is None:
resource_dict = {}
check_resource_dict_is_empty(resource_dict=resource_dict)
check_resource_dict(function=fn)
f: Future = Future()
if self._future_queue is not None:
self._future_queue.put(
{"fn": fn, "args": args, "kwargs": kwargs, "future": f}
)
return f


🛠️ Refactor suggestion

Re-examine the resource_dict checks for consistency.

This method calls both check_resource_dict_is_empty and check_resource_dict, which might conflict if resource dictionaries with certain fields are valid. Verify whether you want to allow a non-empty resource_dict or enforce it to be empty. Consider removing one of these checks or clarifying the intended behavior.

-if resource_dict is None:
-    resource_dict = {}
-check_resource_dict_is_empty(resource_dict=resource_dict)
-check_resource_dict(function=fn)
+if resource_dict is None:
+    resource_dict = {}
+check_resource_dict(function=fn)
 # Remove or correct the empty-resource check if resources are allowed
📝 Committable suggestion (review and test carefully before committing):

Suggested change:
def submit( # type: ignore
self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs
) -> Future:
"""
Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.
Args:
fn (Callable): function to submit for execution
args: arguments for the submitted function
kwargs: keyword arguments for the submitted function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
function. Example resource dictionary: {
cores: 1,
threads_per_core: 1,
gpus_per_worker: 0,
oversubscribe: False,
cwd: None,
executor: None,
hostname_localhost: False,
}
Returns:
Future: A Future representing the given call.
"""
if resource_dict is None:
resource_dict = {}
check_resource_dict(function=fn)
f: Future = Future()
if self._future_queue is not None:
self._future_queue.put(
{"fn": fn, "args": args, "kwargs": kwargs, "future": f}
)
return f

codecov bot commented Feb 15, 2025

Codecov Report

Attention: Patch coverage is 98.28571% with 3 lines in your changes missing coverage. Please review.

Project coverage is 96.11%. Comparing base (3befb3f) to head (602a0a7).
Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
executorlib/interactive/blockallocation.py 97.67% 1 Missing ⚠️
executorlib/interactive/onetoone.py 98.24% 1 Missing ⚠️
executorlib/interfaces/slurm.py 88.88% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #587      +/-   ##
==========================================
+ Coverage   95.91%   96.11%   +0.19%     
==========================================
  Files          26       28       +2     
  Lines        1176     1184       +8     
==========================================
+ Hits         1128     1138      +10     
+ Misses         48       46       -2     


coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
tests/test_executor_backend_flux.py (1)

9-12: Unused Import Warning & Import Path Refactoring
The updated import for FluxPythonSpawner now correctly reflects the new module location. However, note that this imported symbol is not used anywhere in this file. If the import is solely for registration or side-effect purposes, please add an inline comment (e.g., # noqa: F401) to clarify its intentional presence. Otherwise, consider removing it to clean up unused code.

🧰 Tools
🪛 Ruff (0.8.2)

11-11: executorlib.interactive.fluxspawner.FluxPythonSpawner imported but unused; consider using importlib.util.find_spec to test for availability

(F401)

tests/test_plot_dependency_flux.py (1)

10-14: Unused Import Warning in Refactored Module
Similar to the previous file, the import of FluxPythonSpawner from executorlib.interactive.fluxspawner on line 13 serves to update the module structure, which is correct. However, it isn't used explicitly in this file. If this import is meant only to verify module availability or trigger side-effects, please consider adding a clarifying comment (e.g., # noqa: F401). Otherwise, removing it could help reduce code clutter.

🧰 Tools
🪛 Ruff (0.8.2)

11-11: pygraphviz imported but unused; consider using importlib.util.find_spec to test for availability

(F401)


12-12: flux.job imported but unused; consider using importlib.util.find_spec to test for availability

(F401)


13-13: executorlib.interactive.fluxspawner.FluxPythonSpawner imported but unused; consider using importlib.util.find_spec to test for availability

(F401)

executorlib/interfaces/slurm.py (1)

432-434: Update docstring to reflect new return types.

The docstring's Returns section still references old executor types (InteractiveStepExecutor/InteractiveExecutor). Update it to match the new return type annotation Union[OneTaskPerProcessExecutor, BlockAllocationExecutor].

    Returns:
-        InteractiveStepExecutor/ InteractiveExecutor
+        Union[OneTaskPerProcessExecutor, BlockAllocationExecutor]

Also applies to: 467-468

executorlib/interfaces/flux.py (1)

431-433: Update docstring to reflect new return types.

The docstring's Returns section still references old executor types (InteractiveStepExecutor/InteractiveExecutor). Update it to match the new return type annotation Union[OneTaskPerProcessExecutor, BlockAllocationExecutor].

    Returns:
-        InteractiveStepExecutor/ InteractiveExecutor
+        Union[OneTaskPerProcessExecutor, BlockAllocationExecutor]

Also applies to: 466-468

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3bf183e and 43ff83e.

📒 Files selected for processing (8)
  • executorlib/interactive/fluxspawner.py (0 hunks)
  • executorlib/interfaces/flux.py (6 hunks)
  • executorlib/interfaces/slurm.py (5 hunks)
  • tests/test_executor_backend_flux.py (1 hunks)
  • tests/test_flux_executor.py (8 hunks)
  • tests/test_plot_dependency_flux.py (1 hunks)
  • tests/test_pysqa_subprocess.py (1 hunks)
  • tests/test_shared_backend.py (1 hunks)
💤 Files with no reviewable changes (1)
  • executorlib/interactive/fluxspawner.py
✅ Files skipped from review due to trivial changes (2)
  • tests/test_pysqa_subprocess.py
  • tests/test_shared_backend.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/test_flux_executor.py

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.10)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
  • GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
  • GitHub Check: unittest_win
  • GitHub Check: unittest_openmpi (macos-latest, 3.13)
  • GitHub Check: notebooks
🔇 Additional comments (8)
executorlib/interfaces/slurm.py (4)

3-6: LGTM! Import changes align with the refactoring effort.

The new imports for BlockAllocationExecutor, DependencyExecutor, and OneTaskPerProcessExecutor correctly reflect the updated executor framework.


186-200: LGTM! DependencyExecutor usage is consistent.

The change from ExecutorWithDependencies to DependencyExecutor maintains the same interface while aligning with the refactored executor framework.


359-373: LGTM! DependencyExecutor usage is consistent.

The change from ExecutorWithDependencies to DependencyExecutor maintains the same interface while aligning with the refactored executor framework.


396-396: LGTM! Implementation changes are consistent.

The changes to return type annotation, validation function, and executor instantiation align well with the refactored executor framework.

Also applies to: 449-465

executorlib/interfaces/flux.py (4)

1-1: LGTM! Import changes improve robustness.

The new imports align with the refactored executor framework, and the addition of contextlib.suppress for handling optional dependencies is a good practice.

Also applies to: 4-6, 17-21
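The `contextlib.suppress` approach to optional dependencies mentioned above can be sketched like this. The helper and module names are illustrative assumptions, not the actual code in `executorlib/interfaces/flux.py`.

```python
import contextlib


def optional_import(name: str):
    """Return the module if importable, else None.

    ModuleNotFoundError is a subclass of ImportError, so a missing
    optional dependency is silently suppressed rather than raised.
    """
    module = None
    with contextlib.suppress(ImportError):
        module = __import__(name)
    return module


json_mod = optional_import("json")           # stdlib: always importable
missing = optional_import("no_such_dep_x")   # suppressed, stays None
```

Compared with a bare `try`/`except ImportError: pass`, `contextlib.suppress` makes the intent (ignore a missing optional dependency) explicit in a single line.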


189-207: LGTM! DependencyExecutor usage is consistent.

The change from ExecutorWithDependencies to DependencyExecutor maintains the same interface while aligning with the refactored executor framework.


397-415: LGTM! DependencyExecutor usage is consistent.

The change from ExecutorWithDependencies to DependencyExecutor maintains the same interface while aligning with the refactored executor framework.


496-512: LGTM! Implementation changes are consistent.

The changes to validation and executor instantiation align well with the refactored executor framework and mirror the changes in slurm.py, maintaining consistency across the codebase.

@jan-janssen jan-janssen merged commit 1421b86 into main Feb 15, 2025
30 checks passed
@jan-janssen jan-janssen deleted the refactor_interactive branch February 15, 2025 11:21