Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions packages/cli/src/commands/analyze.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ export interface AnalyzeOptions {
* hierarchical index — enables `codehub query --zoom` coarse-to-fine.
*/
readonly embeddingsGranularity?: readonly ("symbol" | "file" | "community")[];
/**
* Number of parallel ONNX embedder workers. Defaults to 1 (legacy
* single-threaded path). Values >= 2 fan inference out across a
* Piscina pool; each worker holds its own ~300 MB ONNX session, so
* scale with host memory in mind. Ignored under the HTTP backend.
*/
readonly embeddingsWorkers?: number;
/**
* Chunks per `embedBatch()` call. Defaults to 32. Larger batches
* amortize tokenizer + tensor-feed overhead but increase peak memory;
* `1` restores the pre-refactor one-node-per-call pattern.
*/
readonly embeddingsBatchSize?: number;
readonly offline?: boolean;
readonly verbose?: boolean;
readonly skipAgentsMd?: boolean;
Expand Down Expand Up @@ -203,6 +216,10 @@ export async function runAnalyze(path: string, opts: AnalyzeOptions = {}): Promi
...(opts.embeddingsGranularity !== undefined
? { embeddingsGranularity: opts.embeddingsGranularity }
: {}),
...(opts.embeddingsWorkers !== undefined ? { embeddingsWorkers: opts.embeddingsWorkers } : {}),
...(opts.embeddingsBatchSize !== undefined
? { embeddingsBatchSize: opts.embeddingsBatchSize }
: {}),
...(opts.sbom !== undefined ? { sbom: opts.sbom } : {}),
...(opts.coverage !== undefined ? { coverage: opts.coverage } : {}),
summaries: summariesEnabled,
Expand Down
42 changes: 42 additions & 0 deletions packages/cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* run that subcommand.
*/

import { cpus } from "node:os";
import { Command } from "commander";

const program = new Command()
Expand All @@ -25,6 +26,14 @@ program
"--granularity <csv>",
"Hierarchical embedding tiers to emit, comma-separated. Values: symbol, file, community. Default: symbol. Example: --granularity symbol,file,community",
)
.option(
"--embeddings-workers <n|auto>",
'Parallel ONNX embedder workers (each ~300 MB RSS on fp32). "auto" = os.cpus().length - 1, min 1. Default 1 (legacy in-process path).',
)
.option(
"--embeddings-batch-size <n>",
"Chunks per embedBatch() call. Default 32. Set to 1 to restore the legacy one-node-per-call pattern.",
)
.option("--offline", "Assert no network access during analyze")
.option("--verbose", "Emit per-phase pipeline progress")
.option("--skip-agents-md", "Do not write the AGENTS.md / CLAUDE.md stanza")
Expand Down Expand Up @@ -92,12 +101,16 @@ program
}

const granularity = parseGranularityCsv(opts["granularity"]);
const embeddingsWorkers = parseWorkerCount(opts["embeddingsWorkers"]);
const embeddingsBatchSize = parsePositiveInt(opts["embeddingsBatchSize"]);

await mod.runAnalyze(path ?? process.cwd(), {
force: opts["force"] === true,
embeddings: opts["embeddings"] === true,
embeddingsVariant: opts["embeddingsInt8"] === true ? "int8" : "fp32",
...(granularity !== undefined ? { embeddingsGranularity: granularity } : {}),
...(embeddingsWorkers !== undefined ? { embeddingsWorkers } : {}),
...(embeddingsBatchSize !== undefined ? { embeddingsBatchSize } : {}),
offline: opts["offline"] === true,
verbose: opts["verbose"] === true,
skipAgentsMd: opts["skipAgentsMd"] === true,
Expand Down Expand Up @@ -661,6 +674,35 @@ function parseGranularityCsv(
return out;
}

/**
 * Parse `--embeddings-workers`. Accepts a positive integer or the literal
 * "auto" (resolves to `os.cpus().length - 1`, floor 1). Returns undefined
 * when the flag wasn't supplied so the pipeline picks its own default.
 *
 * @throws Error when the value is neither "auto" nor a whole number >= 1.
 *   Trailing garbage ("3x") and fractional values ("2.5") are rejected
 *   instead of being silently truncated the way `parseInt` would.
 */
function parseWorkerCount(raw: unknown): number | undefined {
  if (raw === undefined) return undefined;
  if (raw === "auto") {
    // Leave one core for the main thread, but never drop below one worker.
    return Math.max(1, cpus().length - 1);
  }
  // Number() maps malformed strings to NaN (unlike parseInt, which accepts
  // "3x" as 3); Number.isInteger then rejects NaN, Infinity, and fractions.
  const parsed = typeof raw === "number" ? raw : Number(String(raw).trim());
  if (!Number.isInteger(parsed) || parsed < 1) {
    throw new Error(
      `--embeddings-workers must be a positive integer or "auto"; got "${String(raw)}"`,
    );
  }
  return parsed;
}

/**
 * Parse a positive integer CLI flag, returning undefined when omitted.
 *
 * @throws Error when the value is not a whole number >= 1. Trailing
 *   garbage ("3x") and fractional values ("2.5") are rejected instead of
 *   being silently truncated the way `parseInt` would.
 */
function parsePositiveInt(raw: unknown): number | undefined {
  if (raw === undefined) return undefined;
  // Number() maps malformed strings to NaN (unlike parseInt, which accepts
  // "3x" as 3); Number.isInteger then rejects NaN, Infinity, and fractions.
  const parsed = typeof raw === "number" ? raw : Number(String(raw).trim());
  if (!Number.isInteger(parsed) || parsed < 1) {
    throw new Error(`expected a positive integer; got "${String(raw)}"`);
  }
  return parsed;
}

function parseEditors(
raw: string,
): readonly ("claude-code" | "cursor" | "codex" | "windsurf" | "opencode")[] {
Expand Down
88 changes: 88 additions & 0 deletions packages/ingestion/src/pipeline/phases/embedder-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Worker-pool `Embedder` facade.
*
* Wraps a Piscina pool of OnnxEmbedder workers behind the same
* `{ embed, embedBatch, close }` surface the embeddings phase already
* consumes. Each worker holds its own OnnxEmbedder; tasks are partitioned
* by batch index so repeat runs visit the same work → same worker
* assignment for a fixed pool size, preserving byte-identical output.
*
* Determinism note: the underlying ONNX session pins all thread counts to 1
* and disables graph optimization, so a given input text produces the same
* vector regardless of *which* worker computes it. The ordering guarantee
* here is belt-and-braces: we reassemble results in input order before
* returning.
*/

import { fileURLToPath } from "node:url";
import type { Embedder } from "@opencodehub/embedder";
import { embedderModelId } from "@opencodehub/embedder";
import { Piscina } from "piscina";

import type { EmbedBatchResult, EmbedBatchTask } from "./embedder-worker.js";

// Absolute filesystem path to the sibling worker entry module, resolved
// relative to this file's URL so it works regardless of process cwd.
const WORKER_FILENAME = fileURLToPath(new URL("./embedder-worker.js", import.meta.url));

/** Configuration for {@link openOnnxEmbedderPool}. */
export interface EmbedderPoolOptions {
  /** Pool size; the pool pins `minThreads === maxThreads` to this value. */
  readonly workers: number;
  /** ONNX weight precision, forwarded to each worker's embedder. */
  readonly variant: "fp32" | "int8";
  /** Optional model-directory override, forwarded to each worker. */
  readonly modelDir?: string;
}

/**
 * Open a worker-pool-backed embedder. Caller must invoke `close()` when
 * done; that tears the pool down.
 *
 * The pool is sized with `minThreads === maxThreads === workers` so worker
 * allocation is stable across a run. Workers lazy-load their OnnxEmbedder
 * on first task, so pool construction itself is cheap.
 *
 * @throws Error from `embed`/`embedBatch` after `close()` has been called,
 *   or when a worker returns a vector count that does not match the input.
 */
export function openOnnxEmbedderPool(opts: EmbedderPoolOptions): Embedder {
  const workerData: { variant: "fp32" | "int8"; modelDir?: string } = {
    variant: opts.variant,
  };
  if (opts.modelDir !== undefined) workerData.modelDir = opts.modelDir;

  const pool = new Piscina<EmbedBatchTask, EmbedBatchResult>({
    filename: WORKER_FILENAME,
    minThreads: opts.workers,
    maxThreads: opts.workers,
    // Each worker owns an ~300 MB ONNX session; don't recycle on idle.
    idleTimeout: Number.POSITIVE_INFINITY,
    workerData,
  });

  let closed = false;
  const dim = 768; // gte-modernbert-base — matches OnnxEmbedder's EMBED_DIM.

  async function embedBatch(texts: readonly string[]): Promise<readonly Float32Array[]> {
    if (closed) throw new Error("Embedder pool is closed");
    if (texts.length === 0) return [];
    const result = await pool.run({ texts: [...texts] });
    // A worker-side miscount (including a spurious empty result for a
    // non-empty batch) would silently corrupt downstream row alignment if
    // passed through; fail loudly instead.
    if (result.count !== texts.length) {
      throw new Error(
        `embedder worker returned ${result.count} vectors for ${texts.length} texts`,
      );
    }
    const flat = new Float32Array(result.buffer);
    const out: Float32Array[] = [];
    for (let i = 0; i < result.count; i++) {
      // Slice copies into a fresh, non-shared Float32Array so callers can
      // hang onto each vector independently of the transport buffer.
      out.push(flat.slice(i * result.dim, (i + 1) * result.dim));
    }
    return out;
  }

  return {
    dim,
    modelId: embedderModelId(opts.variant),
    async embed(text: string): Promise<Float32Array> {
      const [vec] = await embedBatch([text]);
      if (vec === undefined) throw new Error("embedBatch returned empty result");
      return vec;
    },
    embedBatch,
    async close(): Promise<void> {
      // Idempotent: repeated close() calls are no-ops after the first.
      if (closed) return;
      closed = true;
      await pool.destroy();
    },
  };
}
66 changes: 66 additions & 0 deletions packages/ingestion/src/pipeline/phases/embedder-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Piscina worker entry point for the embeddings phase.
*
* Each worker lazy-opens its own OnnxEmbedder on first task and caches it for
* the lifetime of the worker. Tasks carry a flat list of chunk texts; the
* worker returns the vectors in input order.
*
* Determinism contract: `openOnnxEmbedder()` pins intraOpNumThreads=1,
* interOpNumThreads=1, and graphOptimizationLevel="disabled" — the same
* input produces byte-identical output regardless of which worker processed
* it. Callers partition work deterministically (block-chunked by batch
* index) so repeat runs reproduce the same byte-level output.
*/

import { workerData } from "node:worker_threads";
import type { Embedder } from "@opencodehub/embedder";
import { openOnnxEmbedder } from "@opencodehub/embedder";

/** Shape of the `workerData` payload the pool supplies at worker spawn. */
interface WorkerConfig {
  /** ONNX weight precision for this worker's embedder session. */
  readonly variant: "fp32" | "int8";
  /** Optional model-directory override passed through to openOnnxEmbedder(). */
  readonly modelDir?: string;
}

/** One pool task: a batch of chunk texts to embed. */
export interface EmbedBatchTask {
  /** Texts to embed; vectors come back in this same order. */
  readonly texts: readonly string[];
}

/** Worker response for one {@link EmbedBatchTask}. */
export interface EmbedBatchResult {
  /**
   * Flat binary payload of `texts.length * dim` float32 values, laid out as
   * `[vec0, vec1, ...]`. Shipping a single ArrayBuffer avoids the per-vector
   * structured-clone overhead when batches are large.
   */
  readonly buffer: ArrayBuffer;
  /** Floats per vector (embedding dimensionality). */
  readonly dim: number;
  /** Number of vectors packed into `buffer`. */
  readonly count: number;
}

// Per-worker config; defaults to fp32 when the module is loaded without
// workerData (i.e. outside a Piscina worker).
const cfg = (workerData as WorkerConfig | undefined) ?? { variant: "fp32" };

// Memoized embedder: one ONNX session per worker, opened on first task.
let embedderPromise: Promise<Embedder> | undefined;

/**
 * Lazily open this worker's OnnxEmbedder and reuse it thereafter.
 *
 * The promise itself is cached, so concurrent first tasks share one
 * in-flight open instead of racing to create two sessions.
 */
function getEmbedder(): Promise<Embedder> {
  if (embedderPromise !== undefined) return embedderPromise;
  const openOpts: { variant: "fp32" | "int8"; modelDir?: string } = { variant: cfg.variant };
  if (cfg.modelDir !== undefined) openOpts.modelDir = cfg.modelDir;
  embedderPromise = openOnnxEmbedder(openOpts);
  return embedderPromise;
}

/**
 * Piscina task handler: embed `task.texts` and pack the resulting vectors
 * into a single flat Float32Array payload (layout described on
 * {@link EmbedBatchResult}).
 */
export default async function embedBatch(task: EmbedBatchTask): Promise<EmbedBatchResult> {
  const embedder = await getEmbedder();
  const vectors = await embedder.embedBatch(task.texts);
  const count = vectors.length;
  if (count === 0) {
    return { buffer: new ArrayBuffer(0), dim: embedder.dim, count: 0 };
  }
  // Trust the first vector's length for the layout; fall back to the
  // embedder's declared dim if it is somehow absent.
  const dim = vectors[0]?.length ?? embedder.dim;
  const packed = new Float32Array(count * dim);
  for (const [idx, vec] of vectors.entries()) {
    packed.set(vec, idx * dim);
  }
  return { buffer: packed.buffer, dim, count };
}
30 changes: 30 additions & 0 deletions packages/ingestion/src/pipeline/phases/embeddings.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,36 @@ describe("embeddingsPhase — hierarchical tiers (P03)", () => {
assert.equal(a.embeddingsHash, b.embeddingsHash, "hash is stable across runs");
});

it("produces the same embeddingsHash across batchSize=1 and batchSize=32", async () => {
const { repoPath, relPath } = makeRepo();
async function runWith(batchSize: number): Promise<string> {
const ctx: PipelineContext = {
repoPath,
options: {
embeddings: true,
embeddingsGranularity: ["symbol", "file", "community"],
embeddingsBatchSize: batchSize,
} as unknown as PipelineOptions,
graph: buildGraph(relPath),
phaseOutputs: new Map<string, unknown>([
[
SCAN_PHASE_NAME,
{ files: [{ absPath: "", relPath, byteSize: 1, sha256: "h", grammarSha: null }] },
],
]),
};
const out = await embeddingsPhase.run(ctx, new Map());
return out.embeddingsHash;
}
const hashSmall = await runWith(1);
const hashLarge = await runWith(32);
assert.equal(
hashSmall,
hashLarge,
"cross-node batching must not perturb the deterministic row layout",
);
});

it("scopes content hashes per tier so cross-tier collisions are impossible", async () => {
const { repoPath, relPath } = makeRepo();
const graph = buildGraph(relPath);
Expand Down
Loading
Loading