-
Notifications
You must be signed in to change notification settings - Fork 3
Interactive: Separate future and taskdict #800
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughRefactors interactive schedulers to extract the Future from task_dict and pass it explicitly as a new future_obj parameter to execute_task_dict and its helpers; updates callers (onetoone, blockallocation) to pop and propagate the Future and to track active tasks by Future identity. All result/exception propagation uses the provided Future. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant P as Producer
participant S as Scheduler (onetoone/blockallocation)
participant T as Worker Thread
participant X as execute_task_dict (shared)
participant C as Cache Helpers
participant F as Future
P->>S: submit(task_dict {fn, future})
S->>S: f = task_dict.pop("future")
S->>S: active_task_dict[f] = task_dict
S->>T: start _execute_task_in_thread(task_dict, future_obj=f)
T->>X: execute_task_dict(task_dict, future_obj=f, interface, ...)
alt cache enabled
X->>C: _execute_task_with_cache(..., future_obj=f, ...)
C->>F: set_running_or_notify_cancel()
C-->>F: set_result(...) / set_exception(...)
else no cache
X->>C: _execute_task_without_cache(..., future_obj=f)
C->>F: set_running_or_notify_cancel()
C-->>F: set_result(...) / set_exception(...)
end
X-->>T: done
T-->>S: task_done
S->>S: del active_task_dict[f]
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
for more information, see https://pre-commit.ci
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #800 +/- ##
=======================================
Coverage 97.75% 97.75%
=======================================
Files 32 32
Lines 1468 1468
=======================================
Hits 1435 1435
Misses 33 33 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
executorlib/task_scheduler/interactive/blockallocation.py (1)
243-253: Ensure queue.task_done is always calledIf execute_task_dict unexpectedly raises, the queue item would never be marked done, risking deadlock on join(). Wrap in try/finally.
- f = task_dict.pop("future") - execute_task_dict( - task_dict=task_dict, - future_obj=f, - interface=interface, - cache_directory=cache_directory, - cache_key=cache_key, - error_log_file=error_log_file, - ) - task_done(future_queue=future_queue) + f = task_dict.pop("future") + try: + execute_task_dict( + task_dict=task_dict, + future_obj=f, + interface=interface, + cache_directory=cache_directory, + cache_key=cache_key, + error_log_file=error_log_file, + ) + finally: + task_done(future_queue=future_queue)
🧹 Nitpick comments (2)
executorlib/task_scheduler/interactive/shared.py (1)
73-76: Unify Future state transition (minor)The explicit not future_obj.done() guard is redundant; set_running_or_notify_cancel() already handles done/cancelled states. Consider simplifying for consistency with the cache path.
- if not future_obj.done() and future_obj.set_running_or_notify_cancel(): + if future_obj.set_running_or_notify_cancel():executorlib/task_scheduler/interactive/onetoone.py (1)
259-274: Potential process leak: interface not shut down after single-task execution_in execute_task_in_thread, a new interface is created per call but never explicitly shut down on success. If the remote process persists, this leaks resources. Consider capturing the interface and shutting it down in a finally block. If the design intends a persistent one-shot child that auto-exits, please ignore after confirming.
- execute_task_dict( - task_dict=task_dict, - future_obj=future_obj, - interface=interface_bootup( - command_lst=get_interactive_execute_command( - cores=cores, - ), - connections=spawner(cores=cores, **kwargs), - hostname_localhost=hostname_localhost, - log_obj_size=log_obj_size, - worker_id=worker_id, - ), - cache_directory=cache_directory, - cache_key=cache_key, - error_log_file=error_log_file, - ) + iface = interface_bootup( + command_lst=get_interactive_execute_command(cores=cores), + connections=spawner(cores=cores, **kwargs), + hostname_localhost=hostname_localhost, + log_obj_size=log_obj_size, + worker_id=worker_id, + ) + try: + execute_task_dict( + task_dict=task_dict, + future_obj=future_obj, + interface=iface, + cache_directory=cache_directory, + cache_key=cache_key, + error_log_file=error_log_file, + ) + finally: + # Safe even if already closed inside execute_task_dict on error + iface.shutdown(wait=True)If helpful, I can add a short test/diagnostic to assert no lingering child processes after a simple submit.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
executorlib/task_scheduler/interactive/blockallocation.py(1 hunks)executorlib/task_scheduler/interactive/onetoone.py(6 hunks)executorlib/task_scheduler/interactive/shared.py(6 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
executorlib/task_scheduler/interactive/blockallocation.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
execute_task_dict(12-47)
executorlib/task_scheduler/interactive/shared.py (3)
executorlib/task_scheduler/base.py (2)
future_queue(63-70)shutdown(146-169)executorlib/standalone/interactive/communication.py (3)
SocketInterface(14-153)send_and_receive_dict(82-94)shutdown(120-134)executorlib/standalone/hdf.py (2)
dump(20-37)get_output(71-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-22.04-arm, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: unittest_flux_mpich
- GitHub Check: notebooks_integration
- GitHub Check: unittest_slurm_mpich
- GitHub Check: notebooks
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_old
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: unittest_win
🔇 Additional comments (3)
executorlib/task_scheduler/interactive/shared.py (2)
51-56: Docstring addition looks goodThanks for documenting task_done; clear and accurate.
37-47: API threading of future_obj is correctPassing future_obj through both code paths is consistent and maintains external ownership.
executorlib/task_scheduler/interactive/onetoone.py (1)
190-211: Future propagation and active task tracking look correctPopping future, using it as the active_task_dict key, and threading via future_obj is coherent.
| else: | ||
| _, _, result = get_output(file_name=file_name) | ||
| future = task_dict["future"] | ||
| future.set_result(result) | ||
| future_obj.set_result(result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cache-hit path ignores cancellation; may raise InvalidStateError
When the result is served from cache, the Future is completed unconditionally. If the Future was cancelled/prematurely completed, this can raise InvalidStateError and violate expected cancellation semantics. Mirror the non-cache path by honoring set_running_or_notify_cancel().
Apply:
- else:
- _, _, result = get_output(file_name=file_name)
- future_obj.set_result(result)
+ else:
+ _, _, result = get_output(file_name=file_name)
+ if future_obj.set_running_or_notify_cancel():
+ future_obj.set_result(result)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| else: | |
| _, _, result = get_output(file_name=file_name) | |
| future = task_dict["future"] | |
| future.set_result(result) | |
| future_obj.set_result(result) | |
| else: | |
| _, _, result = get_output(file_name=file_name) | |
| if future_obj.set_running_or_notify_cancel(): | |
| future_obj.set_result(result) |
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/shared.py around lines 122 to 124, the
cache-hit branch unconditionally calls future_obj.set_result(result) which can
raise InvalidStateError if the Future was cancelled; mirror the non-cache path
by first calling future_obj.set_running_or_notify_cancel() and if it returns
False stop/return without setting the result, otherwise call set_result(result)
(optionally catch InvalidStateError as a safeguard).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
executorlib/task_scheduler/interactive/shared.py (1)
121-124: Cache-hit bug: propagate cached errors via set_exception; cancellation concern from earlier review is now mootThe previous comment about honoring cancellation in the cache-hit branch is addressed by gating at the entrypoint with
set_running_or_notify_cancel(). However, a separate issue remains:get_output()returns a success flag; when it’s False (cached error), you currently callset_result(result), which would incorrectly mark the Future successful with an exception object.Apply:
- else: - _, _, result = get_output(file_name=file_name) - future_obj.set_result(result) + else: + try: + _found, _success, _obj = get_output(file_name=file_name) + if _success: + future_obj.set_result(_obj) + else: + # Cached execution resulted in an error; propagate as exception + future_obj.set_exception(_obj if isinstance(_obj, BaseException) else RuntimeError(f"Cached error: {_obj!r}")) + except Exception as e: + # Reading cache failed; surface the failure instead of silently succeeding + future_obj.set_exception(e)
🧹 Nitpick comments (2)
executorlib/task_scheduler/interactive/shared.py (2)
52-57: Docstring/typing nit fortask_doneTighten wording and the parameter type; current text mentions “task dictionaries”.
Apply:
- """ - Mark the current task as done in the current queue. - - Args: - future_queue (queue): Queue of task dictionaries waiting for execution. - """ + """ + Mark the current task as done on the given queue. + + Args: + future_queue (queue.Queue): Work queue whose current task has finished. + """
111-120: Use high-resolution clock for runtime measurement (minor)Prefer
time.perf_counter()for durations.Apply:
- time_start = time.time() + time_start = time.perf_counter() @@ - data_dict["runtime"] = time.time() - time_start + data_dict["runtime"] = time.perf_counter() - time_start
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
executorlib/task_scheduler/interactive/shared.py(5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
executorlib/task_scheduler/interactive/shared.py (2)
executorlib/standalone/interactive/communication.py (3)
SocketInterface(14-153)send_and_receive_dict(82-94)shutdown(120-134)executorlib/standalone/hdf.py (2)
dump(20-37)get_output(71-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: unittest_openmpi (ubuntu-22.04-arm, 3.13)
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: notebooks_integration
- GitHub Check: unittest_openmpi (ubuntu-24.04-arm, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: notebooks
- GitHub Check: unittest_mpich (ubuntu-22.04-arm, 3.13)
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-24.04-arm, 3.13)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: unittest_win
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
🔇 Additional comments (4)
executorlib/task_scheduler/interactive/shared.py (4)
34-48: Cancellation respected before dispatch — LGTMThe entrypoint correctly gates execution with
done()andset_running_or_notify_cancel().
74-79: Non-cache path error handling — LGTMShutting down the interface on exception and completing the Future via
set_exceptionis appropriate.
12-19: Allexecute_task_dictinvocations updated to includefuture_obj; no stale or external helper calls remain.
35-36: Cache key excludeserror_log_file
serialize_functonly serializesfn_args,fn_kwargs,resource_dict(and an optionalcache_key) when computing the task key, so addingerror_log_filetotask_dictwon’t fragment the cache.
Summary by CodeRabbit