From 42fafca01b645cdccefd15f4d9cb16f155dd3801 Mon Sep 17 00:00:00 2001 From: carlos-alm <127798846+carlos-alm@users.noreply.github.com> Date: Thu, 26 Mar 2026 22:12:31 -0600 Subject: [PATCH 1/8] perf(ast): bulk-insert AST nodes via native Rust/rusqlite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move AST node SQLite inserts from per-row JS iteration to a single native Rust transaction via napi-rs + rusqlite. The new bulkInsertAstNodes function opens the DB directly from Rust, pre-fetches parent node definitions, and inserts all rows in one transaction — eliminating the JS-native FFI overhead per row. The JS-side buildAstNodes tries the native fast path first (when all files have native astNodes arrays), falling back to the existing JS loop for WASM or mixed-engine scenarios. Target: astMs < 50ms on native full builds (was ~393ms). --- crates/codegraph-core/Cargo.toml | 1 + crates/codegraph-core/src/ast_db.rs | 160 ++++++++++++++++++++++++++++ crates/codegraph-core/src/lib.rs | 1 + src/features/ast.ts | 42 ++++++++ src/types.ts | 13 +++ 5 files changed, 217 insertions(+) create mode 100644 crates/codegraph-core/src/ast_db.rs diff --git a/crates/codegraph-core/Cargo.toml b/crates/codegraph-core/Cargo.toml index d968ad1c..a9b5fda8 100644 --- a/crates/codegraph-core/Cargo.toml +++ b/crates/codegraph-core/Cargo.toml @@ -24,6 +24,7 @@ tree-sitter-ruby = "0.23" tree-sitter-php = "0.23" tree-sitter-hcl = "1" rayon = "1" +rusqlite = { version = "0.32", features = ["bundled"] } send_wrapper = "0.6" [build-dependencies] diff --git a/crates/codegraph-core/src/ast_db.rs b/crates/codegraph-core/src/ast_db.rs new file mode 100644 index 00000000..b2807943 --- /dev/null +++ b/crates/codegraph-core/src/ast_db.rs @@ -0,0 +1,160 @@ +//! Bulk AST node insertion via rusqlite. +//! +//! Bypasses the JS iteration loop by opening the SQLite database directly +//! from Rust and inserting all AST nodes in a single transaction. +//! 
Parent node IDs are resolved by querying the `nodes` table. + +use std::collections::HashMap; + +use napi_derive::napi; +use rusqlite::{params, Connection, OpenFlags}; +use serde::{Deserialize, Serialize}; + +/// A single AST node to insert (received from JS). +#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AstInsertNode { + pub line: u32, + pub kind: String, + pub name: String, + pub text: Option<String>, + pub receiver: Option<String>, +} + +/// A batch of AST nodes for a single file. +#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FileAstBatch { + pub file: String, + pub nodes: Vec<AstInsertNode>, +} + +/// A definition row from the `nodes` table used for parent resolution. +struct NodeDef { + id: i64, + line: u32, + end_line: Option<u32>, +} + +/// Find the narrowest enclosing definition for a given source line. +/// Returns the node ID of the best match, or None if no definition encloses this line. +fn find_parent_id(defs: &[NodeDef], line: u32) -> Option<i64> { + let mut best_id: Option<i64> = None; + let mut best_span = u32::MAX; + for d in defs { + if d.line <= line { + if let Some(el) = d.end_line { + if el >= line { + let span = el - d.line; + if span < best_span { + best_id = Some(d.id); + best_span = span; + } + } + } + } + } + best_id +} + +/// Bulk-insert AST nodes into the database, resolving `parent_node_id` +/// from the `nodes` table. Runs all inserts in a single SQLite transaction. +/// +/// Returns the number of rows inserted. Returns 0 on any error (DB open +/// failure, missing table, transaction failure).
+#[napi] +pub fn bulk_insert_ast_nodes(db_path: String, batches: Vec<FileAstBatch>) -> u32 { + if batches.is_empty() { + return 0; + } + + let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX; + let mut conn = match Connection::open_with_flags(&db_path, flags) { + Ok(c) => c, + Err(_) => return 0, + }; + + // Match the JS-side performance pragmas + let _ = conn.execute_batch("PRAGMA synchronous = NORMAL"); + + // Bail out if the ast_nodes table doesn't exist (schema too old) + let has_table: bool = conn + .prepare("SELECT 1 FROM sqlite_master WHERE type='table' AND name='ast_nodes'") + .and_then(|mut s| s.query_row([], |_| Ok(true))) + .unwrap_or(false); + if !has_table { + return 0; + } + + // ── Phase 1: Pre-fetch node definitions for parent resolution ──────── + let mut file_defs: HashMap<String, Vec<NodeDef>> = HashMap::new(); + { + let Ok(mut stmt) = + conn.prepare("SELECT id, line, end_line FROM nodes WHERE file = ?1") + else { + return 0; + }; + + for batch in &batches { + if batch.nodes.is_empty() || file_defs.contains_key(&batch.file) { + continue; + } + let defs: Vec<NodeDef> = stmt + .query_map(params![&batch.file], |row| { + Ok(NodeDef { + id: row.get(0)?, + line: row.get(1)?, + end_line: row.get(2)?, + }) + }) + .map(|rows| rows.filter_map(|r| r.ok()).collect()) + .unwrap_or_default(); + file_defs.insert(batch.file.clone(), defs); + } + } // `stmt` dropped — releases the immutable borrow on `conn` + + // ── Phase 2: Bulk insert in a single transaction ───────────────────── + let Ok(tx) = conn.transaction() else { + return 0; + }; + + let mut total = 0u32; + { + let Ok(mut insert_stmt) = tx.prepare( + "INSERT INTO ast_nodes (file, line, kind, name, text, receiver, parent_node_id) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + ) else { + return 0; + }; + + for batch in &batches { + let empty = Vec::new(); + let defs = file_defs.get(&batch.file).unwrap_or(&empty); + + for node in &batch.nodes { + let parent_id = find_parent_id(defs, node.line); + + if insert_stmt +
.execute(params![ + &batch.file, + node.line, + &node.kind, + &node.name, + &node.text, + &node.receiver, + parent_id, + ]) + .is_ok() + { + total += 1; + } + } + } + } // `insert_stmt` dropped + + if tx.commit().is_err() { + return 0; + } + + total +} diff --git a/crates/codegraph-core/src/lib.rs b/crates/codegraph-core/src/lib.rs index 6d3aa6d0..391f0854 100644 --- a/crates/codegraph-core/src/lib.rs +++ b/crates/codegraph-core/src/lib.rs @@ -1,3 +1,4 @@ +pub mod ast_db; pub mod cfg; pub mod complexity; pub mod constants; diff --git a/src/features/ast.ts b/src/features/ast.ts index 55307fa0..30918662 100644 --- a/src/features/ast.ts +++ b/src/features/ast.ts @@ -6,6 +6,7 @@ import { createAstStoreVisitor } from '../ast-analysis/visitors/ast-store-visito import { bulkNodeIdsByFile, openReadonlyOrFail } from '../db/index.js'; import { buildFileConditionSQL } from '../db/query-builder.js'; import { debug } from '../infrastructure/logger.js'; +import { loadNative } from '../infrastructure/native.js'; import { outputResult } from '../infrastructure/result-formatter.js'; import { paginateResult } from '../shared/paginate.js'; import type { ASTNodeKind, BetterSqlite3Database, Definition, TreeSitterNode } from '../types.js'; @@ -67,6 +68,47 @@ export async function buildAstNodes( _rootDir: string, _engineOpts?: unknown, ): Promise<void> { + // ── Native bulk-insert fast path ────────────────────────────────────── + const native = loadNative(); + if (native?.bulkInsertAstNodes) { + let needsJsFallback = false; + const batches: Array<{ + file: string; + nodes: Array<{ + line: number; + kind: string; + name: string; + text?: string | null; + receiver?: string | null; + }>; + }> = []; + + for (const [relPath, symbols] of fileSymbols) { + if (Array.isArray(symbols.astNodes)) { + batches.push({ + file: relPath, + nodes: symbols.astNodes.map((n) => ({ + line: n.line, + kind: n.kind, + name: n.name, + text: n.text, + receiver: n.receiver, + })), + }); + } else if (symbols.calls ||
symbols._tree) { + needsJsFallback = true; + break; + } + } + + if (!needsJsFallback) { + const inserted = native.bulkInsertAstNodes(db.name, batches); + debug(`AST extraction (native bulk): ${inserted} nodes stored`); + return; + } + } + + // ── JS fallback path ────────────────────────────────────────────────── let insertStmt: ReturnType; try { insertStmt = db.prepare( diff --git a/src/types.ts b/src/types.ts index 7dc1236b..41058cce 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1802,6 +1802,19 @@ export interface NativeAddon { computeConfidence(callerFile: string, targetFile: string, importedFrom: string | null): number; detectCycles(edges: Array<{ source: string; target: string }>): string[][]; buildCallEdges(files: unknown[], nodes: unknown[], builtinReceivers: string[]): unknown[]; + bulkInsertAstNodes( + dbPath: string, + batches: Array<{ + file: string; + nodes: Array<{ + line: number; + kind: string; + name: string; + text?: string | null; + receiver?: string | null; + }>; + }>, + ): number; engineVersion(): string; ParseTreeCache: new () => NativeParseTreeCache; } From 89a1ddb63d37f5b6130a9e63bccc9f836045fa31 Mon Sep 17 00:00:00 2001 From: carlos-alm <127798846+carlos-alm@users.noreply.github.com> Date: Thu, 26 Mar 2026 22:42:42 -0600 Subject: [PATCH 2/8] fix(ast): add busy_timeout pragma to Rust SQLite connection (#651) The Rust connection omitted busy_timeout = 5000 which the JS-side connection.ts sets. Without it, SQLITE_BUSY is returned immediately on WAL contention instead of retrying for 5 seconds. 
--- crates/codegraph-core/src/ast_db.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/codegraph-core/src/ast_db.rs b/crates/codegraph-core/src/ast_db.rs index b2807943..816ada7e 100644 --- a/crates/codegraph-core/src/ast_db.rs +++ b/crates/codegraph-core/src/ast_db.rs @@ -74,8 +74,10 @@ pub fn bulk_insert_ast_nodes(db_path: String, batches: Vec) -> u32 Err(_) => return 0, }; - // Match the JS-side performance pragmas - let _ = conn.execute_batch("PRAGMA synchronous = NORMAL"); + // Match the JS-side performance pragmas (including busy_timeout for WAL contention) + let _ = conn.execute_batch( + "PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000", + ); // Bail out if the ast_nodes table doesn't exist (schema too old) let has_table: bool = conn From 4d91d0b14c74f3904869e83843b7e51d5fdc2e4d Mon Sep 17 00:00:00 2001 From: carlos-alm <127798846+carlos-alm@users.noreply.github.com> Date: Thu, 26 Mar 2026 22:43:16 -0600 Subject: [PATCH 3/8] fix(ast): fall back to JS when native bulk insert count mismatches (#651) bulkInsertAstNodes returns 0 for both "nothing to insert" and hard errors (DB open failure, SQLITE_BUSY, etc). Compare expected vs actual count and fall through to the JS path on mismatch so errors don't silently drop all AST nodes. 
--- src/features/ast.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/features/ast.ts b/src/features/ast.ts index 30918662..6edd428f 100644 --- a/src/features/ast.ts +++ b/src/features/ast.ts @@ -102,9 +102,16 @@ export async function buildAstNodes( } if (!needsJsFallback) { + const expectedNodes = batches.reduce((s, b) => s + b.nodes.length, 0); const inserted = native.bulkInsertAstNodes(db.name, batches); - debug(`AST extraction (native bulk): ${inserted} nodes stored`); - return; + if (inserted === expectedNodes) { + debug(`AST extraction (native bulk): ${inserted} nodes stored`); + return; + } + debug( + `AST extraction (native bulk): expected ${expectedNodes}, got ${inserted} — falling back to JS`, + ); + // fall through to JS path } } From d4cbd3274a4e2d6fd0c6c357377bb331a1d4c77e Mon Sep 17 00:00:00 2001 From: carlos-alm <127798846+carlos-alm@users.noreply.github.com> Date: Thu, 26 Mar 2026 22:43:40 -0600 Subject: [PATCH 4/8] docs(cargo): document rusqlite bundled feature rationale (#651) Explain why bundled is intentional: Windows CI lacks system SQLite, and dual-instance WAL coordination is OS-safe. --- crates/codegraph-core/Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/codegraph-core/Cargo.toml b/crates/codegraph-core/Cargo.toml index a9b5fda8..e7cd155d 100644 --- a/crates/codegraph-core/Cargo.toml +++ b/crates/codegraph-core/Cargo.toml @@ -24,6 +24,9 @@ tree-sitter-ruby = "0.23" tree-sitter-php = "0.23" tree-sitter-hcl = "1" rayon = "1" +# `bundled` embeds a second SQLite copy (better-sqlite3 already bundles one). +# This is intentional: Windows CI lacks a system SQLite, and WAL coordination +# between the two instances is handled safely at the OS level. 
rusqlite = { version = "0.32", features = ["bundled"] } send_wrapper = "0.6" From 5be93ca96356fdc31e91be8406ef5343abe95bc9 Mon Sep 17 00:00:00 2001 From: carlos-alm <127798846+carlos-alm@users.noreply.github.com> Date: Thu, 26 Mar 2026 22:53:35 -0600 Subject: [PATCH 5/8] fix(ast): match JS findParentDef semantics for null end_line (#651) The Rust find_parent_id skipped definitions with end_line = NULL, but the JS findParentDef treats them as always-enclosing with a negative span (preferred over wider defs). This caused parent_node_id mismatches between native and JS paths. --- crates/codegraph-core/src/ast_db.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/crates/codegraph-core/src/ast_db.rs b/crates/codegraph-core/src/ast_db.rs index 816ada7e..be821bed 100644 --- a/crates/codegraph-core/src/ast_db.rs +++ b/crates/codegraph-core/src/ast_db.rs @@ -38,19 +38,24 @@ struct NodeDef { /// Find the narrowest enclosing definition for a given source line. /// Returns the node ID of the best match, or None if no definition encloses this line. +/// +/// Mirrors the JS `findParentDef` semantics: a definition with `end_line = NULL` +/// is treated as always enclosing, with a negative sentinel span so it is preferred +/// over definitions that have an explicit (wider) `end_line`. fn find_parent_id(defs: &[NodeDef], line: u32) -> Option<i64> { let mut best_id: Option<i64> = None; - let mut best_span = u32::MAX; + let mut best_span: i64 = i64::MAX; for d in defs { if d.line <= line { - if let Some(el) = d.end_line { - if el >= line { - let span = el - d.line; - if span < best_span { - best_id = Some(d.id); - best_span = span; - } - } + let span: i64 = match d.end_line { + Some(el) if el >= line => (el - d.line) as i64, + Some(_) => continue, + // JS: (def.endLine ??
0) - def.line → negative, always preferred + None => -(d.line as i64), + }; + if span < best_span { + best_id = Some(d.id); + best_span = span; } } } From 6dc0a80299d6bb450c722089c30e9a95555d66fa Mon Sep 17 00:00:00 2001 From: carlos-alm <127798846+carlos-alm@users.noreply.github.com> Date: Thu, 26 Mar 2026 23:18:22 -0600 Subject: [PATCH 6/8] fix(ast): treat row-level execute errors as fatal for transaction (#651) Return 0 immediately on any insert_stmt.execute() failure so the transaction drops and rolls back, ensuring all-or-nothing semantics. Previously, .is_ok() silently swallowed row-level errors which could commit partial data and misfire the JS fallback causing duplicate rows. --- crates/codegraph-core/src/ast_db.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/crates/codegraph-core/src/ast_db.rs b/crates/codegraph-core/src/ast_db.rs index be821bed..4f317db1 100644 --- a/crates/codegraph-core/src/ast_db.rs +++ b/crates/codegraph-core/src/ast_db.rs @@ -141,19 +141,17 @@ pub fn bulk_insert_ast_nodes(db_path: String, batches: Vec) -> u32 for node in &batch.nodes { let parent_id = find_parent_id(defs, node.line); - if insert_stmt - .execute(params![ - &batch.file, - node.line, - &node.kind, - &node.name, - &node.text, - &node.receiver, - parent_id, - ]) - .is_ok() - { - total += 1; + match insert_stmt.execute(params![ + &batch.file, + node.line, + &node.kind, + &node.name, + &node.text, + &node.receiver, + parent_id, + ]) { + Ok(_) => total += 1, + Err(_) => return 0, // abort; tx rolls back on drop } } } From 36d65b72da08b0c7620acc57bc2ff2f4f6ba3f13 Mon Sep 17 00:00:00 2001 From: carlos-alm <127798846+carlos-alm@users.noreply.github.com> Date: Fri, 27 Mar 2026 00:08:45 -0600 Subject: [PATCH 7/8] perf(db): bulk CFG and dataflow DB writes via rusqlite (#6.10) Move CFG block/edge and dataflow edge inserts from JS iteration to Rust bulk operations, following the same pattern as bulk_insert_ast_nodes (6.9). 
Rust side: - cfg_db.rs: bulk_insert_cfg() resolves function node IDs, deletes stale data, inserts blocks+edges in a single rusqlite transaction - dataflow_db.rs: bulk_insert_dataflow() pre-builds node resolution cache (local-first, global fallback), inserts edges in a single transaction JS side: - cfg.ts: native fast path collects CfgFunctionBatch[] and delegates to Rust when all CFG is pre-computed by the native engine - dataflow.ts: native fast path converts DataflowResult (argFlows, assignments, mutations) into FileDataflowBatch[] for Rust insertion - Both fall back to existing JS paths when native addon is unavailable Target: cfgMs + dataflowMs < 50ms combined (from ~286ms with JS iteration) --- crates/codegraph-core/src/cfg_db.rs | 199 +++++++++++++++++++++++ crates/codegraph-core/src/dataflow_db.rs | 185 +++++++++++++++++++++ crates/codegraph-core/src/lib.rs | 2 + src/features/cfg.ts | 82 ++++++++++ src/features/dataflow.ts | 95 +++++++++++ src/types.ts | 35 ++++ 6 files changed, 598 insertions(+) create mode 100644 crates/codegraph-core/src/cfg_db.rs create mode 100644 crates/codegraph-core/src/dataflow_db.rs diff --git a/crates/codegraph-core/src/cfg_db.rs b/crates/codegraph-core/src/cfg_db.rs new file mode 100644 index 00000000..5381218a --- /dev/null +++ b/crates/codegraph-core/src/cfg_db.rs @@ -0,0 +1,199 @@ +//! Bulk CFG block and edge insertion via rusqlite. +//! +//! Bypasses the JS iteration loop by opening the SQLite database directly +//! from Rust and inserting all CFG blocks and edges in a single transaction. +//! Function node IDs are resolved by querying the `nodes` table. + +use std::collections::HashMap; + +use napi_derive::napi; +use rusqlite::{params, Connection, OpenFlags}; +use serde::{Deserialize, Serialize}; + +/// A single CFG block to insert (received from JS). 
+#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CfgInsertBlock { + pub index: u32, + #[napi(js_name = "type")] + pub block_type: String, + #[napi(js_name = "startLine")] + pub start_line: Option<u32>, + #[napi(js_name = "endLine")] + pub end_line: Option<u32>, + pub label: Option<String>, +} + +/// A single CFG edge to insert (received from JS). +#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CfgInsertEdge { + #[napi(js_name = "sourceIndex")] + pub source_index: u32, + #[napi(js_name = "targetIndex")] + pub target_index: u32, + pub kind: String, +} + +/// CFG data for a single function definition. +#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CfgFunctionBatch { + /// Definition name (used to look up node ID) + pub name: String, + /// Relative file path + pub file: String, + /// Definition source line + pub line: u32, + pub blocks: Vec<CfgInsertBlock>, + pub edges: Vec<CfgInsertEdge>, +} + +/// Bulk-insert CFG blocks and edges into the database. +/// +/// For each function batch: +/// 1. Resolve the function's node ID from the `nodes` table +/// 2. Delete any existing CFG data for that node (handles incremental rebuilds) +/// 3. Insert all blocks, collecting their auto-generated row IDs +/// 4. Insert all edges, mapping block indices to row IDs +/// +/// Returns the total number of functions processed. Returns 0 on any error.
+#[napi] +pub fn bulk_insert_cfg(db_path: String, batches: Vec<CfgFunctionBatch>) -> u32 { + if batches.is_empty() { + return 0; + } + + let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX; + let mut conn = match Connection::open_with_flags(&db_path, flags) { + Ok(c) => c, + Err(_) => return 0, + }; + + let _ = conn.execute_batch( + "PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000", + ); + + // Bail out if CFG tables don't exist + let has_tables: bool = conn + .prepare("SELECT 1 FROM sqlite_master WHERE type='table' AND name='cfg_blocks'") + .and_then(|mut s| s.query_row([], |_| Ok(true))) + .unwrap_or(false); + if !has_tables { + return 0; + } + + // ── Phase 1: Pre-fetch function node IDs ───────────────────────────── + let mut node_ids: HashMap<(String, String, u32), i64> = HashMap::new(); + { + let Ok(mut stmt) = conn.prepare( + "SELECT id FROM nodes WHERE name = ?1 AND kind IN ('function','method') AND file = ?2 AND line = ?3", + ) else { + return 0; + }; + + for batch in &batches { + let key = (batch.name.clone(), batch.file.clone(), batch.line); + if node_ids.contains_key(&key) { + continue; + } + if let Ok(id) = stmt.query_row(params![&batch.name, &batch.file, batch.line], |row| { + row.get::<_, i64>(0) + }) { + node_ids.insert(key, id); + } + } + } + + // ── Phase 2: Bulk insert in a single transaction ───────────────────── + let Ok(tx) = conn.transaction() else { + return 0; + }; + + let mut total = 0u32; + { + let Ok(mut delete_edges) = + tx.prepare("DELETE FROM cfg_edges WHERE function_node_id = ?1") + else { + return 0; + }; + let Ok(mut delete_blocks) = + tx.prepare("DELETE FROM cfg_blocks WHERE function_node_id = ?1") + else { + return 0; + }; + let Ok(mut insert_block) = tx.prepare( + "INSERT INTO cfg_blocks (function_node_id, block_index, block_type, start_line, end_line, label) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + ) else { + return 0; + }; + let Ok(mut insert_edge) = tx.prepare( + "INSERT INTO cfg_edges (function_node_id,
source_block_id, target_block_id, kind) \ + VALUES (?1, ?2, ?3, ?4)", + ) else { + return 0; + }; + + for batch in &batches { + let key = (batch.name.clone(), batch.file.clone(), batch.line); + let Some(&node_id) = node_ids.get(&key) else { + continue; + }; + + // Always delete stale CFG rows (handles body-removed / incremental case) + if delete_edges.execute(params![node_id]).is_err() { + return 0; + } + if delete_blocks.execute(params![node_id]).is_err() { + return 0; + } + + if batch.blocks.is_empty() { + total += 1; + continue; + } + + // Insert blocks and collect their auto-generated row IDs + let mut block_db_ids: HashMap<u32, i64> = HashMap::new(); + for block in &batch.blocks { + match insert_block.execute(params![ + node_id, + block.index, + &block.block_type, + block.start_line, + block.end_line, + &block.label, + ]) { + Ok(_) => { + block_db_ids.insert(block.index, tx.last_insert_rowid()); + } + Err(_) => return 0, + } + } + + // Insert edges, mapping block indices to row IDs + for edge in &batch.edges { + let Some(&source_db_id) = block_db_ids.get(&edge.source_index) else { + continue; + }; + let Some(&target_db_id) = block_db_ids.get(&edge.target_index) else { + continue; + }; + match insert_edge.execute(params![node_id, source_db_id, target_db_id, &edge.kind]) + { + Ok(_) => {} + Err(_) => return 0, + } + } + + total += 1; + } + } + + if tx.commit().is_err() { + return 0; + } + + total +} diff --git a/crates/codegraph-core/src/dataflow_db.rs b/crates/codegraph-core/src/dataflow_db.rs new file mode 100644 index 00000000..fea11e01 --- /dev/null +++ b/crates/codegraph-core/src/dataflow_db.rs @@ -0,0 +1,185 @@ +//! Bulk dataflow edge insertion via rusqlite. +//! +//! Bypasses the JS iteration loop by opening the SQLite database directly +//! from Rust and inserting all dataflow edges in a single transaction. +//! Node IDs are resolved by querying the `nodes` table (local-first, then global).
+ +use std::collections::HashMap; + +use napi_derive::napi; +use rusqlite::{params, Connection, OpenFlags}; +use serde::{Deserialize, Serialize}; + +/// A single dataflow edge to insert (received from JS). +#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DataflowInsertEdge { + /// Source function name (resolved to node ID) + #[napi(js_name = "sourceName")] + pub source_name: String, + /// Target function name (resolved to node ID) + #[napi(js_name = "targetName")] + pub target_name: String, + /// Edge kind: "flows_to", "returns", or "mutates" + pub kind: String, + #[napi(js_name = "paramIndex")] + pub param_index: Option<u32>, + pub expression: Option<String>, + pub line: Option<u32>, + pub confidence: f64, +} + +/// A batch of dataflow edges for a single file. +#[napi(object)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FileDataflowBatch { + /// Relative file path (for local-first node resolution) + pub file: String, + pub edges: Vec<DataflowInsertEdge>, +} + +/// Resolve a function name to a node ID, preferring local (same-file) matches. +fn resolve_node( + local_stmt: &mut rusqlite::Statement, + global_stmt: &mut rusqlite::Statement, + name: &str, + file: &str, + cache: &mut HashMap<(String, String), Option<i64>>, +) -> Option<i64> { + let key = (name.to_string(), file.to_string()); + if let Some(cached) = cache.get(&key) { + return *cached; + } + + // Local-first: same file + let result = local_stmt + .query_row(params![name, file], |row| row.get::<_, i64>(0)) + .ok(); + + let id = if result.is_some() { + result + } else { + // Global fallback + global_stmt + .query_row(params![name], |row| row.get::<_, i64>(0)) + .ok() + }; + + cache.insert(key, id); + id +} + +/// Bulk-insert dataflow edges into the database. +/// +/// For each file batch, resolves function names to node IDs (local-first, +/// then global) and inserts edges in a single transaction. +/// +/// Returns the total number of edges inserted. Returns 0 on any error.
+#[napi] +pub fn bulk_insert_dataflow(db_path: String, batches: Vec<FileDataflowBatch>) -> u32 { + if batches.is_empty() { + return 0; + } + + let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX; + let mut conn = match Connection::open_with_flags(&db_path, flags) { + Ok(c) => c, + Err(_) => return 0, + }; + + let _ = conn.execute_batch( + "PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000", + ); + + // Bail out if the dataflow table doesn't exist + let has_table: bool = conn + .prepare("SELECT 1 FROM sqlite_master WHERE type='table' AND name='dataflow'") + .and_then(|mut s| s.query_row([], |_| Ok(true))) + .unwrap_or(false); + if !has_table { + return 0; + } + + // ── Phase 1: Pre-build node resolution cache ───────────────────────── + // Collect all unique (name, file) pairs we need to resolve + let mut resolve_cache: HashMap<(String, String), Option<i64>> = HashMap::new(); + { + let Ok(mut local_stmt) = conn.prepare( + "SELECT id FROM nodes WHERE name = ?1 AND file = ?2 AND kind IN ('function','method') LIMIT 1", + ) else { + return 0; + }; + let Ok(mut global_stmt) = conn.prepare( + "SELECT id FROM nodes WHERE name = ?1 AND kind IN ('function','method') ORDER BY file, line LIMIT 1", + ) else { + return 0; + }; + + for batch in &batches { + for edge in &batch.edges { + resolve_node( + &mut local_stmt, + &mut global_stmt, + &edge.source_name, + &batch.file, + &mut resolve_cache, + ); + resolve_node( + &mut local_stmt, + &mut global_stmt, + &edge.target_name, + &batch.file, + &mut resolve_cache, + ); + } + } + } + + // ── Phase 2: Bulk insert in a single transaction ───────────────────── + let Ok(tx) = conn.transaction() else { + return 0; + }; + + let mut total = 0u32; + { + let Ok(mut insert_stmt) = tx.prepare( + "INSERT INTO dataflow (source_id, target_id, kind, param_index, expression, line, confidence) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + ) else { + return 0; + }; + + for batch in &batches { + for edge in &batch.edges { + let source_key =
(edge.source_name.clone(), batch.file.clone()); + let target_key = (edge.target_name.clone(), batch.file.clone()); + + let Some(&Some(source_id)) = resolve_cache.get(&source_key) else { + continue; + }; + let Some(&Some(target_id)) = resolve_cache.get(&target_key) else { + continue; + }; + + match insert_stmt.execute(params![ + source_id, + target_id, + &edge.kind, + edge.param_index, + &edge.expression, + edge.line, + edge.confidence, + ]) { + Ok(_) => total += 1, + Err(_) => return 0, + } + } + } + } + + if tx.commit().is_err() { + return 0; + } + + total +} diff --git a/crates/codegraph-core/src/lib.rs b/crates/codegraph-core/src/lib.rs index 391f0854..984359b2 100644 --- a/crates/codegraph-core/src/lib.rs +++ b/crates/codegraph-core/src/lib.rs @@ -1,9 +1,11 @@ pub mod ast_db; pub mod cfg; +pub mod cfg_db; pub mod complexity; pub mod constants; pub mod cycles; pub mod dataflow; +pub mod dataflow_db; pub mod edge_builder; pub mod extractors; pub mod import_resolution; diff --git a/src/features/cfg.ts b/src/features/cfg.ts index 389ee3c2..06a8ae83 100644 --- a/src/features/cfg.ts +++ b/src/features/cfg.ts @@ -17,6 +17,7 @@ import { openReadonlyOrFail, } from '../db/index.js'; import { debug, info } from '../infrastructure/logger.js'; +import { loadNative } from '../infrastructure/native.js'; import { paginateResult } from '../shared/paginate.js'; import type { BetterSqlite3Database, Definition, NodeRow, TreeSitterNode } from '../types.js'; import { findNodes } from './shared/find-nodes.js'; @@ -285,6 +286,87 @@ export async function buildCFGData( // skip WASM parser init, tree parsing, and JS visitor entirely — just persist. const allNative = allCfgNative(fileSymbols); + // ── Native bulk-insert fast path ────────────────────────────────────── + // When all CFG data is pre-computed by Rust and no files need WASM visitor, + // bypass JS iteration entirely — collect batches and hand them to rusqlite. 
+ if (allNative) { + const native = loadNative(); + if (native?.bulkInsertCfg) { + let needsJsFallback = false; + const batches: Array<{ + name: string; + file: string; + line: number; + blocks: Array<{ + index: number; + type: string; + startLine?: number | null; + endLine?: number | null; + label?: string | null; + }>; + edges: Array<{ + sourceIndex: number; + targetIndex: number; + kind: string; + }>; + }> = []; + + for (const [relPath, symbols] of fileSymbols) { + const ext = path.extname(relPath).toLowerCase(); + if (!CFG_EXTENSIONS.has(ext)) continue; + + // Files with _tree were WASM-parsed and need the slow path + if (symbols._tree) { + needsJsFallback = true; + break; + } + + for (const def of symbols.definitions) { + if (def.kind !== 'function' && def.kind !== 'method') continue; + if (!def.line) continue; + + const cfgData = def.cfg as unknown as + | { blocks: CfgBuildBlock[]; edges: CfgBuildEdge[] } + | null + | undefined; + + batches.push({ + name: def.name, + file: relPath, + line: def.line, + blocks: cfgData?.blocks?.length + ? cfgData.blocks.map((b) => ({ + index: b.index, + type: b.type, + startLine: b.startLine, + endLine: b.endLine, + label: b.label, + })) + : [], + edges: cfgData?.blocks?.length + ? 
(cfgData.edges || []).map((e) => ({ + sourceIndex: e.sourceIndex, + targetIndex: e.targetIndex, + kind: e.kind, + })) + : [], + }); + } + } + + if (!needsJsFallback) { + const processed = native.bulkInsertCfg(db.name, batches); + const withBlocks = batches.filter((b) => b.blocks.length > 0).length; + if (processed > 0) { + info(`CFG: ${withBlocks} functions analyzed (native bulk)`); + } + return; + } + // fall through to JS path + } + } + + // ── JS fallback path ────────────────────────────────────────────────── const extToLang = buildExtToLangMap(); let parsers: unknown = null; let getParserFn: unknown = null; diff --git a/src/features/dataflow.ts b/src/features/dataflow.ts index 8315b524..36a91ab3 100644 --- a/src/features/dataflow.ts +++ b/src/features/dataflow.ts @@ -22,6 +22,7 @@ import { createDataflowVisitor } from '../ast-analysis/visitors/dataflow-visitor import { hasDataflowTable, openReadonlyOrFail } from '../db/index.js'; import { ALL_SYMBOL_KINDS, normalizeSymbol } from '../domain/queries.js'; import { debug, info } from '../infrastructure/logger.js'; +import { loadNative } from '../infrastructure/native.js'; import { isTestFile } from '../infrastructure/test-filter.js'; import { paginateResult } from '../shared/paginate.js'; import type { BetterSqlite3Database, NodeRow, TreeSitterNode } from '../types.js'; @@ -244,6 +245,100 @@ export async function buildDataflowEdges( _engineOpts?: unknown, ): Promise<void> { const extToLang = buildExtToLangMap(); + + // ── Native bulk-insert fast path ────────────────────────────────────── + const native = loadNative(); + if (native?.bulkInsertDataflow) { + let needsJsFallback = false; + const batches: Array<{ + file: string; + edges: Array<{ + sourceName: string; + targetName: string; + kind: string; + paramIndex?: number | null; + expression?: string | null; + line?: number | null; + confidence: number; + }>; + }> = []; + + for (const [relPath, symbols] of fileSymbols) { + const ext =
path.extname(relPath).toLowerCase(); + if (!DATAFLOW_EXTENSIONS.has(ext)) continue; + + // If we have pre-computed dataflow (from native extraction or unified walk), + // collect the edges directly + const data = symbols.dataflow; + if (!data) { + // Need WASM fallback for this file + if (!symbols._tree) { + needsJsFallback = true; + break; + } + // Has _tree but no dataflow — will be handled by visitor in engine, + // but if we got here the engine already ran. Skip this file. + continue; + } + + const fileEdges: (typeof batches)[0]['edges'] = []; + + for (const flow of data.argFlows as ArgFlow[]) { + if (flow.callerFunc && flow.calleeName) { + fileEdges.push({ + sourceName: flow.callerFunc, + targetName: flow.calleeName, + kind: 'flows_to', + paramIndex: flow.argIndex, + expression: flow.expression, + line: flow.line, + confidence: flow.confidence, + }); + } + } + + for (const assignment of data.assignments as Assignment[]) { + if (assignment.sourceCallName && assignment.callerFunc) { + fileEdges.push({ + sourceName: assignment.sourceCallName, + targetName: assignment.callerFunc, + kind: 'returns', + paramIndex: null, + expression: assignment.expression, + line: assignment.line, + confidence: 1.0, + }); + } + } + + for (const mut of data.mutations as Mutation[]) { + if (mut.funcName && mut.binding?.type === 'param') { + fileEdges.push({ + sourceName: mut.funcName, + targetName: mut.funcName, + kind: 'mutates', + paramIndex: null, + expression: mut.mutatingExpr, + line: mut.line, + confidence: 1.0, + }); + } + } + + if (fileEdges.length > 0) { + batches.push({ file: relPath, edges: fileEdges }); + } + } + + if (!needsJsFallback) { + const inserted = native.bulkInsertDataflow(db.name, batches); + info(`Dataflow: ${inserted} edges inserted (native bulk)`); + return; + } + // fall through to JS path + } + + // ── JS fallback path ────────────────────────────────────────────────── const { parsers, getParserFn } = await initDataflowParsers(fileSymbols); const insert = 
db.prepare( diff --git a/src/types.ts b/src/types.ts index 41058cce..bdbc29a9 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1815,6 +1815,41 @@ export interface NativeAddon { }>; }>, ): number; + bulkInsertCfg( + dbPath: string, + batches: Array<{ + name: string; + file: string; + line: number; + blocks: Array<{ + index: number; + type: string; + startLine?: number | null; + endLine?: number | null; + label?: string | null; + }>; + edges: Array<{ + sourceIndex: number; + targetIndex: number; + kind: string; + }>; + }>, + ): number; + bulkInsertDataflow( + dbPath: string, + batches: Array<{ + file: string; + edges: Array<{ + sourceName: string; + targetName: string; + kind: string; + paramIndex?: number | null; + expression?: string | null; + line?: number | null; + confidence: number; + }>; + }>, + ): number; engineVersion(): string; ParseTreeCache: new () => NativeParseTreeCache; } From 1a20f46294eb5dea0d6680b37b61d78786bbed8a Mon Sep 17 00:00:00 2001 From: carlos-alm <127798846+carlos-alm@users.noreply.github.com> Date: Fri, 27 Mar 2026 01:22:44 -0600 Subject: [PATCH 8/8] fix(db): add JS fallback on bulk-insert count mismatch (#653) Match the ast.ts reference pattern: check the Rust return value against the expected count and fall through to the JS path when they disagree, preventing silent data loss on Rust-side errors. 
--- src/features/cfg.ts | 14 ++++++++++---- src/features/dataflow.ts | 13 +++++++++++-- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/features/cfg.ts b/src/features/cfg.ts index 06a8ae83..7cdfbb5e 100644 --- a/src/features/cfg.ts +++ b/src/features/cfg.ts @@ -356,11 +356,17 @@ export async function buildCFGData( if (!needsJsFallback) { const processed = native.bulkInsertCfg(db.name, batches); - const withBlocks = batches.filter((b) => b.blocks.length > 0).length; - if (processed > 0) { - info(`CFG: ${withBlocks} functions analyzed (native bulk)`); + const expectedFunctions = batches.filter((b) => b.blocks.length > 0).length; + if (processed === batches.length || expectedFunctions === 0) { + if (expectedFunctions > 0) { + info(`CFG: ${expectedFunctions} functions analyzed (native bulk)`); + } + return; } - return; + debug( + `CFG: bulk insert expected ${batches.length} functions, got ${processed} — falling back to JS`, + ); + // fall through to JS path } // fall through to JS path } diff --git a/src/features/dataflow.ts b/src/features/dataflow.ts index 36a91ab3..5c249c9a 100644 --- a/src/features/dataflow.ts +++ b/src/features/dataflow.ts @@ -332,8 +332,17 @@ export async function buildDataflowEdges( if (!needsJsFallback) { const inserted = native.bulkInsertDataflow(db.name, batches); - info(`Dataflow: ${inserted} edges inserted (native bulk)`); - return; + const expectedEdges = batches.reduce((s, b) => s + b.edges.length, 0); + if (inserted === expectedEdges || expectedEdges === 0) { + if (inserted > 0) { + info(`Dataflow: ${inserted} edges inserted (native bulk)`); + } + return; + } + debug( + `Dataflow: bulk insert expected ${expectedEdges} edges, got ${inserted} — falling back to JS`, + ); + // fall through to JS path } // fall through to JS path }