Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src-tauri/src/adapters/driven/event/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod tauri_bridge;
pub mod tokio_event_bus;

pub use tauri_bridge::spawn_tauri_event_bridge;
pub use tokio_event_bus::TokioEventBus;
236 changes: 236 additions & 0 deletions src-tauri/src/adapters/driven/event/tauri_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
use serde_json::json;
use tauri::{AppHandle, Emitter};

use crate::domain::event::DomainEvent;
use crate::domain::ports::driven::EventBus;

/// Subscribes to the EventBus and emits each event to the Tauri webview.
pub fn spawn_tauri_event_bridge(app_handle: AppHandle, event_bus: &dyn EventBus) {
event_bus.subscribe(Box::new(move |event: &DomainEvent| {
let (name, payload) = to_tauri_event(event);
app_handle.emit(name, payload).ok();
}));
Comment on lines +8 to +12
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Redundant app_handle clone

app_handle is taken by value (AppHandle is Clone + Send) and then immediately cloned into handle. The original binding is never used after the clone. You can move app_handle directly into the closure and drop the intermediate binding.

Suggested change
pub fn spawn_tauri_event_bridge(app_handle: AppHandle, event_bus: &dyn EventBus) {
let handle = app_handle.clone();
event_bus.subscribe(Box::new(move |event: &DomainEvent| {
let (name, payload) = to_tauri_event(event);
handle.emit(name, payload).ok(); // Best-effort, don't crash on emit failure
}));
pub fn spawn_tauri_event_bridge(app_handle: AppHandle, event_bus: &dyn EventBus) {
event_bus.subscribe(Box::new(move |event: &DomainEvent| {
let (name, payload) = to_tauri_event(event);
app_handle.emit(name, payload).ok(); // Best-effort, don't crash on emit failure
}));
}

Fix in Claude Code

}

fn event_name(event: &DomainEvent) -> &'static str {
match event {
DomainEvent::DownloadCreated { .. } => "download-created",
DomainEvent::DownloadStarted { .. } => "download-started",
DomainEvent::DownloadPaused { .. } => "download-paused",
DomainEvent::DownloadResumed { .. } => "download-resumed",
DomainEvent::DownloadResumedFromWait { .. } => "download-resumed-from-wait",
DomainEvent::DownloadCompleted { .. } => "download-completed",
DomainEvent::DownloadFailed { .. } => "download-failed",
DomainEvent::DownloadRetrying { .. } => "download-retrying",
DomainEvent::DownloadWaiting { .. } => "download-waiting",
DomainEvent::DownloadChecking { .. } => "download-checking",
DomainEvent::DownloadExtracting { .. } => "download-extracting",
DomainEvent::DownloadProgress { .. } => "download-progress",
DomainEvent::SegmentStarted { .. } => "segment-started",
DomainEvent::SegmentCompleted { .. } => "segment-completed",
DomainEvent::SegmentFailed { .. } => "segment-failed",
DomainEvent::PluginLoaded { .. } => "plugin-loaded",
DomainEvent::PluginUnloaded { .. } => "plugin-unloaded",
DomainEvent::PackageCreated { .. } => "package-created",
}
}

fn event_payload(event: &DomainEvent) -> serde_json::Value {
match event {
DomainEvent::DownloadCreated { id }
| DomainEvent::DownloadStarted { id }
| DomainEvent::DownloadPaused { id }
| DomainEvent::DownloadResumed { id }
| DomainEvent::DownloadResumedFromWait { id }
| DomainEvent::DownloadCompleted { id }
| DomainEvent::DownloadWaiting { id }
| DomainEvent::DownloadChecking { id }
| DomainEvent::DownloadExtracting { id } => json!({ "id": id.0 }),

DomainEvent::DownloadFailed { id, error } => json!({ "id": id.0, "error": error }),
DomainEvent::DownloadRetrying { id, attempt } => {
json!({ "id": id.0, "attempt": attempt })
}
DomainEvent::DownloadProgress {
id,
downloaded_bytes,
total_bytes,
} => {
json!({ "id": id.0, "downloadedBytes": downloaded_bytes, "totalBytes": total_bytes })
}

DomainEvent::SegmentStarted {
download_id,
segment_id,
}
| DomainEvent::SegmentCompleted {
download_id,
segment_id,
} => {
json!({ "downloadId": download_id.0, "segmentId": segment_id })
}
DomainEvent::SegmentFailed {
download_id,
segment_id,
error,
} => {
json!({ "downloadId": download_id.0, "segmentId": segment_id, "error": error })
}

DomainEvent::PluginLoaded { name, version } => {
json!({ "name": name, "version": version })
}
DomainEvent::PluginUnloaded { name } => json!({ "name": name }),
DomainEvent::PackageCreated { id, name } => json!({ "id": id.to_string(), "name": name }),
}
}

fn to_tauri_event(event: &DomainEvent) -> (&'static str, serde_json::Value) {
(event_name(event), event_payload(event))
}

#[cfg(test)]
mod tests {
use super::*;
use crate::domain::model::download::DownloadId;

#[test]
fn test_event_name_download_variants() {
assert_eq!(
event_name(&DomainEvent::DownloadCreated { id: DownloadId(1) }),
"download-created"
);
assert_eq!(
event_name(&DomainEvent::DownloadStarted { id: DownloadId(1) }),
"download-started"
);
assert_eq!(
event_name(&DomainEvent::DownloadPaused { id: DownloadId(1) }),
"download-paused"
);
assert_eq!(
event_name(&DomainEvent::DownloadResumed { id: DownloadId(1) }),
"download-resumed"
);
assert_eq!(
event_name(&DomainEvent::DownloadResumedFromWait { id: DownloadId(1) }),
"download-resumed-from-wait"
);
assert_eq!(
event_name(&DomainEvent::DownloadCompleted { id: DownloadId(1) }),
"download-completed"
);
assert_eq!(
event_name(&DomainEvent::DownloadFailed {
id: DownloadId(1),
error: "err".into()
}),
"download-failed"
);
assert_eq!(
event_name(&DomainEvent::DownloadRetrying {
id: DownloadId(1),
attempt: 1
}),
"download-retrying"
);
assert_eq!(
event_name(&DomainEvent::DownloadWaiting { id: DownloadId(1) }),
"download-waiting"
);
assert_eq!(
event_name(&DomainEvent::DownloadChecking { id: DownloadId(1) }),
"download-checking"
);
assert_eq!(
event_name(&DomainEvent::DownloadExtracting { id: DownloadId(1) }),
"download-extracting"
);
assert_eq!(
event_name(&DomainEvent::DownloadProgress {
id: DownloadId(1),
downloaded_bytes: 0,
total_bytes: 100
}),
"download-progress"
);
}

#[test]
fn test_event_name_segment_variants() {
assert_eq!(
event_name(&DomainEvent::SegmentStarted {
download_id: DownloadId(1),
segment_id: 0
}),
"segment-started"
);
assert_eq!(
event_name(&DomainEvent::SegmentCompleted {
download_id: DownloadId(1),
segment_id: 0
}),
"segment-completed"
);
assert_eq!(
event_name(&DomainEvent::SegmentFailed {
download_id: DownloadId(1),
segment_id: 0,
error: "err".into()
}),
"segment-failed"
);
}

#[test]
fn test_event_name_plugin_variants() {
assert_eq!(
event_name(&DomainEvent::PluginLoaded {
name: "p".into(),
version: "1.0".into()
}),
"plugin-loaded"
);
assert_eq!(
event_name(&DomainEvent::PluginUnloaded { name: "p".into() }),
"plugin-unloaded"
);
assert_eq!(
event_name(&DomainEvent::PackageCreated {
id: 1,
name: "pkg".into()
}),
"package-created"
);
}

#[test]
fn test_event_payload_download_progress_camel_case() {
let event = DomainEvent::DownloadProgress {
id: DownloadId(7),
downloaded_bytes: 512,
total_bytes: 1024,
};
let (_, payload) = to_tauri_event(&event);
assert_eq!(payload["id"], 7);
assert_eq!(payload["downloadedBytes"], 512);
assert_eq!(payload["totalBytes"], 1024);
// Verify snake_case keys are not present
assert!(payload.get("downloaded_bytes").is_none());
assert!(payload.get("total_bytes").is_none());
}

#[test]
fn test_event_payload_segment_camel_case() {
let event = DomainEvent::SegmentCompleted {
download_id: DownloadId(3),
segment_id: 2,
};
let (_, payload) = to_tauri_event(&event);
assert_eq!(payload["downloadId"], 3);
assert_eq!(payload["segmentId"], 2);
// Verify snake_case keys are not present
assert!(payload.get("download_id").is_none());
assert!(payload.get("segment_id").is_none());
}
}
119 changes: 119 additions & 0 deletions src-tauri/src/adapters/driven/event/tokio_event_bus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use tokio::sync::broadcast;

use crate::domain::event::DomainEvent;
use crate::domain::ports::driven::EventBus;

pub struct TokioEventBus {
sender: broadcast::Sender<DomainEvent>,
}

impl TokioEventBus {
pub fn new(capacity: usize) -> Self {
let capacity = capacity.max(1);
let (sender, _) = broadcast::channel(capacity);
Self { sender }
}
}

impl EventBus for TokioEventBus {
fn publish(&self, event: DomainEvent) {
let _ = self.sender.send(event);
}

fn subscribe(&self, handler: Box<dyn Fn(&DomainEvent) + Send + Sync + 'static>) {
let mut rx = self.sender.subscribe();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => handler(&event),
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(skipped = n, "event bus subscriber lagged, events dropped");
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::domain::model::download::DownloadId;
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;

#[tokio::test]
async fn test_publish_and_receive_event() {
let bus = TokioEventBus::new(16);
let received: Arc<Mutex<Vec<DomainEvent>>> = Arc::new(Mutex::new(Vec::new()));
let notify = Arc::new(Notify::new());
let received_clone = received.clone();
let notify_clone = notify.clone();

bus.subscribe(Box::new(move |event: &DomainEvent| {
received_clone.lock().unwrap().push(event.clone());
notify_clone.notify_one();
}));

bus.publish(DomainEvent::DownloadStarted { id: DownloadId(1) });
notify.notified().await;

let events = received.lock().unwrap();
assert_eq!(events.len(), 1);
assert_eq!(
events[0],
DomainEvent::DownloadStarted { id: DownloadId(1) }
);
}

#[tokio::test]
async fn test_multiple_subscribers_receive_same_event() {
let bus = TokioEventBus::new(16);
let received1: Arc<Mutex<Vec<DomainEvent>>> = Arc::new(Mutex::new(Vec::new()));
let received2: Arc<Mutex<Vec<DomainEvent>>> = Arc::new(Mutex::new(Vec::new()));
let notify1 = Arc::new(Notify::new());
let notify2 = Arc::new(Notify::new());
let clone1 = received1.clone();
let clone2 = received2.clone();
let n1 = notify1.clone();
let n2 = notify2.clone();

bus.subscribe(Box::new(move |event: &DomainEvent| {
clone1.lock().unwrap().push(event.clone());
n1.notify_one();
}));
bus.subscribe(Box::new(move |event: &DomainEvent| {
clone2.lock().unwrap().push(event.clone());
n2.notify_one();
}));

bus.publish(DomainEvent::DownloadCompleted { id: DownloadId(42) });
notify1.notified().await;
notify2.notified().await;

assert_eq!(received1.lock().unwrap().len(), 1);
assert_eq!(received2.lock().unwrap().len(), 1);
assert_eq!(
received1.lock().unwrap()[0],
DomainEvent::DownloadCompleted { id: DownloadId(42) }
);
assert_eq!(
received2.lock().unwrap()[0],
DomainEvent::DownloadCompleted { id: DownloadId(42) }
);
}

#[test]
fn test_publish_no_subscriber_doesnt_block() {
let bus = TokioEventBus::new(16);
bus.publish(DomainEvent::DownloadStarted { id: DownloadId(99) });
bus.publish(DomainEvent::DownloadCompleted { id: DownloadId(99) });
}

#[test]
fn test_new_with_zero_capacity_uses_minimum() {
let bus = TokioEventBus::new(0);
bus.publish(DomainEvent::DownloadStarted { id: DownloadId(1) });
}
}
1 change: 1 addition & 0 deletions src-tauri/src/adapters/driven/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Driven adapters — implementations of domain port traits.

pub mod event;
pub mod sqlite;
2 changes: 2 additions & 0 deletions src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ mod application;
pub mod domain;

// Public API — concrete types for app wiring (main.rs, Tauri setup, integration tests)
pub use adapters::driven::event::TokioEventBus;
pub use adapters::driven::event::spawn_tauri_event_bridge;
pub use adapters::driven::sqlite::connection;
pub use adapters::driven::sqlite::download_read_repo::SqliteDownloadReadRepo;
pub use adapters::driven::sqlite::download_repo::SqliteDownloadRepo;
Expand Down
Loading
Loading