From 8bbf5b82f13ce12bcf50eecdd4561a5d9c919b69 Mon Sep 17 00:00:00 2001
From: Laith Al-Saadoon <9553966+theagenticguy@users.noreply.github.com>
Date: Mon, 27 Apr 2026 07:19:10 -0700
Subject: [PATCH] perf(ingestion): cross-node embedding batching + worker pool
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The embeddings phase was pegged to one embedding per node per await,
behind a single-threaded ONNX session — an AWSQuickWork run sat at 95%
CPU for 7+ minutes on 1,922 files.

Refactor into two stages: walk tiers once to collect (text, emitRow)
jobs in canonical order, then dispatch in fixed-size batches across a
configurable Piscina pool of OnnxEmbedder workers. Each wave fires
workers × batchSize embeds concurrently and scatters vectors back into
the row buffer. Row ordering and the embeddingsHash contract are
preserved — confirmed by a new test that asserts byte-identical hashes
across batchSize=1 vs 32.

- New flags: --embeddings-workers <n>, --embeddings-batch-size <n>.
- A main-thread canary OnnxEmbedder opens before the pool so
  EmbedderNotSetupError keeps its class identity across the
  structured-clone boundary.
- HTTP backend unaffected (pool flag ignored when endpoint is set).
---
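Illustrative invocation (assumes the analyze subcommand wiring in
packages/cli; the repo path and flag values are examples only):

    # "auto" resolves to os.cpus().length - 1 workers; 64 chunks per
    # embedBatch() call instead of the default 32.
    codehub analyze ./my-repo --embeddings \
      --embeddings-workers auto \
      --embeddings-batch-size 64
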
 packages/cli/src/commands/analyze.ts          |  17 ++
 packages/cli/src/index.ts                     |  42 +++++
 .../src/pipeline/phases/embedder-pool.ts      |  88 +++++++++
 .../src/pipeline/phases/embedder-worker.ts    |  66 +++++++
 .../src/pipeline/phases/embeddings.test.ts    |  30 ++++
 .../src/pipeline/phases/embeddings.ts         | 168 ++++++++++++------
 packages/ingestion/src/pipeline/types.ts      |  18 ++
 7 files changed, 377 insertions(+), 52 deletions(-)
 create mode 100644 packages/ingestion/src/pipeline/phases/embedder-pool.ts
 create mode 100644 packages/ingestion/src/pipeline/phases/embedder-worker.ts

diff --git a/packages/cli/src/commands/analyze.ts b/packages/cli/src/commands/analyze.ts
index 57eb168..eddc459 100644
--- a/packages/cli/src/commands/analyze.ts
+++ b/packages/cli/src/commands/analyze.ts
@@ -51,6 +51,19 @@ export interface AnalyzeOptions {
    * hierarchical index — enables `codehub query --zoom` coarse-to-fine.
    */
   readonly embeddingsGranularity?: readonly ("symbol" | "file" | "community")[];
+  /**
+   * Number of parallel ONNX embedder workers. Defaults to 1 (legacy
+   * single-threaded path). Values >= 2 fan inference out across a
+   * Piscina pool; each worker holds its own ~300 MB ONNX session, so
+   * scale with host memory in mind. Ignored under the HTTP backend.
+   */
+  readonly embeddingsWorkers?: number;
+  /**
+   * Chunks per `embedBatch()` call. Defaults to 32. Larger batches
+   * amortize tokenizer + tensor-feed overhead but increase peak memory;
+   * `1` restores the pre-refactor one-node-per-call pattern.
+   */
+  readonly embeddingsBatchSize?: number;
   readonly offline?: boolean;
   readonly verbose?: boolean;
   readonly skipAgentsMd?: boolean;
@@ -203,6 +216,10 @@ export async function runAnalyze(path: string, opts: AnalyzeOptions = {}): Promi
     ...(opts.embeddingsGranularity !== undefined
       ? { embeddingsGranularity: opts.embeddingsGranularity }
       : {}),
+    ...(opts.embeddingsWorkers !== undefined ? { embeddingsWorkers: opts.embeddingsWorkers } : {}),
+    ...(opts.embeddingsBatchSize !== undefined
+      ? { embeddingsBatchSize: opts.embeddingsBatchSize }
+      : {}),
     ...(opts.sbom !== undefined ? { sbom: opts.sbom } : {}),
     ...(opts.coverage !== undefined ? { coverage: opts.coverage } : {}),
     summaries: summariesEnabled,
diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts
index 39a685f..65f6f00 100644
--- a/packages/cli/src/index.ts
+++ b/packages/cli/src/index.ts
@@ -8,6 +8,7 @@
  * run that subcommand.
  */
 
+import { cpus } from "node:os";
 import { Command } from "commander";
 
 const program = new Command()
@@ -25,6 +26,14 @@ program
   .option(
     "--granularity <tiers>",
     "Hierarchical embedding tiers to emit, comma-separated. Values: symbol, file, community. Default: symbol. Example: --granularity symbol,file,community",
   )
+  .option(
+    "--embeddings-workers <n>",
+    'Parallel ONNX embedder workers (each ~300 MB RSS on fp32). "auto" = os.cpus().length - 1, min 1. Default 1 (legacy in-process path).',
+  )
+  .option(
+    "--embeddings-batch-size <n>",
+    "Chunks per embedBatch() call. Default 32. Set to 1 to restore the legacy one-node-per-call pattern.",
+  )
   .option("--offline", "Assert no network access during analyze")
   .option("--verbose", "Emit per-phase pipeline progress")
   .option("--skip-agents-md", "Do not write the AGENTS.md / CLAUDE.md stanza")
@@ -92,12 +101,16 @@ program
       }
 
       const granularity = parseGranularityCsv(opts["granularity"]);
+      const embeddingsWorkers = parseWorkerCount(opts["embeddingsWorkers"]);
+      const embeddingsBatchSize = parsePositiveInt(opts["embeddingsBatchSize"]);
       await mod.runAnalyze(path ?? process.cwd(), {
         force: opts["force"] === true,
         embeddings: opts["embeddings"] === true,
         embeddingsVariant: opts["embeddingsInt8"] === true ? "int8" : "fp32",
         ...(granularity !== undefined ? { embeddingsGranularity: granularity } : {}),
+        ...(embeddingsWorkers !== undefined ? { embeddingsWorkers } : {}),
+        ...(embeddingsBatchSize !== undefined ? { embeddingsBatchSize } : {}),
         offline: opts["offline"] === true,
         verbose: opts["verbose"] === true,
         skipAgentsMd: opts["skipAgentsMd"] === true,
@@ -661,6 +674,35 @@ function parseGranularityCsv(
   return out;
 }
 
+/**
+ * Parse `--embeddings-workers`. Accepts a positive integer or the literal
+ * "auto" (resolves to `os.cpus().length - 1`, floor 1). Returns undefined
+ * when the flag wasn't supplied so the pipeline picks its own default.
+ */
+function parseWorkerCount(raw: unknown): number | undefined {
+  if (raw === undefined) return undefined;
+  if (raw === "auto") {
+    return Math.max(1, cpus().length - 1);
+  }
+  const parsed = typeof raw === "number" ? raw : Number.parseInt(String(raw), 10);
+  if (!Number.isFinite(parsed) || parsed < 1) {
+    throw new Error(
+      `--embeddings-workers must be a positive integer or "auto"; got "${String(raw)}"`,
+    );
+  }
+  return Math.floor(parsed);
+}
+
+/** Parse a positive integer CLI flag, returning undefined when omitted. */
+function parsePositiveInt(raw: unknown): number | undefined {
+  if (raw === undefined) return undefined;
+  const parsed = typeof raw === "number" ? raw : Number.parseInt(String(raw), 10);
+  if (!Number.isFinite(parsed) || parsed < 1) {
+    throw new Error(`expected a positive integer; got "${String(raw)}"`);
+  }
+  return Math.floor(parsed);
+}
+
 function parseEditors(
   raw: string,
 ): readonly ("claude-code" | "cursor" | "codex" | "windsurf" | "opencode")[] {
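
Worked examples of the new flag parsing above, assuming an 8-core host
(cpus().length === 8); values are illustrative:

    parseWorkerCount("auto");     // -> 7  (cpus().length - 1, floored at 1)
    parseWorkerCount("4");        // -> 4
    parseWorkerCount(undefined);  // -> undefined (pipeline default of 1 applies)
    parseWorkerCount("0");        // throws: positive integer or "auto" required
    parsePositiveInt("32");       // -> 32 (used for --embeddings-batch-size)
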
diff --git a/packages/ingestion/src/pipeline/phases/embedder-pool.ts b/packages/ingestion/src/pipeline/phases/embedder-pool.ts
new file mode 100644
index 0000000..c02ca4a
--- /dev/null
+++ b/packages/ingestion/src/pipeline/phases/embedder-pool.ts
@@ -0,0 +1,88 @@
+/**
+ * Worker-pool `Embedder` facade.
+ *
+ * Wraps a Piscina pool of OnnxEmbedder workers behind the same
+ * `{ embed, embedBatch, close }` surface the embeddings phase already
+ * consumes. Each worker holds its own OnnxEmbedder; tasks are partitioned
+ * by batch index so repeat runs visit the same work → same worker
+ * assignment for a fixed pool size, preserving byte-identical output.
+ *
+ * Determinism note: the underlying ONNX session pins all thread counts to 1
+ * and disables graph optimization, so a given input text produces the same
+ * vector regardless of *which* worker computes it. The ordering guarantee
+ * here is belt-and-braces: we reassemble results in input order before
+ * returning.
+ */
+
+import { fileURLToPath } from "node:url";
+import type { Embedder } from "@opencodehub/embedder";
+import { embedderModelId } from "@opencodehub/embedder";
+import { Piscina } from "piscina";
+
+import type { EmbedBatchResult, EmbedBatchTask } from "./embedder-worker.js";
+
+const WORKER_FILENAME = fileURLToPath(new URL("./embedder-worker.js", import.meta.url));
+
+export interface EmbedderPoolOptions {
+  readonly workers: number;
+  readonly variant: "fp32" | "int8";
+  readonly modelDir?: string;
+}
+
+/**
+ * Open a worker-pool-backed embedder. Caller must invoke `close()` when
+ * done; that tears the pool down.
+ *
+ * The pool is sized with `minThreads === maxThreads === workers` so worker
+ * allocation is stable across a run. Workers lazy-load their OnnxEmbedder
+ * on first task, so pool construction itself is cheap.
+ */
+export function openOnnxEmbedderPool(opts: EmbedderPoolOptions): Embedder {
+  const workerData: { variant: "fp32" | "int8"; modelDir?: string } = {
+    variant: opts.variant,
+  };
+  if (opts.modelDir !== undefined) workerData.modelDir = opts.modelDir;
+
+  const pool = new Piscina<EmbedBatchTask, EmbedBatchResult>({
+    filename: WORKER_FILENAME,
+    minThreads: opts.workers,
+    maxThreads: opts.workers,
+    // Each worker owns an ~300 MB ONNX session; don't recycle on idle.
+    idleTimeout: Number.POSITIVE_INFINITY,
+    workerData,
+  });
+
+  let closed = false;
+  const dim = 768; // gte-modernbert-base — matches OnnxEmbedder's EMBED_DIM.
+
+  async function embedBatch(texts: readonly string[]): Promise<Float32Array[]> {
+    if (closed) throw new Error("Embedder pool is closed");
+    if (texts.length === 0) return [];
+    const result = await pool.run({ texts: [...texts] });
+    if (result.count === 0) return [];
+    const flat = new Float32Array(result.buffer);
+    const out: Float32Array[] = [];
+    for (let i = 0; i < result.count; i++) {
+      // Slice copies into a fresh, non-shared Float32Array so callers can
+      // hang onto each vector independently of the transport buffer.
+      out.push(flat.slice(i * result.dim, (i + 1) * result.dim));
+    }
+    return out;
+  }
+
+  return {
+    dim,
+    modelId: embedderModelId(opts.variant),
+    async embed(text: string): Promise<Float32Array> {
+      const [vec] = await embedBatch([text]);
+      if (vec === undefined) throw new Error("embedBatch returned empty result");
+      return vec;
+    },
+    embedBatch,
+    async close(): Promise<void> {
+      if (closed) return;
+      closed = true;
+      await pool.destroy();
+    },
+  };
+}
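
A minimal caller sketch for the facade above, assuming an ESM module with
top-level await; the worker count, input texts, and relative import path
are illustrative:

    import { openOnnxEmbedderPool } from "./embedder-pool.js";

    const embedder = openOnnxEmbedderPool({ workers: 4, variant: "fp32" });
    try {
      // Concurrent embedBatch() calls land on distinct pool workers.
      const [a, b] = await Promise.all([
        embedder.embedBatch(["export function add(a: number, b: number) { return a + b; }"]),
        embedder.embedBatch(["export const PI = 3.14159;"]),
      ]);
      console.log(a[0]?.length, b[0]?.length); // 768 768 (gte-modernbert-base)
    } finally {
      await embedder.close(); // destroys the Piscina pool
    }
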
diff --git a/packages/ingestion/src/pipeline/phases/embedder-worker.ts b/packages/ingestion/src/pipeline/phases/embedder-worker.ts
new file mode 100644
index 0000000..34f1e46
--- /dev/null
+++ b/packages/ingestion/src/pipeline/phases/embedder-worker.ts
@@ -0,0 +1,66 @@
+/**
+ * Piscina worker entry point for the embeddings phase.
+ *
+ * Each worker lazy-opens its own OnnxEmbedder on first task and caches it for
+ * the lifetime of the worker. Tasks carry a flat list of chunk texts; the
+ * worker returns the vectors in input order.
+ *
+ * Determinism contract: `openOnnxEmbedder()` pins intraOpNumThreads=1,
+ * interOpNumThreads=1, and graphOptimizationLevel="disabled" — the same
+ * input produces byte-identical output regardless of which worker processed
+ * it. Callers partition work deterministically (block-chunked by batch
+ * index) so repeat runs reproduce the same byte-level output.
+ */
+
+import { workerData } from "node:worker_threads";
+import type { Embedder } from "@opencodehub/embedder";
+import { openOnnxEmbedder } from "@opencodehub/embedder";
+
+interface WorkerConfig {
+  readonly variant: "fp32" | "int8";
+  readonly modelDir?: string;
+}
+
+export interface EmbedBatchTask {
+  readonly texts: readonly string[];
+}
+
+export interface EmbedBatchResult {
+  /**
+   * Flat binary payload of `texts.length * dim` float32 values, laid out as
+   * `[vec0, vec1, ...]`. Shipping a single ArrayBuffer avoids the per-vector
+   * structured-clone overhead when batches are large.
+   */
+  readonly buffer: ArrayBuffer;
+  readonly dim: number;
+  readonly count: number;
+}
+
+const cfg = (workerData as WorkerConfig | undefined) ?? { variant: "fp32" };
+
+let embedderPromise: Promise<Embedder> | undefined;
+
+function getEmbedder(): Promise<Embedder> {
+  if (embedderPromise === undefined) {
+    const opts: { variant: "fp32" | "int8"; modelDir?: string } = { variant: cfg.variant };
+    if (cfg.modelDir !== undefined) opts.modelDir = cfg.modelDir;
+    embedderPromise = openOnnxEmbedder(opts);
+  }
+  return embedderPromise;
+}
+
+export default async function embedBatch(task: EmbedBatchTask): Promise<EmbedBatchResult> {
+  const embedder = await getEmbedder();
+  const vectors = await embedder.embedBatch(task.texts);
+  if (vectors.length === 0) {
+    return { buffer: new ArrayBuffer(0), dim: embedder.dim, count: 0 };
+  }
+  const dim = vectors[0]?.length ?? embedder.dim;
+  const out = new Float32Array(vectors.length * dim);
+  for (let i = 0; i < vectors.length; i++) {
+    const vec = vectors[i];
+    if (vec === undefined) continue;
+    out.set(vec, i * dim);
+  }
+  return { buffer: out.buffer, dim, count: vectors.length };
+}
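
The flat-buffer transport round-trips as follows, a self-contained sketch
of the packing scheme (pack on the worker side, unpack as in
embedder-pool.ts); the dim and values are toy numbers:

    // Pack: N vectors -> one Float32Array -> ship a single ArrayBuffer.
    const dim = 3;
    const vectors = [Float32Array.of(1, 2, 3), Float32Array.of(4, 5, 6)];
    const flat = new Float32Array(vectors.length * dim);
    vectors.forEach((v, i) => flat.set(v, i * dim));
    const payload = { buffer: flat.buffer, dim, count: vectors.length };

    // Unpack: slice() copies each vector out of the shared transport buffer.
    const back = new Float32Array(payload.buffer);
    const out: Float32Array[] = [];
    for (let i = 0; i < payload.count; i++) {
      out.push(back.slice(i * payload.dim, (i + 1) * payload.dim));
    }
    // out -> [Float32Array [1, 2, 3], Float32Array [4, 5, 6]]
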
diff --git a/packages/ingestion/src/pipeline/phases/embeddings.test.ts b/packages/ingestion/src/pipeline/phases/embeddings.test.ts
index 2a314f4..a266caa 100644
--- a/packages/ingestion/src/pipeline/phases/embeddings.test.ts
+++ b/packages/ingestion/src/pipeline/phases/embeddings.test.ts
@@ -405,6 +405,36 @@ describe("embeddingsPhase — hierarchical tiers (P03)", () => {
     assert.equal(a.embeddingsHash, b.embeddingsHash, "hash is stable across runs");
   });
 
+  it("produces the same embeddingsHash across batchSize=1 and batchSize=32", async () => {
+    const { repoPath, relPath } = makeRepo();
+    async function runWith(batchSize: number): Promise<string> {
+      const ctx: PipelineContext = {
+        repoPath,
+        options: {
+          embeddings: true,
+          embeddingsGranularity: ["symbol", "file", "community"],
+          embeddingsBatchSize: batchSize,
+        } as unknown as PipelineOptions,
+        graph: buildGraph(relPath),
+        phaseOutputs: new Map([
+          [
+            SCAN_PHASE_NAME,
+            { files: [{ absPath: "", relPath, byteSize: 1, sha256: "h", grammarSha: null }] },
+          ],
+        ]),
+      };
+      const out = await embeddingsPhase.run(ctx, new Map());
+      return out.embeddingsHash;
+    }
+    const hashSmall = await runWith(1);
+    const hashLarge = await runWith(32);
+    assert.equal(
+      hashSmall,
+      hashLarge,
+      "cross-node batching must not perturb the deterministic row layout",
+    );
+  });
+
   it("scopes content hashes per tier so cross-tier collisions are impossible", async () => {
     const { repoPath, relPath } = makeRepo();
     const graph = buildGraph(relPath);
diff --git a/packages/ingestion/src/pipeline/phases/embeddings.ts b/packages/ingestion/src/pipeline/phases/embeddings.ts
index 1c00665..0c6ed3a 100644
--- a/packages/ingestion/src/pipeline/phases/embeddings.ts
+++ b/packages/ingestion/src/pipeline/phases/embeddings.ts
@@ -46,9 +46,20 @@ import type { EmbeddingGranularity, EmbeddingRow } from "@opencodehub/storage";
 import type { PipelineContext, PipelinePhase } from "../types.js";
 import { ANNOTATE_PHASE_NAME } from "./annotate.js";
 import { COMMUNITIES_PHASE_NAME } from "./communities.js";
+import { openOnnxEmbedderPool } from "./embedder-pool.js";
 import { SCAN_PHASE_NAME, type ScanOutput } from "./scan.js";
 import { SUMMARIZE_PHASE_NAME, type SummarizePhaseOutput } from "./summarize.js";
 
+/**
+ * Default batch size for cross-node inference. Picked so a single batch
+ * fully utilizes one ONNX session without blowing host memory on a typical
+ * M-series / Linux laptop: 32 symbols × ~500 tokens × 2 (int64 id+mask) is
+ * comfortably under 1 MB of tensor feed, and the quadratic attention cost
+ * scales with per-chunk sequence length rather than the batch dimension.
+ * Callers can override via `options.embeddingsBatchSize`.
+ */
+const DEFAULT_EMBEDDING_BATCH_SIZE = 32;
+
 export const EMBEDDER_PHASE_NAME = "embeddings" as const;
 
 /** Node kinds we currently embed at the symbol tier. */
@@ -427,6 +438,17 @@ async function runEmbeddings(ctx: PipelineContext): Promise
   // The offline invariant is non-negotiable: when `offline === true`, the
   // HTTP path is REFUSED even if the env vars are set — `tryOpenHttpEmbedder`
   // throws, and we rethrow rather than silently continuing to ONNX.
+  // `embeddingsWorkers` controls the ONNX worker-pool size. `undefined` or
+  // `<= 1` preserves the legacy in-process embedder (no pool, no worker
+  // overhead). Values >= 2 spin up a Piscina pool whose workers each hold
+  // their own OnnxEmbedder. The HTTP backend ignores the flag — its
+  // parallelism is driven by the remote server's capacity.
+  const workers = Math.max(1, Math.floor(ctx.options.embeddingsWorkers ?? 1));
+  const batchSize = Math.max(
+    1,
+    Math.floor(ctx.options.embeddingsBatchSize ?? DEFAULT_EMBEDDING_BATCH_SIZE),
+  );
+
   let embedder: Embedder;
   try {
     const httpEmbedder = tryOpenHttpEmbedder({ offline: ctx.options.offline === true });
@@ -438,7 +460,18 @@ async function runEmbeddings(ctx: PipelineContext): Promise
       if (ctx.options.embeddingsModelDir !== undefined) {
         cfg.modelDir = ctx.options.embeddingsModelDir;
       }
-      embedder = await openOnnxEmbedder(cfg);
+      if (workers > 1) {
+        // Weight canary: open (and immediately close) a main-thread
+        // OnnxEmbedder so EmbedderNotSetupError surfaces with its class
+        // identity preserved. Piscina's structured-clone transport would
+        // strip the prototype chain from a worker-raised error, breaking
+        // the `instanceof EmbedderNotSetupError` catch below.
+        const canary = await openOnnxEmbedder(cfg);
+        await canary.close();
+        embedder = openOnnxEmbedderPool({ workers, ...cfg });
+      } else {
+        embedder = await openOnnxEmbedder(cfg);
+      }
     }
   } catch (err) {
     if (err instanceof EmbedderNotSetupError) {
@@ -491,6 +524,22 @@ async function runEmbeddings(ctx: PipelineContext): Promise
     }
   }
 
+  // Job-collection phase. Walk all requested tiers in canonical order
+  // (symbol → file → community) and accumulate one `EmbedJob` per chunk
+  // we'd like to embed. Each job knows how to emit its row once a
+  // vector arrives, keeping the dispatch loop below tier-agnostic.
+  //
+  // Row assembly order is preserved: the collection step runs tiers in
+  // the same sequence as the previous per-tier loops, so `rows[]` ends
+  // up identical to the pre-refactor layout modulo within-symbol chunk
+  // ordering (which is already controlled by `chunkIndex`).
+  interface EmbedJob {
+    readonly granularity: EmbeddingGranularity;
+    readonly text: string;
+    readonly emitRow: (vector: Float32Array) => EmbeddingRow;
+  }
+  const jobs: EmbedJob[] = [];
+
   // ---- Symbol tier ---------------------------------------------------
   if (tiers.includes("symbol")) {
     const eligible: EmbeddableSymbol[] = [];
@@ -501,9 +550,6 @@ async function runEmbeddings(ctx: PipelineContext): Promise
 
     for (const node of eligible) {
       const summary = summaryByNode.get(node.id);
-      // Summary-fused path reads the symbol body from disk when
-      // startLine/endLine are present; missing → fall through to
-      // signature/description text.
       let body: string | undefined;
       if (
         summary !== undefined &&
@@ -513,7 +559,6 @@ async function runEmbeddings(ctx: PipelineContext): Promise
       ) {
         body = readSourceSpan(ctx.repoPath, node.filePath, node.startLine, node.endLine);
       }
-
       const text = symbolText(node, summary, body);
       if (text.length === 0) {
         skipped += 1;
@@ -526,22 +571,22 @@ async function runEmbeddings(ctx: PipelineContext): Promise
         continue;
       }
       chunksTotal += chunks.length;
-      const vectors = await embedder.embedBatch(chunks);
-      for (let i = 0; i < vectors.length; i++) {
-        const vec = vectors[i];
-        const chunkText = chunks[i];
-        if (vec === undefined || chunkText === undefined) continue;
-        const row: EmbeddingRow = {
-          nodeId: node.id,
+      for (let i = 0; i < chunks.length; i++) {
+        const chunkText = chunks[i] ?? "";
+        const chunkIndex = i;
+        jobs.push({
           granularity: "symbol",
-          chunkIndex: i,
-          ...(node.startLine !== undefined ? { startLine: node.startLine } : {}),
-          ...(node.endLine !== undefined ? { endLine: node.endLine } : {}),
-          vector: vec,
-          contentHash: hashText("symbol", chunkText),
-        };
-        rows.push(row);
-        byGranularity["symbol"] = (byGranularity["symbol"] ?? 0) + 1;
+          text: chunkText,
+          emitRow: (vector) => ({
+            nodeId: node.id,
+            granularity: "symbol",
+            chunkIndex,
+            ...(node.startLine !== undefined ? { startLine: node.startLine } : {}),
+            ...(node.endLine !== undefined ? { endLine: node.endLine } : {}),
+            vector,
+            contentHash: hashText("symbol", chunkText),
+          }),
+        });
       }
     }
   }
""; + const chunkIndex = i; + jobs.push({ granularity: "symbol", - chunkIndex: i, - ...(node.startLine !== undefined ? { startLine: node.startLine } : {}), - ...(node.endLine !== undefined ? { endLine: node.endLine } : {}), - vector: vec, - contentHash: hashText("symbol", chunkText), - }; - rows.push(row); - byGranularity["symbol"] = (byGranularity["symbol"] ?? 0) + 1; + text: chunkText, + emitRow: (vector) => ({ + nodeId: node.id, + granularity: "symbol", + chunkIndex, + ...(node.startLine !== undefined ? { startLine: node.startLine } : {}), + ...(node.endLine !== undefined ? { endLine: node.endLine } : {}), + vector, + contentHash: hashText("symbol", chunkText), + }), + }); } } } @@ -553,7 +598,6 @@ async function runEmbeddings(ctx: PipelineContext): Promise for (const n of ctx.graph.nodes()) { if (isFileNode(n)) fileNodeByPath.set(n.filePath, n); } - // Sort scan files so the embedding order is stable across runs. const scanFiles = scan ? [...scan.files] : []; scanFiles.sort((a, b) => (a.relPath < b.relPath ? -1 : a.relPath > b.relPath ? 1 : 0)); @@ -561,17 +605,13 @@ async function runEmbeddings(ctx: PipelineContext): Promise const ext = path.extname(f.relPath).toLowerCase(); if (!EMBEDDABLE_FILE_EXTS.has(ext)) continue; const fileNode = fileNodeByPath.get(f.relPath); - if (fileNode === undefined) continue; // node emission may have skipped it + if (fileNode === undefined) continue; const raw = readFileWhole(ctx.repoPath, f.relPath); if (raw === undefined || raw.length === 0) { skipped += 1; continue; } const truncated = raw.length > FILE_CHAR_CAP ? raw.slice(0, FILE_CHAR_CAP) : raw; - // Single chunk per file at v1.1 (spec: EMB-E-002). If the - // truncated text still overflows the embedder's token budget - // the chunker will split it; we keep only the first chunk so - // one file always maps to one file-tier row. const chunks = splitIntoChunks(truncated, maxUserTokens); const firstChunk = chunks[0]; if (firstChunk === undefined) { @@ -579,26 +619,22 @@ async function runEmbeddings(ctx: PipelineContext): Promise continue; } chunksTotal += 1; - const vectors = await embedder.embedBatch([firstChunk]); - const vec = vectors[0]; - if (vec === undefined) continue; - rows.push({ - nodeId: fileNode.id, + jobs.push({ granularity: "file", - chunkIndex: 0, - vector: vec, - contentHash: hashText("file", firstChunk), + text: firstChunk, + emitRow: (vector) => ({ + nodeId: fileNode.id, + granularity: "file", + chunkIndex: 0, + vector, + contentHash: hashText("file", firstChunk), + }), }); - byGranularity["file"] = (byGranularity["file"] ?? 0) + 1; } } // ---- Community tier ----------------------------------------------- if (tiers.includes("community")) { - // Community nodes carry `inferredLabel` + `keywords`. Walk MEMBER_OF - // edges (confidence 1.0, emitted by the communities phase) to - // enumerate the top symbols by name; the label text is - // `inferredLabel\nkeyword1 keyword2 …\ntopSymbol1 topSymbol2 …`. const membersByCommunity = new Map(); const nameById = new Map(); for (const n of ctx.graph.nodes()) { @@ -623,10 +659,6 @@ async function runEmbeddings(ctx: PipelineContext): Promise for (const c of communities) { const members = membersByCommunity.get(c.id) ?? []; - // Sort members for determinism; take the first 10 names for the - // label (alphabetical — the community id itself is canonicalised - // by the lexicographically-smallest member, so this keeps the - // signal shape intact without leaking graph traversal order). 
       const memberNames = members
         .map((m) => nameById.get(m))
         .filter((x): x is string => x !== undefined)
@@ -649,17 +681,49 @@ async function runEmbeddings(ctx: PipelineContext): Promise
         continue;
       }
       chunksTotal += 1;
-      const vectors = await embedder.embedBatch([firstChunk]);
-      const vec = vectors[0];
-      if (vec === undefined) continue;
-      rows.push({
-        nodeId: c.id,
+      jobs.push({
         granularity: "community",
-        chunkIndex: 0,
-        vector: vec,
-        contentHash: hashText("community", firstChunk),
+        text: firstChunk,
+        emitRow: (vector) => ({
+          nodeId: c.id,
+          granularity: "community",
+          chunkIndex: 0,
+          vector,
+          contentHash: hashText("community", firstChunk),
+        }),
       });
-      byGranularity["community"] = (byGranularity["community"] ?? 0) + 1;
+    }
+  }
+
+  // ---- Dispatch ------------------------------------------------------
+  // Cross-node batching: group jobs into fixed-size batches and embed
+  // them as a single `embedBatch()` call. When the embedder is a worker
+  // pool, successive batches ride different workers in parallel; when
+  // it's an in-process embedder the batching still cuts per-call
+  // overhead (tokenizer + tensor feed building amortize across the
+  // batch). We fire `workers` batches concurrently so the pool stays
+  // saturated — the pool's Piscina queue handles backpressure.
+  for (let i = 0; i < jobs.length; i += batchSize * workers) {
+    const waveEnd = Math.min(jobs.length, i + batchSize * workers);
+    const waveBatches: Promise<Float32Array[]>[] = [];
+    const waveJobSlices: EmbedJob[][] = [];
+    for (let b = i; b < waveEnd; b += batchSize) {
+      const batchEnd = Math.min(waveEnd, b + batchSize);
+      const slice = jobs.slice(b, batchEnd);
+      waveJobSlices.push(slice);
+      waveBatches.push(embedder.embedBatch(slice.map((j) => j.text)));
+    }
+    const waveResults = await Promise.all(waveBatches);
+    for (let w = 0; w < waveResults.length; w++) {
+      const vectors = waveResults[w] ?? [];
+      const slice = waveJobSlices[w] ?? [];
+      for (let k = 0; k < slice.length; k++) {
+        const job = slice[k];
+        const vec = vectors[k];
+        if (job === undefined || vec === undefined) continue;
+        rows.push(job.emitRow(vec));
+        byGranularity[job.granularity] = (byGranularity[job.granularity] ?? 0) + 1;
+      }
     }
   }
 
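
The wave arithmetic in the dispatch loop above, worked standalone; the
job counts, worker counts, and batch sizes below are illustrative:

    // Mirrors the slicing in the dispatch loop: stride = batchSize * workers,
    // each wave split into at most `workers` batches that run concurrently.
    function planWaves(jobCount: number, workers: number, batchSize: number): number[][] {
      const waves: number[][] = [];
      for (let i = 0; i < jobCount; i += batchSize * workers) {
        const waveEnd = Math.min(jobCount, i + batchSize * workers);
        const sizes: number[] = [];
        for (let b = i; b < waveEnd; b += batchSize) {
          sizes.push(Math.min(waveEnd, b + batchSize) - b);
        }
        waves.push(sizes);
      }
      return waves;
    }

    planWaves(100, 4, 32); // -> [[32, 32, 32, 4]]  one wave, 4 batches in flight
    planWaves(300, 2, 32); // -> [[32, 32], [32, 32], [32, 32], [32, 32], [32, 12]]
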
diff --git a/packages/ingestion/src/pipeline/types.ts b/packages/ingestion/src/pipeline/types.ts
index 1e47c0a..1474a66 100644
--- a/packages/ingestion/src/pipeline/types.ts
+++ b/packages/ingestion/src/pipeline/types.ts
@@ -128,6 +128,24 @@ export interface PipelineOptions {
    * filtered at the TS type level.
    */
   readonly embeddingsGranularity?: readonly ("symbol" | "file" | "community")[];
+  /**
+   * Number of ONNX embedder workers to run in parallel. `undefined` or
+   * `<= 1` preserves the legacy in-process path (single main-thread
+   * embedder, no Piscina overhead). Values >= 2 spin up a worker pool of
+   * independent OnnxEmbedder instances. Each worker holds its own
+   * session (~300 MB RSS on fp32), so sizing above `os.cpus().length - 1`
+   * buys nothing and risks memory pressure. Ignored when the HTTP
+   * backend is selected via `CODEHUB_EMBEDDING_URL`.
+   */
+  readonly embeddingsWorkers?: number;
+  /**
+   * Batch size for cross-node inference. The embeddings phase groups
+   * chunks across symbols/files/communities into a single
+   * `embedder.embedBatch()` call; this knob controls that batch size.
+   * Defaults to 32 (see `DEFAULT_EMBEDDING_BATCH_SIZE` in the phase
+   * module). `1` restores the legacy one-node-per-call pattern.
+   */
+  readonly embeddingsBatchSize?: number;
   /**
    * When `true`, the SBOM phase emits `.codehub/sbom.cyclonedx.json` and
    * `.codehub/sbom.spdx.json` from Dependency nodes. When `false` (the