Fix bulk delete crash from unbounded process spawning #341
nitrobass24 merged 5 commits into develop
Conversation
Cap concurrent delete subprocesses at 8 in the controller — excess commands are re-queued and picked up on the next tick (500ms). On the frontend, replace forkJoin (all-at-once) with mergeMap capped at 4 concurrent requests. Also guard MoveProcess.propagate_exception() so a failed staging move no longer crashes the controller. Closes #338 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
📝 Walkthrough

Implemented controlled concurrency for bulk file actions: the frontend limits bulk-action concurrency to 4; the backend caps concurrent command processes at 8 and defers excess delete commands for later re-queuing. MoveProcess now carries an optional pair_id, and cleanup exception propagation is guarded.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant UI as Frontend UI
    participant FE as Angular Service
    participant BE as Controller API
    participant Q as Controller Command Queue
    participant P as MoveProcess/Worker
    UI->>FE: Trigger bulk delete (N files)
    FE->>FE: from(checked) -> mergeMap(action, concurrency=4)
    FE-->>BE: Send delete commands (streamed)
    BE->>Q: enqueue commands
    Q->>Q: if active_count >= _MAX_CONCURRENT_COMMAND_PROCESSES (8) and cmd in {DELETE_LOCAL, DELETE_REMOTE}
    Q-->>Q: defer matching commands (deferred queue)
    Q->>P: spawn MoveProcess (includes pair_id)
    P-->>Q: on finish -> propagate_exception() (guarded by try/except)
    Q->>Q: after drain -> re-enqueue deferred commands
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~40 minutes
🚥 Pre-merge checks: ✅ Passed checks (5 passed)
Deferred commands were re-queued into the same queue mid-loop, causing the while loop to immediately re-read them and spin forever. Collect deferred commands in a local list and re-queue them after the loop exits. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
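A minimal sketch of the corrected drain loop described in this commit (the `drain` and `start` names, and the return value, are illustrative; the real controller logic differs):

```python
import queue

MAX_CONCURRENT = 8  # mirrors the controller's _MAX_CONCURRENT_COMMAND_PROCESSES

def drain(commands: "queue.Queue", active_count: int, start) -> list:
    """Start queued commands up to the cap; collect the excess locally.

    Collecting into a local `deferred` list (instead of putting commands
    back into `commands` mid-loop) is what prevents the while-loop from
    immediately re-reading them and spinning forever. `start` stands in
    for the controller's process spawn.
    """
    deferred = []
    while not commands.empty():
        cmd = commands.get()
        if active_count >= MAX_CONCURRENT:
            deferred.append(cmd)  # retried on the next 500ms tick
            continue
        start(cmd)
        active_count += 1
    # only after the loop exits is it safe to put deferred commands back
    for cmd in deferred:
        commands.put(cmd)
    return deferred
```

Because the re-queue happens after the `while` condition can no longer be evaluated, the loop terminates even when commands are deferred.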
@CodeRabbit review

✅ Actions performed: Review triggered.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/angular/src/app/services/files/view-file.service.ts`:
- Around line 217-220: bulkAction currently hard-caps concurrency via
mergeMap(..., 4) which unintentionally throttles bulkQueue() and bulkStop();
change bulkAction(signature) to accept an optional concurrency parameter
(default Infinity), use that parameter in mergeMap(f => action(f), concurrency),
and update bulkDelete() to call bulkAction(..., 4) while leaving bulkQueue() and
bulkStop() calling bulkAction(...) with the default so they remain unthrottled;
reference bulkAction, bulkDelete, bulkQueue, bulkStop and the mergeMap(..., 4)
usage when making the change.
In `@src/python/controller/controller.py`:
- Around line 1134-1135: The drain loop removes commands from
self.__command_queue and appends them to the in-memory list deferred (deferred:
list[Controller.Command]) before processing; if an exception occurs later the
deferred commands are lost—wrap the existing loop body in a try: block and add a
finally: that requeues any remaining deferred commands back onto
self.__command_queue (preserving order) so that exceptions in process.start() or
callbacks do not drop commands; update the same pattern for the other similar
drain sections (around lines noted) to ensure each deferred list is re-enqueued
in finally.
- Around line 1421-1424: The code currently marks a file as moved by adding its
key to self.__moved_file_keys before running the subprocess, so if
move_process.propagate_exception() raises the exception handler only logs and
leaves the key set, preventing retries in __spawn_move_process; change the flow
so that either (a) you only add the file key to self.__moved_file_keys after
move_process.propagate_exception() completes successfully, or (b) in the except
block remove the file key from self.__moved_file_keys before logging, ensuring
failed moves are not treated as completed; update the handlers around
move_process.propagate_exception(), self.__moved_file_keys, and the
logger.warning call accordingly.
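A minimal sketch of option (b) from the comment above — removing the key in the except block so the move is not treated as completed — assuming a hypothetical `cleanup_move` helper and a plain set of keys:

```python
import logging

logger = logging.getLogger(__name__)

def cleanup_move(move_process, moved_file_keys: set, key: str):
    """Guard propagate_exception() and drop the moved-key marker on failure.

    `move_process` and the key scheme are stand-ins for the controller's
    own objects. Discarding the key on failure lets the next cycle's
    __spawn_move_process see the file as not-yet-moved and retry it.
    """
    try:
        move_process.propagate_exception()
    except Exception as exc:
        moved_file_keys.discard(key)  # failed move is no longer "completed"
        logger.warning("Move failed for %s: %s", key, exc)
```

On success the key stays in the set; on failure it is removed before the warning is logged.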
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 65bd63d9-b20f-4d2e-b3f7-827eab362ee3
📒 Files selected for processing (2)
src/angular/src/app/services/files/view-file.service.ts
src/python/controller/controller.py
`deferred: list[Controller.Command] = []`
Requeue deferred commands in finally.
deferred only exists in memory until Lines 1331-1333. Any unexpected exception later in the drain loop—e.g. process.start() in the delete branches or a callback on Line 1328—will drop already-deferred commands permanently because they have already been removed from self.__command_queue.
♻️ Suggested fix

```diff
     deferred: list[Controller.Command] = []
-
-    while not self.__command_queue.empty():
+    try:
+        while not self.__command_queue.empty():
```

Indent the current loop body under `try:`.

```diff
-    for cmd in deferred:
-        self.__command_queue.put(cmd)
+    finally:
+        for cmd in deferred:
+            self.__command_queue.put(cmd)
```

Also applies to: 1196-1204, 1245-1253, 1331-1333
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/python/controller/controller.py` around lines 1134 - 1135, The drain loop
removes commands from self.__command_queue and appends them to the in-memory
list deferred (deferred: list[Controller.Command]) before processing; if an
exception occurs later the deferred commands are lost—wrap the existing loop
body in a try: block and add a finally: that requeues any remaining deferred
commands back onto self.__command_queue (preserving order) so that exceptions in
process.start() or callbacks do not drop commands; update the same pattern for
the other similar drain sections (around lines noted) to ensure each deferred
list is re-enqueued in finally.
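The try/finally pattern this prompt describes can be sketched as follows (the `drain_with_requeue` and `handle` names are stand-ins for the controller's own drain sections):

```python
import queue

def drain_with_requeue(command_queue: "queue.Queue", handle):
    """Drain the queue; requeue anything still deferred if an exception escapes.

    `handle(cmd)` returns True to run the command now, False to defer it,
    and may raise (standing in for process.start() or callback failures).
    The finally block guarantees deferred commands are restored in order.
    """
    deferred = []
    try:
        while not command_queue.empty():
            cmd = command_queue.get()
            if not handle(cmd):  # e.g. over the concurrency cap
                deferred.append(cmd)
    finally:
        for cmd in deferred:  # preserve order on requeue
            command_queue.put(cmd)
```

Even when `handle` raises mid-drain, previously deferred commands land back on the queue instead of being dropped.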
…ilure
- bulkAction: Add optional concurrency parameter (default Infinity). Only bulkDeleteLocal/bulkDeleteRemote pass 4; bulkQueue and bulkStop remain unthrottled.
- MoveProcess: Expose file_name and pair_id properties so the controller can identify failed moves.
- __cleanup_commands: Remove move_key from __moved_file_keys on failure so the move can be retried on the next cycle.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
♻️ Duplicate comments (2)
src/python/controller/controller.py (2)
1421-1426: ⚠️ Potential issue | 🟠 Major: Failed moves are still not retried automatically.

Discarding the key from `self.__moved_file_keys` here makes a retry possible, but nothing in this cleanup path actually re-schedules `__spawn_move_process()` for the same file. A staged file that throws in `propagate_exception()` can still sit indefinitely unless some unrelated later transition happens. Please either queue a retry from this handler or add a next-cycle retry path that calls `__spawn_move_process()` when the move key has been cleared.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/python/controller/controller.py` around lines 1421 - 1426, the failure handler for move_process currently discards the key from self.__moved_file_keys but never re-schedules the move, so failed moves can stall; after catching the exception in the block that calls move_process.propagate_exception(), re-enqueue a retry by invoking the same spawn logic (call self.__spawn_move_process(move_process.pair_id, move_process.file_name) or push the tuple into your existing move retry queue) immediately or schedule it for the next cycle, and ensure you guard against tight infinite retries (e.g., check/increment an attempt counter on the move_process or respect a max-retries policy) before scheduling.
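A sketch of the bounded retry this comment asks for, with hypothetical `attempts`, `pair_id`, and `file_name` attributes and a plain list standing in for the retry queue:

```python
def retry_failed_move(move_process, retry_queue, max_retries=3):
    """Re-schedule a failed move for the next cycle, bounded by a retry cap.

    Attribute names (attempts, pair_id, file_name) and the queue shape are
    illustrative only; the guard prevents a tight infinite retry loop.
    """
    attempts = getattr(move_process, "attempts", 0)
    if attempts >= max_retries:
        return False  # give up; leave the file for manual intervention
    move_process.attempts = attempts + 1
    retry_queue.append((move_process.pair_id, move_process.file_name))
    return True
```

The next controller cycle would pop entries from `retry_queue` and call the spawn logic again; the counter ensures a permanently broken move stops retrying after `max_retries` attempts.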
1134-1135: ⚠️ Potential issue | 🟠 Major: Requeue deferred deletes in `finally`.

If `process.start()` or a callback raises after a command has been appended to `deferred`, the method exits before Lines 1331-1332 and those commands are dropped permanently. Wrap the drain loop so deferred work is always restored.

♻️ Suggested fix

```diff
     deferred: list[Controller.Command] = []
-
-    while not self.__command_queue.empty():
+    try:
+        while not self.__command_queue.empty():
```

Indent the current loop body under `try:`.

```diff
-    for cmd in deferred:
-        self.__command_queue.put(cmd)
+    finally:
+        for cmd in deferred:
+            self.__command_queue.put(cmd)
```

Also applies to: 1196-1204, 1245-1253, 1331-1332
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/python/controller/controller.py` around lines 1134 - 1135, The deferred list (deferred: list[Controller.Command]) can lose commands if process.start() or a callback raises before the drain loop runs; wrap the current drain loop in a try/finally so any commands appended to deferred are always requeued in the finally block (e.g., extend/insert them back into the work queue or call the existing requeue logic), keeping the loop body under try and using finally to restore deferred items; apply the same pattern where deferred is used (the other occurrences around lines ~1196-1204, ~1245-1253, ~1331-1332) and reference the deferred variable, the drain loop and process.start()/callback points when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/python/controller/controller.py`:
- Around line 1421-1426: The failure handler for move_process currently discards
the key from self.__moved_file_keys but never re-schedules the move, so failed
moves can stall; after catching the exception in the block that calls
move_process.propagate_exception(), re-enqueue a retry by invoking the same
spawn logic (call self.__spawn_move_process(move_process.pair_id,
move_process.file_name) or push the tuple into your existing move retry queue)
immediately or schedule it for the next cycle, and ensure you guard against
tight infinite retries (e.g., check/increment an attempt counter on the
move_process or respect a max-retries policy) before scheduling.
- Around line 1134-1135: The deferred list (deferred: list[Controller.Command])
can lose commands if process.start() or a callback raises before the drain loop
runs; wrap the current drain loop in a try/finally so any commands appended to
deferred are always requeued in the finally block (e.g., extend/insert them back
into the work queue or call the existing requeue logic), keeping the loop body
under try and using finally to restore deferred items; apply the same pattern
where deferred is used (the other occurrences around lines ~1196-1204,
~1245-1253, ~1331-1332) and reference the deferred variable, the drain loop and
process.start()/callback points when making the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 9ca5166e-1005-4648-babf-ae488e47d330
📒 Files selected for processing (3)
src/angular/src/app/services/files/view-file.service.ts
src/python/controller/controller.py
src/python/controller/move/move_process.py
Summary
- Guard `MoveProcess.propagate_exception()` with try/except so a failed staging move no longer crashes the controller.
- Replace `forkJoin` (all N requests simultaneously) with `mergeMap` capped at 4 concurrent HTTP requests as defense-in-depth.

Closes #338
Test plan
🤖 Generated with Claude Code