diff --git a/src/clients.rs b/src/clients.rs index 738ead7..2b94c4a 100644 --- a/src/clients.rs +++ b/src/clients.rs @@ -74,6 +74,25 @@ pub trait RegistryClient: Send + Sync { ) -> Result; fn query_dead_code(&self, repo_id: &str, include_pub: bool, limit: usize) -> Result; + + fn save_relation( + &self, + from: &str, + to: &str, + relation_type: &str, + confidence: f64, + ) -> Result; + + fn query_relations( + &self, + entity_id: &str, + direction: &str, + relation_type: Option<&str>, + ) -> Result; + + fn delete_relations(&self, from: &str, to: &str, relation_type: Option<&str>) -> Result; + + fn list_vault_notes(&self) -> Result; } /// Knowledge engine operations. @@ -110,3 +129,19 @@ pub trait SearchClient: Send + Sync { limit: usize, ) -> Result>; } + +/// Workflow management exposed to MCP tools. +pub trait WorkflowClient: Send + Sync { + fn list_workflows(&self) -> Result; + fn get_workflow(&self, workflow_id: &str) -> Result; + fn run_workflow(&self, workflow_id: &str, inputs: Value) -> Result; + fn get_execution(&self, exec_id: i64) -> Result; +} + +/// Vault (Markdown knowledge-base) operations exposed to MCP tools. +pub trait VaultClient: Send + Sync { + fn list_vault_notes(&self) -> Result; + fn read_vault_note(&self, path: &str) -> Result; + fn get_backlinks(&self, note_id: &str) -> Result; + fn build_vault_graph(&self, repo_id: Option<&str>) -> Result; +} diff --git a/src/mcp/tools/relations.rs b/src/mcp/tools/relations.rs index 70ee592..cf88238 100644 --- a/src/mcp/tools/relations.rs +++ b/src/mcp/tools/relations.rs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: MIT // Copyright (c) 2026 juice094 +use crate::clients::RegistryClient; use crate::mcp::McpTool; #[derive(Clone)] @@ -83,10 +84,7 @@ Returns: success boolean and relation details."#, })); } - let conn = ctx.conn()?; - if let Err(e) = - crate::registry::relation::save_relation(&conn, &from, &to, &rel_type, confidence) - { + if let Err(e) = ctx.save_relation(&from, &to, &rel_type, confidence) { let msg = e.to_string(); if msg.contains("foreign key constraint") || msg.contains("FOREIGN KEY") { return Ok(serde_json::json!({ @@ -162,73 +160,9 @@ Returns: JSON array of relations with to_entity_id, relation_type, confidence, a })); } - let conn = ctx.conn()?; - let results = match direction { - "bidirectional" => { - let rows = crate::registry::relation::find_related_entities( - &conn, - &entity_id, - relation_type, - )?; - rows.into_iter() - .map(|(from, to, rt, conf, created)| { - serde_json::json!({ - "from_entity_id": from, - "to_entity_id": to, - "relation_type": rt, - "confidence": conf, - "created_at": created - }) - }) - .collect::>() - } - "incoming" => { - let mut stmt = conn.prepare( - "SELECT from_entity_id, relation_type, confidence, created_at FROM relations - WHERE to_entity_id = ?1 - ORDER BY confidence DESC", - )?; - let rows = stmt.query_map([&entity_id], |row| { - Ok(( - row.get::<_, String>(0)?, - row.get::<_, String>(1)?, - row.get::<_, f64>(2)?, - row.get::<_, String>(3)?, - )) - })?; - let filtered: Vec<_> = if let Some(rt) = relation_type.filter(|s| !s.is_empty()) { - rows.filter(|r| r.as_ref().map(|(_, t, _, _)| t == rt).unwrap_or(false)) - .collect::, _>>()? - } else { - rows.collect::, _>>()? - }; - filtered - .into_iter() - .map(|(from, rt, conf, created)| { - serde_json::json!({ - "from_entity_id": from, - "relation_type": rt, - "confidence": conf, - "created_at": created - }) - }) - .collect::>() - } - _ => { - let rows = - crate::registry::relation::list_relations(&conn, &entity_id, relation_type)?; - rows.into_iter() - .map(|(to, rt, conf, created)| { - serde_json::json!({ - "to_entity_id": to, - "relation_type": rt, - "confidence": conf, - "created_at": created - }) - }) - .collect::>() - } - }; + let value = ctx.query_relations(&entity_id, direction, relation_type)?; + let results = + value.get("relations").and_then(|v| v.as_array()).cloned().unwrap_or_default(); Ok(serde_json::json!({ "success": true, @@ -297,17 +231,9 @@ Returns: success boolean and count of deleted relations."#, })); } - let conn = ctx.conn()?; - let count = match rel_type.as_deref().filter(|s| !s.is_empty()) { - Some(rt) => conn.execute( - "DELETE FROM relations WHERE from_entity_id = ?1 AND to_entity_id = ?2 AND relation_type = ?3", - rusqlite::params![&from, &to, rt], - )?, - None => conn.execute( - "DELETE FROM relations WHERE from_entity_id = ?1 AND to_entity_id = ?2", - rusqlite::params![&from, &to], - )?, - }; + let value = + ctx.delete_relations(&from, &to, rel_type.as_deref().filter(|s| !s.is_empty()))?; + let count = value.get("deleted").and_then(|v| v.as_u64()).unwrap_or(0) as usize; Ok(serde_json::json!({ "success": true, diff --git a/src/mcp/tools/vault.rs b/src/mcp/tools/vault.rs index 8b90f9a..f2461de 100644 --- a/src/mcp/tools/vault.rs +++ b/src/mcp/tools/vault.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: MIT // Copyright (c) 2026 juice094 +use crate::clients::{DigestClient, VaultClient}; use crate::mcp::McpTool; +use crate::registry::VaultNote; use anyhow::Context; #[derive(Clone)] @@ -51,42 +53,45 @@ Returns: JSON array of matching notes. Each includes: id, title, path, and tags. .and_then(|v| v.as_str()) .context("Missing required argument: query")?; - let pool = ctx.pool(); - let results = tokio::task::spawn_blocking({ - let query = query.to_string(); - move || { - let conn = pool.get()?; - let notes = crate::registry::vault::list_vault_notes(&conn)?; - let keywords: Vec<&str> = query.split_whitespace().collect(); - - let filtered: Vec<_> = notes - .into_iter() - .filter(|n| { - let content = crate::vault::fs_io::read_note_body(&n.path) - .map(|(body, _fm)| body) - .unwrap_or_default(); - let hay = format!( - "{} {} {} {}", - n.id, - n.title.as_deref().unwrap_or(""), - n.tags.join(","), - content - ) - .to_lowercase(); - keywords.iter().all(|kw| hay.contains(&kw.to_lowercase())) + let ctx = ctx.clone(); + let query_owned = query.to_string(); + let results = tokio::task::spawn_blocking(move || { + let value = ctx.list_vault_notes()?; + let notes: Vec = serde_json::from_value( + value.get("notes").cloned().unwrap_or(serde_json::json!([])), + ) + .unwrap_or_default(); + let keywords: Vec<&str> = query_owned.split_whitespace().collect(); + + let filtered: Vec<_> = notes + .into_iter() + .filter(|n| { + let content = ctx + .read_vault_note(&n.path) + .ok() + .and_then(|v| v.get("content").and_then(|c| c.as_str()).map(String::from)) + .unwrap_or_default(); + let hay = format!( + "{} {} {} {}", + n.id, + n.title.as_deref().unwrap_or(""), + n.tags.join(","), + content + ) + .to_lowercase(); + keywords.iter().all(|kw| hay.contains(&kw.to_lowercase())) + }) + .map(|n| { + serde_json::json!({ + "id": n.id, + "title": n.title, + "path": n.path, + "tags": n.tags, }) - .map(|n| { - serde_json::json!({ - "id": n.id, - "title": n.title, - "path": n.path, - "tags": n.tags, - }) - }) - .collect(); + }) + .collect(); - anyhow::Ok(filtered) - } + anyhow::Ok(filtered) }) .await .map_err(|e| anyhow::anyhow!("spawn_blocking failed: {}", e))??; @@ -140,15 +145,18 @@ Returns: JSON with frontmatter (id, repo, tags, ai_context, created, updated) an async fn invoke( &self, args: serde_json::Value, - _ctx: &mut crate::storage::AppContext, + ctx: &mut crate::storage::AppContext, ) -> anyhow::Result { let path = args .get("path") .and_then(|v| v.as_str()) .context("Missing required argument: path")?; - let (body, frontmatter) = crate::vault::fs_io::read_note_body(path) + let value = ctx + .read_vault_note(path) .context("Failed to read note — file not found or unreadable")?; + let body = value.get("content").cloned().unwrap_or(serde_json::json!("")); + let frontmatter = value.get("frontmatter").cloned().unwrap_or(serde_json::json!(null)); Ok(serde_json::json!({ "success": true, @@ -325,30 +333,12 @@ Returns: JSON array of backlinking notes, each with id, title, and path."#, .and_then(|v| v.as_str()) .context("Missing required argument: note_id")?; - let vault_dir = ctx.storage.workspace_dir().ok().map(|ws| ws.join("vault")); - let backlinks = tokio::task::spawn_blocking({ - let note_id = note_id.to_string(); - let vault_dir = vault_dir.clone(); - move || { - if let Some(vd) = vault_dir { - match crate::vault::backlinks::build_backlink_index(&vd) { - Ok(index) => crate::vault::backlinks::get_backlinks(&index, ¬e_id), - Err(_) => Vec::new(), - } - } else { - Vec::new() - } - } - }) - .await - .map_err(|e| anyhow::anyhow!("spawn_blocking failed: {}", e))?; - - Ok(serde_json::json!({ - "success": true, - "target": note_id, - "count": backlinks.len(), - "backlinks": backlinks, - })) + let ctx = ctx.clone(); + let note_id = note_id.to_string(); + let value = tokio::task::spawn_blocking(move || ctx.get_backlinks(¬e_id)) + .await + .map_err(|e| anyhow::anyhow!("spawn_blocking failed: {}", e))??; + Ok(value) } } @@ -386,39 +376,32 @@ Returns: JSON with success status and the generated file path."#, let today = chrono::Utc::now().format("%Y-%m-%d").to_string(); let rel_path = format!("99-Meta/Daily/{}.md", today); - let pool = ctx.pool(); - let config = ctx.config.clone(); - let i18n = ctx.i18n; - + let ctx = ctx.clone(); + let today_owned = today.clone(); let vault_root = ctx .storage .workspace_dir() .map(|ws| ws.join("vault")) .unwrap_or_else(|_| std::path::PathBuf::from("vault")); - let file_path = tokio::task::spawn_blocking({ - let rel_path = rel_path.clone(); - let today = today.clone(); - let vault_root = vault_root.clone(); - move || { - let conn = pool.get()?; - let digest = crate::digest::generate_daily_digest(&conn, &config, &i18n)?; - - let target = resolve_vault_path(&rel_path, &vault_root)?; - - if let Some(parent) = target.parent() { - std::fs::create_dir_all(parent)?; - } + let file_path = tokio::task::spawn_blocking(move || { + let digest = ctx.generate_daily_digest()?; + let digest_str = digest.get("digest").and_then(|v| v.as_str()).unwrap_or(""); - let content = if target.exists() { - let existing = std::fs::read_to_string(&target)?; - format!("{}\n\n{}", existing, digest) - } else { - format!("---\ndate: {}\ntags: [\"daily\"]\n---\n\n{}", today, digest) - }; + let target = resolve_vault_path(&rel_path, &vault_root)?; - std::fs::write(&target, content)?; - anyhow::Ok(target.to_string_lossy().to_string()) + if let Some(parent) = target.parent() { + std::fs::create_dir_all(parent)?; } + + let content = if target.exists() { + let existing = std::fs::read_to_string(&target)?; + format!("{}\n\n{}", existing, digest_str) + } else { + format!("---\ndate: {}\ntags: [\"daily\"]\n---\n\n{}", today_owned, digest_str) + }; + + std::fs::write(&target, content)?; + anyhow::Ok(target.to_string_lossy().to_string()) }) .await .map_err(|e| anyhow::anyhow!("spawn_blocking failed: {}", e))??; @@ -468,110 +451,10 @@ Returns: JSON with nodes (id, title) and edges (source, target)."#, ) -> anyhow::Result { let repo_id = args.get("repo_id").and_then(|v| v.as_str()).map(|s| s.to_string()); - let vault_dir = ctx.storage.workspace_dir().ok().map(|ws| ws.join("vault")); - let graph = tokio::task::spawn_blocking({ - let repo_id = repo_id.clone(); - let vault_dir = vault_dir.clone(); - move || { - let Some(vd) = vault_dir else { - return anyhow::Ok(serde_json::json!({ - "success": true, - "count": 0, - "edge_count": 0, - "nodes": [], - "edges": [], - })); - }; - - let index = crate::vault::backlinks::build_backlink_index(&vd)?; - - let mut id_to_title: std::collections::HashMap = - std::collections::HashMap::new(); - let mut id_to_repo: std::collections::HashMap = - std::collections::HashMap::new(); - - for entry in walkdir::WalkDir::new(&vd) - .follow_links(false) - .into_iter() - .filter_map(|e| e.ok()) - .filter(|e| e.file_type().is_file()) - .filter(|e| e.path().extension().map(|ext| ext == "md").unwrap_or(false)) - { - let path = entry.path(); - let rel_path = path.strip_prefix(&vd).unwrap_or(path); - let id = rel_path.to_string_lossy().replace('\\', "/"); - - let content = match std::fs::read_to_string(path) { - Ok(c) => c, - Err(_) => continue, - }; - - if let Some((fm, _)) = crate::vault::frontmatter::extract_frontmatter(&content) - { - id_to_title.insert(id.clone(), fm.title.unwrap_or_else(|| id.clone())); - if let Some(repo) = fm.repo { - id_to_repo.insert(id, repo); - } - } else { - id_to_title.insert(id.clone(), id.clone()); - } - } - - let allowed_ids: std::collections::HashSet = if let Some(ref rid) = repo_id - { - id_to_repo.iter().filter(|(_, r)| *r == rid).map(|(id, _)| id.clone()).collect() - } else { - id_to_title.keys().cloned().collect() - }; - - // Normalize wikilink targets (e.g. "b" -> "b.md") to vault file ids. - let mut id_lookup: std::collections::HashMap = - std::collections::HashMap::new(); - for id in id_to_title.keys() { - id_lookup.insert(id.clone(), id.clone()); - if let Some(stem) = id.strip_suffix(".md") { - id_lookup.insert(stem.to_string(), id.clone()); - } - } - - let nodes: Vec<_> = allowed_ids - .iter() - .map(|id| { - serde_json::json!({ - "id": id, - "title": id_to_title.get(id).unwrap_or(id), - }) - }) - .collect(); - - let mut edges = Vec::new(); - for (target, sources) in &index { - let normalized = - id_lookup.get(target).cloned().unwrap_or_else(|| target.clone()); - if !allowed_ids.contains(&normalized) { - continue; - } - for source in sources { - if allowed_ids.contains(source) { - edges.push(serde_json::json!({ - "source": source, - "target": &normalized, - })); - } - } - } - - anyhow::Ok(serde_json::json!({ - "success": true, - "count": nodes.len(), - "edge_count": edges.len(), - "nodes": nodes, - "edges": edges, - })) - } - }) - .await - .map_err(|e| anyhow::anyhow!("spawn_blocking failed: {}", e))??; + let ctx = ctx.clone(); + let graph = tokio::task::spawn_blocking(move || ctx.build_vault_graph(repo_id.as_deref())) + .await + .map_err(|e| anyhow::anyhow!("spawn_blocking failed: {}", e))??; Ok(graph) } diff --git a/src/mcp/tools/workflow.rs b/src/mcp/tools/workflow.rs index bb17bc7..0dfd7bb 100644 --- a/src/mcp/tools/workflow.rs +++ b/src/mcp/tools/workflow.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: MIT // Copyright (c) 2026 juice094 +use crate::clients::WorkflowClient; use crate::mcp::McpTool; -use std::collections::HashMap; #[derive(Clone)] pub struct DevkitWorkflowListTool; @@ -35,23 +35,7 @@ Returns: JSON array of workflows with id, name, and version."#, _args: serde_json::Value, ctx: &mut crate::storage::AppContext, ) -> anyhow::Result { - let conn = ctx.conn()?; - let workflows = crate::workflow::state::list_workflows(&conn)?; - let items: Vec = workflows - .into_iter() - .map(|(id, name, version)| { - serde_json::json!({ - "id": id, - "name": name, - "version": version - }) - }) - .collect(); - Ok(serde_json::json!({ - "success": true, - "count": items.len(), - "workflows": items - })) + ctx.list_workflows() } } @@ -104,81 +88,7 @@ Returns: execution summary with status, step results, and execution_id."#, })); } - let conn = ctx.conn()?; - let wf = match crate::workflow::state::get_workflow(&conn, &workflow_id)? { - Some(wf) => wf, - None => { - return Ok(serde_json::json!({ - "success": false, - "error": format!("workflow '{}' not found", workflow_id) - })); - } - }; - - // Parse inputs into HashMap - let inputs: HashMap = if let Some(obj) = inputs_value.as_object() { - obj.iter() - .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string()))) - .collect() - } else { - HashMap::new() - }; - - let inputs_json = inputs_value.to_string(); - let exec_id = crate::workflow::state::create_execution(&conn, &workflow_id, &inputs_json)?; - crate::workflow::state::update_execution( - &conn, - exec_id, - &crate::workflow::model::ExecutionStatus::Running, - None, - None, - )?; - - let pool = ctx.pool(); - let start = std::time::Instant::now(); - let result = crate::workflow::executor::execute_workflow(&conn, &pool, &wf, inputs); - let duration_ms = start.elapsed().as_millis() as i64; - - match result { - Ok(step_results) => { - crate::workflow::state::update_execution( - &conn, - exec_id, - &crate::workflow::model::ExecutionStatus::Completed, - None, - Some(duration_ms), - )?; - let results_json: HashMap = step_results - .into_iter() - .map(|(k, v)| (k, serde_json::to_value(v).unwrap_or(serde_json::json!(null)))) - .collect(); - Ok(serde_json::json!({ - "success": true, - "execution_id": exec_id, - "workflow_id": workflow_id, - "status": "Completed", - "duration_ms": duration_ms, - "step_results": results_json - })) - } - Err(e) => { - crate::workflow::state::update_execution( - &conn, - exec_id, - &crate::workflow::model::ExecutionStatus::Failed, - None, - Some(duration_ms), - )?; - Ok(serde_json::json!({ - "success": false, - "execution_id": exec_id, - "workflow_id": workflow_id, - "status": "Failed", - "duration_ms": duration_ms, - "error": e.to_string() - })) - } - } + ctx.run_workflow(&workflow_id, inputs_value) } } @@ -227,24 +137,7 @@ Returns: execution record with status, current_step, timestamps, and duration."# })); } - let conn = ctx.conn()?; - match crate::workflow::state::get_execution(&conn, exec_id)? { - Some(exec) => Ok(serde_json::json!({ - "success": true, - "execution_id": exec.id, - "workflow_id": exec.workflow_id, - "status": format!("{:?}", exec.status), - "current_step": exec.current_step, - "started_at": exec.started_at, - "finished_at": exec.finished_at, - "duration_ms": exec.duration_ms, - "inputs": exec.inputs_json - })), - None => Ok(serde_json::json!({ - "success": false, - "error": format!("execution {} not found", exec_id) - })), - } + ctx.get_execution(exec_id) } } diff --git a/src/registry.rs b/src/registry.rs index 0d89121..1ffaccc 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -382,6 +382,131 @@ impl crate::clients::RegistryClient for crate::storage::AppContext { "dead_functions": out })) } + + fn save_relation( + &self, + from: &str, + to: &str, + relation_type: &str, + confidence: f64, + ) -> anyhow::Result { + let conn = self.conn()?; + crate::registry::relation::save_relation(&conn, from, to, relation_type, confidence)?; + Ok(serde_json::json!({ "success": true })) + } + + fn query_relations( + &self, + entity_id: &str, + direction: &str, + relation_type: Option<&str>, + ) -> anyhow::Result { + let conn = self.conn()?; + let results = match direction { + "bidirectional" => { + let rows = crate::registry::relation::find_related_entities( + &conn, + entity_id, + relation_type, + )?; + rows.into_iter() + .map(|(from, to, rt, conf, created)| { + serde_json::json!({ + "from_entity_id": from, + "to_entity_id": to, + "relation_type": rt, + "confidence": conf, + "created_at": created + }) + }) + .collect::>() + } + "incoming" => { + let mut stmt = conn.prepare( + "SELECT from_entity_id, relation_type, confidence, created_at FROM relations + WHERE to_entity_id = ?1 + ORDER BY confidence DESC", + )?; + let rows = stmt.query_map([entity_id], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, f64>(2)?, + row.get::<_, String>(3)?, + )) + })?; + let filtered: Vec<_> = if let Some(rt) = relation_type.filter(|s| !s.is_empty()) { + rows.filter(|r| r.as_ref().map(|(_, t, _, _)| t == rt).unwrap_or(false)) + .collect::, _>>()? + } else { + rows.collect::, _>>()? + }; + filtered + .into_iter() + .map(|(from, rt, conf, created)| { + serde_json::json!({ + "from_entity_id": from, + "relation_type": rt, + "confidence": conf, + "created_at": created + }) + }) + .collect::>() + } + _ => { + let rows = + crate::registry::relation::list_relations(&conn, entity_id, relation_type)?; + rows.into_iter() + .map(|(to, rt, conf, created)| { + serde_json::json!({ + "to_entity_id": to, + "relation_type": rt, + "confidence": conf, + "created_at": created + }) + }) + .collect::>() + } + }; + Ok(serde_json::json!({ "success": true, "relations": results })) + } + + fn delete_relations( + &self, + from: &str, + to: &str, + relation_type: Option<&str>, + ) -> anyhow::Result { + let conn = self.conn()?; + let count = match relation_type.filter(|s| !s.is_empty()) { + Some(rt) => conn.execute( + "DELETE FROM relations WHERE from_entity_id = ?1 AND to_entity_id = ?2 AND relation_type = ?3", + rusqlite::params![from, to, rt], + )?, + None => conn.execute( + "DELETE FROM relations WHERE from_entity_id = ?1 AND to_entity_id = ?2", + rusqlite::params![from, to], + )?, + }; + Ok(serde_json::json!({ "success": true, "deleted": count })) + } + + fn list_vault_notes(&self) -> anyhow::Result { + let conn = self.conn()?; + let notes = crate::registry::vault::list_vault_notes(&conn)?; + let results: Vec = notes + .into_iter() + .map(|n| { + serde_json::json!({ + "id": n.id, + "path": n.path, + "title": n.title, + "tags": n.tags, + }) + }) + .collect(); + Ok(serde_json::json!({ "success": true, "count": results.len(), "notes": results })) + } } #[cfg(test)] diff --git a/src/storage.rs b/src/storage.rs index c5004f1..4f14713 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -116,12 +116,13 @@ impl StorageBackend for DefaultStorageBackend { /// /// 命令处理函数应通过此结构体获取所有外部依赖, /// 避免直接调用全局函数或读取环境变量。 +#[derive(Clone)] pub struct AppContext { pub storage: Arc, pub config: Config, pub i18n: I18n, pool: Pool, - env_cache: std::sync::Mutex, + env_cache: Arc>, } impl AppContext { @@ -146,7 +147,7 @@ impl AppContext { config, i18n, pool, - env_cache: std::sync::Mutex::new(EnvVersionCache::default()), + env_cache: Arc::new(std::sync::Mutex::new(EnvVersionCache::default())), }) } @@ -169,7 +170,7 @@ impl AppContext { config, i18n, pool, - env_cache: std::sync::Mutex::new(EnvVersionCache::default()), + env_cache: Arc::new(std::sync::Mutex::new(EnvVersionCache::default())), }) } diff --git a/src/vault/mod.rs b/src/vault/mod.rs index 5168b14..91e130a 100644 --- a/src/vault/mod.rs +++ b/src/vault/mod.rs @@ -6,3 +6,148 @@ pub mod fs_io; pub mod indexer; pub mod scanner; pub mod wikilink; + +use crate::storage::AppContext; + +impl crate::clients::VaultClient for AppContext { + fn list_vault_notes(&self) -> anyhow::Result { + let conn = self.conn()?; + let notes = crate::registry::vault::list_vault_notes(&conn)?; + let results: Vec = notes + .into_iter() + .map(|n| { + serde_json::json!({ + "id": n.id, + "path": n.path, + "title": n.title, + "tags": n.tags, + }) + }) + .collect(); + Ok(serde_json::json!({"success": true, "count": results.len(), "notes": results})) + } + + fn read_vault_note(&self, path: &str) -> anyhow::Result { + let (body, frontmatter) = fs_io::read_note_body(path) + .ok_or_else(|| anyhow::anyhow!("note not found or unreadable"))?; + Ok(serde_json::json!({ + "success": true, + "path": path, + "content": body, + "frontmatter": frontmatter, + })) + } + + fn get_backlinks(&self, note_id: &str) -> anyhow::Result { + let vault_dir = self.storage.workspace_dir().ok().map(|ws| ws.join("vault")); + let backlinks = if let Some(vd) = vault_dir { + match backlinks::build_backlink_index(&vd) { + Ok(index) => backlinks::get_backlinks(&index, note_id), + Err(_) => Vec::new(), + } + } else { + Vec::new() + }; + Ok(serde_json::json!({ + "success": true, + "target": note_id, + "count": backlinks.len(), + "backlinks": backlinks, + })) + } + + fn build_vault_graph(&self, repo_id: Option<&str>) -> anyhow::Result { + let vault_dir = self.storage.workspace_dir().ok().map(|ws| ws.join("vault")); + let Some(vd) = vault_dir else { + return Ok(serde_json::json!({ + "success": true, + "count": 0, + "edge_count": 0, + "nodes": [], + "edges": [], + })); + }; + + let index = backlinks::build_backlink_index(&vd)?; + + let mut id_to_title: std::collections::HashMap = + std::collections::HashMap::new(); + let mut id_to_repo: std::collections::HashMap = + std::collections::HashMap::new(); + + for entry in walkdir::WalkDir::new(&vd) + .follow_links(false) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .filter(|e| e.path().extension().map(|ext| ext == "md").unwrap_or(false)) + { + let path = entry.path(); + let rel_path = path.strip_prefix(&vd).unwrap_or(path); + let id = rel_path.to_string_lossy().replace('\\', "/"); + + let content = match std::fs::read_to_string(path) { + Ok(c) => c, + Err(_) => continue, + }; + + if let Some((fm, _)) = frontmatter::extract_frontmatter(&content) { + id_to_title.insert(id.clone(), fm.title.unwrap_or_else(|| id.clone())); + if let Some(repo) = fm.repo { + id_to_repo.insert(id, repo); + } + } else { + id_to_title.insert(id.clone(), id.clone()); + } + } + + let allowed_ids: std::collections::HashSet = if let Some(rid) = repo_id { + id_to_repo.iter().filter(|(_, r)| *r == rid).map(|(id, _)| id.clone()).collect() + } else { + id_to_title.keys().cloned().collect() + }; + + let mut id_lookup: std::collections::HashMap = + std::collections::HashMap::new(); + for id in id_to_title.keys() { + id_lookup.insert(id.clone(), id.clone()); + if let Some(stem) = id.strip_suffix(".md") { + id_lookup.insert(stem.to_string(), id.clone()); + } + } + + let nodes: Vec<_> = allowed_ids + .iter() + .map(|id| { + serde_json::json!({ + "id": id, + "title": id_to_title.get(id).unwrap_or(id), + }) + }) + .collect(); + + let mut edges = Vec::new(); + for (target, sources) in &index { + let normalized = id_lookup.get(target).cloned().unwrap_or_else(|| target.clone()); + if !allowed_ids.contains(&normalized) { + continue; + } + for source in sources { + if allowed_ids.contains(source) { + edges.push(serde_json::json!({ + "source": source, + "target": &normalized, + })); + } + } + } + + Ok(serde_json::json!({ + "success": true, + "count": nodes.len(), + "edge_count": edges.len(), + "nodes": nodes, + "edges": edges, + })) + } +} diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index b66cc28..eb534bc 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -17,3 +17,134 @@ pub use state::{ update_execution, }; pub use validator::validate_workflow; + +use crate::storage::AppContext; + +impl crate::clients::WorkflowClient for AppContext { + fn list_workflows(&self) -> anyhow::Result { + let conn = self.conn()?; + let workflows = state::list_workflows(&conn)?; + let items: Vec = workflows + .into_iter() + .map(|(id, name, version)| { + serde_json::json!({"id": id, "name": name, "version": version}) + }) + .collect(); + Ok(serde_json::json!({"success": true, "count": items.len(), "workflows": items})) + } + + fn get_workflow(&self, workflow_id: &str) -> anyhow::Result { + let conn = self.conn()?; + match state::get_workflow(&conn, workflow_id)? { + Some(wf) => Ok(serde_json::json!({ + "success": true, + "id": wf.id, + "name": wf.name, + "version": wf.version, + "description": wf.description, + "steps": wf.steps.len(), + })), + None => Ok(serde_json::json!({"success": false, "error": "workflow not found"})), + } + } + + fn run_workflow( + &self, + workflow_id: &str, + inputs: serde_json::Value, + ) -> anyhow::Result { + let conn = self.conn()?; + let wf = match state::get_workflow(&conn, workflow_id)? { + Some(wf) => wf, + None => { + return Ok(serde_json::json!({ + "success": false, + "error": format!("workflow '{}' not found", workflow_id) + })); + } + }; + + let inputs_map: std::collections::HashMap = + if let Some(obj) = inputs.as_object() { + obj.iter() + .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string()))) + .collect() + } else { + std::collections::HashMap::new() + }; + + let inputs_json = inputs.to_string(); + let exec_id = state::create_execution(&conn, workflow_id, &inputs_json)?; + state::update_execution(&conn, exec_id, &model::ExecutionStatus::Running, None, None)?; + + let pool = self.pool(); + let start = std::time::Instant::now(); + let result = executor::execute_workflow(&conn, &pool, &wf, inputs_map); + let duration_ms = start.elapsed().as_millis() as i64; + + match result { + Ok(step_results) => { + state::update_execution( + &conn, + exec_id, + &model::ExecutionStatus::Completed, + None, + Some(duration_ms), + )?; + let results_json: std::collections::HashMap = + step_results + .into_iter() + .map(|(k, v)| { + (k, serde_json::to_value(v).unwrap_or(serde_json::json!(null))) + }) + .collect(); + Ok(serde_json::json!({ + "success": true, + "execution_id": exec_id, + "workflow_id": workflow_id, + "status": "Completed", + "duration_ms": duration_ms, + "step_results": results_json + })) + } + Err(e) => { + state::update_execution( + &conn, + exec_id, + &model::ExecutionStatus::Failed, + None, + Some(duration_ms), + )?; + Ok(serde_json::json!({ + "success": false, + "execution_id": exec_id, + "workflow_id": workflow_id, + "status": "Failed", + "duration_ms": duration_ms, + "error": e.to_string() + })) + } + } + } + + fn get_execution(&self, exec_id: i64) -> anyhow::Result { + let conn = self.conn()?; + match state::get_execution(&conn, exec_id)? { + Some(exec) => Ok(serde_json::json!({ + "success": true, + "execution_id": exec.id, + "workflow_id": exec.workflow_id, + "status": format!("{:?}", exec.status), + "current_step": exec.current_step, + "started_at": exec.started_at, + "finished_at": exec.finished_at, + "duration_ms": exec.duration_ms, + "inputs": exec.inputs_json, + })), + None => Ok(serde_json::json!({ + "success": false, + "error": format!("execution {} not found", exec_id) + })), + } + } +}