feat(core): implement QueueManager application service (task 10) #10
Conversation
Priority-based download queue with configurable max concurrent slots, exponential backoff retry (10s-300s cap), and circuit breaker integration.

- QueueManager as application service with Arc<dyn Trait> dependency injection
- Safe active_count tracking via AtomicUsize with fetch_update (no underflow; see the sketch after this list)
- Priority desc + FIFO ordering on slot freed via schedule_lock mutex
- EventBus sync→async bridge using bounded mpsc channel(1024)
- CancellationToken-based retry cancellation
- Rollback on engine.start() failure (fail + save + re-publish)
- 10 tests covering scheduling, priority, circuit breaker, idempotency
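For the underflow-safe counter mentioned above, here is a minimal, self-contained sketch of the `fetch_update` + `saturating_sub` pattern. The `ActiveSlots` struct and the exact method body are illustrative; the real QueueManager field and method may differ.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Illustrative holder for the counter; the real QueueManager keeps
/// `active_count` alongside its other fields.
struct ActiveSlots {
    active_count: AtomicUsize,
}

impl ActiveSlots {
    /// Decrement without ever wrapping below zero: `fetch_update` retries a
    /// compare-and-swap loop, and `saturating_sub` keeps 0 at 0 instead of
    /// underflowing to usize::MAX.
    fn safe_decrement(&self) {
        let _ = self.active_count.fetch_update(
            Ordering::SeqCst,
            Ordering::SeqCst,
            |current| Some(current.saturating_sub(1)),
        );
    }
}

fn main() {
    let slots = ActiveSlots { active_count: AtomicUsize::new(0) };
    slots.safe_decrement(); // stays at 0 rather than wrapping around
    assert_eq!(slots.active_count.load(Ordering::SeqCst), 0);
}
```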
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. This behavior is configurable in the CodeRabbit settings.
📝 Walkthrough

Adds a new application service, QueueManager.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant EventBus as EventBus
participant QM as QueueManager
participant Repo as DownloadRepository
participant Engine as DownloadEngine
EventBus->>QM: lifecycle event (completed/failed/paused/cancelled)
activate QM
QM->>QM: Acquire schedule_lock
rect rgba(100,150,200,0.5)
Note over QM: Check active_count < max_concurrent
end
QM->>Repo: Query queued & matured retry candidates
Repo-->>QM: candidates
rect rgba(150,200,100,0.5)
Note over QM: Sort by priority desc, created_at asc
end
QM->>QM: Transition candidate -> Downloading, persist
QM->>QM: Increment active_count
QM->>Engine: start(download)
activate Engine
Engine-->>QM: success / error
deactivate Engine
alt start succeeds
QM->>EventBus: Publish DownloadStarted
else start fails
QM->>QM: Decrement active_count (safe)
QM->>Repo: Persist failure / Retry state
QM->>EventBus: Publish DownloadFailed
end
QM->>QM: Release schedule_lock
deactivate QM
```

```mermaid
sequenceDiagram
participant QM as QueueManager
participant Timer as RetryTimer
participant Repo as DownloadRepository
participant Engine as DownloadEngine
participant EventBus as EventBus
QM->>QM: handle_download_failed(id)
QM->>QM: safe decrement active_count (if active)
QM->>Repo: Reload download
alt retry allowed
QM->>QM: compute retry_delay(attempt)
QM->>Timer: start cancellable backoff (store token)
activate Timer
Timer-->>QM: backoff elapsed (or cancelled)
deactivate Timer
QM->>QM: Acquire schedule_lock
alt slot available
QM->>Repo: Persist -> Downloading
QM->>QM: Increment active_count
QM->>Engine: start(download)
else no slot available
Note over QM: leave in Retry state
end
QM->>QM: Release schedule_lock
else max retries exceeded
QM->>Repo: Persist terminal failure
QM->>EventBus: Publish MaxRetriesExceeded / Failed
end
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Greptile Summary

This PR introduces the QueueManager application service.
Confidence Score: 4/5. Safe to merge with the Retry-state deadlock fixed; all other findings are P2 improvements.

One P1 bug: a download in Retry state that finds no free slot when its timer fires is permanently orphaned — on_slot_freed only queries Queued downloads, so no future event will reschedule it. The fix is small, but without it retries can silently stall in production under load. All remaining findings are P2 and do not block merge on their own.

Attention area: src-tauri/src/application/services/queue_manager.rs — specifically the schedule_retry early-return path (lines 196-205) and on_slot_freed query scope (line 91).
| Filename | Overview |
|---|---|
| src-tauri/src/application/services/queue_manager.rs | Core QueueManager implementation — one P1 bug (Retry-state downloads permanently stuck when schedule_retry finds no free slot) and two P2 issues (single-slot-per-call scheduling, double find_by_id TOCTOU) |
| src-tauri/src/application/services/mod.rs | Trivial module re-export for QueueManager — no issues |
| src-tauri/src/application/mod.rs | Adds pub mod services; declaration — straightforward, no issues |
| src-tauri/src/lib.rs | Re-exports QueueManager in the public crate API — appropriate, no issues |
| CHANGELOG.md | CHANGELOG entry for task 10 — informational only |
Sequence Diagram

```mermaid
sequenceDiagram
participant EB as EventBus
participant QM as QueueManager
participant Repo as DownloadRepository
participant Eng as DownloadEngine
Note over QM: start_listening()
EB->>QM: DownloadCompleted / DownloadPaused
QM->>QM: decrement_and_schedule()
QM->>QM: safe_decrement()
QM->>QM: on_slot_freed() [acquires schedule_lock]
QM->>Repo: find_by_state(Queued)
Repo-->>QM: Vec<Download> sorted by priority+FIFO
QM->>Repo: save(download [Downloading])
QM->>EB: publish(DownloadStarted)
QM->>QM: active_count += 1
QM->>Eng: engine.start(download)
alt Engine error
QM->>QM: safe_decrement()
QM->>Repo: save(download [Error])
QM->>EB: publish(DownloadFailed)
end
EB->>QM: DownloadFailed { id }
QM->>QM: handle_download_failed(id)
QM->>Repo: find_by_id(id) [check state]
QM->>QM: safe_decrement() if active state
QM->>Repo: find_by_id(id) [fetch download]
QM->>QM: download.retry()
alt retry allowed
QM->>Repo: save(download [Retry])
QM->>EB: publish(DownloadRetrying)
QM->>QM: schedule_retry(id, attempt) [spawns task with delay]
Note over QM: If no slot on wake, download stuck in Retry
else MaxRetriesExceeded
QM->>QM: on_slot_freed()
    end
```
Reviews (1): Last reviewed commit: "feat(core): implement QueueManager appli..."
```rust
let active = this.active_count.load(Ordering::SeqCst);
let max = this.max_concurrent.load(Ordering::SeqCst);
if active >= max {
    // No slot available — put back to Queued so on_slot_freed picks it up later
    // The download remains in Retry state; on_slot_freed will pick it up when a slot frees
    tracing::warn!(
        "schedule_retry: no slot available for {id:?}, will retry when slot frees"
    );
    return;
}
```
**Download permanently stuck in Retry state when no slot is available**

When schedule_retry fires after the delay but finds active >= max, it logs a warning and returns — leaving the download in Retry state. However, on_slot_freed only queries find_by_state(DownloadState::Queued) (line 91), so a download in Retry state will never be picked up again. The comment even contradicts the code: it says "on_slot_freed will pick it up when a slot frees", but that path only considers Queued downloads.
To fix, either transition the download back to Queued before returning so on_slot_freed can pick it up, or extend on_slot_freed to also consider Retry-state downloads:

```rust
if active >= max {
    // Transition back to Queued so on_slot_freed can pick it up
    if let Ok(Some(mut d)) = this.download_repo.find_by_id(id) {
        // re-queue: update state to Queued and save
        // (requires a domain method or direct state override)
    }
    tracing::warn!(
        "schedule_retry: no slot available for {id:?}, re-queued for later"
    );
    return;
}
```

Alternatively, on_slot_freed should also query find_by_state(DownloadState::Retry) and include those downloads in the prioritised list.
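To make that second option concrete, below is a minimal, self-contained sketch of a candidate query that merges Queued and Retry downloads while keeping the same priority-desc, FIFO ordering that on_slot_freed uses. The trait and method names mirror the review excerpts; the stub types and the `next_candidates` helper are hypothetical simplifications, not the crate's actual definitions.

```rust
/// Simplified stand-ins for the crate's domain types (illustrative only).
#[derive(Clone, Copy, PartialEq)]
enum DownloadState {
    Queued,
    Retry,
}

#[derive(Clone)]
struct Download {
    state: DownloadState,
    priority: u8,
    created_at: u64,
}

trait DownloadRepository {
    fn find_by_state(&self, state: DownloadState) -> Vec<Download>;
}

/// Merge Queued and Retry candidates so a download parked in Retry can still
/// be picked up when a slot frees, keeping the same ordering as on_slot_freed:
/// priority desc, then created_at asc (FIFO).
fn next_candidates(repo: &dyn DownloadRepository) -> Vec<Download> {
    let mut candidates = repo.find_by_state(DownloadState::Queued);
    candidates.extend(repo.find_by_state(DownloadState::Retry));
    candidates.sort_by(|a, b| {
        b.priority
            .cmp(&a.priority)
            .then_with(|| a.created_at.cmp(&b.created_at))
    });
    candidates
}
```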
```rust
pub async fn on_slot_freed(&self) -> Result<(), AppError> {
    let _guard = self.schedule_lock.lock().await;

    let active = self.active_count.load(Ordering::SeqCst);
    let max = self.max_concurrent.load(Ordering::SeqCst);
    if active >= max {
        return Ok(());
    }

    let mut queued = self.download_repo.find_by_state(DownloadState::Queued)?;
    if queued.is_empty() {
        return Ok(());
    }

    // Sort: priority desc, then created_at asc (FIFO)
    queued.sort_by(|a, b| {
        b.priority()
            .value()
            .cmp(&a.priority().value())
            .then_with(|| a.created_at().cmp(&b.created_at()))
    });

    let mut download = queued.remove(0);
    let event = download.start()?;
    self.download_repo.save(&download)?;
    self.event_bus.publish(event);

    // F2: increment before engine.start so we can roll back cleanly on failure
    self.active_count.fetch_add(1, Ordering::SeqCst);

    if let Err(engine_err) = self.engine.start(&download) {
        // Roll back: transition to Error and save
        self.safe_decrement();
        if let Ok(fail_event) = download.fail(engine_err.to_string()) {
            let _ = self.download_repo.save(&download);
            self.event_bus.publish(fail_event);
        }
        return Err(AppError::Domain(engine_err));
    }

    Ok(())
}
```
**on_slot_freed starts only one download per invocation even with multiple free slots**
The method checks if active >= max once and then starts at most one download. If set_max_concurrent is raised from 1 to 5 while 4 downloads are queued and active = 0, only 1 download starts; the remaining 3 stay queued until a future event triggers another call. Eventual consistency relies on each new download eventually completing or failing to chain the next one, adding latency.
Consider looping until active >= max or the queue is exhausted:

```rust
pub async fn on_slot_freed(&self) -> Result<(), AppError> {
    let _guard = self.schedule_lock.lock().await;
    loop {
        let active = self.active_count.load(Ordering::SeqCst);
        let max = self.max_concurrent.load(Ordering::SeqCst);
        if active >= max { break; }
        // ... fetch next queued download and start it, or break if queue empty
    }
    Ok(())
}
```

```rust
pub async fn handle_download_failed(self: &Arc<Self>, id: DownloadId) -> Result<(), AppError> {
    // F5: only decrement if the download was actually active
    let should_decrement = match self.download_repo.find_by_id(id)? {
        Some(ref d) => matches!(
            d.state(),
            DownloadState::Downloading
                | DownloadState::Waiting
                | DownloadState::Checking
                | DownloadState::Extracting
        ),
        None => false,
    };

    if should_decrement {
        self.safe_decrement(); // F1
    }

    let mut download = match self.download_repo.find_by_id(id)? {
        Some(d) => d,
        None => {
            self.on_slot_freed().await?;
            return Ok(());
        }
    };
```
**Double find_by_id creates a TOCTOU window and redundant I/O**

handle_download_failed calls find_by_id twice: once (line 133) to decide should_decrement based on the download's current state, and again (line 148) to fetch the download to operate on. Between the two reads the state could have changed (e.g., the download was re-queued or cancelled by a concurrent command), causing the decrement decision to be based on stale data.
Consolidate into a single read:

```rust
pub async fn handle_download_failed(self: &Arc<Self>, id: DownloadId) -> Result<(), AppError> {
    let mut download = match self.download_repo.find_by_id(id)? {
        Some(d) => d,
        None => {
            self.on_slot_freed().await?;
            return Ok(());
        }
    };
    let should_decrement = matches!(
        download.state(),
        DownloadState::Downloading
            | DownloadState::Waiting
            | DownloadState::Checking
            | DownloadState::Extracting
    );
    if should_decrement {
        self.safe_decrement();
    }
    // ... continue with `download`
}
```
3 issues found across 5 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="CHANGELOG.md">
<violation number="1" location="CHANGELOG.md:46">
P3: Changelog incorrectly documents the EventBus bridge as using `mpsc::unbounded_channel`, but the implementation uses bounded `mpsc::channel(1024)`.</violation>
</file>
<file name="src-tauri/src/application/services/queue_manager.rs">
<violation number="1" location="src-tauri/src/application/services/queue_manager.rs:201">
P0: When a retry wakes up with no free slot, it returns without rescheduling, and `on_slot_freed` never picks `Retry` items, so retries can become permanently stuck.</violation>
<violation number="2" location="src-tauri/src/application/services/queue_manager.rs:260">
P1: Dropping events on channel overflow can lose completion/failure signals and leave `active_count` stale, preventing new downloads from being scheduled.</violation>
</file>
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@CHANGELOG.md`:
- Line 46: The changelog entry incorrectly states the EventBus sync-to-async
bridge uses mpsc::unbounded_channel; update the entry to reflect that
src-tauri/src/application/services/queue_manager.rs actually uses a bounded
mpsc::channel(1024) (not unbounded), and explicitly note the bounded delivery
semantics (capacity limit and potential for dropped events under backpressure)
so readers understand the behavioral difference.
In `@src-tauri/src/application/services/queue_manager.rs`:
- Around line 196-205: schedule_retry currently returns when active_count >=
max_concurrent, leaving the download stuck in Retry; change this branch to
transition the download back to Queued (or re-enqueue it) and notify whatever
mechanism on_slot_freed() uses to discover queued items (e.g., push the id onto
the queued queue or update the queued state and signal the queue scanner) so it
will be picked up when a slot frees; reference schedule_retry, on_slot_freed,
active_count, max_concurrent, and the Queued/Retry states when making the
change.
- Around line 82-123: on_slot_freed currently starts only one download and
returns, leaving capacity unused; change it to loop pulling from
download_repo.find_by_state(DownloadState::Queued) (resorting each iteration or
sort once then iterate) and repeatedly start downloads until
active_count.load(...) >= max_concurrent.load(...) or no queued items remain.
For each download: keep the existing sequence—start() -> save() ->
publish(event), increment active_count.fetch_add(1, Ordering::SeqCst) before
calling engine.start(&download), and on engine.start error call
safe_decrement(), transition download.fail(...), save and publish fail event and
continue/return appropriately; ensure you still hold schedule_lock during the
loop and preserve existing error handling and event publication semantics
(functions involved: on_slot_freed, active_count, max_concurrent,
download_repo.find_by_state, download.start, download_repo.save,
event_bus.publish, engine.start, safe_decrement, download.fail).
- Around line 131-165: The handler currently checks pre-failure state with
find_by_id (which sees the post-failure Error state) and conditionally
decrements, causing slots to leak when retry timers are scheduled; fix by
removing the initial should_decrement block and instead fetch the download
(match self.download_repo.find_by_id(id)? -> Some(d)), call
self.safe_decrement() exactly once immediately after obtaining that Some(d)
(before you call d.retry()), and then in the retry Ok branch do NOT call
on_slot_freed() (since the slot was already released), leaving on_slot_freed()
only for the None case; this ensures a failed active download always releases
its slot even if the aggregate was already moved to Error and prevents leaving
active_count stale when schedule_retry(id, ...) is used.
- Around line 255-263: start_listening currently forwards all DomainEvent
variants into the bounded mpsc channel causing high-frequency DownloadProgress
events to fill the buffer; change the closure passed to self.event_bus.subscribe
so it filters events before sending (match on DomainEvent and only tx.try_send
for DownloadCompleted, DownloadPaused, DownloadFailed — ignore or handle
DownloadProgress elsewhere), use event.clone() only when matched, and keep the
existing drop/log behavior for failed try_send calls; reference the
start_listening method, event_bus.subscribe call, tx.try_send usage, and the
DomainEvent variants to locate where to apply the filter.
In `@src-tauri/src/lib.rs`:
- Line 25: The QueueManager type is exported but never instantiated or started;
modify the Tauri app startup to create and start it inside the builder's setup
hook: call QueueManager::new(), call its start_listening() method, and register
the instance with the Tauri state (e.g. via .manage(...)) so it lives for the
app lifetime; ensure any errors from QueueManager::new() are handled/logged and
that shutdown semantics are considered if start_listening() spawns background
tasks.
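For reference, a minimal sketch of what that setup-hook wiring could look like, assuming a Tauri 2.x-style builder and a standard tauri.conf.json context. `QueueManager::new()` and `start_listening()` are the names from the review; their real signatures (constructor dependencies, async-ness, error handling) are assumed here.

```rust
use std::sync::Arc;
use tauri::Manager;

/// Placeholder; the real QueueManager takes repository/engine/event-bus deps.
struct QueueManager;

impl QueueManager {
    fn new() -> Arc<Self> {
        Arc::new(QueueManager)
    }

    fn start_listening(self: &Arc<Self>) {
        // In the real service this subscribes to the EventBus and spawns the
        // async processing task.
    }
}

fn main() {
    tauri::Builder::default()
        .setup(|app| {
            let queue_manager = QueueManager::new();
            queue_manager.start_listening();
            // Keep the instance alive for the app lifetime and reachable from
            // commands via tauri::State<Arc<QueueManager>>.
            app.manage(queue_manager);
            Ok(())
        })
        .run(tauri::generate_context!())
        .expect("error while running tauri application");
}
```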
📒 Files selected for processing (5)
- CHANGELOG.md
- src-tauri/src/application/mod.rs
- src-tauri/src/application/services/mod.rs
- src-tauri/src/application/services/queue_manager.rs
- src-tauri/src/lib.rs
```rust
    plugin_view::PluginViewDto,
    stats_view::{DailyVolumeDto, HostStatsDto, StatsViewDto},
};
pub use application::services::QueueManager;
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
set -euo pipefail
rg -n -C3 --type=rust 'tauri::Builder::default|\.setup\s*\(' src-tauri/src
rg -nP --type=rust '\bQueueManager::new\s*\(|\.\s*start_listening\s*\(' src-tauri/src
```

Repository: mpiton/vortex
Length of output: 615
🏁 Script executed:
```bash
#!/bin/bash
# Get full context of the run() function to see complete Builder setup
sed -n '26,50p' src-tauri/src/lib.rs
```

Repository: mpiton/vortex
Length of output: 327
🏁 Script executed:
```bash
#!/bin/bash
# Get context around QueueManager::new() to see if it's in app initialization
sed -n '410,430p' src-tauri/src/application/services/queue_manager.rs
```

Repository: mpiton/vortex
Length of output: 765
🏁 Script executed:
```bash
#!/bin/bash
# Search for .start_listening() calls anywhere in the codebase
rg -n '\.start_listening\s*\(' src-tauri/src
```

Repository: mpiton/vortex
Length of output: 39
🏁 Script executed:
```bash
#!/bin/bash
# Search for setup hooks or initialization logic
rg -n '\.setup\s*\(' src-tauri/src
```

Repository: mpiton/vortex
Length of output: 39
QueueManager is exported but never initialized or started in the app.
The export at line 25 makes the type available, but the actual app startup (lines 28–32) shows only tauri::Builder::default().run(...) with no .setup() hook. QueueManager::new() appears only in test code (line 418 in queue_manager.rs), and .start_listening() is not called anywhere in production code. Without this initialization and activation, the queue manager remains inert and the automatic slot/retry handling never runs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src-tauri/src/lib.rs` at line 25, The QueueManager type is exported but never
instantiated or started; modify the Tauri app startup to create and start it
inside the builder's setup hook: call QueueManager::new(), call its
start_listening() method, and register the instance with the Tauri state (e.g.
via .manage(...)) so it lives for the app lifetime; ensure any errors from
QueueManager::new() are handled/logged and that shutdown semantics are
considered if start_listening() spawns background tasks.
- on_slot_freed now queries both Queued AND Retry state downloads, preventing retries from being permanently stuck in Retry state
- on_slot_freed loops to fill all available slots, not just one
- handle_download_failed always decrements active_count (the event proves the download was active) and calls on_slot_freed after retry
- Single find_by_id in handle_download_failed eliminates TOCTOU window
- Filter lifecycle events before channel: only forward Completed/Failed/Paused, preventing DownloadProgress from flooding the bounded channel
- Fix CHANGELOG to document bounded mpsc::channel(1024) correctly
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src-tauri/src/application/services/queue_manager.rs`:
- Around line 116-122: The rollback-generated DownloadFailed events are causing
double-free of a slot because safe_decrement() is already called when
engine.start() returns Err and the same failure is re-routed into
handle_download_failed(); update handle_download_failed() (and the other
duplicated calls around the mentioned blocks) to skip calling safe_decrement()
for rollback-generated failures: detect the rollback case by checking the
engine.start() error variant or add/read a flag on the DownloadFailed
event/Download (e.g., rollback_generated or no_task_spawned) and only call
safe_decrement() when the failure represents a previously-running task that
actually consumed a slot; remove the extra safe_decrement() calls in the
duplicated blocks (the ones that publish DownloadFailed after download.fail())
so active_count and max_concurrent remain correct.
- Around line 283-285: The retry_delay function computes 10 *
2u64.pow(attempt.saturating_sub(1)) which can overflow for large attempt values
before the min(300s) clamp; change retry_delay to clamp the exponent before
calling pow (e.g., let exp = attempt.saturating_sub(1).min(5)) and use that exp
when computing the multiplier so the intermediate 2^exp cannot overflow and the
final Duration.min(Duration::from_secs(300)) still applies; update references
inside the retry_delay function (attempt, exp, pow, Duration::from_secs, min)
accordingly.
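A minimal sketch of the clamped backoff described above; the 10 s base and 300 s cap come from the PR description, and the clamp of 5 is the review's suggested bound (10 * 2^5 = 320 s, already above the cap). The function name matches the review, but the exact production signature is assumed.

```rust
use std::time::Duration;

/// Overflow-safe exponential backoff: clamp the exponent before calling `pow`,
/// then cap the result at 300 seconds.
fn retry_delay(attempt: u32) -> Duration {
    let exp = attempt.saturating_sub(1).min(5);
    Duration::from_secs(10 * 2u64.pow(exp)).min(Duration::from_secs(300))
}

fn main() {
    assert_eq!(retry_delay(1), Duration::from_secs(10));
    assert_eq!(retry_delay(3), Duration::from_secs(40));
    assert_eq!(retry_delay(62), Duration::from_secs(300)); // no overflow, just the cap
}
```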
📒 Files selected for processing (2)
- CHANGELOG.md
- src-tauri/src/application/services/queue_manager.rs
✅ Files skipped from review due to trivial changes (1)
- CHANGELOG.md
- Filter pending retries from on_slot_freed candidates using retry_cancellations map, preventing double-start before backoff expires
- Simplify schedule_retry: timer removes from pending set then delegates to on_slot_freed instead of starting directly (respects priority/FIFO)
- Remove DownloadFailed publish from engine.start() rollback paths to prevent double-decrement of active_count via re-entrant event handling
- Clamp retry_delay exponent to 5 before computing 2^exp, preventing u64 overflow for attempt >= 62
🧹 Nitpick comments (1)
src-tauri/src/application/services/queue_manager.rs (1)
216-221: Clarify variable name `dominated`. The name `dominated` is semantically unclear in this context. Consider renaming to something more descriptive like `is_scheduling_event` or `should_forward` to better convey intent.

```diff
-        let dominated = matches!(
+        let is_scheduling_event = matches!(
             event,
             DomainEvent::DownloadCompleted { .. }
                 | DomainEvent::DownloadPaused { .. }
                 | DomainEvent::DownloadFailed { .. }
         );
-        if dominated && tx.try_send(event.clone()).is_err() {
+        if is_scheduling_event && tx.try_send(event.clone()).is_err() {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src-tauri/src/application/services/queue_manager.rs` around lines 216 - 221, Rename the unclear variable dominated to a descriptive name (e.g., is_scheduling_event or should_forward) in the match and all its usages within queue_manager.rs: update the let binding `let dominated = matches!(event, DomainEvent::DownloadCompleted { .. } | DomainEvent::DownloadPaused { .. } | DomainEvent::DownloadFailed { .. });` to `let is_scheduling_event = ...` (or `should_forward`) and replace subsequent references to `dominated` in functions/methods handling DomainEvent to use the new identifier so intent is clear.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src-tauri/src/application/services/queue_manager.rs`:
- Around line 216-221: Rename the unclear variable dominated to a descriptive
name (e.g., is_scheduling_event or should_forward) in the match and all its
usages within queue_manager.rs: update the let binding `let dominated =
matches!(event, DomainEvent::DownloadCompleted { .. } |
DomainEvent::DownloadPaused { .. } | DomainEvent::DownloadFailed { .. });` to
`let is_scheduling_event = ...` (or `should_forward`) and replace subsequent
references to `dominated` in functions/methods handling DomainEvent to use the
new identifier so intent is clear.
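A small, self-contained sketch of the filter-before-send idea behind this rename, assuming a tokio bounded mpsc channel like the one the PR describes. The `DomainEvent` enum here is a trimmed stand-in for the crate's real type.

```rust
use tokio::sync::mpsc;

/// Trimmed stand-in for the crate's DomainEvent; only the variants named in
/// the review are modelled.
#[derive(Clone, Debug)]
enum DomainEvent {
    DownloadCompleted { id: u64 },
    DownloadPaused { id: u64 },
    DownloadFailed { id: u64 },
    DownloadProgress { id: u64, bytes: u64 },
}

fn main() {
    // Bounded channel, as in the PR (capacity 1024 there; smaller for the demo).
    let (tx, mut rx) = mpsc::channel::<DomainEvent>(8);

    // Subscriber closure: forward only lifecycle events so high-frequency
    // progress events cannot flood the bounded buffer.
    let forward = move |event: &DomainEvent| {
        let is_scheduling_event = matches!(
            event,
            DomainEvent::DownloadCompleted { .. }
                | DomainEvent::DownloadPaused { .. }
                | DomainEvent::DownloadFailed { .. }
        );
        if is_scheduling_event && tx.try_send(event.clone()).is_err() {
            // Keep the existing drop-and-log behaviour on backpressure.
            eprintln!("event channel full, dropping {event:?}");
        }
    };

    forward(&DomainEvent::DownloadProgress { id: 1, bytes: 4096 }); // filtered out
    forward(&DomainEvent::DownloadCompleted { id: 1 }); // forwarded
    assert!(matches!(rx.try_recv(), Ok(DomainEvent::DownloadCompleted { .. })));
}
```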
📒 Files selected for processing (1)
src-tauri/src/application/services/queue_manager.rs
1 issue found across 1 file (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src-tauri/src/application/services/queue_manager.rs">
<violation number="1" location="src-tauri/src/application/services/queue_manager.rs:129">
P2: Engine start failures now drop the DownloadFailed event, so EventBus subscribers (including the Tauri bridge) never see a failure when start() fails. This can leave the UI thinking the download is still active. Consider publishing the failure event and gating the QueueManager’s failure handler to avoid double-decrementing.</violation>
</file>
Restore event_bus.publish(fail_event) on engine.start() rollback so the Tauri bridge and UI see the failure. Gate handle_download_failed decrement on download state: downloads already in Error (rollback-generated events) skip safe_decrement since the rollback path already decremented.
Actionable comments posted: 1
🧹 Nitpick comments (2)
src-tauri/src/application/services/queue_manager.rs (2)
229-234: Clarify variable name `dominated`. The variable name `dominated` doesn't convey its purpose. Consider renaming to `is_lifecycle_event` or `should_forward`.

```diff
-        let dominated = matches!(
+        let is_lifecycle_event = matches!(
             event,
             DomainEvent::DownloadCompleted { .. }
                 | DomainEvent::DownloadPaused { .. }
                 | DomainEvent::DownloadFailed { .. }
         );
-        if dominated && tx.try_send(event.clone()).is_err() {
+        if is_lifecycle_event && tx.try_send(event.clone()).is_err() {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src-tauri/src/application/services/queue_manager.rs` around lines 229 - 234, The variable name `dominated` in queue_manager.rs is unclear; rename it to a descriptive boolean like `is_lifecycle_event` or `should_forward` to reflect that it detects lifecycle download events (e.g., DomainEvent::DownloadCompleted, DownloadPaused, DownloadFailed). Update the binding (let dominated = ...) to use the new name and replace all subsequent references to `dominated` in the function handling `event` so logic and tests continue to work unchanged; ensure pattern match list (DomainEvent::DownloadCompleted | DownloadPaused | DownloadFailed) remains identical.
124-136: Early return on engine.start() failure skips remaining candidates. If engine.start() fails for one download (e.g., due to a download-specific issue like an invalid URL), the method returns early without attempting to start other queued candidates. This could leave slots unfilled when there are valid candidates remaining. Consider continuing the loop after handling the failure:

Suggested change

```diff
 if let Err(engine_err) = self.engine.start(&download) {
     // Roll back: no task was spawned, so decrement and persist Error.
-    // Publish the event so other subscribers (Tauri bridge/UI) see
-    // the failure. handle_download_failed gates its own decrement
-    // on the download's state: it will see Error (already saved
-    // here) and skip the decrement, preventing double-count.
     self.safe_decrement();
     if let Ok(fail_event) = download.fail(engine_err.to_string()) {
         let _ = self.download_repo.save(&download);
         self.event_bus.publish(fail_event);
     }
-    return Err(AppError::Domain(engine_err));
+    tracing::warn!("on_slot_freed: engine.start failed for {:?}: {engine_err}", download.id());
+    continue; // Try next candidate
 }
```
Verify each finding against the current code and only fix it if needed. In `@src-tauri/src/application/services/queue_manager.rs` around lines 124 - 136, The early return inside the loop when self.engine.start(&download) fails causes remaining queued candidates to be skipped; change the control flow in the loop so that after handling the failure (call self.safe_decrement(), convert failure via download.fail(engine_err.to_string()), save via self.download_repo.save(&download), and publish via self.event_bus.publish(fail_event) as currently done) you do NOT return Err(AppError::Domain(engine_err)) but instead continue to the next candidate; if you need to surface errors, aggregate or log them rather than short-circuiting the loop so other candidates are still attempted.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src-tauri/src/application/services/queue_manager.rs`:
- Around line 223-255: The event loop in QueueManager::start_listening doesn't
handle cancellation so active_count can leak; add a DownloadCancelled variant to
DomainEvent, ensure the download engine emits DomainEvent::DownloadCancelled
when cancel() completes successfully, and update start_listening's match arm
(alongside DownloadCompleted/DownloadPaused) to call
self.decrement_and_schedule(). Also verify any codepaths that publish
DownloadFailed still call handle_download_failed(id) as before.
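A minimal sketch of the cancellation wiring this comment asks for: a `DownloadCancelled` variant plus an event-loop arm that treats it like completion or pause. The variant names follow the PR; the QueueManager methods shown are simplified stand-ins for the real service.

```rust
/// Trimmed DomainEvent with the new variant; the real enum has more variants
/// and richer payloads.
#[derive(Clone, Debug)]
enum DomainEvent {
    DownloadCompleted { id: u64 },
    DownloadPaused { id: u64 },
    DownloadFailed { id: u64 },
    DownloadCancelled { id: u64 }, // emitted by the engine when cancel() completes
}

struct QueueManager;

impl QueueManager {
    fn decrement_and_schedule(&self) {
        // In the real service: safe_decrement() followed by on_slot_freed().
    }

    fn handle_download_failed(&self, _id: u64) {
        // In the real service: retry with backoff or record terminal failure.
    }

    /// Event-loop arm: a cancelled download frees its slot exactly like a
    /// completed or paused one, so active_count can no longer leak.
    fn on_event(&self, event: &DomainEvent) {
        match event {
            DomainEvent::DownloadCompleted { .. }
            | DomainEvent::DownloadPaused { .. }
            | DomainEvent::DownloadCancelled { .. } => self.decrement_and_schedule(),
            DomainEvent::DownloadFailed { id } => self.handle_download_failed(*id),
        }
    }
}

fn main() {
    QueueManager.on_event(&DomainEvent::DownloadCancelled { id: 42 });
}
```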
---
Nitpick comments:
In `@src-tauri/src/application/services/queue_manager.rs`:
- Around line 229-234: The variable name `dominated` in queue_manager.rs is
unclear; rename it to a descriptive boolean like `is_lifecycle_event` or
`should_forward` to reflect that it detects lifecycle download events (e.g.,
DomainEvent::DownloadCompleted, DownloadPaused, DownloadFailed). Update the
binding (let dominated = ...) to use the new name and replace all subsequent
references to `dominated` in the function handling `event` so logic and tests
continue to work unchanged; ensure pattern match list
(DomainEvent::DownloadCompleted | DownloadPaused | DownloadFailed) remains
identical.
- Around line 124-136: The early return inside the loop when
self.engine.start(&download) fails causes remaining queued candidates to be
skipped; change the control flow in the loop so that after handling the failure
(call self.safe_decrement(), convert failure via
download.fail(engine_err.to_string()), save via
self.download_repo.save(&download), and publish via
self.event_bus.publish(fail_event) as currently done) you do NOT return
Err(AppError::Domain(engine_err)) but instead continue to the next candidate; if
you need to surface errors, aggregate or log them rather than short-circuiting
the loop so other candidates are still attempted.
📒 Files selected for processing (1)
src-tauri/src/application/services/queue_manager.rs
Cancelled downloads exited silently without publishing an event, leaking
active slots and eventually starving the queue.
- Add DownloadCancelled { id } variant to DomainEvent
- Emit DownloadCancelled from SegmentedDownloadEngine when cancel_token
fires without segment errors
- Handle DownloadCancelled in QueueManager (decrement + reschedule)
- Forward DownloadCancelled through Tauri event bridge as
"download-cancelled"
Actionable comments posted: 1
🧹 Nitpick comments (1)
src-tauri/src/adapters/driven/event/tauri_bridge.rs (1)
27-27: Add explicit bridge tests for the DownloadCancelled event contract. The mapping looks correct, but this new frontend-facing event name/payload pair should be covered by a dedicated assertion to prevent silent contract drift.

✅ Minimal test additions

```diff
@@ fn test_event_name_download_variants() {
     assert_eq!(
+        event_name(&DomainEvent::DownloadCancelled { id: DownloadId(1) }),
+        "download-cancelled"
+    );
+    assert_eq!(
         event_name(&DomainEvent::DownloadExtracting { id: DownloadId(1) }),
         "download-extracting"
     );
@@ fn test_event_payload_segment_camel_case() {
     }
+
+    #[test]
+    fn test_event_payload_download_cancelled() {
+        let event = DomainEvent::DownloadCancelled { id: DownloadId(9) };
+        let (name, payload) = to_tauri_event(&event);
+        assert_eq!(name, "download-cancelled");
+        assert_eq!(payload["id"], 9);
+    }
 }
```

Also applies to: 47-47
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src-tauri/src/adapters/driven/event/tauri_bridge.rs` at line 27, Add an explicit unit test that asserts the bridge mapping for DomainEvent::DownloadCancelled produces the frontend event name "download-cancelled" and the expected payload shape; locate the match arm handling DomainEvent::DownloadCancelled in tauri_bridge.rs and write a test that constructs a DownloadCancelled event, passes it through the same function/mapper used by other event tests (reuse the existing test helper used for other DomainEvent cases), and assert both the event name string equals "download-cancelled" and the serialized payload matches the expected structure so the bridge contract cannot silently drift.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src-tauri/src/adapters/driven/network/download_engine.rs`:
- Around line 280-282: Early-return cancel paths (e.g., during HEAD/setup)
currently return without emitting a terminal event, leaving consumers
unsynchronized; update the early-return branches that check
cancel_token.is_cancelled() to call
event_bus.publish(DomainEvent::DownloadCancelled { id: download_id }) before
returning (use the same event_bus and download_id symbols as in the later
branch), ensuring you publish once and then return to avoid duplicate events.
---
Nitpick comments:
In `@src-tauri/src/adapters/driven/event/tauri_bridge.rs`:
- Line 27: Add an explicit unit test that asserts the bridge mapping for
DomainEvent::DownloadCancelled produces the frontend event name
"download-cancelled" and the expected payload shape; locate the match arm
handling DomainEvent::DownloadCancelled in tauri_bridge.rs and write a test that
constructs a DownloadCancelled event, passes it through the same function/mapper
used by other event tests (reuse the existing test helper used for other
DomainEvent cases), and assert both the event name string equals
"download-cancelled" and the serialized payload matches the expected structure
so the bridge contract cannot silently drift.
📒 Files selected for processing (4)
- src-tauri/src/adapters/driven/event/tauri_bridge.rs
- src-tauri/src/adapters/driven/network/download_engine.rs
- src-tauri/src/application/services/queue_manager.rs
- src-tauri/src/domain/event.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src-tauri/src/application/services/queue_manager.rs
…ngine

Early-return cancel points (during HEAD request and setup phases) exited without publishing a terminal event, leaving QueueManager active_count out of sync. Now all three cancel paths emit DownloadCancelled.
Summary
- QueueManager application service for download queue scheduling

Changes
- src-tauri/src/application/services/queue_manager.rs — QueueManager struct with on_slot_freed(), schedule_retry(), handle_download_failed(), start_listening(), set_max_concurrent()
- src-tauri/src/application/services/mod.rs — Module re-export
- src-tauri/src/application/mod.rs — Added pub mod services;
- src-tauri/src/lib.rs — Exported QueueManager in public API
- CHANGELOG.md — Added queue manager entry

Design decisions
- Arc<dyn Trait> dependency injection (consistent with CommandBus pattern; a constructor sketch follows this list)
- tokio::sync::Mutex<()> schedule lock ensures on_slot_freed() idempotency
- fetch_update with saturating_sub prevents AtomicUsize underflow
- mpsc::channel(1024) bridges sync EventBus subscriber to async processing
- into_inner() instead of panic
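As referenced in the first item above, a minimal sketch of the Arc<dyn Trait> constructor shape, with placeholder traits standing in for the real repository, engine, and event-bus ports; the actual trait methods and constructor signature are assumptions.

```rust
use std::sync::Arc;

/// Placeholder ports; the real traits define repository, engine, and
/// event-bus operations.
trait DownloadRepository: Send + Sync {}
trait DownloadEngine: Send + Sync {}
trait EventBus: Send + Sync {}

struct QueueManager {
    download_repo: Arc<dyn DownloadRepository>,
    engine: Arc<dyn DownloadEngine>,
    event_bus: Arc<dyn EventBus>,
}

impl QueueManager {
    /// Dependencies are injected as trait objects, so tests can pass
    /// in-memory fakes while production wires the real adapters.
    fn new(
        download_repo: Arc<dyn DownloadRepository>,
        engine: Arc<dyn DownloadEngine>,
        event_bus: Arc<dyn EventBus>,
    ) -> Arc<Self> {
        Arc::new(Self { download_repo, engine, event_bus })
    }
}

struct InMemoryRepo;
impl DownloadRepository for InMemoryRepo {}
struct FakeEngine;
impl DownloadEngine for FakeEngine {}
struct FakeBus;
impl EventBus for FakeBus {}

fn main() {
    let _qm = QueueManager::new(Arc::new(InMemoryRepo), Arc::new(FakeEngine), Arc::new(FakeBus));
}
```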
Testing

Test plan

- on_slot_freed starts next queued download

Summary by cubic
Implements QueueManager to schedule downloads by priority with configurable concurrency, exponential backoff, and a circuit breaker. Handles cancellations (including early cancel paths) to free slots and prevent starvation; aligns with task 10.

New Features

- Scheduling also considers Retry items; retries are cancellable.
- Sync→async EventBus bridge over mpsc::channel(1024) with lifecycle filtering.

Bug Fixes

- Added DownloadCancelled (engine emits on cancel, including HEAD/setup paths; QueueManager decrements and reschedules; Tauri forwards as "download-cancelled").
- Engine-start rollback publishes DownloadFailed and gates decrement on state; on_slot_freed ignores retries still in backoff; retry_delay clamps exponent to 5.

Written for commit bb871cc. Summary will update on new commits.
Summary by CodeRabbit
New Features
Reliability
Events
Documentation
Tests