Skip to content

Commit d681c8d

Browse files
committed
AI: Stream folder name suggestions
Replace `getFolderSuggestions` (await-array) with `streamFolderSuggestions` in `NewFolderDialog`. The first suggestion now appears in <500 ms; the rest trickle in as the LLM emits them. Direct application of two design principles: "show progress, communicate what's actually happening" and "long operations are immediately cancelable, stopping background work too." Local LLM is the strongest beneficiary (3B-param model on Apple Silicon emits ~30-60 tok/s, so the perceived speedup is dramatic). Reasoning cloud models (`gpt-5*`, `o3*`) gain almost as much — time-to-first-token can be 1-3 s, but with streaming the user sees text the moment it starts. Architecture - New backend: `client::chat_completion_stream` returns `BoxStream<Result<String, AiError>>` of content chunks, filtering out reasoning / thought-signature / tool-call chunks. - New `StreamingSanitizer` line-buffers across chunk boundaries, runs the existing `sanitize_one_line` per completed line, dedupes case-insensitively against existing names + already-emitted, caps at MAX_SUGGESTIONS. - Two new Tauri commands in `ai/suggestions.rs`: - `stream_folder_suggestions(request_id, listing_id, current_path, include_hidden, on_event: Channel<SuggestionStreamEvent>)`. Always returns Ok(()); all signaling goes through the Channel. - `cancel_folder_suggestions(request_id)`. Idempotent. - Cancellation registry (`STREAM_CANCEL_TOKENS`) in `manager.rs` keyed by request id, using `tokio_util::sync::CancellationToken`. Token is registered synchronously in the command body before any await so cancel arriving before registration is impossible. - `tokio::select!` between the stream and the token cancels mid-stream; dropping the genai stream closes the reqwest body, cuts billing, frees local-LLM compute. - Frontend: `streamFolderSuggestions(...)` returns `{ promise, cancel }`. `NewFolderDialog.onDestroy` calls cancel — the explicit signal Tauri 2 needs because `Channel::send` is fire-and-forget and can't detect frontend drop. UX - "Loading..." text gone. Replaced with a pulsing skeleton chip at the end of the list (same dimensions; no reflow on completion). Live region (`aria-live="polite"`) so screen readers announce each new suggestion. - Existing names are clickable while later ones still stream. Empty stream collapses the section silently (graceful degradation). Tests - 11 new sanitizer unit tests (chunked splits, dedupe, cap, halt-on-emit- false, finish() flushing trailing-no-newline). - 5 registry tests covering concurrent ids, idempotent double-cancel, unknown-id no-op. - 4 integration tests against an axum-based mock SSE server (chunk order, empty stream, drop-mid-stream, HTTP 500 → ServerError). Wiremock can't chunk-deliver SSE bodies; axum can. - 5 frontend vitest cases for the dialog (incremental render, failed/ cancelled keep already-streamed visible, empty hides section, unmount cancels). - 4 new real-API #[ignore]-gated smokes: 3 OpenAI streaming variants (gpt-4o-mini, gpt-5-mini, o3-mini) plus claude-3-5-haiku for Anthropic native streaming protocol coverage. All 3 OpenAI smokes pass live. Plan: docs/specs/ai-streaming-suggestions-plan.md (3 review rounds).
1 parent 0c45a46 commit d681c8d

18 files changed

Lines changed: 1905 additions & 58 deletions

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/desktop/src-tauri/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ reqwest = { version = "0.13", features = ["json", "rustls", "stream", "multipart
7474
axum = "0.8"
7575
tokio = { version = "1", features = ["rt-multi-thread", "net", "time", "sync", "macros"] }
7676
futures-util = "0.3"
77+
# tokio-util: provides `CancellationToken` for cooperative cancellation of streaming
78+
# AI suggestions (`ai/suggestions.rs::stream_folder_suggestions`). The `rt` feature
79+
# enables runtime integration (token wakeups via tokio's reactor). 0.7.18 is the latest
80+
# stable (published 2026-01-04). MIT license; tracks tokio's release cadence.
81+
tokio-util = { version = "0.7.18", features = ["rt"] }
7782
tower-http = { version = "0.6", features = ["cors"] }
7883
tauri-plugin-updater = "2"
7984
tauri-plugin-process = "2"

apps/desktop/src-tauri/src/ai/CLAUDE.md

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,21 @@ Three provider modes:
1212
| File | Purpose |
1313
|---|---|
1414
| `mod.rs` | Types (`AiStatus`, `AiState`, `DownloadProgress`, `ModelInfo`), model registry (`AVAILABLE_MODELS`, `DEFAULT_MODEL_ID`), `is_local_ai_supported()` gate |
15-
| `manager.rs` | Central coordinator. Global `Mutex<Option<ManagerState>>` singleton. Most Tauri commands live here. Stores provider + cloud-AI config (`cloud_api_key`/`cloud_base_url`/`cloud_model`). Exposes `resolve_backend() -> BackendResolution` so callers don't reinvent provider routing. |
15+
| `manager.rs` | Central coordinator. Global `Mutex<Option<ManagerState>>` singleton. Most Tauri commands live here. Stores provider + cloud-AI config (`cloud_api_key`/`cloud_base_url`/`cloud_model`). Exposes `resolve_backend() -> BackendResolution` so callers don't reinvent provider routing. Also owns the `STREAM_CANCEL_TOKENS` registry (`register_stream`/`unregister_stream`/`cancel_stream`) for in-flight `stream_folder_suggestions` cancellation. |
1616
| `download.rs` | HTTP streaming download with Range-based resume. Emits `ai-download-progress` events (200ms throttle). Cooperative cancellation via function parameter (`Fn() -> bool`). |
1717
| `extract.rs` | Copies bundled `llama-server` binary + dylibs from `resources/ai/` to the AI data dir. Sets Unix permissions, handles symlinks. |
1818
| `process.rs` | Spawns child process with `DYLD_LIBRARY_PATH` set. Instant SIGKILL to stop (llama-server is stateless; macOS reclaims all GPU/mmap resources). `kill_process` for fire-and-forget (quit, orphans), `kill_and_reap_in_background` for normal operation (reaps zombie in bg thread). `kill_stale_llama_servers` for belt-and-suspenders orphan cleanup by process name. Port discovery via `bind(:0)`. |
19-
| `client.rs` | `genai`-backed chat client. `AiBackend` is a struct bundling a long-lived `genai::Client` with a model name; built via `AiBackend::local(port)` or `AiBackend::remote(api_key, base_url, model)`. The model name picks the adapter (`claude-*` → Anthropic native, `gemini-*` → Gemini native, `gpt-5*`/`*-pro`/`*-codex` → OpenAI Responses API, etc.). Auto-omits `temperature`/`top_p` for OpenAI Responses adapter and for chat-completions reasoning models (`o1*`, `o3*`, `o4*`, `chatgpt-*`, `gpt-5*` defense-in-depth) and substitutes `ReasoningEffort::Low`. Local backend forces the OpenAI adapter via a `ServiceTargetResolver` pinning endpoint to `http://127.0.0.1:<port>/v1/`. |
19+
| `client.rs` | `genai`-backed chat client. `AiBackend` is a struct bundling a long-lived `genai::Client` with a model name; built via `AiBackend::local(port)` or `AiBackend::remote(api_key, base_url, model)`. The model name picks the adapter (`claude-*` → Anthropic native, `gemini-*` → Gemini native, `gpt-5*`/`*-pro`/`*-codex` → OpenAI Responses API, etc.). Auto-omits `temperature`/`top_p` for OpenAI Responses adapter and for chat-completions reasoning models (`o1*`, `o3*`, `o4*`, `chatgpt-*`, `gpt-5*` defense-in-depth) and substitutes `ReasoningEffort::Low`. Local backend forces the OpenAI adapter via a `ServiceTargetResolver` pinning endpoint to `http://127.0.0.1:<port>/v1/`. Exposes both `chat_completion` (full response) and `chat_completion_stream` (returns a `BoxStream<Result<String, AiError>>` of content chunks; reasoning/thought-signature/tool-call chunks filtered out). |
2020
| `client_integration_test.rs` | `wiremock`-based tests covering request shape per adapter (chat completions vs Responses API), parsing, error mapping. Always run in CI. |
21-
| `client_real_openai_test.rs` | `#[ignore]`-gated smoke tests against `api.openai.com`. Run with `OPENAI_API_KEY=$(security find-generic-password -a "$USER" -s "OPENAI_API_KEY" -w) cargo nextest run --lib --run-ignored only ai::client_real_openai_test`. Costs ~$0.001 per full run. Use after refactors that touch `client.rs`. |
22-
| `suggestions.rs` | Builds few-shot prompt from listing cache, routes to configured backend, sanitizes response. |
21+
| `client_streaming_test.rs` | `axum`-based SSE mock server tests for `chat_completion_stream`: chunks arrive in order, empty streams end cleanly, drop-mid-stream closes the connection, HTTP 5xx maps to `ServerError`. Always run in CI. (Wiremock can't chunk-deliver SSE bodies — see Gotchas.) |
22+
| `client_real_openai_test.rs` | `#[ignore]`-gated smoke tests against `api.openai.com`, including streaming variants for `gpt-4o-mini`, `gpt-5-mini`, `o3-mini`. Run with `OPENAI_API_KEY=$(security find-generic-password -a "$USER" -s "OPENAI_API_KEY" -w) cargo nextest run --lib --run-ignored only ai::client_real_openai_test`. Costs ~$0.001 per full run. |
23+
| `client_real_anthropic_test.rs` | `#[ignore]`-gated smoke tests against `api.anthropic.com` (chat + streaming variants of `claude-3-5-haiku-latest`). Anthropic's native streaming protocol differs from OpenAI's SSE shape; without this we'd only test the OpenAI lineage. Run with `ANTHROPIC_API_KEY=$(security find-generic-password -a "$USER" -s "ANTHROPIC_API_KEY" -w) cargo nextest run --lib --run-ignored only ai::client_real_anthropic_test`. |
24+
| `suggestions.rs` | Builds few-shot prompt from listing cache, routes to configured backend, sanitizes response. Also exposes `stream_folder_suggestions` + `cancel_folder_suggestions` Tauri commands and a `StreamingSanitizer` that runs the per-line sanitizer on streamed chunks (line-buffers across chunk boundaries, dedupes case-insensitively against existing names + already-emitted, caps at `MAX_SUGGESTIONS`). |
25+
| `suggestions_streaming_test.rs` | Tests for the `manager::register_stream`/`unregister_stream`/`cancel_stream` registry — concurrent ids don't interfere, double-cancel is idempotent, missing id is a no-op. |
2326

2427
### Tauri commands
2528

26-
Core: `get_ai_status`, `get_ai_model_info`, `get_ai_runtime_status`, `configure_ai`, `start_ai_server`, `stop_ai_server`, `check_ai_connection`, `start_ai_download`, `cancel_ai_download`, `get_folder_suggestions`. Note: `get_system_memory_info` moved to top-level `system_memory.rs`.
29+
Core: `get_ai_status`, `get_ai_model_info`, `get_ai_runtime_status`, `configure_ai`, `start_ai_server`, `stop_ai_server`, `check_ai_connection`, `start_ai_download`, `cancel_ai_download`, `get_folder_suggestions`, `stream_folder_suggestions`, `cancel_folder_suggestions`. Note: `get_system_memory_info` moved to top-level `system_memory.rs`.
2730
Legacy (still wired, used by toast): `uninstall_ai`, `dismiss_ai_offer`, `opt_out_ai`, `opt_in_ai`, `is_ai_opted_out`.
2831

2932
## Startup flow
@@ -116,6 +119,21 @@ privacy-focused users. The architecture doesn't fight this switch — it's just
116119
**Decision**: Use `genai` crate as the chat client instead of hand-rolled `reqwest` JSON.
117120
**Why**: We hit two production bugs that were per-provider quirks: (1) GPT-5/o-series chat models reject any non-default `temperature` (HTTP 400), and (2) `gpt-*-pro` / `*-codex` models only respond on `/v1/responses`, not `/v1/chat/completions` (HTTP 404). Each new model adds another quirk. `genai` normalizes ~20 providers, auto-routes Responses-API models, and gives us Anthropic / Gemini / xAI / OpenRouter for free with the same code path. Tradeoff: pinned at `0.5.3` (stable, ~3 months old) with a solo maintainer; mitigated by it being MIT/Apache-2.0 + small enough to fork if needed.
118121

122+
**Decision**: Streaming uses `tauri::ipc::Channel<T>` per call, not the global `app.emit` pattern that downloads use.
123+
**Why**: User can open the new-folder dialog, cancel, and reopen quickly. Two streams could overlap if we used a global event — listeners from the second open would see chunks from the first. Channel scopes the events to a single command invocation, eliminating the race. Tauri 2 docs explicitly recommend `Channel<T>` for streaming events from a command.
124+
125+
**Decision**: Streaming command `stream_folder_suggestions` always returns `Ok(())`; all signaling (suggestions, completion, cancellation, failure) goes through `Channel<SuggestionStreamEvent>`.
126+
**Why**: Mixing IPC `Result<_, String>` with channel events would split the error contract. One signaling path is simpler for both Rust and TypeScript callers. `#[tauri::command]` requires the `Result` return type purely for syntactic reasons here.
127+
128+
**Decision**: Line-buffering and sanitization happen in Rust (`StreamingSanitizer`), not in the frontend.
129+
**Why**: AGENTS.md principle "smart backend, thin frontend." Sanitization rules (markdown stripping, numbering detection, dedupe by case-insensitive existing-names + emit-history) are non-trivial; replicating them in TypeScript would create two authorities that drift. Frontend just renders strings.
130+
131+
**Decision**: Cancellation via explicit `cancel_folder_suggestions` command + `tokio_util::sync::CancellationToken`, not implicit drop detection on the Channel.
132+
**Why**: Tauri 2's `Channel::send` is fire-and-forget into the IPC queue. It does NOT report frontend handler GC or webview destruction back to the backend. Without an explicit cancel signal, the backend would keep streaming after the user closes the dialog — billing cloud providers and pegging local-LLM compute. `CancellationToken::cancel` is itself idempotent, so the same token can be canceled by an explicit cancel call AND by an implicit `Channel::send` failure in the same tick — both succeed.
133+
134+
**Decision**: Cancel-token registry (`STREAM_CANCEL_TOKENS`) is a separate `LazyLock<Mutex<HashMap>>` in `manager.rs`, not part of `ManagerState`.
135+
**Why**: Streaming task lifecycle is orthogonal to file-manager AI state. Keeping it isolated lets us drop entries on task end without holding the wider `MANAGER` lock and without inflating `ManagerState`.
136+
119137
## Gotchas
120138

121139
**Gotcha**: `genai` requires `base_url` to end with `/`. Without the trailing slash, `Url::join("chat/completions")` strips the last segment and you'd hit `https://api.openai.com/chat/completions` (404) instead of `/v1/chat/completions`. `client.rs::build_client` normalizes by appending `/` if missing.
@@ -139,8 +157,14 @@ privacy-focused users. The architecture doesn't fight this switch — it's just
139157
**Gotcha**: `wait_for_server_health` kills the process on timeout or early death — don't remove that cleanup.
140158
**Why**: Without it, a process that fails health check would be orphaned (PID tracked but never cleaned up until explicit stop).
141159

160+
**Gotcha**: `Channel::send` returns `Err` only when the webview itself is gone (window closed); it succeeds silently after the JS-side handler is GC'd. Don't rely on send failure for liveness — use the explicit `cancel_folder_suggestions` command. Send-error in the streaming-suggestion `try_emit` callback triggers the cancel token as defense-in-depth implicit cancel.
161+
162+
**Gotcha**: Cancel via `tokio::select!` drops the in-flight `stream.next()` future. For `genai`'s reqwest-backed SSE this is the desired terminal action — closes the connection, cuts billing. Single-poll cancel-safety is the only model we rely on; we never resume a previously-canceled stream.
163+
164+
**Gotcha**: `wiremock` does not chunk-deliver SSE bodies in distinct frames; it writes the whole body in one HTTP response. That gives false confidence we'd be exercising multi-chunk parse paths. `client_streaming_test.rs` uses an `axum`-based mock SSE server with `tokio::time::sleep` between frames instead.
165+
142166
## Dependencies
143167

144-
External: `genai` (chat normalization), `reqwest` (download streaming + `health_check`), `tokio`, `libc`, `futures_util`
145-
Dev: `wiremock` (HTTP mock for `client_integration_test.rs`)
168+
External: `genai` (chat normalization), `reqwest` (download streaming + `health_check`), `tokio`, `tokio-util` (`CancellationToken`), `libc`, `futures_util`
169+
Dev: `wiremock` (HTTP mock for `client_integration_test.rs`); `axum` is used in test-only mode for `client_streaming_test.rs`'s SSE mock.
146170
Internal: `crate::ignore_poison::IgnorePoison`, `crate::file_system::get_file_at`

apps/desktop/src-tauri/src/ai/client.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
use std::sync::Arc;
1313
use std::time::Duration;
1414

15+
use futures_util::stream::{BoxStream, StreamExt};
1516
use genai::adapter::AdapterKind;
16-
use genai::chat::{ChatMessage, ChatOptions, ChatRequest, ReasoningEffort};
17+
use genai::chat::{ChatMessage, ChatOptions, ChatRequest, ChatStreamEvent, ReasoningEffort};
1718
use genai::resolver::{AuthData, Endpoint, ServiceTargetResolver};
1819
use genai::{Client, ModelIden, ServiceTarget};
1920

@@ -135,6 +136,63 @@ pub async fn chat_completion(
135136
Ok(text)
136137
}
137138

139+
/// Streams a chat completion. Returns a boxed stream of content chunks.
140+
///
141+
/// Same per-model option fixups as [`chat_completion`] (reasoning models get
142+
/// `temperature`/`top_p` stripped and `ReasoningEffort::Low` substituted). Reasoning,
143+
/// thought-signature, and tool-call chunks are filtered out — callers only see the
144+
/// visible text content. Stream ends when `genai` emits `End` or errors; an empty
145+
/// stream (zero chunks) is valid and matches the same graceful-degradation contract
146+
/// as `chat_completion`'s "AI returned no text" case.
147+
///
148+
/// Cancellation: drop the returned stream. The `genai::ChatStreamResponse`'s reqwest
149+
/// body is closed, billing stops on cloud providers, local-LLM compute is freed.
150+
pub async fn chat_completion_stream(
151+
backend: &AiBackend,
152+
system_prompt: &str,
153+
user_prompt: &str,
154+
options: &ChatOptions,
155+
) -> Result<BoxStream<'static, Result<String, AiError>>, AiError> {
156+
let target = backend
157+
.client
158+
.resolve_service_target(&backend.model)
159+
.await
160+
.map_err(map_genai_error)?;
161+
162+
let effective_options = adjust_for_model(options, &target);
163+
164+
let req = ChatRequest::new(vec![
165+
ChatMessage::system(system_prompt.to_owned()),
166+
ChatMessage::user(user_prompt.to_owned()),
167+
]);
168+
169+
log::debug!(
170+
"AI chat_completion_stream: opening stream (adapter={:?}, model={})",
171+
target.model.adapter_kind,
172+
&*target.model.model_name
173+
);
174+
175+
let res = backend
176+
.client
177+
.exec_chat_stream(&backend.model, req, Some(&effective_options))
178+
.await
179+
.map_err(map_genai_error)?;
180+
181+
// Map ChatStreamEvent → Option<String>: keep only visible content; drop reasoning,
182+
// thought-signature, tool-call chunks; pass through errors mapped to AiError.
183+
let stream = res.stream.filter_map(|item| async move {
184+
match item {
185+
Ok(ChatStreamEvent::Chunk(chunk)) => Some(Ok(chunk.content)),
186+
Ok(ChatStreamEvent::Start | ChatStreamEvent::End(_)) => None,
187+
Ok(ChatStreamEvent::ReasoningChunk(_) | ChatStreamEvent::ThoughtSignatureChunk(_)) => None,
188+
Ok(ChatStreamEvent::ToolCallChunk(_)) => None,
189+
Err(e) => Some(Err(map_genai_error(e))),
190+
}
191+
});
192+
193+
Ok(stream.boxed())
194+
}
195+
138196
/// Per-model option fixup: reasoning-class models reject `temperature`. Returns a
139197
/// modified clone of `options` when needed; otherwise hands back a clone unchanged.
140198
fn adjust_for_model(options: &ChatOptions, target: &ServiceTarget) -> ChatOptions {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
//! Real-API smoke tests against Anthropic. **Not run in CI** — gated behind `#[ignore]`,
2+
//! requires a valid `ANTHROPIC_API_KEY` env var.
3+
//!
4+
//! Why a separate file from `client_real_openai_test.rs`: Anthropic's native streaming
5+
//! protocol differs from OpenAI's SSE shape (event names like `content_block_delta`
6+
//! vs `data:` JSON envelopes). Without exercising it, we'd be testing only the OpenAI
7+
//! lineage despite supporting Anthropic via `genai`.
8+
//!
9+
//! Run with:
10+
//! ```sh
11+
//! ANTHROPIC_API_KEY=$(security find-generic-password -a "$USER" -s "ANTHROPIC_API_KEY" -w) \
12+
//! cargo nextest run --lib --run-ignored only ai::client_real_anthropic_test
13+
//! ```
14+
//!
15+
//! Costs ~$0.001 per full run.
16+
17+
use futures_util::StreamExt;
18+
use genai::chat::ChatOptions;
19+
20+
use super::client::{AiBackend, chat_completion, chat_completion_stream};
21+
22+
const BASE_URL: &str = "https://api.anthropic.com/v1/";
23+
24+
fn api_key_or_skip() -> Option<String> {
25+
let key = std::env::var("ANTHROPIC_API_KEY").ok()?;
26+
if key.trim().is_empty() {
27+
return None;
28+
}
29+
Some(key)
30+
}
31+
32+
fn opts() -> ChatOptions {
33+
ChatOptions::default()
34+
.with_temperature(0.3)
35+
.with_max_tokens(200)
36+
.with_top_p(0.9)
37+
}
38+
39+
#[tokio::test]
40+
#[ignore = "real API call — set ANTHROPIC_API_KEY to run"]
41+
async fn smoke_claude_haiku_chat() {
42+
let Some(api_key) = api_key_or_skip() else {
43+
panic!("ANTHROPIC_API_KEY not set");
44+
};
45+
let backend = AiBackend::remote(api_key, String::from(BASE_URL), String::from("claude-3-5-haiku-latest"));
46+
47+
let res = chat_completion(
48+
&backend,
49+
"You answer in exactly one short sentence.",
50+
"Say the word 'pong'.",
51+
&opts(),
52+
)
53+
.await
54+
.expect("real Anthropic call should succeed");
55+
56+
assert!(!res.trim().is_empty(), "response should be non-empty");
57+
log::info!(target: "ai_smoke", "claude-3-5-haiku → {res}");
58+
}
59+
60+
#[tokio::test]
61+
#[ignore = "real API call — set ANTHROPIC_API_KEY to run"]
62+
async fn smoke_claude_haiku_stream() {
63+
let Some(api_key) = api_key_or_skip() else {
64+
panic!("ANTHROPIC_API_KEY not set");
65+
};
66+
let backend = AiBackend::remote(api_key, String::from(BASE_URL), String::from("claude-3-5-haiku-latest"));
67+
68+
let mut stream = chat_completion_stream(
69+
&backend,
70+
"You answer in exactly one short sentence.",
71+
"Say the word 'pong'.",
72+
&opts(),
73+
)
74+
.await
75+
.expect("stream open");
76+
77+
let mut text = String::new();
78+
let mut chunks = 0;
79+
while let Some(item) = stream.next().await {
80+
let chunk = item.expect("chunk ok");
81+
text.push_str(&chunk);
82+
chunks += 1;
83+
}
84+
85+
assert!(!text.trim().is_empty(), "expected non-empty assembled text");
86+
assert!(chunks > 0, "expected at least one chunk");
87+
log::info!(target: "ai_smoke", "claude-3-5-haiku stream → {chunks} chunks, total: {text}");
88+
}

0 commit comments

Comments
 (0)