diff --git a/crates/codegraph-core/src/cfg_db.rs b/crates/codegraph-core/src/cfg_db.rs new file mode 100644 index 00000000..5381218a --- /dev/null +++ b/crates/codegraph-core/src/cfg_db.rs @@ -0,0 +1,199 @@ +//! Bulk CFG block and edge insertion via rusqlite. +//! +//! Bypasses the JS iteration loop by opening the SQLite database directly +//! from Rust and inserting all CFG blocks and edges in a single transaction. +//! Function node IDs are resolved by querying the `nodes` table. + +use std::collections::HashMap; + +use napi_derive::napi; +use rusqlite::{params, Connection, OpenFlags}; +use serde::{Deserialize, Serialize}; + +/// A single CFG block to insert (received from JS). +#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CfgInsertBlock { + pub index: u32, + #[napi(js_name = "type")] + pub block_type: String, + #[napi(js_name = "startLine")] + pub start_line: Option<u32>, + #[napi(js_name = "endLine")] + pub end_line: Option<u32>, + pub label: Option<String>, +} + +/// A single CFG edge to insert (received from JS). +#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CfgInsertEdge { + #[napi(js_name = "sourceIndex")] + pub source_index: u32, + #[napi(js_name = "targetIndex")] + pub target_index: u32, + pub kind: String, +} + +/// CFG data for a single function definition. +#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CfgFunctionBatch { + /// Definition name (used to look up node ID) + pub name: String, + /// Relative file path + pub file: String, + /// Definition source line + pub line: u32, + pub blocks: Vec<CfgInsertBlock>, + pub edges: Vec<CfgInsertEdge>, +} + +/// Bulk-insert CFG blocks and edges into the database. +/// +/// For each function batch: +/// 1. Resolve the function's node ID from the `nodes` table +/// 2. Delete any existing CFG data for that node (handles incremental rebuilds) +/// 3. Insert all blocks, collecting their auto-generated row IDs +/// 4. 
Insert all edges, mapping block indices to row IDs +/// +/// Returns the total number of functions processed. Returns 0 on any error. +#[napi] +pub fn bulk_insert_cfg(db_path: String, batches: Vec<CfgFunctionBatch>) -> u32 { + if batches.is_empty() { + return 0; + } + + let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX; + let mut conn = match Connection::open_with_flags(&db_path, flags) { + Ok(c) => c, + Err(_) => return 0, + }; + + let _ = conn.execute_batch( + "PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000", + ); + + // Bail out if CFG tables don't exist + let has_tables: bool = conn + .prepare("SELECT 1 FROM sqlite_master WHERE type='table' AND name='cfg_blocks'") + .and_then(|mut s| s.query_row([], |_| Ok(true))) + .unwrap_or(false); + if !has_tables { + return 0; + } + + // ── Phase 1: Pre-fetch function node IDs ───────────────────────────── + let mut node_ids: HashMap<(String, String, u32), i64> = HashMap::new(); + { + let Ok(mut stmt) = conn.prepare( + "SELECT id FROM nodes WHERE name = ?1 AND kind IN ('function','method') AND file = ?2 AND line = ?3", + ) else { + return 0; + }; + + for batch in &batches { + let key = (batch.name.clone(), batch.file.clone(), batch.line); + if node_ids.contains_key(&key) { + continue; + } + if let Ok(id) = stmt.query_row(params![&batch.name, &batch.file, batch.line], |row| { + row.get::<_, i64>(0) + }) { + node_ids.insert(key, id); + } + } + } + + // ── Phase 2: Bulk insert in a single transaction ───────────────────── + let Ok(tx) = conn.transaction() else { + return 0; + }; + + let mut total = 0u32; + { + let Ok(mut delete_edges) = + tx.prepare("DELETE FROM cfg_edges WHERE function_node_id = ?1") + else { + return 0; + }; + let Ok(mut delete_blocks) = + tx.prepare("DELETE FROM cfg_blocks WHERE function_node_id = ?1") + else { + return 0; + }; + let Ok(mut insert_block) = tx.prepare( + "INSERT INTO cfg_blocks (function_node_id, block_index, block_type, start_line, end_line, label) \ + VALUES (?1, ?2, 
?3, ?4, ?5, ?6)", + ) else { + return 0; + }; + let Ok(mut insert_edge) = tx.prepare( + "INSERT INTO cfg_edges (function_node_id, source_block_id, target_block_id, kind) \ + VALUES (?1, ?2, ?3, ?4)", + ) else { + return 0; + }; + + for batch in &batches { + let key = (batch.name.clone(), batch.file.clone(), batch.line); + let Some(&node_id) = node_ids.get(&key) else { + continue; + }; + + // Always delete stale CFG rows (handles body-removed / incremental case) + if delete_edges.execute(params![node_id]).is_err() { + return 0; + } + if delete_blocks.execute(params![node_id]).is_err() { + return 0; + } + + if batch.blocks.is_empty() { + total += 1; + continue; + } + + // Insert blocks and collect their auto-generated row IDs + let mut block_db_ids: HashMap<u32, i64> = HashMap::new(); + for block in &batch.blocks { + match insert_block.execute(params![ + node_id, + block.index, + &block.block_type, + block.start_line, + block.end_line, + &block.label, + ]) { + Ok(_) => { + block_db_ids.insert(block.index, tx.last_insert_rowid()); + } + Err(_) => return 0, + } + } + + // Insert edges, mapping block indices to row IDs + for edge in &batch.edges { + let Some(&source_db_id) = block_db_ids.get(&edge.source_index) else { + continue; + }; + let Some(&target_db_id) = block_db_ids.get(&edge.target_index) else { + continue; + }; + match insert_edge.execute(params![node_id, source_db_id, target_db_id, &edge.kind]) + { + Ok(_) => {} + Err(_) => return 0, + } + } + + total += 1; + } + } + + if tx.commit().is_err() { + return 0; + } + + total +} diff --git a/crates/codegraph-core/src/dataflow_db.rs b/crates/codegraph-core/src/dataflow_db.rs new file mode 100644 index 00000000..fea11e01 --- /dev/null +++ b/crates/codegraph-core/src/dataflow_db.rs @@ -0,0 +1,185 @@ +//! Bulk dataflow edge insertion via rusqlite. +//! +//! Bypasses the JS iteration loop by opening the SQLite database directly +//! from Rust and inserting all dataflow edges in a single transaction. +//! 
Node IDs are resolved by querying the `nodes` table (local-first, then global). + +use std::collections::HashMap; + +use napi_derive::napi; +use rusqlite::{params, Connection, OpenFlags}; +use serde::{Deserialize, Serialize}; + +/// A single dataflow edge to insert (received from JS). +#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DataflowInsertEdge { + /// Source function name (resolved to node ID) + #[napi(js_name = "sourceName")] + pub source_name: String, + /// Target function name (resolved to node ID) + #[napi(js_name = "targetName")] + pub target_name: String, + /// Edge kind: "flows_to", "returns", or "mutates" + pub kind: String, + #[napi(js_name = "paramIndex")] + pub param_index: Option<u32>, + pub expression: Option<String>, + pub line: Option<u32>, + pub confidence: f64, +} + +/// A batch of dataflow edges for a single file. +#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FileDataflowBatch { + /// Relative file path (for local-first node resolution) + pub file: String, + pub edges: Vec<DataflowInsertEdge>, +} + +/// Resolve a function name to a node ID, preferring local (same-file) matches. +fn resolve_node( + local_stmt: &mut rusqlite::Statement, + global_stmt: &mut rusqlite::Statement, + name: &str, + file: &str, + cache: &mut HashMap<(String, String), Option<i64>>, +) -> Option<i64> { + let key = (name.to_string(), file.to_string()); + if let Some(cached) = cache.get(&key) { + return *cached; + } + + // Local-first: same file + let result = local_stmt + .query_row(params![name, file], |row| row.get::<_, i64>(0)) + .ok(); + + let id = if result.is_some() { + result + } else { + // Global fallback + global_stmt + .query_row(params![name], |row| row.get::<_, i64>(0)) + .ok() + }; + + cache.insert(key, id); + id +} + +/// Bulk-insert dataflow edges into the database. +/// +/// For each file batch, resolves function names to node IDs (local-first, +/// then global) and inserts edges in a single transaction. 
+/// +/// Returns the total number of edges inserted. Returns 0 on any error. +#[napi] +pub fn bulk_insert_dataflow(db_path: String, batches: Vec<FileDataflowBatch>) -> u32 { + if batches.is_empty() { + return 0; + } + + let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX; + let mut conn = match Connection::open_with_flags(&db_path, flags) { + Ok(c) => c, + Err(_) => return 0, + }; + + let _ = conn.execute_batch( + "PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000", + ); + + // Bail out if the dataflow table doesn't exist + let has_table: bool = conn + .prepare("SELECT 1 FROM sqlite_master WHERE type='table' AND name='dataflow'") + .and_then(|mut s| s.query_row([], |_| Ok(true))) + .unwrap_or(false); + if !has_table { + return 0; + } + + // ── Phase 1: Pre-build node resolution cache ───────────────────────── + // Collect all unique (name, file) pairs we need to resolve + let mut resolve_cache: HashMap<(String, String), Option<i64>> = HashMap::new(); + { + let Ok(mut local_stmt) = conn.prepare( + "SELECT id FROM nodes WHERE name = ?1 AND file = ?2 AND kind IN ('function','method') LIMIT 1", + ) else { + return 0; + }; + let Ok(mut global_stmt) = conn.prepare( + "SELECT id FROM nodes WHERE name = ?1 AND kind IN ('function','method') ORDER BY file, line LIMIT 1", + ) else { + return 0; + }; + + for batch in &batches { + for edge in &batch.edges { + resolve_node( + &mut local_stmt, + &mut global_stmt, + &edge.source_name, + &batch.file, + &mut resolve_cache, + ); + resolve_node( + &mut local_stmt, + &mut global_stmt, + &edge.target_name, + &batch.file, + &mut resolve_cache, + ); + } + } + } + + // ── Phase 2: Bulk insert in a single transaction ───────────────────── + let Ok(tx) = conn.transaction() else { + return 0; + }; + + let mut total = 0u32; + { + let Ok(mut insert_stmt) = tx.prepare( + "INSERT INTO dataflow (source_id, target_id, kind, param_index, expression, line, confidence) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + ) else { + return 0; + }; + 
+ for batch in &batches { + for edge in &batch.edges { + let source_key = (edge.source_name.clone(), batch.file.clone()); + let target_key = (edge.target_name.clone(), batch.file.clone()); + + let Some(&Some(source_id)) = resolve_cache.get(&source_key) else { + continue; + }; + let Some(&Some(target_id)) = resolve_cache.get(&target_key) else { + continue; + }; + + match insert_stmt.execute(params![ + source_id, + target_id, + &edge.kind, + edge.param_index, + &edge.expression, + edge.line, + edge.confidence, + ]) { + Ok(_) => total += 1, + Err(_) => return 0, + } + } + } + } + + if tx.commit().is_err() { + return 0; + } + + total +} diff --git a/crates/codegraph-core/src/lib.rs b/crates/codegraph-core/src/lib.rs index 391f0854..984359b2 100644 --- a/crates/codegraph-core/src/lib.rs +++ b/crates/codegraph-core/src/lib.rs @@ -1,9 +1,11 @@ pub mod ast_db; pub mod cfg; +pub mod cfg_db; pub mod complexity; pub mod constants; pub mod cycles; pub mod dataflow; +pub mod dataflow_db; pub mod edge_builder; pub mod extractors; pub mod import_resolution; diff --git a/src/features/cfg.ts b/src/features/cfg.ts index 389ee3c2..7cdfbb5e 100644 --- a/src/features/cfg.ts +++ b/src/features/cfg.ts @@ -17,6 +17,7 @@ import { openReadonlyOrFail, } from '../db/index.js'; import { debug, info } from '../infrastructure/logger.js'; +import { loadNative } from '../infrastructure/native.js'; import { paginateResult } from '../shared/paginate.js'; import type { BetterSqlite3Database, Definition, NodeRow, TreeSitterNode } from '../types.js'; import { findNodes } from './shared/find-nodes.js'; @@ -285,6 +286,93 @@ export async function buildCFGData( // skip WASM parser init, tree parsing, and JS visitor entirely — just persist. 
const allNative = allCfgNative(fileSymbols); + // ── Native bulk-insert fast path ────────────────────────────────────── + // When all CFG data is pre-computed by Rust and no files need WASM visitor, + // bypass JS iteration entirely — collect batches and hand them to rusqlite. + if (allNative) { + const native = loadNative(); + if (native?.bulkInsertCfg) { + let needsJsFallback = false; + const batches: Array<{ + name: string; + file: string; + line: number; + blocks: Array<{ + index: number; + type: string; + startLine?: number | null; + endLine?: number | null; + label?: string | null; + }>; + edges: Array<{ + sourceIndex: number; + targetIndex: number; + kind: string; + }>; + }> = []; + + for (const [relPath, symbols] of fileSymbols) { + const ext = path.extname(relPath).toLowerCase(); + if (!CFG_EXTENSIONS.has(ext)) continue; + + // Files with _tree were WASM-parsed and need the slow path + if (symbols._tree) { + needsJsFallback = true; + break; + } + + for (const def of symbols.definitions) { + if (def.kind !== 'function' && def.kind !== 'method') continue; + if (!def.line) continue; + + const cfgData = def.cfg as unknown as + | { blocks: CfgBuildBlock[]; edges: CfgBuildEdge[] } + | null + | undefined; + + batches.push({ + name: def.name, + file: relPath, + line: def.line, + blocks: cfgData?.blocks?.length + ? cfgData.blocks.map((b) => ({ + index: b.index, + type: b.type, + startLine: b.startLine, + endLine: b.endLine, + label: b.label, + })) + : [], + edges: cfgData?.blocks?.length + ? 
(cfgData.edges || []).map((e) => ({ + sourceIndex: e.sourceIndex, + targetIndex: e.targetIndex, + kind: e.kind, + })) + : [], + }); + } + } + + if (!needsJsFallback) { + const processed = native.bulkInsertCfg(db.name, batches); + const expectedFunctions = batches.filter((b) => b.blocks.length > 0).length; + if (processed === batches.length || expectedFunctions === 0) { + if (expectedFunctions > 0) { + info(`CFG: ${expectedFunctions} functions analyzed (native bulk)`); + } + return; + } + debug( + `CFG: bulk insert expected ${batches.length} functions, got ${processed} — falling back to JS`, + ); + // fall through to JS path + } + // fall through to JS path + } + } + + // ── JS fallback path ────────────────────────────────────────────────── const extToLang = buildExtToLangMap(); let parsers: unknown = null; let getParserFn: unknown = null; diff --git a/src/features/dataflow.ts b/src/features/dataflow.ts index 8315b524..5c249c9a 100644 --- a/src/features/dataflow.ts +++ b/src/features/dataflow.ts @@ -22,6 +22,7 @@ import { createDataflowVisitor } from '../ast-analysis/visitors/dataflow-visitor import { hasDataflowTable, openReadonlyOrFail } from '../db/index.js'; import { ALL_SYMBOL_KINDS, normalizeSymbol } from '../domain/queries.js'; import { debug, info } from '../infrastructure/logger.js'; +import { loadNative } from '../infrastructure/native.js'; import { isTestFile } from '../infrastructure/test-filter.js'; import { paginateResult } from '../shared/paginate.js'; import type { BetterSqlite3Database, NodeRow, TreeSitterNode } from '../types.js'; @@ -244,6 +245,109 @@ export async function buildDataflowEdges( _engineOpts?: unknown, ): Promise<void> { const extToLang = buildExtToLangMap(); + + // ── Native bulk-insert fast path ────────────────────────────────────── + const native = loadNative(); + if (native?.bulkInsertDataflow) { + let needsJsFallback = false; + const batches: Array<{ + file: string; + edges: Array<{ + sourceName: string; + targetName: string; + kind: 
string; + paramIndex?: number | null; + expression?: string | null; + line?: number | null; + confidence: number; + }>; + }> = []; + + for (const [relPath, symbols] of fileSymbols) { + const ext = path.extname(relPath).toLowerCase(); + if (!DATAFLOW_EXTENSIONS.has(ext)) continue; + + // If we have pre-computed dataflow (from native extraction or unified walk), + // collect the edges directly + const data = symbols.dataflow; + if (!data) { + // Need WASM fallback for this file + if (!symbols._tree) { + needsJsFallback = true; + break; + } + // Has _tree but no dataflow — will be handled by visitor in engine, + // but if we got here the engine already ran. Skip this file. + continue; + } + + const fileEdges: (typeof batches)[0]['edges'] = []; + + for (const flow of data.argFlows as ArgFlow[]) { + if (flow.callerFunc && flow.calleeName) { + fileEdges.push({ + sourceName: flow.callerFunc, + targetName: flow.calleeName, + kind: 'flows_to', + paramIndex: flow.argIndex, + expression: flow.expression, + line: flow.line, + confidence: flow.confidence, + }); + } + } + + for (const assignment of data.assignments as Assignment[]) { + if (assignment.sourceCallName && assignment.callerFunc) { + fileEdges.push({ + sourceName: assignment.sourceCallName, + targetName: assignment.callerFunc, + kind: 'returns', + paramIndex: null, + expression: assignment.expression, + line: assignment.line, + confidence: 1.0, + }); + } + } + + for (const mut of data.mutations as Mutation[]) { + if (mut.funcName && mut.binding?.type === 'param') { + fileEdges.push({ + sourceName: mut.funcName, + targetName: mut.funcName, + kind: 'mutates', + paramIndex: null, + expression: mut.mutatingExpr, + line: mut.line, + confidence: 1.0, + }); + } + } + + if (fileEdges.length > 0) { + batches.push({ file: relPath, edges: fileEdges }); + } + } + + if (!needsJsFallback) { + const inserted = native.bulkInsertDataflow(db.name, batches); + const expectedEdges = batches.reduce((s, b) => s + b.edges.length, 0); + if 
(inserted === expectedEdges || expectedEdges === 0) { + if (inserted > 0) { + info(`Dataflow: ${inserted} edges inserted (native bulk)`); + } + return; + } + debug( + `Dataflow: bulk insert expected ${expectedEdges} edges, got ${inserted} — falling back to JS`, + ); + // fall through to JS path + } + // fall through to JS path + } + + // ── JS fallback path ────────────────────────────────────────────────── const { parsers, getParserFn } = await initDataflowParsers(fileSymbols); const insert = db.prepare( diff --git a/src/types.ts b/src/types.ts index 41058cce..bdbc29a9 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1815,6 +1815,41 @@ export interface NativeAddon { }>; }>, ): number; + bulkInsertCfg( + dbPath: string, + batches: Array<{ + name: string; + file: string; + line: number; + blocks: Array<{ + index: number; + type: string; + startLine?: number | null; + endLine?: number | null; + label?: string | null; + }>; + edges: Array<{ + sourceIndex: number; + targetIndex: number; + kind: string; + }>; + }>, + ): number; + bulkInsertDataflow( + dbPath: string, + batches: Array<{ + file: string; + edges: Array<{ + sourceName: string; + targetName: string; + kind: string; + paramIndex?: number | null; + expression?: string | null; + line?: number | null; + confidence: number; + }>; + }>, + ): number; engineVersion(): string; ParseTreeCache: new () => NativeParseTreeCache; }